using Admin.NET.Plugin.AiDOP.Entity.S8;
using Admin.NET.Plugin.AiDOP.Infrastructure.S8;
using Admin.NET.Plugin.AiDOP.Service.S8.Rules;
using Microsoft.Extensions.Logging;
using SqlSugar;
using System.Data;
using System.Globalization;
using System.Text.Json;

namespace Admin.NET.Plugin.AiDOP.Service.S8;

// NOTE(review): this file appears to have been passed through a newline-collapsing and
// angle-bracket-stripping step. XML doc tags and generic type arguments are missing from the
// text, for example the entity types of the SqlSugarRepository fields and the element types of
// several Task/List return types. Restore them from version control rather than guessing.

/// <summary>
/// Watch-rule polling scheduler service (first-round stub).
/// Once it is wired into the Admin.NET scheduled-job mechanism, a scheduler is expected to call
/// into this service periodically. Each rule is then evaluated according to its
/// PollIntervalSeconds, and exception records are generated.
/// </summary>
public class S8WatchSchedulerService : ITransient
{
    // Watch rules: read, lease (lock_*) and last_* status updates.
    private readonly SqlSugarRepository _ruleRep;
    // Alert rules supplying trigger condition / threshold / severity.
    private readonly SqlSugarRepository _alertRuleRep;
    // Data sources referenced by watch rules.
    private readonly SqlSugarRepository _dataSourceRep;
    // Exception records that are created, refreshed and recovered by this service.
    private readonly SqlSugarRepository _exceptionRep;
    // Exception types; not referenced in the code visible here.
    private readonly SqlSugarRepository _exceptionTypeRep;
    private readonly S8NotificationService _notificationService;
    private readonly S8NotificationLayerResolver _notificationLayerResolver;
    // Shared creation path used for auto-created exceptions (CreateFromWatchAsync).
    private readonly S8ManualReportService _manualReportService;
    // Typed rule evaluators (TIMEOUT / SHORTAGE / OUT_OF_RANGE).
    private readonly S8TimeoutRuleEvaluator _timeoutEvaluator;
    private readonly S8ShortageRuleEvaluator _shortageEvaluator;
    private readonly S8OutOfRangeRuleEvaluator _outOfRangeEvaluator;
    private readonly ILogger _logger;
    // detection_log rows (CREATED / REFRESHED / RECOVERED / ...).
    private readonly SqlSugarRepository _detectionLogRep;
    // Per (rule code, dedup key) consecutive hit/miss detection state used for anti-flap.
    private readonly SqlSugarRepository _detectionStateRep;

    // detection_log.trigger_source value written by this service.
    private const string DetectionTriggerSource = "WATCH_SCHEDULER";

    // detection_log.detect_result values.
    private const string DetectResultCreated = "CREATED";
    private const string DetectResultRefreshed = "REFRESHED";
    private const string DetectResultRecovered = "RECOVERED";
    private const string DetectResultNoHit = "NO_HIT";
    private const string DetectResultEvaluateFailed = "EVALUATE_FAILED";

    // Trigger type stamped on legacy AlertRule-based execution rules.
    private const string DefaultTriggerType = "VALUE_DEVIATION";

    // The only data source type the legacy device-query path can execute.
    private const string SqlDataSourceType = "SQL";

    // G01-05 set of "not yet closed" exception statuses.
    // It mirrors the current pendingStatuses definition in S8ExceptionService (see pendingStatuses
    // in S8ExceptionService.GetPagedAsync), and the two places must stay consistent.
    // This is not a custom "unclosed" set. If the existing definition changes, update both places
    // together.
    private static readonly string[] UnclosedExceptionStatuses =
    {
        "NEW",
        "ASSIGNED",
        "IN_PROGRESS",
        "PENDING_VERIFICATION"
};

    public S8WatchSchedulerService(
        SqlSugarRepository ruleRep,
        SqlSugarRepository alertRuleRep,
        SqlSugarRepository dataSourceRep,
        SqlSugarRepository exceptionRep,
        SqlSugarRepository exceptionTypeRep,
        S8NotificationService notificationService,
        S8NotificationLayerResolver notificationLayerResolver,
        S8ManualReportService manualReportService,
        S8TimeoutRuleEvaluator timeoutEvaluator,
        S8ShortageRuleEvaluator shortageEvaluator,
        S8OutOfRangeRuleEvaluator outOfRangeEvaluator,
        ILogger logger,
        SqlSugarRepository detectionLogRep,
        SqlSugarRepository detectionStateRep)
    {
        _ruleRep = ruleRep;
        _alertRuleRep = alertRuleRep;
        _dataSourceRep = dataSourceRep;
        _exceptionRep = exceptionRep;
        _exceptionTypeRep = exceptionTypeRep;
        _notificationService = notificationService;
        _notificationLayerResolver = notificationLayerResolver;
        _manualReportService = manualReportService;
        _timeoutEvaluator = timeoutEvaluator;
        _shortageEvaluator = shortageEvaluator;
        _outOfRangeEvaluator = outOfRangeEvaluator;
        _logger = logger;
        _detectionLogRep = detectionLogRep;
        _detectionStateRep = detectionStateRep;
    }

    /// <summary>
    /// Loads the legacy (AlertRule-based) execution rules for one tenant/factory.
    /// These are enabled S2-scene watch rules with an empty RuleType and a device watch object type.
    /// Each one is joined with an enabled SQL data source and the scene's single runnable AlertRule.
    /// </summary>
    /// <param name="tenantId">Tenant to load rules for.</param>
    /// <param name="factoryId">Factory to load rules for.</param>
    /// <returns>Assembled execution rules ordered by watch-rule Id; empty when nothing is runnable.</returns>
    public async Task> LoadExecutionRulesAsync(long tenantId, long factoryId)
    {
        // After the R3 OUT_OF_RANGE rewrite, the three typed kinds (OUT_OF_RANGE / TIMEOUT / SHORTAGE)
        // go through ProcessRulesByTypeAsync. This legacy AlertRule path only loads historical rules
        // whose RuleType is null/empty. That way the old and new paths never both run for the same rule
        // and create duplicate exceptions.
        var watchRules = await _ruleRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.Enabled && x.SceneCode == S8SceneCode.S2 && (x.RuleType == null || x.RuleType == ""))
            .ToListAsync();
        var deviceRules = watchRules
            .Where(x => IsDeviceWatchObjectType(x.WatchObjectType))
            .ToList();
        if (deviceRules.Count == 0)
            return new();
        var dataSourceIds = deviceRules
            .Select(x => x.DataSourceId)
            .Distinct()
            .ToList();
        // Only enabled data sources of the same tenant/factory are considered.
        var dataSources = await _dataSourceRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.Enabled && dataSourceIds.Contains(x.Id))
            .ToListAsync();
        var dataSourceMap = dataSources.ToDictionary(x => x.Id);
        if (dataSourceMap.Count == 0)
