Эх сурвалжийг харах

refactor(s8): collapse legacy watch scheduler path

YY968XX 2 долоо хоног өмнө
parent
commit
d06acc4805

+ 87 - 186
server/Plugins/Admin.NET.Plugin.AiDOP/Service/S8/S8WatchSchedulerService.cs

@@ -437,16 +437,23 @@ public class S8WatchSchedulerService : ITransient
         ProcessRulesByTypeAsync(tenantId, factoryId, _outOfRangeEvaluator, S8OutOfRangeRuleEvaluator.RuleTypeCode, Guid.NewGuid().ToString("N").Substring(0, 16));
 
     /// <summary>
-    /// R2/R3 通用规则主链:装载 enabled WatchRule.RuleType=ruleType → evaluator → dedup_key 去重 → 建单/刷新。
-    /// dedup 命中:UPDATE last_detected_at + source_payload,不重复建单;
-    /// dedup 未命中:校验 ExceptionTypeCode 是否在 baseline(tenant=0/factory=0 全局或本租户工厂),缺则跳过;
-    /// 通过 → S8ManualReportService.CreateFromHitAsync 落标准 AdoS8Exception,新列全部回填。
-    /// 不做 SLA 升级、不做事件触发、不做 RecoveredAt。
+    /// R2/R3 通用规则主链。S8-SCHED-CLEANUP-LEGACY-PATH-1:本方法已收敛为 thin wrapper,
+    /// 单规则处理(evaluator → reconcile → hit 循环 → CREATED/REFRESHED/NO_HIT/EVALUATE_FAILED 日志)
+    /// 全部下沉至 <see cref="ProcessSingleRuleAsync"/>,与 Job tick 路径
+    /// (<see cref="RunDispatchTickAsync"/> → <see cref="RunSingleRuleAsync"/> → ProcessSingleRuleAsync)
+    /// 共享同一份逻辑,避免双维护。
+    ///
+    /// 调用方仅有 <see cref="CreateExceptionsAsync"/>(debug run-once / Process*RulesAsync 公共薄包装)。
+    /// debug run-once 不持 lease(保留"手动立即跑"语义),但本方法在每条规则结束时通过
+    /// <see cref="ApplyRunOnceCompletionAsync"/> 同步更新 watch_rule.last_run_at / last_status /
+    /// last_error / last_duration_ms / last_run_id / consecutive_failure_count,缩小 run-once 与
+    /// Job tick 之间的 last_* 状态分裂;不动 lock_token / running_started_at / paused_until,避免
+    /// 与正在持锁运行的 Job 撕扯。
     /// </summary>
     private async Task<List<S8WatchCreationResult>> ProcessRulesByTypeAsync(
         long tenantId, long factoryId, IS8RuleEvaluator evaluator, string ruleType, string runId)
     {
-        var results = new List<S8WatchCreationResult>();
+        var aggregate = new List<S8WatchCreationResult>();
 
         var rules = await _ruleRep.AsQueryable()
             .Where(x => x.TenantId == tenantId
@@ -454,7 +461,7 @@ public class S8WatchSchedulerService : ITransient
                         && x.Enabled
                         && x.RuleType == ruleType)
             .ToListAsync();
-        if (rules.Count == 0) return results;
+        if (rules.Count == 0) return aggregate;
 
         var alertRules = (await _alertRuleRep.AsQueryable()
             .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId)
@@ -462,203 +469,97 @@ public class S8WatchSchedulerService : ITransient
 
         foreach (var rule in rules.OrderBy(x => x.Id))
         {
-            List<S8RuleHit> hits;
+            var sw = System.Diagnostics.Stopwatch.StartNew();
+            S8RuleRunResult completion;
             try
             {
-                hits = await evaluator.EvaluateAsync(tenantId, factoryId, rule, alertRules);
+                var ruleResults = await ProcessSingleRuleAsync(tenantId, factoryId, rule, ruleType, evaluator, alertRules, runId);
+                aggregate.AddRange(ruleResults);
+                completion = new S8RuleRunResult
+                {
+                    Success = true,
+                    Stats = new S8RuleRunStats
+                    {
+                        Hits = ruleResults.Count,
+                        Created = ruleResults.Count(r => r.Created),
+                        Refreshed = ruleResults.Count(r => r.Reason == "duplicate_pending"),
+                        Pending = ruleResults.Count(r => r.Reason == "antiflap_pending_hit"),
+                        Failed = ruleResults.Count(r => r.Reason == "create_failed" || r.Reason == "refresh_failed" || r.Reason == "antiflap_failed" || r.Reason == "evaluate_failed")
+                    }
+                };
             }
             catch (Exception ex)
             {
-                // R6 EVALUATE_FAILED 日志:evaluator 抛 S8RuleEvaluatorException 走这里,包含 reason+message。
-                var failureReason = ex is S8RuleEvaluatorException sre ? sre.Reason : ex.GetType().Name;
-                await WriteDetectionLogAsync(new AdoS8DetectionLog
-                {
-                    TenantId = tenantId, FactoryId = factoryId,
-                    RuleId = rule.Id, RuleCode = rule.RuleCode, RuleType = ruleType, SceneCode = rule.SceneCode,
-                    SourceObjectType = rule.SourceObjectType,
-                    DetectResult = DetectResultEvaluateFailed,
-                    DetectedAt = DateTime.Now,
-                    FailureReason = failureReason,
-                    FailureMessage = Truncate(ex.Message, 1000),
-                    RunId = runId, TriggerSource = DetectionTriggerSource
-                });
-                results.Add(BuildSkipResult(rule, "evaluate_failed", ex.Message));
-                continue;
+                // ProcessSingleRuleAsync 在 evaluator 抛 S8RuleEvaluatorException 时已写 EVALUATE_FAILED
+                // detection_log 后再 throw(供 RunSingleRuleAsync 标 Success=false)。本路径无 lease,
+                // 吞异常以保留"逐规则失败不影响其他规则"的旧 ProcessRulesByTypeAsync 语义。
+                _logger.LogWarning(ex, "process_rule_failed ruleCode={RuleCode} ruleType={RuleType}", rule.RuleCode, ruleType);
+                aggregate.Add(BuildSkipResult(rule, "evaluate_failed", ex.Message));
+                completion = new S8RuleRunResult { Success = false, ErrorMessage = ex.Message, Stats = new() };
             }
+            sw.Stop();
 
-            // R5-RECOVERY-MINIMAL-1:evaluator 成功执行后调和 recovered_at。
-            // 仅在 evaluator 成功(throw 不会到这里)时才能判定"未命中"。仅写 recovered_at + updated_at,
-            // 绝不动 status / assignee / verifier / source_payload / last_detected_at。
-            // R6 reconcile 返回 recoveredIds 用于决定是否写 NO_HIT。
-            List<long> recoveredIds = new();
             try
             {
-                recoveredIds = await ReconcileRecoveriesForRuleAsync(tenantId, factoryId, rule, ruleType, hits, runId);
+                await ApplyRunOnceCompletionAsync(rule, completion, (int)sw.ElapsedMilliseconds, runId);
             }
             catch (Exception ex)
             {
-                _logger.LogWarning(ex, "recovery_reconcile_failed ruleCode={RuleCode} ruleType={RuleType}", rule.RuleCode, ruleType);
-            }
-
-            // R6 NO_HIT 日志:evaluator 成功且无 hit、且本轮 reconcile 未写任何 RECOVERED → 每条 rule 一条 NO_HIT。
-            if (hits.Count == 0 && recoveredIds.Count == 0)
-            {
-                await WriteDetectionLogAsync(new AdoS8DetectionLog
-                {
-                    TenantId = tenantId, FactoryId = factoryId,
-                    RuleId = rule.Id, RuleCode = rule.RuleCode, RuleType = ruleType, SceneCode = rule.SceneCode,
-                    SourceObjectType = rule.SourceObjectType,
-                    DetectResult = DetectResultNoHit,
-                    DetectedAt = DateTime.Now,
-                    RunId = runId, TriggerSource = DetectionTriggerSource
-                });
+                _logger.LogWarning(ex, "run_once_completion_write_failed ruleCode={RuleCode} ruleType={RuleType}", rule.RuleCode, ruleType);
             }
+        }
 
-            foreach (var hit in hits)
-            {
-                if (string.IsNullOrWhiteSpace(hit.DedupKey))
-                {
-                    results.Add(BuildSkipResult(rule, "missing_dedup_key", null, hit));
-                    continue;
-                }
-
-                long matchedId;
-                try
-                {
-                    matchedId = await FindOpenExceptionByDedupKeyAsync(tenantId, factoryId, hit.DedupKey);
-                }
-                catch (Exception ex)
-                {
-                    results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit));
-                    continue;
-                }
-
-                if (matchedId > 0)
-                {
-                    try
-                    {
-                        await RefreshDetectionAsync(matchedId, hit);
-                        await WriteDetectionLogAsync(BuildHitLog(tenantId, factoryId, rule, ruleType, hit, DetectResultRefreshed, matchedId, runId));
-                        results.Add(BuildSkippedDuplicate(rule, hit, matchedId));
-                    }
-                    catch (Exception ex)
-                    {
-                        results.Add(BuildSkipResult(rule, "refresh_failed", ex.Message, hit));
-                    }
-                    continue;
-                }
-
-                // R3-OUT_OF_RANGE-REWRITE-1 compat fallback:dedup_key 未命中且 ruleType=OUT_OF_RANGE 时,
-                // 尝试匹配历史 dedup_key=NULL 的旧 AlertRule 主链记录(如 id=34),命中则 backfill 6 列,避免重复建单。
-                if (string.Equals(ruleType, S8OutOfRangeRuleEvaluator.RuleTypeCode, StringComparison.OrdinalIgnoreCase))
-                {
-                    long compatId;
-                    try
-                    {
-                        compatId = await FindLegacyOutOfRangeExceptionAsync(tenantId, factoryId, rule.Id, hit.RelatedObjectCode);
-                    }
-                    catch (Exception ex)
-                    {
-                        results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit));
-                        continue;
-                    }
-                    if (compatId > 0)
-                    {
-                        try
-                        {
-                            await BackfillLegacyExceptionAsync(compatId, hit);
-                            await WriteDetectionLogAsync(BuildHitLog(tenantId, factoryId, rule, ruleType, hit, DetectResultRefreshed, compatId, runId));
-                            results.Add(BuildSkippedDuplicate(rule, hit, compatId));
-                        }
-                        catch (Exception ex)
-                        {
-                            results.Add(BuildSkipResult(rule, "refresh_failed", ex.Message, hit));
-                        }
-                        continue;
-                    }
-                }
-
-                bool typeExists;
-                try
-                {
-                    typeExists = await _exceptionTypeRep.AsQueryable()
-                        .Where(t => t.TypeCode == hit.ExceptionTypeCode
-                                    && (t.TenantId == 0 || t.TenantId == tenantId)
-                                    && (t.FactoryId == 0 || t.FactoryId == factoryId)
-                                    && t.Enabled)
-                        .AnyAsync();
-                }
-                catch (Exception ex)
-                {
-                    results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit));
-                    continue;
-                }
-
-                if (!typeExists)
-                {
-                    results.Add(BuildSkipResult(rule, "exception_type_missing", null, hit));
-                    continue;
-                }
-
-                // S8-SCHED-EXEC-1:建单前抗抖累计。
-                // 累计落 ado_s8_rule_detection_state;hitCount < trigger_count_required 时 pending、不建单、不写 CREATED 日志。
-                int hitCount;
-                AdoS8RuleDetectionState? state;
-                try
-                {
-                    (state, hitCount) = await UpsertDetectionStateOnHitAsync(tenantId, factoryId, rule, hit);
-                }
-                catch (Exception ex)
-                {
-                    results.Add(BuildSkipResult(rule, "antiflap_failed", ex.Message, hit));
-                    continue;
-                }
-
-                var triggerRequired = NormalizeAntiflapCount(rule.TriggerCountRequired);
-                if (hitCount < triggerRequired)
-                {
-                    _logger.LogInformation(
-                        "antiflap_pending_hit ruleCode={RuleCode} dedupKey={DedupKey} hitCount={HitCount} trigger={Trigger}",
-                        rule.RuleCode, hit.DedupKey, hitCount, triggerRequired);
-                    results.Add(BuildSkipResult(rule, "antiflap_pending_hit", null, hit));
-                    continue;
-                }
+        return aggregate;
+    }
 
