| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- namespace Admin.NET.Plugin.AiDOP.Order;
- /// <summary>
- /// 数据中台统一 MDP 运行监控。
- /// </summary>
- [ApiDescriptionSettings(Order = 322, Description = "统一MDP运行监控")]
- [Route("api/DataPlatform")]
- [AllowAnonymous]
- [NonUnify]
- public class MdpMonitorService : IDynamicApiController, ITransient
- {
/// <summary>
/// Maps a module code (looked up case-insensitively) to the job code of that module's MDP sync/transform job.
/// </summary>
private static readonly Dictionary<string, string> ModuleJobCodes =
    new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
    {
        { "S1", "S1_MDP_SYNC_TRANSFORM" },
        { "S3", "S3_MDP_SYNC_TRANSFORM" },
    };
/// <summary>SqlSugar client used for every query issued by this service.</summary>
private readonly ISqlSugarClient _db;

/// <summary>
/// Creates the service around the supplied SqlSugar client.
/// </summary>
/// <param name="db">Database client stored for later queries.</param>
public MdpMonitorService(ISqlSugarClient db) => _db = db;
/// <summary>
/// Lists the module codes known to the monitor together with the job code each one maps to.
/// </summary>
/// <returns>A list of <c>{ moduleCode, jobCode }</c> objects ordered by module code.</returns>
[DisplayName("MDP模块选项")]
[HttpGet("mdp-monitor/modules")]
public object GetModules()
{
    var options =
        from pair in ModuleJobCodes
        orderby pair.Key
        select new { moduleCode = pair.Key, jobCode = pair.Value };

    return options.ToList();
}
/// <summary>
/// Returns the most recent run-log row (by start_time, then id, descending) that matches the filter.
/// </summary>
/// <param name="input">Filter criteria translated into the WHERE clause by <c>BuildWhere</c>.</param>
/// <returns>The newest matching row, or an empty <see cref="MdpMonitorRunLogRow"/> when the query yields nothing.</returns>
[DisplayName("MDP最近运行状态")]
[HttpGet("mdp-monitor/latest")]
public async Task<object> GetLatest([FromQuery] MdpMonitorQueryInput input)
{
    var (filterSql, filterParameters) = BuildWhere(input);
    var sql =
        $"""
        {SelectColumnsSql()}
        FROM mdp_transform_run_log
        WHERE {filterSql}
        ORDER BY start_time DESC, id DESC
        LIMIT 1
        """;

    var latest = await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(sql, filterParameters);
    if (latest is null)
    {
        // Callers always receive an object, never null.
        return new MdpMonitorRunLogRow();
    }

    return latest;
}
/// <summary>
/// Returns one page of run-log rows, newest first, together with the total number of matching rows.
/// </summary>
/// <param name="input">
/// Filter criteria plus paging. A non-positive Page falls back to 1 and a non-positive PageSize falls back to 10.
/// </param>
/// <returns>An anonymous object with <c>total</c>, <c>page</c>, <c>pageSize</c> and <c>list</c>.</returns>
[DisplayName("MDP运行日志列表")]
[HttpGet("mdp-monitor/list")]
public async Task<object> GetList([FromQuery] MdpMonitorListInput input)
{
    var page = input.Page <= 0 ? 1 : input.Page;
    var pageSize = input.PageSize <= 0 ? 10 : input.PageSize;

    // Compute the offset in 64 bits: (page - 1) * pageSize overflows Int32 for large
    // client-supplied values and would otherwise yield a negative/garbage OFFSET.
    var offset = (long)(page - 1) * pageSize;

    var (whereSql, pars) = BuildWhere(input);
    var total = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM mdp_transform_run_log WHERE {whereSql}", pars);

    // pageSize and offset are numeric values computed above, so inlining them cannot inject SQL.
    var list = await _db.Ado.SqlQueryAsync<MdpMonitorRunLogRow>(
        $"""
        {SelectColumnsSql()}
        FROM mdp_transform_run_log
        WHERE {whereSql}
        ORDER BY start_time DESC, id DESC
        LIMIT {pageSize} OFFSET {offset}
        """,
        pars);

    return new { total, page, pageSize, list };
}
/// <summary>
/// Loads a single run-log row by id; the row must also satisfy the supplied filter.
/// </summary>
/// <param name="id">Primary key of the run-log row.</param>
/// <param name="input">Additional filter criteria combined with the id condition.</param>
/// <returns>The matching <see cref="MdpMonitorRunLogRow"/>.</returns>
/// <remarks>Throws the exception produced by <c>Oops.Oh</c> when no row matches.</remarks>
[DisplayName("MDP运行日志详情")]
[HttpGet("mdp-monitor/detail/{id}")]
public async Task<object> GetDetail(long id, [FromQuery] MdpMonitorQueryInput input)
{
    var (filterSql, parameters) = BuildWhere(input);
    parameters.Add(new SugarParameter("@Id", id));

    var sql =
        $"""
        {SelectColumnsSql()}
        FROM mdp_transform_run_log
        WHERE id=@Id AND {filterSql}
        LIMIT 1
        """;

    var match = await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(sql, parameters);
    if (match is null)
    {
        throw Oops.Oh("运行日志不存在");
    }

    return match;
}
/// <summary>
/// Describes the sync lineage of one MDP module: its pipeline stages, the entities registered in
/// <c>mdp_entity</c> whose code starts with "<c>{module}_</c>", their field mappings and, when a
/// batch id is supplied, the most recent <c>mdp_sync_log</c> row per entity for that batch.
/// </summary>
/// <param name="input">
/// Query input. The module is taken from ModuleCode, or looked up from JobCode when ModuleCode is blank.
/// </param>
/// <returns>
/// An <see cref="MdpLineageOutput"/>; its Entities list is empty when nothing is registered for the module.
/// </returns>
/// <remarks>Throws the exception produced by <c>Oops.Oh</c> when no module can be resolved.</remarks>
[DisplayName("MDP同步链路详情")]
[HttpGet("mdp-monitor/lineage")]
public async Task<object> GetLineage([FromQuery] MdpMonitorLineageInput input)
{
    var moduleCode = ResolveModuleCode(input.ModuleCode, input.JobCode);
    if (string.IsNullOrWhiteSpace(moduleCode))
        throw Oops.Oh("请选择 MDP 模块");
    var jobCode = ResolveJobCode(moduleCode, input.JobCode);

    // Match the literal prefix "<module>_". Both '_' and '%' are LIKE wildcards, so the module code
    // and the separator are escaped ('!' is the escape character declared in the queries below);
    // otherwise "S1" would also match "S10_..." and a module code of "%" would match every entity.
    var entityPrefix = $"{EscapeLikeLiteral(moduleCode)}!_%";

    // NOTE(review): the Target* connection columns are read from the same mdp_source row (alias s)
    // as the Source* columns — confirm this is intended.
    var entities = await _db.Ado.SqlQueryAsync<MdpLineageEntityRow>(
        """
        SELECT e.id AS Id, e.entity_code AS EntityCode, e.entity_name AS EntityName,
        e.entity_type AS EntityType, s.source_code AS SourceCode,
        s.source_name AS SourceName, s.source_type AS SourceType,
        s.db_type AS SourceDbType, s.db_host AS SourceDbHost,
        s.db_port AS SourceDbPort, s.db_name AS SourceDbName,
        e.source_table_name AS SourceTableName, e.source_api_path AS SourceApiPath,
        s.db_type AS TargetDbType, s.db_host AS TargetDbHost,
        s.db_port AS TargetDbPort, s.db_name AS TargetDbName,
        e.target_table_name AS TargetTableName, e.sync_mode AS SyncMode,
        e.incr_column AS IncrColumn, e.status AS Status
        FROM mdp_entity e
        LEFT JOIN mdp_source s ON s.id = e.source_id
        WHERE e.entity_code LIKE @EntityPrefix ESCAPE '!'
        ORDER BY e.entity_code
        """,
        new SugarParameter("@EntityPrefix", entityPrefix));

    if (entities.Count == 0)
    {
        // Same shape as the populated response (including the echoed batch id), just without entities.
        return new MdpLineageOutput
        {
            ModuleCode = moduleCode,
            JobCode = jobCode,
            BatchId = input.BatchId,
            Stages = BuildStageDescriptions(moduleCode),
            Entities = new List<MdpLineageEntityRow>()
        };
    }

    // The ids are numeric values read back from the database, so inlining them cannot inject SQL.
    var entityIds = string.Join(",", entities.Select(u => u.Id));
    var mappings = await _db.Ado.SqlQueryAsync<MdpLineageFieldMappingRow>(
        $"""
        SELECT entity_id AS EntityId, source_field AS SourceField, target_field AS TargetField,
        field_type AS FieldType, transform_script AS TransformScript,
        const_value AS ConstValue, lookup_table AS LookupTable,
        is_required AS IsRequired, default_value AS DefaultValue, sort_order AS SortOrder
        FROM mdp_field_mapping
        WHERE entity_id IN ({entityIds})
        ORDER BY entity_id, sort_order, target_field
        """);
    var mappingsByEntity = mappings.GroupBy(u => u.EntityId).ToDictionary(u => u.Key, u => u.ToList());

    Dictionary<long, MdpLineageSyncLogRow> syncLogsByEntity = new();
    if (!string.IsNullOrWhiteSpace(input.BatchId))
    {
        var syncLogs = await _db.Ado.SqlQueryAsync<MdpLineageSyncLogRow>(
            """
            SELECT entity_id AS EntityId, entity_name AS EntityName, status AS Status,
            rows_read AS RowsRead, rows_insert AS RowsInsert, rows_update AS RowsUpdate,
            rows_skip AS RowsSkip, rows_error AS RowsError,
            sync_start AS SyncStart, sync_end AS SyncEnd, duration_ms AS DurationMs,
            error_msg AS ErrorMsg
            FROM mdp_sync_log
            WHERE sync_batch_id = @BatchId
            AND entity_id IN (
            SELECT id FROM mdp_entity WHERE entity_code LIKE @EntityPrefix ESCAPE '!'
            )
            ORDER BY sync_start, id
            """,
            new SugarParameter("@BatchId", input.BatchId.Trim()),
            new SugarParameter("@EntityPrefix", entityPrefix));

        // Keep only the row with the latest sync_start for each entity.
        syncLogsByEntity = syncLogs
            .GroupBy(u => u.EntityId)
            .ToDictionary(u => u.Key, u => u.OrderByDescending(x => x.SyncStart).First());
    }

    foreach (var entity in entities)
    {
        // The source object is the table name when present, otherwise the API path.
        entity.SourceFullName = BuildObjectFullName(entity.SourceDbType, entity.SourceDbHost, entity.SourceDbPort, entity.SourceDbName, entity.SourceTableName ?? entity.SourceApiPath);
        entity.TargetFullName = BuildObjectFullName(entity.TargetDbType, entity.TargetDbHost, entity.TargetDbPort, entity.TargetDbName, entity.TargetTableName);

        if (mappingsByEntity.TryGetValue(entity.Id, out var entityMappings))
        {
            entity.FieldMappings = entityMappings;
            entity.FieldMappingCount = entityMappings.Count;
        }

        if (syncLogsByEntity.TryGetValue(entity.Id, out var syncLog))
            entity.SyncLog = syncLog;
    }

    return new MdpLineageOutput
    {
        ModuleCode = moduleCode,
        JobCode = jobCode,
        BatchId = input.BatchId,
        Stages = BuildStageDescriptions(moduleCode),
        Entities = entities
    };
}

/// <summary>
/// Escapes the LIKE metacharacters '%' and '_' (and the escape character '!' itself) so that
/// <paramref name="value"/> is matched literally by a <c>LIKE ... ESCAPE '!'</c> predicate.
/// </summary>
private static string EscapeLikeLiteral(string value)
{
    // '!' must be doubled first so the escapes added for '%' and '_' are not escaped again.
    return value.Replace("!", "!!").Replace("%", "!%").Replace("_", "!_");
}
/// <summary>
/// Translates the query input into a WHERE clause (conditions joined by AND) and its parameters.
/// </summary>
/// <remarks>
/// The clause always restricts rows to job codes containing "MDP". Optional filters are added for:
/// the job code (given explicitly, or mapped from the module code; when neither yields a job code
/// no job filter is added), a substring match on batch_id, an exact match on the upper-cased status,
/// and an inclusive range on start_time.
/// </remarks>
/// <param name="input">Filter values supplied by the caller.</param>
/// <returns>The WHERE clause text (without the WHERE keyword) and the parameter list it refers to.</returns>
private static (string WhereSql, List<SugarParameter> Parameters) BuildWhere(MdpMonitorQueryInput input)
{
    var clauses = new List<string> { "IFNULL(job_code, '') LIKE '%MDP%'" };
    var parameters = new List<SugarParameter>();

    // Registers one condition together with the parameter it references.
    void AddFilter(string clause, string parameterName, object value)
    {
        clauses.Add(clause);
        parameters.Add(new SugarParameter(parameterName, value));
    }

    var resolvedJobCode = ResolveJobCode(input.ModuleCode, input.JobCode);
    if (!string.IsNullOrWhiteSpace(resolvedJobCode))
    {
        AddFilter("job_code=@JobCode", "@JobCode", resolvedJobCode);
    }

    if (!string.IsNullOrWhiteSpace(input.BatchId))
    {
        AddFilter("batch_id LIKE @BatchId", "@BatchId", $"%{input.BatchId.Trim()}%");
    }

    if (!string.IsNullOrWhiteSpace(input.Status))
    {
        AddFilter("status=@Status", "@Status", input.Status.Trim().ToUpperInvariant());
    }

    if (input.StartTime is DateTime rangeStart)
    {
        AddFilter("start_time >= @StartTime", "@StartTime", rangeStart);
    }

    if (input.EndTime is DateTime rangeEnd)
    {
        AddFilter("start_time <= @EndTime", "@EndTime", rangeEnd);
    }

    return (string.Join(" AND ", clauses), parameters);
}
/// <summary>
/// Determines the job code to use: an explicit job code wins (trimmed and upper-cased);
/// otherwise the module code is looked up in <c>ModuleJobCodes</c>.
/// </summary>
/// <param name="moduleCode">Module code used for the lookup when no job code is given.</param>
/// <param name="jobCode">Explicit job code, if any.</param>
/// <returns>The job code, or <c>null</c> when neither argument yields one.</returns>
private static string? ResolveJobCode(string? moduleCode, string? jobCode)
{
    if (!string.IsNullOrWhiteSpace(jobCode))
    {
        return jobCode.Trim().ToUpperInvariant();
    }

    if (!string.IsNullOrWhiteSpace(moduleCode)
        && ModuleJobCodes.TryGetValue(moduleCode.Trim(), out var mappedJobCode))
    {
        return mappedJobCode;
    }

    return null;
}
/// <summary>
/// Determines the module code to use: an explicit module code wins (trimmed and upper-cased);
/// otherwise the module whose registered job code equals <paramref name="jobCode"/>
/// (ignoring case) is returned.
/// </summary>
/// <param name="moduleCode">Explicit module code, if any.</param>
/// <param name="jobCode">Job code used for the reverse lookup when no module code is given.</param>
/// <returns>The module code, or <c>null</c> when neither argument yields one.</returns>
private static string? ResolveModuleCode(string? moduleCode, string? jobCode)
{
    if (!string.IsNullOrWhiteSpace(moduleCode))
    {
        return moduleCode.Trim().ToUpperInvariant();
    }

    if (string.IsNullOrWhiteSpace(jobCode))
    {
        return null;
    }

    var wantedJobCode = jobCode.Trim();
    foreach (var pair in ModuleJobCodes)
    {
        if (string.Equals(pair.Value, wantedJobCode, StringComparison.OrdinalIgnoreCase))
        {
            return pair.Key;
        }
    }

    return null;
}
/// <summary>
/// Builds a display name of the form "dbType / host:port / dbName.objectName",
/// leaving out every part that is null or whitespace.
/// </summary>
/// <param name="dbType">Database type label; omitted when blank.</param>
/// <param name="host">Host name; omitted (together with the port) when blank.</param>
/// <param name="port">Port appended to the host as ":port" when it has a value.</param>
/// <param name="dbName">Database name prefixed to the object as "dbName." when not blank.</param>
/// <param name="objectName">Object name (e.g. a table name or API path).</param>
/// <returns>The composed name, or <c>null</c> when <paramref name="objectName"/> is blank.</returns>
private static string? BuildObjectFullName(string? dbType, string? host, int? port, string? dbName, string? objectName)
{
    if (string.IsNullOrWhiteSpace(objectName))
    {
        return null;
    }

    var segments = new List<string>(3);

    if (!string.IsNullOrWhiteSpace(dbType))
    {
        segments.Add(dbType);
    }

    if (!string.IsNullOrWhiteSpace(host))
    {
        segments.Add(port.HasValue ? $"{host}:{port}" : host);
    }

    // The object segment is always present because objectName was verified above.
    segments.Add(string.IsNullOrWhiteSpace(dbName) ? objectName : $"{dbName}.{objectName}");

    return string.Join(" / ", segments);
}
/// <summary>
/// Returns the static description of the pipeline stages for a module: four stages
/// (STAGING, STANDARD, DWD, KPI) for S1 and S3, and a generic two-stage description otherwise.
/// </summary>
/// <param name="moduleCode">Module code, compared case-insensitively against "S1" and "S3".</param>
/// <returns>A new list of stage rows.</returns>
private static List<MdpLineageStageRow> BuildStageDescriptions(string moduleCode)
{
    // Compact factory so each stage reads as a single positional row.
    static MdpLineageStageRow Stage(string code, string name, string layer, string description, string inputs, string outputs, string execution) => new()
    {
        StageCode = code,
        StageName = name,
        Layer = layer,
        Description = description,
        InputObjects = inputs,
        OutputObjects = outputs,
        Execution = execution
    };

    bool IsModule(string code) => string.Equals(moduleCode, code, StringComparison.OrdinalIgnoreCase);

    if (IsModule("S1"))
    {
        return new List<MdpLineageStageRow>
        {
            Stage("STAGING", "贴源同步", "mdp_stg", "按 mdp_entity 登记的 S1 源对象抽取数据,保留 raw_data JSON 便于追溯。", "旧系统 / 当前库源对象", "mdp_stg_so, mdp_stg_ship_trans", "S1MdpSyncTransformService.SyncStagingAsync"),
            Stage("STANDARD", "标准层转换", "mdp_std", "解析贴源 raw_data,做字段标准化、租户兜底和幂等写入。", "mdp_stg_so, mdp_stg_ship_trans", "mdp_std_so, mdp_std_ship_trans", "S1MdpSyncTransformService.BuildStandardCommands"),
            Stage("DWD", "DWD宽表", "dwd", "沉淀 S1 订单交付事实,供订单交付、看板和诊断读取。", "mdp_std_so, mdp_std_ship_trans", "dwd_ship_trans", "S1MdpSyncTransformService.BuildDwdAsync"),
            Stage("KPI", "指标写入", "ado_s9", "计算 S1 L1 指标并写入统一指标值表。", "mdp_std_so, dwd_ship_trans", "ado_s9_kpi_value_l1_day", "S1MdpSyncTransformService.BuildS1KpiValuesAsync")
        };
    }

    if (IsModule("S3"))
    {
        return new List<MdpLineageStageRow>
        {
            Stage("STAGING", "贴源同步", "mdp_stg", "按 mdp_entity 登记的 S3 源对象抽取数据,保留 raw_data JSON 便于追溯。", "S3 源对象", "mdp_stg_*", "S3MdpSyncTransformService.SyncStagingAsync"),
            Stage("STANDARD", "标准层转换", "mdp_std", "将供应、物料、采购、交付等对象标准化。", "mdp_stg_*", "mdp_std_*", "S3MdpSyncTransformService.BuildStandardCommands"),
            Stage("DWD", "DWD宽表", "dwd", "生成供应交付、齐套、风险等分析宽表。", "mdp_std_*", "dwd_supplier_delivery / dwd_material_readiness 等", "S3MdpSyncTransformService.BuildDwdAsync"),
            Stage("KPI", "指标写入", "ado_s9", "写入 S3 供应协同指标。", "mdp_std_* / dwd_*", "ado_s9_kpi_value_*", "S3MdpSyncTransformService.BuildS3KpiValuesAsync")
        };
    }

    // Any other module gets the generic description.
    return new List<MdpLineageStageRow>
    {
        Stage("STAGING", "贴源同步", "mdp_stg", "按 mdp_entity 登记源对象抽取数据。", "源对象", "mdp_stg_*", "模块 MDP 同步服务"),
        Stage("STANDARD", "标准层转换", "mdp_std", "标准层/DWD/KPI 当前由模块后端 Service 承载。", "mdp_stg_*", "mdp_std_* / dwd_* / 指标表", "模块 MDP 转换服务")
    };
}
/// <summary>
/// Returns the shared SELECT column list that aliases the snake_case run-log columns to the
/// property names of <c>MdpMonitorRunLogRow</c>. The text has no FROM clause; callers append their own.
/// </summary>
/// <returns>The SELECT clause text.</returns>
private static string SelectColumnsSql() =>
    """
    SELECT id AS Id, tenant_id AS TenantId, job_code AS JobCode, job_name AS JobName, trigger_type AS TriggerType,
    batch_id AS BatchId, status AS Status, start_time AS StartTime, end_time AS EndTime, duration_ms AS DurationMs,
    stage_rows AS StageRows, standard_rows AS StandardRows, dwd_rows AS DwdRows,
    error_message AS ErrorMessage, summary_json AS SummaryJson, create_time AS CreateTime, update_time AS UpdateTime
    """;
- }
/// <summary>
/// Filter criteria shared by the MDP monitor endpoints. Every member is optional.
/// </summary>
public class MdpMonitorQueryInput
{
    /// <summary>Module code; used to look up the job code when <see cref="JobCode"/> is blank.</summary>
    public string? ModuleCode { get; set; }

