S8ActiveFlowWatchService.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. using Admin.NET.Plugin.AiDOP.Entity.S8;
  2. using Admin.NET.Plugin.ApprovalFlow;
  3. using Microsoft.Extensions.Logging;
  4. using Microsoft.Extensions.Options;
  5. using SqlSugar;
  6. namespace Admin.NET.Plugin.AiDOP.Service.S8;
  /// <summary>
  /// Scans for S8 exceptions whose ActiveFlow approval instance looks stuck, and for running S8 approval
  /// instances that no exception references (orphans), and writes searchable alerts for both.
  /// Detection only: nothing is recovered automatically.
  /// </summary>
  public class S8ActiveFlowWatchService : ITransient
  {
      /// <summary>Notification channel for "exception holds an active flow that has not been updated" alerts.</summary>
      public const string AlertChannel = "s8-active-flow-stuck";

      /// <summary>Notification channel for "running S8 flow instance not referenced by any exception" alerts.</summary>
      public const string AlertChannelOrphan = "s8-orphan-flow-instance";

      /// <summary>Approval-flow <c>BizType</c> values that belong to S8 exception handling.</summary>
      public static readonly string[] S8FlowBizTypes = new[] { "EXCEPTION_ESCALATION", "EXCEPTION_CLOSURE" };

      // Fallbacks used when the corresponding option is missing or non-positive.
      private const int DefaultThresholdHours = 4;
      private const int DefaultAlertCooldownMinutes = 60;
      private const int DefaultBatchSize = 100;

      private readonly SqlSugarRepository<AdoS8Exception> _exceptionRep;
      private readonly SqlSugarRepository<AdoS8NotificationLog> _notificationLogRep;
      private readonly SqlSugarRepository<ApprovalFlowInstance> _flowInstanceRep;
      private readonly S8NotificationService _notificationService;
      private readonly S8ActiveFlowWatchOptions _options;
      private readonly ILogger<S8ActiveFlowWatchService> _logger;

      public S8ActiveFlowWatchService(
          SqlSugarRepository<AdoS8Exception> exceptionRep,
          SqlSugarRepository<AdoS8NotificationLog> notificationLogRep,
          SqlSugarRepository<ApprovalFlowInstance> flowInstanceRep,
          S8NotificationService notificationService,
          IOptions<S8ActiveFlowWatchOptions> options,
          ILogger<S8ActiveFlowWatchService> logger)
      {
          _exceptionRep = exceptionRep;
          _notificationLogRep = notificationLogRep;
          _flowInstanceRep = flowInstanceRep;
          _notificationService = notificationService;
          _options = options.Value;
          _logger = logger;
      }

      /// <summary>
      /// Runs one detection pass: stuck-ActiveFlow exceptions for the given tenant/factory, followed by the
      /// orphan S8 flow-instance scan.
      /// </summary>
      /// <param name="tenantId">Tenant whose exceptions are checked for stuck flows.</param>
      /// <param name="factoryId">Factory whose exceptions are checked for stuck flows.</param>
      /// <param name="cancellationToken">Checked before the pass and between alerts.</param>
      /// <returns>
      /// Number of alerts successfully written in this pass (stuck + orphan); 0 when the watcher is disabled.
      /// </returns>
      public async Task<int> ScanAsync(long tenantId, long factoryId, CancellationToken cancellationToken = default)
      {
          if (!_options.Enabled)
              return 0;

          cancellationToken.ThrowIfCancellationRequested();

          var now = DateTime.Now;
          var thresholdHours = _options.ThresholdHours > 0 ? _options.ThresholdHours : DefaultThresholdHours;
          var cooldownMinutes = _options.AlertCooldownMinutes > 0 ? _options.AlertCooldownMinutes : DefaultAlertCooldownMinutes;
          var batchSize = _options.BatchSize > 0 ? _options.BatchSize : DefaultBatchSize;
          var staleBefore = now.AddHours(-thresholdHours);
          var alertedAfter = now.AddMinutes(-cooldownMinutes);

          var stuckAlertCount = await ScanStuckExceptionsAsync(
              tenantId, factoryId, staleBefore, alertedAfter, batchSize, now, thresholdHours, cancellationToken);

          // Orphan detection does not depend on whether any exception looked stuck, so it must always run.
          // (Returning early when no stale exception was found used to skip it entirely.)
          var orphanAlertCount = await ScanOrphanFlowInstancesAsync(
              staleBefore, alertedAfter, batchSize, now, thresholdHours, cancellationToken);

          return stuckAlertCount + orphanAlertCount;
      }

      /// <summary>
      /// Alerts on non-deleted exceptions of one tenant/factory that hold an ActiveFlowInstanceId but whose
      /// last-touched time (UpdatedAt, falling back to CreatedAt) is older than <paramref name="staleBefore"/>.
      /// Exceptions already alerted on <see cref="AlertChannel"/> since <paramref name="alertedAfter"/> are skipped.
      /// </summary>
      /// <returns>Number of alerts successfully written.</returns>
      private async Task<int> ScanStuckExceptionsAsync(
          long tenantId,
          long factoryId,
          DateTime staleBefore,
          DateTime alertedAfter,
          int batchSize,
          DateTime now,
          int thresholdHours,
          CancellationToken cancellationToken)
      {
          // NOTE(review): cooldown de-duplication is applied after Take(batchSize). If more than batchSize
          // exceptions are stuck, the same oldest batch is re-selected on every pass and newer stuck ones are
          // not alerted until those are resolved. Excluding recently alerted ids in SQL before Take would fix that.
          var staleExceptions = await _exceptionRep.AsQueryable()
              .Where(x => !x.IsDeleted && x.ActiveFlowInstanceId.HasValue &&
                          x.TenantId == tenantId && x.FactoryId == factoryId &&
                          SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt) < staleBefore)
              // Order by the same "last touched" value used for the staleness filter, so never-updated rows are
              // ranked by CreatedAt instead of wherever the database happens to sort NULL UpdatedAt values.
              .OrderBy(x => SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt), OrderByType.Asc)
              .OrderBy(x => x.Id, OrderByType.Asc)
              .Take(batchSize)
              .ToListAsync();
          if (staleExceptions.Count == 0)
              return 0;

          // Only fetch cooldown hits for the candidates at hand, and only the id column.
          var exceptionIds = staleExceptions.Select(x => x.Id).ToHashSet();
          var recentlyAlerted = await _notificationLogRep.AsQueryable()
              .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId &&
                          x.Channel == AlertChannel && x.CreatedAt >= alertedAfter &&
                          x.ExceptionId != null && exceptionIds.Contains(x.ExceptionId.Value))
              .Select(x => x.ExceptionId)
              .ToListAsync();
          var alertedIds = recentlyAlerted
              .Where(id => id.HasValue)
              .Select(id => id!.Value)
              .ToHashSet();

          var alertCount = 0;
          foreach (var item in staleExceptions.Where(x => !alertedIds.Contains(x.Id)))
          {
              cancellationToken.ThrowIfCancellationRequested();
              var lastTouchedAt = item.UpdatedAt ?? item.CreatedAt;
              var payload = new
              {
                  type = "S8_ACTIVE_FLOW_STUCK",
                  message = "S8 异常存在进行中的审批实例,但已超过阈值未更新,请人工检查审批流状态与回调链路",
                  exceptionId = item.Id,
                  exceptionCode = item.ExceptionCode,
                  title = item.Title,
                  status = item.Status,
                  activeFlowInstanceId = item.ActiveFlowInstanceId,
                  activeFlowBizType = item.ActiveFlowBizType,
                  tenantId = item.TenantId,
                  factoryId = item.FactoryId,
                  lastTouchedAt,
                  thresholdHours,
                  detectedAt = now
              };
              try
              {
                  await _notificationService.SendAsync(item.TenantId, item.FactoryId, item.Id, AlertChannel, payload);
                  _logger.LogWarning(
                      "S8 ActiveFlow 疑似卡死: ExceptionId={ExceptionId}, Code={ExceptionCode}, Status={Status}, FlowInstanceId={FlowInstanceId}, LastTouchedAt={LastTouchedAt}, ThresholdHours={ThresholdHours}",
                      item.Id,
                      item.ExceptionCode,
                      item.Status,
                      item.ActiveFlowInstanceId,
                      lastTouchedAt,
                      thresholdHours);
                  alertCount++;
              }
              catch (Exception ex)
              {
                  // Best effort: one failed alert must not abort the rest of the batch.
                  _logger.LogError(
                      ex,
                      "S8 ActiveFlow 卡死告警写入失败: ExceptionId={ExceptionId}, Code={ExceptionCode}, FlowInstanceId={FlowInstanceId}",
                      item.Id,
                      item.ExceptionCode,
                      item.ActiveFlowInstanceId);
              }
          }

          if (alertCount > 0)
          {
              _logger.LogInformation(
                  "S8 ActiveFlow 卡死扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                  alertCount,
                  staleExceptions.Count);
          }
          return alertCount;
      }

      /// <summary>
      /// N-2: alerts on orphaned S8 approval instances. These are Running instances of an S8 BizType, started
      /// before <paramref name="staleBefore"/>, for which no non-deleted AdoS8Exception with Id == BizId
      /// points at them via ActiveFlowInstanceId. An orphan suggests that OnFlowStarted failed or that the
      /// callback chain was lost.
      /// </summary>
      /// <returns>Number of alerts successfully written.</returns>
      /// <remarks>
      /// NOTE(review): this scan is not filtered by tenant/factory, yet it is invoked from every
      /// <see cref="ScanAsync"/> call. Alerts are written with tenant/factory 0, and cooldown matching relies
      /// on the flowInstanceId embedded in earlier payloads.
      /// </remarks>
      private async Task<int> ScanOrphanFlowInstancesAsync(
          DateTime staleBefore,
          DateTime alertedAfter,
          int batchSize,
          DateTime now,
          int thresholdHours,
          CancellationToken cancellationToken)
      {
          // Local copy: the SqlSugar expression tree cannot reference the static field directly, so it must
          // be captured into a closure variable first.
          var bizTypes = S8FlowBizTypes;
          var candidates = await _flowInstanceRep.AsQueryable()
              .Where(fi => bizTypes.Contains(fi.BizType)
                           && fi.Status == FlowInstanceStatusEnum.Running
                           && fi.StartTime < staleBefore)
              .OrderBy(fi => fi.StartTime, OrderByType.Asc)
              .Take(batchSize)
              .ToListAsync();
          if (candidates.Count == 0)
              return 0;

          var bizIds = candidates.Select(fi => fi.BizId).ToHashSet();
          var instanceIds = candidates.Select(fi => fi.Id).ToHashSet();
          var linked = await _exceptionRep.AsQueryable()
              .Where(e => !e.IsDeleted
                          && bizIds.Contains(e.Id)
                          && e.ActiveFlowInstanceId.HasValue
                          && instanceIds.Contains(e.ActiveFlowInstanceId!.Value))
              .Select(e => new { e.Id, FlowId = e.ActiveFlowInstanceId!.Value })
              .ToListAsync();

          // A candidate is linked only if its own BizId exception points back at this exact instance.
          var linkedPairs = linked.Select(x => (x.Id, x.FlowId)).ToHashSet();
          var orphans = candidates.Where(fi => !linkedPairs.Contains((fi.BizId, fi.Id))).ToList();
          if (orphans.Count == 0)
              return 0;

          var recentPayloads = await _notificationLogRep.AsQueryable()
              .Where(x => x.Channel == AlertChannelOrphan && x.CreatedAt >= alertedAfter)
              .Select(x => x.Payload)
              .ToListAsync();
          var alertedInstanceIds = new HashSet<long>();
          foreach (var p in recentPayloads)
          {
              var id = TryExtractFlowInstanceId(p);
              if (id.HasValue)
                  alertedInstanceIds.Add(id.Value);
          }

          var alertCount = 0;
          foreach (var fi in orphans.Where(x => !alertedInstanceIds.Contains(x.Id)))
          {
              cancellationToken.ThrowIfCancellationRequested();
              var payload = new
              {
                  type = "S8_ORPHAN_FLOW_INSTANCE",
                  message = "S8 审批实例存在但未被任何业务异常单据引用,疑似 OnFlowStarted 失败或回调链路丢失",
                  flowInstanceId = fi.Id,
                  bizType = fi.BizType,
                  bizId = fi.BizId,
                  bizNo = fi.BizNo,
                  flowStatus = fi.Status.ToString(),
                  startTime = fi.StartTime,
                  thresholdHours,
                  detectedAt = now
              };
              try
              {
                  await _notificationService.SendAsync(0, 0, null, AlertChannelOrphan, payload);
                  _logger.LogWarning(
                      "S8 孤立审批实例: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}, StartTime={StartTime}",
                      fi.Id, fi.BizType, fi.BizId, fi.StartTime);
                  alertCount++;
              }
              catch (Exception ex)
              {
                  // Best effort: one failed alert must not abort the rest of the batch.
                  _logger.LogError(ex,
                      "S8 孤立审批实例告警写入失败: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}",
                      fi.Id, fi.BizType, fi.BizId);
              }
          }

          if (alertCount > 0)
          {
              _logger.LogInformation(
                  "S8 孤立审批实例扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                  alertCount, orphans.Count);
          }
          return alertCount;
      }

      /// <summary>
      /// Extracts <c>flowInstanceId</c> from a previously stored alert payload (JSON) for cooldown
      /// de-duplication.
      /// </summary>
      /// <param name="payload">Stored payload text; may be null, blank, or not valid JSON.</param>
      /// <returns>
      /// The id when the payload is a JSON object whose <c>flowInstanceId</c> is an integer number or a
      /// string holding an integer; otherwise null.
      /// </returns>
      internal static long? TryExtractFlowInstanceId(string? payload)
      {
          if (string.IsNullOrWhiteSpace(payload))
              return null;

          try
          {
              using var doc = System.Text.Json.JsonDocument.Parse(payload);
              var root = doc.RootElement;

              // TryGetProperty throws on non-object roots; such payloads simply carry no id.
              if (root.ValueKind != System.Text.Json.JsonValueKind.Object
                  || !root.TryGetProperty("flowInstanceId", out var prop))
              {
                  return null;
              }

              // Serializer setups that convert long to string (to protect JS number precision) store the id
              // as a JSON string; accept both encodings so the cooldown keeps working either way.
              return prop.ValueKind switch
              {
                  System.Text.Json.JsonValueKind.Number when prop.TryGetInt64(out var number) => number,
                  System.Text.Json.JsonValueKind.String when long.TryParse(
                      prop.GetString(),
                      System.Globalization.NumberStyles.Integer,
                      System.Globalization.CultureInfo.InvariantCulture,
                      out var parsed) => parsed,
                  _ => null,
              };
          }
          catch (System.Text.Json.JsonException)
          {
              // Malformed JSON in a historical payload is ignored; it must not break subsequent alerting.
              return null;
          }
      }
  }