namespace Admin.NET.Plugin.AiDOP.Order;

/// <summary>
/// Unified MDP run monitoring for the data platform: exposes the run log of the
/// MDP sync/transform jobs and a lineage view (entities, field mappings, latest sync log).
/// </summary>
// NOTE(review): the controller is [AllowAnonymous] while the lineage endpoint returns
// database host/port/name of registered sources — confirm this exposure is intended.
[ApiDescriptionSettings(Order = 322, Description = "统一MDP运行监控")]
[Route("api/DataPlatform")]
[AllowAnonymous]
[NonUnify]
public class MdpMonitorService : IDynamicApiController, ITransient
{
    /// <summary>
    /// Escape character used in every LIKE pattern built by this service.
    /// An explicit ESCAPE clause keeps the behavior independent of server SQL modes.
    /// </summary>
    private const char LikeEscapeChar = '!';

    /// <summary>Module code (case-insensitive) to job code of the module's sync/transform job.</summary>
    private static readonly Dictionary<string, string> ModuleJobCodes = new(StringComparer.OrdinalIgnoreCase)
    {
        ["S1"] = "S1_MDP_SYNC_TRANSFORM",
        ["S3"] = "S3_MDP_SYNC_TRANSFORM"
    };

    private readonly ISqlSugarClient _db;

    public MdpMonitorService(ISqlSugarClient db)
    {
        _db = db;
    }

    /// <summary>
    /// Lists the known MDP modules together with their job codes, ordered by module code.
    /// </summary>
    [DisplayName("MDP模块选项")]
    [HttpGet("mdp-monitor/modules")]
    public object GetModules()
    {
        return ModuleJobCodes
            .OrderBy(u => u.Key)
            .Select(u => new { moduleCode = u.Key, jobCode = u.Value })
            .ToList();
    }

    /// <summary>
    /// Returns the most recent run-log row matching the filter, or an empty row when none matches.
    /// </summary>
    /// <param name="input">Optional module/job/batch/status/time-range filter.</param>
    [DisplayName("MDP最近运行状态")]
    [HttpGet("mdp-monitor/latest")]
    public async Task<MdpMonitorRunLogRow> GetLatest([FromQuery] MdpMonitorQueryInput input)
    {
        var (whereSql, pars) = BuildWhere(input);
        return await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(
            $"""
            {SelectColumnsSql()}
            FROM mdp_transform_run_log
            WHERE {whereSql}
            ORDER BY start_time DESC, id DESC
            LIMIT 1
            """, pars) ?? new MdpMonitorRunLogRow();
    }

    /// <summary>
    /// Returns one page of run-log rows (newest first) plus the total row count for the filter.
    /// </summary>
    /// <param name="input">Filter and paging; non-positive Page/PageSize fall back to 1/10.</param>
    /// <returns>An object with <c>total</c>, <c>page</c>, <c>pageSize</c> and <c>list</c>.</returns>
    [DisplayName("MDP运行日志列表")]
    [HttpGet("mdp-monitor/list")]
    public async Task<object> GetList([FromQuery] MdpMonitorListInput input)
    {
        var page = input.Page <= 0 ? 1 : input.Page;
        var pageSize = input.PageSize <= 0 ? 10 : input.PageSize;

        // Computed as long: (page - 1) * pageSize can overflow int for large inputs,
        // which would put a negative OFFSET into the SQL text.
        var offset = (long)(page - 1) * pageSize;

        var (whereSql, pars) = BuildWhere(input);
        var total = await _db.Ado.GetIntAsync(
            $"SELECT COUNT(1) FROM mdp_transform_run_log WHERE {whereSql}", pars);

        // pageSize/offset are numeric locals, so inlining them cannot inject SQL.
        var list = await _db.Ado.SqlQueryAsync<MdpMonitorRunLogRow>(
            $"""
            {SelectColumnsSql()}
            FROM mdp_transform_run_log
            WHERE {whereSql}
            ORDER BY start_time DESC, id DESC
            LIMIT {pageSize} OFFSET {offset}
            """, pars);

        return new { total, page, pageSize, list };
    }

