namespace Admin.NET.Plugin.AiDOP.Production;
/// <summary>
/// S2 production-scheduling service: MDP staging sync, standardization, DWD build and KPI calculation.
/// </summary>
public class S2MdpSyncTransformService : ITransient
{
/// <summary>Job code identifying this pipeline (same value as the job_code stored in mdp_transform_run_log).</summary>
private const string JobCode = "S2_MDP_SYNC_TRANSFORM";

/// <summary>SqlSugar client used for every database call made by this service.</summary>
private readonly ISqlSugarClient _db;

/// <summary>Creates the service around the injected SqlSugar client.</summary>
/// <param name="db">Database client; stored as-is.</param>
public S2MdpSyncTransformService(ISqlSugarClient db) => _db = db;
/// <summary>
/// Runs the complete S2 pipeline once: ensures the runtime tables exist, syncs the staging rows,
/// builds the standard layer and the DWD layer, then calculates and upserts the KPI values.
/// </summary>
/// <param name="cancellationToken">Checked before the run starts and between entities, commands and KPI rows.</param>
/// <param name="triggerType">Trigger label stored in the run log; blank values are normalized to "AUTO".</param>
/// <returns>The batch id, the run-log id and the row counters collected by each stage.</returns>
/// <remarks>
/// When any stage throws, the run log is marked FAILED (best effort) and the original exception is rethrown.
/// </remarks>
public async Task<S2MdpSyncTransformResult> RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO")
{
    cancellationToken.ThrowIfCancellationRequested();

    // A single timestamp feeds the batch id, the run log and every stage of this run.
    var now = DateTime.Now;
    var batchId = $"S2_MDP_FULL_{now:yyyyMMddHHmmss}";
    var runLogId = await InsertTransformRunLogAsync(batchId, now, triggerType);
    var result = new S2MdpSyncTransformResult { BatchId = batchId, RunLogId = runLogId };

    try
    {
        await EnsureS2RuntimeObjectsAsync();
        result.StageRows = await SyncStagingAsync(batchId, now, cancellationToken);
        result.StandardRows = await TransformStandardAsync(batchId, now, cancellationToken);
        result.DwdRows = await BuildDwdAsync(batchId, now, cancellationToken);
        result.KpiRows = await BuildS2KpiValuesAsync(batchId, now, cancellationToken);
        await MarkTransformRunSuccessAsync(runLogId, now, result);
        return result;
    }
    catch (Exception ex)
    {
        // Record the failure, then let the caller see the original exception and stack trace.
        await MarkTransformRunFailedAsync(runLogId, now, ex.Message);
        throw;
    }
}
/// <summary>
/// Executes every statement of <c>S2MdpDdl.SqlBlocks</c> in declaration order, one command per statement.
/// </summary>
private async Task EnsureS2RuntimeObjectsAsync()
{
    var statements = S2MdpDdl.SqlBlocks;
    for (var index = 0; index < statements.Count; index++)
    {
        await _db.Ado.ExecuteCommandAsync(statements[index]);
    }
}
/// <summary>
/// Copies every entity listed in <c>S2MdpEntityConfig.All</c> into its staging table, one entity at a time.
/// </summary>
/// <param name="batchId">Batch id stamped on the staged rows and the sync log.</param>
/// <param name="now">Run timestamp stamped on the staged rows.</param>
/// <param name="cancellationToken">Checked before each entity is processed.</param>
/// <returns>Sum of the per-entity values returned by <c>SyncOneEntityAsync</c>.</returns>
private async Task<int> SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    var total = 0;
    foreach (var entity in S2MdpEntityConfig.All)
    {
        cancellationToken.ThrowIfCancellationRequested();
        total += await SyncOneEntityAsync(entity, batchId, now);
    }

