namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse; /// /// S5 采购收货单 数据中台只读同步转换服务(DOP 内部,方案 1)。 /// /// 源:aidopdev PurOrdRctDetail(p) + PurOrdRctMaster(d),业务类型 RctType='rc'(采购收货); /// 头明细关联 p.Domain=d.Domain AND p.Receiver=d.Receiver; /// 维表 ItemMaster(i) / SuppMaster(s) / ConsigneeAddressMaster(a) / PurOrdDetail(pd,sd) / srm_pr_main(dr) 全 LEFT JOIN。 /// 链路(方案 1):源表只读 → mdp_std_purchase_receipt(typed)。不经 mdp_stg(S3 已持有 mdp_stg_receipt),不建 dwd(只读列表)。 /// /// 约束: /// - 只读源表,仅写 mdp_std_purchase_receipt;绝不 INSERT/UPDATE/DELETE 任何源表;不读/不改 S3 mdp_stg_receipt、S4 ado_s4_receipt。 /// - 过滤 p.RctType='rc';不套 ERP 端 UserFactoryNum/UserAccount 占位过滤(全量同步,租户隔离在读 API 侧按 tenant_id)。 /// - 派生列对齐旧系统 SQL:sort_name/req/dop_req 保持 CASE d.OrdType='DO' 分支语义;danjia/jiage 本批不落。 /// - 明细粒度 = PurOrdRctDetail 一行(domain#receiver#line);rc 为 0 行时成功完成、处理数为 0,不报错。 /// public class PurchaseReceiptMdpSyncService : ITransient { private const string JobCode = "S5_PURCHASE_RECEIPT_MDP_SYNC"; private readonly ISqlSugarClient _db; public PurchaseReceiptMdpSyncService(ISqlSugarClient db) { _db = db; } /// 全量同步转换:源表(rc) → mdp_std_purchase_receipt。 public async Task RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO") { cancellationToken.ThrowIfCancellationRequested(); await EnsureTablesAsync(); var now = DateTime.Now; var batchId = $"S5_PUR_RCT_FULL_{now:yyyyMMddHHmmss}"; var runLogId = await InsertRunLogAsync(batchId, now, triggerType); var result = new PurchaseReceiptMdpSyncResult { BatchId = batchId, RunLogId = runLogId }; try { result.StdRows = await TransformStandardAsync(batchId, now); await MarkRunSuccessAsync(runLogId, now, result); return result; } catch (Exception ex) { await MarkRunFailedAsync(runLogId, now, ex.Message); throw; } } /// 防御式建表(与 UpdateScripts WIP-S5PR.sql / 正式 1.0.<n>.sql 同构,幂等)。 private async Task EnsureTablesAsync() { await _db.Ado.ExecuteCommandAsync( """ CREATE TABLE IF NOT EXISTS mdp_std_purchase_receipt ( id BIGINT AUTO_INCREMENT PRIMARY KEY, tenant_id BIGINT NOT NULL DEFAULT 0, factory_id BIGINT NULL DEFAULT 1, source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP', domain VARCHAR(24) NOT NULL, receiver VARCHAR(24) NOT NULL, line SMALLINT NOT NULL DEFAULT 0, rct_date DATETIME NULL, supp VARCHAR(20) NULL, sort_name VARCHAR(255) NULL, item_num VARCHAR(60) NULL, item_name VARCHAR(200) NULL, item_spec VARCHAR(200) NULL, um VARCHAR(8) NULL, qty_ordered DECIMAL(18,6) NULL DEFAULT 0, qty_received DECIMAL(18,6) NULL DEFAULT 0, lot_serial VARCHAR(120) NULL, location VARCHAR(8) NULL, ord_nbr VARCHAR(24) NULL, ord_line SMALLINT NULL, blanket_line INT NULL, pur_ord VARCHAR(24) NULL, pur_line SMALLINT NULL, sales_job VARCHAR(200) NULL, address1 VARCHAR(200) NULL, req VARCHAR(20) NULL, req_line INT NULL, dop_req VARCHAR(255) NULL, source_biz_key VARCHAR(200) NULL, sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL, create_time DATETIME DEFAULT CURRENT_TIMESTAMP, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_mdp_std_pur_rct (tenant_id, domain, receiver, line), KEY idx_mdp_std_pur_rct_date (tenant_id, rct_date), KEY idx_mdp_std_pur_rct_item (tenant_id, item_num), KEY idx_mdp_std_pur_rct_supp (tenant_id, supp), KEY idx_mdp_std_pur_rct_purord (tenant_id, pur_ord), KEY idx_mdp_std_pur_rct_salesjob (tenant_id, sales_job), KEY idx_mdp_std_pur_rct_req (tenant_id, req), KEY idx_mdp_std_pur_rct_dopreq (tenant_id, dop_req) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5采购收货单标准层' """); } /// /// 标准化:PurOrdRctDetail(rc) + Master + 维表 -> mdp_std_purchase_receipt。返回处理明细行数。 /// SQL 由旧系统源 SQL 翻译(SQL Server -> MySQL):with(nolock) 去除、IsNull->IFNULL、 /// rtrim(a+' '+b)->TRIM(CONCAT(...))、convert(varchar,x)->CAST(x AS CHAR)、left(...)/占位过滤去除。 /// private async Task TransformStandardAsync(string batchId, DateTime now) { var rows = await _db.Ado.GetIntAsync( "SELECT COUNT(1) FROM PurOrdRctDetail p INNER JOIN PurOrdRctMaster d ON p.Domain=d.Domain AND p.Receiver=d.Receiver WHERE p.RctType='rc'"); await _db.Ado.ExecuteCommandAsync( """ INSERT INTO mdp_std_purchase_receipt (tenant_id, factory_id, source_system, domain, receiver, line, rct_date, supp, sort_name, item_num, item_name, item_spec, um, qty_ordered, qty_received, lot_serial, location, ord_nbr, ord_line, blanket_line, pur_ord, pur_line, sales_job, address1, req, req_line, dop_req, source_biz_key, sync_batch_id, sync_time) SELECT IFNULL(p.tenant_id, 0), 1, 'AIDOP', p.Domain, p.Receiver, p.Line, d.RctDate, p.Supp, TRIM(CONCAT(d.Supp, ' ', IFNULL(s.SortName, ''))), p.ItemNum, i.Descr, i.Descr1, p.UM, p.QtyOrded, p.QtyReceived, p.LotSerial, p.Location, p.OrdNbr, p.OrdLine, pd.BlanketLine, pd.PurOrd, pd.Line, pd.SalesJob, a.Address1, (CASE WHEN d.OrdType='DO' THEN '' ELSE sd.Req END), sd.ReqLine, (CASE WHEN d.OrdType='DO' THEN sd.Req ELSE dr.pr_billno END), CONCAT(p.Domain, '#', p.Receiver, '#', p.Line), @BatchId, @Now FROM PurOrdRctDetail p INNER JOIN PurOrdRctMaster d ON p.Domain = d.Domain AND p.Receiver = d.Receiver LEFT JOIN ItemMaster i ON p.Domain = i.Domain AND p.ItemNum = i.ItemNum LEFT JOIN SuppMaster s ON d.Domain = s.Domain AND d.Supp = s.Supp LEFT JOIN ConsigneeAddressMaster a ON p.Domain = a.Domain AND p.Supp = a.Address AND a.Typed = 'Supp' LEFT JOIN PurOrdDetail pd ON p.Domain = pd.Domain AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.RctNbr ELSE p.OrdNbr END) = (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN pd.Contract ELSE pd.PurOrd END) AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.BlanketLine ELSE p.OrdLine END) = pd.Line LEFT JOIN PurOrdDetail sd ON p.Domain = sd.Domain AND p.OrdNbr = sd.PurOrd AND p.OrdLine = sd.Line LEFT JOIN srm_pr_main dr ON CAST(dr.factory_id AS CHAR) = sd.Domain AND dr.SAP_pr_billno = sd.Req AND IFNULL(dr.SAP_pr_billno,'')<>'' WHERE p.RctType = 'rc' ON DUPLICATE KEY UPDATE factory_id=VALUES(factory_id), rct_date=VALUES(rct_date), supp=VALUES(supp), sort_name=VALUES(sort_name), item_num=VALUES(item_num), item_name=VALUES(item_name), item_spec=VALUES(item_spec), um=VALUES(um), qty_ordered=VALUES(qty_ordered), qty_received=VALUES(qty_received), lot_serial=VALUES(lot_serial), location=VALUES(location), ord_nbr=VALUES(ord_nbr), ord_line=VALUES(ord_line), blanket_line=VALUES(blanket_line), pur_ord=VALUES(pur_ord), pur_line=VALUES(pur_line), sales_job=VALUES(sales_job), address1=VALUES(address1), req=VALUES(req), req_line=VALUES(req_line), dop_req=VALUES(dop_req), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP """, new SugarParameter("@BatchId", batchId), new SugarParameter("@Now", now)); return rows; } private async Task InsertRunLogAsync(string batchId, DateTime startedAt, string triggerType) { await _db.Ado.ExecuteCommandAsync( """ INSERT INTO mdp_transform_run_log (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time) VALUES (0, @JobCode, 'S5采购收货单MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime) """, new SugarParameter("@JobCode", JobCode), new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)), new SugarParameter("@BatchId", batchId), new SugarParameter("@StartTime", startedAt)); return await _db.Ado.GetLongAsync( "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1", new List { new("@BatchId", batchId) }); } private async Task MarkRunSuccessAsync(long runLogId, DateTime startedAt, PurchaseReceiptMdpSyncResult result) { var finishedAt = DateTime.Now; await _db.Ado.ExecuteCommandAsync( """ UPDATE mdp_transform_run_log SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs, stage_rows=0, standard_rows=@StandardRows, dwd_rows=0, update_time=CURRENT_TIMESTAMP WHERE id=@Id """, new SugarParameter("@EndTime", finishedAt), new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds), new SugarParameter("@StandardRows", result.StdRows), new SugarParameter("@Id", runLogId)); } private async Task MarkRunFailedAsync(long runLogId, DateTime startedAt, string message) { var finishedAt = DateTime.Now; await _db.Ado.ExecuteCommandAsync( """ UPDATE mdp_transform_run_log SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs, error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP WHERE id=@Id """, new SugarParameter("@EndTime", finishedAt), new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds), new SugarParameter("@ErrorMessage", message.Length > 2000 ? message[..2000] : message), new SugarParameter("@Id", runLogId)); } private static string NormalizeTriggerType(string? triggerType) => string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant(); } /// 采购收货单 MDP 同步转换结果。 public sealed class PurchaseReceiptMdpSyncResult { public long RunLogId { get; set; } public string BatchId { get; set; } = string.Empty; public int StdRows { get; set; } }