    /// <summary>
    /// Returns a single run-log row by id; the row must also satisfy the supplied filter.
    /// </summary>
    /// <param name="id">Primary key of the run-log row.</param>
    /// <param name="input">Additional filter applied together with the id.</param>
    /// <exception cref="Exception">Raised through <c>Oops.Oh</c> when no row matches.</exception>
    [DisplayName("MDP运行日志详情")]
    [HttpGet("mdp-monitor/detail/{id}")]
    public async Task<MdpMonitorRunLogRow> GetDetail(long id, [FromQuery] MdpMonitorQueryInput input)
    {
        var (whereSql, pars) = BuildWhere(input);
        pars.Add(new SugarParameter("@Id", id));

        var row = await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(
            $"""
            {SelectColumnsSql()}
            FROM mdp_transform_run_log
            WHERE id=@Id AND {whereSql}
            LIMIT 1
            """, pars);

        return row ?? throw Oops.Oh("运行日志不存在");
    }

    /// <summary>
    /// Builds the lineage view of one module: static stage descriptions, the entities whose
    /// code starts with "&lt;module&gt;_", their field mappings and, when a batch id is given,
    /// the latest sync-log row of each entity in that batch.
    /// </summary>
    /// <param name="input">Module code or job code (one is required) and an optional batch id.</param>
    /// <exception cref="Exception">Raised through <c>Oops.Oh</c> when no module can be resolved.</exception>
    [DisplayName("MDP同步链路详情")]
    [HttpGet("mdp-monitor/lineage")]
    public async Task<MdpLineageOutput> GetLineage([FromQuery] MdpMonitorLineageInput input)
    {
        var moduleCode = ResolveModuleCode(input.ModuleCode, input.JobCode);
        if (string.IsNullOrWhiteSpace(moduleCode))
        {
            throw Oops.Oh("请选择 MDP 模块");
        }

        var jobCode = ResolveJobCode(moduleCode, input.JobCode);

        // The module code is user input and '_' is itself a LIKE wildcard, so both the module
        // code and the separator are escaped; only the trailing '%' stays a wildcard.
        var entityPrefix = $"{EscapeLikePattern(moduleCode)}{LikeEscapeChar}_%";

        // NOTE(review): the Target* columns are read from the same mdp_source row as the
        // Source* columns — confirm that source and target really share one connection.
        var entities = await _db.Ado.SqlQueryAsync<MdpLineageEntityRow>(
            """
            SELECT
                e.id AS Id,
                e.entity_code AS EntityCode,
                e.entity_name AS EntityName,
                e.entity_type AS EntityType,
                s.source_code AS SourceCode,
                s.source_name AS SourceName,
                s.source_type AS SourceType,
                s.db_type AS SourceDbType,
                s.db_host AS SourceDbHost,
                s.db_port AS SourceDbPort,
                s.db_name AS SourceDbName,
                e.source_table_name AS SourceTableName,
                e.source_api_path AS SourceApiPath,
                s.db_type AS TargetDbType,
                s.db_host AS TargetDbHost,
                s.db_port AS TargetDbPort,
                s.db_name AS TargetDbName,
                e.target_table_name AS TargetTableName,
                e.sync_mode AS SyncMode,
                e.incr_column AS IncrColumn,
                e.status AS Status
            FROM mdp_entity e
            LEFT JOIN mdp_source s ON s.id = e.source_id
            WHERE e.entity_code LIKE @EntityPrefix ESCAPE '!'
            ORDER BY e.entity_code
            """, new SugarParameter("@EntityPrefix", entityPrefix));

        if (entities.Count == 0)
        {
            return new MdpLineageOutput
            {
                ModuleCode = moduleCode,
                JobCode = jobCode,
                BatchId = input.BatchId,
                Stages = BuildStageDescriptions(moduleCode),
                Entities = new List<MdpLineageEntityRow>()
            };
        }

        // The ids are long values just read from the database (never user text), so
        // inlining them into the IN list is safe. The list is non-empty at this point.
        var entityIds = string.Join(",", entities.Select(u => u.Id));
        var mappings = await _db.Ado.SqlQueryAsync<MdpLineageFieldMappingRow>(
            $"""
            SELECT
                entity_id AS EntityId,
                source_field AS SourceField,
                target_field AS TargetField,
                field_type AS FieldType,
                transform_script AS TransformScript,
                const_value AS ConstValue,
                lookup_table AS LookupTable,
                is_required AS IsRequired,
                default_value AS DefaultValue,
                sort_order AS SortOrder
            FROM mdp_field_mapping
            WHERE entity_id IN ({entityIds})
            ORDER BY entity_id, sort_order, target_field
            """);
        var mappingsByEntity = mappings
            .GroupBy(u => u.EntityId)
            .ToDictionary(u => u.Key, u => u.ToList());

        Dictionary<long, MdpLineageSyncLogRow> syncLogsByEntity = new();
        if (!string.IsNullOrWhiteSpace(input.BatchId))
        {
            var syncLogs = await _db.Ado.SqlQueryAsync<MdpLineageSyncLogRow>(
                """
                SELECT
                    entity_id AS EntityId,
                    entity_name AS EntityName,
                    status AS Status,
                    rows_read AS RowsRead,
                    rows_insert AS RowsInsert,
                    rows_update AS RowsUpdate,
                    rows_skip AS RowsSkip,
                    rows_error AS RowsError,
                    sync_start AS SyncStart,
                    sync_end AS SyncEnd,
                    duration_ms AS DurationMs,
                    error_msg AS ErrorMsg
                FROM mdp_sync_log
                WHERE sync_batch_id = @BatchId
                  AND entity_id IN (
                      SELECT id FROM mdp_entity WHERE entity_code LIKE @EntityPrefix ESCAPE '!'
                  )
                ORDER BY sync_start, id
                """,
                new SugarParameter("@BatchId", input.BatchId.Trim()),
                new SugarParameter("@EntityPrefix", entityPrefix));

            // Keep only the most recently started sync-log row per entity.
            syncLogsByEntity = syncLogs
                .GroupBy(u => u.EntityId)
                .ToDictionary(u => u.Key, u => u.OrderByDescending(x => x.SyncStart).First());
        }

        foreach (var entity in entities)
        {
            // Fall back to the API path when the table name is missing OR blank; a plain
            // "??" would keep an empty table name and lose the full name entirely.
            var sourceObject = string.IsNullOrWhiteSpace(entity.SourceTableName)
                ? entity.SourceApiPath
                : entity.SourceTableName;

            entity.SourceFullName = BuildObjectFullName(
                entity.SourceDbType, entity.SourceDbHost, entity.SourceDbPort, entity.SourceDbName, sourceObject);
            entity.TargetFullName = BuildObjectFullName(
                entity.TargetDbType, entity.TargetDbHost, entity.TargetDbPort, entity.TargetDbName, entity.TargetTableName);

            if (mappingsByEntity.TryGetValue(entity.Id, out var entityMappings))
            {
                entity.FieldMappings = entityMappings;
                entity.FieldMappingCount = entityMappings.Count;
            }

            if (syncLogsByEntity.TryGetValue(entity.Id, out var syncLog))
            {
                entity.SyncLog = syncLog;
            }
        }

        return new MdpLineageOutput
        {
            ModuleCode = moduleCode,
            JobCode = jobCode,
            BatchId = input.BatchId,
            Stages = BuildStageDescriptions(moduleCode),
            Entities = entities
        };
    }

