S4MdpSyncTransformService.cs 48 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939
  1. using Admin.NET.Plugin.AiDOP.Infrastructure;
  2. namespace Admin.NET.Plugin.AiDOP.ProcurementExecution;
  3. /// <summary>
  4. /// S4 采购执行 MDP 转换:S4 专属 STG/STD + 消费 S3 共享 DWD,写入 dwd_s4_purchase_execution / dwd_po_trans 与 S4 KPI。
  5. /// </summary>
  6. public class S4MdpSyncTransformService : ITransient
  7. {
  8. private const string JobCode = "S4_MDP_SYNC_TRANSFORM";
  9. private readonly ISqlSugarClient _db;
  10. public S4MdpSyncTransformService(ISqlSugarClient db)
  11. {
  12. _db = db;
  13. }
  14. public async Task<S4MdpSyncTransformResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
  15. {
  16. cancellationToken.ThrowIfCancellationRequested();
  17. await EnsureS4TablesAsync();
  18. var now = DateTime.Now;
  19. var batchId = $"S4_MDP_FULL_{now:yyyyMMddHHmmss}";
  20. var runLogId = await InsertTransformRunLogAsync(batchId, now, triggerType);
  21. var result = new S4MdpSyncTransformResult { BatchId = batchId, RunLogId = runLogId };
  22. try
  23. {
  24. result.StageRows = await SyncStagingAsync(batchId, now, cancellationToken);
  25. result.StandardRows = await TransformStandardAsync(batchId, now, cancellationToken);
  26. result.DwdRows = await BuildDwdAsync(batchId, now, cancellationToken);
  27. result.KpiRows = await BuildS4KpiValuesAsync(now, cancellationToken);
  28. await MarkTransformRunSuccessAsync(runLogId, now, result);
  29. return result;
  30. }
  31. catch (Exception ex)
  32. {
  33. await MarkTransformRunFailedAsync(runLogId, now, ex.Message);
  34. throw;
  35. }
  36. }
  37. private async Task EnsureS4TablesAsync()
  38. {
  39. const string ddl = """
  40. CREATE TABLE IF NOT EXISTS mdp_stg_s4_iqc (
  41. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  42. tenant_id BIGINT NOT NULL DEFAULT 0,
  43. source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
  44. source_table VARCHAR(100) NOT NULL,
  45. source_row_id VARCHAR(100) NOT NULL,
  46. source_biz_key VARCHAR(200) NULL,
  47. sync_batch_id VARCHAR(100) NOT NULL,
  48. sync_time DATETIME NOT NULL,
  49. process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
  50. raw_data JSON NOT NULL,
  51. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  52. UNIQUE KEY uk_mdp_stg_s4_iqc (tenant_id, source_table, source_row_id)
  53. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  54. CREATE TABLE IF NOT EXISTS mdp_stg_s4_shipment LIKE mdp_stg_s4_iqc;
  55. CREATE TABLE IF NOT EXISTS mdp_stg_s4_return LIKE mdp_stg_s4_iqc;
  56. CREATE TABLE IF NOT EXISTS mdp_stg_s4_shortage LIKE mdp_stg_s4_iqc;
  57. CREATE TABLE IF NOT EXISTS mdp_std_s4_iqc (
  58. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  59. tenant_id BIGINT NOT NULL DEFAULT 0,
  60. factory_id BIGINT NULL DEFAULT 1,
  61. source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
  62. po_no VARCHAR(50) NULL, po_line VARCHAR(50) NULL,
  63. supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
  64. receipt_qty DECIMAL(18,6) NULL DEFAULT 0,
  65. sample_qty DECIMAL(18,6) NULL DEFAULT 0,
  66. defect_qty DECIMAL(18,6) NULL DEFAULT 0,
  67. qc_result VARCHAR(20) NULL, receipt_date DATETIME NULL,
  68. source_biz_key VARCHAR(200) NULL,
  69. sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL,
  70. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  71. UNIQUE KEY uk_std_s4_iqc (tenant_id, source_biz_key)
  72. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  73. CREATE TABLE IF NOT EXISTS mdp_std_s4_shipment (
  74. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  75. tenant_id BIGINT NOT NULL DEFAULT 0,
  76. factory_id BIGINT NULL DEFAULT 1,
  77. source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
  78. shipment_no VARCHAR(50) NULL, po_no VARCHAR(50) NULL, po_line VARCHAR(50) NULL,
  79. supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
  80. ship_qty DECIMAL(18,6) NULL DEFAULT 0, ship_date DATETIME NULL,
  81. source_biz_key VARCHAR(200) NULL,
  82. sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL,
  83. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  84. UNIQUE KEY uk_std_s4_shipment (tenant_id, source_biz_key)
  85. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  86. CREATE TABLE IF NOT EXISTS mdp_std_s4_return (
  87. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  88. tenant_id BIGINT NOT NULL DEFAULT 0,
  89. factory_id BIGINT NULL DEFAULT 1,
  90. source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
  91. po_no VARCHAR(50) NULL, po_line VARCHAR(50) NULL,
  92. supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
  93. return_qty DECIMAL(18,6) NULL DEFAULT 0,
  94. return_reason VARCHAR(200) NULL, return_status VARCHAR(50) NULL,
  95. source_biz_key VARCHAR(200) NULL,
  96. sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL,
  97. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  98. UNIQUE KEY uk_std_s4_return (tenant_id, source_biz_key)
  99. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  100. CREATE TABLE IF NOT EXISTS mdp_std_s4_shortage (
  101. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  102. tenant_id BIGINT NOT NULL DEFAULT 0,
  103. factory_id BIGINT NULL DEFAULT 1,
  104. source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
  105. work_order VARCHAR(100) NULL, supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
  106. shortage_qty DECIMAL(18,6) NULL DEFAULT 0, risk_level VARCHAR(20) NULL, need_date DATETIME NULL,
  107. source_biz_key VARCHAR(200) NULL,
  108. sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL,
  109. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  110. UNIQUE KEY uk_std_s4_shortage (tenant_id, source_biz_key)
  111. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  112. CREATE TABLE IF NOT EXISTS dwd_s4_purchase_execution (
  113. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  114. tenant_id BIGINT NOT NULL, factory_id BIGINT NOT NULL DEFAULT 1, stat_date DATE NOT NULL,
  115. po_no VARCHAR(50) NULL, po_line VARCHAR(50) NULL,
  116. supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
  117. order_qty DECIMAL(12,3) NULL DEFAULT 0, delivery_qty DECIMAL(12,3) NULL DEFAULT 0,
  118. received_qty DECIMAL(12,3) NULL DEFAULT 0, returned_qty DECIMAL(12,3) NULL DEFAULT 0,
  119. shortage_qty DECIMAL(12,3) NULL DEFAULT 0,
  120. due_date DATE NULL, actual_arrival_date DATE NULL, risk_level VARCHAR(20) NULL,
  121. source_system VARCHAR(20) NULL DEFAULT 'AIDOP',
  122. sync_batch_id VARCHAR(100) NULL, sync_time DATETIME NULL,
  123. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  124. UNIQUE KEY uk_dwd_s4_pe (tenant_id, stat_date, po_no, po_line, item_code),
  125. KEY idx_dwd_s4_pe_date (tenant_id, stat_date),
  126. KEY idx_dwd_s4_pe_supplier (tenant_id, supplier_code)
  127. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  128. """;
  129. foreach (var statement in ddl.Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
  130. {
  131. if (statement.Length > 10)
  132. await _db.Ado.ExecuteCommandAsync(statement);
  133. }
  134. }
  135. private async Task<int> SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  136. {
  137. var total = 0;
  138. foreach (var entity in S4MdpEntityConfig.All)
  139. {
  140. cancellationToken.ThrowIfCancellationRequested();
  141. total += await SyncOneEntityAsync(entity, batchId, now);
  142. }
  143. total += await CountSharedStagingRowsAsync();
  144. return total;
  145. }
  146. private async Task<int> CountSharedStagingRowsAsync()
  147. {
  148. try
  149. {
  150. return await _db.Ado.GetIntAsync(
  151. """
  152. SELECT IFNULL(SUM(cnt), 0) FROM (
  153. SELECT COUNT(1) AS cnt FROM mdp_stg_purchase_order
  154. UNION ALL SELECT COUNT(1) FROM mdp_stg_delivery
  155. UNION ALL SELECT COUNT(1) FROM mdp_stg_receipt
  156. ) t
  157. """);
  158. }
  159. catch
  160. {
  161. return 0;
  162. }
  163. }
  164. private async Task<int> SyncOneEntityAsync(S4MdpEntityConfig entity, string batchId, DateTime now)
  165. {
  166. var entityRow = await _db.Ado.SqlQuerySingleAsync<S4MdpEntityRow>(
  167. "SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1",
  168. new SugarParameter("@EntityCode", entity.EntityCode));
  169. if (entityRow == null)
  170. throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode},请先执行 1.0.154.sql 或启动迁移。");
  171. var tableExists = await _db.Ado.GetIntAsync(
  172. """
  173. SELECT COUNT(1) FROM information_schema.TABLES
  174. WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
  175. """,
  176. new SugarParameter("@TableName", entity.SourceTable));
  177. if (tableExists == 0)
  178. {
  179. if (entity.Optional)
  180. return 0;
  181. throw Oops.Oh($"未找到 S4 源表:{entity.SourceTable}(实体 {entity.EntityCode})");
  182. }
  183. var columns = await _db.Ado.SqlQueryAsync<S4ColumnRow>(
  184. """
  185. SELECT COLUMN_NAME AS ColumnName
  186. FROM information_schema.COLUMNS
  187. WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
  188. ORDER BY ORDINAL_POSITION
  189. """,
  190. new SugarParameter("@TableName", entity.SourceTable));
  191. if (columns.Count == 0)
  192. throw Oops.Oh($"未找到源表字段:{entity.SourceTable}");
  193. var names = columns.Select(u => u.ColumnName).ToList();
  194. var tenantExpr = names.Any(u => string.Equals(u, "tenant_id", StringComparison.OrdinalIgnoreCase))
  195. ? $"IFNULL(s.`{FindColumn(names, "tenant_id")}`,0)"
  196. : "0";
  197. var sourceRowExpr = names.Any(u => string.Equals(u, entity.SourceRowIdExpression, StringComparison.OrdinalIgnoreCase))
  198. ? $"s.`{FindColumn(names, entity.SourceRowIdExpression)}`"
  199. : entity.SourceRowIdExpression;
  200. var rawDataExpr = BuildJsonObjectExpression(names);
  201. var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`");
  202. var logId = await InsertSyncLogAsync(entityRow.Id, entityRow.EntityName, batchId, rowsRead);
  203. var started = DateTime.Now;
  204. try
  205. {
  206. var affected = await _db.Ado.ExecuteCommandAsync(
  207. $"""
  208. INSERT INTO `{entity.TargetTable}`
  209. (tenant_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
  210. SELECT
  211. {tenantExpr},
  212. 'AIDOP',
  213. @SourceTable,
  214. CAST({sourceRowExpr} AS CHAR),
  215. CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({sourceRowExpr} AS CHAR)) AS CHAR),
  216. @BatchId,
  217. @Now,
  218. 'PENDING',
  219. {rawDataExpr}
  220. FROM `{entity.SourceTable}` s
  221. ON DUPLICATE KEY UPDATE
  222. source_row_id=VALUES(source_row_id),
  223. sync_batch_id=VALUES(sync_batch_id),
  224. sync_time=VALUES(sync_time),
  225. process_status=VALUES(process_status),
  226. raw_data=VALUES(raw_data),
  227. update_time=CURRENT_TIMESTAMP
  228. """,
  229. new SugarParameter("@SourceTable", entity.SourceTable),
  230. new SugarParameter("@BatchId", batchId),
  231. new SugarParameter("@Now", now));
  232. await MarkSyncLogSuccessAsync(logId, started, affected);
  233. return rowsRead;
  234. }
  235. catch (Exception ex)
  236. {
  237. await MarkSyncLogFailedAsync(logId, started, ex.Message);
  238. if (entity.Optional) return 0;
  239. throw;
  240. }
  241. }
  242. private async Task<int> TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  243. {
  244. var total = 0;
  245. foreach (var command in BuildStandardCommands(batchId, now))
  246. {
  247. cancellationToken.ThrowIfCancellationRequested();
  248. try
  249. {
  250. total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
  251. }
  252. catch
  253. {
  254. // 标准层表可能尚未有贴源数据,跳过单条失败
  255. }
  256. }
  257. total += await CountSharedStandardRowsAsync();
  258. return total;
  259. }
  260. private async Task<int> CountSharedStandardRowsAsync()
  261. {
  262. try
  263. {
  264. return await _db.Ado.GetIntAsync(
  265. """
  266. SELECT IFNULL(SUM(cnt), 0) FROM (
  267. SELECT COUNT(1) AS cnt FROM mdp_std_purchase_order
  268. UNION ALL SELECT COUNT(1) FROM mdp_std_delivery_schedule
  269. UNION ALL SELECT COUNT(1) FROM mdp_std_delivery_result
  270. ) t
  271. """);
  272. }
  273. catch
  274. {
  275. return 0;
  276. }
  277. }
  278. private IEnumerable<S4MdpSqlCommand> BuildStandardCommands(string batchId, DateTime now)
  279. {
  280. yield return Cmd(
  281. """
  282. INSERT INTO mdp_std_s4_iqc
  283. (tenant_id, factory_id, source_system, po_no, po_line, supplier_code, item_code, receipt_qty, sample_qty, defect_qty, qc_result, receipt_date, source_biz_key, sync_batch_id, sync_time)
  284. SELECT tenant_id, 1, 'AIDOP',
  285. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PurOrd')),
  286. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Line')) AS CHAR),
  287. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Supp')), ''),
  288. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), ''),
  289. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) AS DECIMAL(18,6)) END, 0),
  290. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SampleQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SampleQty')) AS DECIMAL(18,6)) END, 0),
  291. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RejectQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RejectQty')) AS DECIMAL(18,6)) END, 0),
  292. CASE WHEN COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RejectQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RejectQty')) AS DECIMAL(18,6)) END, 0) > 0 THEN 'FAIL' ELSE 'PASS' END,
  293. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RcptDate')), 'null'), ''),
  294. source_biz_key, @BatchId, @Now
  295. FROM mdp_stg_s4_iqc
  296. WHERE source_table='PurOrdRctDetail'
  297. ON DUPLICATE KEY UPDATE receipt_qty=VALUES(receipt_qty), sample_qty=VALUES(sample_qty), defect_qty=VALUES(defect_qty),
  298. qc_result=VALUES(qc_result), receipt_date=VALUES(receipt_date), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  299. """, batchId, now);
  300. yield return Cmd(
  301. """
  302. INSERT INTO mdp_std_s4_shipment
  303. (tenant_id, factory_id, source_system, shipment_no, po_no, po_line, supplier_code, item_code, ship_qty, ship_date, source_biz_key, sync_batch_id, sync_time)
  304. SELECT tenant_id, 1, 'AIDOP',
  305. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shddh')), source_row_id),
  306. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_bill')),
  307. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_billline')) AS CHAR),
  308. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.suppliercode')), ''),
  309. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), ''),
  310. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) AS DECIMAL(18,6)) END, 0),
  311. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.updatetime')), 'null'), ''),
  312. source_biz_key, @BatchId, @Now
  313. FROM mdp_stg_s4_shipment
  314. WHERE source_table='scm_shdzb'
  315. ON DUPLICATE KEY UPDATE ship_qty=VALUES(ship_qty), ship_date=VALUES(ship_date), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  316. """, batchId, now);
  317. yield return Cmd(
  318. """
  319. INSERT INTO mdp_std_s4_return
  320. (tenant_id, factory_id, source_system, po_no, po_line, supplier_code, item_code, return_qty, return_reason, return_status, source_biz_key, sync_batch_id, sync_time)
  321. SELECT tenant_id, 1, 'AIDOP',
  322. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ponumber')),
  323. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.poline')) AS CHAR),
  324. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.suppliercode')), ''),
  325. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), ''),
  326. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) AS DECIMAL(18,6)) END, 0),
  327. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.remark')),
  328. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')),
  329. source_biz_key, @BatchId, @Now
  330. FROM mdp_stg_s4_return
  331. WHERE source_table='srm_polist_ds'
  332. AND COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) AS DECIMAL(18,6)) END, 0) > 0
  333. ON DUPLICATE KEY UPDATE return_qty=VALUES(return_qty), return_reason=VALUES(return_reason), return_status=VALUES(return_status),
  334. sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  335. """, batchId, now);
  336. yield return Cmd(
  337. """
  338. INSERT INTO mdp_std_s4_shortage
  339. (tenant_id, factory_id, source_system, work_order, supplier_code, item_code, shortage_qty, risk_level, need_date, source_biz_key, sync_batch_id, sync_time)
  340. SELECT tenant_id, 1, 'AIDOP',
  341. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.work_order')),
  342. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_code')), ''),
  343. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.component_item_code')), ''),
  344. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortage_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortage_qty')) AS DECIMAL(18,6)) END, 0),
  345. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.risk_level')), 'MEDIUM'),
  346. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.expected_supply_date')), 'null'), ''),
  347. source_biz_key, @BatchId, @Now
  348. FROM mdp_stg_s4_shortage
  349. WHERE source_table='dwd_material_shortage'
  350. AND COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortage_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortage_qty')) AS DECIMAL(18,6)) END, 0) > 0
  351. ON DUPLICATE KEY UPDATE shortage_qty=VALUES(shortage_qty), risk_level=VALUES(risk_level), need_date=VALUES(need_date),
  352. sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  353. """, batchId, now);
  354. }
  355. private async Task<int> BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  356. {
  357. cancellationToken.ThrowIfCancellationRequested();
  358. var statDate = now.Date;
  359. var total = 0;
  360. try
  361. {
  362. await _db.Ado.ExecuteCommandAsync(
  363. "DELETE FROM dwd_s4_purchase_execution WHERE stat_date=@StatDate",
  364. new SugarParameter("@StatDate", statDate));
  365. total += await _db.Ado.ExecuteCommandAsync(
  366. """
  367. INSERT INTO dwd_s4_purchase_execution
  368. (tenant_id, factory_id, stat_date, po_no, po_line, supplier_code, item_code,
  369. order_qty, delivery_qty, received_qty, returned_qty, shortage_qty,
  370. due_date, actual_arrival_date, risk_level, source_system, sync_batch_id, sync_time)
  371. SELECT d.tenant_id, 1, @StatDate,
  372. d.po_no, d.po_line, IFNULL(d.supplier_code,''), IFNULL(d.item_code,''),
  373. IFNULL(d.order_qty,0), IFNULL(d.delivery_qty,0), IFNULL(d.receipt_qty,0),
  374. IFNULL(ret.return_qty,0), IFNULL(sh.shortage_qty,0),
  375. DATE(d.due_date), DATE(d.last_receipt_date),
  376. CASE WHEN d.risk_level IN ('HIGH','MEDIUM','LOW') THEN LOWER(d.risk_level)
  377. WHEN IFNULL(d.remaining_qty,0) > 0 THEN 'high'
  378. WHEN IFNULL(d.receipt_qty,0) >= IFNULL(d.order_qty,0) AND IFNULL(d.order_qty,0) > 0 THEN 'low'
  379. ELSE 'medium' END,
  380. 'AIDOP', @BatchId, @Now
  381. FROM dwd_supplier_delivery d
  382. LEFT JOIN (
  383. SELECT tenant_id, po_no, po_line, SUM(IFNULL(return_qty,0)) AS return_qty
  384. FROM mdp_std_s4_return
  385. WHERE IFNULL(po_no,'') <> ''
  386. GROUP BY tenant_id, po_no, po_line
  387. ) ret ON d.tenant_id=ret.tenant_id AND d.po_no=ret.po_no AND d.po_line=ret.po_line
  388. LEFT JOIN (
  389. SELECT tenant_id, supplier_code, item_code, SUM(IFNULL(shortage_qty,0)) AS shortage_qty
  390. FROM mdp_std_s4_shortage
  391. GROUP BY tenant_id, supplier_code, item_code
  392. ) sh ON d.tenant_id=sh.tenant_id AND IFNULL(d.supplier_code,'')=IFNULL(sh.supplier_code,'') AND IFNULL(d.item_code,'')=IFNULL(sh.item_code,'')
  393. WHERE d.stat_date=@StatDate AND IFNULL(d.po_no,'') <> ''
  394. ON DUPLICATE KEY UPDATE
  395. order_qty=VALUES(order_qty), delivery_qty=VALUES(delivery_qty), received_qty=VALUES(received_qty),
  396. returned_qty=VALUES(returned_qty), shortage_qty=VALUES(shortage_qty),
  397. due_date=VALUES(due_date), actual_arrival_date=VALUES(actual_arrival_date),
  398. risk_level=VALUES(risk_level), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  399. """,
  400. new SugarParameter("@StatDate", statDate),
  401. new SugarParameter("@BatchId", batchId),
  402. new SugarParameter("@Now", now));
  403. }
  404. catch
  405. {
  406. // dwd_supplier_delivery 可能尚未由 S3 生成
  407. }
  408. try
  409. {
  410. await _db.Ado.ExecuteCommandAsync(
  411. "DELETE FROM dwd_po_trans WHERE trans_date=@StatDate",
  412. new SugarParameter("@StatDate", statDate));
  413. total += await _db.Ado.ExecuteCommandAsync(
  414. """
  415. INSERT INTO dwd_po_trans
  416. (tenant_id, factory_id, po_no, po_line, supplier_code, item_code, order_qty, received_qty,
  417. returned_qty, shortage_qty, due_date, actual_arrival_date, risk_level,
  418. trans_date, source_system, sync_batch_id, sync_time)
  419. SELECT tenant_id, factory_id, po_no, po_line, supplier_code, item_code,
  420. order_qty, received_qty, returned_qty, shortage_qty,
  421. due_date, actual_arrival_date, risk_level,
  422. stat_date, source_system, sync_batch_id, sync_time
  423. FROM dwd_s4_purchase_execution
  424. WHERE stat_date=@StatDate
  425. """,
  426. new SugarParameter("@StatDate", statDate));
  427. }
  428. catch
  429. {
  430. try
  431. {
  432. total += await _db.Ado.ExecuteCommandAsync(
  433. """
  434. INSERT INTO dwd_po_trans
  435. (tenant_id, po_no, supplier_code, item_code, order_qty, received_qty, trans_date, source_system, sync_time)
  436. SELECT po.tenant_id, po.po_no, IFNULL(po.supplier_code,''), IFNULL(po.item_code,''),
  437. IFNULL(po.order_qty,0), IFNULL(po.received_qty,0), @StatDate, 'AIDOP', @Now
  438. FROM mdp_std_purchase_order po
  439. WHERE IFNULL(po.po_no,'') <> ''
  440. """,
  441. new SugarParameter("@StatDate", statDate),
  442. new SugarParameter("@Now", now));
  443. }
  444. catch
  445. {
  446. // ignore
  447. }
  448. }
  449. try
  450. {
  451. total += await _db.Ado.ExecuteCommandAsync(
  452. """
  453. INSERT INTO dwd_qc_trans
  454. (tenant_id, item_code, supplier_code, batch_no, sample_qty, defect_qty, result, trans_date, source_system, sync_time)
  455. SELECT tenant_id, IFNULL(item_code,''), IFNULL(supplier_code,''), source_biz_key,
  456. CAST(IFNULL(sample_qty,0) AS SIGNED), CAST(IFNULL(defect_qty,0) AS SIGNED),
  457. CASE WHEN qc_result='FAIL' THEN 'FAIL' WHEN qc_result='CONCESSION' THEN 'CONCESSION' ELSE 'PASS' END,
  458. @StatDate, 'AIDOP', @Now
  459. FROM mdp_std_s4_iqc
  460. WHERE IFNULL(item_code,'') <> ''
  461. ON DUPLICATE KEY UPDATE sample_qty=VALUES(sample_qty), defect_qty=VALUES(defect_qty), result=VALUES(result), sync_time=VALUES(sync_time)
  462. """,
  463. new SugarParameter("@StatDate", statDate),
  464. new SugarParameter("@Now", now));
  465. }
  466. catch
  467. {
  468. // dwd_qc_trans 可能无唯一键,忽略
  469. }
  470. return total;
  471. }
  472. private async Task<int> BuildS4KpiValuesAsync(DateTime now, CancellationToken cancellationToken)
  473. {
  474. var statDate = now.Date;
  475. var rows = await CalculateS4KpiValuesAsync(statDate);
  476. var affected = 0;
  477. foreach (var row in rows)
  478. {
  479. cancellationToken.ThrowIfCancellationRequested();
  480. affected += await UpsertS4KpiValueAsync(row, statDate, now);
  481. }
  482. return affected;
  483. }
  484. private async Task<List<S4KpiCalcRow>> CalculateS4KpiValuesAsync(DateTime statDate)
  485. {
  486. try
  487. {
  488. return await _db.Ado.SqlQueryAsync<S4KpiCalcRow>(
  489. """
  490. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L1_001' AS MetricCode,
  491. ROUND(AVG(CASE WHEN ds.request_date IS NOT NULL AND COALESCE(ds.submit_date, ds.last_sent_date) IS NOT NULL
  492. THEN TIMESTAMPDIFF(DAY, ds.request_date, COALESCE(ds.submit_date, ds.last_sent_date)) END), 4) AS MetricValue
  493. FROM mdp_std_delivery_schedule ds
  494. WHERE ds.request_date IS NOT NULL
  495. GROUP BY tenant_id
  496. HAVING MetricValue IS NOT NULL
  497. UNION ALL
  498. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L1_002' AS MetricCode,
  499. ROUND(100 * SUM(CASE WHEN IFNULL(d.receipt_qty,0) >= IFNULL(d.order_qty,0) AND IFNULL(d.order_qty,0) > 0 THEN 1
  500. WHEN d.delivery_status='COMPLETED' THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
  501. FROM dwd_supplier_delivery d
  502. WHERE d.stat_date=@StatDate AND IFNULL(d.order_qty,0) > 0
  503. GROUP BY tenant_id
  504. UNION ALL
  505. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L1_003' AS MetricCode,
  506. ROUND(SUM(IFNULL(d.receipt_qty,0)) / GREATEST(COUNT(DISTINCT NULLIF(d.supplier_code,'')), 1), 4) AS MetricValue
  507. FROM dwd_supplier_delivery d
  508. WHERE d.stat_date=@StatDate
  509. GROUP BY tenant_id
  510. UNION ALL
  511. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L1_004' AS MetricCode,
  512. ROUND(AVG(CASE WHEN IFNULL(d.order_qty,0) > 0
  513. THEN (IFNULL(d.remaining_qty,0) / d.order_qty) * 30 END), 4) AS MetricValue
  514. FROM dwd_supplier_delivery d
  515. WHERE d.stat_date=@StatDate AND IFNULL(d.order_qty,0) > 0
  516. GROUP BY tenant_id
  517. UNION ALL
  518. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L2_001' AS MetricCode,
  519. ROUND(AVG(CASE WHEN ds.request_date IS NOT NULL AND COALESCE(ds.submit_date, ds.last_sent_date) IS NOT NULL
  520. THEN TIMESTAMPDIFF(DAY, ds.request_date, COALESCE(ds.submit_date, ds.last_sent_date)) END), 4) AS MetricValue
  521. FROM mdp_std_delivery_schedule ds
  522. WHERE ds.request_date IS NOT NULL
  523. GROUP BY tenant_id
  524. HAVING MetricValue IS NOT NULL
  525. UNION ALL
  526. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L2_002' AS MetricCode,
  527. ROUND(100 * SUM(CASE WHEN IFNULL(d.receipt_qty,0) >= IFNULL(d.order_qty,0) AND IFNULL(d.order_qty,0) > 0 THEN 1
  528. WHEN d.delivery_status='COMPLETED' THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
  529. FROM dwd_supplier_delivery d
  530. WHERE d.stat_date=@StatDate AND IFNULL(d.order_qty,0) > 0
  531. GROUP BY tenant_id
  532. UNION ALL
  533. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L2_003' AS MetricCode,
  534. ROUND(SUM(IFNULL(pe.received_qty,0)) / GREATEST(COUNT(DISTINCT NULLIF(pe.item_code,'')), 1), 4) AS MetricValue
  535. FROM dwd_s4_purchase_execution pe
  536. WHERE pe.stat_date=@StatDate
  537. GROUP BY tenant_id
  538. UNION ALL
  539. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L2_004' AS MetricCode,
  540. ROUND(AVG(CASE WHEN IFNULL(pe.order_qty,0) > 0
  541. THEN ((IFNULL(pe.order_qty,0) - IFNULL(pe.received_qty,0)) / pe.order_qty) * 30 END), 4) AS MetricValue
  542. FROM dwd_s4_purchase_execution pe
  543. WHERE pe.stat_date=@StatDate AND IFNULL(pe.order_qty,0) > 0
  544. GROUP BY tenant_id
  545. UNION ALL
  546. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L3_001' AS MetricCode,
  547. ROUND(AVG(CASE WHEN ds.request_date IS NOT NULL AND COALESCE(ds.submit_date, ds.last_sent_date) IS NOT NULL
  548. AND IFNULL(ds.supplier_code,'') <> ''
  549. THEN TIMESTAMPDIFF(DAY, ds.request_date, COALESCE(ds.submit_date, ds.last_sent_date)) END), 4) AS MetricValue
  550. FROM mdp_std_delivery_schedule ds
  551. WHERE ds.request_date IS NOT NULL AND IFNULL(ds.supplier_code,'') <> ''
  552. GROUP BY tenant_id
  553. HAVING MetricValue IS NOT NULL
  554. UNION ALL
  555. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L3_002' AS MetricCode,
  556. ROUND(100 * SUM(CASE WHEN IFNULL(d.receipt_qty,0) >= IFNULL(d.order_qty,0) AND IFNULL(d.order_qty,0) > 0 THEN 1
  557. WHEN d.delivery_status='COMPLETED' THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
  558. FROM dwd_supplier_delivery d
  559. WHERE d.stat_date=@StatDate AND IFNULL(d.supplier_code,'') <> '' AND IFNULL(d.order_qty,0) > 0
  560. GROUP BY tenant_id
  561. UNION ALL
  562. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L3_003' AS MetricCode,
  563. ROUND(COUNT(DISTINCT NULLIF(pe.supplier_code,'')) / GREATEST(COUNT(DISTINCT NULLIF(pe.po_no,'')), 1), 4) AS MetricValue
  564. FROM dwd_s4_purchase_execution pe
  565. WHERE pe.stat_date=@StatDate AND IFNULL(pe.supplier_code,'') <> ''
  566. GROUP BY tenant_id
  567. UNION ALL
  568. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L3_004' AS MetricCode,
  569. ROUND(AVG(CASE WHEN IFNULL(pe.order_qty,0) > 0 AND IFNULL(pe.supplier_code,'') <> ''
  570. THEN ((IFNULL(pe.order_qty,0) - IFNULL(pe.received_qty,0)) / pe.order_qty) * 30 END), 4) AS MetricValue
  571. FROM dwd_s4_purchase_execution pe
  572. WHERE pe.stat_date=@StatDate AND IFNULL(pe.order_qty,0) > 0 AND IFNULL(pe.supplier_code,'') <> ''
  573. GROUP BY tenant_id
  574. """,
  575. new SugarParameter("@StatDate", statDate));
  576. }
  577. catch
  578. {
  579. return new List<S4KpiCalcRow>();
  580. }
  581. }
  582. private async Task<int> UpsertS4KpiValueAsync(S4KpiCalcRow row, DateTime statDate, DateTime now)
  583. {
  584. var meta = await _db.Ado.SqlQuerySingleAsync<S4KpiMetaRow>(
  585. """
  586. SELECT MetricLevel, Direction, YellowThreshold, RedThreshold
  587. FROM ado_smart_ops_kpi_master
  588. WHERE TenantId=@TenantId AND ModuleCode='S4' AND MetricCode=@MetricCode AND IsEnabled=1
  589. LIMIT 1
  590. """,
  591. new SugarParameter("@TenantId", row.TenantId),
  592. new SugarParameter("@MetricCode", row.MetricCode));
  593. if (meta == null || row.MetricValue == null)
  594. return 0;
  595. var table = ResolveKpiValueTable(meta.MetricLevel);
  596. var current = await _db.Ado.SqlQuerySingleAsync<S4KpiValueRow>(
  597. $"""
  598. SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
  599. FROM {table}
  600. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S4'
  601. AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
  602. ORDER BY id
  603. LIMIT 1
  604. """,
  605. new SugarParameter("@TenantId", row.TenantId),
  606. new SugarParameter("@FactoryId", row.FactoryId),
  607. new SugarParameter("@MetricCode", row.MetricCode),
  608. new SugarParameter("@BizDate", statDate));
  609. var prior = await _db.Ado.SqlQuerySingleAsync<S4KpiValueRow>(
  610. $"""
  611. SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
  612. FROM {table}
  613. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S4'
  614. AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0
  615. ORDER BY biz_date DESC, id DESC
  616. LIMIT 1
  617. """,
  618. new SugarParameter("@TenantId", row.TenantId),
  619. new SugarParameter("@FactoryId", row.FactoryId),
  620. new SugarParameter("@MetricCode", row.MetricCode),
  621. new SugarParameter("@BizDate", statDate));
  622. var actual = Math.Round(row.MetricValue.Value, 4);
  623. var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS4Target(row.MetricCode);
  624. var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold);
  625. var trend = ResolveTrendFlag(actual, prior?.MetricValue);
  626. if (current != null)
  627. {
  628. return await _db.Ado.ExecuteCommandAsync(
  629. $"""
  630. UPDATE {table}
  631. SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag,
  632. is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime
  633. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S4'
  634. AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
  635. """,
  636. new SugarParameter("@MetricValue", actual),
  637. new SugarParameter("@TargetValue", target),
  638. new SugarParameter("@StatusColor", status),
  639. new SugarParameter("@TrendFlag", trend),
  640. new SugarParameter("@CalcTime", now),
  641. new SugarParameter("@TenantId", row.TenantId),
  642. new SugarParameter("@FactoryId", row.FactoryId),
  643. new SugarParameter("@MetricCode", row.MetricCode),
  644. new SugarParameter("@BizDate", statDate));
  645. }
  646. var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}");
  647. return await _db.Ado.ExecuteCommandAsync(
  648. $"""
  649. INSERT INTO {table}
  650. (id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active,
  651. module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time)
  652. VALUES
  653. (@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1,
  654. 'S4', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime)
  655. """,
  656. new SugarParameter("@Id", nextId),
  657. new SugarParameter("@TenantId", row.TenantId),
  658. new SugarParameter("@FactoryId", row.FactoryId),
  659. new SugarParameter("@BizDate", statDate),
  660. new SugarParameter("@CalcTime", now),
  661. new SugarParameter("@MetricCode", row.MetricCode),
  662. new SugarParameter("@MetricValue", actual),
  663. new SugarParameter("@TargetValue", target),
  664. new SugarParameter("@StatusColor", status),
  665. new SugarParameter("@TrendFlag", trend));
  666. }
  667. private async Task<long> InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead)
  668. {
  669. await _db.Ado.ExecuteCommandAsync(
  670. """
  671. INSERT INTO mdp_sync_log
  672. (tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status)
  673. VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING')
  674. """,
  675. new SugarParameter("@EntityId", entityId),
  676. new SugarParameter("@EntityName", entityName),
  677. new SugarParameter("@BatchId", batchId),
  678. new SugarParameter("@RowsRead", rowsRead));
  679. return await _db.Ado.GetLongAsync(
  680. "SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1",
  681. new List<SugarParameter> { new("@BatchId", batchId), new("@EntityId", entityId) });
  682. }
  683. private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected)
  684. {
  685. await _db.Ado.ExecuteCommandAsync(
  686. """
  687. UPDATE mdp_sync_log
  688. SET status='SUCCESS', sync_end=NOW(), duration_ms=@DurationMs, rows_written=@RowsWritten
  689. WHERE id=@Id
  690. """,
  691. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  692. new SugarParameter("@RowsWritten", affected),
  693. new SugarParameter("@Id", logId));
  694. }
  695. private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message)
  696. {
  697. await _db.Ado.ExecuteCommandAsync(
  698. """
  699. UPDATE mdp_sync_log
  700. SET status='FAILED', sync_end=NOW(), duration_ms=@DurationMs, error_message=@ErrorMessage
  701. WHERE id=@Id
  702. """,
  703. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  704. new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
  705. new SugarParameter("@Id", logId));
  706. }
  707. private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
  708. {
  709. await _db.Ado.ExecuteCommandAsync(
  710. """
  711. INSERT INTO mdp_transform_run_log
  712. (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
  713. VALUES (0, @JobCode, 'S4 MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
  714. """,
  715. new SugarParameter("@JobCode", JobCode),
  716. new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
  717. new SugarParameter("@BatchId", batchId),
  718. new SugarParameter("@StartTime", startedAt));
  719. return await _db.Ado.GetLongAsync(
  720. "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
  721. new List<SugarParameter> { new("@BatchId", batchId) });
  722. }
  723. private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S4MdpSyncTransformResult result)
  724. {
  725. var finishedAt = DateTime.Now;
  726. await _db.Ado.ExecuteCommandAsync(
  727. """
  728. UPDATE mdp_transform_run_log
  729. SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
  730. stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
  731. summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
  732. WHERE id=@Id
  733. """,
  734. new SugarParameter("@EndTime", finishedAt),
  735. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  736. new SugarParameter("@StageRows", result.StageRows),
  737. new SugarParameter("@StandardRows", result.StandardRows),
  738. new SugarParameter("@DwdRows", result.DwdRows),
  739. new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)),
  740. new SugarParameter("@Id", runLogId));
  741. }
  742. private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
  743. {
  744. try
  745. {
  746. var finishedAt = DateTime.Now;
  747. await _db.Ado.ExecuteCommandAsync(
  748. """
  749. UPDATE mdp_transform_run_log
  750. SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
  751. error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
  752. WHERE id=@Id
  753. """,
  754. new SugarParameter("@EndTime", finishedAt),
  755. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  756. new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
  757. new SugarParameter("@Id", runLogId));
  758. }
  759. catch (Exception ex)
  760. {
  761. Console.Error.WriteLine($"[S4MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}");
  762. }
  763. }
  764. private static S4MdpSqlCommand Cmd(string sql, string batchId, DateTime now) =>
  765. new(sql, new[]
  766. {
  767. new SugarParameter("@BatchId", batchId),
  768. new SugarParameter("@Now", now),
  769. new SugarParameter("@StatDate", now.Date)
  770. });
  771. private static string BuildJsonObjectExpression(IEnumerable<string> columns)
  772. {
  773. var parts = columns.SelectMany(c => new[] { $"'{c.Replace("'", "''")}'", $"s.`{c}`" });
  774. return $"JSON_OBJECT({string.Join(",", parts)})";
  775. }
  776. private static string FindColumn(IEnumerable<string> columns, string expected) =>
  777. columns.First(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase));
  778. private static string BuildRunSummaryJson(S4MdpSyncTransformResult result) =>
  779. $$"""{"batchId":"{{result.BatchId}}","stageRows":{{result.StageRows}},"standardRows":{{result.StandardRows}},"dwdRows":{{result.DwdRows}},"kpiRows":{{result.KpiRows}}}""";
  780. private static string ResolveKpiValueTable(int metricLevel) => metricLevel switch
  781. {
  782. 1 => "ado_s9_kpi_value_l1_day",
  783. 2 => "ado_s9_kpi_value_l2_day",
  784. 3 => "ado_s9_kpi_value_l3_day",
  785. 4 => "ado_s9_kpi_value_l4_day",
  786. _ => "ado_s9_kpi_value_l2_day"
  787. };
  788. private static decimal DefaultS4Target(string metricCode) => metricCode switch
  789. {
  790. "S4_L1_001" or "S4_L2_001" or "S4_L3_001" => 10.26m,
  791. "S4_L1_002" or "S4_L2_002" or "S4_L3_002" => 99m,
  792. "S4_L1_003" or "S4_L2_003" or "S4_L3_003" => 250m,
  793. "S4_L1_004" or "S4_L2_004" or "S4_L3_004" => 45m,
  794. _ => 0m
  795. };
  796. private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold)
  797. {
  798. if (target <= 0) return "gray";
  799. var ratio = actual / target * 100m;
  800. if (string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase))
  801. {
  802. if (actual <= target) return "green";
  803. if (ratio <= (yellowThreshold ?? 110m)) return "yellow";
  804. return ratio >= (redThreshold ?? 120m) ? "red" : "yellow";
  805. }
  806. if (actual >= target) return "green";
  807. if (ratio >= (yellowThreshold ?? 95m)) return "yellow";
  808. return ratio <= (redThreshold ?? 80m) ? "red" : "yellow";
  809. }
  810. private static string ResolveTrendFlag(decimal actual, decimal? previous)
  811. {
  812. if (previous == null) return "flat";
  813. if (actual > previous.Value) return "up";
  814. if (actual < previous.Value) return "down";
  815. return "flat";
  816. }
  817. private static string NormalizeTriggerType(string? triggerType)
  818. => string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
  819. private static string Truncate(string? raw, int maxLength)
  820. {
  821. if (string.IsNullOrEmpty(raw)) return string.Empty;
  822. return raw.Length <= maxLength ? raw : raw[..maxLength];
  823. }
  824. private sealed class S4ColumnRow
  825. {
  826. public string ColumnName { get; set; } = string.Empty;
  827. }
  828. private sealed class S4MdpEntityRow
  829. {
  830. public long Id { get; set; }
  831. public string EntityName { get; set; } = string.Empty;
  832. }
  833. private sealed class S4KpiCalcRow
  834. {
  835. public long TenantId { get; set; }
  836. public long FactoryId { get; set; }
  837. public string MetricCode { get; set; } = string.Empty;
  838. public decimal? MetricValue { get; set; }
  839. }
  840. private sealed class S4KpiMetaRow
  841. {
  842. public int MetricLevel { get; set; }
  843. public string Direction { get; set; } = "higher_is_better";
  844. public decimal? YellowThreshold { get; set; }
  845. public decimal? RedThreshold { get; set; }
  846. }
  847. private sealed class S4KpiValueRow
  848. {
  849. public long Id { get; set; }
  850. public decimal? MetricValue { get; set; }
  851. public decimal? TargetValue { get; set; }
  852. }
  853. }
  854. public sealed class S4MdpSyncTransformResult
  855. {
  856. public long RunLogId { get; set; }
  857. public string BatchId { get; set; } = string.Empty;
  858. public int StageRows { get; set; }
  859. public int StandardRows { get; set; }
  860. public int DwdRows { get; set; }
  861. public int KpiRows { get; set; }
  862. }
  863. internal sealed record S4MdpSqlCommand(string Sql, SugarParameter[] Parameters);
  864. internal sealed record S4MdpEntityConfig(
  865. string EntityCode,
  866. string SourceTable,
  867. string TargetTable,
  868. string SourceRowIdExpression,
  869. string SourceBizKeyExpression,
  870. bool Optional = false)
  871. {
  872. public static readonly IReadOnlyList<S4MdpEntityConfig> All = new List<S4MdpEntityConfig>
  873. {
  874. new("S4_IQC_RECEIPT", "PurOrdRctDetail", "mdp_stg_s4_iqc", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''), ':', IFNULL(s.`Line`,''))"),
  875. new("S4_SHIPMENT_EXEC", "scm_shdzb", "mdp_stg_s4_shipment", "id", "CONCAT(IFNULL(s.`glid`,''), ':', IFNULL(s.`id`,''))"),
  876. new("S4_RETURN_EXEC", "srm_polist_ds", "mdp_stg_s4_return", "Id", "s.`dsnum`"),
  877. new("S4_SHORTAGE_EXEC", "dwd_material_shortage", "mdp_stg_s4_shortage", "id", "CONCAT(IFNULL(s.`work_order`,''), ':', IFNULL(s.`component_item_code`,''))", Optional: true)
  878. };
  879. }