| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772 |
- namespace Admin.NET.Plugin.AiDOP.Order;
- /// <summary>
- /// 数据中台统一 MDP 运行监控。
- /// </summary>
- [ApiDescriptionSettings(Order = 322, Description = "统一MDP运行监控")]
- [Route("api/DataPlatform")]
- [AllowAnonymous]
- [NonUnify]
- public class MdpMonitorService : IDynamicApiController, ITransient
- {
/// <summary>
/// Maps a module code (S1..S4) to its MDP sync/transform job code. Keys compare case-insensitively.
/// </summary>
private static readonly Dictionary<string, string> ModuleJobCodes = new(StringComparer.OrdinalIgnoreCase)
{
    { "S1", "S1_MDP_SYNC_TRANSFORM" },
    { "S2", "S2_MDP_SYNC_TRANSFORM" },
    { "S3", "S3_MDP_SYNC_TRANSFORM" },
    { "S4", "S4_MDP_SYNC_TRANSFORM" },
};
/// <summary>
/// Static catalog of the known MDP sync/transform jobs, keyed by job code (case-insensitive).
/// Each entry carries the business domain, the consuming modules, the scope type, a display name
/// and the schedule job id.
/// </summary>
private static readonly Dictionary<string, MdpJobCatalogItem> JobCatalog = new(StringComparer.OrdinalIgnoreCase)
{
    ["S1_MDP_SYNC_TRANSFORM"] = new MdpJobCatalogItem(
        JobCode: "S1_MDP_SYNC_TRANSFORM",
        BusinessDomainCode: "order_delivery",
        BusinessDomainName: "订单交付域",
        ConsumerModules: new[] { "S1", "S2", "S3", "S4", "S7", "S9" },
        ScopeType: "GLOBAL_DOMAIN",
        DisplayName: "订单交付域 MDP 同步",
        ScheduleJobId: "job_s1_mdp_sync_transform"),

    ["S2_MDP_SYNC_TRANSFORM"] = new MdpJobCatalogItem(
        JobCode: "S2_MDP_SYNC_TRANSFORM",
        BusinessDomainCode: "work_schedule",
        BusinessDomainName: "工单排程域",
        ConsumerModules: new[] { "S2", "S3", "S5", "S6", "S8", "S9" },
        ScopeType: "GLOBAL_DOMAIN",
        DisplayName: "工单排程域 MDP 同步",
        ScheduleJobId: "job_s2_mdp_sync_transform"),

    ["S3_MDP_SYNC_TRANSFORM"] = new MdpJobCatalogItem(
        JobCode: "S3_MDP_SYNC_TRANSFORM",
        BusinessDomainCode: "supply_purchase",
        BusinessDomainName: "供应采购域",
        ConsumerModules: new[] { "S3", "S4", "S5", "S8", "S9" },
        ScopeType: "GLOBAL_DOMAIN",
        DisplayName: "供应采购域 MDP 同步",
        ScheduleJobId: "job_s3_mdp_sync_transform"),

    ["S4_MDP_SYNC_TRANSFORM"] = new MdpJobCatalogItem(
        JobCode: "S4_MDP_SYNC_TRANSFORM",
        BusinessDomainCode: "purchase_execution",
        BusinessDomainName: "采购执行域",
        ConsumerModules: new[] { "S4", "S5", "S8", "S9" },
        ScopeType: "GLOBAL_DOMAIN",
        DisplayName: "采购执行域 MDP 同步",
        ScheduleJobId: "job_s4_mdp_sync_transform"),
};
// Database client used for all raw SQL queries issued by this service.
private readonly ISqlSugarClient _db;

// Source of the current tenant id applied to every query.
private readonly UserManager _userManager;

/// <summary>
/// Creates the service with its injected database client and user manager.
/// </summary>
/// <param name="db">SqlSugar client used to run the monitoring queries.</param>
/// <param name="userManager">Provides the tenant id of the current caller.</param>
public MdpMonitorService(ISqlSugarClient db, UserManager userManager)
{
    (_db, _userManager) = (db, userManager);
}
/// <summary>
/// Returns the MDP job catalog as module options (same payload as <c>GetCatalog</c>).
/// </summary>
[DisplayName("MDP模块选项")]
[HttpGet("mdp-monitor/modules")]
public object GetModules()
{
    return BuildCatalogResponse();
}
/// <summary>
/// Returns the MDP job catalog built by <c>BuildCatalogResponse</c>.
/// </summary>
[DisplayName("MDP任务目录")]
[HttpGet("mdp-monitor/catalog")]
public object GetCatalog()
{
    return BuildCatalogResponse();
}
/// <summary>
/// Returns the most recent run-log row matching the filter, ordered by start time then id.
/// </summary>
/// <param name="input">Optional job/domain/module/batch/status/time filters.</param>
/// <returns>
/// The newest matching <see cref="MdpMonitorRunLogRow"/> enriched with catalog info,
/// or an empty row when nothing matches.
/// </returns>
[DisplayName("MDP最近运行状态")]
[HttpGet("mdp-monitor/latest")]
public async Task<object> GetLatest([FromQuery] MdpMonitorQueryInput input)
{
    var (filterSql, sqlParameters) = BuildWhere(input, _userManager.TenantId);
    var sql =
        $"""
        {SelectColumnsSql()}
        FROM mdp_transform_run_log
        WHERE {filterSql}
        ORDER BY start_time DESC, id DESC
        LIMIT 1
        """;

    var latest = await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(sql, sqlParameters);
    if (latest == null)
        latest = new MdpMonitorRunLogRow();

    // No-op for the empty placeholder row because its JobCode is null.
    AttachCatalogInfo(latest);
    return latest;
}
/// <summary>
/// Returns one page of run-log rows matching the filter, newest first, together with the total count.
/// </summary>
/// <param name="input">Filter fields plus <c>Page</c> (1-based) and <c>PageSize</c>.</param>
/// <returns>An object with <c>total</c>, <c>page</c>, <c>pageSize</c> and <c>list</c>.</returns>
[DisplayName("MDP运行日志列表")]
[HttpGet("mdp-monitor/list")]
public async Task<object> GetList([FromQuery] MdpMonitorListInput input)
{
    var tenantId = _userManager.TenantId;

    // Non-positive paging values fall back to the defaults (page 1, 10 rows).
    var page = input.Page <= 0 ? 1 : input.Page;
    var pageSize = input.PageSize <= 0 ? 10 : input.PageSize;

    // Computed in 64-bit: the int product wraps to a negative OFFSET for large page/pageSize
    // values, which the database rejects.
    long offset = (page - 1L) * pageSize;

    var (whereSql, pars) = BuildWhere(input, tenantId);
    var total = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM mdp_transform_run_log WHERE {whereSql}", pars);

    // pageSize/offset are numeric values computed above, so interpolating them is injection-safe.
    var list = await _db.Ado.SqlQueryAsync<MdpMonitorRunLogRow>(
        $"""
        {SelectColumnsSql()}
        FROM mdp_transform_run_log
        WHERE {whereSql}
        ORDER BY start_time DESC, id DESC
        LIMIT {pageSize} OFFSET {offset}
        """,
        pars);

    foreach (var row in list)
        AttachCatalogInfo(row);

    return new { total, page, pageSize, list };
}
/// <summary>
/// Returns a single run-log row by id, restricted by the same tenant and filter rules as the list.
/// </summary>
/// <param name="id">Primary key of the run-log row.</param>
/// <param name="input">Optional additional filters; the row must also satisfy them.</param>
/// <returns>The matching <see cref="MdpMonitorRunLogRow"/> enriched with catalog info.</returns>
/// <remarks>Throws the friendly error created by <c>Oops.Oh</c> when no row matches.</remarks>
[DisplayName("MDP运行日志详情")]
[HttpGet("mdp-monitor/detail/{id}")]
public async Task<object> GetDetail(long id, [FromQuery] MdpMonitorQueryInput input)
{
    var (filterSql, sqlParameters) = BuildWhere(input, _userManager.TenantId);
    sqlParameters.Add(new SugarParameter("@Id", id));

    var sql =
        $"""
        {SelectColumnsSql()}
        FROM mdp_transform_run_log
        WHERE id=@Id AND {filterSql}
        LIMIT 1
        """;

    var detail = await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(sql, sqlParameters);
    if (detail is null)
    {
        throw Oops.Oh("运行日志不存在");
    }

    AttachCatalogInfo(detail);
    return detail;
}
/// <summary>
/// Returns the sync lineage of one MDP module: stage descriptions, the entities whose code starts
/// with "<c>{module}_</c>", their field mappings (configured or fallback) and, when a batch id is
/// given, the latest sync log per entity for that batch.
/// </summary>
/// <param name="input">
/// <c>ModuleCode</c> and/or <c>JobCode</c> select the module; <c>BatchId</c> is optional.
/// </param>
/// <returns>An <see cref="MdpLineageOutput"/> enriched with catalog info.</returns>
/// <remarks>Throws the friendly error created by <c>Oops.Oh</c> when no module can be resolved.</remarks>
[DisplayName("MDP同步链路详情")]
[HttpGet("mdp-monitor/lineage")]
public async Task<object> GetLineage([FromQuery] MdpMonitorLineageInput input)
{
    // Escapes LIKE metacharacters with '!' so the value is matched literally (paired with ESCAPE '!').
    static string EscapeLikeLiteral(string value) =>
        value.Replace("!", "!!").Replace("%", "!%").Replace("_", "!_");

    var tenantId = _userManager.TenantId;
    var moduleCode = ResolveModuleCode(input.ModuleCode, input.JobCode);
    if (string.IsNullOrWhiteSpace(moduleCode))
        throw Oops.Oh("请选择 MDP 模块");
    var jobCode = ResolveJobCode(moduleCode, input.JobCode);

    // The separator underscore must be literal: unescaped, "S1_%" would also match "S10..." codes,
    // and a caller-supplied '%' in the module code would match every entity.
    var entityPrefix = $"{EscapeLikeLiteral(moduleCode)}!_%";

    // NOTE(review): the Target* columns are read from the same mdp_source row as the Source*
    // columns; confirm that the target really lives in the source database.
    var entities = await _db.Ado.SqlQueryAsync<MdpLineageEntityRow>(
        """
        SELECT e.id AS Id, e.entity_code AS EntityCode, e.entity_name AS EntityName,
        e.entity_type AS EntityType, s.source_code AS SourceCode,
        s.source_name AS SourceName, s.source_type AS SourceType,
        s.db_type AS SourceDbType, s.db_host AS SourceDbHost,
        s.db_port AS SourceDbPort, s.db_name AS SourceDbName,
        e.source_table_name AS SourceTableName, e.source_api_path AS SourceApiPath,
        s.db_type AS TargetDbType, s.db_host AS TargetDbHost,
        s.db_port AS TargetDbPort, s.db_name AS TargetDbName,
        e.target_table_name AS TargetTableName, e.sync_mode AS SyncMode,
        e.incr_column AS IncrColumn, e.status AS Status
        FROM mdp_entity e
        LEFT JOIN mdp_source s ON s.id = e.source_id
        WHERE e.entity_code LIKE @EntityPrefix ESCAPE '!'
        AND (e.tenant_id = @TenantId OR e.tenant_id = 0)
        ORDER BY e.entity_code
        """,
        new SugarParameter("@EntityPrefix", entityPrefix),
        new SugarParameter("@TenantId", tenantId));

    if (entities.Count == 0)
    {
        // No registered entities: still return the stage descriptions so the UI can render the flow.
        var emptyOutput = new MdpLineageOutput
        {
            ModuleCode = moduleCode,
            JobCode = jobCode,
            BatchId = input.BatchId,
            Stages = BuildStageDescriptions(jobCode, moduleCode),
            Entities = new List<MdpLineageEntityRow>()
        };
        AttachLineageCatalogInfo(emptyOutput);
        return emptyOutput;
    }

    // Ids are numeric values read from the database above, so inlining them is injection-safe.
    var entityIds = string.Join(",", entities.Select(u => u.Id));
    var mappings = await _db.Ado.SqlQueryAsync<MdpLineageFieldMappingRow>(
        $"""
        SELECT entity_id AS EntityId, source_field AS SourceField, target_field AS TargetField,
        field_type AS FieldType, transform_script AS TransformScript,
        const_value AS ConstValue, lookup_table AS LookupTable,
        is_required AS IsRequired, default_value AS DefaultValue, sort_order AS SortOrder
        FROM mdp_field_mapping
        WHERE entity_id IN ({entityIds})
        ORDER BY entity_id, sort_order, target_field
        """);
    var mappingsByEntity = mappings.GroupBy(u => u.EntityId).ToDictionary(u => u.Key, u => u.ToList());

    Dictionary<long, MdpLineageSyncLogRow> syncLogsByEntity = new();
    if (!string.IsNullOrWhiteSpace(input.BatchId))
    {
        var syncLogs = await _db.Ado.SqlQueryAsync<MdpLineageSyncLogRow>(
            """
            SELECT entity_id AS EntityId, entity_name AS EntityName, status AS Status,
            rows_read AS RowsRead, rows_insert AS RowsInsert, rows_update AS RowsUpdate,
            rows_skip AS RowsSkip, rows_error AS RowsError,
            sync_start AS SyncStart, sync_end AS SyncEnd, duration_ms AS DurationMs,
            error_msg AS ErrorMsg
            FROM mdp_sync_log
            WHERE sync_batch_id = @BatchId
            AND (tenant_id = @TenantId OR tenant_id = 0)
            AND entity_id IN (
            SELECT id FROM mdp_entity
            WHERE entity_code LIKE @EntityPrefix ESCAPE '!'
            AND (tenant_id = @TenantId OR tenant_id = 0)
            )
            ORDER BY sync_start, id
            """,
            new SugarParameter("@BatchId", input.BatchId.Trim()),
            new SugarParameter("@EntityPrefix", entityPrefix),
            new SugarParameter("@TenantId", tenantId));

        // Keep only the most recently started log per entity.
        syncLogsByEntity = syncLogs
            .GroupBy(u => u.EntityId)
            .ToDictionary(u => u.Key, u => u.OrderByDescending(x => x.SyncStart).First());
    }

    var batchId = input.BatchId?.Trim();
    foreach (var entity in entities)
    {
        entity.SourceFullName = BuildObjectFullName(entity.SourceDbType, entity.SourceDbHost, entity.SourceDbPort, entity.SourceDbName, entity.SourceTableName ?? entity.SourceApiPath);
        entity.TargetFullName = BuildObjectFullName(entity.TargetDbType, entity.TargetDbHost, entity.TargetDbPort, entity.TargetDbName, entity.TargetTableName);

        if (mappingsByEntity.TryGetValue(entity.Id, out var entityMappings) && entityMappings.Count > 0)
        {
            // Configured mappings from mdp_field_mapping.
            foreach (var mapping in entityMappings)
            {
                mapping.MappingSource = "CONFIG";
                mapping.IsFallback = false;
            }
            entity.FieldMappings = entityMappings;
            entity.FieldMappingCount = entityMappings.Count;
        }
        else
        {
            // Nothing configured: synthesize the display-only fallback mappings.
            entity.FieldMappings = BuildFallbackFieldMappings(entity, batchId);
            entity.FieldMappingCount = entity.FieldMappings.Count;
        }

        if (syncLogsByEntity.TryGetValue(entity.Id, out var syncLog))
            entity.SyncLog = syncLog;
    }

    var output = new MdpLineageOutput
    {
        ModuleCode = moduleCode,
        JobCode = jobCode,
        BatchId = input.BatchId,
        Stages = BuildStageDescriptions(jobCode, moduleCode),
        Entities = entities
    };
    AttachLineageCatalogInfo(output);
    return output;
}
/// <summary>
/// Builds the WHERE clause and parameter list shared by the run-log queries.
/// </summary>
/// <param name="input">Filter values; blank or missing fields add no condition.</param>
/// <param name="tenantId">Current tenant id, always bound as <c>@TenantId</c>.</param>
/// <returns>The conditions joined with <c>AND</c>, and the matching parameters.</returns>
private static (string WhereSql, List<SugarParameter> Parameters) BuildWhere(MdpMonitorQueryInput input, long tenantId)
{
    var conditions = new List<string> { BuildMdpRunLogTenantWhere(tenantId) };
    var parameters = new List<SugarParameter> { new SugarParameter("@TenantId", tenantId) };

    // Appends one condition together with the single parameter it references.
    void AddCondition(string condition, string parameterName, object value)
    {
        conditions.Add(condition);
        parameters.Add(new SugarParameter(parameterName, value));
    }

    var jobFilter = BuildJobCodeFilter(input);
    if (jobFilter.NoMatch)
    {
        // The domain/module filter matched no catalog job: force an empty result.
        conditions.Add("1=0");
    }
    else
    {
        conditions.Add(jobFilter.Sql);
        parameters.AddRange(jobFilter.Parameters);
    }

    if (!string.IsNullOrWhiteSpace(input.BatchId))
        AddCondition("batch_id LIKE @BatchId", "@BatchId", $"%{input.BatchId.Trim()}%");

    if (!string.IsNullOrWhiteSpace(input.Status))
        AddCondition("status=@Status", "@Status", input.Status.Trim().ToUpperInvariant());

    if (input.StartTime.HasValue)
        AddCondition("start_time >= @StartTime", "@StartTime", input.StartTime.Value);

    // The upper bound is also applied to start_time, not end_time.
    if (input.EndTime.HasValue)
        AddCondition("start_time <= @EndTime", "@EndTime", input.EndTime.Value);

    return (string.Join(" AND ", conditions), parameters);
}
/// <summary>
/// MDP transform jobs currently write their run logs with tenant_id=0, so a logged-in tenant's
/// query must also include those global records.
/// </summary>
/// <param name="tenantId">Current tenant id; values of 0 or below select only global rows.</param>
/// <returns>A SQL condition that references <c>@TenantId</c> only when the tenant id is positive.</returns>
internal static string BuildMdpRunLogTenantWhere(long tenantId)
{
    if (tenantId <= 0)
        return "tenant_id = 0";

    return "(tenant_id = @TenantId OR tenant_id = 0)";
}
/// <summary>
/// Projects the job catalog into the response shape used by the modules and catalog endpoints,
/// ordered by job code (case-insensitive).
/// </summary>
/// <returns>A list of anonymous objects, one per catalog entry.</returns>
private static object BuildCatalogResponse()
{
    var orderedItems = JobCatalog.Values.OrderBy(item => item.JobCode, StringComparer.OrdinalIgnoreCase);

    var response = orderedItems.Select(item => new
    {
        jobCode = item.JobCode,
        displayName = item.DisplayName,
        businessDomainCode = item.BusinessDomainCode,
        businessDomainName = item.BusinessDomainName,
        consumerModules = item.ConsumerModules,
        scopeType = item.ScopeType,
        moduleCode = ResolveModuleCodeFromJobCode(item.JobCode),
        scheduleJobId = item.ScheduleJobId
    });

    return response.ToList();
}
/// <summary>
/// Builds the job_code condition for a run-log query.
/// </summary>
/// <remarks>
/// Precedence: an explicit <c>JobCode</c> wins. Otherwise the business domain and consumer module
/// (<c>ConsumerModule</c>, falling back to <c>ModuleCode</c>) are resolved against the catalog.
/// With no filter at all, every job whose code contains "MDP" is matched.
/// </remarks>
/// <param name="input">Filter values.</param>
/// <returns>
/// The SQL fragment and its parameters; <c>NoMatch</c> is true when the catalog filter matched no
/// job, in which case the SQL fragment is empty.
/// </returns>
private static (string Sql, List<SugarParameter> Parameters, bool NoMatch) BuildJobCodeFilter(MdpMonitorQueryInput input)
{
    var parameters = new List<SugarParameter>();

    if (!string.IsNullOrWhiteSpace(input.JobCode))
    {
        parameters.Add(new SugarParameter("@JobCode", input.JobCode.Trim().ToUpperInvariant()));
        return ("job_code=@JobCode", parameters, false);
    }

    string? consumerModule = null;
    if (!string.IsNullOrWhiteSpace(input.ConsumerModule))
        consumerModule = input.ConsumerModule.Trim();
    else if (!string.IsNullOrWhiteSpace(input.ModuleCode))
        consumerModule = input.ModuleCode.Trim();

    var noDomainFilter = string.IsNullOrWhiteSpace(input.BusinessDomainCode);
    var noConsumerFilter = string.IsNullOrWhiteSpace(consumerModule);
    if (noDomainFilter && noConsumerFilter)
        return ("IFNULL(job_code, '') LIKE '%MDP%'", parameters, false);

    var allowedCodes = ResolveAllowedJobCodes(input.BusinessDomainCode, consumerModule);
    switch (allowedCodes.Count)
    {
        case 0:
            return (string.Empty, parameters, true);
        case 1:
            parameters.Add(new SugarParameter("@JobCode", allowedCodes.First()));
            return ("job_code=@JobCode", parameters, false);
    }

    // Several jobs: emit one numbered parameter per code, in a stable order.
    var sortedCodes = allowedCodes.OrderBy(code => code, StringComparer.OrdinalIgnoreCase).ToList();
    var placeholders = new List<string>(sortedCodes.Count);
    for (var i = 0; i < sortedCodes.Count; i++)
    {
        var placeholder = $"@JobCode{i}";
        placeholders.Add(placeholder);
        parameters.Add(new SugarParameter(placeholder, sortedCodes[i]));
    }

    return ($"job_code IN ({string.Join(", ", placeholders)})", parameters, false);
}
/// <summary>
/// Returns the job codes of catalog entries that match the given business domain and consumer module.
/// A blank argument does not restrict the result; comparisons ignore case.
/// </summary>
/// <param name="businessDomainCode">Business domain code to match, or blank for any.</param>
/// <param name="consumerModule">Module that must appear in the entry's consumer list, or blank for any.</param>
/// <returns>A case-insensitive set of matching job codes (possibly empty).</returns>
private static HashSet<string> ResolveAllowedJobCodes(string? businessDomainCode, string? consumerModule)
{
    var domain = string.IsNullOrWhiteSpace(businessDomainCode) ? null : businessDomainCode.Trim();
    var module = string.IsNullOrWhiteSpace(consumerModule) ? null : consumerModule.Trim();

    var matches = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    foreach (var entry in JobCatalog.Values)
    {
        if (domain != null && !string.Equals(entry.BusinessDomainCode, domain, StringComparison.OrdinalIgnoreCase))
            continue;

        if (module != null && !Array.Exists(entry.ConsumerModules, m => string.Equals(m, module, StringComparison.OrdinalIgnoreCase)))
            continue;

        matches.Add(entry.JobCode);
    }

    return matches;
}
/// <summary>
/// Copies the catalog metadata for the row's job code onto the row.
/// Does nothing when the row is null, has a blank job code, or the job is not in the catalog.
/// </summary>
/// <param name="row">Run-log row to enrich in place.</param>
private static void AttachCatalogInfo(MdpMonitorRunLogRow row)
{
    var jobCode = row?.JobCode;
    if (string.IsNullOrWhiteSpace(jobCode) || !JobCatalog.TryGetValue(jobCode.Trim(), out var entry))
    {
        return;
    }

    row!.DisplayName = entry.DisplayName;
    row.BusinessDomainCode = entry.BusinessDomainCode;
    row.BusinessDomainName = entry.BusinessDomainName;
    row.ConsumerModules = string.Join(",", entry.ConsumerModules);
    row.ScopeType = entry.ScopeType;
    row.ScheduleJobId = entry.ScheduleJobId;
}
/// <summary>
/// Copies the catalog metadata for the output's job code onto the lineage output.
/// Does nothing when the output is null, has a blank job code, or the job is not in the catalog.
/// </summary>
/// <param name="output">Lineage output to enrich in place.</param>
private static void AttachLineageCatalogInfo(MdpLineageOutput output)
{
    var jobCode = output?.JobCode;
    if (string.IsNullOrWhiteSpace(jobCode) || !JobCatalog.TryGetValue(jobCode.Trim(), out var entry))
    {
        return;
    }

    output!.DisplayName = entry.DisplayName;
    output.BusinessDomainCode = entry.BusinessDomainCode;
    output.BusinessDomainName = entry.BusinessDomainName;
    output.ConsumerModules = string.Join(",", entry.ConsumerModules);
    output.ScopeType = entry.ScopeType;
    output.ScheduleJobId = entry.ScheduleJobId;
}
/// <summary>
/// Joins the item's consumer modules with the ideographic comma for use in stage descriptions.
/// </summary>
/// <param name="item">Catalog entry whose consumer modules are listed.</param>
/// <returns>The module codes separated by "、".</returns>
private static string FormatConsumerModulesLabel(MdpJobCatalogItem item)
{
    return string.Join("、", item.ConsumerModules);
}
/// <summary>
/// Returns the stage descriptions for a job. The job's business domain picks a domain-specific
/// builder; an unknown job or domain falls back to <c>BuildGenericStages</c>.
/// </summary>
/// <param name="jobCode">Job code looked up in the catalog; may be blank.</param>
/// <param name="moduleCode">Module code used by the generic fallback.</param>
/// <returns>The ordered list of stage rows.</returns>
private static List<MdpLineageStageRow> BuildStageDescriptions(string? jobCode, string? moduleCode)
{
    if (string.IsNullOrWhiteSpace(jobCode) || !JobCatalog.TryGetValue(jobCode.Trim(), out var catalogItem))
        return BuildGenericStages(moduleCode);

    switch (catalogItem.BusinessDomainCode)
    {
        case "order_delivery":
            return BuildOrderDeliveryStages(catalogItem);
        case "work_schedule":
            return BuildWorkScheduleStages(catalogItem);
        case "supply_purchase":
            return BuildSupplyPurchaseStages(catalogItem);
        case "purchase_execution":
            return BuildPurchaseExecutionStages(catalogItem);
        default:
            return BuildGenericStages(moduleCode);
    }
}
/// <summary>
/// Describes the four pipeline stages (staging, standard, DWD, KPI) of the order-delivery domain.
/// </summary>
/// <param name="item">Catalog entry whose consumer modules are named in the descriptions.</param>
/// <returns>The stage rows in pipeline order.</returns>
private static List<MdpLineageStageRow> BuildOrderDeliveryStages(MdpJobCatalogItem item)
{
    var consumers = FormatConsumerModulesLabel(item);
    var stages = new List<MdpLineageStageRow>();

    stages.Add(new MdpLineageStageRow
    {
        StageCode = "STAGING",
        StageName = "订单交付域 · 贴源同步",
        Layer = "mdp_stg",
        Description = $"按 mdp_entity 登记订单交付域源对象抽取数据,保留 raw_data JSON 便于追溯;产出供 {consumers} 消费。",
        InputObjects = "旧系统 / 当前库源对象",
        OutputObjects = "mdp_stg_so, mdp_stg_ship_trans",
        Execution = "S1MdpSyncTransformService.SyncStagingAsync"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "STANDARD",
        StageName = "订单交付域 · 标准层转换",
        Layer = "mdp_std",
        Description = "解析贴源 raw_data,做字段标准化、租户兜底和幂等写入。",
        InputObjects = "mdp_stg_so, mdp_stg_ship_trans",
        OutputObjects = "mdp_std_so, mdp_std_ship_trans",
        Execution = "S1MdpSyncTransformService.BuildStandardCommands"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "DWD",
        StageName = "订单交付域 · DWD宽表",
        Layer = "dwd",
        Description = $"沉淀订单交付事实,供 {consumers} 看板与诊断读取。",
        InputObjects = "mdp_std_so, mdp_std_ship_trans",
        OutputObjects = "dwd_ship_trans",
        Execution = "S1MdpSyncTransformService.BuildDwdAsync"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "KPI",
        StageName = "指标写入",
        Layer = "ado_s9",
        Description = $"计算订单交付域 L1 指标并写入统一指标值表,供 {consumers} 消费。",
        InputObjects = "mdp_std_so, dwd_ship_trans",
        OutputObjects = "ado_s9_kpi_value_l1_day",
        Execution = "S1MdpSyncTransformService.BuildS1KpiValuesAsync"
    });

    return stages;
}
/// <summary>
/// Describes the four pipeline stages (staging, standard, DWD, KPI) of the work-schedule domain.
/// </summary>
/// <param name="item">Catalog entry whose consumer modules are named in the descriptions.</param>
/// <returns>The stage rows in pipeline order.</returns>
private static List<MdpLineageStageRow> BuildWorkScheduleStages(MdpJobCatalogItem item)
{
    var consumers = FormatConsumerModulesLabel(item);
    var stages = new List<MdpLineageStageRow>();

    stages.Add(new MdpLineageStageRow
    {
        StageCode = "STAGING",
        StageName = "工单排程域 · 贴源同步",
        Layer = "mdp_stg",
        Description = $"按 mdp_entity 登记工单排程域源对象抽取数据;产出供 {consumers} 消费。",
        InputObjects = "工单 / 工序 / 排程源对象",
        OutputObjects = "mdp_stg_*",
        Execution = "S2MdpSyncTransformService.SyncStagingAsync"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "STANDARD",
        StageName = "工单排程域 · 标准层转换",
        Layer = "mdp_std",
        Description = "将工单、工序、排程等对象标准化。",
        InputObjects = "mdp_stg_*",
        OutputObjects = "mdp_std_*",
        Execution = "S2MdpSyncTransformService.BuildStandardCommands"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "DWD",
        StageName = "工单排程域 · DWD宽表",
        Layer = "dwd",
        Description = $"生成制造执行与排程分析宽表,供 {consumers} 读取。",
        InputObjects = "mdp_std_*",
        OutputObjects = "dwd_*",
        Execution = "S2MdpSyncTransformService.BuildDwdAsync"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "KPI",
        StageName = "指标写入",
        Layer = "ado_s9",
        Description = $"写入工单排程域 KPI,供 {consumers} 消费。",
        InputObjects = "mdp_std_* / dwd_*",
        OutputObjects = "ado_s9_kpi_value_*",
        Execution = "S2MdpSyncTransformService.BuildS2KpiValuesAsync"
    });

    return stages;
}
/// <summary>
/// Describes the four pipeline stages (staging, standard, DWD, KPI) of the supply-purchase domain.
/// </summary>
/// <param name="item">Catalog entry whose consumer modules are named in the descriptions.</param>
/// <returns>The stage rows in pipeline order.</returns>
private static List<MdpLineageStageRow> BuildSupplyPurchaseStages(MdpJobCatalogItem item)
{
    var consumers = FormatConsumerModulesLabel(item);
    var stages = new List<MdpLineageStageRow>();

    stages.Add(new MdpLineageStageRow
    {
        StageCode = "STAGING",
        StageName = "供应采购域 · 贴源同步",
        Layer = "mdp_stg",
        Description = $"按 mdp_entity 登记供应采购域源对象抽取数据;产出供 {consumers} 消费。",
        InputObjects = "供应 / 物料 / 采购源对象",
        OutputObjects = "mdp_stg_*",
        Execution = "S3MdpSyncTransformService.SyncStagingAsync"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "STANDARD",
        StageName = "供应采购域 · 标准层转换",
        Layer = "mdp_std",
        Description = "将供应、物料、采购、交货计划等对象标准化。",
        InputObjects = "mdp_stg_*",
        OutputObjects = "mdp_std_*",
        Execution = "S3MdpSyncTransformService.BuildStandardCommands"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "DWD",
        StageName = "供应采购域 · DWD宽表",
        Layer = "dwd",
        Description = $"生成供应交付、齐套、风险等分析宽表,供 {consumers} 读取。",
        InputObjects = "mdp_std_*",
        OutputObjects = "dwd_supplier_delivery / dwd_material_readiness 等",
        Execution = "S3MdpSyncTransformService.BuildDwdAsync"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "KPI",
        StageName = "指标写入",
        Layer = "ado_s9",
        Description = $"写入供应采购域指标,供 {consumers} 消费。",
        InputObjects = "mdp_std_* / dwd_*",
        OutputObjects = "ado_s9_kpi_value_*",
        Execution = "S3MdpSyncTransformService.BuildS3KpiValuesAsync"
    });

    return stages;
}
/// <summary>
/// Describes the four pipeline stages (staging, standard, DWD, KPI) of the purchase-execution domain.
/// </summary>
/// <param name="item">Catalog entry whose consumer modules are named in the descriptions.</param>
/// <returns>The stage rows in pipeline order.</returns>
private static List<MdpLineageStageRow> BuildPurchaseExecutionStages(MdpJobCatalogItem item)
{
    var consumers = FormatConsumerModulesLabel(item);
    var stages = new List<MdpLineageStageRow>();

    stages.Add(new MdpLineageStageRow
    {
        StageCode = "STAGING",
        StageName = "采购执行域 · 贴源同步",
        Layer = "mdp_stg_s4_*",
        Description = $"同步采购执行域 IQC/发货/退货/欠料事实,供 {consumers} 消费;共享采购主链仍由供应采购域维护。",
        InputObjects = "PurOrdRctDetail / scm_shdzb / srm_polist_ds / dwd_material_shortage",
        OutputObjects = "mdp_stg_s4_iqc / mdp_stg_s4_shipment / mdp_stg_s4_return / mdp_stg_s4_shortage",
        Execution = "S4MdpSyncTransformService.SyncStagingAsync"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "STANDARD",
        StageName = "采购执行域 · 标准层转换",
        Layer = "mdp_std_s4_*",
        Description = "将采购执行域贴源对象标准化,并统计供应采购域共享标准层行数。",
        InputObjects = "mdp_stg_s4_*",
        OutputObjects = "mdp_std_s4_iqc / mdp_std_s4_shipment / mdp_std_s4_return / mdp_std_s4_shortage",
        Execution = "S4MdpSyncTransformService.BuildStandardCommands"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "DWD",
        StageName = "采购执行域 · DWD宽表",
        Layer = "dwd",
        Description = $"写入采购执行分析宽表,供 {consumers} 读取。",
        InputObjects = "dwd_supplier_delivery / mdp_std_s4_*",
        OutputObjects = "dwd_s4_purchase_execution / dwd_po_trans / dwd_qc_trans",
        Execution = "S4MdpSyncTransformService.BuildDwdAsync"
    });
    stages.Add(new MdpLineageStageRow
    {
        StageCode = "KPI",
        StageName = "指标写入",
        Layer = "ado_s9",
        Description = $"写入采购执行域 L1/L2/L3 指标,供 {consumers} 消费。",
        InputObjects = "mdp_std_delivery_schedule / dwd_supplier_delivery / dwd_s4_purchase_execution",
        OutputObjects = "ado_s9_kpi_value_l1/l2/l3_day",
        Execution = "S4MdpSyncTransformService.BuildS4KpiValuesAsync"
    });

    return stages;
}
/// <summary>
/// Primary-key / business-key hints for staging entities whose keys are maintained in code rather
/// than in mdp_field_mapping. Used only as a display fallback by the monitor.
/// </summary>
private static readonly Dictionary<string, (string SourceRowId, string SourceBizKeyExpr)> KnownStagingEntityKeys =
    new(StringComparer.OrdinalIgnoreCase)
    {
        { "S4_IQC_RECEIPT", ("RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''), ':', IFNULL(s.`Line`,''))") },
        { "S4_SHIPMENT_EXEC", ("id", "CONCAT(IFNULL(s.`glid`,''), ':', IFNULL(s.`id`,''))") },
        { "S4_RETURN_EXEC", ("Id", "s.`dsnum`") },
        { "S4_SHORTAGE_EXEC", ("id", "CONCAT(IFNULL(s.`work_order`,''), ':', IFNULL(s.`component_item_code`,''))") },
    };
/// <summary>
/// Synthesizes display-only field mappings for an entity that has no rows in mdp_field_mapping.
/// The rows describe the fixed staging columns (tenant, source system/table, row id, business key,
/// raw JSON, batch id, sync time) and are marked <c>MappingSource = "FALLBACK"</c>.
/// </summary>
/// <param name="entity">Entity whose source metadata fills the mapping rows.</param>
/// <param name="batchId">Batch id shown as the sync_batch_id constant; blank shows a placeholder text.</param>
/// <returns>The fallback mapping rows, with sort orders 10, 20, 30, ...</returns>
private static List<MdpLineageFieldMappingRow> BuildFallbackFieldMappings(MdpLineageEntityRow entity, string? batchId)
{
    var sourceRowId = entity.IncrColumn;
    string? sourceBizKeyExpr = null;
    if (!string.IsNullOrWhiteSpace(entity.EntityCode) &&
        KnownStagingEntityKeys.TryGetValue(entity.EntityCode, out var known))
    {
        // Use the same blank test as the check below: an empty or whitespace incr_column must
        // also fall back to the known key instead of rendering "--".
        if (string.IsNullOrWhiteSpace(sourceRowId))
            sourceRowId = known.SourceRowId;
        sourceBizKeyExpr = known.SourceBizKeyExpr;
    }

    var fallbackNote = "当前实体未配置逐字段映射,贴源同步保留源行 raw_data";
    var mappings = new List<MdpLineageFieldMappingRow>();
    var sort = 10;

    // Appends one fallback row and advances the sort order by 10.
    void Add(string sourceField, string targetField, string fieldType, string? transformScript, string? constValue = null, bool isRequired = false)
    {
        mappings.Add(new MdpLineageFieldMappingRow
        {
            EntityId = entity.Id,
            SourceField = sourceField,
            TargetField = targetField,
            FieldType = fieldType,
            TransformScript = transformScript,
            ConstValue = constValue,
            IsRequired = isRequired,
            SortOrder = sort,
            MappingSource = "FALLBACK",
            IsFallback = true
        });
        sort += 10;
    }

    Add("tenant_id", "tenant_id", "DIRECT", "当前租户或全局租户兜底");
    Add($"CONST:{entity.SourceCode ?? "AIDOP"}", "source_system", "CONST", "来源数据源编码", entity.SourceCode ?? "AIDOP", isRequired: true);
    Add($"CONST:{entity.SourceTableName ?? entity.SourceApiPath ?? "--"}", "source_table", "CONST", "源表名", entity.SourceTableName ?? entity.SourceApiPath, isRequired: true);

    if (!string.IsNullOrWhiteSpace(sourceRowId))
        Add(sourceRowId, "source_row_id", "DIRECT", "来自 mdp_entity.incr_column 或实体主键配置");
    else
        Add("--", "source_row_id", "DIRECT", fallbackNote);

    if (!string.IsNullOrWhiteSpace(sourceBizKeyExpr))
        Add(sourceBizKeyExpr, "source_biz_key", "EXPR", "来自实体业务键表达式", isRequired: true);
    else
        Add("--", "source_biz_key", "EXPR", fallbackNote);

    Add("*", "raw_data", "JSON", "源行整行 JSON");
    Add("CONST", "sync_batch_id", "CONST", "当前同步批次", string.IsNullOrWhiteSpace(batchId) ? "当前同步批次" : batchId, isRequired: true);
    Add("NOW()", "sync_time", "CONST", "同步时间", isRequired: true);
    return mappings;
}
/// <summary>
/// Fallback stage descriptions used when the job is not in the catalog. Module codes S1 to S4
/// (case-insensitive) reuse the matching domain builder; any other value gets a generic two-stage list.
/// </summary>
/// <param name="moduleCode">Module code; may be null.</param>
/// <returns>The stage rows in pipeline order.</returns>
private static List<MdpLineageStageRow> BuildGenericStages(string? moduleCode)
{
    bool IsModule(string code) => string.Equals(moduleCode, code, StringComparison.OrdinalIgnoreCase);

    return moduleCode switch
    {
        _ when IsModule("S1") => BuildOrderDeliveryStages(JobCatalog["S1_MDP_SYNC_TRANSFORM"]),
        _ when IsModule("S2") => BuildWorkScheduleStages(JobCatalog["S2_MDP_SYNC_TRANSFORM"]),
        _ when IsModule("S3") => BuildSupplyPurchaseStages(JobCatalog["S3_MDP_SYNC_TRANSFORM"]),
        _ when IsModule("S4") => BuildPurchaseExecutionStages(JobCatalog["S4_MDP_SYNC_TRANSFORM"]),
        _ => new List<MdpLineageStageRow>
        {
            new MdpLineageStageRow
            {
                StageCode = "STAGING",
                StageName = "贴源同步",
                Layer = "mdp_stg",
                Description = "按 mdp_entity 登记源对象抽取数据。",
                InputObjects = "源对象",
                OutputObjects = "mdp_stg_*",
                Execution = "MDP 同步服务"
            },
            new MdpLineageStageRow
            {
                StageCode = "STANDARD",
                StageName = "标准层转换",
                Layer = "mdp_std",
                Description = "标准层/DWD/KPI 当前由后端 Service 承载。",
                InputObjects = "mdp_stg_*",
                OutputObjects = "mdp_std_* / dwd_* / 指标表",
                Execution = "MDP 转换服务"
            }
        }
    };
}
/// <summary>
/// Reverse lookup in <c>ModuleJobCodes</c>: finds the module code whose job code equals the given
/// one (trimmed, case-insensitive).
/// </summary>
/// <param name="jobCode">Job code to look up; may be blank.</param>
/// <returns>The module code, or null when the input is blank or no module maps to it.</returns>
private static string? ResolveModuleCodeFromJobCode(string? jobCode)
{
    if (string.IsNullOrWhiteSpace(jobCode))
        return null;

    var wanted = jobCode.Trim();
    foreach (var pair in ModuleJobCodes)
    {
        if (string.Equals(pair.Value, wanted, StringComparison.OrdinalIgnoreCase))
            return pair.Key;
    }

    return null;
}
/// <summary>
/// Resolves the effective job code: an explicit job code wins (trimmed and upper-cased);
/// otherwise the module code is mapped through <c>ModuleJobCodes</c>.
/// </summary>
/// <param name="moduleCode">Module code used when no job code is given.</param>
/// <param name="jobCode">Explicit job code; may be blank.</param>
/// <returns>The job code, or null when neither input resolves.</returns>
private static string? ResolveJobCode(string? moduleCode, string? jobCode)
{
    if (!string.IsNullOrWhiteSpace(jobCode))
    {
        return jobCode.Trim().ToUpperInvariant();
    }

    if (!string.IsNullOrWhiteSpace(moduleCode) && ModuleJobCodes.TryGetValue(moduleCode.Trim(), out var mappedJobCode))
    {
        return mappedJobCode;
    }

    return null;
}
/// <summary>
/// Resolves the effective module code: an explicit module code wins (trimmed and upper-cased);
/// otherwise the job code is reverse-mapped through <c>ModuleJobCodes</c>.
/// </summary>
/// <param name="moduleCode">Explicit module code; may be blank.</param>
/// <param name="jobCode">Job code used when no module code is given.</param>
/// <returns>The module code, or null when neither input resolves.</returns>
private static string? ResolveModuleCode(string? moduleCode, string? jobCode)
{
    if (!string.IsNullOrWhiteSpace(moduleCode))
    {
        return moduleCode.Trim().ToUpperInvariant();
    }

    // Same blank check, trim and case-insensitive reverse lookup as the inline version.
    return ResolveModuleCodeFromJobCode(jobCode);
}
/// <summary>
/// Formats a display name of the form "<c>dbType / host:port / dbName.objectName</c>",
/// omitting any blank part.
/// </summary>
/// <param name="dbType">Database type label; omitted when blank.</param>
/// <param name="host">Host name; omitted (together with the port) when blank.</param>
/// <param name="port">Port appended to the host when present.</param>
/// <param name="dbName">Database name prefixed to the object name when not blank.</param>
/// <param name="objectName">Table or API path; a blank value yields null.</param>
/// <returns>The formatted name, or null when <paramref name="objectName"/> is blank.</returns>
private static string? BuildObjectFullName(string? dbType, string? host, int? port, string? dbName, string? objectName)
{
    if (string.IsNullOrWhiteSpace(objectName))
    {
        return null;
    }

    var segments = new List<string>(3);

    if (!string.IsNullOrWhiteSpace(dbType))
    {
        segments.Add(dbType);
    }

    if (!string.IsNullOrWhiteSpace(host))
    {
        segments.Add(port.HasValue ? $"{host}:{port}" : host);
    }

    segments.Add(string.IsNullOrWhiteSpace(dbName) ? objectName : $"{dbName}.{objectName}");

    return string.Join(" / ", segments);
}
/// <summary>
/// Returns the SELECT column list for mdp_transform_run_log, aliasing each column to the matching
/// <see cref="MdpMonitorRunLogRow"/> property name.
/// </summary>
/// <returns>The SELECT clause without a FROM clause.</returns>
private static string SelectColumnsSql() =>
    """
    SELECT id AS Id, tenant_id AS TenantId, job_code AS JobCode, job_name AS JobName, trigger_type AS TriggerType,
    batch_id AS BatchId, status AS Status, start_time AS StartTime, end_time AS EndTime, duration_ms AS DurationMs,
    stage_rows AS StageRows, standard_rows AS StandardRows, dwd_rows AS DwdRows,
    error_message AS ErrorMessage, summary_json AS SummaryJson, create_time AS CreateTime, update_time AS UpdateTime
    """;
- }
/// <summary>
/// One entry of the static MDP job catalog.
/// </summary>
/// <param name="JobCode">Job code; also the catalog key.</param>
/// <param name="BusinessDomainCode">Code of the business domain the job belongs to.</param>
/// <param name="BusinessDomainName">Display name of the business domain.</param>
/// <param name="ConsumerModules">Module codes listed as consumers of the job's output.</param>
/// <param name="ScopeType">Scope type label (the catalog uses "GLOBAL_DOMAIN").</param>
/// <param name="DisplayName">Display name of the job.</param>
/// <param name="ScheduleJobId">Schedule job id, if any.</param>
public sealed record MdpJobCatalogItem(
    string JobCode,
    string BusinessDomainCode,
    string BusinessDomainName,
    string[] ConsumerModules,
    string ScopeType,
    string DisplayName,
    string? ScheduleJobId);