    /// <summary>Explicit job code; takes precedence over <see cref="ModuleCode"/> when resolving the job.</summary>
    public string? JobCode { get; set; }

    /// <summary>Batch id to filter by.</summary>
    public string? BatchId { get; set; }

    /// <summary>Run status to filter by.</summary>
    public string? Status { get; set; }

    /// <summary>Inclusive lower bound applied to the run's start_time.</summary>
    public DateTime? StartTime { get; set; }

    /// <summary>Inclusive upper bound applied to the run's start_time.</summary>
    public DateTime? EndTime { get; set; }
}
/// <summary>
/// Filter criteria plus paging for the run-log list endpoint.
/// </summary>
public sealed class MdpMonitorListInput : MdpMonitorQueryInput
{
    /// <summary>1-based page number; defaults to 1.</summary>
    public int Page { get; set; } = 1;

    /// <summary>Number of rows per page; defaults to 10.</summary>
    public int PageSize { get; set; } = 10;
}
/// <summary>
/// Input of the lineage endpoint; adds no members to <see cref="MdpMonitorQueryInput"/>.
/// </summary>
public sealed class MdpMonitorLineageInput : MdpMonitorQueryInput
{
}
/// <summary>
/// One row of <c>mdp_transform_run_log</c>; each property receives the like-named snake_case column
/// (for example <c>job_code</c> is read into <see cref="JobCode"/>).
/// </summary>
public sealed class MdpMonitorRunLogRow
{
    /// <summary>Row id.</summary>
    public long Id { get; set; }

