namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse;
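/// <summary>
/// S5 outsourcing material-issue orders: read-only sync and transform service for the data middle platform
/// (DOP-internal; independent of the KPI pipeline in S5MdpSyncTransformService).
/// <para>
/// Source: aidopdev NbrMaster / NbrDetail, business type Type='CA'; header/detail link is NbrDetail.NbrRecID -> NbrMaster.RecID.
/// Flow: NbrMaster/NbrDetail (CA), read-only → mdp_stg_outsource_issue(_detail) (raw_data JSON) → mdp_std_outsource_issue(_detail) (typed).
/// </para>
/// Constraints:
/// - The source tables are read-only; only mdp_stg_*/mdp_std_* are written. Never INSERT/UPDATE/DELETE NbrMaster/NbrDetail.
/// - Filter: NbrMaster.Type='CA' AND IsActive=1; NbrDetail.Type='CA'.
/// - status_desc rule for this batch only: Status='C' -> '关闭'; any other value -> ''.
/// - department_name is resolved by joining NbrMaster.Domain + NbrMaster.Department to DepartmentMaster.Descr.
/// - Detail rows convert only the 9 confirmed columns (ItemNum/ItemName/UM/QtyOrd/LocationFrom/LocationTo/Line/Status/Remark);
///   the 3 candidate columns "issue quantity / issued quantity / lot number" are deferred in this batch: not converted and not stored in typed fields.
/// - When there are 0 CA rows the transform completes successfully with a processed count of 0 and does not raise an error.
/// </summary>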
public class OutsourceIssueMdpSyncService : ITransient
{
private const string JobCode = "S5_OUTSOURCE_ISSUE_MDP_SYNC";
private readonly ISqlSugarClient _db;
/// <summary>
/// Creates the service around the SqlSugar client it uses for every query and command.
/// </summary>
/// <param name="db">SqlSugar client; stored as given, without validation.</param>
public OutsourceIssueMdpSyncService(ISqlSugarClient db) => _db = db;
/// <summary>
/// Full sync and transform: staging (header/detail) → standardized (header/detail).
/// </summary>
/// <param name="cancellationToken">
/// Checked on entry and again before each of the four stages. A SQL statement that is already
/// executing is not interrupted, because the token is not passed down to the database calls.
/// </param>
/// <param name="triggerType">Trigger label recorded in the run log after normalization (trimmed, upper-cased; blank becomes "AUTO").</param>
/// <returns>The batch id, the run-log id and the row count reported by each stage.</returns>
/// <exception cref="OperationCanceledException">The token was cancelled on entry or between stages.</exception>
public async Task<OutsourceIssueMdpSyncResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
{
    cancellationToken.ThrowIfCancellationRequested();
    await EnsureTablesAsync();

    var now = DateTime.Now;
    var batchId = $"S5_OUTSRC_ISSUE_FULL_{now:yyyyMMddHHmmss}";
    var runLogId = await InsertRunLogAsync(batchId, now, triggerType);
    var result = new OutsourceIssueMdpSyncResult { BatchId = batchId, RunLogId = runLogId };
    try
    {
        // Staging must run before standardization, and the header standard step before the detail
        // standard step (the detail step looks up std_head_id from the header standard table).
        cancellationToken.ThrowIfCancellationRequested();
        result.HeadStageRows = await SyncHeadStagingAsync(batchId, now);

        cancellationToken.ThrowIfCancellationRequested();
        result.DetailStageRows = await SyncDetailStagingAsync(batchId, now);

        cancellationToken.ThrowIfCancellationRequested();
        result.HeadStandardRows = await TransformHeadStandardAsync(batchId, now);

        cancellationToken.ThrowIfCancellationRequested();
        result.DetailStandardRows = await TransformDetailStandardAsync(batchId, now);

        await MarkRunSuccessAsync(runLogId, now, result);
        return result;
    }
    catch (Exception ex)
    {
        try
        {
            await MarkRunFailedAsync(runLogId, now, ex.Message);
        }
        catch (Exception)
        {
            // Recording the failure is best-effort: if the log update itself fails (for example the
            // database is unreachable), the original exception below is the one callers need to see.
        }

        throw;
    }
}
/// <summary>
/// Defensive table creation: issues CREATE TABLE IF NOT EXISTS for the two staging tables and the
/// two standard tables, so repeated calls are harmless. Per the original note, the definitions
/// mirror UpdateScripts/1.0.206.sql.
/// </summary>
private async Task EnsureTablesAsync()
{
    // Executed one by one in this order: header staging, detail staging, header standard, detail standard.
    string[] createStatements =
    {
        """
        CREATE TABLE IF NOT EXISTS mdp_stg_outsource_issue (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NULL DEFAULT 1,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        source_table VARCHAR(100) NOT NULL,
        source_row_id VARCHAR(100) NOT NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL,
        sync_time DATETIME NOT NULL,
        process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
        raw_data JSON NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_mdp_stg_outsrc_issue (tenant_id, source_table, source_row_id)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单头贴源层'
        """,
        """
        CREATE TABLE IF NOT EXISTS mdp_stg_outsource_issue_detail (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NULL DEFAULT 1,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        source_table VARCHAR(100) NOT NULL,
        source_row_id VARCHAR(100) NOT NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL,
        sync_time DATETIME NOT NULL,
        process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
        raw_data JSON NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_mdp_stg_outsrc_issue_dtl (tenant_id, source_table, source_row_id)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单明细贴源层'
        """,
        """
        CREATE TABLE IF NOT EXISTS mdp_std_outsource_issue (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NULL DEFAULT 1,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        bill_no VARCHAR(24) NOT NULL,
        issue_date DATETIME NULL,
        outsource_no VARCHAR(60) NULL,
        work_order VARCHAR(64) NULL,
        department_code VARCHAR(20) NULL,
        department_name VARCHAR(255) NULL,
        issuer VARCHAR(255) NULL,
        status VARCHAR(8) NULL,
        status_desc VARCHAR(20) NULL,
        remark VARCHAR(200) NULL,
        create_user VARCHAR(24) NULL,
        source_create_time DATETIME NULL,
        eff_date DATETIME NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL,
        sync_time DATETIME NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_mdp_std_outsrc_issue (tenant_id, bill_no),
        KEY idx_mdp_std_outsrc_issue_date (tenant_id, issue_date)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单头标准层'
        """,
        """
        CREATE TABLE IF NOT EXISTS mdp_std_outsource_issue_detail (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        std_head_id BIGINT NULL,
        bill_no VARCHAR(24) NOT NULL,
        line SMALLINT NOT NULL DEFAULT 0,
        item_num VARCHAR(24) NULL,
        item_name TEXT NULL,
        um VARCHAR(8) NULL,
        qty_ord DECIMAL(18,6) NULL DEFAULT 0,
        location_from VARCHAR(8) NULL,
        location_to VARCHAR(8) NULL,
        status VARCHAR(8) NULL,
        remark TEXT NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL,
        sync_time DATETIME NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_mdp_std_outsrc_issue_dtl (tenant_id, source_biz_key),
        KEY idx_mdp_std_outsrc_issue_dtl_head (std_head_id),
        KEY idx_mdp_std_outsrc_issue_dtl_bill (tenant_id, bill_no)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5委外发料单明细标准层'
        """,
    };

    foreach (var ddl in createStatements)
    {
        await _db.Ado.ExecuteCommandAsync(ddl);
    }
}
/// <summary>
/// Header staging: upserts a raw_data JSON snapshot of every NbrMaster row with Type='CA' and
/// IsActive=1 into mdp_stg_outsource_issue. Existing rows are refreshed via ON DUPLICATE KEY UPDATE
/// and reset to process_status 'PENDING'.
/// </summary>
/// <param name="batchId">Batch id written to sync_batch_id.</param>
/// <param name="now">Timestamp written to sync_time.</param>
/// <returns>The number of source CA header rows, taken from a separate COUNT query rather than from the upsert's affected-row count.</returns>
private async Task<int> SyncHeadStagingAsync(string batchId, DateTime now)
{
    // Counted with the same filter as the INSERT ... SELECT below. The two statements are not wrapped
    // in a transaction in this method, so the count is a snapshot taken just before the upsert.
    var rows = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM NbrMaster WHERE Type='CA' AND IsActive=1");

    await _db.Ado.ExecuteCommandAsync(
        """
        INSERT INTO mdp_stg_outsource_issue
        (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
        SELECT
        IFNULL(m.tenant_id, 0), 1, 'AIDOP', 'NbrMaster',
        CAST(m.RecID AS CHAR), CAST(m.Nbr AS CHAR), @BatchId, @Now, 'PENDING',
        JSON_OBJECT(
        'RecID', m.RecID, 'Nbr', m.Nbr, 'Date', m.Date, 'Address', m.Address,
        'WorkOrd', m.WorkOrd, 'Remark', m.Remark, 'CreateUser', m.CreateUser, 'CreateTime', m.CreateTime,
        'Status', m.Status, 'Domain', m.Domain, 'Department', m.Department,
        'User1', CAST(m.User1 AS CHAR), 'EffDate', m.EffDate, 'Ufld1', m.Ufld1
        )
        FROM NbrMaster m
        WHERE m.Type='CA' AND m.IsActive=1
        ON DUPLICATE KEY UPDATE
        source_biz_key=VALUES(source_biz_key), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
        process_status=VALUES(process_status), raw_data=VALUES(raw_data), update_time=CURRENT_TIMESTAMP
        """,
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@Now", now));

    return rows;
}
/// <summary>
/// Detail staging: upserts a raw_data JSON snapshot of every NbrDetail row with Type='CA' whose
/// parent NbrMaster row (joined on NbrRecID = RecID) is Type='CA' and IsActive=1 into
/// mdp_stg_outsource_issue_detail. source_biz_key is built as "Nbr#Line".
/// </summary>
/// <param name="batchId">Batch id written to sync_batch_id.</param>
/// <param name="now">Timestamp written to sync_time.</param>
/// <returns>The number of source CA detail rows, taken from a separate COUNT query rather than from the upsert's affected-row count.</returns>
private async Task<int> SyncDetailStagingAsync(string batchId, DateTime now)
{
    // Same join and filter as the INSERT ... SELECT below; the count is a snapshot taken just before
    // the upsert because this method does not run the two statements in a transaction.
    var rows = await _db.Ado.GetIntAsync(
        """
        SELECT COUNT(1) FROM NbrDetail dt
        JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
        WHERE dt.Type='CA'
        """);

    await _db.Ado.ExecuteCommandAsync(
        """
        INSERT INTO mdp_stg_outsource_issue_detail
        (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
        SELECT
        IFNULL(dt.tenant_id, 0), 1, 'AIDOP', 'NbrDetail',
        CAST(dt.RecID AS CHAR), CONCAT(CAST(dt.Nbr AS CHAR), '#', dt.Line), @BatchId, @Now, 'PENDING',
        JSON_OBJECT(
        'RecID', dt.RecID, 'NbrRecID', dt.NbrRecID, 'Nbr', dt.Nbr, 'Line', dt.Line,
        'ItemNum', dt.ItemNum, 'ItemName', CAST(dt.ItemName AS CHAR), 'UM', dt.UM, 'QtyOrd', dt.QtyOrd,
        'LocationFrom', dt.LocationFrom, 'LocationTo', dt.LocationTo, 'Status', dt.Status, 'Remark', CAST(dt.Remark AS CHAR)
        )
        FROM NbrDetail dt
        JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
        WHERE dt.Type='CA'
        ON DUPLICATE KEY UPDATE
        source_biz_key=VALUES(source_biz_key), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
        process_status=VALUES(process_status), raw_data=VALUES(raw_data), update_time=CURRENT_TIMESTAMP
        """,
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@Now", now));

    return rows;
}
/// <summary>
/// Header standardization: upserts NbrMaster rows (Type='CA', IsActive=1), left-joined to
/// DepartmentMaster on Domain + Department for the department name, into mdp_std_outsource_issue.
/// status_desc is '关闭' when Status='C' and an empty string otherwise.
/// </summary>
/// <param name="batchId">Batch id written to sync_batch_id.</param>
/// <param name="now">Timestamp written to sync_time.</param>
/// <returns>The number of source CA header rows, taken from a separate COUNT query rather than from the upsert's affected-row count.</returns>
private async Task<int> TransformHeadStandardAsync(string batchId, DateTime now)
{
    // The count reads NbrMaster only (no DepartmentMaster join), so it reflects source headers.
    var rows = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM NbrMaster WHERE Type='CA' AND IsActive=1");

    await _db.Ado.ExecuteCommandAsync(
        """
        INSERT INTO mdp_std_outsource_issue
        (tenant_id, factory_id, source_system, bill_no, issue_date, outsource_no, work_order,
        department_code, department_name, issuer, status, status_desc, remark, create_user,
        source_create_time, eff_date, source_biz_key, sync_batch_id, sync_time)
        SELECT
        IFNULL(m.tenant_id, 0), 1, 'AIDOP', m.Nbr, m.Date, m.Address, m.WorkOrd,
        m.Department, d.Descr, CAST(m.User1 AS CHAR), m.Status,
        CASE WHEN m.Status='C' THEN '关闭' ELSE '' END,
        m.Remark, m.CreateUser, m.CreateTime, m.EffDate,
        CAST(m.Nbr AS CHAR), @BatchId, @Now
        FROM NbrMaster m
        LEFT JOIN DepartmentMaster d ON d.Domain = m.Domain AND d.Department = m.Department
        WHERE m.Type='CA' AND m.IsActive=1
        ON DUPLICATE KEY UPDATE
        issue_date=VALUES(issue_date), outsource_no=VALUES(outsource_no), work_order=VALUES(work_order),
        department_code=VALUES(department_code), department_name=VALUES(department_name), issuer=VALUES(issuer),
        status=VALUES(status), status_desc=VALUES(status_desc), remark=VALUES(remark), create_user=VALUES(create_user),
        source_create_time=VALUES(source_create_time), eff_date=VALUES(eff_date),
        sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
        """,
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@Now", now));

    return rows;
}
/// <summary>
/// Detail standardization: upserts NbrDetail rows (Type='CA', parent NbrMaster Type='CA' and
/// IsActive=1) into mdp_std_outsource_issue_detail. Only the nine confirmed columns are carried
/// over, and std_head_id is filled from mdp_std_outsource_issue by tenant and bill number.
/// </summary>
/// <param name="batchId">Batch id written to sync_batch_id.</param>
/// <param name="now">Timestamp written to sync_time.</param>
/// <returns>The number of source CA detail rows, taken from a separate COUNT query rather than from the upsert's affected-row count.</returns>
private async Task<int> TransformDetailStandardAsync(string batchId, DateTime now)
{
    var rows = await _db.Ado.GetIntAsync(
        """
        SELECT COUNT(1) FROM NbrDetail dt
        JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
        WHERE dt.Type='CA'
        """);

    // The header table is LEFT JOINed, so a detail row whose header is not yet present in
    // mdp_std_outsource_issue is still written, with std_head_id left NULL.
    await _db.Ado.ExecuteCommandAsync(
        """
        INSERT INTO mdp_std_outsource_issue_detail
        (tenant_id, std_head_id, bill_no, line, item_num, item_name, um, qty_ord,
        location_from, location_to, status, remark, source_biz_key, sync_batch_id, sync_time)
        SELECT
        IFNULL(dt.tenant_id, 0), h.id, dt.Nbr, dt.Line, dt.ItemNum, CAST(dt.ItemName AS CHAR), dt.UM, dt.QtyOrd,
        dt.LocationFrom, dt.LocationTo, dt.Status, CAST(dt.Remark AS CHAR),
        CONCAT(CAST(dt.Nbr AS CHAR), '#', dt.Line), @BatchId, @Now
        FROM NbrDetail dt
        JOIN NbrMaster m ON m.RecID = dt.NbrRecID AND m.Type='CA' AND m.IsActive=1
        LEFT JOIN mdp_std_outsource_issue h ON h.tenant_id = IFNULL(dt.tenant_id, 0) AND h.bill_no = dt.Nbr
        WHERE dt.Type='CA'
        ON DUPLICATE KEY UPDATE
        std_head_id=VALUES(std_head_id), line=VALUES(line), item_num=VALUES(item_num), item_name=VALUES(item_name),
        um=VALUES(um), qty_ord=VALUES(qty_ord), location_from=VALUES(location_from), location_to=VALUES(location_to),
        status=VALUES(status), remark=VALUES(remark), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
        update_time=CURRENT_TIMESTAMP
        """,
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@Now", now));

    return rows;
}
/// <summary>
/// Inserts a 'RUNNING' row for this job into mdp_transform_run_log and returns its id.
/// </summary>
/// <param name="batchId">Batch id stored on the row and used to look the row up again.</param>
/// <param name="startedAt">Value written to start_time.</param>
/// <param name="triggerType">Trigger label; normalized (trimmed, upper-cased, blank becomes "AUTO") before it is stored.</param>
/// <returns>The id of the newest run-log row carrying <paramref name="batchId"/>.</returns>
private async Task<long> InsertRunLogAsync(string batchId, DateTime startedAt, string triggerType)
{
    await _db.Ado.ExecuteCommandAsync(
        """
        INSERT INTO mdp_transform_run_log
        (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
        VALUES (0, @JobCode, 'S5委外发料单MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
        """,
        new SugarParameter("@JobCode", JobCode),
        new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@StartTime", startedAt));

    // The id is read back by batch id, newest first, instead of relying on a last-insert-id
    // from the same connection.
    return await _db.Ado.GetLongAsync(
        "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
        new List<SugarParameter> { new("@BatchId", batchId) });
}
/// <summary>
/// Marks the run-log row as 'SUCCESS' and records the end time, the duration and the row totals.
/// </summary>
/// <param name="runLogId">Id of the mdp_transform_run_log row to update.</param>
/// <param name="startedAt">Run start time; the duration is measured from it to the current local time.</param>
/// <param name="result">Per-stage counts; header and detail counts are summed per layer, and dwd_rows is always written as 0.</param>
private async Task MarkRunSuccessAsync(long runLogId, DateTime startedAt, OutsourceIssueMdpSyncResult result)
{
    const string sql =
        """
        UPDATE mdp_transform_run_log
        SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
        stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=0, update_time=CURRENT_TIMESTAMP
        WHERE id=@Id
        """;

    var endedAt = DateTime.Now;
    var elapsedMs = (int)(endedAt - startedAt).TotalMilliseconds;
    var stagedTotal = result.HeadStageRows + result.DetailStageRows;
    var standardTotal = result.HeadStandardRows + result.DetailStandardRows;

    await _db.Ado.ExecuteCommandAsync(
        sql,
        new SugarParameter("@EndTime", endedAt),
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@StageRows", stagedTotal),
        new SugarParameter("@StandardRows", standardTotal),
        new SugarParameter("@Id", runLogId));
}
/// <summary>
/// Marks the run-log row as 'FAILED' and records the end time, the duration and the error message.
/// </summary>
/// <param name="runLogId">Id of the mdp_transform_run_log row to update.</param>
/// <param name="startedAt">Run start time; the duration is measured from it to the current local time.</param>
/// <param name="message">Error text; only the first 2000 characters are stored.</param>
private async Task MarkRunFailedAsync(long runLogId, DateTime startedAt, string message)
{
    const string sql =
        """
        UPDATE mdp_transform_run_log
        SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
        error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
        WHERE id=@Id
        """;

    var endedAt = DateTime.Now;
    var elapsedMs = (int)(endedAt - startedAt).TotalMilliseconds;

    var storedMessage = message;
    if (storedMessage.Length > 2000)
    {
        storedMessage = storedMessage[..2000];
    }

    await _db.Ado.ExecuteCommandAsync(
        sql,
        new SugarParameter("@EndTime", endedAt),
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@ErrorMessage", storedMessage),
        new SugarParameter("@Id", runLogId));
}
/// <summary>
/// Normalizes a trigger label: null, empty or whitespace-only input becomes "AUTO"; anything else
/// is trimmed and upper-cased with the invariant culture.
/// </summary>
/// <param name="triggerType">Raw trigger label; may be null.</param>
/// <returns>The normalized label.</returns>
private static string NormalizeTriggerType(string? triggerType)
{
    if (string.IsNullOrWhiteSpace(triggerType))
    {
        return "AUTO";
    }

    return triggerType.Trim().ToUpperInvariant();
}
}
/// <summary>
/// Result of one outsourcing material-issue MDP sync and transform run.
/// </summary>
public sealed class OutsourceIssueMdpSyncResult
{
    /// <summary>Id of the run-log row for this run.</summary>
    public long RunLogId { get; set; }

    /// <summary>Batch id of this run; an empty string until assigned.</summary>
    public string BatchId { get; set; } = "";

    /// <summary>Row count reported by the header staging step.</summary>
    public int HeadStageRows { get; set; }

    /// <summary>Row count reported by the detail staging step.</summary>
    public int DetailStageRows { get; set; }

    /// <summary>Row count reported by the header standardization step.</summary>
    public int HeadStandardRows { get; set; }

    /// <summary>Row count reported by the detail standardization step.</summary>
    public int DetailStandardRows { get; set; }
}