using Admin.NET.Plugin.AiDOP.Entity.S8;
using Admin.NET.Plugin.AiDOP.Infrastructure.S8;
using Admin.NET.Plugin.AiDOP.Service.S8.Rules;
using Microsoft.Extensions.Logging;
using SqlSugar;
using System.Data;
using System.Globalization;
using System.Text.Json;

namespace Admin.NET.Plugin.AiDOP.Service.S8;

/// <summary>
/// Polling scheduler service for watch rules (first-iteration stub).
/// Once wired into the Admin.NET scheduled-job mechanism, a scheduler is expected to invoke this
/// service periodically so that each rule is evaluated and exception records are generated.
/// </summary>
public class S8WatchSchedulerService : ITransient
{
    private readonly SqlSugarRepository<AdoS8WatchRule> _ruleRep;
    private readonly SqlSugarRepository<AdoS8AlertRule> _alertRuleRep;
    private readonly SqlSugarRepository<AdoS8DataSource> _dataSourceRep;
    private readonly SqlSugarRepository<AdoS8Exception> _exceptionRep;

    // NOTE(review): the entity type of this repository is not named anywhere in this file;
    // AdoS8ExceptionType is assumed (it must expose TypeCode/TenantId/FactoryId/Enabled) — confirm.
    private readonly SqlSugarRepository<AdoS8ExceptionType> _exceptionTypeRep;
    private readonly S8NotificationService _notificationService;
    private readonly S8ManualReportService _manualReportService;
    private readonly S8TimeoutRuleEvaluator _timeoutEvaluator;
    private readonly S8ShortageRuleEvaluator _shortageEvaluator;
    private readonly S8OutOfRangeRuleEvaluator _outOfRangeEvaluator;
    private readonly ILogger<S8WatchSchedulerService> _logger;
    private readonly SqlSugarRepository<AdoS8DetectionLog> _detectionLogRep;

    private const string DetectionTriggerSource = "WATCH_SCHEDULER";
    private const string DetectResultCreated = "CREATED";
    private const string DetectResultRefreshed = "REFRESHED";
    private const string DetectResultRecovered = "RECOVERED";
    private const string DetectResultNoHit = "NO_HIT";
    private const string DetectResultEvaluateFailed = "EVALUATE_FAILED";

    private const string DefaultTriggerType = "VALUE_DEVIATION";
    private const string SqlDataSourceType = "SQL";

    // G01-05 set of "unclosed" statuses. It mirrors the pendingStatuses definition used by
    // S8ExceptionService.GetPagedAsync; the two places must be kept in sync.
    // This is not a scheduler-specific set: if the shared definition changes, update both.
    private static readonly string[] UnclosedExceptionStatuses =
    {
        "NEW",
        "ASSIGNED",
        "IN_PROGRESS",
        "PENDING_VERIFICATION"
    };

    public S8WatchSchedulerService(
        SqlSugarRepository<AdoS8WatchRule> ruleRep,
        SqlSugarRepository<AdoS8AlertRule> alertRuleRep,
        SqlSugarRepository<AdoS8DataSource> dataSourceRep,
        SqlSugarRepository<AdoS8Exception> exceptionRep,
        SqlSugarRepository<AdoS8ExceptionType> exceptionTypeRep,
        S8NotificationService notificationService,
        S8ManualReportService manualReportService,
        S8TimeoutRuleEvaluator timeoutEvaluator,
        S8ShortageRuleEvaluator shortageEvaluator,
        S8OutOfRangeRuleEvaluator outOfRangeEvaluator,
        ILogger<S8WatchSchedulerService> logger,
        SqlSugarRepository<AdoS8DetectionLog> detectionLogRep)
    {
        _ruleRep = ruleRep;
        _alertRuleRep = alertRuleRep;
        _dataSourceRep = dataSourceRep;
        _exceptionRep = exceptionRep;
        _exceptionTypeRep = exceptionTypeRep;
        _notificationService = notificationService;
        _manualReportService = manualReportService;
        _timeoutEvaluator = timeoutEvaluator;
        _shortageEvaluator = shortageEvaluator;
        _outOfRangeEvaluator = outOfRangeEvaluator;
        _logger = logger;
        _detectionLogRep = detectionLogRep;
    }

    /// <summary>
    /// Loads the legacy (AlertRule-based) device watch rules of a tenant/factory and assembles them
    /// into executable rules: enabled watch rule + enabled SQL data source + the single usable alert rule.
    /// </summary>
    /// <param name="tenantId">Tenant whose rules are loaded.</param>
    /// <param name="factoryId">Factory whose rules are loaded.</param>
    /// <returns>Executable rules ordered by watch-rule id; empty when nothing is runnable.</returns>
    public async Task<List<S8WatchExecutionRule>> LoadExecutionRulesAsync(long tenantId, long factoryId)
    {
        // Since the R3 OUT_OF_RANGE rewrite, the three typed rule kinds (OUT_OF_RANGE/TIMEOUT/SHORTAGE)
        // go through ProcessRulesByTypeAsync. This legacy chain only loads rules whose RuleType is
        // null/empty, so old and new chains never both create an exception for the same rule.
        var watchRules = await _ruleRep.AsQueryable()
            .Where(x => x.TenantId == tenantId
                && x.FactoryId == factoryId
                && x.Enabled
                && x.SceneCode == S8SceneCode.S2S6Production
                && (x.RuleType == null || x.RuleType == ""))
            .ToListAsync();

        var deviceRules = watchRules
            .Where(x => IsDeviceWatchObjectType(x.WatchObjectType))
            .ToList();
        if (deviceRules.Count == 0)
            return new();

        var dataSourceIds = deviceRules
            .Select(x => x.DataSourceId)
            .Distinct()
            .ToList();

        var dataSources = await _dataSourceRep.AsQueryable()
            .Where(x => x.TenantId == tenantId
                && x.FactoryId == factoryId
                && x.Enabled
                && dataSourceIds.Contains(x.Id))
            .ToListAsync();

        var dataSourceMap = dataSources.ToDictionary(x => x.Id);
        if (dataSourceMap.Count == 0)
            return new();

        var alertRules = (await _alertRuleRep.AsQueryable()
                .Where(x => x.TenantId == tenantId
                    && x.FactoryId == factoryId
                    && x.SceneCode == S8SceneCode.S2S6Production)
                .ToListAsync())
            .Where(IsSupportedAlertRule)
            .ToList();

        // G-01 first-version AlertRule conflict policy:
        // when the scene has more than one usable AlertRule (or none), the configuration is treated as
        // conflicting and each rule is skipped — neither "take the first one" nor "halt the whole scene".
        // All device watch rules currently share the scene-level AlertRule, so in practice every device
        // rule is skipped, but this is deliberately expressed as a per-rule skip.
        var alertRule = alertRules.Count == 1 ? alertRules[0] : null;

        var executionRules = new List<S8WatchExecutionRule>();
        foreach (var watchRule in deviceRules.OrderBy(x => x.Id))
        {
            // Configuration conflict: skip this rule only.
            if (alertRule == null)
                continue;

            if (!dataSourceMap.TryGetValue(watchRule.DataSourceId, out var dataSource))
                continue;

            if (!IsSupportedSqlDataSource(dataSource))
                continue;

            executionRules.Add(new S8WatchExecutionRule
            {
                WatchRuleId = watchRule.Id,
                WatchRuleCode = watchRule.RuleCode,
                SceneCode = watchRule.SceneCode,
                TriggerType = DefaultTriggerType,
                WatchObjectType = watchRule.WatchObjectType.Trim(),
                DataSourceId = dataSource.Id,
                DataSourceCode = dataSource.DataSourceCode,
                // Trimmed so the value agrees with IsSupportedSqlDataSource, which compares the trimmed
                // type; otherwise a padded "SQL" passes the gate above and is rejected at query time.
                DataSourceType = dataSource.Type?.Trim() ?? string.Empty,
                DataSourceConnection = dataSource.Endpoint?.Trim() ?? string.Empty,
                QueryExpression = watchRule.Expression?.Trim() ?? string.Empty,
                PollIntervalSeconds = watchRule.PollIntervalSeconds,
                AlertRuleId = alertRule.Id,
                AlertRuleCode = alertRule.RuleCode,
                // Non-null/non-blank is guaranteed by the IsSupportedAlertRule filter above.
                TriggerCondition = alertRule.TriggerCondition!.Trim(),
                ThresholdValue = alertRule.ThresholdVal!.Trim(),
                Severity = alertRule.Severity
            });
        }

        return executionRules;
    }