/// <summary>
/// Filter fields shared by the MDP monitor queries. Every field is optional.
/// </summary>
public class MdpMonitorQueryInput
{
    /// <summary>Business domain code used to select catalog jobs.</summary>
    public string? BusinessDomainCode { get; set; }

    /// <summary>Consumer module used to select catalog jobs; takes precedence over <see cref="ModuleCode"/>.</summary>
    public string? ConsumerModule { get; set; }

    /// <summary>Module code; used as the consumer module when <see cref="ConsumerModule"/> is blank.</summary>
    public string? ModuleCode { get; set; }

    /// <summary>Exact job code; when set, the domain/module filters are ignored.</summary>
    public string? JobCode { get; set; }

    /// <summary>Batch id; the run-log queries match it as a substring.</summary>
    public string? BatchId { get; set; }

    /// <summary>Run status; compared after trimming and upper-casing.</summary>
    public string? Status { get; set; }

    /// <summary>Inclusive lower bound on the run start time.</summary>
    public DateTime? StartTime { get; set; }

    /// <summary>Inclusive upper bound on the run start time.</summary>
    public DateTime? EndTime { get; set; }
}
/// <summary>
/// Query filter plus paging for the run-log list endpoint.
/// </summary>
public sealed class MdpMonitorListInput : MdpMonitorQueryInput
{
    /// <summary>1-based page number; defaults to 1.</summary>
    public int Page { get; set; } = 1;