    /// <summary>
    /// Translates the query input into a parameterized WHERE fragment over mdp_transform_run_log.
    /// The fragment always restricts to job codes containing "MDP"; every other condition is
    /// added only when the corresponding input value is present.
    /// </summary>
    /// <returns>The AND-joined conditions and the parameters they reference.</returns>
    private static (string WhereSql, List<SugarParameter> Parameters) BuildWhere(MdpMonitorQueryInput input)
    {
        var where = new List<string> { "IFNULL(job_code, '') LIKE '%MDP%'" };
        var pars = new List<SugarParameter>();

        var jobCode = ResolveJobCode(input.ModuleCode, input.JobCode);
        if (!string.IsNullOrWhiteSpace(jobCode))
        {
            where.Add("job_code=@JobCode");
            pars.Add(new SugarParameter("@JobCode", jobCode));
        }

        if (!string.IsNullOrWhiteSpace(input.BatchId))
        {
            // Contains-search on the literal text: wildcard characters typed by the caller
            // (batch ids commonly contain '_') are escaped instead of being interpreted.
            where.Add("batch_id LIKE @BatchId ESCAPE '!'");
            pars.Add(new SugarParameter("@BatchId", $"%{EscapeLikePattern(input.BatchId.Trim())}%"));
        }

        if (!string.IsNullOrWhiteSpace(input.Status))
        {
            where.Add("status=@Status");
            pars.Add(new SugarParameter("@Status", input.Status.Trim().ToUpperInvariant()));
        }

        if (input.StartTime.HasValue)
        {
            where.Add("start_time >= @StartTime");
            pars.Add(new SugarParameter("@StartTime", input.StartTime.Value));
        }

        if (input.EndTime.HasValue)
        {
            where.Add("start_time <= @EndTime");
            pars.Add(new SugarParameter("@EndTime", input.EndTime.Value));
        }

        return (string.Join(" AND ", where), pars);
    }

