PurchaseReceiptMdpSyncService.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse;
  /// <summary>
  /// S5 purchase receipt: read-only sync/transform service for the data middle platform (DOP internal, option 1).
  ///
  /// Source: aidopdev PurOrdRctDetail (p) + PurOrdRctMaster (d), business type RctType='rc' (purchase receipt);
  /// header/detail join is p.Domain=d.Domain AND p.Receiver=d.Receiver;
  /// dimension tables ItemMaster (i) / SuppMaster (s) / ConsigneeAddressMaster (a) / PurOrdDetail (pd, sd) / srm_pr_main (dr) are all LEFT JOINed.
  /// Pipeline (option 1): read-only source tables → mdp_std_purchase_receipt (typed). No mdp_stg hop (S3 already owns mdp_stg_receipt) and no dwd table (read-only list).
  ///
  /// Constraints:
  /// - Source tables are read-only; only mdp_std_purchase_receipt is written. Never INSERT/UPDATE/DELETE any source table; never read or modify S3 mdp_stg_receipt or S4 ado_s4_receipt.
  /// - Filter is p.RctType='rc'; the ERP-side UserFactoryNum/UserAccount placeholder filters are not applied (full sync; tenant isolation happens in the read API via tenant_id).
  /// - Derived columns follow the legacy SQL: sort_name/req/dop_req keep the CASE d.OrdType='DO' branch semantics; danjia/jiage are not persisted in this batch.
  /// - Detail grain = one PurOrdRctDetail row (domain#receiver#line); when there are 0 'rc' rows the run completes successfully with a processed count of 0.
  /// </summary>
  public class PurchaseReceiptMdpSyncService : ITransient
  {
      private const string JobCode = "S5_PURCHASE_RECEIPT_MDP_SYNC";

      private readonly ISqlSugarClient _db;

      public PurchaseReceiptMdpSyncService(ISqlSugarClient db)
      {
          _db = db;
      }

      /// <summary>Full sync/transform: source tables (rc) → mdp_std_purchase_receipt.</summary>
      /// <param name="cancellationToken">Checked before each stage; it is not forwarded into the individual SQL commands.</param>
      /// <param name="triggerType">Trigger label written to the run log; blank values are normalized to "AUTO".</param>
      /// <returns>The batch id, run-log id and processed row count of this run.</returns>
      /// <exception cref="OperationCanceledException">The token was cancelled before a stage started.</exception>
      public async Task<PurchaseReceiptMdpSyncResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
      {
          cancellationToken.ThrowIfCancellationRequested();
          await EnsureTablesAsync();

          // Re-check before creating the run-log row so a cancelled run does not leave a RUNNING record behind.
          cancellationToken.ThrowIfCancellationRequested();

          var now = DateTime.Now;
          var batchId = $"S5_PUR_RCT_FULL_{now:yyyyMMddHHmmss}";
          var runLogId = await InsertRunLogAsync(batchId, now, triggerType);
          var result = new PurchaseReceiptMdpSyncResult { BatchId = batchId, RunLogId = runLogId };
          try
          {
              // Inside the try block so that a cancellation here is recorded as FAILED in the run log.
              cancellationToken.ThrowIfCancellationRequested();
              result.StdRows = await TransformStandardAsync(batchId, now);
              await MarkRunSuccessAsync(runLogId, now, result);
              return result;
          }
          catch (Exception ex)
          {
              try
              {
                  await MarkRunFailedAsync(runLogId, now, ex.Message);
              }
              catch (Exception logEx)
              {
                  // Failure logging is best-effort: it must not replace the root-cause exception.
                  // Keep the secondary error reachable for whoever inspects the rethrown exception.
                  ex.Data["RunLogUpdateError"] = logEx.Message;
              }

              throw;
          }
      }

      /// <summary>Defensive table creation (same shape as UpdateScripts WIP-S5PR.sql / the official 1.0.&lt;n&gt;.sql; idempotent via IF NOT EXISTS).</summary>
      private async Task EnsureTablesAsync()
      {
          await _db.Ado.ExecuteCommandAsync(
              """
              CREATE TABLE IF NOT EXISTS mdp_std_purchase_receipt (
              id BIGINT AUTO_INCREMENT PRIMARY KEY,
              tenant_id BIGINT NOT NULL DEFAULT 0,
              factory_id BIGINT NULL DEFAULT 1,
              source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
              domain VARCHAR(24) NOT NULL,
              receiver VARCHAR(24) NOT NULL,
              line SMALLINT NOT NULL DEFAULT 0,
              rct_date DATETIME NULL,
              supp VARCHAR(20) NULL,
              sort_name VARCHAR(255) NULL,
              item_num VARCHAR(60) NULL,
              item_name VARCHAR(200) NULL,
              item_spec VARCHAR(200) NULL,
              um VARCHAR(8) NULL,
              qty_ordered DECIMAL(18,6) NULL DEFAULT 0,
              qty_received DECIMAL(18,6) NULL DEFAULT 0,
              lot_serial VARCHAR(120) NULL,
              location VARCHAR(8) NULL,
              ord_nbr VARCHAR(24) NULL,
              ord_line SMALLINT NULL,
              blanket_line INT NULL,
              pur_ord VARCHAR(24) NULL,
              pur_line SMALLINT NULL,
              sales_job VARCHAR(200) NULL,
              address1 VARCHAR(200) NULL,
              req VARCHAR(20) NULL,
              req_line INT NULL,
              dop_req VARCHAR(255) NULL,
              source_biz_key VARCHAR(200) NULL,
              sync_batch_id VARCHAR(100) NOT NULL,
              sync_time DATETIME NOT NULL,
              create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
              update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
              UNIQUE KEY uk_mdp_std_pur_rct (tenant_id, domain, receiver, line),
              KEY idx_mdp_std_pur_rct_date (tenant_id, rct_date),
              KEY idx_mdp_std_pur_rct_item (tenant_id, item_num),
              KEY idx_mdp_std_pur_rct_supp (tenant_id, supp),
              KEY idx_mdp_std_pur_rct_purord (tenant_id, pur_ord),
              KEY idx_mdp_std_pur_rct_salesjob (tenant_id, sales_job),
              KEY idx_mdp_std_pur_rct_req (tenant_id, req),
              KEY idx_mdp_std_pur_rct_dopreq (tenant_id, dop_req)
              ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5采购收货单标准层'
              """);
      }

      /// <summary>
      /// Standardization: PurOrdRctDetail (rc) + Master + dimension tables -&gt; mdp_std_purchase_receipt. Returns the number of source detail rows.
      /// The SQL is translated from the legacy source SQL (SQL Server -&gt; MySQL): with(nolock) removed, IsNull-&gt;IFNULL,
      /// rtrim(a+' '+b)-&gt;TRIM(CONCAT(...)), convert(varchar,x)-&gt;CAST(x AS CHAR), left(...)/placeholder filters removed.
      /// </summary>
      /// <param name="batchId">Batch id stamped into sync_batch_id on every written row.</param>
      /// <param name="now">Timestamp stamped into sync_time on every written row.</param>
      /// <returns>COUNT of 'rc' detail rows joined to their master, taken just before the upsert.</returns>
      private async Task<int> TransformStandardAsync(string batchId, DateTime now)
      {
          // The processed count comes from a separate COUNT rather than the upsert's affected-row value.
          // NOTE(review): presumably because MySQL reports 2 per updated row for ON DUPLICATE KEY UPDATE — confirm.
          var rows = await _db.Ado.GetIntAsync(
              "SELECT COUNT(1) FROM PurOrdRctDetail p INNER JOIN PurOrdRctMaster d ON p.Domain=d.Domain AND p.Receiver=d.Receiver WHERE p.RctType='rc'");

          // Upsert keyed by uk_mdp_std_pur_rct (tenant_id, domain, receiver, line): re-running refreshes existing rows.
          await _db.Ado.ExecuteCommandAsync(
              """
              INSERT INTO mdp_std_purchase_receipt
              (tenant_id, factory_id, source_system, domain, receiver, line, rct_date, supp, sort_name,
              item_num, item_name, item_spec, um, qty_ordered, qty_received, lot_serial, location,
              ord_nbr, ord_line, blanket_line, pur_ord, pur_line, sales_job, address1,
              req, req_line, dop_req, source_biz_key, sync_batch_id, sync_time)
              SELECT
              IFNULL(p.tenant_id, 0), 1, 'AIDOP', p.Domain, p.Receiver, p.Line, d.RctDate, p.Supp,
              TRIM(CONCAT(d.Supp, ' ', IFNULL(s.SortName, ''))),
              p.ItemNum, i.Descr, i.Descr1, p.UM, p.QtyOrded, p.QtyReceived, p.LotSerial, p.Location,
              p.OrdNbr, p.OrdLine, pd.BlanketLine, pd.PurOrd, pd.Line, pd.SalesJob, a.Address1,
              (CASE WHEN d.OrdType='DO' THEN '' ELSE sd.Req END),
              sd.ReqLine,
              (CASE WHEN d.OrdType='DO' THEN sd.Req ELSE dr.pr_billno END),
              CONCAT(p.Domain, '#', p.Receiver, '#', p.Line), @BatchId, @Now
              FROM PurOrdRctDetail p
              INNER JOIN PurOrdRctMaster d ON p.Domain = d.Domain AND p.Receiver = d.Receiver
              LEFT JOIN ItemMaster i ON p.Domain = i.Domain AND p.ItemNum = i.ItemNum
              LEFT JOIN SuppMaster s ON d.Domain = s.Domain AND d.Supp = s.Supp
              LEFT JOIN ConsigneeAddressMaster a ON p.Domain = a.Domain AND p.Supp = a.Address AND a.Typed = 'Supp'
              LEFT JOIN PurOrdDetail pd ON p.Domain = pd.Domain
              AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.RctNbr ELSE p.OrdNbr END)
              = (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN pd.Contract ELSE pd.PurOrd END)
              AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.BlanketLine ELSE p.OrdLine END) = pd.Line
              LEFT JOIN PurOrdDetail sd ON p.Domain = sd.Domain AND p.OrdNbr = sd.PurOrd AND p.OrdLine = sd.Line
              LEFT JOIN srm_pr_main dr ON CAST(dr.factory_id AS CHAR) = sd.Domain AND dr.SAP_pr_billno = sd.Req AND IFNULL(dr.SAP_pr_billno,'')<>''
              WHERE p.RctType = 'rc'
              ON DUPLICATE KEY UPDATE
              factory_id=VALUES(factory_id), rct_date=VALUES(rct_date), supp=VALUES(supp), sort_name=VALUES(sort_name),
              item_num=VALUES(item_num), item_name=VALUES(item_name), item_spec=VALUES(item_spec), um=VALUES(um),
              qty_ordered=VALUES(qty_ordered), qty_received=VALUES(qty_received), lot_serial=VALUES(lot_serial), location=VALUES(location),
              ord_nbr=VALUES(ord_nbr), ord_line=VALUES(ord_line), blanket_line=VALUES(blanket_line),
              pur_ord=VALUES(pur_ord), pur_line=VALUES(pur_line), sales_job=VALUES(sales_job), address1=VALUES(address1),
              req=VALUES(req), req_line=VALUES(req_line), dop_req=VALUES(dop_req),
              sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
              """,
              new SugarParameter("@BatchId", batchId),
              new SugarParameter("@Now", now));

          return rows;
      }

      /// <summary>Inserts a RUNNING row into mdp_transform_run_log and returns its id.</summary>
      /// <param name="batchId">Batch id of this run.</param>
      /// <param name="startedAt">Start time stored in start_time.</param>
      /// <param name="triggerType">Trigger label; normalized via <see cref="NormalizeTriggerType"/>.</param>
      /// <returns>The id generated for the inserted run-log row.</returns>
      private async Task<long> InsertRunLogAsync(string batchId, DateTime startedAt, string triggerType)
      {
          // The batch id only has second resolution, so looking the row up by batch_id afterwards can
          // return another run's row when two runs start within the same second. Reading LAST_INSERT_ID()
          // in the same command is scoped to this connection and avoids that race.
          // NOTE(review): assumes mdp_transform_run_log.id is AUTO_INCREMENT — confirm against the DDL.
          return await _db.Ado.GetLongAsync(
              """
              INSERT INTO mdp_transform_run_log
              (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
              VALUES (0, @JobCode, 'S5采购收货单MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime);
              SELECT LAST_INSERT_ID();
              """,
              new List<SugarParameter>
              {
                  new("@JobCode", JobCode),
                  new("@TriggerType", NormalizeTriggerType(triggerType)),
                  new("@BatchId", batchId),
                  new("@StartTime", startedAt),
              });
      }

      /// <summary>Marks the run-log row as SUCCESS with end time, duration and row counts (stage/dwd are always 0 for this job).</summary>
      /// <param name="runLogId">Id of the run-log row to update.</param>
      /// <param name="startedAt">Run start time, used to compute duration_ms.</param>
      /// <param name="result">Run result; its StdRows is written to standard_rows.</param>
      private async Task MarkRunSuccessAsync(long runLogId, DateTime startedAt, PurchaseReceiptMdpSyncResult result)
      {
          var finishedAt = DateTime.Now;
          await _db.Ado.ExecuteCommandAsync(
              """
              UPDATE mdp_transform_run_log
              SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
              stage_rows=0, standard_rows=@StandardRows, dwd_rows=0, update_time=CURRENT_TIMESTAMP
              WHERE id=@Id
              """,
              new SugarParameter("@EndTime", finishedAt),
              new SugarParameter("@DurationMs", ElapsedMilliseconds(startedAt, finishedAt)),
              new SugarParameter("@StandardRows", result.StdRows),
              new SugarParameter("@Id", runLogId));
      }

      /// <summary>Marks the run-log row as FAILED with end time, duration and an error message truncated to 2000 characters.</summary>
      /// <param name="runLogId">Id of the run-log row to update.</param>
      /// <param name="startedAt">Run start time, used to compute duration_ms.</param>
      /// <param name="message">Error text to store in error_message.</param>
      private async Task MarkRunFailedAsync(long runLogId, DateTime startedAt, string message)
      {
          var finishedAt = DateTime.Now;
          await _db.Ado.ExecuteCommandAsync(
              """
              UPDATE mdp_transform_run_log
              SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
              error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
              WHERE id=@Id
              """,
              new SugarParameter("@EndTime", finishedAt),
              new SugarParameter("@DurationMs", ElapsedMilliseconds(startedAt, finishedAt)),
              new SugarParameter("@ErrorMessage", message.Length > 2000 ? message[..2000] : message),
              new SugarParameter("@Id", runLogId));
      }

      /// <summary>
      /// Whole milliseconds between two local timestamps, clamped to [0, int.MaxValue].
      /// Both values come from DateTime.Now, so a wall-clock adjustment could otherwise yield a negative duration.
      /// </summary>
      private static int ElapsedMilliseconds(DateTime startedAt, DateTime finishedAt)
          => (int)Math.Clamp((finishedAt - startedAt).TotalMilliseconds, 0d, int.MaxValue);

      /// <summary>Returns "AUTO" for null/blank input; otherwise the trimmed, upper-cased (invariant) trigger type.</summary>
      private static string NormalizeTriggerType(string? triggerType)
          => string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
  }
  /// <summary>Outcome of one purchase-receipt MDP sync/transform run.</summary>
  public sealed class PurchaseReceiptMdpSyncResult
  {
      /// <summary>Id of the run-log row associated with this run.</summary>
      public long RunLogId { get; set; }

      /// <summary>Batch id of this run; defaults to the empty string.</summary>
      public string BatchId { get; set; } = "";

      /// <summary>Row count reported for the standard-layer step.</summary>
      public int StdRows { get; set; }
  }