S1MdpSyncTransformService.cs 49 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858
  1. namespace Admin.NET.Plugin.AiDOP.Order;
  2. /// <summary>
  3. /// S1 首批 MDP 同步和标准化转换服务。
  4. /// </summary>
  5. public class S1MdpSyncTransformService : ITransient
  6. {
  7. private const string JobCode = "S1_MDP_SYNC_TRANSFORM";
  8. private readonly ISqlSugarClient _db;
  9. public S1MdpSyncTransformService(ISqlSugarClient db)
  10. {
  11. _db = db;
  12. }
  13. public async Task<S1MdpSyncTransformResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
  14. {
  15. cancellationToken.ThrowIfCancellationRequested();
  16. var now = DateTime.Now;
  17. var batchId = $"S1_MDP_FULL_{now:yyyyMMddHHmmss}";
  18. var runLogId = await InsertTransformRunLogAsync(batchId, now, triggerType);
  19. var result = new S1MdpSyncTransformResult { BatchId = batchId, RunLogId = runLogId };
  20. try
  21. {
  22. result.StageRows = await SyncStagingAsync(batchId, now, cancellationToken);
  23. result.StandardRows = await TransformStandardAsync(batchId, now, cancellationToken);
  24. result.DwdRows = await BuildDwdAsync(batchId, now, cancellationToken);
  25. result.KpiRows = await BuildS1KpiValuesAsync(now, cancellationToken);
  26. await MarkTransformRunSuccessAsync(runLogId, now, result);
  27. return result;
  28. }
  29. catch (Exception ex)
  30. {
  31. await MarkTransformRunFailedAsync(runLogId, now, ex.Message);
  32. throw;
  33. }
  34. }
  35. private async Task<int> SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  36. {
  37. var total = 0;
  38. foreach (var entity in S1MdpEntityConfig.All)
  39. {
  40. cancellationToken.ThrowIfCancellationRequested();
  41. total += await SyncOneEntityAsync(entity, batchId, now);
  42. }
  43. return total;
  44. }
  45. private async Task<int> SyncOneEntityAsync(S1MdpEntityConfig entity, string batchId, DateTime now)
  46. {
  47. var entityRow = await _db.Ado.SqlQuerySingleAsync<S1MdpEntityRow>(
  48. "SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1",
  49. new SugarParameter("@EntityCode", entity.EntityCode));
  50. if (entityRow == null) throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode}");
  51. var columns = await _db.Ado.SqlQueryAsync<S1ColumnRow>(
  52. """
  53. SELECT COLUMN_NAME AS ColumnName
  54. FROM information_schema.COLUMNS
  55. WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
  56. ORDER BY ORDINAL_POSITION
  57. """,
  58. new SugarParameter("@TableName", entity.SourceTable));
  59. if (columns.Count == 0) throw Oops.Oh($"未找到源表:{entity.SourceTable}");
  60. var names = columns.Select(u => u.ColumnName).ToList();
  61. var tenantExpr = BuildOptionalColumnExpr(names, "tenant_id", "0");
  62. var factoryExpr = BuildOptionalColumnExpr(names, "factory_id", "NULL");
  63. var companyExpr = BuildOptionalColumnExpr(names, "company_id", "NULL");
  64. var sourceRowExpr = names.Any(u => string.Equals(u, entity.SourceRowIdExpression, StringComparison.OrdinalIgnoreCase))
  65. ? $"s.`{FindColumn(names, entity.SourceRowIdExpression)}`"
  66. : entity.SourceRowIdExpression;
  67. var rawDataExpr = BuildJsonObjectExpression(names);
  68. var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`");
  69. var logId = await InsertSyncLogAsync(entityRow.Id, entityRow.EntityName, batchId, rowsRead);
  70. var started = DateTime.Now;
  71. try
  72. {
  73. var affected = await _db.Ado.ExecuteCommandAsync(
  74. $"""
  75. INSERT INTO `{entity.TargetTable}`
  76. (tenant_id, factory_id, company_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
  77. SELECT
  78. {tenantExpr},
  79. {factoryExpr},
  80. {companyExpr},
  81. 'AIDOP',
  82. @SourceTable,
  83. CAST({sourceRowExpr} AS CHAR),
  84. CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({sourceRowExpr} AS CHAR)) AS CHAR),
  85. @BatchId,
  86. @Now,
  87. 'PENDING',
  88. {rawDataExpr}
  89. FROM `{entity.SourceTable}` s
  90. ON DUPLICATE KEY UPDATE
  91. tenant_id=VALUES(tenant_id),
  92. factory_id=VALUES(factory_id),
  93. company_id=VALUES(company_id),
  94. source_row_id=VALUES(source_row_id),
  95. sync_batch_id=VALUES(sync_batch_id),
  96. sync_time=VALUES(sync_time),
  97. process_status=VALUES(process_status),
  98. raw_data=VALUES(raw_data),
  99. update_time=CURRENT_TIMESTAMP
  100. """,
  101. new SugarParameter("@SourceTable", entity.SourceTable),
  102. new SugarParameter("@BatchId", batchId),
  103. new SugarParameter("@Now", now));
  104. await MarkSyncLogSuccessAsync(logId, started, affected);
  105. return rowsRead;
  106. }
  107. catch (Exception ex)
  108. {
  109. await MarkSyncLogFailedAsync(logId, started, ex.Message);
  110. throw;
  111. }
  112. }
  113. private async Task<int> TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  114. {
  115. var total = 0;
  116. foreach (var command in BuildStandardCommands(batchId, now))
  117. {
  118. cancellationToken.ThrowIfCancellationRequested();
  119. total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
  120. }
  121. return total;
  122. }
  123. private async Task<int> BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  124. {
  125. var total = 0;
  126. foreach (var command in BuildDwdCommands(batchId, now))
  127. {
  128. cancellationToken.ThrowIfCancellationRequested();
  129. total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
  130. }
  131. return total;
  132. }
  133. private async Task<int> BuildS1KpiValuesAsync(DateTime now, CancellationToken cancellationToken)
  134. {
  135. var statDate = now.Date;
  136. var rows = await CalculateS1KpiValuesAsync(statDate);
  137. var affected = 0;
  138. foreach (var row in rows)
  139. {
  140. cancellationToken.ThrowIfCancellationRequested();
  141. affected += await UpsertS1KpiValueAsync(row, statDate, now);
  142. }
  143. return affected;
  144. }
  145. private async Task<List<S1KpiCalcRow>> CalculateS1KpiValuesAsync(DateTime statDate)
  146. {
  147. return await _db.Ado.SqlQueryAsync<S1KpiCalcRow>(
  148. """
  149. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  150. 'S1_L1_001' AS MetricCode,
  151. ROUND(AVG(TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) / 24), 4) AS MetricValue
  152. FROM mdp_std_so
  153. WHERE order_date IS NOT NULL
  154. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
  155. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) >= order_date
  156. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  157. UNION ALL
  158. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  159. 'S1_L1_002' AS MetricCode,
  160. ROUND(100 * SUM(CASE WHEN TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) <= 72 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
  161. FROM mdp_std_so
  162. WHERE order_date IS NOT NULL
  163. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
  164. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  165. UNION ALL
  166. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  167. 'S1_L1_003' AS MetricCode,
  168. ROUND(COUNT(1) / GREATEST(COUNT(DISTINCT NULLIF(planner_no, '')), 1), 4) AS MetricValue
  169. FROM mdp_std_so
  170. WHERE order_date IS NOT NULL AND order_date <= @StatDate
  171. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  172. UNION ALL
  173. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  174. 'S1_L2_010' AS MetricCode,
  175. ROUND(AVG(TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) / 24), 4) AS MetricValue
  176. FROM mdp_std_so
  177. WHERE order_date IS NOT NULL
  178. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
  179. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) >= order_date
  180. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  181. UNION ALL
  182. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  183. 'S1_L2_011' AS MetricCode,
  184. ROUND(100 * SUM(CASE WHEN TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) <= 72 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
  185. FROM mdp_std_so
  186. WHERE order_date IS NOT NULL
  187. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
  188. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  189. UNION ALL
  190. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  191. 'S1_L2_012' AS MetricCode,
  192. ROUND(COUNT(1) / GREATEST(COUNT(DISTINCT NULLIF(planner_no, '')), 1), 4) AS MetricValue
  193. FROM mdp_std_so
  194. WHERE order_date IS NOT NULL AND order_date <= @StatDate
  195. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  196. """,
  197. new SugarParameter("@StatDate", statDate));
  198. }
  199. private async Task<int> UpsertS1KpiValueAsync(S1KpiCalcRow row, DateTime statDate, DateTime now)
  200. {
  201. var meta = await _db.Ado.SqlQuerySingleAsync<S1KpiMetaRow>(
  202. """
  203. SELECT MetricLevel, Direction, YellowThreshold, RedThreshold
  204. FROM ado_smart_ops_kpi_master
  205. WHERE TenantId=@TenantId AND ModuleCode='S1' AND MetricCode=@MetricCode AND IsEnabled=1
  206. LIMIT 1
  207. """,
  208. new SugarParameter("@TenantId", row.TenantId),
  209. new SugarParameter("@MetricCode", row.MetricCode));
  210. if (meta == null || row.MetricValue == null)
  211. return 0;
  212. var table = ResolveKpiValueTable(meta.MetricLevel);
  213. var current = await _db.Ado.SqlQuerySingleAsync<S1KpiValueRow>(
  214. $"""
  215. SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
  216. FROM {table}
  217. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S1'
  218. AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
  219. ORDER BY id
  220. LIMIT 1
  221. """,
  222. new SugarParameter("@TenantId", row.TenantId),
  223. new SugarParameter("@FactoryId", row.FactoryId),
  224. new SugarParameter("@MetricCode", row.MetricCode),
  225. new SugarParameter("@BizDate", statDate));
  226. var prior = await _db.Ado.SqlQuerySingleAsync<S1KpiValueRow>(
  227. $"""
  228. SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
  229. FROM {table}
  230. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S1'
  231. AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0
  232. ORDER BY biz_date DESC, id DESC
  233. LIMIT 1
  234. """,
  235. new SugarParameter("@TenantId", row.TenantId),
  236. new SugarParameter("@FactoryId", row.FactoryId),
  237. new SugarParameter("@MetricCode", row.MetricCode),
  238. new SugarParameter("@BizDate", statDate));
  239. var actual = Math.Round(row.MetricValue.Value, 4);
  240. var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS1Target(row.MetricCode);
  241. var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold);
  242. var trend = ResolveTrendFlag(actual, prior?.MetricValue);
  243. if (current != null)
  244. {
  245. return await _db.Ado.ExecuteCommandAsync(
  246. $"""
  247. UPDATE {table}
  248. SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag,
  249. is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime
  250. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S1'
  251. AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
  252. """,
  253. new SugarParameter("@MetricValue", actual),
  254. new SugarParameter("@TargetValue", target),
  255. new SugarParameter("@StatusColor", status),
  256. new SugarParameter("@TrendFlag", trend),
  257. new SugarParameter("@CalcTime", now),
  258. new SugarParameter("@TenantId", row.TenantId),
  259. new SugarParameter("@FactoryId", row.FactoryId),
  260. new SugarParameter("@MetricCode", row.MetricCode),
  261. new SugarParameter("@BizDate", statDate));
  262. }
  263. var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}");
  264. return await _db.Ado.ExecuteCommandAsync(
  265. $"""
  266. INSERT INTO {table}
  267. (id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active,
  268. module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time)
  269. VALUES
  270. (@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1,
  271. 'S1', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime)
  272. """,
  273. new SugarParameter("@Id", nextId),
  274. new SugarParameter("@TenantId", row.TenantId),
  275. new SugarParameter("@FactoryId", row.FactoryId),
  276. new SugarParameter("@BizDate", statDate),
  277. new SugarParameter("@CalcTime", now),
  278. new SugarParameter("@MetricCode", row.MetricCode),
  279. new SugarParameter("@MetricValue", actual),
  280. new SugarParameter("@TargetValue", target),
  281. new SugarParameter("@StatusColor", status),
  282. new SugarParameter("@TrendFlag", trend));
  283. }
  284. private IEnumerable<S1MdpSqlCommand> BuildStandardCommands(string batchId, DateTime now)
  285. {
  286. yield return Cmd(
  287. """
  288. INSERT INTO mdp_std_so
  289. (tenant_id, factory_id, company_id, source_system, order_id, order_entry_id, order_no, order_line, order_type,
  290. customer_id, customer_no, customer_name, customer_order_no, country, item_code, item_name, item_spec,
  291. map_number, map_name, bom_number, unit, order_qty, delivered_notice_qty, delivered_qty, price, tax_price,
  292. amount, total_amount, order_date, customer_request_date, plan_delivery_date, promised_delivery_date,
  293. capacity_date, material_ready_date, planner_no, planner_name, order_status, review_status, review_stage,
  294. flow_state, progress, urgent, closed, deleted_flag, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
  295. SELECT
  296. COALESCE(e.tenant_id, h.tenant_id, 0),
  297. COALESCE(e.factory_id, h.factory_id),
  298. e.company_id,
  299. 'AIDOP',
  300. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.Id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.Id')) AS SIGNED) END,
  301. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.Id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.Id')) AS SIGNED) END,
  302. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_no')), e.source_biz_key),
  303. CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.entry_seq')), JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.Id'))) AS CHAR),
  304. CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.order_type')) AS CHAR),
  305. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_id')) AS SIGNED) END,
  306. JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_no')),
  307. JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_name')),
  308. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.custom_order_bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_from'))),
  309. JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.country')),
  310. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.item_number')),
  311. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.item_name')),
  312. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.specification')),
  313. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.map_number')),
  314. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.map_name')),
  315. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bom_number')),
  316. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.unit')),
  317. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.qty')) AS DECIMAL(18,6)) END,
  318. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_notice_count')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_notice_count')) AS DECIMAL(18,6)) END,
  319. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_count')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_count')) AS DECIMAL(18,6)) END,
  320. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.price')) AS DECIMAL(18,6)) END,
  321. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.tax_price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.tax_price')) AS DECIMAL(18,6)) END,
  322. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.amount')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.amount')) AS DECIMAL(18,6)) END,
  323. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.total_amount')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.total_amount')) AS DECIMAL(18,6)) END,
  324. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.date')), 'null'), ''),
  325. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.rdate')), 'null'), ''),
  326. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.plan_date')), 'null'), ''),
  327. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.date')), 'null'), ''),
  328. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.sys_capacity_date')), 'null'), ''),
  329. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.sys_material_date')), 'null'), ''),
  330. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.planner_no')),
  331. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.planner_name')),
  332. CASE WHEN JSON_EXTRACT(h.raw_data,'$.closed') IN (1, true) THEN 'CLOSED' ELSE 'OPEN' END,
  333. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.FlowStatus')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.flowstate'))),
  334. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.CurrentDept')), JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.CurrentStage'))),
  335. JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.flowstate')),
  336. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.progress')),
  337. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.urgent')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.urgent')) AS SIGNED) END,
  338. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.closed')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.closed')) AS SIGNED) END,
  339. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.IsDeleted')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.IsDeleted')) AS SIGNED) END, 0),
  340. e.source_table,
  341. e.source_row_id,
  342. e.source_biz_key,
  343. @BatchId,
  344. @Now
  345. FROM mdp_stg_so e
  346. LEFT JOIN mdp_stg_so h ON h.source_table='crm_seorder'
  347. AND JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.Id')) = JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.seorder_id'))
  348. LEFT JOIN mdp_stg_so r ON r.source_table='ado_contract_review'
  349. AND JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.BillNo')) = COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.contract_no')), JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_no')))
  350. WHERE e.source_table='crm_seorderentry'
  351. AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_no'))), '') <> ''
  352. ON DUPLICATE KEY UPDATE
  353. customer_no=VALUES(customer_no), customer_name=VALUES(customer_name), item_code=VALUES(item_code),
  354. item_name=VALUES(item_name), item_spec=VALUES(item_spec), order_qty=VALUES(order_qty),
  355. delivered_notice_qty=VALUES(delivered_notice_qty), delivered_qty=VALUES(delivered_qty),
  356. plan_delivery_date=VALUES(plan_delivery_date), promised_delivery_date=VALUES(promised_delivery_date),
  357. capacity_date=VALUES(capacity_date), material_ready_date=VALUES(material_ready_date),
  358. order_status=VALUES(order_status), review_status=VALUES(review_status), review_stage=VALUES(review_stage),
  359. flow_state=VALUES(flow_state), progress=VALUES(progress), deleted_flag=VALUES(deleted_flag),
  360. sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  361. """, batchId, now);
  362. yield return Cmd(
  363. """
  364. INSERT INTO mdp_std_ship_trans
  365. (tenant_id, factory_id, company_id, source_system, trans_type, plan_id, plan_no, plan_line, order_id, order_entry_id,
  366. order_no, order_line, customer_no, customer_name, country, item_code, item_name, item_spec, qty, plan_qty,
  367. weight, volume, order_date, plan_ship_date, shipping_site, shipping_address, consignee, telephone,
  368. status, confirm_status, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
  369. SELECT
  370. IFNULL(d.tenant_id, 0),
  371. d.factory_id,
  372. d.company_id,
  373. 'AIDOP',
  374. 'SHIP_PLAN',
  375. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id')) AS SIGNED) END,
  376. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.LotSerial')), CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id')) AS CHAR)),
  377. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RecID')) AS CHAR),
  378. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.seorder_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.seorder_id')) AS SIGNED) END,
  379. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sentry_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sentry_id')) AS SIGNED) END,
  380. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdNbr'))),
  381. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sentry_id')) AS CHAR),
  382. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.CustomNo')),
  383. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.CustomName')),
  384. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Country')),
  385. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemNum')),
  386. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemName')),
  387. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Specification')),
  388. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) AS DECIMAL(18,6)) END,
  389. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) AS DECIMAL(18,6)) END,
  390. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Weight')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Weight')) AS DECIMAL(18,6)) END,
  391. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')) AS DECIMAL(18,6)) END,
  392. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdDate')), 'null'), ''),
  393. NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShippingDate')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.CreateTime'))), 'null'), ''),
  394. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShippingSite')),
  395. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShippingAddress')),
  396. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Consignee')),
  397. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Telephone')),
  398. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Status')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Status'))),
  399. CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.IsConfirm')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.IsConfirm'))) AS CHAR),
  400. d.source_table,
  401. d.source_row_id,
  402. d.source_biz_key,
  403. @BatchId,
  404. @Now
  405. FROM mdp_stg_ship_trans d
  406. LEFT JOIN mdp_stg_ship_trans m ON m.source_table='ShippingPlan'
  407. AND JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.RecID')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id'))
  408. WHERE d.source_table='ShippingPlanDetail'
  409. AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdNbr'))), '') <> ''
  410. ON DUPLICATE KEY UPDATE
  411. plan_no=VALUES(plan_no), order_no=VALUES(order_no), customer_no=VALUES(customer_no), customer_name=VALUES(customer_name),
  412. item_code=VALUES(item_code), item_name=VALUES(item_name), qty=VALUES(qty), plan_qty=VALUES(plan_qty),
  413. plan_ship_date=VALUES(plan_ship_date), shipping_site=VALUES(shipping_site), shipping_address=VALUES(shipping_address),
  414. status=VALUES(status), confirm_status=VALUES(confirm_status), sync_batch_id=VALUES(sync_batch_id),
  415. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  416. """, batchId, now);
  417. yield return Cmd(
  418. """
  419. INSERT INTO mdp_std_ship_trans
  420. (tenant_id, factory_id, source_system, trans_type, shipper_rec_id, shipper_no, shipper_line, order_no, order_line,
  421. customer_no, item_code, item_name, qty_to_ship, picking_qty, real_qty, gross_weight, net_weight, volume,
  422. plan_ship_date, actual_ship_date, site, status, confirm_status, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
  423. SELECT
  424. IFNULL(d.tenant_id, 0),
  425. d.factory_id,
  426. 'AIDOP',
  427. 'ASN_SHIPPER',
  428. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ASNBOLShipperRecID')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ASNBOLShipperRecID')) AS SIGNED) END,
  429. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Id')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Id'))),
  430. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Line')) AS CHAR),
  431. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdNbr')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.OrdNbr'))),
  432. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdLine')) AS CHAR),
  433. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.SoldTo')),
  434. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ContainerItem')),
  435. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Descr')),
  436. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyToShip')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyToShip')) AS DECIMAL(18,6)) END,
  437. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PickingQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PickingQty')) AS DECIMAL(18,6)) END,
  438. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RealQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RealQty')) AS DECIMAL(18,6)) END,
  439. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.GrossWeight')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.GrossWeight')) AS DECIMAL(18,6)) END,
  440. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.NetWeight')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.NetWeight')) AS DECIMAL(18,6)) END,
  441. CASE WHEN COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Volume'))) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Volume'))) AS DECIMAL(18,6)) END,
  442. NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ShipDate')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShipDate'))), 'null'), ''),
  443. NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ShipDate')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShipDate'))), 'null'), ''),
  444. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Site')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Site'))),
  445. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Status')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Status'))),
  446. CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.IsConfirm')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.IsConfirm'))) AS CHAR),
  447. d.source_table,
  448. d.source_row_id,
  449. d.source_biz_key,
  450. @BatchId,
  451. @Now
  452. FROM mdp_stg_ship_trans d
  453. LEFT JOIN mdp_stg_ship_trans m ON m.source_table='ASNBOLShipperMaster'
  454. AND JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.RecID')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ASNBOLShipperRecID'))
  455. WHERE d.source_table='ASNBOLShipperDetail'
  456. AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Id')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Id'))), '') <> ''
  457. ON DUPLICATE KEY UPDATE
  458. shipper_no=VALUES(shipper_no), order_no=VALUES(order_no), order_line=VALUES(order_line),
  459. customer_no=VALUES(customer_no), item_code=VALUES(item_code), item_name=VALUES(item_name),
  460. qty_to_ship=VALUES(qty_to_ship), picking_qty=VALUES(picking_qty), real_qty=VALUES(real_qty),
  461. actual_ship_date=VALUES(actual_ship_date), site=VALUES(site), status=VALUES(status),
  462. confirm_status=VALUES(confirm_status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  463. update_time=CURRENT_TIMESTAMP
  464. """, batchId, now);
  465. }
  466. private IEnumerable<S1MdpSqlCommand> BuildDwdCommands(string batchId, DateTime now)
  467. {
  468. yield return Cmd(
  469. """
  470. INSERT INTO dwd_ship_trans
  471. (tenant_id, factory_id, company_id, stat_date, order_id, order_entry_id, order_no, order_line, customer_no,
  472. customer_name, country, item_code, item_name, item_spec, order_qty, planned_ship_qty, shipped_qty,
  473. remaining_qty, order_date, customer_request_date, plan_delivery_date, promised_delivery_date,
  474. plan_ship_date, actual_ship_date, review_status, order_status, delivery_status, linkage_status, risk_level,
  475. source_system, source_table, source_row_id, source_biz_key, sync_batch_id, calc_batch_id, calc_time)
  476. SELECT
  477. so.tenant_id,
  478. so.factory_id,
  479. so.company_id,
  480. @StatDate,
  481. so.order_id,
  482. so.order_entry_id,
  483. so.order_no,
  484. IFNULL(so.order_line, ''),
  485. so.customer_no,
  486. so.customer_name,
  487. so.country,
  488. IFNULL(so.item_code, ''),
  489. so.item_name,
  490. so.item_spec,
  491. IFNULL(so.order_qty, 0),
  492. IFNULL(p.plan_qty, 0),
  493. IFNULL(a.real_qty, 0),
  494. GREATEST(IFNULL(so.order_qty, 0) - IFNULL(a.real_qty, 0), 0),
  495. so.order_date,
  496. so.customer_request_date,
  497. so.plan_delivery_date,
  498. so.promised_delivery_date,
  499. p.plan_ship_date,
  500. a.actual_ship_date,
  501. so.review_status,
  502. so.order_status,
  503. CASE
  504. WHEN IFNULL(so.order_qty, 0) > 0 AND IFNULL(a.real_qty, 0) >= IFNULL(so.order_qty, 0) THEN 'COMPLETED'
  505. WHEN COALESCE(p.plan_ship_date, so.promised_delivery_date, so.plan_delivery_date) < @Now THEN 'DELAYED'
  506. WHEN IFNULL(p.plan_qty, 0) > 0 THEN 'PLANNED'
  507. ELSE 'OPEN'
  508. END,
  509. l.linkage_status,
  510. CASE
  511. WHEN COALESCE(p.plan_ship_date, so.promised_delivery_date, so.plan_delivery_date) < @Now
  512. AND IFNULL(a.real_qty, 0) < IFNULL(so.order_qty, 0) THEN 'HIGH'
  513. WHEN IFNULL(a.real_qty, 0) < IFNULL(so.order_qty, 0) THEN 'MEDIUM'
  514. ELSE 'LOW'
  515. END,
  516. 'AIDOP',
  517. so.source_table,
  518. so.source_row_id,
  519. so.source_biz_key,
  520. so.sync_batch_id,
  521. @BatchId,
  522. @Now
  523. FROM mdp_std_so so
  524. LEFT JOIN (
  525. SELECT tenant_id, order_no, IFNULL(order_line, '') AS order_line, IFNULL(item_code, '') AS item_code,
  526. SUM(IFNULL(plan_qty, IFNULL(qty, 0))) AS plan_qty,
  527. MIN(plan_ship_date) AS plan_ship_date
  528. FROM mdp_std_ship_trans
  529. WHERE trans_type='SHIP_PLAN'
  530. GROUP BY tenant_id, order_no, IFNULL(order_line, ''), IFNULL(item_code, '')
  531. ) p ON so.tenant_id=p.tenant_id AND so.order_no=p.order_no AND IFNULL(so.order_line, '')=p.order_line AND IFNULL(so.item_code, '')=p.item_code
  532. LEFT JOIN (
  533. SELECT tenant_id, order_no, IFNULL(order_line, '') AS order_line, IFNULL(item_code, '') AS item_code,
  534. SUM(IFNULL(real_qty, IFNULL(qty_to_ship, 0))) AS real_qty,
  535. MAX(actual_ship_date) AS actual_ship_date
  536. FROM mdp_std_ship_trans
  537. WHERE trans_type='ASN_SHIPPER'
  538. GROUP BY tenant_id, order_no, IFNULL(order_line, ''), IFNULL(item_code, '')
  539. ) a ON so.tenant_id=a.tenant_id AND so.order_no=a.order_no AND IFNULL(so.order_line, '')=a.order_line AND IFNULL(so.item_code, '')=a.item_code
  540. LEFT JOIN (
  541. SELECT tenant_id, order_no, item_code, MAX(linkage_status) AS linkage_status
  542. FROM mdp_std_ship_trans
  543. WHERE IFNULL(linkage_status, '') <> ''
  544. GROUP BY tenant_id, order_no, item_code
  545. ) l ON so.tenant_id=l.tenant_id AND so.order_no=l.order_no AND IFNULL(so.item_code, '')=IFNULL(l.item_code, '')
  546. WHERE IFNULL(so.order_no, '') <> ''
  547. ON DUPLICATE KEY UPDATE
  548. customer_no=VALUES(customer_no), customer_name=VALUES(customer_name), item_name=VALUES(item_name),
  549. order_qty=VALUES(order_qty), planned_ship_qty=VALUES(planned_ship_qty), shipped_qty=VALUES(shipped_qty),
  550. remaining_qty=VALUES(remaining_qty), plan_ship_date=VALUES(plan_ship_date), actual_ship_date=VALUES(actual_ship_date),
  551. review_status=VALUES(review_status), order_status=VALUES(order_status), delivery_status=VALUES(delivery_status),
  552. linkage_status=VALUES(linkage_status), risk_level=VALUES(risk_level), sync_batch_id=VALUES(sync_batch_id),
  553. calc_batch_id=VALUES(calc_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  554. """, batchId, now);
  555. }
  556. private async Task<long> InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead)
  557. {
  558. await _db.Ado.ExecuteCommandAsync(
  559. """
  560. INSERT INTO mdp_sync_log
  561. (tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status)
  562. VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING')
  563. """,
  564. new SugarParameter("@EntityId", entityId),
  565. new SugarParameter("@EntityName", entityName),
  566. new SugarParameter("@BatchId", batchId),
  567. new SugarParameter("@RowsRead", rowsRead));
  568. return await _db.Ado.GetLongAsync(
  569. "SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1",
  570. new List<SugarParameter>
  571. {
  572. new("@BatchId", batchId),
  573. new("@EntityId", entityId)
  574. });
  575. }
  576. private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected)
  577. {
  578. await _db.Ado.ExecuteCommandAsync(
  579. """
  580. UPDATE mdp_sync_log
  581. SET sync_end=NOW(), duration_ms=@DurationMs, rows_insert=@RowsInsert, rows_update=0, rows_skip=0, rows_error=0, status='SUCCESS'
  582. WHERE id=@Id
  583. """,
  584. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  585. new SugarParameter("@RowsInsert", affected),
  586. new SugarParameter("@Id", logId));
  587. }
  588. private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message)
  589. {
  590. try
  591. {
  592. await _db.Ado.ExecuteCommandAsync(
  593. """
  594. UPDATE mdp_sync_log
  595. SET sync_end=NOW(), duration_ms=@DurationMs, rows_error=1, status='FAILED', error_msg=@ErrorMsg
  596. WHERE id=@Id
  597. """,
  598. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  599. new SugarParameter("@ErrorMsg", Truncate(message, 1000)),
  600. new SugarParameter("@Id", logId));
  601. }
  602. catch (Exception ex)
  603. {
  604. // 写库自身失败兜底:避免再抛掩盖原异常;遗留 RUNNING 行可由运维手动清理
  605. Console.Error.WriteLine($"[S1MdpSyncTransform] MarkSyncLogFailed write failed (syncLogId={logId}): {ex.Message}");
  606. }
  607. }
  608. private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
  609. {
  610. await _db.Ado.ExecuteCommandAsync(
  611. """
  612. INSERT INTO mdp_transform_run_log
  613. (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
  614. VALUES (0, @JobCode, 'S1 MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
  615. """,
  616. new SugarParameter("@JobCode", JobCode),
  617. new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
  618. new SugarParameter("@BatchId", batchId),
  619. new SugarParameter("@StartTime", startedAt));
  620. return await _db.Ado.GetLongAsync(
  621. "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
  622. new List<SugarParameter> { new("@BatchId", batchId) });
  623. }
  624. private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S1MdpSyncTransformResult result)
  625. {
  626. var finishedAt = DateTime.Now;
  627. await _db.Ado.ExecuteCommandAsync(
  628. """
  629. UPDATE mdp_transform_run_log
  630. SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
  631. stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
  632. summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
  633. WHERE id=@Id
  634. """,
  635. new SugarParameter("@EndTime", finishedAt),
  636. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  637. new SugarParameter("@StageRows", result.StageRows),
  638. new SugarParameter("@StandardRows", result.StandardRows),
  639. new SugarParameter("@DwdRows", result.DwdRows),
  640. new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)),
  641. new SugarParameter("@Id", runLogId));
  642. }
  643. private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
  644. {
  645. try
  646. {
  647. var finishedAt = DateTime.Now;
  648. await _db.Ado.ExecuteCommandAsync(
  649. """
  650. UPDATE mdp_transform_run_log
  651. SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
  652. error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
  653. WHERE id=@Id
  654. """,
  655. new SugarParameter("@EndTime", finishedAt),
  656. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  657. new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
  658. new SugarParameter("@Id", runLogId));
  659. }
  660. catch (Exception ex)
  661. {
  662. // 写库自身失败兜底(典型场景:远端 MySQL 瞬断导致 MarkFailed 自身也连不上):
  663. // 避免再抛二次异常掩盖原错;遗留 RUNNING 行可由运维手动清理。
  664. Console.Error.WriteLine($"[S1MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}");
  665. }
  666. }
  667. private static S1MdpSqlCommand Cmd(string sql, string batchId, DateTime now)
  668. {
  669. return new S1MdpSqlCommand(sql, new[]
  670. {
  671. new SugarParameter("@BatchId", batchId),
  672. new SugarParameter("@Now", now),
  673. new SugarParameter("@StatDate", now.Date)
  674. });
  675. }
  676. private static string BuildJsonObjectExpression(IEnumerable<string> columns)
  677. {
  678. var parts = columns.SelectMany(c => new[] { $"'{c.Replace("'", "''")}'", $"s.`{c}`" });
  679. return $"JSON_OBJECT({string.Join(",", parts)})";
  680. }
  681. private static string BuildOptionalColumnExpr(IReadOnlyCollection<string> columns, string expected, string fallback)
  682. {
  683. return columns.Any(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase))
  684. ? $"s.`{FindColumn(columns, expected)}`"
  685. : fallback;
  686. }
  687. private static string FindColumn(IEnumerable<string> columns, string expected)
  688. {
  689. return columns.First(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase));
  690. }
  691. private static string NormalizeTriggerType(string? triggerType)
  692. {
  693. return string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
  694. }
  695. private static string BuildRunSummaryJson(S1MdpSyncTransformResult result)
  696. {
  697. return $$"""{"batchId":"{{result.BatchId}}","stageRows":{{result.StageRows}},"standardRows":{{result.StandardRows}},"dwdRows":{{result.DwdRows}},"kpiRows":{{result.KpiRows}}}""";
  698. }
  699. private static string ResolveKpiValueTable(int metricLevel)
  700. {
  701. return metricLevel switch
  702. {
  703. 1 => "ado_s9_kpi_value_l1_day",
  704. 2 => "ado_s9_kpi_value_l2_day",
  705. 3 => "ado_s9_kpi_value_l3_day",
  706. 4 => "ado_s9_kpi_value_l4_day",
  707. _ => "ado_s9_kpi_value_l2_day"
  708. };
  709. }
  710. private static decimal DefaultS1Target(string metricCode)
  711. {
  712. return metricCode switch
  713. {
  714. "S1_L1_001" or "S1_L2_010" => 3m,
  715. "S1_L1_002" or "S1_L2_011" => 95m,
  716. "S1_L1_003" or "S1_L2_012" => 100m,
  717. _ => 0m
  718. };
  719. }
  720. private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold)
  721. {
  722. if (target <= 0) return "gray";
  723. var ratio = actual / target * 100m;
  724. if (string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase))
  725. {
  726. if (actual <= target) return "green";
  727. if (ratio <= (yellowThreshold ?? 110m)) return "yellow";
  728. return ratio >= (redThreshold ?? 120m) ? "red" : "yellow";
  729. }
  730. if (actual >= target) return "green";
  731. if (ratio >= (yellowThreshold ?? 95m)) return "yellow";
  732. return ratio <= (redThreshold ?? 80m) ? "red" : "yellow";
  733. }
  734. private static string ResolveTrendFlag(decimal actual, decimal? previous)
  735. {
  736. if (previous == null) return "flat";
  737. if (actual > previous.Value) return "up";
  738. if (actual < previous.Value) return "down";
  739. return "flat";
  740. }
  741. private static string Truncate(string? raw, int maxLength)
  742. {
  743. if (string.IsNullOrEmpty(raw)) return string.Empty;
  744. return raw.Length <= maxLength ? raw : raw[..maxLength];
  745. }
  746. private sealed class S1ColumnRow
  747. {
  748. public string ColumnName { get; set; } = string.Empty;
  749. }
  750. private sealed class S1MdpEntityRow
  751. {
  752. public long Id { get; set; }
  753. public string EntityName { get; set; } = string.Empty;
  754. }
  755. private sealed class S1KpiCalcRow
  756. {
  757. public long TenantId { get; set; }
  758. public long FactoryId { get; set; }
  759. public string MetricCode { get; set; } = string.Empty;
  760. public decimal? MetricValue { get; set; }
  761. }
  762. private sealed class S1KpiMetaRow
  763. {
  764. public int MetricLevel { get; set; }
  765. public string Direction { get; set; } = "higher_is_better";
  766. public decimal? YellowThreshold { get; set; }
  767. public decimal? RedThreshold { get; set; }
  768. }
  769. private sealed class S1KpiValueRow
  770. {
  771. public long Id { get; set; }
  772. public decimal? MetricValue { get; set; }
  773. public decimal? TargetValue { get; set; }
  774. }
  775. }
  776. public sealed class S1MdpSyncTransformResult
  777. {
  778. public long RunLogId { get; set; }
  779. public string BatchId { get; set; } = string.Empty;
  780. public int StageRows { get; set; }
  781. public int StandardRows { get; set; }
  782. public int DwdRows { get; set; }
  783. public int KpiRows { get; set; }
  784. }
  785. internal sealed record S1MdpSqlCommand(string Sql, SugarParameter[] Parameters);
  786. internal sealed record S1MdpEntityConfig(
  787. string EntityCode,
  788. string SourceTable,
  789. string TargetTable,
  790. string SourceRowIdExpression,
  791. string SourceBizKeyExpression)
  792. {
  793. public static readonly IReadOnlyList<S1MdpEntityConfig> All = new List<S1MdpEntityConfig>
  794. {
  795. new("S1_SEORDER", "crm_seorder", "mdp_stg_so", "Id", "COALESCE(s.`bill_no`, CAST(s.`Id` AS CHAR))"),
  796. new("S1_SEORDER_ENTRY", "crm_seorderentry", "mdp_stg_so", "Id", "CONCAT(IFNULL(s.`bill_no`,''), ':', IFNULL(s.`entry_seq`, CAST(s.`Id` AS CHAR)))"),
  797. new("S1_SEORDER_CHANGE", "crm_seorder_change", "mdp_stg_so", "Id", "CONCAT(IFNULL(s.`bill_no`,''), ':', CAST(s.`Id` AS CHAR))"),
  798. new("S1_CONTRACT_REVIEW", "ado_contract_review", "mdp_stg_so", "RecID", "COALESCE(s.`BillNo`, CAST(s.`RecID` AS CHAR))"),
  799. new("S1_CONTRACT_REVIEW_FLOW", "ado_contract_review_flow", "mdp_stg_so", "RecID", "CONCAT(IFNULL(s.`ReviewBillNo`,''), ':', IFNULL(s.`StageNo`,''), ':', CAST(s.`RecID` AS CHAR))"),
  800. new("S1_SHIPPING_PLAN", "ShippingPlan", "mdp_stg_ship_trans", "RecID", "COALESCE(s.`LotSerial`, CAST(s.`RecID` AS CHAR))"),
  801. new("S1_SHIPPING_PLAN_DETAIL", "ShippingPlanDetail", "mdp_stg_ship_trans", "RecID", "CONCAT(IFNULL(s.`plan_id`,''), ':', IFNULL(s.`OrdNbr`,''), ':', CAST(s.`RecID` AS CHAR))"),
  802. new("S1_ASN_SHIPPER_MASTER", "ASNBOLShipperMaster", "mdp_stg_ship_trans", "RecID", "COALESCE(s.`Id`, CONCAT(IFNULL(s.`OrdNbr`,''), ':', CAST(s.`RecID` AS CHAR)))"),
  803. new("S1_ASN_SHIPPER_DETAIL", "ASNBOLShipperDetail", "mdp_stg_ship_trans", "RecID", "CONCAT(IFNULL(s.`Id`,''), ':', IFNULL(s.`Line`, CAST(s.`RecID` AS CHAR)))"),
  804. new("S1_LINKAGE_PLAN", "LinkagePlan", "mdp_stg_ship_trans", "id", "CONCAT(IFNULL(s.`bill_no`,''), ':', IFNULL(s.`item_number`,''), ':', CAST(s.`id` AS CHAR))")
  805. };
  806. }