MdpMonitorService.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. namespace Admin.NET.Plugin.AiDOP.Order;
  2. /// <summary>
  3. /// 数据中台统一 MDP 运行监控。
  4. /// </summary>
  5. [ApiDescriptionSettings(Order = 322, Description = "统一MDP运行监控")]
  6. [Route("api/DataPlatform")]
  7. [AllowAnonymous]
  8. [NonUnify]
  9. public class MdpMonitorService : IDynamicApiController, ITransient
  10. {
  11. private static readonly Dictionary<string, string> ModuleJobCodes = new(StringComparer.OrdinalIgnoreCase)
  12. {
  13. ["S1"] = "S1_MDP_SYNC_TRANSFORM",
  14. ["S3"] = "S3_MDP_SYNC_TRANSFORM"
  15. };
  16. private readonly ISqlSugarClient _db;
  17. public MdpMonitorService(ISqlSugarClient db)
  18. {
  19. _db = db;
  20. }
  21. [DisplayName("MDP模块选项")]
  22. [HttpGet("mdp-monitor/modules")]
  23. public object GetModules()
  24. {
  25. return ModuleJobCodes
  26. .OrderBy(u => u.Key)
  27. .Select(u => new { moduleCode = u.Key, jobCode = u.Value })
  28. .ToList();
  29. }
  30. [DisplayName("MDP最近运行状态")]
  31. [HttpGet("mdp-monitor/latest")]
  32. public async Task<object> GetLatest([FromQuery] MdpMonitorQueryInput input)
  33. {
  34. var (whereSql, pars) = BuildWhere(input);
  35. return await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(
  36. $"""
  37. {SelectColumnsSql()}
  38. FROM mdp_transform_run_log
  39. WHERE {whereSql}
  40. ORDER BY start_time DESC, id DESC
  41. LIMIT 1
  42. """,
  43. pars)
  44. ?? new MdpMonitorRunLogRow();
  45. }
  46. [DisplayName("MDP运行日志列表")]
  47. [HttpGet("mdp-monitor/list")]
  48. public async Task<object> GetList([FromQuery] MdpMonitorListInput input)
  49. {
  50. var page = input.Page <= 0 ? 1 : input.Page;
  51. var pageSize = input.PageSize <= 0 ? 10 : input.PageSize;
  52. var offset = (page - 1) * pageSize;
  53. var (whereSql, pars) = BuildWhere(input);
  54. var total = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM mdp_transform_run_log WHERE {whereSql}", pars);
  55. var list = await _db.Ado.SqlQueryAsync<MdpMonitorRunLogRow>(
  56. $"""
  57. {SelectColumnsSql()}
  58. FROM mdp_transform_run_log
  59. WHERE {whereSql}
  60. ORDER BY start_time DESC, id DESC
  61. LIMIT {pageSize} OFFSET {offset}
  62. """,
  63. pars);
  64. return new { total, page, pageSize, list };
  65. }
  66. [DisplayName("MDP运行日志详情")]
  67. [HttpGet("mdp-monitor/detail/{id}")]
  68. public async Task<object> GetDetail(long id, [FromQuery] MdpMonitorQueryInput input)
  69. {
  70. var (whereSql, pars) = BuildWhere(input);
  71. pars.Add(new SugarParameter("@Id", id));
  72. var row = await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(
  73. $"""
  74. {SelectColumnsSql()}
  75. FROM mdp_transform_run_log
  76. WHERE id=@Id AND {whereSql}
  77. LIMIT 1
  78. """,
  79. pars);
  80. return row ?? throw Oops.Oh("运行日志不存在");
  81. }
  82. [DisplayName("MDP同步链路详情")]
  83. [HttpGet("mdp-monitor/lineage")]
  84. public async Task<object> GetLineage([FromQuery] MdpMonitorLineageInput input)
  85. {
  86. var moduleCode = ResolveModuleCode(input.ModuleCode, input.JobCode);
  87. if (string.IsNullOrWhiteSpace(moduleCode))
  88. throw Oops.Oh("请选择 MDP 模块");
  89. var jobCode = ResolveJobCode(moduleCode, input.JobCode);
  90. var entityPrefix = $"{moduleCode}_%";
  91. var entities = await _db.Ado.SqlQueryAsync<MdpLineageEntityRow>(
  92. """
  93. SELECT e.id AS Id, e.entity_code AS EntityCode, e.entity_name AS EntityName,
  94. e.entity_type AS EntityType, s.source_code AS SourceCode,
  95. s.source_name AS SourceName, s.source_type AS SourceType,
  96. s.db_type AS SourceDbType, s.db_host AS SourceDbHost,
  97. s.db_port AS SourceDbPort, s.db_name AS SourceDbName,
  98. e.source_table_name AS SourceTableName, e.source_api_path AS SourceApiPath,
  99. s.db_type AS TargetDbType, s.db_host AS TargetDbHost,
  100. s.db_port AS TargetDbPort, s.db_name AS TargetDbName,
  101. e.target_table_name AS TargetTableName, e.sync_mode AS SyncMode,
  102. e.incr_column AS IncrColumn, e.status AS Status
  103. FROM mdp_entity e
  104. LEFT JOIN mdp_source s ON s.id = e.source_id
  105. WHERE e.entity_code LIKE @EntityPrefix
  106. ORDER BY e.entity_code
  107. """,
  108. new SugarParameter("@EntityPrefix", entityPrefix));
  109. if (entities.Count == 0)
  110. {
  111. return new MdpLineageOutput
  112. {
  113. ModuleCode = moduleCode,
  114. JobCode = jobCode,
  115. Stages = BuildStageDescriptions(moduleCode),
  116. Entities = new List<MdpLineageEntityRow>()
  117. };
  118. }
  119. var entityIds = string.Join(",", entities.Select(u => u.Id));
  120. var mappings = await _db.Ado.SqlQueryAsync<MdpLineageFieldMappingRow>(
  121. $"""
  122. SELECT entity_id AS EntityId, source_field AS SourceField, target_field AS TargetField,
  123. field_type AS FieldType, transform_script AS TransformScript,
  124. const_value AS ConstValue, lookup_table AS LookupTable,
  125. is_required AS IsRequired, default_value AS DefaultValue, sort_order AS SortOrder
  126. FROM mdp_field_mapping
  127. WHERE entity_id IN ({entityIds})
  128. ORDER BY entity_id, sort_order, target_field
  129. """);
  130. var mappingsByEntity = mappings.GroupBy(u => u.EntityId).ToDictionary(u => u.Key, u => u.ToList());
  131. Dictionary<long, MdpLineageSyncLogRow> syncLogsByEntity = new();
  132. if (!string.IsNullOrWhiteSpace(input.BatchId))
  133. {
  134. var syncLogs = await _db.Ado.SqlQueryAsync<MdpLineageSyncLogRow>(
  135. """
  136. SELECT entity_id AS EntityId, entity_name AS EntityName, status AS Status,
  137. rows_read AS RowsRead, rows_insert AS RowsInsert, rows_update AS RowsUpdate,
  138. rows_skip AS RowsSkip, rows_error AS RowsError,
  139. sync_start AS SyncStart, sync_end AS SyncEnd, duration_ms AS DurationMs,
  140. error_msg AS ErrorMsg
  141. FROM mdp_sync_log
  142. WHERE sync_batch_id = @BatchId
  143. AND entity_id IN (
  144. SELECT id FROM mdp_entity WHERE entity_code LIKE @EntityPrefix
  145. )
  146. ORDER BY sync_start, id
  147. """,
  148. new SugarParameter("@BatchId", input.BatchId.Trim()),
  149. new SugarParameter("@EntityPrefix", entityPrefix));
  150. syncLogsByEntity = syncLogs
  151. .GroupBy(u => u.EntityId)
  152. .ToDictionary(u => u.Key, u => u.OrderByDescending(x => x.SyncStart).First());
  153. }
  154. foreach (var entity in entities)
  155. {
  156. entity.SourceFullName = BuildObjectFullName(entity.SourceDbType, entity.SourceDbHost, entity.SourceDbPort, entity.SourceDbName, entity.SourceTableName ?? entity.SourceApiPath);
  157. entity.TargetFullName = BuildObjectFullName(entity.TargetDbType, entity.TargetDbHost, entity.TargetDbPort, entity.TargetDbName, entity.TargetTableName);
  158. if (mappingsByEntity.TryGetValue(entity.Id, out var entityMappings))
  159. {
  160. entity.FieldMappings = entityMappings;
  161. entity.FieldMappingCount = entityMappings.Count;
  162. }
  163. if (syncLogsByEntity.TryGetValue(entity.Id, out var syncLog))
  164. entity.SyncLog = syncLog;
  165. }
  166. return new MdpLineageOutput
  167. {
  168. ModuleCode = moduleCode,
  169. JobCode = jobCode,
  170. BatchId = input.BatchId,
  171. Stages = BuildStageDescriptions(moduleCode),
  172. Entities = entities
  173. };
  174. }
  175. private static (string WhereSql, List<SugarParameter> Parameters) BuildWhere(MdpMonitorQueryInput input)
  176. {
  177. var where = new List<string> { "IFNULL(job_code, '') LIKE '%MDP%'" };
  178. var pars = new List<SugarParameter>();
  179. var jobCode = ResolveJobCode(input.ModuleCode, input.JobCode);
  180. if (!string.IsNullOrWhiteSpace(jobCode))
  181. {
  182. where.Add("job_code=@JobCode");
  183. pars.Add(new SugarParameter("@JobCode", jobCode));
  184. }
  185. if (!string.IsNullOrWhiteSpace(input.BatchId))
  186. {
  187. where.Add("batch_id LIKE @BatchId");
  188. pars.Add(new SugarParameter("@BatchId", $"%{input.BatchId.Trim()}%"));
  189. }
  190. if (!string.IsNullOrWhiteSpace(input.Status))
  191. {
  192. where.Add("status=@Status");
  193. pars.Add(new SugarParameter("@Status", input.Status.Trim().ToUpperInvariant()));
  194. }
  195. if (input.StartTime.HasValue)
  196. {
  197. where.Add("start_time >= @StartTime");
  198. pars.Add(new SugarParameter("@StartTime", input.StartTime.Value));
  199. }
  200. if (input.EndTime.HasValue)
  201. {
  202. where.Add("start_time <= @EndTime");
  203. pars.Add(new SugarParameter("@EndTime", input.EndTime.Value));
  204. }
  205. return (string.Join(" AND ", where), pars);
  206. }
  207. private static string? ResolveJobCode(string? moduleCode, string? jobCode)
  208. {
  209. if (!string.IsNullOrWhiteSpace(jobCode))
  210. return jobCode.Trim().ToUpperInvariant();
  211. if (string.IsNullOrWhiteSpace(moduleCode))
  212. return null;
  213. return ModuleJobCodes.TryGetValue(moduleCode.Trim(), out var mapped) ? mapped : null;
  214. }
  215. private static string? ResolveModuleCode(string? moduleCode, string? jobCode)
  216. {
  217. if (!string.IsNullOrWhiteSpace(moduleCode))
  218. return moduleCode.Trim().ToUpperInvariant();
  219. if (string.IsNullOrWhiteSpace(jobCode))
  220. return null;
  221. var normalizedJobCode = jobCode.Trim();
  222. return ModuleJobCodes.FirstOrDefault(u => string.Equals(u.Value, normalizedJobCode, StringComparison.OrdinalIgnoreCase)).Key;
  223. }
  224. private static string? BuildObjectFullName(string? dbType, string? host, int? port, string? dbName, string? objectName)
  225. {
  226. if (string.IsNullOrWhiteSpace(objectName))
  227. return null;
  228. var databaseObject = string.IsNullOrWhiteSpace(dbName) ? objectName : $"{dbName}.{objectName}";
  229. var hostPart = string.IsNullOrWhiteSpace(host) ? null : port.HasValue ? $"{host}:{port}" : host;
  230. return string.Join(" / ", new[] { dbType, hostPart, databaseObject }.Where(u => !string.IsNullOrWhiteSpace(u)));
  231. }
  232. private static List<MdpLineageStageRow> BuildStageDescriptions(string moduleCode)
  233. {
  234. if (string.Equals(moduleCode, "S1", StringComparison.OrdinalIgnoreCase))
  235. {
  236. return new List<MdpLineageStageRow>
  237. {
  238. new() { StageCode = "STAGING", StageName = "贴源同步", Layer = "mdp_stg", Description = "按 mdp_entity 登记的 S1 源对象抽取数据,保留 raw_data JSON 便于追溯。", InputObjects = "旧系统 / 当前库源对象", OutputObjects = "mdp_stg_so, mdp_stg_ship_trans", Execution = "S1MdpSyncTransformService.SyncStagingAsync" },
  239. new() { StageCode = "STANDARD", StageName = "标准层转换", Layer = "mdp_std", Description = "解析贴源 raw_data,做字段标准化、租户兜底和幂等写入。", InputObjects = "mdp_stg_so, mdp_stg_ship_trans", OutputObjects = "mdp_std_so, mdp_std_ship_trans", Execution = "S1MdpSyncTransformService.BuildStandardCommands" },
  240. new() { StageCode = "DWD", StageName = "DWD宽表", Layer = "dwd", Description = "沉淀 S1 订单交付事实,供订单交付、看板和诊断读取。", InputObjects = "mdp_std_so, mdp_std_ship_trans", OutputObjects = "dwd_ship_trans", Execution = "S1MdpSyncTransformService.BuildDwdAsync" },
  241. new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = "计算 S1 L1 指标并写入统一指标值表。", InputObjects = "mdp_std_so, dwd_ship_trans", OutputObjects = "ado_s9_kpi_value_l1_day", Execution = "S1MdpSyncTransformService.BuildS1KpiValuesAsync" }
  242. };
  243. }
  244. if (string.Equals(moduleCode, "S3", StringComparison.OrdinalIgnoreCase))
  245. {
  246. return new List<MdpLineageStageRow>
  247. {
  248. new() { StageCode = "STAGING", StageName = "贴源同步", Layer = "mdp_stg", Description = "按 mdp_entity 登记的 S3 源对象抽取数据,保留 raw_data JSON 便于追溯。", InputObjects = "S3 源对象", OutputObjects = "mdp_stg_*", Execution = "S3MdpSyncTransformService.SyncStagingAsync" },
  249. new() { StageCode = "STANDARD", StageName = "标准层转换", Layer = "mdp_std", Description = "将供应、物料、采购、交付等对象标准化。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_*", Execution = "S3MdpSyncTransformService.BuildStandardCommands" },
  250. new() { StageCode = "DWD", StageName = "DWD宽表", Layer = "dwd", Description = "生成供应交付、齐套、风险等分析宽表。", InputObjects = "mdp_std_*", OutputObjects = "dwd_supplier_delivery / dwd_material_readiness 等", Execution = "S3MdpSyncTransformService.BuildDwdAsync" },
  251. new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = "写入 S3 供应协同指标。", InputObjects = "mdp_std_* / dwd_*", OutputObjects = "ado_s9_kpi_value_*", Execution = "S3MdpSyncTransformService.BuildS3KpiValuesAsync" }
  252. };
  253. }
  254. return new List<MdpLineageStageRow>
  255. {
  256. new() { StageCode = "STAGING", StageName = "贴源同步", Layer = "mdp_stg", Description = "按 mdp_entity 登记源对象抽取数据。", InputObjects = "源对象", OutputObjects = "mdp_stg_*", Execution = "模块 MDP 同步服务" },
  257. new() { StageCode = "STANDARD", StageName = "标准层转换", Layer = "mdp_std", Description = "标准层/DWD/KPI 当前由模块后端 Service 承载。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_* / dwd_* / 指标表", Execution = "模块 MDP 转换服务" }
  258. };
  259. }
  260. private static string SelectColumnsSql()
  261. {
  262. return """
  263. SELECT id AS Id, tenant_id AS TenantId, job_code AS JobCode, job_name AS JobName, trigger_type AS TriggerType,
  264. batch_id AS BatchId, status AS Status, start_time AS StartTime, end_time AS EndTime, duration_ms AS DurationMs,
  265. stage_rows AS StageRows, standard_rows AS StandardRows, dwd_rows AS DwdRows,
  266. error_message AS ErrorMessage, summary_json AS SummaryJson, create_time AS CreateTime, update_time AS UpdateTime
  267. """;
  268. }
  269. }
  270. public class MdpMonitorQueryInput
  271. {
  272. public string? ModuleCode { get; set; }
  273. public string? JobCode { get; set; }
  274. public string? BatchId { get; set; }
  275. public string? Status { get; set; }
  276. public DateTime? StartTime { get; set; }
  277. public DateTime? EndTime { get; set; }
  278. }
  279. public sealed class MdpMonitorListInput : MdpMonitorQueryInput
  280. {
  281. public int Page { get; set; } = 1;
  282. public int PageSize { get; set; } = 10;
  283. }
  284. public sealed class MdpMonitorLineageInput : MdpMonitorQueryInput
  285. {
  286. }
  287. public sealed class MdpMonitorRunLogRow
  288. {
  289. public long Id { get; set; }
  290. public long TenantId { get; set; }
  291. public string? JobCode { get; set; }
  292. public string? JobName { get; set; }
  293. public string? TriggerType { get; set; }
  294. public string? BatchId { get; set; }
  295. public string? Status { get; set; }
  296. public DateTime? StartTime { get; set; }
  297. public DateTime? EndTime { get; set; }
  298. public int? DurationMs { get; set; }
  299. public int? StageRows { get; set; }
  300. public int? StandardRows { get; set; }
  301. public int? DwdRows { get; set; }
  302. public string? ErrorMessage { get; set; }
  303. public string? SummaryJson { get; set; }
  304. public DateTime? CreateTime { get; set; }
  305. public DateTime? UpdateTime { get; set; }
  306. }
  307. public sealed class MdpLineageOutput
  308. {
  309. public string? ModuleCode { get; set; }
  310. public string? JobCode { get; set; }
  311. public string? BatchId { get; set; }
  312. public List<MdpLineageStageRow> Stages { get; set; } = new();
  313. public List<MdpLineageEntityRow> Entities { get; set; } = new();
  314. }
  315. public sealed class MdpLineageStageRow
  316. {
  317. public string? StageCode { get; set; }
  318. public string? StageName { get; set; }
  319. public string? Layer { get; set; }
  320. public string? Description { get; set; }
  321. public string? InputObjects { get; set; }
  322. public string? OutputObjects { get; set; }
  323. public string? Execution { get; set; }
  324. }
  325. public sealed class MdpLineageEntityRow
  326. {
  327. public long Id { get; set; }
  328. public string? EntityCode { get; set; }
  329. public string? EntityName { get; set; }
  330. public string? EntityType { get; set; }
  331. public string? SourceCode { get; set; }
  332. public string? SourceName { get; set; }
  333. public string? SourceType { get; set; }
  334. public string? SourceDbType { get; set; }
  335. public string? SourceDbHost { get; set; }
  336. public int? SourceDbPort { get; set; }
  337. public string? SourceDbName { get; set; }
  338. public string? SourceTableName { get; set; }
  339. public string? SourceApiPath { get; set; }
  340. public string? SourceFullName { get; set; }
  341. public string? TargetDbType { get; set; }
  342. public string? TargetDbHost { get; set; }
  343. public int? TargetDbPort { get; set; }
  344. public string? TargetDbName { get; set; }
  345. public string? TargetTableName { get; set; }
  346. public string? TargetFullName { get; set; }
  347. public string? SyncMode { get; set; }
  348. public string? IncrColumn { get; set; }
  349. public int? Status { get; set; }
  350. public int FieldMappingCount { get; set; }
  351. public List<MdpLineageFieldMappingRow> FieldMappings { get; set; } = new();
  352. public MdpLineageSyncLogRow? SyncLog { get; set; }
  353. }
  354. public sealed class MdpLineageFieldMappingRow
  355. {
  356. public long EntityId { get; set; }
  357. public string? SourceField { get; set; }
  358. public string? TargetField { get; set; }
  359. public string? FieldType { get; set; }
  360. public string? TransformScript { get; set; }
  361. public string? ConstValue { get; set; }
  362. public string? LookupTable { get; set; }
  363. public bool IsRequired { get; set; }
  364. public string? DefaultValue { get; set; }
  365. public int SortOrder { get; set; }
  366. }
  367. public sealed class MdpLineageSyncLogRow
  368. {
  369. public long EntityId { get; set; }
  370. public string? EntityName { get; set; }
  371. public string? Status { get; set; }
  372. public long? RowsRead { get; set; }
  373. public long? RowsInsert { get; set; }
  374. public long? RowsUpdate { get; set; }
  375. public long? RowsSkip { get; set; }
  376. public long? RowsError { get; set; }
  377. public DateTime? SyncStart { get; set; }
  378. public DateTime? SyncEnd { get; set; }
  379. public int? DurationMs { get; set; }
  380. public string? ErrorMsg { get; set; }
  381. }