    /// <summary>
    /// Escapes the LIKE metacharacters ('%', '_') and the escape character itself so that
    /// <paramref name="value"/> matches literally in a <c>LIKE … ESCAPE '!'</c> predicate.
    /// </summary>
    private static string EscapeLikePattern(string value)
    {
        var escape = LikeEscapeChar.ToString();

        // The escape character must be doubled first, otherwise the escapes added for
        // '%' and '_' would themselves be escaped again.
        return value
            .Replace(escape, escape + escape)
            .Replace("%", escape + "%")
            .Replace("_", escape + "_");
    }

    /// <summary>
    /// Resolves the job code: an explicit job code wins (trimmed, upper-cased); otherwise the
    /// module code is looked up in <see cref="ModuleJobCodes"/>. Returns null when neither resolves.
    /// </summary>
    private static string? ResolveJobCode(string? moduleCode, string? jobCode)
    {
        if (!string.IsNullOrWhiteSpace(jobCode))
        {
            return jobCode.Trim().ToUpperInvariant();
        }

        if (string.IsNullOrWhiteSpace(moduleCode))
        {
            return null;
        }

        return ModuleJobCodes.TryGetValue(moduleCode.Trim(), out var mapped) ? mapped : null;
    }

    /// <summary>
    /// Resolves the module code: an explicit module code wins (trimmed, upper-cased); otherwise
    /// the module whose job code equals <paramref name="jobCode"/> (case-insensitive) is used.
    /// Returns null when neither resolves.
    /// </summary>
    private static string? ResolveModuleCode(string? moduleCode, string? jobCode)
    {
        if (!string.IsNullOrWhiteSpace(moduleCode))
        {
            return moduleCode.Trim().ToUpperInvariant();
        }

        if (string.IsNullOrWhiteSpace(jobCode))
        {
            return null;
        }

        var normalizedJobCode = jobCode.Trim();

        // FirstOrDefault yields default(KeyValuePair) when nothing matches, whose Key is null.
        return ModuleJobCodes
            .FirstOrDefault(u => string.Equals(u.Value, normalizedJobCode, StringComparison.OrdinalIgnoreCase))
            .Key;
    }