    /// <summary>Rows per page; defaults to 10.</summary>
    public int PageSize { get; set; } = 10;
}
/// <summary>
/// Input of the lineage endpoint; adds no fields to <see cref="MdpMonitorQueryInput"/>.
/// </summary>
public sealed class MdpMonitorLineageInput : MdpMonitorQueryInput
{
}
/// <summary>
/// One row of mdp_transform_run_log, plus catalog metadata attached after the query.
/// </summary>
public sealed class MdpMonitorRunLogRow
{
    // ---- Columns selected from mdp_transform_run_log ----

    /// <summary>Row id.</summary>
    public long Id { get; set; }

    /// <summary>Tenant id of the row.</summary>
    public long TenantId { get; set; }

    /// <summary>Job code of the run.</summary>
    public string? JobCode { get; set; }

    /// <summary>Job name of the run.</summary>
    public string? JobName { get; set; }

    /// <summary>Trigger type of the run.</summary>
    public string? TriggerType { get; set; }

    /// <summary>Batch id of the run.</summary>
    public string? BatchId { get; set; }

    /// <summary>Run status.</summary>
    public string? Status { get; set; }

    /// <summary>Run start time.</summary>
    public DateTime? StartTime { get; set; }

    /// <summary>Run end time.</summary>
    public DateTime? EndTime { get; set; }

