// MdpMonitorService.cs
  1. namespace Admin.NET.Plugin.AiDOP.Order;
  2. /// <summary>
  3. /// 数据中台统一 MDP 运行监控。
  4. /// </summary>
  5. [ApiDescriptionSettings(Order = 322, Description = "统一MDP运行监控")]
  6. [Route("api/DataPlatform")]
  7. [AllowAnonymous]
  8. [NonUnify]
  9. public class MdpMonitorService : IDynamicApiController, ITransient
  10. {
  11. private static readonly Dictionary<string, string> ModuleJobCodes = new(StringComparer.OrdinalIgnoreCase)
  12. {
  13. ["S1"] = "S1_MDP_SYNC_TRANSFORM",
  14. ["S2"] = "S2_MDP_SYNC_TRANSFORM",
  15. ["S3"] = "S3_MDP_SYNC_TRANSFORM",
  16. ["S4"] = "S4_MDP_SYNC_TRANSFORM"
  17. };
  18. private static readonly Dictionary<string, MdpJobCatalogItem> JobCatalog = new(StringComparer.OrdinalIgnoreCase)
  19. {
  20. ["S1_MDP_SYNC_TRANSFORM"] = new(
  21. "S1_MDP_SYNC_TRANSFORM",
  22. "order_delivery",
  23. "订单交付域",
  24. ["S1", "S2", "S3", "S4", "S7", "S9"],
  25. "GLOBAL_DOMAIN",
  26. "订单交付域 MDP 同步",
  27. "job_s1_mdp_sync_transform"),
  28. ["S2_MDP_SYNC_TRANSFORM"] = new(
  29. "S2_MDP_SYNC_TRANSFORM",
  30. "work_schedule",
  31. "工单排程域",
  32. ["S2", "S3", "S5", "S6", "S8", "S9"],
  33. "GLOBAL_DOMAIN",
  34. "工单排程域 MDP 同步",
  35. "job_s2_mdp_sync_transform"),
  36. ["S3_MDP_SYNC_TRANSFORM"] = new(
  37. "S3_MDP_SYNC_TRANSFORM",
  38. "supply_purchase",
  39. "供应采购域",
  40. ["S3", "S4", "S5", "S8", "S9"],
  41. "GLOBAL_DOMAIN",
  42. "供应采购域 MDP 同步",
  43. "job_s3_mdp_sync_transform"),
  44. ["S4_MDP_SYNC_TRANSFORM"] = new(
  45. "S4_MDP_SYNC_TRANSFORM",
  46. "purchase_execution",
  47. "采购执行域",
  48. ["S4", "S5", "S8", "S9"],
  49. "GLOBAL_DOMAIN",
  50. "采购执行域 MDP 同步",
  51. "job_s4_mdp_sync_transform")
  52. };
  53. private readonly ISqlSugarClient _db;
  54. private readonly UserManager _userManager;
  55. public MdpMonitorService(ISqlSugarClient db, UserManager userManager)
  56. {
  57. _db = db;
  58. _userManager = userManager;
  59. }
  60. [DisplayName("MDP模块选项")]
  61. [HttpGet("mdp-monitor/modules")]
  62. public object GetModules() => BuildCatalogResponse();
  63. [DisplayName("MDP任务目录")]
  64. [HttpGet("mdp-monitor/catalog")]
  65. public object GetCatalog() => BuildCatalogResponse();
  66. [DisplayName("MDP最近运行状态")]
  67. [HttpGet("mdp-monitor/latest")]
  68. public async Task<object> GetLatest([FromQuery] MdpMonitorQueryInput input)
  69. {
  70. var tenantId = _userManager.TenantId;
  71. var (whereSql, pars) = BuildWhere(input, tenantId);
  72. var row = await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(
  73. $"""
  74. {SelectColumnsSql()}
  75. FROM mdp_transform_run_log
  76. WHERE {whereSql}
  77. ORDER BY start_time DESC, id DESC
  78. LIMIT 1
  79. """,
  80. pars)
  81. ?? new MdpMonitorRunLogRow();
  82. AttachCatalogInfo(row);
  83. return row;
  84. }
  85. [DisplayName("MDP运行日志列表")]
  86. [HttpGet("mdp-monitor/list")]
  87. public async Task<object> GetList([FromQuery] MdpMonitorListInput input)
  88. {
  89. var tenantId = _userManager.TenantId;
  90. var page = input.Page <= 0 ? 1 : input.Page;
  91. var pageSize = input.PageSize <= 0 ? 10 : input.PageSize;
  92. var offset = (page - 1) * pageSize;
  93. var (whereSql, pars) = BuildWhere(input, tenantId);
  94. var total = await _db.Ado.GetIntAsync($"SELECT COUNT(1) FROM mdp_transform_run_log WHERE {whereSql}", pars);
  95. var list = await _db.Ado.SqlQueryAsync<MdpMonitorRunLogRow>(
  96. $"""
  97. {SelectColumnsSql()}
  98. FROM mdp_transform_run_log
  99. WHERE {whereSql}
  100. ORDER BY start_time DESC, id DESC
  101. LIMIT {pageSize} OFFSET {offset}
  102. """,
  103. pars);
  104. foreach (var row in list)
  105. AttachCatalogInfo(row);
  106. return new { total, page, pageSize, list };
  107. }
  108. [DisplayName("MDP运行日志详情")]
  109. [HttpGet("mdp-monitor/detail/{id}")]
  110. public async Task<object> GetDetail(long id, [FromQuery] MdpMonitorQueryInput input)
  111. {
  112. var tenantId = _userManager.TenantId;
  113. var (whereSql, pars) = BuildWhere(input, tenantId);
  114. pars.Add(new SugarParameter("@Id", id));
  115. var row = await _db.Ado.SqlQuerySingleAsync<MdpMonitorRunLogRow>(
  116. $"""
  117. {SelectColumnsSql()}
  118. FROM mdp_transform_run_log
  119. WHERE id=@Id AND {whereSql}
  120. LIMIT 1
  121. """,
  122. pars);
  123. if (row == null)
  124. throw Oops.Oh("运行日志不存在");
  125. AttachCatalogInfo(row);
  126. return row;
  127. }
  128. [DisplayName("MDP同步链路详情")]
  129. [HttpGet("mdp-monitor/lineage")]
  130. public async Task<object> GetLineage([FromQuery] MdpMonitorLineageInput input)
  131. {
  132. var tenantId = _userManager.TenantId;
  133. var moduleCode = ResolveModuleCode(input.ModuleCode, input.JobCode);
  134. if (string.IsNullOrWhiteSpace(moduleCode))
  135. throw Oops.Oh("请选择 MDP 模块");
  136. var jobCode = ResolveJobCode(moduleCode, input.JobCode);
  137. var entityPrefix = $"{moduleCode}_%";
  138. var entities = await _db.Ado.SqlQueryAsync<MdpLineageEntityRow>(
  139. """
  140. SELECT e.id AS Id, e.entity_code AS EntityCode, e.entity_name AS EntityName,
  141. e.entity_type AS EntityType, s.source_code AS SourceCode,
  142. s.source_name AS SourceName, s.source_type AS SourceType,
  143. s.db_type AS SourceDbType, s.db_host AS SourceDbHost,
  144. s.db_port AS SourceDbPort, s.db_name AS SourceDbName,
  145. e.source_table_name AS SourceTableName, e.source_api_path AS SourceApiPath,
  146. s.db_type AS TargetDbType, s.db_host AS TargetDbHost,
  147. s.db_port AS TargetDbPort, s.db_name AS TargetDbName,
  148. e.target_table_name AS TargetTableName, e.sync_mode AS SyncMode,
  149. e.incr_column AS IncrColumn, e.status AS Status
  150. FROM mdp_entity e
  151. LEFT JOIN mdp_source s ON s.id = e.source_id
  152. WHERE e.entity_code LIKE @EntityPrefix
  153. AND (e.tenant_id = @TenantId OR e.tenant_id = 0)
  154. ORDER BY e.entity_code
  155. """,
  156. new SugarParameter("@EntityPrefix", entityPrefix),
  157. new SugarParameter("@TenantId", tenantId));
  158. if (entities.Count == 0)
  159. {
  160. var emptyOutput = new MdpLineageOutput
  161. {
  162. ModuleCode = moduleCode,
  163. JobCode = jobCode,
  164. BatchId = input.BatchId,
  165. Stages = BuildStageDescriptions(jobCode, moduleCode),
  166. Entities = new List<MdpLineageEntityRow>()
  167. };
  168. AttachLineageCatalogInfo(emptyOutput);
  169. return emptyOutput;
  170. }
  171. var entityIds = string.Join(",", entities.Select(u => u.Id));
  172. var mappings = await _db.Ado.SqlQueryAsync<MdpLineageFieldMappingRow>(
  173. $"""
  174. SELECT entity_id AS EntityId, source_field AS SourceField, target_field AS TargetField,
  175. field_type AS FieldType, transform_script AS TransformScript,
  176. const_value AS ConstValue, lookup_table AS LookupTable,
  177. is_required AS IsRequired, default_value AS DefaultValue, sort_order AS SortOrder
  178. FROM mdp_field_mapping
  179. WHERE entity_id IN ({entityIds})
  180. ORDER BY entity_id, sort_order, target_field
  181. """);
  182. var mappingsByEntity = mappings.GroupBy(u => u.EntityId).ToDictionary(u => u.Key, u => u.ToList());
  183. Dictionary<long, MdpLineageSyncLogRow> syncLogsByEntity = new();
  184. if (!string.IsNullOrWhiteSpace(input.BatchId))
  185. {
  186. var syncLogs = await _db.Ado.SqlQueryAsync<MdpLineageSyncLogRow>(
  187. """
  188. SELECT entity_id AS EntityId, entity_name AS EntityName, status AS Status,
  189. rows_read AS RowsRead, rows_insert AS RowsInsert, rows_update AS RowsUpdate,
  190. rows_skip AS RowsSkip, rows_error AS RowsError,
  191. sync_start AS SyncStart, sync_end AS SyncEnd, duration_ms AS DurationMs,
  192. error_msg AS ErrorMsg
  193. FROM mdp_sync_log
  194. WHERE sync_batch_id = @BatchId
  195. AND (tenant_id = @TenantId OR tenant_id = 0)
  196. AND entity_id IN (
  197. SELECT id FROM mdp_entity
  198. WHERE entity_code LIKE @EntityPrefix
  199. AND (tenant_id = @TenantId OR tenant_id = 0)
  200. )
  201. ORDER BY sync_start, id
  202. """,
  203. new SugarParameter("@BatchId", input.BatchId.Trim()),
  204. new SugarParameter("@EntityPrefix", entityPrefix),
  205. new SugarParameter("@TenantId", tenantId));
  206. syncLogsByEntity = syncLogs
  207. .GroupBy(u => u.EntityId)
  208. .ToDictionary(u => u.Key, u => u.OrderByDescending(x => x.SyncStart).First());
  209. }
  210. var batchId = input.BatchId?.Trim();
  211. foreach (var entity in entities)
  212. {
  213. entity.SourceFullName = BuildObjectFullName(entity.SourceDbType, entity.SourceDbHost, entity.SourceDbPort, entity.SourceDbName, entity.SourceTableName ?? entity.SourceApiPath);
  214. entity.TargetFullName = BuildObjectFullName(entity.TargetDbType, entity.TargetDbHost, entity.TargetDbPort, entity.TargetDbName, entity.TargetTableName);
  215. if (mappingsByEntity.TryGetValue(entity.Id, out var entityMappings) && entityMappings.Count > 0)
  216. {
  217. foreach (var mapping in entityMappings)
  218. {
  219. mapping.MappingSource = "CONFIG";
  220. mapping.IsFallback = false;
  221. }
  222. entity.FieldMappings = entityMappings;
  223. entity.FieldMappingCount = entityMappings.Count;
  224. }
  225. else
  226. {
  227. entity.FieldMappings = BuildFallbackFieldMappings(entity, batchId);
  228. entity.FieldMappingCount = entity.FieldMappings.Count;
  229. }
  230. if (syncLogsByEntity.TryGetValue(entity.Id, out var syncLog))
  231. entity.SyncLog = syncLog;
  232. }
  233. var output = new MdpLineageOutput
  234. {
  235. ModuleCode = moduleCode,
  236. JobCode = jobCode,
  237. BatchId = input.BatchId,
  238. Stages = BuildStageDescriptions(jobCode, moduleCode),
  239. Entities = entities
  240. };
  241. AttachLineageCatalogInfo(output);
  242. return output;
  243. }
  244. private static (string WhereSql, List<SugarParameter> Parameters) BuildWhere(MdpMonitorQueryInput input, long tenantId)
  245. {
  246. var where = new List<string> { BuildMdpRunLogTenantWhere(tenantId) };
  247. var pars = new List<SugarParameter> { new("@TenantId", tenantId) };
  248. var (jobFilterSql, jobFilterPars, noMatch) = BuildJobCodeFilter(input);
  249. if (noMatch)
  250. where.Add("1=0");
  251. else
  252. {
  253. where.Add(jobFilterSql);
  254. pars.AddRange(jobFilterPars);
  255. }
  256. if (!string.IsNullOrWhiteSpace(input.BatchId))
  257. {
  258. where.Add("batch_id LIKE @BatchId");
  259. pars.Add(new SugarParameter("@BatchId", $"%{input.BatchId.Trim()}%"));
  260. }
  261. if (!string.IsNullOrWhiteSpace(input.Status))
  262. {
  263. where.Add("status=@Status");
  264. pars.Add(new SugarParameter("@Status", input.Status.Trim().ToUpperInvariant()));
  265. }
  266. if (input.StartTime.HasValue)
  267. {
  268. where.Add("start_time >= @StartTime");
  269. pars.Add(new SugarParameter("@StartTime", input.StartTime.Value));
  270. }
  271. if (input.EndTime.HasValue)
  272. {
  273. where.Add("start_time <= @EndTime");
  274. pars.Add(new SugarParameter("@EndTime", input.EndTime.Value));
  275. }
  276. return (string.Join(" AND ", where), pars);
  277. }
  278. /// <summary>
  279. /// MDP 转换任务当前以 tenant_id=0 写入运行日志;登录租户查询时需兼容这类全局任务记录。
  280. /// </summary>
  281. internal static string BuildMdpRunLogTenantWhere(long tenantId) =>
  282. tenantId > 0 ? "(tenant_id = @TenantId OR tenant_id = 0)" : "tenant_id = 0";
  283. private static object BuildCatalogResponse()
  284. {
  285. return JobCatalog.Values
  286. .OrderBy(u => u.JobCode, StringComparer.OrdinalIgnoreCase)
  287. .Select(u => new
  288. {
  289. jobCode = u.JobCode,
  290. displayName = u.DisplayName,
  291. businessDomainCode = u.BusinessDomainCode,
  292. businessDomainName = u.BusinessDomainName,
  293. consumerModules = u.ConsumerModules,
  294. scopeType = u.ScopeType,
  295. moduleCode = ResolveModuleCodeFromJobCode(u.JobCode),
  296. scheduleJobId = u.ScheduleJobId
  297. })
  298. .ToList();
  299. }
  300. private static (string Sql, List<SugarParameter> Parameters, bool NoMatch) BuildJobCodeFilter(MdpMonitorQueryInput input)
  301. {
  302. var pars = new List<SugarParameter>();
  303. if (!string.IsNullOrWhiteSpace(input.JobCode))
  304. {
  305. pars.Add(new SugarParameter("@JobCode", input.JobCode.Trim().ToUpperInvariant()));
  306. return ("job_code=@JobCode", pars, false);
  307. }
  308. var hasDomainFilter = !string.IsNullOrWhiteSpace(input.BusinessDomainCode);
  309. var consumerModule = !string.IsNullOrWhiteSpace(input.ConsumerModule)
  310. ? input.ConsumerModule.Trim()
  311. : !string.IsNullOrWhiteSpace(input.ModuleCode) ? input.ModuleCode.Trim() : null;
  312. var hasConsumerFilter = !string.IsNullOrWhiteSpace(consumerModule);
  313. if (!hasDomainFilter && !hasConsumerFilter)
  314. return ("IFNULL(job_code, '') LIKE '%MDP%'", pars, false);
  315. var allowed = ResolveAllowedJobCodes(input.BusinessDomainCode, consumerModule);
  316. if (allowed.Count == 0)
  317. return (string.Empty, pars, true);
  318. if (allowed.Count == 1)
  319. {
  320. pars.Add(new SugarParameter("@JobCode", allowed.First()));
  321. return ("job_code=@JobCode", pars, false);
  322. }
  323. var inParts = new List<string>();
  324. var index = 0;
  325. foreach (var code in allowed.OrderBy(u => u, StringComparer.OrdinalIgnoreCase))
  326. {
  327. var paramName = $"@JobCode{index++}";
  328. inParts.Add(paramName);
  329. pars.Add(new SugarParameter(paramName, code));
  330. }
  331. return ($"job_code IN ({string.Join(", ", inParts)})", pars, false);
  332. }
  333. private static HashSet<string> ResolveAllowedJobCodes(string? businessDomainCode, string? consumerModule)
  334. {
  335. IEnumerable<MdpJobCatalogItem> items = JobCatalog.Values;
  336. if (!string.IsNullOrWhiteSpace(businessDomainCode))
  337. {
  338. var domain = businessDomainCode.Trim();
  339. items = items.Where(u => string.Equals(u.BusinessDomainCode, domain, StringComparison.OrdinalIgnoreCase));
  340. }
  341. if (!string.IsNullOrWhiteSpace(consumerModule))
  342. {
  343. var module = consumerModule.Trim();
  344. items = items.Where(u => u.ConsumerModules.Contains(module, StringComparer.OrdinalIgnoreCase));
  345. }
  346. return items.Select(u => u.JobCode).ToHashSet(StringComparer.OrdinalIgnoreCase);
  347. }
  348. private static void AttachCatalogInfo(MdpMonitorRunLogRow row)
  349. {
  350. if (row == null || string.IsNullOrWhiteSpace(row.JobCode))
  351. return;
  352. if (!JobCatalog.TryGetValue(row.JobCode.Trim(), out var item))
  353. return;
  354. row.BusinessDomainCode = item.BusinessDomainCode;
  355. row.BusinessDomainName = item.BusinessDomainName;
  356. row.ConsumerModules = string.Join(",", item.ConsumerModules);
  357. row.ScopeType = item.ScopeType;
  358. row.DisplayName = item.DisplayName;
  359. row.ScheduleJobId = item.ScheduleJobId;
  360. }
  361. private static void AttachLineageCatalogInfo(MdpLineageOutput output)
  362. {
  363. if (output == null || string.IsNullOrWhiteSpace(output.JobCode))
  364. return;
  365. if (!JobCatalog.TryGetValue(output.JobCode.Trim(), out var item))
  366. return;
  367. output.BusinessDomainCode = item.BusinessDomainCode;
  368. output.BusinessDomainName = item.BusinessDomainName;
  369. output.ConsumerModules = string.Join(",", item.ConsumerModules);
  370. output.ScopeType = item.ScopeType;
  371. output.DisplayName = item.DisplayName;
  372. output.ScheduleJobId = item.ScheduleJobId;
  373. }
  374. private static string FormatConsumerModulesLabel(MdpJobCatalogItem item) =>
  375. string.Join("、", item.ConsumerModules);
  376. private static List<MdpLineageStageRow> BuildStageDescriptions(string? jobCode, string? moduleCode)
  377. {
  378. if (!string.IsNullOrWhiteSpace(jobCode) && JobCatalog.TryGetValue(jobCode.Trim(), out var catalogItem))
  379. {
  380. return catalogItem.BusinessDomainCode switch
  381. {
  382. "order_delivery" => BuildOrderDeliveryStages(catalogItem),
  383. "work_schedule" => BuildWorkScheduleStages(catalogItem),
  384. "supply_purchase" => BuildSupplyPurchaseStages(catalogItem),
  385. "purchase_execution" => BuildPurchaseExecutionStages(catalogItem),
  386. _ => BuildGenericStages(moduleCode)
  387. };
  388. }
  389. return BuildGenericStages(moduleCode);
  390. }
  391. private static List<MdpLineageStageRow> BuildOrderDeliveryStages(MdpJobCatalogItem item)
  392. {
  393. var consumers = FormatConsumerModulesLabel(item);
  394. return new List<MdpLineageStageRow>
  395. {
  396. new() { StageCode = "STAGING", StageName = "订单交付域 · 贴源同步", Layer = "mdp_stg", Description = $"按 mdp_entity 登记订单交付域源对象抽取数据,保留 raw_data JSON 便于追溯;产出供 {consumers} 消费。", InputObjects = "旧系统 / 当前库源对象", OutputObjects = "mdp_stg_so, mdp_stg_ship_trans", Execution = "S1MdpSyncTransformService.SyncStagingAsync" },
  397. new() { StageCode = "STANDARD", StageName = "订单交付域 · 标准层转换", Layer = "mdp_std", Description = "解析贴源 raw_data,做字段标准化、租户兜底和幂等写入。", InputObjects = "mdp_stg_so, mdp_stg_ship_trans", OutputObjects = "mdp_std_so, mdp_std_ship_trans", Execution = "S1MdpSyncTransformService.BuildStandardCommands" },
  398. new() { StageCode = "DWD", StageName = "订单交付域 · DWD宽表", Layer = "dwd", Description = $"沉淀订单交付事实,供 {consumers} 看板与诊断读取。", InputObjects = "mdp_std_so, mdp_std_ship_trans", OutputObjects = "dwd_ship_trans", Execution = "S1MdpSyncTransformService.BuildDwdAsync" },
  399. new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = $"计算订单交付域 L1 指标并写入统一指标值表,供 {consumers} 消费。", InputObjects = "mdp_std_so, dwd_ship_trans", OutputObjects = "ado_s9_kpi_value_l1_day", Execution = "S1MdpSyncTransformService.BuildS1KpiValuesAsync" }
  400. };
  401. }
  402. private static List<MdpLineageStageRow> BuildWorkScheduleStages(MdpJobCatalogItem item)
  403. {
  404. var consumers = FormatConsumerModulesLabel(item);
  405. return new List<MdpLineageStageRow>
  406. {
  407. new() { StageCode = "STAGING", StageName = "工单排程域 · 贴源同步", Layer = "mdp_stg", Description = $"按 mdp_entity 登记工单排程域源对象抽取数据;产出供 {consumers} 消费。", InputObjects = "工单 / 工序 / 排程源对象", OutputObjects = "mdp_stg_*", Execution = "S2MdpSyncTransformService.SyncStagingAsync" },
  408. new() { StageCode = "STANDARD", StageName = "工单排程域 · 标准层转换", Layer = "mdp_std", Description = "将工单、工序、排程等对象标准化。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_*", Execution = "S2MdpSyncTransformService.BuildStandardCommands" },
  409. new() { StageCode = "DWD", StageName = "工单排程域 · DWD宽表", Layer = "dwd", Description = $"生成制造执行与排程分析宽表,供 {consumers} 读取。", InputObjects = "mdp_std_*", OutputObjects = "dwd_*", Execution = "S2MdpSyncTransformService.BuildDwdAsync" },
  410. new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = $"写入工单排程域 KPI,供 {consumers} 消费。", InputObjects = "mdp_std_* / dwd_*", OutputObjects = "ado_s9_kpi_value_*", Execution = "S2MdpSyncTransformService.BuildS2KpiValuesAsync" }
  411. };
  412. }
  413. private static List<MdpLineageStageRow> BuildSupplyPurchaseStages(MdpJobCatalogItem item)
  414. {
  415. var consumers = FormatConsumerModulesLabel(item);
  416. return new List<MdpLineageStageRow>
  417. {
  418. new() { StageCode = "STAGING", StageName = "供应采购域 · 贴源同步", Layer = "mdp_stg", Description = $"按 mdp_entity 登记供应采购域源对象抽取数据;产出供 {consumers} 消费。", InputObjects = "供应 / 物料 / 采购源对象", OutputObjects = "mdp_stg_*", Execution = "S3MdpSyncTransformService.SyncStagingAsync" },
  419. new() { StageCode = "STANDARD", StageName = "供应采购域 · 标准层转换", Layer = "mdp_std", Description = "将供应、物料、采购、交货计划等对象标准化。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_*", Execution = "S3MdpSyncTransformService.BuildStandardCommands" },
  420. new() { StageCode = "DWD", StageName = "供应采购域 · DWD宽表", Layer = "dwd", Description = $"生成供应交付、齐套、风险等分析宽表,供 {consumers} 读取。", InputObjects = "mdp_std_*", OutputObjects = "dwd_supplier_delivery / dwd_material_readiness 等", Execution = "S3MdpSyncTransformService.BuildDwdAsync" },
  421. new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = $"写入供应采购域指标,供 {consumers} 消费。", InputObjects = "mdp_std_* / dwd_*", OutputObjects = "ado_s9_kpi_value_*", Execution = "S3MdpSyncTransformService.BuildS3KpiValuesAsync" }
  422. };
  423. }
  424. private static List<MdpLineageStageRow> BuildPurchaseExecutionStages(MdpJobCatalogItem item)
  425. {
  426. var consumers = FormatConsumerModulesLabel(item);
  427. return new List<MdpLineageStageRow>
  428. {
  429. new() { StageCode = "STAGING", StageName = "采购执行域 · 贴源同步", Layer = "mdp_stg_s4_*", Description = $"同步采购执行域 IQC/发货/退货/欠料事实,供 {consumers} 消费;共享采购主链仍由供应采购域维护。", InputObjects = "PurOrdRctDetail / scm_shdzb / srm_polist_ds / dwd_material_shortage", OutputObjects = "mdp_stg_s4_iqc / mdp_stg_s4_shipment / mdp_stg_s4_return / mdp_stg_s4_shortage", Execution = "S4MdpSyncTransformService.SyncStagingAsync" },
  430. new() { StageCode = "STANDARD", StageName = "采购执行域 · 标准层转换", Layer = "mdp_std_s4_*", Description = "将采购执行域贴源对象标准化,并统计供应采购域共享标准层行数。", InputObjects = "mdp_stg_s4_*", OutputObjects = "mdp_std_s4_iqc / mdp_std_s4_shipment / mdp_std_s4_return / mdp_std_s4_shortage", Execution = "S4MdpSyncTransformService.BuildStandardCommands" },
  431. new() { StageCode = "DWD", StageName = "采购执行域 · DWD宽表", Layer = "dwd", Description = $"写入采购执行分析宽表,供 {consumers} 读取。", InputObjects = "dwd_supplier_delivery / mdp_std_s4_*", OutputObjects = "dwd_s4_purchase_execution / dwd_po_trans / dwd_qc_trans", Execution = "S4MdpSyncTransformService.BuildDwdAsync" },
  432. new() { StageCode = "KPI", StageName = "指标写入", Layer = "ado_s9", Description = $"写入采购执行域 L1/L2/L3 指标,供 {consumers} 消费。", InputObjects = "mdp_std_delivery_schedule / dwd_supplier_delivery / dwd_s4_purchase_execution", OutputObjects = "ado_s9_kpi_value_l1/l2/l3_day", Execution = "S4MdpSyncTransformService.BuildS4KpiValuesAsync" }
  433. };
  434. }
  435. /// <summary>
  436. /// 贴源同步在代码中维护、但未写入 mdp_field_mapping 的实体主键/业务键提示(仅监控展示兜底)。
  437. /// </summary>
  438. private static readonly Dictionary<string, (string SourceRowId, string SourceBizKeyExpr)> KnownStagingEntityKeys =
  439. new(StringComparer.OrdinalIgnoreCase)
  440. {
  441. ["S4_IQC_RECEIPT"] = ("RecID", "CONCAT(IFNULL(s.`Domain`,''), ':', IFNULL(s.`Receiver`,''), ':', IFNULL(s.`Line`,''))"),
  442. ["S4_SHIPMENT_EXEC"] = ("id", "CONCAT(IFNULL(s.`glid`,''), ':', IFNULL(s.`id`,''))"),
  443. ["S4_RETURN_EXEC"] = ("Id", "s.`dsnum`"),
  444. ["S4_SHORTAGE_EXEC"] = ("id", "CONCAT(IFNULL(s.`work_order`,''), ':', IFNULL(s.`component_item_code`,''))")
  445. };
  446. private static List<MdpLineageFieldMappingRow> BuildFallbackFieldMappings(MdpLineageEntityRow entity, string? batchId)
  447. {
  448. var sourceRowId = entity.IncrColumn;
  449. string? sourceBizKeyExpr = null;
  450. if (!string.IsNullOrWhiteSpace(entity.EntityCode) &&
  451. KnownStagingEntityKeys.TryGetValue(entity.EntityCode, out var known))
  452. {
  453. sourceRowId ??= known.SourceRowId;
  454. sourceBizKeyExpr = known.SourceBizKeyExpr;
  455. }
  456. var fallbackNote = "当前实体未配置逐字段映射,贴源同步保留源行 raw_data";
  457. var mappings = new List<MdpLineageFieldMappingRow>();
  458. var sort = 10;
  459. void Add(string sourceField, string targetField, string fieldType, string? transformScript, string? constValue = null, bool isRequired = false)
  460. {
  461. mappings.Add(new MdpLineageFieldMappingRow
  462. {
  463. EntityId = entity.Id,
  464. SourceField = sourceField,
  465. TargetField = targetField,
  466. FieldType = fieldType,
  467. TransformScript = transformScript,
  468. ConstValue = constValue,
  469. IsRequired = isRequired,
  470. SortOrder = sort,
  471. MappingSource = "FALLBACK",
  472. IsFallback = true
  473. });
  474. sort += 10;
  475. }
  476. Add("tenant_id", "tenant_id", "DIRECT", "当前租户或全局租户兜底");
  477. Add($"CONST:{entity.SourceCode ?? "AIDOP"}", "source_system", "CONST", "来源数据源编码", entity.SourceCode ?? "AIDOP", isRequired: true);
  478. Add($"CONST:{entity.SourceTableName ?? entity.SourceApiPath ?? "--"}", "source_table", "CONST", "源表名", entity.SourceTableName ?? entity.SourceApiPath, isRequired: true);
  479. if (!string.IsNullOrWhiteSpace(sourceRowId))
  480. Add(sourceRowId, "source_row_id", "DIRECT", "来自 mdp_entity.incr_column 或实体主键配置");
  481. else
  482. Add("--", "source_row_id", "DIRECT", fallbackNote);
  483. if (!string.IsNullOrWhiteSpace(sourceBizKeyExpr))
  484. Add(sourceBizKeyExpr, "source_biz_key", "EXPR", "来自实体业务键表达式", isRequired: true);
  485. else
  486. Add("--", "source_biz_key", "EXPR", fallbackNote);
  487. Add("*", "raw_data", "JSON", "源行整行 JSON");
  488. Add("CONST", "sync_batch_id", "CONST", "当前同步批次", string.IsNullOrWhiteSpace(batchId) ? "当前同步批次" : batchId, isRequired: true);
  489. Add("NOW()", "sync_time", "CONST", "同步时间", isRequired: true);
  490. return mappings;
  491. }
  492. private static List<MdpLineageStageRow> BuildGenericStages(string? moduleCode)
  493. {
  494. if (string.Equals(moduleCode, "S1", StringComparison.OrdinalIgnoreCase))
  495. return BuildOrderDeliveryStages(JobCatalog["S1_MDP_SYNC_TRANSFORM"]);
  496. if (string.Equals(moduleCode, "S2", StringComparison.OrdinalIgnoreCase))
  497. return BuildWorkScheduleStages(JobCatalog["S2_MDP_SYNC_TRANSFORM"]);
  498. if (string.Equals(moduleCode, "S3", StringComparison.OrdinalIgnoreCase))
  499. return BuildSupplyPurchaseStages(JobCatalog["S3_MDP_SYNC_TRANSFORM"]);
  500. if (string.Equals(moduleCode, "S4", StringComparison.OrdinalIgnoreCase))
  501. return BuildPurchaseExecutionStages(JobCatalog["S4_MDP_SYNC_TRANSFORM"]);
  502. return new List<MdpLineageStageRow>
  503. {
  504. new() { StageCode = "STAGING", StageName = "贴源同步", Layer = "mdp_stg", Description = "按 mdp_entity 登记源对象抽取数据。", InputObjects = "源对象", OutputObjects = "mdp_stg_*", Execution = "MDP 同步服务" },
  505. new() { StageCode = "STANDARD", StageName = "标准层转换", Layer = "mdp_std", Description = "标准层/DWD/KPI 当前由后端 Service 承载。", InputObjects = "mdp_stg_*", OutputObjects = "mdp_std_* / dwd_* / 指标表", Execution = "MDP 转换服务" }
  506. };
  507. }
  508. private static string? ResolveModuleCodeFromJobCode(string? jobCode)
  509. {
  510. if (string.IsNullOrWhiteSpace(jobCode))
  511. return null;
  512. return ModuleJobCodes.FirstOrDefault(u =>
  513. string.Equals(u.Value, jobCode.Trim(), StringComparison.OrdinalIgnoreCase)).Key;
  514. }
  515. private static string? ResolveJobCode(string? moduleCode, string? jobCode)
  516. {
  517. if (!string.IsNullOrWhiteSpace(jobCode))
  518. return jobCode.Trim().ToUpperInvariant();
  519. if (string.IsNullOrWhiteSpace(moduleCode))
  520. return null;
  521. return ModuleJobCodes.TryGetValue(moduleCode.Trim(), out var mapped) ? mapped : null;
  522. }
  523. private static string? ResolveModuleCode(string? moduleCode, string? jobCode)
  524. {
  525. if (!string.IsNullOrWhiteSpace(moduleCode))
  526. return moduleCode.Trim().ToUpperInvariant();
  527. if (string.IsNullOrWhiteSpace(jobCode))
  528. return null;
  529. var normalizedJobCode = jobCode.Trim();
  530. return ModuleJobCodes.FirstOrDefault(u => string.Equals(u.Value, normalizedJobCode, StringComparison.OrdinalIgnoreCase)).Key;
  531. }
  532. private static string? BuildObjectFullName(string? dbType, string? host, int? port, string? dbName, string? objectName)
  533. {
  534. if (string.IsNullOrWhiteSpace(objectName))
  535. return null;
  536. var databaseObject = string.IsNullOrWhiteSpace(dbName) ? objectName : $"{dbName}.{objectName}";
  537. var hostPart = string.IsNullOrWhiteSpace(host) ? null : port.HasValue ? $"{host}:{port}" : host;
  538. return string.Join(" / ", new[] { dbType, hostPart, databaseObject }.Where(u => !string.IsNullOrWhiteSpace(u)));
  539. }
  540. private static string SelectColumnsSql()
  541. {
  542. return """
  543. SELECT id AS Id, tenant_id AS TenantId, job_code AS JobCode, job_name AS JobName, trigger_type AS TriggerType,
  544. batch_id AS BatchId, status AS Status, start_time AS StartTime, end_time AS EndTime, duration_ms AS DurationMs,
  545. stage_rows AS StageRows, standard_rows AS StandardRows, dwd_rows AS DwdRows,
  546. error_message AS ErrorMessage, summary_json AS SummaryJson, create_time AS CreateTime, update_time AS UpdateTime
  547. """;
  548. }
  549. }
  550. public sealed record MdpJobCatalogItem(
  551. string JobCode,
  552. string BusinessDomainCode,
  553. string BusinessDomainName,
  554. string[] ConsumerModules,
  555. string ScopeType,
  556. string DisplayName,
  557. string? ScheduleJobId);
  558. public class MdpMonitorQueryInput
  559. {
  560. public string? BusinessDomainCode { get; set; }
  561. public string? ConsumerModule { get; set; }
  562. public string? ModuleCode { get; set; }
  563. public string? JobCode { get; set; }
  564. public string? BatchId { get; set; }
  565. public string? Status { get; set; }
  566. public DateTime? StartTime { get; set; }
  567. public DateTime? EndTime { get; set; }
  568. }
  569. public sealed class MdpMonitorListInput : MdpMonitorQueryInput
  570. {
  571. public int Page { get; set; } = 1;
  572. public int PageSize { get; set; } = 10;
  573. }
  574. public sealed class MdpMonitorLineageInput : MdpMonitorQueryInput
  575. {
  576. }
  577. public sealed class MdpMonitorRunLogRow
  578. {
  579. public long Id { get; set; }
  580. public long TenantId { get; set; }
  581. public string? JobCode { get; set; }
  582. public string? JobName { get; set; }
  583. public string? TriggerType { get; set; }
  584. public string? BatchId { get; set; }
  585. public string? Status { get; set; }
  586. public DateTime? StartTime { get; set; }
  587. public DateTime? EndTime { get; set; }
  588. public int? DurationMs { get; set; }
  589. public int? StageRows { get; set; }
  590. public int? StandardRows { get; set; }
  591. public int? DwdRows { get; set; }
  592. public string? ErrorMessage { get; set; }
  593. public string? SummaryJson { get; set; }
  594. public DateTime? CreateTime { get; set; }
  595. public DateTime? UpdateTime { get; set; }
  596. public string? BusinessDomainCode { get; set; }
  597. public string? BusinessDomainName { get; set; }
  598. public string? ConsumerModules { get; set; }
  599. public string? ScopeType { get; set; }
  600. public string? DisplayName { get; set; }
  601. public string? ScheduleJobId { get; set; }
  602. }
  603. public sealed class MdpLineageOutput
  604. {
  605. public string? ModuleCode { get; set; }
  606. public string? JobCode { get; set; }
  607. public string? BatchId { get; set; }
  608. public string? BusinessDomainCode { get; set; }
  609. public string? BusinessDomainName { get; set; }
  610. public string? ConsumerModules { get; set; }
  611. public string? ScopeType { get; set; }
  612. public string? DisplayName { get; set; }
  613. public string? ScheduleJobId { get; set; }
  614. public List<MdpLineageStageRow> Stages { get; set; } = new();
  615. public List<MdpLineageEntityRow> Entities { get; set; } = new();
  616. }
  617. public sealed class MdpLineageStageRow
  618. {
  619. public string? StageCode { get; set; }
  620. public string? StageName { get; set; }
  621. public string? Layer { get; set; }
  622. public string? Description { get; set; }
  623. public string? InputObjects { get; set; }
  624. public string? OutputObjects { get; set; }
  625. public string? Execution { get; set; }
  626. }
  627. public sealed class MdpLineageEntityRow
  628. {
  629. public long Id { get; set; }
  630. public string? EntityCode { get; set; }
  631. public string? EntityName { get; set; }
  632. public string? EntityType { get; set; }
  633. public string? SourceCode { get; set; }
  634. public string? SourceName { get; set; }
  635. public string? SourceType { get; set; }
  636. public string? SourceDbType { get; set; }
  637. public string? SourceDbHost { get; set; }
  638. public int? SourceDbPort { get; set; }
  639. public string? SourceDbName { get; set; }
  640. public string? SourceTableName { get; set; }
  641. public string? SourceApiPath { get; set; }
  642. public string? SourceFullName { get; set; }
  643. public string? TargetDbType { get; set; }
  644. public string? TargetDbHost { get; set; }
  645. public int? TargetDbPort { get; set; }
  646. public string? TargetDbName { get; set; }
  647. public string? TargetTableName { get; set; }
  648. public string? TargetFullName { get; set; }
  649. public string? SyncMode { get; set; }
  650. public string? IncrColumn { get; set; }
  651. public int? Status { get; set; }
  652. public int FieldMappingCount { get; set; }
  653. public List<MdpLineageFieldMappingRow> FieldMappings { get; set; } = new();
  654. public MdpLineageSyncLogRow? SyncLog { get; set; }
  655. }
  656. public sealed class MdpLineageFieldMappingRow
  657. {
  658. public long EntityId { get; set; }
  659. public string? SourceField { get; set; }
  660. public string? TargetField { get; set; }
  661. public string? FieldType { get; set; }
  662. public string? TransformScript { get; set; }
  663. public string? ConstValue { get; set; }
  664. public string? LookupTable { get; set; }
  665. public bool IsRequired { get; set; }
  666. public string? DefaultValue { get; set; }
  667. public int SortOrder { get; set; }
  668. public string? MappingSource { get; set; }
  669. public bool IsFallback { get; set; }
  670. }
  671. public sealed class MdpLineageSyncLogRow
  672. {
  673. public long EntityId { get; set; }
  674. public string? EntityName { get; set; }
  675. public string? Status { get; set; }
  676. public long? RowsRead { get; set; }
  677. public long? RowsInsert { get; set; }
  678. public long? RowsUpdate { get; set; }
  679. public long? RowsSkip { get; set; }
  680. public long? RowsError { get; set; }
  681. public DateTime? SyncStart { get; set; }
  682. public DateTime? SyncEnd { get; set; }
  683. public int? DurationMs { get; set; }
  684. public string? ErrorMsg { get; set; }
  685. }