    /// <summary>Tenant id.</summary>
    public long TenantId { get; set; }

    /// <summary>Job code.</summary>
    public string? JobCode { get; set; }

    /// <summary>Job name.</summary>
    public string? JobName { get; set; }

    /// <summary>Trigger type.</summary>
    public string? TriggerType { get; set; }

    /// <summary>Batch id.</summary>
    public string? BatchId { get; set; }

    /// <summary>Run status.</summary>
    public string? Status { get; set; }

    /// <summary>Start time.</summary>
    public DateTime? StartTime { get; set; }

    /// <summary>End time.</summary>
    public DateTime? EndTime { get; set; }

    /// <summary>Value of the <c>duration_ms</c> column.</summary>
    public int? DurationMs { get; set; }

    /// <summary>Value of the <c>stage_rows</c> column.</summary>
    public int? StageRows { get; set; }

    /// <summary>Value of the <c>standard_rows</c> column.</summary>
    public int? StandardRows { get; set; }

    /// <summary>Value of the <c>dwd_rows</c> column.</summary>
    public int? DwdRows { get; set; }

    /// <summary>Error message.</summary>
    public string? ErrorMessage { get; set; }

    /// <summary>Value of the <c>summary_json</c> column.</summary>
    public string? SummaryJson { get; set; }