    /// <summary>Run duration in milliseconds.</summary>
    public int? DurationMs { get; set; }

    /// <summary>Value of the stage_rows column.</summary>
    public int? StageRows { get; set; }

    /// <summary>Value of the standard_rows column.</summary>
    public int? StandardRows { get; set; }

    /// <summary>Value of the dwd_rows column.</summary>
    public int? DwdRows { get; set; }

    /// <summary>Error message of the run, if any.</summary>
    public string? ErrorMessage { get; set; }

    /// <summary>Value of the summary_json column.</summary>
    public string? SummaryJson { get; set; }

    /// <summary>Row creation time.</summary>
    public DateTime? CreateTime { get; set; }

    /// <summary>Row update time.</summary>
    public DateTime? UpdateTime { get; set; }

    // ---- Catalog metadata, filled in code (not selected from the table) ----

    /// <summary>Business domain code from the job catalog.</summary>
    public string? BusinessDomainCode { get; set; }

    /// <summary>Business domain name from the job catalog.</summary>
    public string? BusinessDomainName { get; set; }

    /// <summary>Comma-separated consumer modules from the job catalog.</summary>
    public string? ConsumerModules { get; set; }

    /// <summary>Scope type from the job catalog.</summary>
    public string? ScopeType { get; set; }

    /// <summary>Display name from the job catalog.</summary>
    public string? DisplayName { get; set; }

