S8WatchSchedulerService.cs 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847
  1. using Admin.NET.Plugin.AiDOP.Entity.S8;
  2. using Admin.NET.Plugin.AiDOP.Infrastructure.S8;
  3. using Admin.NET.Plugin.AiDOP.Service.S8.Rules;
  4. using SqlSugar;
  5. using System.Data;
  6. using System.Globalization;
  7. using System.Text.Json;
  8. namespace Admin.NET.Plugin.AiDOP.Service.S8;
  9. /// <summary>
  10. /// 监视规则轮询调度服务(首轮存根)。
  11. /// 后续接入 Admin.NET 定时任务机制后,由调度器周期调用 <see cref="RunOnceAsync"/>,
  12. /// 按各规则的 PollIntervalSeconds 逐条评估并生成异常记录。
  13. /// </summary>
  14. public class S8WatchSchedulerService : ITransient
  15. {
    // Repositories over the S8 watch-rule, alert-rule, data-source, exception and exception-type entities.
    private readonly SqlSugarRepository<AdoS8WatchRule> _ruleRep;
    private readonly SqlSugarRepository<AdoS8AlertRule> _alertRuleRep;
    private readonly SqlSugarRepository<AdoS8DataSource> _dataSourceRep;
    private readonly SqlSugarRepository<AdoS8Exception> _exceptionRep;
    private readonly SqlSugarRepository<AdoS8ExceptionType> _exceptionTypeRep;
    // Injected, but not referenced by any member of this class.
    private readonly S8NotificationService _notificationService;
    // Persists exceptions for both the device path (CreateFromWatchAsync) and the typed-rule path (CreateFromHitAsync).
    private readonly S8ManualReportService _manualReportService;
    // Evaluators handed to ProcessRulesByTypeAsync for TIMEOUT and SHORTAGE rules respectively.
    private readonly S8TimeoutRuleEvaluator _timeoutEvaluator;
    private readonly S8ShortageRuleEvaluator _shortageEvaluator;
    // Trigger type stamped on every execution rule assembled by LoadExecutionRulesAsync.
    private const string DefaultTriggerType = "VALUE_DEVIATION";
    // Only data sources whose type equals this value (case-insensitive) are queried.
    private const string SqlDataSourceType = "SQL";
    // G01-05 unclosed status set: reuses the pendingStatuses definition that S8ExceptionService currently
    // applies (see pendingStatuses in S8ExceptionService.GetPagedAsync; the two must stay identical).
    // This is not a custom "unclosed" set; if the existing definition changes, both places must be updated together.
    private static readonly string[] UnclosedExceptionStatuses =
        { "NEW", "ASSIGNED", "IN_PROGRESS", "PENDING_VERIFICATION" };
    /// <summary>
    /// Creates the scheduler service; every dependency is stored as-is in the matching private field.
    /// </summary>
    public S8WatchSchedulerService(
        SqlSugarRepository<AdoS8WatchRule> ruleRep,
        SqlSugarRepository<AdoS8AlertRule> alertRuleRep,
        SqlSugarRepository<AdoS8DataSource> dataSourceRep,
        SqlSugarRepository<AdoS8Exception> exceptionRep,
        SqlSugarRepository<AdoS8ExceptionType> exceptionTypeRep,
        S8NotificationService notificationService,
        S8ManualReportService manualReportService,
        S8TimeoutRuleEvaluator timeoutEvaluator,
        S8ShortageRuleEvaluator shortageEvaluator)
    {
        _ruleRep = ruleRep;
        _alertRuleRep = alertRuleRep;
        _dataSourceRep = dataSourceRep;
        _exceptionRep = exceptionRep;
        _exceptionTypeRep = exceptionTypeRep;
        _notificationService = notificationService;
        _manualReportService = manualReportService;
        _timeoutEvaluator = timeoutEvaluator;
        _shortageEvaluator = shortageEvaluator;
    }
  53. public async Task<List<S8WatchExecutionRule>> LoadExecutionRulesAsync(long tenantId, long factoryId)
  54. {
  55. var watchRules = await _ruleRep.AsQueryable()
  56. .Where(x => x.TenantId == tenantId
  57. && x.FactoryId == factoryId
  58. && x.Enabled
  59. && x.SceneCode == S8SceneCode.S2S6Production)
  60. .ToListAsync();
  61. var deviceRules = watchRules
  62. .Where(x => IsDeviceWatchObjectType(x.WatchObjectType))
  63. .ToList();
  64. if (deviceRules.Count == 0) return new();
  65. var dataSourceIds = deviceRules
  66. .Select(x => x.DataSourceId)
  67. .Distinct()
  68. .ToList();
  69. var dataSources = await _dataSourceRep.AsQueryable()
  70. .Where(x => x.TenantId == tenantId
  71. && x.FactoryId == factoryId
  72. && x.Enabled
  73. && dataSourceIds.Contains(x.Id))
  74. .ToListAsync();
  75. var dataSourceMap = dataSources.ToDictionary(x => x.Id);
  76. if (dataSourceMap.Count == 0) return new();
  77. var alertRules = (await _alertRuleRep.AsQueryable()
  78. .Where(x => x.TenantId == tenantId
  79. && x.FactoryId == factoryId
  80. && x.SceneCode == S8SceneCode.S2S6Production)
  81. .ToListAsync())
  82. .Where(IsSupportedAlertRule)
  83. .ToList();
  84. // G-01 首版 AlertRule 冲突口径(C 收口):
  85. // 当前场景存在多条可运行 AlertRule 时,视为“当前规则配置冲突”并跳过该规则,
  86. // 不按“首条”继续运行,也不扩大为“整场景停摆”。
  87. // 当前模型下所有 device watchRule 共享同场景 AlertRule,故冲突态下所有 device 规则均跳过,
  88. // 但此处按“逐规则跳过”的语义实现,避免被误读为“整场景 return empty 停摆”。
  89. var alertRule = alertRules.Count == 1 ? alertRules[0] : null;
  90. var executionRules = new List<S8WatchExecutionRule>();
  91. foreach (var watchRule in deviceRules.OrderBy(x => x.Id))
  92. {
  93. // 配置冲突:当前规则跳过(不停摆其他规则)。
  94. if (alertRule == null)
  95. continue;
  96. if (!dataSourceMap.TryGetValue(watchRule.DataSourceId, out var dataSource))
  97. continue;
  98. if (!IsSupportedSqlDataSource(dataSource))
  99. continue;
  100. executionRules.Add(new S8WatchExecutionRule
  101. {
  102. WatchRuleId = watchRule.Id,
  103. WatchRuleCode = watchRule.RuleCode,
  104. SceneCode = watchRule.SceneCode,
  105. TriggerType = DefaultTriggerType,
  106. WatchObjectType = watchRule.WatchObjectType.Trim(),
  107. DataSourceId = dataSource.Id,
  108. DataSourceCode = dataSource.DataSourceCode,
  109. DataSourceType = dataSource.Type,
  110. DataSourceConnection = dataSource.Endpoint?.Trim() ?? string.Empty,
  111. QueryExpression = watchRule.Expression?.Trim() ?? string.Empty,
  112. PollIntervalSeconds = watchRule.PollIntervalSeconds,
  113. AlertRuleId = alertRule.Id,
  114. AlertRuleCode = alertRule.RuleCode,
  115. TriggerCondition = alertRule.TriggerCondition!.Trim(),
  116. ThresholdValue = alertRule.ThresholdVal!.Trim(),
  117. Severity = alertRule.Severity
  118. });
  119. }
  120. return executionRules;
  121. }
  122. public async Task<List<S8WatchDeviceQueryResult>> QueryDeviceRowsAsync(long tenantId, long factoryId)
  123. {
  124. var executionRules = await LoadExecutionRulesAsync(tenantId, factoryId);
  125. var results = new List<S8WatchDeviceQueryResult>();
  126. foreach (var rule in executionRules)
  127. results.Add(await QueryDeviceRowsAsync(rule));
  128. return results;
  129. }
  130. public async Task<S8WatchDeviceQueryResult> QueryDeviceRowsAsync(S8WatchExecutionRule rule)
  131. {
  132. if (!string.Equals(rule.DataSourceType, SqlDataSourceType, StringComparison.OrdinalIgnoreCase))
  133. return S8WatchDeviceQueryResult.Fail(rule, "数据源类型不是 SQL,已跳过");
  134. if (string.IsNullOrWhiteSpace(rule.QueryExpression))
  135. return S8WatchDeviceQueryResult.Fail(rule, "查询表达式为空,已跳过");
  136. try
  137. {
  138. using var db = CreateSqlQueryScope(rule.DataSourceConnection);
  139. var table = await db.Ado.GetDataTableAsync(rule.QueryExpression);
  140. if (!HasRequiredColumns(table))
  141. return S8WatchDeviceQueryResult.Fail(rule, "查询结果缺少 required columns: related_object_code/current_value");
  142. var rows = table.Rows.Cast<DataRow>()
  143. .Select(MapDeviceRow)
  144. .Where(x => !string.IsNullOrWhiteSpace(x.RelatedObjectCode))
  145. .ToList();
  146. return S8WatchDeviceQueryResult.Ok(rule, rows);
  147. }
  148. catch (Exception ex)
  149. {
  150. return S8WatchDeviceQueryResult.Fail(rule, $"查询执行失败: {ex.Message}");
  151. }
  152. }
  153. /// <summary>
  154. /// G01-04:基于设备级结果行集做首版 VALUE_DEVIATION 单阈值判定,
  155. /// 产出命中结果对象列表,供 G01-05 去重与 G01-06 建单消费。
  156. /// 本方法不做去重、不做建单、不做严重度重算、不做时间线。
  157. /// </summary>
  158. public async Task<List<S8WatchHitResult>> EvaluateHitsAsync(long tenantId, long factoryId)
  159. {
  160. var executionRules = await LoadExecutionRulesAsync(tenantId, factoryId);
  161. var ruleMap = executionRules.ToDictionary(x => x.WatchRuleId);
  162. var queryResults = new List<S8WatchDeviceQueryResult>();
  163. foreach (var rule in executionRules)
  164. queryResults.Add(await QueryDeviceRowsAsync(rule));
  165. var hits = new List<S8WatchHitResult>();
  166. foreach (var queryResult in queryResults)
  167. {
  168. // G01-03 查询失败:跳过,不进入判定。
  169. if (!queryResult.Success) continue;
  170. if (!ruleMap.TryGetValue(queryResult.WatchRuleId, out var rule)) continue;
  171. // 判定参数缺失:跳过当前规则。
  172. if (string.IsNullOrWhiteSpace(rule.TriggerCondition)
  173. || string.IsNullOrWhiteSpace(rule.ThresholdValue))
  174. continue;
  175. // 比较符非法:跳过当前规则。
  176. var op = TryParseTriggerCondition(rule.TriggerCondition);
  177. if (op == null) continue;
  178. // ThresholdValue 非法:跳过当前规则。
  179. if (!TryParseDecimal(rule.ThresholdValue, out var threshold)) continue;
  180. foreach (var row in queryResult.Rows)
  181. {
  182. // CurrentValue 非法(null / 无法解析数值):跳过当前行,不进入判定。
  183. if (row.CurrentValue == null) continue;
  184. // 未命中:不进入后续链路。
  185. if (!EvaluateHit(row.CurrentValue.Value, op, threshold)) continue;
  186. hits.Add(new S8WatchHitResult
  187. {
  188. SourceRuleId = rule.WatchRuleId,
  189. SourceRuleCode = rule.WatchRuleCode,
  190. AlertRuleId = rule.AlertRuleId,
  191. DataSourceId = rule.DataSourceId,
  192. RelatedObjectCode = row.RelatedObjectCode,
  193. CurrentValue = row.CurrentValue.Value,
  194. ThresholdValue = threshold,
  195. TriggerCondition = op,
  196. Severity = rule.Severity,
  197. OccurrenceDeptId = row.OccurrenceDeptId,
  198. ResponsibleDeptId = row.ResponsibleDeptId,
  199. SourcePayload = row.SourcePayload
  200. });
  201. }
  202. }
  203. return hits;
  204. }
  205. /// <summary>
  206. /// G01-05:未闭环异常去重最小实现。
  207. /// 消费 G01-04 产出的 <see cref="S8WatchHitResult"/>,按 (SourceRuleId + RelatedObjectCode)
  208. /// 在未闭环状态集合内判重,只回答“是否允许建单”。
  209. /// 首版明确不做:原单刷新 / 时间线追加 / payload 更新 / 次数累计 / 严重度重算 / 状态修复。
  210. /// </summary>
  211. public async Task<List<S8WatchDedupResult>> EvaluateDedupAsync(long tenantId, long factoryId)
  212. {
  213. var hits = await EvaluateHitsAsync(tenantId, factoryId);
  214. var results = new List<S8WatchDedupResult>(hits.Count);
  215. foreach (var hit in hits)
  216. {
  217. // 防御性分支:正常情况下 SourceRuleId 与 RelatedObjectCode 已由上游
  218. // (G01-02 规则装配 + G01-03 查询结果列校验)保证;此处仅作兜底,
  219. // 不是首版正常路径。
  220. if (hit.SourceRuleId <= 0 || string.IsNullOrWhiteSpace(hit.RelatedObjectCode))
  221. {
  222. results.Add(new S8WatchDedupResult
  223. {
  224. Hit = hit,
  225. CanCreate = false,
  226. MatchedExceptionId = null,
  227. Reason = "missing_dedup_key"
  228. });
  229. continue;
  230. }
  231. long matchedId;
  232. try
  233. {
  234. matchedId = await FindPendingExceptionIdAsync(
  235. tenantId, factoryId, hit.SourceRuleId, hit.RelatedObjectCode);
  236. }
  237. catch
  238. {
  239. // 去重查询失败:首版偏保守,宁可阻止也不重复建单,不扩展为补偿。
  240. results.Add(new S8WatchDedupResult
  241. {
  242. Hit = hit,
  243. CanCreate = false,
  244. MatchedExceptionId = null,
  245. Reason = "query_failed"
  246. });
  247. continue;
  248. }
  249. if (matchedId > 0)
  250. {
  251. // 命中已有未闭环异常 → 阻止建单。
  252. results.Add(new S8WatchDedupResult
  253. {
  254. Hit = hit,
  255. CanCreate = false,
  256. MatchedExceptionId = matchedId,
  257. Reason = "duplicate_pending"
  258. });
  259. }
  260. else
  261. {
  262. // 未命中 → 允许建单,交 G01-06 消费。
  263. results.Add(new S8WatchDedupResult
  264. {
  265. Hit = hit,
  266. CanCreate = true,
  267. MatchedExceptionId = null,
  268. Reason = "no_pending"
  269. });
  270. }
  271. }
  272. return results;
  273. }
  274. // 取任意一条匹配的未闭环异常 Id 作为“是否存在重复单”的拦截依据。
  275. // 首版只需要“存在性”,不关心“最早 / 最新”;不在 G01-05 处理排序语义。
  276. private async Task<long> FindPendingExceptionIdAsync(
  277. long tenantId, long factoryId, long sourceRuleId, string relatedObjectCode)
  278. {
  279. // SqlSugar 表达式翻译要求 Contains 数组变量必须可访问;此处将类级常量
  280. // 承接到方法内局部变量,仅为表达式翻译服务,值与 UnclosedExceptionStatuses 一致。
  281. var statuses = UnclosedExceptionStatuses;
  282. var ids = await _exceptionRep.AsQueryable()
  283. .Where(x => x.TenantId == tenantId
  284. && x.FactoryId == factoryId
  285. && !x.IsDeleted
  286. && x.SourceRuleId == sourceRuleId
  287. && x.RelatedObjectCode == relatedObjectCode
  288. && statuses.Contains(x.Status))
  289. .Select(x => x.Id)
  290. .Take(1)
  291. .ToListAsync();
  292. return ids.Count > 0 ? ids[0] : 0L;
  293. }
  294. /// <summary>
  295. /// G01-06:自动建单入口。消费 G01-05 去重结果,对 CanCreate==true 的命中
  296. /// 复用 S8ManualReportService.CreateFromWatchAsync(同一主链的自动建单分支)落成标准 AdoS8Exception。
  297. /// CanCreate==false 直接跳过;创建失败返回最小失败结果,不补偿、不重试、不对账。
  298. /// </summary>
  299. public async Task<List<S8WatchCreationResult>> CreateExceptionsAsync(long tenantId, long factoryId)
  300. {
  301. var dedupResults = await EvaluateDedupAsync(tenantId, factoryId);
  302. var results = new List<S8WatchCreationResult>(dedupResults.Count);
  303. foreach (var dedup in dedupResults)
  304. {
  305. if (!dedup.CanCreate)
  306. {
  307. results.Add(new S8WatchCreationResult
  308. {
  309. DedupResult = dedup,
  310. Created = false,
  311. Skipped = true,
  312. CreatedExceptionId = null,
  313. Reason = dedup.Reason,
  314. ErrorMessage = null
  315. });
  316. continue;
  317. }
  318. try
  319. {
  320. var entity = await _manualReportService.CreateFromWatchAsync(dedup.Hit);
  321. results.Add(new S8WatchCreationResult
  322. {
  323. DedupResult = dedup,
  324. Created = true,
  325. Skipped = false,
  326. CreatedExceptionId = entity.Id,
  327. Reason = "auto_created",
  328. ErrorMessage = null
  329. });
  330. }
  331. catch (Exception ex)
  332. {
  333. results.Add(new S8WatchCreationResult
  334. {
  335. DedupResult = dedup,
  336. Created = false,
  337. Skipped = false,
  338. CreatedExceptionId = null,
  339. Reason = "create_failed",
  340. ErrorMessage = ex.Message
  341. });
  342. }
  343. }
  344. // R2: TIMEOUT 路径,按 rule_type='TIMEOUT' 分派;OUT_OF_RANGE 走上方既有兼容分支不动。
  345. // 未知 rule_type 与 null 由 LoadExecutionRulesAsync 既有过滤(DEVICE + S2S6_PRODUCTION + AlertRule)天然隔离,
  346. // 不在本路径处理;不抛出全局异常。
  347. results.AddRange(await ProcessTimeoutRulesAsync(tenantId, factoryId));
  348. results.AddRange(await ProcessShortageRulesAsync(tenantId, factoryId));
  349. return results;
  350. }
  351. /// <summary>
  352. /// R2 TIMEOUT 类规则主链:薄包装,复用 <see cref="ProcessRulesByTypeAsync"/>。
  353. /// </summary>
  354. public Task<List<S8WatchCreationResult>> ProcessTimeoutRulesAsync(long tenantId, long factoryId) =>
  355. ProcessRulesByTypeAsync(tenantId, factoryId, _timeoutEvaluator, S8TimeoutRuleEvaluator.RuleTypeCode);
  356. /// <summary>
  357. /// R3 SHORTAGE 类规则主链:薄包装,复用 <see cref="ProcessRulesByTypeAsync"/>。
  358. /// </summary>
  359. public Task<List<S8WatchCreationResult>> ProcessShortageRulesAsync(long tenantId, long factoryId) =>
  360. ProcessRulesByTypeAsync(tenantId, factoryId, _shortageEvaluator, S8ShortageRuleEvaluator.RuleTypeCode);
  361. /// <summary>
  362. /// R2/R3 通用规则主链:装载 enabled WatchRule.RuleType=ruleType → evaluator → dedup_key 去重 → 建单/刷新。
  363. /// dedup 命中:UPDATE last_detected_at + source_payload,不重复建单;
  364. /// dedup 未命中:校验 ExceptionTypeCode 是否在 baseline(tenant=0/factory=0 全局或本租户工厂),缺则跳过;
  365. /// 通过 → S8ManualReportService.CreateFromHitAsync 落标准 AdoS8Exception,新列全部回填。
  366. /// 不做 SLA 升级、不做事件触发、不做 RecoveredAt。
  367. /// </summary>
  368. private async Task<List<S8WatchCreationResult>> ProcessRulesByTypeAsync(
  369. long tenantId, long factoryId, IS8RuleEvaluator evaluator, string ruleType)
  370. {
  371. var results = new List<S8WatchCreationResult>();
  372. var rules = await _ruleRep.AsQueryable()
  373. .Where(x => x.TenantId == tenantId
  374. && x.FactoryId == factoryId
  375. && x.Enabled
  376. && x.RuleType == ruleType)
  377. .ToListAsync();
  378. if (rules.Count == 0) return results;
  379. var alertRules = (await _alertRuleRep.AsQueryable()
  380. .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId)
  381. .ToListAsync()).AsReadOnly();
  382. foreach (var rule in rules.OrderBy(x => x.Id))
  383. {
  384. List<S8RuleHit> hits;
  385. try
  386. {
  387. hits = await evaluator.EvaluateAsync(tenantId, factoryId, rule, alertRules);
  388. }
  389. catch (Exception ex)
  390. {
  391. results.Add(BuildSkipResult(rule, "evaluate_failed", ex.Message));
  392. continue;
  393. }
  394. foreach (var hit in hits)
  395. {
  396. if (string.IsNullOrWhiteSpace(hit.DedupKey))
  397. {
  398. results.Add(BuildSkipResult(rule, "missing_dedup_key", null, hit));
  399. continue;
  400. }
  401. long matchedId;
  402. try
  403. {
  404. matchedId = await FindOpenExceptionByDedupKeyAsync(tenantId, factoryId, hit.DedupKey);
  405. }
  406. catch (Exception ex)
  407. {
  408. results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit));
  409. continue;
  410. }
  411. if (matchedId > 0)
  412. {
  413. try
  414. {
  415. await RefreshDetectionAsync(matchedId, hit);
  416. results.Add(BuildSkippedDuplicate(rule, hit, matchedId));
  417. }
  418. catch (Exception ex)
  419. {
  420. results.Add(BuildSkipResult(rule, "refresh_failed", ex.Message, hit));
  421. }
  422. continue;
  423. }
  424. bool typeExists;
  425. try
  426. {
  427. typeExists = await _exceptionTypeRep.AsQueryable()
  428. .Where(t => t.TypeCode == hit.ExceptionTypeCode
  429. && (t.TenantId == 0 || t.TenantId == tenantId)
  430. && (t.FactoryId == 0 || t.FactoryId == factoryId)
  431. && t.Enabled)
  432. .AnyAsync();
  433. }
  434. catch (Exception ex)
  435. {
  436. results.Add(BuildSkipResult(rule, "query_failed", ex.Message, hit));
  437. continue;
  438. }
  439. if (!typeExists)
  440. {
  441. results.Add(BuildSkipResult(rule, "exception_type_missing", null, hit));
  442. continue;
  443. }
  444. try
  445. {
  446. var entity = await _manualReportService.CreateFromHitAsync(hit);
  447. results.Add(BuildCreatedResult(rule, hit, entity.Id));
  448. }
  449. catch (Exception ex)
  450. {
  451. results.Add(BuildSkipResult(rule, "create_failed", ex.Message, hit));
  452. }
  453. }
  454. }
  455. return results;
  456. }
  457. private async Task<long> FindOpenExceptionByDedupKeyAsync(long tenantId, long factoryId, string dedupKey)
  458. {
  459. var ids = await _exceptionRep.AsQueryable()
  460. .Where(x => x.TenantId == tenantId
  461. && x.FactoryId == factoryId
  462. && !x.IsDeleted
  463. && x.Status != "CLOSED"
  464. && x.DedupKey == dedupKey)
  465. .Select(x => x.Id)
  466. .Take(1)
  467. .ToListAsync();
  468. return ids.Count > 0 ? ids[0] : 0L;
  469. }
  470. private async Task RefreshDetectionAsync(long exceptionId, S8RuleHit hit)
  471. {
  472. await _exceptionRep.Context.Updateable<AdoS8Exception>()
  473. .SetColumns(x => new AdoS8Exception
  474. {
  475. LastDetectedAt = hit.DetectedAt,
  476. SourcePayload = hit.SourcePayload,
  477. UpdatedAt = DateTime.Now
  478. })
  479. .Where(x => x.Id == exceptionId)
  480. .ExecuteCommandAsync();
  481. }
  482. private static S8WatchCreationResult BuildCreatedResult(AdoS8WatchRule rule, S8RuleHit hit, long exceptionId) =>
  483. new()
  484. {
  485. DedupResult = new S8WatchDedupResult
  486. {
  487. Hit = ToWatchHit(rule, hit),
  488. CanCreate = true,
  489. MatchedExceptionId = null,
  490. Reason = "no_pending"
  491. },
  492. Created = true,
  493. Skipped = false,
  494. CreatedExceptionId = exceptionId,
  495. Reason = "auto_created",
  496. ErrorMessage = null
  497. };
  498. private static S8WatchCreationResult BuildSkippedDuplicate(AdoS8WatchRule rule, S8RuleHit hit, long matchedId) =>
  499. new()
  500. {
  501. DedupResult = new S8WatchDedupResult
  502. {
  503. Hit = ToWatchHit(rule, hit),
  504. CanCreate = false,
  505. MatchedExceptionId = matchedId,
  506. Reason = "duplicate_pending"
  507. },
  508. Created = false,
  509. Skipped = true,
  510. CreatedExceptionId = null,
  511. Reason = "duplicate_pending",
  512. ErrorMessage = null
  513. };
  514. private static S8WatchCreationResult BuildSkipResult(AdoS8WatchRule rule, string reason, string? error, S8RuleHit? hit = null) =>
  515. new()
  516. {
  517. DedupResult = new S8WatchDedupResult
  518. {
  519. Hit = hit != null ? ToWatchHit(rule, hit) : new S8WatchHitResult { SourceRuleId = rule.Id, SourceRuleCode = rule.RuleCode },
  520. CanCreate = false,
  521. MatchedExceptionId = null,
  522. Reason = reason
  523. },
  524. Created = false,
  525. Skipped = true,
  526. CreatedExceptionId = null,
  527. Reason = reason,
  528. ErrorMessage = error
  529. };
  530. private static S8WatchHitResult ToWatchHit(AdoS8WatchRule rule, S8RuleHit hit) => new()
  531. {
  532. SourceRuleId = hit.SourceRuleId == 0 ? rule.Id : hit.SourceRuleId,
  533. SourceRuleCode = string.IsNullOrEmpty(hit.SourceRuleCode) ? rule.RuleCode : hit.SourceRuleCode,
  534. DataSourceId = hit.DataSourceId,
  535. RelatedObjectCode = hit.RelatedObjectCode,
  536. Severity = hit.Severity,
  537. OccurrenceDeptId = hit.OccurrenceDeptId,
  538. ResponsibleDeptId = hit.ResponsibleDeptId,
  539. SourcePayload = hit.SourcePayload
  540. };
  541. /// <summary>
  542. /// 单次轮询入口。当前仅完成规则读取与组装,返回可执行规则数量,不做实际数据采集。
  543. /// </summary>
  544. public async Task<int> RunOnceAsync()
  545. {
  546. var executionRules = await LoadExecutionRulesAsync(1, 1);
  547. return executionRules.Count;
  548. }
  549. // G01-04 首版最小比较符集合:>, >=, <, <=。
  550. // 允许首尾空格;非此集合的一律视为“比较符非法”,由调用方跳过。
  551. private static string? TryParseTriggerCondition(string raw)
  552. {
  553. var normalized = raw.Trim();
  554. return normalized switch
  555. {
  556. ">" => ">",
  557. ">=" => ">=",
  558. "<" => "<",
  559. "<=" => "<=",
  560. _ => null
  561. };
  562. }
  563. private static bool TryParseDecimal(string raw, out decimal value) =>
  564. decimal.TryParse(raw.Trim(), NumberStyles.Any, CultureInfo.InvariantCulture, out value);
  565. private static bool EvaluateHit(decimal current, string op, decimal threshold) => op switch
  566. {
  567. ">" => current > threshold,
  568. ">=" => current >= threshold,
  569. "<" => current < threshold,
  570. "<=" => current <= threshold,
  571. _ => false
  572. };
  573. private static bool IsSupportedAlertRule(AdoS8AlertRule alertRule) =>
  574. !string.IsNullOrWhiteSpace(alertRule.TriggerCondition)
  575. && !string.IsNullOrWhiteSpace(alertRule.ThresholdVal);
  576. private static bool IsSupportedSqlDataSource(AdoS8DataSource dataSource) =>
  577. dataSource.Enabled
  578. && string.Equals(dataSource.Type?.Trim(), SqlDataSourceType, StringComparison.OrdinalIgnoreCase)
  579. && !string.IsNullOrWhiteSpace(dataSource.Endpoint);
  580. private static bool IsDeviceWatchObjectType(string? watchObjectType)
  581. {
  582. if (string.IsNullOrWhiteSpace(watchObjectType)) return false;
  583. var normalized = watchObjectType.Trim().ToUpperInvariant();
  584. return normalized is "DEVICE" or "EQUIPMENT" || watchObjectType.Trim() == "设备";
  585. }
  586. private SqlSugarScope CreateSqlQueryScope(string connectionString)
  587. {
  588. var dbType = _ruleRep.Context.CurrentConnectionConfig.DbType;
  589. return new SqlSugarScope(new ConnectionConfig
  590. {
  591. ConfigId = $"s8-watch-sql-{Guid.NewGuid():N}",
  592. DbType = dbType,
  593. ConnectionString = connectionString,
  594. InitKeyType = InitKeyType.Attribute,
  595. IsAutoCloseConnection = true
  596. });
  597. }
  598. private static bool HasRequiredColumns(DataTable table) =>
  599. TryGetColumnName(table.Columns, "related_object_code") != null
  600. && TryGetColumnName(table.Columns, "current_value") != null;
  601. private static S8WatchDeviceRow MapDeviceRow(DataRow row)
  602. {
  603. var columns = row.Table.Columns;
  604. var relatedObjectCodeColumn = TryGetColumnName(columns, "related_object_code");
  605. var currentValueColumn = TryGetColumnName(columns, "current_value");
  606. var occurrenceDeptIdColumn = TryGetColumnName(columns, "occurrence_dept_id");
  607. var responsibleDeptIdColumn = TryGetColumnName(columns, "responsible_dept_id");
  608. return new S8WatchDeviceRow
  609. {
  610. RelatedObjectCode = ReadString(row, relatedObjectCodeColumn),
  611. CurrentValue = ReadDecimal(row, currentValueColumn),
  612. OccurrenceDeptId = ReadLong(row, occurrenceDeptIdColumn),
  613. ResponsibleDeptId = ReadLong(row, responsibleDeptIdColumn),
  614. SourcePayload = BuildSourcePayload(row)
  615. };
  616. }
  617. private static string? TryGetColumnName(DataColumnCollection columns, string expectedName)
  618. {
  619. var normalizedExpected = NormalizeColumnName(expectedName);
  620. foreach (DataColumn column in columns)
  621. {
  622. if (NormalizeColumnName(column.ColumnName) == normalizedExpected)
  623. return column.ColumnName;
  624. }
  625. return null;
  626. }
  627. private static string NormalizeColumnName(string columnName) =>
  628. columnName.Replace("_", string.Empty, StringComparison.Ordinal).Trim().ToUpperInvariant();
  629. private static string ReadString(DataRow row, string? columnName)
  630. {
  631. if (string.IsNullOrWhiteSpace(columnName)) return string.Empty;
  632. var value = row[columnName];
  633. return value == DBNull.Value ? string.Empty : Convert.ToString(value)?.Trim() ?? string.Empty;
  634. }
  635. private static long? ReadLong(DataRow row, string? columnName)
  636. {
  637. if (string.IsNullOrWhiteSpace(columnName)) return null;
  638. var value = row[columnName];
  639. if (value == DBNull.Value) return null;
  640. return long.TryParse(Convert.ToString(value, CultureInfo.InvariantCulture), out var result) ? result : null;
  641. }
  642. private static decimal? ReadDecimal(DataRow row, string? columnName)
  643. {
  644. if (string.IsNullOrWhiteSpace(columnName)) return null;
  645. var value = row[columnName];
  646. if (value == DBNull.Value) return null;
  647. return decimal.TryParse(Convert.ToString(value, CultureInfo.InvariantCulture), NumberStyles.Any, CultureInfo.InvariantCulture, out var result)
  648. ? result
  649. : null;
  650. }
  651. private static string BuildSourcePayload(DataRow row)
  652. {
  653. var payload = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
  654. foreach (DataColumn column in row.Table.Columns)
  655. {
  656. var value = row[column];
  657. payload[column.ColumnName] = value == DBNull.Value ? null : value;
  658. }
  659. return JsonSerializer.Serialize(payload);
  660. }
  661. }
