|
|
@@ -0,0 +1,213 @@
|
|
|
+namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse;
|
|
|
+
|
|
|
+/// <summary>
|
|
|
+/// S5 采购收货单 数据中台只读同步转换服务(DOP 内部,方案 1)。
|
|
|
+///
|
|
|
+/// 源:aidopdev PurOrdRctDetail(p) + PurOrdRctMaster(d),业务类型 RctType='rc'(采购收货);
|
|
|
+/// 头明细关联 p.Domain=d.Domain AND p.Receiver=d.Receiver;
|
|
|
+/// 维表 ItemMaster(i) / SuppMaster(s) / ConsigneeAddressMaster(a) / PurOrdDetail(pd,sd) / srm_pr_main(dr) 全 LEFT JOIN。
|
|
|
+/// 链路(方案 1):源表只读 → mdp_std_purchase_receipt(typed)。不经 mdp_stg(S3 已持有 mdp_stg_receipt),不建 dwd(只读列表)。
|
|
|
+///
|
|
|
+/// 约束:
|
|
|
+/// - 只读源表,仅写 mdp_std_purchase_receipt;绝不 INSERT/UPDATE/DELETE 任何源表;不读/不改 S3 mdp_stg_receipt、S4 ado_s4_receipt。
|
|
|
+/// - 过滤 p.RctType='rc';不套 ERP 端 UserFactoryNum/UserAccount 占位过滤(全量同步,租户隔离在读 API 侧按 tenant_id)。
|
|
|
+/// - 派生列对齐旧系统 SQL:sort_name/req/dop_req 保持 CASE d.OrdType='DO' 分支语义;danjia/jiage 本批不落。
|
|
|
+/// - 明细粒度 = PurOrdRctDetail 一行(domain#receiver#line);rc 为 0 行时成功完成、处理数为 0,不报错。
|
|
|
+/// </summary>
|
|
|
+public class PurchaseReceiptMdpSyncService : ITransient
|
|
|
+{
|
|
|
+ private const string JobCode = "S5_PURCHASE_RECEIPT_MDP_SYNC";
|
|
|
+
|
|
|
+ private readonly ISqlSugarClient _db;
|
|
|
+
|
|
|
+ public PurchaseReceiptMdpSyncService(ISqlSugarClient db)
|
|
|
+ {
|
|
|
+ _db = db;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>全量同步转换:源表(rc) → mdp_std_purchase_receipt。</summary>
|
|
|
+ public async Task<PurchaseReceiptMdpSyncResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
|
|
|
+ {
|
|
|
+ cancellationToken.ThrowIfCancellationRequested();
|
|
|
+ await EnsureTablesAsync();
|
|
|
+
|
|
|
+ var now = DateTime.Now;
|
|
|
+ var batchId = $"S5_PUR_RCT_FULL_{now:yyyyMMddHHmmss}";
|
|
|
+ var runLogId = await InsertRunLogAsync(batchId, now, triggerType);
|
|
|
+ var result = new PurchaseReceiptMdpSyncResult { BatchId = batchId, RunLogId = runLogId };
|
|
|
+
|
|
|
+ try
|
|
|
+ {
|
|
|
+ result.StdRows = await TransformStandardAsync(batchId, now);
|
|
|
+ await MarkRunSuccessAsync(runLogId, now, result);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ await MarkRunFailedAsync(runLogId, now, ex.Message);
|
|
|
+ throw;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>防御式建表(与 UpdateScripts WIP-S5PR.sql / 正式 1.0.<n>.sql 同构,幂等)。</summary>
|
|
|
+ private async Task EnsureTablesAsync()
|
|
|
+ {
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ CREATE TABLE IF NOT EXISTS mdp_std_purchase_receipt (
|
|
|
+ id BIGINT AUTO_INCREMENT PRIMARY KEY,
|
|
|
+ tenant_id BIGINT NOT NULL DEFAULT 0,
|
|
|
+ factory_id BIGINT NULL DEFAULT 1,
|
|
|
+ source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
|
|
|
+ domain VARCHAR(24) NOT NULL,
|
|
|
+ receiver VARCHAR(24) NOT NULL,
|
|
|
+ line SMALLINT NOT NULL DEFAULT 0,
|
|
|
+ rct_date DATETIME NULL,
|
|
|
+ supp VARCHAR(20) NULL,
|
|
|
+ sort_name VARCHAR(255) NULL,
|
|
|
+ item_num VARCHAR(60) NULL,
|
|
|
+ item_name VARCHAR(200) NULL,
|
|
|
+ item_spec VARCHAR(200) NULL,
|
|
|
+ um VARCHAR(8) NULL,
|
|
|
+ qty_ordered DECIMAL(18,6) NULL DEFAULT 0,
|
|
|
+ qty_received DECIMAL(18,6) NULL DEFAULT 0,
|
|
|
+ lot_serial VARCHAR(120) NULL,
|
|
|
+ location VARCHAR(8) NULL,
|
|
|
+ ord_nbr VARCHAR(24) NULL,
|
|
|
+ ord_line SMALLINT NULL,
|
|
|
+ blanket_line INT NULL,
|
|
|
+ pur_ord VARCHAR(24) NULL,
|
|
|
+ pur_line SMALLINT NULL,
|
|
|
+ sales_job VARCHAR(200) NULL,
|
|
|
+ address1 VARCHAR(200) NULL,
|
|
|
+ req VARCHAR(20) NULL,
|
|
|
+ req_line INT NULL,
|
|
|
+ dop_req VARCHAR(255) NULL,
|
|
|
+ source_biz_key VARCHAR(200) NULL,
|
|
|
+ sync_batch_id VARCHAR(100) NOT NULL,
|
|
|
+ sync_time DATETIME NOT NULL,
|
|
|
+ create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
+ update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
|
+ UNIQUE KEY uk_mdp_std_pur_rct (tenant_id, domain, receiver, line),
|
|
|
+ KEY idx_mdp_std_pur_rct_date (tenant_id, rct_date),
|
|
|
+ KEY idx_mdp_std_pur_rct_item (tenant_id, item_num),
|
|
|
+ KEY idx_mdp_std_pur_rct_supp (tenant_id, supp),
|
|
|
+ KEY idx_mdp_std_pur_rct_purord (tenant_id, pur_ord),
|
|
|
+ KEY idx_mdp_std_pur_rct_salesjob (tenant_id, sales_job),
|
|
|
+ KEY idx_mdp_std_pur_rct_req (tenant_id, req),
|
|
|
+ KEY idx_mdp_std_pur_rct_dopreq (tenant_id, dop_req)
|
|
|
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5采购收货单标准层'
|
|
|
+ """);
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// 标准化:PurOrdRctDetail(rc) + Master + 维表 -> mdp_std_purchase_receipt。返回处理明细行数。
|
|
|
+ /// SQL 由旧系统源 SQL 翻译(SQL Server -> MySQL):with(nolock) 去除、IsNull->IFNULL、
|
|
|
+ /// rtrim(a+' '+b)->TRIM(CONCAT(...))、convert(varchar,x)->CAST(x AS CHAR)、left(...)/占位过滤去除。
|
|
|
+ /// </summary>
|
|
|
+ private async Task<int> TransformStandardAsync(string batchId, DateTime now)
|
|
|
+ {
|
|
|
+ var rows = await _db.Ado.GetIntAsync(
|
|
|
+ "SELECT COUNT(1) FROM PurOrdRctDetail p INNER JOIN PurOrdRctMaster d ON p.Domain=d.Domain AND p.Receiver=d.Receiver WHERE p.RctType='rc'");
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ INSERT INTO mdp_std_purchase_receipt
|
|
|
+ (tenant_id, factory_id, source_system, domain, receiver, line, rct_date, supp, sort_name,
|
|
|
+ item_num, item_name, item_spec, um, qty_ordered, qty_received, lot_serial, location,
|
|
|
+ ord_nbr, ord_line, blanket_line, pur_ord, pur_line, sales_job, address1,
|
|
|
+ req, req_line, dop_req, source_biz_key, sync_batch_id, sync_time)
|
|
|
+ SELECT
|
|
|
+ IFNULL(p.tenant_id, 0), 1, 'AIDOP', p.Domain, p.Receiver, p.Line, d.RctDate, p.Supp,
|
|
|
+ TRIM(CONCAT(d.Supp, ' ', IFNULL(s.SortName, ''))),
|
|
|
+ p.ItemNum, i.Descr, i.Descr1, p.UM, p.QtyOrded, p.QtyReceived, p.LotSerial, p.Location,
|
|
|
+ p.OrdNbr, p.OrdLine, pd.BlanketLine, pd.PurOrd, pd.Line, pd.SalesJob, a.Address1,
|
|
|
+ (CASE WHEN d.OrdType='DO' THEN '' ELSE sd.Req END),
|
|
|
+ sd.ReqLine,
|
|
|
+ (CASE WHEN d.OrdType='DO' THEN sd.Req ELSE dr.pr_billno END),
|
|
|
+ CONCAT(p.Domain, '#', p.Receiver, '#', p.Line), @BatchId, @Now
|
|
|
+ FROM PurOrdRctDetail p
|
|
|
+ INNER JOIN PurOrdRctMaster d ON p.Domain = d.Domain AND p.Receiver = d.Receiver
|
|
|
+ LEFT JOIN ItemMaster i ON p.Domain = i.Domain AND p.ItemNum = i.ItemNum
|
|
|
+ LEFT JOIN SuppMaster s ON d.Domain = s.Domain AND d.Supp = s.Supp
|
|
|
+ LEFT JOIN ConsigneeAddressMaster a ON p.Domain = a.Domain AND p.Supp = a.Address AND a.Typed = 'Supp'
|
|
|
+ LEFT JOIN PurOrdDetail pd ON p.Domain = pd.Domain
|
|
|
+ AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.RctNbr ELSE p.OrdNbr END)
|
|
|
+ = (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN pd.Contract ELSE pd.PurOrd END)
|
|
|
+ AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.BlanketLine ELSE p.OrdLine END) = pd.Line
|
|
|
+ LEFT JOIN PurOrdDetail sd ON p.Domain = sd.Domain AND p.OrdNbr = sd.PurOrd AND p.OrdLine = sd.Line
|
|
|
+ LEFT JOIN srm_pr_main dr ON CAST(dr.factory_id AS CHAR) = sd.Domain AND dr.SAP_pr_billno = sd.Req AND IFNULL(dr.SAP_pr_billno,'')<>''
|
|
|
+ WHERE p.RctType = 'rc'
|
|
|
+ ON DUPLICATE KEY UPDATE
|
|
|
+ factory_id=VALUES(factory_id), rct_date=VALUES(rct_date), supp=VALUES(supp), sort_name=VALUES(sort_name),
|
|
|
+ item_num=VALUES(item_num), item_name=VALUES(item_name), item_spec=VALUES(item_spec), um=VALUES(um),
|
|
|
+ qty_ordered=VALUES(qty_ordered), qty_received=VALUES(qty_received), lot_serial=VALUES(lot_serial), location=VALUES(location),
|
|
|
+ ord_nbr=VALUES(ord_nbr), ord_line=VALUES(ord_line), blanket_line=VALUES(blanket_line),
|
|
|
+ pur_ord=VALUES(pur_ord), pur_line=VALUES(pur_line), sales_job=VALUES(sales_job), address1=VALUES(address1),
|
|
|
+ req=VALUES(req), req_line=VALUES(req_line), dop_req=VALUES(dop_req),
|
|
|
+ sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
|
|
|
+ """,
|
|
|
+ new SugarParameter("@BatchId", batchId),
|
|
|
+ new SugarParameter("@Now", now));
|
|
|
+ return rows;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task<long> InsertRunLogAsync(string batchId, DateTime startedAt, string triggerType)
|
|
|
+ {
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ INSERT INTO mdp_transform_run_log
|
|
|
+ (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
|
|
|
+ VALUES (0, @JobCode, 'S5采购收货单MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
|
|
|
+ """,
|
|
|
+ new SugarParameter("@JobCode", JobCode),
|
|
|
+ new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
|
|
|
+ new SugarParameter("@BatchId", batchId),
|
|
|
+ new SugarParameter("@StartTime", startedAt));
|
|
|
+ return await _db.Ado.GetLongAsync(
|
|
|
+ "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
|
|
|
+ new List<SugarParameter> { new("@BatchId", batchId) });
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task MarkRunSuccessAsync(long runLogId, DateTime startedAt, PurchaseReceiptMdpSyncResult result)
|
|
|
+ {
|
|
|
+ var finishedAt = DateTime.Now;
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ UPDATE mdp_transform_run_log
|
|
|
+ SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
|
|
|
+ stage_rows=0, standard_rows=@StandardRows, dwd_rows=0, update_time=CURRENT_TIMESTAMP
|
|
|
+ WHERE id=@Id
|
|
|
+ """,
|
|
|
+ new SugarParameter("@EndTime", finishedAt),
|
|
|
+ new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
|
|
|
+ new SugarParameter("@StandardRows", result.StdRows),
|
|
|
+ new SugarParameter("@Id", runLogId));
|
|
|
+ }
|
|
|
+
|
|
|
+ private async Task MarkRunFailedAsync(long runLogId, DateTime startedAt, string message)
|
|
|
+ {
|
|
|
+ var finishedAt = DateTime.Now;
|
|
|
+ await _db.Ado.ExecuteCommandAsync(
|
|
|
+ """
|
|
|
+ UPDATE mdp_transform_run_log
|
|
|
+ SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
|
|
|
+ error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
|
|
|
+ WHERE id=@Id
|
|
|
+ """,
|
|
|
+ new SugarParameter("@EndTime", finishedAt),
|
|
|
+ new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
|
|
|
+ new SugarParameter("@ErrorMessage", message.Length > 2000 ? message[..2000] : message),
|
|
|
+ new SugarParameter("@Id", runLogId));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static string NormalizeTriggerType(string? triggerType)
|
|
|
+ => string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
|
|
|
+}
|
|
|
+
|
|
|
+/// <summary>采购收货单 MDP 同步转换结果。</summary>
|
|
|
+public sealed class PurchaseReceiptMdpSyncResult
|
|
|
+{
|
|
|
+ public long RunLogId { get; set; }
|
|
|
+ public string BatchId { get; set; } = string.Empty;
|
|
|
+ public int StdRows { get; set; }
|
|
|
+}
|