namespace Admin.NET.Plugin.AiDOP.Production; /// /// S2 生产排程 MDP 同步、标准化、DWD 与 KPI 计算服务。 /// public class S2MdpSyncTransformService : ITransient { private const string JobCode = "S2_MDP_SYNC_TRANSFORM"; private readonly ISqlSugarClient _db; public S2MdpSyncTransformService(ISqlSugarClient db) { _db = db; } public async Task RunFullAsync(CancellationToken cancellationToken = default, string triggerType = "AUTO") { cancellationToken.ThrowIfCancellationRequested(); var now = DateTime.Now; var batchId = $"S2_MDP_FULL_{now:yyyyMMddHHmmss}"; var runLogId = await InsertTransformRunLogAsync(batchId, now, triggerType); var result = new S2MdpSyncTransformResult { BatchId = batchId, RunLogId = runLogId }; try { await EnsureS2RuntimeObjectsAsync(); result.StageRows = await SyncStagingAsync(batchId, now, cancellationToken); result.StandardRows = await TransformStandardAsync(batchId, now, cancellationToken); result.DwdRows = await BuildDwdAsync(batchId, now, cancellationToken); result.KpiRows = await BuildS2KpiValuesAsync(batchId, now, cancellationToken); await MarkTransformRunSuccessAsync(runLogId, now, result); return result; } catch (Exception ex) { await MarkTransformRunFailedAsync(runLogId, now, ex.Message); throw; } } private async Task EnsureS2RuntimeObjectsAsync() { foreach (var sql in S2MdpDdl.SqlBlocks) { await _db.Ado.ExecuteCommandAsync(sql); } } private async Task SyncStagingAsync(string batchId, DateTime now, CancellationToken cancellationToken) { var total = 0; foreach (var entity in S2MdpEntityConfig.All) { cancellationToken.ThrowIfCancellationRequested(); total += await SyncOneEntityAsync(entity, batchId, now); } return total; } private async Task SyncOneEntityAsync(S2MdpEntityConfig entity, string batchId, DateTime now) { var entityRow = await _db.Ado.SqlQuerySingleAsync( "SELECT id AS Id, entity_name AS EntityName FROM mdp_entity WHERE tenant_id=0 AND entity_code=@EntityCode LIMIT 1", new SugarParameter("@EntityCode", entity.EntityCode)); if (entityRow == null) throw Oops.Oh($"未找到 MDP 实体配置:{entity.EntityCode}"); var columns = await _db.Ado.SqlQueryAsync( """ SELECT COLUMN_NAME AS ColumnName FROM information_schema.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=@TableName ORDER BY ORDINAL_POSITION """, new SugarParameter("@TableName", entity.SourceTable)); if (columns.Count == 0) throw Oops.Oh($"未找到源表:{entity.SourceTable}"); var names = columns.Select(u => u.ColumnName).ToList(); var tenantExpr = names.Any(u => string.Equals(u, "tenant_id", StringComparison.OrdinalIgnoreCase)) ? $"IFNULL(s.`{FindColumn(names, "tenant_id")}`,0)" : "0"; var factoryExpr = names.Any(u => string.Equals(u, "factory_id", StringComparison.OrdinalIgnoreCase)) ? $"s.`{FindColumn(names, "factory_id")}`" : "NULL"; var domainExpr = names.Any(u => string.Equals(u, "Domain", StringComparison.OrdinalIgnoreCase)) ? $"s.`{FindColumn(names, "Domain")}`" : "NULL"; var sourceRowExpr = names.Any(u => string.Equals(u, entity.SourceRowIdExpression, StringComparison.OrdinalIgnoreCase)) ? $"s.`{FindColumn(names, entity.SourceRowIdExpression)}`" : entity.SourceRowIdExpression; var rawDataExpr = BuildJsonObjectExpression(names); var rowsRead = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM `{entity.SourceTable}`"); var logId = await InsertSyncLogAsync(entityRow.Id, entityRow.EntityName, batchId, rowsRead); var started = DateTime.Now; try { var affected = await _db.Ado.ExecuteCommandAsync( $""" INSERT INTO `{entity.TargetTable}` (tenant_id, factory_id, source_system, source_table, source_row_id, source_biz_key, sync_batch_id, sync_time, process_status, raw_data) SELECT {tenantExpr}, COALESCE({factoryExpr}, {domainExpr}), 'AIDOP', @SourceTable, CAST({sourceRowExpr} AS CHAR), CAST(COALESCE({entity.SourceBizKeyExpression}, CAST({sourceRowExpr} AS CHAR)) AS CHAR), @BatchId, @Now, 'PENDING', {rawDataExpr} FROM `{entity.SourceTable}` s ON DUPLICATE KEY UPDATE tenant_id=VALUES(tenant_id), factory_id=VALUES(factory_id), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), process_status=VALUES(process_status), raw_data=VALUES(raw_data), update_time=CURRENT_TIMESTAMP """, new SugarParameter("@SourceTable", entity.SourceTable), new SugarParameter("@BatchId", batchId), new SugarParameter("@Now", now)); await MarkSyncLogSuccessAsync(logId, started, affected); return rowsRead; } catch (Exception ex) { await MarkSyncLogFailedAsync(logId, started, ex.Message); throw; } } private async Task TransformStandardAsync(string batchId, DateTime now, CancellationToken cancellationToken) { var total = 0; foreach (var command in BuildStandardCommands(batchId, now)) { cancellationToken.ThrowIfCancellationRequested(); total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters); } return total; } private async Task BuildDwdAsync(string batchId, DateTime now, CancellationToken cancellationToken) { var total = 0; foreach (var command in BuildDwdCommands(batchId, now)) { cancellationToken.ThrowIfCancellationRequested(); total += await _db.Ado.ExecuteCommandAsync(command.Sql, command.Parameters); } return total; } private async Task BuildS2KpiValuesAsync(string batchId, DateTime now, CancellationToken cancellationToken) { var rows = await CalculateS2KpiValuesAsync(batchId, now.Date); var affected = 0; foreach (var row in rows) { cancellationToken.ThrowIfCancellationRequested(); affected += await UpsertS2KpiValueAsync(row, now.Date, now); } return affected; } private IEnumerable BuildStandardCommands(string batchId, DateTime now) { yield return Cmd( """ INSERT INTO mdp_std_work_order_schedule (tenant_id, factory_id, source_system, work_order, sales_order_no, item_code, item_name, site_code, status, priority, urgent_flag, qty_ordered, qty_completed, order_date, due_date, release_date, prod_line, source_biz_key, sync_batch_id, sync_time) SELECT tenant_id, CASE WHEN factory_id REGEXP '^[0-9]+$' THEN CAST(factory_id AS UNSIGNED) ELSE 1 END, 'AIDOP', JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.SalesJob')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemName')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Site')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Status')), CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Priority')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Priority')) AS DECIMAL(18,6)) END, CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Urgent')) IN ('1','true','True') THEN 1 ELSE 0 END, CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyOrded')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyOrded')) AS DECIMAL(18,6)) ELSE 0 END, CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyCompleted')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.QtyCompleted')) AS DECIMAL(18,6)) ELSE 0 END, NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.OrdDate')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.DueDate')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ReleaseDate')), 'null'), ''), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ProdLine')), source_biz_key, @BatchId, @Now FROM mdp_stg_schedule WHERE source_table='WorkOrdMaster' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), '') <> '' ON DUPLICATE KEY UPDATE sales_order_no=VALUES(sales_order_no), item_code=VALUES(item_code), item_name=VALUES(item_name), site_code=VALUES(site_code), status=VALUES(status), priority=VALUES(priority), urgent_flag=VALUES(urgent_flag), qty_ordered=VALUES(qty_ordered), qty_completed=VALUES(qty_completed), order_date=VALUES(order_date), due_date=VALUES(due_date), release_date=VALUES(release_date), prod_line=VALUES(prod_line), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP """, batchId, now); yield return Cmd( """ INSERT INTO mdp_std_operation_schedule (tenant_id, factory_id, source_system, work_order, op_no, work_center, line_code, item_code, plan_date, prod_date, start_time, end_time, ord_qty, comp_qty, run_crew, employee, source_biz_key, sync_batch_id, sync_time) SELECT tenant_id, CASE WHEN factory_id REGEXP '^[0-9]+$' THEN CAST(factory_id AS UNSIGNED) ELSE 1 END, 'AIDOP', JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrds')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkCtr')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Line')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.PlanDate')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ProdDate')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.StartTime')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.EndTime')), 'null'), ''), CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.OrdQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.OrdQty')) AS DECIMAL(18,6)) ELSE 0 END, CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CompQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.CompQty')) AS DECIMAL(18,6)) ELSE 0 END, CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RunCrew')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.RunCrew')) AS DECIMAL(18,6)) END, JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Employee')), source_biz_key, @BatchId, @Now FROM mdp_stg_schedule WHERE source_table='PeriodSequenceDet' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrds')), '') <> '' ON DUPLICATE KEY UPDATE work_center=VALUES(work_center), line_code=VALUES(line_code), item_code=VALUES(item_code), plan_date=VALUES(plan_date), prod_date=VALUES(prod_date), start_time=VALUES(start_time), end_time=VALUES(end_time), ord_qty=VALUES(ord_qty), comp_qty=VALUES(comp_qty), run_crew=VALUES(run_crew), employee=VALUES(employee), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP """, batchId, now); yield return Cmd( """ INSERT INTO mdp_std_operation_schedule (tenant_id, factory_id, source_system, work_order, op_no, work_center, line_code, item_code, plan_date, prod_date, start_time, end_time, ord_qty, comp_qty, run_crew, employee, source_biz_key, sync_batch_id, sync_time) SELECT tenant_id, CASE WHEN factory_id REGEXP '^[0-9]+$' THEN CAST(factory_id AS UNSIGNED) ELSE 1 END, 'AIDOP', JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Op')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkCtr')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.Line')), JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.ItemNum')), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkDate')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkDate')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkStartTime')), 'null'), ''), NULLIF(NULLIF(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkEndTime')), 'null'), ''), CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkQty')) AS DECIMAL(18,6)) ELSE 0 END, CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkQty')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkQty')) AS DECIMAL(18,6)) ELSE 0 END, CASE WHEN JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.AssignedPersonnelCount')) REGEXP '^-?[0-9]+(\\.[0-9]+)?$' THEN CAST(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.AssignedPersonnelCount')) AS DECIMAL(18,6)) END, JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.AssignedEmployeeID')), source_biz_key, @BatchId, @Now FROM mdp_stg_schedule WHERE source_table='ScheduleResultOpMaster' AND IFNULL(JSON_UNQUOTE(JSON_EXTRACT(raw_data,'$.WorkOrd')), '') <> '' ON DUPLICATE KEY UPDATE work_center=VALUES(work_center), line_code=VALUES(line_code), item_code=VALUES(item_code), plan_date=VALUES(plan_date), prod_date=VALUES(prod_date), start_time=VALUES(start_time), end_time=VALUES(end_time), ord_qty=VALUES(ord_qty), comp_qty=VALUES(comp_qty), run_crew=VALUES(run_crew), employee=VALUES(employee), sync_batch_id=VALUES(sync_batch_id), sync_time=VALUES(sync_time), update_time=CURRENT_TIMESTAMP """, batchId, now); } private IEnumerable BuildDwdCommands(string batchId, DateTime now) { yield return Cmd( """ INSERT INTO dwd_order_schedule_trans (tenant_id, factory_id, stat_date, work_order, sales_order_no, item_code, item_name, site_code, prod_line, status, urgent_flag, qty_ordered, qty_completed, order_date, due_date, release_date, first_plan_date, last_plan_date, first_start_time, last_end_time, operation_count, scheduled_qty, completed_op_qty, schedule_cycle_days, schedule_satisfaction_flag, wip_qty, resource_person_count, calc_batch_id, calc_time) SELECT w.tenant_id, COALESCE(w.factory_id, 1), @StatDate, w.work_order, w.sales_order_no, w.item_code, w.item_name, w.site_code, w.prod_line, w.status, w.urgent_flag, w.qty_ordered, w.qty_completed, w.order_date, w.due_date, w.release_date, MIN(o.plan_date), MAX(o.plan_date), MIN(o.start_time), MAX(o.end_time), COUNT(o.id), SUM(IFNULL(o.ord_qty, 0)), SUM(IFNULL(o.comp_qty, 0)), CASE WHEN COALESCE(MIN(o.plan_date), MAX(o.plan_date)) IS NOT NULL AND COALESCE(w.release_date, w.order_date) IS NOT NULL THEN TIMESTAMPDIFF(HOUR, COALESCE(w.release_date, w.order_date), COALESCE(MAX(o.plan_date), MIN(o.plan_date))) / 24 END, CASE WHEN w.due_date IS NOT NULL AND COALESCE(MAX(o.plan_date), MIN(o.plan_date)) IS NOT NULL AND DATE(COALESCE(MAX(o.plan_date), MIN(o.plan_date))) <= DATE(w.due_date) THEN 1 ELSE 0 END, GREATEST(IFNULL(w.qty_ordered, 0) - IFNULL(w.qty_completed, 0), 0), SUM(IFNULL(o.run_crew, 0)), @BatchId, @Now FROM mdp_std_work_order_schedule w LEFT JOIN mdp_std_operation_schedule o ON o.tenant_id=w.tenant_id AND IFNULL(o.work_order,'')=IFNULL(w.work_order,'') WHERE IFNULL(w.work_order, '') <> '' GROUP BY w.tenant_id, COALESCE(w.factory_id, 1), w.work_order, w.sales_order_no, w.item_code, w.item_name, w.site_code, w.prod_line, w.status, w.urgent_flag, w.qty_ordered, w.qty_completed, w.order_date, w.due_date, w.release_date ON DUPLICATE KEY UPDATE sales_order_no=VALUES(sales_order_no), item_code=VALUES(item_code), item_name=VALUES(item_name), site_code=VALUES(site_code), prod_line=VALUES(prod_line), status=VALUES(status), urgent_flag=VALUES(urgent_flag), qty_ordered=VALUES(qty_ordered), qty_completed=VALUES(qty_completed), order_date=VALUES(order_date), due_date=VALUES(due_date), release_date=VALUES(release_date), first_plan_date=VALUES(first_plan_date), last_plan_date=VALUES(last_plan_date), first_start_time=VALUES(first_start_time), last_end_time=VALUES(last_end_time), operation_count=VALUES(operation_count), scheduled_qty=VALUES(scheduled_qty), completed_op_qty=VALUES(completed_op_qty), schedule_cycle_days=VALUES(schedule_cycle_days), schedule_satisfaction_flag=VALUES(schedule_satisfaction_flag), wip_qty=VALUES(wip_qty), resource_person_count=VALUES(resource_person_count), calc_time=VALUES(calc_time), update_time=CURRENT_TIMESTAMP """, batchId, now); } private async Task> CalculateS2KpiValuesAsync(string batchId, DateTime statDate) { return await _db.Ado.SqlQueryAsync( """ SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L1_001' AS MetricCode, ROUND(AVG(schedule_cycle_days), 4) AS MetricValue FROM dwd_order_schedule_trans WHERE calc_batch_id=@BatchId AND schedule_cycle_days IS NOT NULL AND schedule_cycle_days >= 0 GROUP BY tenant_id, factory_id UNION ALL SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L1_002' AS MetricCode, ROUND(100 * SUM(schedule_satisfaction_flag) / NULLIF(COUNT(1), 0), 4) AS MetricValue FROM dwd_order_schedule_trans WHERE calc_batch_id=@BatchId GROUP BY tenant_id, factory_id UNION ALL SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L1_003' AS MetricCode, ROUND(COUNT(1) / NULLIF(SUM(CASE WHEN IFNULL(resource_person_count, 0) > 0 THEN resource_person_count ELSE 1 END), 0), 4) AS MetricValue FROM dwd_order_schedule_trans WHERE calc_batch_id=@BatchId GROUP BY tenant_id, factory_id UNION ALL SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L1_004' AS MetricCode, ROUND(SUM(wip_qty) / NULLIF(SUM(CASE WHEN qty_completed > 0 THEN qty_completed ELSE completed_op_qty END), 0) * 30, 4) AS MetricValue FROM dwd_order_schedule_trans WHERE calc_batch_id=@BatchId GROUP BY tenant_id, factory_id UNION ALL SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L2_001' AS MetricCode, ROUND(AVG(schedule_cycle_days), 4) AS MetricValue FROM dwd_order_schedule_trans WHERE calc_batch_id=@BatchId AND schedule_cycle_days IS NOT NULL AND schedule_cycle_days >= 0 GROUP BY tenant_id, factory_id UNION ALL SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L2_002' AS MetricCode, ROUND(100 * SUM(schedule_satisfaction_flag) / NULLIF(COUNT(1), 0), 4) AS MetricValue FROM dwd_order_schedule_trans WHERE calc_batch_id=@BatchId GROUP BY tenant_id, factory_id UNION ALL SELECT tenant_id AS TenantId, factory_id AS FactoryId, 'S2_L2_003' AS MetricCode, ROUND(COUNT(1) / NULLIF(SUM(CASE WHEN IFNULL(resource_person_count, 0) > 0 THEN resource_person_count ELSE 1 END), 0), 4) AS MetricValue FROM dwd_order_schedule_trans WHERE calc_batch_id=@BatchId GROUP BY tenant_id, factory_id """, new SugarParameter("@BatchId", batchId), new SugarParameter("@StatDate", statDate)); } private async Task UpsertS2KpiValueAsync(S2KpiCalcRow row, DateTime statDate, DateTime now) { if (row.MetricValue == null) return 0; var meta = await _db.Ado.SqlQuerySingleAsync( """ SELECT MetricLevel, Direction, YellowThreshold, RedThreshold FROM ado_smart_ops_kpi_master WHERE TenantId=@TenantId AND ModuleCode='S2' AND MetricCode=@MetricCode AND IsEnabled=1 LIMIT 1 """, new SugarParameter("@TenantId", row.TenantId), new SugarParameter("@MetricCode", row.MetricCode)); if (meta == null) return 0; var table = ResolveKpiValueTable(meta.MetricLevel); var current = await _db.Ado.SqlQuerySingleAsync( $""" SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue FROM {table} WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S2' AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0 ORDER BY id LIMIT 1 """, new SugarParameter("@TenantId", row.TenantId), new SugarParameter("@FactoryId", row.FactoryId), new SugarParameter("@MetricCode", row.MetricCode), new SugarParameter("@BizDate", statDate)); var prior = await _db.Ado.SqlQuerySingleAsync( $""" SELECT id AS Id, metric_value AS MetricValue, target_value AS TargetValue FROM {table} WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S2' AND metric_code=@MetricCode AND biz_date<@BizDate AND is_deleted=0 ORDER BY biz_date DESC, id DESC LIMIT 1 """, new SugarParameter("@TenantId", row.TenantId), new SugarParameter("@FactoryId", row.FactoryId), new SugarParameter("@MetricCode", row.MetricCode), new SugarParameter("@BizDate", statDate)); var actual = Math.Round(row.MetricValue.Value, 4); var target = current?.TargetValue ?? prior?.TargetValue ?? DefaultS2Target(row.MetricCode); var status = ResolveKpiStatus(actual, target, meta.Direction, meta.YellowThreshold, meta.RedThreshold); var trend = ResolveTrendFlag(actual, prior?.MetricValue); if (current != null) { return await _db.Ado.ExecuteCommandAsync( $""" UPDATE {table} SET metric_value=@MetricValue, target_value=@TargetValue, status_color=@StatusColor, trend_flag=@TrendFlag, is_active=1, status='ACTIVE', calc_time=@CalcTime, update_time=@CalcTime WHERE tenant_id=@TenantId AND factory_id=@FactoryId AND module_code='S2' AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0 """, new SugarParameter("@MetricValue", actual), new SugarParameter("@TargetValue", target), new SugarParameter("@StatusColor", status), new SugarParameter("@TrendFlag", trend), new SugarParameter("@CalcTime", now), new SugarParameter("@TenantId", row.TenantId), new SugarParameter("@FactoryId", row.FactoryId), new SugarParameter("@MetricCode", row.MetricCode), new SugarParameter("@BizDate", statDate)); } var nextId = await _db.Ado.GetLongAsync($"SELECT COALESCE(MAX(id), 0) + 1 FROM {table}"); return await _db.Ado.ExecuteCommandAsync( $""" INSERT INTO {table} (id, tenant_id, factory_id, status, biz_date, create_time, update_time, is_deleted, is_active, module_code, metric_code, metric_value, target_value, status_color, trend_flag, calc_time) VALUES (@Id, @TenantId, @FactoryId, 'ACTIVE', @BizDate, @CalcTime, @CalcTime, 0, 1, 'S2', @MetricCode, @MetricValue, @TargetValue, @StatusColor, @TrendFlag, @CalcTime) """, new SugarParameter("@Id", nextId), new SugarParameter("@TenantId", row.TenantId), new SugarParameter("@FactoryId", row.FactoryId), new SugarParameter("@BizDate", statDate), new SugarParameter("@CalcTime", now), new SugarParameter("@MetricCode", row.MetricCode), new SugarParameter("@MetricValue", actual), new SugarParameter("@TargetValue", target), new SugarParameter("@StatusColor", status), new SugarParameter("@TrendFlag", trend)); } private async Task InsertSyncLogAsync(long entityId, string entityName, string batchId, int rowsRead) { await _db.Ado.ExecuteCommandAsync( """ INSERT INTO mdp_sync_log (tenant_id, entity_id, source_code, entity_name, sync_batch_id, sync_type, trigger_type, sync_start, rows_read, status) VALUES (0, @EntityId, 'AIDOPDEV_MYSQL', @EntityName, @BatchId, 'FULL', 'AUTO', NOW(), @RowsRead, 'RUNNING') """, new SugarParameter("@EntityId", entityId), new SugarParameter("@EntityName", entityName), new SugarParameter("@BatchId", batchId), new SugarParameter("@RowsRead", rowsRead)); return await _db.Ado.GetLongAsync( "SELECT id FROM mdp_sync_log WHERE sync_batch_id=@BatchId AND entity_id=@EntityId ORDER BY id DESC LIMIT 1", new List { new("@BatchId", batchId), new("@EntityId", entityId) }); } private async Task MarkSyncLogSuccessAsync(long logId, DateTime started, int affected) { await _db.Ado.ExecuteCommandAsync( """ UPDATE mdp_sync_log SET sync_end=NOW(), duration_ms=@DurationMs, rows_insert=@RowsInsert, rows_update=0, rows_skip=0, rows_error=0, status='SUCCESS' WHERE id=@Id """, new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds), new SugarParameter("@RowsInsert", affected), new SugarParameter("@Id", logId)); } private async Task MarkSyncLogFailedAsync(long logId, DateTime started, string message) { try { await _db.Ado.ExecuteCommandAsync( """ UPDATE mdp_sync_log SET sync_end=NOW(), duration_ms=@DurationMs, rows_error=1, status='FAILED', error_msg=@ErrorMsg WHERE id=@Id """, new SugarParameter("@DurationMs", (int)(DateTime.Now - started).TotalMilliseconds), new SugarParameter("@ErrorMsg", Truncate(message, 1000)), new SugarParameter("@Id", logId)); } catch (Exception ex) { Console.Error.WriteLine($"[S2MdpSyncTransform] MarkSyncLogFailed write failed (syncLogId={logId}): {ex.Message}"); } } private async Task InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType) { await _db.Ado.ExecuteCommandAsync( """ INSERT INTO mdp_transform_run_log (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time) VALUES (0, 'S2_MDP_SYNC_TRANSFORM', 'S2 MDP同步与KPI计算', @TriggerType, @BatchId, 'RUNNING', @StartTime) """, new SugarParameter("@TriggerType", NormalizeTriggerType(triggerType)), new SugarParameter("@BatchId", batchId), new SugarParameter("@StartTime", startedAt)); return await _db.Ado.GetLongAsync( "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1", new List { new("@BatchId", batchId) }); } private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S2MdpSyncTransformResult result) { var finishedAt = DateTime.Now; await _db.Ado.ExecuteCommandAsync( """ UPDATE mdp_transform_run_log SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs, stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows, summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP WHERE id=@Id """, new SugarParameter("@EndTime", finishedAt), new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds), new SugarParameter("@StageRows", result.StageRows), new SugarParameter("@StandardRows", result.StandardRows), new SugarParameter("@DwdRows", result.DwdRows), new SugarParameter("@SummaryJson", BuildRunSummaryJson(result)), new SugarParameter("@Id", runLogId)); } private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message) { try { var finishedAt = DateTime.Now; await _db.Ado.ExecuteCommandAsync( """ UPDATE mdp_transform_run_log SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs, error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP WHERE id=@Id """, new SugarParameter("@EndTime", finishedAt), new SugarParameter("@DurationMs", (int)(finishedAt - startedAt).TotalMilliseconds), new SugarParameter("@ErrorMessage", Truncate(message, 2000)), new SugarParameter("@Id", runLogId)); } catch (Exception ex) { Console.Error.WriteLine($"[S2MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}"); } } private static S2MdpSqlCommand Cmd(string sql, string batchId, DateTime now) { return new S2MdpSqlCommand(sql, new[] { new SugarParameter("@BatchId", batchId), new SugarParameter("@Now", now), new SugarParameter("@StatDate", now.Date) }); } private static string BuildJsonObjectExpression(IEnumerable columns) { var parts = columns.SelectMany(c => new[] { $"'{c.Replace("'", "''")}'", $"s.`{c}`" }); return $"JSON_OBJECT({string.Join(",", parts)})"; } private static string FindColumn(IEnumerable columns, string expected) { return columns.First(u => string.Equals(u, expected, StringComparison.OrdinalIgnoreCase)); } private static string NormalizeTriggerType(string? triggerType) { return string.IsNullOrWhiteSpace(triggerType) ? "AUTO" : triggerType.Trim().ToUpperInvariant(); } private static string BuildRunSummaryJson(S2MdpSyncTransformResult result) { return $$"""{"batchId":"{{result.BatchId}}","stageRows":{{result.StageRows}},"standardRows":{{result.StandardRows}},"dwdRows":{{result.DwdRows}},"kpiRows":{{result.KpiRows}}}"""; } private static string ResolveKpiValueTable(int metricLevel) { return metricLevel switch { 1 => "ado_s9_kpi_value_l1_day", 2 => "ado_s9_kpi_value_l2_day", 3 => "ado_s9_kpi_value_l3_day", 4 => "ado_s9_kpi_value_l4_day", _ => "ado_s9_kpi_value_l2_day" }; } private static decimal DefaultS2Target(string metricCode) { return metricCode switch { "S2_L1_001" => 20m, "S2_L1_002" => 99m, "S2_L1_003" => 20m, "S2_L1_004" => 18m, "S2_L2_001" => 1.2m, "S2_L2_002" => 95m, "S2_L2_003" => 20m, _ => 0m }; } private static string ResolveKpiStatus(decimal actual, decimal target, string? direction, decimal? yellowThreshold, decimal? redThreshold) { if (target <= 0) return "gray"; var ratio = actual / target * 100m; if (string.Equals(direction, "lower_is_better", StringComparison.OrdinalIgnoreCase)) { if (actual <= target) return "green"; if (ratio <= (yellowThreshold ?? 110m)) return "yellow"; return ratio >= (redThreshold ?? 120m) ? "red" : "yellow"; } if (actual >= target) return "green"; if (ratio >= (yellowThreshold ?? 95m)) return "yellow"; return ratio <= (redThreshold ?? 80m) ? "red" : "yellow"; } private static string ResolveTrendFlag(decimal actual, decimal? previous) { if (previous == null) return "flat"; if (actual > previous.Value) return "up"; if (actual < previous.Value) return "down"; return "flat"; } private static string Truncate(string? raw, int maxLength) { if (string.IsNullOrEmpty(raw)) return string.Empty; return raw.Length <= maxLength ? raw : raw[..maxLength]; } private sealed class S2ColumnRow { public string ColumnName { get; set; } = string.Empty; } private sealed class S2MdpEntityRow { public long Id { get; set; } public string EntityName { get; set; } = string.Empty; } private sealed class S2KpiCalcRow { public long TenantId { get; set; } public long FactoryId { get; set; } public string MetricCode { get; set; } = string.Empty; public decimal? MetricValue { get; set; } } private sealed class S2KpiMetaRow { public int MetricLevel { get; set; } public string Direction { get; set; } = "higher_is_better"; public decimal? YellowThreshold { get; set; } public decimal? RedThreshold { get; set; } } private sealed class S2KpiValueRow { public long Id { get; set; } public decimal? MetricValue { get; set; } public decimal? TargetValue { get; set; } } } public sealed class S2MdpSyncTransformResult { public long RunLogId { get; set; } public string BatchId { get; set; } = string.Empty; public int StageRows { get; set; } public int StandardRows { get; set; } public int DwdRows { get; set; } public int KpiRows { get; set; } } internal sealed record S2MdpSqlCommand(string Sql, SugarParameter[] Parameters); internal sealed record S2MdpEntityConfig( string EntityCode, string SourceTable, string TargetTable, string SourceRowIdExpression, string SourceBizKeyExpression) { public static readonly IReadOnlyList All = new List { new("S2_WORK_ORDER_MASTER", "WorkOrdMaster", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''))"), new("S2_WORK_ORDER_ROUTING", "WorkOrdRouting", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`OP`,''))"), new("S2_WORK_ORDER_DETAIL", "WorkOrdDetail", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`ItemNum`,''))"), new("S2_PERIOD_SEQUENCE_DET", "PeriodSequenceDet", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrds`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`Sequence`,''))"), new("S2_SCHEDULE_RESULT_OP", "ScheduleResultOpMaster", "mdp_stg_schedule", "RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`WorkOrd`,''), ':', IFNULL(s.`Op`,''), ':', IFNULL(s.`WorkDate`,''))") }; } internal static class S2MdpDdl { public static readonly IReadOnlyList SqlBlocks = new[] { """ CREATE TABLE IF NOT EXISTS mdp_stg_schedule ( id BIGINT AUTO_INCREMENT PRIMARY KEY, tenant_id BIGINT NOT NULL DEFAULT 0, factory_id VARCHAR(64) NULL, source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP', source_table VARCHAR(100) NOT NULL, source_row_id VARCHAR(100) NOT NULL, source_biz_key VARCHAR(200) NULL, sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL, process_status VARCHAR(20) NOT NULL DEFAULT 'PENDING', raw_data JSON NOT NULL, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_mdp_stg_schedule (tenant_id, source_table, source_row_id), KEY idx_mdp_stg_schedule_batch (sync_batch_id), KEY idx_mdp_stg_schedule_biz (source_biz_key) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S2生产排程贴源层'; """, """ CREATE TABLE IF NOT EXISTS mdp_std_work_order_schedule ( id BIGINT AUTO_INCREMENT PRIMARY KEY, tenant_id BIGINT NOT NULL DEFAULT 0, factory_id BIGINT NULL, source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP', work_order VARCHAR(100) NOT NULL, sales_order_no VARCHAR(100) NULL, item_code VARCHAR(100) NULL, item_name VARCHAR(200) NULL, site_code VARCHAR(50) NULL, status VARCHAR(50) NULL, priority DECIMAL(18,6) NULL, urgent_flag TINYINT NOT NULL DEFAULT 0, qty_ordered DECIMAL(18,6) NULL, qty_completed DECIMAL(18,6) NULL, order_date DATETIME NULL, due_date DATETIME NULL, release_date DATETIME NULL, prod_line VARCHAR(100) NULL, source_biz_key VARCHAR(200) NULL, sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_std_work_order_schedule (tenant_id, work_order), KEY idx_std_work_order_schedule_batch (sync_batch_id), KEY idx_std_work_order_schedule_due (tenant_id, due_date) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S2标准工单排程'; """, """ CREATE TABLE IF NOT EXISTS mdp_std_operation_schedule ( id BIGINT AUTO_INCREMENT PRIMARY KEY, tenant_id BIGINT NOT NULL DEFAULT 0, factory_id BIGINT NULL, source_system VARCHAR(50) NOT NULL DEFAULT 'AIDOP', work_order VARCHAR(100) NOT NULL, op_no VARCHAR(50) NULL, work_center VARCHAR(100) NULL, line_code VARCHAR(100) NULL, item_code VARCHAR(100) NULL, plan_date DATETIME NULL, prod_date DATETIME NULL, start_time DATETIME NULL, end_time DATETIME NULL, ord_qty DECIMAL(18,6) NULL, comp_qty DECIMAL(18,6) NULL, run_crew DECIMAL(18,6) NULL, employee VARCHAR(200) NULL, source_biz_key VARCHAR(200) NULL, sync_batch_id VARCHAR(100) NOT NULL, sync_time DATETIME NOT NULL, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_std_operation_schedule (tenant_id, source_biz_key), KEY idx_std_operation_schedule_work_order (tenant_id, work_order), KEY idx_std_operation_schedule_batch (sync_batch_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S2标准工序排程'; """, """ CREATE TABLE IF NOT EXISTS dwd_order_schedule_trans ( id BIGINT AUTO_INCREMENT PRIMARY KEY, tenant_id BIGINT NOT NULL DEFAULT 0, factory_id BIGINT NOT NULL DEFAULT 1, stat_date DATE NOT NULL, work_order VARCHAR(100) NOT NULL, sales_order_no VARCHAR(100) NULL, item_code VARCHAR(100) NULL, item_name VARCHAR(200) NULL, site_code VARCHAR(50) NULL, prod_line VARCHAR(100) NULL, status VARCHAR(50) NULL, urgent_flag TINYINT NOT NULL DEFAULT 0, qty_ordered DECIMAL(18,6) NULL, qty_completed DECIMAL(18,6) NULL, order_date DATETIME NULL, due_date DATETIME NULL, release_date DATETIME NULL, first_plan_date DATETIME NULL, last_plan_date DATETIME NULL, first_start_time DATETIME NULL, last_end_time DATETIME NULL, operation_count INT NOT NULL DEFAULT 0, scheduled_qty DECIMAL(18,6) NULL, completed_op_qty DECIMAL(18,6) NULL, schedule_cycle_days DECIMAL(18,6) NULL, schedule_satisfaction_flag TINYINT NOT NULL DEFAULT 0, wip_qty DECIMAL(18,6) NULL, resource_person_count DECIMAL(18,6) NULL, calc_batch_id VARCHAR(100) NOT NULL, calc_time DATETIME NOT NULL, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_dwd_order_schedule_trans (tenant_id, work_order, calc_batch_id), KEY idx_dwd_order_schedule_trans_batch (calc_batch_id), KEY idx_dwd_order_schedule_trans_stat (tenant_id, stat_date), KEY idx_dwd_order_schedule_trans_order (tenant_id, sales_order_no) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='S2订单工单排程DWD'; """, """ INSERT INTO mdp_entity (tenant_id, source_id, entity_code, entity_name, entity_type, source_table_name, target_table_name, sync_mode, batch_size, status, remark) SELECT 0, s.id, v.entity_code, v.entity_name, 'TABLE', v.source_table_name, 'mdp_stg_schedule', 'FULL', 5000, 1, v.remark FROM mdp_source s JOIN ( SELECT 'S2_WORK_ORDER_MASTER' AS entity_code, 'S2工单主数据' AS entity_name, 'WorkOrdMaster' AS source_table_name, '工单主数据进入 S2 贴源层' AS remark UNION ALL SELECT 'S2_WORK_ORDER_ROUTING', 'S2工单工艺路线', 'WorkOrdRouting', '工单工艺路线进入 S2 贴源层' UNION ALL SELECT 'S2_WORK_ORDER_DETAIL', 'S2工单物料明细', 'WorkOrdDetail', '工单物料需求进入 S2 贴源层' UNION ALL SELECT 'S2_PERIOD_SEQUENCE_DET', 'S2工序排程计划', 'PeriodSequenceDet', '工序间衔接与排程计划进入 S2 贴源层' UNION ALL SELECT 'S2_SCHEDULE_RESULT_OP', 'S2工序排产结果', 'ScheduleResultOpMaster', '工序排产结果进入 S2 贴源层' ) v WHERE s.tenant_id=0 AND s.source_code='AIDOPDEV_MYSQL' ON DUPLICATE KEY UPDATE source_id=VALUES(source_id), entity_name=VALUES(entity_name), entity_type=VALUES(entity_type), source_table_name=VALUES(source_table_name), target_table_name=VALUES(target_table_name), sync_mode=VALUES(sync_mode), batch_size=VALUES(batch_size), status=VALUES(status), remark=VALUES(remark), update_time=CURRENT_TIMESTAMP; """ }; }