namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse; /// /// S5 委外发料单 数据中台只读同步转换服务(DOP 内部,独立于 S5MdpSyncTransformService 的 KPI 管线)。 /// /// 源:aidopdev NbrMaster / NbrDetail,业务类型 Type='CA';头明细关联 NbrDetail.NbrRecID -> NbrMaster.RecID。 /// 链路:NbrMaster/NbrDetail(CA) 只读 → mdp_stg_outsource_issue(_detail)(raw_data JSON) → mdp_std_outsource_issue(_detail)(typed)。 /// /// 约束: /// - 只读源表,仅写 mdp_stg_*/mdp_std_*;绝不 INSERT/UPDATE/DELETE NbrMaster/NbrDetail。 /// - 过滤 NbrMaster.Type='CA' AND IsActive=1;NbrDetail.Type='CA'。 /// - status_desc 规则本批仅:Status='C' -> '关闭',其他 -> ''。 /// - department_name 经 NbrMaster.Domain + NbrMaster.Department 关联 DepartmentMaster.Descr。 /// - 明细只转换已确认 9 列(ItemNum/ItemName/UM/QtyOrd/LocationFrom/LocationTo/Line/Status/Remark); /// 「发料数量/已发数/批次号」3 候选列本批后置,不转换、不落 typed 字段。 /// - CA 为 0 行时转换成功完成、处理数为 0,不报错。 /// public class OutsourceIssueMdpSyncService : ITransient { private const string JobCode = "S5_OUTSOURCE_ISSUE_MDP_SYNC"; private readonly ISqlSugarClient _db; public OutsourceIssueMdpSyncService(ISqlSugarClient db) { _db = db; } /// /// 全量同步转换:贴源(头/明细) → 标准化(头/明细)。 /// public async Task RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO") { cancellationToken.ThrowIfCancellationRequested(); await EnsureTablesAsync(); var now = DateTime.Now; var batchId = $"S5_OUTSRC_ISSUE_FULL_{now:yyyyMMddHHmmss}"; var runLogId = await InsertRunLogAsync(batchId, now, triggerType); var result = new OutsourceIssueMdpSyncResult { BatchId = batchId, RunLogId = runLogId }; try { result.HeadStageRows = await SyncHeadStagingAsync(batchId, now); result.DetailStageRows = await SyncDetailStagingAsync(batchId, now); result.HeadStandardRows = await TransformHeadStandardAsync(batchId, now); result.DetailStandardRows = await TransformDetailStandardAsync(batchId, now); await MarkRunSuccessAsync(runLogId, now, result); return result; } catch (Exception ex) { await MarkRunFailedAsync(runLogId, now, ex.Message); throw; } } /// 防御式建表(与 UpdateScripts/1.0.206.sql 同构,幂等)。 private async Task EnsureTablesAsync() { await _db.Ado.ExecuteCommandAsync( """ CREATE TABLE IF NOT EXISTS mdp_stg_outsource_issue ( id BIGINT AUTO_INCREMENT PRIMARY KEY, tenant_id BIGINT NOT NULL DEFAULT 0, factory_id BIGINT NULL DEFAULT 1, source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP', source_table VARCHAR(100) NOT NULL, source_row_id VARCHAR(100) NOT NULL, source_biz_key VARCHAR(200) NULL, sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL, process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING', raw_data JSON NOT NULL, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_mdp_stg_outsrc_issue (tenant_id, source_table, source_row_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单头贴源层' """); await _db.Ado.ExecuteCommandAsync( """ CREATE TABLE IF NOT EXISTS mdp_stg_outsource_issue_detail ( id BIGINT AUTO_INCREMENT PRIMARY KEY, tenant_id BIGINT NOT NULL DEFAULT 0, factory_id BIGINT NULL DEFAULT 1, source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP', source_table VARCHAR(100) NOT NULL, source_row_id VARCHAR(100) NOT NULL, source_biz_key VARCHAR(200) NULL, sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL, process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING', raw_data JSON NOT NULL, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_mdp_stg_outsrc_issue_dtl (tenant_id, source_table, source_row_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单明细贴源层' """); await _db.Ado.ExecuteCommandAsync( """ CREATE TABLE IF NOT EXISTS mdp_std_outsource_issue ( id BIGINT AUTO_INCREMENT PRIMARY KEY, tenant_id BIGINT NOT NULL DEFAULT 0, factory_id BIGINT NULL DEFAULT 1, source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP', bill_no VARCHAR(24) NOT NULL, issue_date DATETIME NULL, outsource_no VARCHAR(60) NULL, work_order VARCHAR(64) NULL, department_code VARCHAR(20) NULL, department_name VARCHAR(255) NULL, issuer VARCHAR(255) NULL, status VARCHAR(8) NULL, status_desc VARCHAR(20) NULL, remark VARCHAR(200) NULL, create_user VARCHAR(24) NULL, source_create_time DATETIME NULL, eff_date DATETIME NULL, source_biz_key VARCHAR(200) NULL, sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_mdp_std_outsrc_issue (tenant_id, bill_no), KEY idx_mdp_std_outsrc_issue_date (tenant_id, issue_date) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单头标准层' """); await _db.Ado.ExecuteCommandAsync( """ CREATE TABLE IF NOT EXISTS mdp_std_outsource_issue_detail ( id BIGINT AUTO_INCREMENT PRIMARY KEY, tenant_id BIGINT NOT NULL DEFAULT 0, std_head_id BIGINT NULL, bill_no VARCHAR(24) NOT NULL, line SMALLINT NOT NULL DEFAULT 0, item_num VARCHAR(24) NULL, item_name TEXT NULL, um VARCHAR(8) NULL, qty_ord DECIMAL(18,6) NULL DEFAULT 0, location_from VARCHAR(8) NULL, location_to VARCHAR(8) NULL, status VARCHAR(8) NULL, remark TEXT NULL, source_biz_key VARCHAR(200) NULL, sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_mdp_std_outsrc_issue_dtl (tenant_id, source_biz_key), KEY idx_mdp_std_outsrc_issue_dtl_head (std_head_id), KEY idx_mdp_std_outsrc_issue_dtl_bill (tenant_id, bill_no) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单明细标准层' """); } /// 贴源头:NbrMaster(CA) -> mdp_stg_outsource_issue(raw_data JSON 快照)。返回源 CA 头行数。 private async Task SyncHeadStagingAsync(string batchId, DateTime now) { var rows = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM NbrMaster WHERE Type='CA' AND IsActive=1"); await _db.Ado.ExecuteCommandAsync( """ INSERT INTO mdp_stg_outsource_issue (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data) SELECT IFNULL(m.tenant_id, 0), 1, 'AIDOP', 'NbrMaster', CAST(m.RecID AS CHAR), CAST(m.Nbr AS CHAR), @BatchId, @Now, 'PENDING', JSON_OBJECT( 'RecID', m.RecID, 'Nbr', m.Nbr, 'Date', m.Date, 'Address', m.Address, 'WorkOrd', m.WorkOrd, 'Remark', m.Remark, 'CreateUser', m.CreateUser, 'CreateTime', m.CreateTime, 'Status', m.Status, 'Domain', m.Domain, 'Department', m.Department, 'User1', CAST(m.User1 AS CHAR), 'EffDate', m.EffDate, 'Ufld1', m.Ufld1 ) FROM NbrMaster m WHERE m.Type='CA' AND m.IsActive=1 ON DUPLICATE KEY UPDATE source_biz_key=VALUES(source_biz_key), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), process_status=VALUES(process_status), raw_data=VALUES(raw_data), update_time=CURRENT_TIMESTAMP """, new SugarParameter("@BatchId", batchId), new SugarParameter("@Now", now)); return rows; } /// 贴源明细:NbrDetail(CA) -> mdp_stg_outsource_issue_detail。返回源 CA 明细行数。 private async Task SyncDetailStagingAsync(string batchId, DateTime now) { var rows = await _db.Ado.GetIntAsync( """ SELECT COUNT(1) FROM NbrDetail dt JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1 WHERE dt.Type='CA' """); await _db.Ado.ExecuteCommandAsync( """ INSERT INTO mdp_stg_outsource_issue_detail (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data) SELECT IFNULL(dt.tenant_id, 0), 1, 'AIDOP', 'NbrDetail', CAST(dt.RecID AS CHAR), CONCAT(CAST(dt.Nbr AS CHAR), '#', dt.Line), @BatchId, @Now, 'PENDING', JSON_OBJECT( 'RecID', dt.RecID, 'NbrRecID', dt.NbrRecID, 'Nbr', dt.Nbr, 'Line', dt.Line, 'ItemNum', dt.ItemNum, 'ItemName', CAST(dt.ItemName AS CHAR), 'UM', dt.UM, 'QtyOrd', dt.QtyOrd, 'LocationFrom', dt.LocationFrom, 'LocationTo', dt.LocationTo, 'Status', dt.Status, 'Remark', CAST(dt.Remark AS CHAR) ) FROM NbrDetail dt JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1 WHERE dt.Type='CA' ON DUPLICATE KEY UPDATE source_biz_key=VALUES(source_biz_key), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), process_status=VALUES(process_status), raw_data=VALUES(raw_data), update_time=CURRENT_TIMESTAMP """, new SugarParameter("@BatchId", batchId), new SugarParameter("@Now", now)); return rows; } /// 标准化头:NbrMaster(CA) + DepartmentMaster -> mdp_std_outsource_issue。返回处理头行数。 private async Task TransformHeadStandardAsync(string batchId, DateTime now) { var rows = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM NbrMaster WHERE Type='CA' AND IsActive=1"); await _db.Ado.ExecuteCommandAsync( """ INSERT INTO mdp_std_outsource_issue (tenant_id, factory_id, source_system, bill_no, issue_date, outsource_no, work_order, department_code, department_name, issuer, status, status_desc, remark, create_user, source_create_time, eff_date, source_biz_key, sync_batch_id, sync_time) SELECT IFNULL(m.tenant_id, 0), 1, 'AIDOP', m.Nbr, m.Date, m.Address, m.WorkOrd, m.Department, d.Descr, CAST(m.User1 AS CHAR), m.Status, CASE WHEN m.Status='C' THEN '关闭' ELSE '' END, m.Remark, m.CreateUser, m.CreateTime, m.EffDate, CAST(m.Nbr AS CHAR), @BatchId, @Now FROM NbrMaster m LEFT JOIN DepartmentMaster d ON d.Domain = m.Domain AND d.Department = m.Department WHERE m.Type='CA' AND m.IsActive=1 ON DUPLICATE KEY UPDATE issue_date=VALUES(issue_date), outsource_no=VALUES(outsource_no), work_order=VALUES(work_order), department_code=VALUES(department_code), department_name=VALUES(department_name), issuer=VALUES(issuer), status=VALUES(status), status_desc=VALUES(status_desc), remark=VALUES(remark), create_user=VALUES(create_user), source_create_time=VALUES(source_create_time), eff_date=VALUES(eff_date), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP """, new SugarParameter("@BatchId", batchId), new SugarParameter("@Now", now)); return rows; } /// 标准化明细:NbrDetail(CA) -> mdp_std_outsource_issue_detail(仅已确认 9 列;回填 std_head_id)。返回处理明细行数。 private async Task TransformDetailStandardAsync(string batchId, DateTime now) { var rows = await _db.Ado.GetIntAsync( """ SELECT COUNT(1) FROM NbrDetail dt JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1 WHERE dt.Type='CA' """); await _db.Ado.ExecuteCommandAsync( """ INSERT INTO mdp_std_outsource_issue_detail (tenant_id, std_head_id, bill_no, line, item_num, item_name, um, qty_ord, location_from, location_to, status, remark, source_biz_key, sync_batch_id, sync_time) SELECT IFNULL(dt.tenant_id, 0), h.id, dt.Nbr, dt.Line, dt.ItemNum, CAST(dt.ItemName AS CHAR), dt.UM, dt.QtyOrd, dt.LocationFrom, dt.LocationTo, dt.Status, CAST(dt.Remark AS CHAR), CONCAT(CAST(dt.Nbr AS CHAR), '#', dt.Line), @BatchId, @Now FROM NbrDetail dt JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1 LEFT JOIN mdp_std_outsource_issue h ON h.tenant_id = IFNULL(dt.tenant_id, 0) AND h.bill_no = dt.Nbr WHERE dt.Type='CA' ON DUPLICATE KEY UPDATE std_head_id=VALUES(std_head_id), line=VALUES(line), item_num=VALUES(item_num), item_name=VALUES(item_name), um=VALUES(um), qty_ord=VALUES(qty_ord), location_from=VALUES(location_from), location_to=VALUES(location_to), status=VALUES(status), remark=VALUES(remark), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP """, new SugarParameter("@BatchId", batchId), new SugarParameter("@Now", now)); return rows; } private async Task InsertRunLogAsync(string batchId, DateTime startedAt, string triggerType) { await _db.Ado.ExecuteCommandAsync( """ INSERT INTO mdp_transform_run_log (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time) VALUES (0, @JobCode, 'S5委外发料单MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime) """, new SugarParameter("@JobCode", JobCode), new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)), new SugarParameter("@BatchId", batchId), new SugarParameter("@StartTime", startedAt)); return await _db.Ado.GetLongAsync( "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1", new List { new("@BatchId", batchId) }); } private async Task MarkRunSuccessAsync(long runLogId, DateTime startedAt, OutsourceIssueMdpSyncResult result) { var finishedAt = DateTime.Now; await _db.Ado.ExecuteCommandAsync( """ UPDATE mdp_transform_run_log SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs, stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=0, update_time=CURRENT_TIMESTAMP WHERE id=@Id """, new SugarParameter("@EndTime", finishedAt), new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds), new SugarParameter("@StageRows", result.HeadStageRows + result.DetailStageRows), new SugarParameter("@StandardRows", result.HeadStandardRows + result.DetailStandardRows), new SugarParameter("@Id", runLogId)); } private async Task MarkRunFailedAsync(long runLogId, DateTime startedAt, string message) { var finishedAt = DateTime.Now; await _db.Ado.ExecuteCommandAsync( """ UPDATE mdp_transform_run_log SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs, error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP WHERE id=@Id """, new SugarParameter("@EndTime", finishedAt), new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds), new SugarParameter("@ErrorMessage", message.Length > 2000 ? message[..2000] : message), new SugarParameter("@Id", runLogId)); } private static string NormalizeTriggerType(string? triggerType) => string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant(); } /// 委外发料单 MDP 同步转换结果。 public sealed class OutsourceIssueMdpSyncResult { public long RunLogId { get; set; } public string BatchId { get; set; } = string.Empty; public int HeadStageRows { get; set; } public int DetailStageRows { get; set; } public int HeadStandardRows { get; set; } public int DetailStandardRows { get; set; } }