using Admin.NET.Plugin.AiDOP.Entity.S8;
using Admin.NET.Plugin.ApprovalFlow;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SqlSugar;
namespace Admin.NET.Plugin.AiDOP.Service.S8;
///
/// Detects S8 exceptions whose ActiveFlow approval instance looks stuck, and S8 approval instances
/// that no exception references any more ("orphans"), and writes a searchable alert for each.
/// Detection only: nothing is recovered automatically.
///
public class S8ActiveFlowWatchService : ITransient
{
    /// Notification channel for "exception has an active flow instance but has not been touched for too long".
    public const string AlertChannel = "s8-active-flow-stuck";
    /// Notification channel for "S8 flow instance is running but no exception references it".
    public const string AlertChannelOrphan = "s8-orphan-flow-instance";
    /// Flow BizType values owned by S8; only these flow instances are inspected by the orphan scan.
    public static readonly string[] S8FlowBizTypes = new[] { "EXCEPTION_ESCALATION", "EXCEPTION_CLOSURE" };

    // Upper bound on candidate pages the orphan scan reads per run, so a large backlog of healthy
    // long-running approvals cannot make one scan unbounded.
    private const int OrphanScanMaxPages = 10;

    private readonly SqlSugarRepository _exceptionRep;
    private readonly SqlSugarRepository _notificationLogRep;
    private readonly SqlSugarRepository _flowInstanceRep;
    private readonly S8NotificationService _notificationService;
    private readonly S8ActiveFlowWatchOptions _options;
    private readonly ILogger _logger;

    public S8ActiveFlowWatchService(
        SqlSugarRepository exceptionRep,
        SqlSugarRepository notificationLogRep,
        SqlSugarRepository flowInstanceRep,
        S8NotificationService notificationService,
        IOptions options,
        ILogger logger)
    {
        _exceptionRep = exceptionRep;
        _notificationLogRep = notificationLogRep;
        _flowInstanceRep = flowInstanceRep;
        _notificationService = notificationService;
        _options = options.Value;
        _logger = logger;
    }

    ///
    /// Runs one detection pass: first stuck ActiveFlow exceptions, then orphaned S8 flow instances.
    /// Returns the number of alerts written in this pass (0 when the watcher is disabled).
    /// Non-positive option values fall back to defaults: 4h threshold, 60min cooldown, batch of 100.
    ///
    public async Task ScanAsync(CancellationToken cancellationToken = default)
    {
        if (!_options.Enabled)
            return 0;

        var now = DateTime.Now;
        var thresholdHours = _options.ThresholdHours > 0 ? _options.ThresholdHours : 4;
        var cooldownMinutes = _options.AlertCooldownMinutes > 0 ? _options.AlertCooldownMinutes : 60;
        var batchSize = _options.BatchSize > 0 ? _options.BatchSize : 100;
        var staleBefore = now.AddHours(-thresholdHours);
        var alertedAfter = now.AddMinutes(-cooldownMinutes);

        // Load exceptions already alerted inside the cooldown window BEFORE picking candidates, so they
        // can be excluded in SQL. Filtering them only after Take(batchSize) would keep selecting the
        // same oldest batch and starve every stale exception beyond it.
        var alertedIds = (await _notificationLogRep.AsQueryable()
                .Where(x => x.Channel == AlertChannel && x.CreatedAt >= alertedAfter && x.ExceptionId != null)
                .Select(x => x.ExceptionId)
                .ToListAsync())
            .Where(id => id.HasValue)
            .Select(id => id!.Value)
            .ToHashSet();

        var staleExceptions = await _exceptionRep.AsQueryable()
            .Where(x => !x.IsDeleted && x.ActiveFlowInstanceId.HasValue &&
                        SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt) < staleBefore)
            // Only add the exclusion when there is something to exclude, so we never depend on how
            // an empty IN-list is translated.
            .WhereIF(alertedIds.Count > 0, x => !alertedIds.Contains(x.Id))
            .OrderBy(x => x.UpdatedAt, OrderByType.Asc)
            .OrderBy(x => x.Id, OrderByType.Asc)
            .Take(batchSize)
            .ToListAsync();

