namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse;
/// <summary>
/// S5 purchase-receipt read-only sync/transform service for the data middle platform (DOP-internal, option 1).
/// </summary>
/// <remarks>
/// <para>
/// Source: aidopdev PurOrdRctDetail (p) + PurOrdRctMaster (d), business type RctType='rc' (purchase receipt).
/// Header/detail join: p.Domain=d.Domain AND p.Receiver=d.Receiver.
/// Dimension tables ItemMaster (i) / SuppMaster (s) / ConsigneeAddressMaster (a) / PurOrdDetail (pd, sd) /
/// srm_pr_main (dr) are all LEFT JOINed.
/// Pipeline (option 1): read-only source tables to mdp_std_purchase_receipt (typed). It does not go through
/// mdp_stg (S3 already owns mdp_stg_receipt) and builds no dwd table (read-only list).
/// </para>
/// <para>Constraints:</para>
/// <list type="bullet">
/// <item><description>Source tables are read only; only mdp_std_purchase_receipt is written. Never
/// INSERT/UPDATE/DELETE any source table; never read or modify S3 mdp_stg_receipt or S4 ado_s4_receipt.</description></item>
/// <item><description>Filter on p.RctType='rc'. The ERP-side UserFactoryNum/UserAccount placeholder filters are
/// not applied (full sync; tenant isolation is done on the read-API side by tenant_id).</description></item>
/// <item><description>Derived columns follow the legacy system SQL: sort_name/req/dop_req keep the
/// CASE d.OrdType='DO' branch semantics; danjia/jiage are not persisted in this batch.</description></item>
/// <item><description>Detail granularity is one PurOrdRctDetail row (domain#receiver#line). When there are zero
/// 'rc' rows the run completes successfully with a processed count of 0 and no error.</description></item>
/// </list>
/// </remarks>
public class PurchaseReceiptMdpSyncService : ITransient
{
    private const string JobCode = "S5_PURCHASE_RECEIPT_MDP_SYNC";

    private readonly ISqlSugarClient _db;

    public PurchaseReceiptMdpSyncService(ISqlSugarClient db)
    {
        _db = db;
    }

    /// <summary>
    /// Full sync/transform: source tables (rc) into mdp_std_purchase_receipt.
    /// </summary>
    /// <param name="cancellationToken">
    /// Checked on entry and again after the table bootstrap; it is not passed to the individual SQL commands.
    /// </param>
    /// <param name="triggerType">
    /// Trigger label written to the run log; blank values become "AUTO", others are trimmed and upper-cased.
    /// </param>
    /// <returns>The batch id, run-log id and standard-layer row count of this run.</returns>
    /// <exception cref="OperationCanceledException">The token was cancelled at one of the checkpoints.</exception>
    public async Task<PurchaseReceiptMdpSyncResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
    {
        cancellationToken.ThrowIfCancellationRequested();
        await EnsureTablesAsync();

        // Re-check before a run-log row exists, so a cancelled run leaves no RUNNING record behind.
        cancellationToken.ThrowIfCancellationRequested();

        var now = DateTime.Now;
        var batchId = $"S5_PUR_RCT_FULL_{now:yyyyMMddHHmmss}";
        var runLogId = await InsertRunLogAsync(batchId, now, triggerType);
        var result = new PurchaseReceiptMdpSyncResult { BatchId = batchId, RunLogId = runLogId };
        try
        {
            result.StdRows = await TransformStandardAsync(batchId, now);
            await MarkRunSuccessAsync(runLogId, now, result);
            return result;
        }
        catch (Exception ex)
        {
            try
            {
                await MarkRunFailedAsync(runLogId, now, ex.Message);
            }
            catch (Exception)
            {
                // Best effort only: if recording the failure also fails, the original exception
                // is the one callers need to see, so it must not be replaced by this one.
            }

            throw;
        }
    }