    /// <summary>
    /// Loads the executable rules of a tenant/factory and runs each rule's query, one after another.
    /// </summary>
    /// <returns>One query result per executable rule (successful or failed).</returns>
    public async Task<List<S8WatchDeviceQueryResult>> QueryDeviceRowsAsync(long tenantId, long factoryId)
    {
        var executionRules = await LoadExecutionRulesAsync(tenantId, factoryId);
        var results = new List<S8WatchDeviceQueryResult>();

        foreach (var rule in executionRules)
            results.Add(await QueryDeviceRowsAsync(rule));

        return results;
    }

    /// <summary>
    /// Runs the query expression of a single rule against its data source and maps the rows.
    /// Never throws: every failure is reported through a failed <see cref="S8WatchDeviceQueryResult"/>.
    /// </summary>
    /// <param name="rule">Executable rule carrying connection string and query expression.</param>
    /// <returns>Mapped rows (rows without a related object code are dropped) or a failure reason.</returns>
    public async Task<S8WatchDeviceQueryResult> QueryDeviceRowsAsync(S8WatchExecutionRule rule)
    {
        // Trim before comparing so a padded type on a hand-built rule is treated like the loader treats it.
        if (!string.Equals(rule.DataSourceType?.Trim(), SqlDataSourceType, StringComparison.OrdinalIgnoreCase))
            return S8WatchDeviceQueryResult.Fail(rule, "数据源类型不是 SQL,已跳过");

        if (string.IsNullOrWhiteSpace(rule.QueryExpression))
            return S8WatchDeviceQueryResult.Fail(rule, "查询表达式为空,已跳过");

        try
        {
            using var db = CreateSqlQueryScope(rule.DataSourceConnection);

            // NOTE(review): the expression is executed verbatim as SQL. It comes from rule configuration,
            // so rule editing must be restricted to trusted administrators.
            var table = await db.Ado.GetDataTableAsync(rule.QueryExpression);
            if (!HasRequiredColumns(table))
                return S8WatchDeviceQueryResult.Fail(rule, "查询结果缺少 required columns: related_object_code/current_value");

            var rows = table.Rows.Cast<DataRow>()
                .Select(MapDeviceRow)
                .Where(x => !string.IsNullOrWhiteSpace(x.RelatedObjectCode))
                .ToList();

            return S8WatchDeviceQueryResult.Ok(rule, rows);
        }
        catch (Exception ex)
        {
            return S8WatchDeviceQueryResult.Fail(rule, $"查询执行失败: {ex.Message}");
        }
    }

    /// <summary>
    /// G01-04: first-version VALUE_DEVIATION single-threshold evaluation over device-level rows.
    /// Produces the hit list consumed by G01-05 (dedup) and G01-06 (creation).
    /// Does not dedup, create exceptions, recompute severity or write a timeline.
    /// </summary>
    public async Task<List<S8WatchHitResult>> EvaluateHitsAsync(long tenantId, long factoryId)
    {
        var executionRules = await LoadExecutionRulesAsync(tenantId, factoryId);
        var ruleMap = executionRules.ToDictionary(x => x.WatchRuleId);

        var queryResults = new List<S8WatchDeviceQueryResult>();
        foreach (var rule in executionRules)
            queryResults.Add(await QueryDeviceRowsAsync(rule));

        var hits = new List<S8WatchHitResult>();
        foreach (var queryResult in queryResults)
        {
            // G01-03 query failed: skip, nothing to evaluate.
            if (!queryResult.Success)
                continue;

            if (!ruleMap.TryGetValue(queryResult.WatchRuleId, out var rule))
                continue;

            // Missing evaluation parameters: skip this rule.
            if (string.IsNullOrWhiteSpace(rule.TriggerCondition) || string.IsNullOrWhiteSpace(rule.ThresholdValue))
                continue;

            // Unsupported comparison operator: skip this rule.
            var op = TryParseTriggerCondition(rule.TriggerCondition);
            if (op == null)
                continue;

            // Threshold is not a number: skip this rule.
            if (!TryParseDecimal(rule.ThresholdValue, out var threshold))
                continue;

            foreach (var row in queryResult.Rows)
            {
                // Current value missing or unparsable: skip this row.
                if (row.CurrentValue == null)
                    continue;

                // Not a hit: nothing flows downstream.
                if (!EvaluateHit(row.CurrentValue.Value, op, threshold))
                    continue;

                hits.Add(new S8WatchHitResult
                {
                    SourceRuleId = rule.WatchRuleId,
                    SourceRuleCode = rule.WatchRuleCode,
                    AlertRuleId = rule.AlertRuleId,
                    DataSourceId = rule.DataSourceId,
                    RelatedObjectCode = row.RelatedObjectCode,
                    CurrentValue = row.CurrentValue.Value,
                    ThresholdValue = threshold,
                    TriggerCondition = op,
                    Severity = rule.Severity,
                    OccurrenceDeptId = row.OccurrenceDeptId,
                    ResponsibleDeptId = row.ResponsibleDeptId,
                    SourcePayload = row.SourcePayload
                });
            }
        }

        return hits;
    }