    /// <summary>Schedule job id from the job catalog.</summary>
    public string? ScheduleJobId { get; set; }
}
/// <summary>
/// Response of the lineage endpoint: the resolved module/job, catalog metadata, stage descriptions
/// and the module's entities.
/// </summary>
public sealed class MdpLineageOutput
{
    /// <summary>Resolved module code.</summary>
    public string? ModuleCode { get; set; }

    /// <summary>Resolved job code.</summary>
    public string? JobCode { get; set; }

    /// <summary>Batch id echoed from the request.</summary>
    public string? BatchId { get; set; }

    /// <summary>Business domain code from the job catalog.</summary>
    public string? BusinessDomainCode { get; set; }

    /// <summary>Business domain name from the job catalog.</summary>
    public string? BusinessDomainName { get; set; }

    /// <summary>Comma-separated consumer modules from the job catalog.</summary>
    public string? ConsumerModules { get; set; }

    /// <summary>Scope type from the job catalog.</summary>
    public string? ScopeType { get; set; }

    /// <summary>Display name from the job catalog.</summary>
    public string? DisplayName { get; set; }

    /// <summary>Schedule job id from the job catalog.</summary>
    public string? ScheduleJobId { get; set; }

    /// <summary>Stage descriptions; empty by default.</summary>
    public List<MdpLineageStageRow> Stages { get; set; } = new List<MdpLineageStageRow>();

