using Admin.NET.Plugin.AiDOP.Entity.S8;
using Admin.NET.Plugin.ApprovalFlow;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SqlSugar;

namespace Admin.NET.Plugin.AiDOP.Service.S8;

// NOTE(review): in the text this revision was prepared from, generic type arguments
// (repository entity types and the Task / IOptions / ILogger / HashSet arguments) appear
// to be missing. Those declarations are reproduced exactly as given — confirm them
// against the real file before merging.

/// <summary>
/// Scans for S8 exceptions whose active approval flow looks stuck, and for S8 approval flow
/// instances that no exception references, and emits a searchable alert for each finding.
/// Detection only: no automatic recovery is attempted.
/// </summary>
public class S8ActiveFlowWatchService : ITransient
{
    /// <summary>Notification channel used for "active flow looks stuck" alerts.</summary>
    public const string AlertChannel = "s8-active-flow-stuck";

    /// <summary>Notification channel used for "orphaned flow instance" alerts.</summary>
    public const string AlertChannelOrphan = "s8-orphan-flow-instance";

    /// <summary>Flow business types that the orphan scan treats as belonging to S8.</summary>
    public static readonly string[] S8FlowBizTypes = new[] { "EXCEPTION_ESCALATION", "EXCEPTION_CLOSURE" };

    private readonly SqlSugarRepository _exceptionRep;
    private readonly SqlSugarRepository _notificationLogRep;
    private readonly SqlSugarRepository _flowInstanceRep;
    private readonly S8NotificationService _notificationService;
    private readonly S8ActiveFlowWatchOptions _options;
    private readonly ILogger _logger;

    /// <summary>
    /// Stores the injected repositories, notification service, options snapshot and logger.
    /// </summary>
    public S8ActiveFlowWatchService(
        SqlSugarRepository exceptionRep,
        SqlSugarRepository notificationLogRep,
        SqlSugarRepository flowInstanceRep,
        S8NotificationService notificationService,
        IOptions options,
        ILogger logger)
    {
        _exceptionRep = exceptionRep;
        _notificationLogRep = notificationLogRep;
        _flowInstanceRep = flowInstanceRep;
        _notificationService = notificationService;
        _options = options.Value;
        _logger = logger;
    }

    /// <summary>
    /// Runs one scan round: first the stuck-active-flow pass over exceptions, then the
    /// orphaned-flow-instance pass. The orphan pass always runs, including when the first
    /// pass finds no candidates.
    /// </summary>
    /// <param name="cancellationToken">Checked before each alert is sent.</param>
    /// <returns>
    /// The number of alerts sent in this round (stuck + orphan); 0 when the watcher is disabled.
    /// </returns>
    public async Task ScanAsync(CancellationToken cancellationToken = default)
    {
        if (!_options.Enabled)
        {
            return 0;
        }

        var now = DateTime.Now;

        // Non-positive option values fall back to built-in defaults (4 h / 60 min / 100 rows).
        var thresholdHours = _options.ThresholdHours > 0 ? _options.ThresholdHours : 4;
        var cooldownMinutes = _options.AlertCooldownMinutes > 0 ? _options.AlertCooldownMinutes : 60;
        var batchSize = _options.BatchSize > 0 ? _options.BatchSize : 100;

        var staleBefore = now.AddHours(-thresholdHours);
        var alertedAfter = now.AddMinutes(-cooldownMinutes);

        // Candidates: not deleted, have an active flow instance, and were last touched
        // (UpdatedAt, or CreatedAt when UpdatedAt is null) before the stale cut-off.
        var staleExceptions = await _exceptionRep.AsQueryable()
            .Where(x => !x.IsDeleted
                && x.ActiveFlowInstanceId.HasValue
                && SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt) < staleBefore)
            .OrderBy(x => x.UpdatedAt, OrderByType.Asc)
            .OrderBy(x => x.Id, OrderByType.Asc)
            .Take(batchSize)
            .ToListAsync();

        var alertCount = 0;