    /// <summary>
    /// Formats "dbType / host[:port] / [dbName.]objectName", skipping blank parts.
    /// Returns null when <paramref name="objectName"/> is blank.
    /// </summary>
    private static string? BuildObjectFullName(string? dbType, string? host, int? port, string? dbName, string? objectName)
    {
        if (string.IsNullOrWhiteSpace(objectName))
        {
            return null;
        }

        var databaseObject = string.IsNullOrWhiteSpace(dbName) ? objectName : $"{dbName}.{objectName}";
        var hostPart = string.IsNullOrWhiteSpace(host)
            ? null
            : port.HasValue ? $"{host}:{port}" : host;

        return string.Join(" / ", new[] { dbType, hostPart, databaseObject }.Where(u => !string.IsNullOrWhiteSpace(u)));
    }

    /// <summary>
    /// Returns the static pipeline stage descriptions for a module: four stages
    /// (STAGING, STANDARD, DWD, KPI) for S1 and S3, and a generic two-stage description otherwise.
    /// </summary>
    private static List<MdpLineageStageRow> BuildStageDescriptions(string moduleCode)
    {
        if (string.Equals(moduleCode, "S1", StringComparison.OrdinalIgnoreCase))
        {
            return new List<MdpLineageStageRow>
            {
                new()
                {
                    StageCode = "STAGING",
                    StageName = "贴源同步",
                    Layer = "mdp_stg",
                    Description = "按 mdp_entity 登记的 S1 源对象抽取数据,保留 raw_data JSON 便于追溯。",
                    InputObjects = "旧系统 / 当前库源对象",
                    OutputObjects = "mdp_stg_so, mdp_stg_ship_trans",
                    Execution = "S1MdpSyncTransformService.SyncStagingAsync"
                },
                new()
                {
                    StageCode = "STANDARD",
                    StageName = "标准层转换",
                    Layer = "mdp_std",
                    Description = "解析贴源 raw_data,做字段标准化、租户兜底和幂等写入。",
                    InputObjects = "mdp_stg_so, mdp_stg_ship_trans",
                    OutputObjects = "mdp_std_so, mdp_std_ship_trans",
                    Execution = "S1MdpSyncTransformService.BuildStandardCommands"
                },
                new()
                {
                    StageCode = "DWD",
                    StageName = "DWD宽表",
                    Layer = "dwd",
                    Description = "沉淀 S1 订单交付事实,供订单交付、看板和诊断读取。",
                    InputObjects = "mdp_std_so, mdp_std_ship_trans",
                    OutputObjects = "dwd_ship_trans",
                    Execution = "S1MdpSyncTransformService.BuildDwdAsync"
                },
                new()
                {
                    StageCode = "KPI",
                    StageName = "指标写入",
                    Layer = "ado_s9",
                    Description = "计算 S1 L1 指标并写入统一指标值表。",
                    InputObjects = "mdp_std_so, dwd_ship_trans",
                    OutputObjects = "ado_s9_kpi_value_l1_day",
                    Execution = "S1MdpSyncTransformService.BuildS1KpiValuesAsync"
                }
            };
        }

        if (string.Equals(moduleCode, "S3", StringComparison.OrdinalIgnoreCase))
        {
            return new List<MdpLineageStageRow>
            {
                new()
                {
                    StageCode = "STAGING",
                    StageName = "贴源同步",
                    Layer = "mdp_stg",
                    Description = "按 mdp_entity 登记的 S3 源对象抽取数据,保留 raw_data JSON 便于追溯。",
                    InputObjects = "S3 源对象",
                    OutputObjects = "mdp_stg_*",
                    Execution = "S3MdpSyncTransformService.SyncStagingAsync"
                },
                new()
                {
                    StageCode = "STANDARD",
                    StageName = "标准层转换",
                    Layer = "mdp_std",
                    Description = "将供应、物料、采购、交付等对象标准化。",
                    InputObjects = "mdp_stg_*",
                    OutputObjects = "mdp_std_*",
                    Execution = "S3MdpSyncTransformService.BuildStandardCommands"
                },
                new()
                {
                    StageCode = "DWD",
                    StageName = "DWD宽表",
                    Layer = "dwd",
                    Description = "生成供应交付、齐套、风险等分析宽表。",
                    InputObjects = "mdp_std_*",
                    OutputObjects = "dwd_supplier_delivery / dwd_material_readiness 等",
                    Execution = "S3MdpSyncTransformService.BuildDwdAsync"
                },
                new()
                {
                    StageCode = "KPI",
                    StageName = "指标写入",
                    Layer = "ado_s9",
                    Description = "写入 S3 供应协同指标。",
                    InputObjects = "mdp_std_* / dwd_*",
                    OutputObjects = "ado_s9_kpi_value_*",
                    Execution = "S3MdpSyncTransformService.BuildS3KpiValuesAsync"
                }
            };
        }

        return new List<MdpLineageStageRow>
        {
            new()
            {
                StageCode = "STAGING",
                StageName = "贴源同步",
                Layer = "mdp_stg",
                Description = "按 mdp_entity 登记源对象抽取数据。",
                InputObjects = "源对象",
                OutputObjects = "mdp_stg_*",
                Execution = "模块 MDP 同步服务"
            },
            new()
            {
                StageCode = "STANDARD",
                StageName = "标准层转换",
                Layer = "mdp_std",
                Description = "标准层/DWD/KPI 当前由模块后端 Service 承载。",
                InputObjects = "mdp_stg_*",
                OutputObjects = "mdp_std_* / dwd_* / 指标表",
                Execution = "模块 MDP 转换服务"
            }
        };
    }

