MdpMonitorService.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. namespace Admin.NET.Plugin.AiDOP.Order;
  2. /// <summary>
  3. /// 数据中台统一 MDP 运行监控。
  4. /// </summary>
  5. [ApiDescriptionSettings(Order = 322, Description = "统一MDP运行监控")]
  6. [Route("api/DataPlatform")]
  7. [AllowAnonymous]
  8. [NonUnify]
  9. public class MdpMonitorService : IDynamicApiController, ITransient
  10. {
  11. private static readonly Dictionary<string, string> ModuleJobCodes = new(StringComparer.OrdinalIgnoreCase)
  12. {
  13. ["S1"] = "S1_MDP_SYNC_TRANSFORM",
  14. ["S2"] = "S2_MDP_SYNC_TRANSFORM",
  15. ["S3"] = "S3_MDP_SYNC_TRANSFORM"
  16. };
  17. private readonly ISqlSugarClient _db;
  18. private readonly UserManager _userManager;
  19. public MdpMonitorService(ISqlSugarClient db, UserManager userManager)
  20. {
  21. _db = db;
  22. _userManager = userManager;
  23. }
  24. [DisplayName("MDP模块选项")]
  25. [HttpGet("mdp-monitor/modules")]
  26. public object GetModules()
  27. {
  28. return ModuleJobCodes
  29. .OrderBy(u => u.Key)
  30. .Select(u => new { moduleCode = u.Key, jobCode = u.Value })
  31. .ToList();
  32. }
  33. [DisplayName("MDP最近运行状态")]
  34. [HttpGet("mdp-monitor/latest")]
  35. public async Task<object> GetLatest([FromQuery] MdpMonitorQueryInput input)
  36. {
  37. var tenantId = _userManager.TenantId;
  38. var (whereSql, pars) = BuildWhere(input, tenantId);
  39. return await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(
  40. $"""
  41. {SelectColumnsSql()}
  42. FROM mdp_transform_run_log
  43. WHERE {whereSql}
  44. ORDER BY start_time DESC, id DESC
  45. LIMIT 1
  46. """,
  47. pars)
  48. ?? new MdpMonitorRunLogRow();
  49. }
  50. [DisplayName("MDP运行日志列表")]
  51. [HttpGet("mdp-monitor/list")]
  52. public async Task<object> GetList([FromQuery] MdpMonitorListInput input)
  53. {
  54. var tenantId = _userManager.TenantId;
  55. var page = input.Page <= 0 ? 1 : input.Page;
  56. var pageSize = input.PageSize <= 0 ? 10 : input.PageSize;
  57. var offset = (page - 1) * pageSize;
  58. var (whereSql, pars) = BuildWhere(input, tenantId);
  59. var total = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM mdp_transform_run_log WHERE {whereSql}", pars);
  60. var list = await _db.Ado.SqlQueryAsync<MdpMonitorRunLogRow>(
  61. $"""
  62. {SelectColumnsSql()}
  63. FROM mdp_transform_run_log
  64. WHERE {whereSql}
  65. ORDER BY start_time DESC, id DESC
  66. LIMIT {pageSize} OFFSET {offset}
  67. """,
  68. pars);
  69. return new { total, page, pageSize, list };
  70. }
  71. [DisplayName("MDP运行日志详情")]
  72. [HttpGet("mdp-monitor/detail/{id}")]
  73. public async Task<object> GetDetail(long id, [FromQuery] MdpMonitorQueryInput input)
  74. {
  75. var tenantId = _userManager.TenantId;
  76. var (whereSql, pars) = BuildWhere(input, tenantId);
  77. pars.Add(new SugarParameter("@Id", id));
  78. var row = await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(
  79. $"""
  80. {SelectColumnsSql()}
  81. FROM mdp_transform_run_log
  82. WHERE id=@Id AND {whereSql}
  83. LIMIT 1
  84. """,
  85. pars);
  86. return row ?? throw Oops.Oh("运行日志不存在");
  87. }
  88. [DisplayName("MDP同步链路详情")]
  89. [HttpGet("mdp-monitor/lineage")]
  90. public async Task<object> GetLineage([FromQuery] MdpMonitorLineageInput input)
  91. {
  92. var tenantId = _userManager.TenantId;
  93. var moduleCode = ResolveModuleCode(input.ModuleCode, input.JobCode);
  94. if (string.IsNullOrWhiteSpace(moduleCode))
  95. throw Oops.Oh("请选择 MDP 模块");
  96. var jobCode = ResolveJobCode(moduleCode, input.JobCode);
  97. var entityPrefix = $"{moduleCode}_%";
  98. var entities = await _db.Ado.SqlQueryAsync<MdpLineageEntityRow>(
  99. """
  100. SELECT e.id AS Id, e.entity_code AS EntityCode, e.entity_name AS EntityName,
  101. e.entity_type AS EntityType, s.source_code AS SourceCode,
  102. s.source_name AS SourceName, s.source_type AS SourceType,
  103. s.db_type AS SourceDbType, s.db_host AS SourceDbHost,
  104. s.db_port AS SourceDbPort, s.db_name AS SourceDbName,
  105. e.source_table_name AS SourceTableName, e.source_api_path AS SourceApiPath,
  106. s.db_type AS TargetDbType, s.db_host AS TargetDbHost,
  107. s.db_port AS TargetDbPort, s.db_name AS TargetDbName,
  108. e.target_table_name AS TargetTableName, e.sync_mode AS SyncMode,
  109. e.incr_column AS IncrColumn, e.status AS Status
  110. FROM mdp_entity e
  111. LEFT JOIN mdp_source s ON s.id = e.source_id
  112. WHERE e.entity_code LIKE @EntityPrefix AND e.tenant_id = @TenantId
  113. ORDER BY e.entity_code
  114. """,
  115. new SugarParameter("@EntityPrefix", entityPrefix),
  116. new SugarParameter("@TenantId", tenantId));
  117. if (entities.Count == 0)
  118. {
  119. return new MdpLineageOutput
  120. {
  121. ModuleCode = moduleCode,
  122. JobCode = jobCode,
  123. Stages = BuildStageDescriptions(moduleCode),
  124. Entities = new List<MdpLineageEntityRow>()
  125. };
  126. }
  127. var entityIds = string.Join(",", entities.Select(u => u.Id));
  128. var mappings = await _db.Ado.SqlQueryAsync<MdpLineageFieldMappingRow>(
  129. $"""
  130. SELECT entity_id AS EntityId, source_field AS SourceField, target_field AS TargetField,
  131. field_type AS FieldType, transform_script AS TransformScript,
  132. const_value AS ConstValue, lookup_table AS LookupTable,
  133. is_required AS IsRequired, default_value AS DefaultValue, sort_order AS SortOrder
  134. FROM mdp_field_mapping
  135. WHERE entity_id IN ({entityIds})
  136. ORDER BY entity_id, sort_order, target_field
  137. """);
  138. var mappingsByEntity = mappings.GroupBy(u => u.EntityId).ToDictionary(u => u.Key, u => u.ToList());
  139. Dictionary<long, MdpLineageSyncLogRow> syncLogsByEntity = new();
  140. if (!string.IsNullOrWhiteSpace(input.BatchId))
  141. {
  142. var syncLogs = await _db.Ado.SqlQueryAsync<MdpLineageSyncLogRow>(
  143. """
  144. SELECT entity_id AS EntityId, entity_name AS EntityName, status AS Status,
  145. rows_read AS RowsRead, rows_insert AS RowsInsert, rows_update AS RowsUpdate,
  146. rows_skip AS RowsSkip, rows_error AS RowsError,
  147. sync_start AS SyncStart, sync_end AS SyncEnd, duration_ms AS DurationMs,
  148. error_msg AS ErrorMsg
  149. FROM mdp_sync_log
  150. WHERE sync_batch_id = @BatchId
  151. AND tenant_id = @TenantId
  152. AND entity_id IN (
  153. SELECT id FROM mdp_entity WHERE entity_code LIKE @EntityPrefix
  154. )
  155. ORDER BY sync_start, id
  156. """,
  157. new SugarParameter("@BatchId", input.BatchId.Trim()),
  158. new SugarParameter("@EntityPrefix", entityPrefix),
  159. new SugarParameter("@TenantId", tenantId));
  160. syncLogsByEntity = syncLogs
  161. .GroupBy(u => u.EntityId)
  162. .ToDictionary(u => u.Key, u => u.OrderByDescending(x => x.SyncStart).First());
  163. }
  164. foreach (var entity in entities)
  165. {
  166. entity.SourceFullName = BuildObjectFullName(entity.SourceDbType, entity.SourceDbHost, entity.SourceDbPort, entity.SourceDbName, entity.SourceTableName ?? entity.SourceApiPath);
  167. entity.TargetFullName = BuildObjectFullName(entity.TargetDbType, entity.TargetDbHost, entity.TargetDbPort, entity.TargetDbName, entity.TargetTableName);
  168. if (mappingsByEntity.TryGetValue(entity.Id, out var entityMappings))
  169. {
  170. entity.FieldMappings = entityMappings;
  171. entity.FieldMappingCount = entityMappings.Count;
  172. }
  173. if (syncLogsByEntity.TryGetValue(entity.Id, out var syncLog))
  174. entity.SyncLog = syncLog;
  175. }
  176. return new MdpLineageOutput
  177. {
  178. ModuleCode = moduleCode,
  179. JobCode = jobCode,
  180. BatchId = input.BatchId,
  181. Stages = BuildStageDescriptions(moduleCode),
  182. Entities = entities
  183. };
  184. }
  185. private static (string WhereSql, List<SugarParameter> Parameters) BuildWhere(MdpMonitorQueryInput input, long tenantId)
  186. {
  187. var where = new List<string> { "IFNULL(job_code, '') LIKE '%MDP%'", "tenant_id = @TenantId" };
  188. var pars = new List<SugarParameter> { new("@TenantId", tenantId) };
  189. var jobCode = ResolveJobCode(input.ModuleCode, input.JobCode);
  190. if (!string.IsNullOrWhiteSpace(jobCode))
  191. {
  192. where.Add("job_code=@JobCode");
  193. pars.Add(new SugarParameter("@JobCode", jobCode));
  194. }
  195. if (!string.IsNullOrWhiteSpace(input.BatchId))
  196. {
  197. where.Add("batch_id LIKE @BatchId");
  198. pars.Add(new SugarParameter("@BatchId", $"%{input.BatchId.Trim()}%"));
  199. }
  200. if (!string.IsNullOrWhiteSpace(input.Status))
  201. {
  202. where.Add("status=@Status");
  203. pars.Add(new SugarParameter("@Status", input.Status.Trim().ToUpperInvariant()));
  204. }
  205. if (input.StartTime.HasValue)
  206. {
  207. where.Add("start_time >= @StartTime");
  208. pars.Add(new SugarParameter("@StartTime", input.StartTime.Value));
  209. }
  210. if (input.EndTime.HasValue)
  211. {
  212. where.Add("start_time <= @EndTime");
  213. pars.Add(new SugarParameter("@EndTime", input.EndTime.Value));
  214. }
  215. return (string.Join(" AND ", where), pars);
  216. }
  217. private static string? ResolveJobCode(string? moduleCode, string? jobCode)
  218. {
  219. if (!string.IsNullOrWhiteSpace(jobCode))
  220. return jobCode.Trim().ToUpperInvariant();
  221. if (string.IsNullOrWhiteSpace(moduleCode))
  222. return null;
  223. return ModuleJobCodes.TryGetValue(moduleCode.Trim(), out var mapped) ? mapped : null;
  224. }
  225. private static string? ResolveModuleCode(string? moduleCode, string? jobCode)
  226. {
  227. if (!string.IsNullOrWhiteSpace(moduleCode))
  228. return moduleCode.Trim().ToUpperInvariant();
  229. if (string.IsNullOrWhiteSpace(jobCode))
  230. return null;
  231. var normalizedJobCode = jobCode.Trim();
  232. return ModuleJobCodes.FirstOrDefault(u => string.Equals(u.Value, normalizedJobCode, StringComparison.OrdinalIgnoreCase)).Key;
  233. }
  234. private static string? BuildObjectFullName(string? dbType, string? host, int? port, string? dbName, string? objectName)
  235. {
  236. if (string.IsNullOrWhiteSpace(objectName))
  237. return null;
  238. var databaseObject = string.IsNullOrWhiteSpace(dbName) ? objectName : $"{dbName}.{objectName}";
  239. var hostPart = string.IsNullOrWhiteSpace(host) ? null : port.HasValue ? $"{host}:{port}" : host;
  240. return string.Join(" / ", new[] { dbType, hostPart, databaseObject }.Where(u => !string.IsNullOrWhiteSpace(u)));
  241. }
  242. private static List<MdpLineageStageRow> BuildStageDescriptions(string moduleCode)
  243. {
  244. if (string.Equals(moduleCode, "S1", StringComparison.OrdinalIgnoreCase))
  245. {
  246. return new List<MdpLineageStageRow>
  247. {
  248. new() { StageCode = "STAGING", StageName = "贴源同步", Layer = "mdp_stg", Description = "按 mdp_entity 登记的 S1 源对象抽取数据,保留 raw_data JSON 便于追溯。", InputObjects = "旧系统 / 当前库源对象", OutputObjects = "mdp_stg_so, mdp_stg_ship_trans", Execution = "S1MdpSyncTransformService.SyncStagingAsync" },
  249. new() { StageCode = "STANDARD", StageName = "标准层转换", Layer = "mdp_std", Description = "解析贴源 raw_data,做字段标准化、租户兜底和幂等写入。", InputObjects = "mdp_stg_so, mdp_stg_ship_trans", OutputObjects = "mdp_std_so, mdp_std_ship_trans", Execution = "S1MdpSyncTransformService.BuildStandardCommands" },
  250. new() { StageCode = "DWD", StageName = "DWD宽表", Layer = "dwd", Description = "沉淀 S1 订单交付事实,供订单交付、看板和诊断读取。", InputObjects = "mdp_std_so, mdp_std_ship_trans", OutputObjects = "dwd_ship_trans", Execution = "S1MdpSyncTransformService.BuildDwdAsync" },
  251. new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = "计算 S1 L1 指标并写入统一指标值表。", InputObjects = "mdp_std_so, dwd_ship_trans", OutputObjects = "ado_s9_kpi_value_l1_day", Execution = "S1MdpSyncTransformService.BuildS1KpiValuesAsync" }
  252. };
  253. }
  254. if (string.Equals(moduleCode, "S3", StringComparison.OrdinalIgnoreCase))
  255. {
  256. return new List<MdpLineageStageRow>
  257. {
  258. new() { StageCode = "STAGING", StageName = "贴源同步", Layer = "mdp_stg", Description = "按 mdp_entity 登记的 S3 源对象抽取数据,保留 raw_data JSON 便于追溯。", InputObjects = "S3 源对象", OutputObjects = "mdp_stg_*", Execution = "S3MdpSyncTransformService.SyncStagingAsync" },
  259. new() { StageCode = "STANDARD", StageName = "标准层转换", Layer = "mdp_std", Description = "将供应、物料、采购、交付等对象标准化。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_*", Execution = "S3MdpSyncTransformService.BuildStandardCommands" },
  260. new() { StageCode = "DWD", StageName = "DWD宽表", Layer = "dwd", Description = "生成供应交付、齐套、风险等分析宽表。", InputObjects = "mdp_std_*", OutputObjects = "dwd_supplier_delivery / dwd_material_readiness 等", Execution = "S3MdpSyncTransformService.BuildDwdAsync" },
  261. new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = "写入 S3 供应协同指标。", InputObjects = "mdp_std_* / dwd_*", OutputObjects = "ado_s9_kpi_value_*", Execution = "S3MdpSyncTransformService.BuildS3KpiValuesAsync" }
  262. };
  263. }
  264. return new List<MdpLineageStageRow>
  265. {
  266. new() { StageCode = "STAGING", StageName = "贴源同步", Layer = "mdp_stg", Description = "按 mdp_entity 登记源对象抽取数据。", InputObjects = "源对象", OutputObjects = "mdp_stg_*", Execution = "模块 MDP 同步服务" },
  267. new() { StageCode = "STANDARD", StageName = "标准层转换", Layer = "mdp_std", Description = "标准层/DWD/KPI 当前由模块后端 Service 承载。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_* / dwd_* / 指标表", Execution = "模块 MDP 转换服务" }
  268. };
  269. }
  270. private static string SelectColumnsSql()
  271. {
  272. return """
  273. SELECT id AS Id, tenant_id AS TenantId, job_code AS JobCode, job_name AS JobName, trigger_type AS TriggerType,
  274. batch_id AS BatchId, status AS Status, start_time AS StartTime, end_time AS EndTime, duration_ms AS DurationMs,
  275. stage_rows AS StageRows, standard_rows AS StandardRows, dwd_rows AS DwdRows,
  276. error_message AS ErrorMessage, summary_json AS SummaryJson, create_time AS CreateTime, update_time AS UpdateTime
  277. """;
  278. }
  279. }
  280. public class MdpMonitorQueryInput
  281. {
  282. public string? ModuleCode { get; set; }
  283. public string? JobCode { get; set; }
  284. public string? BatchId { get; set; }
  285. public string? Status { get; set; }
  286. public DateTime? StartTime { get; set; }
  287. public DateTime? EndTime { get; set; }
  288. }
  289. public sealed class MdpMonitorListInput : MdpMonitorQueryInput
  290. {
  291. public int Page { get; set; } = 1;
  292. public int PageSize { get; set; } = 10;
  293. }
  294. public sealed class MdpMonitorLineageInput : MdpMonitorQueryInput
  295. {
  296. }
  297. public sealed class MdpMonitorRunLogRow
  298. {
  299. public long Id { get; set; }
  300. public long TenantId { get; set; }
  301. public string? JobCode { get; set; }
  302. public string? JobName { get; set; }
  303. public string? TriggerType { get; set; }
  304. public string? BatchId { get; set; }
  305. public string? Status { get; set; }
  306. public DateTime? StartTime { get; set; }
  307. public DateTime? EndTime { get; set; }
  308. public int? DurationMs { get; set; }
  309. public int? StageRows { get; set; }
  310. public int? StandardRows { get; set; }
  311. public int? DwdRows { get; set; }
  312. public string? ErrorMessage { get; set; }
  313. public string? SummaryJson { get; set; }
  314. public DateTime? CreateTime { get; set; }
  315. public DateTime? UpdateTime { get; set; }
  316. }
  317. public sealed class MdpLineageOutput
  318. {
  319. public string? ModuleCode { get; set; }
  320. public string? JobCode { get; set; }
  321. public string? BatchId { get; set; }
  322. public List<MdpLineageStageRow> Stages { get; set; } = new();
  323. public List<MdpLineageEntityRow> Entities { get; set; } = new();
  324. }
  325. public sealed class MdpLineageStageRow
  326. {
  327. public string? StageCode { get; set; }
  328. public string? StageName { get; set; }
  329. public string? Layer { get; set; }
  330. public string? Description { get; set; }
  331. public string? InputObjects { get; set; }
  332. public string? OutputObjects { get; set; }
  333. public string? Execution { get; set; }
  334. }
  335. public sealed class MdpLineageEntityRow
  336. {
  337. public long Id { get; set; }
  338. public string? EntityCode { get; set; }
  339. public string? EntityName { get; set; }
  340. public string? EntityType { get; set; }
  341. public string? SourceCode { get; set; }
  342. public string? SourceName { get; set; }
  343. public string? SourceType { get; set; }
  344. public string? SourceDbType { get; set; }
  345. public string? SourceDbHost { get; set; }
  346. public int? SourceDbPort { get; set; }
  347. public string? SourceDbName { get; set; }
  348. public string? SourceTableName { get; set; }
  349. public string? SourceApiPath { get; set; }
  350. public string? SourceFullName { get; set; }
  351. public string? TargetDbType { get; set; }
  352. public string? TargetDbHost { get; set; }
  353. public int? TargetDbPort { get; set; }
  354. public string? TargetDbName { get; set; }
  355. public string? TargetTableName { get; set; }
  356. public string? TargetFullName { get; set; }
  357. public string? SyncMode { get; set; }
  358. public string? IncrColumn { get; set; }
  359. public int? Status { get; set; }
  360. public int FieldMappingCount { get; set; }
  361. public List<MdpLineageFieldMappingRow> FieldMappings { get; set; } = new();
  362. public MdpLineageSyncLogRow? SyncLog { get; set; }
  363. }
  364. public sealed class MdpLineageFieldMappingRow
  365. {
  366. public long EntityId { get; set; }
  367. public string? SourceField { get; set; }
  368. public string? TargetField { get; set; }
  369. public string? FieldType { get; set; }
  370. public string? TransformScript { get; set; }
  371. public string? ConstValue { get; set; }
  372. public string? LookupTable { get; set; }
  373. public bool IsRequired { get; set; }
  374. public string? DefaultValue { get; set; }
  375. public int SortOrder { get; set; }
  376. }
  377. public sealed class MdpLineageSyncLogRow
  378. {
  379. public long EntityId { get; set; }
  380. public string? EntityName { get; set; }
  381. public string? Status { get; set; }
  382. public long? RowsRead { get; set; }
  383. public long? RowsInsert { get; set; }
  384. public long? RowsUpdate { get; set; }
  385. public long? RowsSkip { get; set; }
  386. public long? RowsError { get; set; }
  387. public DateTime? SyncStart { get; set; }
  388. public DateTime? SyncEnd { get; set; }
  389. public int? DurationMs { get; set; }
  390. public string? ErrorMsg { get; set; }
  391. }