S3MdpSyncTransformService.cs 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067
  1. namespace Admin.NET.Plugin.AiDOP.Supply;
  2. /// <summary>
  3. /// S3 首批 MDP 同步和标准化转换服务。
  4. /// </summary>
  5. public class S3MdpSyncTransformService : ITransient
  6. {
  7. private readonly ISqlSugarClient _db;
  8. public S3MdpSyncTransformService(ISqlSugarClient db)
  9. {
  10. _db = db;
  11. }
  12. public async Task<S3MdpSyncTransformResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
  13. {
  14. cancellationToken.ThrowIfCancellationRequested();
  15. var now = DateTime.Now;
  16. var batchId = $"S3_MDP_FULL_{now:yyyyMMddHHmmss}";
  17. var runLogId = await InsertTransformRunLogAsync(batchId, now, triggerType);
  18. var result = new S3MdpSyncTransformResult { BatchId = batchId, RunLogId = runLogId };
  19. try
  20. {
  21. result.StageRows = await SyncStagingAsync(batchId, now, cancellationToken);
  22. result.StandardRows = await TransformStandardAsync(batchId, now, cancellationToken);
  23. result.DwdRows = await BuildDwdAsync(batchId, now, cancellationToken);
  24. result.KpiRows = await BuildS3KpiValuesAsync(now, cancellationToken);
  25. await MarkTransformRunSuccessAsync(runLogId, now, result);
  26. return result;
  27. }
  28. catch (Exception ex)
  29. {
  30. await MarkTransformRunFailedAsync(runLogId, now, ex.Message);
  31. throw;
  32. }
  33. }
  34. public async Task<S3MaterialRefreshResult> RefreshMaterialReadinessAsync(string? batchId = null, CancellationToken cancellationToken = default)
  35. {
  36. cancellationToken.ThrowIfCancellationRequested();
  37. var now = DateTime.Now;
  38. batchId ??= $"S3_MATERIAL_{now:yyyyMMddHHmmss}";
  39. _db.Ado.BeginTran();
  40. try
  41. {
  42. var stdCount = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM mdp_std_material_readiness WHERE IFNULL(work_order, '') <> ''");
  43. var readinessCount = await UpsertMaterialReadinessDwdAsync(batchId, now);
  44. await DeleteCurrentShortageAsync(now.Date);
  45. var shortageCount = await InsertMaterialShortageAsync(batchId, now);
  46. _db.Ado.CommitTran();
  47. return new S3MaterialRefreshResult
  48. {
  49. BatchId = batchId,
  50. StdCount = stdCount,
  51. ReadinessCount = readinessCount,
  52. ShortageCount = shortageCount
  53. };
  54. }
  55. catch
  56. {
  57. _db.Ado.RollbackTran();
  58. throw;
  59. }
  60. }
  61. private async Task<int> SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  62. {
  63. var total = 0;
  64. foreach (var entity in S3MdpEntityConfig.All)
  65. {
  66. cancellationToken.ThrowIfCancellationRequested();
  67. total += await SyncOneEntityAsync(entity, batchId, now);
  68. }
  69. return total;
  70. }
  71. private async Task<int> SyncOneEntityAsync(S3MdpEntityConfig entity, string batchId, DateTime now)
  72. {
  73. var entityRow = await _db.Ado.SqlQuerySingleAsync<S3MdpEntityRow>(
  74. "SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1",
  75. new SugarParameter("@EntityCode", entity.EntityCode));
  76. if (entityRow == null) throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode}");
  77. var columns = await _db.Ado.SqlQueryAsync<S3ColumnRow>(
  78. """
  79. SELECT COLUMN_NAME AS ColumnName
  80. FROM information_schema.COLUMNS
  81. WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
  82. ORDER BY ORDINAL_POSITION
  83. """,
  84. new SugarParameter("@TableName", entity.SourceTable));
  85. if (columns.Count == 0) throw Oops.Oh($"未找到源表:{entity.SourceTable}");
  86. var names = columns.Select(u => u.ColumnName).ToList();
  87. var tenantExpr = names.Any(u => string.Equals(u, "tenant_id", StringComparison.OrdinalIgnoreCase))
  88. ? $"IFNULL(s.`{FindColumn(names, "tenant_id")}`,0)"
  89. : "0";
  90. var sourceRowExpr = names.Any(u => string.Equals(u, entity.SourceRowIdExpression, StringComparison.OrdinalIgnoreCase))
  91. ? $"s.`{FindColumn(names, entity.SourceRowIdExpression)}`"
  92. : entity.SourceRowIdExpression;
  93. var rawDataExpr = BuildJsonObjectExpression(names);
  94. var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`");
  95. var logId = await InsertSyncLogAsync(entityRow.Id, entityRow.EntityName, batchId, rowsRead);
  96. var started = DateTime.Now;
  97. try
  98. {
  99. var affected = await _db.Ado.ExecuteCommandAsync(
  100. $"""
  101. INSERT INTO `{entity.TargetTable}`
  102. (tenant_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
  103. SELECT
  104. {tenantExpr},
  105. 'AIDOP',
  106. @SourceTable,
  107. CAST({sourceRowExpr} AS CHAR),
  108. CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({sourceRowExpr} AS CHAR)) AS CHAR),
  109. @BatchId,
  110. @Now,
  111. 'PENDING',
  112. {rawDataExpr}
  113. FROM `{entity.SourceTable}` s
  114. ON DUPLICATE KEY UPDATE
  115. source_row_id=VALUES(source_row_id),
  116. sync_batch_id=VALUES(sync_batch_id),
  117. sync_time=VALUES(sync_time),
  118. process_status=VALUES(process_status),
  119. raw_data=VALUES(raw_data),
  120. update_time=CURRENT_TIMESTAMP
  121. """,
  122. new SugarParameter("@SourceTable", entity.SourceTable),
  123. new SugarParameter("@BatchId", batchId),
  124. new SugarParameter("@Now", now));
  125. await MarkSyncLogSuccessAsync(logId, started, affected);
  126. return rowsRead;
  127. }
  128. catch (Exception ex)
  129. {
  130. await MarkSyncLogFailedAsync(logId, started, ex.Message);
  131. throw;
  132. }
  133. }
  134. private async Task<int> TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  135. {
  136. var total = 0;
  137. foreach (var command in BuildStandardCommands(batchId, now))
  138. {
  139. cancellationToken.ThrowIfCancellationRequested();
  140. total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
  141. }
  142. return total;
  143. }
  144. private async Task<int> BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  145. {
  146. var total = 0;
  147. foreach (var command in BuildDwdCommands(batchId, now))
  148. {
  149. cancellationToken.ThrowIfCancellationRequested();
  150. total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
  151. }
  152. total += await UpsertMaterialReadinessDwdAsync(batchId, now);
  153. await DeleteCurrentShortageAsync(now.Date);
  154. total += await InsertMaterialShortageAsync(batchId, now);
  155. return total;
  156. }
  157. private async Task<int> BuildS3KpiValuesAsync(DateTime now, CancellationToken cancellationToken)
  158. {
  159. var statDate = now.Date;
  160. var rows = await CalculateS3KpiValuesAsync(statDate);
  161. var affected = 0;
  162. foreach (var row in rows)
  163. {
  164. cancellationToken.ThrowIfCancellationRequested();
  165. affected += await UpsertS3KpiValueAsync(row, statDate, now);
  166. }
  167. return affected;
  168. }
  169. private async Task<List<S3KpiCalcRow>> CalculateS3KpiValuesAsync(DateTime statDate)
  170. {
  171. return await _db.Ado.SqlQueryAsync<S3KpiCalcRow>(
  172. """
  173. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L2_004' AS MetricCode,
  174. ROUND(AVG(TIMESTAMPDIFF(HOUR, request_date, submit_date) / 24), 4) AS MetricValue
  175. FROM mdp_std_delivery_schedule
  176. WHERE request_date IS NOT NULL AND submit_date IS NOT NULL AND submit_date >= request_date
  177. GROUP BY tenant_id
  178. UNION ALL
  179. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L2_005' AS MetricCode,
  180. ROUND(100 * SUM(CASE WHEN IFNULL(schedule_qty,0) >= IFNULL(order_qty,0) AND IFNULL(order_qty,0) > 0 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
  181. FROM dwd_supplier_delivery
  182. WHERE stat_date=@StatDate AND IFNULL(order_qty,0) > 0
  183. GROUP BY tenant_id
  184. UNION ALL
  185. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L1_001' AS MetricCode,
  186. ROUND(AVG(TIMESTAMPDIFF(HOUR, request_date, submit_date) / 24), 4) AS MetricValue
  187. FROM mdp_std_delivery_schedule
  188. WHERE request_date IS NOT NULL AND submit_date IS NOT NULL AND submit_date >= request_date
  189. GROUP BY tenant_id
  190. UNION ALL
  191. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L1_002' AS MetricCode,
  192. ROUND(100 * SUM(CASE WHEN IFNULL(schedule_qty,0) >= IFNULL(order_qty,0) AND IFNULL(order_qty,0) > 0 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
  193. FROM dwd_supplier_delivery
  194. WHERE stat_date=@StatDate AND IFNULL(order_qty,0) > 0
  195. GROUP BY tenant_id
  196. UNION ALL
  197. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_001' AS MetricCode,
  198. ROUND(AVG(TIMESTAMPDIFF(HOUR, request_date, send_date) / 24), 4) AS MetricValue
  199. FROM mdp_std_purchase_request
  200. WHERE request_date IS NOT NULL AND send_date IS NOT NULL AND send_date >= request_date
  201. GROUP BY tenant_id
  202. UNION ALL
  203. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_002' AS MetricCode,
  204. ROUND(AVG(TIMESTAMPDIFF(HOUR, order_date, COALESCE(need_date, due_date)) / 24), 4) AS MetricValue
  205. FROM mdp_std_purchase_order
  206. WHERE order_date IS NOT NULL AND COALESCE(need_date, due_date) IS NOT NULL
  207. AND COALESCE(need_date, due_date) >= order_date
  208. GROUP BY tenant_id
  209. UNION ALL
  210. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_003' AS MetricCode,
  211. ROUND(100 * SUM(CASE WHEN ready_status='READY' THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
  212. FROM dwd_material_readiness
  213. WHERE stat_date=@StatDate
  214. GROUP BY tenant_id
  215. UNION ALL
  216. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_004' AS MetricCode,
  217. ROUND(100 * SUM(CASE WHEN submit_date IS NOT NULL AND request_date IS NOT NULL
  218. AND TIMESTAMPDIFF(HOUR, request_date, submit_date) <= 72 THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
  219. FROM mdp_std_delivery_schedule
  220. WHERE request_date IS NOT NULL
  221. GROUP BY tenant_id
  222. UNION ALL
  223. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_005' AS MetricCode,
  224. ROUND(100 * SUM(CASE WHEN risk_level IN ('HIGH','MEDIUM') OR IFNULL(remaining_qty,0) > 0 THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
  225. FROM dwd_supplier_delivery
  226. WHERE stat_date=@StatDate
  227. GROUP BY tenant_id
  228. UNION ALL
  229. SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L3_006' AS MetricCode,
  230. ROUND(100 * (1 - SUM(CASE WHEN IFNULL(shortage_qty,0) > 0 THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0)), 4) AS MetricValue
  231. FROM dwd_material_readiness
  232. WHERE stat_date=@StatDate
  233. GROUP BY tenant_id
  234. """,
  235. new SugarParameter("@StatDate", statDate));
  236. }
  237. private async Task<int> UpsertS3KpiValueAsync(S3KpiCalcRow row, DateTime statDate, DateTime now)
  238. {
  239. var meta = await _db.Ado.SqlQuerySingleAsync<S3KpiMetaRow>(
  240. """
  241. SELECT MetricLevel, Direction, YellowThreshold, RedThreshold
  242. FROM ado_smart_ops_kpi_master
  243. WHERE TenantId=@TenantId AND ModuleCode='S3' AND MetricCode=@MetricCode AND IsEnabled=1
  244. LIMIT 1
  245. """,
  246. new SugarParameter("@TenantId", row.TenantId),
  247. new SugarParameter("@MetricCode", row.MetricCode));
  248. if (meta == null || row.MetricValue == null)
  249. return 0;
  250. var table = ResolveKpiValueTable(meta.MetricLevel);
  251. var current = await _db.Ado.SqlQuerySingleAsync<S3KpiValueRow>(
  252. $"""
  253. SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
  254. FROM {table}
  255. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S3'
  256. AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
  257. ORDER BY id
  258. LIMIT 1
  259. """,
  260. new SugarParameter("@TenantId", row.TenantId),
  261. new SugarParameter("@FactoryId", row.FactoryId),
  262. new SugarParameter("@MetricCode", row.MetricCode),
  263. new SugarParameter("@BizDate", statDate));
  264. var prior = await _db.Ado.SqlQuerySingleAsync<S3KpiValueRow>(
  265. $"""
  266. SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
  267. FROM {table}
  268. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S3'
  269. AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0
  270. ORDER BY biz_date DESC, id DESC
  271. LIMIT 1
  272. """,
  273. new SugarParameter("@TenantId", row.TenantId),
  274. new SugarParameter("@FactoryId", row.FactoryId),
  275. new SugarParameter("@MetricCode", row.MetricCode),
  276. new SugarParameter("@BizDate", statDate));
  277. var actual = Math.Round(row.MetricValue.Value, 4);
  278. var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS3Target(row.MetricCode);
  279. var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold);
  280. var trend = ResolveTrendFlag(actual, prior?.MetricValue);
  281. if (current != null)
  282. {
  283. return await _db.Ado.ExecuteCommandAsync(
  284. $"""
  285. UPDATE {table}
  286. SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag,
  287. is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime
  288. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S3'
  289. AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
  290. """,
  291. new SugarParameter("@MetricValue", actual),
  292. new SugarParameter("@TargetValue", target),
  293. new SugarParameter("@StatusColor", status),
  294. new SugarParameter("@TrendFlag", trend),
  295. new SugarParameter("@CalcTime", now),
  296. new SugarParameter("@TenantId", row.TenantId),
  297. new SugarParameter("@FactoryId", row.FactoryId),
  298. new SugarParameter("@MetricCode", row.MetricCode),
  299. new SugarParameter("@BizDate", statDate));
  300. }
  301. var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}");
  302. return await _db.Ado.ExecuteCommandAsync(
  303. $"""
  304. INSERT INTO {table}
  305. (id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active,
  306. module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time)
  307. VALUES
  308. (@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1,
  309. 'S3', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime)
  310. """,
  311. new SugarParameter("@Id", nextId),
  312. new SugarParameter("@TenantId", row.TenantId),
  313. new SugarParameter("@FactoryId", row.FactoryId),
  314. new SugarParameter("@BizDate", statDate),
  315. new SugarParameter("@CalcTime", now),
  316. new SugarParameter("@MetricCode", row.MetricCode),
  317. new SugarParameter("@MetricValue", actual),
  318. new SugarParameter("@TargetValue", target),
  319. new SugarParameter("@StatusColor", status),
  320. new SugarParameter("@TrendFlag", trend));
  321. }
  322. private IEnumerable<S3MdpSqlCommand> BuildStandardCommands(string batchId, DateTime now)
  323. {
  324. yield return Cmd(
  325. """
  326. INSERT INTO mdp_std_supplier
  327. (tenant_id, factory_id, company_id, source_system, supplier_code, supplier_name, supplier_type, status, contact, address, currency_type, source_biz_key, sync_batch_id, sync_time)
  328. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  329. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Supp')),
  330. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SortName')),
  331. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Type')),
  332. CASE WHEN JSON_EXTRACT(raw_data,'$.IsActive') IN (1, true) THEN 'ACTIVE' ELSE 'INACTIVE' END,
  333. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.contact')),
  334. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.address')),
  335. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Curr')),
  336. source_biz_key, @BatchId, @Now
  337. FROM mdp_stg_supplier
  338. WHERE source_table='SuppMaster' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Supp')), '') <> ''
  339. ON DUPLICATE KEY UPDATE supplier_name=VALUES(supplier_name), supplier_type=VALUES(supplier_type), status=VALUES(status),
  340. contact=VALUES(contact), address=VALUES(address), currency_type=VALUES(currency_type), sync_batch_id=VALUES(sync_batch_id),
  341. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  342. """, batchId, now);
  343. yield return Cmd(
  344. """
  345. INSERT INTO mdp_std_item
  346. (tenant_id, factory_id, company_id, source_system, item_code, item_name, model, unit, item_type, status, source_biz_key, sync_batch_id, sync_time)
  347. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  348. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number'))),
  349. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Descr')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.name')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_name'))),
  350. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Drawing')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.model'))),
  351. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.UM')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.unit'))),
  352. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemType')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_type'))),
  353. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Status')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.is_active'))),
  354. source_biz_key, @BatchId, @Now
  355. FROM mdp_stg_item
  356. WHERE COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), '') <> ''
  357. ON DUPLICATE KEY UPDATE item_name=VALUES(item_name), model=VALUES(model), unit=VALUES(unit), item_type=VALUES(item_type),
  358. status=VALUES(status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  359. """, batchId, now);
  360. yield return Cmd(
  361. """
  362. INSERT INTO mdp_std_supplier_item
  363. (tenant_id, factory_id, company_id, source_system, item_code, supplier_code, supplier_name, supplier_type, quota_rate, lead_time, min_qty, packaging_qty, price, currency_type, effective_date, expire_date, status, source_biz_key, sync_batch_id, sync_time)
  364. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  365. IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_name'))), ''),
  366. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')),
  367. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_name')),
  368. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_type')),
  369. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.quota_rate')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.quota_rate')) AS DECIMAL(18,6)) END,
  370. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.lead_time')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.lead_time')) AS DECIMAL(18,6)) END,
  371. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty_min')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty_min')) AS DECIMAL(18,6)) END,
  372. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.packaging_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.packaging_qty')) AS DECIMAL(18,6)) END,
  373. COALESCE(
  374. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.netpurchase_price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.netpurchase_price')) AS DECIMAL(18,6)) END,
  375. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.order_price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.order_price')) AS DECIMAL(18,6)) END
  376. ),
  377. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.currency_type')),
  378. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.effective_date')), 'null'), ''),
  379. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.expiring_date')), 'null'), ''),
  380. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.is_active')),
  381. source_biz_key, @BatchId, @Now
  382. FROM mdp_stg_source_list
  383. WHERE source_table='srm_purchase' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')), '') <> ''
  384. AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_name'))), '') <> ''
  385. ON DUPLICATE KEY UPDATE supplier_name=VALUES(supplier_name), quota_rate=VALUES(quota_rate), lead_time=VALUES(lead_time),
  386. min_qty=VALUES(min_qty), packaging_qty=VALUES(packaging_qty), price=VALUES(price), sync_batch_id=VALUES(sync_batch_id),
  387. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  388. """, batchId, now);
  389. yield return Cmd(
  390. """
  391. INSERT INTO mdp_std_supply_demand
  392. (tenant_id, factory_id, company_id, source_system, demand_no, demand_line, demand_type, item_code, item_name, required_qty, required_date, supplier_code, status, source_biz_key, sync_batch_id, sync_time)
  393. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  394. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), source_row_id),
  395. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.line')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Line'))),
  396. source_table,
  397. IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum'))), ''),
  398. IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemname')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_name')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Descr'))), ''),
  399. COALESCE(
  400. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) AS DECIMAL(18,6)) END,
  401. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.required_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.required_qty')) AS DECIMAL(18,6)) END,
  402. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortqty')) AS DECIMAL(18,6)) END,
  403. 0),
  404. COALESCE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.required_date')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.needdate')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.NeedDate')), 'null'), '')),
  405. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')),
  406. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Status'))),
  407. source_biz_key, @BatchId, @Now
  408. FROM mdp_stg_supply_demand
  409. WHERE IFNULL(source_biz_key,'') <> ''
  410. ON DUPLICATE KEY UPDATE item_code=VALUES(item_code), item_name=VALUES(item_name), required_qty=VALUES(required_qty),
  411. required_date=VALUES(required_date), supplier_code=VALUES(supplier_code), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id),
  412. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  413. """, batchId, now);
  414. yield return Cmd(
  415. """
  416. INSERT INTO mdp_std_purchase_request
  417. (tenant_id, factory_id, company_id, source_system, pr_no, pr_line, item_code, item_name, supplier_code, request_qty, request_date, send_date, arrive_date, status, source_biz_key, sync_batch_id, sync_time)
  418. SELECT tenant_id, factory_id, company_id, 'AIDOP',
  419. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.pr_billno')), source_row_id),
  420. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.line_no')),
  421. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_number')), ''),
  422. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_name')), ''),
  423. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')),
  424. COALESCE(
  425. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) AS DECIMAL(18,6)) END,
  426. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.request_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.request_qty')) AS DECIMAL(18,6)) END,
  427. 0),
  428. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.create_time')), 'null'), ''),
  429. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.send_date')), 'null'), ''),
  430. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.arrive_date')), 'null'), ''),
  431. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')),
  432. source_biz_key, @BatchId, @Now
  433. FROM mdp_stg_supply_demand
  434. WHERE source_table='srm_pr_main'
  435. ON DUPLICATE KEY UPDATE item_code=VALUES(item_code), item_name=VALUES(item_name), supplier_code=VALUES(supplier_code),
  436. request_qty=VALUES(request_qty), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  437. update_time=CURRENT_TIMESTAMP
  438. """, batchId, now);
  439. yield return Cmd(
  440. """
  441. INSERT INTO mdp_std_purchase_order
  442. (tenant_id, factory_id, company_id, source_system, po_no, po_line, po_type, supplier_code, item_code, item_name, order_qty, received_qty, returned_qty, due_date, need_date, order_date, status, buyer, work_order, source_biz_key, sync_batch_id, sync_time)
  443. SELECT d.tenant_id, d.factory_id, d.company_id, 'AIDOP',
  444. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PurOrd')),
  445. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Line')) AS CHAR),
  446. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Potype')),
  447. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Supp')),
  448. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemNum')), ''),
  449. IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(i.raw_data,'$.Descr')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Descr'))), ''),
  450. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyOrded')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyOrded')) AS DECIMAL(18,6)) END, 0),
  451. COALESCE(
  452. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReceived')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReceived')) AS DECIMAL(18,6)) END,
  453. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RctQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RctQty')) AS DECIMAL(18,6)) END,
  454. 0),
  455. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReturned')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReturned')) AS DECIMAL(18,6)) END, 0),
  456. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.DueDate')), 'null'), ''),
  457. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.NeedDate')), 'null'), ''),
  458. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.OrdDate')), 'null'), ''),
  459. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Status')),
  460. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Buyer')),
  461. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.WorkOrd')),
  462. d.source_biz_key, @BatchId, @Now
  463. FROM mdp_stg_purchase_order d
  464. LEFT JOIN mdp_stg_purchase_order m ON m.source_table='PurOrdMaster'
  465. AND JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.PurOrd')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PurOrd'))
  466. LEFT JOIN mdp_stg_item i ON i.source_table='ItemMaster'
  467. AND JSON_UNQUOTE(JSON_EXTRACT(i.raw_data,'$.ItemNum')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemNum'))
  468. WHERE d.source_table='PurOrdDetail' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PurOrd')), '') <> ''
  469. ON DUPLICATE KEY UPDATE po_type=VALUES(po_type), supplier_code=VALUES(supplier_code), item_code=VALUES(item_code),
  470. item_name=VALUES(item_name), order_qty=VALUES(order_qty), received_qty=VALUES(received_qty), returned_qty=VALUES(returned_qty),
  471. due_date=VALUES(due_date), need_date=VALUES(need_date), order_date=VALUES(order_date), status=VALUES(status),
  472. buyer=VALUES(buyer), work_order=VALUES(work_order), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  473. update_time=CURRENT_TIMESTAMP
  474. """, batchId, now);
  475. yield return Cmd(
  476. """
  477. INSERT INTO mdp_std_delivery_schedule
  478. (tenant_id, source_system, delivery_plan_no, po_no, po_line, item_code, supplier_code, supplier_name, schedule_qty, sent_qty, rest_qty, return_qty, request_date, need_date, submit_date, last_sent_date, status, source_biz_key, sync_batch_id, sync_time)
  479. SELECT tenant_id, 'AIDOP',
  480. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.dsnum')),
  481. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ponumber')),
  482. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.poline')) AS CHAR),
  483. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), ''),
  484. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.suppliercode')),
  485. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier')),
  486. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.schedqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.schedqty')) AS DECIMAL(18,6)) END,0),
  487. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sentqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sentqty')) AS DECIMAL(18,6)) END,0),
  488. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.restqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.restqty')) AS DECIMAL(18,6)) END,0),
  489. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) AS DECIMAL(18,6)) END,0),
  490. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.requestdate')), 'null'), ''),
  491. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.needdate')), 'null'), ''),
  492. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.submitdate')), 'null'), ''),
  493. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.lastsentdate')), 'null'), ''),
  494. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')),
  495. source_biz_key, @BatchId, @Now
  496. FROM mdp_stg_delivery
  497. WHERE source_table='srm_polist_ds' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.dsnum')), '') <> ''
  498. ON DUPLICATE KEY UPDATE po_no=VALUES(po_no), po_line=VALUES(po_line), item_code=VALUES(item_code), supplier_code=VALUES(supplier_code),
  499. supplier_name=VALUES(supplier_name), schedule_qty=VALUES(schedule_qty), sent_qty=VALUES(sent_qty), rest_qty=VALUES(rest_qty),
  500. return_qty=VALUES(return_qty), need_date=VALUES(need_date), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id),
  501. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  502. """, batchId, now);
  503. yield return Cmd(
  504. """
  505. INSERT INTO mdp_std_delivery_result
  506. (tenant_id, source_system, delivery_no, delivery_line, delivery_plan_no, po_no, po_line, item_code, delivery_qty, receipt_qty, return_qty, receipt_date, qc_status, event_time, source_biz_key, sync_batch_id, sync_time)
  507. SELECT tenant_id, 'AIDOP',
  508. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shddh')), source_row_id),
  509. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.id')) AS CHAR),
  510. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.dsnum')),
  511. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_bill')),
  512. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_billline')) AS CHAR),
  513. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), ''),
  514. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) AS DECIMAL(18,6)) END,0),
  515. 0,
  516. 0,
  517. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.receipt_date')), 'null'), ''),
  518. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qc_status')),
  519. COALESCE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.updatetime')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.createtime')), 'null'), '')),
  520. source_biz_key, @BatchId, @Now
  521. FROM mdp_stg_delivery
  522. WHERE source_table IN ('scm_shd','scm_shdzb')
  523. ON DUPLICATE KEY UPDATE delivery_qty=VALUES(delivery_qty), receipt_qty=VALUES(receipt_qty), return_qty=VALUES(return_qty),
  524. receipt_date=VALUES(receipt_date), event_time=VALUES(event_time), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  525. update_time=CURRENT_TIMESTAMP
  526. """, batchId, now);
  527. yield return Cmd(
  528. """
  529. INSERT INTO mdp_std_material_readiness
  530. (tenant_id, source_system, work_order, op_code, item_code, component_item_code, required_qty, issued_qty, received_qty, available_qty, in_transit_qty, incoming_qty, shortage_qty, ready_status, need_date, supplier_code, source_biz_key, sync_batch_id, sync_time)
  531. SELECT tenant_id, 'AIDOP',
  532. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')),
  533. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')) AS CHAR),
  534. IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PMBOM')), ''),
  535. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), source_row_id),
  536. COALESCE(
  537. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyRequired')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyRequired')) AS DECIMAL(18,6)) END,
  538. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReq')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReq')) AS DECIMAL(18,6)) END,
  539. 0),
  540. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyIssued')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyIssued')) AS DECIMAL(18,6)) END, 0),
  541. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) AS DECIMAL(18,6)) END, 0),
  542. 0, 0, 0,
  543. 0,
  544. 'READY',
  545. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DueDate')), 'null'), ''),
  546. NULL,
  547. source_biz_key, @BatchId, @Now
  548. FROM mdp_stg_work_order_material
  549. WHERE source_table='WorkOrdDetail' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), '') <> ''
  550. ON DUPLICATE KEY UPDATE required_qty=VALUES(required_qty), received_qty=VALUES(received_qty), need_date=VALUES(need_date),
  551. sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  552. """, batchId, now);
  553. yield return Cmd(
  554. """
  555. INSERT INTO mdp_std_process_outsource_order
  556. (tenant_id, source_system, work_order, op_code, routing_code, supplier_code, po_no, po_line, order_qty, completed_qty, due_date, status, source_biz_key, sync_batch_id, sync_time)
  557. SELECT tenant_id, 'AIDOP',
  558. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), ''),
  559. CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')) AS CHAR),
  560. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RoutingCode')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RouteCode'))),
  561. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SupplierCode')),
  562. NULL, NULL,
  563. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PackingQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PackingQty')) AS DECIMAL(18,6)) END, 0),
  564. 0,
  565. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.UpdatedAt')), 'null'), ''),
  566. CASE WHEN JSON_EXTRACT(raw_data,'$.IsEnabled') IN (1, true) THEN 'OPEN' ELSE 'DISABLED' END,
  567. source_biz_key, @BatchId, @Now
  568. FROM mdp_stg_work_order_material
  569. WHERE source_table='RoutingOpDetail'
  570. ON DUPLICATE KEY UPDATE routing_code=VALUES(routing_code), supplier_code=VALUES(supplier_code), order_qty=VALUES(order_qty),
  571. due_date=VALUES(due_date), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  572. update_time=CURRENT_TIMESTAMP
  573. """, batchId, now);
  574. }
  575. private IEnumerable<S3MdpSqlCommand> BuildDwdCommands(string batchId, DateTime now)
  576. {
  577. yield return Cmd(
  578. """
  579. INSERT INTO dwd_supply_demand
  580. (tenant_id, stat_date, demand_no, demand_line, demand_type, item_code, item_name, supplier_code, required_qty, fulfilled_qty, shortage_qty, required_date, demand_status, source_system, sync_batch_id, calc_time)
  581. SELECT tenant_id, @StatDate, demand_no, IFNULL(demand_line,''), demand_type, IFNULL(item_code,''), IFNULL(item_name,''), supplier_code,
  582. IFNULL(required_qty,0), 0, IFNULL(required_qty,0), required_date, status, source_system, @BatchId, @Now
  583. FROM mdp_std_supply_demand
  584. WHERE IFNULL(demand_no,'') <> ''
  585. ON DUPLICATE KEY UPDATE item_code=VALUES(item_code), item_name=VALUES(item_name), supplier_code=VALUES(supplier_code),
  586. required_qty=VALUES(required_qty), fulfilled_qty=VALUES(fulfilled_qty), shortage_qty=VALUES(shortage_qty),
  587. demand_status=VALUES(demand_status), sync_batch_id=VALUES(sync_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  588. """, batchId, now);
  589. yield return Cmd(
  590. """
  591. INSERT INTO dwd_supplier_delivery
  592. (tenant_id, stat_date, po_no, po_line, po_type, supplier_code, supplier_name, item_code, item_name, order_qty, schedule_qty, delivery_qty, receipt_qty, return_qty, remaining_qty, due_date, need_date, last_delivery_date, last_receipt_date, delivery_status, risk_level, source_system, sync_batch_id, calc_time)
  593. SELECT po.tenant_id, @StatDate, po.po_no, po.po_line, po.po_type, po.supplier_code, IFNULL(s.supplier_name, ds.supplier_name),
  594. IFNULL(po.item_code,''), IFNULL(po.item_name,''), IFNULL(po.order_qty,0), IFNULL(ds.schedule_qty,0), IFNULL(dr.delivery_qty,0),
  595. IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0), IFNULL(po.returned_qty,0) + IFNULL(dr.return_qty,0),
  596. GREATEST(IFNULL(po.order_qty,0) - IFNULL(po.received_qty,0) - IFNULL(dr.receipt_qty,0) - IFNULL(po.returned_qty,0), 0),
  597. po.due_date, COALESCE(ds.need_date, po.need_date), dr.event_time, dr.receipt_date,
  598. CASE
  599. WHEN IFNULL(po.order_qty,0) <= IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0) THEN 'COMPLETED'
  600. WHEN COALESCE(ds.need_date, po.need_date, po.due_date) < @Now THEN 'DELAYED'
  601. ELSE 'OPEN'
  602. END,
  603. CASE
  604. WHEN COALESCE(ds.need_date, po.need_date, po.due_date) < @Now
  605. AND IFNULL(po.order_qty,0) > IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0) THEN 'HIGH'
  606. WHEN IFNULL(po.order_qty,0) > IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0) THEN 'MEDIUM'
  607. ELSE 'LOW'
  608. END,
  609. 'AIDOP', @BatchId, @Now
  610. FROM mdp_std_purchase_order po
  611. LEFT JOIN (
  612. SELECT tenant_id, po_no, po_line, MIN(supplier_name) AS supplier_name, SUM(IFNULL(schedule_qty,0)) AS schedule_qty, MIN(need_date) AS need_date
  613. FROM mdp_std_delivery_schedule
  614. WHERE IFNULL(po_no,'') <> '' AND IFNULL(po_line,'') <> ''
  615. GROUP BY tenant_id, po_no, po_line
  616. ) ds ON po.tenant_id=ds.tenant_id AND po.po_no=ds.po_no AND po.po_line=ds.po_line
  617. LEFT JOIN (
  618. SELECT tenant_id, po_no, po_line, SUM(IFNULL(delivery_qty,0)) AS delivery_qty, SUM(IFNULL(receipt_qty,0)) AS receipt_qty,
  619. SUM(IFNULL(return_qty,0)) AS return_qty, MAX(event_time) AS event_time, MAX(receipt_date) AS receipt_date
  620. FROM mdp_std_delivery_result
  621. WHERE IFNULL(po_no,'') <> '' AND IFNULL(po_line,'') <> ''
  622. GROUP BY tenant_id, po_no, po_line
  623. ) dr ON po.tenant_id=dr.tenant_id AND po.po_no=dr.po_no AND po.po_line=dr.po_line
  624. LEFT JOIN (
  625. SELECT tenant_id, supplier_code, MAX(supplier_name) AS supplier_name
  626. FROM mdp_std_supplier
  627. WHERE IFNULL(supplier_code,'') <> ''
  628. GROUP BY tenant_id, supplier_code
  629. ) s ON po.tenant_id=s.tenant_id AND po.supplier_code=s.supplier_code
  630. WHERE IFNULL(po.po_no,'') <> '' AND IFNULL(po.po_line,'') <> ''
  631. ON DUPLICATE KEY UPDATE supplier_name=VALUES(supplier_name), item_code=VALUES(item_code), item_name=VALUES(item_name),
  632. order_qty=VALUES(order_qty), schedule_qty=VALUES(schedule_qty), delivery_qty=VALUES(delivery_qty), receipt_qty=VALUES(receipt_qty),
  633. return_qty=VALUES(return_qty), remaining_qty=VALUES(remaining_qty), delivery_status=VALUES(delivery_status),
  634. risk_level=VALUES(risk_level), sync_batch_id=VALUES(sync_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  635. """, batchId, now);
  636. yield return Cmd("DELETE FROM dwd_supplier_risk WHERE stat_date=@StatDate", batchId, now);
  637. yield return Cmd(
  638. """
  639. INSERT INTO dwd_supplier_risk
  640. (tenant_id, stat_date, supplier_code, supplier_name, item_code, risk_type, risk_level, risk_count, risk_qty, risk_reason, source_system, calc_batch_id, calc_time)
  641. SELECT tenant_id, stat_date, IFNULL(supplier_code,''), supplier_name, IFNULL(item_code,''),
  642. 'DELIVERY_DELAY', risk_level, COUNT(1), SUM(IFNULL(remaining_qty,0)),
  643. '供应交付存在延期或未完成风险', 'AIDOP', @BatchId, @Now
  644. FROM dwd_supplier_delivery
  645. WHERE stat_date=@StatDate AND risk_level IN ('HIGH','MEDIUM') AND IFNULL(supplier_code,'') <> ''
  646. GROUP BY tenant_id, stat_date, supplier_code, supplier_name, item_code, risk_level
  647. ON DUPLICATE KEY UPDATE risk_level=VALUES(risk_level), risk_count=VALUES(risk_count), risk_qty=VALUES(risk_qty),
  648. risk_reason=VALUES(risk_reason), calc_batch_id=VALUES(calc_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  649. """, batchId, now);
  650. yield return Cmd(
  651. """
  652. INSERT INTO dwd_process_outsource_delivery
  653. (tenant_id, stat_date, work_order, op_code, routing_code, supplier_code, supplier_name, po_no, po_line, order_qty, completed_qty, remaining_qty, due_date, delivery_status, risk_level, source_system, calc_batch_id, calc_time)
  654. SELECT o.tenant_id, @StatDate, o.work_order, o.op_code, o.routing_code, o.supplier_code, s.supplier_name, o.po_no, IFNULL(o.po_line,''),
  655. IFNULL(o.order_qty,0), IFNULL(o.completed_qty,0), GREATEST(IFNULL(o.order_qty,0)-IFNULL(o.completed_qty,0),0),
  656. o.due_date,
  657. CASE WHEN IFNULL(o.completed_qty,0) >= IFNULL(o.order_qty,0) AND IFNULL(o.order_qty,0) > 0 THEN 'COMPLETED'
  658. WHEN o.due_date < @Now THEN 'DELAYED' ELSE IFNULL(o.status,'OPEN') END,
  659. CASE WHEN o.due_date < @Now AND IFNULL(o.completed_qty,0) < IFNULL(o.order_qty,0) THEN 'HIGH'
  660. WHEN IFNULL(o.completed_qty,0) < IFNULL(o.order_qty,0) THEN 'MEDIUM' ELSE 'LOW' END,
  661. 'AIDOP', @BatchId, @Now
  662. FROM mdp_std_process_outsource_order o
  663. LEFT JOIN (
  664. SELECT tenant_id, supplier_code, MAX(supplier_name) AS supplier_name
  665. FROM mdp_std_supplier
  666. WHERE IFNULL(supplier_code,'') <> ''
  667. GROUP BY tenant_id, supplier_code
  668. ) s ON o.tenant_id=s.tenant_id AND o.supplier_code=s.supplier_code
  669. WHERE IFNULL(o.work_order,'') <> '' OR IFNULL(o.routing_code,'') <> ''
  670. ON DUPLICATE KEY UPDATE supplier_code=VALUES(supplier_code), supplier_name=VALUES(supplier_name), order_qty=VALUES(order_qty),
  671. completed_qty=VALUES(completed_qty), remaining_qty=VALUES(remaining_qty), due_date=VALUES(due_date),
  672. delivery_status=VALUES(delivery_status), risk_level=VALUES(risk_level), calc_batch_id=VALUES(calc_batch_id),
  673. calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  674. """, batchId, now);
  675. }
  676. private async Task<int> UpsertMaterialReadinessDwdAsync(string batchId, DateTime now)
  677. {
  678. return await _db.Ado.ExecuteCommandAsync(
  679. """
  680. INSERT INTO dwd_material_readiness
  681. (tenant_id, stat_date, work_order, op_code, parent_item_code, component_item_code, component_item_name, required_qty, cumulative_required_qty, stock_available_qty, qc_pending_qty, in_transit_qty, delivery_reply_qty, available_qty, shortage_qty, ready_status, supplier_code, supplier_name, need_date, calc_batch_id, calc_time)
  682. SELECT s.tenant_id, @StatDate, s.work_order, s.op_code, s.item_code, s.component_item_code, IFNULL(i.item_name, ''),
  683. IFNULL(s.required_qty, 0), IFNULL(s.required_qty, 0), IFNULL(s.available_qty, 0), 0, IFNULL(s.in_transit_qty, 0),
  684. IFNULL(s.incoming_qty, 0), IFNULL(s.available_qty, 0), IFNULL(s.shortage_qty, 0),
  685. CASE WHEN IFNULL(s.shortage_qty, 0) > 0 THEN 'SHORTAGE' ELSE 'READY' END,
  686. s.supplier_code, IFNULL(sp.supplier_name, ''), s.need_date, @BatchId, @Now
  687. FROM mdp_std_material_readiness s
  688. LEFT JOIN mdp_std_item i ON s.tenant_id=i.tenant_id AND s.component_item_code=i.item_code
  689. LEFT JOIN mdp_std_supplier sp ON s.tenant_id=sp.tenant_id AND s.supplier_code=sp.supplier_code
  690. WHERE IFNULL(s.work_order, '') <> ''
  691. ON DUPLICATE KEY UPDATE component_item_name=VALUES(component_item_name), required_qty=VALUES(required_qty),
  692. cumulative_required_qty=VALUES(cumulative_required_qty), stock_available_qty=VALUES(stock_available_qty),
  693. qc_pending_qty=VALUES(qc_pending_qty), in_transit_qty=VALUES(in_transit_qty), delivery_reply_qty=VALUES(delivery_reply_qty),
  694. available_qty=VALUES(available_qty), shortage_qty=VALUES(shortage_qty), ready_status=VALUES(ready_status),
  695. supplier_code=VALUES(supplier_code), supplier_name=VALUES(supplier_name), need_date=VALUES(need_date),
  696. calc_batch_id=VALUES(calc_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  697. """,
  698. new SugarParameter("@StatDate", now.Date),
  699. new SugarParameter("@BatchId", batchId),
  700. new SugarParameter("@Now", now));
  701. }
  702. private async Task<int> DeleteCurrentShortageAsync(DateTime statDate)
  703. {
  704. return await _db.Ado.ExecuteCommandAsync(
  705. """
  706. DELETE s
  707. FROM dwd_material_shortage s
  708. JOIN (
  709. SELECT DISTINCT work_order
  710. FROM mdp_std_material_readiness
  711. WHERE IFNULL(work_order, '') <> ''
  712. ) w ON s.work_order = w.work_order
  713. WHERE s.stat_date = @StatDate
  714. """,
  715. new SugarParameter("@StatDate", statDate));
  716. }
  717. private async Task<int> InsertMaterialShortageAsync(string batchId, DateTime now)
  718. {
  719. return await _db.Ado.ExecuteCommandAsync(
  720. """
  721. INSERT INTO dwd_material_shortage
  722. (tenant_id, stat_date, work_order, op_code, component_item_code, shortage_qty, shortage_reason, expected_supply_date, supplier_code, related_po_no, risk_level, calc_batch_id, calc_time)
  723. SELECT s.tenant_id, @StatDate, s.work_order, s.op_code, s.component_item_code, IFNULL(s.shortage_qty, 0),
  724. '标准层缺料数量大于 0', s.need_date, s.supplier_code, NULL,
  725. CASE WHEN IFNULL(s.shortage_qty, 0) > IFNULL(s.required_qty, 0) THEN 'HIGH'
  726. WHEN IFNULL(s.shortage_qty, 0) > 0 THEN 'MEDIUM' ELSE 'LOW' END,
  727. @BatchId, @Now
  728. FROM mdp_std_material_readiness s
  729. WHERE IFNULL(s.work_order, '') <> '' AND IFNULL(s.shortage_qty, 0) > 0
  730. """,
  731. new SugarParameter("@StatDate", now.Date),
  732. new SugarParameter("@BatchId", batchId),
  733. new SugarParameter("@Now", now));
  734. }
  735. private async Task<long> InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead)
  736. {
  737. await _db.Ado.ExecuteCommandAsync(
  738. """
  739. INSERT INTO mdp_sync_log
  740. (tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status)
  741. VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING')
  742. """,
  743. new SugarParameter("@EntityId", entityId),
  744. new SugarParameter("@EntityName", entityName),
  745. new SugarParameter("@BatchId", batchId),
  746. new SugarParameter("@RowsRead", rowsRead));
  747. return await _db.Ado.GetLongAsync(
  748. "SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1",
  749. new List<SugarParameter>
  750. {
  751. new("@BatchId", batchId),
  752. new("@EntityId", entityId)
  753. });
  754. }
  755. private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected)
  756. {
  757. await _db.Ado.ExecuteCommandAsync(
  758. """
  759. UPDATE mdp_sync_log
  760. SET sync_end=NOW(), duration_ms=@DurationMs, rows_insert=@RowsInsert, rows_update=0, rows_skip=0, rows_error=0, status='SUCCESS'
  761. WHERE id=@Id
  762. """,
  763. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  764. new SugarParameter("@RowsInsert", affected),
  765. new SugarParameter("@Id", logId));
  766. }
  767. private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message)
  768. {
  769. try
  770. {
  771. await _db.Ado.ExecuteCommandAsync(
  772. """
  773. UPDATE mdp_sync_log
  774. SET sync_end=NOW(), duration_ms=@DurationMs, rows_error=1, status='FAILED', error_msg=@ErrorMsg
  775. WHERE id=@Id
  776. """,
  777. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  778. new SugarParameter("@ErrorMsg", message.Length <= 1000 ? message : message[..1000]),
  779. new SugarParameter("@Id", logId));
  780. }
  781. catch (Exception ex)
  782. {
  783. // 写库自身失败兜底:避免再抛掩盖原异常;遗留 RUNNING 行可由运维手动清理
  784. Console.Error.WriteLine($"[S3MdpSyncTransform] MarkSyncLogFailed write failed (syncLogId={logId}): {ex.Message}");
  785. }
  786. }
  787. private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
  788. {
  789. await _db.Ado.ExecuteCommandAsync(
  790. """
  791. INSERT INTO mdp_transform_run_log
  792. (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
  793. VALUES (0, 'S3_MDP_SYNC_TRANSFORM', 'S3 MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
  794. """,
  795. new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
  796. new SugarParameter("@BatchId", batchId),
  797. new SugarParameter("@StartTime", startedAt));
  798. return await _db.Ado.GetLongAsync(
  799. "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
  800. new List<SugarParameter> { new("@BatchId", batchId) });
  801. }
  802. private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S3MdpSyncTransformResult result)
  803. {
  804. var finishedAt = DateTime.Now;
  805. await _db.Ado.ExecuteCommandAsync(
  806. """
  807. UPDATE mdp_transform_run_log
  808. SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
  809. stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
  810. summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
  811. WHERE id=@Id
  812. """,
  813. new SugarParameter("@EndTime", finishedAt),
  814. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  815. new SugarParameter("@StageRows", result.StageRows),
  816. new SugarParameter("@StandardRows", result.StandardRows),
  817. new SugarParameter("@DwdRows", result.DwdRows),
  818. new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)),
  819. new SugarParameter("@Id", runLogId));
  820. }
  821. private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
  822. {
  823. try
  824. {
  825. var finishedAt = DateTime.Now;
  826. await _db.Ado.ExecuteCommandAsync(
  827. """
  828. UPDATE mdp_transform_run_log
  829. SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
  830. error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
  831. WHERE id=@Id
  832. """,
  833. new SugarParameter("@EndTime", finishedAt),
  834. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  835. new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
  836. new SugarParameter("@Id", runLogId));
  837. }
  838. catch (Exception ex)
  839. {
  840. // 写库自身失败兜底(典型场景:远端 MySQL 瞬断导致 MarkFailed 自身也连不上)
  841. Console.Error.WriteLine($"[S3MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}");
  842. }
  843. }
  844. private static S3MdpSqlCommand Cmd(string sql, string batchId, DateTime now)
  845. {
  846. return new S3MdpSqlCommand(sql, new[]
  847. {
  848. new SugarParameter("@BatchId", batchId),
  849. new SugarParameter("@Now", now),
  850. new SugarParameter("@StatDate", now.Date)
  851. });
  852. }
  853. private static string BuildJsonObjectExpression(IEnumerable<string> columns)
  854. {
  855. var parts = columns.SelectMany(c => new[] { $"'{c.Replace("'", "''")}'", $"s.`{c}`" });
  856. return $"JSON_OBJECT({string.Join(",", parts)})";
  857. }
  858. private static string FindColumn(IEnumerable<string> columns, string expected)
  859. {
  860. return columns.First(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase));
  861. }
  862. private static string NormalizeTriggerType(string? triggerType)
  863. {
  864. return string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
  865. }
  866. private static string BuildRunSummaryJson(S3MdpSyncTransformResult result)
  867. {
  868. return $$"""{"batchId":"{{result.BatchId}}","stageRows":{{result.StageRows}},"standardRows":{{result.StandardRows}},"dwdRows":{{result.DwdRows}},"kpiRows":{{result.KpiRows}}}""";
  869. }
  870. private static string ResolveKpiValueTable(int metricLevel)
  871. {
  872. return metricLevel switch
  873. {
  874. 1 => "ado_s9_kpi_value_l1_day",
  875. 2 => "ado_s9_kpi_value_l2_day",
  876. 3 => "ado_s9_kpi_value_l3_day",
  877. 4 => "ado_s9_kpi_value_l4_day",
  878. _ => "ado_s9_kpi_value_l2_day"
  879. };
  880. }
  881. private static decimal DefaultS3Target(string metricCode)
  882. {
  883. return metricCode switch
  884. {
  885. "S3_L1_001" => 15m,
  886. "S3_L1_002" => 95m,
  887. "S3_L2_004" => 10.5m,
  888. "S3_L2_005" => 96.72m,
  889. "S3_L3_001" => 5m,
  890. "S3_L3_002" => 7m,
  891. "S3_L3_003" => 92m,
  892. "S3_L3_004" => 90m,
  893. "S3_L3_005" => 15m,
  894. "S3_L3_006" => 85m,
  895. _ => 0m
  896. };
  897. }
  898. private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold)
  899. {
  900. if (target <= 0) return "gray";
  901. var ratio = actual / target * 100m;
  902. if (string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase))
  903. {
  904. if (actual <= target) return "green";
  905. if (ratio <= (yellowThreshold ?? 110m)) return "yellow";
  906. return ratio >= (redThreshold ?? 120m) ? "red" : "yellow";
  907. }
  908. if (actual >= target) return "green";
  909. if (ratio >= (yellowThreshold ?? 95m)) return "yellow";
  910. return ratio <= (redThreshold ?? 80m) ? "red" : "yellow";
  911. }
  912. private static string ResolveTrendFlag(decimal actual, decimal? previous)
  913. {
  914. if (previous == null) return "flat";
  915. if (actual > previous.Value) return "up";
  916. if (actual < previous.Value) return "down";
  917. return "flat";
  918. }
  919. private static string Truncate(string? raw, int maxLength)
  920. {
  921. if (string.IsNullOrEmpty(raw)) return string.Empty;
  922. return raw.Length <= maxLength ? raw : raw[..maxLength];
  923. }
  924. private sealed class S3ColumnRow
  925. {
  926. public string ColumnName { get; set; } = string.Empty;
  927. }
  928. private sealed class S3MdpEntityRow
  929. {
  930. public long Id { get; set; }
  931. public string EntityName { get; set; } = string.Empty;
  932. }
  933. private sealed class S3KpiCalcRow
  934. {
  935. public long TenantId { get; set; }
  936. public long FactoryId { get; set; }
  937. public string MetricCode { get; set; } = string.Empty;
  938. public decimal? MetricValue { get; set; }
  939. }
  940. private sealed class S3KpiMetaRow
  941. {
  942. public int MetricLevel { get; set; }
  943. public string Direction { get; set; } = "higher_is_better";
  944. public decimal? YellowThreshold { get; set; }
  945. public decimal? RedThreshold { get; set; }
  946. }
  947. private sealed class S3KpiValueRow
  948. {
  949. public long Id { get; set; }
  950. public decimal? MetricValue { get; set; }
  951. public decimal? TargetValue { get; set; }
  952. }
  953. }
  954. public sealed class S3MdpSyncTransformResult
  955. {
  956. public long RunLogId { get; set; }
  957. public string BatchId { get; set; } = string.Empty;
  958. public int StageRows { get; set; }
  959. public int StandardRows { get; set; }
  960. public int DwdRows { get; set; }
  961. public int KpiRows { get; set; }
  962. }
  963. public sealed class S3MaterialRefreshResult
  964. {
  965. public string BatchId { get; set; } = string.Empty;
  966. public int StdCount { get; set; }
  967. public int ReadinessCount { get; set; }
  968. public int ShortageCount { get; set; }
  969. }
  970. internal sealed record S3MdpSqlCommand(string Sql, SugarParameter[] Parameters);
  971. internal sealed record S3MdpEntityConfig(
  972. string EntityCode,
  973. string SourceTable,
  974. string TargetTable,
  975. string SourceRowIdExpression,
  976. string SourceBizKeyExpression)
  977. {
  978. public static readonly IReadOnlyList<S3MdpEntityConfig> All = new List<S3MdpEntityConfig>
  979. {
  980. new("S3_SUPPLIER", "SuppMaster", "mdp_stg_supplier", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Supp`,''))"),
  981. new("S3_CONSIGNEE_SUPPLIER", "ConsigneeAddressMaster", "mdp_stg_supplier", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Address`,''))"),
  982. new("S3_ITEM_ERP", "ItemMaster", "mdp_stg_item", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`ItemNum`,''))"),
  983. new("S3_ITEM_NEW", "ic_item", "mdp_stg_item", "Id", "CONCAT(IFNULL(s.`tenant_id`,0), ':', IFNULL(s.`number`,''))"),
  984. new("S3_SOURCE_LIST", "srm_purchase", "mdp_stg_source_list", "Id", "CONCAT(IFNULL(s.`number`,''), ':', IFNULL(s.`supplier_number`,''))"),
  985. new("S3_SUPPLY_DEMAND", "ic_demandschedule", "mdp_stg_supply_demand", "Id", "CAST(s.`Id` AS CHAR)"),
  986. new("S3_PURCHASE_REQUEST", "srm_pr_main", "mdp_stg_supply_demand", "Id", "COALESCE(s.`pr_billno`, CAST(s.`Id` AS CHAR))"),
  987. new("S3_PURCHASE_ORDER_MASTER", "PurOrdMaster", "mdp_stg_purchase_order", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`PurOrd`,''))"),
  988. new("S3_PURCHASE_ORDER_DETAIL", "PurOrdDetail", "mdp_stg_purchase_order", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`PurOrd`,''), ':', IFNULL(s.`Line`,''))"),
  989. new("S3_DELIVERY_PLAN", "srm_polist_ds", "mdp_stg_delivery", "Id", "s.`dsnum`"),
  990. new("S3_SHIPPER_MASTER", "scm_shd", "mdp_stg_delivery", "id", "COALESCE(s.`shddh`, CAST(s.`id` AS CHAR))"),
  991. new("S3_SHIPPER_DETAIL", "scm_shdzb", "mdp_stg_delivery", "id", "CONCAT(IFNULL(s.`glid`,''), ':', IFNULL(s.`id`,''))"),
  992. new("S3_RECEIPT_MASTER", "PurOrdRctMaster", "mdp_stg_receipt", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''))"),
  993. new("S3_RECEIPT_DETAIL", "PurOrdRctDetail", "mdp_stg_receipt", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''), ':', IFNULL(s.`Line`,''))"),
  994. new("S3_WORK_ORDER_MASTER", "WorkOrdMaster", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''))"),
  995. new("S3_WORK_ORDER_DETAIL", "WorkOrdDetail", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`ItemNum`,''))"),
  996. new("S3_WORK_ORDER_ROUTING", "WorkOrdRouting", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`OP`,''))"),
  997. new("S3_ROUTING_OUTSOURCE", "RoutingOpDetail", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`RoutingCode`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`SupplierCode`,''))"),
  998. new("S3_INVENTORY", "InvMaster", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`ItemNum`,''), ':', IFNULL(s.`Location`,''))")
  999. };
  1000. }