S3MdpSyncTransformService.cs 65 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074
  1. using Admin.NET.Plugin.AiDOP.SmartOps;
  2. namespace Admin.NET.Plugin.AiDOP.Supply;
  3. /// <summary>
  4. /// S3 首批 MDP 同步和标准化转换服务。
  5. /// </summary>
  6. public class S3MdpSyncTransformService : ITransient
  7. {
  8. private readonly ISqlSugarClient _db;
  9. private readonly SmartOpsKpiAtomicBuildService _atomicBuild;
  /// <summary>
  /// Creates the service and stores the two collaborators it works with.
  /// </summary>
  /// <param name="db">SqlSugar client used for the queries and commands issued by this service.</param>
  /// <param name="atomicBuild">Service that <see cref="RunFullAsync"/> calls to build the atomic KPI domains.</param>
  public S3MdpSyncTransformService(ISqlSugarClient db, SmartOpsKpiAtomicBuildService atomicBuild)
      => (_db, _atomicBuild) = (db, atomicBuild);
  /// <summary>
  /// Runs the complete S3 MDP pipeline in order: staging sync, standard-layer transform,
  /// DWD build, S3 KPI values, then the supply-purchase and inventory atomic domains.
  /// A run-log row is inserted before the pipeline starts and is marked as succeeded or failed afterwards.
  /// </summary>
  /// <param name="cancellationToken">Checked on entry and forwarded to every pipeline stage.</param>
  /// <param name="triggerType">Trigger label written to the run log; defaults to "AUTO".</param>
  /// <returns>The batch id, run-log id and per-stage row counts of this run.</returns>
  /// <remarks>
  /// Any exception raised by a stage is recorded on the run log (message only) and then rethrown unchanged.
  /// </remarks>
  public async Task<S3MdpSyncTransformResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
  {
      cancellationToken.ThrowIfCancellationRequested();

      // One timestamp is shared by the batch id, the run log and every stage of this run.
      var startedAt = DateTime.Now;
      var batchId = $"S3_MDP_FULL_{startedAt:yyyyMMddHHmmss}";
      var runLogId = await InsertTransformRunLogAsync(batchId, startedAt, triggerType);

      var summary = new S3MdpSyncTransformResult
      {
          BatchId = batchId,
          RunLogId = runLogId
      };

      try
      {
          summary.StageRows = await SyncStagingAsync(batchId, startedAt, cancellationToken);
          summary.StandardRows = await TransformStandardAsync(batchId, startedAt, cancellationToken);
          summary.DwdRows = await BuildDwdAsync(batchId, startedAt, cancellationToken);
          summary.KpiRows = await BuildS3KpiValuesAsync(startedAt, cancellationToken);

          // AtomicRows accumulates both atomic domains: supply/purchase first, inventory second.
          summary.AtomicRows = await _atomicBuild.BuildSupplyPurchaseDomainForAllDatesAsync(batchId, cancellationToken: cancellationToken);
          summary.AtomicRows += await _atomicBuild.BuildInventoryDomainForAllDatesAsync(batchId, cancellationToken: cancellationToken);

          await MarkTransformRunSuccessAsync(runLogId, startedAt, summary);
      }
      catch (Exception failure)
      {
          await MarkTransformRunFailedAsync(runLogId, startedAt, failure.Message);
          throw;
      }

      return summary;
  }
  /// <summary>
  /// Rebuilds the material-readiness DWD rows and the current-day shortage rows inside one
  /// database transaction.
  /// </summary>
  /// <param name="batchId">
  /// Batch identifier to stamp on the written rows; when null, one is generated as
  /// <c>S3_MATERIAL_yyyyMMddHHmmss</c> from the current local time.
  /// </param>
  /// <param name="cancellationToken">Checked once on entry, before the transaction is opened.</param>
  /// <returns>
  /// The batch id plus three counts: standard-layer readiness rows that have a work order,
  /// readiness rows upserted, and shortage rows inserted.
  /// </returns>
  /// <remarks>The transaction is rolled back and the exception rethrown if any step fails.</remarks>
  public async Task<S3MaterialRefreshResult> RefreshMaterialReadinessAsync(string? batchId = null, CancellationToken cancellationToken = default)
  {
      cancellationToken.ThrowIfCancellationRequested();

      var refreshedAt = DateTime.Now;
      if (batchId is null)
      {
          batchId = $"S3_MATERIAL_{refreshedAt:yyyyMMddHHmmss}";
      }

      _db.Ado.BeginTran();
      try
      {
          var standardRows = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM mdp_std_material_readiness WHERE IFNULL(work_order, '') <> ''");
          var readinessRows = await UpsertMaterialReadinessDwdAsync(batchId, refreshedAt);

          // The shortage rows for today are replaced: delete first, then insert the fresh set.
          await DeleteCurrentShortageAsync(refreshedAt.Date);
          var shortageRows = await InsertMaterialShortageAsync(batchId, refreshedAt);

          _db.Ado.CommitTran();

          return new S3MaterialRefreshResult
          {
              BatchId = batchId,
              StdCount = standardRows,
              ReadinessCount = readinessRows,
              ShortageCount = shortageRows
          };
      }
      catch
      {
          _db.Ado.RollbackTran();
          throw;
      }
  }
  /// <summary>
  /// Syncs every entity listed in <c>S3MdpEntityConfig.All</c> into staging, one after another.
  /// </summary>
  /// <param name="batchId">Batch identifier passed to each per-entity sync.</param>
  /// <param name="now">Timestamp passed to each per-entity sync.</param>
  /// <param name="cancellationToken">Checked before each entity is processed.</param>
  /// <returns>The sum of the values returned by <see cref="SyncOneEntityAsync"/> for all entities.</returns>
  private async Task<int> SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  {
      var stagedRows = 0;

      foreach (var config in S3MdpEntityConfig.All)
      {
          cancellationToken.ThrowIfCancellationRequested();

          var rowsForEntity = await SyncOneEntityAsync(config, batchId, now);
          stagedRows += rowsForEntity;
      }

      return stagedRows;
  }
  /// <summary>
  /// Copies all rows of one configured source table into its staging table, storing each
  /// source row as a JSON object in <c>raw_data</c> and upserting on the target's duplicate key.
  /// </summary>
  /// <param name="entity">Configuration naming the entity code, source table, target table and key expressions.</param>
  /// <param name="batchId">Batch identifier written to <c>sync_batch_id</c> and to the sync log.</param>
  /// <param name="now">Timestamp written to <c>sync_time</c>.</param>
  /// <returns>
  /// The row count of the source table. The affected-row count of the upsert is only written
  /// to the sync log and is not returned.
  /// </returns>
  /// <remarks>
  /// Throws (via <c>Oops.Oh</c>) when the entity is not registered in <c>mdp_entity</c> for tenant 0
  /// or when the source table has no columns in <c>information_schema</c>. A failure of the
  /// upsert is recorded on the sync log and rethrown.
  /// </remarks>
  private async Task<int> SyncOneEntityAsync(S3MdpEntityConfig entity, string batchId, DateTime now)
  {
      var mdpEntity = await _db.Ado.SqlQuerySingleAsync<S3MdpEntityRow>(
          "SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1",
          new SugarParameter("@EntityCode", entity.EntityCode));
      if (mdpEntity == null)
      {
          throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode}");
      }

      var sourceColumns = await _db.Ado.SqlQueryAsync<S3ColumnRow>(
          """
          SELECT COLUMN_NAME AS ColumnName
          FROM information_schema.COLUMNS
          WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
          ORDER BY ORDINAL_POSITION
          """,
          new SugarParameter("@TableName", entity.SourceTable));
      if (sourceColumns.Count == 0)
      {
          throw Oops.Oh($"未找到源表:{entity.SourceTable}");
      }

      var columnNames = sourceColumns.Select(column => column.ColumnName).ToList();

      // Case-insensitive membership test against the source table's column list.
      bool HasColumn(string? candidate) =>
          columnNames.Any(name => string.Equals(name, candidate, StringComparison.OrdinalIgnoreCase));

      // Source tables without a tenant_id column are staged under tenant 0.
      var tenantSql = HasColumn("tenant_id")
          ? $"IFNULL(s.`{FindColumn(columnNames, "tenant_id")}`,0)"
          : "0";

      // The configured row-id is used as a quoted column when it names one, otherwise verbatim as a SQL expression.
      var rowIdSql = HasColumn(entity.SourceRowIdExpression)
          ? $"s.`{FindColumn(columnNames, entity.SourceRowIdExpression)}`"
          : entity.SourceRowIdExpression;

      var rawDataSql = BuildJsonObjectExpression(columnNames);

      var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`");
      var syncLogId = await InsertSyncLogAsync(mdpEntity.Id, mdpEntity.EntityName, batchId, rowsRead);
      var started = DateTime.Now;

      try
      {
          var rowsWritten = await _db.Ado.ExecuteCommandAsync(
              $"""
              INSERT INTO `{entity.TargetTable}`
              (tenant_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
              SELECT
              {tenantSql},
              'AIDOP',
              @SourceTable,
              CAST({rowIdSql} AS CHAR),
              CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({rowIdSql} AS CHAR)) AS CHAR),
              @BatchId,
              @Now,
              'PENDING',
              {rawDataSql}
              FROM `{entity.SourceTable}` s
              ON DUPLICATE KEY UPDATE
              source_row_id=VALUES(source_row_id),
              sync_batch_id=VALUES(sync_batch_id),
              sync_time=VALUES(sync_time),
              process_status=VALUES(process_status),
              raw_data=VALUES(raw_data),
              update_time=CURRENT_TIMESTAMP
              """,
              new SugarParameter("@SourceTable", entity.SourceTable),
              new SugarParameter("@BatchId", batchId),
              new SugarParameter("@Now", now));

          await MarkSyncLogSuccessAsync(syncLogId, started, rowsWritten);
      }
      catch (Exception failure)
      {
          await MarkSyncLogFailedAsync(syncLogId, started, failure.Message);
          throw;
      }

      return rowsRead;
  }
  /// <summary>
  /// Executes, in order, every SQL command produced by <c>BuildStandardCommands</c>.
  /// </summary>
  /// <param name="batchId">Batch identifier handed to the command builder.</param>
  /// <param name="now">Timestamp handed to the command builder.</param>
  /// <param name="cancellationToken">Checked before each command is executed.</param>
  /// <returns>The sum of the affected-row counts reported by all commands.</returns>
  private async Task<int> TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  {
      var transformedRows = 0;

      foreach (var step in BuildStandardCommands(batchId, now))
      {
          cancellationToken.ThrowIfCancellationRequested();

          var affected = await _db.Ado.ExecuteCommandAsync(step.Sql, step.Parameters);
          transformedRows += affected;
      }

      return transformedRows;
  }
  /// <summary>
  /// Executes every SQL command produced by <c>BuildDwdCommands</c>, then refreshes material
  /// readiness and replaces the shortage rows for <paramref name="now"/>'s date.
  /// </summary>
  /// <param name="batchId">Batch identifier handed to the command builder and the readiness/shortage steps.</param>
  /// <param name="now">Timestamp for the written rows; its date part selects the shortage rows to delete.</param>
  /// <param name="cancellationToken">Checked before each DWD command; not checked during the readiness/shortage steps.</param>
  /// <returns>
  /// Affected rows of all DWD commands plus the readiness upsert and shortage insert counts.
  /// The shortage delete is not counted.
  /// </returns>
  private async Task<int> BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  {
      var dwdRows = 0;

      foreach (var step in BuildDwdCommands(batchId, now))
      {
          cancellationToken.ThrowIfCancellationRequested();

          var affected = await _db.Ado.ExecuteCommandAsync(step.Sql, step.Parameters);
          dwdRows += affected;
      }

      var readinessRows = await UpsertMaterialReadinessDwdAsync(batchId, now);
      dwdRows += readinessRows;

      await DeleteCurrentShortageAsync(now.Date);

      var shortageRows = await InsertMaterialShortageAsync(batchId, now);
      dwdRows += shortageRows;

      return dwdRows;
  }
  /// <summary>
  /// Calculates the S3 KPI values for <paramref name="now"/>'s date and upserts them one by one.
  /// </summary>
  /// <param name="now">Calculation timestamp; its date part is used as the statistics date.</param>
  /// <param name="cancellationToken">Checked before each calculated row is written.</param>
  /// <returns>The sum of the values returned by <see cref="UpsertS3KpiValueAsync"/> for all rows.</returns>
  private async Task<int> BuildS3KpiValuesAsync(DateTime now, CancellationToken cancellationToken)
  {
      var statDate = now.Date;
      var calculated = await CalculateS3KpiValuesAsync(statDate);

      var written = 0;
      foreach (var kpi in calculated)
      {
          cancellationToken.ThrowIfCancellationRequested();

          var affected = await UpsertS3KpiValueAsync(kpi, statDate, now);
          written += affected;
      }

      return written;
  }
  /// <summary>
  /// Runs one UNION ALL query that yields a row per tenant for each of the metric codes
  /// S3_L2_004, S3_L2_005, S3_L1_001, S3_L1_002 and S3_L3_001 through S3_L3_006.
  /// </summary>
  /// <param name="statDate">Bound to <c>@StatDate</c>; filters the branches that read the <c>dwd_*</c> tables.</param>
  /// <returns>The calculated rows; <c>FactoryId</c> is the constant 1 in every branch.</returns>
  /// <remarks>
  /// The branches reading <c>mdp_std_*</c> tables do not filter on the statistics date.
  /// S3_L1_001 uses the same expression as S3_L2_004, and S3_L1_002 the same as S3_L2_005.
  /// </remarks>
  private async Task<List<S3KpiCalcRow>> CalculateS3KpiValuesAsync(DateTime statDate)
  {
      const string kpiSql =
          """
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L2_004' AS MetricCode,
          ROUND(AVG(TIMESTAMPDIFF(HOUR, request_date, submit_date) / 24), 4) AS MetricValue
          FROM mdp_std_delivery_schedule
          WHERE request_date IS NOT NULL AND submit_date IS NOT NULL AND submit_date >= request_date
          GROUP BY tenant_id
          UNION ALL
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L2_005' AS MetricCode,
          ROUND(100 * SUM(CASE WHEN IFNULL(schedule_qty,0) >= IFNULL(order_qty,0) AND IFNULL(order_qty,0) > 0 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
          FROM dwd_supplier_delivery
          WHERE stat_date=@StatDate AND IFNULL(order_qty,0) > 0
          GROUP BY tenant_id
          UNION ALL
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L1_001' AS MetricCode,
          ROUND(AVG(TIMESTAMPDIFF(HOUR, request_date, submit_date) / 24), 4) AS MetricValue
          FROM mdp_std_delivery_schedule
          WHERE request_date IS NOT NULL AND submit_date IS NOT NULL AND submit_date >= request_date
          GROUP BY tenant_id
          UNION ALL
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L1_002' AS MetricCode,
          ROUND(100 * SUM(CASE WHEN IFNULL(schedule_qty,0) >= IFNULL(order_qty,0) AND IFNULL(order_qty,0) > 0 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
          FROM dwd_supplier_delivery
          WHERE stat_date=@StatDate AND IFNULL(order_qty,0) > 0
          GROUP BY tenant_id
          UNION ALL
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_001' AS MetricCode,
          ROUND(AVG(TIMESTAMPDIFF(HOUR, request_date, send_date) / 24), 4) AS MetricValue
          FROM mdp_std_purchase_request
          WHERE request_date IS NOT NULL AND send_date IS NOT NULL AND send_date >= request_date
          GROUP BY tenant_id
          UNION ALL
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_002' AS MetricCode,
          ROUND(AVG(TIMESTAMPDIFF(HOUR, order_date, COALESCE(need_date, due_date)) / 24), 4) AS MetricValue
          FROM mdp_std_purchase_order
          WHERE order_date IS NOT NULL AND COALESCE(need_date, due_date) IS NOT NULL
          AND COALESCE(need_date, due_date) >= order_date
          GROUP BY tenant_id
          UNION ALL
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_003' AS MetricCode,
          ROUND(100 * SUM(CASE WHEN ready_status='READY' THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
          FROM dwd_material_readiness
          WHERE stat_date=@StatDate
          GROUP BY tenant_id
          UNION ALL
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_004' AS MetricCode,
          ROUND(100 * SUM(CASE WHEN submit_date IS NOT NULL AND request_date IS NOT NULL
          AND TIMESTAMPDIFF(HOUR, request_date, submit_date) <= 72 THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
          FROM mdp_std_delivery_schedule
          WHERE request_date IS NOT NULL
          GROUP BY tenant_id
          UNION ALL
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_005' AS MetricCode,
          ROUND(100 * SUM(CASE WHEN risk_level IN ('HIGH','MEDIUM') OR IFNULL(remaining_qty,0) > 0 THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
          FROM dwd_supplier_delivery
          WHERE stat_date=@StatDate
          GROUP BY tenant_id
          UNION ALL
          SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_006' AS MetricCode,
          ROUND(100 * (1 - SUM(CASE WHEN IFNULL(shortage_qty,0) > 0 THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0)), 4) AS MetricValue
          FROM dwd_material_readiness
          WHERE stat_date=@StatDate
          GROUP BY tenant_id
          """;

      var kpiRows = await _db.Ado.SqlQueryAsync<S3KpiCalcRow>(
          kpiSql,
          new SugarParameter("@StatDate", statDate));

      return kpiRows;
  }
  /// <summary>
  /// Writes one calculated S3 KPI value into the KPI value table that matches the metric's level:
  /// updates the row for the same tenant / factory / metric / business date when it exists,
  /// otherwise inserts a new one.
  /// </summary>
  /// <param name="row">Calculated KPI row (tenant, factory, metric code and value).</param>
  /// <param name="statDate">Business date the value belongs to (bound to <c>biz_date</c>).</param>
  /// <param name="now">Calculation timestamp written to <c>calc_time</c>, <c>update_time</c> and, on insert, <c>create_time</c>.</param>
  /// <returns>
  /// The affected-row count of the UPDATE or INSERT; 0 when the row has no value or when no
  /// enabled S3 master record exists for the tenant and metric code.
  /// </returns>
  /// <remarks>
  /// The target value is taken from the current row, else from the most recent earlier row,
  /// else from <c>DefaultS3Target</c>. The trend flag compares against the most recent earlier row.
  /// </remarks>
  private async Task<int> UpsertS3KpiValueAsync(S3KpiCalcRow row, DateTime statDate, DateTime now)
  {
      // A row without a calculated value is never written, so skip the metadata lookup entirely.
      if (row.MetricValue == null)
      {
          return 0;
      }

      var meta = await _db.Ado.SqlQuerySingleAsync<S3KpiMetaRow>(
          """
          SELECT MetricLevel, Direction, YellowThreshold, RedThreshold
          FROM ado_smart_ops_kpi_master
          WHERE TenantId=@TenantId AND ModuleCode='S3' AND MetricCode=@MetricCode AND IsEnabled=1
          LIMIT 1
          """,
          new SugarParameter("@TenantId", row.TenantId),
          new SugarParameter("@MetricCode", row.MetricCode));
      if (meta == null)
      {
          return 0;
      }

      // The same four values identify the KPI row in every statement below; a fresh array is
      // built per command so that no parameter instance is shared between commands.
      SugarParameter[] KeyParameters() => new[]
      {
          new SugarParameter("@TenantId", row.TenantId),
          new SugarParameter("@FactoryId", row.FactoryId),
          new SugarParameter("@MetricCode", row.MetricCode),
          new SugarParameter("@BizDate", statDate)
      };

      var table = ResolveKpiValueTable(meta.MetricLevel);

      // Existing row for this business date, if any.
      var current = await _db.Ado.SqlQuerySingleAsync<S3KpiValueRow>(
          $"""
          SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
          FROM {table}
          WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S3'
          AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
          ORDER BY id
          LIMIT 1
          """,
          KeyParameters());

      // Most recent row strictly before this business date; source of the fallback target and the trend baseline.
      var prior = await _db.Ado.SqlQuerySingleAsync<S3KpiValueRow>(
          $"""
          SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
          FROM {table}
          WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S3'
          AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0
          ORDER BY biz_date DESC, id DESC
          LIMIT 1
          """,
          KeyParameters());

      var actual = Math.Round(row.MetricValue.Value, 4);
      var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS3Target(row.MetricCode);
      var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold);
      var trend = ResolveTrendFlag(actual, prior?.MetricValue);

      // Values written by both the UPDATE and the INSERT.
      SugarParameter[] ValueParameters() => new[]
      {
          new SugarParameter("@MetricValue", actual),
          new SugarParameter("@TargetValue", target),
          new SugarParameter("@StatusColor", status),
          new SugarParameter("@TrendFlag", trend),
          new SugarParameter("@CalcTime", now)
      };

      if (current != null)
      {
          return await _db.Ado.ExecuteCommandAsync(
              $"""
              UPDATE {table}
              SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag,
              is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime
              WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S3'
              AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
              """,
              ValueParameters().Concat(KeyParameters()).ToArray());
      }

      // NOTE(review): MAX(id)+1 is not safe if two writers insert into the same table concurrently —
      // confirm this job is the only writer or switch to a database-generated id.
      var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}");

      return await _db.Ado.ExecuteCommandAsync(
          $"""
          INSERT INTO {table}
          (id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active,
          module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time)
          VALUES
          (@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1,
          'S3', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime)
          """,
          new[] { new SugarParameter("@Id", nextId) }
              .Concat(KeyParameters())
              .Concat(ValueParameters())
              .ToArray());
  }
  327. private IEnumerable<S3MdpSqlCommand> BuildStandardCommands(string batchId, DateTime now)
  328. {
  329. yield return Cmd(
  330. """
  331. INSERT INTO mdp_std_supplier
  332. (tenant_id, factory_id, company_id, source_system, supplier_code, supplier_name, supplier_type, status, contact, address, currency_type, source_biz_key, sync_batch_id, sync_time)
  333. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  334. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Supp')),
  335. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SortName')),
  336. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Type')),
  337. CASE WHEN JSON_EXTRACT(raw_data,'$.IsActive') IN (1, true) THEN 'ACTIVE' ELSE 'INACTIVE' END,
  338. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.contact')),
  339. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.address')),
  340. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Curr')),
  341. source_biz_key, @BatchId, @Now
  342. FROM mdp_stg_supplier
  343. WHERE source_table='SuppMaster' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Supp')), '') <> ''
  344. ON DUPLICATE KEY UPDATE supplier_name=VALUES(supplier_name), supplier_type=VALUES(supplier_type), status=VALUES(status),
  345. contact=VALUES(contact), address=VALUES(address), currency_type=VALUES(currency_type), sync_batch_id=VALUES(sync_batch_id),
  346. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  347. """, batchId, now);
  348. yield return Cmd(
  349. """
  350. INSERT INTO mdp_std_item
  351. (tenant_id, factory_id, company_id, source_system, item_code, item_name, model, unit, item_type, status, source_biz_key, sync_batch_id, sync_time)
  352. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  353. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number'))),
  354. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Descr')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.name')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_name'))),
  355. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Drawing')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.model'))),
  356. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.UM')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.unit'))),
  357. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemType')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_type'))),
  358. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Status')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.is_active'))),
  359. source_biz_key, @BatchId, @Now
  360. FROM mdp_stg_item
  361. WHERE COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), '') <> ''
  362. ON DUPLICATE KEY UPDATE item_name=VALUES(item_name), model=VALUES(model), unit=VALUES(unit), item_type=VALUES(item_type),
  363. status=VALUES(status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  364. """, batchId, now);
  365. yield return Cmd(
  366. """
  367. INSERT INTO mdp_std_supplier_item
  368. (tenant_id, factory_id, company_id, source_system, item_code, supplier_code, supplier_name, supplier_type, quota_rate, lead_time, min_qty, packaging_qty, price, currency_type, effective_date, expire_date, status, source_biz_key, sync_batch_id, sync_time)
  369. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  370. IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_name'))), ''),
  371. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')),
  372. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_name')),
  373. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_type')),
  374. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.quota_rate')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.quota_rate')) AS DECIMAL(18,6)) END,
  375. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.lead_time')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.lead_time')) AS DECIMAL(18,6)) END,
  376. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty_min')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty_min')) AS DECIMAL(18,6)) END,
  377. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.packaging_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.packaging_qty')) AS DECIMAL(18,6)) END,
  378. COALESCE(
  379. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.netpurchase_price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.netpurchase_price')) AS DECIMAL(18,6)) END,
  380. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.order_price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.order_price')) AS DECIMAL(18,6)) END
  381. ),
  382. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.currency_type')),
  383. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.effective_date')), 'null'), ''),
  384. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.expiring_date')), 'null'), ''),
  385. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.is_active')),
  386. source_biz_key, @BatchId, @Now
  387. FROM mdp_stg_source_list
  388. WHERE source_table='srm_purchase' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')), '') <> ''
  389. AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_name'))), '') <> ''
  390. ON DUPLICATE KEY UPDATE supplier_name=VALUES(supplier_name), quota_rate=VALUES(quota_rate), lead_time=VALUES(lead_time),
  391. min_qty=VALUES(min_qty), packaging_qty=VALUES(packaging_qty), price=VALUES(price), sync_batch_id=VALUES(sync_batch_id),
  392. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  393. """, batchId, now);
  394. yield return Cmd(
  395. """
  396. INSERT INTO mdp_std_supply_demand
  397. (tenant_id, factory_id, company_id, source_system, demand_no, demand_line, demand_type, item_code, item_name, required_qty, required_date, supplier_code, status, source_biz_key, sync_batch_id, sync_time)
  398. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  399. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), source_row_id),
  400. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.line')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Line'))),
  401. source_table,
  402. IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum'))), ''),
  403. IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemname')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_name')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Descr'))), ''),
  404. COALESCE(
  405. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) AS DECIMAL(18,6)) END,
  406. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.required_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.required_qty')) AS DECIMAL(18,6)) END,
  407. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortqty')) AS DECIMAL(18,6)) END,
  408. 0),
  409. COALESCE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.required_date')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.needdate')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.NeedDate')), 'null'), '')),
  410. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')),
  411. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Status'))),
  412. source_biz_key, @BatchId, @Now
  413. FROM mdp_stg_supply_demand
  414. WHERE IFNULL(source_biz_key,'') <> ''
  415. ON DUPLICATE KEY UPDATE item_code=VALUES(item_code), item_name=VALUES(item_name), required_qty=VALUES(required_qty),
  416. required_date=VALUES(required_date), supplier_code=VALUES(supplier_code), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id),
  417. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  418. """, batchId, now);
  419. yield return Cmd(
  420. """
  421. INSERT INTO mdp_std_purchase_request
  422. (tenant_id, factory_id, company_id, source_system, pr_no, pr_line, item_code, item_name, supplier_code, request_qty, request_date, send_date, arrive_date, status, source_biz_key, sync_batch_id, sync_time)
  423. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  424. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.pr_billno')), source_row_id),
  425. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.line_no')),
  426. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_number')), ''),
  427. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_name')), ''),
  428. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')),
  429. COALESCE(
  430. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) AS DECIMAL(18,6)) END,
  431. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.request_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.request_qty')) AS DECIMAL(18,6)) END,
  432. 0),
  433. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.create_time')), 'null'), ''),
  434. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.send_date')), 'null'), ''),
  435. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.arrive_date')), 'null'), ''),
  436. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')),
  437. source_biz_key, @BatchId, @Now
  438. FROM mdp_stg_supply_demand
  439. WHERE source_table='srm_pr_main'
  440. ON DUPLICATE KEY UPDATE item_code=VALUES(item_code), item_name=VALUES(item_name), supplier_code=VALUES(supplier_code),
  441. request_qty=VALUES(request_qty), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  442. update_time=CURRENT_TIMESTAMP
  443. """, batchId, now);
  444. yield return Cmd(
  445. """
  446. INSERT INTO mdp_std_purchase_order
  447. (tenant_id, factory_id, company_id, source_system, po_no, po_line, po_type, supplier_code, item_code, item_name, order_qty, received_qty, returned_qty, due_date, need_date, order_date, status, buyer, work_order, source_biz_key, sync_batch_id, sync_time)
  448. SELECT d.tenant_id, d.factory_id, d.company_id, 'AIDOP',
  449. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PurOrd')),
  450. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Line')) AS CHAR),
  451. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Potype')),
  452. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Supp')),
  453. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemNum')), ''),
  454. IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(i.raw_data,'$.Descr')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Descr'))), ''),
  455. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyOrded')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyOrded')) AS DECIMAL(18,6)) END, 0),
  456. COALESCE(
  457. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReceived')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReceived')) AS DECIMAL(18,6)) END,
  458. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RctQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RctQty')) AS DECIMAL(18,6)) END,
  459. 0),
  460. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReturned')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReturned')) AS DECIMAL(18,6)) END, 0),
  461. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.DueDate')), 'null'), ''),
  462. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.NeedDate')), 'null'), ''),
  463. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.OrdDate')), 'null'), ''),
  464. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Status')),
  465. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Buyer')),
  466. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.WorkOrd')),
  467. d.source_biz_key, @BatchId, @Now
  468. FROM mdp_stg_purchase_order d
  469. LEFT JOIN mdp_stg_purchase_order m ON m.source_table='PurOrdMaster'
  470. AND JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.PurOrd')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PurOrd'))
  471. LEFT JOIN mdp_stg_item i ON i.source_table='ItemMaster'
  472. AND JSON_UNQUOTE(JSON_EXTRACT(i.raw_data,'$.ItemNum')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemNum'))
  473. WHERE d.source_table='PurOrdDetail' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PurOrd')), '') <> ''
  474. ON DUPLICATE KEY UPDATE po_type=VALUES(po_type), supplier_code=VALUES(supplier_code), item_code=VALUES(item_code),
  475. item_name=VALUES(item_name), order_qty=VALUES(order_qty), received_qty=VALUES(received_qty), returned_qty=VALUES(returned_qty),
  476. due_date=VALUES(due_date), need_date=VALUES(need_date), order_date=VALUES(order_date), status=VALUES(status),
  477. buyer=VALUES(buyer), work_order=VALUES(work_order), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  478. update_time=CURRENT_TIMESTAMP
  479. """, batchId, now);
  480. yield return Cmd(
  481. """
  482. INSERT INTO mdp_std_delivery_schedule
  483. (tenant_id, source_system, delivery_plan_no, po_no, po_line, item_code, supplier_code, supplier_name, schedule_qty, sent_qty, rest_qty, return_qty, request_date, need_date, submit_date, last_sent_date, status, source_biz_key, sync_batch_id, sync_time)
  484. SELECT tenant_id, 'AIDOP',
  485. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.dsnum')),
  486. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ponumber')),
  487. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.poline')) AS CHAR),
  488. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), ''),
  489. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.suppliercode')),
  490. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier')),
  491. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.schedqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.schedqty')) AS DECIMAL(18,6)) END,0),
  492. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sentqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sentqty')) AS DECIMAL(18,6)) END,0),
  493. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.restqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.restqty')) AS DECIMAL(18,6)) END,0),
  494. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) AS DECIMAL(18,6)) END,0),
  495. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.requestdate')), 'null'), ''),
  496. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.needdate')), 'null'), ''),
  497. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.submitdate')), 'null'), ''),
  498. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.lastsentdate')), 'null'), ''),
  499. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')),
  500. source_biz_key, @BatchId, @Now
  501. FROM mdp_stg_delivery
  502. WHERE source_table='srm_polist_ds' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.dsnum')), '') <> ''
  503. ON DUPLICATE KEY UPDATE po_no=VALUES(po_no), po_line=VALUES(po_line), item_code=VALUES(item_code), supplier_code=VALUES(supplier_code),
  504. supplier_name=VALUES(supplier_name), schedule_qty=VALUES(schedule_qty), sent_qty=VALUES(sent_qty), rest_qty=VALUES(rest_qty),
  505. return_qty=VALUES(return_qty), need_date=VALUES(need_date), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id),
  506. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  507. """, batchId, now);
  508. yield return Cmd(
  509. """
  510. INSERT INTO mdp_std_delivery_result
  511. (tenant_id, source_system, delivery_no, delivery_line, delivery_plan_no, po_no, po_line, item_code, delivery_qty, receipt_qty, return_qty, receipt_date, qc_status, event_time, source_biz_key, sync_batch_id, sync_time)
  512. SELECT tenant_id, 'AIDOP',
  513. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shddh')), source_row_id),
  514. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.id')) AS CHAR),
  515. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.dsnum')),
  516. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_bill')),
  517. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_billline')) AS CHAR),
  518. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), ''),
  519. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) AS DECIMAL(18,6)) END,0),
  520. 0,
  521. 0,
  522. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.receipt_date')), 'null'), ''),
  523. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qc_status')),
  524. COALESCE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.updatetime')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.createtime')), 'null'), '')),
  525. source_biz_key, @BatchId, @Now
  526. FROM mdp_stg_delivery
  527. WHERE source_table IN ('scm_shd','scm_shdzb')
  528. ON DUPLICATE KEY UPDATE delivery_qty=VALUES(delivery_qty), receipt_qty=VALUES(receipt_qty), return_qty=VALUES(return_qty),
  529. receipt_date=VALUES(receipt_date), event_time=VALUES(event_time), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  530. update_time=CURRENT_TIMESTAMP
  531. """, batchId, now);
  532. yield return Cmd(
  533. """
  534. INSERT INTO mdp_std_material_readiness
  535. (tenant_id, source_system, work_order, op_code, item_code, component_item_code, required_qty, issued_qty, received_qty, available_qty, in_transit_qty, incoming_qty, shortage_qty, ready_status, need_date, supplier_code, source_biz_key, sync_batch_id, sync_time)
  536. SELECT tenant_id, 'AIDOP',
  537. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')),
  538. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')) AS CHAR),
  539. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PMBOM')), ''),
  540. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), source_row_id),
  541. COALESCE(
  542. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyRequired')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyRequired')) AS DECIMAL(18,6)) END,
  543. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReq')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReq')) AS DECIMAL(18,6)) END,
  544. 0),
  545. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyIssued')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyIssued')) AS DECIMAL(18,6)) END, 0),
  546. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) AS DECIMAL(18,6)) END, 0),
  547. 0, 0, 0,
  548. 0,
  549. 'READY',
  550. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DueDate')), 'null'), ''),
  551. NULL,
  552. source_biz_key, @BatchId, @Now
  553. FROM mdp_stg_work_order_material
  554. WHERE source_table='WorkOrdDetail' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), '') <> ''
  555. ON DUPLICATE KEY UPDATE required_qty=VALUES(required_qty), received_qty=VALUES(received_qty), need_date=VALUES(need_date),
  556. sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  557. """, batchId, now);
  558. yield return Cmd(
  559. """
  560. INSERT INTO mdp_std_process_outsource_order
  561. (tenant_id, source_system, work_order, op_code, routing_code, supplier_code, po_no, po_line, order_qty, completed_qty, due_date, status, source_biz_key, sync_batch_id, sync_time)
  562. SELECT tenant_id, 'AIDOP',
  563. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), ''),
  564. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')) AS CHAR),
  565. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RoutingCode')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RouteCode'))),
  566. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SupplierCode')),
  567. NULL, NULL,
  568. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PackingQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PackingQty')) AS DECIMAL(18,6)) END, 0),
  569. 0,
  570. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.UpdatedAt')), 'null'), ''),
  571. CASE WHEN JSON_EXTRACT(raw_data,'$.IsEnabled') IN (1, true) THEN 'OPEN' ELSE 'DISABLED' END,
  572. source_biz_key, @BatchId, @Now
  573. FROM mdp_stg_work_order_material
  574. WHERE source_table='RoutingOpDetail'
  575. ON DUPLICATE KEY UPDATE routing_code=VALUES(routing_code), supplier_code=VALUES(supplier_code), order_qty=VALUES(order_qty),
  576. due_date=VALUES(due_date), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  577. update_time=CURRENT_TIMESTAMP
  578. """, batchId, now);
  579. }
  580. private IEnumerable<S3MdpSqlCommand> BuildDwdCommands(string batchId, DateTime now)
  581. {
  582. yield return Cmd(
  583. """
  584. INSERT INTO dwd_supply_demand
  585. (tenant_id, stat_date, demand_no, demand_line, demand_type, item_code, item_name, supplier_code, required_qty, fulfilled_qty, shortage_qty, required_date, demand_status, source_system, sync_batch_id, calc_time)
  586. SELECT tenant_id, @StatDate, demand_no, IFNULL(demand_line,''), demand_type, IFNULL(item_code,''), IFNULL(item_name,''), supplier_code,
  587. IFNULL(required_qty,0), 0, IFNULL(required_qty,0), required_date, status, source_system, @BatchId, @Now
  588. FROM mdp_std_supply_demand
  589. WHERE IFNULL(demand_no,'') <> ''
  590. ON DUPLICATE KEY UPDATE item_code=VALUES(item_code), item_name=VALUES(item_name), supplier_code=VALUES(supplier_code),
  591. required_qty=VALUES(required_qty), fulfilled_qty=VALUES(fulfilled_qty), shortage_qty=VALUES(shortage_qty),
  592. demand_status=VALUES(demand_status), sync_batch_id=VALUES(sync_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  593. """, batchId, now);
  594. yield return Cmd(
  595. """
  596. INSERT INTO dwd_supplier_delivery
  597. (tenant_id, stat_date, po_no, po_line, po_type, supplier_code, supplier_name, item_code, item_name, order_qty, schedule_qty, delivery_qty, receipt_qty, return_qty, remaining_qty, due_date, need_date, last_delivery_date, last_receipt_date, delivery_status, risk_level, source_system, sync_batch_id, calc_time)
  598. SELECT po.tenant_id, @StatDate, po.po_no, po.po_line, po.po_type, po.supplier_code, IFNULL(s.supplier_name, ds.supplier_name),
  599. IFNULL(po.item_code,''), IFNULL(po.item_name,''), IFNULL(po.order_qty,0), IFNULL(ds.schedule_qty,0), IFNULL(dr.delivery_qty,0),
  600. IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0), IFNULL(po.returned_qty,0) + IFNULL(dr.return_qty,0),
  601. GREATEST(IFNULL(po.order_qty,0) - IFNULL(po.received_qty,0) - IFNULL(dr.receipt_qty,0) - IFNULL(po.returned_qty,0), 0),
  602. po.due_date, COALESCE(ds.need_date, po.need_date), dr.event_time, dr.receipt_date,
  603. CASE
  604. WHEN IFNULL(po.order_qty,0) <= IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0) THEN 'COMPLETED'
  605. WHEN COALESCE(ds.need_date, po.need_date, po.due_date) < @Now THEN 'DELAYED'
  606. ELSE 'OPEN'
  607. END,
  608. CASE
  609. WHEN COALESCE(ds.need_date, po.need_date, po.due_date) < @Now
  610. AND IFNULL(po.order_qty,0) > IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0) THEN 'HIGH'
  611. WHEN IFNULL(po.order_qty,0) > IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0) THEN 'MEDIUM'
  612. ELSE 'LOW'
  613. END,
  614. 'AIDOP', @BatchId, @Now
  615. FROM mdp_std_purchase_order po
  616. LEFT JOIN (
  617. SELECT tenant_id, po_no, po_line, MIN(supplier_name) AS supplier_name, SUM(IFNULL(schedule_qty,0)) AS schedule_qty, MIN(need_date) AS need_date
  618. FROM mdp_std_delivery_schedule
  619. WHERE IFNULL(po_no,'') <> '' AND IFNULL(po_line,'') <> ''
  620. GROUP BY tenant_id, po_no, po_line
  621. ) ds ON po.tenant_id=ds.tenant_id AND po.po_no=ds.po_no AND po.po_line=ds.po_line
  622. LEFT JOIN (
  623. SELECT tenant_id, po_no, po_line, SUM(IFNULL(delivery_qty,0)) AS delivery_qty, SUM(IFNULL(receipt_qty,0)) AS receipt_qty,
  624. SUM(IFNULL(return_qty,0)) AS return_qty, MAX(event_time) AS event_time, MAX(receipt_date) AS receipt_date
  625. FROM mdp_std_delivery_result
  626. WHERE IFNULL(po_no,'') <> '' AND IFNULL(po_line,'') <> ''
  627. GROUP BY tenant_id, po_no, po_line
  628. ) dr ON po.tenant_id=dr.tenant_id AND po.po_no=dr.po_no AND po.po_line=dr.po_line
  629. LEFT JOIN (
  630. SELECT tenant_id, supplier_code, MAX(supplier_name) AS supplier_name
  631. FROM mdp_std_supplier
  632. WHERE IFNULL(supplier_code,'') <> ''
  633. GROUP BY tenant_id, supplier_code
  634. ) s ON po.tenant_id=s.tenant_id AND po.supplier_code=s.supplier_code
  635. WHERE IFNULL(po.po_no,'') <> '' AND IFNULL(po.po_line,'') <> ''
  636. ON DUPLICATE KEY UPDATE supplier_name=VALUES(supplier_name), item_code=VALUES(item_code), item_name=VALUES(item_name),
  637. order_qty=VALUES(order_qty), schedule_qty=VALUES(schedule_qty), delivery_qty=VALUES(delivery_qty), receipt_qty=VALUES(receipt_qty),
  638. return_qty=VALUES(return_qty), remaining_qty=VALUES(remaining_qty), delivery_status=VALUES(delivery_status),
  639. risk_level=VALUES(risk_level), sync_batch_id=VALUES(sync_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  640. """, batchId, now);
  641. yield return Cmd("DELETE FROM dwd_supplier_risk WHERE stat_date=@StatDate", batchId, now);
  642. yield return Cmd(
  643. """
  644. INSERT INTO dwd_supplier_risk
  645. (tenant_id, stat_date, supplier_code, supplier_name, item_code, risk_type, risk_level, risk_count, risk_qty, risk_reason, source_system, calc_batch_id, calc_time)
  646. SELECT tenant_id, stat_date, IFNULL(supplier_code,''), supplier_name, IFNULL(item_code,''),
  647. 'DELIVERY_DELAY', risk_level, COUNT(1), SUM(IFNULL(remaining_qty,0)),
  648. '供应交付存在延期或未完成风险', 'AIDOP', @BatchId, @Now
  649. FROM dwd_supplier_delivery
  650. WHERE stat_date=@StatDate AND risk_level IN ('HIGH','MEDIUM') AND IFNULL(supplier_code,'') <> ''
  651. GROUP BY tenant_id, stat_date, supplier_code, supplier_name, item_code, risk_level
  652. ON DUPLICATE KEY UPDATE risk_level=VALUES(risk_level), risk_count=VALUES(risk_count), risk_qty=VALUES(risk_qty),
  653. risk_reason=VALUES(risk_reason), calc_batch_id=VALUES(calc_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  654. """, batchId, now);
  655. yield return Cmd(
  656. """
  657. INSERT INTO dwd_process_outsource_delivery
  658. (tenant_id, stat_date, work_order, op_code, routing_code, supplier_code, supplier_name, po_no, po_line, order_qty, completed_qty, remaining_qty, due_date, delivery_status, risk_level, source_system, calc_batch_id, calc_time)
  659. SELECT o.tenant_id, @StatDate, o.work_order, o.op_code, o.routing_code, o.supplier_code, s.supplier_name, o.po_no, IFNULL(o.po_line,''),
  660. IFNULL(o.order_qty,0), IFNULL(o.completed_qty,0), GREATEST(IFNULL(o.order_qty,0)-IFNULL(o.completed_qty,0),0),
  661. o.due_date,
  662. CASE WHEN IFNULL(o.completed_qty,0) >= IFNULL(o.order_qty,0) AND IFNULL(o.order_qty,0) > 0 THEN 'COMPLETED'
  663. WHEN o.due_date < @Now THEN 'DELAYED' ELSE IFNULL(o.status,'OPEN') END,
  664. CASE WHEN o.due_date < @Now AND IFNULL(o.completed_qty,0) < IFNULL(o.order_qty,0) THEN 'HIGH'
  665. WHEN IFNULL(o.completed_qty,0) < IFNULL(o.order_qty,0) THEN 'MEDIUM' ELSE 'LOW' END,
  666. 'AIDOP', @BatchId, @Now
  667. FROM mdp_std_process_outsource_order o
  668. LEFT JOIN (
  669. SELECT tenant_id, supplier_code, MAX(supplier_name) AS supplier_name
  670. FROM mdp_std_supplier
  671. WHERE IFNULL(supplier_code,'') <> ''
  672. GROUP BY tenant_id, supplier_code
  673. ) s ON o.tenant_id=s.tenant_id AND o.supplier_code=s.supplier_code
  674. WHERE IFNULL(o.work_order,'') <> '' OR IFNULL(o.routing_code,'') <> ''
  675. ON DUPLICATE KEY UPDATE supplier_code=VALUES(supplier_code), supplier_name=VALUES(supplier_name), order_qty=VALUES(order_qty),
  676. completed_qty=VALUES(completed_qty), remaining_qty=VALUES(remaining_qty), due_date=VALUES(due_date),
  677. delivery_status=VALUES(delivery_status), risk_level=VALUES(risk_level), calc_batch_id=VALUES(calc_batch_id),
  678. calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  679. """, batchId, now);
  680. }
  681. private async Task<int> UpsertMaterialReadinessDwdAsync(string batchId, DateTime now)
  682. {
  683. return await _db.Ado.ExecuteCommandAsync(
  684. """
  685. INSERT INTO dwd_material_readiness
  686. (tenant_id, stat_date, work_order, op_code, parent_item_code, component_item_code, component_item_name, required_qty, cumulative_required_qty, stock_available_qty, qc_pending_qty, in_transit_qty, delivery_reply_qty, available_qty, shortage_qty, ready_status, supplier_code, supplier_name, need_date, calc_batch_id, calc_time)
  687. SELECT s.tenant_id, @StatDate, s.work_order, s.op_code, s.item_code, s.component_item_code, IFNULL(i.item_name, ''),
  688. IFNULL(s.required_qty, 0), IFNULL(s.required_qty, 0), IFNULL(s.available_qty, 0), 0, IFNULL(s.in_transit_qty, 0),
  689. IFNULL(s.incoming_qty, 0), IFNULL(s.available_qty, 0), IFNULL(s.shortage_qty, 0),
  690. CASE WHEN IFNULL(s.shortage_qty, 0) > 0 THEN 'SHORTAGE' ELSE 'READY' END,
  691. s.supplier_code, IFNULL(sp.supplier_name, ''), s.need_date, @BatchId, @Now
  692. FROM mdp_std_material_readiness s
  693. LEFT JOIN mdp_std_item i ON s.tenant_id=i.tenant_id AND s.component_item_code=i.item_code
  694. LEFT JOIN mdp_std_supplier sp ON s.tenant_id=sp.tenant_id AND s.supplier_code=sp.supplier_code
  695. WHERE IFNULL(s.work_order, '') <> ''
  696. ON DUPLICATE KEY UPDATE component_item_name=VALUES(component_item_name), required_qty=VALUES(required_qty),
  697. cumulative_required_qty=VALUES(cumulative_required_qty), stock_available_qty=VALUES(stock_available_qty),
  698. qc_pending_qty=VALUES(qc_pending_qty), in_transit_qty=VALUES(in_transit_qty), delivery_reply_qty=VALUES(delivery_reply_qty),
  699. available_qty=VALUES(available_qty), shortage_qty=VALUES(shortage_qty), ready_status=VALUES(ready_status),
  700. supplier_code=VALUES(supplier_code), supplier_name=VALUES(supplier_name), need_date=VALUES(need_date),
  701. calc_batch_id=VALUES(calc_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  702. """,
  703. new SugarParameter("@StatDate", now.Date),
  704. new SugarParameter("@BatchId", batchId),
  705. new SugarParameter("@Now", now));
  706. }
  707. private async Task<int> DeleteCurrentShortageAsync(DateTime statDate)
  708. {
  709. return await _db.Ado.ExecuteCommandAsync(
  710. """
  711. DELETE s
  712. FROM dwd_material_shortage s
  713. JOIN (
  714. SELECT DISTINCT work_order
  715. FROM mdp_std_material_readiness
  716. WHERE IFNULL(work_order, '') <> ''
  717. ) w ON s.work_order = w.work_order
  718. WHERE s.stat_date = @StatDate
  719. """,
  720. new SugarParameter("@StatDate", statDate));
  721. }
  722. private async Task<int> InsertMaterialShortageAsync(string batchId, DateTime now)
  723. {
  724. return await _db.Ado.ExecuteCommandAsync(
  725. """
  726. INSERT INTO dwd_material_shortage
  727. (tenant_id, stat_date, work_order, op_code, component_item_code, shortage_qty, shortage_reason, expected_supply_date, supplier_code, related_po_no, risk_level, calc_batch_id, calc_time)
  728. SELECT s.tenant_id, @StatDate, s.work_order, s.op_code, s.component_item_code, IFNULL(s.shortage_qty, 0),
  729. '标准层缺料数量大于 0', s.need_date, s.supplier_code, NULL,
  730. CASE WHEN IFNULL(s.shortage_qty, 0) > IFNULL(s.required_qty, 0) THEN 'HIGH'
  731. WHEN IFNULL(s.shortage_qty, 0) > 0 THEN 'MEDIUM' ELSE 'LOW' END,
  732. @BatchId, @Now
  733. FROM mdp_std_material_readiness s
  734. WHERE IFNULL(s.work_order, '') <> '' AND IFNULL(s.shortage_qty, 0) > 0
  735. """,
  736. new SugarParameter("@StatDate", now.Date),
  737. new SugarParameter("@BatchId", batchId),
  738. new SugarParameter("@Now", now));
  739. }
  740. private async Task<long> InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead)
  741. {
  742. await _db.Ado.ExecuteCommandAsync(
  743. """
  744. INSERT INTO mdp_sync_log
  745. (tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status)
  746. VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING')
  747. """,
  748. new SugarParameter("@EntityId", entityId),
  749. new SugarParameter("@EntityName", entityName),
  750. new SugarParameter("@BatchId", batchId),
  751. new SugarParameter("@RowsRead", rowsRead));
  752. return await _db.Ado.GetLongAsync(
  753. "SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1",
  754. new List<SugarParameter>
  755. {
  756. new("@BatchId", batchId),
  757. new("@EntityId", entityId)
  758. });
  759. }
  760. private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected)
  761. {
  762. await _db.Ado.ExecuteCommandAsync(
  763. """
  764. UPDATE mdp_sync_log
  765. SET sync_end=NOW(), duration_ms=@DurationMs, rows_insert=@RowsInsert, rows_update=0, rows_skip=0, rows_error=0, status='SUCCESS'
  766. WHERE id=@Id
  767. """,
  768. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  769. new SugarParameter("@RowsInsert", affected),
  770. new SugarParameter("@Id", logId));
  771. }
  772. private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message)
  773. {
  774. try
  775. {
  776. await _db.Ado.ExecuteCommandAsync(
  777. """
  778. UPDATE mdp_sync_log
  779. SET sync_end=NOW(), duration_ms=@DurationMs, rows_error=1, status='FAILED', error_msg=@ErrorMsg
  780. WHERE id=@Id
  781. """,
  782. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  783. new SugarParameter("@ErrorMsg", message.Length <= 1000 ? message : message[..1000]),
  784. new SugarParameter("@Id", logId));
  785. }
  786. catch (Exception ex)
  787. {
  788. // 写库自身失败兜底:避免再抛掩盖原异常;遗留 RUNNING 行可由运维手动清理
  789. Console.Error.WriteLine($"[S3MdpSyncTransform] MarkSyncLogFailed write failed (syncLogId={logId}): {ex.Message}");
  790. }
  791. }
  792. private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
  793. {
  794. await _db.Ado.ExecuteCommandAsync(
  795. """
  796. INSERT INTO mdp_transform_run_log
  797. (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
  798. VALUES (0, 'S3_MDP_SYNC_TRANSFORM', 'S3 MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
  799. """,
  800. new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
  801. new SugarParameter("@BatchId", batchId),
  802. new SugarParameter("@StartTime", startedAt));
  803. return await _db.Ado.GetLongAsync(
  804. "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
  805. new List<SugarParameter> { new("@BatchId", batchId) });
  806. }
  807. private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S3MdpSyncTransformResult result)
  808. {
  809. var finishedAt = DateTime.Now;
  810. await _db.Ado.ExecuteCommandAsync(
  811. """
  812. UPDATE mdp_transform_run_log
  813. SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
  814. stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
  815. summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
  816. WHERE id=@Id
  817. """,
  818. new SugarParameter("@EndTime", finishedAt),
  819. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  820. new SugarParameter("@StageRows", result.StageRows),
  821. new SugarParameter("@StandardRows", result.StandardRows),
  822. new SugarParameter("@DwdRows", result.DwdRows),
  823. new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)),
  824. new SugarParameter("@Id", runLogId));
  825. }
  826. private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
  827. {
  828. try
  829. {
  830. var finishedAt = DateTime.Now;
  831. await _db.Ado.ExecuteCommandAsync(
  832. """
  833. UPDATE mdp_transform_run_log
  834. SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
  835. error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
  836. WHERE id=@Id
  837. """,
  838. new SugarParameter("@EndTime", finishedAt),
  839. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  840. new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
  841. new SugarParameter("@Id", runLogId));
  842. }
  843. catch (Exception ex)
  844. {
  845. // 写库自身失败兜底(典型场景:远端 MySQL 瞬断导致 MarkFailed 自身也连不上)
  846. Console.Error.WriteLine($"[S3MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}");
  847. }
  848. }
  849. private static S3MdpSqlCommand Cmd(string sql, string batchId, DateTime now)
  850. {
  851. return new S3MdpSqlCommand(sql, new[]
  852. {
  853. new SugarParameter("@BatchId", batchId),
  854. new SugarParameter("@Now", now),
  855. new SugarParameter("@StatDate", now.Date)
  856. });
  857. }
  858. private static string BuildJsonObjectExpression(IEnumerable<string> columns)
  859. {
  860. var parts = columns.SelectMany(c => new[] { $"'{c.Replace("'", "''")}'", $"s.`{c}`" });
  861. return $"JSON_OBJECT({string.Join(",", parts)})";
  862. }
  863. private static string FindColumn(IEnumerable<string> columns, string expected)
  864. {
  865. return columns.First(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase));
  866. }
  867. private static string NormalizeTriggerType(string? triggerType)
  868. {
  869. return string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
  870. }
  871. private static string BuildRunSummaryJson(S3MdpSyncTransformResult result)
  872. {
  873. return $$"""{"batchId":"{{result.BatchId}}","stageRows":{{result.StageRows}},"standardRows":{{result.StandardRows}},"dwdRows":{{result.DwdRows}},"kpiRows":{{result.KpiRows}}}""";
  874. }
  875. private static string ResolveKpiValueTable(int metricLevel)
  876. {
  877. return metricLevel switch
  878. {
  879. 1 => "ado_s9_kpi_value_l1_day",
  880. 2 => "ado_s9_kpi_value_l2_day",
  881. 3 => "ado_s9_kpi_value_l3_day",
  882. 4 => "ado_s9_kpi_value_l4_day",
  883. _ => "ado_s9_kpi_value_l2_day"
  884. };
  885. }
  886. private static decimal DefaultS3Target(string metricCode)
  887. {
  888. return metricCode switch
  889. {
  890. "S3_L1_001" => 15m,
  891. "S3_L1_002" => 95m,
  892. "S3_L2_004" => 10.5m,
  893. "S3_L2_005" => 96.72m,
  894. "S3_L3_001" => 5m,
  895. "S3_L3_002" => 7m,
  896. "S3_L3_003" => 92m,
  897. "S3_L3_004" => 90m,
  898. "S3_L3_005" => 15m,
  899. "S3_L3_006" => 85m,
  900. _ => 0m
  901. };
  902. }
  903. private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold)
  904. {
  905. if (target <= 0) return "gray";
  906. var ratio = actual / target * 100m;
  907. if (string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase))
  908. {
  909. if (actual <= target) return "green";
  910. if (ratio <= (yellowThreshold ?? 110m)) return "yellow";
  911. return ratio >= (redThreshold ?? 120m) ? "red" : "yellow";
  912. }
  913. if (actual >= target) return "green";
  914. if (ratio >= (yellowThreshold ?? 95m)) return "yellow";
  915. return ratio <= (redThreshold ?? 80m) ? "red" : "yellow";
  916. }
  917. private static string ResolveTrendFlag(decimal actual, decimal? previous)
  918. {
  919. if (previous == null) return "flat";
  920. if (actual > previous.Value) return "up";
  921. if (actual < previous.Value) return "down";
  922. return "flat";
  923. }
  924. private static string Truncate(string? raw, int maxLength)
  925. {
  926. if (string.IsNullOrEmpty(raw)) return string.Empty;
  927. return raw.Length <= maxLength ? raw : raw[..maxLength];
  928. }
  929. private sealed class S3ColumnRow
  930. {
  931. public string ColumnName { get; set; } = string.Empty;
  932. }
  933. private sealed class S3MdpEntityRow
  934. {
  935. public long Id { get; set; }
  936. public string EntityName { get; set; } = string.Empty;
  937. }
  938. private sealed class S3KpiCalcRow
  939. {
  940. public long TenantId { get; set; }
  941. public long FactoryId { get; set; }
  942. public string MetricCode { get; set; } = string.Empty;
  943. public decimal? MetricValue { get; set; }
  944. }
  945. private sealed class S3KpiMetaRow
  946. {
  947. public int MetricLevel { get; set; }
  948. public string Direction { get; set; } = "higher_is_better";
  949. public decimal? YellowThreshold { get; set; }
  950. public decimal? RedThreshold { get; set; }
  951. }
  952. private sealed class S3KpiValueRow
  953. {
  954. public long Id { get; set; }
  955. public decimal? MetricValue { get; set; }
  956. public decimal? TargetValue { get; set; }
  957. }
  958. }
  959. public sealed class S3MdpSyncTransformResult
  960. {
  961. public long RunLogId { get; set; }
  962. public string BatchId { get; set; } = string.Empty;
  963. public int StageRows { get; set; }
  964. public int StandardRows { get; set; }
  965. public int DwdRows { get; set; }
  966. public int KpiRows { get; set; }
  967. public int AtomicRows { get; set; }
  968. }
  969. public sealed class S3MaterialRefreshResult
  970. {
  971. public string BatchId { get; set; } = string.Empty;
  972. public int StdCount { get; set; }
  973. public int ReadinessCount { get; set; }
  974. public int ShortageCount { get; set; }
  975. }
  976. internal sealed record S3MdpSqlCommand(string Sql, SugarParameter[] Parameters);
  977. internal sealed record S3MdpEntityConfig(
  978. string EntityCode,
  979. string SourceTable,
  980. string TargetTable,
  981. string SourceRowIdExpression,
  982. string SourceBizKeyExpression)
  983. {
  984. public static readonly IReadOnlyList<S3MdpEntityConfig> All = new List<S3MdpEntityConfig>
  985. {
  986. new("S3_SUPPLIER", "SuppMaster", "mdp_stg_supplier", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Supp`,''))"),
  987. new("S3_CONSIGNEE_SUPPLIER", "ConsigneeAddressMaster", "mdp_stg_supplier", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Address`,''))"),
  988. new("S3_ITEM_ERP", "ItemMaster", "mdp_stg_item", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`ItemNum`,''))"),
  989. new("S3_ITEM_NEW", "ic_item", "mdp_stg_item", "Id", "CONCAT(IFNULL(s.`tenant_id`,0), ':', IFNULL(s.`number`,''))"),
  990. new("S3_SOURCE_LIST", "srm_purchase", "mdp_stg_source_list", "Id", "CONCAT(IFNULL(s.`number`,''), ':', IFNULL(s.`supplier_number`,''))"),
  991. new("S3_SUPPLY_DEMAND", "ic_demandschedule", "mdp_stg_supply_demand", "Id", "CAST(s.`Id` AS CHAR)"),
  992. new("S3_PURCHASE_REQUEST", "srm_pr_main", "mdp_stg_supply_demand", "Id", "COALESCE(s.`pr_billno`, CAST(s.`Id` AS CHAR))"),
  993. new("S3_PURCHASE_ORDER_MASTER", "PurOrdMaster", "mdp_stg_purchase_order", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`PurOrd`,''))"),
  994. new("S3_PURCHASE_ORDER_DETAIL", "PurOrdDetail", "mdp_stg_purchase_order", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`PurOrd`,''), ':', IFNULL(s.`Line`,''))"),
  995. new("S3_DELIVERY_PLAN", "srm_polist_ds", "mdp_stg_delivery", "Id", "s.`dsnum`"),
  996. new("S3_SHIPPER_MASTER", "scm_shd", "mdp_stg_delivery", "id", "COALESCE(s.`shddh`, CAST(s.`id` AS CHAR))"),
  997. new("S3_SHIPPER_DETAIL", "scm_shdzb", "mdp_stg_delivery", "id", "CONCAT(IFNULL(s.`glid`,''), ':', IFNULL(s.`id`,''))"),
  998. new("S3_RECEIPT_MASTER", "PurOrdRctMaster", "mdp_stg_receipt", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''))"),
  999. new("S3_RECEIPT_DETAIL", "PurOrdRctDetail", "mdp_stg_receipt", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''), ':', IFNULL(s.`Line`,''))"),
  1000. new("S3_WORK_ORDER_MASTER", "WorkOrdMaster", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''))"),
  1001. new("S3_WORK_ORDER_DETAIL", "WorkOrdDetail", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`ItemNum`,''))"),
  1002. new("S3_WORK_ORDER_ROUTING", "WorkOrdRouting", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`OP`,''))"),
  1003. new("S3_ROUTING_OUTSOURCE", "RoutingOpDetail", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`RoutingCode`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`SupplierCode`,''))"),
  1004. new("S3_INVENTORY", "InvMaster", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`ItemNum`,''), ':', IFNULL(s.`Location`,''))")
  1005. };
  1006. }