using Admin.NET.Plugin.AiDOP.Entity.S8;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using SqlSugar;
namespace Admin.NET.Plugin.AiDOP.Service.S8;
/// <summary>
/// Scans for S8 exceptions whose ActiveFlow approval instance looks stuck and emits a searchable alert.
/// Detection only: no automatic recovery is attempted.
/// </summary>
public class S8ActiveFlowWatchService : ITransient
{
    /// <summary>Notification channel used both to send stuck-flow alerts and to look up previous ones.</summary>
    public const string AlertChannel = "s8-active-flow-stuck";

    private readonly SqlSugarRepository _exceptionRep;
    private readonly SqlSugarRepository _notificationLogRep;
    private readonly S8NotificationService _notificationService;
    private readonly S8ActiveFlowWatchOptions _options;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the watch service.
    /// </summary>
    /// <param name="exceptionRep">Repository queried for stale exception rows.</param>
    /// <param name="notificationLogRep">Repository queried for alerts already sent on <see cref="AlertChannel"/>.</param>
    /// <param name="notificationService">Service used to send each alert.</param>
    /// <param name="options">Watch options; the wrapped value is captured once at construction.</param>
    /// <param name="logger">Logger for per-alert warnings, failures and the scan summary.</param>
    public S8ActiveFlowWatchService(
        SqlSugarRepository exceptionRep,
        SqlSugarRepository notificationLogRep,
        S8NotificationService notificationService,
        IOptions options,
        ILogger logger)
    {
        _exceptionRep = exceptionRep;
        _notificationLogRep = notificationLogRep;
        _notificationService = notificationService;
        _options = options.Value;
        _logger = logger;
    }

    /// <summary>
    /// Runs one scan round: finds non-deleted exceptions that still reference an ActiveFlow instance
    /// but have not been touched for longer than the configured threshold, and sends one alert per
    /// exception that has not already been alerted within the cooldown window.
    /// </summary>
    /// <param name="cancellationToken">Checked before each alert is sent; cancellation aborts the remaining batch.</param>
    /// <returns>The number of alerts sent in this round; 0 when the watcher is disabled or nothing qualifies.</returns>
    public async Task ScanAsync(CancellationToken cancellationToken = default)
    {
        if (!_options.Enabled)
            return 0;

        var now = DateTime.Now;

        // Non-positive configuration values fall back to built-in defaults.
        var thresholdHours = _options.ThresholdHours > 0 ? _options.ThresholdHours : 4;
        var cooldownMinutes = _options.AlertCooldownMinutes > 0 ? _options.AlertCooldownMinutes : 60;
        var batchSize = _options.BatchSize > 0 ? _options.BatchSize : 100;

        var staleBefore = now.AddHours(-thresholdHours);
        var alertedAfter = now.AddMinutes(-cooldownMinutes);

        // Load the ids alerted inside the cooldown window FIRST, so they can be excluded before the
        // batch limit is applied. Filtering after Take() would keep re-selecting the same oldest
        // rows and never reach stale exceptions beyond the first batch.
        var recentAlertIds = await _notificationLogRep.AsQueryable()
            .Where(x => x.Channel == AlertChannel && x.CreatedAt >= alertedAfter && x.ExceptionId != null)
            .Select(x => x.ExceptionId)
            .ToListAsync();

        var alertedIds = recentAlertIds
            .Where(id => id.HasValue)
            .Select(id => id!.Value)
            .Distinct()
            .ToList();

        // "Last touched" is UpdatedAt when present, otherwise CreatedAt. The same expression drives
        // both the staleness filter and the oldest-first ordering so never-updated rows are ranked
        // by their real age instead of by the database's NULL sort position.
        var staleExceptions = await _exceptionRep.AsQueryable()
            .Where(x => !x.IsDeleted && x.ActiveFlowInstanceId.HasValue &&
                        SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt) < staleBefore)
            // Skip the NOT IN clause entirely when there is nothing to exclude.
            .WhereIF(alertedIds.Count > 0, x => !alertedIds.Contains(x.Id))
            .OrderBy(x => SqlFunc.IsNull(x.UpdatedAt, x.CreatedAt), OrderByType.Asc)
            .OrderBy(x => x.Id, OrderByType.Asc)
            .Take(batchSize)
            .ToListAsync();

        if (staleExceptions.Count == 0)
            return 0;

        var alertCount = 0;
        foreach (var item in staleExceptions)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var lastTouchedAt = item.UpdatedAt ?? item.CreatedAt;
            var payload = new
            {
                type = "S8_ACTIVE_FLOW_STUCK",
                message = "S8 异常存在进行中的审批实例,但已超过阈值未更新,请人工检查审批流状态与回调链路",
                exceptionId = item.Id,
                exceptionCode = item.ExceptionCode,
                title = item.Title,
                status = item.Status,
                activeFlowInstanceId = item.ActiveFlowInstanceId,
                activeFlowBizType = item.ActiveFlowBizType,
                tenantId = item.TenantId,
                factoryId = item.FactoryId,
                lastTouchedAt,
                thresholdHours,
                detectedAt = now
            };

            try
            {
                await _notificationService.SendAsync(item.TenantId, item.FactoryId, item.Id, AlertChannel, payload);
                _logger.LogWarning(
                    "S8 ActiveFlow 疑似卡死: ExceptionId={ExceptionId}, Code={ExceptionCode}, Status={Status}, FlowInstanceId={FlowInstanceId}, LastTouchedAt={LastTouchedAt}, ThresholdHours={ThresholdHours}",
                    item.Id,
                    item.ExceptionCode,
                    item.Status,
                    item.ActiveFlowInstanceId,
                    lastTouchedAt,
                    thresholdHours);
                alertCount++;
            }
            catch (Exception ex)
            {
                // Best effort: a failed alert is logged and the scan continues with the next exception.
                _logger.LogError(
                    ex,
                    "S8 ActiveFlow 卡死告警写入失败: ExceptionId={ExceptionId}, Code={ExceptionCode}, FlowInstanceId={FlowInstanceId}",
                    item.Id,
                    item.ExceptionCode,
                    item.ActiveFlowInstanceId);
            }
        }

        // The summary line is only written when at least one alert was sent.
        if (alertCount > 0)
        {
            _logger.LogInformation(
                "S8 ActiveFlow 卡死扫描完成,本轮新增告警 {AlertCount} 条,候选 {CandidateCount} 条",
                alertCount,
                staleExceptions.Count);
        }

        return alertCount;
    }
}