    /// <summary>
    /// G01-05: minimal dedup against unclosed exceptions.
    /// Consumes the G01-04 hits and, keyed by (SourceRuleId + RelatedObjectCode) within the unclosed
    /// status set, only answers "may an exception be created?".
    /// The first version explicitly does not refresh the existing record, append a timeline, update the
    /// payload, count occurrences, recompute severity or repair status.
    /// </summary>
    public async Task<List<S8WatchDedupResult>> EvaluateDedupAsync(long tenantId, long factoryId)
    {
        var hits = await EvaluateHitsAsync(tenantId, factoryId);
        var results = new List<S8WatchDedupResult>(hits.Count);

        foreach (var hit in hits)
        {
            // Defensive branch: SourceRuleId and RelatedObjectCode are normally guaranteed upstream
            // (rule assembly + required-column check); this is only a safety net.
            if (hit.SourceRuleId <= 0 || string.IsNullOrWhiteSpace(hit.RelatedObjectCode))
            {
                results.Add(new S8WatchDedupResult
                {
                    Hit = hit,
                    CanCreate = false,
                    MatchedExceptionId = null,
                    Reason = "missing_dedup_key"
                });
                continue;
            }

            long matchedId;
            try
            {
                matchedId = await FindPendingExceptionIdAsync(
                    tenantId, factoryId, hit.SourceRuleId, hit.RelatedObjectCode);
            }
            catch (Exception ex)
            {
                // Dedup lookup failed: stay conservative and block creation rather than risk a duplicate.
                // The failure is logged so it is diagnosable; the result itself only carries the reason code.
                _logger.LogWarning(ex,
                    "dedup_query_failed sourceRuleId={SourceRuleId} relatedObjectCode={RelatedObjectCode}",
                    hit.SourceRuleId, hit.RelatedObjectCode);

                results.Add(new S8WatchDedupResult
                {
                    Hit = hit,
                    CanCreate = false,
                    MatchedExceptionId = null,
                    Reason = "query_failed"
                });
                continue;
            }

            if (matchedId > 0)
            {
                // An unclosed exception already exists: block creation.
                results.Add(new S8WatchDedupResult
                {
                    Hit = hit,
                    CanCreate = false,
                    MatchedExceptionId = matchedId,
                    Reason = "duplicate_pending"
                });
            }
            else
            {
                // Nothing pending: creation is allowed (consumed by G01-06).
                results.Add(new S8WatchDedupResult
                {
                    Hit = hit,
                    CanCreate = true,
                    MatchedExceptionId = null,
                    Reason = "no_pending"
                });
            }
        }

        return results;
    }

    // Returns the id of any one matching unclosed exception, or 0 when none exists.
    // Only existence matters for dedup; "earliest/latest" ordering is intentionally not handled here.
    private async Task<long> FindPendingExceptionIdAsync(
        long tenantId,
        long factoryId,
        long sourceRuleId,
        string relatedObjectCode)
    {
        // SqlSugar's expression translation needs the array behind Contains to be a reachable variable,
        // so the class-level array is copied to a local; the values are those of UnclosedExceptionStatuses.
        var statuses = UnclosedExceptionStatuses;

        var ids = await _exceptionRep.AsQueryable()
            .Where(x => x.TenantId == tenantId
                && x.FactoryId == factoryId
                && !x.IsDeleted
                && x.SourceRuleId == sourceRuleId
                && x.RelatedObjectCode == relatedObjectCode
                && statuses.Contains(x.Status))
            .Select(x => x.Id)
            .Take(1)
            .ToListAsync();

        return ids.Count > 0 ? ids[0] : 0L;
    }

    /// <summary>
    /// G01-06: automatic exception creation entry point. Consumes the G01-05 dedup results and, for every
    /// hit with CanCreate == true, calls S8ManualReportService.CreateFromWatchAsync to persist an exception.
    /// Hits with CanCreate == false are reported as skipped; a creation failure yields a minimal failure
    /// result without compensation or retry. Afterwards the three typed rule chains
    /// (TIMEOUT / SHORTAGE / OUT_OF_RANGE) run under one shared run id.
    /// </summary>
    public async Task<List<S8WatchCreationResult>> CreateExceptionsAsync(long tenantId, long factoryId)
    {
        var dedupResults = await EvaluateDedupAsync(tenantId, factoryId);
        var results = new List<S8WatchCreationResult>(dedupResults.Count);

        foreach (var dedup in dedupResults)
        {
            if (!dedup.CanCreate)
            {
                results.Add(new S8WatchCreationResult
                {
                    DedupResult = dedup,
                    Created = false,
                    Skipped = true,
                    CreatedExceptionId = null,
                    Reason = dedup.Reason,
                    ErrorMessage = null
                });
                continue;
            }

            try
            {
                var entity = await _manualReportService.CreateFromWatchAsync(dedup.Hit);
                results.Add(new S8WatchCreationResult
                {
                    DedupResult = dedup,
                    Created = true,
                    Skipped = false,
                    CreatedExceptionId = entity.Id,
                    Reason = "auto_created",
                    ErrorMessage = null
                });
            }
            catch (Exception ex)
            {
                results.Add(new S8WatchCreationResult
                {
                    DedupResult = dedup,
                    Created = false,
                    Skipped = false,
                    CreatedExceptionId = null,
                    Reason = "create_failed",
                    ErrorMessage = ex.Message
                });
            }
        }

        // R3-OUT_OF_RANGE-REWRITE-1: the three typed evaluator paths.
        // The legacy chain above only sees rules without a RuleType (filtered in LoadExecutionRulesAsync),
        // so a rule is never processed by both chains.
        // R6 RunId: one correlation id per CreateExceptionsAsync call, written to the detection log.
        var runId = NewRunId();
        results.AddRange(await ProcessRulesByTypeAsync(tenantId, factoryId, _timeoutEvaluator, S8TimeoutRuleEvaluator.RuleTypeCode, runId));
        results.AddRange(await ProcessRulesByTypeAsync(tenantId, factoryId, _shortageEvaluator, S8ShortageRuleEvaluator.RuleTypeCode, runId));
        results.AddRange(await ProcessRulesByTypeAsync(tenantId, factoryId, _outOfRangeEvaluator, S8OutOfRangeRuleEvaluator.RuleTypeCode, runId));

        return results;
    }