-                try
+    /// <summary>
+    /// S8-SCHED-CLEANUP-LEGACY-PATH-1:debug run-once 的 last_* 状态回写(无 lease 版)。
+    /// 与 <see cref="OnRuleCompletedAsync"/> 的语义平行,差异在于:
+    /// - 不要求 lock_token 匹配(run-once 不持 lease)
+    /// - 不写 lock_token / locked_by / lock_until / running_started_at(不与 Job lease 撕扯)
+    /// - 不做 consecutive_failure_count 阈值的 paused_until 自动暂停(debug 路径不应自动暂停 demo rule)
+    /// 写入:last_run_at / next_run_at / last_status / last_error / last_duration_ms / last_run_id /
+    ///       consecutive_failure_count / updated_at。
+    /// </summary>
+    private async Task ApplyRunOnceCompletionAsync(AdoS8WatchRule rule, S8RuleRunResult result, int durationMs, string runId)
+    {
+        var now = DateTime.Now;
+        var nextRunAt = now.AddSeconds(NormalizePollInterval(rule.PollIntervalSeconds));
+        if (result.Success)
+        {
+            await _ruleRep.Context.Updateable<AdoS8WatchRule>()
+                .SetColumns(x => new AdoS8WatchRule
                 {
-                    var entity = await _manualReportService.CreateFromHitAsync(hit);
-                    // 新建异常的抗抖累计与触发计数对齐,便于后续 reconcile 解释累计来源。
-                    await _exceptionRep.Context.Updateable<AdoS8Exception>()
-                        .SetColumns(x => new AdoS8Exception
-                        {
-                            ConsecutiveHitCount = hitCount,
-                            ConsecutiveMissCount = 0,
-                            UpdatedAt = DateTime.Now
-                        })
-                        .Where(x => x.Id == entity.Id)
-                        .ExecuteCommandAsync();
-                    if (state != null)
-                    {
-                        await _detectionStateRep.Context.Updateable<AdoS8RuleDetectionState>()
-                            .SetColumns(x => new AdoS8RuleDetectionState
-                            {
-                                ActiveExceptionId = entity.Id,
-                                UpdatedAt = DateTime.Now
-                            })
-                            .Where(x => x.Id == state.Id)
-                            .ExecuteCommandAsync();
-                    }
-                    await WriteDetectionLogAsync(BuildHitLog(tenantId, factoryId, rule, ruleType, hit, DetectResultCreated, entity.Id, runId));
-                    results.Add(BuildCreatedResult(rule, hit, entity.Id));
-                }
-                catch (Exception ex)
+                    LastRunAt = now,
+                    NextRunAt = nextRunAt,
+                    LastStatus = "SUCCESS",
+                    LastError = null,
+                    LastDurationMs = durationMs,
+                    LastRunId = runId,
+                    ConsecutiveFailureCount = 0,
+                    UpdatedAt = now
+                })
+                .Where(x => x.Id == rule.Id)
+                .ExecuteCommandAsync();
+        }
+        else
+        {
+            var errorTrunc = Truncate(result.ErrorMessage, 500);
+            await _ruleRep.Context.Updateable<AdoS8WatchRule>()
+                .SetColumns(x => new AdoS8WatchRule
                 {
-                    results.Add(BuildSkipResult(rule, "create_failed", ex.Message, hit));
-                }
-            }
+                    LastRunAt = now,
+                    NextRunAt = nextRunAt,
+                    LastStatus = "FAILED",
+                    LastError = errorTrunc,
+                    LastDurationMs = durationMs,
+                    LastRunId = runId,
+                    ConsecutiveFailureCount = x.ConsecutiveFailureCount + 1,
+                    UpdatedAt = now
+                })
+                .Where(x => x.Id == rule.Id)
+                .ExecuteCommandAsync();
         }
-
-        return results;
     }
 
     // S8-SCHED-EXEC-1:trigger / recover 抗抖计数兜底,null / <1 / >10 一律按 1,避免非法配置导致永远不建单 / 永远不恢复。