OutsourceIssueMdpSyncService.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse;
  2. /// <summary>
  3. /// S5 委外发料单 数据中台只读同步转换服务(DOP 内部,独立于 S5MdpSyncTransformService 的 KPI 管线)。
  4. ///
  5. /// 源:aidopdev NbrMaster / NbrDetail,业务类型 Type='CA';头明细关联 NbrDetail.NbrRecID -> NbrMaster.RecID。
  6. /// 链路:NbrMaster/NbrDetail(CA) 只读 → mdp_stg_outsource_issue(_detail)(raw_data JSON) → mdp_std_outsource_issue(_detail)(typed)。
  7. ///
  8. /// 约束:
  9. /// - 只读源表,仅写 mdp_stg_*/mdp_std_*;绝不 INSERT/UPDATE/DELETE NbrMaster/NbrDetail。
  10. /// - 过滤 NbrMaster.Type='CA' AND IsActive=1;NbrDetail.Type='CA'。
  11. /// - status_desc 规则本批仅:Status='C' -> '关闭',其他 -> ''。
  12. /// - department_name 经 NbrMaster.Domain + NbrMaster.Department 关联 DepartmentMaster.Descr。
  13. /// - 明细只转换已确认 9 列(ItemNum/ItemName/UM/QtyOrd/LocationFrom/LocationTo/Line/Status/Remark);
  14. /// 「发料数量/已发数/批次号」3 候选列本批后置,不转换、不落 typed 字段。
  15. /// - CA 为 0 行时转换成功完成、处理数为 0,不报错。
  16. /// </summary>
  17. public class OutsourceIssueMdpSyncService : ITransient
  18. {
  19. private const string JobCode = "S5_OUTSOURCE_ISSUE_MDP_SYNC";
  20. private readonly ISqlSugarClient _db;
  21. public OutsourceIssueMdpSyncService(ISqlSugarClient db)
  22. {
  23. _db = db;
  24. }
  25. /// <summary>
  26. /// 全量同步转换:贴源(头/明细) → 标准化(头/明细)。
  27. /// </summary>
  28. public async Task<OutsourceIssueMdpSyncResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
  29. {
  30. cancellationToken.ThrowIfCancellationRequested();
  31. await EnsureTablesAsync();
  32. var now = DateTime.Now;
  33. var batchId = $"S5_OUTSRC_ISSUE_FULL_{now:yyyyMMddHHmmss}";
  34. var runLogId = await InsertRunLogAsync(batchId, now, triggerType);
  35. var result = new OutsourceIssueMdpSyncResult { BatchId = batchId, RunLogId = runLogId };
  36. try
  37. {
  38. result.HeadStageRows = await SyncHeadStagingAsync(batchId, now);
  39. result.DetailStageRows = await SyncDetailStagingAsync(batchId, now);
  40. result.HeadStandardRows = await TransformHeadStandardAsync(batchId, now);
  41. result.DetailStandardRows = await TransformDetailStandardAsync(batchId, now);
  42. await MarkRunSuccessAsync(runLogId, now, result);
  43. return result;
  44. }
  45. catch (Exception ex)
  46. {
  47. await MarkRunFailedAsync(runLogId, now, ex.Message);
  48. throw;
  49. }
  50. }
  51. /// <summary>防御式建表(与 UpdateScripts/1.0.206.sql 同构,幂等)。</summary>
  52. private async Task EnsureTablesAsync()
  53. {
  54. await _db.Ado.ExecuteCommandAsync(
  55. """
  56. CREATE TABLE IF NOT EXISTS mdp_stg_outsource_issue (
  57. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  58. tenant_id BIGINT NOT NULL DEFAULT 0,
  59. factory_id BIGINT NULL DEFAULT 1,
  60. source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
  61. source_table VARCHAR(100) NOT NULL,
  62. source_row_id VARCHAR(100) NOT NULL,
  63. source_biz_key VARCHAR(200) NULL,
  64. sync_batch_id VARCHAR(100) NOT NULL,
  65. sync_time DATETIME NOT NULL,
  66. process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
  67. raw_data JSON NOT NULL,
  68. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  69. UNIQUE KEY uk_mdp_stg_outsrc_issue (tenant_id, source_table, source_row_id)
  70. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单头贴源层'
  71. """);
  72. await _db.Ado.ExecuteCommandAsync(
  73. """
  74. CREATE TABLE IF NOT EXISTS mdp_stg_outsource_issue_detail (
  75. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  76. tenant_id BIGINT NOT NULL DEFAULT 0,
  77. factory_id BIGINT NULL DEFAULT 1,
  78. source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
  79. source_table VARCHAR(100) NOT NULL,
  80. source_row_id VARCHAR(100) NOT NULL,
  81. source_biz_key VARCHAR(200) NULL,
  82. sync_batch_id VARCHAR(100) NOT NULL,
  83. sync_time DATETIME NOT NULL,
  84. process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
  85. raw_data JSON NOT NULL,
  86. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  87. UNIQUE KEY uk_mdp_stg_outsrc_issue_dtl (tenant_id, source_table, source_row_id)
  88. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单明细贴源层'
  89. """);
  90. await _db.Ado.ExecuteCommandAsync(
  91. """
  92. CREATE TABLE IF NOT EXISTS mdp_std_outsource_issue (
  93. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  94. tenant_id BIGINT NOT NULL DEFAULT 0,
  95. factory_id BIGINT NULL DEFAULT 1,
  96. source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
  97. bill_no VARCHAR(24) NOT NULL,
  98. issue_date DATETIME NULL,
  99. outsource_no VARCHAR(60) NULL,
  100. work_order VARCHAR(64) NULL,
  101. department_code VARCHAR(20) NULL,
  102. department_name VARCHAR(255) NULL,
  103. issuer VARCHAR(255) NULL,
  104. status VARCHAR(8) NULL,
  105. status_desc VARCHAR(20) NULL,
  106. remark VARCHAR(200) NULL,
  107. create_user VARCHAR(24) NULL,
  108. source_create_time DATETIME NULL,
  109. eff_date DATETIME NULL,
  110. source_biz_key VARCHAR(200) NULL,
  111. sync_batch_id VARCHAR(100) NOT NULL,
  112. sync_time DATETIME NOT NULL,
  113. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  114. UNIQUE KEY uk_mdp_std_outsrc_issue (tenant_id, bill_no),
  115. KEY idx_mdp_std_outsrc_issue_date (tenant_id, issue_date)
  116. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单头标准层'
  117. """);
  118. await _db.Ado.ExecuteCommandAsync(
  119. """
  120. CREATE TABLE IF NOT EXISTS mdp_std_outsource_issue_detail (
  121. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  122. tenant_id BIGINT NOT NULL DEFAULT 0,
  123. std_head_id BIGINT NULL,
  124. bill_no VARCHAR(24) NOT NULL,
  125. line SMALLINT NOT NULL DEFAULT 0,
  126. item_num VARCHAR(24) NULL,
  127. item_name TEXT NULL,
  128. um VARCHAR(8) NULL,
  129. qty_ord DECIMAL(18,6) NULL DEFAULT 0,
  130. location_from VARCHAR(8) NULL,
  131. location_to VARCHAR(8) NULL,
  132. status VARCHAR(8) NULL,
  133. remark TEXT NULL,
  134. source_biz_key VARCHAR(200) NULL,
  135. sync_batch_id VARCHAR(100) NOT NULL,
  136. sync_time DATETIME NOT NULL,
  137. update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  138. UNIQUE KEY uk_mdp_std_outsrc_issue_dtl (tenant_id, source_biz_key),
  139. KEY idx_mdp_std_outsrc_issue_dtl_head (std_head_id),
  140. KEY idx_mdp_std_outsrc_issue_dtl_bill (tenant_id, bill_no)
  141. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单明细标准层'
  142. """);
  143. }
  144. /// <summary>贴源头:NbrMaster(CA) -> mdp_stg_outsource_issue(raw_data JSON 快照)。返回源 CA 头行数。</summary>
  145. private async Task<int> SyncHeadStagingAsync(string batchId, DateTime now)
  146. {
  147. var rows = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM NbrMaster WHERE Type='CA' AND IsActive=1");
  148. await _db.Ado.ExecuteCommandAsync(
  149. """
  150. INSERT INTO mdp_stg_outsource_issue
  151. (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
  152. SELECT
  153. IFNULL(m.tenant_id, 0), 1, 'AIDOP', 'NbrMaster',
  154. CAST(m.RecID AS CHAR), CAST(m.Nbr AS CHAR), @BatchId, @Now, 'PENDING',
  155. JSON_OBJECT(
  156. 'RecID', m.RecID, 'Nbr', m.Nbr, 'Date', m.Date, 'Address', m.Address,
  157. 'WorkOrd', m.WorkOrd, 'Remark', m.Remark, 'CreateUser', m.CreateUser, 'CreateTime', m.CreateTime,
  158. 'Status', m.Status, 'Domain', m.Domain, 'Department', m.Department,
  159. 'User1', CAST(m.User1 AS CHAR), 'EffDate', m.EffDate, 'Ufld1', m.Ufld1
  160. )
  161. FROM NbrMaster m
  162. WHERE m.Type='CA' AND m.IsActive=1
  163. ON DUPLICATE KEY UPDATE
  164. source_biz_key=VALUES(source_biz_key), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  165. process_status=VALUES(process_status), raw_data=VALUES(raw_data), update_time=CURRENT_TIMESTAMP
  166. """,
  167. new SugarParameter("@BatchId", batchId),
  168. new SugarParameter("@Now", now));
  169. return rows;
  170. }
  171. /// <summary>贴源明细:NbrDetail(CA) -> mdp_stg_outsource_issue_detail。返回源 CA 明细行数。</summary>
  172. private async Task<int> SyncDetailStagingAsync(string batchId, DateTime now)
  173. {
  174. var rows = await _db.Ado.GetIntAsync(
  175. """
  176. SELECT COUNT(1) FROM NbrDetail dt
  177. JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
  178. WHERE dt.Type='CA'
  179. """);
  180. await _db.Ado.ExecuteCommandAsync(
  181. """
  182. INSERT INTO mdp_stg_outsource_issue_detail
  183. (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
  184. SELECT
  185. IFNULL(dt.tenant_id, 0), 1, 'AIDOP', 'NbrDetail',
  186. CAST(dt.RecID AS CHAR), CONCAT(CAST(dt.Nbr AS CHAR), '#', dt.Line), @BatchId, @Now, 'PENDING',
  187. JSON_OBJECT(
  188. 'RecID', dt.RecID, 'NbrRecID', dt.NbrRecID, 'Nbr', dt.Nbr, 'Line', dt.Line,
  189. 'ItemNum', dt.ItemNum, 'ItemName', CAST(dt.ItemName AS CHAR), 'UM', dt.UM, 'QtyOrd', dt.QtyOrd,
  190. 'LocationFrom', dt.LocationFrom, 'LocationTo', dt.LocationTo, 'Status', dt.Status, 'Remark', CAST(dt.Remark AS CHAR)
  191. )
  192. FROM NbrDetail dt
  193. JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
  194. WHERE dt.Type='CA'
  195. ON DUPLICATE KEY UPDATE
  196. source_biz_key=VALUES(source_biz_key), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  197. process_status=VALUES(process_status), raw_data=VALUES(raw_data), update_time=CURRENT_TIMESTAMP
  198. """,
  199. new SugarParameter("@BatchId", batchId),
  200. new SugarParameter("@Now", now));
  201. return rows;
  202. }
  203. /// <summary>标准化头:NbrMaster(CA) + DepartmentMaster -> mdp_std_outsource_issue。返回处理头行数。</summary>
  204. private async Task<int> TransformHeadStandardAsync(string batchId, DateTime now)
  205. {
  206. var rows = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM NbrMaster WHERE Type='CA' AND IsActive=1");
  207. await _db.Ado.ExecuteCommandAsync(
  208. """
  209. INSERT INTO mdp_std_outsource_issue
  210. (tenant_id, factory_id, source_system, bill_no, issue_date, outsource_no, work_order,
  211. department_code, department_name, issuer, status, status_desc, remark, create_user,
  212. source_create_time, eff_date, source_biz_key, sync_batch_id, sync_time)
  213. SELECT
  214. IFNULL(m.tenant_id, 0), 1, 'AIDOP', m.Nbr, m.Date, m.Address, m.WorkOrd,
  215. m.Department, d.Descr, CAST(m.User1 AS CHAR), m.Status,
  216. CASE WHEN m.Status='C' THEN '关闭' ELSE '' END,
  217. m.Remark, m.CreateUser, m.CreateTime, m.EffDate,
  218. CAST(m.Nbr AS CHAR), @BatchId, @Now
  219. FROM NbrMaster m
  220. LEFT JOIN DepartmentMaster d ON d.Domain = m.Domain AND d.Department = m.Department
  221. WHERE m.Type='CA' AND m.IsActive=1
  222. ON DUPLICATE KEY UPDATE
  223. issue_date=VALUES(issue_date), outsource_no=VALUES(outsource_no), work_order=VALUES(work_order),
  224. department_code=VALUES(department_code), department_name=VALUES(department_name), issuer=VALUES(issuer),
  225. status=VALUES(status), status_desc=VALUES(status_desc), remark=VALUES(remark), create_user=VALUES(create_user),
  226. source_create_time=VALUES(source_create_time), eff_date=VALUES(eff_date),
  227. sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
  228. """,
  229. new SugarParameter("@BatchId", batchId),
  230. new SugarParameter("@Now", now));
  231. return rows;
  232. }
  233. /// <summary>标准化明细:NbrDetail(CA) -> mdp_std_outsource_issue_detail(仅已确认 9 列;回填 std_head_id)。返回处理明细行数。</summary>
  234. private async Task<int> TransformDetailStandardAsync(string batchId, DateTime now)
  235. {
  236. var rows = await _db.Ado.GetIntAsync(
  237. """
  238. SELECT COUNT(1) FROM NbrDetail dt
  239. JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
  240. WHERE dt.Type='CA'
  241. """);
  242. await _db.Ado.ExecuteCommandAsync(
  243. """
  244. INSERT INTO mdp_std_outsource_issue_detail
  245. (tenant_id, std_head_id, bill_no, line, item_num, item_name, um, qty_ord,
  246. location_from, location_to, status, remark, source_biz_key, sync_batch_id, sync_time)
  247. SELECT
  248. IFNULL(dt.tenant_id, 0), h.id, dt.Nbr, dt.Line, dt.ItemNum, CAST(dt.ItemName AS CHAR), dt.UM, dt.QtyOrd,
  249. dt.LocationFrom, dt.LocationTo, dt.Status, CAST(dt.Remark AS CHAR),
  250. CONCAT(CAST(dt.Nbr AS CHAR), '#', dt.Line), @BatchId, @Now
  251. FROM NbrDetail dt
  252. JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
  253. LEFT JOIN mdp_std_outsource_issue h ON h.tenant_id = IFNULL(dt.tenant_id, 0) AND h.bill_no = dt.Nbr
  254. WHERE dt.Type='CA'
  255. ON DUPLICATE KEY UPDATE
  256. std_head_id=VALUES(std_head_id), line=VALUES(line), item_num=VALUES(item_num), item_name=VALUES(item_name),
  257. um=VALUES(um), qty_ord=VALUES(qty_ord), location_from=VALUES(location_from), location_to=VALUES(location_to),
  258. status=VALUES(status), remark=VALUES(remark), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
  259. update_time=CURRENT_TIMESTAMP
  260. """,
  261. new SugarParameter("@BatchId", batchId),
  262. new SugarParameter("@Now", now));
  263. return rows;
  264. }
  265. private async Task<long> InsertRunLogAsync(string batchId, DateTime startedAt, string triggerType)
  266. {
  267. await _db.Ado.ExecuteCommandAsync(
  268. """
  269. INSERT INTO mdp_transform_run_log
  270. (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
  271. VALUES (0, @JobCode, 'S5委外发料单MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
  272. """,
  273. new SugarParameter("@JobCode", JobCode),
  274. new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
  275. new SugarParameter("@BatchId", batchId),
  276. new SugarParameter("@StartTime", startedAt));
  277. return await _db.Ado.GetLongAsync(
  278. "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
  279. new List<SugarParameter> { new("@BatchId", batchId) });
  280. }
  281. private async Task MarkRunSuccessAsync(long runLogId, DateTime startedAt, OutsourceIssueMdpSyncResult result)
  282. {
  283. var finishedAt = DateTime.Now;
  284. await _db.Ado.ExecuteCommandAsync(
  285. """
  286. UPDATE mdp_transform_run_log
  287. SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
  288. stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=0, update_time=CURRENT_TIMESTAMP
  289. WHERE id=@Id
  290. """,
  291. new SugarParameter("@EndTime", finishedAt),
  292. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  293. new SugarParameter("@StageRows", result.HeadStageRows + result.DetailStageRows),
  294. new SugarParameter("@StandardRows", result.HeadStandardRows + result.DetailStandardRows),
  295. new SugarParameter("@Id", runLogId));
  296. }
  297. private async Task MarkRunFailedAsync(long runLogId, DateTime startedAt, string message)
  298. {
  299. var finishedAt = DateTime.Now;
  300. await _db.Ado.ExecuteCommandAsync(
  301. """
  302. UPDATE mdp_transform_run_log
  303. SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
  304. error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
  305. WHERE id=@Id
  306. """,
  307. new SugarParameter("@EndTime", finishedAt),
  308. new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
  309. new SugarParameter("@ErrorMessage", message.Length > 2000 ? message[..2000] : message),
  310. new SugarParameter("@Id", runLogId));
  311. }
  312. private static string NormalizeTriggerType(string? triggerType)
  313. => string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
  314. }
  315. /// <summary>委外发料单 MDP 同步转换结果。</summary>
  316. public sealed class OutsourceIssueMdpSyncResult
  317. {
  318. public long RunLogId { get; set; }
  319. public string BatchId { get; set; } = string.Empty;
  320. public int HeadStageRows { get; set; }
  321. public int DetailStageRows { get; set; }
  322. public int HeadStandardRows { get; set; }
  323. public int DetailStandardRows { get; set; }
  324. }