return new();

        // Scene-level AlertRules that carry both a trigger condition and a threshold.
        var alertRules = (await _alertRuleRep.AsQueryable()
                .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.SceneCode == S8SceneCode.S2)
                .ToListAsync())
            .Where(IsSupportedAlertRule)
            .ToList();

        // G-01 first-version AlertRule conflict policy (option C):
        // - More than one runnable AlertRule in the current scene is treated as a configuration
        //   conflict for the current rule, and the rule is skipped.
        // - The "first" AlertRule is NOT used, and the conflict is NOT escalated to halting the whole
        //   scene.
        // In the current model every device watch rule shares the scene's AlertRule, so in the
        // conflict state all device rules end up skipped. The check is still written per rule so it
        // is not misread as "return empty for the whole scene".
        // With zero runnable AlertRules, alertRule is also null and every rule is skipped.
        var alertRule = alertRules.Count == 1 ? alertRules[0] : null;

        var executionRules = new List();
        foreach (var watchRule in deviceRules.OrderBy(x => x.Id))
        {
            // Configuration conflict (or no AlertRule): skip this rule without stopping the others.
            if (alertRule == null)
                continue;
            if (!dataSourceMap.TryGetValue(watchRule.DataSourceId, out var dataSource))
                continue;
            if (!IsSupportedSqlDataSource(dataSource))
                continue;

            executionRules.Add(new S8WatchExecutionRule
            {
                WatchRuleId = watchRule.Id,
                WatchRuleCode = watchRule.RuleCode,
                SceneCode = watchRule.SceneCode,
                TriggerType = DefaultTriggerType,
                WatchObjectType = watchRule.WatchObjectType.Trim(),
                DataSourceId = dataSource.Id,
                DataSourceCode = dataSource.DataSourceCode,
                // IsSupportedSqlDataSource accepted this source by comparing the *trimmed* Type, but
                // QueryDeviceRowsAsync compares DataSourceType as stored. Store the trimmed value so
                // a padded type such as " SQL " is not accepted here and then rejected as
                // "not SQL" at query time.
                DataSourceType = dataSource.Type?.Trim() ?? string.Empty,
                DataSourceConnection = dataSource.Endpoint?.Trim() ?? string.Empty,
                QueryExpression = watchRule.Expression?.Trim() ??
string.Empty,
                PollIntervalSeconds = watchRule.PollIntervalSeconds,
                AlertRuleId = alertRule.Id,
                AlertRuleCode = alertRule.RuleCode,
                TriggerCondition = alertRule.TriggerCondition!.Trim(),
                ThresholdValue = alertRule.ThresholdVal!.Trim(),
                Severity = alertRule.Severity
            });
        }

        return executionRules;
    }

    /// <summary>
    /// Loads the legacy execution rules for the tenant/factory and runs each rule's SQL query
    /// one after another, in rule order.
    /// </summary>
    /// <returns>One query result per execution rule, in the same order as the rules.</returns>
    public async Task> QueryDeviceRowsAsync(long tenantId, long factoryId)
    {
        var rulesToQuery = await LoadExecutionRulesAsync(tenantId, factoryId);
        var collected = new List();
        foreach (var executionRule in rulesToQuery)
        {
            var queryOutcome = await QueryDeviceRowsAsync(executionRule);
            collected.Add(queryOutcome);
        }

        return collected;
    }

    /// <summary>
    /// Runs one execution rule's SQL query against its own data source and maps the returned rows
    /// to device rows.
    /// Non-SQL sources and empty queries are rejected up front. Any exception raised while creating
    /// the query scope, running the query or mapping rows is caught. All of these outcomes are
    /// reported through S8WatchDeviceQueryResult.Fail with a reason message.
    /// </summary>
    /// <param name="rule">Assembled execution rule to query.</param>
    public async Task QueryDeviceRowsAsync(S8WatchExecutionRule rule)
    {
        var isSqlSource = string.Equals(rule.DataSourceType, SqlDataSourceType, StringComparison.OrdinalIgnoreCase);
        if (!isSqlSource)
        {
            return S8WatchDeviceQueryResult.Fail(rule, "数据源类型不是 SQL,已跳过");
        }

        if (string.IsNullOrWhiteSpace(rule.QueryExpression))
        {
            return S8WatchDeviceQueryResult.Fail(rule, "查询表达式为空,已跳过");
        }

        try
        {
            // A throwaway scope bound to the rule's own connection string, disposed when the block exits.
            using (var scope = CreateSqlQueryScope(rule.DataSourceConnection))
            {
                var dataTable = await scope.Ado.GetDataTableAsync(rule.QueryExpression);
                if (!HasRequiredColumns(dataTable))
                {
                    return S8WatchDeviceQueryResult.Fail(rule, "查询结果缺少 required columns: related_object_code/current_value");
                }

                // Rows with a blank related_object_code are dropped; that code is part of the
                // downstream dedup key.
                var deviceRows = dataTable.Rows.Cast()
                    .Select(MapDeviceRow)
                    .Where(mapped => !string.IsNullOrWhiteSpace(mapped.RelatedObjectCode))
                    .ToList();
                return S8WatchDeviceQueryResult.Ok(rule, deviceRows);
            }
        }
        catch (Exception ex)
        {
            return S8WatchDeviceQueryResult.Fail(rule, $"查询执行失败: {ex.Message}");
        }
    }

    /// <summary>
    /// G01-04: first-version single-threshold VALUE_DEVIATION evaluation over the device-level row
    /// sets. It produces the list of hit objects consumed by G01-05 dedup and G01-06 creation.
    /// This method does no dedup, no creation, no severity recomputation and no timeline work.
    /// </summary>
    public async Task> EvaluateHitsAsync(long tenantId, long factoryId)
    {
        var executionRules = await LoadExecutionRulesAsync(tenantId, factoryId);
        var ruleMap = executionRules.ToDictionary(x => x.WatchRuleId);
        var queryResults = new List();
        foreach (var rule in executionRules)
            queryResults.Add(await QueryDeviceRowsAsync(rule));
        var hits = new List();
        foreach (var queryResult in queryResults)
        {
            // G01-03 query failed: skip it; it does not enter evaluation.
            if
(!queryResult.Success)
                continue;
            if (!ruleMap.TryGetValue(queryResult.WatchRuleId, out var rule))
                continue;

            // Missing evaluation parameters: skip this rule.
            if (string.IsNullOrWhiteSpace(rule.TriggerCondition) || string.IsNullOrWhiteSpace(rule.ThresholdValue))
                continue;

            // Invalid comparison operator: skip this rule.
            var op = TryParseTriggerCondition(rule.TriggerCondition);
            if (op == null)
                continue;

            // Invalid ThresholdValue: skip this rule.
            if (!TryParseDecimal(rule.ThresholdValue, out var threshold))
                continue;

            foreach (var row in queryResult.Rows)
            {
                // Invalid CurrentValue (null / not parseable as a number): skip this row.
                if (row.CurrentValue == null)
                    continue;

                // Not hit: the row goes no further down the chain.
                if (!EvaluateHit(row.CurrentValue.Value, op, threshold))
                    continue;

                hits.Add(new S8WatchHitResult
                {
                    SourceRuleId = rule.WatchRuleId,
                    SourceRuleCode = rule.WatchRuleCode,
                    AlertRuleId = rule.AlertRuleId,
                    DataSourceId = rule.DataSourceId,
                    RelatedObjectCode = row.RelatedObjectCode,
                    CurrentValue = row.CurrentValue.Value,
                    ThresholdValue = threshold,
                    TriggerCondition = op,
                    Severity = rule.Severity,
                    OccurrenceDeptId = row.OccurrenceDeptId,
                    ResponsibleDeptId = row.ResponsibleDeptId,
                    SourcePayload = row.SourcePayload
                });
            }
        }

        return hits;
    }

    /// <summary>
    /// G01-05: minimal dedup against unclosed exceptions.
    /// Consumes the hits produced by G01-04 (EvaluateHitsAsync). For each (SourceRuleId +
    /// RelatedObjectCode) it checks whether an exception in one of the unclosed statuses already
    /// exists, and only answers "may an exception be created?".
    /// Explicitly out of scope for the first version: refreshing the existing exception, timeline
    /// entries, payload updates, counters, severity recomputation and status repair.
    /// </summary>
    public async Task> EvaluateDedupAsync(long tenantId, long factoryId)
    {
        var hits = await EvaluateHitsAsync(tenantId, factoryId);
        var results = new List(hits.Count);
        foreach (var hit in hits)
        {
            // Defensive branch. Upstream (G01-02 rule assembly + G01-03 result-column validation)
            // normally guarantees SourceRuleId and RelatedObjectCode. This is only a fallback, not
            // the normal first-version path.
            if (hit.SourceRuleId <= 0 || string.IsNullOrWhiteSpace(hit.RelatedObjectCode))
            {
                results.Add(new S8WatchDedupResult { Hit = hit, CanCreate = false, MatchedExceptionId = null, Reason = "missing_dedup_key" });
                continue;
            }

            long matchedId;
            try
            {
                matchedId = await FindPendingExceptionIdAsync(
                    tenantId, factoryId, hit.SourceRuleId, hit.RelatedObjectCode);
            }
            catch (Exception ex)
            {
                // Log the failure so a broken dedup query is visible; it used to vanish silently.
                // The conservative outcome recorded below is unchanged.
                _logger.LogWarning(ex,
                    "dedup_query_failed tenantId={Tenant} factoryId={Factory} sourceRuleId={SourceRuleId} relatedObjectCode={RelatedObjectCode}",
                    tenantId, factoryId, hit.SourceRuleId, hit.RelatedObjectCode);
                // Dedup query failed. The first version is deliberately conservative: it blocks
                // creation rather than risk a duplicate exception, and attempts no compensation.
results.Add(new S8WatchDedupResult { Hit = hit, CanCreate = false, MatchedExceptionId = null, Reason = "query_failed" });
                continue;
            }

            // matchedId > 0: an unclosed exception already exists, so creation is blocked
            // ("duplicate_pending").
            // Otherwise nothing is pending: creation is allowed and handed to G01-06 ("no_pending").
            var hasPending = matchedId > 0;
            results.Add(new S8WatchDedupResult
            {
                Hit = hit,
                CanCreate = !hasPending,
                MatchedExceptionId = hasPending ? matchedId : (long?)null,
                Reason = hasPending ? "duplicate_pending" : "no_pending"
            });
        }

        return results;
    }

    /// <summary>
    /// Returns the Id of any one unclosed, non-deleted exception in the tenant/factory that matches
    /// (sourceRuleId, relatedObjectCode), or 0 when none exists.
    /// The first version only needs existence, so which match is returned (earliest or latest) is
    /// deliberately unspecified; G01-05 does not deal with ordering.
    /// </summary>
    private async Task FindPendingExceptionIdAsync(
        long tenantId, long factoryId, long sourceRuleId, string relatedObjectCode)
    {
        // SqlSugar expression translation needs the array used with Contains to be an accessible
        // variable. The class-level constant is copied into a local purely for translation; its
        // values are exactly UnclosedExceptionStatuses.
        var statuses = UnclosedExceptionStatuses;
        var ids = await _exceptionRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && !x.IsDeleted && x.SourceRuleId == sourceRuleId && x.RelatedObjectCode == relatedObjectCode && statuses.Contains(x.Status))
            .Select(x => x.Id)
            .Take(1)
            .ToListAsync();
        return ids.Count > 0 ?
ids[0] : 0L;
    }

    /// <summary>
    /// G01-06: automatic exception creation entry point.
    /// Consumes the G01-05 dedup results. For hits with CanCreate == true it reuses
    /// S8ManualReportService.CreateFromWatchAsync (the auto-create branch of the same main chain) to
    /// persist a standard AdoS8Exception.
    /// Results with CanCreate == false are skipped directly. A failed creation yields a minimal
    /// failure result with no compensation, retry or reconciliation.
    /// Afterwards the typed TIMEOUT / SHORTAGE / OUT_OF_RANGE rule paths run under one shared run id,
    /// and their results are appended.
    /// </summary>
    public async Task> CreateExceptionsAsync(long tenantId, long factoryId)
    {
        var dedupResults = await EvaluateDedupAsync(tenantId, factoryId);
        var results = new List(dedupResults.Count);
        foreach (var dedup in dedupResults)
        {
            if (!dedup.CanCreate)
            {
                results.Add(new S8WatchCreationResult { DedupResult = dedup, Created = false, Skipped = true, CreatedExceptionId = null, Reason = dedup.Reason, ErrorMessage = null });
                continue;
            }

            try
            {
                var entity = await _manualReportService.CreateFromWatchAsync(dedup.Hit);
                await TryDispatchLayerNotificationAsync(entity);
                results.Add(new S8WatchCreationResult { DedupResult = dedup, Created = true, Skipped = false, CreatedExceptionId = entity.Id, Reason = "auto_created", ErrorMessage = null });
            }
            catch (Exception ex)
            {
                // Record the failure in the result as before, and also log it with its stack trace
                // (like process_rule_failed in ProcessRulesByTypeAsync). Otherwise a failed
                // auto-creation is only visible through the returned list.
                _logger.LogWarning(ex,
                    "watch_create_failed tenantId={Tenant} factoryId={Factory} sourceRuleId={SourceRuleId} relatedObjectCode={RelatedObjectCode}",
                    tenantId, factoryId, dedup.Hit.SourceRuleId, dedup.Hit.RelatedObjectCode);
                results.Add(new S8WatchCreationResult { DedupResult = dedup, Created = false, Skipped = false, CreatedExceptionId = null, Reason = "create_failed", ErrorMessage = ex.Message });
            }
        }

        // R3-OUT_OF_RANGE-REWRITE-1: the three formal evaluator paths.
        // LoadExecutionRulesAsync already filters OUT_OF_RANGE / TIMEOUT / SHORTAGE out of the legacy
        // AlertRule main chain (the dedupResults loop above) and keeps only unclassified historical
        // rules, so old and new never both run.
        // R6 RunId: one correlation id for this CreateExceptionsAsync call, written to detection_log.
        var runId = Guid.NewGuid().ToString("N").Substring(0, 16);
        results.AddRange(await ProcessRulesByTypeAsync(tenantId, factoryId, _timeoutEvaluator, S8TimeoutRuleEvaluator.RuleTypeCode, runId));
        results.AddRange(await ProcessRulesByTypeAsync(tenantId, factoryId, _shortageEvaluator, S8ShortageRuleEvaluator.RuleTypeCode, runId));
        results.AddRange(await ProcessRulesByTypeAsync(tenantId, factoryId, _outOfRangeEvaluator, S8OutOfRangeRuleEvaluator.RuleTypeCode, runId));
        return results;
    }

    /// <summary>
    /// R2 TIMEOUT rule main chain: a thin wrapper over ProcessRulesByTypeAsync. The run id is generated internally.
    /// </summary>
    public Task> ProcessTimeoutRulesAsync(long tenantId, long factoryId) =>
