namespace Admin.NET.Plugin.AiDOP.Supply;
/// <summary>
/// Synchronization and standardization transform service for the first batch of S3 MDP entities.
/// </summary>
public class S3MdpSyncTransformService : ITransient
{
/// <summary>SqlSugar client used for the database calls made by this service.</summary>
private readonly ISqlSugarClient _db;

/// <summary>
/// Creates the service around the supplied SqlSugar client.
/// </summary>
/// <param name="db">Database client kept for later use by the service methods.</param>
public S3MdpSyncTransformService(ISqlSugarClient db) => _db = db;
/// <summary>
/// Runs the full S3 MDP pipeline: staging sync, standard-layer transform, DWD build and S3 KPI calculation.
/// </summary>
/// <remarks>
/// A run-log entry is inserted before the first stage. It is marked successful after the last stage,
/// or marked failed with the exception message before the exception is rethrown to the caller.
/// </remarks>
/// <param name="cancellationToken">Token checked on entry and handed to every stage.</param>
/// <param name="triggerType">Trigger label passed to the run-log insert; defaults to "AUTO".</param>
/// <returns>The batch id, run-log id and the row counts reported by each stage.</returns>
public async Task<S3MdpSyncTransformResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
{
    cancellationToken.ThrowIfCancellationRequested();

    // A single timestamp is shared by the batch id, the run log and every stage of this run.
    var now = DateTime.Now;
    var batchId = $"S3_MDP_FULL_{now:yyyyMMddHHmmss}";
    var runLogId = await InsertTransformRunLogAsync(batchId, now, triggerType);
    var result = new S3MdpSyncTransformResult { BatchId = batchId, RunLogId = runLogId };

    try
    {
        // Stages run strictly in this order; each later stage reads what the previous one wrote.
        result.StageRows = await SyncStagingAsync(batchId, now, cancellationToken);
        result.StandardRows = await TransformStandardAsync(batchId, now, cancellationToken);
        result.DwdRows = await BuildDwdAsync(batchId, now, cancellationToken);
        result.KpiRows = await BuildS3KpiValuesAsync(now, cancellationToken);

        await MarkTransformRunSuccessAsync(runLogId, now, result);
        return result;
    }
    catch (Exception ex)
    {
        // Record the failure in the run log, then let the original exception propagate unchanged.
        await MarkTransformRunFailedAsync(runLogId, now, ex.Message);
        throw;
    }
}
/// <summary>
/// Rebuilds the material-readiness DWD rows and the current-day shortage rows inside one transaction.
/// </summary>
/// <param name="batchId">
/// Batch id to stamp on the written rows; when null an id of the form
/// <c>S3_MATERIAL_yyyyMMddHHmmss</c> is generated from the current time.
/// </param>
/// <param name="cancellationToken">
/// Token checked on entry and before the write steps; cancelling inside the transaction rolls it back.
/// </param>
/// <returns>
/// The batch id, the number of standard-layer readiness rows with a non-empty work order, and the
/// counts returned by the readiness upsert and the shortage insert.
/// </returns>
public async Task<S3MaterialRefreshResult> RefreshMaterialReadinessAsync(string? batchId = null, CancellationToken cancellationToken = default)
{
    cancellationToken.ThrowIfCancellationRequested();

    var now = DateTime.Now;
    batchId ??= $"S3_MATERIAL_{now:yyyyMMddHHmmss}";

    _db.Ado.BeginTran();
    try
    {
        var stdCount = await _db.Ado.GetIntAsync("SELECT COUNT(1) FROM mdp_std_material_readiness WHERE IFNULL(work_order, '') <> ''");

        cancellationToken.ThrowIfCancellationRequested();
        var readinessCount = await UpsertMaterialReadinessDwdAsync(batchId, now);

        // The delete and the re-insert of today's shortages are kept together so that a cancel
        // can never leave the shortage table emptied but not refilled within this transaction.
        cancellationToken.ThrowIfCancellationRequested();
        await DeleteCurrentShortageAsync(now.Date);
        var shortageCount = await InsertMaterialShortageAsync(batchId, now);

        _db.Ado.CommitTran();
        return new S3MaterialRefreshResult
        {
            BatchId = batchId,
            StdCount = stdCount,
            ReadinessCount = readinessCount,
            ShortageCount = shortageCount
        };
    }
    catch
    {
        // Any failure (including cancellation) undoes every write made above.
        _db.Ado.RollbackTran();
        throw;
    }
}
/// <summary>
/// Syncs every entity listed in <c>S3MdpEntityConfig.All</c> into its staging table, one after another.
/// </summary>
/// <param name="batchId">Batch id passed to each per-entity sync.</param>
/// <param name="now">Run timestamp passed to each per-entity sync.</param>
/// <param name="cancellationToken">Token checked before each entity is synced.</param>
/// <returns>Sum of the values returned by the per-entity syncs.</returns>
private async Task<int> SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    var total = 0;
    foreach (var entity in S3MdpEntityConfig.All)
    {
        // Cancellation is honoured between entities; an entity sync that has started is not interrupted.
        cancellationToken.ThrowIfCancellationRequested();
        total += await SyncOneEntityAsync(entity, batchId, now);
    }