    /// <summary>
    /// Returns the shared SELECT column list that maps mdp_transform_run_log columns onto
    /// the properties of <see cref="MdpMonitorRunLogRow"/>. Callers append FROM/WHERE themselves.
    /// </summary>
    private static string SelectColumnsSql()
    {
        return """
            SELECT
                id AS Id,
                tenant_id AS TenantId,
                job_code AS JobCode,
                job_name AS JobName,
                trigger_type AS TriggerType,
                batch_id AS BatchId,
                status AS Status,
                start_time AS StartTime,
                end_time AS EndTime,
                duration_ms AS DurationMs,
                stage_rows AS StageRows,
                standard_rows AS StandardRows,
                dwd_rows AS DwdRows,
                error_message AS ErrorMessage,
                summary_json AS SummaryJson,
                create_time AS CreateTime,
                update_time AS UpdateTime
            """;
    }
}

/// <summary>Common run-log filter; every member is optional.</summary>
public class MdpMonitorQueryInput
{
    public string? ModuleCode { get; set; }
    public string? JobCode { get; set; }
    public string? BatchId { get; set; }
    public string? Status { get; set; }
    public DateTime? StartTime { get; set; }
    public DateTime? EndTime { get; set; }
}

/// <summary>Run-log filter plus paging (1-based page, defaults 1 / 10).</summary>
public sealed class MdpMonitorListInput : MdpMonitorQueryInput
{
    public int Page { get; set; } = 1;
    public int PageSize { get; set; } = 10;
}

/// <summary>Input of the lineage endpoint; adds nothing to the common filter.</summary>
public sealed class MdpMonitorLineageInput : MdpMonitorQueryInput
{
}

