S8WatchRuleService.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. using System.Text.Json;
  2. using Admin.NET.Plugin.AiDOP.Entity.S8;
  3. using Admin.NET.Plugin.AiDOP.Service.S8.Rules;
  4. namespace Admin.NET.Plugin.AiDOP.Service.S8;
  5. public class S8WatchRuleService : ITransient
  6. {
  7. private readonly SqlSugarRepository<AdoS8WatchRule> _rep;
  8. private readonly SqlSugarRepository<AdoS8DataSource> _dataSourceRep;
  9. private readonly SqlSugarRepository<AdoS8SceneConfig> _sceneRep;
  10. public S8WatchRuleService(
  11. SqlSugarRepository<AdoS8WatchRule> rep,
  12. SqlSugarRepository<AdoS8DataSource> dataSourceRep,
  13. SqlSugarRepository<AdoS8SceneConfig> sceneRep)
  14. {
  15. _rep = rep;
  16. _dataSourceRep = dataSourceRep;
  17. _sceneRep = sceneRep;
  18. }
  19. public async Task<List<AdoS8WatchRule>> ListAsync(long tenantId, long factoryId) =>
  20. await _rep.AsQueryable()
  21. .Where(x => x.TenantId == tenantId && x.FactoryId == factoryId)
  22. .ToListAsync();
  23. public async Task<AdoS8WatchRule> CreateAsync(AdoS8WatchRule body)
  24. {
  25. await ValidateAsync(body);
  26. body.Id = 0;
  27. body.CreatedAt = DateTime.Now;
  28. await _rep.InsertAsync(body);
  29. return body;
  30. }
  31. public async Task<AdoS8WatchRule> UpdateAsync(long id, AdoS8WatchRule body)
  32. {
  33. var e = await _rep.GetByIdAsync(id) ?? throw new S8BizException("记录不存在");
  34. await ValidateAsync(body, id);
  35. body.Id = id;
  36. body.CreatedAt = e.CreatedAt;
  37. body.UpdatedAt = DateTime.Now;
  38. await _rep.UpdateAsync(body);
  39. return body;
  40. }
  41. public async Task DeleteAsync(long id) => await _rep.DeleteByIdAsync(id);
  42. /// <summary>
  43. /// R4 安全更新:只更新 params_json 与 enabled。expression / rule_code / data_source_id /
  44. /// scene_code / watch_object_type / rule_type / source_object_type 一律不通过此路径修改。
  45. /// 当 RuleType 非空时,按对应 evaluator 的 Params.Parse 进行 schema 校验,解析失败抛 S8BizException。
  46. /// </summary>
  47. public async Task<AdoS8WatchRule> UpdateParamsAsync(long id, S8WatchRuleParamsPayload payload)
  48. {
  49. var entity = await _rep.GetByIdAsync(id) ?? throw new S8BizException("记录不存在");
  50. var paramsJson = payload.ParamsJson?.Trim();
  51. if (!string.IsNullOrEmpty(paramsJson))
  52. {
  53. ValidateParamsJsonByRuleType(entity.RuleType, paramsJson);
  54. }
  55. entity.ParamsJson = string.IsNullOrEmpty(paramsJson) ? null : paramsJson;
  56. entity.Enabled = payload.Enabled;
  57. // TASK-002-RESET-DIMENSION-MODEL-DEV-2B:维度归属 + 报警机制按 payload 原样落库(含 null 清空)。
  58. entity.StageCode = NormalizeOrNull(payload.StageCode);
  59. entity.OrderFlowCode = NormalizeOrNull(payload.OrderFlowCode);
  60. entity.RuleMechanism = NormalizeOrNull(payload.RuleMechanism);
  61. entity.UpdatedAt = DateTime.Now;
  62. await _rep.UpdateAsync(entity);
  63. return entity;
  64. }
  65. private static string? NormalizeOrNull(string? value)
  66. {
  67. if (string.IsNullOrWhiteSpace(value)) return null;
  68. var trimmed = value.Trim();
  69. return trimmed.Length == 0 ? null : trimmed;
  70. }
  71. private static void ValidateParamsJsonByRuleType(string? ruleType, string paramsJson)
  72. {
  73. try
  74. {
  75. switch (ruleType)
  76. {
  77. case S8TimeoutRuleEvaluator.RuleTypeCode:
  78. {
  79. var p = S8TimeoutParams.Parse(paramsJson);
  80. if (string.IsNullOrWhiteSpace(p.DueAtField)
  81. || string.IsNullOrWhiteSpace(p.StatusField)
  82. || string.IsNullOrWhiteSpace(p.ExceptionTypeCode))
  83. throw new S8BizException("TIMEOUT params 缺少必填字段:dueAtField / statusField / exceptionTypeCode");
  84. break;
  85. }
  86. case S8ShortageRuleEvaluator.RuleTypeCode:
  87. {
  88. var p = S8ShortageParams.Parse(paramsJson);
  89. if (string.IsNullOrWhiteSpace(p.TargetQtyField)
  90. || string.IsNullOrWhiteSpace(p.ActualQtyField)
  91. || string.IsNullOrWhiteSpace(p.ExceptionTypeCode))
  92. throw new S8BizException("SHORTAGE params 缺少必填字段:targetQtyField / actualQtyField / exceptionTypeCode");
  93. break;
  94. }
  95. case S8OutOfRangeRuleEvaluator.RuleTypeCode:
  96. {
  97. var p = S8OutOfRangeParams.Parse(paramsJson);
  98. if (string.IsNullOrWhiteSpace(p.MeasuredValueField))
  99. throw new S8BizException("OUT_OF_RANGE params 缺少必填字段:measuredValueField");
  100. if (p.LowerBound == null && p.UpperBound == null
  101. && string.IsNullOrWhiteSpace(p.LowerBoundField)
  102. && string.IsNullOrWhiteSpace(p.UpperBoundField))
  103. throw new S8BizException("OUT_OF_RANGE params 必须提供 upperBound / lowerBound 或对应行内字段之一");
  104. break;
  105. }
  106. default:
  107. // RuleType 为空或非三类已知值:仅做 JSON 合法性校验,避免阻塞历史数据。
  108. using (JsonDocument.Parse(paramsJson)) { }
  109. break;
  110. }
  111. }
  112. catch (JsonException ex)
  113. {
  114. throw new S8BizException($"params_json 不是合法 JSON:{ex.Message}");
  115. }
  116. }
  117. public async Task<object> TestAsync(long id)
  118. {
  119. var entity = await _rep.GetByIdAsync(id) ?? throw new S8BizException("记录不存在");
  120. await ValidateAsync(entity, id);
  121. return new { id, success = true, message = "规则基础校验通过", pollIntervalSeconds = entity.PollIntervalSeconds };
  122. }
  123. // S8-SCHED-FRONTEND-1:远未来手工暂停哨兵值(与 SqlSugar DateTime 兼容;前端按 paused_until > now 判定)。
  124. private static readonly DateTime ManualPausedSentinel = new(9999, 12, 31, 23, 59, 59);
  125. /// <summary>
  126. /// S8-SCHED-FRONTEND-1:调度参数安全更新。仅修改 poll_interval_seconds / trigger_count_required /
  127. /// recover_count_required;不动 params_json / expression / rule_type / scene_code / data_source_id。
  128. /// </summary>
  129. public async Task<AdoS8WatchRule> UpdateScheduleAsync(long id, S8WatchRuleSchedulePayload payload)
  130. {
  131. var entity = await _rep.GetByIdAsync(id) ?? throw new S8BizException("记录不存在");
  132. if (payload.PollIntervalSeconds < 60 || payload.PollIntervalSeconds > 86400)
  133. throw new S8BizException("poll_interval_seconds 必须在 60–86400 之间");
  134. if (payload.TriggerCountRequired < 1 || payload.TriggerCountRequired > 10)
  135. throw new S8BizException("trigger_count_required 必须在 1–10 之间");
  136. if (payload.RecoverCountRequired < 1 || payload.RecoverCountRequired > 10)
  137. throw new S8BizException("recover_count_required 必须在 1–10 之间");
  138. await _rep.Context.Updateable<AdoS8WatchRule>()
  139. .SetColumns(x => new AdoS8WatchRule
  140. {
  141. PollIntervalSeconds = payload.PollIntervalSeconds,
  142. TriggerCountRequired = payload.TriggerCountRequired,
  143. RecoverCountRequired = payload.RecoverCountRequired,
  144. UpdatedAt = DateTime.Now
  145. })
  146. .Where(x => x.Id == id)
  147. .ExecuteCommandAsync();
  148. return await _rep.GetByIdAsync(id) ?? entity;
  149. }
  150. /// <summary>
  151. /// S8-SCHED-FRONTEND-1:立即执行一次。把 next_run_at 置为 NOW,让下个 tick 拾取。
  152. /// 不直接同步执行 evaluator;不阻塞请求;返回 200 + 提示。
  153. /// </summary>
  154. public async Task<object> RunNowAsync(long id)
  155. {
  156. var entity = await _rep.GetByIdAsync(id) ?? throw new S8BizException("记录不存在");
  157. if (!entity.Enabled)
  158. throw new S8BizException("规则未启用,不能立即执行");
  159. var now = DateTime.Now;
  160. if (entity.PausedUntil.HasValue && entity.PausedUntil.Value > now)
  161. throw new S8BizException("规则已暂停,请先恢复");
  162. if (entity.LockUntil.HasValue && entity.LockUntil.Value > now)
  163. throw new S8BizException("规则正在执行中,请稍后再试");
  164. await _rep.Context.Updateable<AdoS8WatchRule>()
  165. .SetColumns(x => new AdoS8WatchRule
  166. {
  167. NextRunAt = now,
  168. UpdatedAt = now
  169. })
  170. .Where(x => x.Id == id)
  171. .ExecuteCommandAsync();
  172. return new { id, queued = true, message = "已排队,最长 1 分钟内执行" };
  173. }
  174. /// <summary>
  175. /// S8-SCHED-FRONTEND-1:手工暂停。paused_until = 9999-12-31 哨兵 + pause_reason=MANUAL_PAUSED。
  176. /// 不强杀正在执行的 lease;当前运行完成后下一轮自然不被拾取。
  177. /// 不清 last_status / last_error。
  178. /// </summary>
  179. public async Task<object> PauseAsync(long id)
  180. {
  181. var entity = await _rep.GetByIdAsync(id) ?? throw new S8BizException("记录不存在");
  182. await _rep.Context.Updateable<AdoS8WatchRule>()
  183. .SetColumns(x => new AdoS8WatchRule
  184. {
  185. PausedUntil = ManualPausedSentinel,
  186. PauseReason = "MANUAL_PAUSED",
  187. UpdatedAt = DateTime.Now
  188. })
  189. .Where(x => x.Id == id)
  190. .ExecuteCommandAsync();
  191. return new { id, paused = true, message = "已暂停" };
  192. }
  193. /// <summary>
  194. /// S8-SCHED-FRONTEND-1:恢复。清 paused_until / pause_reason / last_error;归零 consecutive_failure_count;
  195. /// next_run_at = NOW 让下个 tick 立即拾取。不改 enabled / params_json / rule_type。
  196. /// </summary>
  197. public async Task<object> ResumeAsync(long id)
  198. {
  199. var entity = await _rep.GetByIdAsync(id) ?? throw new S8BizException("记录不存在");
  200. var now = DateTime.Now;
  201. await _rep.Context.Updateable<AdoS8WatchRule>()
  202. .SetColumns(x => new AdoS8WatchRule
  203. {
  204. PausedUntil = null,
  205. PauseReason = null,
  206. ConsecutiveFailureCount = 0,
  207. LastError = null,
  208. NextRunAt = now,
  209. UpdatedAt = now
  210. })
  211. .Where(x => x.Id == id)
  212. .ExecuteCommandAsync();
  213. return new { id, resumed = true, message = "已恢复,并将在下一轮调度中执行" };
  214. }
  215. private async Task ValidateAsync(AdoS8WatchRule body, long? id = null)
  216. {
  217. if (string.IsNullOrWhiteSpace(body.RuleCode) || string.IsNullOrWhiteSpace(body.SceneCode))
  218. throw new S8BizException("规则编码和场景编码必填");
  219. var exists = await _rep.AsQueryable()
  220. .AnyAsync(x => x.Id != (id ?? 0) && x.TenantId == body.TenantId && x.FactoryId == body.FactoryId && x.RuleCode == body.RuleCode);
  221. if (exists) throw new S8BizException("监视规则编码已存在");
  222. var dataSource = await _dataSourceRep.GetFirstAsync(x => x.Id == body.DataSourceId)
  223. ?? throw new S8BizException("关联数据源不存在");
  224. if (!dataSource.Enabled) throw new S8BizException("关联数据源未启用");
  225. var scene = await _sceneRep.GetFirstAsync(x => x.TenantId == body.TenantId && x.FactoryId == body.FactoryId && x.SceneCode == body.SceneCode)
  226. ?? throw new S8BizException("关联场景不存在");
  227. if (!scene.Enabled) throw new S8BizException("关联场景未启用");
  228. if (body.PollIntervalSeconds <= 0) throw new S8BizException("轮询间隔必须大于 0");
  229. }
  230. }