        // Only the stuck-exception pass is skipped when there are no candidates. Returning
        // early here would also skip the orphan pass below, and orphaned instances are by
        // definition not referenced by any exception.
        if (staleExceptions.Count > 0)
        {
            var exceptionIds = staleExceptions.Select(x => x.Id).ToHashSet();

            // Alerts written on this channel inside the cooldown window; used to avoid re-alerting.
            var recentAlertLogs = await _notificationLogRep.AsQueryable()
                .Where(x => x.Channel == AlertChannel
                    && x.CreatedAt >= alertedAfter
                    && x.ExceptionId != null)
                .ToListAsync();

            var alertedIds = recentAlertLogs
                .Where(x => x.ExceptionId.HasValue && exceptionIds.Contains(x.ExceptionId.Value))
                .Select(x => x.ExceptionId!.Value)
                .ToHashSet();

            foreach (var item in staleExceptions.Where(x => !alertedIds.Contains(x.Id)))
            {
                cancellationToken.ThrowIfCancellationRequested();

                var lastTouchedAt = item.UpdatedAt ?? item.CreatedAt;
                var payload = new
                {
                    type = "S8_ACTIVE_FLOW_STUCK",
                    message = "S8 异常存在进行中的审批实例,但已超过阈值未更新,请人工检查审批流状态与回调链路",
                    exceptionId = item.Id,
                    exceptionCode = item.ExceptionCode,
                    title = item.Title,
                    status = item.Status,
                    activeFlowInstanceId = item.ActiveFlowInstanceId,
                    activeFlowBizType = item.ActiveFlowBizType,
                    tenantId = item.TenantId,
                    factoryId = item.FactoryId,
                    lastTouchedAt,
                    thresholdHours,
                    detectedAt = now
                };

                try
                {
                    await _notificationService.SendAsync(item.TenantId, item.FactoryId, item.Id, AlertChannel, payload);
                    _logger.LogWarning(
                        "S8 ActiveFlow 疑似卡死: ExceptionId={ExceptionId}, Code={ExceptionCode}, Status={Status}, FlowInstanceId={FlowInstanceId}, LastTouchedAt={LastTouchedAt}, ThresholdHours={ThresholdHours}",
                        item.Id,
                        item.ExceptionCode,
                        item.Status,
                        item.ActiveFlowInstanceId,
                        lastTouchedAt,
                        thresholdHours);
                    alertCount++;
                }
                catch (Exception ex)
                {
                    // Best effort: a failed alert is logged and the scan moves on to the next item.
                    _logger.LogError(
                        ex,
                        "S8 ActiveFlow 卡死告警写入失败: ExceptionId={ExceptionId}, Code={ExceptionCode}, FlowInstanceId={FlowInstanceId}",
                        item.Id,
                        item.ExceptionCode,
                        item.ActiveFlowInstanceId);
                }
            }

            if (alertCount > 0)
            {
                _logger.LogInformation(
                    "S8 ActiveFlow 卡死扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                    alertCount,
                    staleExceptions.Count);
            }
        }

        var orphanAlertCount = await ScanOrphanFlowInstancesAsync(
            staleBefore, alertedAfter, batchSize, now, thresholdHours, cancellationToken);

