using Admin.NET.Plugin.AiDOP.Infrastructure;
namespace Admin.NET.Plugin.AiDOP.ProcurementExecution;
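/// <summary>
/// S4 procurement-execution MDP transform: maintains the S4-specific STG/STD tables, consumes the shared S3 DWD data, and writes dwd_s4_purchase_execution / dwd_po_trans plus the S4 KPI values.
/// </summary>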
public class S4MdpSyncTransformService : ITransient
{
/// <summary>Job code written to mdp_transform_run_log for runs of this service.</summary>
private const string JobCode = "S4_MDP_SYNC_TRANSFORM";

/// <summary>SqlSugar client through which every statement of this service is issued.</summary>
private readonly ISqlSugarClient _db;

/// <summary>Creates the service around the supplied SqlSugar client.</summary>
/// <param name="db">Database client kept for all later calls.</param>
public S4MdpSyncTransformService(ISqlSugarClient db) => _db = db;
/// <summary>
/// Runs one full S4 pass: ensures the S4 tables exist, then syncs staging, transforms to the
/// standard layer, builds the DWD rows and computes the S4 KPI values, recording the outcome
/// in the transform run log.
/// </summary>
/// <param name="cancellationToken">Checked before the run starts and handed to every stage.</param>
/// <param name="triggerType">Trigger label stored on the run-log row; defaults to "AUTO".</param>
/// <returns>The batch id, run-log id and per-stage row counts of this run.</returns>
/// <remarks>
/// A failure in any stage is written to the run log and then rethrown unchanged.
/// The return type carries the result object: the method returns <c>result</c>, which a
/// plain <c>Task</c> cannot do.
/// </remarks>
public async Task<S4MdpSyncTransformResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
{
    cancellationToken.ThrowIfCancellationRequested();
    await EnsureS4TablesAsync();

    // A single timestamp is shared by the batch id, the run-log start time and every stage.
    var now = DateTime.Now;
    var batchId = $"S4_MDP_FULL_{now:yyyyMMddHHmmss}";
    var runLogId = await InsertTransformRunLogAsync(batchId, now, triggerType);
    var result = new S4MdpSyncTransformResult { BatchId = batchId, RunLogId = runLogId };
    try
    {
        result.StageRows = await SyncStagingAsync(batchId, now, cancellationToken);
        result.StandardRows = await TransformStandardAsync(batchId, now, cancellationToken);
        result.DwdRows = await BuildDwdAsync(batchId, now, cancellationToken);
        result.KpiRows = await BuildS4KpiValuesAsync(now, cancellationToken);
        await MarkTransformRunSuccessAsync(runLogId, now, result);
        return result;
    }
    catch (Exception ex)
    {
        // Persist the failure first, then rethrow with the original stack trace.
        await MarkTransformRunFailedAsync(runLogId, now, ex.Message);
        throw;
    }
}
/// <summary>
/// Creates the S4 staging tables (mdp_stg_s4_*), the S4 standard tables (mdp_std_s4_*) and
/// dwd_s4_purchase_execution when they do not exist yet.
/// </summary>
/// <remarks>
/// The script is split on ';' and each fragment is executed on its own; fragments of
/// 10 characters or fewer are skipped.
/// </remarks>
private async Task EnsureS4TablesAsync()
{
    const string schemaScript = """
        CREATE TABLE IF NOT EXISTS mdp_stg_s4_iqc (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        source_table VARCHAR(100) NOT NULL,
        source_row_id VARCHAR(100) NOT NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL,
        sync_time DATETIME NOT NULL,
        process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
        raw_data JSON NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_mdp_stg_s4_iqc (tenant_id, source_table, source_row_id)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        CREATE TABLE IF NOT EXISTS mdp_stg_s4_shipment LIKE mdp_stg_s4_iqc;
        CREATE TABLE IF NOT EXISTS mdp_stg_s4_return LIKE mdp_stg_s4_iqc;
        CREATE TABLE IF NOT EXISTS mdp_stg_s4_shortage LIKE mdp_stg_s4_iqc;
        CREATE TABLE IF NOT EXISTS mdp_std_s4_iqc (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NULL DEFAULT 1,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        po_no VARCHAR(50) NULL, po_line VARCHAR(50) NULL,
        supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
        receipt_qty DECIMAL(18,6) NULL DEFAULT 0,
        sample_qty DECIMAL(18,6) NULL DEFAULT 0,
        defect_qty DECIMAL(18,6) NULL DEFAULT 0,
        qc_result VARCHAR(20) NULL, receipt_date DATETIME NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_std_s4_iqc (tenant_id, source_biz_key)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        CREATE TABLE IF NOT EXISTS mdp_std_s4_shipment (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NULL DEFAULT 1,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        shipment_no VARCHAR(50) NULL, po_no VARCHAR(50) NULL, po_line VARCHAR(50) NULL,
        supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
        ship_qty DECIMAL(18,6) NULL DEFAULT 0, ship_date DATETIME NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_std_s4_shipment (tenant_id, source_biz_key)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        CREATE TABLE IF NOT EXISTS mdp_std_s4_return (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NULL DEFAULT 1,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        po_no VARCHAR(50) NULL, po_line VARCHAR(50) NULL,
        supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
        return_qty DECIMAL(18,6) NULL DEFAULT 0,
        return_reason VARCHAR(200) NULL, return_status VARCHAR(50) NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_std_s4_return (tenant_id, source_biz_key)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        CREATE TABLE IF NOT EXISTS mdp_std_s4_shortage (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NULL DEFAULT 1,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        work_order VARCHAR(100) NULL, supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
        shortage_qty DECIMAL(18,6) NULL DEFAULT 0, risk_level VARCHAR(20) NULL, need_date DATETIME NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_std_s4_shortage (tenant_id, source_biz_key)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        CREATE TABLE IF NOT EXISTS dwd_s4_purchase_execution (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL, factory_id BIGINT NOT NULL DEFAULT 1, stat_date DATE NOT NULL,
        po_no VARCHAR(50) NULL, po_line VARCHAR(50) NULL,
        supplier_code VARCHAR(50) NULL, item_code VARCHAR(50) NULL,
        order_qty DECIMAL(12,3) NULL DEFAULT 0, delivery_qty DECIMAL(12,3) NULL DEFAULT 0,
        received_qty DECIMAL(12,3) NULL DEFAULT 0, returned_qty DECIMAL(12,3) NULL DEFAULT 0,
        shortage_qty DECIMAL(12,3) NULL DEFAULT 0,
        due_date DATE NULL, actual_arrival_date DATE NULL, risk_level VARCHAR(20) NULL,
        source_system VARCHAR(20) NULL DEFAULT 'AIDOP',
        sync_batch_id VARCHAR(100) NULL, sync_time DATETIME NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_dwd_s4_pe (tenant_id, stat_date, po_no, po_line, item_code),
        KEY idx_dwd_s4_pe_date (tenant_id, stat_date),
        KEY idx_dwd_s4_pe_supplier (tenant_id, supplier_code)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """;

    // Each CREATE TABLE is sent separately; very short fragments are not executed.
    var createStatements = schemaScript
        .Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
        .Where(fragment => fragment.Length > 10);

    foreach (var createStatement in createStatements)
    {
        await _db.Ado.ExecuteCommandAsync(createStatement);
    }
}
/// <summary>
/// Syncs every entity in <c>S4MdpEntityConfig.All</c> into its staging table and adds the
/// row count of the shared staging tables to the total.
/// </summary>
/// <param name="batchId">Batch id handed to each entity sync.</param>
/// <param name="now">Run timestamp handed to each entity sync.</param>
/// <param name="cancellationToken">Checked before each entity is processed.</param>
/// <returns>Sum of the per-entity counts plus the shared staging row count.</returns>
/// <remarks>Declared as <c>Task&lt;int&gt;</c> because the method returns the total to its caller.</remarks>
private async Task<int> SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    var total = 0;
    foreach (var entity in S4MdpEntityConfig.All)
    {
        cancellationToken.ThrowIfCancellationRequested();
        total += await SyncOneEntityAsync(entity, batchId, now);
    }

    total += await CountSharedStagingRowsAsync();
    return total;
}
/// <summary>
/// Counts the rows currently held in the shared staging tables mdp_stg_purchase_order,
/// mdp_stg_delivery and mdp_stg_receipt.
/// </summary>
/// <returns>The combined row count, or 0 when the query fails.</returns>
/// <remarks>
/// Best-effort: a failure never aborts the run, but it is written to stderr so that a
/// zero caused by an error can be told apart from a real zero.
/// </remarks>
private async Task<int> CountSharedStagingRowsAsync()
{
    try
    {
        return await _db.Ado.GetIntAsync(
            """
            SELECT IFNULL(SUM(cnt), 0) FROM (
            SELECT COUNT(1) AS cnt FROM mdp_stg_purchase_order
            UNION ALL SELECT COUNT(1) FROM mdp_stg_delivery
            UNION ALL SELECT COUNT(1) FROM mdp_stg_receipt
            ) t
            """);
    }
    catch (Exception ex)
    {
        Console.Error.WriteLine($"[S4MdpSyncTransform] shared staging row count unavailable, counted as 0: {ex.Message}");
        return 0;
    }
}
/// <summary>
/// Copies every row of one configured source table into its staging table as a JSON
/// snapshot and records the attempt in mdp_sync_log.
/// </summary>
/// <param name="entity">Entity configuration: entity code, source and target table, row-id and biz-key expressions, optional flag.</param>
/// <param name="batchId">Batch id written to each staged row and to the sync log.</param>
/// <param name="now">Timestamp written to sync_time of each staged row.</param>
/// <returns>
/// The source-table row count taken before the copy (not the affected-row count), or 0 when
/// an optional entity's source table is missing or its copy fails.
/// </returns>
// NOTE(review): the method returns an int but is declared as plain `Task`, and the
// SqlQuerySingleAsync / SqlQueryAsync calls below carry no type argument in this view —
// generic arguments may have been lost; confirm against the real file.
private async Task SyncOneEntityAsync(S4MdpEntityConfig entity, string batchId, DateTime now)
{
// The entity must be registered in mdp_entity under tenant 0; its id and name go into the sync log.
var entityRow = await _db.Ado.SqlQuerySingleAsync(
"SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1",
new SugarParameter("@EntityCode", entity.EntityCode));
if (entityRow == null)
throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode},请先执行 1.0.154.sql 或启动迁移。");
// Check in information_schema that the source table exists in the current database.
var tableExists = await _db.Ado.GetIntAsync(
"""
SELECT COUNT(1) FROM information_schema.TABLES
WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
""",
new SugarParameter("@TableName", entity.SourceTable));
if (tableExists == 0)
{
// A missing table is tolerated only for entities flagged as optional.
if (entity.Optional)
return 0;
throw Oops.Oh($"未找到 S4 源表:{entity.SourceTable}(实体 {entity.EntityCode})");
}
// Column names, in table order, drive both the tenant expression and the JSON snapshot.
var columns = await _db.Ado.SqlQueryAsync(
"""
SELECT COLUMN_NAME AS ColumnName
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
ORDER BY ORDINAL_POSITION
""",
new SugarParameter("@TableName", entity.SourceTable));
if (columns.Count == 0)
throw Oops.Oh($"未找到源表字段:{entity.SourceTable}");
var names = columns.Select(u => u.ColumnName).ToList();
// Use the source table's own tenant_id column when it has one, otherwise the literal 0.
var tenantExpr = names.Any(u => string.Equals(u, "tenant_id", StringComparison.OrdinalIgnoreCase))
? $"IFNULL(s.`{FindColumn(names, "tenant_id")}`,0)"
: "0";
// If the configured row-id expression is a plain column name it is backtick-quoted;
// otherwise the configured text is embedded in the SQL as-is.
var sourceRowExpr = names.Any(u => string.Equals(u, entity.SourceRowIdExpression, StringComparison.OrdinalIgnoreCase))
? $"s.`{FindColumn(names, entity.SourceRowIdExpression)}`"
: entity.SourceRowIdExpression;
var rawDataExpr = BuildJsonObjectExpression(names);
// Row count is taken before the copy and stored on the sync-log row.
var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`");
var logId = await InsertSyncLogAsync(entityRow.Id, entityRow.EntityName, batchId, rowsRead);
var started = DateTime.Now;
try
{
// Single INSERT ... SELECT upsert. On a duplicate key the batch id, sync time, status
// and raw_data are refreshed; source_biz_key is not in the update list.
var affected = await _db.Ado.ExecuteCommandAsync(
$"""
INSERT INTO `{entity.TargetTable}`
(tenant_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
SELECT
{tenantExpr},
'AIDOP',
@SourceTable,
CAST({sourceRowExpr} AS CHAR),
CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({sourceRowExpr} AS CHAR)) AS CHAR),
@BatchId,
@Now,
'PENDING',
{rawDataExpr}
FROM `{entity.SourceTable}` s
ON DUPLICATE KEY UPDATE
source_row_id=VALUES(source_row_id),
sync_batch_id=VALUES(sync_batch_id),
sync_time=VALUES(sync_time),
process_status=VALUES(process_status),
raw_data=VALUES(raw_data),
update_time=CURRENT_TIMESTAMP
""",
new SugarParameter("@SourceTable", entity.SourceTable),
new SugarParameter("@BatchId", batchId),
new SugarParameter("@Now", now));
// The log stores the affected-row count; the caller receives the rows-read count.
await MarkSyncLogSuccessAsync(logId, started, affected);
return rowsRead;
}
catch (Exception ex)
{
// The failure is always logged; it is only rethrown for non-optional entities.
await MarkSyncLogFailedAsync(logId, started, ex.Message);
if (entity.Optional) return 0;
throw;
}
}
/// <summary>
/// Executes each staging-to-standard command from <c>BuildStandardCommands</c> and adds the
/// row count of the shared standard tables to the total.
/// </summary>
/// <param name="batchId">Batch id bound into every command.</param>
/// <param name="now">Run timestamp bound into every command.</param>
/// <param name="cancellationToken">Checked before each command is executed.</param>
/// <returns>Sum of affected rows of the commands that succeeded plus the shared standard row count.</returns>
/// <remarks>
/// A failing command is skipped so the remaining ones still run; the failure is written to
/// stderr instead of being dropped silently.
/// </remarks>
private async Task<int> TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    var total = 0;
    foreach (var command in BuildStandardCommands(batchId, now))
    {
        cancellationToken.ThrowIfCancellationRequested();
        try
        {
            total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
        }
        catch (Exception ex)
        {
            // The standard-layer tables may not have staged data yet: skip this one command.
            Console.Error.WriteLine($"[S4MdpSyncTransform] standard transform command skipped: {ex.Message}");
        }
    }

    total += await CountSharedStandardRowsAsync();
    return total;
}
/// <summary>
/// Counts the rows currently held in the shared standard tables mdp_std_purchase_order,
/// mdp_std_delivery_schedule and mdp_std_delivery_result.
/// </summary>
/// <returns>The combined row count, or 0 when the query fails.</returns>
/// <remarks>
/// Best-effort: a failure never aborts the run, but it is written to stderr so that a
/// zero caused by an error can be told apart from a real zero.
/// </remarks>
private async Task<int> CountSharedStandardRowsAsync()
{
    try
    {
        return await _db.Ado.GetIntAsync(
            """
            SELECT IFNULL(SUM(cnt), 0) FROM (
            SELECT COUNT(1) AS cnt FROM mdp_std_purchase_order
            UNION ALL SELECT COUNT(1) FROM mdp_std_delivery_schedule
            UNION ALL SELECT COUNT(1) FROM mdp_std_delivery_result
            ) t
            """);
    }
    catch (Exception ex)
    {
        Console.Error.WriteLine($"[S4MdpSyncTransform] shared standard row count unavailable, counted as 0: {ex.Message}");
        return 0;
    }
}
/// <summary>
/// Yields the four upsert commands that project staged JSON rows into the S4 standard
/// tables: IQC, shipment, return and shortage.
/// </summary>
/// <param name="batchId">Batch id bound as @BatchId in every command.</param>
/// <param name="now">Run timestamp bound as @Now in every command.</param>
/// <returns>One command per standard table, in the order IQC, shipment, return, shortage.</returns>
/// <remarks>
/// Quantities are read from raw_data only when the text matches a plain decimal pattern and
/// default to 0 otherwise. Return and shortage rows are copied only when their quantity is
/// greater than 0. The element type is <c>S4MdpSqlCommand</c> so callers can read
/// <c>Sql</c> and <c>Parameters</c> from each item.
/// </remarks>
private IEnumerable<S4MdpSqlCommand> BuildStandardCommands(string batchId, DateTime now)
{
    // IQC: qc_result is derived as FAIL when the parsed RejectQty is above 0, PASS otherwise.
    yield return Cmd(
        """
        INSERT INTO mdp_std_s4_iqc
        (tenant_id, factory_id, source_system, po_no, po_line, supplier_code, item_code, receipt_qty, sample_qty, defect_qty, qc_result, receipt_date, source_biz_key, sync_batch_id, sync_time)
        SELECT tenant_id, 1, 'AIDOP',
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PurOrd')),
        CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Line')) AS CHAR),
        IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Supp')), ''),
        IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), ''),
        COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) AS DECIMAL(18,6)) END, 0),
        COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SampleQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SampleQty')) AS DECIMAL(18,6)) END, 0),
        COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RejectQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RejectQty')) AS DECIMAL(18,6)) END, 0),
        CASE WHEN COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RejectQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RejectQty')) AS DECIMAL(18,6)) END, 0) > 0 THEN 'FAIL' ELSE 'PASS' END,
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RcptDate')), 'null'), ''),
        source_biz_key, @BatchId, @Now
        FROM mdp_stg_s4_iqc
        WHERE source_table='PurOrdRctDetail'
        ON DUPLICATE KEY UPDATE receipt_qty=VALUES(receipt_qty), sample_qty=VALUES(sample_qty), defect_qty=VALUES(defect_qty),
        qc_result=VALUES(qc_result), receipt_date=VALUES(receipt_date), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
        """, batchId, now);

    // Shipment: shipment_no falls back to the staged source_row_id when '$.shddh' is absent.
    yield return Cmd(
        """
        INSERT INTO mdp_std_s4_shipment
        (tenant_id, factory_id, source_system, shipment_no, po_no, po_line, supplier_code, item_code, ship_qty, ship_date, source_biz_key, sync_batch_id, sync_time)
        SELECT tenant_id, 1, 'AIDOP',
        COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shddh')), source_row_id),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_bill')),
        CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_billline')) AS CHAR),
        IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.suppliercode')), ''),
        IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), ''),
        COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) AS DECIMAL(18,6)) END, 0),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.updatetime')), 'null'), ''),
        source_biz_key, @BatchId, @Now
        FROM mdp_stg_s4_shipment
        WHERE source_table='scm_shdzb'
        ON DUPLICATE KEY UPDATE ship_qty=VALUES(ship_qty), ship_date=VALUES(ship_date), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
        """, batchId, now);

    // Return: only staged rows whose parsed returnqty is above 0 are copied.
    yield return Cmd(
        """
        INSERT INTO mdp_std_s4_return
        (tenant_id, factory_id, source_system, po_no, po_line, supplier_code, item_code, return_qty, return_reason, return_status, source_biz_key, sync_batch_id, sync_time)
        SELECT tenant_id, 1, 'AIDOP',
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ponumber')),
        CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.poline')) AS CHAR),
        IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.suppliercode')), ''),
        IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), ''),
        COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) AS DECIMAL(18,6)) END, 0),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.remark')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')),
        source_biz_key, @BatchId, @Now
        FROM mdp_stg_s4_return
        WHERE source_table='srm_polist_ds'
        AND COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) AS DECIMAL(18,6)) END, 0) > 0
        ON DUPLICATE KEY UPDATE return_qty=VALUES(return_qty), return_reason=VALUES(return_reason), return_status=VALUES(return_status),
        sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
        """, batchId, now);

    // Shortage: only rows with a parsed shortage_qty above 0; risk_level defaults to 'MEDIUM'.
    yield return Cmd(
        """
        INSERT INTO mdp_std_s4_shortage
        (tenant_id, factory_id, source_system, work_order, supplier_code, item_code, shortage_qty, risk_level, need_date, source_biz_key, sync_batch_id, sync_time)
        SELECT tenant_id, 1, 'AIDOP',
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.work_order')),
        IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_code')), ''),
        IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.component_item_code')), ''),
        COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortage_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortage_qty')) AS DECIMAL(18,6)) END, 0),
        IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.risk_level')), 'MEDIUM'),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.expected_supply_date')), 'null'), ''),
        source_biz_key, @BatchId, @Now
        FROM mdp_stg_s4_shortage
        WHERE source_table='dwd_material_shortage'
        AND COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortage_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortage_qty')) AS DECIMAL(18,6)) END, 0) > 0
        ON DUPLICATE KEY UPDATE shortage_qty=VALUES(shortage_qty), risk_level=VALUES(risk_level), need_date=VALUES(need_date),
        sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
        """, batchId, now);
}
/// <summary>
/// Rebuilds today's DWD rows in three independent steps: dwd_s4_purchase_execution from
/// dwd_supplier_delivery joined with the S4 return and shortage tables, dwd_po_trans from
/// the rows just built (falling back to mdp_std_purchase_order), and dwd_qc_trans from
/// mdp_std_s4_iqc.
/// </summary>
/// <param name="batchId">Batch id written to the purchase-execution rows.</param>
/// <param name="now">Run timestamp; its date part is the stat/trans date.</param>
/// <param name="cancellationToken">Checked once before any step runs.</param>
/// <returns>Sum of affected rows reported by the INSERT statements that succeeded.</returns>
/// <remarks>
/// Every step is best-effort: a failing step does not stop the following ones. Each skipped
/// step is written to stderr instead of being swallowed silently.
/// </remarks>
private async Task<int> BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();
    var statDate = now.Date;
    var total = 0;

    static void LogSkipped(string step, Exception ex) =>
        Console.Error.WriteLine($"[S4MdpSyncTransform] BuildDwd step '{step}' skipped: {ex.Message}");

    // Step 1: replace today's purchase-execution rows.
    try
    {
        await _db.Ado.ExecuteCommandAsync(
            "DELETE FROM dwd_s4_purchase_execution WHERE stat_date=@StatDate",
            new SugarParameter("@StatDate", statDate));
        total += await _db.Ado.ExecuteCommandAsync(
            """
            INSERT INTO dwd_s4_purchase_execution
            (tenant_id, factory_id, stat_date, po_no, po_line, supplier_code, item_code,
            order_qty, delivery_qty, received_qty, returned_qty, shortage_qty,
            due_date, actual_arrival_date, risk_level, source_system, sync_batch_id, sync_time)
            SELECT d.tenant_id, 1, @StatDate,
            d.po_no, d.po_line, IFNULL(d.supplier_code,''), IFNULL(d.item_code,''),
            IFNULL(d.order_qty,0), IFNULL(d.delivery_qty,0), IFNULL(d.receipt_qty,0),
            IFNULL(ret.return_qty,0), IFNULL(sh.shortage_qty,0),
            DATE(d.due_date), DATE(d.last_receipt_date),
            CASE WHEN d.risk_level IN ('HIGH','MEDIUM','LOW') THEN LOWER(d.risk_level)
            WHEN IFNULL(d.remaining_qty,0) > 0 THEN 'high'
            WHEN IFNULL(d.receipt_qty,0) >= IFNULL(d.order_qty,0) AND IFNULL(d.order_qty,0) > 0 THEN 'low'
            ELSE 'medium' END,
            'AIDOP', @BatchId, @Now
            FROM dwd_supplier_delivery d
            LEFT JOIN (
            SELECT tenant_id, po_no, po_line, SUM(IFNULL(return_qty,0)) AS return_qty
            FROM mdp_std_s4_return
            WHERE IFNULL(po_no,'') <> ''
            GROUP BY tenant_id, po_no, po_line
            ) ret ON d.tenant_id=ret.tenant_id AND d.po_no=ret.po_no AND d.po_line=ret.po_line
            LEFT JOIN (
            SELECT tenant_id, supplier_code, item_code, SUM(IFNULL(shortage_qty,0)) AS shortage_qty
            FROM mdp_std_s4_shortage
            GROUP BY tenant_id, supplier_code, item_code
            ) sh ON d.tenant_id=sh.tenant_id AND IFNULL(d.supplier_code,'')=IFNULL(sh.supplier_code,'') AND IFNULL(d.item_code,'')=IFNULL(sh.item_code,'')
            WHERE d.stat_date=@StatDate AND IFNULL(d.po_no,'') <> ''
            ON DUPLICATE KEY UPDATE
            order_qty=VALUES(order_qty), delivery_qty=VALUES(delivery_qty), received_qty=VALUES(received_qty),
            returned_qty=VALUES(returned_qty), shortage_qty=VALUES(shortage_qty),
            due_date=VALUES(due_date), actual_arrival_date=VALUES(actual_arrival_date),
            risk_level=VALUES(risk_level), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
            """,
            new SugarParameter("@StatDate", statDate),
            new SugarParameter("@BatchId", batchId),
            new SugarParameter("@Now", now));
    }
    catch (Exception ex)
    {
        // dwd_supplier_delivery may not have been generated by S3 yet.
        LogSkipped("dwd_s4_purchase_execution", ex);
    }

    // Step 2: replace today's PO transactions from the rows built in step 1.
    try
    {
        await _db.Ado.ExecuteCommandAsync(
            "DELETE FROM dwd_po_trans WHERE trans_date=@StatDate",
            new SugarParameter("@StatDate", statDate));
        total += await _db.Ado.ExecuteCommandAsync(
            """
            INSERT INTO dwd_po_trans
            (tenant_id, factory_id, po_no, po_line, supplier_code, item_code, order_qty, received_qty,
            returned_qty, shortage_qty, due_date, actual_arrival_date, risk_level,
            trans_date, source_system, sync_batch_id, sync_time)
            SELECT tenant_id, factory_id, po_no, po_line, supplier_code, item_code,
            order_qty, received_qty, returned_qty, shortage_qty,
            due_date, actual_arrival_date, risk_level,
            stat_date, source_system, sync_batch_id, sync_time
            FROM dwd_s4_purchase_execution
            WHERE stat_date=@StatDate
            """,
            new SugarParameter("@StatDate", statDate));
    }
    catch (Exception ex)
    {
        LogSkipped("dwd_po_trans", ex);

        // Fallback: a narrower column set filled straight from the shared purchase-order table.
        try
        {
            total += await _db.Ado.ExecuteCommandAsync(
                """
                INSERT INTO dwd_po_trans
                (tenant_id, po_no, supplier_code, item_code, order_qty, received_qty, trans_date, source_system, sync_time)
                SELECT po.tenant_id, po.po_no, IFNULL(po.supplier_code,''), IFNULL(po.item_code,''),
                IFNULL(po.order_qty,0), IFNULL(po.received_qty,0), @StatDate, 'AIDOP', @Now
                FROM mdp_std_purchase_order po
                WHERE IFNULL(po.po_no,'') <> ''
                """,
                new SugarParameter("@StatDate", statDate),
                new SugarParameter("@Now", now));
        }
        catch (Exception fallbackEx)
        {
            LogSkipped("dwd_po_trans (fallback)", fallbackEx);
        }
    }

    // Step 3: push IQC results into dwd_qc_trans; item-less rows are left out.
    try
    {
        total += await _db.Ado.ExecuteCommandAsync(
            """
            INSERT INTO dwd_qc_trans
            (tenant_id, item_code, supplier_code, batch_no, sample_qty, defect_qty, result, trans_date, source_system, sync_time)
            SELECT tenant_id, IFNULL(item_code,''), IFNULL(supplier_code,''), source_biz_key,
            CAST(IFNULL(sample_qty,0) AS SIGNED), CAST(IFNULL(defect_qty,0) AS SIGNED),
            CASE WHEN qc_result='FAIL' THEN 'FAIL' WHEN qc_result='CONCESSION' THEN 'CONCESSION' ELSE 'PASS' END,
            @StatDate, 'AIDOP', @Now
            FROM mdp_std_s4_iqc
            WHERE IFNULL(item_code,'') <> ''
            ON DUPLICATE KEY UPDATE sample_qty=VALUES(sample_qty), defect_qty=VALUES(defect_qty), result=VALUES(result), sync_time=VALUES(sync_time)
            """,
            new SugarParameter("@StatDate", statDate),
            new SugarParameter("@Now", now));
    }
    catch (Exception ex)
    {
        // dwd_qc_trans may have no unique key; the step is skipped.
        LogSkipped("dwd_qc_trans", ex);
    }

    return total;
}
/// <summary>
/// Calculates the S4 KPI values for the date part of <paramref name="now"/> and upserts
/// each calculated row.
/// </summary>
/// <param name="now">Run timestamp; its date is the KPI business date and it is stored as the calc time.</param>
/// <param name="cancellationToken">Checked before each row is upserted.</param>
/// <returns>Sum of affected rows reported by the individual upserts.</returns>
/// <remarks>Declared as <c>Task&lt;int&gt;</c> because the method returns the affected-row total.</remarks>
private async Task<int> BuildS4KpiValuesAsync(DateTime now, CancellationToken cancellationToken)
{
    var statDate = now.Date;
    var rows = await CalculateS4KpiValuesAsync(statDate);

    var affected = 0;
    foreach (var row in rows)
    {
        cancellationToken.ThrowIfCancellationRequested();
        affected += await UpsertS4KpiValueAsync(row, statDate, now);
    }

    return affected;
}
/// <summary>
/// Computes the twelve S4 metrics (S4_L1_001 … S4_L3_004) per tenant in one UNION ALL
/// query over mdp_std_delivery_schedule, dwd_supplier_delivery and dwd_s4_purchase_execution.
/// </summary>
/// <param name="statDate">Date bound as @StatDate for the branches that filter on stat_date.</param>
/// <returns>
/// One row per tenant and metric code, with FactoryId fixed at 1; an empty list when the
/// query fails.
/// </returns>
/// <remarks>
/// The query is a single statement, so one missing source table empties the whole result.
/// That case is written to stderr instead of being swallowed silently. The return type,
/// query row type and fallback list are all typed as <c>S4KpiCalcRow</c>, the row type the
/// upsert step consumes.
/// </remarks>
private async Task<List<S4KpiCalcRow>> CalculateS4KpiValuesAsync(DateTime statDate)
{
    try
    {
        return await _db.Ado.SqlQueryAsync<S4KpiCalcRow>(
            """
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L1_001' AS MetricCode,
            ROUND(AVG(CASE WHEN ds.request_date IS NOT NULL AND COALESCE(ds.submit_date, ds.last_sent_date) IS NOT NULL
            THEN TIMESTAMPDIFF(DAY, ds.request_date, COALESCE(ds.submit_date, ds.last_sent_date)) END), 4) AS MetricValue
            FROM mdp_std_delivery_schedule ds
            WHERE ds.request_date IS NOT NULL
            GROUP BY tenant_id
            HAVING MetricValue IS NOT NULL
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L1_002' AS MetricCode,
            ROUND(100 * SUM(CASE WHEN IFNULL(d.receipt_qty,0) >= IFNULL(d.order_qty,0) AND IFNULL(d.order_qty,0) > 0 THEN 1
            WHEN d.delivery_status='COMPLETED' THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
            FROM dwd_supplier_delivery d
            WHERE d.stat_date=@StatDate AND IFNULL(d.order_qty,0) > 0
            GROUP BY tenant_id
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L1_003' AS MetricCode,
            ROUND(SUM(IFNULL(d.receipt_qty,0)) / GREATEST(COUNT(DISTINCT NULLIF(d.supplier_code,'')), 1), 4) AS MetricValue
            FROM dwd_supplier_delivery d
            WHERE d.stat_date=@StatDate
            GROUP BY tenant_id
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L1_004' AS MetricCode,
            ROUND(AVG(CASE WHEN IFNULL(d.order_qty,0) > 0
            THEN (IFNULL(d.remaining_qty,0) / d.order_qty) * 30 END), 4) AS MetricValue
            FROM dwd_supplier_delivery d
            WHERE d.stat_date=@StatDate AND IFNULL(d.order_qty,0) > 0
            GROUP BY tenant_id
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L2_001' AS MetricCode,
            ROUND(AVG(CASE WHEN ds.request_date IS NOT NULL AND COALESCE(ds.submit_date, ds.last_sent_date) IS NOT NULL
            THEN TIMESTAMPDIFF(DAY, ds.request_date, COALESCE(ds.submit_date, ds.last_sent_date)) END), 4) AS MetricValue
            FROM mdp_std_delivery_schedule ds
            WHERE ds.request_date IS NOT NULL
            GROUP BY tenant_id
            HAVING MetricValue IS NOT NULL
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L2_002' AS MetricCode,
            ROUND(100 * SUM(CASE WHEN IFNULL(d.receipt_qty,0) >= IFNULL(d.order_qty,0) AND IFNULL(d.order_qty,0) > 0 THEN 1
            WHEN d.delivery_status='COMPLETED' THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
            FROM dwd_supplier_delivery d
            WHERE d.stat_date=@StatDate AND IFNULL(d.order_qty,0) > 0
            GROUP BY tenant_id
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L2_003' AS MetricCode,
            ROUND(SUM(IFNULL(pe.received_qty,0)) / GREATEST(COUNT(DISTINCT NULLIF(pe.item_code,'')), 1), 4) AS MetricValue
            FROM dwd_s4_purchase_execution pe
            WHERE pe.stat_date=@StatDate
            GROUP BY tenant_id
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L2_004' AS MetricCode,
            ROUND(AVG(CASE WHEN IFNULL(pe.order_qty,0) > 0
            THEN ((IFNULL(pe.order_qty,0) - IFNULL(pe.received_qty,0)) / pe.order_qty) * 30 END), 4) AS MetricValue
            FROM dwd_s4_purchase_execution pe
            WHERE pe.stat_date=@StatDate AND IFNULL(pe.order_qty,0) > 0
            GROUP BY tenant_id
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L3_001' AS MetricCode,
            ROUND(AVG(CASE WHEN ds.request_date IS NOT NULL AND COALESCE(ds.submit_date, ds.last_sent_date) IS NOT NULL
            AND IFNULL(ds.supplier_code,'') <> ''
            THEN TIMESTAMPDIFF(DAY, ds.request_date, COALESCE(ds.submit_date, ds.last_sent_date)) END), 4) AS MetricValue
            FROM mdp_std_delivery_schedule ds
            WHERE ds.request_date IS NOT NULL AND IFNULL(ds.supplier_code,'') <> ''
            GROUP BY tenant_id
            HAVING MetricValue IS NOT NULL
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L3_002' AS MetricCode,
            ROUND(100 * SUM(CASE WHEN IFNULL(d.receipt_qty,0) >= IFNULL(d.order_qty,0) AND IFNULL(d.order_qty,0) > 0 THEN 1
            WHEN d.delivery_status='COMPLETED' THEN 1 ELSE 0 END) / NULLIF(COUNT(1), 0), 4) AS MetricValue
            FROM dwd_supplier_delivery d
            WHERE d.stat_date=@StatDate AND IFNULL(d.supplier_code,'') <> '' AND IFNULL(d.order_qty,0) > 0
            GROUP BY tenant_id
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L3_003' AS MetricCode,
            ROUND(COUNT(DISTINCT NULLIF(pe.supplier_code,'')) / GREATEST(COUNT(DISTINCT NULLIF(pe.po_no,'')), 1), 4) AS MetricValue
            FROM dwd_s4_purchase_execution pe
            WHERE pe.stat_date=@StatDate AND IFNULL(pe.supplier_code,'') <> ''
            GROUP BY tenant_id
            UNION ALL
            SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S4_L3_004' AS MetricCode,
            ROUND(AVG(CASE WHEN IFNULL(pe.order_qty,0) > 0 AND IFNULL(pe.supplier_code,'') <> ''
            THEN ((IFNULL(pe.order_qty,0) - IFNULL(pe.received_qty,0)) / pe.order_qty) * 30 END), 4) AS MetricValue
            FROM dwd_s4_purchase_execution pe
            WHERE pe.stat_date=@StatDate AND IFNULL(pe.order_qty,0) > 0 AND IFNULL(pe.supplier_code,'') <> ''
            GROUP BY tenant_id
            """,
            new SugarParameter("@StatDate", statDate));
    }
    catch (Exception ex)
    {
        // Best-effort: the KPI step yields nothing rather than failing the whole run.
        Console.Error.WriteLine($"[S4MdpSyncTransform] S4 KPI calculation failed, no KPI rows produced: {ex.Message}");
        return new List<S4KpiCalcRow>();
    }
}
/// <summary>
/// Writes one calculated S4 KPI value into the KPI value table chosen by the metric's level:
/// updates the row for the business date when one exists, otherwise inserts a new row.
/// </summary>
/// <param name="row">Calculated value: tenant, factory, metric code and metric value.</param>
/// <param name="statDate">Business date (biz_date) of the value.</param>
/// <param name="now">Timestamp stored as calc_time and update_time (and create_time on insert).</param>
/// <returns>
/// The affected-row count of the UPDATE or INSERT, or 0 when the metric has no enabled master
/// row for the tenant or the calculated value is null.
/// </returns>
// NOTE(review): the method returns an int but is declared as plain `Task`, and the
// SqlQuerySingleAsync calls below carry no type argument in this view — generic arguments
// may have been lost; confirm against the real file.
private async Task UpsertS4KpiValueAsync(S4KpiCalcRow row, DateTime statDate, DateTime now)
{
// Master data supplies the metric level (which table to write) and the status thresholds.
var meta = await _db.Ado.SqlQuerySingleAsync(
"""
SELECT MetricLevel, Direction, YellowThreshold, RedThreshold
FROM ado_smart_ops_kpi_master
WHERE TenantId=@TenantId AND ModuleCode='S4' AND MetricCode=@MetricCode AND IsEnabled=1
LIMIT 1
""",
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@MetricCode", row.MetricCode));
if (meta == null || row.MetricValue == null)
return 0;
var table = ResolveKpiValueTable(meta.MetricLevel);
// Existing row for this exact business date, if any (lowest id wins).
var current = await _db.Ado.SqlQuerySingleAsync(
$"""
SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
FROM {table}
WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S4'
AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
ORDER BY id
LIMIT 1
""",
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@FactoryId", row.FactoryId),
new SugarParameter("@MetricCode", row.MetricCode),
new SugarParameter("@BizDate", statDate));
// Most recent row before the business date: source of the trend and of a fallback target.
var prior = await _db.Ado.SqlQuerySingleAsync(
$"""
SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
FROM {table}
WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S4'
AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0
ORDER BY biz_date DESC, id DESC
LIMIT 1
""",
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@FactoryId", row.FactoryId),
new SugarParameter("@MetricCode", row.MetricCode),
new SugarParameter("@BizDate", statDate));
var actual = Math.Round(row.MetricValue.Value, 4);
// Target precedence: today's stored target, then the prior row's, then the built-in default.
var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS4Target(row.MetricCode);
var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold);
var trend = ResolveTrendFlag(actual, prior?.MetricValue);
if (current != null)
{
// The UPDATE filters on the business key rather than on current.Id, so it touches every
// non-deleted row for that key and date.
return await _db.Ado.ExecuteCommandAsync(
$"""
UPDATE {table}
SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag,
is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime
WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S4'
AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
""",
new SugarParameter("@MetricValue", actual),
new SugarParameter("@TargetValue", target),
new SugarParameter("@StatusColor", status),
new SugarParameter("@TrendFlag", trend),
new SugarParameter("@CalcTime", now),
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@FactoryId", row.FactoryId),
new SugarParameter("@MetricCode", row.MetricCode),
new SugarParameter("@BizDate", statDate));
}
// The new id is computed as MAX(id) + 1 in a separate statement.
// NOTE(review): this is not atomic with the INSERT below; concurrent writers could compute
// the same id — confirm that only one run writes to this table at a time.
var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}");
return await _db.Ado.ExecuteCommandAsync(
$"""
INSERT INTO {table}
(id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active,
module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time)
VALUES
(@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1,
'S4', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime)
""",
new SugarParameter("@Id", nextId),
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@FactoryId", row.FactoryId),
new SugarParameter("@BizDate", statDate),
new SugarParameter("@CalcTime", now),
new SugarParameter("@MetricCode", row.MetricCode),
new SugarParameter("@MetricValue", actual),
new SugarParameter("@TargetValue", target),
new SugarParameter("@StatusColor", status),
new SugarParameter("@TrendFlag", trend));
}
/// <summary>
/// Inserts a RUNNING row into mdp_sync_log for one entity of the batch and returns its id.
/// </summary>
/// <param name="entityId">Id of the mdp_entity row being synced.</param>
/// <param name="entityName">Entity name stored on the log row.</param>
/// <param name="batchId">Batch id stored on the log row and used to read the id back.</param>
/// <param name="rowsRead">Source row count stored as rows_read.</param>
/// <returns>The id of the newest log row for this batch and entity.</returns>
/// <remarks>
/// The row is written with tenant 0, source code 'AIDOPDEV_MYSQL', sync type 'FULL' and
/// trigger type 'AUTO'. The return type and the parameter list carry their type arguments
/// (<c>Task&lt;long&gt;</c>, <c>List&lt;SugarParameter&gt;</c>), which the value-returning
/// method and the target-typed <c>new(...)</c> items need.
/// </remarks>
private async Task<long> InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead)
{
    await _db.Ado.ExecuteCommandAsync(
        """
        INSERT INTO mdp_sync_log
        (tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status)
        VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING')
        """,
        new SugarParameter("@EntityId", entityId),
        new SugarParameter("@EntityName", entityName),
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@RowsRead", rowsRead));

    // The id is read back by batch and entity, newest first.
    return await _db.Ado.GetLongAsync(
        "SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1",
        new List<SugarParameter> { new("@BatchId", batchId), new("@EntityId", entityId) });
}
/// <summary>
/// Marks a sync-log row as SUCCESS and stores its end time, elapsed milliseconds and the
/// number of rows written.
/// </summary>
/// <param name="logId">Id of the mdp_sync_log row.</param>
/// <param name="started">Start time used to compute the elapsed duration.</param>
/// <param name="affected">Affected-row count stored as rows_written.</param>
private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected)
{
    const string updateSql =
        """
        UPDATE mdp_sync_log
        SET status='SUCCESS', sync_end=NOW(), duration_ms=@DurationMs, rows_written=@RowsWritten
        WHERE id=@Id
        """;

    var elapsedMs = (int)(DateTime.Now - started).TotalMilliseconds;

    await _db.Ado.ExecuteCommandAsync(
        updateSql,
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@RowsWritten", affected),
        new SugarParameter("@Id", logId));
}
/// <summary>
/// Marks a sync-log row as FAILED and stores its end time, elapsed milliseconds and the
/// error message truncated to 2000 characters.
/// </summary>
/// <param name="logId">Id of the mdp_sync_log row.</param>
/// <param name="started">Start time used to compute the elapsed duration.</param>
/// <param name="message">Error text; passed through <c>Truncate</c> before it is stored.</param>
private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message)
{
    const string updateSql =
        """
        UPDATE mdp_sync_log
        SET status='FAILED', sync_end=NOW(), duration_ms=@DurationMs, error_message=@ErrorMessage
        WHERE id=@Id
        """;

    var elapsedMs = (int)(DateTime.Now - started).TotalMilliseconds;
    var storedMessage = Truncate(message, 2000);

    await _db.Ado.ExecuteCommandAsync(
        updateSql,
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@ErrorMessage", storedMessage),
        new SugarParameter("@Id", logId));
}
/// <summary>
/// Inserts a RUNNING row into mdp_transform_run_log for this job and returns its id.
/// </summary>
/// <param name="batchId">Batch id stored on the row and used to read the id back.</param>
/// <param name="startedAt">Start time stored as start_time.</param>
/// <param name="triggerType">Trigger label; normalised through <c>NormalizeTriggerType</c> before it is stored.</param>
/// <returns>The id of the newest run-log row for the batch.</returns>
/// <remarks>
/// The return type and the parameter list carry their type arguments
/// (<c>Task&lt;long&gt;</c>, <c>List&lt;SugarParameter&gt;</c>), which the value-returning
/// method and the target-typed <c>new(...)</c> item need.
/// </remarks>
private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
{
    await _db.Ado.ExecuteCommandAsync(
        """
        INSERT INTO mdp_transform_run_log
        (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
        VALUES (0, @JobCode, 'S4 MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
        """,
        new SugarParameter("@JobCode", JobCode),
        new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@StartTime", startedAt));

    // The id is read back by batch id, newest first.
    return await _db.Ado.GetLongAsync(
        "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
        new List<SugarParameter> { new("@BatchId", batchId) });
}
/// <summary>
/// Marks a transform run-log row as SUCCESS and stores its end time, duration, per-stage
/// row counts and a JSON summary of the result.
/// </summary>
/// <param name="runLogId">Id of the mdp_transform_run_log row.</param>
/// <param name="startedAt">Run start time used to compute the duration.</param>
/// <param name="result">Result whose counts and summary are persisted.</param>
private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S4MdpSyncTransformResult result)
{
    const string updateSql =
        """
        UPDATE mdp_transform_run_log
        SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
        stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
        summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
        WHERE id=@Id
        """;

    var endedAt = DateTime.Now;
    var elapsedMs = (int)(endedAt - startedAt).TotalMilliseconds;

    SugarParameter[] parameters =
    {
        new SugarParameter("@EndTime", endedAt),
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@StageRows", result.StageRows),
        new SugarParameter("@StandardRows", result.StandardRows),
        new SugarParameter("@DwdRows", result.DwdRows),
        new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)),
        new SugarParameter("@Id", runLogId)
    };

    await _db.Ado.ExecuteCommandAsync(updateSql, parameters);
}
/// <summary>
/// Closes a run-log row as FAILED with the end time, duration and a truncated error message.
/// Best effort: a failure while writing the log is reported on standard error and not rethrown.
/// </summary>
/// <param name="runLogId">Value of the <c>id</c> column of the <c>mdp_transform_run_log</c> row to update.</param>
/// <param name="startedAt">Start time of the run; the duration is measured from it.</param>
/// <param name="message">Failure text; shortened to 2000 characters before being stored.</param>
private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
{
    const string sql = """
        UPDATE mdp_transform_run_log
        SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
        error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
        WHERE id=@Id
        """;

    try
    {
        var finishedAt = DateTime.Now;
        var elapsedMs = (int)(finishedAt - startedAt).TotalMilliseconds;
        var parameters = new[]
        {
            new SugarParameter("@EndTime", finishedAt),
            new SugarParameter("@DurationMs", elapsedMs),
            new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
            new SugarParameter("@Id", runLogId)
        };

        await _db.Ado.ExecuteCommandAsync(sql, parameters);
    }
    catch (Exception ex)
    {
        // Swallowed on purpose: this method throws nothing of its own, so a failed log write cannot replace the error being recorded.
        Console.Error.WriteLine($"[S4MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}");
    }
}
/// <summary>
/// Wraps a SQL text together with the three standard parameters
/// <c>@BatchId</c>, <c>@Now</c> and <c>@StatDate</c> (the date part of <paramref name="now"/>).
/// </summary>
/// <param name="sql">SQL text of the command.</param>
/// <param name="batchId">Value bound to <c>@BatchId</c>.</param>
/// <param name="now">Value bound to <c>@Now</c>; its <c>Date</c> is bound to <c>@StatDate</c>.</param>
/// <returns>The command record holding the SQL and its parameters.</returns>
private static S4MdpSqlCommand Cmd(string sql, string batchId, DateTime now)
{
    SugarParameter[] parameters =
    {
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@Now", now),
        new SugarParameter("@StatDate", now.Date)
    };

    return new S4MdpSqlCommand(sql, parameters);
}
/// <summary>
/// Builds a MySQL <c>JSON_OBJECT(...)</c> expression that maps every column name to the
/// value of that column on the row aliased <c>s</c>.
/// </summary>
/// <param name="columns">Column names to include; an empty sequence yields <c>JSON_OBJECT()</c>.</param>
/// <returns>The SQL expression text, e.g. <c>JSON_OBJECT('id',s.`id`)</c>.</returns>
private static string BuildJsonObjectExpression(IEnumerable<string> columns)
{
    var parts = columns.SelectMany(column =>
    {
        // JSON key: a SQL string literal, so single quotes are doubled.
        var keyLiteral = "'" + column.Replace("'", "''") + "'";

        // Column reference: a backtick-quoted identifier, so embedded backticks are doubled too;
        // otherwise a name containing a backtick would terminate the identifier early.
        var columnRef = "s.`" + column.Replace("`", "``") + "`";

        return new[] { keyLiteral, columnRef };
    });

    return $"JSON_OBJECT({string.Join(",", parts)})";
}
/// <summary>
/// Returns the entry of <paramref name="columns"/> that equals <paramref name="expected"/>
/// ignoring case, preserving the casing found in the list.
/// </summary>
/// <param name="columns">Available column names.</param>
/// <param name="expected">Column name to look for (case-insensitive, ordinal).</param>
/// <returns>The first matching column name as it appears in <paramref name="columns"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="columns"/> is null.</exception>
/// <exception cref="InvalidOperationException">No column matches <paramref name="expected"/>.</exception>
private static string FindColumn(IEnumerable<string> columns, string expected)
{
    if (columns is null)
    {
        throw new ArgumentNullException(nameof(columns));
    }

    foreach (var column in columns)
    {
        if (string.Equals(column, expected, StringComparison.OrdinalIgnoreCase))
        {
            return column;
        }
    }

    // Same exception type as Enumerable.First, but the message names the missing column.
    throw new InvalidOperationException($"Required column '{expected}' was not found in the available column list.");
}
/// <summary>
/// Serializes the run result into a compact JSON object with the keys
/// <c>batchId</c>, <c>stageRows</c>, <c>standardRows</c>, <c>dwdRows</c> and <c>kpiRows</c>.
/// </summary>
/// <param name="result">Run result whose batch id and row counts are written.</param>
/// <returns>The JSON text.</returns>
private static string BuildRunSummaryJson(S4MdpSyncTransformResult result)
{
    // BatchId is a publicly settable string: JSON-escape it so quotes, backslashes or control
    // characters cannot produce invalid JSON. Plain ASCII letters, digits and underscores pass through unchanged.
    var batchIdJson = System.Text.Json.JsonEncodedText.Encode(result.BatchId ?? string.Empty).ToString();

    return $$"""{"batchId":"{{batchIdJson}}","stageRows":{{result.StageRows}},"standardRows":{{result.StandardRows}},"dwdRows":{{result.DwdRows}},"kpiRows":{{result.KpiRows}}}""";
}
/// <summary>
/// Maps a metric level to the name of the daily KPI value table for that level.
/// </summary>
/// <param name="metricLevel">Metric level; 1 to 4 each have their own table.</param>
/// <returns>The table name; any level outside 1..4 falls back to the level-2 table.</returns>
private static string ResolveKpiValueTable(int metricLevel)
{
    switch (metricLevel)
    {
        case 1:
            return "ado_s9_kpi_value_l1_day";
        case 3:
            return "ado_s9_kpi_value_l3_day";
        case 4:
            return "ado_s9_kpi_value_l4_day";
        case 2:
        default:
            // Level 2 doubles as the fallback for unknown levels.
            return "ado_s9_kpi_value_l2_day";
    }
}
/// <summary>
/// Returns the built-in default target for an S4 metric code.
/// Codes ending in the same three-digit suffix share one target across levels L1 to L3.
/// </summary>
/// <param name="metricCode">Metric code such as <c>S4_L1_001</c>.</param>
/// <returns>The default target, or 0 for any code not listed.</returns>
private static decimal DefaultS4Target(string metricCode)
{
    switch (metricCode)
    {
        case "S4_L1_001":
        case "S4_L2_001":
        case "S4_L3_001":
            return 10.26m;

        case "S4_L1_002":
        case "S4_L2_002":
        case "S4_L3_002":
            return 99m;

        case "S4_L1_003":
        case "S4_L2_003":
        case "S4_L3_003":
            return 250m;

        case "S4_L1_004":
        case "S4_L2_004":
        case "S4_L3_004":
            return 45m;

        default:
            return 0m;
    }
}
/// <summary>
/// Classifies an actual KPI value against its target as <c>green</c>, <c>yellow</c>, <c>red</c> or <c>gray</c>.
/// </summary>
/// <param name="actual">Measured value.</param>
/// <param name="target">Target value; a target of zero or less yields <c>gray</c>.</param>
/// <param name="direction"><c>lower_is_better</c> (case-insensitive) inverts the comparison; anything else means higher is better.</param>
/// <param name="yellowThreshold">Yellow bound on actual/target in percent; defaults to 110 (lower is better) or 95 (higher is better).</param>
/// <param name="redThreshold">Red bound on actual/target in percent; defaults to 120 (lower is better) or 80 (higher is better).</param>
/// <returns>The status colour name.</returns>
private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold)
{
    if (target <= 0)
    {
        return "gray";
    }

    var percentOfTarget = actual / target * 100m;
    var lowerIsBetter = string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase);

    var targetMet = lowerIsBetter ? actual <= target : actual >= target;
    if (targetMet)
    {
        return "green";
    }

    // Red requires being past BOTH bounds: a value still inside the yellow bound stays yellow
    // even when it has crossed the red bound.
    bool isRed;
    if (lowerIsBetter)
    {
        isRed = percentOfTarget > (yellowThreshold ?? 110m) && percentOfTarget >= (redThreshold ?? 120m);
    }
    else
    {
        isRed = percentOfTarget < (yellowThreshold ?? 95m) && percentOfTarget <= (redThreshold ?? 80m);
    }

    return isRed ? "red" : "yellow";
}
/// <summary>
/// Compares the current value with the previous one and names the direction of change.
/// </summary>
/// <param name="actual">Current value.</param>
/// <param name="previous">Previous value, or null when there is none.</param>
/// <returns><c>up</c>, <c>down</c>, or <c>flat</c> (also returned when there is no previous value).</returns>
private static string ResolveTrendFlag(decimal actual, decimal? previous)
{
    if (previous is not decimal prior)
    {
        return "flat";
    }

    return actual.CompareTo(prior) switch
    {
        > 0 => "up",
        < 0 => "down",
        _ => "flat"
    };
}
/// <summary>
/// Normalizes a trigger label: trimmed and upper-cased (invariant culture), with <c>AUTO</c>
/// substituted when the input is null, empty or whitespace.
/// </summary>
/// <param name="triggerType">Raw trigger label.</param>
/// <returns>The normalized label.</returns>
private static string NormalizeTriggerType(string? triggerType)
{
    if (string.IsNullOrWhiteSpace(triggerType))
    {
        return "AUTO";
    }

    var trimmed = triggerType.Trim();
    return trimmed.ToUpperInvariant();
}
/// <summary>
/// Shortens a string to at most <paramref name="maxLength"/> UTF-16 code units without
/// splitting a surrogate pair.
/// </summary>
/// <param name="raw">Text to shorten; null or empty yields an empty string.</param>
/// <param name="maxLength">Maximum number of UTF-16 code units to keep.</param>
/// <returns>
/// <paramref name="raw"/> itself when it already fits; otherwise a prefix of at most
/// <paramref name="maxLength"/> code units (one fewer when the cut would land inside a surrogate pair).
/// </returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="maxLength"/> is negative and <paramref name="raw"/> is non-empty.
/// </exception>
private static string Truncate(string? raw, int maxLength)
{
    if (string.IsNullOrEmpty(raw))
    {
        return string.Empty;
    }

    if (raw.Length <= maxLength)
    {
        return raw;
    }

    if (maxLength < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(maxLength), maxLength, "Maximum length must not be negative.");
    }

    // A cut right after a high surrogate would leave half of a character (e.g. an emoji) at the
    // end of the text, which is not valid UTF-16; back off one code unit to drop the whole pair.
    var cut = maxLength;
    if (cut > 0 && char.IsHighSurrogate(raw[cut - 1]))
    {
        cut--;
    }

    return raw[..cut];
}
/// <summary>
/// Row shape carrying a single column name.
/// NOTE(review): presumably filled from a table-metadata query elsewhere in this class — the usage is outside this chunk.
/// </summary>
private sealed class S4ColumnRow
{
/// <summary>Name of the column; never null, defaults to an empty string.</summary>
public string ColumnName { get; set; } = string.Empty;
}
/// <summary>
/// Row shape pairing an entity id with its name.
/// NOTE(review): the query that fills it is outside this chunk — confirm which table it reads.
/// </summary>
private sealed class S4MdpEntityRow
{
/// <summary>Numeric id of the entity row.</summary>
public long Id { get; set; }
/// <summary>Entity name; never null, defaults to an empty string.</summary>
public string EntityName { get; set; } = string.Empty;
}
/// <summary>
/// Row shape for one calculated KPI value, keyed by tenant, factory and metric code.
/// NOTE(review): presumably the result row of the KPI calculation query — the usage is outside this chunk.
/// </summary>
private sealed class S4KpiCalcRow
{
/// <summary>Tenant the value belongs to.</summary>
public long TenantId { get; set; }
/// <summary>Factory the value belongs to.</summary>
public long FactoryId { get; set; }
/// <summary>Metric code; never null, defaults to an empty string.</summary>
public string MetricCode { get; set; } = string.Empty;
/// <summary>Calculated value; null when no value is available.</summary>
public decimal? MetricValue { get; set; }
}
/// <summary>
/// Row shape for a metric's metadata: level, direction and optional status thresholds.
/// NOTE(review): these fields line up with the inputs of ResolveKpiValueTable and ResolveKpiStatus — confirm at the call site, which is outside this chunk.
/// </summary>
private sealed class S4KpiMetaRow
{
/// <summary>Level of the metric.</summary>
public int MetricLevel { get; set; }
/// <summary>Direction of improvement; defaults to <c>higher_is_better</c>.</summary>
public string Direction { get; set; } = "higher_is_better";
/// <summary>Optional yellow threshold; null when not configured.</summary>
public decimal? YellowThreshold { get; set; }
/// <summary>Optional red threshold; null when not configured.</summary>
public decimal? RedThreshold { get; set; }
}
/// <summary>
/// Row shape for a stored KPI value: its id, the metric value and the target value.
/// NOTE(review): presumably read back from one of the KPI value tables — the usage is outside this chunk.
/// </summary>
private sealed class S4KpiValueRow
{
/// <summary>Id of the stored row.</summary>
public long Id { get; set; }
/// <summary>Stored metric value; may be null.</summary>
public decimal? MetricValue { get; set; }
/// <summary>Stored target value; may be null.</summary>
public decimal? TargetValue { get; set; }
}
}
/// <summary>
/// Outcome of one <c>RunFullAsync</c> execution: the batch and run-log identifiers plus the
/// row count reported by each stage of the run.
/// </summary>
public sealed class S4MdpSyncTransformResult
{
/// <summary>Id of the <c>mdp_transform_run_log</c> row inserted for this run.</summary>
public long RunLogId { get; set; }
/// <summary>Batch identifier of the run; never null, defaults to an empty string.</summary>
public string BatchId { get; set; } = string.Empty;
/// <summary>Row count returned by the staging sync step.</summary>
public int StageRows { get; set; }
/// <summary>Row count returned by the standard-layer transform step.</summary>
public int StandardRows { get; set; }
/// <summary>Row count returned by the DWD build step.</summary>
public int DwdRows { get; set; }
/// <summary>Row count returned by the S4 KPI value build step.</summary>
public int KpiRows { get; set; }
}
/// <summary>
/// A SQL text bundled with the parameters to bind when it is executed.
/// </summary>
/// <param name="Sql">SQL text of the command.</param>
/// <param name="Parameters">Parameters bound to the command.</param>
internal sealed record S4MdpSqlCommand(string Sql, SugarParameter[] Parameters);
/// <summary>
/// Describes one S4 source entity: which source table it comes from, which staging table
/// it lands in, and the SQL expressions that yield its row id and business key.
/// </summary>
/// <param name="EntityCode">Code identifying the entity, e.g. <c>S4_IQC_RECEIPT</c>.</param>
/// <param name="SourceTable">Name of the source table.</param>
/// <param name="TargetTable">Name of the staging table that receives the rows.</param>
/// <param name="SourceRowIdExpression">Column name (or expression) giving the source row id.</param>
/// <param name="SourceBizKeyExpression">SQL expression giving the business key; it refers to the source row through the alias <c>s</c>.</param>
/// <param name="Optional">
/// Marks the entity as optional.
/// NOTE(review): presumably lets the sync skip a missing source table — confirm in the staging sync code.
/// </param>
internal sealed record S4MdpEntityConfig(
    string EntityCode,
    string SourceTable,
    string TargetTable,
    string SourceRowIdExpression,
    string SourceBizKeyExpression,
    bool Optional = false)
{
    /// <summary>All S4 entities known to the sync, in declaration order.</summary>
    public static readonly IReadOnlyList<S4MdpEntityConfig> All = new List<S4MdpEntityConfig>
    {
        new("S4_IQC_RECEIPT", "PurOrdRctDetail", "mdp_stg_s4_iqc", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''), ':', IFNULL(s.`Line`,''))"),
        new("S4_SHIPMENT_EXEC", "scm_shdzb", "mdp_stg_s4_shipment", "id", "CONCAT(IFNULL(s.`glid`,''), ':', IFNULL(s.`id`,''))"),
        new("S4_RETURN_EXEC", "srm_polist_ds", "mdp_stg_s4_return", "Id", "s.`dsnum`"),
        new("S4_SHORTAGE_EXEC", "dwd_material_shortage", "mdp_stg_s4_shortage", "id", "CONCAT(IFNULL(s.`work_order`,''), ':', IFNULL(s.`component_item_code`,''))", Optional: true)
    };
}