    /// <summary>Create time.</summary>
    public DateTime? CreateTime { get; set; }

    /// <summary>Update time.</summary>
    public DateTime? UpdateTime { get; set; }
}
/// <summary>
/// Response of the lineage endpoint: the resolved module/job, the echoed batch id,
/// the module's stage descriptions and its entities.
/// </summary>
public sealed class MdpLineageOutput
{
    /// <summary>Resolved module code.</summary>
    public string? ModuleCode { get; set; }

    /// <summary>Resolved job code, if any.</summary>
    public string? JobCode { get; set; }

    /// <summary>Batch id taken from the request.</summary>
    public string? BatchId { get; set; }

    /// <summary>Pipeline stage descriptions; starts as an empty list.</summary>
    public List<MdpLineageStageRow> Stages { get; set; } = new List<MdpLineageStageRow>();

    /// <summary>Entities of the module; starts as an empty list.</summary>
    public List<MdpLineageEntityRow> Entities { get; set; } = new List<MdpLineageEntityRow>();
}
/// <summary>
/// Description of one pipeline stage shown in the lineage view.
/// </summary>
public sealed class MdpLineageStageRow
{
    /// <summary>Stage code, e.g. "STAGING", "STANDARD", "DWD" or "KPI".</summary>
    public string? StageCode { get; set; }