        return alertCount + orphanAlertCount;
    }

    /// <summary>
    /// N-2: scans for orphaned S8 approval flow instances — the flow instance exists but no
    /// AdoS8Exception references it — which suggests OnFlowStarted failed or the callback
    /// chain was lost.
    /// </summary>
    /// <param name="staleBefore">Only running instances started before this time are considered.</param>
    /// <param name="alertedAfter">Start of the cooldown window for previously sent orphan alerts.</param>
    /// <param name="batchSize">Maximum number of flow instances examined in this round.</param>
    /// <param name="now">Scan timestamp written into each alert payload.</param>
    /// <param name="thresholdHours">Threshold written into each alert payload.</param>
    /// <param name="cancellationToken">Checked before each alert is sent.</param>
    /// <returns>The number of orphan alerts sent in this round.</returns>
    private async Task ScanOrphanFlowInstancesAsync(
        DateTime staleBefore,
        DateTime alertedAfter,
        int batchSize,
        DateTime now,
        int thresholdHours,
        CancellationToken cancellationToken)
    {
        // Local copy: per the original author's note, the SqlSugar expression tree cannot
        // reference the static field directly, so it is copied into a captured local first.
        var bizTypes = S8FlowBizTypes;

        var candidates = await _flowInstanceRep.AsQueryable()
            .Where(fi => bizTypes.Contains(fi.BizType)
                && fi.Status == FlowInstanceStatusEnum.Running
                && fi.StartTime < staleBefore)
            .OrderBy(fi => fi.StartTime, OrderByType.Asc)
            .Take(batchSize)
            .ToListAsync();

        if (candidates.Count == 0)
        {
            return 0;
        }

        var bizIds = candidates.Select(fi => fi.BizId).ToHashSet();
        var instanceIds = candidates.Select(fi => fi.Id).ToHashSet();

        // Exceptions that point back at one of the candidate instances.
        var linked = await _exceptionRep.AsQueryable()
            .Where(e => !e.IsDeleted
                && bizIds.Contains(e.Id)
                && e.ActiveFlowInstanceId.HasValue
                && instanceIds.Contains(e.ActiveFlowInstanceId!.Value))
            .Select(e => new { e.Id, FlowId = e.ActiveFlowInstanceId!.Value })
            .ToListAsync();

        // An instance counts as linked only when the exact (exception id, instance id) pair matches.
        var linkedPairs = linked.Select(x => (x.Id, x.FlowId)).ToHashSet();
        var orphans = candidates.Where(fi => !linkedPairs.Contains((fi.BizId, fi.Id))).ToList();

        if (orphans.Count == 0)
        {
            return 0;
        }

        // Orphan alerts are sent without an exception id, so the cooldown check has to read
        // the flow instance id back out of the stored payloads.
        var recentPayloads = await _notificationLogRep.AsQueryable()
            .Where(x => x.Channel == AlertChannelOrphan && x.CreatedAt >= alertedAfter)
            .Select(x => x.Payload)
            .ToListAsync();

        var alertedInstanceIds = new HashSet();
        foreach (var p in recentPayloads)
        {
            var id = TryExtractFlowInstanceId(p);
            if (id.HasValue)
            {
                alertedInstanceIds.Add(id.Value);
            }
        }

        var alertCount = 0;
        foreach (var fi in orphans.Where(x => !alertedInstanceIds.Contains(x.Id)))
        {
            cancellationToken.ThrowIfCancellationRequested();

            var payload = new
            {
                type = "S8_ORPHAN_FLOW_INSTANCE",
                message = "S8 审批实例存在但未被任何业务异常单据引用,疑似 OnFlowStarted 失败或回调链路丢失",
                flowInstanceId = fi.Id,
                bizType = fi.BizType,
                bizId = fi.BizId,
                bizNo = fi.BizNo,
                flowStatus = fi.Status.ToString(),
                startTime = fi.StartTime,
                thresholdHours,
                detectedAt = now
            };

            try
            {
                await _notificationService.SendAsync(0, 0, null, AlertChannelOrphan, payload);
                _logger.LogWarning(
                    "S8 孤立审批实例: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}, StartTime={StartTime}",
                    fi.Id,
                    fi.BizType,
                    fi.BizId,
                    fi.StartTime);
                alertCount++;
            }
            catch (Exception ex)
            {
                // Best effort: a failed alert is logged and the scan moves on to the next instance.
                _logger.LogError(
                    ex,
                    "S8 孤立审批实例告警写入失败: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}",
                    fi.Id,
                    fi.BizType,
                    fi.BizId);
            }
        }

        if (alertCount > 0)
        {
            _logger.LogInformation(
                "S8 孤立审批实例扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                alertCount,
                orphans.Count);
        }

        return alertCount;
    }

    /// <summary>
    /// Extracts <c>flowInstanceId</c> from a stored alert payload JSON, for cooldown
    /// de-duplication. Accepts the id as a JSON number or as a string holding an integer.
    /// </summary>
    /// <param name="payload">Stored payload text; may be null, blank, or not valid JSON.</param>
    /// <returns>
    /// The id, or null when the payload is blank, is not a JSON object, has no usable
    /// <c>flowInstanceId</c> property, or cannot be parsed.
    /// </returns>
    internal static long? TryExtractFlowInstanceId(string? payload)
    {
        if (string.IsNullOrWhiteSpace(payload))
        {
            return null;
        }

        try
        {
            using var doc = System.Text.Json.JsonDocument.Parse(payload);
            var root = doc.RootElement;

            // TryGetProperty throws on non-object roots (e.g. "[1,2]" or "42"), so rule those out.
            if (root.ValueKind != System.Text.Json.JsonValueKind.Object
                || !root.TryGetProperty("flowInstanceId", out var prop))
            {
                return null;
            }

            if (prop.ValueKind == System.Text.Json.JsonValueKind.Number)
            {
                return prop.TryGetInt64(out var numericId) ? numericId : null;
            }

            // Also accept a string-encoded id, in case the payload was written by a
            // serializer that emits 64-bit integers as strings.
            if (prop.ValueKind == System.Text.Json.JsonValueKind.String
                && long.TryParse(
                    prop.GetString(),
                    System.Globalization.NumberStyles.Integer,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out var textId))
            {
                return textId;
            }
        }
        catch (System.Text.Json.JsonException)
        {
            // Malformed payload JSON is ignored so it cannot block later alerts.
        }

        return null;
    }
}