/// <summary>
/// Flattened, ready-to-run view of one device watch rule: the watch rule itself plus the data source
/// it queries and the alert rule it is judged against. Built by S8WatchSchedulerService.LoadExecutionRulesAsync.
/// </summary>
public sealed class S8WatchExecutionRule
{
    // Id and code of the source watch rule.
    public long WatchRuleId { get; set; }
    public string WatchRuleCode { get; set; } = string.Empty;
    // Scene code copied from the watch rule.
    public string SceneCode { get; set; } = string.Empty;
    // Trigger type; the scheduler always assigns its default ("VALUE_DEVIATION").
    public string TriggerType { get; set; } = string.Empty;
    // Trimmed watch object type of the watch rule.
    public string WatchObjectType { get; set; } = string.Empty;
    // Identity and type of the data source that is queried.
    public long DataSourceId { get; set; }
    public string DataSourceCode { get; set; } = string.Empty;
    public string DataSourceType { get; set; } = string.Empty;
    // Trimmed data source endpoint; used as the connection string when the query is executed.
    public string DataSourceConnection { get; set; } = string.Empty;
    // Trimmed watch rule expression; executed as the query text.
    public string QueryExpression { get; set; } = string.Empty;
    // Poll interval copied from the watch rule.
    public int PollIntervalSeconds { get; set; }
    // Id and code of the alert rule paired with this watch rule.
    public long AlertRuleId { get; set; }
    public string AlertRuleCode { get; set; } = string.Empty;
    // Trimmed comparison operator and threshold text from the alert rule.
    public string TriggerCondition { get; set; } = string.Empty;
    public string ThresholdValue { get; set; } = string.Empty;
    // Severity copied from the alert rule.
    public string Severity { get; set; } = string.Empty;
}
  681. public sealed class S8WatchDeviceQueryResult
  682. {
  683. public long WatchRuleId { get; set; }
  684. public string WatchRuleCode { get; set; } = string.Empty;
  685. public bool Success { get; set; }
  686. public string? FailureReason { get; set; }
  687. public List<S8WatchDeviceRow> Rows { get; set; } = new();
  688. public static S8WatchDeviceQueryResult Ok(S8WatchExecutionRule rule, List<S8WatchDeviceRow> rows) =>
  689. new()
  690. {
  691. WatchRuleId = rule.WatchRuleId,
  692. WatchRuleCode = rule.WatchRuleCode,
  693. Success = true,
  694. Rows = rows
  695. };
  696. public static S8WatchDeviceQueryResult Fail(S8WatchExecutionRule rule, string reason) =>
  697. new()
  698. {
  699. WatchRuleId = rule.WatchRuleId,
  700. WatchRuleCode = rule.WatchRuleCode,
  701. Success = false,
  702. FailureReason = reason
  703. };
  704. }
