using System.Text.Json; using Admin.NET.Core.Service; namespace Admin.NET.Plugin.ApprovalFlow.Service; /// /// 流程推进引擎 — 核心状态机 /// 不暴露为 API,由其他 Service 内部调用 /// public class FlowEngineService : ITransient { private readonly SqlSugarRepository _flowRep; private readonly SqlSugarRepository _instanceRep; private readonly SqlSugarRepository _taskRep; private readonly SqlSugarRepository _logRep; private readonly SqlSugarRepository _userRoleRep; private readonly SqlSugarRepository _userRep; private readonly SqlSugarRepository _orgRep; private readonly SqlSugarRepository _delegateRep; private readonly SqlSugarRepository _completedNodeRep; private readonly UserManager _userManager; private readonly SysOrgService _sysOrgService; private readonly FlowNotifyService _notifyService; public FlowEngineService( SqlSugarRepository flowRep, SqlSugarRepository instanceRep, SqlSugarRepository taskRep, SqlSugarRepository logRep, SqlSugarRepository userRoleRep, SqlSugarRepository userRep, SqlSugarRepository orgRep, SqlSugarRepository delegateRep, SqlSugarRepository completedNodeRep, UserManager userManager, SysOrgService sysOrgService, FlowNotifyService notifyService) { _flowRep = flowRep; _instanceRep = instanceRep; _taskRep = taskRep; _logRep = logRep; _userRoleRep = userRoleRep; _userRep = userRep; _orgRep = orgRep; _delegateRep = delegateRep; _completedNodeRep = completedNodeRep; _userManager = userManager; _sysOrgService = sysOrgService; _notifyService = notifyService; } /// /// 机构数据权限依赖实体的 OrgId;插入时必须与当前用户机构一致,否则非超管在「我发起的/待办」中查不到(OrgId=0 会被过滤)。 /// private async Task ResolveOrgIdForNewFlowEntityAsync() { var oid = _userManager.OrgId; if (oid > 0) return oid; var list = await _sysOrgService.GetUserOrgIdList(); return list is { Count: > 0 } ? list[0] : 0; } // ═══════════════════════════════════════════ // 核心生命周期 // ═══════════════════════════════════════════ /// /// 发起流程 /// public async Task StartFlow(StartFlowInput input) { var flow = await _flowRep.AsQueryable() .Where(u => u.BizType == input.BizType && u.IsPublished && !u.IsDelete) .OrderByDescending(u => u.Version) .FirstAsync() ?? throw Oops.Oh($"未找到业务类型 [{input.BizType}] 的已发布流程定义"); if (string.IsNullOrWhiteSpace(flow.FlowJson)) throw Oops.Oh("流程定义的 FlowJson 为空,请先设计流程图"); var flowData = JsonSerializer.Deserialize(flow.FlowJson) ?? throw Oops.Oh("FlowJson 反序列化失败"); var orgId = await ResolveOrgIdForNewFlowEntityAsync(); var instance = new ApprovalFlowInstance { FlowId = flow.Id, FlowVersion = flow.Version, BizType = input.BizType, BizId = input.BizId, BizNo = input.BizNo, Title = input.Title ?? $"{flow.Name}-{input.BizNo}", InitiatorId = _userManager.UserId, InitiatorName = _userManager.RealName, Status = FlowInstanceStatusEnum.Running, FlowJsonSnapshot = flow.FlowJson, StartTime = DateTime.Now, OrgId = orgId, }; var startNode = flowData.Nodes.FirstOrDefault(n => n.Type is "bpmn:startEvent" or "start-node") ?? throw Oops.Oh("流程图中未找到开始节点"); await _instanceRep.InsertAsync(instance); await WriteLog(instance.Id, null, startNode.Id, FlowLogActionEnum.Submit, input.Comment); // 开始节点登记为已完成,随后沿出边推进(兼容首节点即为并行网关 Fork 的拓扑) await MarkNodeCompleted(instance.Id, startNode); var firstOutgoing = flowData.Edges.Where(e => e.SourceNodeId == startNode.Id).Select(e => e.TargetNodeId).ToList(); if (firstOutgoing.Count == 0) throw Oops.Oh("流程图开始节点未连接任何后继节点"); foreach (var target in firstOutgoing) { await ProcessNextNode(instance, flowData, target); } await InvokeHandler(input.BizType, h => h.OnFlowStarted(input.BizId, instance.Id)); return instance.Id; } /// /// 同意 /// public async Task Approve(long taskId, string? comment) { var task = await GetPendingTask(taskId); task.Status = FlowTaskStatusEnum.Approved; task.Comment = comment; task.ActionTime = DateTime.Now; await _taskRep.AsUpdateable(task).ExecuteCommandAsync(); // P1-6 审批代理:同步取消配对的本人/代理任务 await CancelPairedDelegateTask(task); await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.Approve, comment); var instance = await _instanceRep.GetByIdAsync(task.InstanceId) ?? throw Oops.Oh("流程实例不存在"); if (await IsNodeCompleted(instance, task.NodeId)) { await InvokeHandler(instance.BizType, h => h.OnNodeCompleted(instance.BizId, instance.Id, task.NodeId, task.NodeName ?? "", task.AssigneeId)); var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot); await AdvanceToNext(instance, flowData, task.NodeId); } } /// /// 拒绝 /// public async Task Reject(long taskId, string? comment) { var task = await GetPendingTask(taskId); task.Status = FlowTaskStatusEnum.Rejected; task.Comment = comment; task.ActionTime = DateTime.Now; await _taskRep.AsUpdateable(task).ExecuteCommandAsync(); // Reject 导致流程终止:取消整个实例所有剩余 Pending 任务(含并行分支、配对代理任务等) await CancelAllPendingTasks(task.InstanceId, task.Id); var instance = await _instanceRep.GetByIdAsync(task.InstanceId) ?? throw Oops.Oh("流程实例不存在"); instance.Status = FlowInstanceStatusEnum.Rejected; instance.EndTime = DateTime.Now; await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync(); await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Reject, comment); // P4-17: 驳回人 = 当前被指派的审批人 await InvokeHandler(instance.BizType, h => h.OnFlowCompleted(instance.BizId, instance.Id, FlowInstanceStatusEnum.Rejected, task.AssigneeId)); await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id, instance.Title, FlowInstanceStatusEnum.Rejected, instance.BizType); } // ═══════════════════════════════════════════ // 扩展操作 // ═══════════════════════════════════════════ /// /// 转办 /// public async Task Transfer(long taskId, long targetUserId, string? comment) { var task = await GetPendingTask(taskId); task.Status = FlowTaskStatusEnum.Transferred; task.Comment = comment; task.ActionTime = DateTime.Now; task.TransferToId = targetUserId; await _taskRep.AsUpdateable(task).ExecuteCommandAsync(); var targetUser = await _userRep.GetByIdAsync(targetUserId); var instForOrg = await _instanceRep.GetByIdAsync(task.InstanceId); var newTask = new ApprovalFlowTask { InstanceId = task.InstanceId, NodeId = task.NodeId, NodeName = task.NodeName, AssigneeId = targetUserId, AssigneeName = targetUser?.RealName, Status = FlowTaskStatusEnum.Pending, OrgId = instForOrg?.OrgId ?? 0, }; await _taskRep.InsertAsync(newTask); await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.Transfer, $"{comment} → 转办给 {targetUser?.RealName}"); var instance = await _instanceRep.GetByIdAsync(task.InstanceId); await _notifyService.NotifyTransferred(targetUserId, task.InstanceId, instance?.Title ?? "", _userManager.RealName, instance?.BizType ?? ""); } /// /// 撤回(发起人撤回) /// public async Task Withdraw(long instanceId) { var instance = await _instanceRep.GetByIdAsync(instanceId) ?? throw Oops.Oh("流程实例不存在"); if (instance.InitiatorId != _userManager.UserId) throw Oops.Oh("只有发起人可以撤回"); if (instance.Status != FlowInstanceStatusEnum.Running) throw Oops.Oh("当前流程状态不允许撤回"); var pendingTasks = await _taskRep.AsQueryable() .Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending) .ToListAsync(); var doneTasks = await _taskRep.AsQueryable() .Where(t => t.InstanceId == instanceId && t.Status != FlowTaskStatusEnum.Pending && t.Status != FlowTaskStatusEnum.Cancelled) .CountAsync(); if (doneTasks > 0) throw Oops.Oh("已有人审批过,不可撤回"); var cancelledUserIds = pendingTasks.Select(t => t.AssigneeId).Distinct().ToList(); foreach (var t in pendingTasks) { t.Status = FlowTaskStatusEnum.Cancelled; t.ActionTime = DateTime.Now; } await _taskRep.AsUpdateable(pendingTasks).ExecuteCommandAsync(); instance.Status = FlowInstanceStatusEnum.Cancelled; instance.EndTime = DateTime.Now; await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync(); await WriteLog(instanceId, null, instance.CurrentNodeId, FlowLogActionEnum.Withdraw, null); // P4-17: 撤回场景无"审批人",lastApproverId = null await InvokeHandler(instance.BizType, h => h.OnFlowCompleted(instance.BizId, instanceId, FlowInstanceStatusEnum.Cancelled, null)); await _notifyService.NotifyWithdrawn(cancelledUserIds, instanceId, instance.Title, instance.InitiatorName, instance.BizType); } /// /// 退回上一步 /// public async Task ReturnToPrev(long taskId, string? comment) { var task = await GetPendingTask(taskId); var instance = await _instanceRep.GetByIdAsync(task.InstanceId) ?? throw Oops.Oh("流程实例不存在"); var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot); var prevNodeId = FindPrevUserTaskNodeId(flowData, task.NodeId); if (prevNodeId == null) throw Oops.Oh("已是第一个审批节点,无法退回"); await CancelPendingTasks(task.InstanceId, task.NodeId); task.Status = FlowTaskStatusEnum.Returned; task.Comment = comment; task.ActionTime = DateTime.Now; await _taskRep.AsUpdateable(task).ExecuteCommandAsync(); instance.CurrentNodeId = prevNodeId; await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync(); await CreateTasksForNode(instance, flowData, prevNodeId); await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Return, comment); var returnedTasks = await _taskRep.AsQueryable() .Where(t => t.InstanceId == instance.Id && t.NodeId == prevNodeId && t.Status == FlowTaskStatusEnum.Pending) .ToListAsync(); var returnedUserIds = returnedTasks.Select(t => t.AssigneeId).Distinct().ToList(); await _notifyService.NotifyReturned(returnedUserIds, instance.Id, instance.Title, _userManager.RealName, instance.BizType); } /// /// 加签 /// public async Task AddSign(long taskId, long targetUserId, string? comment) { var task = await GetPendingTask(taskId); var targetUser = await _userRep.GetByIdAsync(targetUserId); var instForAddSign = await _instanceRep.GetByIdAsync(task.InstanceId); var newTask = new ApprovalFlowTask { InstanceId = task.InstanceId, NodeId = task.NodeId, NodeName = task.NodeName, AssigneeId = targetUserId, AssigneeName = targetUser?.RealName, Status = FlowTaskStatusEnum.Pending, IsAddSign = true, AddSignById = _userManager.UserId, OrgId = instForAddSign?.OrgId ?? 0, }; await _taskRep.InsertAsync(newTask); await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.AddSign, $"{comment} → 加签给 {targetUser?.RealName}"); var instance = await _instanceRep.GetByIdAsync(task.InstanceId); await _notifyService.NotifyAddSign(targetUserId, task.InstanceId, instance?.Title ?? "", _userManager.RealName, instance?.BizType ?? ""); } /// /// 手动升级 — 当前审批人主动将任务升级到更高层级 /// public async Task Escalate(long taskId, string? comment) { var task = await GetPendingTask(taskId); var instance = await _instanceRep.GetByIdAsync(task.InstanceId) ?? throw Oops.Oh("流程实例不存在"); var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot); var node = flowData.Nodes.FirstOrDefault(n => n.Id == task.NodeId) ?? throw Oops.Oh("节点不存在"); var props = node.Properties; if (props?.EnableManualEscalation != true || string.IsNullOrWhiteSpace(props.EscalationApproverType) || string.IsNullOrWhiteSpace(props.EscalationApproverIds)) throw Oops.Oh("该节点未配置升级目标,无法升级"); task.Status = FlowTaskStatusEnum.Escalated; task.Comment = comment; task.ActionTime = DateTime.Now; await _taskRep.AsUpdateable(task).ExecuteCommandAsync(); await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id); var escalationApprovers = await ResolveApprovers( new FlowProperties { ApproverType = props.EscalationApproverType, ApproverIds = props.EscalationApproverIds, ApproverNames = props.EscalationApproverNames, }, instance.InitiatorId); if (escalationApprovers.Count == 0) throw Oops.Oh("升级目标审批人列表为空"); var newTasks = escalationApprovers.Select(a => new ApprovalFlowTask { InstanceId = instance.Id, NodeId = task.NodeId, NodeName = task.NodeName, AssigneeId = a.userId, AssigneeName = a.userName, Status = FlowTaskStatusEnum.Pending, OrgId = instance.OrgId, }).ToList(); await _taskRep.AsInsertable(newTasks).ExecuteCommandAsync(); var targetNames = string.Join(", ", escalationApprovers.Select(a => a.userName)); await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Escalate, $"{comment} → 升级给 {targetNames}"); var targetUserIds = escalationApprovers.Select(a => a.userId).Distinct().ToList(); await _notifyService.NotifyEscalated(targetUserIds, instance.Id, instance.Title, _userManager.RealName, task.NodeName, instance.BizType); } /// /// 催办 /// public async Task Urge(long instanceId) { var instance = await _instanceRep.GetByIdAsync(instanceId) ?? throw Oops.Oh("流程实例不存在"); if (instance.Status != FlowInstanceStatusEnum.Running) throw Oops.Oh("当前流程不在审批中"); await WriteLog(instanceId, null, instance.CurrentNodeId, FlowLogActionEnum.Urge, "催办"); var pendingTasks = await _taskRep.AsQueryable() .Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending) .ToListAsync(); var userIds = pendingTasks.Select(t => t.AssigneeId).Distinct().ToList(); await _notifyService.NotifyUrge(userIds, instanceId, instance.Title, instance.BizType); } // ═══════════════════════════════════════════ // 超时自动处理(由 FlowTimeoutJob 调用,无 UserManager 上下文) // ═══════════════════════════════════════════ /// /// 扫描所有 Pending 且已超过 timeoutHours 的任务,按节点 timeoutAction 执行对应动作。 /// 与 FlowTimeoutJob 扫描等价,抽出公共方法以便管理员接口立即触发(运维/E2E 测试用)。 /// 返回本次处理的任务条数。 /// public async Task ScanTimeoutTasks(CancellationToken stoppingToken = default) { var pendingTasks = await _taskRep.Context.Queryable() .InnerJoin((t, i) => t.InstanceId == i.Id) .Where((t, i) => t.Status == FlowTaskStatusEnum.Pending && i.Status == FlowInstanceStatusEnum.Running) .Select((t, i) => new { TaskId = t.Id, TaskCreatedAt = t.CreateTime, NodeId = t.NodeId, FlowJsonSnapshot = i.FlowJsonSnapshot, }) .ToListAsync(); var now = DateTime.Now; var processed = 0; foreach (var item in pendingTasks) { if (stoppingToken.IsCancellationRequested) break; if (string.IsNullOrWhiteSpace(item.FlowJsonSnapshot)) continue; ApprovalFlowItem? flowData; try { flowData = JsonSerializer.Deserialize(item.FlowJsonSnapshot); } catch { continue; } var node = flowData?.Nodes?.FirstOrDefault(n => n.Id == item.NodeId); var props = node?.Properties; if (props?.TimeoutHours == null || props.TimeoutHours <= 0) continue; if (string.IsNullOrWhiteSpace(props.TimeoutAction)) continue; var deadline = item.TaskCreatedAt.AddHours(props.TimeoutHours.Value); if (now < deadline) continue; // Notify 动作幂等:已记录过 AutoTimeout 日志的跳过,避免每轮重复催办 if (props.TimeoutAction == "Notify") { var alreadyNotified = await _logRep.AsQueryable() .AnyAsync(log => log.TaskId == item.TaskId && log.Action == FlowLogActionEnum.AutoTimeout); if (alreadyNotified) continue; } try { await HandleTimeoutTask(item.TaskId); processed++; } catch { // 单任务异常隔离:吞掉继续下一个,由 Job/Admin API 层统一记日志 } } return processed; } /// /// 处理单个超时任务(由定时任务调用) /// public async Task HandleTimeoutTask(long taskId) { var task = await _taskRep.GetByIdAsync(taskId); if (task == null || task.Status != FlowTaskStatusEnum.Pending) return; var instance = await _instanceRep.GetByIdAsync(task.InstanceId); if (instance == null || instance.Status != FlowInstanceStatusEnum.Running) return; var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot); var node = flowData.Nodes.FirstOrDefault(n => n.Id == task.NodeId); var props = node?.Properties; if (props == null) return; switch (props.TimeoutAction) { case "Notify": await _notifyService.NotifyTimeout( new List { task.AssigneeId }, instance.Id, instance.Title, instance.BizType); await WriteSystemLog(instance.Id, task.Id, task.NodeId, FlowLogActionEnum.AutoTimeout, "审批超时,已发送提醒通知"); break; case "AutoApprove": await AutoApproveTask(task, instance); break; case "AutoReject": await AutoRejectTask(task, instance); break; case "AutoEscalate": await AutoEscalateTask(task, props, instance); break; } } private async Task AutoApproveTask(ApprovalFlowTask task, ApprovalFlowInstance instance) { task.Status = FlowTaskStatusEnum.Approved; task.Comment = "系统自动通过(超时)"; task.ActionTime = DateTime.Now; await _taskRep.AsUpdateable(task).ExecuteCommandAsync(); await WriteSystemLog(instance.Id, task.Id, task.NodeId, FlowLogActionEnum.AutoTimeout, "审批超时,系统自动通过"); if (await IsNodeCompleted(instance, task.NodeId)) { // P4-17: 系统自动通过,无人工审批人,approverUserId = null await InvokeHandler(instance.BizType, h => h.OnNodeCompleted(instance.BizId, instance.Id, task.NodeId, task.NodeName ?? "", null)); var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot); await AdvanceToNext(instance, flowData, task.NodeId); } } private async Task AutoRejectTask(ApprovalFlowTask task, ApprovalFlowInstance instance) { task.Status = FlowTaskStatusEnum.Rejected; task.Comment = "系统自动拒绝(超时)"; task.ActionTime = DateTime.Now; await _taskRep.AsUpdateable(task).ExecuteCommandAsync(); await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id); instance.Status = FlowInstanceStatusEnum.Rejected; instance.EndTime = DateTime.Now; await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync(); await WriteSystemLog(instance.Id, task.Id, task.NodeId, FlowLogActionEnum.AutoTimeout, "审批超时,系统自动拒绝"); // P4-17: 系统自动拒绝,无人工审批人,lastApproverId = null await InvokeHandler(instance.BizType, h => h.OnFlowCompleted(instance.BizId, instance.Id, FlowInstanceStatusEnum.Rejected, null)); await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id, instance.Title, FlowInstanceStatusEnum.Rejected, instance.BizType); } private async Task AutoEscalateTask(ApprovalFlowTask task, FlowProperties nodeProps, ApprovalFlowInstance instance) { if (string.IsNullOrWhiteSpace(nodeProps.EscalationApproverType) || string.IsNullOrWhiteSpace(nodeProps.EscalationApproverIds)) return; task.Status = FlowTaskStatusEnum.Escalated; task.Comment = "系统自动升级(超时)"; task.ActionTime = DateTime.Now; await _taskRep.AsUpdateable(task).ExecuteCommandAsync(); await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id); var approvers = await ResolveApprovers( new FlowProperties { ApproverType = nodeProps.EscalationApproverType, ApproverIds = nodeProps.EscalationApproverIds, }, instance.InitiatorId); if (approvers.Count == 0) return; var newTasks = approvers.Select(a => new ApprovalFlowTask { InstanceId = instance.Id, NodeId = task.NodeId, NodeName = task.NodeName, AssigneeId = a.userId, AssigneeName = a.userName, Status = FlowTaskStatusEnum.Pending, OrgId = instance.OrgId, }).ToList(); await _taskRep.AsInsertable(newTasks).ExecuteCommandAsync(); var targetNames = string.Join(", ", approvers.Select(a => a.userName)); await WriteSystemLog(instance.Id, task.Id, task.NodeId, FlowLogActionEnum.AutoTimeout, $"审批超时,自动升级给 {targetNames}"); var targetUserIds = approvers.Select(a => a.userId).Distinct().ToList(); await _notifyService.NotifyEscalated(targetUserIds, instance.Id, instance.Title, "系统", task.NodeName, instance.BizType); } private async Task WriteSystemLog(long instanceId, long? taskId, string? nodeId, FlowLogActionEnum action, string? comment) { await _logRep.InsertAsync(new ApprovalFlowLog { InstanceId = instanceId, TaskId = taskId, NodeId = nodeId, Action = action, OperatorId = 0, OperatorName = "系统", Comment = comment, }); } // ═══════════════════════════════════════════ // 内部引擎方法 // ═══════════════════════════════════════════ /// /// 推进到下一节点。支持并行网关(Fork / Join)。 /// 行为约定: /// - 进入本方法前,调用方(如 )应已将 (userTask)标记为完成节点; /// 本方法会将途经的网关节点也写入 。 /// - 并行网关 Fork(出边>=2):沿每条出边递归推进; /// 并行网关 Join(入边>=2):校验所有前驱节点是否都在"已完成"集合中, /// 任一尚未完成则**静默等待**(不报错、不推进),由后续分支完成后再次触发 Join 校验。 /// private async Task AdvanceToNext(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string currentNodeId) { // 当前节点可能是 userTask(完成记录由 Approve 写入)或网关(入口处已写入);此处统一确保幂等入库 var currentNode = flowData.Nodes.FirstOrDefault(n => n.Id == currentNodeId); await MarkNodeCompleted(instance.Id, currentNode); var outgoingEdges = flowData.Edges.Where(e => e.SourceNodeId == currentNodeId).ToList(); if (outgoingEdges.Count == 0) { await CompleteInstance(instance, FlowInstanceStatusEnum.Approved); return; } // 非并行网关场景:当前节点一般只有 1 条出边 foreach (var edge in outgoingEdges) { await ProcessNextNode(instance, flowData, edge.TargetNodeId); } } /// /// 处理某个"下一节点"。根据节点类型分发: /// - endEvent:所有分支任务都结束时触发实例完成 /// - exclusiveGateway:按条件选择分支 /// - parallelGateway:Fork 并行分发;Join 等待所有前驱完成 /// - userTask / 其他:创建任务 /// private async Task ProcessNextNode(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string nextNodeId) { var nextNode = flowData.Nodes.FirstOrDefault(n => n.Id == nextNodeId); if (nextNode == null) { await CompleteInstance(instance, FlowInstanceStatusEnum.Approved); return; } if (nextNode.Type is "bpmn:endEvent" or "end-node") { // 所有并行分支都已结束(无其它 Pending 任务)才真正完成实例 var hasOtherPending = await _taskRep.AsQueryable() .AnyAsync(t => t.InstanceId == instance.Id && t.Status == FlowTaskStatusEnum.Pending); if (hasOtherPending) return; await CompleteInstance(instance, FlowInstanceStatusEnum.Approved); return; } if (nextNode.Type is "bpmn:exclusiveGateway") { await MarkNodeCompleted(instance.Id, nextNode); var bizData = await GetBizData(instance.BizType, instance.BizId); var targetNodeId = EvaluateGateway(nextNode.Properties?.Conditions, flowData, nextNode.Id, bizData); instance.CurrentNodeId = targetNodeId; await _instanceRep.AsUpdateable(instance).UpdateColumns(i => new { i.CurrentNodeId }).ExecuteCommandAsync(); await ProcessNextNode(instance, flowData, targetNodeId); return; } if (nextNode.Type is "bpmn:parallelGateway") { var incoming = flowData.Edges.Where(e => e.TargetNodeId == nextNode.Id).Select(e => e.SourceNodeId).ToList(); var outgoing = flowData.Edges.Where(e => e.SourceNodeId == nextNode.Id).Select(e => e.TargetNodeId).ToList(); // Join 语义:入边 >= 2,需等所有前驱都已完成 if (incoming.Count >= 2) { var completedSet = await GetCompletedNodeIdSet(instance.Id); if (!incoming.All(p => completedSet.Contains(p))) { // 未汇合,静默等待后续分支抵达 return; } } // Fork 或 Join 通过:标记网关完成,沿所有出边推进 await MarkNodeCompleted(instance.Id, nextNode); foreach (var target in outgoing) { await ProcessNextNode(instance, flowData, target); } return; } // userTask 或其他:创建任务 instance.CurrentNodeId = nextNodeId; await _instanceRep.AsUpdateable(instance).UpdateColumns(i => new { i.CurrentNodeId }).ExecuteCommandAsync(); await CreateTasksForNode(instance, flowData, nextNodeId); } /// /// 标记节点已完成(幂等:重复写入被唯一索引拦截后忽略) /// private async Task MarkNodeCompleted(long instanceId, ApprovalFlowNodeItem? node) { if (node == null) return; try { await _completedNodeRep.InsertAsync(new ApprovalFlowCompletedNode { InstanceId = instanceId, NodeId = node.Id, NodeName = node.Properties?.NodeName ?? node.Text?.Value, NodeType = node.Type, CompletedTime = DateTime.Now, }); } catch { // 并发场景下可能触发唯一索引冲突,忽略(已存在即可) } } /// /// 查询实例已完成节点 Id 集合 /// private async Task> GetCompletedNodeIdSet(long instanceId) { var ids = await _completedNodeRep.AsQueryable() .Where(c => c.InstanceId == instanceId) .Select(c => c.NodeId) .ToListAsync(); return new HashSet(ids); } private async Task CompleteInstance(ApprovalFlowInstance instance, FlowInstanceStatusEnum status) { // 幂等:并行分支同时到达 end 时避免重复完成 var latest = await _instanceRep.GetByIdAsync(instance.Id); if (latest == null || latest.Status != FlowInstanceStatusEnum.Running) return; instance.Status = status; instance.EndTime = DateTime.Now; await _instanceRep.AsUpdateable(instance) .UpdateColumns(i => new { i.Status, i.EndTime }) .ExecuteCommandAsync(); // P4-17: 正常完成路径的 lastApproverId = 最后一条人工 Approve 日志的操作人(系统自动通过不算) var lastApproverId = status == FlowInstanceStatusEnum.Approved ? await GetLastHumanApproverIdAsync(instance.Id) : null; await InvokeHandler(instance.BizType, h => h.OnFlowCompleted(instance.BizId, instance.Id, status, lastApproverId)); await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id, instance.Title, status, instance.BizType); } /// /// P4-17: 查询该实例最后一条人工 Approve 日志的操作人 UserId /// (系统超时自动通过写入的是 AutoTimeout,不会被此查询命中) /// private async Task GetLastHumanApproverIdAsync(long instanceId) { var lastLog = await _logRep.AsQueryable() .Where(l => l.InstanceId == instanceId && l.Action == FlowLogActionEnum.Approve) .OrderByDescending(l => l.CreateTime) .FirstAsync(); return lastLog?.OperatorId > 0 ? lastLog.OperatorId : null; } private async Task CreateTasksForNode(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string nodeId) { var node = flowData.Nodes.FirstOrDefault(n => n.Id == nodeId) ?? throw Oops.Oh($"FlowJson 中未找到节点 [{nodeId}]"); var approvers = await ResolveApprovers(node.Properties, instance.InitiatorId); if (approvers.Count == 0) throw Oops.Oh($"节点 [{node.Properties?.NodeName ?? nodeId}] 未配置审批人或审批人列表为空"); var nodeName = node.Properties?.NodeName ?? node.Text?.Value; var tasks = approvers.Select(a => new ApprovalFlowTask { InstanceId = instance.Id, NodeId = nodeId, NodeName = nodeName, AssigneeId = a.userId, AssigneeName = a.userName, Status = FlowTaskStatusEnum.Pending, OrgId = instance.OrgId, }).ToList(); // P1-6 审批代理:为每个原审批人检查是否存在有效代理,是则并行创建一条代理任务 var delegateTasks = new List(); foreach (var a in approvers) { var del = await FindEffectiveDelegate(a.userId, instance.BizType); if (del == null) continue; // 代理人和原审批人不能重复,代理人也不能是原审批人列表中其他人(避免同一人两条任务) if (approvers.Any(x => x.userId == del.DelegateUserId)) continue; delegateTasks.Add(new ApprovalFlowTask { InstanceId = instance.Id, NodeId = nodeId, NodeName = nodeName, AssigneeId = del.DelegateUserId, AssigneeName = del.DelegateUserName, Status = FlowTaskStatusEnum.Pending, OrgId = instance.OrgId, IsDelegate = true, DelegateForUserId = a.userId, DelegateForUserName = a.userName, }); } var allTasks = tasks.Concat(delegateTasks).ToList(); await _taskRep.AsInsertable(allTasks).ExecuteCommandAsync(); var assigneeIds = allTasks.Select(t => t.AssigneeId).Distinct().ToList(); await _notifyService.NotifyNewTask(assigneeIds, instance.Id, instance.Title, nodeName, instance.BizType); } /// /// 查找指定用户当前生效的审批代理(时间窗口内 + 已启用 + BizType 匹配或全局) /// private async Task FindEffectiveDelegate(long userId, string? bizType) { var now = DateTime.Now; return await _delegateRep.AsQueryable() .Where(d => d.UserId == userId && d.IsEnabled && d.StartTime <= now && d.EndTime >= now && (string.IsNullOrEmpty(d.BizType) || d.BizType == bizType)) .OrderBy(d => d.BizType == null ? 1 : 0) // 优先匹配指定 BizType 的代理 .OrderByDescending(d => d.CreateTime) .FirstAsync(); } /// /// 取消与已完成任务配对的代理任务(P1-6) /// - 本人任务完成:取消对应的代理任务 /// - 代理任务完成:取消对应的本人任务 /// private async Task CancelPairedDelegateTask(ApprovalFlowTask completedTask) { ApprovalFlowTask? paired; if (completedTask.IsDelegate) { var originalUserId = completedTask.DelegateForUserId ?? 0; if (originalUserId == 0) return; paired = await _taskRep.AsQueryable() .Where(t => t.InstanceId == completedTask.InstanceId && t.NodeId == completedTask.NodeId && t.Status == FlowTaskStatusEnum.Pending && t.AssigneeId == originalUserId && !t.IsDelegate) .FirstAsync(); } else { paired = await _taskRep.AsQueryable() .Where(t => t.InstanceId == completedTask.InstanceId && t.NodeId == completedTask.NodeId && t.Status == FlowTaskStatusEnum.Pending && t.IsDelegate && t.DelegateForUserId == completedTask.AssigneeId) .FirstAsync(); } if (paired == null) return; paired.Status = FlowTaskStatusEnum.Cancelled; paired.ActionTime = DateTime.Now; await _taskRep.AsUpdateable(paired).ExecuteCommandAsync(); } private async Task> ResolveApprovers(FlowProperties? props, long initiatorId) { if (props == null || string.IsNullOrWhiteSpace(props.ApproverType)) return new List<(long, string)>(); var approverType = props.ApproverType; if (approverType == nameof(ApproverTypeEnum.Initiator)) { var initiator = await _userRep.GetByIdAsync(initiatorId); return initiator != null ? new List<(long, string)> { (initiator.Id, initiator.RealName ?? "") } : new List<(long, string)>(); } if (string.IsNullOrWhiteSpace(props.ApproverIds)) return new List<(long, string)>(); var ids = props.ApproverIds.Split(',', StringSplitOptions.RemoveEmptyEntries) .Select(s => long.TryParse(s.Trim(), out var v) ? v : 0).Where(id => id > 0).ToList(); if (approverType == nameof(ApproverTypeEnum.SpecificUser)) { var users = await _userRep.AsQueryable() .Where(u => ids.Contains(u.Id)).ToListAsync(); return users.Select(u => (u.Id, u.RealName ?? "")).ToList(); } if (approverType == nameof(ApproverTypeEnum.Role)) { var userIds = await _userRoleRep.AsQueryable() .Where(ur => ids.Contains(ur.RoleId)) .Select(ur => ur.UserId) .ToListAsync(); var users = await _userRep.AsQueryable() .Where(u => userIds.Contains(u.Id)).ToListAsync(); return users.Select(u => (u.Id, u.RealName ?? "")).ToList(); } if (approverType == nameof(ApproverTypeEnum.Department)) { var users = await _userRep.AsQueryable() .Where(u => ids.Contains(u.OrgId)).ToListAsync(); return users.Select(u => (u.Id, u.RealName ?? "")).ToList(); } if (approverType == nameof(ApproverTypeEnum.DepartmentLeader)) { var initiator = await _userRep.GetByIdAsync(initiatorId); if (initiator == null || initiator.OrgId <= 0) return new List<(long, string)>(); var org = await _orgRep.GetByIdAsync(initiator.OrgId); if (org?.DirectorId != null && org.DirectorId > 0) { var director = await _userRep.GetByIdAsync(org.DirectorId.Value); if (director != null) return new List<(long, string)> { (director.Id, director.RealName ?? "") }; } if (initiator.ManagerUserId != null && initiator.ManagerUserId > 0) { var manager = await _userRep.GetByIdAsync(initiator.ManagerUserId.Value); if (manager != null) return new List<(long, string)> { (manager.Id, manager.RealName ?? "") }; } return new List<(long, string)>(); } return new List<(long, string)>(); } /// /// 评估排他网关 — 依次尝试各非默认分支的条件表达式,首个匹配的获胜; /// 全不匹配则走默认分支;无默认则走第一条出边 /// 支持简单比较表达式:variable op value(op: ==,!=,>,>=,<,<=) /// private string EvaluateGateway(List? conditions, ApprovalFlowItem flowData, string gatewayNodeId, Dictionary? bizData) { if (conditions != null && conditions.Count > 0 && bizData != null && bizData.Count > 0) { foreach (var cond in conditions.Where(c => !c.IsDefault)) { if (!string.IsNullOrWhiteSpace(cond.Expression) && EvalSimpleExpression(cond.Expression, bizData)) return cond.TargetNodeId; } var defaultBranch = conditions.FirstOrDefault(c => c.IsDefault); if (defaultBranch != null) return defaultBranch.TargetNodeId; } else if (conditions != null && conditions.Count > 0) { var defaultBranch = conditions.FirstOrDefault(c => c.IsDefault); if (defaultBranch != null) return defaultBranch.TargetNodeId; return conditions.First().TargetNodeId; } var edge = flowData.Edges.FirstOrDefault(e => e.SourceNodeId == gatewayNodeId); return edge?.TargetNodeId ?? throw Oops.Oh("排他网关没有出边"); } /// /// 简单表达式求值:支持 "field op value" 格式(如 "urgent == 1", "customLevel >= 3", "amount > 10000") /// 多条件用 && 连接 /// private static bool EvalSimpleExpression(string expression, Dictionary bizData) { var parts = expression.Split("&&", StringSplitOptions.TrimEntries); foreach (var part in parts) { if (!EvalSingleComparison(part.Trim(), bizData)) return false; } return true; } private static bool EvalSingleComparison(string expr, Dictionary bizData) { string[] ops = { ">=", "<=", "!=", "==", ">", "<" }; foreach (var op in ops) { var idx = expr.IndexOf(op, StringComparison.Ordinal); if (idx < 0) continue; var fieldName = expr[..idx].Trim(); var valueStr = expr[(idx + op.Length)..].Trim().Trim('"', '\''); if (!bizData.TryGetValue(fieldName, out var fieldValue)) return false; if (decimal.TryParse(fieldValue?.ToString(), out var numLeft) && decimal.TryParse(valueStr, out var numRight)) { return op switch { "==" => numLeft == numRight, "!=" => numLeft != numRight, ">" => numLeft > numRight, ">=" => numLeft >= numRight, "<" => numLeft < numRight, "<=" => numLeft <= numRight, _ => false, }; } var strLeft = fieldValue?.ToString() ?? ""; return op switch { "==" => strLeft.Equals(valueStr, StringComparison.OrdinalIgnoreCase), "!=" => !strLeft.Equals(valueStr, StringComparison.OrdinalIgnoreCase), _ => false, }; } return false; } private async Task?> GetBizData(string bizType, long bizId) { var handlers = App.GetServices(); var handler = handlers?.FirstOrDefault(h => h.BizType == bizType); if (handler == null) return null; return await handler.GetBizData(bizId); } private string? FindNextNodeId(ApprovalFlowItem flowData, string currentNodeId) { var edge = flowData.Edges.FirstOrDefault(e => e.SourceNodeId == currentNodeId); return edge?.TargetNodeId; } private string? FindPrevUserTaskNodeId(ApprovalFlowItem flowData, string currentNodeId) { var inEdge = flowData.Edges.FirstOrDefault(e => e.TargetNodeId == currentNodeId); if (inEdge == null) return null; var prevNode = flowData.Nodes.FirstOrDefault(n => n.Id == inEdge.SourceNodeId); if (prevNode == null) return null; if (prevNode.Type is "bpmn:userTask" or "user-node" or "task-node") return prevNode.Id; // 递归跳过网关等非用户任务节点 return FindPrevUserTaskNodeId(flowData, prevNode.Id); } private async Task IsNodeCompleted(ApprovalFlowInstance instance, string nodeId) { var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot); var node = flowData.Nodes.FirstOrDefault(n => n.Id == nodeId); var mode = node?.Properties?.MultiApproveMode; if (mode == nameof(MultiApproveModeEnum.All)) { var pendingCount = await _taskRep.AsQueryable() .Where(t => t.InstanceId == instance.Id && t.NodeId == nodeId && t.Status == FlowTaskStatusEnum.Pending) .CountAsync(); return pendingCount == 0; } // 默认或签(Any):一人通过即完成,取消其他 Pending await CancelPendingTasks(instance.Id, nodeId); return true; } private async Task CancelPendingTasks(long instanceId, string nodeId, long? excludeTaskId = null) { var tasks = await _taskRep.AsQueryable() .Where(t => t.InstanceId == instanceId && t.NodeId == nodeId && t.Status == FlowTaskStatusEnum.Pending) .WhereIF(excludeTaskId.HasValue, t => t.Id != excludeTaskId!.Value) .ToListAsync(); foreach (var t in tasks) { t.Status = FlowTaskStatusEnum.Cancelled; t.ActionTime = DateTime.Now; } if (tasks.Count > 0) await _taskRep.AsUpdateable(tasks).ExecuteCommandAsync(); } /// /// 取消整个实例下所有剩余 Pending 任务(Reject 时跨并行分支使用) /// private async Task CancelAllPendingTasks(long instanceId, long? excludeTaskId = null) { var tasks = await _taskRep.AsQueryable() .Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending) .WhereIF(excludeTaskId.HasValue, t => t.Id != excludeTaskId!.Value) .ToListAsync(); foreach (var t in tasks) { t.Status = FlowTaskStatusEnum.Cancelled; t.ActionTime = DateTime.Now; } if (tasks.Count > 0) await _taskRep.AsUpdateable(tasks).ExecuteCommandAsync(); } private async Task GetPendingTask(long taskId) { var task = await _taskRep.GetByIdAsync(taskId) ?? throw Oops.Oh("审批任务不存在"); if (task.Status != FlowTaskStatusEnum.Pending) throw Oops.Oh("该任务已处理"); if (task.AssigneeId != _userManager.UserId) throw Oops.Oh("当前用户不是该任务的审批人"); return task; } private async Task WriteLog(long instanceId, long? taskId, string? nodeId, FlowLogActionEnum action, string? comment) { await _logRep.InsertAsync(new ApprovalFlowLog { InstanceId = instanceId, TaskId = taskId, NodeId = nodeId, Action = action, OperatorId = _userManager.UserId, OperatorName = _userManager.RealName, Comment = comment, }); } private static ApprovalFlowItem DeserializeFlowJson(string? json) { if (string.IsNullOrWhiteSpace(json)) throw Oops.Oh("FlowJson 快照为空"); return JsonSerializer.Deserialize(json) ?? throw Oops.Oh("FlowJson 反序列化失败"); } private async Task InvokeHandler(string bizType, Func action) { var handlers = App.GetServices(); var handler = handlers?.FirstOrDefault(h => h.BizType == bizType); if (handler != null) await action(handler); } }