FlowEngineService.cs 50 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197
  1. using System.Text.Json;
  2. using Admin.NET.Core.Service;
  3. using Microsoft.Extensions.Logging;
  4. namespace Admin.NET.Plugin.ApprovalFlow.Service;
  5. /// <summary>
  6. /// 流程推进引擎 — 核心状态机
  7. /// 不暴露为 API,由其他 Service 内部调用
  8. /// </summary>
  9. public class FlowEngineService : ITransient
  10. {
  11. private readonly SqlSugarRepository<ApprovalFlow> _flowRep;
  12. private readonly SqlSugarRepository<ApprovalFlowInstance> _instanceRep;
  13. private readonly SqlSugarRepository<ApprovalFlowTask> _taskRep;
  14. private readonly SqlSugarRepository<ApprovalFlowLog> _logRep;
  15. private readonly SqlSugarRepository<SysUserRole> _userRoleRep;
  16. private readonly SqlSugarRepository<SysUser> _userRep;
  17. private readonly SqlSugarRepository<SysOrg> _orgRep;
  18. private readonly SqlSugarRepository<ApprovalFlowDelegate> _delegateRep;
  19. private readonly SqlSugarRepository<ApprovalFlowCompletedNode> _completedNodeRep;
  20. private readonly UserManager _userManager;
  21. private readonly SysOrgService _sysOrgService;
  22. private readonly FlowNotifyService _notifyService;
  23. private readonly ILogger<FlowEngineService> _logger;
  24. public FlowEngineService(
  25. SqlSugarRepository<ApprovalFlow> flowRep,
  26. SqlSugarRepository<ApprovalFlowInstance> instanceRep,
  27. SqlSugarRepository<ApprovalFlowTask> taskRep,
  28. SqlSugarRepository<ApprovalFlowLog> logRep,
  29. SqlSugarRepository<SysUserRole> userRoleRep,
  30. SqlSugarRepository<SysUser> userRep,
  31. SqlSugarRepository<SysOrg> orgRep,
  32. SqlSugarRepository<ApprovalFlowDelegate> delegateRep,
  33. SqlSugarRepository<ApprovalFlowCompletedNode> completedNodeRep,
  34. UserManager userManager,
  35. SysOrgService sysOrgService,
  36. FlowNotifyService notifyService,
  37. ILogger<FlowEngineService> logger)
  38. {
  39. _flowRep = flowRep;
  40. _instanceRep = instanceRep;
  41. _taskRep = taskRep;
  42. _logRep = logRep;
  43. _userRoleRep = userRoleRep;
  44. _userRep = userRep;
  45. _orgRep = orgRep;
  46. _delegateRep = delegateRep;
  47. _completedNodeRep = completedNodeRep;
  48. _userManager = userManager;
  49. _sysOrgService = sysOrgService;
  50. _notifyService = notifyService;
  51. _logger = logger;
  52. }
  53. /// <summary>
  54. /// 机构数据权限依赖实体的 OrgId;插入时必须与当前用户机构一致,否则非超管在「我发起的/待办」中查不到(OrgId=0 会被过滤)。
  55. /// </summary>
  56. private async Task<long> ResolveOrgIdForNewFlowEntityAsync()
  57. {
  58. var oid = _userManager.OrgId;
  59. if (oid > 0) return oid;
  60. var list = await _sysOrgService.GetUserOrgIdList();
  61. return list is { Count: > 0 } ? list[0] : 0;
  62. }
  63. // ═══════════════════════════════════════════
  64. // 核心生命周期
  65. // ═══════════════════════════════════════════
  66. /// <summary>
  67. /// 发起流程
  68. /// </summary>
  69. public async Task<long> StartFlow(StartFlowInput input)
  70. {
  71. var flow = await _flowRep.AsQueryable()
  72. .Where(u => u.BizType == input.BizType && u.IsPublished && !u.IsDelete)
  73. .OrderByDescending(u => u.Version)
  74. .FirstAsync() ?? throw Oops.Oh($"未找到业务类型 [{input.BizType}] 的已发布流程定义");
  75. if (string.IsNullOrWhiteSpace(flow.FlowJson))
  76. throw Oops.Oh("流程定义的 FlowJson 为空,请先设计流程图");
  77. var flowData = JsonSerializer.Deserialize<ApprovalFlowItem>(flow.FlowJson)
  78. ?? throw Oops.Oh("FlowJson 反序列化失败");
  79. var orgId = await ResolveOrgIdForNewFlowEntityAsync();
  80. var instance = new ApprovalFlowInstance
  81. {
  82. FlowId = flow.Id,
  83. FlowVersion = flow.Version,
  84. BizType = input.BizType,
  85. BizId = input.BizId,
  86. BizNo = input.BizNo,
  87. Title = input.Title ?? $"{flow.Name}-{input.BizNo}",
  88. InitiatorId = _userManager.UserId,
  89. InitiatorName = _userManager.RealName,
  90. Status = FlowInstanceStatusEnum.Running,
  91. FlowJsonSnapshot = flow.FlowJson,
  92. StartTime = DateTime.Now,
  93. OrgId = orgId,
  94. };
  95. var startNode = flowData.Nodes.FirstOrDefault(n =>
  96. n.Type is "bpmn:startEvent" or "start-node")
  97. ?? throw Oops.Oh("流程图中未找到开始节点");
  98. await _instanceRep.InsertAsync(instance);
  99. await WriteLog(instance.Id, null, startNode.Id, FlowLogActionEnum.Submit, input.Comment);
  100. // 开始节点登记为已完成,随后沿出边推进(兼容首节点即为并行网关 Fork 的拓扑)
  101. await MarkNodeCompleted(instance.Id, startNode);
  102. var firstOutgoing = flowData.Edges.Where(e => e.SourceNodeId == startNode.Id).Select(e => e.TargetNodeId).ToList();
  103. if (firstOutgoing.Count == 0)
  104. throw Oops.Oh("流程图开始节点未连接任何后继节点");
  105. foreach (var target in firstOutgoing)
  106. {
  107. await ProcessNextNode(instance, flowData, target);
  108. }
  109. await InvokeHandler(input.BizType, instance.Id, h => h.OnFlowStarted(input.BizId, instance.Id));
  110. return instance.Id;
  111. }
  112. /// <summary>
  113. /// 同意
  114. /// </summary>
  115. public async Task Approve(long taskId, string? comment)
  116. {
  117. var task = await GetPendingTask(taskId);
  118. task.Status = FlowTaskStatusEnum.Approved;
  119. task.Comment = comment;
  120. task.ActionTime = DateTime.Now;
  121. await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
  122. // P1-6 审批代理:同步取消配对的本人/代理任务
  123. await CancelPairedDelegateTask(task);
  124. await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.Approve, comment);
  125. var instance = await _instanceRep.GetByIdAsync(task.InstanceId)
  126. ?? throw Oops.Oh("流程实例不存在");
  127. if (await IsNodeCompleted(instance, task.NodeId))
  128. {
  129. await InvokeHandler(instance.BizType, instance.Id,
  130. h => h.OnNodeCompleted(instance.BizId, instance.Id, task.NodeId, task.NodeName ?? "", task.AssigneeId));
  131. var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
  132. await AdvanceToNext(instance, flowData, task.NodeId);
  133. }
  134. }
  135. /// <summary>
  136. /// 拒绝
  137. /// </summary>
  138. public async Task Reject(long taskId, string? comment)
  139. {
  140. var task = await GetPendingTask(taskId);
  141. task.Status = FlowTaskStatusEnum.Rejected;
  142. task.Comment = comment;
  143. task.ActionTime = DateTime.Now;
  144. await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
  145. // Reject 导致流程终止:取消整个实例所有剩余 Pending 任务(含并行分支、配对代理任务等)
  146. await CancelAllPendingTasks(task.InstanceId, task.Id);
  147. var instance = await _instanceRep.GetByIdAsync(task.InstanceId)
  148. ?? throw Oops.Oh("流程实例不存在");
  149. instance.Status = FlowInstanceStatusEnum.Rejected;
  150. instance.EndTime = DateTime.Now;
  151. await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync();
  152. await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Reject, comment);
  153. // P4-17: 驳回人 = 当前被指派的审批人
  154. await InvokeHandler(instance.BizType, instance.Id,
  155. h => h.OnFlowCompleted(instance.BizId, instance.Id, FlowInstanceStatusEnum.Rejected, task.AssigneeId));
  156. await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id, instance.Title, FlowInstanceStatusEnum.Rejected, instance.BizType);
  157. }
  158. // ═══════════════════════════════════════════
  159. // 扩展操作
  160. // ═══════════════════════════════════════════
  161. /// <summary>
  162. /// 转办
  163. /// </summary>
  164. public async Task Transfer(long taskId, long targetUserId, string? comment)
  165. {
  166. var task = await GetPendingTask(taskId);
  167. task.Status = FlowTaskStatusEnum.Transferred;
  168. task.Comment = comment;
  169. task.ActionTime = DateTime.Now;
  170. task.TransferToId = targetUserId;
  171. await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
  172. var targetUser = await _userRep.GetByIdAsync(targetUserId);
  173. var instForOrg = await _instanceRep.GetByIdAsync(task.InstanceId);
  174. var newTask = new ApprovalFlowTask
  175. {
  176. InstanceId = task.InstanceId,
  177. NodeId = task.NodeId,
  178. NodeName = task.NodeName,
  179. AssigneeId = targetUserId,
  180. AssigneeName = targetUser?.RealName,
  181. Status = FlowTaskStatusEnum.Pending,
  182. OrgId = instForOrg?.OrgId ?? 0,
  183. };
  184. await _taskRep.InsertAsync(newTask);
  185. await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.Transfer,
  186. $"{comment} → 转办给 {targetUser?.RealName}");
  187. var instance = await _instanceRep.GetByIdAsync(task.InstanceId);
  188. await _notifyService.NotifyTransferred(targetUserId, task.InstanceId, instance?.Title ?? "", _userManager.RealName, instance?.BizType ?? "");
  189. }
  190. /// <summary>
  191. /// 撤回(发起人撤回)
  192. /// </summary>
  193. public async Task Withdraw(long instanceId)
  194. {
  195. var instance = await _instanceRep.GetByIdAsync(instanceId)
  196. ?? throw Oops.Oh("流程实例不存在");
  197. if (instance.InitiatorId != _userManager.UserId)
  198. throw Oops.Oh("只有发起人可以撤回");
  199. if (instance.Status != FlowInstanceStatusEnum.Running)
  200. throw Oops.Oh("当前流程状态不允许撤回");
  201. var pendingTasks = await _taskRep.AsQueryable()
  202. .Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending)
  203. .ToListAsync();
  204. var doneTasks = await _taskRep.AsQueryable()
  205. .Where(t => t.InstanceId == instanceId &&
  206. t.Status != FlowTaskStatusEnum.Pending &&
  207. t.Status != FlowTaskStatusEnum.Cancelled)
  208. .CountAsync();
  209. if (doneTasks > 0)
  210. throw Oops.Oh("已有人审批过,不可撤回");
  211. var cancelledUserIds = pendingTasks.Select(t => t.AssigneeId).Distinct().ToList();
  212. foreach (var t in pendingTasks)
  213. {
  214. t.Status = FlowTaskStatusEnum.Cancelled;
  215. t.ActionTime = DateTime.Now;
  216. }
  217. await _taskRep.AsUpdateable(pendingTasks).ExecuteCommandAsync();
  218. instance.Status = FlowInstanceStatusEnum.Cancelled;
  219. instance.EndTime = DateTime.Now;
  220. await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync();
  221. await WriteLog(instanceId, null, instance.CurrentNodeId, FlowLogActionEnum.Withdraw, null);
  222. // P4-17: 撤回场景无"审批人",lastApproverId = null
  223. await InvokeHandler(instance.BizType, instanceId,
  224. h => h.OnFlowCompleted(instance.BizId, instanceId, FlowInstanceStatusEnum.Cancelled, null));
  225. await _notifyService.NotifyWithdrawn(cancelledUserIds, instanceId, instance.Title, instance.InitiatorName, instance.BizType);
  226. }
  227. /// <summary>
  228. /// 退回上一步
  229. /// </summary>
  230. public async Task ReturnToPrev(long taskId, string? comment)
  231. {
  232. var task = await GetPendingTask(taskId);
  233. var instance = await _instanceRep.GetByIdAsync(task.InstanceId)
  234. ?? throw Oops.Oh("流程实例不存在");
  235. var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
  236. var prevNodeId = FindPrevUserTaskNodeId(flowData, task.NodeId);
  237. if (prevNodeId == null)
  238. throw Oops.Oh("已是第一个审批节点,无法退回");
  239. await CancelPendingTasks(task.InstanceId, task.NodeId);
  240. task.Status = FlowTaskStatusEnum.Returned;
  241. task.Comment = comment;
  242. task.ActionTime = DateTime.Now;
  243. await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
  244. instance.CurrentNodeId = prevNodeId;
  245. await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync();
  246. await CreateTasksForNode(instance, flowData, prevNodeId);
  247. await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Return, comment);
  248. var returnedTasks = await _taskRep.AsQueryable()
  249. .Where(t => t.InstanceId == instance.Id && t.NodeId == prevNodeId && t.Status == FlowTaskStatusEnum.Pending)
  250. .ToListAsync();
  251. var returnedUserIds = returnedTasks.Select(t => t.AssigneeId).Distinct().ToList();
  252. await _notifyService.NotifyReturned(returnedUserIds, instance.Id, instance.Title, _userManager.RealName, instance.BizType);
  253. }
  254. /// <summary>
  255. /// 加签
  256. /// </summary>
  257. public async Task AddSign(long taskId, long targetUserId, string? comment)
  258. {
  259. var task = await GetPendingTask(taskId);
  260. var targetUser = await _userRep.GetByIdAsync(targetUserId);
  261. var instForAddSign = await _instanceRep.GetByIdAsync(task.InstanceId);
  262. var newTask = new ApprovalFlowTask
  263. {
  264. InstanceId = task.InstanceId,
  265. NodeId = task.NodeId,
  266. NodeName = task.NodeName,
  267. AssigneeId = targetUserId,
  268. AssigneeName = targetUser?.RealName,
  269. Status = FlowTaskStatusEnum.Pending,
  270. IsAddSign = true,
  271. AddSignById = _userManager.UserId,
  272. OrgId = instForAddSign?.OrgId ?? 0,
  273. };
  274. await _taskRep.InsertAsync(newTask);
  275. await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.AddSign,
  276. $"{comment} → 加签给 {targetUser?.RealName}");
  277. var instance = await _instanceRep.GetByIdAsync(task.InstanceId);
  278. await _notifyService.NotifyAddSign(targetUserId, task.InstanceId, instance?.Title ?? "", _userManager.RealName, instance?.BizType ?? "");
  279. }
  280. /// <summary>
  281. /// 手动升级 — 当前审批人主动将任务升级到更高层级
  282. /// </summary>
  283. public async Task Escalate(long taskId, string? comment)
  284. {
  285. var task = await GetPendingTask(taskId);
  286. var instance = await _instanceRep.GetByIdAsync(task.InstanceId)
  287. ?? throw Oops.Oh("流程实例不存在");
  288. var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
  289. var node = flowData.Nodes.FirstOrDefault(n => n.Id == task.NodeId)
  290. ?? throw Oops.Oh("节点不存在");
  291. var props = node.Properties;
  292. if (props?.EnableManualEscalation != true
  293. || string.IsNullOrWhiteSpace(props.EscalationApproverType)
  294. || string.IsNullOrWhiteSpace(props.EscalationApproverIds))
  295. throw Oops.Oh("该节点未配置升级目标,无法升级");
  296. task.Status = FlowTaskStatusEnum.Escalated;
  297. task.Comment = comment;
  298. task.ActionTime = DateTime.Now;
  299. await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
  300. await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id);
  301. var escalationApprovers = await ResolveApprovers(
  302. new FlowProperties
  303. {
  304. ApproverType = props.EscalationApproverType,
  305. ApproverIds = props.EscalationApproverIds,
  306. ApproverNames = props.EscalationApproverNames,
  307. },
  308. instance.InitiatorId);
  309. if (escalationApprovers.Count == 0)
  310. throw Oops.Oh("升级目标审批人列表为空");
  311. var newTasks = escalationApprovers.Select(a => new ApprovalFlowTask
  312. {
  313. InstanceId = instance.Id,
  314. NodeId = task.NodeId,
  315. NodeName = task.NodeName,
  316. AssigneeId = a.userId,
  317. AssigneeName = a.userName,
  318. Status = FlowTaskStatusEnum.Pending,
  319. OrgId = instance.OrgId,
  320. }).ToList();
  321. await _taskRep.AsInsertable(newTasks).ExecuteCommandAsync();
  322. var targetNames = string.Join(", ", escalationApprovers.Select(a => a.userName));
  323. await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Escalate,
  324. $"{comment} → 升级给 {targetNames}");
  325. var targetUserIds = escalationApprovers.Select(a => a.userId).Distinct().ToList();
  326. await _notifyService.NotifyEscalated(targetUserIds, instance.Id, instance.Title,
  327. _userManager.RealName, task.NodeName, instance.BizType);
  328. }
  329. /// <summary>
  330. /// 催办
  331. /// </summary>
  332. public async Task Urge(long instanceId)
  333. {
  334. var instance = await _instanceRep.GetByIdAsync(instanceId)
  335. ?? throw Oops.Oh("流程实例不存在");
  336. if (instance.Status != FlowInstanceStatusEnum.Running)
  337. throw Oops.Oh("当前流程不在审批中");
  338. await WriteLog(instanceId, null, instance.CurrentNodeId, FlowLogActionEnum.Urge, "催办");
  339. var pendingTasks = await _taskRep.AsQueryable()
  340. .Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending)
  341. .ToListAsync();
  342. var userIds = pendingTasks.Select(t => t.AssigneeId).Distinct().ToList();
  343. await _notifyService.NotifyUrge(userIds, instanceId, instance.Title, instance.BizType);
  344. }
  345. // ═══════════════════════════════════════════
  346. // 超时自动处理(由 FlowTimeoutJob 调用,无 UserManager 上下文)
  347. // ═══════════════════════════════════════════
  348. /// <summary>
  349. /// 扫描所有 Pending 且已超过 timeoutHours 的任务,按节点 timeoutAction 执行对应动作。
  350. /// 与 FlowTimeoutJob 扫描等价,抽出公共方法以便管理员接口立即触发(运维/E2E 测试用)。
  351. /// 返回本次处理的任务条数。
  352. /// </summary>
  353. public async Task<int> ScanTimeoutTasks(CancellationToken stoppingToken = default)
  354. {
  355. var pendingTasks = await _taskRep.Context.Queryable<ApprovalFlowTask>()
  356. .InnerJoin<ApprovalFlowInstance>((t, i) => t.InstanceId == i.Id)
  357. .Where((t, i) => t.Status == FlowTaskStatusEnum.Pending
  358. && i.Status == FlowInstanceStatusEnum.Running)
  359. .Select((t, i) => new
  360. {
  361. TaskId = t.Id,
  362. TaskCreatedAt = t.CreateTime,
  363. NodeId = t.NodeId,
  364. FlowJsonSnapshot = i.FlowJsonSnapshot,
  365. })
  366. .ToListAsync();
  367. var now = DateTime.Now;
  368. var processed = 0;
  369. foreach (var item in pendingTasks)
  370. {
  371. if (stoppingToken.IsCancellationRequested) break;
  372. if (string.IsNullOrWhiteSpace(item.FlowJsonSnapshot)) continue;
  373. ApprovalFlowItem? flowData;
  374. try { flowData = JsonSerializer.Deserialize<ApprovalFlowItem>(item.FlowJsonSnapshot); }
  375. catch { continue; }
  376. var node = flowData?.Nodes?.FirstOrDefault(n => n.Id == item.NodeId);
  377. var props = node?.Properties;
  378. if (props?.TimeoutHours == null || props.TimeoutHours <= 0) continue;
  379. if (string.IsNullOrWhiteSpace(props.TimeoutAction)) continue;
  380. var deadline = item.TaskCreatedAt.AddHours(props.TimeoutHours.Value);
  381. if (now < deadline) continue;
  382. // Notify 动作幂等:已记录过 AutoTimeout 日志的跳过,避免每轮重复催办
  383. if (props.TimeoutAction == "Notify")
  384. {
  385. var alreadyNotified = await _logRep.AsQueryable()
  386. .AnyAsync(log => log.TaskId == item.TaskId
  387. && log.Action == FlowLogActionEnum.AutoTimeout);
  388. if (alreadyNotified) continue;
  389. }
  390. try
  391. {
  392. await HandleTimeoutTask(item.TaskId);
  393. processed++;
  394. }
  395. catch
  396. {
  397. // 单任务异常隔离:吞掉继续下一个,由 Job/Admin API 层统一记日志
  398. }
  399. }
  400. return processed;
  401. }
  402. /// <summary>
  403. /// 处理单个超时任务(由定时任务调用)
  404. /// </summary>
  405. public async Task HandleTimeoutTask(long taskId)
  406. {
  407. var task = await _taskRep.GetByIdAsync(taskId);
  408. if (task == null || task.Status != FlowTaskStatusEnum.Pending) return;
  409. var instance = await _instanceRep.GetByIdAsync(task.InstanceId);
  410. if (instance == null || instance.Status != FlowInstanceStatusEnum.Running) return;
  411. var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
  412. var node = flowData.Nodes.FirstOrDefault(n => n.Id == task.NodeId);
  413. var props = node?.Properties;
  414. if (props == null) return;
  415. switch (props.TimeoutAction)
  416. {
  417. case "Notify":
  418. await _notifyService.NotifyTimeout(
  419. new List<long> { task.AssigneeId }, instance.Id, instance.Title, instance.BizType);
  420. await WriteSystemLog(instance.Id, task.Id, task.NodeId,
  421. FlowLogActionEnum.AutoTimeout, "审批超时,已发送提醒通知");
  422. break;
  423. case "AutoApprove":
  424. await AutoApproveTask(task, instance);
  425. break;
  426. case "AutoReject":
  427. await AutoRejectTask(task, instance);
  428. break;
  429. case "AutoEscalate":
  430. await AutoEscalateTask(task, props, instance);
  431. break;
  432. }
  433. }
  434. private async Task AutoApproveTask(ApprovalFlowTask task, ApprovalFlowInstance instance)
  435. {
  436. task.Status = FlowTaskStatusEnum.Approved;
  437. task.Comment = "系统自动通过(超时)";
  438. task.ActionTime = DateTime.Now;
  439. await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
  440. await WriteSystemLog(instance.Id, task.Id, task.NodeId,
  441. FlowLogActionEnum.AutoTimeout, "审批超时,系统自动通过");
  442. if (await IsNodeCompleted(instance, task.NodeId))
  443. {
  444. // P4-17: 系统自动通过,无人工审批人,approverUserId = null
  445. await InvokeHandler(instance.BizType, instance.Id,
  446. h => h.OnNodeCompleted(instance.BizId, instance.Id, task.NodeId, task.NodeName ?? "", null));
  447. var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
  448. await AdvanceToNext(instance, flowData, task.NodeId);
  449. }
  450. }
  451. private async Task AutoRejectTask(ApprovalFlowTask task, ApprovalFlowInstance instance)
  452. {
  453. task.Status = FlowTaskStatusEnum.Rejected;
  454. task.Comment = "系统自动拒绝(超时)";
  455. task.ActionTime = DateTime.Now;
  456. await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
  457. await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id);
  458. instance.Status = FlowInstanceStatusEnum.Rejected;
  459. instance.EndTime = DateTime.Now;
  460. await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync();
  461. await WriteSystemLog(instance.Id, task.Id, task.NodeId,
  462. FlowLogActionEnum.AutoTimeout, "审批超时,系统自动拒绝");
  463. // P4-17: 系统自动拒绝,无人工审批人,lastApproverId = null
  464. await InvokeHandler(instance.BizType, instance.Id,
  465. h => h.OnFlowCompleted(instance.BizId, instance.Id, FlowInstanceStatusEnum.Rejected, null));
  466. await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id,
  467. instance.Title, FlowInstanceStatusEnum.Rejected, instance.BizType);
  468. }
  469. private async Task AutoEscalateTask(ApprovalFlowTask task, FlowProperties nodeProps, ApprovalFlowInstance instance)
  470. {
  471. if (string.IsNullOrWhiteSpace(nodeProps.EscalationApproverType)
  472. || string.IsNullOrWhiteSpace(nodeProps.EscalationApproverIds))
  473. return;
  474. task.Status = FlowTaskStatusEnum.Escalated;
  475. task.Comment = "系统自动升级(超时)";
  476. task.ActionTime = DateTime.Now;
  477. await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
  478. await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id);
  479. var approvers = await ResolveApprovers(
  480. new FlowProperties
  481. {
  482. ApproverType = nodeProps.EscalationApproverType,
  483. ApproverIds = nodeProps.EscalationApproverIds,
  484. },
  485. instance.InitiatorId);
  486. if (approvers.Count == 0) return;
  487. var newTasks = approvers.Select(a => new ApprovalFlowTask
  488. {
  489. InstanceId = instance.Id,
  490. NodeId = task.NodeId,
  491. NodeName = task.NodeName,
  492. AssigneeId = a.userId,
  493. AssigneeName = a.userName,
  494. Status = FlowTaskStatusEnum.Pending,
  495. OrgId = instance.OrgId,
  496. }).ToList();
  497. await _taskRep.AsInsertable(newTasks).ExecuteCommandAsync();
  498. var targetNames = string.Join(", ", approvers.Select(a => a.userName));
  499. await WriteSystemLog(instance.Id, task.Id, task.NodeId,
  500. FlowLogActionEnum.AutoTimeout, $"审批超时,自动升级给 {targetNames}");
  501. var targetUserIds = approvers.Select(a => a.userId).Distinct().ToList();
  502. await _notifyService.NotifyEscalated(targetUserIds, instance.Id, instance.Title,
  503. "系统", task.NodeName, instance.BizType);
  504. }
  505. private async Task WriteSystemLog(long instanceId, long? taskId, string? nodeId,
  506. FlowLogActionEnum action, string? comment)
  507. {
  508. await _logRep.InsertAsync(new ApprovalFlowLog
  509. {
  510. InstanceId = instanceId,
  511. TaskId = taskId,
  512. NodeId = nodeId,
  513. Action = action,
  514. OperatorId = 0,
  515. OperatorName = "系统",
  516. Comment = comment,
  517. });
  518. }
  519. // ═══════════════════════════════════════════
  520. // 内部引擎方法
  521. // ═══════════════════════════════════════════
  522. /// <summary>
  523. /// 推进到下一节点。支持并行网关(Fork / Join)。
  524. /// 行为约定:
  525. /// - 进入本方法前,调用方(如 <see cref="Approve"/>)应已将 <paramref name="currentNodeId"/>(userTask)标记为完成节点;
  526. /// 本方法会将途经的网关节点也写入 <see cref="ApprovalFlowCompletedNode"/>。
  527. /// - 并行网关 Fork(出边&gt;=2):沿每条出边递归推进;
  528. /// 并行网关 Join(入边&gt;=2):校验所有前驱节点是否都在"已完成"集合中,
  529. /// 任一尚未完成则**静默等待**(不报错、不推进),由后续分支完成后再次触发 Join 校验。
  530. /// </summary>
  531. private async Task AdvanceToNext(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string currentNodeId)
  532. {
  533. // 当前节点可能是 userTask(完成记录由 Approve 写入)或网关(入口处已写入);此处统一确保幂等入库
  534. var currentNode = flowData.Nodes.FirstOrDefault(n => n.Id == currentNodeId);
  535. await MarkNodeCompleted(instance.Id, currentNode);
  536. var outgoingEdges = flowData.Edges.Where(e => e.SourceNodeId == currentNodeId).ToList();
  537. if (outgoingEdges.Count == 0)
  538. {
  539. await CompleteInstance(instance, FlowInstanceStatusEnum.Approved);
  540. return;
  541. }
  542. // 非并行网关场景:当前节点一般只有 1 条出边
  543. foreach (var edge in outgoingEdges)
  544. {
  545. await ProcessNextNode(instance, flowData, edge.TargetNodeId);
  546. }
  547. }
  548. /// <summary>
  549. /// 处理某个"下一节点"。根据节点类型分发:
  550. /// - endEvent:所有分支任务都结束时触发实例完成
  551. /// - exclusiveGateway:按条件选择分支
  552. /// - parallelGateway:Fork 并行分发;Join 等待所有前驱完成
  553. /// - userTask / 其他:创建任务
  554. /// </summary>
  555. private async Task ProcessNextNode(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string nextNodeId)
  556. {
  557. var nextNode = flowData.Nodes.FirstOrDefault(n => n.Id == nextNodeId);
  558. if (nextNode == null)
  559. {
  560. await CompleteInstance(instance, FlowInstanceStatusEnum.Approved);
  561. return;
  562. }
  563. if (nextNode.Type is "bpmn:endEvent" or "end-node")
  564. {
  565. // 所有并行分支都已结束(无其它 Pending 任务)才真正完成实例
  566. var hasOtherPending = await _taskRep.AsQueryable()
  567. .AnyAsync(t => t.InstanceId == instance.Id && t.Status == FlowTaskStatusEnum.Pending);
  568. if (hasOtherPending) return;
  569. await CompleteInstance(instance, FlowInstanceStatusEnum.Approved);
  570. return;
  571. }
  572. if (nextNode.Type is "bpmn:exclusiveGateway")
  573. {
  574. await MarkNodeCompleted(instance.Id, nextNode);
  575. var bizData = await GetBizData(instance.BizType, instance.BizId);
  576. var targetNodeId = EvaluateGateway(nextNode.Properties?.Conditions, flowData, nextNode.Id, bizData);
  577. instance.CurrentNodeId = targetNodeId;
  578. await _instanceRep.AsUpdateable(instance).UpdateColumns(i => new { i.CurrentNodeId }).ExecuteCommandAsync();
  579. await ProcessNextNode(instance, flowData, targetNodeId);
  580. return;
  581. }
  582. if (nextNode.Type is "bpmn:parallelGateway")
  583. {
  584. var incoming = flowData.Edges.Where(e => e.TargetNodeId == nextNode.Id).Select(e => e.SourceNodeId).ToList();
  585. var outgoing = flowData.Edges.Where(e => e.SourceNodeId == nextNode.Id).Select(e => e.TargetNodeId).ToList();
  586. // Join 语义:入边 >= 2,需等所有前驱都已完成
  587. if (incoming.Count >= 2)
  588. {
  589. var completedSet = await GetCompletedNodeIdSet(instance.Id);
  590. if (!incoming.All(p => completedSet.Contains(p)))
  591. {
  592. // 未汇合,静默等待后续分支抵达
  593. return;
  594. }
  595. }
  596. // Fork 或 Join 通过:标记网关完成,沿所有出边推进
  597. await MarkNodeCompleted(instance.Id, nextNode);
  598. foreach (var target in outgoing)
  599. {
  600. await ProcessNextNode(instance, flowData, target);
  601. }
  602. return;
  603. }
  604. // userTask 或其他:创建任务
  605. instance.CurrentNodeId = nextNodeId;
  606. await _instanceRep.AsUpdateable(instance).UpdateColumns(i => new { i.CurrentNodeId }).ExecuteCommandAsync();
  607. await CreateTasksForNode(instance, flowData, nextNodeId);
  608. }
  609. /// <summary>
  610. /// 标记节点已完成(幂等:重复写入被唯一索引拦截后忽略)
  611. /// </summary>
  612. private async Task MarkNodeCompleted(long instanceId, ApprovalFlowNodeItem? node)
  613. {
  614. if (node == null) return;
  615. try
  616. {
  617. await _completedNodeRep.InsertAsync(new ApprovalFlowCompletedNode
  618. {
  619. InstanceId = instanceId,
  620. NodeId = node.Id,
  621. NodeName = node.Properties?.NodeName ?? node.Text?.Value,
  622. NodeType = node.Type,
  623. CompletedTime = DateTime.Now,
  624. });
  625. }
  626. catch
  627. {
  628. // 并发场景下可能触发唯一索引冲突,忽略(已存在即可)
  629. }
  630. }
  631. /// <summary>
  632. /// 查询实例已完成节点 Id 集合
  633. /// </summary>
  634. private async Task<HashSet<string>> GetCompletedNodeIdSet(long instanceId)
  635. {
  636. var ids = await _completedNodeRep.AsQueryable()
  637. .Where(c => c.InstanceId == instanceId)
  638. .Select(c => c.NodeId)
  639. .ToListAsync();
  640. return new HashSet<string>(ids);
  641. }
  642. private async Task CompleteInstance(ApprovalFlowInstance instance, FlowInstanceStatusEnum status)
  643. {
  644. // 幂等:并行分支同时到达 end 时避免重复完成
  645. var latest = await _instanceRep.GetByIdAsync(instance.Id);
  646. if (latest == null || latest.Status != FlowInstanceStatusEnum.Running) return;
  647. instance.Status = status;
  648. instance.EndTime = DateTime.Now;
  649. await _instanceRep.AsUpdateable(instance)
  650. .UpdateColumns(i => new { i.Status, i.EndTime })
  651. .ExecuteCommandAsync();
  652. // P4-17: 正常完成路径的 lastApproverId = 最后一条人工 Approve 日志的操作人(系统自动通过不算)
  653. var lastApproverId = status == FlowInstanceStatusEnum.Approved
  654. ? await GetLastHumanApproverIdAsync(instance.Id)
  655. : null;
  656. await InvokeHandler(instance.BizType, instance.Id,
  657. h => h.OnFlowCompleted(instance.BizId, instance.Id, status, lastApproverId));
  658. await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id, instance.Title, status, instance.BizType);
  659. }
  660. /// <summary>
  661. /// P4-17: 查询该实例最后一条人工 Approve 日志的操作人 UserId
  662. /// (系统超时自动通过写入的是 AutoTimeout,不会被此查询命中)
  663. /// </summary>
  664. private async Task<long?> GetLastHumanApproverIdAsync(long instanceId)
  665. {
  666. var lastLog = await _logRep.AsQueryable()
  667. .Where(l => l.InstanceId == instanceId && l.Action == FlowLogActionEnum.Approve)
  668. .OrderByDescending(l => l.CreateTime)
  669. .FirstAsync();
  670. return lastLog?.OperatorId > 0 ? lastLog.OperatorId : null;
  671. }
  672. private async Task CreateTasksForNode(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string nodeId)
  673. {
  674. var node = flowData.Nodes.FirstOrDefault(n => n.Id == nodeId)
  675. ?? throw Oops.Oh($"FlowJson 中未找到节点 [{nodeId}]");
  676. var approvers = await ResolveApprovers(node.Properties, instance.InitiatorId);
  677. if (approvers.Count == 0)
  678. throw Oops.Oh($"节点 [{node.Properties?.NodeName ?? nodeId}] 未配置审批人或审批人列表为空");
  679. var nodeName = node.Properties?.NodeName ?? node.Text?.Value;
  680. var tasks = approvers.Select(a => new ApprovalFlowTask
  681. {
  682. InstanceId = instance.Id,
  683. NodeId = nodeId,
  684. NodeName = nodeName,
  685. AssigneeId = a.userId,
  686. AssigneeName = a.userName,
  687. Status = FlowTaskStatusEnum.Pending,
  688. OrgId = instance.OrgId,
  689. }).ToList();
  690. // P1-6 审批代理:为每个原审批人检查是否存在有效代理,是则并行创建一条代理任务
  691. var delegateTasks = new List<ApprovalFlowTask>();
  692. foreach (var a in approvers)
  693. {
  694. var del = await FindEffectiveDelegate(a.userId, instance.BizType);
  695. if (del == null) continue;
  696. // 代理人和原审批人不能重复,代理人也不能是原审批人列表中其他人(避免同一人两条任务)
  697. if (approvers.Any(x => x.userId == del.DelegateUserId)) continue;
  698. delegateTasks.Add(new ApprovalFlowTask
  699. {
  700. InstanceId = instance.Id,
  701. NodeId = nodeId,
  702. NodeName = nodeName,
  703. AssigneeId = del.DelegateUserId,
  704. AssigneeName = del.DelegateUserName,
  705. Status = FlowTaskStatusEnum.Pending,
  706. OrgId = instance.OrgId,
  707. IsDelegate = true,
  708. DelegateForUserId = a.userId,
  709. DelegateForUserName = a.userName,
  710. });
  711. }
  712. var allTasks = tasks.Concat(delegateTasks).ToList();
  713. await _taskRep.AsInsertable(allTasks).ExecuteCommandAsync();
  714. var assigneeIds = allTasks.Select(t => t.AssigneeId).Distinct().ToList();
  715. await _notifyService.NotifyNewTask(assigneeIds, instance.Id, instance.Title, nodeName, instance.BizType);
  716. }
  717. /// <summary>
  718. /// 查找指定用户当前生效的审批代理(时间窗口内 + 已启用 + BizType 匹配或全局)
  719. /// </summary>
  720. private async Task<ApprovalFlowDelegate?> FindEffectiveDelegate(long userId, string? bizType)
  721. {
  722. var now = DateTime.Now;
  723. return await _delegateRep.AsQueryable()
  724. .Where(d => d.UserId == userId
  725. && d.IsEnabled
  726. && d.StartTime <= now
  727. && d.EndTime >= now
  728. && (string.IsNullOrEmpty(d.BizType) || d.BizType == bizType))
  729. .OrderBy(d => d.BizType == null ? 1 : 0) // 优先匹配指定 BizType 的代理
  730. .OrderByDescending(d => d.CreateTime)
  731. .FirstAsync();
  732. }
  733. /// <summary>
  734. /// 取消与已完成任务配对的代理任务(P1-6)
  735. /// - 本人任务完成:取消对应的代理任务
  736. /// - 代理任务完成:取消对应的本人任务
  737. /// </summary>
  738. private async Task CancelPairedDelegateTask(ApprovalFlowTask completedTask)
  739. {
  740. ApprovalFlowTask? paired;
  741. if (completedTask.IsDelegate)
  742. {
  743. var originalUserId = completedTask.DelegateForUserId ?? 0;
  744. if (originalUserId == 0) return;
  745. paired = await _taskRep.AsQueryable()
  746. .Where(t => t.InstanceId == completedTask.InstanceId
  747. && t.NodeId == completedTask.NodeId
  748. && t.Status == FlowTaskStatusEnum.Pending
  749. && t.AssigneeId == originalUserId
  750. && !t.IsDelegate)
  751. .FirstAsync();
  752. }
  753. else
  754. {
  755. paired = await _taskRep.AsQueryable()
  756. .Where(t => t.InstanceId == completedTask.InstanceId
  757. && t.NodeId == completedTask.NodeId
  758. && t.Status == FlowTaskStatusEnum.Pending
  759. && t.IsDelegate
  760. && t.DelegateForUserId == completedTask.AssigneeId)
  761. .FirstAsync();
  762. }
  763. if (paired == null) return;
  764. paired.Status = FlowTaskStatusEnum.Cancelled;
  765. paired.ActionTime = DateTime.Now;
  766. await _taskRep.AsUpdateable(paired).ExecuteCommandAsync();
  767. }
  768. private async Task<List<(long userId, string userName)>> ResolveApprovers(FlowProperties? props, long initiatorId)
  769. {
  770. if (props == null || string.IsNullOrWhiteSpace(props.ApproverType))
  771. return new List<(long, string)>();
  772. var approverType = props.ApproverType;
  773. if (approverType == nameof(ApproverTypeEnum.Initiator))
  774. {
  775. var initiator = await _userRep.GetByIdAsync(initiatorId);
  776. return initiator != null
  777. ? new List<(long, string)> { (initiator.Id, initiator.RealName ?? "") }
  778. : new List<(long, string)>();
  779. }
  780. if (string.IsNullOrWhiteSpace(props.ApproverIds))
  781. return new List<(long, string)>();
  782. var ids = props.ApproverIds.Split(',', StringSplitOptions.RemoveEmptyEntries)
  783. .Select(s => long.TryParse(s.Trim(), out var v) ? v : 0).Where(id => id > 0).ToList();
  784. if (approverType == nameof(ApproverTypeEnum.SpecificUser))
  785. {
  786. var users = await _userRep.AsQueryable()
  787. .Where(u => ids.Contains(u.Id)).ToListAsync();
  788. return users.Select(u => (u.Id, u.RealName ?? "")).ToList();
  789. }
  790. if (approverType == nameof(ApproverTypeEnum.Role))
  791. {
  792. var userIds = await _userRoleRep.AsQueryable()
  793. .Where(ur => ids.Contains(ur.RoleId))
  794. .Select(ur => ur.UserId)
  795. .ToListAsync();
  796. var users = await _userRep.AsQueryable()
  797. .Where(u => userIds.Contains(u.Id)).ToListAsync();
  798. return users.Select(u => (u.Id, u.RealName ?? "")).ToList();
  799. }
  800. if (approverType == nameof(ApproverTypeEnum.Department))
  801. {
  802. var users = await _userRep.AsQueryable()
  803. .Where(u => ids.Contains(u.OrgId)).ToListAsync();
  804. return users.Select(u => (u.Id, u.RealName ?? "")).ToList();
  805. }
  806. if (approverType == nameof(ApproverTypeEnum.DepartmentLeader))
  807. {
  808. var initiator = await _userRep.GetByIdAsync(initiatorId);
  809. if (initiator == null || initiator.OrgId <= 0)
  810. return new List<(long, string)>();
  811. var org = await _orgRep.GetByIdAsync(initiator.OrgId);
  812. if (org?.DirectorId != null && org.DirectorId > 0)
  813. {
  814. var director = await _userRep.GetByIdAsync(org.DirectorId.Value);
  815. if (director != null)
  816. return new List<(long, string)> { (director.Id, director.RealName ?? "") };
  817. }
  818. if (initiator.ManagerUserId != null && initiator.ManagerUserId > 0)
  819. {
  820. var manager = await _userRep.GetByIdAsync(initiator.ManagerUserId.Value);
  821. if (manager != null)
  822. return new List<(long, string)> { (manager.Id, manager.RealName ?? "") };
  823. }
  824. return new List<(long, string)>();
  825. }
  826. return new List<(long, string)>();
  827. }
  828. /// <summary>
  829. /// 评估排他网关 — 依次尝试各非默认分支的条件表达式,首个匹配的获胜;
  830. /// 全不匹配则走默认分支;无默认则走第一条出边
  831. /// 支持简单比较表达式:variable op value(op: ==,!=,>,>=,&lt;,&lt;=)
  832. /// </summary>
  833. private string EvaluateGateway(List<GatewayCondition>? conditions, ApprovalFlowItem flowData, string gatewayNodeId, Dictionary<string, object>? bizData)
  834. {
  835. if (conditions != null && conditions.Count > 0 && bizData != null && bizData.Count > 0)
  836. {
  837. foreach (var cond in conditions.Where(c => !c.IsDefault))
  838. {
  839. if (!string.IsNullOrWhiteSpace(cond.Expression) && EvalSimpleExpression(cond.Expression, bizData))
  840. return cond.TargetNodeId;
  841. }
  842. var defaultBranch = conditions.FirstOrDefault(c => c.IsDefault);
  843. if (defaultBranch != null)
  844. return defaultBranch.TargetNodeId;
  845. }
  846. else if (conditions != null && conditions.Count > 0)
  847. {
  848. var defaultBranch = conditions.FirstOrDefault(c => c.IsDefault);
  849. if (defaultBranch != null) return defaultBranch.TargetNodeId;
  850. return conditions.First().TargetNodeId;
  851. }
  852. var edge = flowData.Edges.FirstOrDefault(e => e.SourceNodeId == gatewayNodeId);
  853. return edge?.TargetNodeId ?? throw Oops.Oh("排他网关没有出边");
  854. }
  855. /// <summary>
  856. /// 简单表达式求值:支持 "field op value" 格式(如 "urgent == 1", "customLevel >= 3", "amount > 10000")
  857. /// 多条件用 &amp;&amp; 连接
  858. /// </summary>
  859. private static bool EvalSimpleExpression(string expression, Dictionary<string, object> bizData)
  860. {
  861. var parts = expression.Split("&&", StringSplitOptions.TrimEntries);
  862. foreach (var part in parts)
  863. {
  864. if (!EvalSingleComparison(part.Trim(), bizData))
  865. return false;
  866. }
  867. return true;
  868. }
  869. private static bool EvalSingleComparison(string expr, Dictionary<string, object> bizData)
  870. {
  871. string[] ops = { ">=", "<=", "!=", "==", ">", "<" };
  872. foreach (var op in ops)
  873. {
  874. var idx = expr.IndexOf(op, StringComparison.Ordinal);
  875. if (idx < 0) continue;
  876. var fieldName = expr[..idx].Trim();
  877. var valueStr = expr[(idx + op.Length)..].Trim().Trim('"', '\'');
  878. if (!bizData.TryGetValue(fieldName, out var fieldValue))
  879. return false;
  880. if (decimal.TryParse(fieldValue?.ToString(), out var numLeft) && decimal.TryParse(valueStr, out var numRight))
  881. {
  882. return op switch
  883. {
  884. "==" => numLeft == numRight,
  885. "!=" => numLeft != numRight,
  886. ">" => numLeft > numRight,
  887. ">=" => numLeft >= numRight,
  888. "<" => numLeft < numRight,
  889. "<=" => numLeft <= numRight,
  890. _ => false,
  891. };
  892. }
  893. var strLeft = fieldValue?.ToString() ?? "";
  894. return op switch
  895. {
  896. "==" => strLeft.Equals(valueStr, StringComparison.OrdinalIgnoreCase),
  897. "!=" => !strLeft.Equals(valueStr, StringComparison.OrdinalIgnoreCase),
  898. _ => false,
  899. };
  900. }
  901. return false;
  902. }
  903. private async Task<Dictionary<string, object>?> GetBizData(string bizType, long bizId)
  904. {
  905. var handlers = App.GetServices<IFlowBizHandler>();
  906. var handler = handlers?.FirstOrDefault(h => h.BizType == bizType);
  907. if (handler == null) return null;
  908. return await handler.GetBizData(bizId);
  909. }
  910. private string? FindNextNodeId(ApprovalFlowItem flowData, string currentNodeId)
  911. {
  912. var edge = flowData.Edges.FirstOrDefault(e => e.SourceNodeId == currentNodeId);
  913. return edge?.TargetNodeId;
  914. }
  915. private string? FindPrevUserTaskNodeId(ApprovalFlowItem flowData, string currentNodeId)
  916. {
  917. var inEdge = flowData.Edges.FirstOrDefault(e => e.TargetNodeId == currentNodeId);
  918. if (inEdge == null) return null;
  919. var prevNode = flowData.Nodes.FirstOrDefault(n => n.Id == inEdge.SourceNodeId);
  920. if (prevNode == null) return null;
  921. if (prevNode.Type is "bpmn:userTask" or "user-node" or "task-node")
  922. return prevNode.Id;
  923. // 递归跳过网关等非用户任务节点
  924. return FindPrevUserTaskNodeId(flowData, prevNode.Id);
  925. }
  926. private async Task<bool> IsNodeCompleted(ApprovalFlowInstance instance, string nodeId)
  927. {
  928. var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
  929. var node = flowData.Nodes.FirstOrDefault(n => n.Id == nodeId);
  930. var mode = node?.Properties?.MultiApproveMode;
  931. if (mode == nameof(MultiApproveModeEnum.All))
  932. {
  933. var pendingCount = await _taskRep.AsQueryable()
  934. .Where(t => t.InstanceId == instance.Id && t.NodeId == nodeId && t.Status == FlowTaskStatusEnum.Pending)
  935. .CountAsync();
  936. return pendingCount == 0;
  937. }
  938. // 默认或签(Any):一人通过即完成,取消其他 Pending
  939. await CancelPendingTasks(instance.Id, nodeId);
  940. return true;
  941. }
  942. private async Task CancelPendingTasks(long instanceId, string nodeId, long? excludeTaskId = null)
  943. {
  944. var tasks = await _taskRep.AsQueryable()
  945. .Where(t => t.InstanceId == instanceId && t.NodeId == nodeId && t.Status == FlowTaskStatusEnum.Pending)
  946. .WhereIF(excludeTaskId.HasValue, t => t.Id != excludeTaskId!.Value)
  947. .ToListAsync();
  948. foreach (var t in tasks)
  949. {
  950. t.Status = FlowTaskStatusEnum.Cancelled;
  951. t.ActionTime = DateTime.Now;
  952. }
  953. if (tasks.Count > 0)
  954. await _taskRep.AsUpdateable(tasks).ExecuteCommandAsync();
  955. }
  956. /// <summary>
  957. /// 取消整个实例下所有剩余 Pending 任务(Reject 时跨并行分支使用)
  958. /// </summary>
  959. private async Task CancelAllPendingTasks(long instanceId, long? excludeTaskId = null)
  960. {
  961. var tasks = await _taskRep.AsQueryable()
  962. .Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending)
  963. .WhereIF(excludeTaskId.HasValue, t => t.Id != excludeTaskId!.Value)
  964. .ToListAsync();
  965. foreach (var t in tasks)
  966. {
  967. t.Status = FlowTaskStatusEnum.Cancelled;
  968. t.ActionTime = DateTime.Now;
  969. }
  970. if (tasks.Count > 0)
  971. await _taskRep.AsUpdateable(tasks).ExecuteCommandAsync();
  972. }
  973. private async Task<ApprovalFlowTask> GetPendingTask(long taskId)
  974. {
  975. var task = await _taskRep.GetByIdAsync(taskId)
  976. ?? throw Oops.Oh("审批任务不存在");
  977. if (task.Status != FlowTaskStatusEnum.Pending)
  978. throw Oops.Oh("该任务已处理");
  979. if (task.AssigneeId != _userManager.UserId)
  980. throw Oops.Oh("当前用户不是该任务的审批人");
  981. return task;
  982. }
  983. private async Task WriteLog(long instanceId, long? taskId, string? nodeId, FlowLogActionEnum action, string? comment)
  984. {
  985. await _logRep.InsertAsync(new ApprovalFlowLog
  986. {
  987. InstanceId = instanceId,
  988. TaskId = taskId,
  989. NodeId = nodeId,
  990. Action = action,
  991. OperatorId = _userManager.UserId,
  992. OperatorName = _userManager.RealName,
  993. Comment = comment,
  994. });
  995. }
  996. private static ApprovalFlowItem DeserializeFlowJson(string? json)
  997. {
  998. if (string.IsNullOrWhiteSpace(json))
  999. throw Oops.Oh("FlowJson 快照为空");
  1000. return JsonSerializer.Deserialize<ApprovalFlowItem>(json)
  1001. ?? throw Oops.Oh("FlowJson 反序列化失败");
  1002. }
  1003. private async Task InvokeHandler(string bizType, long instanceId, Func<IFlowBizHandler, Task> action)
  1004. {
  1005. var handlers = App.GetServices<IFlowBizHandler>();
  1006. var handler = handlers?.FirstOrDefault(h => h.BizType == bizType);
  1007. if (handler == null)
  1008. return;
  1009. try
  1010. {
  1011. await action(handler);
  1012. }
  1013. catch (Exception ex)
  1014. {
  1015. _logger.LogError(ex, "FlowBizHandler 执行失败: BizType={BizType}, InstanceId={InstanceId}, ErrorType={ErrorType}, ErrorMessage={ErrorMessage}",
  1016. bizType, instanceId, ex.GetType().FullName, ex.Message);
  1017. var notifiers = App.GetServices<IFlowHandlerFailureNotifier>();
  1018. if (notifiers != null)
  1019. {
  1020. foreach (var notifier in notifiers)
  1021. {
  1022. try
  1023. {
  1024. await notifier.NotifyAsync(bizType, instanceId, ex);
  1025. }
  1026. catch (Exception notifyEx)
  1027. {
  1028. _logger.LogError(notifyEx, "FlowHandlerFailureNotifier 执行失败: NotifierType={NotifierType}, BizType={BizType}, InstanceId={InstanceId}",
  1029. notifier.GetType().FullName, bizType, instanceId);
  1030. }
  1031. }
  1032. }
  1033. throw;
  1034. }
  1035. }
  1036. }