namespace Admin.NET.Plugin.AiDOP.Order; /// /// 数据中台统一 MDP 运行监控。 /// [ApiDescriptionSettings(Order = 322, Description = "统一MDP运行监控")] [Route("api/DataPlatform")] [AllowAnonymous] [NonUnify] public class MdpMonitorService : IDynamicApiController, ITransient { private static readonly Dictionary ModuleJobCodes = new(StringComparer.OrdinalIgnoreCase) { ["S1"] = "S1_MDP_SYNC_TRANSFORM", ["S2"] = "S2_MDP_SYNC_TRANSFORM", ["S3"] = "S3_MDP_SYNC_TRANSFORM", ["S4"] = "S4_MDP_SYNC_TRANSFORM" }; private static readonly Dictionary JobCatalog = new(StringComparer.OrdinalIgnoreCase) { ["S1_MDP_SYNC_TRANSFORM"] = new( "S1_MDP_SYNC_TRANSFORM", "order_delivery", "订单交付域", ["S1", "S2", "S3", "S4", "S7", "S9"], "GLOBAL_DOMAIN", "订单交付域 MDP 同步", "job_s1_mdp_sync_transform"), ["S2_MDP_SYNC_TRANSFORM"] = new( "S2_MDP_SYNC_TRANSFORM", "work_schedule", "工单排程域", ["S2", "S3", "S5", "S6", "S8", "S9"], "GLOBAL_DOMAIN", "工单排程域 MDP 同步", "job_s2_mdp_sync_transform"), ["S3_MDP_SYNC_TRANSFORM"] = new( "S3_MDP_SYNC_TRANSFORM", "supply_purchase", "供应采购域", ["S3", "S4", "S5", "S8", "S9"], "GLOBAL_DOMAIN", "供应采购域 MDP 同步", "job_s3_mdp_sync_transform"), ["S4_MDP_SYNC_TRANSFORM"] = new( "S4_MDP_SYNC_TRANSFORM", "purchase_execution", "采购执行域", ["S4", "S5", "S8", "S9"], "GLOBAL_DOMAIN", "采购执行域 MDP 同步", "job_s4_mdp_sync_transform") }; private readonly ISqlSugarClient _db; private readonly UserManager _userManager; public MdpMonitorService(ISqlSugarClient db, UserManager userManager) { _db = db; _userManager = userManager; } [DisplayName("MDP模块选项")] [HttpGet("mdp-monitor/modules")] public object GetModules() => BuildCatalogResponse(); [DisplayName("MDP任务目录")] [HttpGet("mdp-monitor/catalog")] public object GetCatalog() => BuildCatalogResponse(); [DisplayName("MDP最近运行状态")] [HttpGet("mdp-monitor/latest")] public async Task GetLatest([FromQuery] MdpMonitorQueryInput input) { var tenantId = _userManager.TenantId; var (whereSql, pars) = BuildWhere(input, tenantId); var row = await _db.Ado.SqlQuerySingleAsync( $""" {SelectColumnsSql()} FROM mdp_transform_run_log WHERE {whereSql} ORDER BY start_time DESC, id DESC LIMIT 1 """, pars) ?? new MdpMonitorRunLogRow(); AttachCatalogInfo(row); return row; } [DisplayName("MDP运行日志列表")] [HttpGet("mdp-monitor/list")] public async Task GetList([FromQuery] MdpMonitorListInput input) { var tenantId = _userManager.TenantId; var page = input.Page <= 0 ? 1 : input.Page; var pageSize = input.PageSize <= 0 ? 10 : input.PageSize; var offset = (page - 1) * pageSize; var (whereSql, pars) = BuildWhere(input, tenantId); var total = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM mdp_transform_run_log WHERE {whereSql}", pars); var list = await _db.Ado.SqlQueryAsync( $""" {SelectColumnsSql()} FROM mdp_transform_run_log WHERE {whereSql} ORDER BY start_time DESC, id DESC LIMIT {pageSize} OFFSET {offset} """, pars); foreach (var row in list) AttachCatalogInfo(row); return new { total, page, pageSize, list }; } [DisplayName("MDP运行日志详情")] [HttpGet("mdp-monitor/detail/{id}")] public async Task GetDetail(long id, [FromQuery] MdpMonitorQueryInput input) { var tenantId = _userManager.TenantId; var (whereSql, pars) = BuildWhere(input, tenantId); pars.Add(new SugarParameter("@Id", id)); var row = await _db.Ado.SqlQuerySingleAsync( $""" {SelectColumnsSql()} FROM mdp_transform_run_log WHERE id=@Id AND {whereSql} LIMIT 1 """, pars); if (row == null) throw Oops.Oh("运行日志不存在"); AttachCatalogInfo(row); return row; } [DisplayName("MDP同步链路详情")] [HttpGet("mdp-monitor/lineage")] public async Task GetLineage([FromQuery] MdpMonitorLineageInput input) { var tenantId = _userManager.TenantId; var moduleCode = ResolveModuleCode(input.ModuleCode, input.JobCode); if (string.IsNullOrWhiteSpace(moduleCode)) throw Oops.Oh("请选择 MDP 模块"); var jobCode = ResolveJobCode(moduleCode, input.JobCode); var entityPrefix = $"{moduleCode}_%"; var entities = await _db.Ado.SqlQueryAsync( """ SELECT e.id AS Id, e.entity_code AS EntityCode, e.entity_name AS EntityName, e.entity_type AS EntityType, s.source_code AS SourceCode, s.source_name AS SourceName, s.source_type AS SourceType, s.db_type AS SourceDbType, s.db_host AS SourceDbHost, s.db_port AS SourceDbPort, s.db_name AS SourceDbName, e.source_table_name AS SourceTableName, e.source_api_path AS SourceApiPath, s.db_type AS TargetDbType, s.db_host AS TargetDbHost, s.db_port AS TargetDbPort, s.db_name AS TargetDbName, e.target_table_name AS TargetTableName, e.sync_mode AS SyncMode, e.incr_column AS IncrColumn, e.status AS Status FROM mdp_entity e LEFT JOIN mdp_source s ON s.id = e.source_id WHERE e.entity_code LIKE @EntityPrefix AND (e.tenant_id = @TenantId OR e.tenant_id = 0) ORDER BY e.entity_code """, new SugarParameter("@EntityPrefix", entityPrefix), new SugarParameter("@TenantId", tenantId)); if (entities.Count == 0) { var emptyOutput = new MdpLineageOutput { ModuleCode = moduleCode, JobCode = jobCode, BatchId = input.BatchId, Stages = BuildStageDescriptions(jobCode, moduleCode), Entities = new List() }; AttachLineageCatalogInfo(emptyOutput); return emptyOutput; } var entityIds = string.Join(",", entities.Select(u => u.Id)); var mappings = await _db.Ado.SqlQueryAsync( $""" SELECT entity_id AS EntityId, source_field AS SourceField, target_field AS TargetField, field_type AS FieldType, transform_script AS TransformScript, const_value AS ConstValue, lookup_table AS LookupTable, is_required AS IsRequired, default_value AS DefaultValue, sort_order AS SortOrder FROM mdp_field_mapping WHERE entity_id IN ({entityIds}) ORDER BY entity_id, sort_order, target_field """); var mappingsByEntity = mappings.GroupBy(u => u.EntityId).ToDictionary(u => u.Key, u => u.ToList()); Dictionary syncLogsByEntity = new(); if (!string.IsNullOrWhiteSpace(input.BatchId)) { var syncLogs = await _db.Ado.SqlQueryAsync( """ SELECT entity_id AS EntityId, entity_name AS EntityName, status AS Status, rows_read AS RowsRead, rows_insert AS RowsInsert, rows_update AS RowsUpdate, rows_skip AS RowsSkip, rows_error AS RowsError, sync_start AS SyncStart, sync_end AS SyncEnd, duration_ms AS DurationMs, error_msg AS ErrorMsg FROM mdp_sync_log WHERE sync_batch_id = @BatchId AND (tenant_id = @TenantId OR tenant_id = 0) AND entity_id IN ( SELECT id FROM mdp_entity WHERE entity_code LIKE @EntityPrefix AND (tenant_id = @TenantId OR tenant_id = 0) ) ORDER BY sync_start, id """, new SugarParameter("@BatchId", input.BatchId.Trim()), new SugarParameter("@EntityPrefix", entityPrefix), new SugarParameter("@TenantId", tenantId)); syncLogsByEntity = syncLogs .GroupBy(u => u.EntityId) .ToDictionary(u => u.Key, u => u.OrderByDescending(x => x.SyncStart).First()); } var batchId = input.BatchId?.Trim(); foreach (var entity in entities) { entity.SourceFullName = BuildObjectFullName(entity.SourceDbType, entity.SourceDbHost, entity.SourceDbPort, entity.SourceDbName, entity.SourceTableName ?? entity.SourceApiPath); entity.TargetFullName = BuildObjectFullName(entity.TargetDbType, entity.TargetDbHost, entity.TargetDbPort, entity.TargetDbName, entity.TargetTableName); if (mappingsByEntity.TryGetValue(entity.Id, out var entityMappings) && entityMappings.Count > 0) { foreach (var mapping in entityMappings) { mapping.MappingSource = "CONFIG"; mapping.IsFallback = false; } entity.FieldMappings = entityMappings; entity.FieldMappingCount = entityMappings.Count; } else { entity.FieldMappings = BuildFallbackFieldMappings(entity, batchId); entity.FieldMappingCount = entity.FieldMappings.Count; } if (syncLogsByEntity.TryGetValue(entity.Id, out var syncLog)) entity.SyncLog = syncLog; } var output = new MdpLineageOutput { ModuleCode = moduleCode, JobCode = jobCode, BatchId = input.BatchId, Stages = BuildStageDescriptions(jobCode, moduleCode), Entities = entities }; AttachLineageCatalogInfo(output); return output; } private static (string WhereSql, List Parameters) BuildWhere(MdpMonitorQueryInput input, long tenantId) { var where = new List { BuildMdpRunLogTenantWhere(tenantId) }; var pars = new List { new("@TenantId", tenantId) }; var (jobFilterSql, jobFilterPars, noMatch) = BuildJobCodeFilter(input); if (noMatch) where.Add("1=0"); else { where.Add(jobFilterSql); pars.AddRange(jobFilterPars); } if (!string.IsNullOrWhiteSpace(input.BatchId)) { where.Add("batch_id LIKE @BatchId"); pars.Add(new SugarParameter("@BatchId", $"%{input.BatchId.Trim()}%")); } if (!string.IsNullOrWhiteSpace(input.Status)) { where.Add("status=@Status"); pars.Add(new SugarParameter("@Status", input.Status.Trim().ToUpperInvariant())); } if (input.StartTime.HasValue) { where.Add("start_time >= @StartTime"); pars.Add(new SugarParameter("@StartTime", input.StartTime.Value)); } if (input.EndTime.HasValue) { where.Add("start_time <= @EndTime"); pars.Add(new SugarParameter("@EndTime", input.EndTime.Value)); } return (string.Join(" AND ", where), pars); } /// /// MDP 转换任务当前以 tenant_id=0 写入运行日志;登录租户查询时需兼容这类全局任务记录。 /// internal static string BuildMdpRunLogTenantWhere(long tenantId) => tenantId > 0 ? "(tenant_id = @TenantId OR tenant_id = 0)" : "tenant_id = 0"; private static object BuildCatalogResponse() { return JobCatalog.Values .OrderBy(u => u.JobCode, StringComparer.OrdinalIgnoreCase) .Select(u => new { jobCode = u.JobCode, displayName = u.DisplayName, businessDomainCode = u.BusinessDomainCode, businessDomainName = u.BusinessDomainName, consumerModules = u.ConsumerModules, scopeType = u.ScopeType, moduleCode = ResolveModuleCodeFromJobCode(u.JobCode), scheduleJobId = u.ScheduleJobId }) .ToList(); } private static (string Sql, List Parameters, bool NoMatch) BuildJobCodeFilter(MdpMonitorQueryInput input) { var pars = new List(); if (!string.IsNullOrWhiteSpace(input.JobCode)) { pars.Add(new SugarParameter("@JobCode", input.JobCode.Trim().ToUpperInvariant())); return ("job_code=@JobCode", pars, false); } var hasDomainFilter = !string.IsNullOrWhiteSpace(input.BusinessDomainCode); var consumerModule = !string.IsNullOrWhiteSpace(input.ConsumerModule) ? input.ConsumerModule.Trim() : !string.IsNullOrWhiteSpace(input.ModuleCode) ? input.ModuleCode.Trim() : null; var hasConsumerFilter = !string.IsNullOrWhiteSpace(consumerModule); if (!hasDomainFilter && !hasConsumerFilter) return ("IFNULL(job_code, '') LIKE '%MDP%'", pars, false); var allowed = ResolveAllowedJobCodes(input.BusinessDomainCode, consumerModule); if (allowed.Count == 0) return (string.Empty, pars, true); if (allowed.Count == 1) { pars.Add(new SugarParameter("@JobCode", allowed.First())); return ("job_code=@JobCode", pars, false); } var inParts = new List(); var index = 0; foreach (var code in allowed.OrderBy(u => u, StringComparer.OrdinalIgnoreCase)) { var paramName = $"@JobCode{index++}"; inParts.Add(paramName); pars.Add(new SugarParameter(paramName, code)); } return ($"job_code IN ({string.Join(", ", inParts)})", pars, false); } private static HashSet ResolveAllowedJobCodes(string? businessDomainCode, string? consumerModule) { IEnumerable items = JobCatalog.Values; if (!string.IsNullOrWhiteSpace(businessDomainCode)) { var domain = businessDomainCode.Trim(); items = items.Where(u => string.Equals(u.BusinessDomainCode, domain, StringComparison.OrdinalIgnoreCase)); } if (!string.IsNullOrWhiteSpace(consumerModule)) { var module = consumerModule.Trim(); items = items.Where(u => u.ConsumerModules.Contains(module, StringComparer.OrdinalIgnoreCase)); } return items.Select(u => u.JobCode).ToHashSet(StringComparer.OrdinalIgnoreCase); } private static void AttachCatalogInfo(MdpMonitorRunLogRow row) { if (row == null || string.IsNullOrWhiteSpace(row.JobCode)) return; if (!JobCatalog.TryGetValue(row.JobCode.Trim(), out var item)) return; row.BusinessDomainCode = item.BusinessDomainCode; row.BusinessDomainName = item.BusinessDomainName; row.ConsumerModules = string.Join(",", item.ConsumerModules); row.ScopeType = item.ScopeType; row.DisplayName = item.DisplayName; row.ScheduleJobId = item.ScheduleJobId; } private static void AttachLineageCatalogInfo(MdpLineageOutput output) { if (output == null || string.IsNullOrWhiteSpace(output.JobCode)) return; if (!JobCatalog.TryGetValue(output.JobCode.Trim(), out var item)) return; output.BusinessDomainCode = item.BusinessDomainCode; output.BusinessDomainName = item.BusinessDomainName; output.ConsumerModules = string.Join(",", item.ConsumerModules); output.ScopeType = item.ScopeType; output.DisplayName = item.DisplayName; output.ScheduleJobId = item.ScheduleJobId; } private static string FormatConsumerModulesLabel(MdpJobCatalogItem item) => string.Join("、", item.ConsumerModules); private static List BuildStageDescriptions(string? jobCode, string? moduleCode) { if (!string.IsNullOrWhiteSpace(jobCode) && JobCatalog.TryGetValue(jobCode.Trim(), out var catalogItem)) { return catalogItem.BusinessDomainCode switch { "order_delivery" => BuildOrderDeliveryStages(catalogItem), "work_schedule" => BuildWorkScheduleStages(catalogItem), "supply_purchase" => BuildSupplyPurchaseStages(catalogItem), "purchase_execution" => BuildPurchaseExecutionStages(catalogItem), _ => BuildGenericStages(moduleCode) }; } return BuildGenericStages(moduleCode); } private static List BuildOrderDeliveryStages(MdpJobCatalogItem item) { var consumers = FormatConsumerModulesLabel(item); return new List { new() { StageCode = "STAGING", StageName = "订单交付域 · 贴源同步", Layer = "mdp_stg", Description = $"按 mdp_entity 登记订单交付域源对象抽取数据,保留 raw_data JSON 便于追溯;产出供 {consumers} 消费。", InputObjects = "旧系统 / 当前库源对象", OutputObjects = "mdp_stg_so, mdp_stg_ship_trans", Execution = "S1MdpSyncTransformService.SyncStagingAsync" }, new() { StageCode = "STANDARD", StageName = "订单交付域 · 标准层转换", Layer = "mdp_std", Description = "解析贴源 raw_data,做字段标准化、租户兜底和幂等写入。", InputObjects = "mdp_stg_so, mdp_stg_ship_trans", OutputObjects = "mdp_std_so, mdp_std_ship_trans", Execution = "S1MdpSyncTransformService.BuildStandardCommands" }, new() { StageCode = "DWD", StageName = "订单交付域 · DWD宽表", Layer = "dwd", Description = $"沉淀订单交付事实,供 {consumers} 看板与诊断读取。", InputObjects = "mdp_std_so, mdp_std_ship_trans", OutputObjects = "dwd_ship_trans", Execution = "S1MdpSyncTransformService.BuildDwdAsync" }, new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = $"计算订单交付域 L1 指标并写入统一指标值表,供 {consumers} 消费。", InputObjects = "mdp_std_so, dwd_ship_trans", OutputObjects = "ado_s9_kpi_value_l1_day", Execution = "S1MdpSyncTransformService.BuildS1KpiValuesAsync" } }; } private static List BuildWorkScheduleStages(MdpJobCatalogItem item) { var consumers = FormatConsumerModulesLabel(item); return new List { new() { StageCode = "STAGING", StageName = "工单排程域 · 贴源同步", Layer = "mdp_stg", Description = $"按 mdp_entity 登记工单排程域源对象抽取数据;产出供 {consumers} 消费。", InputObjects = "工单 / 工序 / 排程源对象", OutputObjects = "mdp_stg_*", Execution = "S2MdpSyncTransformService.SyncStagingAsync" }, new() { StageCode = "STANDARD", StageName = "工单排程域 · 标准层转换", Layer = "mdp_std", Description = "将工单、工序、排程等对象标准化。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_*", Execution = "S2MdpSyncTransformService.BuildStandardCommands" }, new() { StageCode = "DWD", StageName = "工单排程域 · DWD宽表", Layer = "dwd", Description = $"生成制造执行与排程分析宽表,供 {consumers} 读取。", InputObjects = "mdp_std_*", OutputObjects = "dwd_*", Execution = "S2MdpSyncTransformService.BuildDwdAsync" }, new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = $"写入工单排程域 KPI,供 {consumers} 消费。", InputObjects = "mdp_std_* / dwd_*", OutputObjects = "ado_s9_kpi_value_*", Execution = "S2MdpSyncTransformService.BuildS2KpiValuesAsync" } }; } private static List BuildSupplyPurchaseStages(MdpJobCatalogItem item) { var consumers = FormatConsumerModulesLabel(item); return new List { new() { StageCode = "STAGING", StageName = "供应采购域 · 贴源同步", Layer = "mdp_stg", Description = $"按 mdp_entity 登记供应采购域源对象抽取数据;产出供 {consumers} 消费。", InputObjects = "供应 / 物料 / 采购源对象", OutputObjects = "mdp_stg_*", Execution = "S3MdpSyncTransformService.SyncStagingAsync" }, new() { StageCode = "STANDARD", StageName = "供应采购域 · 标准层转换", Layer = "mdp_std", Description = "将供应、物料、采购、交货计划等对象标准化。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_*", Execution = "S3MdpSyncTransformService.BuildStandardCommands" }, new() { StageCode = "DWD", StageName = "供应采购域 · DWD宽表", Layer = "dwd", Description = $"生成供应交付、齐套、风险等分析宽表,供 {consumers} 读取。", InputObjects = "mdp_std_*", OutputObjects = "dwd_supplier_delivery / dwd_material_readiness 等", Execution = "S3MdpSyncTransformService.BuildDwdAsync" }, new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = $"写入供应采购域指标,供 {consumers} 消费。", InputObjects = "mdp_std_* / dwd_*", OutputObjects = "ado_s9_kpi_value_*", Execution = "S3MdpSyncTransformService.BuildS3KpiValuesAsync" } }; } private static List BuildPurchaseExecutionStages(MdpJobCatalogItem item) { var consumers = FormatConsumerModulesLabel(item); return new List { new() { StageCode = "STAGING", StageName = "采购执行域 · 贴源同步", Layer = "mdp_stg_s4_*", Description = $"同步采购执行域 IQC/发货/退货/欠料事实,供 {consumers} 消费;共享采购主链仍由供应采购域维护。", InputObjects = "PurOrdRctDetail / scm_shdzb / srm_polist_ds / dwd_material_shortage", OutputObjects = "mdp_stg_s4_iqc / mdp_stg_s4_shipment / mdp_stg_s4_return / mdp_stg_s4_shortage", Execution = "S4MdpSyncTransformService.SyncStagingAsync" }, new() { StageCode = "STANDARD", StageName = "采购执行域 · 标准层转换", Layer = "mdp_std_s4_*", Description = "将采购执行域贴源对象标准化,并统计供应采购域共享标准层行数。", InputObjects = "mdp_stg_s4_*", OutputObjects = "mdp_std_s4_iqc / mdp_std_s4_shipment / mdp_std_s4_return / mdp_std_s4_shortage", Execution = "S4MdpSyncTransformService.BuildStandardCommands" }, new() { StageCode = "DWD", StageName = "采购执行域 · DWD宽表", Layer = "dwd", Description = $"写入采购执行分析宽表,供 {consumers} 读取。", InputObjects = "dwd_supplier_delivery / mdp_std_s4_*", OutputObjects = "dwd_s4_purchase_execution / dwd_po_trans / dwd_qc_trans", Execution = "S4MdpSyncTransformService.BuildDwdAsync" }, new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = $"写入采购执行域 L1/L2/L3 指标,供 {consumers} 消费。", InputObjects = "mdp_std_delivery_schedule / dwd_supplier_delivery / dwd_s4_purchase_execution", OutputObjects = "ado_s9_kpi_value_l1/l2/l3_day", Execution = "S4MdpSyncTransformService.BuildS4KpiValuesAsync" } }; } /// /// 贴源同步在代码中维护、但未写入 mdp_field_mapping 的实体主键/业务键提示(仅监控展示兜底)。 /// private static readonly Dictionary KnownStagingEntityKeys = new(StringComparer.OrdinalIgnoreCase) { ["S4_IQC_RECEIPT"] = ("RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''), ':', IFNULL(s.`Line`,''))"), ["S4_SHIPMENT_EXEC"] = ("id", "CONCAT(IFNULL(s.`glid`,''), ':', IFNULL(s.`id`,''))"), ["S4_RETURN_EXEC"] = ("Id", "s.`dsnum`"), ["S4_SHORTAGE_EXEC"] = ("id", "CONCAT(IFNULL(s.`work_order`,''), ':', IFNULL(s.`component_item_code`,''))") }; private static List BuildFallbackFieldMappings(MdpLineageEntityRow entity, string? batchId) { var sourceRowId = entity.IncrColumn; string? sourceBizKeyExpr = null; if (!string.IsNullOrWhiteSpace(entity.EntityCode) && KnownStagingEntityKeys.TryGetValue(entity.EntityCode, out var known)) { sourceRowId ??= known.SourceRowId; sourceBizKeyExpr = known.SourceBizKeyExpr; } var fallbackNote = "当前实体未配置逐字段映射,贴源同步保留源行 raw_data"; var mappings = new List(); var sort = 10; void Add(string sourceField, string targetField, string fieldType, string? transformScript, string? constValue = null, bool isRequired = false) { mappings.Add(new MdpLineageFieldMappingRow { EntityId = entity.Id, SourceField = sourceField, TargetField = targetField, FieldType = fieldType, TransformScript = transformScript, ConstValue = constValue, IsRequired = isRequired, SortOrder = sort, MappingSource = "FALLBACK", IsFallback = true }); sort += 10; } Add("tenant_id", "tenant_id", "DIRECT", "当前租户或全局租户兜底"); Add($"CONST:{entity.SourceCode ?? "AIDOP"}", "source_system", "CONST", "来源数据源编码", entity.SourceCode ?? "AIDOP", isRequired: true); Add($"CONST:{entity.SourceTableName ?? entity.SourceApiPath ?? "--"}", "source_table", "CONST", "源表名", entity.SourceTableName ?? entity.SourceApiPath, isRequired: true); if (!string.IsNullOrWhiteSpace(sourceRowId)) Add(sourceRowId, "source_row_id", "DIRECT", "来自 mdp_entity.incr_column 或实体主键配置"); else Add("--", "source_row_id", "DIRECT", fallbackNote); if (!string.IsNullOrWhiteSpace(sourceBizKeyExpr)) Add(sourceBizKeyExpr, "source_biz_key", "EXPR", "来自实体业务键表达式", isRequired: true); else Add("--", "source_biz_key", "EXPR", fallbackNote); Add("*", "raw_data", "JSON", "源行整行 JSON"); Add("CONST", "sync_batch_id", "CONST", "当前同步批次", string.IsNullOrWhiteSpace(batchId) ? "当前同步批次" : batchId, isRequired: true); Add("NOW()", "sync_time", "CONST", "同步时间", isRequired: true); return mappings; } private static List BuildGenericStages(string? moduleCode) { if (string.Equals(moduleCode, "S1", StringComparison.OrdinalIgnoreCase)) return BuildOrderDeliveryStages(JobCatalog["S1_MDP_SYNC_TRANSFORM"]); if (string.Equals(moduleCode, "S2", StringComparison.OrdinalIgnoreCase)) return BuildWorkScheduleStages(JobCatalog["S2_MDP_SYNC_TRANSFORM"]); if (string.Equals(moduleCode, "S3", StringComparison.OrdinalIgnoreCase)) return BuildSupplyPurchaseStages(JobCatalog["S3_MDP_SYNC_TRANSFORM"]); if (string.Equals(moduleCode, "S4", StringComparison.OrdinalIgnoreCase)) return BuildPurchaseExecutionStages(JobCatalog["S4_MDP_SYNC_TRANSFORM"]); return new List { new() { StageCode = "STAGING", StageName = "贴源同步", Layer = "mdp_stg", Description = "按 mdp_entity 登记源对象抽取数据。", InputObjects = "源对象", OutputObjects = "mdp_stg_*", Execution = "MDP 同步服务" }, new() { StageCode = "STANDARD", StageName = "标准层转换", Layer = "mdp_std", Description = "标准层/DWD/KPI 当前由后端 Service 承载。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_* / dwd_* / 指标表", Execution = "MDP 转换服务" } }; } private static string? ResolveModuleCodeFromJobCode(string? jobCode) { if (string.IsNullOrWhiteSpace(jobCode)) return null; return ModuleJobCodes.FirstOrDefault(u => string.Equals(u.Value, jobCode.Trim(), StringComparison.OrdinalIgnoreCase)).Key; } private static string? ResolveJobCode(string? moduleCode, string? jobCode) { if (!string.IsNullOrWhiteSpace(jobCode)) return jobCode.Trim().ToUpperInvariant(); if (string.IsNullOrWhiteSpace(moduleCode)) return null; return ModuleJobCodes.TryGetValue(moduleCode.Trim(), out var mapped) ? mapped : null; } private static string? ResolveModuleCode(string? moduleCode, string? jobCode) { if (!string.IsNullOrWhiteSpace(moduleCode)) return moduleCode.Trim().ToUpperInvariant(); if (string.IsNullOrWhiteSpace(jobCode)) return null; var normalizedJobCode = jobCode.Trim(); return ModuleJobCodes.FirstOrDefault(u => string.Equals(u.Value, normalizedJobCode, StringComparison.OrdinalIgnoreCase)).Key; } private static string? BuildObjectFullName(string? dbType, string? host, int? port, string? dbName, string? objectName) { if (string.IsNullOrWhiteSpace(objectName)) return null; var databaseObject = string.IsNullOrWhiteSpace(dbName) ? objectName : $"{dbName}.{objectName}"; var hostPart = string.IsNullOrWhiteSpace(host) ? null : port.HasValue ? $"{host}:{port}" : host; return string.Join(" / ", new[] { dbType, hostPart, databaseObject }.Where(u => !string.IsNullOrWhiteSpace(u))); } private static string SelectColumnsSql() { return """ SELECT id AS Id, tenant_id AS TenantId, job_code AS JobCode, job_name AS JobName, trigger_type AS TriggerType, batch_id AS BatchId, status AS Status, start_time AS StartTime, end_time AS EndTime, duration_ms AS DurationMs, stage_rows AS StageRows, standard_rows AS StandardRows, dwd_rows AS DwdRows, error_message AS ErrorMessage, summary_json AS SummaryJson, create_time AS CreateTime, update_time AS UpdateTime """; } } public sealed record MdpJobCatalogItem( string JobCode, string BusinessDomainCode, string BusinessDomainName, string[] ConsumerModules, string ScopeType, string DisplayName, string? ScheduleJobId); public class MdpMonitorQueryInput { public string? BusinessDomainCode { get; set; } public string? ConsumerModule { get; set; } public string? ModuleCode { get; set; } public string? JobCode { get; set; } public string? BatchId { get; set; } public string? Status { get; set; } public DateTime? StartTime { get; set; } public DateTime? EndTime { get; set; } } public sealed class MdpMonitorListInput : MdpMonitorQueryInput { public int Page { get; set; } = 1; public int PageSize { get; set; } = 10; } public sealed class MdpMonitorLineageInput : MdpMonitorQueryInput { } public sealed class MdpMonitorRunLogRow { public long Id { get; set; } public long TenantId { get; set; } public string? JobCode { get; set; } public string? JobName { get; set; } public string? TriggerType { get; set; } public string? BatchId { get; set; } public string? Status { get; set; } public DateTime? StartTime { get; set; } public DateTime? EndTime { get; set; } public int? DurationMs { get; set; } public int? StageRows { get; set; } public int? StandardRows { get; set; } public int? DwdRows { get; set; } public string? ErrorMessage { get; set; } public string? SummaryJson { get; set; } public DateTime? CreateTime { get; set; } public DateTime? UpdateTime { get; set; } public string? BusinessDomainCode { get; set; } public string? BusinessDomainName { get; set; } public string? ConsumerModules { get; set; } public string? ScopeType { get; set; } public string? DisplayName { get; set; } public string? ScheduleJobId { get; set; } } public sealed class MdpLineageOutput { public string? ModuleCode { get; set; } public string? JobCode { get; set; } public string? BatchId { get; set; } public string? BusinessDomainCode { get; set; } public string? BusinessDomainName { get; set; } public string? ConsumerModules { get; set; } public string? ScopeType { get; set; } public string? DisplayName { get; set; } public string? ScheduleJobId { get; set; } public List Stages { get; set; } = new(); public List Entities { get; set; } = new(); } public sealed class MdpLineageStageRow { public string? StageCode { get; set; } public string? StageName { get; set; } public string? Layer { get; set; } public string? Description { get; set; } public string? InputObjects { get; set; } public string? OutputObjects { get; set; } public string? Execution { get; set; } } public sealed class MdpLineageEntityRow { public long Id { get; set; } public string? EntityCode { get; set; } public string? EntityName { get; set; } public string? EntityType { get; set; } public string? SourceCode { get; set; } public string? SourceName { get; set; } public string? SourceType { get; set; } public string? SourceDbType { get; set; } public string? SourceDbHost { get; set; } public int? SourceDbPort { get; set; } public string? SourceDbName { get; set; } public string? SourceTableName { get; set; } public string? SourceApiPath { get; set; } public string? SourceFullName { get; set; } public string? TargetDbType { get; set; } public string? TargetDbHost { get; set; } public int? TargetDbPort { get; set; } public string? TargetDbName { get; set; } public string? TargetTableName { get; set; } public string? TargetFullName { get; set; } public string? SyncMode { get; set; } public string? IncrColumn { get; set; } public int? Status { get; set; } public int FieldMappingCount { get; set; } public List FieldMappings { get; set; } = new(); public MdpLineageSyncLogRow? SyncLog { get; set; } } public sealed class MdpLineageFieldMappingRow { public long EntityId { get; set; } public string? SourceField { get; set; } public string? TargetField { get; set; } public string? FieldType { get; set; } public string? TransformScript { get; set; } public string? ConstValue { get; set; } public string? LookupTable { get; set; } public bool IsRequired { get; set; } public string? DefaultValue { get; set; } public int SortOrder { get; set; } public string? MappingSource { get; set; } public bool IsFallback { get; set; } } public sealed class MdpLineageSyncLogRow { public long EntityId { get; set; } public string? EntityName { get; set; } public string? Status { get; set; } public long? RowsRead { get; set; } public long? RowsInsert { get; set; } public long? RowsUpdate { get; set; } public long? RowsSkip { get; set; } public long? RowsError { get; set; } public DateTime? SyncStart { get; set; } public DateTime? SyncEnd { get; set; } public int? DurationMs { get; set; } public string? ErrorMsg { get; set; } }