    /// <summary>Entities of the module; empty by default.</summary>
    public List<MdpLineageEntityRow> Entities { get; set; } = new List<MdpLineageEntityRow>();
}
/// <summary>
/// Description of one pipeline stage shown in the lineage view.
/// </summary>
public sealed class MdpLineageStageRow
{
    /// <summary>Stage code, e.g. "STAGING", "STANDARD", "DWD" or "KPI".</summary>
    public string? StageCode { get; set; }

    /// <summary>Display name of the stage.</summary>
    public string? StageName { get; set; }

    /// <summary>Data layer label of the stage.</summary>
    public string? Layer { get; set; }

    /// <summary>Free-text description of the stage.</summary>
    public string? Description { get; set; }

    /// <summary>Objects the stage reads.</summary>
    public string? InputObjects { get; set; }

    /// <summary>Objects the stage writes.</summary>
    public string? OutputObjects { get; set; }

    /// <summary>Name of the service or method that executes the stage.</summary>
    public string? Execution { get; set; }
}
/// <summary>
/// One mdp_entity row joined with its mdp_source, plus values computed by the lineage endpoint
/// (full names, field mappings, latest sync log).
/// </summary>
public sealed class MdpLineageEntityRow
{
    /// <summary>Entity id.</summary>
    public long Id { get; set; }