    /// <summary>
    /// Defensive table creation (same structure as UpdateScripts WIP-S5PR.sql / the official 1.0.&lt;n&gt;.sql; idempotent
    /// through CREATE TABLE IF NOT EXISTS).
    /// </summary>
    private async Task EnsureTablesAsync()
    {
        await _db.Ado.ExecuteCommandAsync(
            """
            CREATE TABLE IF NOT EXISTS mdp_std_purchase_receipt (
            id BIGINT AUTO_INCREMENT PRIMARY KEY,
            tenant_id BIGINT NOT NULL DEFAULT 0,
            factory_id BIGINT NULL DEFAULT 1,
            source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
            domain VARCHAR(24) NOT NULL,
            receiver VARCHAR(24) NOT NULL,
            line SMALLINT NOT NULL DEFAULT 0,
            rct_date DATETIME NULL,
            supp VARCHAR(20) NULL,
            sort_name VARCHAR(255) NULL,
            item_num VARCHAR(60) NULL,
            item_name VARCHAR(200) NULL,
            item_spec VARCHAR(200) NULL,
            um VARCHAR(8) NULL,
            qty_ordered DECIMAL(18,6) NULL DEFAULT 0,
            qty_received DECIMAL(18,6) NULL DEFAULT 0,
            lot_serial VARCHAR(120) NULL,
            location VARCHAR(8) NULL,
            ord_nbr VARCHAR(24) NULL,
            ord_line SMALLINT NULL,
            blanket_line INT NULL,
            pur_ord VARCHAR(24) NULL,
            pur_line SMALLINT NULL,
            sales_job VARCHAR(200) NULL,
            address1 VARCHAR(200) NULL,
            req VARCHAR(20) NULL,
            req_line INT NULL,
            dop_req VARCHAR(255) NULL,
            source_biz_key VARCHAR(200) NULL,
            sync_batch_id VARCHAR(100) NOT NULL,
            sync_time DATETIME NOT NULL,
            create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
            update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            UNIQUE KEY uk_mdp_std_pur_rct (tenant_id, domain, receiver, line),
            KEY idx_mdp_std_pur_rct_date (tenant_id, rct_date),
            KEY idx_mdp_std_pur_rct_item (tenant_id, item_num),
            KEY idx_mdp_std_pur_rct_supp (tenant_id, supp),
            KEY idx_mdp_std_pur_rct_purord (tenant_id, pur_ord),
            KEY idx_mdp_std_pur_rct_salesjob (tenant_id, sales_job),
            KEY idx_mdp_std_pur_rct_req (tenant_id, req),
            KEY idx_mdp_std_pur_rct_dopreq (tenant_id, dop_req)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S5采购收货单标准层'
            """);
    }

