S5MdpSyncTransformService.cs 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664
  1. using System.Text.Json;
  2. namespace Admin.NET.Plugin.AiDOP.MaterialWarehouse;
  3. /// <summary>
  4. /// S5 物料仓储 — T8 KPI 数据底座与刷新转换服务。
  5. /// 路径 A:方老师 v5.4 KPI 字段对照表 J 列 SQL 原逻辑直发 T8 SQL Server(ConfigId=t8_v5);
  6. /// 一期不做 mdp_stg_t8_* 贴源层;结果直接落 dwd_t8_* 与 ado_s9_kpi_value_l1_day。
  7. /// 包含 KPI:S5_L1_001 物料上线周期 / S5_L1_002 物料上线满足率 /
  8. /// S5_L1_003 物料仓储人效 / S5_L1_004 品类物料库存周转。
  9. /// </summary>
  10. public class S5MdpSyncTransformService : ITransient
  11. {
  12. private readonly ISqlSugarClient _db;
  13. private const string JobCode = "S5_MDP_SYNC_TRANSFORM";
  14. private const string JobName = "S5 物料仓储 MDP 同步与转换";
  15. private const string T8ConfigId = "t8_v5";
  16. private const string ModuleCode = "S5";
  /// <summary>Stores the SqlSugar client that every query and write in this service goes through.</summary>
  /// <param name="db">SqlSugar client; kept as-is, no validation is performed here.</param>
  public S5MdpSyncTransformService(ISqlSugarClient db) => _db = db;
  /// <summary>
  /// Runs the full S5 refresh: inserts a RUNNING run-log row, builds the four KPIs
  /// S5_L1_001 .. S5_L1_004 in that order, then marks the run log SUCCESS or FAILED.
  /// </summary>
  /// <param name="cancellationToken">Checked once on entry and handed to every KPI builder.</param>
  /// <param name="triggerType">Trigger label; blank becomes "AUTO", otherwise it is trimmed and upper-cased.</param>
  /// <param name="option">Refresh window; <see cref="S5MdpRefreshOption.Default"/> is used when null.</param>
  /// <returns>Aggregated row counters and per-KPI denominator statuses of this batch.</returns>
  /// <remarks>
  /// Any exception raised by a builder (including cancellation) is recorded on the run log
  /// with its message and then rethrown unchanged.
  /// </remarks>
  public async Task<S5MdpSyncTransformResult> RunFullAsync(
      CancellationToken cancellationToken = default,
      string triggerType = "AUTO",
      S5MdpRefreshOption? option = null)
  {
      cancellationToken.ThrowIfCancellationRequested();

      var refresh = option ?? S5MdpRefreshOption.Default();
      var startedAt = DateTime.Now;

      // The batch id is a machine key: format it culture-invariantly so a host culture with a
      // non-Gregorian calendar or non-ASCII digits cannot change its shape.
      var stamp = startedAt.ToString("yyyyMMddHHmmss", System.Globalization.CultureInfo.InvariantCulture);
      var batchId = "S5_MDP_FULL_" + stamp;

      var trigger = NormalizeTriggerType(triggerType);
      var runLogId = await InsertTransformRunLogAsync(batchId, startedAt, trigger, refresh);

      var result = new S5MdpSyncTransformResult
      {
          BatchId = batchId,
          RunLogId = runLogId,
          TriggerType = trigger,
          SourceZtid = refresh.SourceZtid,
          BizDate = refresh.BizDate,
          BizMonth = refresh.BizMonth,
          DailyPeriodStart = refresh.DailyPeriodStart,
          DailyPeriodEnd = refresh.DailyPeriodEnd,
          MonthlyPeriodStart = refresh.MonthlyPeriodStart,
          MonthlyPeriodEnd = refresh.MonthlyPeriodEnd
      };

      try
      {
          // Path A: SQL is sent straight to T8, there is no stg / std layer, so both counters stay 0.
          result.StageRows = 0;
          result.StandardRows = 0;

          result.MergeSub(
              "S5_L1_001",
              await BuildS5L1001MaterialOnlineCycleAsync(batchId, startedAt, refresh, cancellationToken));
          result.MergeSub(
              "S5_L1_002",
              await BuildS5L1002MaterialOnlineFulfillmentAsync(batchId, startedAt, refresh, cancellationToken));
          result.MergeSub(
              "S5_L1_003",
              await BuildS5L1003MaterialWarehouseEfficiencyAsync(batchId, startedAt, refresh, cancellationToken));
          result.MergeSub(
              "S5_L1_004",
              await BuildS5L1004MaterialInventoryTurnoverAsync(batchId, startedAt, refresh, cancellationToken));

          await MarkTransformRunSuccessAsync(runLogId, startedAt, result);
          return result;
      }
      catch (Exception ex)
      {
          // Best-effort bookkeeping; the original exception is what the caller sees.
          await MarkTransformRunFailedAsync(runLogId, startedAt, ex.Message);
          throw;
      }
  }
  67. // ─────────────────────────────────────────────────────────────────────────
  68. // KPI 实现(方老师 v5.4 KPI J 列 SQL 原逻辑直发 T8)
  69. // ─────────────────────────────────────────────────────────────────────────
  /// <summary>
  /// S5_L1_001 material online cycle. Per item code: whole days between the earliest shtime of
  /// '采购入库' documents and the earliest shtime of '生产领料' documents. The KPI value is the
  /// arithmetic mean over all items that have both dates.
  /// </summary>
  /// <param name="batchId">Batch id written to every DWD row.</param>
  /// <param name="now">Run timestamp used for create_time / update_time.</param>
  /// <param name="option">Supplies the source ztid and the biz_date the rows are stored under.</param>
  /// <param name="ct">Checked before each DWD write.</param>
  /// <returns>
  /// T8 row count, accumulated affected-row count of the DWD upserts, KPI upsert count and a
  /// status of "OK" or "NO_NUMERATOR" (no item had both dates, KPI value written as NULL).
  /// </returns>
  private async Task<KpiBuildSubResult> BuildS5L1001MaterialOnlineCycleAsync(
      string batchId, DateTime now, S5MdpRefreshOption option, CancellationToken ct)
  {
      var sub = new KpiBuildSubResult();

      const string sqlOnline = @"
  select b.code as code, min(a.shtime) as shtime
  from kc_tz_head a with(nolock)
  inner join kc_tz_list b with(nolock) on a.Id=b.idid
  where a.ztid=@ztid and a.lbs='生产领料' and a.hzyn=0 and a.zfyn=0 and a.shyn=1
  group by b.code";
      const string sqlReceipt = @"
  select b.code as code, min(a.shtime) as shtime
  from kc_tz_head a with(nolock)
  inner join kc_tz_list b with(nolock) on a.Id=b.idid
  where a.ztid=@ztid and a.lbs='采购入库' and a.hzyn=0 and a.zfyn=0 and a.shyn=1
  group by b.code";

      var p = new[] { new SugarParameter("@ztid", option.SourceZtid) };
      var onlineRows = await QueryT8Async<S5OnlineCycleRow>(sqlOnline, p);
      var receiptRows = await QueryT8Async<S5OnlineCycleRow>(sqlReceipt, p);
      sub.T8Rows = onlineRows.Count + receiptRows.Count;

      // Keys are compared case-insensitively. If the source groups case-sensitively, two rows can
      // collapse onto one key; merge them (earliest shtime wins) instead of throwing on a duplicate.
      var onlineByCode = EarliestShtimeByCode(onlineRows);
      var receiptByCode = EarliestShtimeByCode(receiptRows);

      var allCodes = new HashSet<string>(onlineByCode.Keys, StringComparer.OrdinalIgnoreCase);
      allCodes.UnionWith(receiptByCode.Keys);

      var dwdAffected = 0;
      var cycleDaysList = new List<int>();
      foreach (var code in allCodes)
      {
          ct.ThrowIfCancellationRequested();

          var online = onlineByCode.GetValueOrDefault(code);
          var receipt = receiptByCode.GetValueOrDefault(code);

          // Only items with both dates yield a cycle; the others are stored with cycle_days = NULL.
          int? cycleDays = null;
          if (online.HasValue && receipt.HasValue)
          {
              cycleDays = (int)(online.Value.Date - receipt.Value.Date).TotalDays;
              cycleDaysList.Add(cycleDays.Value);
          }

          // NOTE(review): the sum below is the driver's affected-row count; MySQL typically reports 2
          // for a row updated through ON DUPLICATE KEY UPDATE, so it may exceed the item count - confirm.
          dwdAffected += await _db.Ado.ExecuteCommandAsync(@"
  INSERT INTO dwd_t8_material_online_cycle
  (tenant_id, factory_id, biz_date, source_ztid, item_code, online_date, receipt_date, cycle_days, batch_id, create_time)
  VALUES
  (0, 1, @bizDate, @ztid, @itemCode, @online, @receipt, @cycleDays, @batchId, @now)
  ON DUPLICATE KEY UPDATE
  online_date=VALUES(online_date), receipt_date=VALUES(receipt_date),
  cycle_days=VALUES(cycle_days), batch_id=VALUES(batch_id), update_time=@now",
              new SugarParameter("@bizDate", option.BizDate),
              new SugarParameter("@ztid", option.SourceZtid),
              new SugarParameter("@itemCode", code),
              new SugarParameter("@online", online),
              new SugarParameter("@receipt", receipt),
              new SugarParameter("@cycleDays", cycleDays),
              new SugarParameter("@batchId", batchId),
              new SugarParameter("@now", now));
      }

      sub.DwdRows = dwdAffected;

      // KPI value: mean of cycle_days over all computable items; NULL (never a fake 0) when there are none.
      var hasCycles = cycleDaysList.Count > 0;
      decimal? metricValue = hasCycles ? (decimal)cycleDaysList.Average() : null;
      sub.KpiRows = await UpsertKpiValueAsync("S5_L1_001", option.BizDate, metricValue, now);
      sub.DenominatorStatus = hasCycles ? "OK" : "NO_NUMERATOR";
      return sub;
  }

  /// <summary>
  /// Indexes rows by item code (case-insensitive), skipping rows without a code. When several rows
  /// share a key, the earliest non-null shtime is kept.
  /// </summary>
  private static Dictionary<string, DateTime?> EarliestShtimeByCode(IEnumerable<S5OnlineCycleRow> rows)
  {
      var byCode = new Dictionary<string, DateTime?>(StringComparer.OrdinalIgnoreCase);
      foreach (var row in rows)
      {
          if (string.IsNullOrEmpty(row.code))
          {
              continue;
          }

          if (!byCode.TryGetValue(row.code, out var known))
          {
              byCode[row.code] = row.shtime;
          }
          else if (row.shtime.HasValue && (!known.HasValue || row.shtime.Value < known.Value))
          {
              byCode[row.code] = row.shtime;
          }
      }

      return byCode;
  }
  /// <summary>
  /// S5_L1_002 material online fulfillment rate. Per work order: number of distinct item codes on
  /// '生产领料' documents whose shtime is not later than the order's earliest kgdate, divided by
  /// the number of material lines of the '生产任务' order. The KPI value is the mean of the
  /// per-order rates multiplied by 100.
  /// </summary>
  /// <param name="batchId">Batch id written to every DWD row.</param>
  /// <param name="now">Run timestamp used for create_time / update_time.</param>
  /// <param name="option">Supplies the source ztid and the biz_date the rows are stored under.</param>
  /// <param name="ct">Checked before each DWD write.</param>
  /// <returns>
  /// T8 row count, accumulated affected-row count of the DWD upserts, KPI upsert count and a
  /// status of "OK" or "NO_VALID_ORDER" (no order had a positive line count).
  /// </returns>
  private async Task<KpiBuildSubResult> BuildS5L1002MaterialOnlineFulfillmentAsync(
      string batchId, DateTime now, S5MdpRefreshOption option, CancellationToken ct)
  {
      var sub = new KpiBuildSubResult();

      const string sqlNumer = @"
  select lynoid as lynoid, count(*) as codenum
  from (
  select a.lynoid as lynoid, b.code as code
  from kc_tz_head a with(nolock)
  inner join kc_tz_list b with(nolock) on a.Id=b.idid
  left join (
  select noid as noid, min(kgdate) as kgdate
  from Cj_Bg_Head_Rep with(nolock)
  where ztid=@ztid group by noid
  ) c on a.lynoid=c.noid
  where a.ztid=@ztid and a.lbs='生产领料' and a.hzyn=0 and a.zfyn=0 and a.shyn=1 and a.shtime<=c.kgdate
  group by a.lynoid, b.code
  ) n
  group by lynoid";
      const string sqlDenom = @"
  select a.noid as noid, count(b.id) as listnum
  from kc_dd_head a with(nolock)
  left join kc_dd_list_cllist b with(nolock) on a.Id=b.idid
  where a.ztid=@ztid and a.lbs='生产任务' and a.zf=0 and a.shyn=1
  group by a.noid";

      var p = new[] { new SugarParameter("@ztid", option.SourceZtid) };
      var numerRows = await QueryT8Async<S5FulfillmentNumerRow>(sqlNumer, p);
      var denomRows = await QueryT8Async<S5FulfillmentDenomRow>(sqlDenom, p);
      sub.T8Rows = numerRows.Count + denomRows.Count;

      // Order numbers are looked up case-insensitively. Rows that collapse onto the same key are
      // summed rather than passed to ToDictionary, which would throw on the duplicate.
      var numerByOrder = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
      foreach (var row in numerRows)
      {
          if (string.IsNullOrEmpty(row.lynoid))
          {
              continue;
          }

          numerByOrder[row.lynoid] = numerByOrder.GetValueOrDefault(row.lynoid) + row.codenum;
      }

      var dwdAffected = 0;
      var rateList = new List<decimal>();
      foreach (var d in denomRows)
      {
          ct.ThrowIfCancellationRequested();
          if (string.IsNullOrEmpty(d.noid))
          {
              continue;
          }

          var beforeKg = numerByOrder.GetValueOrDefault(d.noid, 0);

          // A zero denominator yields NULL, never a fake 0.
          decimal? rate = d.listnum > 0
              ? Math.Round((decimal)beforeKg / d.listnum, 4)
              : null;
          if (rate.HasValue)
          {
              rateList.Add(rate.Value);
          }

          dwdAffected += await _db.Ado.ExecuteCommandAsync(@"
  INSERT INTO dwd_t8_material_online_fulfillment
  (tenant_id, factory_id, biz_date, source_ztid, work_order_no,
  before_kgdate_rows, total_rows, fulfillment_rate, batch_id, create_time)
  VALUES
  (0, 1, @bizDate, @ztid, @workOrderNo, @beforeKg, @total, @rate, @batchId, @now)
  ON DUPLICATE KEY UPDATE
  before_kgdate_rows=VALUES(before_kgdate_rows),
  total_rows=VALUES(total_rows),
  fulfillment_rate=VALUES(fulfillment_rate),
  batch_id=VALUES(batch_id), update_time=@now",
              new SugarParameter("@bizDate", option.BizDate),
              new SugarParameter("@ztid", option.SourceZtid),
              new SugarParameter("@workOrderNo", d.noid),
              new SugarParameter("@beforeKg", beforeKg),
              new SugarParameter("@total", d.listnum),
              new SugarParameter("@rate", rate),
              new SugarParameter("@batchId", batchId),
              new SugarParameter("@now", now));
      }

      sub.DwdRows = dwdAffected;

      // The per-order rates are fractions; the KPI row stores the mean as a percentage.
      var hasRates = rateList.Count > 0;
      decimal? metricValue = hasRates ? Math.Round(rateList.Average() * 100m, 4) : null;
      sub.KpiRows = await UpsertKpiValueAsync("S5_L1_002", option.BizDate, metricValue, now);
      sub.DenominatorStatus = hasRates ? "OK" : "NO_VALID_ORDER";
      return sub;
  }
  /// <summary>
  /// S5_L1_003 material warehouse efficiency = SUM(slzx) of '生产领料' documents inside the monthly
  /// window divided by the count of sys_pelist rows with zzzt='在职' and gw='仓管'.
  /// </summary>
  /// <param name="batchId">Batch id written to the DWD row.</param>
  /// <param name="now">Run timestamp used for create_time / update_time.</param>
  /// <param name="option">Supplies the source ztid, biz_month and the monthly window.</param>
  /// <param name="ct">Checked on entry, between the source queries and before the writes.</param>
  /// <returns>
  /// T8 row count, DWD affected rows, KPI upsert count and a status of "OK", "NO_HEADCOUNT"
  /// (headcount missing or not positive) or "NO_NUMERATOR" (sum is NULL). In the two non-OK cases
  /// efficiency is written as NULL.
  /// </returns>
  private async Task<KpiBuildSubResult> BuildS5L1003MaterialWarehouseEfficiencyAsync(
      string batchId, DateTime now, S5MdpRefreshOption option, CancellationToken ct)
  {
      // Unlike the per-row builders there is no loop here, so observe cancellation explicitly.
      ct.ThrowIfCancellationRequested();

      var sub = new KpiBuildSubResult();

      // NOTE(review): BETWEEN is inclusive on both ends; with the default window end of 23:59:59 a
      // shtime inside the final second's fractional part would be excluded - confirm shtime precision.
      const string sqlNumer = @"
  select sum(b.slzx) as slzx
  from kc_tz_head a with(nolock)
  inner join kc_tz_list b with(nolock) on a.Id=b.idid
  where a.ztid=@ztid and a.lbs='生产领料' and a.hzyn=0 and a.zfyn=0 and a.shyn=1
  and a.shtime between @startDate and @endDate";
      const string sqlDenom = @"
  select count(*) as penum
  from sys_pelist with(nolock)
  where ztid=@ztid and zzzt='在职' and gw='仓管'";

      var pNumer = new[]
      {
          new SugarParameter("@ztid", option.SourceZtid),
          new SugarParameter("@startDate", option.MonthlyPeriodStart),
          new SugarParameter("@endDate", option.MonthlyPeriodEnd)
      };
      var pDenom = new[] { new SugarParameter("@ztid", option.SourceZtid) };

      var numerRows = await QueryT8Async<S5SumQtyRow>(sqlNumer, pNumer);
      ct.ThrowIfCancellationRequested();
      var denomRows = await QueryT8Async<S5CountRow>(sqlDenom, pDenom);
      sub.T8Rows = numerRows.Count + denomRows.Count;

      decimal? onlineQty = numerRows.FirstOrDefault()?.slzx;
      int? headcount = denomRows.FirstOrDefault()?.penum;

      // Missing / zero denominator or missing numerator: efficiency stays NULL and the reason is
      // recorded in denominator_status; a fake 0 is never written.
      decimal? efficiency = null;
      string denomStatus;
      if (!headcount.HasValue || headcount.Value <= 0)
      {
          denomStatus = "NO_HEADCOUNT";
      }
      else if (!onlineQty.HasValue)
      {
          denomStatus = "NO_NUMERATOR";
      }
      else
      {
          efficiency = Math.Round(onlineQty.Value / headcount.Value, 4);
          denomStatus = "OK";
      }

      sub.DenominatorStatus = denomStatus;

      // Last cancellation point before anything is written to the target database.
      ct.ThrowIfCancellationRequested();

      // Monthly KPI: a single row per biz_month, maintained through the upsert below.
      var dwdAffected = await _db.Ado.ExecuteCommandAsync(@"
  INSERT INTO dwd_t8_material_warehouse_efficiency
  (tenant_id, factory_id, biz_month, source_ztid, period_start, period_end,
  online_qty, warehouse_headcount, efficiency, denominator_status, batch_id, create_time)
  VALUES
  (0, 1, @bizMonth, @ztid, @periodStart, @periodEnd,
  @onlineQty, @headcount, @efficiency, @denomStatus, @batchId, @now)
  ON DUPLICATE KEY UPDATE
  period_start=VALUES(period_start), period_end=VALUES(period_end),
  online_qty=VALUES(online_qty), warehouse_headcount=VALUES(warehouse_headcount),
  efficiency=VALUES(efficiency), denominator_status=VALUES(denominator_status),
  batch_id=VALUES(batch_id), update_time=@now",
          new SugarParameter("@bizMonth", option.BizMonth),
          new SugarParameter("@ztid", option.SourceZtid),
          new SugarParameter("@periodStart", option.MonthlyPeriodStart),
          new SugarParameter("@periodEnd", option.MonthlyPeriodEnd),
          new SugarParameter("@onlineQty", onlineQty),
          new SugarParameter("@headcount", headcount),
          new SugarParameter("@efficiency", efficiency),
          new SugarParameter("@denomStatus", denomStatus),
          new SugarParameter("@batchId", batchId),
          new SugarParameter("@now", now));
      sub.DwdRows = dwdAffected;

      // The monthly value lands in the daily KPI table under the window's end date
      // (UpsertKpiValueAsync strips the time part); it is NULL when the ratio is not computable.
      sub.KpiRows = await UpsertKpiValueAsync("S5_L1_003", option.MonthlyPeriodEnd, efficiency, now);
      return sub;
  }
  /// <summary>
  /// S5_L1_004 inventory turnover = je3 / je2 * 30 per row returned by the T8 table-valued
  /// function Rep_总账_存货_V3; the KPI value is the mean over all rows with a computable result.
  /// </summary>
  /// <param name="batchId">Batch id written to every DWD row.</param>
  /// <param name="now">Run timestamp used for create_time / update_time.</param>
  /// <param name="option">Supplies the source ztid, biz_month and the TVF yyyymm range.</param>
  /// <param name="ct">Checked before each DWD write.</param>
  /// <returns>
  /// T8 row count, accumulated affected-row count of the DWD upserts, KPI upsert count and a
  /// status of "OK" or "NO_VALID_OUTBOUND_COST" (no row had je2 &gt; 0 together with a je3 value).
  /// </returns>
  private async Task<KpiBuildSubResult> BuildS5L1004MaterialInventoryTurnoverAsync(
      string batchId, DateTime now, S5MdpRefreshOption option, CancellationToken ct)
  {
      // TVF arguments: account set, N'普通', N'正常', start period yyyymm, end period yyyymm.
      const string sqlTvf = @"
  select ckcode as ckcode, ckname as ckname,
  code as code, cname as cname,
  pcode as pcode, pname as pname,
  je3 as je3, je2 as je2
  from dbo.Rep_总账_存货_V3(@ztid, N'普通', N'正常', @startYm, @endYm)";

      var tvfArgs = new[]
      {
          new SugarParameter("@ztid", option.SourceZtid),
          new SugarParameter("@startYm", option.TvfPeriodStartYyyymm),
          new SugarParameter("@endYm", option.TvfPeriodEndYyyymm)
      };
      var sourceRows = await QueryT8Async<S5InventoryTurnoverRow>(sqlTvf, tvfArgs);

      var outcome = new KpiBuildSubResult { T8Rows = sourceRows.Count };
      var written = 0;
      var computedDays = new List<decimal>();

      foreach (var row in sourceRows)
      {
          ct.ThrowIfCancellationRequested();

          // Turnover days stay NULL when je2 is missing or not positive, or je3 is missing.
          decimal? days = null;
          if (row.je2.HasValue && row.je2.Value > 0m && row.je3.HasValue)
          {
              days = Math.Round(row.je3.Value / row.je2.Value * 30m, 4);
              computedDays.Add(days.Value);
          }

          written += await _db.Ado.ExecuteCommandAsync(@"
  INSERT INTO dwd_t8_material_inventory_turnover
  (tenant_id, factory_id, biz_month, source_ztid, period_start_yyyymm, period_end_yyyymm,
  warehouse_code, warehouse_name, item_code, item_name, category_code, category_name,
  avg_inventory_value, monthly_outbound_cost, turnover_days, batch_id, create_time)
  VALUES
  (0, 1, @bizMonth, @ztid, @startYm, @endYm,
  @ckcode, @ckname, @itemCode, @itemName, @pcode, @pname,
  @je3, @je2, @turnoverDays, @batchId, @now)
  ON DUPLICATE KEY UPDATE
  warehouse_name=VALUES(warehouse_name), item_name=VALUES(item_name),
  category_code=VALUES(category_code), category_name=VALUES(category_name),
  avg_inventory_value=VALUES(avg_inventory_value),
  monthly_outbound_cost=VALUES(monthly_outbound_cost),
  turnover_days=VALUES(turnover_days),
  period_start_yyyymm=VALUES(period_start_yyyymm),
  period_end_yyyymm=VALUES(period_end_yyyymm),
  batch_id=VALUES(batch_id), update_time=@now",
              new SugarParameter("@bizMonth", option.BizMonth),
              new SugarParameter("@ztid", option.SourceZtid),
              new SugarParameter("@startYm", option.TvfPeriodStartYyyymm),
              new SugarParameter("@endYm", option.TvfPeriodEndYyyymm),
              // Warehouse and item codes fall back to an empty string when the source value is NULL.
              new SugarParameter("@ckcode", row.ckcode ?? ""),
              new SugarParameter("@ckname", row.ckname),
              new SugarParameter("@itemCode", row.code ?? ""),
              new SugarParameter("@itemName", row.cname),
              new SugarParameter("@pcode", row.pcode),
              new SugarParameter("@pname", row.pname),
              new SugarParameter("@je3", row.je3),
              new SugarParameter("@je2", row.je2),
              new SugarParameter("@turnoverDays", days),
              new SugarParameter("@batchId", batchId),
              new SugarParameter("@now", now));
      }

      outcome.DwdRows = written;

      // KPI value: mean of all computed turnover days; NULL when nothing could be computed.
      decimal? metricValue = null;
      if (computedDays.Count > 0)
      {
          metricValue = Math.Round(computedDays.Average(), 4);
      }

      outcome.KpiRows = await UpsertKpiValueAsync("S5_L1_004", option.MonthlyPeriodEnd, metricValue, now);
      outcome.DenominatorStatus = computedDays.Count > 0 ? "OK" : "NO_VALID_OUTBOUND_COST";
      return outcome;
  }
  352. // ─────────────────────────────────────────────────────────────────────────
  353. // 跨库 / 写入 / 日志 封装
  354. // ─────────────────────────────────────────────────────────────────────────
  /// <summary>Runs a parameterized query on the tenant connection identified by <c>T8ConfigId</c>.</summary>
  /// <typeparam name="T">Row type the result set is mapped to.</typeparam>
  /// <param name="sql">SQL text sent to that connection.</param>
  /// <param name="parameters">Parameters bound to <paramref name="sql"/>.</param>
  /// <returns>The mapped rows.</returns>
  private async Task<List<T>> QueryT8Async<T>(string sql, SugarParameter[] parameters)
  {
      var sourceScope = _db.AsTenant().GetConnectionScope(T8ConfigId);
      var rows = await sourceScope.Ado.SqlQueryAsync<T>(sql, parameters);
      return rows;
  }
  /// <summary>
  /// Writes one KPI value into ado_s9_kpi_value_l1_day for tenant 0 / factory 1 and this module:
  /// updates the existing non-deleted row for (metric, date) when there is one, otherwise inserts
  /// a row whose id is MAX(id) + 1.
  /// </summary>
  /// <param name="metricCode">KPI code, e.g. "S5_L1_001".</param>
  /// <param name="bizDate">Business date; the time part is dropped before it is used.</param>
  /// <param name="metricValue">KPI value; NULL is stored as-is (a missing denominator must not become 0).</param>
  /// <param name="now">Timestamp for calc_time, create_time and update_time.</param>
  /// <returns>Affected-row count of the UPDATE or INSERT.</returns>
  private async Task<int> UpsertKpiValueAsync(string metricCode, DateTime bizDate, decimal? metricValue, DateTime now)
  {
      // Monthly callers pass an end-of-day timestamp; stripping the time keeps the lookup below
      // matching the stored date and so avoids a second INSERT for the same day.
      var day = bizDate.Date;

      var lookupArgs = new List<SugarParameter>
      {
          new("@ModuleCode", ModuleCode),
          new("@MetricCode", metricCode),
          new("@BizDate", day)
      };
      var currentId = await _db.Ado.GetLongAsync(
          "SELECT IFNULL((SELECT id FROM ado_s9_kpi_value_l1_day WHERE tenant_id=0 AND factory_id=1 " +
          "AND module_code=@ModuleCode AND metric_code=@MetricCode AND biz_date=@BizDate AND is_deleted=0 " +
          "ORDER BY id LIMIT 1), 0)",
          lookupArgs);

      if (currentId <= 0)
      {
          // The id is assigned by hand here rather than by the database.
          // NOTE(review): MAX(id) + 1 is read and used in two separate statements, so two concurrent
          // writers could pick the same id - confirm runs are serialized.
          var freshId = await _db.Ado.GetLongAsync(
              "SELECT COALESCE(MAX(id), 0) + 1 FROM ado_s9_kpi_value_l1_day");

          return await _db.Ado.ExecuteCommandAsync(@"
  INSERT INTO ado_s9_kpi_value_l1_day
  (id, tenant_id, org_id, company_id, factory_id, status, biz_date,
  create_time, update_time, is_deleted, is_active,
  module_code, metric_code, metric_value, calc_time)
  VALUES
  (@Id, 0, NULL, NULL, 1, NULL, @BizDate,
  @Now, @Now, 0, 1,
  @ModuleCode, @MetricCode, @MetricValue, @Now)",
              new SugarParameter("@Id", freshId),
              new SugarParameter("@BizDate", day),
              new SugarParameter("@Now", now),
              new SugarParameter("@ModuleCode", ModuleCode),
              new SugarParameter("@MetricCode", metricCode),
              new SugarParameter("@MetricValue", metricValue));
      }

      return await _db.Ado.ExecuteCommandAsync(
          "UPDATE ado_s9_kpi_value_l1_day SET metric_value=@MetricValue, calc_time=@Now, " +
          "update_time=@Now, is_deleted=0, is_active=1 WHERE id=@Id",
          new SugarParameter("@MetricValue", metricValue),
          new SugarParameter("@Now", now),
          new SugarParameter("@Id", currentId));
  }
  /// <summary>
  /// Inserts a RUNNING row into mdp_transform_run_log for this batch and returns its id, read back
  /// as the highest id carrying the same batch_id.
  /// </summary>
  /// <param name="batchId">Batch id of the run.</param>
  /// <param name="startedAt">Start time; also used for create_time and update_time.</param>
  /// <param name="triggerType">Normalized trigger label.</param>
  /// <param name="option">Refresh option of the run; not used by this method.</param>
  /// <returns>The id of the run-log row.</returns>
  private async Task<long> InsertTransformRunLogAsync(string batchId, DateTime startedAt, string triggerType, S5MdpRefreshOption option)
  {
      const string insertSql = @"
  INSERT INTO mdp_transform_run_log
  (tenant_id, job_code, job_name, trigger_type, batch_id, status, start_time, stage_rows, standard_rows, dwd_rows, create_time, update_time)
  VALUES
  (0, @JobCode, @JobName, @TriggerType, @BatchId, 'RUNNING', @StartTime, 0, 0, 0, @StartTime, @StartTime)";
      const string readBackSql =
          "SELECT id FROM mdp_transform_run_log WHERE batch_id=@BatchId ORDER BY id DESC LIMIT 1";

      await _db.Ado.ExecuteCommandAsync(
          insertSql,
          new SugarParameter("@JobCode", JobCode),
          new SugarParameter("@JobName", JobName),
          new SugarParameter("@TriggerType", triggerType),
          new SugarParameter("@BatchId", batchId),
          new SugarParameter("@StartTime", startedAt));

      var readBackArgs = new List<SugarParameter> { new("@BatchId", batchId) };
      return await _db.Ado.GetLongAsync(readBackSql, readBackArgs);
  }
  /// <summary>
  /// Closes a run-log row as SUCCESS, storing end time, duration, the row counters of
  /// <paramref name="result"/> and its JSON summary.
  /// </summary>
  /// <param name="runLogId">Id of the run-log row to update.</param>
  /// <param name="startedAt">Start time of the run, used to compute the duration.</param>
  /// <param name="result">Aggregated result of the run.</param>
  private async Task MarkTransformRunSuccessAsync(long runLogId, DateTime startedAt, S5MdpSyncTransformResult result)
  {
      var endedAt = DateTime.Now;
      var elapsedMs = (int)(endedAt - startedAt).TotalMilliseconds;
      var summaryJson = BuildRunSummaryJson(result);

      const string updateSql = @"
  UPDATE mdp_transform_run_log
  SET status='SUCCESS', end_time=@EndTime, duration_ms=@DurationMs,
  stage_rows=@StageRows, standard_rows=@StandardRows, dwd_rows=@DwdRows,
  summary_json=@SummaryJson, update_time=CURRENT_TIMESTAMP
  WHERE id=@Id";

      await _db.Ado.ExecuteCommandAsync(
          updateSql,
          new SugarParameter("@EndTime", endedAt),
          new SugarParameter("@DurationMs", elapsedMs),
          new SugarParameter("@StageRows", result.StageRows),
          new SugarParameter("@StandardRows", result.StandardRows),
          new SugarParameter("@DwdRows", result.DwdRows),
          new SugarParameter("@SummaryJson", summaryJson),
          new SugarParameter("@Id", runLogId));
  }
  /// <summary>
  /// Closes a run-log row as FAILED with end time, duration and the error message cut to 2000
  /// characters. Never throws: a failure of this write is only reported on standard error.
  /// </summary>
  /// <param name="runLogId">Id of the run-log row to update.</param>
  /// <param name="startedAt">Start time of the run, used to compute the duration.</param>
  /// <param name="message">Error message to store.</param>
  private async Task MarkTransformRunFailedAsync(long runLogId, DateTime startedAt, string message)
  {
      const string updateSql = @"
  UPDATE mdp_transform_run_log
  SET status='FAILED', end_time=@EndTime, duration_ms=@DurationMs,
  error_message=@ErrorMessage, update_time=CURRENT_TIMESTAMP
  WHERE id=@Id";

      try
      {
          var endedAt = DateTime.Now;
          var elapsedMs = (int)(endedAt - startedAt).TotalMilliseconds;

          await _db.Ado.ExecuteCommandAsync(
              updateSql,
              new SugarParameter("@EndTime", endedAt),
              new SugarParameter("@DurationMs", elapsedMs),
              new SugarParameter("@ErrorMessage", Truncate(message, 2000)),
              new SugarParameter("@Id", runLogId));
      }
      catch (Exception ex)
      {
          // Fallback for the case where the bookkeeping write itself fails (for example the target
          // database being unreachable): report it and let the caller rethrow the original error.
          Console.Error.WriteLine($"[S5MdpSyncTransform] MarkTransformRunFailed write failed (runLogId={runLogId}): {ex.Message}");
      }
  }
  /// <summary>
  /// Serializes the run result into the JSON document stored in the run log's summary_json column.
  /// </summary>
  /// <param name="r">Aggregated result of the run.</param>
  /// <returns>
  /// JSON object with batchId, sourceZtid, bizDate, bizMonth, triggerType, dwdRows, kpiRows,
  /// perKpiDwdRows, perKpiKpiRows, denominatorStatus and tvfPeriod (monthly window as
  /// "start~end" dates).
  /// </returns>
  private static string BuildRunSummaryJson(S5MdpSyncTransformResult r)
  {
      // The summary is machine-readable: dates are formatted culture-invariantly so the host
      // culture's calendar or digits cannot leak into the stored JSON.
      static string IsoDay(DateTime value) =>
          value.ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture);

      var summary = new
      {
          batchId = r.BatchId,
          sourceZtid = r.SourceZtid,
          bizDate = IsoDay(r.BizDate),
          bizMonth = r.BizMonth,
          triggerType = r.TriggerType,
          dwdRows = r.DwdRows,
          kpiRows = r.KpiRows,
          perKpiDwdRows = r.PerKpiDwdRows,
          perKpiKpiRows = r.PerKpiKpiRows,
          denominatorStatus = r.KpiDenominatorStatus,
          tvfPeriod = IsoDay(r.MonthlyPeriodStart) + "~" + IsoDay(r.MonthlyPeriodEnd)
      };

      return JsonSerializer.Serialize(summary);
  }
  /// <summary>Normalizes a trigger label: blank input becomes "AUTO", anything else is trimmed and upper-cased.</summary>
  /// <param name="s">Raw trigger label.</param>
  /// <returns>The normalized label.</returns>
  private static string NormalizeTriggerType(string s)
  {
      if (string.IsNullOrWhiteSpace(s))
      {
          return "AUTO";
      }

      var trimmed = s.Trim();
      return trimmed.ToUpperInvariant();
  }
  /// <summary>Cuts a string to at most <paramref name="max"/> characters; null or empty input yields "".</summary>
  /// <param name="s">Text to shorten.</param>
  /// <param name="max">Maximum number of characters to keep.</param>
  /// <returns>The original text when it is short enough, otherwise its first <paramref name="max"/> characters.</returns>
  private static string Truncate(string s, int max)
  {
      if (string.IsNullOrEmpty(s))
      {
          return "";
      }

      if (s.Length > max)
      {
          return s.Substring(0, max);
      }

      return s;
  }
  480. }
  481. // ─────────────────────────────────────────────────────────────────────────────
  482. // Refresh 入参与结果 DTO
  483. // ─────────────────────────────────────────────────────────────────────────────
  /// <summary>
  /// Input of an S5 refresh run: the source account set plus the daily and monthly windows the
  /// KPI builders read from.
  /// </summary>
  public sealed class S5MdpRefreshOption
  {
      /// <summary>T8 account set (kc_tz_head.ztid); per the original note, pbxfxp was the only one observed.</summary>
      public string SourceZtid { get; set; } = "pbxfxp";

      /// <summary>Business date of the daily (T+1) KPIs; the factories set it to the day before the reference date.</summary>
      public DateTime BizDate { get; set; }

      /// <summary>Business month of the monthly (M+1) KPIs as yyyy-MM; the factories set it to the previous month.</summary>
      public string BizMonth { get; set; } = "";

      /// <summary>Inclusive start of the daily window; the factories set it to 00:00 of <see cref="BizDate"/>.</summary>
      public DateTime DailyPeriodStart { get; set; }

      /// <summary>Inclusive end of the daily window; the factories set it to 23:59:59 of <see cref="BizDate"/>.</summary>
      public DateTime DailyPeriodEnd { get; set; }

      /// <summary>Inclusive start of the monthly window; the factories set it to the first day of the previous month.</summary>
      public DateTime MonthlyPeriodStart { get; set; }

      /// <summary>Inclusive end of the monthly window; the factories set it to 23:59:59 on the last day of the previous month.</summary>
      public DateTime MonthlyPeriodEnd { get; set; }

      /// <summary>Start period (yyyyMM) passed to the TVF Rep_总账_存货_V3.</summary>
      public string TvfPeriodStartYyyymm { get; set; } = "";

      /// <summary>End period (yyyyMM) passed to the TVF Rep_总账_存货_V3.</summary>
      public string TvfPeriodEndYyyymm { get; set; } = "";

      /// <summary>Builds the option for a run executed today: yesterday as business date, last month as business month.</summary>
      /// <returns>The option derived from <see cref="DateTime.Today"/>.</returns>
      public static S5MdpRefreshOption Default() => ForReferenceDate(DateTime.Today);

      /// <summary>
      /// Builds the option as if the run were executed on <paramref name="referenceDate"/>, which makes
      /// back-fills and deterministic tests possible without filling every property by hand.
      /// </summary>
      /// <param name="referenceDate">The "today" of the run; its time part is ignored.</param>
      /// <returns>Option whose daily window is the previous day and whose monthly window is the previous month.</returns>
      public static S5MdpRefreshOption ForReferenceDate(DateTime referenceDate)
      {
          // BizMonth and the TVF periods are machine keys sent to SQL, so they are formatted with the
          // invariant culture; the host culture's calendar must not change them.
          var invariant = System.Globalization.CultureInfo.InvariantCulture;

          var today = referenceDate.Date;
          var yesterday = today.AddDays(-1);
          var lastMonth = today.AddMonths(-1);
          var monthStart = new DateTime(lastMonth.Year, lastMonth.Month, 1);
          var monthEnd = monthStart.AddMonths(1).AddDays(-1);

          return new S5MdpRefreshOption
          {
              SourceZtid = "pbxfxp",
              BizDate = yesterday,
              BizMonth = lastMonth.ToString("yyyy-MM", invariant),
              DailyPeriodStart = yesterday,
              DailyPeriodEnd = yesterday.AddDays(1).AddSeconds(-1),
              MonthlyPeriodStart = monthStart,
              MonthlyPeriodEnd = monthEnd.AddDays(1).AddSeconds(-1),
              TvfPeriodStartYyyymm = monthStart.ToString("yyyyMM", invariant),
              TvfPeriodEndYyyymm = monthEnd.ToString("yyyyMM", invariant)
          };
      }
  }
  /// <summary>
  /// Outcome of one full S5 refresh: identification of the run, the windows it covered, and row
  /// counters aggregated over the KPI builders.
  /// </summary>
  public sealed class S5MdpSyncTransformResult
  {
      /// <summary>Batch id of the run.</summary>
      public string BatchId { get; set; } = "";

      /// <summary>Id of the run-log row belonging to this run.</summary>
      public long RunLogId { get; set; }

      /// <summary>Normalized trigger label.</summary>
      public string TriggerType { get; set; } = "AUTO";

      /// <summary>T8 account set the run read from.</summary>
      public string SourceZtid { get; set; } = "";

      /// <summary>Business date of the daily KPIs.</summary>
      public DateTime BizDate { get; set; }

      /// <summary>Business month of the monthly KPIs.</summary>
      public string BizMonth { get; set; } = "";

      /// <summary>Start of the daily window.</summary>
      public DateTime DailyPeriodStart { get; set; }

      /// <summary>End of the daily window.</summary>
      public DateTime DailyPeriodEnd { get; set; }

      /// <summary>Start of the monthly window.</summary>
      public DateTime MonthlyPeriodStart { get; set; }

      /// <summary>End of the monthly window.</summary>
      public DateTime MonthlyPeriodEnd { get; set; }

      /// <summary>Staging-layer row counter.</summary>
      public int StageRows { get; set; }

      /// <summary>Standard-layer row counter.</summary>
      public int StandardRows { get; set; }

      /// <summary>Sum of the DWD row counters of all merged KPI results.</summary>
      public int DwdRows { get; set; }

      /// <summary>Sum of the KPI row counters of all merged KPI results.</summary>
      public int KpiRows { get; set; }

      /// <summary>DWD row counter per KPI code.</summary>
      public Dictionary<string, int> PerKpiDwdRows { get; } = new();

      /// <summary>KPI row counter per KPI code.</summary>
      public Dictionary<string, int> PerKpiKpiRows { get; } = new();

      /// <summary>One "code:status" entry per merged KPI result, in merge order.</summary>
      public List<string> KpiDenominatorStatus { get; } = new();

      /// <summary>Folds the result of one KPI builder into the totals and the per-KPI breakdown.</summary>
      /// <param name="kpiCode">KPI code the sub-result belongs to.</param>
      /// <param name="sub">Counters and denominator status reported by the builder.</param>
      public void MergeSub(string kpiCode, KpiBuildSubResult sub)
      {
          var dwd = sub.DwdRows;
          var kpi = sub.KpiRows;

          PerKpiDwdRows[kpiCode] = dwd;
          PerKpiKpiRows[kpiCode] = kpi;

          DwdRows += dwd;
          KpiRows += kpi;

          KpiDenominatorStatus.Add($"{kpiCode}:{sub.DenominatorStatus}");
      }
  }
  /// <summary>Counters and denominator status reported by a single KPI builder.</summary>
  public sealed class KpiBuildSubResult
  {
      /// <summary>Number of rows read from T8.</summary>
      public int T8Rows { get; set; }

      /// <summary>Row counter of the DWD writes.</summary>
      public int DwdRows { get; set; }

      /// <summary>Row counter of the KPI value write.</summary>
      public int KpiRows { get; set; }

      /// <summary>Status of the KPI's ratio; "OK" unless the builder reports otherwise.</summary>
      public string DenominatorStatus { get; set; } = "OK";
  }
  560. // ─────────────────────────────────────────────────────────────────────────────
  561. // T8 result set 投影类型(与方老师 SQL SELECT 列名严格一致;SqlSugar 映射)
  562. // ─────────────────────────────────────────────────────────────────────────────
  /// <summary>
  /// Result row of the S5_L1_001 source queries. Property names match the SQL column aliases and
  /// must stay lower-case.
  /// </summary>
  internal sealed class S5OnlineCycleRow
  {
      /// <summary>Item code (b.code).</summary>
      public string? code { get; set; }

      /// <summary>Earliest shtime of the item (min(a.shtime)).</summary>
      public DateTime? shtime { get; set; }
  }
  /// <summary>
  /// Numerator row of S5_L1_002. Property names match the SQL column aliases and must stay
  /// lower-case.
  /// </summary>
  internal sealed class S5FulfillmentNumerRow
  {
      /// <summary>Work order number the picking documents refer to (a.lynoid).</summary>
      public string? lynoid { get; set; }

      /// <summary>Row count of the per-order group (count(*)).</summary>
      public int codenum { get; set; }
  }
  /// <summary>
  /// Denominator row of S5_L1_002. Property names match the SQL column aliases and must stay
  /// lower-case.
  /// </summary>
  internal sealed class S5FulfillmentDenomRow
  {
      /// <summary>Work order number (a.noid).</summary>
      public string? noid { get; set; }

      /// <summary>Number of material lines of the order (count(b.id)).</summary>
      public int listnum { get; set; }
  }
  /// <summary>
  /// Single-column result of the S5_L1_003 numerator query. The property name matches the SQL
  /// column alias and must stay lower-case.
  /// </summary>
  internal sealed class S5SumQtyRow
  {
      /// <summary>sum(b.slzx); null when the aggregate has no value.</summary>
      public decimal? slzx { get; set; }
  }
  /// <summary>
  /// Single-column result of the S5_L1_003 denominator query. The property name matches the SQL
  /// column alias and must stay lower-case.
  /// </summary>
  internal sealed class S5CountRow
  {
      /// <summary>count(*) of the matching sys_pelist rows.</summary>
      public int penum { get; set; }
  }
  /// <summary>
  /// Result row of the TVF Rep_总账_存货_V3 as selected for S5_L1_004. Property names match the
  /// SQL column aliases and must stay lower-case.
  /// </summary>
  internal sealed class S5InventoryTurnoverRow
  {
      /// <summary>Stored as warehouse_code.</summary>
      public string? ckcode { get; set; }

      /// <summary>Stored as warehouse_name.</summary>
      public string? ckname { get; set; }

      /// <summary>Stored as item_code.</summary>
      public string? code { get; set; }

      /// <summary>Stored as item_name.</summary>
      public string? cname { get; set; }

      /// <summary>Stored as category_code.</summary>
      public string? pcode { get; set; }

      /// <summary>Stored as category_name.</summary>
      public string? pname { get; set; }

      /// <summary>Stored as avg_inventory_value; numerator of the turnover formula.</summary>
      public decimal? je3 { get; set; }

      /// <summary>Stored as monthly_outbound_cost; denominator of the turnover formula.</summary>
      public decimal? je2 { get; set; }
  }
  596. }