    /// <summary>Entity code.</summary>
    public string? EntityCode { get; set; }

    /// <summary>Entity name.</summary>
    public string? EntityName { get; set; }

    /// <summary>Entity type.</summary>
    public string? EntityType { get; set; }

    /// <summary>Code of the joined source.</summary>
    public string? SourceCode { get; set; }

    /// <summary>Name of the joined source.</summary>
    public string? SourceName { get; set; }

    /// <summary>Type of the joined source.</summary>
    public string? SourceType { get; set; }

    /// <summary>Database type of the source.</summary>
    public string? SourceDbType { get; set; }

    /// <summary>Database host of the source.</summary>
    public string? SourceDbHost { get; set; }

    /// <summary>Database port of the source.</summary>
    public int? SourceDbPort { get; set; }

    /// <summary>Database name of the source.</summary>
    public string? SourceDbName { get; set; }

    /// <summary>Source table name.</summary>
    public string? SourceTableName { get; set; }

    /// <summary>Source API path; used when no source table name is set.</summary>
    public string? SourceApiPath { get; set; }

    /// <summary>Computed display name of the source object.</summary>
    public string? SourceFullName { get; set; }

    /// <summary>Database type reported for the target.</summary>
    public string? TargetDbType { get; set; }

    /// <summary>Database host reported for the target.</summary>
    public string? TargetDbHost { get; set; }