    /// <summary>
    /// R2 TIMEOUT rule chain: thin wrapper over the generic typed-rule chain with a freshly generated run id.
    /// </summary>
    public Task<List<S8WatchCreationResult>> ProcessTimeoutRulesAsync(long tenantId, long factoryId)
        => ProcessRulesByTypeAsync(tenantId, factoryId, _timeoutEvaluator, S8TimeoutRuleEvaluator.RuleTypeCode, NewRunId());

    /// <summary>
    /// R3 SHORTAGE rule chain: thin wrapper over the generic typed-rule chain with a freshly generated run id.
    /// </summary>
    public Task<List<S8WatchCreationResult>> ProcessShortageRulesAsync(long tenantId, long factoryId)
        => ProcessRulesByTypeAsync(tenantId, factoryId, _shortageEvaluator, S8ShortageRuleEvaluator.RuleTypeCode, NewRunId());

    /// <summary>
    /// R3-OUT_OF_RANGE-REWRITE-1: OUT_OF_RANGE rule chain.
    /// Uses the generic typed-rule chain, which for this rule type additionally applies a compatibility
    /// fallback for historical records whose dedup_key is NULL:
    /// (source_rule_id = rule.Id AND related_object_code = hit AND status != CLOSED AND dedup_key IS NULL
    /// AND is_deleted = 0). A match is backfilled instead of creating a duplicate.
    /// The run id is generated internally.
    /// </summary>
    public Task<List<S8WatchCreationResult>> ProcessOutOfRangeRulesAsync(long tenantId, long factoryId)
        => ProcessRulesByTypeAsync(tenantId, factoryId, _outOfRangeEvaluator, S8OutOfRangeRuleEvaluator.RuleTypeCode, NewRunId());

    // Generates the 16-hex-character correlation id that ties together the detection-log rows of one run.
    private static string NewRunId() => Guid.NewGuid().ToString("N").Substring(0, 16);

    /// <summary>
    /// R2/R3 generic typed-rule chain: load enabled watch rules of <paramref name="ruleType"/> →
    /// evaluator → dedup by dedup_key → create or refresh.
    /// Dedup hit: update last_detected_at + source_payload, no new exception.
    /// Dedup miss: the hit's ExceptionTypeCode must exist as an enabled type (global tenant=0/factory=0 or
    /// this tenant/factory), otherwise the hit is skipped; if it exists the exception is created through
    /// S8ManualReportService.CreateFromHitAsync.
    /// No SLA escalation and no event triggering happen here.
    /// </summary>
    /// <param name="tenantId">Tenant being processed.</param>
    /// <param name="factoryId">Factory being processed.</param>
    /// <param name="evaluator">Evaluator that turns a rule into hits.</param>
    /// <param name="ruleType">Rule type code used to select watch rules.</param>
    /// <param name="runId">Correlation id written to every detection-log row of this run.</param>
    /// <returns>One result per processed hit, plus one skip result per rule whose evaluation failed.</returns>
    private async Task<List<S8WatchCreationResult>> ProcessRulesByTypeAsync(
        long tenantId,
        long factoryId,
        IS8RuleEvaluator evaluator,
        string ruleType,
        string runId)
    {
        var results = new List<S8WatchCreationResult>();

        var rules = await _ruleRep.AsQueryable()
            .Where(x => x.TenantId == tenantId
                && x.FactoryId == factoryId
                && x.Enabled
                && x.RuleType == ruleType)
            .ToListAsync();
        if (rules.Count == 0)
            return results;

        var alertRules = (await _alertRuleRep.AsQueryable()
            .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId)
            .ToListAsync()).AsReadOnly();

        foreach (var rule in rules.OrderBy(x => x.Id))
        {
            List<S8RuleHit> hits;
            try
            {
                hits = await evaluator.EvaluateAsync(tenantId, factoryId, rule, alertRules);
            }
            catch (Exception ex)
            {
                // R6 EVALUATE_FAILED log: an S8RuleEvaluatorException contributes its Reason,
                // any other exception contributes its type name.
                var failureReason = ex is S8RuleEvaluatorException sre ? sre.Reason : ex.GetType().Name;
                await WriteDetectionLogAsync(new AdoS8DetectionLog
                {
                    TenantId = tenantId,
                    FactoryId = factoryId,
                    RuleId = rule.Id,
                    RuleCode = rule.RuleCode,
                    RuleType = ruleType,
                    SceneCode = rule.SceneCode,
                    SourceObjectType = rule.SourceObjectType,
                    DetectResult = DetectResultEvaluateFailed,
                    DetectedAt = DateTime.Now,
                    FailureReason = failureReason,
                    FailureMessage = Truncate(ex.Message, 1000),
                    RunId = runId,
                    TriggerSource = DetectionTriggerSource
                });
                results.Add(BuildSkipResult(rule, "evaluate_failed", ex.Message));
                continue;
            }

            // R5-RECOVERY-MINIMAL-1: reconcile recovered_at only after a successful evaluation, because
            // "not hit any more" can only be concluded when the evaluator did not throw.
            // Reconciliation writes recovered_at + updated_at only.
            // R6: the returned ids decide whether a NO_HIT row is written below.
            List<long> recoveredIds = new();
            try
            {
                recoveredIds = await ReconcileRecoveriesForRuleAsync(tenantId, factoryId, rule, ruleType, hits, runId);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "recovery_reconcile_failed ruleCode={RuleCode} ruleType={RuleType}", rule.RuleCode, ruleType);
            }

            // R6 NO_HIT log: evaluation succeeded, produced no hit and nothing was marked recovered →
            // exactly one NO_HIT row for this rule.
            if (hits.Count == 0 && recoveredIds.Count == 0)
            {
                await WriteDetectionLogAsync(new AdoS8DetectionLog
                {
                    TenantId = tenantId,
                    FactoryId = factoryId,
                    RuleId = rule.Id,
                    RuleCode = rule.RuleCode,
                    RuleType = ruleType,
                    SceneCode = rule.SceneCode,
                    SourceObjectType = rule.SourceObjectType,
                    DetectResult = DetectResultNoHit,
                    DetectedAt = DateTime.Now,
                    RunId = runId,
                    TriggerSource = DetectionTriggerSource
                });
            }