        var alertCount = 0;
        foreach (var item in staleExceptions)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var lastTouchedAt = item.UpdatedAt ?? item.CreatedAt;
            var payload = new
            {
                type = "S8_ACTIVE_FLOW_STUCK",
                message = "S8 异常存在进行中的审批实例,但已超过阈值未更新,请人工检查审批流状态与回调链路",
                exceptionId = item.Id,
                exceptionCode = item.ExceptionCode,
                title = item.Title,
                status = item.Status,
                activeFlowInstanceId = item.ActiveFlowInstanceId,
                activeFlowBizType = item.ActiveFlowBizType,
                tenantId = item.TenantId,
                factoryId = item.FactoryId,
                lastTouchedAt,
                thresholdHours,
                detectedAt = now
            };
            try
            {
                await _notificationService.SendAsync(item.TenantId, item.FactoryId, item.Id, AlertChannel, payload);
                _logger.LogWarning(
                    "S8 ActiveFlow 疑似卡死: ExceptionId={ExceptionId}, Code={ExceptionCode}, Status={Status}, FlowInstanceId={FlowInstanceId}, LastTouchedAt={LastTouchedAt}, ThresholdHours={ThresholdHours}",
                    item.Id,
                    item.ExceptionCode,
                    item.Status,
                    item.ActiveFlowInstanceId,
                    lastTouchedAt,
                    thresholdHours);
                alertCount++;
            }
            catch (Exception ex)
            {
                // Best effort per item: one failed alert must not stop the rest of the batch.
                _logger.LogError(
                    ex,
                    "S8 ActiveFlow 卡死告警写入失败: ExceptionId={ExceptionId}, Code={ExceptionCode}, FlowInstanceId={FlowInstanceId}",
                    item.Id,
                    item.ExceptionCode,
                    item.ActiveFlowInstanceId);
            }
        }

        if (alertCount > 0)
        {
            _logger.LogInformation(
                "S8 ActiveFlow 卡死扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                alertCount,
                staleExceptions.Count);
        }

        // The orphan scan is independent of the stuck-flow scan. It must also run when no stuck
        // exception was found, because orphans are exactly the flows no exception points at.
        var orphanAlertCount = await ScanOrphanFlowInstancesAsync(staleBefore, alertedAfter, batchSize, now, thresholdHours, cancellationToken);
        return alertCount + orphanAlertCount;
    }

    ///
    /// N-2: scans for orphaned S8 approval instances. These are Running flow instances older than the
    /// threshold that no non-deleted exception references via (Id == BizId, ActiveFlowInstanceId == instance Id).
    /// This suggests OnFlowStarted failed or the callback chain was lost.
    /// Returns the number of alerts written; at most batchSize orphans are processed per run.
    ///
    private async Task ScanOrphanFlowInstancesAsync(
        DateTime staleBefore,
        DateTime alertedAfter,
        int batchSize,
        DateTime now,
        int thresholdHours,
        CancellationToken cancellationToken)
    {
        // Local copy: SqlSugar expression trees cannot reference static/private fields directly, so the
        // value has to be captured into a lambda closure variable first.
        var bizTypes = S8FlowBizTypes;

        // Flow instances already alerted inside the cooldown window, recovered from stored alert payloads.
        var recentPayloads = await _notificationLogRep.AsQueryable()
            .Where(x => x.Channel == AlertChannelOrphan && x.CreatedAt >= alertedAfter)
            .Select(x => x.Payload)
            .ToListAsync();
        var alertedInstanceIds = recentPayloads
            .Select(p => TryExtractFlowInstanceId(p))
            .Where(id => id.HasValue)
            .Select(id => id!.Value)
            .ToHashSet();

        var alertCount = 0;
        var orphanCount = 0;

        // Healthy approvals may legitimately stay Running longer than the threshold. Paging past them
        // (with a stable StartTime/Id order) keeps them from filling the batch and hiding real orphans.
        for (var pageIndex = 1; pageIndex <= OrphanScanMaxPages && orphanCount < batchSize; pageIndex++)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var candidates = await _flowInstanceRep.AsQueryable()
                .Where(fi => bizTypes.Contains(fi.BizType)
                             && fi.Status == FlowInstanceStatusEnum.Running
                             && fi.StartTime < staleBefore)
                .WhereIF(alertedInstanceIds.Count > 0, fi => !alertedInstanceIds.Contains(fi.Id))
                .OrderBy(fi => fi.StartTime, OrderByType.Asc)
                .OrderBy(fi => fi.Id, OrderByType.Asc)
                .ToPageListAsync(pageIndex, batchSize);
            if (candidates.Count == 0)
                break;

            var bizIds = candidates.Select(fi => fi.BizId).ToHashSet();
            var instanceIds = candidates.Select(fi => fi.Id).ToHashSet();
            var linked = await _exceptionRep.AsQueryable()
                .Where(e => !e.IsDeleted
                            && bizIds.Contains(e.Id)
                            && e.ActiveFlowInstanceId.HasValue
                            && instanceIds.Contains(e.ActiveFlowInstanceId!.Value))
                .Select(e => new { e.Id, FlowId = e.ActiveFlowInstanceId!.Value })
                .ToListAsync();
            var linkedPairs = linked.Select(x => (x.Id, x.FlowId)).ToHashSet();

            // An instance counts as linked only when its own BizId row points back at this exact instance.
            var orphans = candidates
                .Where(fi => !linkedPairs.Contains((fi.BizId, fi.Id)))
                .Take(batchSize - orphanCount)
                .ToList();
            orphanCount += orphans.Count;

            foreach (var fi in orphans)
            {
                cancellationToken.ThrowIfCancellationRequested();
                var payload = new
                {
                    type = "S8_ORPHAN_FLOW_INSTANCE",
                    message = "S8 审批实例存在但未被任何业务异常单据引用,疑似 OnFlowStarted 失败或回调链路丢失",
                    flowInstanceId = fi.Id,
                    bizType = fi.BizType,
                    bizId = fi.BizId,
                    bizNo = fi.BizNo,
                    flowStatus = fi.Status.ToString(),
                    startTime = fi.StartTime,
                    thresholdHours,
                    detectedAt = now
                };
                try
                {
                    await _notificationService.SendAsync(0, 0, null, AlertChannelOrphan, payload);
                    _logger.LogWarning(
                        "S8 孤立审批实例: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}, StartTime={StartTime}",
                        fi.Id, fi.BizType, fi.BizId, fi.StartTime);
                    alertCount++;
                }
                catch (Exception ex)
                {
                    // Best effort per item: one failed alert must not stop the rest of the scan.
                    _logger.LogError(ex,
                        "S8 孤立审批实例告警写入失败: FlowInstanceId={InstanceId}, BizType={BizType}, BizId={BizId}",
                        fi.Id, fi.BizType, fi.BizId);
                }
            }

            // A short page means there are no further candidates.
            if (candidates.Count < batchSize)
                break;
        }

        if (alertCount > 0)
        {
            _logger.LogInformation(
                "S8 孤立审批实例扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                alertCount, orphanCount);
        }
        return alertCount;
    }

    ///
    /// Extracts the top-level numeric "flowInstanceId" from a stored alert payload JSON, for cooldown dedup.
    /// Returns null for null/blank input, malformed JSON, a non-object root, or a missing or non-int64 property.
    ///
    internal static long? TryExtractFlowInstanceId(string? payload)
    {
        if (string.IsNullOrWhiteSpace(payload))
            return null;
        try
        {
            using var doc = System.Text.Json.JsonDocument.Parse(payload);
            var root = doc.RootElement;
            // TryGetProperty throws for non-object roots, so check the kind explicitly rather than
            // relying on a catch-all.
            if (root.ValueKind == System.Text.Json.JsonValueKind.Object
                && root.TryGetProperty("flowInstanceId", out var prop)
                && prop.ValueKind == System.Text.Json.JsonValueKind.Number
                && prop.TryGetInt64(out var id))
            {
                return id;
            }
        }
        catch (System.Text.Json.JsonException)
        {
            // Malformed JSON only means this log entry cannot take part in dedup; ignore it.
        }
        return null;
    }
}