ProcessRulesByTypeAsync(tenantId, factoryId, _timeoutEvaluator, S8TimeoutRuleEvaluator.RuleTypeCode, Guid.NewGuid().ToString("N").Substring(0, 16));

    /// <summary>
    /// R3 SHORTAGE rule main chain: a thin wrapper over ProcessRulesByTypeAsync. The run id is generated internally.
    /// </summary>
    public Task> ProcessShortageRulesAsync(long tenantId, long factoryId) =>
        ProcessRulesByTypeAsync(tenantId, factoryId, _shortageEvaluator, S8ShortageRuleEvaluator.RuleTypeCode, Guid.NewGuid().ToString("N").Substring(0, 16));

    /// <summary>
    /// R3-OUT_OF_RANGE-REWRITE-1: OUT_OF_RANGE rule main chain.
    /// Reuses ProcessRulesByTypeAsync. Historical records with dedup_key = NULL get a compat fallback
    /// that matches on:
    /// (source_rule_id = rule.Id AND related_object_code = hit AND status != CLOSED AND dedup_key IS NULL AND is_deleted = 0).
    /// A matching record has its 6 columns backfilled instead of a duplicate being created.
    /// The run id is generated internally.
    /// </summary>
    public Task> ProcessOutOfRangeRulesAsync(long tenantId, long factoryId) =>
        ProcessRulesByTypeAsync(tenantId, factoryId, _outOfRangeEvaluator, S8OutOfRangeRuleEvaluator.RuleTypeCode, Guid.NewGuid().ToString("N").Substring(0, 16));

    /// <summary>
    /// Shared R2/R3 rule main chain. S8-SCHED-CLEANUP-LEGACY-PATH-1: this method is now a thin wrapper.
    /// All per-rule processing (evaluator, reconcile, hit loop, CREATED / REFRESHED / NO_HIT /
    /// EVALUATE_FAILED logs) lives in ProcessSingleRuleAsync. That method is shared with the Job tick
    /// path, so the logic is maintained in one place.
    /// </summary>
    /// <remarks>
    /// Callers visible in this file: CreateExceptionsAsync and the public Process*RulesAsync thin
    /// wrappers. The original note describes these as the debug run-once path.
    /// Debug run-once holds no lease, which keeps its "run immediately by hand" semantics. At the end
    /// of each rule, this method calls ApplyRunOnceCompletionAsync to update watch_rule last_run_at /
    /// last_status / last_error / last_duration_ms / last_run_id / consecutive_failure_count. This
    /// narrows the split in last_* state between run-once and the Job tick.
    /// It does not touch lock_token / running_started_at / paused_until, so it does not fight a Job
    /// that currently holds the lock. Rules currently leased by the scheduler are skipped.
    /// </remarks>
    private async Task> ProcessRulesByTypeAsync(
        long tenantId, long factoryId, IS8RuleEvaluator evaluator, string ruleType, string runId)
    {
        var aggregate = new List();
        // All enabled rules of this RuleType in the tenant/factory (no scene filter on this path).
        var rules = await _ruleRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.Enabled && x.RuleType == ruleType)
            .ToListAsync();
        if (rules.Count == 0)
            return aggregate;
        // AlertRules of the whole tenant/factory (all scenes) are loaded once and handed to every rule.
        var alertRules = (await _alertRuleRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId)
.ToListAsync()).AsReadOnly();

        foreach (var rule in rules.OrderBy(x => x.Id))
        {
            // S8-RUN-ONCE-LEASE-AWARENESS-1: debug run-once holds no lease. If the Scheduler Job has
            // already claimed this rule via PickReadyRulesAsync (lock_token non-empty and
            // lock_until > now), run-once skips the rule: it writes no last_*, no detection_log, and
            // leaves the lock alone. This keeps it from evaluating the same rule concurrently with a
            // Job tick and overwriting its run state.
            // The lock state is judged from the initial ToListAsync snapshot. A microsecond race
            // between run-once and the Job cannot be fully eliminated without taking a lease
            // (recorded as a follow-up risk).
            if (!string.IsNullOrEmpty(rule.LockToken) && rule.LockUntil.HasValue && rule.LockUntil.Value > DateTime.Now)
            {
                _logger.LogInformation(
                    "run_once_skip_locked ruleId={RuleId} ruleCode={RuleCode} ruleType={RuleType} lockToken={Token} lockedBy={By} lockUntil={Until}",
                    rule.Id, rule.RuleCode, ruleType, rule.LockToken, rule.LockedBy, rule.LockUntil);
                aggregate.Add(BuildSkipResult(rule, "rule_locked_by_scheduler", null));
                continue;
            }

            // Times the per-rule processing; the elapsed milliseconds are passed on as durationMs
            // (last_duration_ms).
            var sw = System.Diagnostics.Stopwatch.StartNew();
            S8RuleRunResult completion;
            try
            {
                var ruleResults = await ProcessSingleRuleAsync(tenantId, factoryId, rule, ruleType, evaluator, alertRules, runId);
                aggregate.AddRange(ruleResults);
                // Per-rule statistics are derived from the result reasons.
                // NOTE(review): Hits counts every returned result, not only hits. Confirm that
                // ProcessSingleRuleAsync returns exactly one result per hit.
                completion = new S8RuleRunResult
                {
                    Success = true,
                    Stats = new S8RuleRunStats
                    {
                        Hits = ruleResults.Count,
                        Created = ruleResults.Count(r => r.Created),
                        Refreshed = ruleResults.Count(r => r.Reason == "duplicate_pending"),
                        Pending = ruleResults.Count(r => r.Reason == "antiflap_pending_hit"),
                        Failed = ruleResults.Count(r => r.Reason == "create_failed" || r.Reason == "refresh_failed" || r.Reason == "antiflap_failed" || r.Reason == "evaluate_failed")
                    }
                };
            }
            catch (Exception ex)
            {
                // When the evaluator throws S8RuleEvaluatorException, ProcessSingleRuleAsync has already
                // written an EVALUATE_FAILED detection_log before rethrowing, so that RunSingleRuleAsync
                // can mark Success = false.
                // This path holds no lease. It swallows the exception to keep the old
                // ProcessRulesByTypeAsync semantics: one rule failing does not affect the others.
                _logger.LogWarning(ex, "process_rule_failed ruleCode={RuleCode} ruleType={RuleType}", rule.RuleCode, ruleType);
                aggregate.Add(BuildSkipResult(rule, "evaluate_failed", ex.Message));
                completion = new S8RuleRunResult
                {
                    Success = false,
ErrorMessage = ex.Message,
                    Stats = new()
                };
            }

            sw.Stop();
            // The last_* write-back is best effort: a failure is logged and the remaining rules still run.
            try
            {
                await ApplyRunOnceCompletionAsync(rule, completion, (int)sw.ElapsedMilliseconds, runId);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "run_once_completion_write_failed ruleCode={RuleCode} ruleType={RuleType}", rule.RuleCode, ruleType);
            }
        }

        return aggregate;
    }

    /// <summary>
    /// S8-SCHED-CLEANUP-LEGACY-PATH-1: last_* status write-back for debug run-once (lease-free version).
    /// It parallels the Job-path completion handler (presumably OnRuleCompletedAsync; the original
    /// cross-reference was lost), with these differences:
    /// - lock_token does not have to match, because run-once holds no lease;
    /// - lock_token / locked_by / lock_until / running_started_at are not written, so it does not
    ///   fight a Job lease;
    /// - there is no automatic paused_until pause on a consecutive_failure_count threshold, because
    ///   the debug path must not auto-pause a demo rule.
    /// Writes: last_run_at / next_run_at / last_status / last_error / last_duration_ms / last_run_id /
    /// consecutive_failure_count / updated_at.
    /// </summary>
    private async Task ApplyRunOnceCompletionAsync(AdoS8WatchRule rule, S8RuleRunResult result, int durationMs, string runId)
    {
        var now = DateTime.Now;
        var pollSeconds = NormalizePollInterval(rule.PollIntervalSeconds);
        var nextRunAt = now.AddSeconds(pollSeconds);

        if (!result.Success)
        {
            // Failure: store the error cut to 500 characters and increment the failure streak in SQL.
            var errorTrunc = Truncate(result.ErrorMessage, 500);
            await _ruleRep.Context.Updateable()
                .SetColumns(x => new AdoS8WatchRule
                {
                    LastRunAt = now,
                    NextRunAt = nextRunAt,
                    LastStatus = "FAILED",
                    LastError = errorTrunc,
                    LastDurationMs = durationMs,
                    LastRunId = runId,
                    ConsecutiveFailureCount = x.ConsecutiveFailureCount + 1,
                    UpdatedAt = now
                })
                .Where(x => x.Id == rule.Id)
                .ExecuteCommandAsync();
            return;
        }

        // Success: clear the error and reset the failure streak.
        await _ruleRep.Context.Updateable()
            .SetColumns(x => new AdoS8WatchRule
            {
                LastRunAt = now,
                NextRunAt = nextRunAt,
                LastStatus = "SUCCESS",
                LastError = null,
                LastDurationMs = durationMs,
                LastRunId = runId,
                ConsecutiveFailureCount = 0,
                UpdatedAt = now
            })
            .Where(x => x.Id == rule.Id)
            .ExecuteCommandAsync();
    }

    // S8-SCHED-EXEC-1: fallback for trigger / recover anti-flap counts. Any value below 1 or above 10
    // is treated as 1, so an invalid configuration can neither block creation forever nor block
    // recovery forever.
    private static int NormalizeAntiflapCount(int raw) => raw is >= 1 and <= 10 ? raw : 1;

    /// On a hit, increments detection_state.consecutive_hit_count. When no state row exists yet, one
    /// is inserted with hitCount = 1.