            foreach (var hit in hits)
            {
                if (string.IsNullOrWhiteSpace(hit.DedupKey))
                {
                    results.Add(BuildSkipResult(rule, "missing_dedup_key", null, hit));
                    continue;
                }

                long matchedId;
                try
                {
                    matchedId = await FindOpenExceptionByDedupKeyAsync(tenantId, factoryId, hit.DedupKey);
                }
                catch (Exception ex)
                {
                    results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit));
                    continue;
                }

                if (matchedId > 0)
                {
                    try
                    {
                        await RefreshDetectionAsync(matchedId, hit);
                        await WriteDetectionLogAsync(BuildHitLog(tenantId, factoryId, rule, ruleType, hit, DetectResultRefreshed, matchedId, runId));
                        results.Add(BuildSkippedDuplicate(rule, hit, matchedId));
                    }
                    catch (Exception ex)
                    {
                        results.Add(BuildSkipResult(rule, "refresh_failed", ex.Message, hit));
                    }
                    continue;
                }

                // R3-OUT_OF_RANGE-REWRITE-1 compatibility fallback: when the dedup_key lookup misses and the
                // rule type is OUT_OF_RANGE, try to match a historical record created by the legacy chain
                // (dedup_key NULL). A match is backfilled so that no duplicate is created.
                if (string.Equals(ruleType, S8OutOfRangeRuleEvaluator.RuleTypeCode, StringComparison.OrdinalIgnoreCase))
                {
                    long compatId;
                    try
                    {
                        compatId = await FindLegacyOutOfRangeExceptionAsync(tenantId, factoryId, rule.Id, hit.RelatedObjectCode);
                    }
                    catch (Exception ex)
                    {
                        results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit));
                        continue;
                    }

                    if (compatId > 0)
                    {
                        try
                        {
                            await BackfillLegacyExceptionAsync(compatId, hit);
                            await WriteDetectionLogAsync(BuildHitLog(tenantId, factoryId, rule, ruleType, hit, DetectResultRefreshed, compatId, runId));
                            results.Add(BuildSkippedDuplicate(rule, hit, compatId));
                        }
                        catch (Exception ex)
                        {
                            results.Add(BuildSkipResult(rule, "refresh_failed", ex.Message, hit));
                        }
                        continue;
                    }
                }

                bool typeExists;
                try
                {
                    typeExists = await _exceptionTypeRep.AsQueryable()
                        .Where(t => t.TypeCode == hit.ExceptionTypeCode
                            && (t.TenantId == 0 || t.TenantId == tenantId)
                            && (t.FactoryId == 0 || t.FactoryId == factoryId)
                            && t.Enabled)
                        .AnyAsync();
                }
                catch (Exception ex)
                {
                    results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit));
                    continue;
                }

                if (!typeExists)
                {
                    results.Add(BuildSkipResult(rule, "exception_type_missing", null, hit));
                    continue;
                }

                try
                {
                    var entity = await _manualReportService.CreateFromHitAsync(hit);
                    await WriteDetectionLogAsync(BuildHitLog(tenantId, factoryId, rule, ruleType, hit, DetectResultCreated, entity.Id, runId));
                    results.Add(BuildCreatedResult(rule, hit, entity.Id));
                }
                catch (Exception ex)
                {
                    results.Add(BuildSkipResult(rule, "create_failed", ex.Message, hit));
                }
            }
        }

        return results;
    }

    // Returns the id of any one non-deleted, non-CLOSED exception with the given dedup key, or 0 when none exists.
    private async Task<long> FindOpenExceptionByDedupKeyAsync(long tenantId, long factoryId, string dedupKey)
    {
        var ids = await _exceptionRep.AsQueryable()
            .Where(x => x.TenantId == tenantId
                && x.FactoryId == factoryId
                && !x.IsDeleted
                && x.Status != "CLOSED"
                && x.DedupKey == dedupKey)
            .Select(x => x.Id)
            .Take(1)
            .ToListAsync();

        return ids.Count > 0 ? ids[0] : 0L;
    }

    /// <summary>
    /// R3 OUT_OF_RANGE compatibility lookup: locates a historical record left by the legacy AlertRule chain
    /// using the strict condition (source_rule_id, related_object_code, status != CLOSED,
    /// dedup_key IS NULL, is_deleted = 0).
    /// </summary>
    /// <returns>The id of one matching record, or 0 when none exists or the object code is blank.</returns>
    private async Task<long> FindLegacyOutOfRangeExceptionAsync(long tenantId, long factoryId, long sourceRuleId, string relatedObjectCode)
    {
        if (string.IsNullOrWhiteSpace(relatedObjectCode))
            return 0L;

        var ids = await _exceptionRep.AsQueryable()
            .Where(x => x.TenantId == tenantId
                && x.FactoryId == factoryId
                && !x.IsDeleted
                && x.Status != "CLOSED"
                && x.DedupKey == null
                && x.SourceRuleId == sourceRuleId
                && x.RelatedObjectCode == relatedObjectCode)
            .Select(x => x.Id)
            .Take(1)
            .ToListAsync();

        return ids.Count > 0 ? ids[0] : 0L;
    }

    /// <summary>
    /// R3 OUT_OF_RANGE compatibility backfill: writes the six R1 columns (dedup_key, source_rule_code,
    /// source_object_type, source_object_id, source_payload, last_detected_at) onto a historical record
    /// and refreshes updated_at.
    /// </summary>
    private async Task BackfillLegacyExceptionAsync(long exceptionId, S8RuleHit hit)
    {
        await _exceptionRep.Context.Updateable<AdoS8Exception>()
            .SetColumns(x => new AdoS8Exception
            {
                DedupKey = hit.DedupKey,
                SourceRuleCode = hit.SourceRuleCode,
                SourceObjectType = hit.SourceObjectType,
                SourceObjectId = hit.SourceObjectId,
                SourcePayload = hit.SourcePayload,
                LastDetectedAt = hit.DetectedAt,
                UpdatedAt = DateTime.Now
            })
            .Where(x => x.Id == exceptionId)
            .ExecuteCommandAsync();
    }

    /// <summary>
    /// R5 minimal recovery loop: for the given rule, every non-closed exception that has a dedup_key and a
    /// NULL recovered_at, and whose dedup_key is not among this round's hits, gets
    /// recovered_at = now and updated_at = now. Only these two columns are written.
    /// R6: one RECOVERED detection-log row is written per recovered exception.
    /// </summary>
    /// <returns>Ids of the exceptions marked as recovered in this round (possibly empty).</returns>
    private async Task<List<long>> ReconcileRecoveriesForRuleAsync(
        long tenantId,
        long factoryId,
        AdoS8WatchRule rule,
        string ruleType,
        List<S8RuleHit> hits,
        string runId)
    {
        var hitDedupKeys = hits
            .Where(h => !string.IsNullOrWhiteSpace(h.DedupKey))
            .Select(h => h.DedupKey)
            .ToHashSet(StringComparer.Ordinal);

        var candidates = await _exceptionRep.AsQueryable()
            .Where(x => x.TenantId == tenantId
                && x.FactoryId == factoryId
                && !x.IsDeleted
                && x.Status != "CLOSED"
                && x.SourceRuleCode == rule.RuleCode
                && x.DedupKey != null
                && x.RecoveredAt == null)
            .Select(x => new { x.Id, x.DedupKey, x.SourceObjectType, x.SourceObjectId, x.RelatedObjectCode })
            .ToListAsync();

        if (candidates.Count == 0)
            return new List<long>();

        var now = DateTime.Now;
        var recoveredIds = new List<long>();
        foreach (var c in candidates)
        {
            // Still hit in this round: not recovered.
            if (hitDedupKeys.Contains(c.DedupKey!))
                continue;

            await _exceptionRep.Context.Updateable<AdoS8Exception>()
                .SetColumns(x => new AdoS8Exception { RecoveredAt = now, UpdatedAt = now })
                .Where(x => x.Id == c.Id)
                .ExecuteCommandAsync();
            recoveredIds.Add(c.Id);

            await WriteDetectionLogAsync(new AdoS8DetectionLog
            {
                TenantId = tenantId,
                FactoryId = factoryId,
                RuleId = rule.Id,
                RuleCode = rule.RuleCode,
                RuleType = ruleType,
                SceneCode = rule.SceneCode,
                SourceObjectType = c.SourceObjectType,
                SourceObjectId = c.SourceObjectId,
                RelatedObjectCode = c.RelatedObjectCode,
                DedupKey = c.DedupKey,
                DetectResult = DetectResultRecovered,
                ExceptionId = c.Id,
                DetectedAt = now,
                PayloadSnapshot = JsonSerializer.Serialize(new { ruleId = rule.Id, ruleCode = rule.RuleCode, reason = "no_longer_hit" }),
                RunId = runId,
                TriggerSource = DetectionTriggerSource,
                Remark = "Rule no longer hit; recovered_at marked"
            });
        }

        if (recoveredIds.Count > 0)
        {
            _logger.LogInformation(
                "rule_recovered ruleCode={RuleCode} ruleType={RuleType} recoveredCount={Count} recoveredIds={Ids}",
                rule.RuleCode, ruleType, recoveredIds.Count, string.Join(",", recoveredIds));
        }

        return recoveredIds;
    }

    /// <summary>R6: builds the detection-log row shared by the CREATED and REFRESHED outcomes of a hit.</summary>
    private static AdoS8DetectionLog BuildHitLog(
        long tenantId,
        long factoryId,
        AdoS8WatchRule rule,
        string ruleType,
        S8RuleHit hit,
        string detectResult,
        long exceptionId,
        string runId)
        => new()
        {
            TenantId = tenantId,
            FactoryId = factoryId,
            RuleId = rule.Id,
            RuleCode = rule.RuleCode,
            RuleType = ruleType,
            SceneCode = rule.SceneCode,
            SourceObjectType = hit.SourceObjectType,
            SourceObjectId = hit.SourceObjectId,
            RelatedObjectCode = hit.RelatedObjectCode,
            DedupKey = hit.DedupKey,
            DetectResult = detectResult,
            ExceptionId = exceptionId,
            DetectedAt = hit.DetectedAt,
            PayloadSnapshot = hit.SourcePayload,
            RunId = runId,
            TriggerSource = DetectionTriggerSource
        };

    /// <summary>
    /// R6: writes one detection-log row. A failure is only logged as a warning; it never propagates,
    /// so logging cannot interrupt the detection chain.
    /// </summary>
    private async Task WriteDetectionLogAsync(AdoS8DetectionLog log)
    {
        try
        {
            await _detectionLogRep.InsertAsync(log);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "detection_log_write_failed runId={RunId} detectResult={Result} ruleCode={RuleCode} exceptionId={ExceptionId}",
                log.RunId, log.DetectResult, log.RuleCode, log.ExceptionId);
        }
    }

