S8ActiveFlowWatchService.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. using Admin.NET.Plugin.AiDOP.Entity.S8;
  2. using Admin.NET.Plugin.ApprovalFlow;
  3. using Microsoft.Extensions.Logging;
  4. using Microsoft.Extensions.Options;
  5. using SqlSugar;
  6. namespace Admin.NET.Plugin.AiDOP.Service.S8;
  /// <summary>
  /// Scans for S8 exceptions whose active approval flow appears to be stuck, and for orphaned
  /// S8 approval instances, writing one searchable alert per finding.
  /// Detection only: nothing here attempts automatic recovery.
  /// </summary>
  public class S8ActiveFlowWatchService : ITransient
  {
      /// <summary>Notification channel for "exception holds an active flow but has not been touched for too long" alerts.</summary>
      public const string AlertChannel = "s8-active-flow-stuck";

      /// <summary>Notification channel for "S8 flow instance is running but no exception references it" alerts.</summary>
      public const string AlertChannelOrphan = "s8-orphan-flow-instance";

      /// <summary>Approval-flow BizType values scanned by the orphan check.</summary>
      public static readonly string[] S8FlowBizTypes = new[] { "EXCEPTION_ESCALATION", "EXCEPTION_CLOSURE" };

      private readonly SqlSugarRepository<AdoS8Exception> _exceptionRep;
      private readonly SqlSugarRepository<AdoS8NotificationLog> _notificationLogRep;
      private readonly SqlSugarRepository<ApprovalFlowInstance> _flowInstanceRep;
      private readonly S8NotificationService _notificationService;
      private readonly S8ActiveFlowWatchOptions _options;
      private readonly ILogger<S8ActiveFlowWatchService> _logger;

      public S8ActiveFlowWatchService(
          SqlSugarRepository<AdoS8Exception> exceptionRep,
          SqlSugarRepository<AdoS8NotificationLog> notificationLogRep,
          SqlSugarRepository<ApprovalFlowInstance> flowInstanceRep,
          S8NotificationService notificationService,
          IOptions<S8ActiveFlowWatchOptions> options,
          ILogger<S8ActiveFlowWatchService> logger)
      {
          _exceptionRep = exceptionRep;
          _notificationLogRep = notificationLogRep;
          _flowInstanceRep = flowInstanceRep;
          _notificationService = notificationService;
          _options = options.Value;
          _logger = logger;
      }

      /// <summary>
      /// Runs one scan pass: first stuck active flows, then orphaned flow instances.
      /// Non-positive option values fall back to defaults (4 h threshold, 60 min cooldown, batch of 100).
      /// </summary>
      /// <param name="cancellationToken">Checked before each query and before each alert is written.</param>
      /// <returns>Total number of alerts successfully written in this pass; 0 when the watcher is disabled.</returns>
      public async Task<int> ScanAsync(CancellationToken cancellationToken = default)
      {
          if (!_options.Enabled)
              return 0;

          // NOTE(review): local time is kept on the assumption that CreatedAt/UpdatedAt and the
          // notification log's CreatedAt are stored in server local time — confirm before moving to UTC.
          var now = DateTime.Now;
          var thresholdHours = _options.ThresholdHours > 0 ? _options.ThresholdHours : 4;
          var cooldownMinutes = _options.AlertCooldownMinutes > 0 ? _options.AlertCooldownMinutes : 60;
          var batchSize = _options.BatchSize > 0 ? _options.BatchSize : 100;
          var staleBefore = now.AddHours(-thresholdHours);
          var alertedAfter = now.AddMinutes(-cooldownMinutes);

          // Both scans always run. An empty stuck-exception result must not skip orphan detection:
          // orphans are, by definition, instances no exception row points at.
          var stuckAlertCount = await ScanStuckActiveFlowsAsync(staleBefore, alertedAfter, batchSize, now, thresholdHours, cancellationToken);
          var orphanAlertCount = await ScanOrphanFlowInstancesAsync(staleBefore, alertedAfter, batchSize, now, thresholdHours, cancellationToken);
          return stuckAlertCount + orphanAlertCount;
      }

      /// <summary>
      /// Alerts on non-deleted exceptions that still hold an ActiveFlowInstanceId but whose last touch
      /// (UpdatedAt, falling back to CreatedAt) is older than <paramref name="staleBefore"/>, skipping any
      /// exception already alerted on <see cref="AlertChannel"/> since <paramref name="alertedAfter"/>.
      /// </summary>
      /// <returns>Number of alerts successfully written.</returns>
      private async Task<int> ScanStuckActiveFlowsAsync(
          DateTime staleBefore,
          DateTime alertedAfter,
          int batchSize,
          DateTime now,
          int thresholdHours,
          CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested();

          // Load the cooldown set first and exclude it inside the query. Filtering it in memory after
          // Take(batchSize) let the same oldest batch be re-selected every pass and then be fully
          // suppressed by the cooldown, so stale exceptions beyond the first batch never alerted.
          var recentlyAlertedIds = (await _notificationLogRep.AsQueryable()
                  .Where(x => x.Channel == AlertChannel && x.CreatedAt >= alertedAfter && x.ExceptionId != null)
                  .Select(x => x.ExceptionId)
                  .ToListAsync())
              .Where(id => id.HasValue)
              .Select(id => id.GetValueOrDefault())
              .Distinct()
              .ToList();

          cancellationToken.ThrowIfCancellationRequested();

          var staleExceptions = await _exceptionRep.AsQueryable()
              .Where(x => !x.IsDeleted && x.ActiveFlowInstanceId.HasValue &&
                          SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt) < staleBefore)
              .WhereIF(recentlyAlertedIds.Count > 0, x => !recentlyAlertedIds.Contains(x.Id))
              // Rank by the same "last touched" expression used in the filter, so rows with a null
              // UpdatedAt are ordered by CreatedAt instead of by database-specific NULL ordering.
              .OrderBy(x => SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt), OrderByType.Asc)
              .OrderBy(x => x.Id, OrderByType.Asc)
              .Take(batchSize)
              .ToListAsync();
          if (staleExceptions.Count == 0)
              return 0;

          var alertCount = 0;
          foreach (var item in staleExceptions)
          {
              cancellationToken.ThrowIfCancellationRequested();
              var lastTouchedAt = item.UpdatedAt ?? item.CreatedAt;
              var payload = new
              {
                  type = "S8_ACTIVE_FLOW_STUCK",
                  message = "S8 异常存在进行中的审批实例,但已超过阈值未更新,请人工检查审批流状态与回调链路",
                  exceptionId = item.Id,
                  exceptionCode = item.ExceptionCode,
                  title = item.Title,
                  status = item.Status,
                  activeFlowInstanceId = item.ActiveFlowInstanceId,
                  activeFlowBizType = item.ActiveFlowBizType,
                  tenantId = item.TenantId,
                  factoryId = item.FactoryId,
                  lastTouchedAt,
                  thresholdHours,
                  detectedAt = now
              };
              try
              {
                  await _notificationService.SendAsync(item.TenantId, item.FactoryId, item.Id, AlertChannel, payload);
                  _logger.LogWarning(
                      "S8 ActiveFlow 疑似卡死: ExceptionId={ExceptionId}, Code={ExceptionCode}, Status={Status}, FlowInstanceId={FlowInstanceId}, LastTouchedAt={LastTouchedAt}, ThresholdHours={ThresholdHours}",
                      item.Id,
                      item.ExceptionCode,
                      item.Status,
                      item.ActiveFlowInstanceId,
                      lastTouchedAt,
                      thresholdHours);
                  alertCount++;
              }
              catch (Exception ex)
              {
                  // Best-effort: one failed alert must not abort the rest of the batch.
                  _logger.LogError(
                      ex,
                      "S8 ActiveFlow 卡死告警写入失败: ExceptionId={ExceptionId}, Code={ExceptionCode}, FlowInstanceId={FlowInstanceId}",
                      item.Id,
                      item.ExceptionCode,
                      item.ActiveFlowInstanceId);
              }
          }

          if (alertCount > 0)
          {
              _logger.LogInformation(
                  "S8 ActiveFlow 卡死扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                  alertCount,
                  staleExceptions.Count);
          }
          return alertCount;
      }

      /// <summary>
      /// N-2: scans for orphaned S8 approval instances. These are Running instances of an S8 BizType,
      /// started before <paramref name="staleBefore"/>, with no non-deleted <see cref="AdoS8Exception"/>
      /// whose Id equals the instance's BizId and whose ActiveFlowInstanceId equals the instance Id.
      /// This suggests OnFlowStarted failed or the callback chain was lost. Instances already alerted
      /// on <see cref="AlertChannelOrphan"/> since <paramref name="alertedAfter"/> are skipped.
      /// </summary>
      /// <returns>Number of alerts successfully written.</returns>
      private async Task<int> ScanOrphanFlowInstancesAsync(
          DateTime staleBefore,
          DateTime alertedAfter,
          int batchSize,
          DateTime now,
          int thresholdHours,
          CancellationToken cancellationToken)
      {
          cancellationToken.ThrowIfCancellationRequested();

          // Orphan alerts are written without an ExceptionId, so the cooldown key (flowInstanceId)
          // is recovered from the stored payload JSON.
          var recentPayloads = await _notificationLogRep.AsQueryable()
              .Where(x => x.Channel == AlertChannelOrphan && x.CreatedAt >= alertedAfter)
              .Select(x => x.Payload)
              .ToListAsync();
          var recentlyAlertedInstanceIds = recentPayloads
              .Select(TryExtractFlowInstanceId)
              .Where(id => id.HasValue)
              .Select(id => id.GetValueOrDefault())
              .Distinct()
              .ToList();

          cancellationToken.ThrowIfCancellationRequested();

          // Local copy: SqlSugar expression trees cannot reference static/private fields directly,
          // so the array is copied into a closure variable first.
          var bizTypes = S8FlowBizTypes;

          // The link check (NOT EXISTS) and the cooldown exclusion are both pushed into the query.
          // Applying them in memory after Take(batchSize) let long-running but correctly linked
          // approvals, or already-alerted orphans, fill every batch, hiding newer orphans indefinitely.
          var orphans = await _flowInstanceRep.AsQueryable()
              .Where(fi => bizTypes.Contains(fi.BizType)
                           && fi.Status == FlowInstanceStatusEnum.Running
                           && fi.StartTime < staleBefore)
              .Where(fi => SqlFunc.Subqueryable<AdoS8Exception>()
                  .Where(e => !e.IsDeleted && e.Id == fi.BizId && e.ActiveFlowInstanceId == fi.Id)
                  .NotAny())
              .WhereIF(recentlyAlertedInstanceIds.Count > 0, fi => !recentlyAlertedInstanceIds.Contains(fi.Id))
              .OrderBy(fi => fi.StartTime, OrderByType.Asc)
              .OrderBy(fi => fi.Id, OrderByType.Asc)
              .Take(batchSize)
              .ToListAsync();
          if (orphans.Count == 0)
              return 0;

          var alertCount = 0;
          foreach (var fi in orphans)
          {
              cancellationToken.ThrowIfCancellationRequested();
              var payload = new
              {
                  type = "S8_ORPHAN_FLOW_INSTANCE",
                  message = "S8 审批实例存在但未被任何业务异常单据引用,疑似 OnFlowStarted 失败或回调链路丢失",
                  flowInstanceId = fi.Id,
                  bizType = fi.BizType,
                  bizId = fi.BizId,
                  bizNo = fi.BizNo,
                  flowStatus = fi.Status.ToString(),
                  startTime = fi.StartTime,
                  thresholdHours,
                  detectedAt = now
              };
              try
              {
                  // No owning exception exists, so tenant/factory/exception ids are written as 0/0/null.
                  await _notificationService.SendAsync(0, 0, null, AlertChannelOrphan, payload);
                  _logger.LogWarning(
                      "S8 孤立审批实例: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}, StartTime={StartTime}",
                      fi.Id, fi.BizType, fi.BizId, fi.StartTime);
                  alertCount++;
              }
              catch (Exception ex)
              {
                  // Best-effort: one failed alert must not abort the rest of the batch.
                  _logger.LogError(ex,
                      "S8 孤立审批实例告警写入失败: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}",
                      fi.Id, fi.BizType, fi.BizId);
              }
          }

          if (alertCount > 0)
          {
              _logger.LogInformation(
                  "S8 孤立审批实例扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                  alertCount, orphans.Count);
          }
          return alertCount;
      }

      /// <summary>
      /// Extracts the top-level <c>flowInstanceId</c> from a stored alert payload, for cooldown de-duplication.
      /// Accepts a JSON number, or a JSON string holding an integer. The string form covers serializers
      /// configured to write 64-bit ids as strings; presumably this applies to S8NotificationService's
      /// settings, so verify it.
      /// </summary>
      /// <param name="payload">Raw payload JSON; may be null, blank, or malformed.</param>
      /// <returns>The id, or null when the payload is missing, malformed, not a JSON object, or has no integer id.</returns>
      internal static long? TryExtractFlowInstanceId(string? payload)
      {
          if (string.IsNullOrWhiteSpace(payload))
              return null;
          try
          {
              using var doc = System.Text.Json.JsonDocument.Parse(payload);
              var root = doc.RootElement;
              // TryGetProperty throws InvalidOperationException on non-object roots, so check the kind first.
              if (root.ValueKind != System.Text.Json.JsonValueKind.Object
                  || !root.TryGetProperty("flowInstanceId", out var prop))
              {
                  return null;
              }

              return prop.ValueKind switch
              {
                  System.Text.Json.JsonValueKind.Number when prop.TryGetInt64(out var number) => number,
                  System.Text.Json.JsonValueKind.String when long.TryParse(
                      prop.GetString(),
                      System.Globalization.NumberStyles.Integer,
                      System.Globalization.CultureInfo.InvariantCulture,
                      out var parsed) => parsed,
                  _ => null,
              };
          }
          catch (System.Text.Json.JsonException)
          {
              // Malformed payload JSON is ignored; it must not block later alerts.
              return null;
          }
      }
  }