- using Admin.NET.Plugin.AiDOP.Entity.S8;
- using Admin.NET.Plugin.ApprovalFlow;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
- using SqlSugar;
- namespace Admin.NET.Plugin.AiDOP.Service.S8;
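- /// <summary>
- /// Scans for S8 exceptions whose ActiveFlow appears to be stuck, and for orphaned S8 approval-flow instances, and emits searchable alerts.
- /// Detection only: no automatic recovery is attempted.
- /// </summary>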
- public class S8ActiveFlowWatchService : ITransient
- {
-     public const string AlertChannel = "s8-active-flow-stuck";
-     public const string AlertChannelOrphan = "s8-orphan-flow-instance";
-     public static readonly string[] S8FlowBizTypes = new[] { "EXCEPTION_ESCALATION", "EXCEPTION_CLOSURE" };
-     private readonly SqlSugarRepository<AdoS8Exception> _exceptionRep;
-     private readonly SqlSugarRepository<AdoS8NotificationLog> _notificationLogRep;
-     private readonly SqlSugarRepository<ApprovalFlowInstance> _flowInstanceRep;
-     private readonly S8NotificationService _notificationService;
-     private readonly S8ActiveFlowWatchOptions _options;
-     private readonly ILogger<S8ActiveFlowWatchService> _logger;
-     public S8ActiveFlowWatchService(
-         SqlSugarRepository<AdoS8Exception> exceptionRep,
-         SqlSugarRepository<AdoS8NotificationLog> notificationLogRep,
-         SqlSugarRepository<ApprovalFlowInstance> flowInstanceRep,
-         S8NotificationService notificationService,
-         IOptions<S8ActiveFlowWatchOptions> options,
-         ILogger<S8ActiveFlowWatchService> logger)
-     {
-         _exceptionRep = exceptionRep;
-         _notificationLogRep = notificationLogRep;
-         _flowInstanceRep = flowInstanceRep;
-         _notificationService = notificationService;
-         _options = options.Value;
-         _logger = logger;
-     }
-     public async Task<int> ScanAsync(CancellationToken cancellationToken = default)
-     {
-         if (!_options.Enabled)
-             return 0;
-         var now = DateTime.Now;
-         var thresholdHours = _options.ThresholdHours > 0 ? _options.ThresholdHours : 4;
-         var cooldownMinutes = _options.AlertCooldownMinutes > 0 ? _options.AlertCooldownMinutes : 60;
-         var batchSize = _options.BatchSize > 0 ? _options.BatchSize : 100;
-         var staleBefore = now.AddHours(-thresholdHours);
-         var alertedAfter = now.AddMinutes(-cooldownMinutes);
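-         // Sort by the same "last touched" value the filter uses, so the batch holds the stalest rows
-         // (sorting by UpdatedAt alone misplaces rows whose UpdatedAt is NULL).
-         var staleExceptions = await _exceptionRep.AsQueryable()
-             .Where(x => !x.IsDeleted && x.ActiveFlowInstanceId.HasValue &&
-                         SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt) < staleBefore)
-             .OrderBy(x => SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt), OrderByType.Asc)
-             .OrderBy(x => x.Id, OrderByType.Asc)
-             .Take(batchSize)
-             .ToListAsync();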
-         // No stuck candidates: still run the orphan scan, which does not depend on these exception rows.
-         if (staleExceptions.Count == 0)
-             return await ScanOrphanFlowInstancesAsync(staleBefore, alertedAfter, batchSize, now, thresholdHours, cancellationToken);
-         var exceptionIds = staleExceptions.Select(x => x.Id).ToHashSet();
-         var recentAlertLogs = await _notificationLogRep.AsQueryable()
-             .Where(x => x.Channel == AlertChannel && x.CreatedAt >= alertedAfter && x.ExceptionId != null)
-             .ToListAsync();
-         var alertedIds = recentAlertLogs
-             .Where(x => x.ExceptionId.HasValue && exceptionIds.Contains(x.ExceptionId.Value))
-             .Select(x => x.ExceptionId!.Value)
-             .ToHashSet();
-         var alertCount = 0;
-         foreach (var item in staleExceptions.Where(x => !alertedIds.Contains(x.Id)))
-         {
-             cancellationToken.ThrowIfCancellationRequested();
-             var lastTouchedAt = item.UpdatedAt ?? item.CreatedAt;
-             var payload = new
-             {
-                 type = "S8_ACTIVE_FLOW_STUCK",
-                 message = "The S8 exception has an in-progress approval instance but has not been updated within the threshold; manually check the approval-flow state and the callback chain",
-                 exceptionId = item.Id,
-                 exceptionCode = item.ExceptionCode,
-                 title = item.Title,
-                 status = item.Status,
-                 activeFlowInstanceId = item.ActiveFlowInstanceId,
-                 activeFlowBizType = item.ActiveFlowBizType,
-                 tenantId = item.TenantId,
-                 factoryId = item.FactoryId,
-                 lastTouchedAt,
-                 thresholdHours,
-                 detectedAt = now
-             };
-             try
-             {
-                 await _notificationService.SendAsync(item.TenantId, item.FactoryId, item.Id, AlertChannel, payload);
-                 _logger.LogWarning(
-                     "S8 ActiveFlow possibly stuck: ExceptionId={ExceptionId}, Code={ExceptionCode}, Status={Status}, FlowInstanceId={FlowInstanceId}, LastTouchedAt={LastTouchedAt}, ThresholdHours={ThresholdHours}",
-                     item.Id,
-                     item.ExceptionCode,
-                     item.Status,
-                     item.ActiveFlowInstanceId,
-                     lastTouchedAt,
-                     thresholdHours);
-                 alertCount++;
-             }
-             catch (Exception ex)
-             {
-                 _logger.LogError(
-                     ex,
-                     "Failed to write S8 ActiveFlow stuck alert: ExceptionId={ExceptionId}, Code={ExceptionCode}, FlowInstanceId={FlowInstanceId}",
-                     item.Id,
-                     item.ExceptionCode,
-                     item.ActiveFlowInstanceId);
-             }
-         }
-         if (alertCount > 0)
-         {
-             _logger.LogInformation(
-                 "S8 ActiveFlow stuck scan finished: {AlertCount} new alert(s) this round, {CandidateCount} candidate(s)",
-                 alertCount,
-                 staleExceptions.Count);
-         }
-         var orphanAlertCount = await ScanOrphanFlowInstancesAsync(staleBefore, alertedAfter, batchSize, now, thresholdHours, cancellationToken);
-         return alertCount + orphanAlertCount;
-     }
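-     /// <summary>
-     /// N-2: scans for orphaned S8 approval instances, i.e. running FlowInstances older than the threshold that are not referenced
-     /// (via ActiveFlowInstanceId) by the non-deleted AdoS8Exception identified by their BizId. This usually means OnFlowStarted failed or the callback chain was lost.
-     /// </summary>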
-     private async Task<int> ScanOrphanFlowInstancesAsync(
-         DateTime staleBefore,
-         DateTime alertedAfter,
-         int batchSize,
-         DateTime now,
-         int thresholdHours,
-         CancellationToken cancellationToken)
-     {
-         // Local copy: SqlSugar expression trees cannot reference static/private fields directly,
-         // so the array is first copied into a local variable captured by the lambda.
-         var bizTypes = S8FlowBizTypes;
-         var candidates = await _flowInstanceRep.AsQueryable()
-             .Where(fi => bizTypes.Contains(fi.BizType)
-                          && fi.Status == FlowInstanceStatusEnum.Running
-                          && fi.StartTime < staleBefore)
-             .OrderBy(fi => fi.StartTime, OrderByType.Asc)
-             .Take(batchSize)
-             .ToListAsync();
-         if (candidates.Count == 0)
-             return 0;
-         var bizIds = candidates.Select(fi => fi.BizId).ToHashSet();
-         var instanceIds = candidates.Select(fi => fi.Id).ToHashSet();
-         var linked = await _exceptionRep.AsQueryable()
-             .Where(e => !e.IsDeleted
-                         && bizIds.Contains(e.Id)
-                         && e.ActiveFlowInstanceId.HasValue
-                         && instanceIds.Contains(e.ActiveFlowInstanceId!.Value))
-             .Select(e => new { e.Id, FlowId = e.ActiveFlowInstanceId!.Value })
-             .ToListAsync();
-         var linkedPairs = linked.Select(x => (x.Id, x.FlowId)).ToHashSet();
-         var orphans = candidates.Where(fi => !linkedPairs.Contains((fi.BizId, fi.Id))).ToList();
-         if (orphans.Count == 0)
-             return 0;
-         var recentPayloads = await _notificationLogRep.AsQueryable()
-             .Where(x => x.Channel == AlertChannelOrphan && x.CreatedAt >= alertedAfter)
-             .Select(x => x.Payload)
-             .ToListAsync();
-         var alertedInstanceIds = new HashSet<long>();
-         foreach (var p in recentPayloads)
-         {
-             var id = TryExtractFlowInstanceId(p);
-             if (id.HasValue) alertedInstanceIds.Add(id.Value);
-         }
-         var alertCount = 0;
-         foreach (var fi in orphans.Where(x => !alertedInstanceIds.Contains(x.Id)))
-         {
-             cancellationToken.ThrowIfCancellationRequested();
-             var payload = new
-             {
-                 type = "S8_ORPHAN_FLOW_INSTANCE",
-                 message = "The S8 approval instance exists but is not referenced by any business exception record; OnFlowStarted may have failed or the callback chain may have been lost",
-                 flowInstanceId = fi.Id,
-                 bizType = fi.BizType,
-                 bizId = fi.BizId,
-                 bizNo = fi.BizNo,
-                 flowStatus = fi.Status.ToString(),
-                 startTime = fi.StartTime,
-                 thresholdHours,
-                 detectedAt = now
-             };
-             try
-             {
-                 await _notificationService.SendAsync(0, 0, null, AlertChannelOrphan, payload);
-                 _logger.LogWarning(
-                     "S8 orphaned approval instance: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}, StartTime={StartTime}",
-                     fi.Id, fi.BizType, fi.BizId, fi.StartTime);
-                 alertCount++;
-             }
-             catch (Exception ex)
-             {
-                 _logger.LogError(ex,
-                     "Failed to write S8 orphaned approval instance alert: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}",
-                     fi.Id, fi.BizType, fi.BizId);
-             }
-         }
-         if (alertCount > 0)
-         {
-             _logger.LogInformation(
-                 "S8 orphaned approval instance scan finished: {AlertCount} new alert(s) this round, {CandidateCount} candidate(s)",
-                 alertCount, orphans.Count);
-         }
-         return alertCount;
-     }
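-     /// <summary>
-     /// Extracts flowInstanceId from a historical alert payload (JSON) for cooldown de-duplication. Returns null on failure.
-     /// </summary>
-     internal static long? TryExtractFlowInstanceId(string? payload)
-     {
-         if (string.IsNullOrWhiteSpace(payload))
-             return null;
-         try
-         {
-             using var doc = System.Text.Json.JsonDocument.Parse(payload);
-             // TryGetProperty throws if the root is not an object, so check the kind first.
-             if (doc.RootElement.ValueKind == System.Text.Json.JsonValueKind.Object
-                 && doc.RootElement.TryGetProperty("flowInstanceId", out var prop)
-                 && prop.ValueKind == System.Text.Json.JsonValueKind.Number
-                 && prop.TryGetInt64(out var id))
-             {
-                 return id;
-             }
-         }
-         catch (System.Text.Json.JsonException)
-         {
-             // Ignore payloads that are not valid JSON; they must not block later alerts.
-         }
-         return null;
-     }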
- }
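The candidate selection in `ScanAsync` can be pictured as a minimal in-memory sketch, assuming simplified tuple rows in place of `AdoS8Exception` and `AdoS8NotificationLog`. `SelectAlertTargets` and `OrDefault` are hypothetical names, not part of the service, which evaluates the staleness filter in SQL through SqlSugar. The sketch shows the fallbacks (4 h threshold, 60 min cooldown, batch of 100), the "last touched" rule `UpdatedAt ?? CreatedAt`, and the fact that the batch cap is applied before the cooldown filter.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper mirroring the "configured > 0 ? configured : fallback" defaults in ScanAsync.
static int OrDefault(int configured, int fallback) => configured > 0 ? configured : fallback;

// Hypothetical in-memory sketch of ScanAsync's candidate selection:
// stale = last touched (UpdatedAt ?? CreatedAt) before now - threshold,
// oldest first, capped at batchSize, then ids alerted within the cooldown are dropped.
static List<long> SelectAlertTargets(
    IEnumerable<(long Id, DateTime CreatedAt, DateTime? UpdatedAt)> exceptions,
    IEnumerable<(long ExceptionId, DateTime AlertedAt)> alertLog,
    DateTime now, int thresholdHours, int cooldownMinutes, int batchSize)
{
    var staleBefore = now.AddHours(-OrDefault(thresholdHours, 4));
    var alertedAfter = now.AddMinutes(-OrDefault(cooldownMinutes, 60));

    var recentlyAlerted = alertLog
        .Where(a => a.AlertedAt >= alertedAfter)
        .Select(a => a.ExceptionId)
        .ToHashSet();

    return exceptions
        .Select(e => (e.Id, LastTouched: e.UpdatedAt ?? e.CreatedAt))
        .Where(e => e.LastTouched < staleBefore)
        .OrderBy(e => e.LastTouched)
        .ThenBy(e => e.Id)
        .Take(OrDefault(batchSize, 100))              // batch cap first, as in the SQL query
        .Where(e => !recentlyAlerted.Contains(e.Id))  // then the cooldown filter
        .Select(e => e.Id)
        .ToList();
}

var scanTime = new DateTime(2024, 1, 1, 12, 0, 0);
var sampleExceptions = new List<(long, DateTime, DateTime?)>
{
    (1, scanTime.AddHours(-6), null),                     // last touched 6 h ago: stale
    (2, scanTime.AddHours(-7), scanTime.AddHours(-1)),    // updated 1 h ago: not stale
    (3, scanTime.AddHours(-9), null),                     // stale, but alerted 30 min ago
};
var sampleAlerts = new List<(long, DateTime)>
{
    (3, scanTime.AddMinutes(-30)),   // inside the 60 min cooldown
    (1, scanTime.AddMinutes(-120)),  // outside the cooldown, so id 1 is alerted again
};

// 0 means "not configured", so the 4 h / 60 min / 100 defaults apply.
var targets = SelectAlertTargets(sampleExceptions, sampleAlerts, scanTime, 0, 0, 0);
Console.WriteLine(string.Join(",", targets)); // prints 1
```

One consequence visible in the sketch: because `Take(batchSize)` runs before the cooldown filter, if the first `batchSize` candidates in sort order were all alerted recently, later stale rows are not examined in that round.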
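The pair-based matching in `ScanOrphanFlowInstancesAsync` is easy to misread as "no exception row exists". The following is a hypothetical in-memory version (`FindOrphanInstanceIds` and the tuple shapes are illustrative, not part of the service) showing that an instance is also reported when its exception still exists but now points at a different flow instance. The real method performs the lookup in SQL, also requires `!IsDeleted`, and only considers running `EXCEPTION_ESCALATION`/`EXCEPTION_CLOSURE` instances started before the threshold.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory sketch of the orphan check.
// A running instance is "linked" only if the exception whose Id equals the
// instance's BizId points back to that exact instance via ActiveFlowInstanceId.
static List<long> FindOrphanInstanceIds(
    IEnumerable<(long Id, long BizId)> runningInstances,
    IEnumerable<(long ExceptionId, long? ActiveFlowInstanceId)> liveExceptions)
{
    // (exceptionId, flowInstanceId) pairs that are currently linked.
    var linkedPairs = liveExceptions
        .Where(e => e.ActiveFlowInstanceId.HasValue)
        .Select(e => (e.ExceptionId, e.ActiveFlowInstanceId.Value))
        .ToHashSet();

    return runningInstances
        .Where(fi => !linkedPairs.Contains((fi.BizId, fi.Id)))
        .Select(fi => fi.Id)
        .ToList();
}

var instances = new List<(long, long)>
{
    (100, 1), // exception 1 points at instance 100: linked
    (101, 2), // exception 2 points at a different instance: orphan
    (102, 3), // exception 3 is absent (e.g. soft-deleted, so filtered out): orphan
};
var exceptionRows = new List<(long, long?)>
{
    (1, 100),
    (2, 205),
};

var orphans = FindOrphanInstanceIds(instances, exceptionRows);
Console.WriteLine(string.Join(",", orphans)); // prints 101,102
```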