/// Returns the current state row (including its Id) and the hitCount after this hit.
    /// Note: this method does not consume trigger_count_required; the caller decides whether to go on
    /// to CreateFromHitAsync.
    private async Task<(AdoS8RuleDetectionState? state, int hitCount)> UpsertDetectionStateOnHitAsync(
        long tenantId, long factoryId, AdoS8WatchRule rule, S8RuleHit hit)
    {
        var now = DateTime.Now;
        // State rows are looked up by (tenant, factory, rule code, dedup key).
        var existing = await _detectionStateRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.RuleCode == rule.RuleCode && x.DedupKey == hit.DedupKey)
            .FirstAsync();
        if (existing == null)
        {
            var fresh = new AdoS8RuleDetectionState
            {
                TenantId = tenantId,
                FactoryId = factoryId,
                RuleCode = rule.RuleCode,
                DedupKey = hit.DedupKey,
                // Empty source object fields are stored as NULL.
                SourceObjectType = string.IsNullOrEmpty(hit.SourceObjectType) ? null : hit.SourceObjectType,
                SourceObjectId = string.IsNullOrEmpty(hit.SourceObjectId) ? null : hit.SourceObjectId,
                ConsecutiveHitCount = 1,
                ConsecutiveMissCount = 0,
                LastSeenAt = now,
                LastHitAt = now,
                CreatedAt = now,
                UpdatedAt = now
            };
            // BUG-S8-DETECTION-STATE-ACTIVE-EXC-ID-TRIGGER1-001: fresh.Id must be populated from the
            // insert. Otherwise, after the first tick creates an exception with trigger = 1, the
            // "UPDATE state SET active_exception_id WHERE id = state.Id" matches 0 rows because
            // state.Id is still the default 0. This follows the established pattern in
            // S8ManualReportService.
            fresh = await _detectionStateRep.AsInsertable(fresh).ExecuteReturnEntityAsync();
            return (fresh, 1);
        }

        // Existing row: increment the hit streak inside the SQL expression and reset the miss streak.
        // The source object fields are only overwritten when the hit carries a non-empty value.
        // NOTE(review): the lookup-then-insert above is not atomic. Two concurrent hits for the same
        // key could both take the insert branch; confirm that a unique index covers this key.
        var newHitCount = existing.ConsecutiveHitCount + 1;
        await _detectionStateRep.Context.Updateable()
            .SetColumns(x => new AdoS8RuleDetectionState
            {
                ConsecutiveHitCount = x.ConsecutiveHitCount + 1,
                ConsecutiveMissCount = 0,
                LastSeenAt = now,
                LastHitAt = now,
                SourceObjectType = string.IsNullOrEmpty(hit.SourceObjectType) ? existing.SourceObjectType : hit.SourceObjectType,
                SourceObjectId = string.IsNullOrEmpty(hit.SourceObjectId) ?
existing.SourceObjectId : hit.SourceObjectId,
                UpdatedAt = now
            })
            .Where(x => x.Id == existing.Id)
            .ExecuteCommandAsync();
        // Keep the in-memory copy's counter in step with the row that was just updated.
        existing.ConsecutiveHitCount = newHitCount;
        return (existing, newHitCount);
    }

    /// <summary>
    /// Returns the Id of any non-deleted, non-CLOSED exception in the tenant/factory that carries the
    /// given dedup key, or 0 when there is none.
    /// </summary>
    private async Task FindOpenExceptionByDedupKeyAsync(long tenantId, long factoryId, string dedupKey)
    {
        var matchingIds = await _exceptionRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && !x.IsDeleted && x.Status != "CLOSED" && x.DedupKey == dedupKey)
            .Select(x => x.Id)
            .Take(1)
            .ToListAsync();
        // At most one Id was fetched; an empty list yields the default value 0.
        return matchingIds.FirstOrDefault();
    }

    /// <summary>
    /// R3 OUT_OF_RANGE compat fallback lookup. It locates a historical record left by the old
    /// AlertRule main chain using the strict condition (source_rule_id, related_object_code,
    /// status != CLOSED, dedup_key IS NULL, is_deleted = 0).
    /// Returns 0 when relatedObjectCode is blank or nothing matches.
    /// </summary>
    private async Task FindLegacyOutOfRangeExceptionAsync(long tenantId, long factoryId, long sourceRuleId, string relatedObjectCode)
    {
        if (string.IsNullOrWhiteSpace(relatedObjectCode))
        {
            return 0L;
        }

        var ids = await _exceptionRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && !x.IsDeleted && x.Status != "CLOSED" && x.DedupKey == null && x.SourceRuleId == sourceRuleId && x.RelatedObjectCode == relatedObjectCode)
            .Select(x => x.Id)
            .Take(1)
            .ToListAsync();
        return ids.Count > 0 ?
ids[0] : 0L;
    }

    /// <summary>
    /// R3 OUT_OF_RANGE compat fallback backfill. It writes the six R1 columns from the hit onto the
    /// historical record (dedup_key / source_rule_code / source_object_type / source_object_id /
    /// source_payload / last_detected_at) and refreshes updated_at.
    /// </summary>
    private async Task BackfillLegacyExceptionAsync(long exceptionId, S8RuleHit hit)
    {
        await _exceptionRep.Context.Updateable()
            .SetColumns(x => new AdoS8Exception
            {
                DedupKey = hit.DedupKey,
                SourceRuleCode = hit.SourceRuleCode,
                SourceObjectType = hit.SourceObjectType,
                SourceObjectId = hit.SourceObjectId,
                SourcePayload = hit.SourcePayload,
                LastDetectedAt = hit.DetectedAt,
                UpdatedAt = DateTime.Now
            })
            .Where(x => x.Id == exceptionId)
            .ExecuteCommandAsync();
    }

    /// <summary>
    /// R5 minimal recovery loop. Candidates are this rule's exceptions that are not CLOSED, have a
    /// dedup_key and still have recovered_at = NULL. Every candidate whose dedup_key is not among this
    /// round's hits is considered for recovery.
    /// R6: returns recoveredIds so the caller can decide whether to write a NO_HIT log. One RECOVERED
    /// log is written per recovered exception.
    /// </summary>
    /// <remarks>
    /// Original R5 note: only recovered_at and updated_at are written, and status / assignee /
    /// verifier / source_payload / last_detected_at are untouched. Once written, recovered_at is not
    /// cleared again within this round.
    /// NOTE(review): since S8-SCHED-EXEC-1 the method does more on every miss. It increments
    /// ConsecutiveMissCount and resets ConsecutiveHitCount on both the exception and the matching
    /// detection_state row. It only sets recovered_at once the miss streak reaches the normalized
    /// RecoverCountRequired.
    /// </remarks>
    private async Task> ReconcileRecoveriesForRuleAsync(
        long tenantId, long factoryId, AdoS8WatchRule rule, string ruleType, List hits, string runId)
    {
        // Dedup keys hit in this round (blank keys ignored), compared ordinally.
        var hitDedupKeys = hits
            .Where(h => !string.IsNullOrWhiteSpace(h.DedupKey))
            .Select(h => h.DedupKey)
            .ToHashSet(StringComparer.Ordinal);
        // Recovery candidates: open exceptions of this rule (matched by SourceRuleCode) that carry a
        // dedup key and are not yet recovered.
        var candidates = await _exceptionRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && !x.IsDeleted && x.Status != "CLOSED" && x.SourceRuleCode == rule.RuleCode && x.DedupKey != null && x.RecoveredAt == null)
            .Select(x => new { x.Id, x.DedupKey, x.SourceObjectType, x.SourceObjectId, x.RelatedObjectCode, x.ConsecutiveMissCount })
            .ToListAsync();
        if (candidates.Count == 0)
            return new List();
        var now = DateTime.Now;
        var recoverRequired = NormalizeAntiflapCount(rule.RecoverCountRequired);
        var recoveredIds = new List();
        foreach (var c in candidates)
        {
            // Still hit in this round: not a recovery candidate.
            if (hitDedupKeys.Contains(c.DedupKey!))
                continue;
            // S8-SCHED-EXEC-1: recovery anti-flap accumulation.
// 1) On every miss: the exception's ConsecutiveMissCount += 1 and its ConsecutiveHitCount is reset to 0;
            // 2) miss < recover_count_required: only accumulate; no recovered_at and no RECOVERED log;
            // 3) miss >= recover_count_required: write recovered_at and a RECOVERED log.
            var newMissCount = c.ConsecutiveMissCount + 1;
            await _exceptionRep.Context.Updateable()
                .SetColumns(x => new AdoS8Exception
                {
                    ConsecutiveMissCount = x.ConsecutiveMissCount + 1,
                    ConsecutiveHitCount = 0,
                    UpdatedAt = now
                })
                .Where(x => x.Id == c.Id)
                .ExecuteCommandAsync();
            // detection_state accumulates the miss as well (after creation, state.active_exception_id
            // still points at c.Id).
            await _detectionStateRep.Context.Updateable()
                .SetColumns(x => new AdoS8RuleDetectionState
                {
                    ConsecutiveMissCount = x.ConsecutiveMissCount + 1,
                    ConsecutiveHitCount = 0,
                    LastSeenAt = now,
                    LastMissAt = now,
                    UpdatedAt = now
                })
                .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.RuleCode == rule.RuleCode && x.DedupKey == c.DedupKey)
                .ExecuteCommandAsync();
            if (newMissCount < recoverRequired)
            {
                _logger.LogInformation(
                    "antiflap_pending_recovery ruleCode={RuleCode} dedupKey={DedupKey} missCount={Miss} recoverRequired={Required}",
                    rule.RuleCode, c.DedupKey, newMissCount, recoverRequired);
                continue;
            }

            // The miss streak reached the threshold: mark the exception recovered.
            await _exceptionRep.Context.Updateable()
                .SetColumns(x => new AdoS8Exception { RecoveredAt = now, UpdatedAt = now })
                .Where(x => x.Id == c.Id)
                .ExecuteCommandAsync();
            recoveredIds.Add(c.Id);
            await WriteDetectionLogAsync(new AdoS8DetectionLog
            {
                TenantId = tenantId,
                FactoryId = factoryId,
                RuleId = rule.Id,
                RuleCode = rule.RuleCode,
                RuleType = ruleType,
                SceneCode = rule.SceneCode,
                SourceObjectType = c.SourceObjectType,
                SourceObjectId = c.SourceObjectId,
                RelatedObjectCode = c.RelatedObjectCode,
                DedupKey = c.DedupKey,
                DetectResult = DetectResultRecovered,
                ExceptionId = c.Id,
                DetectedAt = now,
                PayloadSnapshot = JsonSerializer.Serialize(new { ruleId = rule.Id, ruleCode = rule.RuleCode, reason = "no_longer_hit", missCount = newMissCount, recoverRequired }),
                RunId = runId,
                TriggerSource = DetectionTriggerSource,
                Remark = "Rule no longer hit; recovered_at marked"
            });
            // Recovery notification follows (S8-NOTIFY-WIRE-RECOVERED-1).
