FlowEngineService.cs 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213
  1. using System.Text.Json;
  2. using Admin.NET.Core.Service;
  3. using Microsoft.Extensions.Logging;
  4. namespace Admin.NET.Plugin.ApprovalFlow.Service;
  5. /// <summary>
  6. /// 流程推进引擎 — 核心状态机
  7. /// 不暴露为 API,由其他 Service 内部调用
  8. /// </summary>
  9. public class FlowEngineService : ITransient
  10. {
  // Repositories for flow definitions, instances, tasks, logs and completed-node records.
  private readonly SqlSugarRepository<ApprovalFlow> _flowRep;
  private readonly SqlSugarRepository<ApprovalFlowInstance> _instanceRep;
  private readonly SqlSugarRepository<ApprovalFlowTask> _taskRep;
  private readonly SqlSugarRepository<ApprovalFlowLog> _logRep;
  private readonly SqlSugarRepository<ApprovalFlowDelegate> _delegateRep;
  private readonly SqlSugarRepository<ApprovalFlowCompletedNode> _completedNodeRep;

  // Repositories for system users, user-role links and organisations.
  private readonly SqlSugarRepository<SysUserRole> _userRoleRep;
  private readonly SqlSugarRepository<SysUser> _userRep;
  private readonly SqlSugarRepository<SysOrg> _orgRep;

  // Collaborating services.
  private readonly UserManager _userManager;
  private readonly SysOrgService _sysOrgService;
  private readonly FlowNotifyService _notifyService;
  private readonly ILogger<FlowEngineService> _logger;

  /// <summary>
  /// Creates the engine. Each injected dependency is stored unchanged in the field of the same name.
  /// </summary>
  public FlowEngineService(
      SqlSugarRepository<ApprovalFlow> flowRep,
      SqlSugarRepository<ApprovalFlowInstance> instanceRep,
      SqlSugarRepository<ApprovalFlowTask> taskRep,
      SqlSugarRepository<ApprovalFlowLog> logRep,
      SqlSugarRepository<SysUserRole> userRoleRep,
      SqlSugarRepository<SysUser> userRep,
      SqlSugarRepository<SysOrg> orgRep,
      SqlSugarRepository<ApprovalFlowDelegate> delegateRep,
      SqlSugarRepository<ApprovalFlowCompletedNode> completedNodeRep,
      UserManager userManager,
      SysOrgService sysOrgService,
      FlowNotifyService notifyService,
      ILogger<FlowEngineService> logger)
  {
      (_flowRep, _instanceRep, _taskRep, _logRep) = (flowRep, instanceRep, taskRep, logRep);
      (_userRoleRep, _userRep, _orgRep) = (userRoleRep, userRep, orgRep);
      (_delegateRep, _completedNodeRep) = (delegateRep, completedNodeRep);
      (_userManager, _sysOrgService) = (userManager, sysOrgService);
      (_notifyService, _logger) = (notifyService, logger);
  }
  /// <summary>
  /// Picks the OrgId to stamp on newly inserted flow entities.
  /// Per the original author's note, organisation data-permission filtering relies on the entity's
  /// OrgId, so it must match the current user's organisation; rows with OrgId = 0 are filtered out
  /// for non-super-admin users in the "initiated by me" / "to-do" lists.
  /// </summary>
  /// <returns>
  /// The current user's OrgId when positive; otherwise the first entry of the user's organisation
  /// id list; otherwise 0.
  /// </returns>
  private async Task<long> ResolveOrgIdForNewFlowEntityAsync()
  {
      var currentOrgId = _userManager.OrgId;
      if (currentOrgId <= 0)
      {
          // No usable OrgId on the user context: fall back to the user's organisation list.
          var userOrgIds = await _sysOrgService.GetUserOrgIdList();
          if (userOrgIds is { Count: > 0 })
              return userOrgIds[0];
          return 0;
      }
      return currentOrgId;
  }
  63. // ═══════════════════════════════════════════
  64. // 核心生命周期
  65. // ═══════════════════════════════════════════
  /// <summary>
  /// Starts a new flow instance for a business document.
  /// Looks up the latest published definition for <c>input.BizType</c>, snapshots its FlowJson on
  /// the instance, writes the Submit log, marks the start node completed and then advances along
  /// every edge leaving the start node.
  /// </summary>
  /// <param name="input">Business type/id/number, optional title and comment of the submission.</param>
  /// <returns>The id of the created flow instance.</returns>
  /// <remarks>
  /// Throws (via <c>Oops.Oh</c>) when no published definition exists, the FlowJson is empty or
  /// cannot be deserialized, no start node exists, or the start node has no outgoing edge.
  /// All diagram validation happens before the instance row is inserted, so a malformed diagram
  /// does not leave behind a Running instance that has no tasks.
  /// </remarks>
  public async Task<long> StartFlow(StartFlowInput input)
  {
      // S8-S1-EXCEPTION-FLOW-SYNC-FIX-1 (translated from the original note):
      // A flow definition is global configuration and should not be isolated by the SqlSugar
      // data-scope filters. ApprovalFlow inherits EntityBaseOrgDel and is therefore hit by them:
      //   - role DataScope = Self        -> "CreateUserId == current user" filter;
      //   - role DataScope = Dept/Child  -> IOrgIdFilter "OrgId in user's org set" filter.
      // Existing definitions have CreateUserId = definition author and OrgId = 0, so any restricted
      // non-super-admin user would get null here and StartFlow would fail.
      // ClearFilter() removes all global filters for this query (data scope + soft delete); the
      // soft-delete condition is restored explicitly with !IsDelete. According to the original
      // note, ApprovalFlow has no tenant filter, so no tenant isolation is bypassed.
      var flow = await _flowRep.AsQueryable()
          .ClearFilter()
          .Where(u => u.BizType == input.BizType && u.IsPublished && !u.IsDelete)
          .OrderByDescending(u => u.Version)
          .FirstAsync() ?? throw Oops.Oh($"未找到业务类型 [{input.BizType}] 的已发布流程定义");

      if (string.IsNullOrWhiteSpace(flow.FlowJson))
          throw Oops.Oh("流程定义的 FlowJson 为空,请先设计流程图");

      var flowData = JsonSerializer.Deserialize<ApprovalFlowItem>(flow.FlowJson)
          ?? throw Oops.Oh("FlowJson 反序列化失败");

      // Validate the diagram topology BEFORE any write. A missing Nodes/Edges collection is
      // treated the same as "not found" instead of surfacing as a NullReferenceException.
      var startNode = flowData.Nodes?.FirstOrDefault(n =>
              n.Type is "bpmn:startEvent" or "start-node")
          ?? throw Oops.Oh("流程图中未找到开始节点");

      var firstOutgoing = flowData.Edges?
          .Where(e => e.SourceNodeId == startNode.Id)
          .Select(e => e.TargetNodeId)
          .ToList();
      if (firstOutgoing is not { Count: > 0 })
          throw Oops.Oh("流程图开始节点未连接任何后继节点");

      var orgId = await ResolveOrgIdForNewFlowEntityAsync();
      var instance = new ApprovalFlowInstance
      {
          FlowId = flow.Id,
          FlowVersion = flow.Version,
          BizType = input.BizType,
          BizId = input.BizId,
          BizNo = input.BizNo,
          Title = input.Title ?? $"{flow.Name}-{input.BizNo}",
          InitiatorId = _userManager.UserId,
          InitiatorName = _userManager.RealName,
          Status = FlowInstanceStatusEnum.Running,
          FlowJsonSnapshot = flow.FlowJson,
          StartTime = DateTime.Now,
          OrgId = orgId,
      };

      await _instanceRep.InsertAsync(instance);
      await WriteLog(instance.Id, null, startNode.Id, FlowLogActionEnum.Submit, input.Comment);

      // Register the start node as completed, then advance along its outgoing edges
      // (this also supports a topology whose first node is a parallel-gateway fork).
      await MarkNodeCompleted(instance.Id, startNode);
      foreach (var target in firstOutgoing)
      {
          await ProcessNextNode(instance, flowData, target);
      }

      await InvokeHandler(input.BizType, instance.Id, h => h.OnFlowStarted(input.BizId, instance.Id));
      return instance.Id;
  }
  /// <summary>
  /// Approves a pending task. When the approval completes the task's node, the business handler
  /// is notified and the flow advances to the next node.
  /// </summary>
  /// <param name="taskId">Id of the pending task being approved.</param>
  /// <param name="comment">Optional approval comment stored on the task and in the log.</param>
  public async Task Approve(long taskId, string? comment)
  {
      var pending = await GetPendingTask(taskId);
      pending.Status = FlowTaskStatusEnum.Approved;
      pending.Comment = comment;
      pending.ActionTime = DateTime.Now;
      await _taskRep.AsUpdateable(pending).ExecuteCommandAsync();

      // P1-6 approval delegation: also cancel the paired principal/delegate task.
      await CancelPairedDelegateTask(pending);

      await WriteLog(pending.InstanceId, taskId, pending.NodeId, FlowLogActionEnum.Approve, comment);

      var flowInstance = await _instanceRep.GetByIdAsync(pending.InstanceId);
      if (flowInstance == null)
          throw Oops.Oh("流程实例不存在");

      var nodeFinished = await IsNodeCompleted(flowInstance, pending.NodeId);
      if (!nodeFinished)
          return;

      await InvokeHandler(
          flowInstance.BizType,
          flowInstance.Id,
          h => h.OnNodeCompleted(flowInstance.BizId, flowInstance.Id, pending.NodeId, pending.NodeName ?? "", pending.AssigneeId));

      var snapshot = DeserializeFlowJson(flowInstance.FlowJsonSnapshot);
      await AdvanceToNext(flowInstance, snapshot, pending.NodeId);
  }
  /// <summary>
  /// Rejects a pending task, which terminates the whole flow instance as Rejected.
  /// </summary>
  /// <param name="taskId">Id of the pending task being rejected.</param>
  /// <param name="comment">Optional rejection comment stored on the task and in the log.</param>
  public async Task Reject(long taskId, string? comment)
  {
      var rejected = await GetPendingTask(taskId);
      rejected.Status = FlowTaskStatusEnum.Rejected;
      rejected.Comment = comment;
      rejected.ActionTime = DateTime.Now;
      await _taskRep.AsUpdateable(rejected).ExecuteCommandAsync();

      // A rejection ends the flow: cancel every remaining Pending task of the instance
      // (parallel branches, paired delegate tasks, ...), excluding the task just rejected.
      await CancelAllPendingTasks(rejected.InstanceId, rejected.Id);

      var flowInstance = await _instanceRep.GetByIdAsync(rejected.InstanceId);
      if (flowInstance == null)
          throw Oops.Oh("流程实例不存在");

      flowInstance.Status = FlowInstanceStatusEnum.Rejected;
      flowInstance.EndTime = DateTime.Now;
      await _instanceRep.AsUpdateable(flowInstance).ExecuteCommandAsync();

      await WriteLog(flowInstance.Id, taskId, rejected.NodeId, FlowLogActionEnum.Reject, comment);

      // P4-17: the rejecting user is the assignee of the rejected task.
      await InvokeHandler(
          flowInstance.BizType,
          flowInstance.Id,
          h => h.OnFlowCompleted(flowInstance.BizId, flowInstance.Id, FlowInstanceStatusEnum.Rejected, rejected.AssigneeId));

      await _notifyService.NotifyFlowCompleted(
          flowInstance.InitiatorId,
          flowInstance.Id,
          flowInstance.Title,
          FlowInstanceStatusEnum.Rejected,
          flowInstance.BizType);
  }
  167. // ═══════════════════════════════════════════
  168. // 扩展操作
  169. // ═══════════════════════════════════════════
  /// <summary>
  /// Transfers a pending task to another user: the current task is closed as Transferred and a
  /// new Pending task on the same node is created for the target user.
  /// </summary>
  /// <param name="taskId">Id of the pending task to hand over.</param>
  /// <param name="targetUserId">User id of the new assignee.</param>
  /// <param name="comment">Optional comment stored on the task and prefixed to the log entry.</param>
  /// <remarks>
  /// The flow instance is loaded once and reused for both the new task's OrgId and the
  /// notification (the original issued the identical query twice).
  /// </remarks>
  public async Task Transfer(long taskId, long targetUserId, string? comment)
  {
      var task = await GetPendingTask(taskId);
      task.Status = FlowTaskStatusEnum.Transferred;
      task.Comment = comment;
      task.ActionTime = DateTime.Now;
      task.TransferToId = targetUserId;
      await _taskRep.AsUpdateable(task).ExecuteCommandAsync();

      // Either lookup may return null; the code below tolerates that by falling back to
      // a null assignee name, OrgId 0 and empty title/biz type.
      var targetUser = await _userRep.GetByIdAsync(targetUserId);
      var instance = await _instanceRep.GetByIdAsync(task.InstanceId);

      var newTask = new ApprovalFlowTask
      {
          InstanceId = task.InstanceId,
          NodeId = task.NodeId,
          NodeName = task.NodeName,
          AssigneeId = targetUserId,
          AssigneeName = targetUser?.RealName,
          Status = FlowTaskStatusEnum.Pending,
          // Inherit the instance's OrgId so the new task is subject to the same org filtering.
          OrgId = instance?.OrgId ?? 0,
      };
      await _taskRep.InsertAsync(newTask);

      await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.Transfer,
          $"{comment} → 转办给 {targetUser?.RealName}");

      await _notifyService.NotifyTransferred(targetUserId, task.InstanceId, instance?.Title ?? "", _userManager.RealName, instance?.BizType ?? "");
  }
  /// <summary>
  /// Withdraws a running flow instance on behalf of its initiator.
  /// Only allowed while nobody has acted on any task: every task must still be Pending or Cancelled.
  /// </summary>
  /// <param name="instanceId">Id of the flow instance to withdraw.</param>
  public async Task Withdraw(long instanceId)
  {
      var flowInstance = await _instanceRep.GetByIdAsync(instanceId);
      if (flowInstance == null)
          throw Oops.Oh("流程实例不存在");
      if (flowInstance.InitiatorId != _userManager.UserId)
          throw Oops.Oh("只有发起人可以撤回");
      if (flowInstance.Status != FlowInstanceStatusEnum.Running)
          throw Oops.Oh("当前流程状态不允许撤回");

      var openTasks = await _taskRep.AsQueryable()
          .Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending)
          .ToListAsync();

      // Any task that is neither Pending nor Cancelled means somebody has already acted.
      var handledCount = await _taskRep.AsQueryable()
          .Where(t => t.InstanceId == instanceId &&
                      t.Status != FlowTaskStatusEnum.Pending &&
                      t.Status != FlowTaskStatusEnum.Cancelled)
          .CountAsync();
      if (handledCount > 0)
          throw Oops.Oh("已有人审批过,不可撤回");

      // Remember who had open tasks so they can be told about the withdrawal.
      var notifyUserIds = openTasks.Select(t => t.AssigneeId).Distinct().ToList();

      openTasks.ForEach(open =>
      {
          open.Status = FlowTaskStatusEnum.Cancelled;
          open.ActionTime = DateTime.Now;
      });
      await _taskRep.AsUpdateable(openTasks).ExecuteCommandAsync();

      flowInstance.Status = FlowInstanceStatusEnum.Cancelled;
      flowInstance.EndTime = DateTime.Now;
      await _instanceRep.AsUpdateable(flowInstance).ExecuteCommandAsync();

      await WriteLog(instanceId, null, flowInstance.CurrentNodeId, FlowLogActionEnum.Withdraw, null);

      // P4-17: a withdrawal has no approver, so lastApproverId is null.
      await InvokeHandler(
          flowInstance.BizType,
          instanceId,
          h => h.OnFlowCompleted(flowInstance.BizId, instanceId, FlowInstanceStatusEnum.Cancelled, null));

      await _notifyService.NotifyWithdrawn(notifyUserIds, instanceId, flowInstance.Title, flowInstance.InitiatorName, flowInstance.BizType);
  }
  /// <summary>
  /// Returns the flow to the previous user-task node: pending tasks on the current node are
  /// cancelled, the acting task is closed as Returned, and tasks are re-created on the previous node.
  /// </summary>
  /// <param name="taskId">Id of the pending task whose assignee sends the flow back.</param>
  /// <param name="comment">Optional comment stored on the task and in the log.</param>
  public async Task ReturnToPrev(long taskId, string? comment)
  {
      var current = await GetPendingTask(taskId);

      var flowInstance = await _instanceRep.GetByIdAsync(current.InstanceId);
      if (flowInstance == null)
          throw Oops.Oh("流程实例不存在");

      var snapshot = DeserializeFlowJson(flowInstance.FlowJsonSnapshot);
      var previousNodeId = FindPrevUserTaskNodeId(snapshot, current.NodeId);
      if (previousNodeId == null)
          throw Oops.Oh("已是第一个审批节点,无法退回");

      // Cancel the node's pending tasks first; the acting task's own status is then
      // overwritten with Returned by the update below.
      await CancelPendingTasks(current.InstanceId, current.NodeId);

      current.Status = FlowTaskStatusEnum.Returned;
      current.Comment = comment;
      current.ActionTime = DateTime.Now;
      await _taskRep.AsUpdateable(current).ExecuteCommandAsync();

      flowInstance.CurrentNodeId = previousNodeId;
      await _instanceRep.AsUpdateable(flowInstance).ExecuteCommandAsync();

      await CreateTasksForNode(flowInstance, snapshot, previousNodeId);
      await WriteLog(flowInstance.Id, taskId, current.NodeId, FlowLogActionEnum.Return, comment);

      // Notify whoever now holds a pending task on the previous node.
      var reopened = await _taskRep.AsQueryable()
          .Where(t => t.InstanceId == flowInstance.Id && t.NodeId == previousNodeId && t.Status == FlowTaskStatusEnum.Pending)
          .ToListAsync();
      var recipients = reopened.Select(t => t.AssigneeId).Distinct().ToList();
      await _notifyService.NotifyReturned(recipients, flowInstance.Id, flowInstance.Title, _userManager.RealName, flowInstance.BizType);
  }
  /// <summary>
  /// Adds an extra approver ("add-sign") on the node of a pending task by creating an additional
  /// Pending task flagged <c>IsAddSign</c> for the target user. The original task is left untouched.
  /// </summary>
  /// <param name="taskId">Id of the pending task on whose node the extra approver is added.</param>
  /// <param name="targetUserId">User id of the additional approver.</param>
  /// <param name="comment">Optional comment prefixed to the log entry.</param>
  /// <remarks>
  /// The flow instance is loaded once and reused for both the new task's OrgId and the
  /// notification (the original issued the identical query twice).
  /// </remarks>
  public async Task AddSign(long taskId, long targetUserId, string? comment)
  {
      var task = await GetPendingTask(taskId);

      // Either lookup may return null; the code below tolerates that by falling back to
      // a null assignee name, OrgId 0 and empty title/biz type.
      var targetUser = await _userRep.GetByIdAsync(targetUserId);
      var instance = await _instanceRep.GetByIdAsync(task.InstanceId);

      var newTask = new ApprovalFlowTask
      {
          InstanceId = task.InstanceId,
          NodeId = task.NodeId,
          NodeName = task.NodeName,
          AssigneeId = targetUserId,
          AssigneeName = targetUser?.RealName,
          Status = FlowTaskStatusEnum.Pending,
          IsAddSign = true,
          AddSignById = _userManager.UserId,
          // Inherit the instance's OrgId so the new task is subject to the same org filtering.
          OrgId = instance?.OrgId ?? 0,
      };
      await _taskRep.InsertAsync(newTask);

      await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.AddSign,
          $"{comment} → 加签给 {targetUser?.RealName}");

      await _notifyService.NotifyAddSign(targetUserId, task.InstanceId, instance?.Title ?? "", _userManager.RealName, instance?.BizType ?? "");
  }
  /// <summary>
  /// Manual escalation: the current approver hands the node over to the escalation approvers
  /// configured on the node. The acting task is closed as Escalated, other pending tasks on the
  /// node are cancelled, and one Pending task per resolved escalation approver is created.
  /// </summary>
  /// <param name="taskId">Id of the pending task being escalated.</param>
  /// <param name="comment">Optional comment stored on the task and prefixed to the log entry.</param>
  /// <remarks>
  /// Throws (via <c>Oops.Oh</c>) when the instance or node is missing, manual escalation is not
  /// configured on the node, or no escalation approver can be resolved. Approvers are resolved
  /// BEFORE any task is modified, so the "empty approver list" failure no longer leaves the task
  /// marked Escalated (and its siblings cancelled) without replacement tasks.
  /// </remarks>
  public async Task Escalate(long taskId, string? comment)
  {
      var task = await GetPendingTask(taskId);
      var instance = await _instanceRep.GetByIdAsync(task.InstanceId)
          ?? throw Oops.Oh("流程实例不存在");

      var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
      var node = flowData.Nodes.FirstOrDefault(n => n.Id == task.NodeId)
          ?? throw Oops.Oh("节点不存在");

      var props = node.Properties;
      if (props?.EnableManualEscalation != true
          || string.IsNullOrWhiteSpace(props.EscalationApproverType)
          || string.IsNullOrWhiteSpace(props.EscalationApproverIds))
          throw Oops.Oh("该节点未配置升级目标,无法升级");

      // Resolve the targets first: all validation must pass before state is mutated.
      var escalationApprovers = await ResolveApprovers(
          new FlowProperties
          {
              ApproverType = props.EscalationApproverType,
              ApproverIds = props.EscalationApproverIds,
              ApproverNames = props.EscalationApproverNames,
          },
          instance.InitiatorId);
      if (escalationApprovers.Count == 0)
          throw Oops.Oh("升级目标审批人列表为空");

      task.Status = FlowTaskStatusEnum.Escalated;
      task.Comment = comment;
      task.ActionTime = DateTime.Now;
      await _taskRep.AsUpdateable(task).ExecuteCommandAsync();

      // Cancel the node's other pending tasks, excluding the task just escalated.
      await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id);

      var newTasks = escalationApprovers.Select(a => new ApprovalFlowTask
      {
          InstanceId = instance.Id,
          NodeId = task.NodeId,
          NodeName = task.NodeName,
          AssigneeId = a.userId,
          AssigneeName = a.userName,
          Status = FlowTaskStatusEnum.Pending,
          OrgId = instance.OrgId,
      }).ToList();
      await _taskRep.AsInsertable(newTasks).ExecuteCommandAsync();

      var targetNames = string.Join(", ", escalationApprovers.Select(a => a.userName));
      await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Escalate,
          $"{comment} → 升级给 {targetNames}");

      var targetUserIds = escalationApprovers.Select(a => a.userId).Distinct().ToList();
      await _notifyService.NotifyEscalated(targetUserIds, instance.Id, instance.Title,
          _userManager.RealName, task.NodeName, instance.BizType);
  }
  /// <summary>
  /// Sends an "urge" reminder to every assignee who still has a Pending task on a running instance,
  /// after writing an Urge log entry.
  /// </summary>
  /// <param name="instanceId">Id of the running flow instance.</param>
  public async Task Urge(long instanceId)
  {
      var flowInstance = await _instanceRep.GetByIdAsync(instanceId);
      if (flowInstance == null)
          throw Oops.Oh("流程实例不存在");
      if (flowInstance.Status != FlowInstanceStatusEnum.Running)
          throw Oops.Oh("当前流程不在审批中");

      await WriteLog(instanceId, null, flowInstance.CurrentNodeId, FlowLogActionEnum.Urge, "催办");

      var openTasks = await _taskRep.AsQueryable()
          .Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending)
          .ToListAsync();

      var recipients = openTasks
          .Select(t => t.AssigneeId)
          .Distinct()
          .ToList();
      await _notifyService.NotifyUrge(recipients, instanceId, flowInstance.Title, flowInstance.BizType);
  }
  354. // ═══════════════════════════════════════════
  355. // 超时自动处理(由 FlowTimeoutJob 调用,无 UserManager 上下文)
  356. // ═══════════════════════════════════════════
  /// <summary>
  /// Scans every Pending task of a Running instance and, for tasks whose node defines a positive
  /// <c>TimeoutHours</c> and a <c>TimeoutAction</c> and whose deadline (task creation time +
  /// TimeoutHours) has passed, runs <see cref="HandleTimeoutTask"/>.
  /// Per the original note this is equivalent to the FlowTimeoutJob scan and was extracted so an
  /// admin endpoint can trigger it on demand (operations / E2E tests).
  /// </summary>
  /// <param name="stoppingToken">When cancellation is requested the loop stops before the next task.</param>
  /// <returns>Number of tasks for which <see cref="HandleTimeoutTask"/> finished without throwing.</returns>
  /// <remarks>
  /// Failures on one task never abort the scan. Unlike the original, which swallowed them silently,
  /// they are now written to the logger so they can be diagnosed; callers never see them.
  /// </remarks>
  public async Task<int> ScanTimeoutTasks(CancellationToken stoppingToken = default)
  {
      var pendingTasks = await _taskRep.Context.Queryable<ApprovalFlowTask>()
          .InnerJoin<ApprovalFlowInstance>((t, i) => t.InstanceId == i.Id)
          .Where((t, i) => t.Status == FlowTaskStatusEnum.Pending
              && i.Status == FlowInstanceStatusEnum.Running)
          .Select((t, i) => new
          {
              TaskId = t.Id,
              TaskCreatedAt = t.CreateTime,
              NodeId = t.NodeId,
              FlowJsonSnapshot = i.FlowJsonSnapshot,
          })
          .ToListAsync();

      var now = DateTime.Now;
      var processed = 0;
      foreach (var item in pendingTasks)
      {
          if (stoppingToken.IsCancellationRequested) break;
          if (string.IsNullOrWhiteSpace(item.FlowJsonSnapshot)) continue;

          ApprovalFlowItem? flowData;
          try
          {
              flowData = JsonSerializer.Deserialize<ApprovalFlowItem>(item.FlowJsonSnapshot);
          }
          catch (Exception ex)
          {
              // An unreadable snapshot only affects this task; record it and move on.
              _logger.LogWarning(ex, "ScanTimeoutTasks: cannot deserialize FlowJsonSnapshot for task {TaskId}; task skipped", item.TaskId);
              continue;
          }

          var node = flowData?.Nodes?.FirstOrDefault(n => n.Id == item.NodeId);
          var props = node?.Properties;
          if (props?.TimeoutHours == null || props.TimeoutHours <= 0) continue;
          if (string.IsNullOrWhiteSpace(props.TimeoutAction)) continue;

          var deadline = item.TaskCreatedAt.AddHours(props.TimeoutHours.Value);
          if (now < deadline) continue;

          // Notify must run once per task: skip tasks that already have an AutoTimeout log entry,
          // otherwise every scan would send the reminder again.
          if (props.TimeoutAction == "Notify")
          {
              var alreadyNotified = await _logRep.AsQueryable()
                  .AnyAsync(log => log.TaskId == item.TaskId
                      && log.Action == FlowLogActionEnum.AutoTimeout);
              if (alreadyNotified) continue;
          }

          try
          {
              await HandleTimeoutTask(item.TaskId);
              processed++;
          }
          catch (Exception ex)
          {
              // Isolate failures per task so one bad task cannot block the rest of the scan.
              _logger.LogError(ex, "ScanTimeoutTasks: timeout handling failed for task {TaskId}", item.TaskId);
          }
      }
      return processed;
  }
  /// <summary>
  /// Applies the node's configured timeout action to a single task (called by the scheduled scan).
  /// Does nothing when the task is missing or no longer Pending, the instance is missing or no
  /// longer Running, the node has no properties, or the action is not one of
  /// Notify / AutoApprove / AutoReject / AutoEscalate.
  /// </summary>
  /// <param name="taskId">Id of the task that timed out.</param>
  public async Task HandleTimeoutTask(long taskId)
  {
      var timedOut = await _taskRep.GetByIdAsync(taskId);
      if (timedOut is not { Status: FlowTaskStatusEnum.Pending })
          return;

      var flowInstance = await _instanceRep.GetByIdAsync(timedOut.InstanceId);
      if (flowInstance is not { Status: FlowInstanceStatusEnum.Running })
          return;

      var snapshot = DeserializeFlowJson(flowInstance.FlowJsonSnapshot);
      var nodeProps = snapshot.Nodes.FirstOrDefault(n => n.Id == timedOut.NodeId)?.Properties;
      if (nodeProps == null)
          return;

      switch (nodeProps.TimeoutAction)
      {
          case "Notify":
          {
              // Remind the assignee, then log AutoTimeout (the scan uses that entry to avoid repeats).
              var recipients = new List<long> { timedOut.AssigneeId };
              await _notifyService.NotifyTimeout(recipients, flowInstance.Id, flowInstance.Title, flowInstance.BizType);
              await WriteSystemLog(flowInstance.Id, timedOut.Id, timedOut.NodeId,
                  FlowLogActionEnum.AutoTimeout, "审批超时,已发送提醒通知");
              break;
          }
          case "AutoApprove":
              await AutoApproveTask(timedOut, flowInstance);
              break;
          case "AutoReject":
              await AutoRejectTask(timedOut, flowInstance);
              break;
          case "AutoEscalate":
              await AutoEscalateTask(timedOut, nodeProps, flowInstance);
              break;
      }
  }
  /// <summary>
  /// Timeout action "AutoApprove": marks the task Approved by the system, writes an AutoTimeout
  /// log entry and, when that completes the node, notifies the business handler and advances the flow.
  /// </summary>
  /// <param name="task">The timed-out pending task.</param>
  /// <param name="instance">The running instance owning the task.</param>
  private async Task AutoApproveTask(ApprovalFlowTask task, ApprovalFlowInstance instance)
  {
      task.Status = FlowTaskStatusEnum.Approved;
      task.Comment = "系统自动通过(超时)";
      task.ActionTime = DateTime.Now;
      await _taskRep.AsUpdateable(task).ExecuteCommandAsync();

      await WriteSystemLog(
          instance.Id,
          task.Id,
          task.NodeId,
          FlowLogActionEnum.AutoTimeout,
          "审批超时,系统自动通过");

      var nodeFinished = await IsNodeCompleted(instance, task.NodeId);
      if (!nodeFinished)
          return;

      // P4-17: the system approved, so there is no human approver (approverUserId = null).
      await InvokeHandler(
          instance.BizType,
          instance.Id,
          h => h.OnNodeCompleted(instance.BizId, instance.Id, task.NodeId, task.NodeName ?? "", null));

      var snapshot = DeserializeFlowJson(instance.FlowJsonSnapshot);
      await AdvanceToNext(instance, snapshot, task.NodeId);
  }
  /// <summary>
  /// Timeout action "AutoReject": marks the task Rejected by the system and terminates the
  /// instance as Rejected, then notifies the business handler and the initiator.
  /// </summary>
  /// <param name="task">The timed-out pending task.</param>
  /// <param name="instance">The running instance owning the task.</param>
  /// <remarks>
  /// Like the manual <see cref="Reject"/> path, this cancels ALL remaining Pending tasks of the
  /// instance (not just those on the same node); otherwise tasks on parallel branches would stay
  /// Pending on an instance that has already been rejected.
  /// </remarks>
  private async Task AutoRejectTask(ApprovalFlowTask task, ApprovalFlowInstance instance)
  {
      task.Status = FlowTaskStatusEnum.Rejected;
      task.Comment = "系统自动拒绝(超时)";
      task.ActionTime = DateTime.Now;
      await _taskRep.AsUpdateable(task).ExecuteCommandAsync();

      // The rejection ends the whole flow: cancel every other pending task of the instance.
      await CancelAllPendingTasks(task.InstanceId, task.Id);

      instance.Status = FlowInstanceStatusEnum.Rejected;
      instance.EndTime = DateTime.Now;
      await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync();

      await WriteSystemLog(instance.Id, task.Id, task.NodeId,
          FlowLogActionEnum.AutoTimeout, "审批超时,系统自动拒绝");

      // P4-17: the system rejected, so there is no human approver (lastApproverId = null).
      await InvokeHandler(instance.BizType, instance.Id,
          h => h.OnFlowCompleted(instance.BizId, instance.Id, FlowInstanceStatusEnum.Rejected, null));

      await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id,
          instance.Title, FlowInstanceStatusEnum.Rejected, instance.BizType);
  }
  /// <summary>
  /// Timeout action "AutoEscalate": closes the task as Escalated, cancels the node's other pending
  /// tasks and creates one Pending task per escalation approver configured on the node.
  /// </summary>
  /// <param name="task">The timed-out pending task.</param>
  /// <param name="nodeProps">Properties of the task's node (source of the escalation settings).</param>
  /// <param name="instance">The running instance owning the task.</param>
  /// <remarks>
  /// Does nothing when the node has no escalation target configured or when no approver can be
  /// resolved. Approvers are resolved BEFORE any task is modified: the original marked the task
  /// Escalated and cancelled its siblings first, so an empty approver list left a Running instance
  /// without any pending task. The configured approver names are passed to the resolver as well,
  /// matching the manual <see cref="Escalate"/> path.
  /// </remarks>
  private async Task AutoEscalateTask(ApprovalFlowTask task, FlowProperties nodeProps, ApprovalFlowInstance instance)
  {
      if (string.IsNullOrWhiteSpace(nodeProps.EscalationApproverType)
          || string.IsNullOrWhiteSpace(nodeProps.EscalationApproverIds))
          return;

      var approvers = await ResolveApprovers(
          new FlowProperties
          {
              ApproverType = nodeProps.EscalationApproverType,
              ApproverIds = nodeProps.EscalationApproverIds,
              ApproverNames = nodeProps.EscalationApproverNames,
          },
          instance.InitiatorId);
      if (approvers.Count == 0)
      {
          // Leave the task Pending rather than stranding the instance; make the misconfiguration visible.
          _logger.LogWarning("AutoEscalateTask: no escalation approver resolved for task {TaskId} on node {NodeId}; task left pending",
              task.Id, task.NodeId);
          return;
      }

      task.Status = FlowTaskStatusEnum.Escalated;
      task.Comment = "系统自动升级(超时)";
      task.ActionTime = DateTime.Now;
      await _taskRep.AsUpdateable(task).ExecuteCommandAsync();

      // Cancel the node's other pending tasks, excluding the task just escalated.
      await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id);

      var newTasks = approvers.Select(a => new ApprovalFlowTask
      {
          InstanceId = instance.Id,
          NodeId = task.NodeId,
          NodeName = task.NodeName,
          AssigneeId = a.userId,
          AssigneeName = a.userName,
          Status = FlowTaskStatusEnum.Pending,
          OrgId = instance.OrgId,
      }).ToList();
      await _taskRep.AsInsertable(newTasks).ExecuteCommandAsync();

      var targetNames = string.Join(", ", approvers.Select(a => a.userName));
      await WriteSystemLog(instance.Id, task.Id, task.NodeId,
          FlowLogActionEnum.AutoTimeout, $"审批超时,自动升级给 {targetNames}");

      var targetUserIds = approvers.Select(a => a.userId).Distinct().ToList();
      await _notifyService.NotifyEscalated(targetUserIds, instance.Id, instance.Title,
          "系统", task.NodeName, instance.BizType);
  }
  /// <summary>
  /// Inserts a flow log entry attributed to the system: OperatorId is 0 and OperatorName is "系统".
  /// </summary>
  /// <param name="instanceId">Flow instance the entry belongs to.</param>
  /// <param name="taskId">Related task id, if any.</param>
  /// <param name="nodeId">Related node id, if any.</param>
  /// <param name="action">Logged action.</param>
  /// <param name="comment">Optional free-text comment.</param>
  private async Task WriteSystemLog(long instanceId, long? taskId, string? nodeId,
      FlowLogActionEnum action, string? comment)
  {
      var entry = new ApprovalFlowLog
      {
          InstanceId = instanceId,
          TaskId = taskId,
          NodeId = nodeId,
          Action = action,
          OperatorId = 0,
          OperatorName = "系统",
          Comment = comment,
      };
      await _logRep.InsertAsync(entry);
  }
  528. // ═══════════════════════════════════════════
  529. // 内部引擎方法
  530. // ═══════════════════════════════════════════
  /// <summary>
  /// Advances the flow from <paramref name="currentNodeId"/> to its successor node(s).
  /// The current node is recorded as completed (see <see cref="MarkNodeCompleted"/>), then every
  /// outgoing edge is handed to <see cref="ProcessNextNode"/>. A node without outgoing edges
  /// completes the instance as Approved.
  /// Gateway handling (exclusive choice, parallel fork/join) lives in <see cref="ProcessNextNode"/>.
  /// </summary>
  /// <param name="instance">The running flow instance.</param>
  /// <param name="flowData">Deserialized flow diagram of the instance.</param>
  /// <param name="currentNodeId">Id of the node that has just been completed.</param>
  private async Task AdvanceToNext(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string currentNodeId)
  {
      // Record completion of the node we are leaving; the helper ignores a null node
      // and tolerates the record already existing.
      var leavingNode = flowData.Nodes.FirstOrDefault(n => n.Id == currentNodeId);
      await MarkNodeCompleted(instance.Id, leavingNode);

      var successorIds = flowData.Edges
          .Where(e => e.SourceNodeId == currentNodeId)
          .Select(e => e.TargetNodeId)
          .ToList();

      if (successorIds.Count > 0)
      {
          // Outside of parallel gateways a node normally has exactly one outgoing edge.
          foreach (var successorId in successorIds)
          {
              await ProcessNextNode(instance, flowData, successorId);
          }
          return;
      }

      // Dead end: nothing follows this node, so the flow is finished.
      await CompleteInstance(instance, FlowInstanceStatusEnum.Approved);
  }
  /// <summary>
  /// Enters one successor node and dispatches on its type:
  /// - node id not found in the diagram: the instance is completed as Approved;
  /// - end event: the instance is completed as Approved only when no Pending task remains;
  /// - exclusive gateway: the gateway is marked completed, its conditions are evaluated against the
  ///   business data and processing continues with the chosen target;
  /// - parallel gateway: with two or more incoming edges (join) it waits silently until every
  ///   predecessor is in the completed-node set; otherwise, or once joined, it is marked completed
  ///   and every outgoing edge is processed (fork);
  /// - anything else (user task): becomes the instance's current node and tasks are created for it.
  /// </summary>
  /// <param name="instance">The running flow instance.</param>
  /// <param name="flowData">Deserialized flow diagram of the instance.</param>
  /// <param name="nextNodeId">Id of the node to enter.</param>
  private async Task ProcessNextNode(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string nextNodeId)
  {
      var node = flowData.Nodes.FirstOrDefault(n => n.Id == nextNodeId);
      if (node == null)
      {
          await CompleteInstance(instance, FlowInstanceStatusEnum.Approved);
          return;
      }

      switch (node.Type)
      {
          case "bpmn:endEvent":
          case "end-node":
          {
              // Only finish the instance once all parallel branches are done (no Pending task left).
              var stillPending = await _taskRep.AsQueryable()
                  .AnyAsync(t => t.InstanceId == instance.Id && t.Status == FlowTaskStatusEnum.Pending);
              if (!stillPending)
                  await CompleteInstance(instance, FlowInstanceStatusEnum.Approved);
              return;
          }

          case "bpmn:exclusiveGateway":
          {
              await MarkNodeCompleted(instance.Id, node);
              var bizData = await GetBizData(instance.BizType, instance.BizId);
              var chosenNodeId = EvaluateGateway(node.Properties?.Conditions, flowData, node.Id, bizData);
              instance.CurrentNodeId = chosenNodeId;
              await _instanceRep.AsUpdateable(instance).UpdateColumns(i => new { i.CurrentNodeId }).ExecuteCommandAsync();
              await ProcessNextNode(instance, flowData, chosenNodeId);
              return;
          }

          case "bpmn:parallelGateway":
          {
              var predecessors = flowData.Edges.Where(e => e.TargetNodeId == node.Id).Select(e => e.SourceNodeId).ToList();
              var successors = flowData.Edges.Where(e => e.SourceNodeId == node.Id).Select(e => e.TargetNodeId).ToList();

              // Join semantics: with >= 2 incoming edges every predecessor must already be completed.
              if (predecessors.Count >= 2)
              {
                  var completed = await GetCompletedNodeIdSet(instance.Id);
                  if (predecessors.Any(p => !completed.Contains(p)))
                  {
                      // Not all branches have arrived yet: wait silently for a later arrival.
                      return;
                  }
              }

              // Fork, or join satisfied: mark the gateway completed and follow every outgoing edge.
              await MarkNodeCompleted(instance.Id, node);
              foreach (var successor in successors)
              {
                  await ProcessNextNode(instance, flowData, successor);
              }
              return;
          }

          default:
              // User task (or any other node type): make it current and create its tasks.
              instance.CurrentNodeId = nextNodeId;
              await _instanceRep.AsUpdateable(instance).UpdateColumns(i => new { i.CurrentNodeId }).ExecuteCommandAsync();
              await CreateTasksForNode(instance, flowData, nextNodeId);
              return;
      }
  }
  /// <summary>
  /// Records that a node of the instance has been completed by inserting an
  /// <see cref="ApprovalFlowCompletedNode"/> row. Best effort: a failed insert never propagates.
  /// Per the original note, a repeated insert is expected to be rejected by a unique index,
  /// which makes the call idempotent.
  /// </summary>
  /// <param name="instanceId">Flow instance id.</param>
  /// <param name="node">The completed node; <c>null</c> is ignored.</param>
  /// <remarks>
  /// The original swallowed every exception silently. The failure is now logged at debug level so
  /// that genuine database errors remain diagnosable without turning the expected duplicate-key
  /// case into noise or into a crash.
  /// </remarks>
  private async Task MarkNodeCompleted(long instanceId, ApprovalFlowNodeItem? node)
  {
      if (node == null) return;
      try
      {
          await _completedNodeRep.InsertAsync(new ApprovalFlowCompletedNode
          {
              InstanceId = instanceId,
              NodeId = node.Id,
              // Prefer the configured node name, fall back to the node's display text.
              NodeName = node.Properties?.NodeName ?? node.Text?.Value,
              NodeType = node.Type,
              CompletedTime = DateTime.Now,
          });
      }
      catch (Exception ex)
      {
          // Typically a unique-index conflict (row already present, e.g. under concurrency): ignore.
          _logger.LogDebug(ex, "MarkNodeCompleted: insert skipped for node {NodeId} of instance {InstanceId}", node.Id, instanceId);
      }
  }
  /// <summary>
  /// Loads the node ids of every completed-node record stored for the given instance.
  /// </summary>
  /// <param name="instanceId">Id of the flow instance to query.</param>
  /// <returns>A set containing the recorded node ids (empty when there are none).</returns>
  private async Task<HashSet<string>> GetCompletedNodeIdSet(long instanceId)
  {
      var completedIdQuery = _completedNodeRep.AsQueryable()
          .Where(row => row.InstanceId == instanceId)
          .Select(row => row.NodeId);

      var nodeIds = await completedIdQuery.ToListAsync();
      return new HashSet<string>(nodeIds);
  }
  /// <summary>
  /// Finishes a flow instance with the given final status: persists status and end time,
  /// invokes the business handler's completion callback and notifies the initiator.
  /// </summary>
  /// <param name="instance">The instance to complete; its Status and EndTime are overwritten.</param>
  /// <param name="status">The final status to store.</param>
  private async Task CompleteInstance(ApprovalFlowInstance instance, FlowInstanceStatusEnum status)
  {
      // Idempotency guard: re-read the stored row and do nothing unless it is still Running,
      // so parallel branches reaching the end node together do not complete the instance twice.
      var persisted = await _instanceRep.GetByIdAsync(instance.Id);
      if (persisted?.Status != FlowInstanceStatusEnum.Running)
      {
          return;
      }

      instance.Status = status;
      instance.EndTime = DateTime.Now;
      var statusUpdate = _instanceRep.AsUpdateable(instance)
          .UpdateColumns(i => new { i.Status, i.EndTime });
      await statusUpdate.ExecuteCommandAsync();

      // P4-17: on the normal approval path the "last approver" is the operator of the latest
      // human Approve log entry; for every other final status it stays null.
      long? lastApproverId = null;
      if (status == FlowInstanceStatusEnum.Approved)
      {
          lastApproverId = await GetLastHumanApproverIdAsync(instance.Id);
      }

      await InvokeHandler(instance.BizType, instance.Id,
          h => h.OnFlowCompleted(instance.BizId, instance.Id, status, lastApproverId));

      await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id, instance.Title, status, instance.BizType);
  }
  /// <summary>
  /// P4-17: returns the operator id of the most recent log entry with action
  /// <c>FlowLogActionEnum.Approve</c> for the instance (ordered by CreateTime, newest first).
  /// Per the original note, system timeout auto-approvals are logged under a different action
  /// (AutoTimeout) and therefore are not matched here.
  /// </summary>
  /// <param name="instanceId">Id of the flow instance.</param>
  /// <returns>The operator id when such a log exists and the id is positive; otherwise <c>null</c>.</returns>
  private async Task<long?> GetLastHumanApproverIdAsync(long instanceId)
  {
      var newestApproval = await _logRep.AsQueryable()
          .Where(log => log.InstanceId == instanceId && log.Action == FlowLogActionEnum.Approve)
          .OrderByDescending(log => log.CreateTime)
          .FirstAsync();

      if (newestApproval is null)
      {
          return null;
      }

      // Only a positive operator id counts as a real user.
      if (!(newestApproval.OperatorId > 0))
      {
          return null;
      }

      return newestApproval.OperatorId;
  }
  /// <summary>
  /// Creates the pending approval tasks for a node: one task per resolved approver plus,
  /// for each approver with an effective delegation, one additional delegate task.
  /// All tasks are inserted in one batch and the assignees are notified afterwards.
  /// </summary>
  /// <param name="instance">The running flow instance.</param>
  /// <param name="flowData">The deserialized flow definition that contains the node.</param>
  /// <param name="nodeId">Id of the node to create tasks for.</param>
  private async Task CreateTasksForNode(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string nodeId)
  {
      var node = flowData.Nodes.FirstOrDefault(n => n.Id == nodeId);
      if (node == null)
      {
          throw Oops.Oh($"FlowJson 中未找到节点 [{nodeId}]");
      }

      var approvers = await ResolveApprovers(node.Properties, instance.InitiatorId);
      if (approvers.Count == 0)
      {
          throw Oops.Oh($"节点 [{node.Properties?.NodeName ?? nodeId}] 未配置审批人或审批人列表为空");
      }

      var nodeName = node.Properties?.NodeName ?? node.Text?.Value;

      // The approvers' own tasks come first, in resolver order.
      var allTasks = new List<ApprovalFlowTask>();
      foreach (var (approverId, approverName) in approvers)
      {
          allTasks.Add(new ApprovalFlowTask
          {
              InstanceId = instance.Id,
              NodeId = nodeId,
              NodeName = nodeName,
              AssigneeId = approverId,
              AssigneeName = approverName,
              Status = FlowTaskStatusEnum.Pending,
              OrgId = instance.OrgId,
          });
      }

      // P1-6 approval delegation: for every original approver with an effective delegation,
      // append a parallel task assigned to the delegate.
      foreach (var (approverId, approverName) in approvers)
      {
          var delegation = await FindEffectiveDelegate(approverId, instance.BizType);
          if (delegation == null)
          {
              continue;
          }

          // Skip when the delegate is already one of the original approvers,
          // so the same person does not receive two tasks for that reason.
          if (approvers.Exists(x => x.userId == delegation.DelegateUserId))
          {
              continue;
          }

          allTasks.Add(new ApprovalFlowTask
          {
              InstanceId = instance.Id,
              NodeId = nodeId,
              NodeName = nodeName,
              AssigneeId = delegation.DelegateUserId,
              AssigneeName = delegation.DelegateUserName,
              Status = FlowTaskStatusEnum.Pending,
              OrgId = instance.OrgId,
              IsDelegate = true,
              DelegateForUserId = approverId,
              DelegateForUserName = approverName,
          });
      }

      await _taskRep.AsInsertable(allTasks).ExecuteCommandAsync();

      var assigneeIds = allTasks.Select(t => t.AssigneeId).Distinct().ToList();
      await _notifyService.NotifyNewTask(assigneeIds, instance.Id, instance.Title, nodeName, instance.BizType);
  }
  /// <summary>
  /// Finds the delegation currently in effect for a user: enabled, the current time lies within
  /// [StartTime, EndTime], and its BizType is either blank (global) or equal to <paramref name="bizType"/>.
  /// When several delegations qualify, one with a specific BizType is preferred over a global one,
  /// and within the same group the most recently created wins.
  /// </summary>
  /// <param name="userId">Id of the original approver.</param>
  /// <param name="bizType">Business type of the flow instance; may be <c>null</c>.</param>
  /// <returns>The chosen delegation, or <c>null</c> when none qualifies.</returns>
  private async Task<ApprovalFlowDelegate?> FindEffectiveDelegate(long userId, string? bizType)
  {
      var now = DateTime.Now;

      var candidates = await _delegateRep.AsQueryable()
          .Where(d => d.UserId == userId
              && d.IsEnabled
              && d.StartTime <= now
              && d.EndTime >= now
              && (string.IsNullOrEmpty(d.BizType) || d.BizType == bizType))
          .ToListAsync();

      // Rank in memory so the priority is explicit and uses the same "blank means global" rule
      // as the filter above (the previous ordering only treated NULL as global, so an
      // empty-string global delegation could outrank a BizType-specific one).
      // The candidate set is one user's active delegations — presumably a handful of rows.
      return candidates
          .OrderBy(d => string.IsNullOrEmpty(d.BizType) ? 1 : 0)
          .ThenByDescending(d => d.CreateTime)
          .FirstOrDefault();
  }
  /// <summary>
  /// P1-6: cancels the pending task that is paired with a completed task on the same node.
  /// - When the completed task is the approver's own task, the matching delegate task is cancelled.
  /// - When the completed task is a delegate task, the original approver's own task is cancelled.
  /// Does nothing when no pending counterpart exists.
  /// </summary>
  /// <param name="completedTask">The task that has just been handled.</param>
  private async Task CancelPairedDelegateTask(ApprovalFlowTask completedTask)
  {
      var instanceId = completedTask.InstanceId;
      var nodeId = completedTask.NodeId;

      ApprovalFlowTask? counterpart;
      if (!completedTask.IsDelegate)
      {
          // Own task handled: look for the delegate task created on behalf of this assignee.
          var ownerId = completedTask.AssigneeId;
          counterpart = await _taskRep.AsQueryable()
              .Where(t => t.InstanceId == instanceId
                  && t.NodeId == nodeId
                  && t.Status == FlowTaskStatusEnum.Pending
                  && t.IsDelegate
                  && t.DelegateForUserId == ownerId)
              .FirstAsync();
      }
      else
      {
          // Delegate task handled: look for the original approver's own task.
          var originalUserId = completedTask.DelegateForUserId ?? 0;
          if (originalUserId == 0)
          {
              return;
          }

          counterpart = await _taskRep.AsQueryable()
              .Where(t => t.InstanceId == instanceId
                  && t.NodeId == nodeId
                  && t.Status == FlowTaskStatusEnum.Pending
                  && t.AssigneeId == originalUserId
                  && !t.IsDelegate)
              .FirstAsync();
      }

      if (counterpart == null)
      {
          return;
      }

      counterpart.Status = FlowTaskStatusEnum.Cancelled;
      counterpart.ActionTime = DateTime.Now;
      await _taskRep.AsUpdateable(counterpart).ExecuteCommandAsync();
  }
  /// <summary>
  /// Resolves the approvers of a node from its configuration.
  /// Supported <c>ApproverType</c> values: Initiator, DepartmentLeader (both derived from the
  /// initiator and independent of <c>ApproverIds</c>), and SpecificUser, Role, Department
  /// (which interpret the comma-separated <c>ApproverIds</c> as user, role or org ids).
  /// </summary>
  /// <param name="props">Node properties; <c>null</c> or a blank ApproverType yields an empty list.</param>
  /// <param name="initiatorId">User id of the flow initiator.</param>
  /// <returns>The resolved (userId, userName) pairs; empty when nothing could be resolved.</returns>
  private async Task<List<(long userId, string userName)>> ResolveApprovers(FlowProperties? props, long initiatorId)
  {
      if (props == null || string.IsNullOrWhiteSpace(props.ApproverType))
          return new List<(long, string)>();

      var approverType = props.ApproverType;

      // --- Types derived from the initiator: they never read ApproverIds. ---

      if (approverType == nameof(ApproverTypeEnum.Initiator))
      {
          var initiator = await _userRep.GetByIdAsync(initiatorId);
          return initiator != null
              ? new List<(long, string)> { (initiator.Id, initiator.RealName ?? "") }
              : new List<(long, string)>();
      }

      // Handled before the ApproverIds check below: this branch does not use ApproverIds, and
      // keeping it behind that check made a leader node without ids always resolve to nobody.
      if (approverType == nameof(ApproverTypeEnum.DepartmentLeader))
      {
          var initiator = await _userRep.GetByIdAsync(initiatorId);
          if (initiator == null || initiator.OrgId <= 0)
              return new List<(long, string)>();

          // First choice: the director of the initiator's organization.
          var org = await _orgRep.GetByIdAsync(initiator.OrgId);
          if (org?.DirectorId != null && org.DirectorId > 0)
          {
              var director = await _userRep.GetByIdAsync(org.DirectorId.Value);
              if (director != null)
                  return new List<(long, string)> { (director.Id, director.RealName ?? "") };
          }

          // Fallback: the initiator's direct manager.
          if (initiator.ManagerUserId != null && initiator.ManagerUserId > 0)
          {
              var manager = await _userRep.GetByIdAsync(initiator.ManagerUserId.Value);
              if (manager != null)
                  return new List<(long, string)> { (manager.Id, manager.RealName ?? "") };
          }

          return new List<(long, string)>();
      }

      // --- Types driven by the configured id list. ---

      if (string.IsNullOrWhiteSpace(props.ApproverIds))
          return new List<(long, string)>();

      // Unparseable or non-positive entries are dropped.
      var ids = props.ApproverIds.Split(',', StringSplitOptions.RemoveEmptyEntries)
          .Select(s => long.TryParse(s.Trim(), out var v) ? v : 0).Where(id => id > 0).ToList();

      if (approverType == nameof(ApproverTypeEnum.SpecificUser))
      {
          var users = await _userRep.AsQueryable()
              .Where(u => ids.Contains(u.Id)).ToListAsync();
          return users.Select(u => (u.Id, u.RealName ?? "")).ToList();
      }

      if (approverType == nameof(ApproverTypeEnum.Role))
      {
          var userIds = await _userRoleRep.AsQueryable()
              .Where(ur => ids.Contains(ur.RoleId))
              .Select(ur => ur.UserId)
              .ToListAsync();

          // S8-S1-EXCEPTION-FLOW-SYNC-FIX-1: approver resolution is flow configuration and must not be
          // isolated by the initiator's data scope (DataScope).
          // SysUser inherits EntityBaseTenantOrg (-> EntityBaseOrg): with DataScope=Self rows are filtered
          // by "CreateUserId == initiator", with Dept/DeptChild by OrgId. That can filter out role members
          // (even the initiator, whose CreateUserId may be NULL) -> empty approver list ->
          // ProcessNextNode throws, no tasks are created and the instance is left hanging.
          // ClearFilter() removes the data-scope filter; tenant isolation is re-added explicitly
          // (TenantId == current tenant), equivalent to the original global tenant filter, so there is
          // no cross-tenant leakage.
          var users = await _userRep.AsQueryable()
              .ClearFilter()
              .Where(u => userIds.Contains(u.Id) && u.TenantId == _userManager.TenantId)
              .ToListAsync();
          return users.Select(u => (u.Id, u.RealName ?? "")).ToList();
      }

      if (approverType == nameof(ApproverTypeEnum.Department))
      {
          var users = await _userRep.AsQueryable()
              .Where(u => ids.Contains(u.OrgId)).ToListAsync();
          return users.Select(u => (u.Id, u.RealName ?? "")).ToList();
      }

      // Unknown approver type.
      return new List<(long, string)>();
  }
  /// <summary>
  /// Evaluates an exclusive gateway and returns the id of the node to continue with.
  /// With conditions and business data: the non-default conditions are tried in order and the
  /// first one whose expression evaluates to true wins; otherwise the default condition is used;
  /// without a default, the gateway's first outgoing edge is taken.
  /// With conditions but no business data: the default condition is used, or the first condition
  /// when there is no default.
  /// Without conditions: the gateway's first outgoing edge is taken.
  /// Expressions have the form <c>variable op value</c> (op: ==, !=, &gt;, &gt;=, &lt;, &lt;=).
  /// </summary>
  /// <param name="conditions">Configured branch conditions of the gateway; may be null or empty.</param>
  /// <param name="flowData">The flow definition, used for the outgoing-edge fallback.</param>
  /// <param name="gatewayNodeId">Id of the gateway node.</param>
  /// <param name="bizData">Business data the expressions are evaluated against; may be null or empty.</param>
  /// <returns>The target node id.</returns>
  private string EvaluateGateway(List<GatewayCondition>? conditions, ApprovalFlowItem flowData, string gatewayNodeId, Dictionary<string, object>? bizData)
  {
      if (conditions is { Count: > 0 })
      {
          var fallback = conditions.FirstOrDefault(c => c.IsDefault);

          if (bizData is { Count: > 0 })
          {
              // First non-default condition with a non-blank expression that evaluates to true.
              var matched = conditions.FirstOrDefault(c =>
                  !c.IsDefault
                  && !string.IsNullOrWhiteSpace(c.Expression)
                  && EvalSimpleExpression(c.Expression, bizData));
              if (matched != null)
              {
                  return matched.TargetNodeId;
              }

              if (fallback != null)
              {
                  return fallback.TargetNodeId;
              }
              // No match and no default: fall through to the first outgoing edge.
          }
          else
          {
              // Nothing to evaluate against: default branch, else the first configured condition.
              return (fallback ?? conditions.First()).TargetNodeId;
          }
      }

      var firstOutgoing = flowData.Edges.FirstOrDefault(e => e.SourceNodeId == gatewayNodeId);
      return firstOutgoing?.TargetNodeId ?? throw Oops.Oh("排他网关没有出边");
  }
  /// <summary>
  /// Evaluates a simple expression made of one or more <c>field op value</c> comparisons
  /// (e.g. "urgent == 1", "customLevel >= 3", "amount > 10000") joined with <c>&amp;&amp;</c>.
  /// The expression is true only when every comparison is true; evaluation stops at the first
  /// comparison that is false.
  /// </summary>
  /// <param name="expression">The expression text.</param>
  /// <param name="bizData">Field values the comparisons are evaluated against.</param>
  /// <returns><c>true</c> when all comparisons hold; otherwise <c>false</c>.</returns>
  private static bool EvalSimpleExpression(string expression, Dictionary<string, object> bizData)
  {
      return expression
          .Split("&&", StringSplitOptions.TrimEntries)
          .All(clause => EvalSingleComparison(clause.Trim(), bizData));
  }
  /// <summary>
  /// Evaluates one comparison of the form <c>field op value</c>, where op is one of
  /// ==, !=, &gt;, &gt;=, &lt;, &lt;=. The value may be wrapped in single or double quotes.
  /// When both sides parse as decimal numbers they are compared numerically; otherwise only
  /// == and != are supported and compare the text case-insensitively.
  /// </summary>
  /// <param name="expr">The comparison text.</param>
  /// <param name="bizData">Field values; the field name is looked up as a key.</param>
  /// <returns>
  /// The comparison result; <c>false</c> when no operator is found, the field is missing,
  /// or an ordering operator is applied to non-numeric operands.
  /// </returns>
  private static bool EvalSingleComparison(string expr, Dictionary<string, object> bizData)
  {
      // Two-character operators are listed first so that, at the same position, ">=" wins over ">".
      string[] ops = { ">=", "<=", "!=", "==", ">", "<" };

      // Pick the LEFTMOST operator occurrence. Choosing by list priority alone would split
      // an expression such as  title == "a>=b"  at the ">=" inside the quoted value.
      string? op = null;
      var opIndex = -1;
      foreach (var candidate in ops)
      {
          var at = expr.IndexOf(candidate, StringComparison.Ordinal);
          if (at >= 0 && (opIndex < 0 || at < opIndex))
          {
              opIndex = at;
              op = candidate;
          }
      }

      if (op == null)
      {
          return false;
      }

      var fieldName = expr[..opIndex].Trim();
      var valueStr = expr[(opIndex + op.Length)..].Trim().Trim('"', '\'');

      if (!bizData.TryGetValue(fieldName, out var fieldValue))
      {
          return false;
      }

      // Expressions are machine-readable configuration: parse numbers with the invariant culture so
      // "1.5" means the same on every server, whatever the thread's current culture is.
      var invariant = System.Globalization.CultureInfo.InvariantCulture;
      var numericText = fieldValue is IFormattable formattable
          ? formattable.ToString(null, invariant)
          : fieldValue?.ToString();

      if (decimal.TryParse(numericText, System.Globalization.NumberStyles.Number, invariant, out var numLeft)
          && decimal.TryParse(valueStr, System.Globalization.NumberStyles.Number, invariant, out var numRight))
      {
          return op switch
          {
              "==" => numLeft == numRight,
              "!=" => numLeft != numRight,
              ">" => numLeft > numRight,
              ">=" => numLeft >= numRight,
              "<" => numLeft < numRight,
              "<=" => numLeft <= numRight,
              _ => false,
          };
      }

      // Text comparison keeps the value's default string form, exactly as before.
      var strLeft = fieldValue?.ToString() ?? "";
      return op switch
      {
          "==" => strLeft.Equals(valueStr, StringComparison.OrdinalIgnoreCase),
          "!=" => !strLeft.Equals(valueStr, StringComparison.OrdinalIgnoreCase),
          _ => false,
      };
  }
  /// <summary>
  /// Fetches the business data of a record through the registered <c>IFlowBizHandler</c>
  /// whose BizType equals <paramref name="bizType"/>.
  /// </summary>
  /// <param name="bizType">Business type used to select the handler.</param>
  /// <param name="bizId">Id of the business record.</param>
  /// <returns>The handler's data, or <c>null</c> when no handler is registered for the type.</returns>
  private async Task<Dictionary<string, object>?> GetBizData(string bizType, long bizId)
  {
      var matchingHandler = App.GetServices<IFlowBizHandler>()
          ?.FirstOrDefault(candidate => candidate.BizType == bizType);

      if (matchingHandler == null)
      {
          return null;
      }

      return await matchingHandler.GetBizData(bizId);
  }
  /// <summary>
  /// Returns the target of the first edge that starts at <paramref name="currentNodeId"/>,
  /// or <c>null</c> when the node has no outgoing edge.
  /// </summary>
  private string? FindNextNodeId(ApprovalFlowItem flowData, string currentNodeId)
      => flowData.Edges
          .FirstOrDefault(outgoing => outgoing.SourceNodeId == currentNodeId)
          ?.TargetNodeId;
  /// <summary>
  /// Walks backwards from a node along the first incoming edge of each node until a user-task
  /// node ("bpmn:userTask", "user-node" or "task-node") is found, skipping gateways and other
  /// non-user-task nodes on the way.
  /// </summary>
  /// <param name="flowData">The flow definition to search.</param>
  /// <param name="currentNodeId">Id of the node to start from (not itself a candidate).</param>
  /// <returns>
  /// The id of the nearest preceding user-task node, or <c>null</c> when the chain ends
  /// (no incoming edge / unknown source node) or loops back onto itself.
  /// </returns>
  private string? FindPrevUserTaskNodeId(ApprovalFlowItem flowData, string currentNodeId)
  {
      // Iterative walk with a visited set: the previous recursive version never terminated
      // (stack overflow) when the chain of first incoming edges formed a cycle.
      var visited = new HashSet<string?>();
      string? cursor = currentNodeId;

      while (visited.Add(cursor))
      {
          var inEdge = flowData.Edges.FirstOrDefault(e => e.TargetNodeId == cursor);
          if (inEdge == null) return null;

          var prevNode = flowData.Nodes.FirstOrDefault(n => n.Id == inEdge.SourceNodeId);
          if (prevNode == null) return null;

          if (prevNode.Type is "bpmn:userTask" or "user-node" or "task-node")
              return prevNode.Id;

          // Not a user task (e.g. a gateway): keep walking backwards from it.
          cursor = prevNode.Id;
      }

      // The walk reached a node it had already seen: cyclic chain without a user task.
      return null;
  }
  /// <summary>
  /// Decides whether a node is finished after one of its tasks was approved.
  /// In "All" multi-approve mode the node is finished only when no pending task remains on it.
  /// In any other mode (default: any single approval suffices) the remaining pending tasks of
  /// the node are cancelled as a side effect and the node counts as finished.
  /// </summary>
  /// <param name="instance">The flow instance; its FlowJsonSnapshot supplies the node configuration.</param>
  /// <param name="nodeId">Id of the node to check.</param>
  /// <returns><c>true</c> when the node is finished.</returns>
  private async Task<bool> IsNodeCompleted(ApprovalFlowInstance instance, string nodeId)
  {
      var approveMode = DeserializeFlowJson(instance.FlowJsonSnapshot)
          .Nodes.FirstOrDefault(n => n.Id == nodeId)
          ?.Properties?.MultiApproveMode;

      if (approveMode != nameof(MultiApproveModeEnum.All))
      {
          // Default "any" mode: one approval completes the node, the other pending tasks are cancelled.
          await CancelPendingTasks(instance.Id, nodeId);
          return true;
      }

      var stillPending = await _taskRep.AsQueryable()
          .Where(t => t.InstanceId == instance.Id && t.NodeId == nodeId && t.Status == FlowTaskStatusEnum.Pending)
          .CountAsync();
      return stillPending == 0;
  }
  /// <summary>
  /// Cancels the pending tasks of one node of an instance, optionally sparing a single task.
  /// </summary>
  /// <param name="instanceId">Id of the flow instance.</param>
  /// <param name="nodeId">Id of the node whose pending tasks are cancelled.</param>
  /// <param name="excludeTaskId">When set, the task with this id is left untouched.</param>
  private async Task CancelPendingTasks(long instanceId, string nodeId, long? excludeTaskId = null)
  {
      var pendingTasks = await _taskRep.AsQueryable()
          .Where(t => t.InstanceId == instanceId && t.NodeId == nodeId && t.Status == FlowTaskStatusEnum.Pending)
          .WhereIF(excludeTaskId.HasValue, t => t.Id != excludeTaskId!.Value)
          .ToListAsync();

      if (pendingTasks.Count == 0)
      {
          return;
      }

      pendingTasks.ForEach(pending =>
      {
          pending.Status = FlowTaskStatusEnum.Cancelled;
          pending.ActionTime = DateTime.Now;
      });

      await _taskRep.AsUpdateable(pendingTasks).ExecuteCommandAsync();
  }
  /// <summary>
  /// Cancels every remaining pending task of an instance regardless of node
  /// (used on reject, where all parallel branches must stop), optionally sparing a single task.
  /// </summary>
  /// <param name="instanceId">Id of the flow instance.</param>
  /// <param name="excludeTaskId">When set, the task with this id is left untouched.</param>
  private async Task CancelAllPendingTasks(long instanceId, long? excludeTaskId = null)
  {
      var pendingTasks = await _taskRep.AsQueryable()
          .Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending)
          .WhereIF(excludeTaskId.HasValue, t => t.Id != excludeTaskId!.Value)
          .ToListAsync();

      if (pendingTasks.Count == 0)
      {
          return;
      }

      pendingTasks.ForEach(pending =>
      {
          pending.Status = FlowTaskStatusEnum.Cancelled;
          pending.ActionTime = DateTime.Now;
      });

      await _taskRep.AsUpdateable(pendingTasks).ExecuteCommandAsync();
  }
  /// <summary>
  /// Loads a task and verifies that the current user may act on it.
  /// Throws (via <c>Oops.Oh</c>) when the task does not exist, is no longer pending,
  /// or is assigned to a different user — checked in that order.
  /// </summary>
  /// <param name="taskId">Id of the task to load.</param>
  /// <returns>The pending task assigned to the current user.</returns>
  private async Task<ApprovalFlowTask> GetPendingTask(long taskId)
  {
      var task = await _taskRep.GetByIdAsync(taskId);
      if (task == null)
      {
          throw Oops.Oh("审批任务不存在");
      }

      var alreadyHandled = task.Status != FlowTaskStatusEnum.Pending;
      if (alreadyHandled)
      {
          throw Oops.Oh("该任务已处理");
      }

      if (task.AssigneeId != _userManager.UserId)
      {
          throw Oops.Oh("当前用户不是该任务的审批人");
      }

      return task;
  }
  /// <summary>
  /// Inserts a flow log entry for the given action, with the current user recorded as operator.
  /// </summary>
  /// <param name="instanceId">Id of the flow instance.</param>
  /// <param name="taskId">Id of the related task, if any.</param>
  /// <param name="nodeId">Id of the related node, if any.</param>
  /// <param name="action">The action being logged.</param>
  /// <param name="comment">Optional comment supplied with the action.</param>
  private async Task WriteLog(long instanceId, long? taskId, string? nodeId, FlowLogActionEnum action, string? comment)
  {
      var entry = new ApprovalFlowLog
      {
          InstanceId = instanceId,
          TaskId = taskId,
          NodeId = nodeId,
          Action = action,
          OperatorId = _userManager.UserId,
          OperatorName = _userManager.RealName,
          Comment = comment,
      };

      await _logRep.InsertAsync(entry);
  }
  /// <summary>
  /// Deserializes a flow JSON snapshot into an <c>ApprovalFlowItem</c>.
  /// Throws (via <c>Oops.Oh</c>) when the snapshot is blank or deserializes to <c>null</c>.
  /// </summary>
  /// <param name="json">The JSON snapshot text.</param>
  /// <returns>The deserialized flow definition.</returns>
  private static ApprovalFlowItem DeserializeFlowJson(string? json)
  {
      if (string.IsNullOrWhiteSpace(json))
      {
          throw Oops.Oh("FlowJson 快照为空");
      }

      var flow = JsonSerializer.Deserialize<ApprovalFlowItem>(json);
      if (flow == null)
      {
          throw Oops.Oh("FlowJson 反序列化失败");
      }

      return flow;
  }
  /// <summary>
  /// Runs a callback against the registered <c>IFlowBizHandler</c> whose BizType equals
  /// <paramref name="bizType"/>; does nothing when no such handler is registered.
  /// When the callback throws, the error is logged, every registered
  /// <c>IFlowHandlerFailureNotifier</c> is informed (a failing notifier is logged and skipped),
  /// and the original exception is rethrown.
  /// </summary>
  /// <param name="bizType">Business type used to select the handler.</param>
  /// <param name="instanceId">Id of the flow instance, used for logging and notification.</param>
  /// <param name="action">The callback to run with the selected handler.</param>
  private async Task InvokeHandler(string bizType, long instanceId, Func<IFlowBizHandler, Task> action)
  {
      var handler = App.GetServices<IFlowBizHandler>()
          ?.FirstOrDefault(candidate => candidate.BizType == bizType);
      if (handler == null)
      {
          return;
      }

      try
      {
          await action(handler);
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, "FlowBizHandler 执行失败: BizType={BizType}, InstanceId={InstanceId}, ErrorType={ErrorType}, ErrorMessage={ErrorMessage}",
              bizType, instanceId, ex.GetType().FullName, ex.Message);

          if (App.GetServices<IFlowHandlerFailureNotifier>() is { } notifiers)
          {
              foreach (var notifier in notifiers)
              {
                  try
                  {
                      await notifier.NotifyAsync(bizType, instanceId, ex);
                  }
                  catch (Exception notifyEx)
                  {
                      // A broken notifier must not hide the handler failure or stop the other notifiers.
                      _logger.LogError(notifyEx, "FlowHandlerFailureNotifier 执行失败: NotifierType={NotifierType}, BizType={BizType}, InstanceId={InstanceId}",
                          notifier.GetType().FullName, bizType, instanceId);
                  }
              }
          }

          // Rethrow the handler's exception with its original stack trace.
          throw;
      }
  }
  1052. }