    /// <summary>Display name of the stage.</summary>
    public string? StageName { get; set; }

    /// <summary>Layer label, e.g. "mdp_stg" or "dwd".</summary>
    public string? Layer { get; set; }

    /// <summary>Free-text description of the stage.</summary>
    public string? Description { get; set; }

    /// <summary>Text naming the stage's input objects.</summary>
    public string? InputObjects { get; set; }

    /// <summary>Text naming the stage's output objects.</summary>
    public string? OutputObjects { get; set; }

    /// <summary>Text naming what executes the stage.</summary>
    public string? Execution { get; set; }
}
/// <summary>
/// One entity in the lineage view: the registered entity, its source and target location,
/// its field mappings and, optionally, its sync-log row.
/// </summary>
public sealed class MdpLineageEntityRow
{
    /// <summary>Entity id.</summary>
    public long Id { get; set; }

    /// <summary>Entity code.</summary>
    public string? EntityCode { get; set; }

    /// <summary>Entity name.</summary>
    public string? EntityName { get; set; }

    /// <summary>Entity type.</summary>
    public string? EntityType { get; set; }

    /// <summary>Source code.</summary>
    public string? SourceCode { get; set; }

    /// <summary>Source name.</summary>
    public string? SourceName { get; set; }

    /// <summary>Source type.</summary>
    public string? SourceType { get; set; }