/// <summary>One row of mdp_transform_run_log as selected by the monitoring queries.</summary>
public sealed class MdpMonitorRunLogRow
{
    public long Id { get; set; }
    public long TenantId { get; set; }
    public string? JobCode { get; set; }
    public string? JobName { get; set; }
    public string? TriggerType { get; set; }
    public string? BatchId { get; set; }
    public string? Status { get; set; }
    public DateTime? StartTime { get; set; }
    public DateTime? EndTime { get; set; }
    public int? DurationMs { get; set; }
    public int? StageRows { get; set; }
    public int? StandardRows { get; set; }
    public int? DwdRows { get; set; }
    public string? ErrorMessage { get; set; }
    public string? SummaryJson { get; set; }
    public DateTime? CreateTime { get; set; }
    public DateTime? UpdateTime { get; set; }
}

/// <summary>Result of the lineage endpoint.</summary>
public sealed class MdpLineageOutput
{
    public string? ModuleCode { get; set; }
    public string? JobCode { get; set; }
    public string? BatchId { get; set; }
    public List<MdpLineageStageRow> Stages { get; set; } = new();
    public List<MdpLineageEntityRow> Entities { get; set; } = new();
}

/// <summary>Static description of one pipeline stage.</summary>
public sealed class MdpLineageStageRow
{
    public string? StageCode { get; set; }
    public string? StageName { get; set; }
    public string? Layer { get; set; }
    public string? Description { get; set; }
    public string? InputObjects { get; set; }
    public string? OutputObjects { get; set; }
    public string? Execution { get; set; }
}

/// <summary>
/// One mdp_entity row joined with its mdp_source row, enriched in code with full object
/// names, field mappings and (optionally) the latest sync log of the requested batch.
/// </summary>
public sealed class MdpLineageEntityRow
{
    public long Id { get; set; }
    public string? EntityCode { get; set; }
    public string? EntityName { get; set; }
    public string? EntityType { get; set; }
    public string? SourceCode { get; set; }
    public string? SourceName { get; set; }
    public string? SourceType { get; set; }
    public string? SourceDbType { get; set; }
    public string? SourceDbHost { get; set; }
    public int? SourceDbPort { get; set; }
    public string? SourceDbName { get; set; }
    public string? SourceTableName { get; set; }
    public string? SourceApiPath { get; set; }
    public string? SourceFullName { get; set; }
    public string? TargetDbType { get; set; }
    public string? TargetDbHost { get; set; }
    public int? TargetDbPort { get; set; }
    public string? TargetDbName { get; set; }
    public string? TargetTableName { get; set; }
    public string? TargetFullName { get; set; }
    public string? SyncMode { get; set; }
    public string? IncrColumn { get; set; }
    public int? Status { get; set; }
    public int FieldMappingCount { get; set; }
    public List<MdpLineageFieldMappingRow> FieldMappings { get; set; } = new();
    public MdpLineageSyncLogRow? SyncLog { get; set; }
}

/// <summary>One row of mdp_field_mapping.</summary>
public sealed class MdpLineageFieldMappingRow
{
    public long EntityId { get; set; }
    public string? SourceField { get; set; }
    public string? TargetField { get; set; }
    public string? FieldType { get; set; }
    public string? TransformScript { get; set; }
    public string? ConstValue { get; set; }
    public string? LookupTable { get; set; }
    public bool IsRequired { get; set; }
    public string? DefaultValue { get; set; }
    public int SortOrder { get; set; }
}

/// <summary>One row of mdp_sync_log.</summary>
public sealed class MdpLineageSyncLogRow
{
    public long EntityId { get; set; }
    public string? EntityName { get; set; }
    public string? Status { get; set; }
    public long? RowsRead { get; set; }
    public long? RowsInsert { get; set; }
    public long? RowsUpdate { get; set; }
    public long? RowsSkip { get; set; }
    public long? RowsError { get; set; }
    public DateTime? SyncStart { get; set; }
    public DateTime? SyncEnd { get; set; }
    public int? DurationMs { get; set; }
    public string? ErrorMsg { get; set; }
}