using System.Text.Json;
using Admin.NET.Core.Service;
namespace Admin.NET.Plugin.ApprovalFlow.Service;
///
/// 流程推进引擎 — 核心状态机
/// 不暴露为 API,由其他 Service 内部调用
///
public class FlowEngineService : ITransient
{
private readonly SqlSugarRepository _flowRep;
private readonly SqlSugarRepository _instanceRep;
private readonly SqlSugarRepository _taskRep;
private readonly SqlSugarRepository _logRep;
private readonly SqlSugarRepository _userRoleRep;
private readonly SqlSugarRepository _userRep;
private readonly SqlSugarRepository _orgRep;
private readonly SqlSugarRepository _delegateRep;
private readonly SqlSugarRepository _completedNodeRep;
private readonly UserManager _userManager;
private readonly SysOrgService _sysOrgService;
private readonly FlowNotifyService _notifyService;
public FlowEngineService(
SqlSugarRepository flowRep,
SqlSugarRepository instanceRep,
SqlSugarRepository taskRep,
SqlSugarRepository logRep,
SqlSugarRepository userRoleRep,
SqlSugarRepository userRep,
SqlSugarRepository orgRep,
SqlSugarRepository delegateRep,
SqlSugarRepository completedNodeRep,
UserManager userManager,
SysOrgService sysOrgService,
FlowNotifyService notifyService)
{
_flowRep = flowRep;
_instanceRep = instanceRep;
_taskRep = taskRep;
_logRep = logRep;
_userRoleRep = userRoleRep;
_userRep = userRep;
_orgRep = orgRep;
_delegateRep = delegateRep;
_completedNodeRep = completedNodeRep;
_userManager = userManager;
_sysOrgService = sysOrgService;
_notifyService = notifyService;
}
///
/// 机构数据权限依赖实体的 OrgId;插入时必须与当前用户机构一致,否则非超管在「我发起的/待办」中查不到(OrgId=0 会被过滤)。
///
private async Task ResolveOrgIdForNewFlowEntityAsync()
{
var oid = _userManager.OrgId;
if (oid > 0) return oid;
var list = await _sysOrgService.GetUserOrgIdList();
return list is { Count: > 0 } ? list[0] : 0;
}
// ═══════════════════════════════════════════
// 核心生命周期
// ═══════════════════════════════════════════
///
/// 发起流程
///
public async Task StartFlow(StartFlowInput input)
{
var flow = await _flowRep.AsQueryable()
.Where(u => u.BizType == input.BizType && u.IsPublished && !u.IsDelete)
.OrderByDescending(u => u.Version)
.FirstAsync() ?? throw Oops.Oh($"未找到业务类型 [{input.BizType}] 的已发布流程定义");
if (string.IsNullOrWhiteSpace(flow.FlowJson))
throw Oops.Oh("流程定义的 FlowJson 为空,请先设计流程图");
var flowData = JsonSerializer.Deserialize(flow.FlowJson)
?? throw Oops.Oh("FlowJson 反序列化失败");
var orgId = await ResolveOrgIdForNewFlowEntityAsync();
var instance = new ApprovalFlowInstance
{
FlowId = flow.Id,
FlowVersion = flow.Version,
BizType = input.BizType,
BizId = input.BizId,
BizNo = input.BizNo,
Title = input.Title ?? $"{flow.Name}-{input.BizNo}",
InitiatorId = _userManager.UserId,
InitiatorName = _userManager.RealName,
Status = FlowInstanceStatusEnum.Running,
FlowJsonSnapshot = flow.FlowJson,
StartTime = DateTime.Now,
OrgId = orgId,
};
var startNode = flowData.Nodes.FirstOrDefault(n =>
n.Type is "bpmn:startEvent" or "start-node")
?? throw Oops.Oh("流程图中未找到开始节点");
await _instanceRep.InsertAsync(instance);
await WriteLog(instance.Id, null, startNode.Id, FlowLogActionEnum.Submit, input.Comment);
// 开始节点登记为已完成,随后沿出边推进(兼容首节点即为并行网关 Fork 的拓扑)
await MarkNodeCompleted(instance.Id, startNode);
var firstOutgoing = flowData.Edges.Where(e => e.SourceNodeId == startNode.Id).Select(e => e.TargetNodeId).ToList();
if (firstOutgoing.Count == 0)
throw Oops.Oh("流程图开始节点未连接任何后继节点");
foreach (var target in firstOutgoing)
{
await ProcessNextNode(instance, flowData, target);
}
await InvokeHandler(input.BizType, h => h.OnFlowStarted(input.BizId, instance.Id));
return instance.Id;
}
///
/// 同意
///
public async Task Approve(long taskId, string? comment)
{
var task = await GetPendingTask(taskId);
task.Status = FlowTaskStatusEnum.Approved;
task.Comment = comment;
task.ActionTime = DateTime.Now;
await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
// P1-6 审批代理:同步取消配对的本人/代理任务
await CancelPairedDelegateTask(task);
await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.Approve, comment);
var instance = await _instanceRep.GetByIdAsync(task.InstanceId)
?? throw Oops.Oh("流程实例不存在");
if (await IsNodeCompleted(instance, task.NodeId))
{
await InvokeHandler(instance.BizType,
h => h.OnNodeCompleted(instance.BizId, instance.Id, task.NodeId, task.NodeName ?? "", task.AssigneeId));
var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
await AdvanceToNext(instance, flowData, task.NodeId);
}
}
///
/// 拒绝
///
public async Task Reject(long taskId, string? comment)
{
var task = await GetPendingTask(taskId);
task.Status = FlowTaskStatusEnum.Rejected;
task.Comment = comment;
task.ActionTime = DateTime.Now;
await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
// Reject 导致流程终止:取消整个实例所有剩余 Pending 任务(含并行分支、配对代理任务等)
await CancelAllPendingTasks(task.InstanceId, task.Id);
var instance = await _instanceRep.GetByIdAsync(task.InstanceId)
?? throw Oops.Oh("流程实例不存在");
instance.Status = FlowInstanceStatusEnum.Rejected;
instance.EndTime = DateTime.Now;
await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync();
await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Reject, comment);
// P4-17: 驳回人 = 当前被指派的审批人
await InvokeHandler(instance.BizType,
h => h.OnFlowCompleted(instance.BizId, instance.Id, FlowInstanceStatusEnum.Rejected, task.AssigneeId));
await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id, instance.Title, FlowInstanceStatusEnum.Rejected, instance.BizType);
}
// ═══════════════════════════════════════════
// 扩展操作
// ═══════════════════════════════════════════
///
/// 转办
///
public async Task Transfer(long taskId, long targetUserId, string? comment)
{
var task = await GetPendingTask(taskId);
task.Status = FlowTaskStatusEnum.Transferred;
task.Comment = comment;
task.ActionTime = DateTime.Now;
task.TransferToId = targetUserId;
await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
var targetUser = await _userRep.GetByIdAsync(targetUserId);
var instForOrg = await _instanceRep.GetByIdAsync(task.InstanceId);
var newTask = new ApprovalFlowTask
{
InstanceId = task.InstanceId,
NodeId = task.NodeId,
NodeName = task.NodeName,
AssigneeId = targetUserId,
AssigneeName = targetUser?.RealName,
Status = FlowTaskStatusEnum.Pending,
OrgId = instForOrg?.OrgId ?? 0,
};
await _taskRep.InsertAsync(newTask);
await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.Transfer,
$"{comment} → 转办给 {targetUser?.RealName}");
var instance = await _instanceRep.GetByIdAsync(task.InstanceId);
await _notifyService.NotifyTransferred(targetUserId, task.InstanceId, instance?.Title ?? "", _userManager.RealName, instance?.BizType ?? "");
}
///
/// 撤回(发起人撤回)
///
public async Task Withdraw(long instanceId)
{
var instance = await _instanceRep.GetByIdAsync(instanceId)
?? throw Oops.Oh("流程实例不存在");
if (instance.InitiatorId != _userManager.UserId)
throw Oops.Oh("只有发起人可以撤回");
if (instance.Status != FlowInstanceStatusEnum.Running)
throw Oops.Oh("当前流程状态不允许撤回");
var pendingTasks = await _taskRep.AsQueryable()
.Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending)
.ToListAsync();
var doneTasks = await _taskRep.AsQueryable()
.Where(t => t.InstanceId == instanceId &&
t.Status != FlowTaskStatusEnum.Pending &&
t.Status != FlowTaskStatusEnum.Cancelled)
.CountAsync();
if (doneTasks > 0)
throw Oops.Oh("已有人审批过,不可撤回");
var cancelledUserIds = pendingTasks.Select(t => t.AssigneeId).Distinct().ToList();
foreach (var t in pendingTasks)
{
t.Status = FlowTaskStatusEnum.Cancelled;
t.ActionTime = DateTime.Now;
}
await _taskRep.AsUpdateable(pendingTasks).ExecuteCommandAsync();
instance.Status = FlowInstanceStatusEnum.Cancelled;
instance.EndTime = DateTime.Now;
await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync();
await WriteLog(instanceId, null, instance.CurrentNodeId, FlowLogActionEnum.Withdraw, null);
// P4-17: 撤回场景无"审批人",lastApproverId = null
await InvokeHandler(instance.BizType,
h => h.OnFlowCompleted(instance.BizId, instanceId, FlowInstanceStatusEnum.Cancelled, null));
await _notifyService.NotifyWithdrawn(cancelledUserIds, instanceId, instance.Title, instance.InitiatorName, instance.BizType);
}
///
/// 退回上一步
///
public async Task ReturnToPrev(long taskId, string? comment)
{
var task = await GetPendingTask(taskId);
var instance = await _instanceRep.GetByIdAsync(task.InstanceId)
?? throw Oops.Oh("流程实例不存在");
var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
var prevNodeId = FindPrevUserTaskNodeId(flowData, task.NodeId);
if (prevNodeId == null)
throw Oops.Oh("已是第一个审批节点,无法退回");
await CancelPendingTasks(task.InstanceId, task.NodeId);
task.Status = FlowTaskStatusEnum.Returned;
task.Comment = comment;
task.ActionTime = DateTime.Now;
await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
instance.CurrentNodeId = prevNodeId;
await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync();
await CreateTasksForNode(instance, flowData, prevNodeId);
await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Return, comment);
var returnedTasks = await _taskRep.AsQueryable()
.Where(t => t.InstanceId == instance.Id && t.NodeId == prevNodeId && t.Status == FlowTaskStatusEnum.Pending)
.ToListAsync();
var returnedUserIds = returnedTasks.Select(t => t.AssigneeId).Distinct().ToList();
await _notifyService.NotifyReturned(returnedUserIds, instance.Id, instance.Title, _userManager.RealName, instance.BizType);
}
///
/// 加签
///
public async Task AddSign(long taskId, long targetUserId, string? comment)
{
var task = await GetPendingTask(taskId);
var targetUser = await _userRep.GetByIdAsync(targetUserId);
var instForAddSign = await _instanceRep.GetByIdAsync(task.InstanceId);
var newTask = new ApprovalFlowTask
{
InstanceId = task.InstanceId,
NodeId = task.NodeId,
NodeName = task.NodeName,
AssigneeId = targetUserId,
AssigneeName = targetUser?.RealName,
Status = FlowTaskStatusEnum.Pending,
IsAddSign = true,
AddSignById = _userManager.UserId,
OrgId = instForAddSign?.OrgId ?? 0,
};
await _taskRep.InsertAsync(newTask);
await WriteLog(task.InstanceId, taskId, task.NodeId, FlowLogActionEnum.AddSign,
$"{comment} → 加签给 {targetUser?.RealName}");
var instance = await _instanceRep.GetByIdAsync(task.InstanceId);
await _notifyService.NotifyAddSign(targetUserId, task.InstanceId, instance?.Title ?? "", _userManager.RealName, instance?.BizType ?? "");
}
///
/// 手动升级 — 当前审批人主动将任务升级到更高层级
///
public async Task Escalate(long taskId, string? comment)
{
var task = await GetPendingTask(taskId);
var instance = await _instanceRep.GetByIdAsync(task.InstanceId)
?? throw Oops.Oh("流程实例不存在");
var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
var node = flowData.Nodes.FirstOrDefault(n => n.Id == task.NodeId)
?? throw Oops.Oh("节点不存在");
var props = node.Properties;
if (props?.EnableManualEscalation != true
|| string.IsNullOrWhiteSpace(props.EscalationApproverType)
|| string.IsNullOrWhiteSpace(props.EscalationApproverIds))
throw Oops.Oh("该节点未配置升级目标,无法升级");
task.Status = FlowTaskStatusEnum.Escalated;
task.Comment = comment;
task.ActionTime = DateTime.Now;
await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id);
var escalationApprovers = await ResolveApprovers(
new FlowProperties
{
ApproverType = props.EscalationApproverType,
ApproverIds = props.EscalationApproverIds,
ApproverNames = props.EscalationApproverNames,
},
instance.InitiatorId);
if (escalationApprovers.Count == 0)
throw Oops.Oh("升级目标审批人列表为空");
var newTasks = escalationApprovers.Select(a => new ApprovalFlowTask
{
InstanceId = instance.Id,
NodeId = task.NodeId,
NodeName = task.NodeName,
AssigneeId = a.userId,
AssigneeName = a.userName,
Status = FlowTaskStatusEnum.Pending,
OrgId = instance.OrgId,
}).ToList();
await _taskRep.AsInsertable(newTasks).ExecuteCommandAsync();
var targetNames = string.Join(", ", escalationApprovers.Select(a => a.userName));
await WriteLog(instance.Id, taskId, task.NodeId, FlowLogActionEnum.Escalate,
$"{comment} → 升级给 {targetNames}");
var targetUserIds = escalationApprovers.Select(a => a.userId).Distinct().ToList();
await _notifyService.NotifyEscalated(targetUserIds, instance.Id, instance.Title,
_userManager.RealName, task.NodeName, instance.BizType);
}
///
/// 催办
///
public async Task Urge(long instanceId)
{
var instance = await _instanceRep.GetByIdAsync(instanceId)
?? throw Oops.Oh("流程实例不存在");
if (instance.Status != FlowInstanceStatusEnum.Running)
throw Oops.Oh("当前流程不在审批中");
await WriteLog(instanceId, null, instance.CurrentNodeId, FlowLogActionEnum.Urge, "催办");
var pendingTasks = await _taskRep.AsQueryable()
.Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending)
.ToListAsync();
var userIds = pendingTasks.Select(t => t.AssigneeId).Distinct().ToList();
await _notifyService.NotifyUrge(userIds, instanceId, instance.Title, instance.BizType);
}
// ═══════════════════════════════════════════
// 超时自动处理(由 FlowTimeoutJob 调用,无 UserManager 上下文)
// ═══════════════════════════════════════════
///
/// 扫描所有 Pending 且已超过 timeoutHours 的任务,按节点 timeoutAction 执行对应动作。
/// 与 FlowTimeoutJob 扫描等价,抽出公共方法以便管理员接口立即触发(运维/E2E 测试用)。
/// 返回本次处理的任务条数。
///
public async Task ScanTimeoutTasks(CancellationToken stoppingToken = default)
{
var pendingTasks = await _taskRep.Context.Queryable()
.InnerJoin((t, i) => t.InstanceId == i.Id)
.Where((t, i) => t.Status == FlowTaskStatusEnum.Pending
&& i.Status == FlowInstanceStatusEnum.Running)
.Select((t, i) => new
{
TaskId = t.Id,
TaskCreatedAt = t.CreateTime,
NodeId = t.NodeId,
FlowJsonSnapshot = i.FlowJsonSnapshot,
})
.ToListAsync();
var now = DateTime.Now;
var processed = 0;
foreach (var item in pendingTasks)
{
if (stoppingToken.IsCancellationRequested) break;
if (string.IsNullOrWhiteSpace(item.FlowJsonSnapshot)) continue;
ApprovalFlowItem? flowData;
try { flowData = JsonSerializer.Deserialize(item.FlowJsonSnapshot); }
catch { continue; }
var node = flowData?.Nodes?.FirstOrDefault(n => n.Id == item.NodeId);
var props = node?.Properties;
if (props?.TimeoutHours == null || props.TimeoutHours <= 0) continue;
if (string.IsNullOrWhiteSpace(props.TimeoutAction)) continue;
var deadline = item.TaskCreatedAt.AddHours(props.TimeoutHours.Value);
if (now < deadline) continue;
// Notify 动作幂等:已记录过 AutoTimeout 日志的跳过,避免每轮重复催办
if (props.TimeoutAction == "Notify")
{
var alreadyNotified = await _logRep.AsQueryable()
.AnyAsync(log => log.TaskId == item.TaskId
&& log.Action == FlowLogActionEnum.AutoTimeout);
if (alreadyNotified) continue;
}
try
{
await HandleTimeoutTask(item.TaskId);
processed++;
}
catch
{
// 单任务异常隔离:吞掉继续下一个,由 Job/Admin API 层统一记日志
}
}
return processed;
}
///
/// 处理单个超时任务(由定时任务调用)
///
public async Task HandleTimeoutTask(long taskId)
{
var task = await _taskRep.GetByIdAsync(taskId);
if (task == null || task.Status != FlowTaskStatusEnum.Pending) return;
var instance = await _instanceRep.GetByIdAsync(task.InstanceId);
if (instance == null || instance.Status != FlowInstanceStatusEnum.Running) return;
var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
var node = flowData.Nodes.FirstOrDefault(n => n.Id == task.NodeId);
var props = node?.Properties;
if (props == null) return;
switch (props.TimeoutAction)
{
case "Notify":
await _notifyService.NotifyTimeout(
new List { task.AssigneeId }, instance.Id, instance.Title, instance.BizType);
await WriteSystemLog(instance.Id, task.Id, task.NodeId,
FlowLogActionEnum.AutoTimeout, "审批超时,已发送提醒通知");
break;
case "AutoApprove":
await AutoApproveTask(task, instance);
break;
case "AutoReject":
await AutoRejectTask(task, instance);
break;
case "AutoEscalate":
await AutoEscalateTask(task, props, instance);
break;
}
}
private async Task AutoApproveTask(ApprovalFlowTask task, ApprovalFlowInstance instance)
{
task.Status = FlowTaskStatusEnum.Approved;
task.Comment = "系统自动通过(超时)";
task.ActionTime = DateTime.Now;
await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
await WriteSystemLog(instance.Id, task.Id, task.NodeId,
FlowLogActionEnum.AutoTimeout, "审批超时,系统自动通过");
if (await IsNodeCompleted(instance, task.NodeId))
{
// P4-17: 系统自动通过,无人工审批人,approverUserId = null
await InvokeHandler(instance.BizType,
h => h.OnNodeCompleted(instance.BizId, instance.Id, task.NodeId, task.NodeName ?? "", null));
var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
await AdvanceToNext(instance, flowData, task.NodeId);
}
}
private async Task AutoRejectTask(ApprovalFlowTask task, ApprovalFlowInstance instance)
{
task.Status = FlowTaskStatusEnum.Rejected;
task.Comment = "系统自动拒绝(超时)";
task.ActionTime = DateTime.Now;
await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id);
instance.Status = FlowInstanceStatusEnum.Rejected;
instance.EndTime = DateTime.Now;
await _instanceRep.AsUpdateable(instance).ExecuteCommandAsync();
await WriteSystemLog(instance.Id, task.Id, task.NodeId,
FlowLogActionEnum.AutoTimeout, "审批超时,系统自动拒绝");
// P4-17: 系统自动拒绝,无人工审批人,lastApproverId = null
await InvokeHandler(instance.BizType,
h => h.OnFlowCompleted(instance.BizId, instance.Id, FlowInstanceStatusEnum.Rejected, null));
await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id,
instance.Title, FlowInstanceStatusEnum.Rejected, instance.BizType);
}
private async Task AutoEscalateTask(ApprovalFlowTask task, FlowProperties nodeProps, ApprovalFlowInstance instance)
{
if (string.IsNullOrWhiteSpace(nodeProps.EscalationApproverType)
|| string.IsNullOrWhiteSpace(nodeProps.EscalationApproverIds))
return;
task.Status = FlowTaskStatusEnum.Escalated;
task.Comment = "系统自动升级(超时)";
task.ActionTime = DateTime.Now;
await _taskRep.AsUpdateable(task).ExecuteCommandAsync();
await CancelPendingTasks(task.InstanceId, task.NodeId, task.Id);
var approvers = await ResolveApprovers(
new FlowProperties
{
ApproverType = nodeProps.EscalationApproverType,
ApproverIds = nodeProps.EscalationApproverIds,
},
instance.InitiatorId);
if (approvers.Count == 0) return;
var newTasks = approvers.Select(a => new ApprovalFlowTask
{
InstanceId = instance.Id,
NodeId = task.NodeId,
NodeName = task.NodeName,
AssigneeId = a.userId,
AssigneeName = a.userName,
Status = FlowTaskStatusEnum.Pending,
OrgId = instance.OrgId,
}).ToList();
await _taskRep.AsInsertable(newTasks).ExecuteCommandAsync();
var targetNames = string.Join(", ", approvers.Select(a => a.userName));
await WriteSystemLog(instance.Id, task.Id, task.NodeId,
FlowLogActionEnum.AutoTimeout, $"审批超时,自动升级给 {targetNames}");
var targetUserIds = approvers.Select(a => a.userId).Distinct().ToList();
await _notifyService.NotifyEscalated(targetUserIds, instance.Id, instance.Title,
"系统", task.NodeName, instance.BizType);
}
private async Task WriteSystemLog(long instanceId, long? taskId, string? nodeId,
FlowLogActionEnum action, string? comment)
{
await _logRep.InsertAsync(new ApprovalFlowLog
{
InstanceId = instanceId,
TaskId = taskId,
NodeId = nodeId,
Action = action,
OperatorId = 0,
OperatorName = "系统",
Comment = comment,
});
}
// ═══════════════════════════════════════════
// 内部引擎方法
// ═══════════════════════════════════════════
///
/// 推进到下一节点。支持并行网关(Fork / Join)。
/// 行为约定:
/// - 进入本方法前,调用方(如 )应已将 (userTask)标记为完成节点;
/// 本方法会将途经的网关节点也写入 。
/// - 并行网关 Fork(出边>=2):沿每条出边递归推进;
/// 并行网关 Join(入边>=2):校验所有前驱节点是否都在"已完成"集合中,
/// 任一尚未完成则**静默等待**(不报错、不推进),由后续分支完成后再次触发 Join 校验。
///
private async Task AdvanceToNext(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string currentNodeId)
{
// 当前节点可能是 userTask(完成记录由 Approve 写入)或网关(入口处已写入);此处统一确保幂等入库
var currentNode = flowData.Nodes.FirstOrDefault(n => n.Id == currentNodeId);
await MarkNodeCompleted(instance.Id, currentNode);
var outgoingEdges = flowData.Edges.Where(e => e.SourceNodeId == currentNodeId).ToList();
if (outgoingEdges.Count == 0)
{
await CompleteInstance(instance, FlowInstanceStatusEnum.Approved);
return;
}
// 非并行网关场景:当前节点一般只有 1 条出边
foreach (var edge in outgoingEdges)
{
await ProcessNextNode(instance, flowData, edge.TargetNodeId);
}
}
///
/// 处理某个"下一节点"。根据节点类型分发:
/// - endEvent:所有分支任务都结束时触发实例完成
/// - exclusiveGateway:按条件选择分支
/// - parallelGateway:Fork 并行分发;Join 等待所有前驱完成
/// - userTask / 其他:创建任务
///
private async Task ProcessNextNode(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string nextNodeId)
{
var nextNode = flowData.Nodes.FirstOrDefault(n => n.Id == nextNodeId);
if (nextNode == null)
{
await CompleteInstance(instance, FlowInstanceStatusEnum.Approved);
return;
}
if (nextNode.Type is "bpmn:endEvent" or "end-node")
{
// 所有并行分支都已结束(无其它 Pending 任务)才真正完成实例
var hasOtherPending = await _taskRep.AsQueryable()
.AnyAsync(t => t.InstanceId == instance.Id && t.Status == FlowTaskStatusEnum.Pending);
if (hasOtherPending) return;
await CompleteInstance(instance, FlowInstanceStatusEnum.Approved);
return;
}
if (nextNode.Type is "bpmn:exclusiveGateway")
{
await MarkNodeCompleted(instance.Id, nextNode);
var bizData = await GetBizData(instance.BizType, instance.BizId);
var targetNodeId = EvaluateGateway(nextNode.Properties?.Conditions, flowData, nextNode.Id, bizData);
instance.CurrentNodeId = targetNodeId;
await _instanceRep.AsUpdateable(instance).UpdateColumns(i => new { i.CurrentNodeId }).ExecuteCommandAsync();
await ProcessNextNode(instance, flowData, targetNodeId);
return;
}
if (nextNode.Type is "bpmn:parallelGateway")
{
var incoming = flowData.Edges.Where(e => e.TargetNodeId == nextNode.Id).Select(e => e.SourceNodeId).ToList();
var outgoing = flowData.Edges.Where(e => e.SourceNodeId == nextNode.Id).Select(e => e.TargetNodeId).ToList();
// Join 语义:入边 >= 2,需等所有前驱都已完成
if (incoming.Count >= 2)
{
var completedSet = await GetCompletedNodeIdSet(instance.Id);
if (!incoming.All(p => completedSet.Contains(p)))
{
// 未汇合,静默等待后续分支抵达
return;
}
}
// Fork 或 Join 通过:标记网关完成,沿所有出边推进
await MarkNodeCompleted(instance.Id, nextNode);
foreach (var target in outgoing)
{
await ProcessNextNode(instance, flowData, target);
}
return;
}
// userTask 或其他:创建任务
instance.CurrentNodeId = nextNodeId;
await _instanceRep.AsUpdateable(instance).UpdateColumns(i => new { i.CurrentNodeId }).ExecuteCommandAsync();
await CreateTasksForNode(instance, flowData, nextNodeId);
}
///
/// 标记节点已完成(幂等:重复写入被唯一索引拦截后忽略)
///
private async Task MarkNodeCompleted(long instanceId, ApprovalFlowNodeItem? node)
{
if (node == null) return;
try
{
await _completedNodeRep.InsertAsync(new ApprovalFlowCompletedNode
{
InstanceId = instanceId,
NodeId = node.Id,
NodeName = node.Properties?.NodeName ?? node.Text?.Value,
NodeType = node.Type,
CompletedTime = DateTime.Now,
});
}
catch
{
// 并发场景下可能触发唯一索引冲突,忽略(已存在即可)
}
}
///
/// 查询实例已完成节点 Id 集合
///
private async Task> GetCompletedNodeIdSet(long instanceId)
{
var ids = await _completedNodeRep.AsQueryable()
.Where(c => c.InstanceId == instanceId)
.Select(c => c.NodeId)
.ToListAsync();
return new HashSet(ids);
}
private async Task CompleteInstance(ApprovalFlowInstance instance, FlowInstanceStatusEnum status)
{
// 幂等:并行分支同时到达 end 时避免重复完成
var latest = await _instanceRep.GetByIdAsync(instance.Id);
if (latest == null || latest.Status != FlowInstanceStatusEnum.Running) return;
instance.Status = status;
instance.EndTime = DateTime.Now;
await _instanceRep.AsUpdateable(instance)
.UpdateColumns(i => new { i.Status, i.EndTime })
.ExecuteCommandAsync();
// P4-17: 正常完成路径的 lastApproverId = 最后一条人工 Approve 日志的操作人(系统自动通过不算)
var lastApproverId = status == FlowInstanceStatusEnum.Approved
? await GetLastHumanApproverIdAsync(instance.Id)
: null;
await InvokeHandler(instance.BizType,
h => h.OnFlowCompleted(instance.BizId, instance.Id, status, lastApproverId));
await _notifyService.NotifyFlowCompleted(instance.InitiatorId, instance.Id, instance.Title, status, instance.BizType);
}
///
/// P4-17: 查询该实例最后一条人工 Approve 日志的操作人 UserId
/// (系统超时自动通过写入的是 AutoTimeout,不会被此查询命中)
///
private async Task GetLastHumanApproverIdAsync(long instanceId)
{
var lastLog = await _logRep.AsQueryable()
.Where(l => l.InstanceId == instanceId && l.Action == FlowLogActionEnum.Approve)
.OrderByDescending(l => l.CreateTime)
.FirstAsync();
return lastLog?.OperatorId > 0 ? lastLog.OperatorId : null;
}
private async Task CreateTasksForNode(ApprovalFlowInstance instance, ApprovalFlowItem flowData, string nodeId)
{
var node = flowData.Nodes.FirstOrDefault(n => n.Id == nodeId)
?? throw Oops.Oh($"FlowJson 中未找到节点 [{nodeId}]");
var approvers = await ResolveApprovers(node.Properties, instance.InitiatorId);
if (approvers.Count == 0)
throw Oops.Oh($"节点 [{node.Properties?.NodeName ?? nodeId}] 未配置审批人或审批人列表为空");
var nodeName = node.Properties?.NodeName ?? node.Text?.Value;
var tasks = approvers.Select(a => new ApprovalFlowTask
{
InstanceId = instance.Id,
NodeId = nodeId,
NodeName = nodeName,
AssigneeId = a.userId,
AssigneeName = a.userName,
Status = FlowTaskStatusEnum.Pending,
OrgId = instance.OrgId,
}).ToList();
// P1-6 审批代理:为每个原审批人检查是否存在有效代理,是则并行创建一条代理任务
var delegateTasks = new List();
foreach (var a in approvers)
{
var del = await FindEffectiveDelegate(a.userId, instance.BizType);
if (del == null) continue;
// 代理人和原审批人不能重复,代理人也不能是原审批人列表中其他人(避免同一人两条任务)
if (approvers.Any(x => x.userId == del.DelegateUserId)) continue;
delegateTasks.Add(new ApprovalFlowTask
{
InstanceId = instance.Id,
NodeId = nodeId,
NodeName = nodeName,
AssigneeId = del.DelegateUserId,
AssigneeName = del.DelegateUserName,
Status = FlowTaskStatusEnum.Pending,
OrgId = instance.OrgId,
IsDelegate = true,
DelegateForUserId = a.userId,
DelegateForUserName = a.userName,
});
}
var allTasks = tasks.Concat(delegateTasks).ToList();
await _taskRep.AsInsertable(allTasks).ExecuteCommandAsync();
var assigneeIds = allTasks.Select(t => t.AssigneeId).Distinct().ToList();
await _notifyService.NotifyNewTask(assigneeIds, instance.Id, instance.Title, nodeName, instance.BizType);
}
///
/// 查找指定用户当前生效的审批代理(时间窗口内 + 已启用 + BizType 匹配或全局)
///
private async Task FindEffectiveDelegate(long userId, string? bizType)
{
var now = DateTime.Now;
return await _delegateRep.AsQueryable()
.Where(d => d.UserId == userId
&& d.IsEnabled
&& d.StartTime <= now
&& d.EndTime >= now
&& (string.IsNullOrEmpty(d.BizType) || d.BizType == bizType))
.OrderBy(d => d.BizType == null ? 1 : 0) // 优先匹配指定 BizType 的代理
.OrderByDescending(d => d.CreateTime)
.FirstAsync();
}
///
/// 取消与已完成任务配对的代理任务(P1-6)
/// - 本人任务完成:取消对应的代理任务
/// - 代理任务完成:取消对应的本人任务
///
private async Task CancelPairedDelegateTask(ApprovalFlowTask completedTask)
{
ApprovalFlowTask? paired;
if (completedTask.IsDelegate)
{
var originalUserId = completedTask.DelegateForUserId ?? 0;
if (originalUserId == 0) return;
paired = await _taskRep.AsQueryable()
.Where(t => t.InstanceId == completedTask.InstanceId
&& t.NodeId == completedTask.NodeId
&& t.Status == FlowTaskStatusEnum.Pending
&& t.AssigneeId == originalUserId
&& !t.IsDelegate)
.FirstAsync();
}
else
{
paired = await _taskRep.AsQueryable()
.Where(t => t.InstanceId == completedTask.InstanceId
&& t.NodeId == completedTask.NodeId
&& t.Status == FlowTaskStatusEnum.Pending
&& t.IsDelegate
&& t.DelegateForUserId == completedTask.AssigneeId)
.FirstAsync();
}
if (paired == null) return;
paired.Status = FlowTaskStatusEnum.Cancelled;
paired.ActionTime = DateTime.Now;
await _taskRep.AsUpdateable(paired).ExecuteCommandAsync();
}
private async Task> ResolveApprovers(FlowProperties? props, long initiatorId)
{
if (props == null || string.IsNullOrWhiteSpace(props.ApproverType))
return new List<(long, string)>();
var approverType = props.ApproverType;
if (approverType == nameof(ApproverTypeEnum.Initiator))
{
var initiator = await _userRep.GetByIdAsync(initiatorId);
return initiator != null
? new List<(long, string)> { (initiator.Id, initiator.RealName ?? "") }
: new List<(long, string)>();
}
if (string.IsNullOrWhiteSpace(props.ApproverIds))
return new List<(long, string)>();
var ids = props.ApproverIds.Split(',', StringSplitOptions.RemoveEmptyEntries)
.Select(s => long.TryParse(s.Trim(), out var v) ? v : 0).Where(id => id > 0).ToList();
if (approverType == nameof(ApproverTypeEnum.SpecificUser))
{
var users = await _userRep.AsQueryable()
.Where(u => ids.Contains(u.Id)).ToListAsync();
return users.Select(u => (u.Id, u.RealName ?? "")).ToList();
}
if (approverType == nameof(ApproverTypeEnum.Role))
{
var userIds = await _userRoleRep.AsQueryable()
.Where(ur => ids.Contains(ur.RoleId))
.Select(ur => ur.UserId)
.ToListAsync();
var users = await _userRep.AsQueryable()
.Where(u => userIds.Contains(u.Id)).ToListAsync();
return users.Select(u => (u.Id, u.RealName ?? "")).ToList();
}
if (approverType == nameof(ApproverTypeEnum.Department))
{
var users = await _userRep.AsQueryable()
.Where(u => ids.Contains(u.OrgId)).ToListAsync();
return users.Select(u => (u.Id, u.RealName ?? "")).ToList();
}
if (approverType == nameof(ApproverTypeEnum.DepartmentLeader))
{
var initiator = await _userRep.GetByIdAsync(initiatorId);
if (initiator == null || initiator.OrgId <= 0)
return new List<(long, string)>();
var org = await _orgRep.GetByIdAsync(initiator.OrgId);
if (org?.DirectorId != null && org.DirectorId > 0)
{
var director = await _userRep.GetByIdAsync(org.DirectorId.Value);
if (director != null)
return new List<(long, string)> { (director.Id, director.RealName ?? "") };
}
if (initiator.ManagerUserId != null && initiator.ManagerUserId > 0)
{
var manager = await _userRep.GetByIdAsync(initiator.ManagerUserId.Value);
if (manager != null)
return new List<(long, string)> { (manager.Id, manager.RealName ?? "") };
}
return new List<(long, string)>();
}
return new List<(long, string)>();
}
///
/// 评估排他网关 — 依次尝试各非默认分支的条件表达式,首个匹配的获胜;
/// 全不匹配则走默认分支;无默认则走第一条出边
/// 支持简单比较表达式:variable op value(op: ==,!=,>,>=,<,<=)
///
private string EvaluateGateway(List? conditions, ApprovalFlowItem flowData, string gatewayNodeId, Dictionary? bizData)
{
if (conditions != null && conditions.Count > 0 && bizData != null && bizData.Count > 0)
{
foreach (var cond in conditions.Where(c => !c.IsDefault))
{
if (!string.IsNullOrWhiteSpace(cond.Expression) && EvalSimpleExpression(cond.Expression, bizData))
return cond.TargetNodeId;
}
var defaultBranch = conditions.FirstOrDefault(c => c.IsDefault);
if (defaultBranch != null)
return defaultBranch.TargetNodeId;
}
else if (conditions != null && conditions.Count > 0)
{
var defaultBranch = conditions.FirstOrDefault(c => c.IsDefault);
if (defaultBranch != null) return defaultBranch.TargetNodeId;
return conditions.First().TargetNodeId;
}
var edge = flowData.Edges.FirstOrDefault(e => e.SourceNodeId == gatewayNodeId);
return edge?.TargetNodeId ?? throw Oops.Oh("排他网关没有出边");
}
///
/// 简单表达式求值:支持 "field op value" 格式(如 "urgent == 1", "customLevel >= 3", "amount > 10000")
/// 多条件用 && 连接
///
private static bool EvalSimpleExpression(string expression, Dictionary bizData)
{
var parts = expression.Split("&&", StringSplitOptions.TrimEntries);
foreach (var part in parts)
{
if (!EvalSingleComparison(part.Trim(), bizData))
return false;
}
return true;
}
private static bool EvalSingleComparison(string expr, Dictionary bizData)
{
string[] ops = { ">=", "<=", "!=", "==", ">", "<" };
foreach (var op in ops)
{
var idx = expr.IndexOf(op, StringComparison.Ordinal);
if (idx < 0) continue;
var fieldName = expr[..idx].Trim();
var valueStr = expr[(idx + op.Length)..].Trim().Trim('"', '\'');
if (!bizData.TryGetValue(fieldName, out var fieldValue))
return false;
if (decimal.TryParse(fieldValue?.ToString(), out var numLeft) && decimal.TryParse(valueStr, out var numRight))
{
return op switch
{
"==" => numLeft == numRight,
"!=" => numLeft != numRight,
">" => numLeft > numRight,
">=" => numLeft >= numRight,
"<" => numLeft < numRight,
"<=" => numLeft <= numRight,
_ => false,
};
}
var strLeft = fieldValue?.ToString() ?? "";
return op switch
{
"==" => strLeft.Equals(valueStr, StringComparison.OrdinalIgnoreCase),
"!=" => !strLeft.Equals(valueStr, StringComparison.OrdinalIgnoreCase),
_ => false,
};
}
return false;
}
private async Task?> GetBizData(string bizType, long bizId)
{
var handlers = App.GetServices();
var handler = handlers?.FirstOrDefault(h => h.BizType == bizType);
if (handler == null) return null;
return await handler.GetBizData(bizId);
}
private string? FindNextNodeId(ApprovalFlowItem flowData, string currentNodeId)
{
var edge = flowData.Edges.FirstOrDefault(e => e.SourceNodeId == currentNodeId);
return edge?.TargetNodeId;
}
private string? FindPrevUserTaskNodeId(ApprovalFlowItem flowData, string currentNodeId)
{
var inEdge = flowData.Edges.FirstOrDefault(e => e.TargetNodeId == currentNodeId);
if (inEdge == null) return null;
var prevNode = flowData.Nodes.FirstOrDefault(n => n.Id == inEdge.SourceNodeId);
if (prevNode == null) return null;
if (prevNode.Type is "bpmn:userTask" or "user-node" or "task-node")
return prevNode.Id;
// 递归跳过网关等非用户任务节点
return FindPrevUserTaskNodeId(flowData, prevNode.Id);
}
private async Task IsNodeCompleted(ApprovalFlowInstance instance, string nodeId)
{
var flowData = DeserializeFlowJson(instance.FlowJsonSnapshot);
var node = flowData.Nodes.FirstOrDefault(n => n.Id == nodeId);
var mode = node?.Properties?.MultiApproveMode;
if (mode == nameof(MultiApproveModeEnum.All))
{
var pendingCount = await _taskRep.AsQueryable()
.Where(t => t.InstanceId == instance.Id && t.NodeId == nodeId && t.Status == FlowTaskStatusEnum.Pending)
.CountAsync();
return pendingCount == 0;
}
// 默认或签(Any):一人通过即完成,取消其他 Pending
await CancelPendingTasks(instance.Id, nodeId);
return true;
}
private async Task CancelPendingTasks(long instanceId, string nodeId, long? excludeTaskId = null)
{
var tasks = await _taskRep.AsQueryable()
.Where(t => t.InstanceId == instanceId && t.NodeId == nodeId && t.Status == FlowTaskStatusEnum.Pending)
.WhereIF(excludeTaskId.HasValue, t => t.Id != excludeTaskId!.Value)
.ToListAsync();
foreach (var t in tasks)
{
t.Status = FlowTaskStatusEnum.Cancelled;
t.ActionTime = DateTime.Now;
}
if (tasks.Count > 0)
await _taskRep.AsUpdateable(tasks).ExecuteCommandAsync();
}
///
/// 取消整个实例下所有剩余 Pending 任务(Reject 时跨并行分支使用)
///
private async Task CancelAllPendingTasks(long instanceId, long? excludeTaskId = null)
{
var tasks = await _taskRep.AsQueryable()
.Where(t => t.InstanceId == instanceId && t.Status == FlowTaskStatusEnum.Pending)
.WhereIF(excludeTaskId.HasValue, t => t.Id != excludeTaskId!.Value)
.ToListAsync();
foreach (var t in tasks)
{
t.Status = FlowTaskStatusEnum.Cancelled;
t.ActionTime = DateTime.Now;
}
if (tasks.Count > 0)
await _taskRep.AsUpdateable(tasks).ExecuteCommandAsync();
}
private async Task GetPendingTask(long taskId)
{
var task = await _taskRep.GetByIdAsync(taskId)
?? throw Oops.Oh("审批任务不存在");
if (task.Status != FlowTaskStatusEnum.Pending)
throw Oops.Oh("该任务已处理");
if (task.AssigneeId != _userManager.UserId)
throw Oops.Oh("当前用户不是该任务的审批人");
return task;
}
private async Task WriteLog(long instanceId, long? taskId, string? nodeId, FlowLogActionEnum action, string? comment)
{
await _logRep.InsertAsync(new ApprovalFlowLog
{
InstanceId = instanceId,
TaskId = taskId,
NodeId = nodeId,
Action = action,
OperatorId = _userManager.UserId,
OperatorName = _userManager.RealName,
Comment = comment,
});
}
private static ApprovalFlowItem DeserializeFlowJson(string? json)
{
if (string.IsNullOrWhiteSpace(json))
throw Oops.Oh("FlowJson 快照为空");
return JsonSerializer.Deserialize(json)
?? throw Oops.Oh("FlowJson 反序列化失败");
}
private async Task InvokeHandler(string bizType, Func action)
{
var handlers = App.GetServices();
var handler = handlers?.FirstOrDefault(h => h.BizType == bizType);
if (handler != null)
await action(handler);
}
}