// ---- S8WatchSchedulerService: result builders, parsing helpers and row mapping ----

    /// <summary>Returns <paramref name="s"/> cut to at most <paramref name="max"/> characters; null/empty becomes "".</summary>
    private static string Truncate(string? s, int max)
        => string.IsNullOrEmpty(s) ? string.Empty : (s.Length <= max ? s : s.Substring(0, max));

    /// <summary>Refreshes last_detected_at, source_payload and updated_at of an existing exception from a new hit.</summary>
    private async Task RefreshDetectionAsync(long exceptionId, S8RuleHit hit)
    {
        await _exceptionRep.Context.Updateable<AdoS8Exception>()
            .SetColumns(x => new AdoS8Exception
            {
                LastDetectedAt = hit.DetectedAt,
                SourcePayload = hit.SourcePayload,
                UpdatedAt = DateTime.Now
            })
            .Where(x => x.Id == exceptionId)
            .ExecuteCommandAsync();
    }

    /// <summary>Builds the result for a hit that led to a newly created exception.</summary>
    private static S8WatchCreationResult BuildCreatedResult(AdoS8WatchRule rule, S8RuleHit hit, long exceptionId)
        => new()
        {
            DedupResult = new S8WatchDedupResult
            {
                Hit = ToWatchHit(rule, hit),
                CanCreate = true,
                MatchedExceptionId = null,
                Reason = "no_pending"
            },
            Created = true,
            Skipped = false,
            CreatedExceptionId = exceptionId,
            Reason = "auto_created",
            ErrorMessage = null
        };

    /// <summary>Builds the result for a hit that matched an existing open exception.</summary>
    private static S8WatchCreationResult BuildSkippedDuplicate(AdoS8WatchRule rule, S8RuleHit hit, long matchedId)
        => new()
        {
            DedupResult = new S8WatchDedupResult
            {
                Hit = ToWatchHit(rule, hit),
                CanCreate = false,
                MatchedExceptionId = matchedId,
                Reason = "duplicate_pending"
            },
            Created = false,
            Skipped = true,
            CreatedExceptionId = null,
            Reason = "duplicate_pending",
            ErrorMessage = null
        };

    /// <summary>
    /// Builds a skip result with the given reason. When no hit is available (for example the evaluator
    /// failed), a minimal hit carrying only the rule id and code is used.
    /// </summary>
    private static S8WatchCreationResult BuildSkipResult(AdoS8WatchRule rule, string reason, string? error, S8RuleHit? hit = null)
        => new()
        {
            DedupResult = new S8WatchDedupResult
            {
                Hit = hit != null
                    ? ToWatchHit(rule, hit)
                    : new S8WatchHitResult { SourceRuleId = rule.Id, SourceRuleCode = rule.RuleCode },
                CanCreate = false,
                MatchedExceptionId = null,
                Reason = reason
            },
            Created = false,
            Skipped = true,
            CreatedExceptionId = null,
            Reason = reason,
            ErrorMessage = error
        };

    /// <summary>
    /// Converts an evaluator hit into the legacy hit shape, falling back to the rule's id/code when the
    /// hit does not carry them.
    /// </summary>
    private static S8WatchHitResult ToWatchHit(AdoS8WatchRule rule, S8RuleHit hit)
        => new()
        {
            SourceRuleId = hit.SourceRuleId == 0 ? rule.Id : hit.SourceRuleId,
            SourceRuleCode = string.IsNullOrEmpty(hit.SourceRuleCode) ? rule.RuleCode : hit.SourceRuleCode,
            DataSourceId = hit.DataSourceId,
            RelatedObjectCode = hit.RelatedObjectCode,
            Severity = hit.Severity,
            OccurrenceDeptId = hit.OccurrenceDeptId,
            ResponsibleDeptId = hit.ResponsibleDeptId,
            SourcePayload = hit.SourcePayload
        };

    /// <summary>
    /// Single polling entry point. Currently it only loads and assembles the rules of tenant 1 / factory 1
    /// and returns how many are executable; no data is collected.
    /// </summary>
    public async Task<int> RunOnceAsync()
    {
        var executionRules = await LoadExecutionRulesAsync(1, 1);
        return executionRules.Count;
    }

    // G01-04 first-version operator set: >, >=, <, <=.
    // Surrounding whitespace is allowed; anything else is reported as null so the caller skips the rule.
    private static string? TryParseTriggerCondition(string raw)
    {
        var normalized = raw.Trim();
        return normalized switch
        {
            ">" => ">",
            ">=" => ">=",
            "<" => "<",
            "<=" => "<=",
            _ => null
        };
    }

    // Parses a threshold using the invariant culture (NumberStyles.Any).
    private static bool TryParseDecimal(string raw, out decimal value)
        => decimal.TryParse(raw.Trim(), NumberStyles.Any, CultureInfo.InvariantCulture, out value);

    // Applies one of the four supported operators; an unknown operator is never a hit.
    private static bool EvaluateHit(decimal current, string op, decimal threshold)
        => op switch
        {
            ">" => current > threshold,
            ">=" => current >= threshold,
            "<" => current < threshold,
            "<=" => current <= threshold,
            _ => false
        };

    // An alert rule is usable only when both its trigger condition and its threshold are non-blank.
    private static bool IsSupportedAlertRule(AdoS8AlertRule alertRule)
        => !string.IsNullOrWhiteSpace(alertRule.TriggerCondition)
           && !string.IsNullOrWhiteSpace(alertRule.ThresholdVal);

    // A data source is usable when it is enabled, its (trimmed) type is SQL and it has an endpoint.
    private static bool IsSupportedSqlDataSource(AdoS8DataSource dataSource)
        => dataSource.Enabled
           && string.Equals(dataSource.Type?.Trim(), SqlDataSourceType, StringComparison.OrdinalIgnoreCase)
           && !string.IsNullOrWhiteSpace(dataSource.Endpoint);

    // Device rules are those whose object type is DEVICE / EQUIPMENT (case-insensitive) or the literal "设备".
    private static bool IsDeviceWatchObjectType(string? watchObjectType)
    {
        if (string.IsNullOrWhiteSpace(watchObjectType))
            return false;

        var trimmed = watchObjectType.Trim();
        var normalized = trimmed.ToUpperInvariant();
        return normalized is "DEVICE" or "EQUIPMENT" || trimmed == "设备";
    }

    /// <summary>
    /// Creates a short-lived SqlSugar client for one ad-hoc query against a rule's data source. The database
    /// type is taken from the main connection; the connection string comes from the data source.
    /// </summary>
    /// <remarks>
    /// A <see cref="SqlSugarClient"/> is returned rather than a <c>SqlSugarScope</c>: the scope type is
    /// designed to be a process-wide singleton, so creating one per query (each with a unique ConfigId)
    /// accumulates per-instance state. The caller disposes the client after the query.
    /// </remarks>
    private SqlSugarClient CreateSqlQueryScope(string connectionString)
    {
        var dbType = _ruleRep.Context.CurrentConnectionConfig.DbType;
        return new SqlSugarClient(new ConnectionConfig
        {
            ConfigId = $"s8-watch-sql-{Guid.NewGuid():N}",
            DbType = dbType,
            ConnectionString = connectionString,
            InitKeyType = InitKeyType.Attribute,
            IsAutoCloseConnection = true
        });
    }

    // The query must return both related_object_code and current_value (matched loosely, see NormalizeColumnName).
    private static bool HasRequiredColumns(DataTable table)
        => TryGetColumnName(table.Columns, "related_object_code") != null
           && TryGetColumnName(table.Columns, "current_value") != null;

    // Maps one result row to a device row; the two department columns are optional.
    private static S8WatchDeviceRow MapDeviceRow(DataRow row)
    {
        var columns = row.Table.Columns;
        var relatedObjectCodeColumn = TryGetColumnName(columns, "related_object_code");
        var currentValueColumn = TryGetColumnName(columns, "current_value");
        var occurrenceDeptIdColumn = TryGetColumnName(columns, "occurrence_dept_id");
        var responsibleDeptIdColumn = TryGetColumnName(columns, "responsible_dept_id");

        return new S8WatchDeviceRow
        {
            RelatedObjectCode = ReadString(row, relatedObjectCodeColumn),
            CurrentValue = ReadDecimal(row, currentValueColumn),
            OccurrenceDeptId = ReadLong(row, occurrenceDeptIdColumn),
            ResponsibleDeptId = ReadLong(row, responsibleDeptIdColumn),
            SourcePayload = BuildSourcePayload(row)
        };
    }

    // Finds the actual column name matching the expected one, ignoring case and underscores; null if absent.
    private static string? TryGetColumnName(DataColumnCollection columns, string expectedName)
    {
        var normalizedExpected = NormalizeColumnName(expectedName);
        foreach (DataColumn column in columns)
        {
            if (NormalizeColumnName(column.ColumnName) == normalizedExpected)
                return column.ColumnName;
        }

        return null;
    }

    // Canonical form used for column matching: underscores removed, trimmed, upper-cased (invariant).
    private static string NormalizeColumnName(string columnName)
        => columnName.Replace("_", string.Empty, StringComparison.Ordinal).Trim().ToUpperInvariant();

    // Reads a cell as a trimmed string; a missing column or DBNull yields "".
    private static string ReadString(DataRow row, string? columnName)
    {
        if (string.IsNullOrWhiteSpace(columnName))
            return string.Empty;

        var value = row[columnName];
        return value == DBNull.Value ? string.Empty : Convert.ToString(value)?.Trim() ?? string.Empty;
    }

    // Reads a cell as a long; a missing column, DBNull or a non-integer value yields null.
    private static long? ReadLong(DataRow row, string? columnName)
    {
        if (string.IsNullOrWhiteSpace(columnName))
            return null;

        var value = row[columnName];
        if (value == DBNull.Value)
            return null;

        // The value is formatted with the invariant culture, so it must be parsed with it as well;
        // parsing with the current culture can reject e.g. negative numbers under some cultures.
        return long.TryParse(
            Convert.ToString(value, CultureInfo.InvariantCulture),
            NumberStyles.Integer,
            CultureInfo.InvariantCulture,
            out var result)
            ? result
            : null;
    }

    // Reads a cell as a decimal (invariant culture); a missing column, DBNull or a non-numeric value yields null.
    private static decimal? ReadDecimal(DataRow row, string? columnName)
    {
        if (string.IsNullOrWhiteSpace(columnName))
            return null;

        var value = row[columnName];
        if (value == DBNull.Value)
            return null;

        return decimal.TryParse(
            Convert.ToString(value, CultureInfo.InvariantCulture),
            NumberStyles.Any,
            CultureInfo.InvariantCulture,
            out var result)
            ? result
            : null;
    }

    // Serializes the whole row (column name → value, DBNull as null) to JSON for traceability.
    private static string BuildSourcePayload(DataRow row)
    {
        var payload = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
        foreach (DataColumn column in row.Table.Columns)
        {
            var value = row[column];
            payload[column.ColumnName] = value == DBNull.Value ? null : value;
        }

        return JsonSerializer.Serialize(payload);
    }
}

