- namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse;
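- /// <summary>
- /// Read-only data-platform (MDP) sync and transform service for S5 outsourcing material issue orders (internal to DOP, independent of the KPI pipeline in S5MdpSyncTransformService).
- ///
- /// Source: aidopdev NbrMaster / NbrDetail with business type Type='CA'; header and detail rows are linked by NbrDetail.NbrRecID -> NbrMaster.RecID.
- /// Pipeline: NbrMaster/NbrDetail (CA), read-only → mdp_stg_outsource_issue(_detail) (raw_data JSON) → mdp_std_outsource_issue(_detail) (typed).
- ///
- /// Constraints:
- /// - The source tables are read-only; data is written only to mdp_stg_*/mdp_std_* (run status goes to mdp_transform_run_log). Never INSERT/UPDATE/DELETE against NbrMaster/NbrDetail.
- /// - Filter on NbrMaster.Type='CA' AND IsActive=1, and on NbrDetail.Type='CA'.
- /// - status_desc rule for this batch only: Status='C' -> '关闭' (closed); any other value -> ''.
- /// - department_name is resolved by joining NbrMaster.Domain + NbrMaster.Department to DepartmentMaster.Descr.
- /// - Detail rows convert only the 9 confirmed columns (ItemNum/ItemName/UM/QtyOrd/LocationFrom/LocationTo/Line/Status/Remark);
- /// the 3 candidate columns (issue quantity / issued quantity / lot number) are deferred in this batch: they are not converted and not written to typed fields.
- /// - When there are 0 CA rows, the transform completes successfully with a processed count of 0 and raises no error.
- /// </summary>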
- public class OutsourceIssueMdpSyncService : ITransient
- {
- private const string JobCode = "S5_OUTSOURCE_ISSUE_MDP_SYNC";
- private readonly ISqlSugarClient _db;
- public OutsourceIssueMdpSyncService(ISqlSugarClient db)
- {
- _db = db;
- }
- /// <summary>
- /// Full sync and transform: staging (header/detail) → standardization (header/detail).
- /// </summary>
- public async Task<OutsourceIssueMdpSyncResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
- {
- cancellationToken.ThrowIfCancellationRequested();
- await EnsureTablesAsync();
- var now = DateTime.Now;
- var batchId = $"S5_OUTSRC_ISSUE_FULL_{now:yyyyMMddHHmmss}";
- var runLogId = await InsertRunLogAsync(batchId, now, triggerType);
- var result = new OutsourceIssueMdpSyncResult { BatchId = batchId, RunLogId = runLogId };
- try
- {
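- // Stage the raw JSON snapshots first (header, then detail).
- result.HeadStageRows = await SyncHeadStagingAsync(batchId, now);
- result.DetailStageRows = await SyncDetailStagingAsync(batchId, now);
- // Order matters: the detail transform back-fills std_head_id from mdp_std_outsource_issue,
- // so headers must be standardized before details.
- result.HeadStandardRows = await TransformHeadStandardAsync(batchId, now);
- result.DetailStandardRows = await TransformDetailStandardAsync(batchId, now);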
- await MarkRunSuccessAsync(runLogId, now, result);
- return result;
- }
- catch (Exception ex)
- {
- await MarkRunFailedAsync(runLogId, now, ex.Message);
- throw;
- }
- }
- /// <summary>Defensive table creation (same structure as UpdateScripts/1.0.206.sql; idempotent).</summary>
- private async Task EnsureTablesAsync()
- {
- await _db.Ado.ExecuteCommandAsync(
- """
- CREATE TABLE IF NOT EXISTS mdp_stg_outsource_issue (
- id BIGINT AUTO_INCREMENT PRIMARY KEY,
- tenant_id BIGINT NOT NULL DEFAULT 0,
- factory_id BIGINT NULL DEFAULT 1,
- source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
- source_table VARCHAR(100) NOT NULL,
- source_row_id VARCHAR(100) NOT NULL,
- source_biz_key VARCHAR(200) NULL,
- sync_batch_id VARCHAR(100) NOT NULL,
- sync_time DATETIME NOT NULL,
- process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
- raw_data JSON NOT NULL,
- update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- UNIQUE KEY uk_mdp_stg_outsrc_issue (tenant_id, source_table, source_row_id)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单头贴源层'
- """);
- await _db.Ado.ExecuteCommandAsync(
- """
- CREATE TABLE IF NOT EXISTS mdp_stg_outsource_issue_detail (
- id BIGINT AUTO_INCREMENT PRIMARY KEY,
- tenant_id BIGINT NOT NULL DEFAULT 0,
- factory_id BIGINT NULL DEFAULT 1,
- source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
- source_table VARCHAR(100) NOT NULL,
- source_row_id VARCHAR(100) NOT NULL,
- source_biz_key VARCHAR(200) NULL,
- sync_batch_id VARCHAR(100) NOT NULL,
- sync_time DATETIME NOT NULL,
- process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
- raw_data JSON NOT NULL,
- update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- UNIQUE KEY uk_mdp_stg_outsrc_issue_dtl (tenant_id, source_table, source_row_id)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单明细贴源层'
- """);
- await _db.Ado.ExecuteCommandAsync(
- """
- CREATE TABLE IF NOT EXISTS mdp_std_outsource_issue (
- id BIGINT AUTO_INCREMENT PRIMARY KEY,
- tenant_id BIGINT NOT NULL DEFAULT 0,
- factory_id BIGINT NULL DEFAULT 1,
- source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
- bill_no VARCHAR(24) NOT NULL,
- issue_date DATETIME NULL,
- outsource_no VARCHAR(60) NULL,
- work_order VARCHAR(64) NULL,
- department_code VARCHAR(20) NULL,
- department_name VARCHAR(255) NULL,
- issuer VARCHAR(255) NULL,
- status VARCHAR(8) NULL,
- status_desc VARCHAR(20) NULL,
- remark VARCHAR(200) NULL,
- create_user VARCHAR(24) NULL,
- source_create_time DATETIME NULL,
- eff_date DATETIME NULL,
- source_biz_key VARCHAR(200) NULL,
- sync_batch_id VARCHAR(100) NOT NULL,
- sync_time DATETIME NOT NULL,
- update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- UNIQUE KEY uk_mdp_std_outsrc_issue (tenant_id, bill_no),
- KEY idx_mdp_std_outsrc_issue_date (tenant_id, issue_date)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单头标准层'
- """);
- await _db.Ado.ExecuteCommandAsync(
- """
- CREATE TABLE IF NOT EXISTS mdp_std_outsource_issue_detail (
- id BIGINT AUTO_INCREMENT PRIMARY KEY,
- tenant_id BIGINT NOT NULL DEFAULT 0,
- std_head_id BIGINT NULL,
- bill_no VARCHAR(24) NOT NULL,
- line SMALLINT NOT NULL DEFAULT 0,
- item_num VARCHAR(24) NULL,
- item_name TEXT NULL,
- um VARCHAR(8) NULL,
- qty_ord DECIMAL(18,6) NULL DEFAULT 0,
- location_from VARCHAR(8) NULL,
- location_to VARCHAR(8) NULL,
- status VARCHAR(8) NULL,
- remark TEXT NULL,
- source_biz_key VARCHAR(200) NULL,
- sync_batch_id VARCHAR(100) NOT NULL,
- sync_time DATETIME NOT NULL,
- update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- UNIQUE KEY uk_mdp_std_outsrc_issue_dtl (tenant_id, source_biz_key),
- KEY idx_mdp_std_outsrc_issue_dtl_head (std_head_id),
- KEY idx_mdp_std_outsrc_issue_dtl_bill (tenant_id, bill_no)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单明细标准层'
- """);
- }
- /// <summary>Header staging: NbrMaster(CA) -> mdp_stg_outsource_issue (raw_data JSON snapshot). Returns the number of source CA header rows.</summary>
- private async Task<int> SyncHeadStagingAsync(string batchId, DateTime now)
- {
- var rows = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM NbrMaster WHERE Type='CA' AND IsActive=1");
- await _db.Ado.ExecuteCommandAsync(
- """
- INSERT INTO mdp_stg_outsource_issue
- (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
- SELECT
- IFNULL(m.tenant_id, 0), 1, 'AIDOP', 'NbrMaster',
- CAST(m.RecID AS CHAR), CAST(m.Nbr AS CHAR), @BatchId, @Now, 'PENDING',
- JSON_OBJECT(
- 'RecID', m.RecID, 'Nbr', m.Nbr, 'Date', m.Date, 'Address', m.Address,
- 'WorkOrd', m.WorkOrd, 'Remark', m.Remark, 'CreateUser', m.CreateUser, 'CreateTime', m.CreateTime,
- 'Status', m.Status, 'Domain', m.Domain, 'Department', m.Department,
- 'User1', CAST(m.User1 AS CHAR), 'EffDate', m.EffDate, 'Ufld1', m.Ufld1
- )
- FROM NbrMaster m
- WHERE m.Type='CA' AND m.IsActive=1
- ON DUPLICATE KEY UPDATE
- source_biz_key=VALUES(source_biz_key), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
- process_status=VALUES(process_status), raw_data=VALUES(raw_data), update_time=CURRENT_TIMESTAMP
- """,
- new SugarParameter("@BatchId", batchId),
- new SugarParameter("@Now", now));
- return rows;
- }
- /// <summary>Detail staging: NbrDetail(CA) -> mdp_stg_outsource_issue_detail. Returns the number of source CA detail rows.</summary>
- private async Task<int> SyncDetailStagingAsync(string batchId, DateTime now)
- {
- var rows = await _db.Ado.GetIntAsync(
- """
- SELECT COUNT(1) FROM NbrDetail dt
- JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
- WHERE dt.Type='CA'
- """);
- await _db.Ado.ExecuteCommandAsync(
- """
- INSERT INTO mdp_stg_outsource_issue_detail
- (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
- SELECT
- IFNULL(dt.tenant_id, 0), 1, 'AIDOP', 'NbrDetail',
- CAST(dt.RecID AS CHAR), CONCAT(CAST(dt.Nbr AS CHAR), '#', dt.Line), @BatchId, @Now, 'PENDING',
- JSON_OBJECT(
- 'RecID', dt.RecID, 'NbrRecID', dt.NbrRecID, 'Nbr', dt.Nbr, 'Line', dt.Line,
- 'ItemNum', dt.ItemNum, 'ItemName', CAST(dt.ItemName AS CHAR), 'UM', dt.UM, 'QtyOrd', dt.QtyOrd,
- 'LocationFrom', dt.LocationFrom, 'LocationTo', dt.LocationTo, 'Status', dt.Status, 'Remark', CAST(dt.Remark AS CHAR)
- )
- FROM NbrDetail dt
- JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
- WHERE dt.Type='CA'
- ON DUPLICATE KEY UPDATE
- source_biz_key=VALUES(source_biz_key), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
- process_status=VALUES(process_status), raw_data=VALUES(raw_data), update_time=CURRENT_TIMESTAMP
- """,
- new SugarParameter("@BatchId", batchId),
- new SugarParameter("@Now", now));
- return rows;
- }
- /// <summary>Header standardization: NbrMaster(CA) + DepartmentMaster -> mdp_std_outsource_issue. Returns the number of header rows processed.</summary>
- private async Task<int> TransformHeadStandardAsync(string batchId, DateTime now)
- {
- var rows = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM NbrMaster WHERE Type='CA' AND IsActive=1");
- await _db.Ado.ExecuteCommandAsync(
- """
- INSERT INTO mdp_std_outsource_issue
- (tenant_id, factory_id, source_system, bill_no, issue_date, outsource_no, work_order,
- department_code, department_name, issuer, status, status_desc, remark, create_user,
- source_create_time, eff_date, source_biz_key, sync_batch_id, sync_time)
- SELECT
- IFNULL(m.tenant_id, 0), 1, 'AIDOP', m.Nbr, m.Date, m.Address, m.WorkOrd,
- m.Department, d.Descr, CAST(m.User1 AS CHAR), m.Status,
- CASE WHEN m.Status='C' THEN '关闭' ELSE '' END,
- m.Remark, m.CreateUser, m.CreateTime, m.EffDate,
- CAST(m.Nbr AS CHAR), @BatchId, @Now
- FROM NbrMaster m
- LEFT JOIN DepartmentMaster d ON d.Domain = m.Domain AND d.Department = m.Department
- WHERE m.Type='CA' AND m.IsActive=1
- ON DUPLICATE KEY UPDATE
- issue_date=VALUES(issue_date), outsource_no=VALUES(outsource_no), work_order=VALUES(work_order),
- department_code=VALUES(department_code), department_name=VALUES(department_name), issuer=VALUES(issuer),
- status=VALUES(status), status_desc=VALUES(status_desc), remark=VALUES(remark), create_user=VALUES(create_user),
- source_create_time=VALUES(source_create_time), eff_date=VALUES(eff_date),
- sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
- """,
- new SugarParameter("@BatchId", batchId),
- new SugarParameter("@Now", now));
- return rows;
- }
- /// <summary>Detail standardization: NbrDetail(CA) -> mdp_std_outsource_issue_detail (the 9 confirmed columns only; back-fills std_head_id). Returns the number of detail rows processed.</summary>
- private async Task<int> TransformDetailStandardAsync(string batchId, DateTime now)
- {
- var rows = await _db.Ado.GetIntAsync(
- """
- SELECT COUNT(1) FROM NbrDetail dt
- JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
- WHERE dt.Type='CA'
- """);
- await _db.Ado.ExecuteCommandAsync(
- """
- INSERT INTO mdp_std_outsource_issue_detail
- (tenant_id, std_head_id, bill_no, line, item_num, item_name, um, qty_ord,
- location_from, location_to, status, remark, source_biz_key, sync_batch_id, sync_time)
- SELECT
- IFNULL(dt.tenant_id, 0), h.id, dt.Nbr, dt.Line, dt.ItemNum, CAST(dt.ItemName AS CHAR), dt.UM, dt.QtyOrd,
- dt.LocationFrom, dt.LocationTo, dt.Status, CAST(dt.Remark AS CHAR),
- CONCAT(CAST(dt.Nbr AS CHAR), '#', dt.Line), @BatchId, @Now
- FROM NbrDetail dt
- JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
- LEFT JOIN mdp_std_outsource_issue h ON h.tenant_id = IFNULL(dt.tenant_id, 0) AND h.bill_no = dt.Nbr
- WHERE dt.Type='CA'
- ON DUPLICATE KEY UPDATE
- std_head_id=VALUES(std_head_id), line=VALUES(line), item_num=VALUES(item_num), item_name=VALUES(item_name),
- um=VALUES(um), qty_ord=VALUES(qty_ord), location_from=VALUES(location_from), location_to=VALUES(location_to),
- status=VALUES(status), remark=VALUES(remark), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
- update_time=CURRENT_TIMESTAMP
- """,
- new SugarParameter("@BatchId", batchId),
- new SugarParameter("@Now", now));
- return rows;
- }
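- /// <summary>Inserts a RUNNING row into mdp_transform_run_log for this batch and returns its id (looked up by batch_id, newest first).</summary>
- private async Task<long> InsertRunLogAsync(string batchId, DateTime startedAt, string triggerType)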
- {
- await _db.Ado.ExecuteCommandAsync(
- """
- INSERT INTO mdp_transform_run_log
- (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
- VALUES (0, @JobCode, 'S5委外发料单MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
- """,
- new SugarParameter("@JobCode", JobCode),
- new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
- new SugarParameter("@BatchId", batchId),
- new SugarParameter("@StartTime", startedAt));
- return await _db.Ado.GetLongAsync(
- "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
- new List<SugarParameter> { new("@BatchId", batchId) });
- }
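- /// <summary>Marks the run log row as SUCCESS and records end time, duration, and the combined header + detail row counts for the staging and standard layers.</summary>
- private async Task MarkRunSuccessAsync(long runLogId, DateTime startedAt, OutsourceIssueMdpSyncResult result)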
- {
- var finishedAt = DateTime.Now;
- await _db.Ado.ExecuteCommandAsync(
- """
- UPDATE mdp_transform_run_log
- SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
- stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=0, update_time=CURRENT_TIMESTAMP
- WHERE id=@Id
- """,
- new SugarParameter("@EndTime", finishedAt),
- new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
- new SugarParameter("@StageRows", result.HeadStageRows + result.DetailStageRows),
- new SugarParameter("@StandardRows", result.HeadStandardRows + result.DetailStandardRows),
- new SugarParameter("@Id", runLogId));
- }
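- /// <summary>Marks the run log row as FAILED and records end time, duration, and the error message (truncated to 2000 characters).</summary>
- private async Task MarkRunFailedAsync(long runLogId, DateTime startedAt, string message)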
- {
- var finishedAt = DateTime.Now;
- await _db.Ado.ExecuteCommandAsync(
- """
- UPDATE mdp_transform_run_log
- SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
- error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
- WHERE id=@Id
- """,
- new SugarParameter("@EndTime", finishedAt),
- new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
- new SugarParameter("@ErrorMessage", message.Length > 2000 ? message[..2000] : message),
- new SugarParameter("@Id", runLogId));
- }
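- /// <summary>Normalizes the trigger type: null or blank becomes "AUTO"; otherwise the value is trimmed and upper-cased.</summary>
- private static string NormalizeTriggerType(string? triggerType)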
- => string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
- }
- /// <summary>Result of the outsourcing material issue order MDP sync and transform.</summary>
- public sealed class OutsourceIssueMdpSyncResult
- {
- public long RunLogId { get; set; }
- public string BatchId { get; set; } = string.Empty;
- public int HeadStageRows { get; set; }
- public int DetailStageRows { get; set; }
- public int HeadStandardRows { get; set; }
- public int DetailStandardRows { get; set; }
- }