    return total;
}
/// <summary>
/// Copies the rows of one configured source table into its staging table with a single
/// INSERT ... SELECT ... ON DUPLICATE KEY UPDATE statement, and records the outcome in a sync log.
/// </summary>
/// <remarks>
/// NOTE(review): in this view the method is declared as a bare Task although it returns rowsRead, and
/// SqlQuerySingleAsync / SqlQueryAsync carry no generic type argument. The type arguments look like
/// they were lost in extraction; confirm against the repository copy before changing this method.
/// </remarks>
/// <param name="entity">Entity configuration supplying EntityCode, SourceTable, TargetTable and the key expressions.</param>
/// <param name="batchId">Batch id written to sync_batch_id and passed to the sync-log insert.</param>
/// <param name="now">Timestamp written to sync_time.</param>
/// <returns>The source-table row count read before the copy (not the affected-row count of the INSERT).</returns>
private async Task SyncOneEntityAsync(S3MdpEntityConfig entity, string batchId, DateTime now)
{
// The entity must be registered in mdp_entity under tenant 0; its id and name are used for the sync log.
var entityRow = await _db.Ado.SqlQuerySingleAsync(
"SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1",
new SugarParameter("@EntityCode", entity.EntityCode));
if (entityRow == null) throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode}");
// Read the source table's column names from information_schema for the current database.
var columns = await _db.Ado.SqlQueryAsync(
"""
SELECT COLUMN_NAME AS ColumnName
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
ORDER BY ORDINAL_POSITION
""",
new SugarParameter("@TableName", entity.SourceTable));
// No columns means the table does not exist in the current database.
if (columns.Count == 0) throw Oops.Oh($"未找到源表:{entity.SourceTable}");
var names = columns.Select(u => u.ColumnName).ToList();
// Use the source table's own tenant_id column when it has one; otherwise store tenant 0.
var tenantExpr = names.Any(u => string.Equals(u, "tenant_id", StringComparison.OrdinalIgnoreCase))
? $"IFNULL(s.`{FindColumn(names, "tenant_id")}`,0)"
: "0";
// A SourceRowIdExpression that matches a column name (case-insensitively) becomes a quoted column
// reference; anything else is embedded into the SQL as-is.
var sourceRowExpr = names.Any(u => string.Equals(u, entity.SourceRowIdExpression, StringComparison.OrdinalIgnoreCase))
? $"s.`{FindColumn(names, entity.SourceRowIdExpression)}`"
: entity.SourceRowIdExpression;
// Expression that fills raw_data; presumably a JSON object over all source columns — confirm in BuildJsonObjectExpression.
var rawDataExpr = BuildJsonObjectExpression(names);
// The row count is taken before the copy; it is logged and is also this method's return value.
var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`");
var logId = await InsertSyncLogAsync(entityRow.Id, entityRow.EntityName, batchId, rowsRead);
var started = DateTime.Now;
try
{
// Table names and key expressions are interpolated into the SQL text rather than parameterized;
// presumably they come only from trusted S3MdpEntityConfig values — verify.
var affected = await _db.Ado.ExecuteCommandAsync(
$"""
INSERT INTO `{entity.TargetTable}`
(tenant_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
SELECT
{tenantExpr},
'AIDOP',
@SourceTable,
CAST({sourceRowExpr} AS CHAR),
CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({sourceRowExpr} AS CHAR)) AS CHAR),
@BatchId,
@Now,
'PENDING',
{rawDataExpr}
FROM `{entity.SourceTable}` s
ON DUPLICATE KEY UPDATE
source_row_id=VALUES(source_row_id),
sync_batch_id=VALUES(sync_batch_id),
sync_time=VALUES(sync_time),
process_status=VALUES(process_status),
raw_data=VALUES(raw_data),
update_time=CURRENT_TIMESTAMP
""",
new SugarParameter("@SourceTable", entity.SourceTable),
new SugarParameter("@BatchId", batchId),
new SugarParameter("@Now", now));
await MarkSyncLogSuccessAsync(logId, started, affected);
return rowsRead;
}
catch (Exception ex)
{
// Record the failure on the sync log, then rethrow the original exception.
await MarkSyncLogFailedAsync(logId, started, ex.Message);
throw;
}
}
/// <summary>
/// Executes every command produced by <c>BuildStandardCommands</c>, in the order they are yielded.
/// </summary>
/// <param name="batchId">Batch id passed to the command builder.</param>
/// <param name="now">Run timestamp passed to the command builder.</param>
/// <param name="cancellationToken">Token checked before each command is executed.</param>
/// <returns>Sum of the affected-row counts reported for the executed commands.</returns>
private async Task<int> TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    var total = 0;
    foreach (var command in BuildStandardCommands(batchId, now))
    {
        // Cancellation is honoured between commands; a command that has started runs to completion.
        cancellationToken.ThrowIfCancellationRequested();
        total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
    }

    return total;
}
/// <summary>
/// Executes every command produced by <c>BuildDwdCommands</c>, then refreshes the material-readiness
/// DWD rows and re-creates the shortage rows for <paramref name="now"/>'s date.
/// </summary>
/// <remarks>
/// Unlike <c>RefreshMaterialReadinessAsync</c>, the readiness upsert, shortage delete and shortage
/// insert are not wrapped in a transaction by this method.
/// </remarks>
/// <param name="batchId">Batch id passed to the command builder and to the readiness/shortage steps.</param>
/// <param name="now">Run timestamp; its date part selects the shortage rows to delete.</param>
/// <param name="cancellationToken">Token checked before each DWD command is executed.</param>
/// <returns>
/// Sum of the affected-row counts of the DWD commands plus the counts returned by the readiness
/// upsert and the shortage insert (the shortage delete is not counted).
/// </returns>
private async Task<int> BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    var total = 0;
    foreach (var command in BuildDwdCommands(batchId, now))
    {
        cancellationToken.ThrowIfCancellationRequested();
        total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
    }

    total += await UpsertMaterialReadinessDwdAsync(batchId, now);
    await DeleteCurrentShortageAsync(now.Date);
    total += await InsertMaterialShortageAsync(batchId, now);
    return total;
}
/// <summary>
/// Calculates the S3 KPI rows for <paramref name="now"/>'s date and upserts each of them.
/// </summary>
/// <param name="now">Run timestamp; its date part is the statistics date, the full value is the calculation time.</param>
/// <param name="cancellationToken">Token checked before each KPI row is written.</param>
/// <returns>Sum of the values returned by the per-row upserts.</returns>
private async Task<int> BuildS3KpiValuesAsync(DateTime now, CancellationToken cancellationToken)
{
    var statDate = now.Date;
    var rows = await CalculateS3KpiValuesAsync(statDate);

    var affected = 0;
    foreach (var row in rows)
    {
        cancellationToken.ThrowIfCancellationRequested();
        affected += await UpsertS3KpiValueAsync(row, statDate, now);
    }

    return affected;
}
/// <summary>
/// Computes the S3 KPI values per tenant with one UNION ALL query.
/// </summary>
/// <remarks>
/// <para>
/// S3_L2_004 and S3_L1_001: average of (hours between request_date and submit_date) / 24 over
/// mdp_std_delivery_schedule rows where both dates are set and submit_date is not before request_date.
/// These two branches are not filtered by <paramref name="statDate"/>.
/// </para>
/// <para>
/// S3_L2_005 and S3_L1_002: percentage of dwd_supplier_delivery rows for <paramref name="statDate"/>
/// with order_qty greater than 0 whose schedule_qty is at least order_qty.
/// </para>
/// Every row is returned with FactoryId fixed to 1.
/// </remarks>
/// <param name="statDate">Statistics date bound to @StatDate.</param>
/// <returns>One row per tenant and metric code that has qualifying data.</returns>
private async Task<List<S3KpiCalcRow>> CalculateS3KpiValuesAsync(DateTime statDate)
{
    return await _db.Ado.SqlQueryAsync<S3KpiCalcRow>(
        """
        SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L2_004' AS MetricCode,
        ROUND(AVG(TIMESTAMPDIFF(HOUR, request_date, submit_date) / 24), 4) AS MetricValue
        FROM mdp_std_delivery_schedule
        WHERE request_date IS NOT NULL AND submit_date IS NOT NULL AND submit_date >= request_date
        GROUP BY tenant_id
        UNION ALL
        SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L2_005' AS MetricCode,
        ROUND(100 * SUM(CASE WHEN IFNULL(schedule_qty,0) >= IFNULL(order_qty,0) AND IFNULL(order_qty,0) > 0 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
        FROM dwd_supplier_delivery
        WHERE stat_date=@StatDate AND IFNULL(order_qty,0) > 0
        GROUP BY tenant_id
        UNION ALL
        SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L1_001' AS MetricCode,
        ROUND(AVG(TIMESTAMPDIFF(HOUR, request_date, submit_date) / 24), 4) AS MetricValue
        FROM mdp_std_delivery_schedule
        WHERE request_date IS NOT NULL AND submit_date IS NOT NULL AND submit_date >= request_date
        GROUP BY tenant_id
        UNION ALL
        SELECT tenant_id AS TenantId, 1 AS FactoryId, 'S3_L1_002' AS MetricCode,
        ROUND(100 * SUM(CASE WHEN IFNULL(schedule_qty,0) >= IFNULL(order_qty,0) AND IFNULL(order_qty,0) > 0 THEN 1 ELSE 0 END) / COUNT(1), 4) AS MetricValue
        FROM dwd_supplier_delivery
        WHERE stat_date=@StatDate AND IFNULL(order_qty,0) > 0
        GROUP BY tenant_id
        """,
        new SugarParameter("@StatDate", statDate));
}
/// <summary>
/// Writes one calculated S3 KPI value for a statistics date: updates the existing row for that date
/// when there is one, otherwise inserts a new row.
/// </summary>
/// <remarks>
/// NOTE(review): in this view the method is declared as a bare Task although it returns row counts,
/// and the three SqlQuerySingleAsync calls carry no generic type argument. The type arguments look
/// like they were lost in extraction; confirm against the repository copy.
/// </remarks>
/// <param name="row">Calculated KPI row (tenant, factory, metric code, metric value).</param>
/// <param name="statDate">Statistics date matched against / written to biz_date.</param>
/// <param name="now">Calculation time written to calc_time, update_time and, on insert, create_time.</param>
/// <returns>0 when the metric has no enabled master record or no value; otherwise the affected-row count of the UPDATE or INSERT.</returns>
private async Task UpsertS3KpiValueAsync(S3KpiCalcRow row, DateTime statDate, DateTime now)
{
// Look up the enabled KPI master record for this tenant and metric in module S3.
var meta = await _db.Ado.SqlQuerySingleAsync(
"""
SELECT MetricLevel, Direction, YellowThreshold, RedThreshold
FROM ado_smart_ops_kpi_master
WHERE TenantId=@TenantId AND ModuleCode='S3' AND MetricCode=@MetricCode AND IsEnabled=1
LIMIT 1
""",
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@MetricCode", row.MetricCode));
// Nothing is written when the metric is not configured/enabled or no value was calculated.
if (meta == null || row.MetricValue == null)
return 0;
// The target table depends on the metric level; the name is interpolated into the SQL below.
var table = ResolveKpiValueTable(meta.MetricLevel);
// Existing row for the same tenant, factory, metric and statistics date (lowest id if several).
var current = await _db.Ado.SqlQuerySingleAsync(
$"""
SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
FROM {table}
WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S3'
AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
ORDER BY id
LIMIT 1
""",
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@FactoryId", row.FactoryId),
new SugarParameter("@MetricCode", row.MetricCode),
new SugarParameter("@BizDate", statDate));
// Most recent row strictly before the statistics date; used for the trend and as a target fallback.
var prior = await _db.Ado.SqlQuerySingleAsync(
$"""
SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
FROM {table}
WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S3'
AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0
ORDER BY biz_date DESC, id DESC
LIMIT 1
""",
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@FactoryId", row.FactoryId),
new SugarParameter("@MetricCode", row.MetricCode),
new SugarParameter("@BizDate", statDate));
var actual = Math.Round(row.MetricValue.Value, 4);
// Target precedence: today's existing target, then the previous row's target, then the built-in default.
var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS3Target(row.MetricCode);
var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold);
var trend = ResolveTrendFlag(actual, prior?.MetricValue);
if (current != null)
{
// The UPDATE matches on the business key rather than on current.Id, so every non-deleted row
// for this key and date is updated.
return await _db.Ado.ExecuteCommandAsync(
$"""
UPDATE {table}
SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag,
is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime
WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S3'
AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
""",
new SugarParameter("@MetricValue", actual),
new SugarParameter("@TargetValue", target),
new SugarParameter("@StatusColor", status),
new SugarParameter("@TrendFlag", trend),
new SugarParameter("@CalcTime", now),
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@FactoryId", row.FactoryId),
new SugarParameter("@MetricCode", row.MetricCode),
new SugarParameter("@BizDate", statDate));
}
// NOTE(review): the new id is read with MAX(id) + 1 in a separate statement from the INSERT, so two
// concurrent runs could pick the same id — confirm that runs are serialized or that id is unique-checked.
var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}");
return await _db.Ado.ExecuteCommandAsync(
$"""
INSERT INTO {table}
(id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active,
module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time)
VALUES
(@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1,
'S3', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime)
""",
new SugarParameter("@Id", nextId),
new SugarParameter("@TenantId", row.TenantId),
new SugarParameter("@FactoryId", row.FactoryId),
new SugarParameter("@BizDate", statDate),
new SugarParameter("@CalcTime", now),
new SugarParameter("@MetricCode", row.MetricCode),
new SugarParameter("@MetricValue", actual),
new SugarParameter("@TargetValue", target),
new SugarParameter("@StatusColor", status),
new SugarParameter("@TrendFlag", trend));
}
/// <summary>
/// Lazily yields the SQL commands that populate the mdp_std_* standard tables from the mdp_stg_*
/// staging tables. Each command is an INSERT ... SELECT ... ON DUPLICATE KEY UPDATE built with Cmd.
/// </summary>
/// <remarks>
/// Fields are read out of the staging raw_data JSON with JSON_EXTRACT / JSON_UNQUOTE. Numeric fields
/// are cast to DECIMAL(18,6) only when the text matches the numeric pattern; date fields whose text
/// is 'null' or empty are turned into NULL.
/// NOTE(review): the IEnumerable element type is not visible in this view (the generic argument looks
/// like it was lost in extraction); callers read Sql and Parameters from each yielded item.
/// </remarks>
/// <param name="batchId">Batch id passed to Cmd for each statement (the SQL references @BatchId).</param>
/// <param name="now">Timestamp passed to Cmd for each statement (the SQL references @Now).</param>
private IEnumerable BuildStandardCommands(string batchId, DateTime now)
{
// 1. Suppliers: mdp_stg_supplier rows from SuppMaster with a non-empty Supp code.
yield return Cmd(
"""
INSERT INTO mdp_std_supplier
(tenant_id, factory_id, company_id, source_system, supplier_code, supplier_name, supplier_type, status, contact, address, currency_type, source_biz_key, sync_batch_id, sync_time)
SELECT tenant_id, factory_id, company_id, 'AIDOP',
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Supp')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SortName')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Type')),
CASE WHEN JSON_EXTRACT(raw_data,'$.IsActive') IN (1, true) THEN 'ACTIVE' ELSE 'INACTIVE' END,
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.contact')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.address')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Curr')),
source_biz_key, @BatchId, @Now
FROM mdp_stg_supplier
WHERE source_table='SuppMaster' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Supp')), '') <> ''
ON DUPLICATE KEY UPDATE supplier_name=VALUES(supplier_name), supplier_type=VALUES(supplier_type), status=VALUES(status),
contact=VALUES(contact), address=VALUES(address), currency_type=VALUES(currency_type), sync_batch_id=VALUES(sync_batch_id),
sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 2. Items: mdp_stg_item rows with an item code under either the ItemNum or the number key.
yield return Cmd(
"""
INSERT INTO mdp_std_item
(tenant_id, factory_id, company_id, source_system, item_code, item_name, model, unit, item_type, status, source_biz_key, sync_batch_id, sync_time)
SELECT tenant_id, factory_id, company_id, 'AIDOP',
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number'))),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Descr')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.name')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_name'))),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Drawing')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.model'))),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.UM')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.unit'))),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemType')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_type'))),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Status')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.is_active'))),
source_biz_key, @BatchId, @Now
FROM mdp_stg_item
WHERE COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), '') <> ''
ON DUPLICATE KEY UPDATE item_name=VALUES(item_name), model=VALUES(model), unit=VALUES(unit), item_type=VALUES(item_type),
status=VALUES(status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 3. Supplier-item source list: mdp_stg_source_list rows from srm_purchase with a supplier number.
//    Price prefers netpurchase_price and falls back to order_price.
yield return Cmd(
"""
INSERT INTO mdp_std_supplier_item
(tenant_id, factory_id, company_id, source_system, item_code, supplier_code, supplier_name, supplier_type, quota_rate, lead_time, min_qty, packaging_qty, price, currency_type, effective_date, expire_date, status, source_biz_key, sync_batch_id, sync_time)
SELECT tenant_id, factory_id, company_id, 'AIDOP',
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.icitem_name'))),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_name')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_type')),
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.quota_rate')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.quota_rate')) AS DECIMAL(18,6)) END,
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.lead_time')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.lead_time')) AS DECIMAL(18,6)) END,
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty_min')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty_min')) AS DECIMAL(18,6)) END,
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.packaging_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.packaging_qty')) AS DECIMAL(18,6)) END,
COALESCE(
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.netpurchase_price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.netpurchase_price')) AS DECIMAL(18,6)) END,
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.order_price')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.order_price')) AS DECIMAL(18,6)) END
),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.currency_type')),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.effective_date')), 'null'), ''),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.expiring_date')), 'null'), ''),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.is_active')),
source_biz_key, @BatchId, @Now
FROM mdp_stg_source_list
WHERE source_table='srm_purchase' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')), '') <> ''
ON DUPLICATE KEY UPDATE supplier_name=VALUES(supplier_name), quota_rate=VALUES(quota_rate), lead_time=VALUES(lead_time),
min_qty=VALUES(min_qty), packaging_qty=VALUES(packaging_qty), price=VALUES(price), sync_batch_id=VALUES(sync_batch_id),
sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 4. Supply demand: every mdp_stg_supply_demand row with a non-empty source_biz_key, regardless of
//    source table; the source table name is stored as demand_type.
yield return Cmd(
"""
INSERT INTO mdp_std_supply_demand
(tenant_id, factory_id, company_id, source_system, demand_no, demand_line, demand_type, item_code, item_name, required_qty, required_date, supplier_code, status, source_biz_key, sync_batch_id, sync_time)
SELECT tenant_id, factory_id, company_id, 'AIDOP',
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.number')), source_row_id),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.line')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Line'))),
source_table,
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_number')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum'))),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemname')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_name')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Descr'))),
COALESCE(
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) AS DECIMAL(18,6)) END,
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.required_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.required_qty')) AS DECIMAL(18,6)) END,
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shortqty')) AS DECIMAL(18,6)) END,
0),
COALESCE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.required_date')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.needdate')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.NeedDate')), 'null'), '')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Status'))),
source_biz_key, @BatchId, @Now
FROM mdp_stg_supply_demand
WHERE IFNULL(source_biz_key,'') <> ''
ON DUPLICATE KEY UPDATE item_code=VALUES(item_code), item_name=VALUES(item_name), required_qty=VALUES(required_qty),
required_date=VALUES(required_date), supplier_code=VALUES(supplier_code), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id),
sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 5. Purchase requests: mdp_stg_supply_demand rows from srm_pr_main; request_date is taken from create_time.
yield return Cmd(
"""
INSERT INTO mdp_std_purchase_request
(tenant_id, factory_id, company_id, source_system, pr_no, pr_line, item_code, item_name, supplier_code, request_qty, request_date, send_date, arrive_date, status, source_biz_key, sync_batch_id, sync_time)
SELECT tenant_id, factory_id, company_id, 'AIDOP',
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.pr_billno')), source_row_id),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.line_no')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_number')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.item_name')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier_number')),
COALESCE(
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qty')) AS DECIMAL(18,6)) END,
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.request_qty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.request_qty')) AS DECIMAL(18,6)) END,
0),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.create_time')), 'null'), ''),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.send_date')), 'null'), ''),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.arrive_date')), 'null'), ''),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')),
source_biz_key, @BatchId, @Now
FROM mdp_stg_supply_demand
WHERE source_table='srm_pr_main'
ON DUPLICATE KEY UPDATE item_code=VALUES(item_code), item_name=VALUES(item_name), supplier_code=VALUES(supplier_code),
request_qty=VALUES(request_qty), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 6. Purchase orders: PurOrdDetail staging rows (d), left-joined to the PurOrdMaster row (m) with the
//    same PurOrd for supplier, order date and buyer, and to the ItemMaster row (i) for the item name.
yield return Cmd(
"""
INSERT INTO mdp_std_purchase_order
(tenant_id, factory_id, company_id, source_system, po_no, po_line, po_type, supplier_code, item_code, item_name, order_qty, received_qty, returned_qty, due_date, need_date, order_date, status, buyer, work_order, source_biz_key, sync_batch_id, sync_time)
SELECT d.tenant_id, d.factory_id, d.company_id, 'AIDOP',
JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PurOrd')),
CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Line')) AS CHAR),
JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Potype')),
JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Supp')),
JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemNum')),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(i.raw_data,'$.Descr')), JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Descr'))),
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyOrded')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyOrded')) AS DECIMAL(18,6)) END, 0),
COALESCE(
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReceived')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReceived')) AS DECIMAL(18,6)) END,
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RctQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.RctQty')) AS DECIMAL(18,6)) END,
0),
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReturned')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.QtyReturned')) AS DECIMAL(18,6)) END, 0),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.DueDate')), 'null'), ''),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.NeedDate')), 'null'), ''),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.OrdDate')), 'null'), ''),
JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.Status')),
JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.Buyer')),
JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.WorkOrd')),
d.source_biz_key, @BatchId, @Now
FROM mdp_stg_purchase_order d
LEFT JOIN mdp_stg_purchase_order m ON m.source_table='PurOrdMaster'
AND JSON_UNQUOTE(JSON_EXTRACT(m.raw_data,'$.PurOrd')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PurOrd'))
LEFT JOIN mdp_stg_item i ON i.source_table='ItemMaster'
AND JSON_UNQUOTE(JSON_EXTRACT(i.raw_data,'$.ItemNum')) = JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.ItemNum'))
WHERE d.source_table='PurOrdDetail' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(d.raw_data,'$.PurOrd')), '') <> ''
ON DUPLICATE KEY UPDATE po_type=VALUES(po_type), supplier_code=VALUES(supplier_code), item_code=VALUES(item_code),
item_name=VALUES(item_name), order_qty=VALUES(order_qty), received_qty=VALUES(received_qty), returned_qty=VALUES(returned_qty),
due_date=VALUES(due_date), need_date=VALUES(need_date), order_date=VALUES(order_date), status=VALUES(status),
buyer=VALUES(buyer), work_order=VALUES(work_order), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 7. Delivery schedules: mdp_stg_delivery rows from srm_polist_ds with a non-empty dsnum.
//    The duplicate-key update list does not refresh request_date, submit_date or last_sent_date.
yield return Cmd(
"""
INSERT INTO mdp_std_delivery_schedule
(tenant_id, source_system, delivery_plan_no, po_no, po_line, item_code, supplier_code, supplier_name, schedule_qty, sent_qty, rest_qty, return_qty, request_date, need_date, submit_date, last_sent_date, status, source_biz_key, sync_batch_id, sync_time)
SELECT tenant_id, 'AIDOP',
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.dsnum')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ponumber')),
CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.poline')) AS CHAR),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.suppliercode')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.supplier')),
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.schedqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.schedqty')) AS DECIMAL(18,6)) END,0),
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sentqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sentqty')) AS DECIMAL(18,6)) END,0),
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.restqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.restqty')) AS DECIMAL(18,6)) END,0),
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.returnqty')) AS DECIMAL(18,6)) END,0),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.requestdate')), 'null'), ''),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.needdate')), 'null'), ''),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.submitdate')), 'null'), ''),
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.lastsentdate')), 'null'), ''),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.status')),
source_biz_key, @BatchId, @Now
FROM mdp_stg_delivery
WHERE source_table='srm_polist_ds' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.dsnum')), '') <> ''
ON DUPLICATE KEY UPDATE po_no=VALUES(po_no), po_line=VALUES(po_line), item_code=VALUES(item_code), supplier_code=VALUES(supplier_code),
supplier_name=VALUES(supplier_name), schedule_qty=VALUES(schedule_qty), sent_qty=VALUES(sent_qty), rest_qty=VALUES(rest_qty),
return_qty=VALUES(return_qty), need_date=VALUES(need_date), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id),
sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 8. Delivery results: mdp_stg_delivery rows from scm_shd / scm_shdzb. receipt_qty and return_qty
//    are written as the constant 0 here.
yield return Cmd(
"""
INSERT INTO mdp_std_delivery_result
(tenant_id, source_system, delivery_no, delivery_line, delivery_plan_no, po_no, po_line, item_code, delivery_qty, receipt_qty, return_qty, receipt_date, qc_status, event_time, source_biz_key, sync_batch_id, sync_time)
SELECT tenant_id, 'AIDOP',
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.shddh')), source_row_id),
CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.id')) AS CHAR),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.dsnum')),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_bill')),
CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.po_billline')) AS CHAR),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.itemnum')),
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.sh_delivery_quantity')) AS DECIMAL(18,6)) END,0),
0,
0,
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.receipt_date')), 'null'), ''),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.qc_status')),
COALESCE(NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.updatetime')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.createtime')), 'null'), '')),
source_biz_key, @BatchId, @Now
FROM mdp_stg_delivery
WHERE source_table IN ('scm_shd','scm_shdzb')
ON DUPLICATE KEY UPDATE delivery_qty=VALUES(delivery_qty), receipt_qty=VALUES(receipt_qty), return_qty=VALUES(return_qty),
receipt_date=VALUES(receipt_date), event_time=VALUES(event_time), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 9. Material readiness: WorkOrdDetail rows with a work order. available/in-transit/incoming/shortage
//    quantities are written as 0, ready_status as 'READY' and supplier_code as NULL at this stage.
//    The duplicate-key update list does not refresh issued_qty.
yield return Cmd(
"""
INSERT INTO mdp_std_material_readiness
(tenant_id, source_system, work_order, op_code, item_code, component_item_code, required_qty, issued_qty, received_qty, available_qty, in_transit_qty, incoming_qty, shortage_qty, ready_status, need_date, supplier_code, source_biz_key, sync_batch_id, sync_time)
SELECT tenant_id, 'AIDOP',
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')),
CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')) AS CHAR),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PMBOM')),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), source_row_id),
COALESCE(
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyRequired')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyRequired')) AS DECIMAL(18,6)) END,
CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReq')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReq')) AS DECIMAL(18,6)) END,
0),
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyIssued')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyIssued')) AS DECIMAL(18,6)) END, 0),
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyReceived')) AS DECIMAL(18,6)) END, 0),
0, 0, 0,
0,
'READY',
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DueDate')), 'null'), ''),
NULL,
source_biz_key, @BatchId, @Now
FROM mdp_stg_work_order_material
WHERE source_table='WorkOrdDetail' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), '') <> ''
ON DUPLICATE KEY UPDATE required_qty=VALUES(required_qty), received_qty=VALUES(received_qty), need_date=VALUES(need_date),
sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 10. Process outsource orders: RoutingOpDetail rows. po_no / po_line are NULL, completed_qty is 0,
//     order_qty comes from PackingQty and due_date from UpdatedAt.
yield return Cmd(
"""
INSERT INTO mdp_std_process_outsource_order
(tenant_id, source_system, work_order, op_code, routing_code, supplier_code, po_no, po_line, order_qty, completed_qty, due_date, status, source_biz_key, sync_batch_id, sync_time)
SELECT tenant_id, 'AIDOP',
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), ''),
CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')) AS CHAR),
COALESCE(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RoutingCode')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RouteCode'))),
JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SupplierCode')),
NULL, NULL,
COALESCE(CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PackingQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PackingQty')) AS DECIMAL(18,6)) END, 0),
0,
NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.UpdatedAt')), 'null'), ''),
CASE WHEN JSON_EXTRACT(raw_data,'$.IsEnabled') IN (1, true) THEN 'OPEN' ELSE 'DISABLED' END,
source_biz_key, @BatchId, @Now
FROM mdp_stg_work_order_material
WHERE source_table='RoutingOpDetail'
ON DUPLICATE KEY UPDATE routing_code=VALUES(routing_code), supplier_code=VALUES(supplier_code), order_qty=VALUES(order_qty),
due_date=VALUES(due_date), status=VALUES(status), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time),
update_time=CURRENT_TIMESTAMP
""", batchId, now);
}
/// <summary>
/// Lazily yields, in execution order, the SQL commands that populate the DWD tables from the
/// standard-layer (mdp_std_*) tables. Every command is created through <c>Cmd</c>, which binds
/// <c>@BatchId</c>, <c>@Now</c> and <c>@StatDate</c> (the date part of <paramref name="now"/>).
/// </summary>
/// <param name="batchId">Batch identifier written to the sync_batch_id / calc_batch_id columns.</param>
/// <param name="now">Timestamp used as calc_time and as the reference point for the DELAYED / HIGH checks.</param>
/// <returns>The command sequence; the caller that executes it is outside this section.</returns>
private IEnumerable BuildDwdCommands(string batchId, DateTime now)
{
// 1) Upsert dwd_supply_demand from mdp_std_supply_demand rows that have a demand_no.
//    fulfilled_qty is written as 0 and shortage_qty as the full required_qty.
//    NOTE(review): the ON DUPLICATE KEY UPDATE list does not refresh required_date — confirm this is intended.
yield return Cmd(
"""
INSERT INTO dwd_supply_demand
(tenant_id, stat_date, demand_no, demand_line, demand_type, item_code, item_name, supplier_code, required_qty, fulfilled_qty, shortage_qty, required_date, demand_status, source_system, sync_batch_id, calc_time)
SELECT tenant_id, @StatDate, demand_no, IFNULL(demand_line,''), demand_type, item_code, item_name, supplier_code,
IFNULL(required_qty,0), 0, IFNULL(required_qty,0), required_date, status, source_system, @BatchId, @Now
FROM mdp_std_supply_demand
WHERE IFNULL(demand_no,'') <> ''
ON DUPLICATE KEY UPDATE item_code=VALUES(item_code), item_name=VALUES(item_name), supplier_code=VALUES(supplier_code),
required_qty=VALUES(required_qty), fulfilled_qty=VALUES(fulfilled_qty), shortage_qty=VALUES(shortage_qty),
demand_status=VALUES(demand_status), sync_batch_id=VALUES(sync_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 2) Upsert dwd_supplier_delivery: one row per purchase-order line, left-joined to per-line aggregates of
//    delivery schedules (ds) and delivery results (dr) and to a per-supplier name lookup (s).
//    Status is COMPLETED when received quantity covers the order, DELAYED when the need/due date is before @Now,
//    otherwise OPEN; risk is HIGH (overdue and not fully received), MEDIUM (not fully received) or LOW.
//    NOTE(review): remaining_qty subtracts po.returned_qty but not dr.return_qty, and the update list leaves
//    po_type, supplier_code and the date columns untouched — confirm both against the table's unique key and intent.
yield return Cmd(
"""
INSERT INTO dwd_supplier_delivery
(tenant_id, stat_date, po_no, po_line, po_type, supplier_code, supplier_name, item_code, item_name, order_qty, schedule_qty, delivery_qty, receipt_qty, return_qty, remaining_qty, due_date, need_date, last_delivery_date, last_receipt_date, delivery_status, risk_level, source_system, sync_batch_id, calc_time)
SELECT po.tenant_id, @StatDate, po.po_no, po.po_line, po.po_type, po.supplier_code, IFNULL(s.supplier_name, ds.supplier_name),
po.item_code, po.item_name, IFNULL(po.order_qty,0), IFNULL(ds.schedule_qty,0), IFNULL(dr.delivery_qty,0),
IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0), IFNULL(po.returned_qty,0) + IFNULL(dr.return_qty,0),
GREATEST(IFNULL(po.order_qty,0) - IFNULL(po.received_qty,0) - IFNULL(dr.receipt_qty,0) - IFNULL(po.returned_qty,0), 0),
po.due_date, COALESCE(ds.need_date, po.need_date), dr.event_time, dr.receipt_date,
CASE
WHEN IFNULL(po.order_qty,0) <= IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0) THEN 'COMPLETED'
WHEN COALESCE(ds.need_date, po.need_date, po.due_date) < @Now THEN 'DELAYED'
ELSE 'OPEN'
END,
CASE
WHEN COALESCE(ds.need_date, po.need_date, po.due_date) < @Now
AND IFNULL(po.order_qty,0) > IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0) THEN 'HIGH'
WHEN IFNULL(po.order_qty,0) > IFNULL(po.received_qty,0) + IFNULL(dr.receipt_qty,0) THEN 'MEDIUM'
ELSE 'LOW'
END,
'AIDOP', @BatchId, @Now
FROM mdp_std_purchase_order po
LEFT JOIN (
SELECT tenant_id, po_no, po_line, MIN(supplier_name) AS supplier_name, SUM(IFNULL(schedule_qty,0)) AS schedule_qty, MIN(need_date) AS need_date
FROM mdp_std_delivery_schedule
WHERE IFNULL(po_no,'') <> '' AND IFNULL(po_line,'') <> ''
GROUP BY tenant_id, po_no, po_line
) ds ON po.tenant_id=ds.tenant_id AND po.po_no=ds.po_no AND po.po_line=ds.po_line
LEFT JOIN (
SELECT tenant_id, po_no, po_line, SUM(IFNULL(delivery_qty,0)) AS delivery_qty, SUM(IFNULL(receipt_qty,0)) AS receipt_qty,
SUM(IFNULL(return_qty,0)) AS return_qty, MAX(event_time) AS event_time, MAX(receipt_date) AS receipt_date
FROM mdp_std_delivery_result
WHERE IFNULL(po_no,'') <> '' AND IFNULL(po_line,'') <> ''
GROUP BY tenant_id, po_no, po_line
) dr ON po.tenant_id=dr.tenant_id AND po.po_no=dr.po_no AND po.po_line=dr.po_line
LEFT JOIN (
SELECT tenant_id, supplier_code, MAX(supplier_name) AS supplier_name
FROM mdp_std_supplier
WHERE IFNULL(supplier_code,'') <> ''
GROUP BY tenant_id, supplier_code
) s ON po.tenant_id=s.tenant_id AND po.supplier_code=s.supplier_code
WHERE IFNULL(po.po_no,'') <> '' AND IFNULL(po.po_line,'') <> ''
ON DUPLICATE KEY UPDATE supplier_name=VALUES(supplier_name), item_code=VALUES(item_code), item_name=VALUES(item_name),
order_qty=VALUES(order_qty), schedule_qty=VALUES(schedule_qty), delivery_qty=VALUES(delivery_qty), receipt_qty=VALUES(receipt_qty),
return_qty=VALUES(return_qty), remaining_qty=VALUES(remaining_qty), delivery_status=VALUES(delivery_status),
risk_level=VALUES(risk_level), sync_batch_id=VALUES(sync_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 3) Clear the supplier-risk rows of the current stat date before they are rebuilt below.
//    The statement only references @StatDate; the other bound parameters are unused here.
yield return Cmd("DELETE FROM dwd_supplier_risk WHERE stat_date=@StatDate", batchId, now);
// 4) Rebuild dwd_supplier_risk by aggregating the HIGH / MEDIUM rows that step 2 wrote to
//    dwd_supplier_delivery for the same stat date (this is why the command order matters).
yield return Cmd(
"""
INSERT INTO dwd_supplier_risk
(tenant_id, stat_date, supplier_code, supplier_name, item_code, risk_type, risk_level, risk_count, risk_qty, risk_reason, source_system, calc_batch_id, calc_time)
SELECT tenant_id, stat_date, IFNULL(supplier_code,''), supplier_name, item_code,
'DELIVERY_DELAY', risk_level, COUNT(1), SUM(IFNULL(remaining_qty,0)),
'供应交付存在延期或未完成风险', 'AIDOP', @BatchId, @Now
FROM dwd_supplier_delivery
WHERE stat_date=@StatDate AND risk_level IN ('HIGH','MEDIUM') AND IFNULL(supplier_code,'') <> ''
GROUP BY tenant_id, stat_date, supplier_code, supplier_name, item_code, risk_level
ON DUPLICATE KEY UPDATE risk_level=VALUES(risk_level), risk_count=VALUES(risk_count), risk_qty=VALUES(risk_qty),
risk_reason=VALUES(risk_reason), calc_batch_id=VALUES(calc_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
// 5) Upsert dwd_process_outsource_delivery from mdp_std_process_outsource_order rows that carry a work order
//    or a routing code, resolving the supplier name through the same per-supplier lookup as step 2.
yield return Cmd(
"""
INSERT INTO dwd_process_outsource_delivery
(tenant_id, stat_date, work_order, op_code, routing_code, supplier_code, supplier_name, po_no, po_line, order_qty, completed_qty, remaining_qty, due_date, delivery_status, risk_level, source_system, calc_batch_id, calc_time)
SELECT o.tenant_id, @StatDate, o.work_order, o.op_code, o.routing_code, o.supplier_code, s.supplier_name, o.po_no, IFNULL(o.po_line,''),
IFNULL(o.order_qty,0), IFNULL(o.completed_qty,0), GREATEST(IFNULL(o.order_qty,0)-IFNULL(o.completed_qty,0),0),
o.due_date,
CASE WHEN IFNULL(o.completed_qty,0) >= IFNULL(o.order_qty,0) AND IFNULL(o.order_qty,0) > 0 THEN 'COMPLETED'
WHEN o.due_date < @Now THEN 'DELAYED' ELSE IFNULL(o.status,'OPEN') END,
CASE WHEN o.due_date < @Now AND IFNULL(o.completed_qty,0) < IFNULL(o.order_qty,0) THEN 'HIGH'
WHEN IFNULL(o.completed_qty,0) < IFNULL(o.order_qty,0) THEN 'MEDIUM' ELSE 'LOW' END,
'AIDOP', @BatchId, @Now
FROM mdp_std_process_outsource_order o
LEFT JOIN (
SELECT tenant_id, supplier_code, MAX(supplier_name) AS supplier_name
FROM mdp_std_supplier
WHERE IFNULL(supplier_code,'') <> ''
GROUP BY tenant_id, supplier_code
) s ON o.tenant_id=s.tenant_id AND o.supplier_code=s.supplier_code
WHERE IFNULL(o.work_order,'') <> '' OR IFNULL(o.routing_code,'') <> ''
ON DUPLICATE KEY UPDATE supplier_code=VALUES(supplier_code), supplier_name=VALUES(supplier_name), order_qty=VALUES(order_qty),
completed_qty=VALUES(completed_qty), remaining_qty=VALUES(remaining_qty), due_date=VALUES(due_date),
delivery_status=VALUES(delivery_status), risk_level=VALUES(risk_level), calc_batch_id=VALUES(calc_batch_id),
calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
""", batchId, now);
}
/// <summary>
/// Upserts dwd_material_readiness from every mdp_std_material_readiness row that has a work order,
/// enriching each row with the item name and supplier name from the standard item / supplier tables.
/// </summary>
/// <param name="batchId">Value stored in calc_batch_id.</param>
/// <param name="now">Stored as calc_time; its date part is stored as stat_date.</param>
/// <returns>The affected-row count reported by the database for the upsert.</returns>
private async Task UpsertMaterialReadinessDwdAsync(string batchId, DateTime now)
{
    // ready_status is SHORTAGE whenever the standard layer reports a positive shortage_qty, READY otherwise.
    const string upsertSql =
        """
        INSERT INTO dwd_material_readiness
        (tenant_id, stat_date, work_order, op_code, parent_item_code, component_item_code, component_item_name, required_qty, cumulative_required_qty, stock_available_qty, qc_pending_qty, in_transit_qty, delivery_reply_qty, available_qty, shortage_qty, ready_status, supplier_code, supplier_name, need_date, calc_batch_id, calc_time)
        SELECT s.tenant_id, @StatDate, s.work_order, s.op_code, s.item_code, s.component_item_code, IFNULL(i.item_name, ''),
        IFNULL(s.required_qty, 0), IFNULL(s.required_qty, 0), IFNULL(s.available_qty, 0), 0, IFNULL(s.in_transit_qty, 0),
        IFNULL(s.incoming_qty, 0), IFNULL(s.available_qty, 0), IFNULL(s.shortage_qty, 0),
        CASE WHEN IFNULL(s.shortage_qty, 0) > 0 THEN 'SHORTAGE' ELSE 'READY' END,
        s.supplier_code, IFNULL(sp.supplier_name, ''), s.need_date, @BatchId, @Now
        FROM mdp_std_material_readiness s
        LEFT JOIN mdp_std_item i ON s.tenant_id=i.tenant_id AND s.component_item_code=i.item_code
        LEFT JOIN mdp_std_supplier sp ON s.tenant_id=sp.tenant_id AND s.supplier_code=sp.supplier_code
        WHERE IFNULL(s.work_order, '') <> ''
        ON DUPLICATE KEY UPDATE component_item_name=VALUES(component_item_name), required_qty=VALUES(required_qty),
        cumulative_required_qty=VALUES(cumulative_required_qty), stock_available_qty=VALUES(stock_available_qty),
        qc_pending_qty=VALUES(qc_pending_qty), in_transit_qty=VALUES(in_transit_qty), delivery_reply_qty=VALUES(delivery_reply_qty),
        available_qty=VALUES(available_qty), shortage_qty=VALUES(shortage_qty), ready_status=VALUES(ready_status),
        supplier_code=VALUES(supplier_code), supplier_name=VALUES(supplier_name), need_date=VALUES(need_date),
        calc_batch_id=VALUES(calc_batch_id), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP
        """;

    var bindings = new[]
    {
        new SugarParameter("@StatDate", now.Date),
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@Now", now)
    };

    var affectedRows = await _db.Ado.ExecuteCommandAsync(upsertSql, bindings);
    return affectedRows;
}
/// <summary>
/// Deletes the dwd_material_shortage rows of the given stat date for every (tenant, work order)
/// pair that currently exists in mdp_std_material_readiness, so they can be re-inserted afterwards.
/// </summary>
/// <param name="statDate">Stat date whose shortage rows are cleared.</param>
/// <returns>The affected-row count reported by the database for the delete.</returns>
private async Task DeleteCurrentShortageAsync(DateTime statDate)
{
    // The join is scoped by tenant as well as work order: matching on work_order alone would also
    // remove another tenant's shortage rows that happen to share a work-order number, and the
    // follow-up insert would never restore them. `<=>` keeps NULL tenant ids matching each other.
    return await _db.Ado.ExecuteCommandAsync(
        """
        DELETE s
        FROM dwd_material_shortage s
        JOIN (
        SELECT DISTINCT tenant_id, work_order
        FROM mdp_std_material_readiness
        WHERE IFNULL(work_order, '') <> ''
        ) w ON s.tenant_id <=> w.tenant_id AND s.work_order = w.work_order
        WHERE s.stat_date = @StatDate
        """,
        new SugarParameter("@StatDate", statDate));
}
/// <summary>
/// Inserts one dwd_material_shortage row for every mdp_std_material_readiness row that has a
/// work order and a positive shortage quantity.
/// </summary>
/// <param name="batchId">Value stored in calc_batch_id.</param>
/// <param name="now">Stored as calc_time; its date part is stored as stat_date.</param>
/// <returns>The affected-row count reported by the database for the insert.</returns>
private async Task InsertMaterialShortageAsync(string batchId, DateTime now)
{
    // risk_level is HIGH when the shortage exceeds the required quantity and MEDIUM for any other
    // positive shortage; expected_supply_date is taken from need_date and related_po_no is left NULL.
    const string insertSql =
        """
        INSERT INTO dwd_material_shortage
        (tenant_id, stat_date, work_order, op_code, component_item_code, shortage_qty, shortage_reason, expected_supply_date, supplier_code, related_po_no, risk_level, calc_batch_id, calc_time)
        SELECT s.tenant_id, @StatDate, s.work_order, s.op_code, s.component_item_code, IFNULL(s.shortage_qty, 0),
        '标准层缺料数量大于 0', s.need_date, s.supplier_code, NULL,
        CASE WHEN IFNULL(s.shortage_qty, 0) > IFNULL(s.required_qty, 0) THEN 'HIGH'
        WHEN IFNULL(s.shortage_qty, 0) > 0 THEN 'MEDIUM' ELSE 'LOW' END,
        @BatchId, @Now
        FROM mdp_std_material_readiness s
        WHERE IFNULL(s.work_order, '') <> '' AND IFNULL(s.shortage_qty, 0) > 0
        """;

    var statDateParam = new SugarParameter("@StatDate", now.Date);
    var batchParam = new SugarParameter("@BatchId", batchId);
    var nowParam = new SugarParameter("@Now", now);

    var insertedRows = await _db.Ado.ExecuteCommandAsync(insertSql, statDateParam, batchParam, nowParam);
    return insertedRows;
}
/// <summary>
/// Inserts a RUNNING row into mdp_sync_log for one entity of a sync batch and returns the id of
/// the newest log row for that batch and entity.
/// </summary>
/// <param name="entityId">Stored in entity_id and used to look the row up again.</param>
/// <param name="entityName">Stored in entity_name.</param>
/// <param name="batchId">Stored in sync_batch_id and used to look the row up again.</param>
/// <param name="rowsRead">Stored in rows_read.</param>
/// <returns>The id selected from mdp_sync_log after the insert.</returns>
private async Task InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead)
{
// tenant_id, source_code, sync_type and trigger_type are fixed literals in the statement
// (0, 'AIDOPDEV_MYSQL', 'FULL', 'AUTO'); sync_start uses the database clock via NOW().
await _db.Ado.ExecuteCommandAsync(
"""
INSERT INTO mdp_sync_log
(tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status)
VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING')
""",
new SugarParameter("@EntityId", entityId),
new SugarParameter("@EntityName", entityName),
new SugarParameter("@BatchId", batchId),
new SugarParameter("@RowsRead", rowsRead));
// The id is re-read as the highest id for this batch and entity rather than taken from the insert itself.
// NOTE(review): this assumes no other writer inserts a row for the same batch and entity in between — confirm.
return await _db.Ado.GetLongAsync(
"SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1",
new List
{
new("@BatchId", batchId),
new("@EntityId", entityId)
});
}
/// <summary>
/// Marks an mdp_sync_log row as SUCCESS, recording the elapsed time and the affected-row count.
/// </summary>
/// <param name="logId">Id of the log row to update.</param>
/// <param name="started">Local start time used to compute duration_ms.</param>
/// <param name="affected">Stored in rows_insert; the update/skip/error counters are written as 0.</param>
private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected)
{
    const string updateSql =
        """
        UPDATE mdp_sync_log
        SET sync_end=NOW(), duration_ms=@DurationMs, rows_insert=@RowsInsert, rows_update=0, rows_skip=0, rows_error=0, status='SUCCESS'
        WHERE id=@Id
        """;

    // Duration is measured on the application clock, while sync_end uses the database clock.
    var elapsedMs = (int)(DateTime.Now - started).TotalMilliseconds;

    await _db.Ado.ExecuteCommandAsync(
        updateSql,
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@RowsInsert", affected),
        new SugarParameter("@Id", logId));
}
/// <summary>
/// Marks an mdp_sync_log row as FAILED, recording the elapsed time and the error message.
/// </summary>
/// <param name="logId">Id of the log row to update.</param>
/// <param name="started">Local start time used to compute duration_ms.</param>
/// <param name="message">Error text; stored in error_msg, cut to at most 1000 characters.</param>
private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message)
{
    await _db.Ado.ExecuteCommandAsync(
        """
        UPDATE mdp_sync_log
        SET sync_end=NOW(), duration_ms=@DurationMs, rows_error=1, status='FAILED', error_msg=@ErrorMsg
        WHERE id=@Id
        """,
        new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds),
        // Uses the shared Truncate helper (as MarkTransformRunFailedAsync does) so that a null
        // message is stored as an empty string instead of throwing while the failure is being recorded.
        new SugarParameter("@ErrorMsg", Truncate(message, 1000)),
        new SugarParameter("@Id", logId));
}
/// <summary>
/// Inserts a RUNNING row into mdp_transform_run_log for the S3_MDP_SYNC_TRANSFORM job and returns
/// the id of the newest log row for the batch.
/// </summary>
/// <param name="batchId">Stored in batch_id and used to look the row up again.</param>
/// <param name="startedAt">Stored in start_time.</param>
/// <param name="triggerType">Trigger label; passed through NormalizeTriggerType before it is stored.</param>
/// <returns>The id selected from mdp_transform_run_log after the insert.</returns>
private async Task InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
{
// tenant_id, job_code and job_name are fixed literals in the statement.
await _db.Ado.ExecuteCommandAsync(
"""
INSERT INTO mdp_transform_run_log
(tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
VALUES (0, 'S3_MDP_SYNC_TRANSFORM', 'S3 MDP同步与标准化转换', @TriggerType, @BatchId, 'RUNNING', @StartTime)
""",
new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
new SugarParameter("@BatchId", batchId),
new SugarParameter("@StartTime", startedAt));
// The id is re-read as the highest id for this batch rather than taken from the insert itself.
// NOTE(review): batch ids elsewhere in this file have one-second resolution, so two runs started
// in the same second would share a batch id — confirm that cannot happen in practice.
return await _db.Ado.GetLongAsync(
"SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
new List { new("@BatchId", batchId) });
}
/// <summary>
/// Marks an mdp_transform_run_log row as SUCCESS and stores the run's row counts and JSON summary.
/// </summary>
/// <param name="runLogId">Id of the run-log row to update.</param>
/// <param name="startedAt">Run start time used to compute duration_ms.</param>
/// <param name="result">Source of the stage / standard / DWD row counts and of the summary JSON.</param>
private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S3MdpSyncTransformResult result)
{
    const string updateSql =
        """
        UPDATE mdp_transform_run_log
        SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
        stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
        summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
        WHERE id=@Id
        """;

    // One timestamp is captured so that end_time and duration_ms agree with each other.
    var completedAt = DateTime.Now;
    var elapsedMs = (int)(completedAt - startedAt).TotalMilliseconds;

    var bindings = new[]
    {
        new SugarParameter("@EndTime", completedAt),
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@StageRows", result.StageRows),
        new SugarParameter("@StandardRows", result.StandardRows),
        new SugarParameter("@DwdRows", result.DwdRows),
        new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)),
        new SugarParameter("@Id", runLogId)
    };

    await _db.Ado.ExecuteCommandAsync(updateSql, bindings);
}
/// <summary>
/// Marks an mdp_transform_run_log row as FAILED and stores the (truncated) error message.
/// </summary>
/// <param name="runLogId">Id of the run-log row to update.</param>
/// <param name="startedAt">Run start time used to compute duration_ms.</param>
/// <param name="message">Error text; cut to at most 2000 characters before it is stored.</param>
private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
{
    const string updateSql =
        """
        UPDATE mdp_transform_run_log
        SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
        error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
        WHERE id=@Id
        """;

    // One timestamp is captured so that end_time and duration_ms agree with each other.
    var failedAt = DateTime.Now;
    var elapsedMs = (int)(failedAt - startedAt).TotalMilliseconds;

    var bindings = new[]
    {
        new SugarParameter("@EndTime", failedAt),
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
        new SugarParameter("@Id", runLogId)
    };

    await _db.Ado.ExecuteCommandAsync(updateSql, bindings);
}
/// <summary>
/// Wraps a SQL statement together with the three parameters shared by the batch commands:
/// <c>@BatchId</c>, <c>@Now</c> and <c>@StatDate</c> (the date part of <paramref name="now"/>).
/// </summary>
/// <param name="sql">SQL text of the command.</param>
/// <param name="batchId">Value bound to <c>@BatchId</c>.</param>
/// <param name="now">Value bound to <c>@Now</c>; its date part is bound to <c>@StatDate</c>.</param>
/// <returns>A new <see cref="S3MdpSqlCommand"/> carrying the SQL and the bound parameters.</returns>
private static S3MdpSqlCommand Cmd(string sql, string batchId, DateTime now)
{
    SugarParameter[] bindings =
    {
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@Now", now),
        new SugarParameter("@StatDate", now.Date)
    };

    return new S3MdpSqlCommand(sql, bindings);
}
/// <summary>
/// Builds a MySQL <c>JSON_OBJECT(...)</c> expression that maps each column name to the value of
/// that column on the row aliased as <c>s</c>.
/// </summary>
/// <param name="columns">Column names to include, in order.</param>
/// <returns>SQL text of the form <c>JSON_OBJECT('col',s.`col`,...)</c>.</returns>
private static string BuildJsonObjectExpression(IEnumerable columns)
{
    var parts = columns.SelectMany(c => new[]
    {
        // Key: a string literal, so embedded single quotes are doubled.
        $"'{c.Replace("'", "''")}'",
        // Value: a backtick-quoted identifier, so embedded backticks must be doubled as well;
        // otherwise a column name containing a backtick would terminate the identifier early.
        $"s.`{c.Replace("`", "``")}`"
    });
    return $"JSON_OBJECT({string.Join(",", parts)})";
}
/// <summary>
/// Returns the first entry of <paramref name="columns"/> that equals <paramref name="expected"/>
/// ignoring case, preserving the casing found in the list.
/// </summary>
/// <param name="columns">Candidate column names.</param>
/// <param name="expected">Name to look for.</param>
/// <returns>The matching entry exactly as it appears in <paramref name="columns"/>.</returns>
/// <exception cref="InvalidOperationException">No entry matches.</exception>
private static string FindColumn(IEnumerable columns, string expected)
{
    bool IsExpected(string candidate) =>
        string.Equals(candidate, expected, StringComparison.OrdinalIgnoreCase);

    return columns.First(IsExpected);
}
/// <summary>
/// Normalizes a trigger-type label: blank or missing input becomes <c>"AUTO"</c>, anything else
/// is trimmed and upper-cased with the invariant culture.
/// </summary>
/// <param name="triggerType">Raw trigger label; may be null.</param>
/// <returns>The normalized label.</returns>
private static string NormalizeTriggerType(string? triggerType)
{
    if (string.IsNullOrWhiteSpace(triggerType))
    {
        return "AUTO";
    }

    var trimmed = triggerType.Trim();
    return trimmed.ToUpperInvariant();
}
/// <summary>
/// Serializes the batch id and the four row counters of a run result into a compact JSON object
/// with the keys <c>batchId</c>, <c>stageRows</c>, <c>standardRows</c>, <c>dwdRows</c> and <c>kpiRows</c>.
/// </summary>
/// <param name="result">Run result to summarize.</param>
/// <returns>The JSON text.</returns>
private static string BuildRunSummaryJson(S3MdpSyncTransformResult result)
{
    // The batch id is a free-form string, so it is JSON-escaped before being embedded; a quote or
    // backslash in it would otherwise produce invalid JSON. Ids made only of letters, digits and
    // underscores come out unchanged.
    var escapedBatchId = System.Text.Json.JsonEncodedText.Encode(result.BatchId ?? string.Empty).ToString();
    return $$"""{"batchId":"{{escapedBatchId}}","stageRows":{{result.StageRows}},"standardRows":{{result.StandardRows}},"dwdRows":{{result.DwdRows}},"kpiRows":{{result.KpiRows}}}""";
}
/// <summary>
/// Maps a metric level to the name of the daily KPI value table for that level.
/// </summary>
/// <param name="metricLevel">Metric level; 1, 3 and 4 have their own table.</param>
/// <returns>The table name; level 2 and every unrecognized level map to the level-2 table.</returns>
private static string ResolveKpiValueTable(int metricLevel)
{
    switch (metricLevel)
    {
        case 1:
            return "ado_s9_kpi_value_l1_day";
        case 3:
            return "ado_s9_kpi_value_l3_day";
        case 4:
            return "ado_s9_kpi_value_l4_day";
        default:
            // Level 2 and the fallback share the same table.
            return "ado_s9_kpi_value_l2_day";
    }
}
/// <summary>
/// Returns the built-in target value for a metric code.
/// </summary>
/// <param name="metricCode">Metric code to look up; the match is case-sensitive.</param>
/// <returns>The built-in target for the four known codes, or 0 for any other code.</returns>
private static decimal DefaultS3Target(string metricCode)
{
    switch (metricCode)
    {
        case "S3_L1_001":
            return 15m;
        case "S3_L1_002":
            return 95m;
        case "S3_L2_004":
            return 10.5m;
        case "S3_L2_005":
            return 96.72m;
        default:
            return 0m;
    }
}
/// <summary>
/// Classifies a KPI value against its target as <c>gray</c>, <c>green</c>, <c>yellow</c> or <c>red</c>.
/// </summary>
/// <param name="actual">Measured value.</param>
/// <param name="target">Target value; a target of zero or less yields <c>gray</c>.</param>
/// <param name="direction"><c>lower_is_better</c> (case-insensitive) inverts the comparison; anything else means higher is better.</param>
/// <param name="yellowThreshold">Yellow boundary in percent of target; defaults to 110 (lower is better) or 95 (higher is better).</param>
/// <param name="redThreshold">Red boundary in percent of target; defaults to 120 (lower is better) or 80 (higher is better).</param>
/// <returns>The status colour name.</returns>
private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold)
{
    if (target <= 0)
    {
        return "gray";
    }

    // Actual expressed as a percentage of target.
    var percentOfTarget = actual / target * 100m;
    var lowerIsBetter = string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase);

    if (lowerIsBetter)
    {
        if (actual <= target)
        {
            return "green";
        }

        var yellowCeiling = yellowThreshold ?? 110m;
        var redFloor = redThreshold ?? 120m;
        if (percentOfTarget <= yellowCeiling)
        {
            return "yellow";
        }

        // Between the two boundaries the value stays yellow.
        return percentOfTarget >= redFloor ? "red" : "yellow";
    }

    if (actual >= target)
    {
        return "green";
    }

    var yellowFloor = yellowThreshold ?? 95m;
    var redCeiling = redThreshold ?? 80m;
    if (percentOfTarget >= yellowFloor)
    {
        return "yellow";
    }

    // Between the two boundaries the value stays yellow.
    return percentOfTarget <= redCeiling ? "red" : "yellow";
}
/// <summary>
/// Compares a value with its previous value and names the direction of change.
/// </summary>
/// <param name="actual">Current value.</param>
/// <param name="previous">Previous value, or null when there is none.</param>
/// <returns><c>up</c>, <c>down</c>, or <c>flat</c> (also used when there is no previous value).</returns>
private static string ResolveTrendFlag(decimal actual, decimal? previous)
{
    if (previous is not decimal prior)
    {
        return "flat";
    }

    return actual.CompareTo(prior) switch
    {
        > 0 => "up",
        < 0 => "down",
        _ => "flat"
    };
}
/// <summary>
/// Cuts a string to at most <paramref name="maxLength"/> characters.
/// </summary>
/// <param name="raw">Input text; may be null.</param>
/// <param name="maxLength">Maximum number of characters to keep.</param>
/// <returns>An empty string for null or empty input; otherwise the input, shortened if needed.</returns>
private static string Truncate(string? raw, int maxLength)
{
    if (raw is null || raw.Length == 0)
    {
        return string.Empty;
    }

    if (raw.Length > maxLength)
    {
        return raw[..maxLength];
    }

    return raw;
}
/// <summary>
/// Row shape that carries a single column name. Its producers and consumers are outside this section.
/// </summary>
private sealed class S3ColumnRow
{
/// <summary>Column name; defaults to an empty string.</summary>
public string ColumnName { get; set; } = string.Empty;
}
/// <summary>
/// Row shape pairing an entity id with an entity name. Its producers and consumers are outside this section.
/// </summary>
private sealed class S3MdpEntityRow
{
/// <summary>Entity identifier.</summary>
public long Id { get; set; }
/// <summary>Entity name; defaults to an empty string.</summary>
public string EntityName { get; set; } = string.Empty;
}
/// <summary>
/// Row shape for one calculated KPI value, identified by tenant, factory and metric code.
/// Its producers and consumers are outside this section.
/// </summary>
private sealed class S3KpiCalcRow
{
/// <summary>Tenant the value belongs to.</summary>
public long TenantId { get; set; }
/// <summary>Factory the value belongs to.</summary>
public long FactoryId { get; set; }
/// <summary>Metric code; defaults to an empty string.</summary>
public string MetricCode { get; set; } = string.Empty;
/// <summary>Calculated value, or null when none is available.</summary>
public decimal? MetricValue { get; set; }
}
/// <summary>
/// Row shape for KPI metadata: level, direction and the optional yellow / red thresholds.
/// NOTE(review): presumably feeds ResolveKpiValueTable and ResolveKpiStatus — confirm against the caller.
/// </summary>
private sealed class S3KpiMetaRow
{
/// <summary>Metric level.</summary>
public int MetricLevel { get; set; }
/// <summary>Comparison direction; defaults to <c>higher_is_better</c>.</summary>
public string Direction { get; set; } = "higher_is_better";
/// <summary>Optional yellow threshold.</summary>
public decimal? YellowThreshold { get; set; }
/// <summary>Optional red threshold.</summary>
public decimal? RedThreshold { get; set; }
}
/// <summary>
/// Row shape for a stored KPI value with its target. Its producers and consumers are outside this section.
/// </summary>
private sealed class S3KpiValueRow
{
/// <summary>Row identifier.</summary>
public long Id { get; set; }
/// <summary>Stored metric value, if any.</summary>
public decimal? MetricValue { get; set; }
/// <summary>Stored target value, if any.</summary>
public decimal? TargetValue { get; set; }
}
}
/// <summary>
/// Outcome of a full sync-and-transform run as returned by <c>RunFullAsync</c>: the batch and
/// run-log identifiers plus the row count reported by each stage.
/// </summary>
public sealed class S3MdpSyncTransformResult
{
/// <summary>Id of the run-log row created for the run.</summary>
public long RunLogId { get; set; }
/// <summary>Batch identifier of the run; defaults to an empty string.</summary>
public string BatchId { get; set; } = string.Empty;
/// <summary>Row count reported by the staging sync step.</summary>
public int StageRows { get; set; }
/// <summary>Row count reported by the standard-layer transform step.</summary>
public int StandardRows { get; set; }
/// <summary>Row count reported by the DWD build step.</summary>
public int DwdRows { get; set; }
/// <summary>Row count reported by the KPI value build step.</summary>
public int KpiRows { get; set; }
}
/// <summary>
/// Outcome of a material-readiness refresh as returned by <c>RefreshMaterialReadinessAsync</c>.
/// </summary>
public sealed class S3MaterialRefreshResult
{
/// <summary>Batch identifier of the refresh; defaults to an empty string.</summary>
public string BatchId { get; set; } = string.Empty;
/// <summary>Number of standard-layer readiness rows that have a work order.</summary>
public int StdCount { get; set; }
/// <summary>Affected-row count reported by the readiness upsert.</summary>
public int ReadinessCount { get; set; }
/// <summary>Affected-row count reported by the shortage insert.</summary>
public int ShortageCount { get; set; }
}
/// <summary>
/// A SQL statement paired with the <c>SugarParameter</c> array that accompanies it.
/// </summary>
/// <param name="Sql">SQL text of the command.</param>
/// <param name="Parameters">Parameters supplied with the SQL text.</param>
internal sealed record S3MdpSqlCommand(string Sql, SugarParameter[] Parameters);
/// <summary>
/// Describes one source table of the S3 sync: which staging table it lands in and which SQL
/// expressions identify a source row. <see cref="All"/> lists every configured entity.
/// </summary>
/// <param name="EntityCode">Code identifying the entity (for example <c>S3_SUPPLIER</c>).</param>
/// <param name="SourceTable">Name of the source table.</param>
/// <param name="TargetTable">Name of the mdp_stg_* staging table; several entities share one staging table.</param>
/// <param name="SourceRowIdExpression">Name of the source column used as the row id.</param>
/// <param name="SourceBizKeyExpression">SQL expression, written against the alias <c>s</c>, that yields the business key.</param>
internal sealed record S3MdpEntityConfig(
string EntityCode,
string SourceTable,
string TargetTable,
string SourceRowIdExpression,
string SourceBizKeyExpression)
{
/// <summary>All entity configurations, in declaration order.</summary>
public static readonly IReadOnlyList All = new List
{
// Suppliers and items.
new("S3_SUPPLIER", "SuppMaster", "mdp_stg_supplier", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Supp`,''))"),
new("S3_CONSIGNEE_SUPPLIER", "ConsigneeAddressMaster", "mdp_stg_supplier", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Address`,''))"),
new("S3_ITEM_ERP", "ItemMaster", "mdp_stg_item", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`ItemNum`,''))"),
new("S3_ITEM_NEW", "ic_item", "mdp_stg_item", "Id", "CONCAT(IFNULL(s.`tenant_id`,0), ':', IFNULL(s.`number`,''))"),
new("S3_SOURCE_LIST", "srm_purchase", "mdp_stg_source_list", "Id", "CONCAT(IFNULL(s.`number`,''), ':', IFNULL(s.`supplier_number`,''))"),
// Demand and purchase orders.
new("S3_SUPPLY_DEMAND", "ic_demandschedule", "mdp_stg_supply_demand", "Id", "CAST(s.`Id` AS CHAR)"),
new("S3_PURCHASE_REQUEST", "srm_pr_main", "mdp_stg_supply_demand", "Id", "COALESCE(s.`pr_billno`, CAST(s.`Id` AS CHAR))"),
new("S3_PURCHASE_ORDER_MASTER", "PurOrdMaster", "mdp_stg_purchase_order", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`PurOrd`,''))"),
new("S3_PURCHASE_ORDER_DETAIL", "PurOrdDetail", "mdp_stg_purchase_order", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`PurOrd`,''), ':', IFNULL(s.`Line`,''))"),
// Deliveries and receipts.
// NOTE(review): the delivery-plan key is the bare `dsnum` column with no NULL fallback, unlike the neighbouring entries — confirm it is never NULL.
new("S3_DELIVERY_PLAN", "srm_polist_ds", "mdp_stg_delivery", "Id", "s.`dsnum`"),
new("S3_SHIPPER_MASTER", "scm_shd", "mdp_stg_delivery", "id", "COALESCE(s.`shddh`, CAST(s.`id` AS CHAR))"),
new("S3_SHIPPER_DETAIL", "scm_shdzb", "mdp_stg_delivery", "id", "CONCAT(IFNULL(s.`glid`,''), ':', IFNULL(s.`id`,''))"),
new("S3_RECEIPT_MASTER", "PurOrdRctMaster", "mdp_stg_receipt", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''))"),
new("S3_RECEIPT_DETAIL", "PurOrdRctDetail", "mdp_stg_receipt", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''), ':', IFNULL(s.`Line`,''))"),
// Work orders, routing and inventory all land in mdp_stg_work_order_material.
new("S3_WORK_ORDER_MASTER", "WorkOrdMaster", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''))"),
new("S3_WORK_ORDER_DETAIL", "WorkOrdDetail", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`ItemNum`,''))"),
new("S3_WORK_ORDER_ROUTING", "WorkOrdRouting", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`OP`,''))"),
new("S3_ROUTING_OUTSOURCE", "RoutingOpDetail", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`RoutingCode`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`SupplierCode`,''))"),
new("S3_INVENTORY", "InvMaster", "mdp_stg_work_order_material", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`ItemNum`,''), ':', IFNULL(s.`Location`,''))")
};
}