S1MdpSyncTransformService.cs 81 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360
  1. using Admin.NET.Plugin.AiDOP.SmartOps;
  2. namespace Admin.NET.Plugin.AiDOP.Order;
  3. /// <summary>
  4. /// S1 首批 MDP 同步和标准化转换服务。
  5. /// </summary>
  6. public class S1MdpSyncTransformService : ITransient
  7. {
  8. private const string JobCode = "S1_MDP_SYNC_TRANSFORM";
  9. private readonly ISqlSugarClient _db;
  10. private readonly SmartOpsKpiAtomicBuildService _atomicBuild;
  11. public S1MdpSyncTransformService(ISqlSugarClient db, SmartOpsKpiAtomicBuildService atomicBuild)
  12. {
  13. _db = db;
  14. _atomicBuild = atomicBuild;
  15. }
  16. public async Task<S1MdpSyncTransformResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
  17. {
  18. cancellationToken.ThrowIfCancellationRequested();
  19. var now = DateTime.Now;
  20. var batchId = $"S1_MDP_FULL_{now:yyyyMMddHHmmss}";
  21. var runLogId = await InsertTransformRunLogAsync(batchId, now, triggerType);
  22. var result = new S1MdpSyncTransformResult { BatchId = batchId, RunLogId = runLogId };
  23. try
  24. {
  25. await EnsureS1RuntimeObjectsAsync();
  26. result.StageRows = await SyncStagingAsync(batchId, now, cancellationToken);
  27. result.StandardRows = await TransformStandardAsync(batchId, now, cancellationToken);
  28. result.DwdRows = await BuildDwdAsync(batchId, now, cancellationToken);
  29. result.KpiRows = await BuildS1KpiValuesAsync(batchId, now, cancellationToken);
  30. result.AtomicRows = await _atomicBuild.BuildOrderDeliveryDomainForAllDatesAsync(batchId, cancellationToken: cancellationToken);
  31. await MarkTransformRunSuccessAsync(runLogId, now, result);
  32. return result;
  33. }
  34. catch (Exception ex)
  35. {
  36. await MarkTransformRunFailedAsync(runLogId, now, ex.Message);
  37. throw;
  38. }
  39. }
  40. private async Task EnsureS1RuntimeObjectsAsync()
  41. {
  42. await _db.Ado.ExecuteCommandAsync(
  43. """
  44. CREATE TABLE IF NOT EXISTS dwd_requirement_examine_detail (
  45. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  46. tenant_id BIGINT NOT NULL,
  47. factory_id BIGINT NULL,
  48. stat_date DATE NOT NULL,
  49. row_id BIGINT NOT NULL,
  50. parent_row_id BIGINT NULL,
  51. examine_id BIGINT NULL,
  52. order_entry_id BIGINT NULL,
  53. bill_no VARCHAR(100) NULL,
  54. morder_no VARCHAR(100) NULL,
  55. num VARCHAR(100) NULL,
  56. item_number VARCHAR(100) NULL,
  57. item_name VARCHAR(200) NULL,
  58. bom_number VARCHAR(100) NULL,
  59. model VARCHAR(200) NULL,
  60. kitting_time DATE NULL,
  61. item_type VARCHAR(50) NULL,
  62. erp_cls_name VARCHAR(80) NULL,
  63. qty DECIMAL(18,4) NULL,
  64. wastage DECIMAL(18,4) NULL,
  65. need_count DECIMAL(18,4) NULL,
  66. sqty DECIMAL(18,4) NULL,
  67. use_qty DECIMAL(18,4) NULL,
  68. self_lack_qty DECIMAL(18,4) NULL,
  69. lack_qty DECIMAL(18,4) NULL,
  70. mo_qty DECIMAL(18,4) NULL,
  71. make_qty DECIMAL(18,4) NULL,
  72. purchase_qty DECIMAL(18,4) NULL,
  73. purchase_occupy_qty DECIMAL(18,4) NULL,
  74. satisfy_time DATE NULL,
  75. have_ic_subs VARCHAR(10) NULL,
  76. substitute_code VARCHAR(100) NULL,
  77. create_time DATETIME NULL,
  78. source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
  79. sync_batch_id VARCHAR(100) NOT NULL,
  80. calc_batch_id VARCHAR(100) NOT NULL,
  81. calc_time DATETIME NOT NULL,
  82. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  83. UNIQUE KEY uk_dwd_req_exam_detail (tenant_id, row_id, calc_batch_id),
  84. KEY idx_req_exam_tenant_batch (tenant_id, calc_batch_id),
  85. KEY idx_req_exam_bill (tenant_id, bill_no),
  86. KEY idx_req_exam_item (tenant_id, item_number)
  87. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S1需求明细核验DWD';
  88. """);
  89. await _db.Ado.ExecuteCommandAsync(
  90. """
  91. INSERT INTO mdp_entity
  92. (tenant_id, source_id, entity_code, entity_name, entity_type, source_table_name, target_table_name, sync_mode, batch_size, status, remark)
  93. SELECT 0, s.id, 'S1_REQUIREMENT_EXAMINE_RESULT', 'S1需求核验结果主表', 'TABLE',
  94. 'b_examine_result', 'mdp_stg_so', 'FULL', 5000, 1, '需求核验结果主表,进入 S1 贴源层'
  95. FROM mdp_source s
  96. WHERE s.tenant_id=0 AND s.source_code='AIDOPDEV_MYSQL'
  97. LIMIT 1
  98. ON DUPLICATE KEY UPDATE
  99. source_id=VALUES(source_id), entity_name=VALUES(entity_name), source_table_name=VALUES(source_table_name),
  100. target_table_name=VALUES(target_table_name), sync_mode=VALUES(sync_mode), status=VALUES(status),
  101. remark=VALUES(remark), update_time=CURRENT_TIMESTAMP;
  102. """);
  103. await _db.Ado.ExecuteCommandAsync(
  104. """
  105. INSERT INTO mdp_entity
  106. (tenant_id, source_id, entity_code, entity_name, entity_type, source_table_name, target_table_name, sync_mode, batch_size, status, remark)
  107. SELECT 0, s.id, 'S1_REQUIREMENT_EXAMINE_DETAIL', 'S1需求核验BOM明细', 'TABLE',
  108. 'b_bom_child_examine', 'mdp_stg_so', 'FULL', 5000, 1, '需求核验BOM明细,进入 S1 贴源层和 DWD'
  109. FROM mdp_source s
  110. WHERE s.tenant_id=0 AND s.source_code='AIDOPDEV_MYSQL'
  111. LIMIT 1
  112. ON DUPLICATE KEY UPDATE
  113. source_id=VALUES(source_id), entity_name=VALUES(entity_name), source_table_name=VALUES(source_table_name),
  114. target_table_name=VALUES(target_table_name), sync_mode=VALUES(sync_mode), status=VALUES(status),
  115. remark=VALUES(remark), update_time=CURRENT_TIMESTAMP;
  116. """);
  117. await _db.Ado.ExecuteCommandAsync(
  118. """
  119. INSERT INTO mdp_entity
  120. (tenant_id, source_id, entity_code, entity_name, entity_type, source_table_name, target_table_name, sync_mode, incr_column, batch_size, status, remark)
  121. SELECT 0, s.id, v.entity_code, v.entity_name, 'TABLE', v.source_table_name, 'mdp_stg_so', 'INCR', 'UpdateTime', 5000, 1, v.remark
  122. FROM mdp_source s
  123. JOIN (
  124. SELECT 'S1_PRODUCT_DESIGN' AS entity_code, 'S1产品设计主表' AS entity_name, 'ado_product_design' AS source_table_name, '产品设计主表,进入订单标准层' AS remark
  125. UNION ALL SELECT 'S1_PRODUCT_DESIGN_BOM', 'S1产品设计BOM', 'ado_product_design_bom', '产品设计BOM,进入订单标准层'
  126. UNION ALL SELECT 'S1_PRODUCT_DESIGN_ROUTING', 'S1产品设计工艺路线', 'ado_product_design_routing', '产品设计工艺路线,进入订单标准层'
  127. ) v
  128. WHERE s.tenant_id=0 AND s.source_code='AIDOPDEV_MYSQL'
  129. ON DUPLICATE KEY UPDATE
  130. source_id=VALUES(source_id), entity_name=VALUES(entity_name), source_table_name=VALUES(source_table_name),
  131. target_table_name=VALUES(target_table_name), sync_mode=VALUES(sync_mode), incr_column=VALUES(incr_column),
  132. status=VALUES(status), remark=VALUES(remark), update_time=CURRENT_TIMESTAMP;
  133. """);
  134. }
  135. private async Task<int> SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  136. {
  137. var total = 0;
  138. foreach (var entity in S1MdpEntityConfig.All)
  139. {
  140. cancellationToken.ThrowIfCancellationRequested();
  141. total += await SyncOneEntityAsync(entity, batchId, now);
  142. }
  143. return total;
  144. }
  145. private async Task<int> SyncOneEntityAsync(S1MdpEntityConfig entity, string batchId, DateTime now)
  146. {
  147. var entityRow = await _db.Ado.SqlQuerySingleAsync<S1MdpEntityRow>(
  148. "SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1",
  149. new SugarParameter("@EntityCode", entity.EntityCode));
  150. if (entityRow == null) throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode}");
  151. var columns = await _db.Ado.SqlQueryAsync<S1ColumnRow>(
  152. """
  153. SELECT COLUMN_NAME AS ColumnName
  154. FROM information_schema.COLUMNS
  155. WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
  156. ORDER BY ORDINAL_POSITION
  157. """,
  158. new SugarParameter("@TableName", entity.SourceTable));
  159. if (columns.Count == 0) throw Oops.Oh($"未找到源表:{entity.SourceTable}");
  160. var names = columns.Select(u => u.ColumnName).ToList();
  161. var tenantExpr = BuildOptionalColumnExpr(names, "tenant_id", "0");
  162. var factoryExpr = BuildOptionalColumnExpr(names, "factory_id", "NULL");
  163. var companyExpr = BuildOptionalColumnExpr(names, "company_id", "NULL");
  164. var sourceRowExpr = names.Any(u => string.Equals(u, entity.SourceRowIdExpression, StringComparison.OrdinalIgnoreCase))
  165. ? $"s.`{FindColumn(names, entity.SourceRowIdExpression)}`"
  166. : entity.SourceRowIdExpression;
  167. var rawDataExpr = BuildJsonObjectExpression(names);
  168. var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`");
  169. var logId = await InsertSyncLogAsync(entityRow.Id, entityRow.EntityName, batchId, rowsRead);
  170. var started = DateTime.Now;
  171. try
  172. {
  173. var affected = await _db.Ado.ExecuteCommandAsync(
  174. $"""
  175. INSERT INTO `{entity.TargetTable}`
  176. (tenant_id, factory_id, company_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
  177. SELECT
  178. {tenantExpr},
  179. {factoryExpr},
  180. {companyExpr},
  181. 'AIDOP',
  182. @SourceTable,
  183. CAST({sourceRowExpr} AS CHAR),
  184. CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({sourceRowExpr} AS CHAR)) AS CHAR),
  185. @BatchId,
  186. @Now,
  187. 'PENDING',
  188. {rawDataExpr}
  189. FROM `{entity.SourceTable}` s
  190. ON DUPLICATE KEY UPDATE
  191. tenant_id=VALUES(tenant_id),
  192. factory_id=VALUES(factory_id),
  193. company_id=VALUES(company_id),
  194. source_row_id=VALUES(source_row_id),
  195. sync_batch_id=VALUES(sync_batch_id),
  196. sync_time=VALUES(sync_time),
  197. process_status=VALUES(process_status),
  198. raw_data=VALUES(raw_data),
  199. update_time=CURRENT_TIMESTAMP
  200. """,
  201. new SugarParameter("@SourceTable", entity.SourceTable),
  202. new SugarParameter("@BatchId", batchId),
  203. new SugarParameter("@Now", now));
  204. await MarkSyncLogSuccessAsync(logId, started, affected);
  205. return rowsRead;
  206. }
  207. catch (Exception ex)
  208. {
  209. await MarkSyncLogFailedAsync(logId, started, ex.Message);
  210. throw;
  211. }
  212. }
  213. private async Task<int> TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  214. {
  215. var total = 0;
  216. foreach (var command in BuildStandardCommands(batchId, now))
  217. {
  218. cancellationToken.ThrowIfCancellationRequested();
  219. total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
  220. }
  221. return total;
  222. }
  223. private async Task<int> BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  224. {
  225. var total = 0;
  226. foreach (var command in BuildDwdCommands(batchId, now))
  227. {
  228. cancellationToken.ThrowIfCancellationRequested();
  229. total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
  230. }
  231. return total;
  232. }
  233. private async Task<int> BuildS1KpiValuesAsync(string batchId, DateTime now, CancellationToken cancellationToken)
  234. {
  235. var statDate = now.Date;
  236. var rows = await CalculateS1KpiValuesAsync(batchId, statDate);
  237. var affected = 0;
  238. foreach (var row in rows)
  239. {
  240. cancellationToken.ThrowIfCancellationRequested();
  241. affected += await UpsertS1KpiValueAsync(row, statDate, now);
  242. }
  243. return affected;
  244. }
  245. private async Task<List<S1KpiCalcRow>> CalculateS1KpiValuesAsync(string batchId, DateTime statDate)
  246. {
  247. return await _db.Ado.SqlQueryAsync<S1KpiCalcRow>(
  248. """
  249. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  250. 'S1_L1_001' AS MetricCode,
  251. ROUND(AVG(TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) / 24), 4) AS MetricValue
  252. FROM mdp_std_so
  253. WHERE order_date IS NOT NULL
  254. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
  255. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) >= order_date
  256. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  257. UNION ALL
  258. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  259. 'S1_L1_002' AS MetricCode,
  260. ROUND(100 * SUM(CASE WHEN TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) <= 72 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
  261. FROM mdp_std_so
  262. WHERE order_date IS NOT NULL
  263. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
  264. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  265. UNION ALL
  266. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  267. 'S1_L1_003' AS MetricCode,
  268. ROUND(COUNT(1) / GREATEST(COUNT(DISTINCT NULLIF(planner_no, '')), 1), 4) AS MetricValue
  269. FROM mdp_std_so
  270. WHERE order_date IS NOT NULL AND order_date <= @StatDate
  271. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  272. UNION ALL
  273. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  274. 'S1_L1_004' AS MetricCode,
  275. ROUND(AVG(GREATEST(IFNULL(remaining_qty, 0), 0)) / GREATEST(SUM(IFNULL(shipped_qty, 0)), 1) * 30, 4) AS MetricValue
  276. FROM dwd_ship_trans
  277. WHERE calc_batch_id=@BatchId
  278. GROUP BY tenant_id
  279. HAVING SUM(IFNULL(shipped_qty, 0)) > 0
  280. UNION ALL
  281. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  282. 'S1_L2_010' AS MetricCode,
  283. ROUND(AVG(TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) / 24), 4) AS MetricValue
  284. FROM mdp_std_so
  285. WHERE order_date IS NOT NULL
  286. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
  287. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) >= order_date
  288. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  289. UNION ALL
  290. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  291. 'S1_L2_011' AS MetricCode,
  292. ROUND(100 * SUM(CASE WHEN TIMESTAMPDIFF(HOUR, order_date, COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date)) <= 72 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
  293. FROM mdp_std_so
  294. WHERE order_date IS NOT NULL
  295. AND COALESCE(promised_delivery_date, capacity_date, material_ready_date, plan_delivery_date) IS NOT NULL
  296. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  297. UNION ALL
  298. SELECT tenant_id AS TenantId, COALESCE(NULLIF(factory_id, 0), 1) AS FactoryId,
  299. 'S1_L2_012' AS MetricCode,
  300. ROUND(COUNT(1) / GREATEST(COUNT(DISTINCT NULLIF(planner_no, '')), 1), 4) AS MetricValue
  301. FROM mdp_std_so
  302. WHERE order_date IS NOT NULL AND order_date <= @StatDate
  303. GROUP BY tenant_id, COALESCE(NULLIF(factory_id, 0), 1)
  304. UNION ALL
  305. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  306. 'S1_L2_013' AS MetricCode,
  307. ROUND(100 * SUM(CASE WHEN planned_ship_qty > 0 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
  308. FROM dwd_ship_trans
  309. WHERE calc_batch_id=@BatchId
  310. GROUP BY tenant_id
  311. UNION ALL
  312. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  313. 'S1_L2_014' AS MetricCode,
  314. ROUND(100 * SUM(CASE WHEN shipped_qty >= planned_ship_qty AND planned_ship_qty > 0 THEN 1 ELSE 0 END)
  315. / GREATEST(SUM(CASE WHEN planned_ship_qty > 0 THEN 1 ELSE 0 END), 1), 4) AS MetricValue
  316. FROM dwd_ship_trans
  317. WHERE calc_batch_id=@BatchId
  318. GROUP BY tenant_id
  319. HAVING SUM(CASE WHEN planned_ship_qty > 0 THEN 1 ELSE 0 END) > 0
  320. UNION ALL
  321. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  322. 'S1_L2_015' AS MetricCode,
  323. ROUND(100 * SUM(CASE WHEN linkage_status = 'LINKED' THEN 1 ELSE 0 END)
  324. / GREATEST(SUM(CASE WHEN planned_ship_qty > 0 THEN 1 ELSE 0 END), 1), 4) AS MetricValue
  325. FROM dwd_ship_trans
  326. WHERE calc_batch_id=@BatchId
  327. GROUP BY tenant_id
  328. HAVING SUM(CASE WHEN planned_ship_qty > 0 THEN 1 ELSE 0 END) > 0
  329. UNION ALL
  330. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  331. 'S1_L2_001' AS MetricCode,
  332. ROUND(AVG(TIMESTAMPDIFF(HOUR, create_time, update_time) / 24), 4) AS MetricValue
  333. FROM (
  334. SELECT tenant_id,
  335. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CreateTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f') AS create_time,
  336. COALESCE(
  337. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.UpdateTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  338. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CreateTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f')
  339. ) AS update_time
  340. FROM mdp_stg_so
  341. WHERE source_table = 'ado_contract_review'
  342. ) t
  343. WHERE create_time IS NOT NULL AND update_time IS NOT NULL AND update_time >= create_time
  344. GROUP BY tenant_id
  345. UNION ALL
  346. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  347. 'S1_L2_002' AS MetricCode,
  348. ROUND(100 * SUM(CASE WHEN TIMESTAMPDIFF(HOUR, create_time, update_time) <= 72 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
  349. FROM (
  350. SELECT tenant_id,
  351. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CreateTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f') AS create_time,
  352. COALESCE(
  353. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.UpdateTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  354. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CreateTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f')
  355. ) AS update_time
  356. FROM mdp_stg_so
  357. WHERE source_table = 'ado_contract_review'
  358. ) t
  359. WHERE create_time IS NOT NULL AND update_time IS NOT NULL AND update_time >= create_time
  360. GROUP BY tenant_id
  361. UNION ALL
  362. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  363. 'S1_L2_003' AS MetricCode,
  364. ROUND(COUNT(1) / GREATEST(COUNT(DISTINCT NULLIF(owner_account, '')), 1), 4) AS MetricValue
  365. FROM (
  366. SELECT tenant_id,
  367. COALESCE(
  368. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ResponsibleAccount')),
  369. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CreateUser'))
  370. ) AS owner_account
  371. FROM mdp_stg_so
  372. WHERE source_table = 'ado_contract_review'
  373. ) t
  374. GROUP BY tenant_id
  375. UNION ALL
  376. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  377. 'S1_L2_004' AS MetricCode,
  378. ROUND(AVG(cycle_hours / 24), 4) AS MetricValue
  379. FROM (
  380. SELECT tenant_id,
  381. COALESCE(
  382. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DrawingDesignCycle')) REGEXP '^-?[0-9]+$'
  383. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DrawingDesignCycle')) AS DECIMAL(18,6)) END,
  384. TIMESTAMPDIFF(HOUR,
  385. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DrawingActualStart')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  386. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DrawingActualEnd')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f')
  387. )
  388. ) AS cycle_hours
  389. FROM mdp_stg_so
  390. WHERE source_table = 'ado_product_design'
  391. ) t
  392. WHERE cycle_hours IS NOT NULL AND cycle_hours >= 0
  393. GROUP BY tenant_id
  394. UNION ALL
  395. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  396. 'S1_L2_005' AS MetricCode,
  397. ROUND(100 * SUM(CASE WHEN actual_end <= plan_end THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
  398. FROM (
  399. SELECT tenant_id,
  400. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DrawingPlanEnd')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f') AS plan_end,
  401. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DrawingActualEnd')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f') AS actual_end
  402. FROM mdp_stg_so
  403. WHERE source_table = 'ado_product_design'
  404. ) t
  405. WHERE plan_end IS NOT NULL AND actual_end IS NOT NULL
  406. GROUP BY tenant_id
  407. UNION ALL
  408. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  409. 'S1_L2_006' AS MetricCode,
  410. ROUND(COUNT(1) / GREATEST(COUNT(DISTINCT NULLIF(design_lead, '')), 1), 4) AS MetricValue
  411. FROM (
  412. SELECT tenant_id,
  413. COALESCE(
  414. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DesignLeadAccount')),
  415. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DesignLeadName')),
  416. JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CreateUser'))
  417. ) AS design_lead
  418. FROM mdp_stg_so
  419. WHERE source_table = 'ado_product_design'
  420. ) t
  421. GROUP BY tenant_id
  422. UNION ALL
  423. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  424. CASE stage_no
  425. WHEN 1 THEN 'S1_L3_001'
  426. WHEN 2 THEN 'S1_L3_002'
  427. WHEN 3 THEN 'S1_L3_003'
  428. WHEN 4 THEN 'S1_L3_004'
  429. WHEN 5 THEN 'S1_L3_005'
  430. END AS MetricCode,
  431. ROUND(AVG(cycle_hours), 4) AS MetricValue
  432. FROM (
  433. SELECT tenant_id,
  434. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StageNo')) REGEXP '^-?[0-9]+$'
  435. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StageNo')) AS SIGNED) END AS stage_no,
  436. COALESCE(
  437. TIMESTAMPDIFF(HOUR,
  438. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StartTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  439. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CompleteTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f')
  440. ),
  441. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ActualDays')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$'
  442. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ActualDays')) AS DECIMAL(18,6)) * 24 END
  443. ) AS cycle_hours
  444. FROM mdp_stg_so
  445. WHERE source_table = 'ado_contract_review_flow'
  446. ) t
  447. WHERE stage_no BETWEEN 1 AND 5
  448. AND cycle_hours IS NOT NULL AND cycle_hours >= 0
  449. GROUP BY tenant_id, stage_no
  450. UNION ALL
  451. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  452. CASE stage_no
  453. WHEN 1 THEN 'S1_L3_101'
  454. WHEN 2 THEN 'S1_L3_102'
  455. WHEN 3 THEN 'S1_L3_103'
  456. WHEN 4 THEN 'S1_L3_104'
  457. WHEN 5 THEN 'S1_L3_105'
  458. END AS MetricCode,
  459. ROUND(AVG(cycle_hours), 4) AS MetricValue
  460. FROM (
  461. SELECT tenant_id,
  462. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StageNo')) REGEXP '^-?[0-9]+$'
  463. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StageNo')) AS SIGNED) END AS stage_no,
  464. COALESCE(
  465. TIMESTAMPDIFF(HOUR,
  466. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StartTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  467. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CompleteTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f')
  468. ),
  469. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ActualDays')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$'
  470. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ActualDays')) AS DECIMAL(18,6)) * 24 END
  471. ) AS cycle_hours
  472. FROM mdp_stg_so
  473. WHERE source_table = 'ado_contract_review_flow'
  474. ) t
  475. WHERE stage_no BETWEEN 1 AND 5
  476. AND cycle_hours IS NOT NULL AND cycle_hours >= 0
  477. GROUP BY tenant_id, stage_no
  478. UNION ALL
  479. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  480. metric_code AS MetricCode,
  481. ROUND(AVG(cycle_hours), 4) AS MetricValue
  482. FROM (
  483. SELECT tenant_id,
  484. CASE COALESCE(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DeptNo')), ''), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Department')))
  485. WHEN 'LAW' THEN 'S1_L4_001'
  486. WHEN 'PRE_SALES' THEN 'S1_L4_002'
  487. WHEN 'MPS' THEN 'S1_L4_003'
  488. WHEN 'TEST' THEN 'S1_L4_004'
  489. WHEN '法律事务部' THEN 'S1_L4_001'
  490. WHEN '技术售前组' THEN 'S1_L4_002'
  491. WHEN '综合主计划' THEN 'S1_L4_003'
  492. WHEN '试验站' THEN 'S1_L4_004'
  493. END AS metric_code,
  494. COALESCE(
  495. TIMESTAMPDIFF(HOUR,
  496. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StartTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  497. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CompleteTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f')
  498. ),
  499. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ActualDays')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$'
  500. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ActualDays')) AS DECIMAL(18,6)) * 24 END
  501. ) AS cycle_hours
  502. FROM mdp_stg_so
  503. WHERE source_table = 'ado_contract_review_flow'
  504. AND JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StageNo')) = '1'
  505. ) t
  506. WHERE metric_code IS NOT NULL
  507. AND cycle_hours IS NOT NULL AND cycle_hours >= 0
  508. GROUP BY tenant_id, metric_code
  509. UNION ALL
  510. SELECT tenant_id AS TenantId, 1 AS FactoryId,
  511. metric_code AS MetricCode,
  512. ROUND(AVG(cycle_hours), 4) AS MetricValue
  513. FROM (
  514. SELECT tenant_id,
  515. CASE COALESCE(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DeptNo')), ''), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Department')))
  516. WHEN 'LAW' THEN 'S1_L4_101'
  517. WHEN 'PRE_SALES' THEN 'S1_L4_102'
  518. WHEN 'MPS' THEN 'S1_L4_103'
  519. WHEN 'TEST' THEN 'S1_L4_104'
  520. WHEN '法律事务部' THEN 'S1_L4_101'
  521. WHEN '技术售前组' THEN 'S1_L4_102'
  522. WHEN '综合主计划' THEN 'S1_L4_103'
  523. WHEN '试验站' THEN 'S1_L4_104'
  524. END AS metric_code,
  525. COALESCE(
  526. TIMESTAMPDIFF(HOUR,
  527. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StartTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  528. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CompleteTime')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f')
  529. ),
  530. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ActualDays')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$'
  531. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ActualDays')) AS DECIMAL(18,6)) * 24 END
  532. ) AS cycle_hours
  533. FROM mdp_stg_so
  534. WHERE source_table = 'ado_contract_review_flow'
  535. AND JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StageNo')) = '1'
  536. ) t
  537. WHERE metric_code IS NOT NULL
  538. AND cycle_hours IS NOT NULL AND cycle_hours >= 0
  539. GROUP BY tenant_id, metric_code
  540. """,
  541. new SugarParameter("@BatchId", batchId),
  542. new SugarParameter("@StatDate", statDate));
  543. }
  544. private async Task<int> UpsertS1KpiValueAsync(S1KpiCalcRow row, DateTime statDate, DateTime now)
  545. {
  546. var meta = await _db.Ado.SqlQuerySingleAsync<S1KpiMetaRow>(
  547. """
  548. SELECT MetricLevel, Direction, YellowThreshold, RedThreshold
  549. FROM ado_smart_ops_kpi_master
  550. WHERE TenantId=@TenantId AND ModuleCode='S1' AND MetricCode=@MetricCode AND IsEnabled=1
  551. LIMIT 1
  552. """,
  553. new SugarParameter("@TenantId", row.TenantId),
  554. new SugarParameter("@MetricCode", row.MetricCode));
  555. if (meta == null || row.MetricValue == null)
  556. return 0;
  557. var table = ResolveKpiValueTable(meta.MetricLevel);
  558. var current = await _db.Ado.SqlQuerySingleAsync<S1KpiValueRow>(
  559. $"""
  560. SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
  561. FROM {table}
  562. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S1'
  563. AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
  564. ORDER BY id
  565. LIMIT 1
  566. """,
  567. new SugarParameter("@TenantId", row.TenantId),
  568. new SugarParameter("@FactoryId", row.FactoryId),
  569. new SugarParameter("@MetricCode", row.MetricCode),
  570. new SugarParameter("@BizDate", statDate));
  571. var prior = await _db.Ado.SqlQuerySingleAsync<S1KpiValueRow>(
  572. $"""
  573. SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
  574. FROM {table}
  575. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S1'
  576. AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0
  577. ORDER BY biz_date DESC, id DESC
  578. LIMIT 1
  579. """,
  580. new SugarParameter("@TenantId", row.TenantId),
  581. new SugarParameter("@FactoryId", row.FactoryId),
  582. new SugarParameter("@MetricCode", row.MetricCode),
  583. new SugarParameter("@BizDate", statDate));
  584. var actual = Math.Round(row.MetricValue.Value, 4);
  585. var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS1Target(row.MetricCode);
  586. var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold);
  587. var trend = ResolveTrendFlag(actual, prior?.MetricValue);
  588. if (current != null)
  589. {
  590. return await _db.Ado.ExecuteCommandAsync(
  591. $"""
  592. UPDATE {table}
  593. SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag,
  594. is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime
  595. WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S1'
  596. AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
  597. """,
  598. new SugarParameter("@MetricValue", actual),
  599. new SugarParameter("@TargetValue", target),
  600. new SugarParameter("@StatusColor", status),
  601. new SugarParameter("@TrendFlag", trend),
  602. new SugarParameter("@CalcTime", now),
  603. new SugarParameter("@TenantId", row.TenantId),
  604. new SugarParameter("@FactoryId", row.FactoryId),
  605. new SugarParameter("@MetricCode", row.MetricCode),
  606. new SugarParameter("@BizDate", statDate));
  607. }
  608. var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}");
  609. return await _db.Ado.ExecuteCommandAsync(
  610. $"""
  611. INSERT INTO {table}
  612. (id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active,
  613. module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time)
  614. VALUES
  615. (@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1,
  616. 'S1', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime)
  617. """,
  618. new SugarParameter("@Id", nextId),
  619. new SugarParameter("@TenantId", row.TenantId),
  620. new SugarParameter("@FactoryId", row.FactoryId),
  621. new SugarParameter("@BizDate", statDate),
  622. new SugarParameter("@CalcTime", now),
  623. new SugarParameter("@MetricCode", row.MetricCode),
  624. new SugarParameter("@MetricValue", actual),
  625. new SugarParameter("@TargetValue", target),
  626. new SugarParameter("@StatusColor", status),
  627. new SugarParameter("@TrendFlag", trend));
  628. }
  629. private IEnumerable<S1MdpSqlCommand> BuildStandardCommands(string batchId, DateTime now)
  630. {
  631. yield return Cmd(
  632. """
  633. INSERT INTO mdp_std_so
  634. (tenant_id, factory_id, company_id, source_system, order_id, order_entry_id, order_no, order_line, order_type,
  635. customer_id, customer_no, customer_name, customer_order_no, country, item_code, item_name, item_spec,
  636. map_number, map_name, bom_number, unit, order_qty, delivered_notice_qty, delivered_qty, price, tax_price,
  637. amount, total_amount, order_date, customer_request_date, plan_delivery_date, promised_delivery_date,
  638. capacity_date, material_ready_date, planner_no, planner_name, order_status, review_status, review_stage,
  639. flow_state, progress, urgent, closed, deleted_flag, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
  640. SELECT
  641. COALESCE(e.tenant_id, h.tenant_id, 0),
  642. COALESCE(e.factory_id, h.factory_id),
  643. e.company_id,
  644. 'AIDOP',
  645. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.Id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.Id')) AS SIGNED) END,
  646. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.Id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.Id')) AS SIGNED) END,
  647. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_no')), e.source_biz_key),
  648. CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.entry_seq')), JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.Id'))) AS CHAR),
  649. CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.order_type')) AS CHAR),
  650. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_id')) AS SIGNED) END,
  651. JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_no')),
  652. JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.custom_name')),
  653. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.custom_order_bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_from'))),
  654. JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.country')),
  655. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.item_number')),
  656. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.item_name')),
  657. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.specification')),
  658. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.map_number')),
  659. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.map_name')),
  660. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bom_number')),
  661. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.unit')),
  662. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.qty')) AS DECIMAL(18,6)) END,
  663. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_notice_count')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_notice_count')) AS DECIMAL(18,6)) END,
  664. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_count')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.deliver_count')) AS DECIMAL(18,6)) END,
  665. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.price')) AS DECIMAL(18,6)) END,
  666. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.tax_price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.tax_price')) AS DECIMAL(18,6)) END,
  667. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.amount')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.amount')) AS DECIMAL(18,6)) END,
  668. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.total_amount')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.total_amount')) AS DECIMAL(18,6)) END,
  669. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.date')), 'null'), ''),
  670. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.rdate')), 'null'), ''),
  671. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.plan_date')), 'null'), ''),
  672. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.date')), 'null'), ''),
  673. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.sys_capacity_date')), 'null'), ''),
  674. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.sys_material_date')), 'null'), ''),
  675. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.planner_no')),
  676. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.planner_name')),
  677. CASE WHEN JSON_EXTRACT(h.raw_data,'$.closed') IN (1, true) THEN 'CLOSED' ELSE 'OPEN' END,
  678. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.FlowStatus')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.flowstate'))),
  679. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.CurrentDept')), JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.CurrentStage'))),
  680. JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.flowstate')),
  681. JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.progress')),
  682. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.urgent')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.urgent')) AS SIGNED) END,
  683. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.closed')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.closed')) AS SIGNED) END,
  684. COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.IsDeleted')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(COALESCE(e.raw_data, h.raw_data),'$.IsDeleted')) AS SIGNED) END, 0),
  685. e.source_table,
  686. e.source_row_id,
  687. e.source_biz_key,
  688. @BatchId,
  689. @Now
  690. FROM mdp_stg_so e
  691. LEFT JOIN mdp_stg_so h ON h.source_table='crm_seorder'
  692. AND JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.Id')) = JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.seorder_id'))
  693. LEFT JOIN mdp_stg_so r ON r.source_table='ado_contract_review'
  694. AND JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.BillNo')) = COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.contract_no')), JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_no')))
  695. WHERE e.source_table='crm_seorderentry'
  696. AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(e.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(h.raw_data,'$.bill_no'))), '') <> ''
  697. ON DUPLICATE KEY UPDATE
  698. customer_no=VALUES(customer_no), customer_name=VALUES(customer_name), item_code=VALUES(item_code),
  699. item_name=VALUES(item_name), item_spec=VALUES(item_spec), order_qty=VALUES(order_qty),
  700. delivered_notice_qty=VALUES(delivered_notice_qty), delivered_qty=VALUES(delivered_qty),
  701. plan_delivery_date=VALUES(plan_delivery_date), promised_delivery_date=VALUES(promised_delivery_date),
  702. capacity_date=VALUES(capacity_date), material_ready_date=VALUES(material_ready_date),
  703. order_status=VALUES(order_status), review_status=VALUES(review_status), review_stage=VALUES(review_stage),
  704. flow_state=VALUES(flow_state), progress=VALUES(progress), deleted_flag=VALUES(deleted_flag),
  705. sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  706. """, batchId, now);
  707. yield return Cmd(
  708. """
  709. INSERT INTO mdp_std_ship_trans
  710. (tenant_id, factory_id, company_id, source_system, trans_type, plan_id, plan_no, plan_line, order_id, order_entry_id,
  711. order_no, order_line, customer_no, customer_name, country, item_code, item_name, item_spec, qty, plan_qty,
  712. weight, volume, order_date, plan_ship_date, shipping_site, shipping_address, consignee, telephone,
  713. status, confirm_status, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
  714. SELECT
  715. IFNULL(d.tenant_id, 0),
  716. d.factory_id,
  717. d.company_id,
  718. 'AIDOP',
  719. 'SHIP_PLAN',
  720. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id')) AS SIGNED) END,
  721. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.LotSerial')), CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id')) AS CHAR)),
  722. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RecID')) AS CHAR),
  723. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.seorder_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.seorder_id')) AS SIGNED) END,
  724. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sentry_id')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sentry_id')) AS SIGNED) END,
  725. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdNbr'))),
  726. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sentry_id')) AS CHAR),
  727. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.CustomNo')),
  728. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.CustomName')),
  729. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Country')),
  730. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemNum')),
  731. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemName')),
  732. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Specification')),
  733. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) AS DECIMAL(18,6)) END,
  734. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Qty')) AS DECIMAL(18,6)) END,
  735. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Weight')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Weight')) AS DECIMAL(18,6)) END,
  736. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')) AS DECIMAL(18,6)) END,
  737. NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdDate')), 'null'), ''),
  738. NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShippingDate')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.CreateTime'))), 'null'), ''),
  739. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShippingSite')),
  740. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShippingAddress')),
  741. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Consignee')),
  742. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Telephone')),
  743. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Status')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Status'))),
  744. CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.IsConfirm')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.IsConfirm'))) AS CHAR),
  745. d.source_table,
  746. d.source_row_id,
  747. d.source_biz_key,
  748. @BatchId,
  749. @Now
  750. FROM mdp_stg_ship_trans d
  751. LEFT JOIN mdp_stg_ship_trans m ON m.source_table='ShippingPlan'
  752. AND JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.RecID')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.plan_id'))
  753. WHERE d.source_table='ShippingPlanDetail'
  754. AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.bill_no')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdNbr'))), '') <> ''
  755. ON DUPLICATE KEY UPDATE
  756. plan_no=VALUES(plan_no), order_no=VALUES(order_no), customer_no=VALUES(customer_no), customer_name=VALUES(customer_name),
  757. item_code=VALUES(item_code), item_name=VALUES(item_name), qty=VALUES(qty), plan_qty=VALUES(plan_qty),
  758. plan_ship_date=VALUES(plan_ship_date), shipping_site=VALUES(shipping_site), shipping_address=VALUES(shipping_address),
  759. status=VALUES(status), confirm_status=VALUES(confirm_status), sync_batch_id=VALUES(sync_batch_id),
  760. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  761. """, batchId, now);
  762. yield return Cmd(
  763. """
  764. INSERT INTO mdp_std_ship_trans
  765. (tenant_id, factory_id, source_system, trans_type, shipper_rec_id, shipper_no, shipper_line, order_no, order_line,
  766. customer_no, item_code, item_name, qty_to_ship, picking_qty, real_qty, gross_weight, net_weight, volume,
  767. plan_ship_date, actual_ship_date, site, status, confirm_status, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
  768. SELECT
  769. IFNULL(d.tenant_id, 0),
  770. d.factory_id,
  771. 'AIDOP',
  772. 'ASN_SHIPPER',
  773. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ASNBOLShipperRecID')) REGEXP '^-?[0-9]+$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ASNBOLShipperRecID')) AS SIGNED) END,
  774. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Id')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Id'))),
  775. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Line')) AS CHAR),
  776. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdNbr')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.OrdNbr'))),
  777. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.OrdLine')) AS CHAR),
  778. JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.SoldTo')),
  779. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ContainerItem')),
  780. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Descr')),
  781. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyToShip')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyToShip')) AS DECIMAL(18,6)) END,
  782. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PickingQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PickingQty')) AS DECIMAL(18,6)) END,
  783. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RealQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RealQty')) AS DECIMAL(18,6)) END,
  784. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.GrossWeight')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.GrossWeight')) AS DECIMAL(18,6)) END,
  785. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.NetWeight')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.NetWeight')) AS DECIMAL(18,6)) END,
  786. CASE WHEN COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Volume'))) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Volume')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Volume'))) AS DECIMAL(18,6)) END,
  787. NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ShipDate')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShipDate'))), 'null'), ''),
  788. NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ShipDate')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.ShipDate'))), 'null'), ''),
  789. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Site')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Site'))),
  790. COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Status')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Status'))),
  791. CAST(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.IsConfirm')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.IsConfirm'))) AS CHAR),
  792. d.source_table,
  793. d.source_row_id,
  794. d.source_biz_key,
  795. @BatchId,
  796. @Now
  797. FROM mdp_stg_ship_trans d
  798. LEFT JOIN mdp_stg_ship_trans m ON m.source_table='ASNBOLShipperMaster'
  799. AND JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.RecID')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ASNBOLShipperRecID'))
  800. WHERE d.source_table='ASNBOLShipperDetail'
  801. AND IFNULL(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Id')), JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Id'))), '') <> ''
  802. ON DUPLICATE KEY UPDATE
  803. shipper_no=VALUES(shipper_no), order_no=VALUES(order_no), order_line=VALUES(order_line),
  804. customer_no=VALUES(customer_no), item_code=VALUES(item_code), item_name=VALUES(item_name),
  805. qty_to_ship=VALUES(qty_to_ship), picking_qty=VALUES(picking_qty), real_qty=VALUES(real_qty),
  806. actual_ship_date=VALUES(actual_ship_date), site=VALUES(site), status=VALUES(status),
  807. confirm_status=VALUES(confirm_status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  808. update_time=CURRENT_TIMESTAMP
  809. """, batchId, now);
  810. yield return Cmd(
  811. """
  812. INSERT INTO mdp_std_ship_trans
  813. (tenant_id, factory_id, source_system, trans_type, order_no, customer_no, item_code, item_name, qty,
  814. plan_ship_date, status, linkage_status, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time)
  815. SELECT
  816. IFNULL(d.tenant_id, 0),
  817. d.factory_id,
  818. 'AIDOP',
  819. 'LINKAGE_PLAN',
  820. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.bill_no')),
  821. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.custom_no')),
  822. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.item_number')),
  823. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Descr')),
  824. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.qty')) AS DECIMAL(18,6)) END,
  825. NULLIF(NULLIF(COALESCE(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sys_capacity_date')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.fystarttime'))), 'null'), ''),
  826. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.type')),
  827. CASE
  828. WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.isuse')) = '1' THEN 'LINKED'
  829. ELSE 'INACTIVE'
  830. END,
  831. d.source_table,
  832. d.source_row_id,
  833. d.source_biz_key,
  834. @BatchId,
  835. @Now
  836. FROM mdp_stg_ship_trans d
  837. WHERE d.source_table='LinkagePlan'
  838. AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.bill_no')), '') <> ''
  839. ON DUPLICATE KEY UPDATE
  840. order_no=VALUES(order_no), customer_no=VALUES(customer_no), item_code=VALUES(item_code),
  841. item_name=VALUES(item_name), qty=VALUES(qty), plan_ship_date=VALUES(plan_ship_date),
  842. status=VALUES(status), linkage_status=VALUES(linkage_status), sync_batch_id=VALUES(sync_batch_id),
  843. sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  844. """, batchId, now);
  845. }
  846. private IEnumerable<S1MdpSqlCommand> BuildDwdCommands(string batchId, DateTime now)
  847. {
  848. yield return Cmd(
  849. """
  850. INSERT INTO dwd_requirement_examine_detail
  851. (tenant_id, factory_id, stat_date, row_id, parent_row_id, examine_id, order_entry_id, bill_no, morder_no,
  852. num, item_number, item_name, bom_number, model, kitting_time, item_type, erp_cls_name, qty, wastage,
  853. need_count, sqty, use_qty, self_lack_qty, lack_qty, mo_qty, make_qty, purchase_qty, purchase_occupy_qty,
  854. satisfy_time, have_ic_subs, substitute_code, create_time, source_system, sync_batch_id, calc_batch_id, calc_time)
  855. SELECT
  856. COALESCE(d.tenant_id, r.tenant_id, so.tenant_id, 0),
  857. COALESCE(d.factory_id, r.factory_id, so.factory_id),
  858. @StatDate,
  859. CAST(d.source_row_id AS SIGNED),
  860. CASE
  861. WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.num')) = '1' THEN NULL
  862. ELSE p.parent_row_id
  863. END,
  864. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.examine_id')) REGEXP '^-?[0-9]+$'
  865. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.examine_id')) AS SIGNED) END,
  866. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.sentry_id')) REGEXP '^-?[0-9]+$'
  867. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.sentry_id')) AS SIGNED) END,
  868. COALESCE(so.order_no, JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.bill_no'))),
  869. JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.morder_no')),
  870. CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.num')) AS CHAR),
  871. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.item_number')),
  872. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.item_name')),
  873. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.bom_number')),
  874. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.model')),
  875. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.kitting_time')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  876. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.type')) = '1' THEN '替代件' ELSE '标准件' END,
  877. COALESCE(
  878. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.erp_cls_name')),
  879. CASE JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.erp_cls'))
  880. WHEN '0' THEN '配置类'
  881. WHEN '1' THEN '自制'
  882. WHEN '2' THEN '委外加工'
  883. WHEN '3' THEN '外购'
  884. WHEN '4' THEN '虚拟件'
  885. END
  886. ),
  887. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.qty')) AS DECIMAL(18,4)) END,
  888. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.wastage')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.wastage')) AS DECIMAL(18,4)) END,
  889. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.needCount')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.needCount')) AS DECIMAL(18,4)) END,
  890. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.sqty')) AS DECIMAL(18,4)) END,
  891. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.use_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.use_qty')) AS DECIMAL(18,4)) END,
  892. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.self_lack_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.self_lack_qty')) AS DECIMAL(18,4)) END,
  893. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.lack_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.lack_qty')) AS DECIMAL(18,4)) END,
  894. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.mo_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.mo_qty')) AS DECIMAL(18,4)) END,
  895. CASE
  896. WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.make_qty')) = '0' AND JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.erp_cls')) = '1'
  897. THEN CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.lack_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.lack_qty')) AS DECIMAL(18,4)) END
  898. WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.make_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$'
  899. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.make_qty')) AS DECIMAL(18,4))
  900. END,
  901. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.purchase_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.purchase_qty')) AS DECIMAL(18,4)) END,
  902. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.purchase_occupy_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.purchase_occupy_qty')) AS DECIMAL(18,4)) END,
  903. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.satisfy_time')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  904. CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.haveicsubs')) = '1' THEN '是' ELSE '否' END,
  905. JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.substitute_code')),
  906. STR_TO_DATE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.create_time')), 'null'), ''), '%Y-%m-%d %H:%i:%s.%f'),
  907. 'AIDOP',
  908. d.sync_batch_id,
  909. @BatchId,
  910. @Now
  911. FROM mdp_stg_so d
  912. INNER JOIN mdp_stg_so r ON r.source_table='b_examine_result'
  913. AND r.source_row_id = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.examine_id'))
  914. LEFT JOIN mdp_std_so so ON so.tenant_id = COALESCE(r.tenant_id, d.tenant_id)
  915. AND so.order_entry_id = CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.sentry_id')) REGEXP '^-?[0-9]+$'
  916. THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(r.raw_data,'$.sentry_id')) AS SIGNED) END
  917. LEFT JOIN (
  918. SELECT examine_id, MIN(source_row_id) AS parent_row_id
  919. FROM (
  920. SELECT JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.examine_id')) AS examine_id, CAST(source_row_id AS SIGNED) AS source_row_id
  921. FROM mdp_stg_so
  922. WHERE source_table='b_bom_child_examine'
  923. AND JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.num')) = '1'
  924. AND JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.is_use')) IN ('1', 'true', 'True', 'base64:type16:AQ==')
  925. ) x
  926. GROUP BY examine_id
  927. ) p ON p.examine_id = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.examine_id'))
  928. WHERE d.source_table='b_bom_child_examine'
  929. AND JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.is_use')) IN ('1', 'true', 'True', 'base64:type16:AQ==')
  930. ON DUPLICATE KEY UPDATE
  931. parent_row_id=VALUES(parent_row_id), order_entry_id=VALUES(order_entry_id), bill_no=VALUES(bill_no),
  932. morder_no=VALUES(morder_no), num=VALUES(num), item_number=VALUES(item_number), item_name=VALUES(item_name),
  933. bom_number=VALUES(bom_number), model=VALUES(model), kitting_time=VALUES(kitting_time),
  934. item_type=VALUES(item_type), erp_cls_name=VALUES(erp_cls_name), qty=VALUES(qty), wastage=VALUES(wastage),
  935. need_count=VALUES(need_count), sqty=VALUES(sqty), use_qty=VALUES(use_qty), self_lack_qty=VALUES(self_lack_qty),
  936. lack_qty=VALUES(lack_qty), mo_qty=VALUES(mo_qty), make_qty=VALUES(make_qty), purchase_qty=VALUES(purchase_qty),
  937. purchase_occupy_qty=VALUES(purchase_occupy_qty), satisfy_time=VALUES(satisfy_time), have_ic_subs=VALUES(have_ic_subs),
  938. substitute_code=VALUES(substitute_code), create_time=VALUES(create_time), sync_batch_id=VALUES(sync_batch_id),
  939. calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  940. """, batchId, now);
  941. yield return Cmd(
  942. """
  943. INSERT INTO dwd_ship_trans
  944. (tenant_id, factory_id, company_id, stat_date, order_id, order_entry_id, order_no, order_line, customer_no,
  945. customer_name, country, item_code, item_name, item_spec, order_qty, planned_ship_qty, shipped_qty,
  946. remaining_qty, order_date, customer_request_date, plan_delivery_date, promised_delivery_date,
  947. plan_ship_date, actual_ship_date, review_status, order_status, delivery_status, linkage_status, risk_level,
  948. source_system, source_table, source_row_id, source_biz_key, sync_batch_id, calc_batch_id, calc_time)
  949. SELECT
  950. so.tenant_id,
  951. so.factory_id,
  952. so.company_id,
  953. @StatDate,
  954. so.order_id,
  955. so.order_entry_id,
  956. so.order_no,
  957. IFNULL(so.order_line, ''),
  958. so.customer_no,
  959. so.customer_name,
  960. so.country,
  961. IFNULL(so.item_code, ''),
  962. so.item_name,
  963. so.item_spec,
  964. IFNULL(so.order_qty, 0),
  965. IFNULL(p.plan_qty, 0),
  966. IFNULL(a.real_qty, 0),
  967. GREATEST(IFNULL(so.order_qty, 0) - IFNULL(a.real_qty, 0), 0),
  968. so.order_date,
  969. so.customer_request_date,
  970. so.plan_delivery_date,
  971. so.promised_delivery_date,
  972. p.plan_ship_date,
  973. a.actual_ship_date,
  974. so.review_status,
  975. so.order_status,
  976. CASE
  977. WHEN IFNULL(so.order_qty, 0) > 0 AND IFNULL(a.real_qty, 0) >= IFNULL(so.order_qty, 0) THEN 'COMPLETED'
  978. WHEN COALESCE(p.plan_ship_date, so.promised_delivery_date, so.plan_delivery_date) < @Now THEN 'DELAYED'
  979. WHEN IFNULL(p.plan_qty, 0) > 0 THEN 'PLANNED'
  980. ELSE 'OPEN'
  981. END,
  982. l.linkage_status,
  983. CASE
  984. WHEN COALESCE(p.plan_ship_date, so.promised_delivery_date, so.plan_delivery_date) < @Now
  985. AND IFNULL(a.real_qty, 0) < IFNULL(so.order_qty, 0) THEN 'HIGH'
  986. WHEN IFNULL(a.real_qty, 0) < IFNULL(so.order_qty, 0) THEN 'MEDIUM'
  987. ELSE 'LOW'
  988. END,
  989. 'AIDOP',
  990. so.source_table,
  991. so.source_row_id,
  992. so.source_biz_key,
  993. so.sync_batch_id,
  994. @BatchId,
  995. @Now
  996. FROM mdp_std_so so
  997. LEFT JOIN (
  998. SELECT tenant_id, order_no, order_entry_id, IFNULL(order_line, '') AS order_line, IFNULL(item_code, '') AS item_code,
  999. SUM(IFNULL(plan_qty, IFNULL(qty, 0))) AS plan_qty,
  1000. MIN(plan_ship_date) AS plan_ship_date
  1001. FROM mdp_std_ship_trans
  1002. WHERE trans_type='SHIP_PLAN'
  1003. GROUP BY tenant_id, order_no, order_entry_id, IFNULL(order_line, ''), IFNULL(item_code, '')
  1004. ) p ON so.tenant_id=p.tenant_id
  1005. AND so.order_no=p.order_no
  1006. AND IFNULL(so.item_code, '')=p.item_code
  1007. AND (
  1008. (p.order_entry_id IS NOT NULL AND so.order_entry_id=p.order_entry_id)
  1009. OR (p.order_entry_id IS NULL AND IFNULL(so.order_line, '')=p.order_line)
  1010. )
  1011. LEFT JOIN (
  1012. SELECT tenant_id, order_no, IFNULL(order_line, '') AS order_line, IFNULL(item_code, '') AS item_code,
  1013. SUM(IFNULL(real_qty, IFNULL(qty_to_ship, 0))) AS real_qty,
  1014. MAX(actual_ship_date) AS actual_ship_date
  1015. FROM mdp_std_ship_trans
  1016. WHERE trans_type='ASN_SHIPPER'
  1017. GROUP BY tenant_id, order_no, IFNULL(order_line, ''), IFNULL(item_code, '')
  1018. ) a ON so.tenant_id=a.tenant_id AND so.order_no=a.order_no AND IFNULL(so.order_line, '')=a.order_line AND IFNULL(so.item_code, '')=a.item_code
  1019. LEFT JOIN (
  1020. SELECT tenant_id, order_no, item_code, MAX(linkage_status) AS linkage_status
  1021. FROM mdp_std_ship_trans
  1022. WHERE IFNULL(linkage_status, '') <> ''
  1023. GROUP BY tenant_id, order_no, item_code
  1024. ) l ON so.tenant_id=l.tenant_id AND so.order_no=l.order_no AND IFNULL(so.item_code, '')=IFNULL(l.item_code, '')
  1025. WHERE IFNULL(so.order_no, '') <> ''
  1026. ON DUPLICATE KEY UPDATE
  1027. customer_no=VALUES(customer_no), customer_name=VALUES(customer_name), item_name=VALUES(item_name),
  1028. order_qty=VALUES(order_qty), planned_ship_qty=VALUES(planned_ship_qty), shipped_qty=VALUES(shipped_qty),
  1029. remaining_qty=VALUES(remaining_qty), plan_ship_date=VALUES(plan_ship_date), actual_ship_date=VALUES(actual_ship_date),
  1030. review_status=VALUES(review_status), order_status=VALUES(order_status), delivery_status=VALUES(delivery_status),
  1031. linkage_status=VALUES(linkage_status), risk_level=VALUES(risk_level), sync_batch_id=VALUES(sync_batch_id),
  1032. calc_batch_id=VALUES(calc_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
  1033. """, batchId, now);
  1034. }
  1035. private async Task<long> InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead)
  1036. {
  1037. await _db.Ado.ExecuteCommandAsync(
  1038. """
  1039. INSERT INTO mdp_sync_log
  1040. (tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status)
  1041. VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING')
  1042. """,
  1043. new SugarParameter("@EntityId", entityId),
  1044. new SugarParameter("@EntityName", entityName),
  1045. new SugarParameter("@BatchId", batchId),
  1046. new SugarParameter("@RowsRead", rowsRead));
  1047. return await _db.Ado.GetLongAsync(
  1048. "SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1",
  1049. new List<SugarParameter>
  1050. {
  1051. new("@BatchId", batchId),
  1052. new("@EntityId", entityId)
  1053. });
  1054. }
  1055. private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected)
  1056. {
  1057. await _db.Ado.ExecuteCommandAsync(
  1058. """
  1059. UPDATE mdp_sync_log
  1060. SET sync_end=NOW(), duration_ms=@DurationMs, rows_insert=@RowsInsert, rows_update=0, rows_skip=0, rows_error=0, status='SUCCESS'
  1061. WHERE id=@Id
  1062. """,
  1063. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  1064. new SugarParameter("@RowsInsert", affected),
  1065. new SugarParameter("@Id", logId));
  1066. }
  1067. private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message)
  1068. {
  1069. try
  1070. {
  1071. await _db.Ado.ExecuteCommandAsync(
  1072. """
  1073. UPDATE mdp_sync_log
  1074. SET sync_end=NOW(), duration_ms=@DurationMs, rows_error=1, status='FAILED', error_msg=@ErrorMsg
  1075. WHERE id=@Id
  1076. """,
  1077. new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
  1078. new SugarParameter("@ErrorMsg", Truncate(message, 1000)),
  1079. new SugarParameter("@Id", logId));
  1080. }
  1081. catch (Exception ex)
  1082. {
  1083. // 写库自身失败兜底:避免再抛掩盖原异常;遗留 RUNNING 行可由运维手动清理
  1084. Console.Error.WriteLine($"[S1MdpSyncTransform] MarkSyncLogFailed write failed (syncLogId={logId}): {ex.Message}");
  1085. }
  1086. }
  1087. private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
  1088. {
  1089. await _db.Ado.ExecuteCommandAsync(
  1090. """
  1091. INSERT INTO mdp_transform_run_log
  1092. (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
  1093. VALUES (0, @JobCode, 'S1 MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
  1094. """,
  1095. new SugarParameter("@JobCode", JobCode),
  1096. new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
  1097. new SugarParameter("@BatchId", batchId),
  1098. new SugarParameter("@StartTime", startedAt));
  1099. return await _db.Ado.GetLongAsync(
  1100. "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
  1101. new List<SugarParameter> { new("@BatchId", batchId) });
  1102. }
  1103. private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S1MdpSyncTransformResult result)
  1104. {
  1105. var finishedAt = DateTime.Now;
  1106. await _db.Ado.ExecuteCommandAsync(
  1107. """
  1108. UPDATE mdp_transform_run_log
  1109. SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
  1110. stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
  1111. summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
  1112. WHERE id=@Id
  1113. """,
  1114. new SugarParameter("@EndTime", finishedAt),
  1115. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  1116. new SugarParameter("@StageRows", result.StageRows),
  1117. new SugarParameter("@StandardRows", result.StandardRows),
  1118. new SugarParameter("@DwdRows", result.DwdRows),
  1119. new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)),
  1120. new SugarParameter("@Id", runLogId));
  1121. }
  1122. private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
  1123. {
  1124. try
  1125. {
  1126. var finishedAt = DateTime.Now;
  1127. await _db.Ado.ExecuteCommandAsync(
  1128. """
  1129. UPDATE mdp_transform_run_log
  1130. SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
  1131. error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
  1132. WHERE id=@Id
  1133. """,
  1134. new SugarParameter("@EndTime", finishedAt),
  1135. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  1136. new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
  1137. new SugarParameter("@Id", runLogId));
  1138. }
  1139. catch (Exception ex)
  1140. {
  1141. // 写库自身失败兜底(典型场景:远端 MySQL 瞬断导致 MarkFailed 自身也连不上):
  1142. // 避免再抛二次异常掩盖原错;遗留 RUNNING 行可由运维手动清理。
  1143. Console.Error.WriteLine($"[S1MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}");
  1144. }
  1145. }
  1146. private static S1MdpSqlCommand Cmd(string sql, string batchId, DateTime now)
  1147. {
  1148. return new S1MdpSqlCommand(sql, new[]
  1149. {
  1150. new SugarParameter("@BatchId", batchId),
  1151. new SugarParameter("@Now", now),
  1152. new SugarParameter("@StatDate", now.Date)
  1153. });
  1154. }
  1155. private static string BuildJsonObjectExpression(IEnumerable<string> columns)
  1156. {
  1157. var parts = columns.SelectMany(c => new[] { $"'{c.Replace("'", "''")}'", $"s.`{c}`" });
  1158. return $"JSON_OBJECT({string.Join(",", parts)})";
  1159. }
  1160. private static string BuildOptionalColumnExpr(IReadOnlyCollection<string> columns, string expected, string fallback)
  1161. {
  1162. return columns.Any(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase))
  1163. ? $"s.`{FindColumn(columns, expected)}`"
  1164. : fallback;
  1165. }
  1166. private static string FindColumn(IEnumerable<string> columns, string expected)
  1167. {
  1168. return columns.First(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase));
  1169. }
  1170. private static string NormalizeTriggerType(string? triggerType)
  1171. {
  1172. return string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
  1173. }
  1174. private static string BuildRunSummaryJson(S1MdpSyncTransformResult result)
  1175. {
  1176. return $$"""{"batchId":"{{result.BatchId}}","stageRows":{{result.StageRows}},"standardRows":{{result.StandardRows}},"dwdRows":{{result.DwdRows}},"kpiRows":{{result.KpiRows}}}""";
  1177. }
  1178. private static string ResolveKpiValueTable(int metricLevel)
  1179. {
  1180. return metricLevel switch
  1181. {
  1182. 1 => "ado_s9_kpi_value_l1_day",
  1183. 2 => "ado_s9_kpi_value_l2_day",
  1184. 3 => "ado_s9_kpi_value_l3_day",
  1185. 4 => "ado_s9_kpi_value_l4_day",
  1186. _ => "ado_s9_kpi_value_l2_day"
  1187. };
  1188. }
  1189. private static decimal DefaultS1Target(string metricCode)
  1190. {
  1191. return metricCode switch
  1192. {
  1193. "S1_L1_001" or "S1_L2_001" or "S1_L2_004" or "S1_L2_010" => 3m,
  1194. "S1_L1_004" => 60m,
  1195. "S1_L1_002" or "S1_L2_002" or "S1_L2_005" or "S1_L2_011" or "S1_L2_013" or "S1_L2_014" or "S1_L2_015" => 95m,
  1196. "S1_L1_003" or "S1_L2_003" or "S1_L2_006" or "S1_L2_012" => 100m,
  1197. "S1_L3_001" or "S1_L3_101" => 8m,
  1198. "S1_L3_002" or "S1_L3_102" => 12m,
  1199. "S1_L3_003" or "S1_L3_103" => 8m,
  1200. "S1_L3_004" or "S1_L3_104" => 10m,
  1201. "S1_L3_005" or "S1_L3_105" => 2m,
  1202. "S1_L4_001" or "S1_L4_101" => 2m,
  1203. "S1_L4_002" or "S1_L4_102" => 3m,
  1204. "S1_L4_003" or "S1_L4_103" => 1m,
  1205. "S1_L4_004" or "S1_L4_104" => 2m,
  1206. _ => 0m
  1207. };
  1208. }
  1209. private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold)
  1210. {
  1211. if (target <= 0) return "gray";
  1212. var ratio = actual / target * 100m;
  1213. if (string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase))
  1214. {
  1215. if (actual <= target) return "green";
  1216. if (ratio <= (yellowThreshold ?? 110m)) return "yellow";
  1217. return ratio >= (redThreshold ?? 120m) ? "red" : "yellow";
  1218. }
  1219. if (actual >= target) return "green";
  1220. if (ratio >= (yellowThreshold ?? 95m)) return "yellow";
  1221. return ratio <= (redThreshold ?? 80m) ? "red" : "yellow";
  1222. }
  1223. private static string ResolveTrendFlag(decimal actual, decimal? previous)
  1224. {
  1225. if (previous == null) return "flat";
  1226. if (actual > previous.Value) return "up";
  1227. if (actual < previous.Value) return "down";
  1228. return "flat";
  1229. }
  1230. private static string Truncate(string? raw, int maxLength)
  1231. {
  1232. if (string.IsNullOrEmpty(raw)) return string.Empty;
  1233. return raw.Length <= maxLength ? raw : raw[..maxLength];
  1234. }
  1235. private sealed class S1ColumnRow
  1236. {
  1237. public string ColumnName { get; set; } = string.Empty;
  1238. }
  1239. private sealed class S1MdpEntityRow
  1240. {
  1241. public long Id { get; set; }
  1242. public string EntityName { get; set; } = string.Empty;
  1243. }
  1244. private sealed class S1KpiCalcRow
  1245. {
  1246. public long TenantId { get; set; }
  1247. public long FactoryId { get; set; }
  1248. public string MetricCode { get; set; } = string.Empty;
  1249. public decimal? MetricValue { get; set; }
  1250. }
  1251. private sealed class S1KpiMetaRow
  1252. {
  1253. public int MetricLevel { get; set; }
  1254. public string Direction { get; set; } = "higher_is_better";
  1255. public decimal? YellowThreshold { get; set; }
  1256. public decimal? RedThreshold { get; set; }
  1257. }
  1258. private sealed class S1KpiValueRow
  1259. {
  1260. public long Id { get; set; }
  1261. public decimal? MetricValue { get; set; }
  1262. public decimal? TargetValue { get; set; }
  1263. }
  1264. }
  1265. public sealed class S1MdpSyncTransformResult
  1266. {
  1267. public long RunLogId { get; set; }
  1268. public string BatchId { get; set; } = string.Empty;
  1269. public int StageRows { get; set; }
  1270. public int StandardRows { get; set; }
  1271. public int DwdRows { get; set; }
  1272. public int KpiRows { get; set; }
  1273. public int AtomicRows { get; set; }
  1274. }
  1275. internal sealed record S1MdpSqlCommand(string Sql, SugarParameter[] Parameters);
  1276. internal sealed record S1MdpEntityConfig(
  1277. string EntityCode,
  1278. string SourceTable,
  1279. string TargetTable,
  1280. string SourceRowIdExpression,
  1281. string SourceBizKeyExpression)
  1282. {
  1283. public static readonly IReadOnlyList<S1MdpEntityConfig> All = new List<S1MdpEntityConfig>
  1284. {
  1285. new("S1_SEORDER", "crm_seorder", "mdp_stg_so", "Id", "COALESCE(s.`bill_no`, CAST(s.`Id` AS CHAR))"),
  1286. new("S1_SEORDER_ENTRY", "crm_seorderentry", "mdp_stg_so", "Id", "CONCAT(IFNULL(s.`bill_no`,''), ':', IFNULL(s.`entry_seq`, CAST(s.`Id` AS CHAR)))"),
  1287. new("S1_SEORDER_CHANGE", "crm_seorder_change", "mdp_stg_so", "Id", "CONCAT(IFNULL(s.`bill_no`,''), ':', CAST(s.`Id` AS CHAR))"),
  1288. new("S1_CONTRACT_REVIEW", "ado_contract_review", "mdp_stg_so", "RecID", "COALESCE(s.`BillNo`, CAST(s.`RecID` AS CHAR))"),
  1289. new("S1_CONTRACT_REVIEW_FLOW", "ado_contract_review_flow", "mdp_stg_so", "RecID", "CONCAT(IFNULL(s.`ReviewBillNo`,''), ':', IFNULL(s.`StageNo`,''), ':', CAST(s.`RecID` AS CHAR))"),
  1290. new("S1_PRODUCT_DESIGN", "ado_product_design", "mdp_stg_so", "Id", "COALESCE(s.`BillNo`, CAST(s.`Id` AS CHAR))"),
  1291. new("S1_PRODUCT_DESIGN_BOM", "ado_product_design_bom", "mdp_stg_so", "Id", "CONCAT(CAST(s.`ProductDesignId` AS CHAR), ':', CAST(s.`Id` AS CHAR))"),
  1292. new("S1_PRODUCT_DESIGN_ROUTING", "ado_product_design_routing", "mdp_stg_so", "Id", "CONCAT(CAST(s.`ProductDesignId` AS CHAR), ':', CAST(s.`Id` AS CHAR))"),
  1293. new("S1_REQUIREMENT_EXAMINE_RESULT", "b_examine_result", "mdp_stg_so", "Id", "CONCAT(IFNULL(s.`bill_no`,''), ':', IFNULL(s.`morder_no`,''), ':', CAST(s.`Id` AS CHAR))"),
  1294. new("S1_REQUIREMENT_EXAMINE_DETAIL", "b_bom_child_examine", "mdp_stg_so", "Id", "CONCAT(IFNULL(s.`examine_id`,''), ':', IFNULL(s.`item_number`,''), ':', CAST(s.`Id` AS CHAR))"),
  1295. new("S1_SHIPPING_PLAN", "ShippingPlan", "mdp_stg_ship_trans", "RecID", "COALESCE(s.`LotSerial`, CAST(s.`RecID` AS CHAR))"),
  1296. new("S1_SHIPPING_PLAN_DETAIL", "ShippingPlanDetail", "mdp_stg_ship_trans", "RecID", "CONCAT(IFNULL(s.`plan_id`,''), ':', IFNULL(s.`OrdNbr`,''), ':', CAST(s.`RecID` AS CHAR))"),
  1297. new("S1_ASN_SHIPPER_MASTER", "ASNBOLShipperMaster", "mdp_stg_ship_trans", "RecID", "COALESCE(s.`Id`, CONCAT(IFNULL(s.`OrdNbr`,''), ':', CAST(s.`RecID` AS CHAR)))"),
  1298. new("S1_ASN_SHIPPER_DETAIL", "ASNBOLShipperDetail", "mdp_stg_ship_trans", "RecID", "CONCAT(IFNULL(s.`Id`,''), ':', IFNULL(s.`Line`, CAST(s.`RecID` AS CHAR)))"),
  1299. new("S1_LINKAGE_PLAN", "LinkagePlan", "mdp_stg_ship_trans", "id", "CONCAT(IFNULL(s.`bill_no`,''), ':', IFNULL(s.`item_number`,''), ':', CAST(s.`id` AS CHAR))")
  1300. };
  1301. }