| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse;
- /// <summary>
- /// S5 采购收货单 数据中台只读同步转换服务(DOP 内部,方案 1)。
- ///
- /// 源:aidopdev PurOrdRctDetail(p) + PurOrdRctMaster(d),业务类型 RctType='rc'(采购收货);
- /// 头明细关联 p.Domain=d.Domain AND p.Receiver=d.Receiver;
- /// 维表 ItemMaster(i) / SuppMaster(s) / ConsigneeAddressMaster(a) / PurOrdDetail(pd,sd) / srm_pr_main(dr) 全 LEFT JOIN。
- /// 链路(方案 1):源表只读 → mdp_std_purchase_receipt(typed)。不经 mdp_stg(S3 已持有 mdp_stg_receipt),不建 dwd(只读列表)。
- ///
- /// 约束:
- /// - 只读源表,仅写 mdp_std_purchase_receipt;绝不 INSERT/UPDATE/DELETE 任何源表;不读/不改 S3 mdp_stg_receipt、S4 ado_s4_receipt。
- /// - 过滤 p.RctType='rc';不套 ERP 端 UserFactoryNum/UserAccount 占位过滤(全量同步,租户隔离在读 API 侧按 tenant_id)。
- /// - 派生列对齐旧系统 SQL:sort_name/req/dop_req 保持 CASE d.OrdType='DO' 分支语义;danjia/jiage 本批不落。
- /// - 明细粒度 = PurOrdRctDetail 一行(domain#receiver#line);rc 为 0 行时成功完成、处理数为 0,不报错。
- /// </summary>
- public class PurchaseReceiptMdpSyncService : ITransient
- {
- private const string JobCode = "S5_PURCHASE_RECEIPT_MDP_SYNC";
- private readonly ISqlSugarClient _db;
- public PurchaseReceiptMdpSyncService(ISqlSugarClient db)
- {
- _db = db;
- }
- /// <summary>全量同步转换:源表(rc) → mdp_std_purchase_receipt。</summary>
- public async Task<PurchaseReceiptMdpSyncResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
- {
- cancellationToken.ThrowIfCancellationRequested();
- await EnsureTablesAsync();
- var now = DateTime.Now;
- var batchId = $"S5_PUR_RCT_FULL_{now:yyyyMMddHHmmss}";
- var runLogId = await InsertRunLogAsync(batchId, now, triggerType);
- var result = new PurchaseReceiptMdpSyncResult { BatchId = batchId, RunLogId = runLogId };
- try
- {
- result.StdRows = await TransformStandardAsync(batchId, now);
- await MarkRunSuccessAsync(runLogId, now, result);
- return result;
- }
- catch (Exception ex)
- {
- await MarkRunFailedAsync(runLogId, now, ex.Message);
- throw;
- }
- }
- /// <summary>防御式建表(与 UpdateScripts WIP-S5PR.sql / 正式 1.0.<n>.sql 同构,幂等)。</summary>
- private async Task EnsureTablesAsync()
- {
- await _db.Ado.ExecuteCommandAsync(
- """
- CREATE TABLE IF NOT EXISTS mdp_std_purchase_receipt (
- id BIGINT AUTO_INCREMENT PRIMARY KEY,
- tenant_id BIGINT NOT NULL DEFAULT 0,
- factory_id BIGINT NULL DEFAULT 1,
- source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
- domain VARCHAR(24) NOT NULL,
- receiver VARCHAR(24) NOT NULL,
- line SMALLINT NOT NULL DEFAULT 0,
- rct_date DATETIME NULL,
- supp VARCHAR(20) NULL,
- sort_name VARCHAR(255) NULL,
- item_num VARCHAR(60) NULL,
- item_name VARCHAR(200) NULL,
- item_spec VARCHAR(200) NULL,
- um VARCHAR(8) NULL,
- qty_ordered DECIMAL(18,6) NULL DEFAULT 0,
- qty_received DECIMAL(18,6) NULL DEFAULT 0,
- lot_serial VARCHAR(120) NULL,
- location VARCHAR(8) NULL,
- ord_nbr VARCHAR(24) NULL,
- ord_line SMALLINT NULL,
- blanket_line INT NULL,
- pur_ord VARCHAR(24) NULL,
- pur_line SMALLINT NULL,
- sales_job VARCHAR(200) NULL,
- address1 VARCHAR(200) NULL,
- req VARCHAR(20) NULL,
- req_line INT NULL,
- dop_req VARCHAR(255) NULL,
- source_biz_key VARCHAR(200) NULL,
- sync_batch_id VARCHAR(100) NOT NULL,
- sync_time DATETIME NOT NULL,
- create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
- update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- UNIQUE KEY uk_mdp_std_pur_rct (tenant_id, domain, receiver, line),
- KEY idx_mdp_std_pur_rct_date (tenant_id, rct_date),
- KEY idx_mdp_std_pur_rct_item (tenant_id, item_num),
- KEY idx_mdp_std_pur_rct_supp (tenant_id, supp),
- KEY idx_mdp_std_pur_rct_purord (tenant_id, pur_ord),
- KEY idx_mdp_std_pur_rct_salesjob (tenant_id, sales_job),
- KEY idx_mdp_std_pur_rct_req (tenant_id, req),
- KEY idx_mdp_std_pur_rct_dopreq (tenant_id, dop_req)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5采购收货单标准层'
- """);
- }
- /// <summary>
- /// 标准化:PurOrdRctDetail(rc) + Master + 维表 -> mdp_std_purchase_receipt。返回处理明细行数。
- /// SQL 由旧系统源 SQL 翻译(SQL Server -> MySQL):with(nolock) 去除、IsNull->IFNULL、
- /// rtrim(a+' '+b)->TRIM(CONCAT(...))、convert(varchar,x)->CAST(x AS CHAR)、left(...)/占位过滤去除。
- /// </summary>
- private async Task<int> TransformStandardAsync(string batchId, DateTime now)
- {
- var rows = await _db.Ado.GetIntAsync(
- "SELECT COUNT(1) FROM PurOrdRctDetail p INNER JOIN PurOrdRctMaster d ON p.Domain=d.Domain AND p.Receiver=d.Receiver WHERE p.RctType='rc'");
- await _db.Ado.ExecuteCommandAsync(
- """
- INSERT INTO mdp_std_purchase_receipt
- (tenant_id, factory_id, source_system, domain, receiver, line, rct_date, supp, sort_name,
- item_num, item_name, item_spec, um, qty_ordered, qty_received, lot_serial, location,
- ord_nbr, ord_line, blanket_line, pur_ord, pur_line, sales_job, address1,
- req, req_line, dop_req, source_biz_key, sync_batch_id, sync_time)
- SELECT
- IFNULL(p.tenant_id, 0), 1, 'AIDOP', p.Domain, p.Receiver, p.Line, d.RctDate, p.Supp,
- TRIM(CONCAT(d.Supp, ' ', IFNULL(s.SortName, ''))),
- p.ItemNum, i.Descr, i.Descr1, p.UM, p.QtyOrded, p.QtyReceived, p.LotSerial, p.Location,
- p.OrdNbr, p.OrdLine, pd.BlanketLine, pd.PurOrd, pd.Line, pd.SalesJob, a.Address1,
- (CASE WHEN d.OrdType='DO' THEN '' ELSE sd.Req END),
- sd.ReqLine,
- (CASE WHEN d.OrdType='DO' THEN sd.Req ELSE dr.pr_billno END),
- CONCAT(p.Domain, '#', p.Receiver, '#', p.Line), @BatchId, @Now
- FROM PurOrdRctDetail p
- INNER JOIN PurOrdRctMaster d ON p.Domain = d.Domain AND p.Receiver = d.Receiver
- LEFT JOIN ItemMaster i ON p.Domain = i.Domain AND p.ItemNum = i.ItemNum
- LEFT JOIN SuppMaster s ON d.Domain = s.Domain AND d.Supp = s.Supp
- LEFT JOIN ConsigneeAddressMaster a ON p.Domain = a.Domain AND p.Supp = a.Address AND a.Typed = 'Supp'
- LEFT JOIN PurOrdDetail pd ON p.Domain = pd.Domain
- AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.RctNbr ELSE p.OrdNbr END)
- = (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN pd.Contract ELSE pd.PurOrd END)
- AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.BlanketLine ELSE p.OrdLine END) = pd.Line
- LEFT JOIN PurOrdDetail sd ON p.Domain = sd.Domain AND p.OrdNbr = sd.PurOrd AND p.OrdLine = sd.Line
- LEFT JOIN srm_pr_main dr ON CAST(dr.factory_id AS CHAR) = sd.Domain AND dr.SAP_pr_billno = sd.Req AND IFNULL(dr.SAP_pr_billno,'')<>''
- WHERE p.RctType = 'rc'
- ON DUPLICATE KEY UPDATE
- factory_id=VALUES(factory_id), rct_date=VALUES(rct_date), supp=VALUES(supp), sort_name=VALUES(sort_name),
- item_num=VALUES(item_num), item_name=VALUES(item_name), item_spec=VALUES(item_spec), um=VALUES(um),
- qty_ordered=VALUES(qty_ordered), qty_received=VALUES(qty_received), lot_serial=VALUES(lot_serial), location=VALUES(location),
- ord_nbr=VALUES(ord_nbr), ord_line=VALUES(ord_line), blanket_line=VALUES(blanket_line),
- pur_ord=VALUES(pur_ord), pur_line=VALUES(pur_line), sales_job=VALUES(sales_job), address1=VALUES(address1),
- req=VALUES(req), req_line=VALUES(req_line), dop_req=VALUES(dop_req),
- sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
- """,
- new SugarParameter("@BatchId", batchId),
- new SugarParameter("@Now", now));
- return rows;
- }
- private async Task<long> InsertRunLogAsync(string batchId, DateTime startedAt, string triggerType)
- {
- await _db.Ado.ExecuteCommandAsync(
- """
- INSERT INTO mdp_transform_run_log
- (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
- VALUES (0, @JobCode, 'S5采购收货单MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
- """,
- new SugarParameter("@JobCode", JobCode),
- new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
- new SugarParameter("@BatchId", batchId),
- new SugarParameter("@StartTime", startedAt));
- return await _db.Ado.GetLongAsync(
- "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
- new List<SugarParameter> { new("@BatchId", batchId) });
- }
- private async Task MarkRunSuccessAsync(long runLogId, DateTime startedAt, PurchaseReceiptMdpSyncResult result)
- {
- var finishedAt = DateTime.Now;
- await _db.Ado.ExecuteCommandAsync(
- """
- UPDATE mdp_transform_run_log
- SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
- stage_rows=0, standard_rows=@StandardRows, dwd_rows=0, update_time=CURRENT_TIMESTAMP
- WHERE id=@Id
- """,
- new SugarParameter("@EndTime", finishedAt),
- new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
- new SugarParameter("@StandardRows", result.StdRows),
- new SugarParameter("@Id", runLogId));
- }
- private async Task MarkRunFailedAsync(long runLogId, DateTime startedAt, string message)
- {
- var finishedAt = DateTime.Now;
- await _db.Ado.ExecuteCommandAsync(
- """
- UPDATE mdp_transform_run_log
- SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
- error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
- WHERE id=@Id
- """,
- new SugarParameter("@EndTime", finishedAt),
- new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
- new SugarParameter("@ErrorMessage", message.Length > 2000 ? message[..2000] : message),
- new SugarParameter("@Id", runLogId));
- }
- private static string NormalizeTriggerType(string? triggerType)
- => string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
- }
- /// <summary>采购收货单 MDP 同步转换结果。</summary>
- public sealed class PurchaseReceiptMdpSyncResult
- {
- public long RunLogId { get; set; }
- public string BatchId { get; set; } = string.Empty;
- public int StdRows { get; set; }
- }
|