    /// <summary>
    /// Standardization: PurOrdRctDetail (rc) + master + dimension tables into mdp_std_purchase_receipt.
    /// </summary>
    /// <remarks>
    /// The SQL is translated from the legacy system's source SQL (SQL Server to MySQL): with(nolock) removed,
    /// IsNull becomes IFNULL, rtrim(a+' '+b) becomes TRIM(CONCAT(...)), convert(varchar,x) becomes CAST(x AS CHAR),
    /// and left(...)/placeholder filters are removed.
    /// </remarks>
    /// <param name="batchId">Batch id stamped into sync_batch_id on every upserted row.</param>
    /// <param name="now">Timestamp stamped into sync_time on every upserted row.</param>
    /// <returns>
    /// The number of 'rc' detail rows that have a matching master row, taken from a COUNT query that runs
    /// before the upsert (not the affected-row count of the upsert itself).
    /// </returns>
    private async Task<int> TransformStandardAsync(string batchId, DateTime now)
    {
        // Counted separately because the affected-row count of INSERT ... ON DUPLICATE KEY UPDATE
        // does not equal the number of source rows processed.
        var rows = await _db.Ado.GetIntAsync(
            "SELECT COUNT(1) FROM PurOrdRctDetail p INNER JOIN PurOrdRctMaster d ON p.Domain=d.Domain AND p.Receiver=d.Receiver WHERE p.RctType='rc'");

        // Upsert keyed by uk_mdp_std_pur_rct (tenant_id, domain, receiver, line); re-running refreshes existing rows.
        await _db.Ado.ExecuteCommandAsync(
            """
            INSERT INTO mdp_std_purchase_receipt
            (tenant_id, factory_id, source_system, domain, receiver, line, rct_date, supp, sort_name,
            item_num, item_name, item_spec, um, qty_ordered, qty_received, lot_serial, location,
            ord_nbr, ord_line, blanket_line, pur_ord, pur_line, sales_job, address1,
            req, req_line, dop_req, source_biz_key, sync_batch_id, sync_time)
            SELECT
            IFNULL(p.tenant_id, 0), 1, 'AIDOP', p.Domain, p.Receiver, p.Line, d.RctDate, p.Supp,
            TRIM(CONCAT(d.Supp, ' ', IFNULL(s.SortName, ''))),
            p.ItemNum, i.Descr, i.Descr1, p.UM, p.QtyOrded, p.QtyReceived, p.LotSerial, p.Location,
            p.OrdNbr, p.OrdLine, pd.BlanketLine, pd.PurOrd, pd.Line, pd.SalesJob, a.Address1,
            (CASE WHEN d.OrdType='DO' THEN '' ELSE sd.Req END),
            sd.ReqLine,
            (CASE WHEN d.OrdType='DO' THEN sd.Req ELSE dr.pr_billno END),
            CONCAT(p.Domain, '#', p.Receiver, '#', p.Line), @BatchId, @Now
            FROM PurOrdRctDetail p
            INNER JOIN PurOrdRctMaster d ON p.Domain = d.Domain AND p.Receiver = d.Receiver
            LEFT JOIN ItemMaster i ON p.Domain = i.Domain AND p.ItemNum = i.ItemNum
            LEFT JOIN SuppMaster s ON d.Domain = s.Domain AND d.Supp = s.Supp
            LEFT JOIN ConsigneeAddressMaster a ON p.Domain = a.Domain AND p.Supp = a.Address AND a.Typed = 'Supp'
            LEFT JOIN PurOrdDetail pd ON p.Domain = pd.Domain
            AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.RctNbr ELSE p.OrdNbr END)
            = (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN pd.Contract ELSE pd.PurOrd END)
            AND (CASE WHEN d.OrdType='DO' AND IFNULL(p.RctNbr,'')<>'' THEN p.BlanketLine ELSE p.OrdLine END) = pd.Line
            LEFT JOIN PurOrdDetail sd ON p.Domain = sd.Domain AND p.OrdNbr = sd.PurOrd AND p.OrdLine = sd.Line
            LEFT JOIN srm_pr_main dr ON CAST(dr.factory_id AS CHAR) = sd.Domain AND dr.SAP_pr_billno = sd.Req AND IFNULL(dr.SAP_pr_billno,'')<>''
            WHERE p.RctType = 'rc'
            ON DUPLICATE KEY UPDATE
            factory_id=VALUES(factory_id), rct_date=VALUES(rct_date), supp=VALUES(supp), sort_name=VALUES(sort_name),
            item_num=VALUES(item_num), item_name=VALUES(item_name), item_spec=VALUES(item_spec), um=VALUES(um),
            qty_ordered=VALUES(qty_ordered), qty_received=VALUES(qty_received), lot_serial=VALUES(lot_serial), location=VALUES(location),
            ord_nbr=VALUES(ord_nbr), ord_line=VALUES(ord_line), blanket_line=VALUES(blanket_line),
            pur_ord=VALUES(pur_ord), pur_line=VALUES(pur_line), sales_job=VALUES(sales_job), address1=VALUES(address1),
            req=VALUES(req), req_line=VALUES(req_line), dop_req=VALUES(dop_req),
            sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
            """,
            new SugarParameter("@BatchId", batchId),
            new SugarParameter("@Now", now));

        return rows;
    }