/// <summary>
/// A watch rule assembled with its data source and alert rule, ready to be executed.
/// </summary>
public sealed class S8WatchExecutionRule
{
    public long WatchRuleId { get; set; }
    public string WatchRuleCode { get; set; } = string.Empty;
    public string SceneCode { get; set; } = string.Empty;
    public string TriggerType { get; set; } = string.Empty;
    public string WatchObjectType { get; set; } = string.Empty;
    public long DataSourceId { get; set; }
    public string DataSourceCode { get; set; } = string.Empty;
    public string DataSourceType { get; set; } = string.Empty;
    public string DataSourceConnection { get; set; } = string.Empty;
    public string QueryExpression { get; set; } = string.Empty;
    public int PollIntervalSeconds { get; set; }
    public long AlertRuleId { get; set; }
    public string AlertRuleCode { get; set; } = string.Empty;
    public string TriggerCondition { get; set; } = string.Empty;
    public string ThresholdValue { get; set; } = string.Empty;
    public string Severity { get; set; } = string.Empty;
}

/// <summary>
/// Outcome of running one rule's query: either the mapped rows or a failure reason.
/// </summary>
public sealed class S8WatchDeviceQueryResult
{
    public long WatchRuleId { get; set; }
    public string WatchRuleCode { get; set; } = string.Empty;
    public bool Success { get; set; }
    public string? FailureReason { get; set; }
    public List<S8WatchDeviceRow> Rows { get; set; } = new();

