S8ActiveFlowWatchService.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. using Admin.NET.Plugin.AiDOP.Entity.S8;
  2. using Microsoft.Extensions.Logging;
  3. using Microsoft.Extensions.Options;
  4. using SqlSugar;
  5. namespace Admin.NET.Plugin.AiDOP.Service.S8;
  /// <summary>
  /// Scans for S8 exceptions that still reference an in-progress approval flow
  /// (<c>ActiveFlowInstanceId</c> is set) but have not been updated within a configurable
  /// threshold, and emits a searchable alert for each one.
  /// Detection only: no automatic recovery is attempted.
  /// </summary>
  public class S8ActiveFlowWatchService : ITransient
  {
      /// <summary>
      /// Notification channel used for stuck-flow alerts. Notification-log rows on this channel
      /// are also what the per-exception alert cooldown is checked against.
      /// </summary>
      public const string AlertChannel = "s8-active-flow-stuck";

      // Fallbacks applied when the corresponding option is unset or non-positive.
      private const int DefaultThresholdHours = 4;
      private const int DefaultAlertCooldownMinutes = 60;
      private const int DefaultBatchSize = 100;

      private readonly SqlSugarRepository<AdoS8Exception> _exceptionRep;
      private readonly S8NotificationService _notificationService;
      private readonly S8ActiveFlowWatchOptions _options;
      private readonly ILogger<S8ActiveFlowWatchService> _logger;

      /// <summary>Creates the watch service.</summary>
      /// <param name="exceptionRep">Repository used to query candidate S8 exceptions.</param>
      /// <param name="notificationLogRep">
      /// Not used directly: the cooldown check against the notification log is performed as a
      /// subquery of the exception query. Retained so the constructor signature stays unchanged
      /// for DI registration and existing callers.
      /// </param>
      /// <param name="notificationService">Service used to send each alert.</param>
      /// <param name="options">Watcher settings (enabled flag, threshold, cooldown, batch size).</param>
      /// <param name="logger">Logger for per-alert warnings, failures and the scan summary.</param>
      public S8ActiveFlowWatchService(
          SqlSugarRepository<AdoS8Exception> exceptionRep,
          SqlSugarRepository<AdoS8NotificationLog> notificationLogRep,
          S8NotificationService notificationService,
          IOptions<S8ActiveFlowWatchOptions> options,
          ILogger<S8ActiveFlowWatchService> logger)
      {
          _exceptionRep = exceptionRep;
          _notificationService = notificationService;
          _options = options.Value;
          _logger = logger;
      }

      /// <summary>
      /// Runs one scan pass and sends at most one alert per stuck exception.
      /// </summary>
      /// <param name="cancellationToken">Checked before querying and before each alert is sent.</param>
      /// <returns>
      /// The number of alerts sent successfully in this pass. Returns 0 when the watcher is
      /// disabled or no exception qualifies.
      /// </returns>
      /// <remarks>
      /// An exception is a candidate when all of the following hold:
      /// <list type="bullet">
      /// <item>it is not soft-deleted;</item>
      /// <item>it has an <c>ActiveFlowInstanceId</c>;</item>
      /// <item>its last touch time (<c>UpdatedAt</c>, falling back to <c>CreatedAt</c>) is older than the threshold;</item>
      /// <item>no notification-log row exists for it on <see cref="AlertChannel"/> within the cooldown window.</item>
      /// </list>
      /// The cooldown filter runs in the database before the batch limit is applied. As a result,
      /// recently-alerted exceptions cannot fill the batch and hide newer stale ones.
      /// A failure while sending an individual alert is logged and does not abort the pass.
      /// </remarks>
      public async Task<int> ScanAsync(CancellationToken cancellationToken = default)
      {
          if (!_options.Enabled)
          {
              return 0;
          }

          cancellationToken.ThrowIfCancellationRequested();

          // NOTE(review): local time is kept on the assumption that CreatedAt/UpdatedAt and the
          // notification-log CreatedAt are also written in local time — confirm before moving to UTC.
          var now = DateTime.Now;
          var thresholdHours = _options.ThresholdHours > 0 ? _options.ThresholdHours : DefaultThresholdHours;
          var cooldownMinutes = _options.AlertCooldownMinutes > 0 ? _options.AlertCooldownMinutes : DefaultAlertCooldownMinutes;
          var batchSize = _options.BatchSize > 0 ? _options.BatchSize : DefaultBatchSize;
          var staleBefore = now.AddHours(-thresholdHours);
          var alertedAfter = now.AddMinutes(-cooldownMinutes);

          var staleExceptions = await _exceptionRep.AsQueryable()
              .Where(x => !x.IsDeleted
                  && x.ActiveFlowInstanceId.HasValue
                  && SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt) < staleBefore)
              // Apply the cooldown in SQL, before Take(). Filtering after Take() would re-select
              // the same oldest rows every pass and starve the rest once more than batchSize
              // exceptions are stuck.
              .Where(x => SqlFunc.Subqueryable<AdoS8NotificationLog>()
                  .Where(log => log.ExceptionId == x.Id
                      && log.Channel == AlertChannel
                      && log.CreatedAt >= alertedAfter)
                  .NotAny())
              // Order by the same expression that defines staleness, so rows with a null
              // UpdatedAt are ranked by CreatedAt instead of by DB-specific NULL placement.
              .OrderBy(x => SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt), OrderByType.Asc)
              .OrderBy(x => x.Id, OrderByType.Asc)
              .Take(batchSize)
              .ToListAsync();

          if (staleExceptions.Count == 0)
          {
              return 0;
          }

          var alertCount = 0;
          foreach (var item in staleExceptions)
          {
              cancellationToken.ThrowIfCancellationRequested();

              var lastTouchedAt = item.UpdatedAt ?? item.CreatedAt;
              var payload = new
              {
                  type = "S8_ACTIVE_FLOW_STUCK",
                  message = "S8 异常存在进行中的审批实例,但已超过阈值未更新,请人工检查审批流状态与回调链路",
                  exceptionId = item.Id,
                  exceptionCode = item.ExceptionCode,
                  title = item.Title,
                  status = item.Status,
                  activeFlowInstanceId = item.ActiveFlowInstanceId,
                  activeFlowBizType = item.ActiveFlowBizType,
                  tenantId = item.TenantId,
                  factoryId = item.FactoryId,
                  lastTouchedAt,
                  thresholdHours,
                  detectedAt = now
              };

              try
              {
                  // NOTE(review): the cooldown relies on SendAsync recording an AdoS8NotificationLog
                  // row with Channel = AlertChannel and ExceptionId = item.Id — confirm in S8NotificationService.
                  await _notificationService.SendAsync(item.TenantId, item.FactoryId, item.Id, AlertChannel, payload);
                  _logger.LogWarning(
                      "S8 ActiveFlow 疑似卡死: ExceptionId={ExceptionId}, Code={ExceptionCode}, Status={Status}, FlowInstanceId={FlowInstanceId}, LastTouchedAt={LastTouchedAt}, ThresholdHours={ThresholdHours}",
                      item.Id,
                      item.ExceptionCode,
                      item.Status,
                      item.ActiveFlowInstanceId,
                      lastTouchedAt,
                      thresholdHours);
                  alertCount++;
              }
              catch (Exception ex)
              {
                  // Best effort: one failed alert must not prevent the remaining candidates from being reported.
                  _logger.LogError(
                      ex,
                      "S8 ActiveFlow 卡死告警写入失败: ExceptionId={ExceptionId}, Code={ExceptionCode}, FlowInstanceId={FlowInstanceId}",
                      item.Id,
                      item.ExceptionCode,
                      item.ActiveFlowInstanceId);
              }
          }

          if (alertCount > 0)
          {
              _logger.LogInformation(
                  "S8 ActiveFlow 卡死扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                  alertCount,
                  staleExceptions.Count);
          }

          return alertCount;
      }
  }