using Admin.NET.Plugin.AiDOP.Entity.S8; using Admin.NET.Plugin.AiDOP.Infrastructure.S8; using SqlSugar; using System.Data; using System.Globalization; using System.Text.Json; namespace Admin.NET.Plugin.AiDOP.Service.S8; /// /// 监视规则轮询调度服务(首轮存根)。 /// 后续接入 Admin.NET 定时任务机制后,由调度器周期调用 , /// 按各规则的 PollIntervalSeconds 逐条评估并生成异常记录。 /// public class S8WatchSchedulerService : ITransient { private readonly SqlSugarRepository _ruleRep; private readonly SqlSugarRepository _alertRuleRep; private readonly SqlSugarRepository _dataSourceRep; private readonly SqlSugarRepository _exceptionRep; private readonly S8NotificationService _notificationService; private readonly S8ManualReportService _manualReportService; private const string DefaultTriggerType = "VALUE_DEVIATION"; private const string SqlDataSourceType = "SQL"; // G01-05 未闭环状态集合:复用自 S8ExceptionService 当前 pendingStatuses 事实口径 // (见 S8ExceptionService.GetPagedAsync 中 pendingStatuses 的定义,两处必须保持一致)。 // 这不是“自定义未闭环集合”;若现有口径调整,两处需同步修改。 private static readonly string[] UnclosedExceptionStatuses = { "NEW", "ASSIGNED", "IN_PROGRESS", "PENDING_VERIFICATION" }; public S8WatchSchedulerService( SqlSugarRepository ruleRep, SqlSugarRepository alertRuleRep, SqlSugarRepository dataSourceRep, SqlSugarRepository exceptionRep, S8NotificationService notificationService, S8ManualReportService manualReportService) { _ruleRep = ruleRep; _alertRuleRep = alertRuleRep; _dataSourceRep = dataSourceRep; _exceptionRep = exceptionRep; _notificationService = notificationService; _manualReportService = manualReportService; } public async Task> LoadExecutionRulesAsync(long tenantId, long factoryId) { var watchRules = await _ruleRep.AsQueryable() .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.Enabled && x.SceneCode == S8SceneCode.S2S6Production) .ToListAsync(); var deviceRules = watchRules .Where(x => IsDeviceWatchObjectType(x.WatchObjectType)) .ToList(); if (deviceRules.Count == 0) return new(); var dataSourceIds = deviceRules .Select(x => x.DataSourceId) .Distinct() .ToList(); var dataSources = await _dataSourceRep.AsQueryable() .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.Enabled && dataSourceIds.Contains(x.Id)) .ToListAsync(); var dataSourceMap = dataSources.ToDictionary(x => x.Id); if (dataSourceMap.Count == 0) return new(); var alertRules = (await _alertRuleRep.AsQueryable() .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && x.SceneCode == S8SceneCode.S2S6Production) .ToListAsync()) .Where(IsSupportedAlertRule) .ToList(); // G-01 首版 AlertRule 冲突口径(C 收口): // 当前场景存在多条可运行 AlertRule 时,视为“当前规则配置冲突”并跳过该规则, // 不按“首条”继续运行,也不扩大为“整场景停摆”。 // 当前模型下所有 device watchRule 共享同场景 AlertRule,故冲突态下所有 device 规则均跳过, // 但此处按“逐规则跳过”的语义实现,避免被误读为“整场景 return empty 停摆”。 var alertRule = alertRules.Count == 1 ? alertRules[0] : null; var executionRules = new List(); foreach (var watchRule in deviceRules.OrderBy(x => x.Id)) { // 配置冲突:当前规则跳过(不停摆其他规则)。 if (alertRule == null) continue; if (!dataSourceMap.TryGetValue(watchRule.DataSourceId, out var dataSource)) continue; if (!IsSupportedSqlDataSource(dataSource)) continue; executionRules.Add(new S8WatchExecutionRule { WatchRuleId = watchRule.Id, WatchRuleCode = watchRule.RuleCode, SceneCode = watchRule.SceneCode, TriggerType = DefaultTriggerType, WatchObjectType = watchRule.WatchObjectType.Trim(), DataSourceId = dataSource.Id, DataSourceCode = dataSource.DataSourceCode, DataSourceType = dataSource.Type, DataSourceConnection = dataSource.Endpoint?.Trim() ?? string.Empty, QueryExpression = watchRule.Expression?.Trim() ?? string.Empty, PollIntervalSeconds = watchRule.PollIntervalSeconds, AlertRuleId = alertRule.Id, AlertRuleCode = alertRule.RuleCode, TriggerCondition = alertRule.TriggerCondition!.Trim(), ThresholdValue = alertRule.ThresholdVal!.Trim(), Severity = alertRule.Severity }); } return executionRules; } public async Task> QueryDeviceRowsAsync(long tenantId, long factoryId) { var executionRules = await LoadExecutionRulesAsync(tenantId, factoryId); var results = new List(); foreach (var rule in executionRules) results.Add(await QueryDeviceRowsAsync(rule)); return results; } public async Task QueryDeviceRowsAsync(S8WatchExecutionRule rule) { if (!string.Equals(rule.DataSourceType, SqlDataSourceType, StringComparison.OrdinalIgnoreCase)) return S8WatchDeviceQueryResult.Fail(rule, "数据源类型不是 SQL,已跳过"); if (string.IsNullOrWhiteSpace(rule.QueryExpression)) return S8WatchDeviceQueryResult.Fail(rule, "查询表达式为空,已跳过"); try { using var db = CreateSqlQueryScope(rule.DataSourceConnection); var table = await db.Ado.GetDataTableAsync(rule.QueryExpression); if (!HasRequiredColumns(table)) return S8WatchDeviceQueryResult.Fail(rule, "查询结果缺少 required columns: related_object_code/current_value"); var rows = table.Rows.Cast() .Select(MapDeviceRow) .Where(x => !string.IsNullOrWhiteSpace(x.RelatedObjectCode)) .ToList(); return S8WatchDeviceQueryResult.Ok(rule, rows); } catch (Exception ex) { return S8WatchDeviceQueryResult.Fail(rule, $"查询执行失败: {ex.Message}"); } } /// /// G01-04:基于设备级结果行集做首版 VALUE_DEVIATION 单阈值判定, /// 产出命中结果对象列表,供 G01-05 去重与 G01-06 建单消费。 /// 本方法不做去重、不做建单、不做严重度重算、不做时间线。 /// public async Task> EvaluateHitsAsync(long tenantId, long factoryId) { var executionRules = await LoadExecutionRulesAsync(tenantId, factoryId); var ruleMap = executionRules.ToDictionary(x => x.WatchRuleId); var queryResults = new List(); foreach (var rule in executionRules) queryResults.Add(await QueryDeviceRowsAsync(rule)); var hits = new List(); foreach (var queryResult in queryResults) { // G01-03 查询失败:跳过,不进入判定。 if (!queryResult.Success) continue; if (!ruleMap.TryGetValue(queryResult.WatchRuleId, out var rule)) continue; // 判定参数缺失:跳过当前规则。 if (string.IsNullOrWhiteSpace(rule.TriggerCondition) || string.IsNullOrWhiteSpace(rule.ThresholdValue)) continue; // 比较符非法:跳过当前规则。 var op = TryParseTriggerCondition(rule.TriggerCondition); if (op == null) continue; // ThresholdValue 非法:跳过当前规则。 if (!TryParseDecimal(rule.ThresholdValue, out var threshold)) continue; foreach (var row in queryResult.Rows) { // CurrentValue 非法(null / 无法解析数值):跳过当前行,不进入判定。 if (row.CurrentValue == null) continue; // 未命中:不进入后续链路。 if (!EvaluateHit(row.CurrentValue.Value, op, threshold)) continue; hits.Add(new S8WatchHitResult { SourceRuleId = rule.WatchRuleId, SourceRuleCode = rule.WatchRuleCode, AlertRuleId = rule.AlertRuleId, DataSourceId = rule.DataSourceId, RelatedObjectCode = row.RelatedObjectCode, CurrentValue = row.CurrentValue.Value, ThresholdValue = threshold, TriggerCondition = op, Severity = rule.Severity, OccurrenceDeptId = row.OccurrenceDeptId, ResponsibleDeptId = row.ResponsibleDeptId, SourcePayload = row.SourcePayload }); } } return hits; } /// /// G01-05:未闭环异常去重最小实现。 /// 消费 G01-04 产出的 ,按 (SourceRuleId + RelatedObjectCode) /// 在未闭环状态集合内判重,只回答“是否允许建单”。 /// 首版明确不做:原单刷新 / 时间线追加 / payload 更新 / 次数累计 / 严重度重算 / 状态修复。 /// public async Task> EvaluateDedupAsync(long tenantId, long factoryId) { var hits = await EvaluateHitsAsync(tenantId, factoryId); var results = new List(hits.Count); foreach (var hit in hits) { // 防御性分支:正常情况下 SourceRuleId 与 RelatedObjectCode 已由上游 // (G01-02 规则装配 + G01-03 查询结果列校验)保证;此处仅作兜底, // 不是首版正常路径。 if (hit.SourceRuleId <= 0 || string.IsNullOrWhiteSpace(hit.RelatedObjectCode)) { results.Add(new S8WatchDedupResult { Hit = hit, CanCreate = false, MatchedExceptionId = null, Reason = "missing_dedup_key" }); continue; } long matchedId; try { matchedId = await FindPendingExceptionIdAsync( tenantId, factoryId, hit.SourceRuleId, hit.RelatedObjectCode); } catch { // 去重查询失败:首版偏保守,宁可阻止也不重复建单,不扩展为补偿。 results.Add(new S8WatchDedupResult { Hit = hit, CanCreate = false, MatchedExceptionId = null, Reason = "query_failed" }); continue; } if (matchedId > 0) { // 命中已有未闭环异常 → 阻止建单。 results.Add(new S8WatchDedupResult { Hit = hit, CanCreate = false, MatchedExceptionId = matchedId, Reason = "duplicate_pending" }); } else { // 未命中 → 允许建单,交 G01-06 消费。 results.Add(new S8WatchDedupResult { Hit = hit, CanCreate = true, MatchedExceptionId = null, Reason = "no_pending" }); } } return results; } // 取任意一条匹配的未闭环异常 Id 作为“是否存在重复单”的拦截依据。 // 首版只需要“存在性”,不关心“最早 / 最新”;不在 G01-05 处理排序语义。 private async Task FindPendingExceptionIdAsync( long tenantId, long factoryId, long sourceRuleId, string relatedObjectCode) { // SqlSugar 表达式翻译要求 Contains 数组变量必须可访问;此处将类级常量 // 承接到方法内局部变量,仅为表达式翻译服务,值与 UnclosedExceptionStatuses 一致。 var statuses = UnclosedExceptionStatuses; var ids = await _exceptionRep.AsQueryable() .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId && !x.IsDeleted && x.SourceRuleId == sourceRuleId && x.RelatedObjectCode == relatedObjectCode && statuses.Contains(x.Status)) .Select(x => x.Id) .Take(1) .ToListAsync(); return ids.Count > 0 ? ids[0] : 0L; } /// /// G01-06:自动建单入口。消费 G01-05 去重结果,对 CanCreate==true 的命中 /// 复用 S8ManualReportService.CreateFromWatchAsync(同一主链的自动建单分支)落成标准 AdoS8Exception。 /// CanCreate==false 直接跳过;创建失败返回最小失败结果,不补偿、不重试、不对账。 /// public async Task> CreateExceptionsAsync(long tenantId, long factoryId) { var dedupResults = await EvaluateDedupAsync(tenantId, factoryId); var results = new List(dedupResults.Count); foreach (var dedup in dedupResults) { if (!dedup.CanCreate) { results.Add(new S8WatchCreationResult { DedupResult = dedup, Created = false, Skipped = true, CreatedExceptionId = null, Reason = dedup.Reason, ErrorMessage = null }); continue; } try { var entity = await _manualReportService.CreateFromWatchAsync(dedup.Hit); results.Add(new S8WatchCreationResult { DedupResult = dedup, Created = true, Skipped = false, CreatedExceptionId = entity.Id, Reason = "auto_created", ErrorMessage = null }); } catch (Exception ex) { results.Add(new S8WatchCreationResult { DedupResult = dedup, Created = false, Skipped = false, CreatedExceptionId = null, Reason = "create_failed", ErrorMessage = ex.Message }); } } return results; } /// /// 单次轮询入口。当前仅完成规则读取与组装,返回可执行规则数量,不做实际数据采集。 /// public async Task RunOnceAsync() { var executionRules = await LoadExecutionRulesAsync(1, 1); return executionRules.Count; } // G01-04 首版最小比较符集合:>, >=, <, <=。 // 允许首尾空格;非此集合的一律视为“比较符非法”,由调用方跳过。 private static string? TryParseTriggerCondition(string raw) { var normalized = raw.Trim(); return normalized switch { ">" => ">", ">=" => ">=", "<" => "<", "<=" => "<=", _ => null }; } private static bool TryParseDecimal(string raw, out decimal value) => decimal.TryParse(raw.Trim(), NumberStyles.Any, CultureInfo.InvariantCulture, out value); private static bool EvaluateHit(decimal current, string op, decimal threshold) => op switch { ">" => current > threshold, ">=" => current >= threshold, "<" => current < threshold, "<=" => current <= threshold, _ => false }; private static bool IsSupportedAlertRule(AdoS8AlertRule alertRule) => !string.IsNullOrWhiteSpace(alertRule.TriggerCondition) && !string.IsNullOrWhiteSpace(alertRule.ThresholdVal); private static bool IsSupportedSqlDataSource(AdoS8DataSource dataSource) => dataSource.Enabled && string.Equals(dataSource.Type?.Trim(), SqlDataSourceType, StringComparison.OrdinalIgnoreCase) && !string.IsNullOrWhiteSpace(dataSource.Endpoint); private static bool IsDeviceWatchObjectType(string? watchObjectType) { if (string.IsNullOrWhiteSpace(watchObjectType)) return false; var normalized = watchObjectType.Trim().ToUpperInvariant(); return normalized is "DEVICE" or "EQUIPMENT" || watchObjectType.Trim() == "设备"; } private SqlSugarScope CreateSqlQueryScope(string connectionString) { var dbType = _ruleRep.Context.CurrentConnectionConfig.DbType; return new SqlSugarScope(new ConnectionConfig { ConfigId = $"s8-watch-sql-{Guid.NewGuid():N}", DbType = dbType, ConnectionString = connectionString, InitKeyType = InitKeyType.Attribute, IsAutoCloseConnection = true }); } private static bool HasRequiredColumns(DataTable table) => TryGetColumnName(table.Columns, "related_object_code") != null && TryGetColumnName(table.Columns, "current_value") != null; private static S8WatchDeviceRow MapDeviceRow(DataRow row) { var columns = row.Table.Columns; var relatedObjectCodeColumn = TryGetColumnName(columns, "related_object_code"); var currentValueColumn = TryGetColumnName(columns, "current_value"); var occurrenceDeptIdColumn = TryGetColumnName(columns, "occurrence_dept_id"); var responsibleDeptIdColumn = TryGetColumnName(columns, "responsible_dept_id"); return new S8WatchDeviceRow { RelatedObjectCode = ReadString(row, relatedObjectCodeColumn), CurrentValue = ReadDecimal(row, currentValueColumn), OccurrenceDeptId = ReadLong(row, occurrenceDeptIdColumn), ResponsibleDeptId = ReadLong(row, responsibleDeptIdColumn), SourcePayload = BuildSourcePayload(row) }; } private static string? TryGetColumnName(DataColumnCollection columns, string expectedName) { var normalizedExpected = NormalizeColumnName(expectedName); foreach (DataColumn column in columns) { if (NormalizeColumnName(column.ColumnName) == normalizedExpected) return column.ColumnName; } return null; } private static string NormalizeColumnName(string columnName) => columnName.Replace("_", string.Empty, StringComparison.Ordinal).Trim().ToUpperInvariant(); private static string ReadString(DataRow row, string? columnName) { if (string.IsNullOrWhiteSpace(columnName)) return string.Empty; var value = row[columnName]; return value == DBNull.Value ? string.Empty : Convert.ToString(value)?.Trim() ?? string.Empty; } private static long? ReadLong(DataRow row, string? columnName) { if (string.IsNullOrWhiteSpace(columnName)) return null; var value = row[columnName]; if (value == DBNull.Value) return null; return long.TryParse(Convert.ToString(value, CultureInfo.InvariantCulture), out var result) ? result : null; } private static decimal? ReadDecimal(DataRow row, string? columnName) { if (string.IsNullOrWhiteSpace(columnName)) return null; var value = row[columnName]; if (value == DBNull.Value) return null; return decimal.TryParse(Convert.ToString(value, CultureInfo.InvariantCulture), NumberStyles.Any, CultureInfo.InvariantCulture, out var result) ? result : null; } private static string BuildSourcePayload(DataRow row) { var payload = new Dictionary(StringComparer.OrdinalIgnoreCase); foreach (DataColumn column in row.Table.Columns) { var value = row[column]; payload[column.ColumnName] = value == DBNull.Value ? null : value; } return JsonSerializer.Serialize(payload); } } public sealed class S8WatchExecutionRule { public long WatchRuleId { get; set; } public string WatchRuleCode { get; set; } = string.Empty; public string SceneCode { get; set; } = string.Empty; public string TriggerType { get; set; } = string.Empty; public string WatchObjectType { get; set; } = string.Empty; public long DataSourceId { get; set; } public string DataSourceCode { get; set; } = string.Empty; public string DataSourceType { get; set; } = string.Empty; public string DataSourceConnection { get; set; } = string.Empty; public string QueryExpression { get; set; } = string.Empty; public int PollIntervalSeconds { get; set; } public long AlertRuleId { get; set; } public string AlertRuleCode { get; set; } = string.Empty; public string TriggerCondition { get; set; } = string.Empty; public string ThresholdValue { get; set; } = string.Empty; public string Severity { get; set; } = string.Empty; } public sealed class S8WatchDeviceQueryResult { public long WatchRuleId { get; set; } public string WatchRuleCode { get; set; } = string.Empty; public bool Success { get; set; } public string? FailureReason { get; set; } public List Rows { get; set; } = new(); public static S8WatchDeviceQueryResult Ok(S8WatchExecutionRule rule, List rows) => new() { WatchRuleId = rule.WatchRuleId, WatchRuleCode = rule.WatchRuleCode, Success = true, Rows = rows }; public static S8WatchDeviceQueryResult Fail(S8WatchExecutionRule rule, string reason) => new() { WatchRuleId = rule.WatchRuleId, WatchRuleCode = rule.WatchRuleCode, Success = false, FailureReason = reason }; } public sealed class S8WatchDeviceRow { public string RelatedObjectCode { get; set; } = string.Empty; public decimal? CurrentValue { get; set; } public long? OccurrenceDeptId { get; set; } public long? ResponsibleDeptId { get; set; } public string SourcePayload { get; set; } = string.Empty; } /// /// G01-05 去重结果对象。仅服务 G01-06 建单前拦截,由 CanCreate 单决策位决定是否建单。 /// 只服务首版唯一场景 S2S6_PRODUCTION + 唯一 trigger_type VALUE_DEVIATION + 设备对象。 /// 不预留多 trigger_type / 平台化去重扩展结构。 /// Reason 值域:no_pending / duplicate_pending / missing_dedup_key / query_failed。 /// public sealed class S8WatchDedupResult { public S8WatchHitResult Hit { get; set; } = new(); public bool CanCreate { get; set; } public long? MatchedExceptionId { get; set; } public string Reason { get; set; } = string.Empty; } /// /// G01-06 建单结果对象。仅服务 G-01 首版主线验收,由 Created / Skipped 两位决定结局。 /// 只服务首版唯一场景 S2S6_PRODUCTION + 唯一 trigger_type VALUE_DEVIATION + 设备对象。 /// 不预留多 trigger_type / 平台化工单扩展结构。 /// Reason 值域:auto_created / create_failed / 透传自 DedupResult.Reason。 /// public sealed class S8WatchCreationResult { public S8WatchDedupResult DedupResult { get; set; } = new(); public bool Created { get; set; } public bool Skipped { get; set; } public long? CreatedExceptionId { get; set; } public string Reason { get; set; } = string.Empty; public string? ErrorMessage { get; set; } } /// /// G01-04 命中结果对象。承载 G01-05 去重与 G01-06 建单所需最小追溯字段, /// 仅服务首版唯一场景 S2S6_PRODUCTION + 唯一 trigger_type VALUE_DEVIATION + 设备对象。 /// 不预留多 trigger_type / 多场景 / 平台化扩展结构。 /// public sealed class S8WatchHitResult { public long SourceRuleId { get; set; } public string SourceRuleCode { get; set; } = string.Empty; public long AlertRuleId { get; set; } public long DataSourceId { get; set; } public string RelatedObjectCode { get; set; } = string.Empty; public decimal CurrentValue { get; set; } public decimal ThresholdValue { get; set; } public string TriggerCondition { get; set; } = string.Empty; public string Severity { get; set; } = string.Empty; public long? OccurrenceDeptId { get; set; } public long? ResponsibleDeptId { get; set; } public string SourcePayload { get; set; } = string.Empty; }