    /// <summary>Database port reported for the target.</summary>
    public int? TargetDbPort { get; set; }

    /// <summary>Database name reported for the target.</summary>
    public string? TargetDbName { get; set; }

    /// <summary>Target table name.</summary>
    public string? TargetTableName { get; set; }

    /// <summary>Computed display name of the target object.</summary>
    public string? TargetFullName { get; set; }

    /// <summary>Sync mode of the entity.</summary>
    public string? SyncMode { get; set; }

    /// <summary>Value of the incr_column column.</summary>
    public string? IncrColumn { get; set; }

    /// <summary>Status value of the entity.</summary>
    public int? Status { get; set; }

    /// <summary>Number of entries in <see cref="FieldMappings"/>.</summary>
    public int FieldMappingCount { get; set; }

    /// <summary>Configured or fallback field mappings; empty by default.</summary>
    public List<MdpLineageFieldMappingRow> FieldMappings { get; set; } = new List<MdpLineageFieldMappingRow>();

    /// <summary>Latest sync log of the requested batch, if any.</summary>
    public MdpLineageSyncLogRow? SyncLog { get; set; }
}
/// <summary>
/// One field mapping of an entity, either read from mdp_field_mapping or synthesized as a fallback.
/// </summary>
public sealed class MdpLineageFieldMappingRow
{
    /// <summary>Id of the owning entity.</summary>
    public long EntityId { get; set; }

    /// <summary>Source field or expression.</summary>
    public string? SourceField { get; set; }

    /// <summary>Target field.</summary>
    public string? TargetField { get; set; }

    /// <summary>Mapping type label (fallback rows use DIRECT, CONST, EXPR and JSON).</summary>
    public string? FieldType { get; set; }

    /// <summary>Transform script, or a descriptive note on fallback rows.</summary>
    public string? TransformScript { get; set; }

    /// <summary>Constant value, if any.</summary>
    public string? ConstValue { get; set; }

    /// <summary>Lookup table, if any.</summary>
    public string? LookupTable { get; set; }

    /// <summary>Whether the mapping is marked as required.</summary>
    public bool IsRequired { get; set; }

    /// <summary>Default value, if any.</summary>
    public string? DefaultValue { get; set; }

    /// <summary>Sort order within the entity.</summary>
    public int SortOrder { get; set; }

    /// <summary>"CONFIG" for configured rows, "FALLBACK" for synthesized rows.</summary>
    public string? MappingSource { get; set; }

    /// <summary>True for synthesized fallback rows.</summary>
    public bool IsFallback { get; set; }
}
/// <summary>
/// One row of mdp_sync_log for an entity within a sync batch.
/// </summary>
public sealed class MdpLineageSyncLogRow
{
    /// <summary>Id of the synced entity.</summary>
    public long EntityId { get; set; }

    /// <summary>Name of the synced entity.</summary>
    public string? EntityName { get; set; }

    /// <summary>Sync status.</summary>
    public string? Status { get; set; }

    /// <summary>Value of the rows_read column.</summary>
    public long? RowsRead { get; set; }

    /// <summary>Value of the rows_insert column.</summary>
    public long? RowsInsert { get; set; }

    /// <summary>Value of the rows_update column.</summary>
    public long? RowsUpdate { get; set; }

    /// <summary>Value of the rows_skip column.</summary>
    public long? RowsSkip { get; set; }

    /// <summary>Value of the rows_error column.</summary>
    public long? RowsError { get; set; }

    /// <summary>Sync start time.</summary>
    public DateTime? SyncStart { get; set; }

    /// <summary>Sync end time.</summary>
    public DateTime? SyncEnd { get; set; }

    /// <summary>Sync duration in milliseconds.</summary>
    public int? DurationMs { get; set; }

    /// <summary>Error message, if any.</summary>
    public string? ErrorMsg { get; set; }
}
|