/// <summary>
/// One device-level row of a watch query result, as mapped by the scheduler.
/// </summary>
public sealed class S8WatchDeviceRow
{
    // Trimmed value of the related_object_code column; empty when the cell is a database null.
    public string RelatedObjectCode { get; set; } = string.Empty;
    // Value of the current_value column; null when the cell is a database null or not numeric.
    public decimal? CurrentValue { get; set; }
    // Optional department ids; null when the column is absent, null or not an integer.
    public long? OccurrenceDeptId { get; set; }
    public long? ResponsibleDeptId { get; set; }
    // JSON serialisation of the whole source row.
    public string SourcePayload { get; set; } = string.Empty;
}
/// <summary>
/// G01-05 dedup result. Serves only the pre-creation gate of G01-06; the single decision flag
/// CanCreate determines whether an exception is created.
/// Designed for the first version's only scene (S2S6_PRODUCTION), only trigger type (VALUE_DEVIATION)
/// and device objects; no structure is reserved for multiple trigger types or platform-level dedup.
/// Reason values on the device path: no_pending / duplicate_pending / missing_dedup_key / query_failed.
/// NOTE(review): the TIMEOUT/SHORTAGE path of the scheduler also writes other reason codes here
/// (for example evaluate_failed, refresh_failed, exception_type_missing), so this list is not exhaustive.
/// </summary>
public sealed class S8WatchDedupResult
{
    // The hit this decision refers to.
    public S8WatchHitResult Hit { get; set; } = new();
    // True when creating an exception for the hit is allowed.
    public bool CanCreate { get; set; }
    // Id of the existing exception that blocked creation, when there is one.
    public long? MatchedExceptionId { get; set; }
    // Reason code explaining the decision.
    public string Reason { get; set; } = string.Empty;
}
/// <summary>
/// G01-06 creation result. Serves only the acceptance of the G-01 first-version main line; the two
/// flags Created / Skipped determine the outcome.
/// Designed for the first version's only scene (S2S6_PRODUCTION), only trigger type (VALUE_DEVIATION)
/// and device objects; no structure is reserved for multiple trigger types or platform-level work orders.
/// Reason values: auto_created / create_failed / passed through from DedupResult.Reason.
/// </summary>
public sealed class S8WatchCreationResult
{
    // The dedup decision this outcome is based on.
    public S8WatchDedupResult DedupResult { get; set; } = new();
    // True when a new exception was persisted.
    public bool Created { get; set; }
    // True when no creation was attempted or the hit was otherwise skipped.
    public bool Skipped { get; set; }
    // Id of the created exception; null when nothing was created.
    public long? CreatedExceptionId { get; set; }
    // Reason code of the outcome.
    public string Reason { get; set; } = string.Empty;
    // Exception message when the outcome was caused by an error; otherwise null.
    public string? ErrorMessage { get; set; }
}
/// <summary>
/// G01-04 hit result. Carries the minimal traceability fields needed by G01-05 (dedup) and
/// G01-06 (exception creation).
/// Designed for the first version's only scene (S2S6_PRODUCTION), only trigger type (VALUE_DEVIATION)
/// and device objects; no structure is reserved for multiple trigger types, scenes or platform extensions.
/// </summary>
public sealed class S8WatchHitResult
{
    // Id and code of the watch rule that produced the hit.
    public long SourceRuleId { get; set; }
    public string SourceRuleCode { get; set; } = string.Empty;
    // Alert rule and data source the hit was evaluated with.
    public long AlertRuleId { get; set; }
    public long DataSourceId { get; set; }
    // Code of the object (device) the hit refers to; part of the dedup key together with SourceRuleId.
    public string RelatedObjectCode { get; set; } = string.Empty;
    // Measured value, parsed threshold and the comparison operator that was applied.
    public decimal CurrentValue { get; set; }
    public decimal ThresholdValue { get; set; }
    public string TriggerCondition { get; set; } = string.Empty;
    // Severity taken from the rule configuration.
    public string Severity { get; set; } = string.Empty;
    // Optional department ids carried over from the source row.
    public long? OccurrenceDeptId { get; set; }
    public long? ResponsibleDeptId { get; set; }
    // JSON serialisation of the source row that triggered the hit.
    public string SourcePayload { get; set; } = string.Empty;
}