using Admin.NET.Plugin.AiDOP.Entity.S8;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SqlSugar;

namespace Admin.NET.Plugin.AiDOP.Service.S8;

/// <summary>
/// Scans for S8 exceptions that still reference an ActiveFlow instance but have not been touched
/// for longer than the configured threshold, and emits a searchable alert for each of them.
/// Detection only: this service never attempts any automatic recovery.
/// </summary>
public class S8ActiveFlowWatchService : ITransient
{
    /// <summary>
    /// Channel name used when sending the alert, and when looking up previously sent alerts to
    /// enforce the per-exception cooldown.
    /// </summary>
    public const string AlertChannel = "s8-active-flow-stuck";

    private readonly SqlSugarRepository _exceptionRep;
    private readonly SqlSugarRepository _notificationLogRep;
    private readonly S8NotificationService _notificationService;
    private readonly S8ActiveFlowWatchOptions _options;
    private readonly ILogger _logger;

    public S8ActiveFlowWatchService(
        SqlSugarRepository exceptionRep,
        SqlSugarRepository notificationLogRep,
        S8NotificationService notificationService,
        IOptions options,
        ILogger logger)
    {
        _exceptionRep = exceptionRep;
        _notificationLogRep = notificationLogRep;
        _notificationService = notificationService;
        _options = options.Value;
        _logger = logger;
    }

    /// <summary>
    /// Runs one scan pass. It selects up to <c>BatchSize</c> non-deleted exceptions that have an
    /// <c>ActiveFlowInstanceId</c> and whose last touch (<c>UpdatedAt</c>, or <c>CreatedAt</c> when
    /// never updated) is older than <c>ThresholdHours</c>. Exceptions already alerted on
    /// <see cref="AlertChannel"/> within the last <c>AlertCooldownMinutes</c> are skipped. One
    /// alert is sent for each remaining exception.
    /// </summary>
    /// <param name="cancellationToken">
    /// Checked between the database round-trips and before each alert is sent.
    /// </param>
    /// <returns>
    /// The number of alerts sent successfully in this pass. Returns 0 when the watcher is disabled
    /// or nothing qualifies. A failed send is logged and not counted.
    /// </returns>
    public async Task ScanAsync(CancellationToken cancellationToken = default)
    {
        if (!_options.Enabled) return 0;

        var now = DateTime.Now;
        // Non-positive option values fall back to built-in defaults.
        var thresholdHours = _options.ThresholdHours > 0 ? _options.ThresholdHours : 4;
        var cooldownMinutes = _options.AlertCooldownMinutes > 0 ? _options.AlertCooldownMinutes : 60;
        var batchSize = _options.BatchSize > 0 ? _options.BatchSize : 100;
        var staleBefore = now.AddHours(-thresholdHours);
        var alertedAfter = now.AddMinutes(-cooldownMinutes);

        // Resolve the exceptions already alerted within the cooldown window BEFORE picking the batch,
        // and exclude them in SQL. Filtering them only after Take(batchSize) lets a batch made up
        // entirely of already-alerted rows starve every newer stuck exception until the cooldown
        // expires. Only the distinct id column is fetched, not whole log rows.
        // NOTE(review): the NOT IN list grows with the number of alerts per cooldown window; if that
        // can become very large, consider a NOT EXISTS subquery instead.
        var recentAlertExceptionIds = await _notificationLogRep.AsQueryable()
            .Where(x => x.Channel == AlertChannel && x.CreatedAt >= alertedAfter && x.ExceptionId != null)
            .Select(x => x.ExceptionId)
            .Distinct()
            .ToListAsync();
        var alertedIds = recentAlertExceptionIds
            .Where(id => id.HasValue)
            .Select(id => id!.Value)
            .ToList();

        cancellationToken.ThrowIfCancellationRequested();

        var staleExceptions = await _exceptionRep.AsQueryable()
            .Where(x => !x.IsDeleted
                        && x.ActiveFlowInstanceId.HasValue
                        && SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt) < staleBefore)
            .WhereIF(alertedIds.Count > 0, x => !alertedIds.Contains(x.Id))
            // Order by the same "last touched" expression the staleness filter uses. Rows with a NULL
            // UpdatedAt are then ranked by CreatedAt instead of by database-specific NULL ordering.
            .OrderBy(x => SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt), OrderByType.Asc)
            .OrderBy(x => x.Id, OrderByType.Asc)
            .Take(batchSize)
            .ToListAsync();
        if (staleExceptions.Count == 0) return 0;

        var alertCount = 0;
        foreach (var item in staleExceptions)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var lastTouchedAt = item.UpdatedAt ?? item.CreatedAt;
            var payload = new
            {
                type = "S8_ACTIVE_FLOW_STUCK",
                message = "S8 异常存在进行中的审批实例,但已超过阈值未更新,请人工检查审批流状态与回调链路",
                exceptionId = item.Id,
                exceptionCode = item.ExceptionCode,
                title = item.Title,
                status = item.Status,
                activeFlowInstanceId = item.ActiveFlowInstanceId,
                activeFlowBizType = item.ActiveFlowBizType,
                tenantId = item.TenantId,
                factoryId = item.FactoryId,
                lastTouchedAt,
                thresholdHours,
                detectedAt = now
            };

            try
            {
                await _notificationService.SendAsync(item.TenantId, item.FactoryId, item.Id, AlertChannel, payload);
                _logger.LogWarning(
                    "S8 ActiveFlow 疑似卡死: ExceptionId={ExceptionId}, Code={ExceptionCode}, Status={Status}, FlowInstanceId={FlowInstanceId}, LastTouchedAt={LastTouchedAt}, ThresholdHours={ThresholdHours}",
                    item.Id, item.ExceptionCode, item.Status, item.ActiveFlowInstanceId, lastTouchedAt, thresholdHours);
                alertCount++;
            }
            catch (Exception ex)
            {
                // Best effort: one failed alert must not abort the rest of the batch. No cooldown log
                // entry is assumed to exist for it, so it should be retried on a later scan.
                _logger.LogError(
                    ex,
                    "S8 ActiveFlow 卡死告警写入失败: ExceptionId={ExceptionId}, Code={ExceptionCode}, FlowInstanceId={FlowInstanceId}",
                    item.Id, item.ExceptionCode, item.ActiveFlowInstanceId);
            }
        }

        if (alertCount > 0)
        {
            // The candidate count now covers only exceptions not alerted within the cooldown window.
            _logger.LogInformation(
                "S8 ActiveFlow 卡死扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                alertCount, staleExceptions.Count);
        }

        return alertCount;
    }
}