    /// <summary>
    /// Inserts a RUNNING row into mdp_transform_run_log and returns its id.
    /// </summary>
    /// <param name="batchId">Batch id of the run.</param>
    /// <param name="startedAt">Start time written to start_time.</param>
    /// <param name="triggerType">Raw trigger label; normalized before being stored.</param>
    /// <returns>The id of the inserted run-log row.</returns>
    private async Task<long> InsertRunLogAsync(string batchId, DateTime startedAt, string triggerType)
    {
        // The id is read with LAST_INSERT_ID() in the same command as the INSERT. Looking the row up by
        // batch_id afterwards is racy: batch ids only have one-second resolution, so two runs started in
        // the same second could each pick up the other's row.
        // NOTE(review): assumes mdp_transform_run_log.id is AUTO_INCREMENT (the INSERT does not supply it) - confirm.
        return await _db.Ado.GetLongAsync(
            """
            INSERT INTO mdp_transform_run_log
            (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
            VALUES (0, @JobCode, 'S5采购收货单MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime);
            SELECT LAST_INSERT_ID();
            """,
            new List<SugarParameter>
            {
                new SugarParameter("@JobCode", JobCode),
                new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
                new SugarParameter("@BatchId", batchId),
                new SugarParameter("@StartTime", startedAt),
            });
    }

    /// <summary>
    /// Marks the run-log row as SUCCESS and records end time, duration and row counts
    /// (stage_rows and dwd_rows are written as 0; only standard_rows comes from the result).
    /// </summary>
    /// <param name="runLogId">Id of the run-log row to update.</param>
    /// <param name="startedAt">Run start time, used to compute duration_ms.</param>
    /// <param name="result">Run result supplying the standard-layer row count.</param>
    private async Task MarkRunSuccessAsync(long runLogId, DateTime startedAt, PurchaseReceiptMdpSyncResult result)
    {
        var finishedAt = DateTime.Now;
        await _db.Ado.ExecuteCommandAsync(
            """
            UPDATE mdp_transform_run_log
            SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
            stage_rows=0, standard_rows=@StandardRows, dwd_rows=0, update_time=CURRENT_TIMESTAMP
            WHERE id=@Id
            """,
            new SugarParameter("@EndTime", finishedAt),
            new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
            new SugarParameter("@StandardRows", result.StdRows),
            new SugarParameter("@Id", runLogId));
    }

    /// <summary>
    /// Marks the run-log row as FAILED and records end time, duration and the error message.
    /// </summary>
    /// <param name="runLogId">Id of the run-log row to update.</param>
    /// <param name="startedAt">Run start time, used to compute duration_ms.</param>
    /// <param name="message">Error text; truncated to its first 2000 characters before being stored.</param>
    private async Task MarkRunFailedAsync(long runLogId, DateTime startedAt, string message)
    {
        var finishedAt = DateTime.Now;
        await _db.Ado.ExecuteCommandAsync(
            """
            UPDATE mdp_transform_run_log
            SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
            error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
            WHERE id=@Id
            """,
            new SugarParameter("@EndTime", finishedAt),
            new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds),
            new SugarParameter("@ErrorMessage", message.Length > 2000 ? message[..2000] : message),
            new SugarParameter("@Id", runLogId));
    }

    /// <summary>
    /// Normalizes a trigger label: null/blank becomes "AUTO"; anything else is trimmed and upper-cased (invariant culture).
    /// </summary>
    private static string NormalizeTriggerType(string? triggerType)
        => string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant();
}
/// <summary>
/// Result of one purchase-receipt MDP sync/transform run.
/// </summary>
public sealed class PurchaseReceiptMdpSyncResult
{
    /// <summary>Id of the run-log row associated with the run.</summary>
    public long RunLogId { get; set; }

    /// <summary>Batch id of the run; defaults to the empty string.</summary>
    public string BatchId { get; set; } = "";

    /// <summary>Row count reported for the standard layer.</summary>
    public int StdRows { get; set; }
}