    /// <summary>Database type of the source.</summary>
    public string? SourceDbType { get; set; }

    /// <summary>Database host of the source.</summary>
    public string? SourceDbHost { get; set; }

    /// <summary>Database port of the source.</summary>
    public int? SourceDbPort { get; set; }

    /// <summary>Database name of the source.</summary>
    public string? SourceDbName { get; set; }

    /// <summary>Source table name.</summary>
    public string? SourceTableName { get; set; }

    /// <summary>Source API path.</summary>
    public string? SourceApiPath { get; set; }

    /// <summary>Composed display name of the source object.</summary>
    public string? SourceFullName { get; set; }

    /// <summary>Database type of the target.</summary>
    public string? TargetDbType { get; set; }

    /// <summary>Database host of the target.</summary>
    public string? TargetDbHost { get; set; }

    /// <summary>Database port of the target.</summary>
    public int? TargetDbPort { get; set; }

    /// <summary>Database name of the target.</summary>
    public string? TargetDbName { get; set; }

    /// <summary>Target table name.</summary>
    public string? TargetTableName { get; set; }

    /// <summary>Composed display name of the target object.</summary>
    public string? TargetFullName { get; set; }

    /// <summary>Sync mode.</summary>
    public string? SyncMode { get; set; }

    /// <summary>Value of the entity's <c>incr_column</c> column.</summary>
    public string? IncrColumn { get; set; }