// Dispatch only after detection_log has been written and recovered_at persisted.
            // Per the original design note, a notification failure is only logged as a warning and
            // must never affect the recovery state or the detection log.
            await TryDispatchRecoveredLayerNotificationAsync(c.Id);
        }

        if (recoveredIds.Count > 0)
        {
            _logger.LogInformation(
                "rule_recovered ruleCode={RuleCode} ruleType={RuleType} recoveredCount={Count} recoveredIds={Ids}",
                rule.RuleCode, ruleType, recoveredIds.Count, string.Join(",", recoveredIds));
        }

        return recoveredIds;
    }

    /// <summary>R6 shared hit-log builder, used for both CREATED and REFRESHED.</summary>
    private static AdoS8DetectionLog BuildHitLog(
        long tenantId, long factoryId, AdoS8WatchRule rule, string ruleType, S8RuleHit hit, string detectResult, long exceptionId, string runId) => new()
    {
        TenantId = tenantId,
        FactoryId = factoryId,
        RuleId = rule.Id,
        RuleCode = rule.RuleCode,
        RuleType = ruleType,
        SceneCode = rule.SceneCode,
        SourceObjectType = hit.SourceObjectType,
        SourceObjectId = hit.SourceObjectId,
        RelatedObjectCode = hit.RelatedObjectCode,
        DedupKey = hit.DedupKey,
        DetectResult = detectResult,
        ExceptionId = exceptionId,
        DetectedAt = hit.DetectedAt,
        // The hit's source payload doubles as the log's payload snapshot.
        PayloadSnapshot = hit.SourcePayload,
        RunId = runId,
        TriggerSource = DetectionTriggerSource
    };

    /// <summary>
    /// R6 detection-log write. A failure is only logged as a warning; it never blocks the main chain
    /// and never throws.
    /// </summary>
    private async Task WriteDetectionLogAsync(AdoS8DetectionLog log)
    {
        try
        {
            await _detectionLogRep.InsertAsync(log);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "detection_log_write_failed runId={RunId} detectResult={Result} ruleCode={RuleCode} exceptionId={ExceptionId}",
                log.RunId, log.DetectResult, log.RuleCode, log.ExceptionId);
        }
    }

    // Returns string.Empty for null or empty input; otherwise returns s cut to at most max characters.
    private static string Truncate(string? s, int max) => string.IsNullOrEmpty(s) ? string.Empty : (s.Length <= max ?
s : s.Substring(0, max));

    /// <summary>
    /// Refreshes an existing exception on a repeated hit (S8-SCHED-EXEC-1).
    /// It copies last_detected_at and source_payload from the hit, increments ConsecutiveHitCount
    /// inside the SetColumns expression, and resets ConsecutiveMissCount. It also clears RecoveredAt,
    /// so a recurrence is treated as active again.
    /// </summary>
    private async Task RefreshDetectionAsync(long exceptionId, S8RuleHit hit)
    {
        await _exceptionRep.Context.Updateable()
            .SetColumns(x => new AdoS8Exception
            {
                LastDetectedAt = hit.DetectedAt,
                SourcePayload = hit.SourcePayload,
                ConsecutiveHitCount = x.ConsecutiveHitCount + 1,
                ConsecutiveMissCount = 0,
                RecoveredAt = null,
                UpdatedAt = DateTime.Now
            })
            .Where(x => x.Id == exceptionId)
            .ExecuteCommandAsync();
    }

    /// <summary>Result for a hit that produced a newly created exception ("auto_created").</summary>
    private static S8WatchCreationResult BuildCreatedResult(AdoS8WatchRule rule, S8RuleHit hit, long exceptionId)
    {
        var dedupOutcome = new S8WatchDedupResult
        {
            Hit = ToWatchHit(rule, hit),
            CanCreate = true,
            MatchedExceptionId = null,
            Reason = "no_pending"
        };
        return new S8WatchCreationResult
        {
            DedupResult = dedupOutcome,
            Created = true,
            Skipped = false,
            CreatedExceptionId = exceptionId,
            Reason = "auto_created",
            ErrorMessage = null
        };
    }

    /// <summary>Skipped result for a hit that matched an already-open exception ("duplicate_pending").</summary>
    private static S8WatchCreationResult BuildSkippedDuplicate(AdoS8WatchRule rule, S8RuleHit hit, long matchedId)
    {
        var dedupOutcome = new S8WatchDedupResult
        {
            Hit = ToWatchHit(rule, hit),
            CanCreate = false,
            MatchedExceptionId = matchedId,
            Reason = "duplicate_pending"
        };
        return new S8WatchCreationResult
        {
            DedupResult = dedupOutcome,
            Created = false,
            Skipped = true,
            CreatedExceptionId = null,
            Reason = "duplicate_pending",
            ErrorMessage = null
        };
    }

    /// <summary>
    /// Skipped (not created) result with the given reason and optional error. When no hit is
    /// supplied, the embedded hit carries only the rule's Id and code.
    /// </summary>
    private static S8WatchCreationResult BuildSkipResult(AdoS8WatchRule rule, string reason, string? error, S8RuleHit? hit = null)
    {
        S8WatchHitResult watchHit;
        if (hit != null)
        {
            watchHit = ToWatchHit(rule, hit);
        }
        else
        {
            watchHit = new S8WatchHitResult { SourceRuleId = rule.Id, SourceRuleCode = rule.RuleCode };
        }

        var dedupOutcome = new S8WatchDedupResult
        {
            Hit = watchHit,
            CanCreate = false,
            MatchedExceptionId = null,
            Reason = reason
        };
        return new S8WatchCreationResult
        {
            DedupResult = dedupOutcome,
            Created = false,
            Skipped = true,
            CreatedExceptionId = null,
            Reason = reason,
            ErrorMessage = error
        };
    }

    /// <summary>
    /// Projects a typed-rule hit onto the legacy S8WatchHitResult shape. The rule's Id and code are
    /// used when the hit does not carry its own.
    /// </summary>
    private static S8WatchHitResult ToWatchHit(AdoS8WatchRule rule, S8RuleHit hit) => new()
    {
        SourceRuleId = hit.SourceRuleId == 0 ?
rule.Id : hit.SourceRuleId,
        SourceRuleCode = string.IsNullOrEmpty(hit.SourceRuleCode) ? rule.RuleCode : hit.SourceRuleCode,
        DataSourceId = hit.DataSourceId,
        RelatedObjectCode = hit.RelatedObjectCode,
        Severity = hit.Severity,
        OccurrenceDeptId = hit.OccurrenceDeptId,
        ResponsibleDeptId = hit.ResponsibleDeptId,
        SourcePayload = hit.SourcePayload
    };

    /// <summary>
    /// Single polling entry point. It currently only loads and assembles the legacy execution rules
    /// and returns how many are runnable; it does not collect any data.
    /// </summary>
    public async Task RunOnceAsync()
    {
        // NOTE(review): tenant and factory are hard-coded to 1 here. Confirm this is intended for the stub.
        var runnableRules = await LoadExecutionRulesAsync(1, 1);
        return runnableRules.Count;
    }

    // G01-04 minimal first-version operator set: >, >=, <, <=.
    // Leading and trailing whitespace is allowed. Anything else is an "invalid operator" (null),
    // and the caller skips the rule.
    private static string? TryParseTriggerCondition(string raw)
    {
        switch (raw.Trim())
        {
            case ">":
                return ">";
            case ">=":
                return ">=";
            case "<":
                return "<";
            case "<=":
                return "<=";
            default:
                return null;
        }
    }

    // Parses a threshold with the invariant culture after trimming.
    // NOTE(review): NumberStyles.Any includes AllowThousands, so "1,5" parses as 15 rather than
    // failing. Confirm that thresholds never use ',' as a decimal separator.
    private static bool TryParseDecimal(string raw, out decimal value)
    {
        var trimmed = raw.Trim();
        return decimal.TryParse(trimmed, NumberStyles.Any, CultureInfo.InvariantCulture, out value);
    }

    // Applies one of the operators accepted by TryParseTriggerCondition; any other operator is never a hit.
    private static bool EvaluateHit(decimal current, string op, decimal threshold)
    {
        switch (op)
        {
            case ">":
                return current > threshold;
            case ">=":
                return current >= threshold;
            case "<":
                return current < threshold;
            case "<=":
                return current <= threshold;
            default:
                return false;
        }
    }

    // An AlertRule can be run only when it has both a trigger condition and a threshold.
    private static bool IsSupportedAlertRule(AdoS8AlertRule alertRule)
    {
        if (string.IsNullOrWhiteSpace(alertRule.TriggerCondition))
            return false;
        return !string.IsNullOrWhiteSpace(alertRule.ThresholdVal);
    }

    // A data source is usable when it is enabled, its trimmed Type is SQL (case-insensitive) and it has an endpoint.
    private static bool IsSupportedSqlDataSource(AdoS8DataSource dataSource)
    {
        if (!dataSource.Enabled)
            return false;
        var trimmedType = dataSource.Type?.Trim();
        if (!string.Equals(trimmedType, SqlDataSourceType, StringComparison.OrdinalIgnoreCase))
            return false;
        return !string.IsNullOrWhiteSpace(dataSource.Endpoint);
    }

    // True when the trimmed watch object type is DEVICE or EQUIPMENT (case-insensitive) or exactly 设备.
    private static bool IsDeviceWatchObjectType(string?
watchObjectType)
    {
        if (string.IsNullOrWhiteSpace(watchObjectType))
        {
            return false;
        }

        var trimmed = watchObjectType.Trim();
        var upper = trimmed.ToUpperInvariant();
        if (upper == "DEVICE" || upper == "EQUIPMENT")
        {
            return true;
        }

        return trimmed == "设备";
    }

    // Builds a throwaway SqlSugarScope for the given connection string, using the same DbType as the
    // main rule repository and a unique ConfigId per call.
    private SqlSugarScope CreateSqlQueryScope(string connectionString)
    {
        var dbType = _ruleRep.Context.CurrentConnectionConfig.DbType;
        var config = new ConnectionConfig
        {
            ConfigId = $"s8-watch-sql-{Guid.NewGuid():N}",
            DbType = dbType,
            ConnectionString = connectionString,
            InitKeyType = InitKeyType.Attribute,
            IsAutoCloseConnection = true
        };
        return new SqlSugarScope(config);
    }

    // The query result must expose related_object_code and current_value
    // (matching ignores case and underscores).
    private static bool HasRequiredColumns(DataTable table)
    {
        var columns = table.Columns;
        if (TryGetColumnName(columns, "related_object_code") == null)
        {
            return false;
        }

        return TryGetColumnName(columns, "current_value") != null;
    }

    // Maps one result row to a device row. The dept columns are optional, and the whole row is
    // serialized as the source payload.
    private static S8WatchDeviceRow MapDeviceRow(DataRow row)
    {
        var tableColumns = row.Table.Columns;
        var codeColumn = TryGetColumnName(tableColumns, "related_object_code");
        var valueColumn = TryGetColumnName(tableColumns, "current_value");
        var occurrenceColumn = TryGetColumnName(tableColumns, "occurrence_dept_id");
        var responsibleColumn = TryGetColumnName(tableColumns, "responsible_dept_id");

        var mapped = new S8WatchDeviceRow
        {
            RelatedObjectCode = ReadString(row, codeColumn),
            CurrentValue = ReadDecimal(row, valueColumn),
            OccurrenceDeptId = ReadLong(row, occurrenceColumn),
            ResponsibleDeptId = ReadLong(row, responsibleColumn),
            SourcePayload = BuildSourcePayload(row)
        };
        return mapped;
    }

    // Returns the actual name of the first column whose normalized name equals the normalized
    // expected name, or null when there is none.
    private static string? TryGetColumnName(DataColumnCollection columns, string expectedName)
    {
        var target = NormalizeColumnName(expectedName);
        for (var index = 0; index < columns.Count; index++)
        {
            var actualName = columns[index].ColumnName;
            if (NormalizeColumnName(actualName) == target)
            {
                return actualName;
            }
        }

        return null;
    }

    // Normalizes a column name by removing underscores, trimming, and upper-casing with the invariant culture.
    private static string NormalizeColumnName(string columnName)
    {
        var withoutUnderscores = columnName.Replace("_", string.Empty, StringComparison.Ordinal);
        return withoutUnderscores.Trim().ToUpperInvariant();
    }

    // Reads a column as a trimmed string; a missing column or DBNull yields string.Empty.
    private static string ReadString(DataRow row, string?
columnName) { if (string.IsNullOrWhiteSpace(columnName)) return string.Empty; var value = row[columnName]; return value == DBNull.Value ? string.Empty : Convert.ToString(value)?.Trim() ?? string.Empty; } private static long? ReadLong(DataRow row, string? columnName) { if (string.IsNullOrWhiteSpace(columnName)) return null; var value = row[columnName]; if (value == DBNull.Value) return null; return long.TryParse(Convert.ToString(value, CultureInfo.InvariantCulture), out var result) ? result : null; } private static decimal? ReadDecimal(DataRow row, string? columnName) { if (string.IsNullOrWhiteSpace(columnName)) return null; var value = row[columnName]; if (value == DBNull.Value) return null; return decimal.TryParse(Convert.ToString(value, CultureInfo.InvariantCulture), NumberStyles.Any, CultureInfo.InvariantCulture, out var result) ? result : null; } private static string BuildSourcePayload(DataRow row) { var payload = new Dictionary(StringComparer.OrdinalIgnoreCase); foreach (DataColumn column in row.Table.Columns) { var value = row[column]; payload[column.ColumnName] = value == DBNull.Value ? 
null : value; } return JsonSerializer.Serialize(payload); } // ============================================================ // S8-SCHED-EXEC-1:DB 驱动调度执行层 // ============================================================ private const int LeaseDurationMinutes = 5; private const int AutoPauseFailureThreshold = 3; private const int AutoPauseDurationHours = 1; private const int DefaultPollIntervalSeconds = 300; private const int MinPollIntervalSeconds = 60; private const int MaxPollIntervalSeconds = 86400; /// /// S8-SCHED-EXEC-1:释放过期 lease(lock_until < NOW),不修改 last_status / last_error,仅清空 lock 三件套 + running_started_at。 /// 返回释放的行数。 /// public async Task ResetExpiredLeasesAsync(long tenantId, long factoryId) { var now = DateTime.Now; var affected = await _ruleRep.Context.Updateable() .SetColumns(x => new AdoS8WatchRule { LockToken = null, LockedBy = null, LockUntil = null, RunningStartedAt = null, UpdatedAt = now }) .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.LockUntil != null && x.LockUntil < now) .ExecuteCommandAsync(); if (affected > 0) { _logger.LogWarning( "lease_reset tenantId={Tenant} factoryId={Factory} releasedCount={Count}", tenantId, factoryId, affected); } return affected; } /// /// S8-SCHED-EXEC-1:到期规则候选 + 乐观 UPDATE 抢锁,返回成功抢到的 lease 列表。 /// 抢锁条件:enabled=1 AND (paused_until IS NULL OR paused_until <= NOW) /// AND (next_run_at IS NULL OR next_run_at <= NOW) /// AND (lock_until IS NULL OR lock_until <= NOW)。 /// 抢锁回写:lock_token / locked_by / lock_until = NOW + 5min / running_started_at = NOW / last_run_id = runId。 /// affectedRows == 1 才算抢到;后续 OnRuleCompletedAsync 必须按 lockToken 回写,避免旧进程覆盖新 lease。 /// public async Task> PickReadyRulesAsync(long tenantId, long factoryId, int batchSize, string lockedBy, string runId) { if (batchSize <= 0) batchSize = 16; var now = DateTime.Now; var candidates = await _ruleRep.AsQueryable() .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.Enabled && (x.PausedUntil == null || x.PausedUntil 
<= now) && (x.NextRunAt == null || x.NextRunAt <= now) && (x.LockUntil == null || x.LockUntil <= now)) .OrderBy(x => x.NextRunAt, OrderByType.Asc) .OrderBy(x => x.Id, OrderByType.Asc) .Take(batchSize) .Select(x => new { x.Id, x.RuleCode, x.RuleType }) .ToListAsync(); if (candidates.Count == 0) return new(); var leases = new List(); foreach (var c in candidates) { var token = Guid.NewGuid().ToString("N"); var lockUntil = DateTime.Now.AddMinutes(LeaseDurationMinutes); var runningAt = DateTime.Now; // 乐观 UPDATE:再校验一次条件,affectedRows=1 才算抢到。 var affected = await _ruleRep.Context.Updateable() .SetColumns(x => new AdoS8WatchRule { LockToken = token, LockedBy = lockedBy, LockUntil = lockUntil, RunningStartedAt = runningAt, LastRunId = runId, UpdatedAt = runningAt }) .Where(x => x.Id == c.Id && x.Enabled && (x.PausedUntil == null || x.PausedUntil <= runningAt) && (x.NextRunAt == null || x.NextRunAt <= runningAt) && (x.LockUntil == null || x.LockUntil <= runningAt)) .ExecuteCommandAsync(); if (affected == 1) { leases.Add(new S8RuleLease { RuleId = c.Id, RuleCode = c.RuleCode, RuleType = c.RuleType, LockToken = token, LockedBy = lockedBy, LockUntil = lockUntil, RunId = runId, AcquiredAt = runningAt }); } } return leases; } /// /// S8-SCHED-EXEC-1:执行单条已抢锁规则的 evaluator → 抗抖去重 → 建单/刷新 → 恢复 reconcile。 /// 不释放 lease(OnRuleCompletedAsync 负责);evaluator 抛异常时 Result.Success=false 并保留 ErrorMessage。 /// public async Task RunSingleRuleAsync(long tenantId, long factoryId, S8RuleLease lease) { var rule = await _ruleRep.AsQueryable() .Where(x => x.Id == lease.RuleId) .FirstAsync(); if (rule == null) { return new S8RuleRunResult { Success = false, ErrorMessage = "rule_not_found", Stats = new() }; } var ruleType = rule.RuleType; if (string.IsNullOrWhiteSpace(ruleType)) { // 未分类的旧规则不在新调度路径承载;标 SKIPPED 但不视为失败。 return new S8RuleRunResult { Success = true, ErrorMessage = "rule_type_empty_skipped", Stats = new() }; } IS8RuleEvaluator? 
evaluator = ruleType switch { S8TimeoutRuleEvaluator.RuleTypeCode => _timeoutEvaluator, S8ShortageRuleEvaluator.RuleTypeCode => _shortageEvaluator, S8OutOfRangeRuleEvaluator.RuleTypeCode => _outOfRangeEvaluator, _ => null }; if (evaluator == null) { return new S8RuleRunResult { Success = false, ErrorMessage = $"unsupported_rule_type:{ruleType}", Stats = new() }; } try { var alertRules = (await _alertRuleRep.AsQueryable() .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId) .ToListAsync()).AsReadOnly(); var results = await ProcessSingleRuleAsync(tenantId, factoryId, rule, ruleType, evaluator, alertRules, lease.RunId); var stats = new S8RuleRunStats { Hits = results.Count, Created = results.Count(r => r.Created), Refreshed = results.Count(r => r.Reason == "duplicate_pending"), Pending = results.Count(r => r.Reason == "antiflap_pending_hit"), Failed = results.Count(r => r.Reason == "create_failed" || r.Reason == "refresh_failed" || r.Reason == "antiflap_failed" || r.Reason == "evaluate_failed") }; return new S8RuleRunResult { Success = true, Stats = stats }; } catch (Exception ex) { return new S8RuleRunResult { Success = false, ErrorMessage = ex.Message, Stats = new() }; } } /// /// S8-SCHED-EXEC-1:单规则处理(evaluator → reconcile → hit 循环)。 /// 与 ProcessRulesByTypeAsync 内单规则循环体语义一致;此处抽出便于新调度路径直接调用单条 rule。 /// private async Task> ProcessSingleRuleAsync( long tenantId, long factoryId, AdoS8WatchRule rule, string ruleType, IS8RuleEvaluator evaluator, IReadOnlyList alertRules, string runId) { var results = new List(); List hits; try { hits = await evaluator.EvaluateAsync(tenantId, factoryId, rule, alertRules); } catch (Exception ex) { var failureReason = ex is S8RuleEvaluatorException sre ? 
sre.Reason : ex.GetType().Name; await WriteDetectionLogAsync(new AdoS8DetectionLog { TenantId = tenantId, FactoryId = factoryId, RuleId = rule.Id, RuleCode = rule.RuleCode, RuleType = ruleType, SceneCode = rule.SceneCode, SourceObjectType = rule.SourceObjectType, DetectResult = DetectResultEvaluateFailed, DetectedAt = DateTime.Now, FailureReason = failureReason, FailureMessage = Truncate(ex.Message, 1000), RunId = runId, TriggerSource = DetectionTriggerSource }); results.Add(BuildSkipResult(rule, "evaluate_failed", ex.Message)); throw; } List recoveredIds; try { recoveredIds = await ReconcileRecoveriesForRuleAsync(tenantId, factoryId, rule, ruleType, hits, runId); } catch (Exception ex) { _logger.LogWarning(ex, "recovery_reconcile_failed ruleCode={RuleCode} ruleType={RuleType}", rule.RuleCode, ruleType); recoveredIds = new(); } if (hits.Count == 0 && recoveredIds.Count == 0) { await WriteDetectionLogAsync(new AdoS8DetectionLog { TenantId = tenantId, FactoryId = factoryId, RuleId = rule.Id, RuleCode = rule.RuleCode, RuleType = ruleType, SceneCode = rule.SceneCode, SourceObjectType = rule.SourceObjectType, DetectResult = DetectResultNoHit, DetectedAt = DateTime.Now, RunId = runId, TriggerSource = DetectionTriggerSource }); } foreach (var hit in hits) { if (string.IsNullOrWhiteSpace(hit.DedupKey)) { results.Add(BuildSkipResult(rule, "missing_dedup_key", null, hit)); continue; } long matchedId; try { matchedId = await FindOpenExceptionByDedupKeyAsync(tenantId, factoryId, hit.DedupKey); } catch (Exception ex) { results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit)); continue; } if (matchedId > 0) { try { await RefreshDetectionAsync(matchedId, hit); await WriteDetectionLogAsync(BuildHitLog(tenantId, factoryId, rule, ruleType, hit, DetectResultRefreshed, matchedId, runId)); results.Add(BuildSkippedDuplicate(rule, hit, matchedId)); } catch (Exception ex) { results.Add(BuildSkipResult(rule, "refresh_failed", ex.Message, hit)); } continue; } if 
(string.Equals(ruleType, S8OutOfRangeRuleEvaluator.RuleTypeCode, StringComparison.OrdinalIgnoreCase)) { long compatId; try { compatId = await FindLegacyOutOfRangeExceptionAsync(tenantId, factoryId, rule.Id, hit.RelatedObjectCode); } catch (Exception ex) { results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit)); continue; } if (compatId > 0) { try { await BackfillLegacyExceptionAsync(compatId, hit); await WriteDetectionLogAsync(BuildHitLog(tenantId, factoryId, rule, ruleType, hit, DetectResultRefreshed, compatId, runId)); results.Add(BuildSkippedDuplicate(rule, hit, compatId)); } catch (Exception ex) { results.Add(BuildSkipResult(rule, "refresh_failed", ex.Message, hit)); } continue; } } bool typeExists; try { typeExists = await _exceptionTypeRep.AsQueryable() .Where(t => t.TypeCode == hit.ExceptionTypeCode && (t.TenantId == 0 || t.TenantId == tenantId) && (t.FactoryId == 0 || t.FactoryId == factoryId) && t.Enabled) .AnyAsync(); } catch (Exception ex) { results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit)); continue; } if (!typeExists) { results.Add(BuildSkipResult(rule, "exception_type_missing", null, hit)); continue; } int hitCount; AdoS8RuleDetectionState? 
state; try { (state, hitCount) = await UpsertDetectionStateOnHitAsync(tenantId, factoryId, rule, hit); } catch (Exception ex) { results.Add(BuildSkipResult(rule, "antiflap_failed", ex.Message, hit)); continue; } var triggerRequired = NormalizeAntiflapCount(rule.TriggerCountRequired); if (hitCount < triggerRequired) { _logger.LogInformation( "antiflap_pending_hit ruleCode={RuleCode} dedupKey={DedupKey} hitCount={HitCount} trigger={Trigger}", rule.RuleCode, hit.DedupKey, hitCount, triggerRequired); results.Add(BuildSkipResult(rule, "antiflap_pending_hit", null, hit)); continue; } try { var entity = await _manualReportService.CreateFromHitAsync(hit); await _exceptionRep.Context.Updateable() .SetColumns(x => new AdoS8Exception { ConsecutiveHitCount = hitCount, ConsecutiveMissCount = 0, UpdatedAt = DateTime.Now }) .Where(x => x.Id == entity.Id) .ExecuteCommandAsync(); if (state != null) { await _detectionStateRep.Context.Updateable() .SetColumns(x => new AdoS8RuleDetectionState { ActiveExceptionId = entity.Id, UpdatedAt = DateTime.Now }) .Where(x => x.Id == state.Id) .ExecuteCommandAsync(); } await WriteDetectionLogAsync(BuildHitLog(tenantId, factoryId, rule, ruleType, hit, DetectResultCreated, entity.Id, runId)); await TryDispatchLayerNotificationAsync(entity); results.Add(BuildCreatedResult(rule, hit, entity.Id)); } catch (Exception ex) { results.Add(BuildSkipResult(rule, "create_failed", ex.Message, hit)); } } return results; } /// /// S8-SCHED-EXEC-1:lease 执行完成回写。 /// 必须 WHERE id = lease.RuleId AND lock_token = lease.LockToken;affectedRows = 0 视为 lease 丢失,记录 Warning,不覆盖状态。 /// 失败 ≥ 阈值(默认 3)写 paused_until = NOW + 1h。 /// public async Task OnRuleCompletedAsync(long tenantId, long factoryId, S8RuleLease lease, S8RuleRunResult result, int durationMs) { var rule = await _ruleRep.AsQueryable().Where(x => x.Id == lease.RuleId).FirstAsync(); if (rule == null) { _logger.LogWarning("lease_complete_rule_missing ruleId={RuleId}", lease.RuleId); return; } var now = DateTime.Now; 
var effectiveInterval = NormalizePollInterval(rule.PollIntervalSeconds); var nextRunAt = now.AddSeconds(effectiveInterval); int affected; if (result.Success) { affected = await _ruleRep.Context.Updateable() .SetColumns(x => new AdoS8WatchRule { LastRunAt = now, NextRunAt = nextRunAt, LastStatus = "SUCCESS", LastError = null, LastDurationMs = durationMs, ConsecutiveFailureCount = 0, LockToken = null, LockedBy = null, LockUntil = null, RunningStartedAt = null, UpdatedAt = now }) .Where(x => x.Id == lease.RuleId && x.LockToken == lease.LockToken) .ExecuteCommandAsync(); } else { var errorTrunc = Truncate(result.ErrorMessage, 500); var newFailures = rule.ConsecutiveFailureCount + 1; DateTime? pausedUntil = rule.PausedUntil; string? pauseReason = rule.PauseReason; if (newFailures >= AutoPauseFailureThreshold) { pausedUntil = now.AddHours(AutoPauseDurationHours); pauseReason = Truncate($"AUTO_PAUSED_AFTER_{AutoPauseFailureThreshold}_FAILURES: {errorTrunc}", 64); } affected = await _ruleRep.Context.Updateable() .SetColumns(x => new AdoS8WatchRule { LastRunAt = now, NextRunAt = nextRunAt, LastStatus = "FAILED", LastError = errorTrunc, LastDurationMs = durationMs, ConsecutiveFailureCount = x.ConsecutiveFailureCount + 1, PausedUntil = pausedUntil, PauseReason = pauseReason, LockToken = null, LockedBy = null, LockUntil = null, RunningStartedAt = null, UpdatedAt = now }) .Where(x => x.Id == lease.RuleId && x.LockToken == lease.LockToken) .ExecuteCommandAsync(); } if (affected == 0) { _logger.LogWarning( "lease_lost_on_complete ruleId={RuleId} ruleCode={RuleCode} lockToken={LockToken}", lease.RuleId, lease.RuleCode, lease.LockToken); } } /// /// S8-SCHED-EXEC-1:单 tick 完整流程。Job / debug 调度入口。 /// 1) ResetExpiredLeasesAsync /// 2) PickReadyRulesAsync(batchSize) /// 3) 每条 rule 独立 try/catch 调用 RunSingleRuleAsync + OnRuleCompletedAsync /// 单条规则失败不影响其他规则;整 tick 不抛异常。 /// public async Task RunDispatchTickAsync(long tenantId, long factoryId, int batchSize, string lockedBy) { var tickId 
= Guid.NewGuid().ToString("N").Substring(0, 8); var runId = Guid.NewGuid().ToString("N").Substring(0, 16); var summary = new S8DispatchTickResult { TickId = tickId, RunId = runId }; try { summary.LeaseReleased = await ResetExpiredLeasesAsync(tenantId, factoryId); } catch (Exception ex) { _logger.LogError(ex, "tick_reset_lease_failed tickId={TickId}", tickId); } List leases; try { leases = await PickReadyRulesAsync(tenantId, factoryId, batchSize, lockedBy, runId); } catch (Exception ex) { _logger.LogError(ex, "tick_pick_failed tickId={TickId}", tickId); return summary; } summary.Picked = leases.Count; foreach (var lease in leases) { var sw = System.Diagnostics.Stopwatch.StartNew(); S8RuleRunResult runResult; try { runResult = await RunSingleRuleAsync(tenantId, factoryId, lease); } catch (Exception ex) { runResult = new S8RuleRunResult { Success = false, ErrorMessage = ex.Message, Stats = new() }; } sw.Stop(); try { await OnRuleCompletedAsync(tenantId, factoryId, lease, runResult, (int)sw.ElapsedMilliseconds); } catch (Exception ex) { _logger.LogError(ex, "tick_complete_failed tickId={TickId} ruleId={RuleId} ruleCode={RuleCode}", tickId, lease.RuleId, lease.RuleCode); } _logger.LogInformation( "tick_rule_done tickId={TickId} runId={RunId} ruleId={RuleId} ruleCode={RuleCode} status={Status} durationMs={Dur} hits={Hits} created={Created} refreshed={Refreshed} pending={Pending} failed={Failed} error={Error}", tickId, runId, lease.RuleId, lease.RuleCode, runResult.Success ? 
"SUCCESS" : "FAILED", sw.ElapsedMilliseconds, runResult.Stats.Hits, runResult.Stats.Created, runResult.Stats.Refreshed, runResult.Stats.Pending, runResult.Stats.Failed, runResult.ErrorMessage); if (runResult.Success) { summary.Success++; summary.Created += runResult.Stats.Created; summary.Refreshed += runResult.Stats.Refreshed; summary.Pending += runResult.Stats.Pending; summary.PerRuleFailed += runResult.Stats.Failed; } else { summary.Failed++; } } return summary; } private static int NormalizePollInterval(int raw) { if (raw < MinPollIntervalSeconds || raw > MaxPollIntervalSeconds) return DefaultPollIntervalSeconds; return raw; } /// /// S8-NOTIFY-WIRE-WATCH-1:异常自动建单成功后,非破坏性挂入通知分层路由。 /// 全程异常隔离:任何异常仅 LogWarning,不抛回主流程,不影响 detection_log / 事务 / /// 异常状态机;该方法独立于 CreateFromWatchAsync / CreateFromHitAsync 的事务边界 /// (两者已 commit 后才返回 entity,故在此调用安全)。 /// sceneCode:先从 entity.SceneCode;空则 fallback 到 "S8_DEMO_DEFAULT"(demo 路径)。 /// severity:直接取 entity.Severity(CreateFromWatchAsync/CreateFromHitAsync 已保证非空)。 /// private async Task TryDispatchLayerNotificationAsync(AdoS8Exception entity) { if (entity == null || entity.Id <= 0) return; try { var sceneCode = string.IsNullOrWhiteSpace(entity.SceneCode) ? "S8_DEMO_DEFAULT" : entity.SceneCode; // S8-SEVERITY-FOLLOW-SERIOUS-STANDARDIZE-EXEC-1:写入前 Normalize,落 FOLLOW/SERIOUS。 var severity = S8SeverityCode.Normalize(entity.Severity); var content = $"异常 {entity.ExceptionCode}:{entity.Title}(场景 {sceneCode},严重度 {severity}" + (string.IsNullOrWhiteSpace(entity.SourceRuleCode) ? "" : $",规则 {entity.SourceRuleCode}") + ")"; await _notificationLayerResolver.DispatchByLayerAsync(new S8NotificationLayerResolver.DispatchByLayerInput { TenantId = entity.TenantId, FactoryId = entity.FactoryId, ExceptionId = entity.Id, ExceptionNo = entity.ExceptionCode, SceneCode = sceneCode, Severity = severity, Title = entity.Title ?? 
string.Empty, Content = content, Status = entity.Status, SourceRuleCode = entity.SourceRuleCode, JumpUrl = $"/aidop/s8/exceptions/{entity.Id}", }); } catch (Exception ex) { _logger.LogWarning(ex, "notify_dispatch_throw exceptionId={ExceptionId}", entity.Id); } } /// /// S8-NOTIFY-WIRE-RECOVERED-1:异常恢复(recovered_at 已写入、RECOVERED detection_log 已落库)后, /// 非破坏性挂入分层通知。全程异常隔离:任何异常仅 LogWarning,不抛回主流程,不影响 detection_log / /// 状态机;call site 已在事务边界外(recovered 路径无事务)。 /// 重新读取 entity 拿场景/严重度/编号/状态/规则代码(恢复事件相对低频,1 次额外读可接受)。 /// private async Task TryDispatchRecoveredLayerNotificationAsync(long exceptionId) { if (exceptionId <= 0) return; try { var entity = await _exceptionRep.GetByIdAsync(exceptionId); if (entity == null) { _logger.LogWarning("notify_recovered_dispatch_entity_missing exceptionId={ExceptionId}", exceptionId); return; } var sceneCode = string.IsNullOrWhiteSpace(entity.SceneCode) ? "S8_DEMO_DEFAULT" : entity.SceneCode; // S8-SEVERITY-FOLLOW-SERIOUS-STANDARDIZE-EXEC-1:写入前 Normalize,落 FOLLOW/SERIOUS。 var severity = S8SeverityCode.Normalize(entity.Severity); var title = $"【已恢复】{entity.ExceptionCode}"; var content = $"异常 {entity.ExceptionCode} 已恢复,场景 {sceneCode},严重度 {severity}" + (string.IsNullOrWhiteSpace(entity.SourceRuleCode) ? 
"" : $",规则 {entity.SourceRuleCode}"); await _notificationLayerResolver.DispatchByLayerAsync(new S8NotificationLayerResolver.DispatchByLayerInput { TenantId = entity.TenantId, FactoryId = entity.FactoryId, ExceptionId = entity.Id, ExceptionNo = entity.ExceptionCode, SceneCode = sceneCode, Severity = severity, Title = title, Content = content, Status = entity.Status, SourceRuleCode = entity.SourceRuleCode, JumpUrl = $"/aidop/s8/exceptions/{entity.Id}", Recovered = true, }); } catch (Exception ex) { _logger.LogWarning(ex, "notify_recovered_dispatch_throw exceptionId={ExceptionId}", exceptionId); } } } public sealed class S8WatchExecutionRule { public long WatchRuleId { get; set; } public string WatchRuleCode { get; set; } = string.Empty; public string SceneCode { get; set; } = string.Empty; public string TriggerType { get; set; } = string.Empty; public string WatchObjectType { get; set; } = string.Empty; public long DataSourceId { get; set; } public string DataSourceCode { get; set; } = string.Empty; public string DataSourceType { get; set; } = string.Empty; public string DataSourceConnection { get; set; } = string.Empty; public string QueryExpression { get; set; } = string.Empty; public int PollIntervalSeconds { get; set; } public long AlertRuleId { get; set; } public string AlertRuleCode { get; set; } = string.Empty; public string TriggerCondition { get; set; } = string.Empty; public string ThresholdValue { get; set; } = string.Empty; public string Severity { get; set; } = string.Empty; } public sealed class S8WatchDeviceQueryResult { public long WatchRuleId { get; set; } public string WatchRuleCode { get; set; } = string.Empty; public bool Success { get; set; } public string? 
FailureReason { get; set; } public List Rows { get; set; } = new(); public static S8WatchDeviceQueryResult Ok(S8WatchExecutionRule rule, List rows) => new() { WatchRuleId = rule.WatchRuleId, WatchRuleCode = rule.WatchRuleCode, Success = true, Rows = rows }; public static S8WatchDeviceQueryResult Fail(S8WatchExecutionRule rule, string reason) => new() { WatchRuleId = rule.WatchRuleId, WatchRuleCode = rule.WatchRuleCode, Success = false, FailureReason = reason }; } public sealed class S8WatchDeviceRow { public string RelatedObjectCode { get; set; } = string.Empty; public decimal? CurrentValue { get; set; } public long? OccurrenceDeptId { get; set; } public long? ResponsibleDeptId { get; set; } public string SourcePayload { get; set; } = string.Empty; } /// /// G01-05 去重结果对象。仅服务 G01-06 建单前拦截,由 CanCreate 单决策位决定是否建单。 /// 只服务首版唯一场景 S2(迁移后由 S2S6_PRODUCTION 切到单模块 S2)+ 唯一 trigger_type VALUE_DEVIATION + 设备对象。 /// 不预留多 trigger_type / 平台化去重扩展结构。 /// Reason 值域:no_pending / duplicate_pending / missing_dedup_key / query_failed。 /// public sealed class S8WatchDedupResult { public S8WatchHitResult Hit { get; set; } = new(); public bool CanCreate { get; set; } public long? MatchedExceptionId { get; set; } public string Reason { get; set; } = string.Empty; } /// /// G01-06 建单结果对象。仅服务 G-01 首版主线验收,由 Created / Skipped 两位决定结局。 /// 只服务首版唯一场景 S2(迁移后由 S2S6_PRODUCTION 切到单模块 S2)+ 唯一 trigger_type VALUE_DEVIATION + 设备对象。 /// 不预留多 trigger_type / 平台化工单扩展结构。 /// Reason 值域:auto_created / create_failed / 透传自 DedupResult.Reason。 /// public sealed class S8WatchCreationResult { public S8WatchDedupResult DedupResult { get; set; } = new(); public bool Created { get; set; } public bool Skipped { get; set; } public long? CreatedExceptionId { get; set; } public string Reason { get; set; } = string.Empty; public string? 
ErrorMessage { get; set; } } /// /// G01-04 命中结果对象。承载 G01-05 去重与 G01-06 建单所需最小追溯字段, /// 仅服务首版唯一场景 S2(迁移后由 S2S6_PRODUCTION 切到单模块 S2)+ 唯一 trigger_type VALUE_DEVIATION + 设备对象。 /// 不预留多 trigger_type / 多场景 / 平台化扩展结构。 /// public sealed class S8WatchHitResult { public long SourceRuleId { get; set; } public string SourceRuleCode { get; set; } = string.Empty; public long AlertRuleId { get; set; } public long DataSourceId { get; set; } public string RelatedObjectCode { get; set; } = string.Empty; public decimal CurrentValue { get; set; } public decimal ThresholdValue { get; set; } public string TriggerCondition { get; set; } = string.Empty; public string Severity { get; set; } = string.Empty; public long? OccurrenceDeptId { get; set; } public long? ResponsibleDeptId { get; set; } public string SourcePayload { get; set; } = string.Empty; } /// S8-SCHED-EXEC-1:lease 抢占成功后传递的最小标识对象。 public sealed class S8RuleLease { public long RuleId { get; set; } public string RuleCode { get; set; } = string.Empty; public string? RuleType { get; set; } public string LockToken { get; set; } = string.Empty; public string LockedBy { get; set; } = string.Empty; public DateTime LockUntil { get; set; } public string RunId { get; set; } = string.Empty; public DateTime AcquiredAt { get; set; } } /// S8-SCHED-EXEC-1:单条规则执行结果,OnRuleCompletedAsync 据此更新状态。 public sealed class S8RuleRunResult { public bool Success { get; set; } public string? 
ErrorMessage { get; set; } public S8RuleRunStats Stats { get; set; } = new(); } public sealed class S8RuleRunStats { public int Hits { get; set; } public int Created { get; set; } public int Refreshed { get; set; } public int Pending { get; set; } public int Failed { get; set; } } /// S8-SCHED-EXEC-1:单 tick 调度结果聚合。 public sealed class S8DispatchTickResult { public string TickId { get; set; } = string.Empty; public string RunId { get; set; } = string.Empty; public int LeaseReleased { get; set; } public int Picked { get; set; } public int Success { get; set; } public int Failed { get; set; } public int Created { get; set; } public int Refreshed { get; set; } public int Pending { get; set; } public int PerRuleFailed { get; set; } }