    return total;
}
/// <summary>
/// Copies all rows of one source table into the entity's staging table as JSON snapshots,
/// upserting on the staging table's unique key, and records the attempt in mdp_sync_log.
/// </summary>
/// <param name="entity">Source/target table names and key expressions for the entity.</param>
/// <param name="batchId">Batch id stamped on the staged rows and the sync log.</param>
/// <param name="now">Run timestamp written to sync_time.</param>
/// <returns>The row count of the source table taken just before the copy (not the upsert's affected-row count).</returns>
/// <remarks>
/// Throws via <c>Oops.Oh</c> when the entity is not registered in mdp_entity or the source table has no columns.
/// Failures of the copy itself are logged to mdp_sync_log and rethrown.
/// </remarks>
private async Task<int> SyncOneEntityAsync(S2MdpEntityConfig entity, string batchId, DateTime now)
{
    var entityRow = await _db.Ado.SqlQuerySingleAsync<S2MdpEntityRow>(
        "SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1",
        new SugarParameter("@EntityCode", entity.EntityCode));
    if (entityRow == null) throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode}");

    // Discover the source table's columns so the JSON snapshot and the optional columns adapt to its schema.
    var columns = await _db.Ado.SqlQueryAsync<S2ColumnRow>(
        """
        SELECT COLUMN_NAME AS ColumnName
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName
        ORDER BY ORDINAL_POSITION
        """,
        new SugarParameter("@TableName", entity.SourceTable));
    if (columns.Count == 0) throw Oops.Oh($"未找到源表:{entity.SourceTable}");
    var names = columns.Select(u => u.ColumnName).ToList();

    // Quoted reference to a source column using its actual spelling, or null when the table lacks it.
    string? ColumnRef(string expected) =>
        names.Any(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase))
            ? $"s.`{FindColumn(names, expected)}`"
            : null;

    var tenantExpr = ColumnRef("tenant_id") is { } tenantRef ? $"IFNULL({tenantRef},0)" : "0";
    var factoryExpr = ColumnRef("factory_id") ?? "NULL";
    var domainExpr = ColumnRef("Domain") ?? "NULL";
    // The row-id setting is a column name when it matches one; otherwise it is used as a raw SQL expression.
    var sourceRowExpr = ColumnRef(entity.SourceRowIdExpression) ?? entity.SourceRowIdExpression;
    var rawDataExpr = BuildJsonObjectExpression(names);

    var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`");
    var logId = await InsertSyncLogAsync(entityRow.Id, entityRow.EntityName, batchId, rowsRead);
    var started = DateTime.Now;
    try
    {
        var affected = await _db.Ado.ExecuteCommandAsync(
            $"""
            INSERT INTO `{entity.TargetTable}`
            (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data)
            SELECT
            {tenantExpr},
            COALESCE({factoryExpr}, {domainExpr}),
            'AIDOP',
            @SourceTable,
            CAST({sourceRowExpr} AS CHAR),
            CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({sourceRowExpr} AS CHAR)) AS CHAR),
            @BatchId,
            @Now,
            'PENDING',
            {rawDataExpr}
            FROM `{entity.SourceTable}` s
            ON DUPLICATE KEY UPDATE
            tenant_id=VALUES(tenant_id),
            factory_id=VALUES(factory_id),
            sync_batch_id=VALUES(sync_batch_id),
            sync_time=VALUES(sync_time),
            process_status=VALUES(process_status),
            raw_data=VALUES(raw_data),
            update_time=CURRENT_TIMESTAMP
            """,
            new SugarParameter("@SourceTable", entity.SourceTable),
            new SugarParameter("@BatchId", batchId),
            new SugarParameter("@Now", now));

        // NOTE(review): `affected` is the driver's affected-row count; MySQL reports 2 for each row updated
        // through ON DUPLICATE KEY UPDATE, so the logged rows_insert is not a pure insert count.
        await MarkSyncLogSuccessAsync(logId, started, affected);
        return rowsRead;
    }
    catch (Exception ex)
    {
        await MarkSyncLogFailedAsync(logId, started, ex.Message);
        throw;
    }
}
/// <summary>
/// Executes the standard-layer commands produced by <c>BuildStandardCommands</c> in order.
/// </summary>
/// <param name="batchId">Batch id bound to the commands.</param>
/// <param name="now">Run timestamp bound to the commands.</param>
/// <param name="cancellationToken">Checked before each command is executed.</param>
/// <returns>Sum of the affected-row counts reported for the commands.</returns>
private async Task<int> TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    var total = 0;
    foreach (var command in BuildStandardCommands(batchId, now))
    {
        cancellationToken.ThrowIfCancellationRequested();
        total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
    }

    return total;
}
/// <summary>
/// Executes the DWD-layer commands produced by <c>BuildDwdCommands</c> in order.
/// </summary>
/// <param name="batchId">Batch id bound to the commands.</param>
/// <param name="now">Run timestamp bound to the commands.</param>
/// <param name="cancellationToken">Checked before each command is executed.</param>
/// <returns>Sum of the affected-row counts reported for the commands.</returns>
private async Task<int> BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    var total = 0;
    foreach (var command in BuildDwdCommands(batchId, now))
    {
        cancellationToken.ThrowIfCancellationRequested();
        total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters);
    }

    return total;
}
/// <summary>
/// Calculates the S2 KPI rows for the batch and upserts each one for the run's calendar date.
/// </summary>
/// <param name="batchId">Batch whose DWD rows feed the KPI calculation.</param>
/// <param name="now">Run timestamp; its date part is the KPI business date, the full value is the calc time.</param>
/// <param name="cancellationToken">Checked before each KPI row is written.</param>
/// <returns>Sum of the affected-row counts returned by the individual upserts.</returns>
private async Task<int> BuildS2KpiValuesAsync(string batchId, DateTime now, CancellationToken cancellationToken)
{
    var statDate = now.Date;
    var rows = await CalculateS2KpiValuesAsync(batchId, statDate);

    var affected = 0;
    foreach (var row in rows)
    {
        cancellationToken.ThrowIfCancellationRequested();
        affected += await UpsertS2KpiValueAsync(row, statDate, now);
    }

    return affected;
}
/// <summary>
/// Yields the upsert commands that project staged JSON rows (mdp_stg_schedule.raw_data) into the
/// standard-layer tables. The SQL reads every staged row of the named source table, not only the current batch.
/// </summary>
/// <param name="batchId">Bound to @BatchId and written to sync_batch_id.</param>
/// <param name="now">Bound to @Now and written to sync_time.</param>
/// <returns>Three commands: work orders, then operations from PeriodSequenceDet, then operations from ScheduleResultOpMaster.</returns>
private IEnumerable<S2MdpSqlCommand> BuildStandardCommands(string batchId, DateTime now)
{
    // WorkOrdMaster -> mdp_std_work_order_schedule. Non-numeric factory ids fall back to 1 and
    // quantities that do not look like decimals fall back to 0.
    yield return Cmd(
        """
        INSERT INTO mdp_std_work_order_schedule
        (tenant_id, factory_id, source_system, work_order, sales_order_no, item_code, item_name, site_code, status,
        priority, urgent_flag, qty_ordered, qty_completed, order_date, due_date, release_date, prod_line,
        source_biz_key, sync_batch_id, sync_time)
        SELECT tenant_id,
        CASE WHEN factory_id REGEXP '^[0-9]+$' THEN CAST(factory_id AS UNSIGNED) ELSE 1 END,
        'AIDOP',
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SalesJob')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemName')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Site')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Status')),
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Priority')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Priority')) AS DECIMAL(18,6)) END,
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Urgent')) IN ('1','true','True') THEN 1 ELSE 0 END,
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyOrded')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyOrded')) AS DECIMAL(18,6)) ELSE 0 END,
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyCompleted')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyCompleted')) AS DECIMAL(18,6)) ELSE 0 END,
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.OrdDate')), 'null'), ''),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DueDate')), 'null'), ''),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ReleaseDate')), 'null'), ''),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ProdLine')),
        source_biz_key, @BatchId, @Now
        FROM mdp_stg_schedule
        WHERE source_table='WorkOrdMaster' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), '') <> ''
        ON DUPLICATE KEY UPDATE
        sales_order_no=VALUES(sales_order_no), item_code=VALUES(item_code), item_name=VALUES(item_name),
        site_code=VALUES(site_code), status=VALUES(status), priority=VALUES(priority), urgent_flag=VALUES(urgent_flag),
        qty_ordered=VALUES(qty_ordered), qty_completed=VALUES(qty_completed), order_date=VALUES(order_date),
        due_date=VALUES(due_date), release_date=VALUES(release_date), prod_line=VALUES(prod_line),
        sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
        """, batchId, now);

    // PeriodSequenceDet -> mdp_std_operation_schedule (the work order comes from the WorkOrds field).
    yield return Cmd(
        """
        INSERT INTO mdp_std_operation_schedule
        (tenant_id, factory_id, source_system, work_order, op_no, work_center, line_code, item_code,
        plan_date, prod_date, start_time, end_time, ord_qty, comp_qty, run_crew, employee,
        source_biz_key, sync_batch_id, sync_time)
        SELECT tenant_id,
        CASE WHEN factory_id REGEXP '^[0-9]+$' THEN CAST(factory_id AS UNSIGNED) ELSE 1 END,
        'AIDOP',
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrds')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkCtr')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Line')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PlanDate')), 'null'), ''),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ProdDate')), 'null'), ''),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StartTime')), 'null'), ''),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.EndTime')), 'null'), ''),
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.OrdQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.OrdQty')) AS DECIMAL(18,6)) ELSE 0 END,
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CompQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CompQty')) AS DECIMAL(18,6)) ELSE 0 END,
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RunCrew')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RunCrew')) AS DECIMAL(18,6)) END,
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Employee')),
        source_biz_key, @BatchId, @Now
        FROM mdp_stg_schedule
        WHERE source_table='PeriodSequenceDet' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrds')), '') <> ''
        ON DUPLICATE KEY UPDATE
        work_center=VALUES(work_center), line_code=VALUES(line_code), item_code=VALUES(item_code),
        plan_date=VALUES(plan_date), prod_date=VALUES(prod_date), start_time=VALUES(start_time), end_time=VALUES(end_time),
        ord_qty=VALUES(ord_qty), comp_qty=VALUES(comp_qty), run_crew=VALUES(run_crew), employee=VALUES(employee),
        sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
        """, batchId, now);

    // ScheduleResultOpMaster -> mdp_std_operation_schedule. WorkDate feeds both plan_date and prod_date,
    // and WorkQty feeds both ord_qty and comp_qty.
    yield return Cmd(
        """
        INSERT INTO mdp_std_operation_schedule
        (tenant_id, factory_id, source_system, work_order, op_no, work_center, line_code, item_code,
        plan_date, prod_date, start_time, end_time, ord_qty, comp_qty, run_crew, employee,
        source_biz_key, sync_batch_id, sync_time)
        SELECT tenant_id,
        CASE WHEN factory_id REGEXP '^[0-9]+$' THEN CAST(factory_id AS UNSIGNED) ELSE 1 END,
        'AIDOP',
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkCtr')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Line')),
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkDate')), 'null'), ''),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkDate')), 'null'), ''),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkStartTime')), 'null'), ''),
        NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkEndTime')), 'null'), ''),
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkQty')) AS DECIMAL(18,6)) ELSE 0 END,
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkQty')) AS DECIMAL(18,6)) ELSE 0 END,
        CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.AssignedPersonnelCount')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.AssignedPersonnelCount')) AS DECIMAL(18,6)) END,
        JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.AssignedEmployeeID')),
        source_biz_key, @BatchId, @Now
        FROM mdp_stg_schedule
        WHERE source_table='ScheduleResultOpMaster' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), '') <> ''
        ON DUPLICATE KEY UPDATE
        work_center=VALUES(work_center), line_code=VALUES(line_code), item_code=VALUES(item_code),
        plan_date=VALUES(plan_date), prod_date=VALUES(prod_date), start_time=VALUES(start_time), end_time=VALUES(end_time),
        ord_qty=VALUES(ord_qty), comp_qty=VALUES(comp_qty), run_crew=VALUES(run_crew), employee=VALUES(employee),
        sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP
        """, batchId, now);
}
/// <summary>
/// Yields the command that aggregates standard-layer work orders and their operations into
/// dwd_order_schedule_trans, one row per work order for the given batch.
/// </summary>
/// <param name="batchId">Bound to @BatchId and written to calc_batch_id.</param>
/// <param name="now">Bound to @Now (calc_time); its date part is bound to @StatDate by <c>Cmd</c>.</param>
/// <returns>A single upsert command.</returns>
private IEnumerable<S2MdpSqlCommand> BuildDwdCommands(string batchId, DateTime now)
{
    // schedule_cycle_days = hours from release (or order) date to the latest plan date, divided by 24.
    // schedule_satisfaction_flag = 1 when the latest plan date falls on or before the due date.
    // wip_qty = ordered minus completed quantity, floored at 0.
    yield return Cmd(
        """
        INSERT INTO dwd_order_schedule_trans
        (tenant_id, factory_id, stat_date, work_order, sales_order_no, item_code, item_name, site_code, prod_line,
        status, urgent_flag, qty_ordered, qty_completed, order_date, due_date, release_date, first_plan_date,
        last_plan_date, first_start_time, last_end_time, operation_count, scheduled_qty, completed_op_qty,
        schedule_cycle_days, schedule_satisfaction_flag, wip_qty, resource_person_count, calc_batch_id, calc_time)
        SELECT w.tenant_id, COALESCE(w.factory_id, 1), @StatDate,
        w.work_order, w.sales_order_no, w.item_code, w.item_name, w.site_code, w.prod_line,
        w.status, w.urgent_flag, w.qty_ordered, w.qty_completed, w.order_date, w.due_date, w.release_date,
        MIN(o.plan_date), MAX(o.plan_date), MIN(o.start_time), MAX(o.end_time),
        COUNT(o.id), SUM(IFNULL(o.ord_qty, 0)), SUM(IFNULL(o.comp_qty, 0)),
        CASE
        WHEN COALESCE(MIN(o.plan_date), MAX(o.plan_date)) IS NOT NULL
        AND COALESCE(w.release_date, w.order_date) IS NOT NULL
        THEN TIMESTAMPDIFF(HOUR, COALESCE(w.release_date, w.order_date), COALESCE(MAX(o.plan_date), MIN(o.plan_date))) / 24
        END,
        CASE
        WHEN w.due_date IS NOT NULL AND COALESCE(MAX(o.plan_date), MIN(o.plan_date)) IS NOT NULL
        AND DATE(COALESCE(MAX(o.plan_date), MIN(o.plan_date))) <= DATE(w.due_date) THEN 1
        ELSE 0
        END,
        GREATEST(IFNULL(w.qty_ordered, 0) - IFNULL(w.qty_completed, 0), 0),
        SUM(IFNULL(o.run_crew, 0)),
        @BatchId, @Now
        FROM mdp_std_work_order_schedule w
        LEFT JOIN mdp_std_operation_schedule o
        ON o.tenant_id=w.tenant_id AND IFNULL(o.work_order,'')=IFNULL(w.work_order,'')
        WHERE IFNULL(w.work_order, '') <> ''
        GROUP BY w.tenant_id, COALESCE(w.factory_id, 1), w.work_order, w.sales_order_no, w.item_code, w.item_name,
        w.site_code, w.prod_line, w.status, w.urgent_flag, w.qty_ordered, w.qty_completed,
        w.order_date, w.due_date, w.release_date
        ON DUPLICATE KEY UPDATE
        sales_order_no=VALUES(sales_order_no), item_code=VALUES(item_code), item_name=VALUES(item_name),
        site_code=VALUES(site_code), prod_line=VALUES(prod_line), status=VALUES(status), urgent_flag=VALUES(urgent_flag),
        qty_ordered=VALUES(qty_ordered), qty_completed=VALUES(qty_completed), order_date=VALUES(order_date),
        due_date=VALUES(due_date), release_date=VALUES(release_date), first_plan_date=VALUES(first_plan_date),
        last_plan_date=VALUES(last_plan_date), first_start_time=VALUES(first_start_time), last_end_time=VALUES(last_end_time),
        operation_count=VALUES(operation_count), scheduled_qty=VALUES(scheduled_qty), completed_op_qty=VALUES(completed_op_qty),
        schedule_cycle_days=VALUES(schedule_cycle_days), schedule_satisfaction_flag=VALUES(schedule_satisfaction_flag),
        wip_qty=VALUES(wip_qty), resource_person_count=VALUES(resource_person_count), calc_time=VALUES(calc_time),
        update_time=CURRENT_TIMESTAMP
        """, batchId, now);
}
/// <summary>
/// Computes the S2 KPI values per tenant and factory from the DWD rows of one batch.
/// </summary>
/// <param name="batchId">Only dwd_order_schedule_trans rows with this calc_batch_id are aggregated.</param>
/// <param name="statDate">Bound as @StatDate; the current SQL text does not reference it.</param>
/// <returns>
/// One row per (tenant, factory, metric code). The S2_L2_001..003 queries repeat the formulas of S2_L1_001..003.
/// </returns>
private async Task<List<S2KpiCalcRow>> CalculateS2KpiValuesAsync(string batchId, DateTime statDate)
{
    // 001: average schedule cycle days (non-negative values only).
    // 002: percentage of work orders whose schedule satisfaction flag is set.
    // 003: work-order count divided by total resource person count (missing/zero counts are treated as 1).
    // 004: WIP quantity divided by completed quantity, multiplied by 30.
    return await _db.Ado.SqlQueryAsync<S2KpiCalcRow>(
        """
        SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L1_001' AS MetricCode,
        ROUND(AVG(schedule_cycle_days), 4) AS MetricValue
        FROM dwd_order_schedule_trans
        WHERE calc_batch_id=@BatchId AND schedule_cycle_days IS NOT NULL AND schedule_cycle_days >= 0
        GROUP BY tenant_id, factory_id
        UNION ALL
        SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L1_002' AS MetricCode,
        ROUND(100 * SUM(schedule_satisfaction_flag) / NULLIF(COUNT(1), 0), 4) AS MetricValue
        FROM dwd_order_schedule_trans
        WHERE calc_batch_id=@BatchId
        GROUP BY tenant_id, factory_id
        UNION ALL
        SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L1_003' AS MetricCode,
        ROUND(COUNT(1) / NULLIF(SUM(CASE WHEN IFNULL(resource_person_count, 0) > 0 THEN resource_person_count ELSE 1 END), 0), 4) AS MetricValue
        FROM dwd_order_schedule_trans
        WHERE calc_batch_id=@BatchId
        GROUP BY tenant_id, factory_id
        UNION ALL
        SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L1_004' AS MetricCode,
        ROUND(SUM(wip_qty) / NULLIF(SUM(CASE WHEN qty_completed > 0 THEN qty_completed ELSE completed_op_qty END), 0) * 30, 4) AS MetricValue
        FROM dwd_order_schedule_trans
        WHERE calc_batch_id=@BatchId
        GROUP BY tenant_id, factory_id
        UNION ALL
        SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L2_001' AS MetricCode,
        ROUND(AVG(schedule_cycle_days), 4) AS MetricValue
        FROM dwd_order_schedule_trans
        WHERE calc_batch_id=@BatchId AND schedule_cycle_days IS NOT NULL AND schedule_cycle_days >= 0
        GROUP BY tenant_id, factory_id
        UNION ALL
        SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L2_002' AS MetricCode,
        ROUND(100 * SUM(schedule_satisfaction_flag) / NULLIF(COUNT(1), 0), 4) AS MetricValue
        FROM dwd_order_schedule_trans
        WHERE calc_batch_id=@BatchId
        GROUP BY tenant_id, factory_id
        UNION ALL
        SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L2_003' AS MetricCode,
        ROUND(COUNT(1) / NULLIF(SUM(CASE WHEN IFNULL(resource_person_count, 0) > 0 THEN resource_person_count ELSE 1 END), 0), 4) AS MetricValue
        FROM dwd_order_schedule_trans
        WHERE calc_batch_id=@BatchId
        GROUP BY tenant_id, factory_id
        """,
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@StatDate", statDate));
}
/// <summary>
/// Writes one calculated KPI value into the KPI value table selected by the metric's level:
/// updates the row for the business date when one exists, otherwise inserts a new row.
/// </summary>
/// <param name="row">Calculated value with its tenant, factory and metric code.</param>
/// <param name="statDate">Business date (biz_date) of the value.</param>
/// <param name="now">Timestamp written to calc_time / update_time (and create_time on insert).</param>
/// <returns>
/// Affected-row count of the UPDATE or INSERT; 0 when the value is null or no enabled S2 metric
/// definition exists for the tenant in ado_smart_ops_kpi_master.
/// </returns>
private async Task<int> UpsertS2KpiValueAsync(S2KpiCalcRow row, DateTime statDate, DateTime now)
{
    if (row.MetricValue == null) return 0;

    var meta = await _db.Ado.SqlQuerySingleAsync<S2KpiMetaRow>(
        """
        SELECT MetricLevel, Direction, YellowThreshold, RedThreshold
        FROM ado_smart_ops_kpi_master
        WHERE TenantId=@TenantId AND ModuleCode='S2' AND MetricCode=@MetricCode AND IsEnabled=1
        LIMIT 1
        """,
        new SugarParameter("@TenantId", row.TenantId),
        new SugarParameter("@MetricCode", row.MetricCode));
    if (meta == null) return 0;

    var table = ResolveKpiValueTable(meta.MetricLevel);

    // Parameters identifying the KPI row; new instances are created for every command.
    SugarParameter[] KeyParameters() => new[]
    {
        new SugarParameter("@TenantId", row.TenantId),
        new SugarParameter("@FactoryId", row.FactoryId),
        new SugarParameter("@MetricCode", row.MetricCode),
        new SugarParameter("@BizDate", statDate)
    };

    // Existing value for the same business date, if any.
    var current = await _db.Ado.SqlQuerySingleAsync<S2KpiValueRow>(
        $"""
        SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
        FROM {table}
        WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S2'
        AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
        ORDER BY id
        LIMIT 1
        """,
        KeyParameters());

    // Most recent value before the business date; supplies the trend baseline and a fallback target.
    var prior = await _db.Ado.SqlQuerySingleAsync<S2KpiValueRow>(
        $"""
        SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue
        FROM {table}
        WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S2'
        AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0
        ORDER BY biz_date DESC, id DESC
        LIMIT 1
        """,
        KeyParameters());

    var actual = Math.Round(row.MetricValue.Value, 4);
    // Target precedence: today's stored target, then the previous day's, then the built-in default.
    var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS2Target(row.MetricCode);
    var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold);
    var trend = ResolveTrendFlag(actual, prior?.MetricValue);

    // Parameters carrying the computed values, shared by the UPDATE and the INSERT.
    SugarParameter[] ValueParameters() => new[]
    {
        new SugarParameter("@MetricValue", actual),
        new SugarParameter("@TargetValue", target),
        new SugarParameter("@StatusColor", status),
        new SugarParameter("@TrendFlag", trend),
        new SugarParameter("@CalcTime", now)
    };

    if (current != null)
    {
        return await _db.Ado.ExecuteCommandAsync(
            $"""
            UPDATE {table}
            SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag,
            is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime
            WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S2'
            AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0
            """,
            ValueParameters().Concat(KeyParameters()).ToArray());
    }

    // NOTE(review): MAX(id)+1 is not safe against concurrent writers to the same table;
    // two overlapping runs can compute the same id. Confirm that runs are serialized.
    var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}");
    return await _db.Ado.ExecuteCommandAsync(
        $"""
        INSERT INTO {table}
        (id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active,
        module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time)
        VALUES
        (@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1,
        'S2', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime)
        """,
        new[] { new SugarParameter("@Id", nextId) }
            .Concat(KeyParameters())
            .Concat(ValueParameters())
            .ToArray());
}
/// <summary>
/// Inserts a RUNNING row into mdp_sync_log for one entity of the batch and returns its id.
/// </summary>
/// <param name="entityId">Id of the mdp_entity row being synced.</param>
/// <param name="entityName">Entity display name stored with the log row.</param>
/// <param name="batchId">Batch id stored in sync_batch_id.</param>
/// <param name="rowsRead">Source row count stored in rows_read.</param>
/// <returns>Id of the newest mdp_sync_log row for this batch and entity.</returns>
private async Task<long> InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead)
{
    await _db.Ado.ExecuteCommandAsync(
        """
        INSERT INTO mdp_sync_log
        (tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status)
        VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING')
        """,
        new SugarParameter("@EntityId", entityId),
        new SugarParameter("@EntityName", entityName),
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@RowsRead", rowsRead));

    // The id is read back by batch + entity rather than from the insert itself.
    return await _db.Ado.GetLongAsync(
        "SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1",
        new List<SugarParameter> { new("@BatchId", batchId), new("@EntityId", entityId) });
}
/// <summary>
/// Closes a sync-log row as SUCCESS, recording the elapsed time and the affected-row count as rows_insert.
/// </summary>
/// <param name="logId">Id of the mdp_sync_log row.</param>
/// <param name="started">Local time at which the copy started; used to compute duration_ms.</param>
/// <param name="affected">Affected-row count reported for the staging upsert.</param>
private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected)
{
    const string sql =
        """
        UPDATE mdp_sync_log
        SET sync_end=NOW(), duration_ms=@DurationMs, rows_insert=@RowsInsert, rows_update=0, rows_skip=0, rows_error=0, status='SUCCESS'
        WHERE id=@Id
        """;

    var elapsedMs = (int)(DateTime.Now - started).TotalMilliseconds;
    var parameters = new[]
    {
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@RowsInsert", affected),
        new SugarParameter("@Id", logId)
    };

    await _db.Ado.ExecuteCommandAsync(sql, parameters);
}
/// <summary>
/// Closes a sync-log row as FAILED with the (truncated) error message. Best effort: if the log
/// write itself fails, the error is written to standard error and not propagated.
/// </summary>
/// <param name="logId">Id of the mdp_sync_log row.</param>
/// <param name="started">Local time at which the copy started; used to compute duration_ms.</param>
/// <param name="message">Error text; cut to 1000 characters before it is stored.</param>
private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message)
{
    const string sql =
        """
        UPDATE mdp_sync_log
        SET sync_end=NOW(), duration_ms=@DurationMs, rows_error=1, status='FAILED', error_msg=@ErrorMsg
        WHERE id=@Id
        """;

    try
    {
        var elapsedMs = (int)(DateTime.Now - started).TotalMilliseconds;
        var parameters = new[]
        {
            new SugarParameter("@DurationMs", elapsedMs),
            new SugarParameter("@ErrorMsg", Truncate(message, 1000)),
            new SugarParameter("@Id", logId)
        };
        await _db.Ado.ExecuteCommandAsync(sql, parameters);
    }
    catch (Exception logFailure)
    {
        // Never mask the original sync failure with a logging failure.
        Console.Error.WriteLine($"[S2MdpSyncTransform] MarkSyncLogFailed write failed (syncLogId={logId}): {logFailure.Message}");
    }
}
/// <summary>
/// Inserts a RUNNING row into mdp_transform_run_log for this job and returns its id.
/// </summary>
/// <param name="batchId">Batch id stored with the run.</param>
/// <param name="startedAt">Start time stored in start_time.</param>
/// <param name="triggerType">Trigger label; normalized through <c>NormalizeTriggerType</c> before it is stored.</param>
/// <returns>Id of the newest mdp_transform_run_log row for the batch.</returns>
private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType)
{
    // job_code comes from the JobCode constant so the value is defined in one place.
    await _db.Ado.ExecuteCommandAsync(
        """
        INSERT INTO mdp_transform_run_log
        (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time)
        VALUES (0, @JobCode, 'S2 MDP同步与KPI计算', @TriggerType, @BatchId, 'RUNNING', @StartTime)
        """,
        new SugarParameter("@JobCode", JobCode),
        new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)),
        new SugarParameter("@BatchId", batchId),
        new SugarParameter("@StartTime", startedAt));

    // The id is read back by batch id rather than from the insert itself.
    return await _db.Ado.GetLongAsync(
        "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1",
        new List<SugarParameter> { new("@BatchId", batchId) });
}
/// <summary>
/// Closes a run-log row as SUCCESS with the end time, duration, per-stage row counters and a JSON summary.
/// </summary>
/// <param name="runLogId">Id of the mdp_transform_run_log row.</param>
/// <param name="startedAt">Start time of the run; used to compute duration_ms.</param>
/// <param name="result">Counters of the finished run.</param>
private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S2MdpSyncTransformResult result)
{
    const string sql =
        """
        UPDATE mdp_transform_run_log
        SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
        stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
        summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
        WHERE id=@Id
        """;

    var endedAt = DateTime.Now;
    var elapsedMs = (int)(endedAt - startedAt).TotalMilliseconds;
    var parameters = new[]
    {
        new SugarParameter("@EndTime", endedAt),
        new SugarParameter("@DurationMs", elapsedMs),
        new SugarParameter("@StageRows", result.StageRows),
        new SugarParameter("@StandardRows", result.StandardRows),
        new SugarParameter("@DwdRows", result.DwdRows),
        new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)),
        new SugarParameter("@Id", runLogId)
    };

    await _db.Ado.ExecuteCommandAsync(sql, parameters);
}
/// <summary>
/// Closes a run-log row as FAILED with the (truncated) error message. Best effort: if the log
/// write itself fails, the error is written to standard error and not propagated.
/// </summary>
/// <param name="runLogId">Id of the mdp_transform_run_log row.</param>
/// <param name="startedAt">Start time of the run; used to compute duration_ms.</param>
/// <param name="message">Error text; cut to 2000 characters before it is stored.</param>
private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
{
    const string sql =
        """
        UPDATE mdp_transform_run_log
        SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
        error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
        WHERE id=@Id
        """;

    try
    {
        var endedAt = DateTime.Now;
        var elapsedMs = (int)(endedAt - startedAt).TotalMilliseconds;
        var parameters = new[]
        {
            new SugarParameter("@EndTime", endedAt),
            new SugarParameter("@DurationMs", elapsedMs),
            new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
            new SugarParameter("@Id", runLogId)
        };
        await _db.Ado.ExecuteCommandAsync(sql, parameters);
    }
    catch (Exception logFailure)
    {
        // Never mask the original run failure with a logging failure.
        Console.Error.WriteLine($"[S2MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {logFailure.Message}");
    }
}
/// <summary>
/// Wraps a SQL text with the three parameters every batch command receives:
/// @BatchId, @Now and @StatDate (the date part of <paramref name="now"/>).
/// </summary>
private static S2MdpSqlCommand Cmd(string sql, string batchId, DateTime now)
{
    SugarParameter[] parameters =
    {
        new("@BatchId", batchId),
        new("@Now", now),
        new("@StatDate", now.Date)
    };

    return new S2MdpSqlCommand(sql, parameters);
}
/// <summary>
/// Builds a MySQL <c>JSON_OBJECT(...)</c> expression that maps every column name to the value of
/// that column on the row aliased <c>s</c>.
/// </summary>
/// <param name="columns">Column names, in the order they should appear in the expression.</param>
/// <returns>For example <c>JSON_OBJECT('Id',s.`Id`,'Name',s.`Name`)</c>; <c>JSON_OBJECT()</c> for no columns.</returns>
private static string BuildJsonObjectExpression(IEnumerable<string> columns)
{
    var parts = columns.SelectMany(c => new[]
    {
        // Key: a string literal, so single quotes are doubled.
        $"'{c.Replace("'", "''")}'",
        // Value: a quoted identifier, so backticks are doubled.
        $"s.`{c.Replace("`", "``")}`"
    });

    return $"JSON_OBJECT({string.Join(",", parts)})";
}
/// <summary>
/// Returns the first column whose name equals <paramref name="expected"/> ignoring case,
/// with the spelling it has in <paramref name="columns"/>.
/// </summary>
/// <exception cref="InvalidOperationException">No column matches.</exception>
private static string FindColumn(IEnumerable<string> columns, string expected)
{
    return columns.First(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase));
}
/// <summary>
/// Normalizes a trigger label: null or blank becomes "AUTO"; anything else is trimmed and upper-cased (invariant).
/// </summary>
private static string NormalizeTriggerType(string? triggerType)
{
    if (string.IsNullOrWhiteSpace(triggerType))
    {
        return "AUTO";
    }

    var trimmed = triggerType.Trim();
    return trimmed.ToUpperInvariant();
}
/// <summary>
/// Renders the run result as a compact JSON object with the members batchId, stageRows,
/// standardRows, dwdRows and kpiRows. The batch id is inserted without JSON escaping.
/// </summary>
private static string BuildRunSummaryJson(S2MdpSyncTransformResult result)
{
    var members = new[]
    {
        $"\"batchId\":\"{result.BatchId}\"",
        $"\"stageRows\":{result.StageRows}",
        $"\"standardRows\":{result.StandardRows}",
        $"\"dwdRows\":{result.DwdRows}",
        $"\"kpiRows\":{result.KpiRows}"
    };

    return "{" + string.Join(",", members) + "}";
}
/// <summary>
/// Maps a metric level (1-4) to its daily KPI value table; any other level uses the level-2 table.
/// </summary>
private static string ResolveKpiValueTable(int metricLevel)
{
    switch (metricLevel)
    {
        case 1:
            return "ado_s9_kpi_value_l1_day";
        case 3:
            return "ado_s9_kpi_value_l3_day";
        case 4:
            return "ado_s9_kpi_value_l4_day";
        default:
            // Level 2 and every unrecognized level share the level-2 table.
            return "ado_s9_kpi_value_l2_day";
    }
}
/// <summary>
/// Built-in target used when no stored target exists for a metric; unknown metric codes yield 0.
/// </summary>
private static decimal DefaultS2Target(string metricCode)
{
    switch (metricCode)
    {
        case "S2_L1_001":
        case "S2_L1_003":
        case "S2_L2_003":
            return 20m;
        case "S2_L1_002":
            return 99m;
        case "S2_L1_004":
            return 18m;
        case "S2_L2_001":
            return 1.2m;
        case "S2_L2_002":
            return 95m;
        default:
            return 0m;
    }
}
/// <summary>
/// Classifies a KPI value as "green", "yellow", "red" or "gray" by comparing it with its target.
/// </summary>
/// <param name="actual">Measured value.</param>
/// <param name="target">Target value; a target of 0 or less yields "gray".</param>
/// <param name="direction">"lower_is_better" (case-insensitive) inverts the comparison; anything else means higher is better.</param>
/// <param name="yellowThreshold">Percent-of-target bound for yellow; defaults to 110 (lower is better) or 95 (higher is better).</param>
/// <param name="redThreshold">Percent-of-target bound for red; defaults to 120 (lower is better) or 80 (higher is better).</param>
private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold)
{
    if (target <= 0)
    {
        return "gray";
    }

    var percentOfTarget = actual / target * 100m;
    var lowerIsBetter = string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase);

    if (lowerIsBetter)
    {
        if (actual <= target)
        {
            return "green";
        }

        // Red only when the value is past the yellow band and has reached the red bound.
        var yellowCeiling = yellowThreshold ?? 110m;
        var redFloor = redThreshold ?? 120m;
        return percentOfTarget > yellowCeiling && percentOfTarget >= redFloor ? "red" : "yellow";
    }

    if (actual >= target)
    {
        return "green";
    }

    // Red only when the value is below the yellow band and at or under the red bound.
    var yellowFloor = yellowThreshold ?? 95m;
    var redCeiling = redThreshold ?? 80m;
    return percentOfTarget < yellowFloor && percentOfTarget <= redCeiling ? "red" : "yellow";
}
/// <summary>
/// Compares the current value with the previous one: "up", "down", or "flat" when equal or when there is no previous value.
/// </summary>
private static string ResolveTrendFlag(decimal actual, decimal? previous)
{
    if (previous is not decimal baseline)
    {
        return "flat";
    }

    return actual.CompareTo(baseline) switch
    {
        > 0 => "up",
        < 0 => "down",
        _ => "flat"
    };
}
/// <summary>
/// Returns at most the first <paramref name="maxLength"/> characters of <paramref name="raw"/>;
/// null or empty input yields an empty string.
/// </summary>
private static string Truncate(string? raw, int maxLength)
{
    if (string.IsNullOrEmpty(raw))
    {
        return string.Empty;
    }

    if (raw.Length > maxLength)
    {
        return raw.Substring(0, maxLength);
    }

    return raw;
}
/// <summary>
/// One column name of a source table; the property name matches the ColumnName alias selected
/// from information_schema.COLUMNS in this file.
/// </summary>
private sealed class S2ColumnRow
{
/// <summary>Column name as stored in the schema catalog.</summary>
public string ColumnName { get; set; } = string.Empty;
}
/// <summary>
/// Id and display name of an mdp_entity row; property names match the aliases of the entity lookup query.
/// </summary>
private sealed class S2MdpEntityRow
{
/// <summary>mdp_entity.id.</summary>
public long Id { get; set; }
/// <summary>mdp_entity.entity_name.</summary>
public string EntityName { get; set; } = string.Empty;
}
/// <summary>
/// One calculated KPI value for a tenant/factory/metric; property names match the aliases of the KPI calculation query.
/// </summary>
private sealed class S2KpiCalcRow
{
/// <summary>Tenant the value belongs to.</summary>
public long TenantId { get; set; }
/// <summary>Factory the value belongs to.</summary>
public long FactoryId { get; set; }
/// <summary>Metric code, e.g. "S2_L1_001".</summary>
public string MetricCode { get; set; } = string.Empty;
/// <summary>Calculated value; rows with a null value are skipped by the upsert.</summary>
public decimal? MetricValue { get; set; }
}
/// <summary>
/// Metric definition read from ado_smart_ops_kpi_master: level, direction and status thresholds.
/// </summary>
private sealed class S2KpiMetaRow
{
/// <summary>Metric level; selects the KPI value table the metric is stored in.</summary>
public int MetricLevel { get; set; }
/// <summary>Comparison direction; "lower_is_better" inverts the status evaluation.</summary>
public string Direction { get; set; } = "higher_is_better";
/// <summary>Optional percent-of-target bound for yellow; defaults apply when null.</summary>
public decimal? YellowThreshold { get; set; }
/// <summary>Optional percent-of-target bound for red; defaults apply when null.</summary>
public decimal? RedThreshold { get; set; }
}
/// <summary>
/// Stored KPI value row (id, value, target) read from one of the ado_s9_kpi_value_l*_day tables.
/// </summary>
private sealed class S2KpiValueRow
{
/// <summary>Row id.</summary>
public long Id { get; set; }
/// <summary>Stored metric value; used as the trend baseline when read for a prior date.</summary>
public decimal? MetricValue { get; set; }
/// <summary>Stored target value; reused as the target of the new value when present.</summary>
public decimal? TargetValue { get; set; }
}
}
/// <summary>
/// Outcome of one <c>RunFullAsync</c> execution: identifiers of the run plus per-stage row counters.
/// </summary>
public sealed class S2MdpSyncTransformResult
{
/// <summary>Id of the mdp_transform_run_log row created for the run.</summary>
public long RunLogId { get; set; }
/// <summary>Batch id of the run.</summary>
public string BatchId { get; set; } = string.Empty;
/// <summary>Total of the source-table row counts read while syncing the staging layer.</summary>
public int StageRows { get; set; }
/// <summary>Sum of the affected-row counts reported by the standard-layer commands.</summary>
public int StandardRows { get; set; }
/// <summary>Sum of the affected-row counts reported by the DWD-layer commands.</summary>
public int DwdRows { get; set; }
/// <summary>Sum of the affected-row counts reported by the KPI upserts.</summary>
public int KpiRows { get; set; }
}
/// <summary>A SQL statement paired with the parameters it is executed with.</summary>
internal sealed record S2MdpSqlCommand(string Sql, SugarParameter[] Parameters);
/// <summary>
/// Describes one source table that is copied into the S2 staging layer.
/// </summary>
/// <param name="EntityCode">Code used to look the entity up in mdp_entity.</param>
/// <param name="SourceTable">Source table name; interpolated into SQL as a quoted identifier.</param>
/// <param name="TargetTable">Staging table name; interpolated into SQL as a quoted identifier.</param>
/// <param name="SourceRowIdExpression">
/// Row-id source: treated as a column name when it matches a source column (case-insensitive),
/// otherwise interpolated as a raw SQL expression.
/// </param>
/// <param name="SourceBizKeyExpression">SQL expression (over alias <c>s</c>) that produces the business key.</param>
internal sealed record S2MdpEntityConfig(
    string EntityCode,
    string SourceTable,
    string TargetTable,
    string SourceRowIdExpression,
    string SourceBizKeyExpression)
{
    /// <summary>All entities synced by the S2 pipeline, in processing order.</summary>
    public static readonly IReadOnlyList<S2MdpEntityConfig> All = new List<S2MdpEntityConfig>
    {
        new("S2_WORK_ORDER_MASTER", "WorkOrdMaster", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''))"),
        new("S2_WORK_ORDER_ROUTING", "WorkOrdRouting", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`OP`,''))"),
        new("S2_WORK_ORDER_DETAIL", "WorkOrdDetail", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`ItemNum`,''))"),
        new("S2_PERIOD_SEQUENCE_DET", "PeriodSequenceDet", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrds`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`Sequence`,''))"),
        new("S2_SCHEDULE_RESULT_OP", "ScheduleResultOpMaster", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`WorkDate`,''))")
    };
}
/// <summary>
/// SQL statements executed at the start of every run: they create the S2 staging, standard and DWD
/// tables when missing and register the S2 entities in mdp_entity.
/// </summary>
internal static class S2MdpDdl
{
    /// <summary>
    /// Statements in execution order. The tables use CREATE TABLE IF NOT EXISTS; the entity registration
    /// uses INSERT ... ON DUPLICATE KEY UPDATE (presumably backed by a unique key on mdp_entity - verify).
    /// </summary>
    public static readonly IReadOnlyList<string> SqlBlocks = new[]
    {
        // Staging layer: one JSON snapshot per source row, unique per (tenant, source table, source row id).
        """
        CREATE TABLE IF NOT EXISTS mdp_stg_schedule (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id VARCHAR(64) NULL,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        source_table VARCHAR(100) NOT NULL,
        source_row_id VARCHAR(100) NOT NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL,
        sync_time DATETIME NOT NULL,
        process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
        raw_data JSON NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_mdp_stg_schedule (tenant_id, source_table, source_row_id),
        KEY idx_mdp_stg_schedule_batch (sync_batch_id),
        KEY idx_mdp_stg_schedule_biz (source_biz_key)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S2生产排程贴源层';
        """,
        // Standard layer: work orders, unique per (tenant, work order).
        """
        CREATE TABLE IF NOT EXISTS mdp_std_work_order_schedule (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NULL,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        work_order VARCHAR(100) NOT NULL,
        sales_order_no VARCHAR(100) NULL,
        item_code VARCHAR(100) NULL,
        item_name VARCHAR(200) NULL,
        site_code VARCHAR(50) NULL,
        status VARCHAR(50) NULL,
        priority DECIMAL(18,6) NULL,
        urgent_flag TINYINT NOT NULL DEFAULT 0,
        qty_ordered DECIMAL(18,6) NULL,
        qty_completed DECIMAL(18,6) NULL,
        order_date DATETIME NULL,
        due_date DATETIME NULL,
        release_date DATETIME NULL,
        prod_line VARCHAR(100) NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL,
        sync_time DATETIME NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_std_work_order_schedule (tenant_id, work_order),
        KEY idx_std_work_order_schedule_batch (sync_batch_id),
        KEY idx_std_work_order_schedule_due (tenant_id, due_date)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S2标准工单排程';
        """,
        // Standard layer: operations, unique per (tenant, source business key).
        """
        CREATE TABLE IF NOT EXISTS mdp_std_operation_schedule (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NULL,
        source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP',
        work_order VARCHAR(100) NOT NULL,
        op_no VARCHAR(50) NULL,
        work_center VARCHAR(100) NULL,
        line_code VARCHAR(100) NULL,
        item_code VARCHAR(100) NULL,
        plan_date DATETIME NULL,
        prod_date DATETIME NULL,
        start_time DATETIME NULL,
        end_time DATETIME NULL,
        ord_qty DECIMAL(18,6) NULL,
        comp_qty DECIMAL(18,6) NULL,
        run_crew DECIMAL(18,6) NULL,
        employee VARCHAR(200) NULL,
        source_biz_key VARCHAR(200) NULL,
        sync_batch_id VARCHAR(100) NOT NULL,
        sync_time DATETIME NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_std_operation_schedule (tenant_id, source_biz_key),
        KEY idx_std_operation_schedule_work_order (tenant_id, work_order),
        KEY idx_std_operation_schedule_batch (sync_batch_id)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S2标准工序排程';
        """,
        // DWD layer: one row per (tenant, work order, calculation batch).
        """
        CREATE TABLE IF NOT EXISTS dwd_order_schedule_trans (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        tenant_id BIGINT NOT NULL DEFAULT 0,
        factory_id BIGINT NOT NULL DEFAULT 1,
        stat_date DATE NOT NULL,
        work_order VARCHAR(100) NOT NULL,
        sales_order_no VARCHAR(100) NULL,
        item_code VARCHAR(100) NULL,
        item_name VARCHAR(200) NULL,
        site_code VARCHAR(50) NULL,
        prod_line VARCHAR(100) NULL,
        status VARCHAR(50) NULL,
        urgent_flag TINYINT NOT NULL DEFAULT 0,
        qty_ordered DECIMAL(18,6) NULL,
        qty_completed DECIMAL(18,6) NULL,
        order_date DATETIME NULL,
        due_date DATETIME NULL,
        release_date DATETIME NULL,
        first_plan_date DATETIME NULL,
        last_plan_date DATETIME NULL,
        first_start_time DATETIME NULL,
        last_end_time DATETIME NULL,
        operation_count INT NOT NULL DEFAULT 0,
        scheduled_qty DECIMAL(18,6) NULL,
        completed_op_qty DECIMAL(18,6) NULL,
        schedule_cycle_days DECIMAL(18,6) NULL,
        schedule_satisfaction_flag TINYINT NOT NULL DEFAULT 0,
        wip_qty DECIMAL(18,6) NULL,
        resource_person_count DECIMAL(18,6) NULL,
        calc_batch_id VARCHAR(100) NOT NULL,
        calc_time DATETIME NOT NULL,
        update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        UNIQUE KEY uk_dwd_order_schedule_trans (tenant_id, work_order, calc_batch_id),
        KEY idx_dwd_order_schedule_trans_batch (calc_batch_id),
        KEY idx_dwd_order_schedule_trans_stat (tenant_id, stat_date),
        KEY idx_dwd_order_schedule_trans_order (tenant_id, sales_order_no)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S2订单工单排程DWD';
        """,
        // Registers the five S2 entities under the tenant-0 source 'AIDOPDEV_MYSQL'.
        """
        INSERT INTO mdp_entity
        (tenant_id, source_id, entity_code, entity_name, entity_type, source_table_name, target_table_name, sync_mode, batch_size, status, remark)
        SELECT 0, s.id, v.entity_code, v.entity_name, 'TABLE', v.source_table_name, 'mdp_stg_schedule', 'FULL', 5000, 1, v.remark
        FROM mdp_source s
        JOIN (
        SELECT 'S2_WORK_ORDER_MASTER' AS entity_code, 'S2工单主数据' AS entity_name, 'WorkOrdMaster' AS source_table_name, '工单主数据进入 S2 贴源层' AS remark
        UNION ALL SELECT 'S2_WORK_ORDER_ROUTING', 'S2工单工艺路线', 'WorkOrdRouting', '工单工艺路线进入 S2 贴源层'
        UNION ALL SELECT 'S2_WORK_ORDER_DETAIL', 'S2工单物料明细', 'WorkOrdDetail', '工单物料需求进入 S2 贴源层'
        UNION ALL SELECT 'S2_PERIOD_SEQUENCE_DET', 'S2工序排程计划', 'PeriodSequenceDet', '工序间衔接与排程计划进入 S2 贴源层'
        UNION ALL SELECT 'S2_SCHEDULE_RESULT_OP', 'S2工序排产结果', 'ScheduleResultOpMaster', '工序排产结果进入 S2 贴源层'
        ) v
        WHERE s.tenant_id=0 AND s.source_code='AIDOPDEV_MYSQL'
        ON DUPLICATE KEY UPDATE
        source_id=VALUES(source_id), entity_name=VALUES(entity_name), entity_type=VALUES(entity_type),
        source_table_name=VALUES(source_table_name), target_table_name=VALUES(target_table_name),
        sync_mode=VALUES(sync_mode), batch_size=VALUES(batch_size), status=VALUES(status),
        remark=VALUES(remark), update_time=CURRENT_TIMESTAMP;
        """
    };
}