    /// <summary>Entity status.</summary>
    public int? Status { get; set; }

    /// <summary>Number of field mappings attached to this entity.</summary>
    public int FieldMappingCount { get; set; }

    /// <summary>Field mappings of this entity; starts as an empty list.</summary>
    public List<MdpLineageFieldMappingRow> FieldMappings { get; set; } = new List<MdpLineageFieldMappingRow>();

    /// <summary>Sync-log row attached to this entity, if any.</summary>
    public MdpLineageSyncLogRow? SyncLog { get; set; }
}
/// <summary>
/// One source-to-target field mapping of an entity.
/// </summary>
public sealed class MdpLineageFieldMappingRow
{
    /// <summary>Id of the entity this mapping belongs to.</summary>
    public long EntityId { get; set; }

    /// <summary>Source field.</summary>
    public string? SourceField { get; set; }

    /// <summary>Target field.</summary>
    public string? TargetField { get; set; }

    /// <summary>Field type.</summary>
    public string? FieldType { get; set; }

    /// <summary>Transform script.</summary>
    public string? TransformScript { get; set; }

    /// <summary>Constant value.</summary>
    public string? ConstValue { get; set; }

    /// <summary>Lookup table.</summary>
    public string? LookupTable { get; set; }

    /// <summary>Whether the field is required.</summary>
    public bool IsRequired { get; set; }