    /// <summary>Creates a successful result carrying the mapped rows.</summary>
    public static S8WatchDeviceQueryResult Ok(S8WatchExecutionRule rule, List<S8WatchDeviceRow> rows)
        => new()
        {
            WatchRuleId = rule.WatchRuleId,
            WatchRuleCode = rule.WatchRuleCode,
            Success = true,
            Rows = rows
        };

    /// <summary>Creates a failed result carrying the reason; Rows stays empty.</summary>
    public static S8WatchDeviceQueryResult Fail(S8WatchExecutionRule rule, string reason)
        => new()
        {
            WatchRuleId = rule.WatchRuleId,
            WatchRuleCode = rule.WatchRuleCode,
            Success = false,
            FailureReason = reason
        };
}

/// <summary>
/// One device-level row returned by a rule query.
/// </summary>
public sealed class S8WatchDeviceRow
{
    public string RelatedObjectCode { get; set; } = string.Empty;
    public decimal? CurrentValue { get; set; }
    public long? OccurrenceDeptId { get; set; }
    public long? ResponsibleDeptId { get; set; }
    public string SourcePayload { get; set; } = string.Empty;
}

/// <summary>
/// G01-05 dedup result. Serves only the pre-creation gate of G01-06; CanCreate is the single decision bit.
/// Scoped to the first-version scenario S2S6_PRODUCTION + trigger type VALUE_DEVIATION + device objects;
/// no structure is reserved for multiple trigger types or platform-level dedup.
/// Reason values: no_pending / duplicate_pending / missing_dedup_key / query_failed.
/// </summary>
public sealed class S8WatchDedupResult
{
    public S8WatchHitResult Hit { get; set; } = new();
    public bool CanCreate { get; set; }
    public long? MatchedExceptionId { get; set; }
    public string Reason { get; set; } = string.Empty;
}

/// <summary>
/// G01-06 creation result. Serves only the G-01 first-version acceptance; Created / Skipped decide the outcome.
/// Scoped to the first-version scenario S2S6_PRODUCTION + trigger type VALUE_DEVIATION + device objects;
/// no structure is reserved for multiple trigger types or platform-level work orders.
/// Reason values: auto_created / create_failed / passed through from DedupResult.Reason.
/// </summary>
public sealed class S8WatchCreationResult
{
    public S8WatchDedupResult DedupResult { get; set; } = new();
    public bool Created { get; set; }
    public bool Skipped { get; set; }
    public long? CreatedExceptionId { get; set; }
    public string Reason { get; set; } = string.Empty;
    public string? ErrorMessage { get; set; }
}

/// <summary>
/// G01-04 hit result. Carries the minimal traceability fields needed by G01-05 dedup and G01-06 creation.
/// Scoped to the first-version scenario S2S6_PRODUCTION + trigger type VALUE_DEVIATION + device objects;
/// no structure is reserved for multiple trigger types, scenes or platform-level extension.
/// </summary>
public sealed class S8WatchHitResult
{
    public long SourceRuleId { get; set; }
    public string SourceRuleCode { get; set; } = string.Empty;
    public long AlertRuleId { get; set; }
    public long DataSourceId { get; set; }
    public string RelatedObjectCode { get; set; } = string.Empty;
    public decimal CurrentValue { get; set; }
    public decimal ThresholdValue { get; set; }
    public string TriggerCondition { get; set; } = string.Empty;
    public string Severity { get; set; } = string.Empty;
    public long? OccurrenceDeptId { get; set; }
    public long? ResponsibleDeptId { get; set; }
    public string SourcePayload { get; set; } = string.Empty;
}