    /// <summary>Default value.</summary>
    public string? DefaultValue { get; set; }

    /// <summary>Sort order of the mapping.</summary>
    public int SortOrder { get; set; }
}
/// <summary>
/// One sync-log row of an entity.
/// </summary>
public sealed class MdpLineageSyncLogRow
{
    /// <summary>Id of the entity the log row belongs to.</summary>
    public long EntityId { get; set; }

    /// <summary>Entity name recorded in the log row.</summary>
    public string? EntityName { get; set; }

    /// <summary>Sync status.</summary>
    public string? Status { get; set; }

    /// <summary>Value of the <c>rows_read</c> column.</summary>
    public long? RowsRead { get; set; }

    /// <summary>Value of the <c>rows_insert</c> column.</summary>
    public long? RowsInsert { get; set; }

    /// <summary>Value of the <c>rows_update</c> column.</summary>
    public long? RowsUpdate { get; set; }

    /// <summary>Value of the <c>rows_skip</c> column.</summary>
    public long? RowsSkip { get; set; }

    /// <summary>Value of the <c>rows_error</c> column.</summary>
    public long? RowsError { get; set; }

    /// <summary>Sync start time.</summary>
    public DateTime? SyncStart { get; set; }

    /// <summary>Sync end time.</summary>
    public DateTime? SyncEnd { get; set; }

    /// <summary>Value of the <c>duration_ms</c> column.</summary>
    public int? DurationMs { get; set; }

    /// <summary>Error message.</summary>
    public string? ErrorMsg { get; set; }
}
|