"""E2E verification of the approval-flow escalation mechanism (Batch 5).

Coverage:
1. Automatic timeout handling (4 actions):
   Notify / AutoApprove / AutoReject / AutoEscalate
2. Manual escalation: the Escalate API
3. Permission and idempotency of the new admin endpoint
   `/api/flowTask/triggerTimeoutScan`

Approach:
- INSERT temporary approval-flow definitions directly (5 BizTypes, removed
  automatically when the script finishes)
- Start the flows through the API and fetch the pending task
- UPDATE task.CreateTime directly to move it 2 hours into the past
- Call triggerTimeoutScan to run one scan immediately
- Assert the state of the task / instance / log tables
"""

import json
import os
import sys
import time
import urllib.parse
from typing import Any

import pymysql
import pymysql.cursors
import requests

# NOTE(review): the defaults below are the values that used to be hard-coded
# in this script. Each one can now be overridden through an environment
# variable so credentials do not have to live in source control; consider
# rotating the committed ones and dropping the defaults.
BASE = os.environ.get("E2E_BASE_URL", "http://127.0.0.1:5005")
API = f"{BASE}/api"

DB = dict(
    host=os.environ.get("E2E_DB_HOST", "123.60.180.165"),
    port=int(os.environ.get("E2E_DB_PORT", "3306")),
    user=os.environ.get("E2E_DB_USER", "aidopremote"),
    password=os.environ.get("E2E_DB_PASSWORD", "1234567890aiDOP#"),
    database=os.environ.get("E2E_DB_NAME", "aidopdev"),
    cursorclass=pymysql.cursors.DictCursor,
    autocommit=True,
)

ACCOUNT = os.environ.get("E2E_ACCOUNT", "superAdmin.NET")
PASSWORD = os.environ.get("E2E_PASSWORD", "1234567890dop")
# Non-admin account used for the permission check.
NORMAL_ACCOUNT = os.environ.get("E2E_NORMAL_ACCOUNT", "Demo01")
NORMAL_PASSWORD = os.environ.get("E2E_NORMAL_PASSWORD", "1234567890dop")

# Seconds to wait for any single HTTP call; without a timeout `requests`
# blocks forever when the server stalls.
HTTP_TIMEOUT = float(os.environ.get("E2E_HTTP_TIMEOUT", "60"))

SUPER_ADMIN_ID = 1300000000101  # superAdmin.NET (initiator + first approver)
ESCALATE_TARGET_ID = 1300000000111  # Admin.NET (escalation target)
ESCALATE_TARGET_NAME = "系统管理员"

SUFFIX = int(time.time())

# Numeric codes, as named by the check labels used throughout this script.
TASK_PENDING = 0
TASK_APPROVED = 1
TASK_REJECTED = 2
TASK_ESCALATED = 6
INSTANCE_RUNNING = 1
INSTANCE_APPROVED = 2
INSTANCE_REJECTED = 3
LOG_AUTO_TIMEOUT = 10
LOG_ESCALATE = 11

# (scenario name, BizType, timeoutAction, enableManualEscalation)
FLOW_CASES: tuple[tuple[str, str, str | None, bool], ...] = (
    ("notify", f"EscE2E_N_{SUFFIX}", "Notify", False),
    ("auto-approve", f"EscE2E_A_{SUFFIX}", "AutoApprove", False),
    ("auto-reject", f"EscE2E_R_{SUFFIX}", "AutoReject", False),
    ("auto-escalate", f"EscE2E_E_{SUFFIX}", "AutoEscalate", False),
    ("manual-escalate", f"EscE2E_M_{SUFFIX}", None, True),
)
# Scenarios whose pending task is back-dated so the timeout scan picks it up.
TIMEOUT_SCENARIOS = ("notify", "auto-approve", "auto-reject", "auto-escalate")

# Accumulated (status, name, detail) triples; summarised at the end of main().
checks: list[tuple[str, str, str]] = []


def check(name: str, ok: bool, detail: str = "") -> None:
    """Record one PASS/FAIL result and echo it to stdout.

    Args:
        name: Human-readable label of the check.
        ok: Truthy for PASS, falsy for FAIL.
        detail: Optional extra context appended to the printed line.
    """
    status = "PASS" if ok else "FAIL"
    checks.append((status, name, detail))
    print(f"[{status}] {name}" + (f" — {detail}" if detail else ""))


# ───── HTTP ─────

def _ensure_success(body: dict, message: object) -> None:
    """Raise AssertionError(message) unless the API envelope has code 200.

    An explicit raise is used instead of `assert` so the validation still
    runs under `python -O`.
    """
    if body.get("code") != 200:
        raise AssertionError(message)


def server_encrypt(plain: str) -> str:
    """Ask the server to encrypt `plain` and return the cipher text.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        AssertionError: when the response envelope's code is not 200.
    """
    url = f"{API}/sysCommon/encryptPlainText/{urllib.parse.quote(plain, safe='')}"
    r = requests.post(url, timeout=HTTP_TIMEOUT)
    r.raise_for_status()
    body = r.json()
    _ensure_success(body, body)
    return body["result"]


def login(account: str = ACCOUNT, password: str = PASSWORD) -> str:
    """Log in and return the access token.

    The password is first encrypted via `server_encrypt`.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        AssertionError: when the response envelope's code is not 200.
    """
    enc = server_encrypt(password)
    r = requests.post(
        f"{API}/sysAuth/login",
        json={"account": account, "password": enc},
        timeout=HTTP_TIMEOUT,
    )
    r.raise_for_status()
    body = r.json()
    _ensure_success(body, body)
    return body["result"]["accessToken"]


def api_post(token: str, path: str, payload: dict | None = None) -> Any:
    """POST `payload` (or `{}`) to `path` and return the envelope's `result`.

    The result is whatever JSON value the endpoint returns; callers in this
    script receive both integers and objects.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        AssertionError: when the response envelope's code is not 200.
    """
    r = requests.post(
        f"{API}{path}",
        json=payload or {},
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        timeout=HTTP_TIMEOUT,
    )
    r.raise_for_status()
    body = r.json()
    _ensure_success(body, f"{path} failed: {body}")
    return body.get("result")


# ───── Flow definition generation ─────

def make_flow_json(timeout_action: str | None, enable_manual: bool = False) -> str:
    """Build a single-node flow definition: start -> approval -> end.

    Args:
        timeout_action: Timeout action for the approval node. When None (or
            empty) no timeout is configured; used by the manual-escalation
            scenario.
        enable_manual: Whether to enable manual escalation on the node.

    Returns:
        The flow definition serialised as JSON (non-ASCII kept as-is).
    """
    node_props: dict[str, object] = {
        "nodeName": "审批节点",
        "approverType": "SpecificUser",
        "approverIds": str(SUPER_ADMIN_ID),
        "approverNames": "超级管理员",
        "multiApproveMode": "Any",
    }
    if timeout_action:
        node_props["timeoutHours"] = 1
        node_props["timeoutAction"] = timeout_action
    # Both automatic and manual escalation need an escalation target.
    if timeout_action == "AutoEscalate" or enable_manual:
        node_props["escalationApproverType"] = "SpecificUser"
        node_props["escalationApproverIds"] = str(ESCALATE_TARGET_ID)
        node_props["escalationApproverNames"] = ESCALATE_TARGET_NAME
    if enable_manual:
        node_props["enableManualEscalation"] = True

    return json.dumps({
        "nodes": [
            {"id": "node_start", "type": "bpmn:startEvent", "x": 100, "y": 200,
             "properties": {}, "text": {"x": 100, "y": 200, "value": "开始"}},
            {"id": "node_task", "type": "bpmn:userTask", "x": 300, "y": 200,
             "properties": node_props, "text": {"x": 300, "y": 200, "value": "审批节点"}},
            {"id": "node_end", "type": "bpmn:endEvent", "x": 500, "y": 200,
             "properties": {}, "text": {"x": 500, "y": 200, "value": "结束"}},
        ],
        "edges": [
            {"id": "e1", "type": "bpmn:sequenceFlow",
             "sourceNodeId": "node_start", "targetNodeId": "node_task",
             "startPoint": {"x": 120, "y": 200}, "endPoint": {"x": 280, "y": 200},
             "pointsList": []},
            {"id": "e2", "type": "bpmn:sequenceFlow",
             "sourceNodeId": "node_task", "targetNodeId": "node_end",
             "startPoint": {"x": 320, "y": 200}, "endPoint": {"x": 480, "y": 200},
             "pointsList": []},
        ],
    }, ensure_ascii=False)


# ───── Database helpers ─────

def insert_flow(db: pymysql.connections.Connection, name: str, biz_type: str,
                timeout_action: str | None, enable_manual: bool = False) -> int:
    """Insert a temporary published flow definition and return its Id.

    The Id is generated by MySQL (`UUID_SHORT()`), so it is read back by
    selecting the newest row for `biz_type`.

    Raises:
        RuntimeError: if the inserted row cannot be read back.
    """
    flow_json = make_flow_json(timeout_action, enable_manual)
    with db.cursor() as c:
        c.execute(
            """
            INSERT INTO ApprovalFlow
                (Id, Code, Name, FormJson, FlowJson, Status, OrgId, IsDelete,
                 CreateTime, UpdateTime, CreateUserId, CreateUserName, BizType,
                 Version, IsPublished)
            VALUES
                (UUID_SHORT(), %s, %s, '[]', %s, 1, 0, 0,
                 NOW(), NOW(), %s, 'E2E-Escalation', %s,
                 1, 1)
            """,
            (f"ESC_{biz_type}"[:32], f"升级E2E-{name}"[:32], flow_json, SUPER_ADMIN_ID, biz_type),
        )
        c.execute("SELECT Id FROM ApprovalFlow WHERE BizType=%s ORDER BY Id DESC LIMIT 1", (biz_type,))
        row = c.fetchone()
    if row is None:
        raise RuntimeError(f"ApprovalFlow row for BizType={biz_type} not found after INSERT")
    return int(row["Id"])


def fetch_first_pending_task(db: pymysql.connections.Connection, instance_id: int) -> dict | None:
    """Return the lowest-Id task with Status=0 of the instance, or None."""
    with db.cursor() as c:
        c.execute(
            "SELECT Id, AssigneeId, Status, CreateTime FROM ApprovalFlowTask "
            "WHERE InstanceId=%s AND Status=0 ORDER BY Id LIMIT 1",
            (instance_id,),
        )
        return c.fetchone()


def rollback_task_time(db: pymysql.connections.Connection, task_id: int, hours: int = 2) -> None:
    """Set the task's CreateTime to `hours` hours before the DB's NOW()."""
    with db.cursor() as c:
        c.execute(
            "UPDATE ApprovalFlowTask SET CreateTime = DATE_SUB(NOW(), INTERVAL %s HOUR) WHERE Id=%s",
            (hours, task_id),
        )


def fetch_task_state(db: pymysql.connections.Connection, task_id: int) -> dict | None:
    """Return Id/Status/Comment/ActionTime of a task, or None if missing."""
    with db.cursor() as c:
        c.execute("SELECT Id, Status, Comment, ActionTime FROM ApprovalFlowTask WHERE Id=%s", (task_id,))
        return c.fetchone()


def fetch_instance_state(db: pymysql.connections.Connection, instance_id: int) -> dict | None:
    """Return Id/Status/EndTime/CurrentNodeId of an instance, or None."""
    with db.cursor() as c:
        c.execute("SELECT Id, Status, EndTime, CurrentNodeId FROM ApprovalFlowInstance WHERE Id=%s", (instance_id,))
        return c.fetchone()


def fetch_pending_tasks(db: pymysql.connections.Connection, instance_id: int,
                        exclude_task_id: int | None = None) -> list[dict]:
    """Return all Status=0 tasks of an instance, optionally skipping one Id.

    Args:
        db: Open connection.
        instance_id: Instance whose tasks are listed.
        exclude_task_id: Task Id to leave out; None disables the filter.
    """
    sql = "SELECT Id, AssigneeId, AssigneeName FROM ApprovalFlowTask WHERE InstanceId=%s AND Status=0"
    params: list = [instance_id]
    # `is not None` rather than truthiness, so an Id of 0 is still excluded.
    if exclude_task_id is not None:
        sql += " AND Id<>%s"
        params.append(exclude_task_id)
    with db.cursor() as c:
        c.execute(sql, tuple(params))
        return list(c.fetchall())


def fetch_log_actions(db: pymysql.connections.Connection, instance_id: int) -> list[int]:
    """Return the Action codes logged for an instance, ordered by log Id."""
    with db.cursor() as c:
        c.execute("SELECT Action FROM ApprovalFlowLog WHERE InstanceId=%s ORDER BY Id", (instance_id,))
        return [int(r["Action"]) for r in c.fetchall()]


# ───── Scenario helpers ─────

def start(token: str, biz_type: str, biz_id: int, title: str) -> int:
    """Start a flow instance through the API and return its Id."""
    return int(api_post(token, "/flowInstance/start", {
        "bizType": biz_type,
        "bizId": biz_id,
        "bizNo": f"E2E_{biz_id}",
        "title": title,
    }))


def _require(row: dict | None, what: str) -> dict:
    """Return `row`, or raise AssertionError when the lookup found nothing."""
    if not row:
        raise AssertionError(f"{what} 记录不存在")
    return row


# ───── Cleanup ─────

def cleanup(db: pymysql.connections.Connection, flow_ids: list[int], instance_ids: list[int]) -> None:
    """Delete everything this script created.

    Rows that reference the instances are removed before the instances
    themselves; flow definitions go last. Empty id lists are skipped.
    """
    # (table, column the id list is matched against), in deletion order.
    instance_tables = (
        ("ApprovalFlowLog", "InstanceId"),
        ("ApprovalFlowTask", "InstanceId"),
        ("ApprovalFlowCompletedNode", "InstanceId"),
        ("ApprovalFlowInstance", "Id"),
    )
    with db.cursor() as c:
        if instance_ids:
            marks = ",".join(["%s"] * len(instance_ids))
            for table, column in instance_tables:
                # Only the fixed table/column names are interpolated; the ids
                # are passed as query parameters.
                c.execute(f"DELETE FROM {table} WHERE {column} IN ({marks})", tuple(instance_ids))
        if flow_ids:
            marks = ",".join(["%s"] * len(flow_ids))
            c.execute(f"DELETE FROM ApprovalFlow WHERE Id IN ({marks})", tuple(flow_ids))


# ───── main and its steps ─────

def _create_flows(db: pymysql.connections.Connection,
                  created_flows: list[int]) -> dict[str, tuple[int, str]]:
    """Insert one flow definition per scenario (one BizType each).

    Ids are appended to `created_flows` as they are created so the caller's
    cleanup still sees them if a later insert fails.
    """
    flow_map: dict[str, tuple[int, str]] = {}
    for name, biz_type, timeout_action, enable_manual in FLOW_CASES:
        fid = insert_flow(db, name, biz_type, timeout_action, enable_manual)
        created_flows.append(fid)
        flow_map[name] = (fid, biz_type)
    check("预建 5 条临时审批流定义", len(created_flows) == 5, f"flow_ids={created_flows}")
    return flow_map


def _start_instances(token: str, flow_map: dict[str, tuple[int, str]],
                     created_instances: list[int]) -> dict[str, int]:
    """Start one instance per flow; Ids are appended to `created_instances`."""
    instance_map: dict[str, int] = {}
    for name, (fid, biz_type) in flow_map.items():
        iid = start(token, biz_type, biz_id=fid, title=f"E2E-{name}-{SUFFIX}")
        instance_map[name] = iid
        created_instances.append(iid)
    check("5 个实例成功发起", len(instance_map) == 5, f"instance_ids={created_instances}")
    return instance_map


def _collect_pending_tasks(db: pymysql.connections.Connection,
                           instance_map: dict[str, int]) -> dict[str, int]:
    """Fetch each instance's pending task and back-date the timeout ones.

    Only the four timeout scenarios are moved back 2 hours; the
    manual-escalation task keeps its CreateTime.
    """
    task_map: dict[str, int] = {}
    rolled_back: list[int] = []
    for name, instance_id in instance_map.items():
        task = fetch_first_pending_task(db, instance_id)
        if not task:
            raise AssertionError(f"{name} 未生成 Pending task")
        task_map[name] = int(task["Id"])
        if name in TIMEOUT_SCENARIOS:
            rollback_task_time(db, task_map[name], hours=2)
            rolled_back.append(task_map[name])
    check("4 个超时场景 task.CreateTime 已回拨 2 小时",
          len(rolled_back) == len(TIMEOUT_SCENARIOS), f"task_ids={rolled_back}")
    return task_map


def _verify_notify(db: pymysql.connections.Connection, token: str,
                   task_id: int, instance_id: int) -> None:
    """Notify: task stays pending, one timeout log, and a rescan adds none."""
    state = _require(fetch_task_state(db, task_id), "notify task")
    check("Notify: task 保持 Pending(0)", state["Status"] == TASK_PENDING, f"status={state['Status']}")
    logs_first = fetch_log_actions(db, instance_id)
    check("Notify: 产生 AutoTimeout(10) 日志", LOG_AUTO_TIMEOUT in logs_first, f"logs={logs_first}")

    # Idempotency: a second scan must not write another Notify log entry.
    api_post(token, "/flowTask/triggerTimeoutScan")
    logs_second = fetch_log_actions(db, instance_id)
    count_first = logs_first.count(LOG_AUTO_TIMEOUT)
    count_second = logs_second.count(LOG_AUTO_TIMEOUT)
    check("Notify 幂等:二次扫描不重复写日志", count_first == count_second,
          f"first={count_first} second={count_second}")


def _verify_auto_approve(db: pymysql.connections.Connection, task_id: int, instance_id: int) -> None:
    """AutoApprove: task and instance both end up approved, with a log."""
    task = _require(fetch_task_state(db, task_id), "auto-approve task")
    check("AutoApprove: task 变 Approved(1)", task["Status"] == TASK_APPROVED, f"status={task['Status']}")
    inst = _require(fetch_instance_state(db, instance_id), "auto-approve instance")
    # The single approval node is followed by the endEvent, so approving the
    # task is expected to complete the whole instance.
    check("AutoApprove: instance 变 Approved(2)", inst["Status"] == INSTANCE_APPROVED, f"status={inst['Status']}")
    logs = fetch_log_actions(db, instance_id)
    check("AutoApprove: 产生 AutoTimeout(10) 日志", LOG_AUTO_TIMEOUT in logs, f"logs={logs}")


def _verify_auto_reject(db: pymysql.connections.Connection, task_id: int, instance_id: int) -> None:
    """AutoReject: task and instance rejected, instance EndTime written."""
    task = _require(fetch_task_state(db, task_id), "auto-reject task")
    check("AutoReject: task 变 Rejected(2)", task["Status"] == TASK_REJECTED, f"status={task['Status']}")
    inst = _require(fetch_instance_state(db, instance_id), "auto-reject instance")
    check("AutoReject: instance 变 Rejected(3)", inst["Status"] == INSTANCE_REJECTED, f"status={inst['Status']}")
    check("AutoReject: EndTime 已写入", inst["EndTime"] is not None)


def _check_escalation_handoff(db: pymysql.connections.Connection, label: str,
                              task_id: int, instance_id: int) -> None:
    """Check the original task is Escalated and the target got a new task."""
    task = _require(fetch_task_state(db, task_id), f"{label} task")
    check(f"{label}: 原 task 变 Escalated(6)", task["Status"] == TASK_ESCALATED, f"status={task['Status']}")
    new_tasks = fetch_pending_tasks(db, instance_id, exclude_task_id=task_id)
    has_target = any(int(t["AssigneeId"]) == ESCALATE_TARGET_ID for t in new_tasks)
    check(f"{label}: 升级目标生成新 Pending 任务", has_target,
          f"newTasks={[(t['Id'], t['AssigneeId'], t['AssigneeName']) for t in new_tasks]}")


def _verify_auto_escalate(db: pymysql.connections.Connection, task_id: int, instance_id: int) -> None:
    """AutoEscalate: task handed off, timeout logged, instance still running."""
    _check_escalation_handoff(db, "AutoEscalate", task_id, instance_id)
    logs = fetch_log_actions(db, instance_id)
    check("AutoEscalate: 产生 AutoTimeout(10) 日志", LOG_AUTO_TIMEOUT in logs, f"logs={logs}")
    inst = _require(fetch_instance_state(db, instance_id), "auto-escalate instance")
    check("AutoEscalate: instance 仍 Running(1)", inst["Status"] == INSTANCE_RUNNING, f"status={inst['Status']}")


def _verify_manual_escalate(db: pymysql.connections.Connection, token: str,
                            task_id: int, instance_id: int) -> None:
    """Manual escalation: config says enabled, then /flowTask/escalate works."""
    # First confirm getEscalationConfig reports enabled=true.
    cfg_r = requests.get(
        f"{API}/flowTask/getEscalationConfig",
        params={"taskId": task_id},
        headers={"Authorization": f"Bearer {token}"},
        timeout=HTTP_TIMEOUT,
    )
    try:
        cfg = cfg_r.json().get("result") or {}
    except ValueError:
        # Non-JSON body: record it as a failed check instead of aborting.
        cfg = {}
    check("Manual: getEscalationConfig 返回 enabled=true", cfg.get("enabled") is True, f"cfg={cfg}")

    api_post(token, "/flowTask/escalate", {"taskId": task_id, "comment": "E2E 手动升级"})
    _check_escalation_handoff(db, "Manual", task_id, instance_id)
    logs = fetch_log_actions(db, instance_id)
    check("Manual: 产生 Escalate(11) 日志", LOG_ESCALATE in logs, f"logs={logs}")


def _verify_permission() -> None:
    """A non-admin (Demo01 = NormalUser 777) must not trigger the scan."""
    label = "权限: 非管理员调用 triggerTimeoutScan 被拒"
    try:
        user_token = login(NORMAL_ACCOUNT, NORMAL_PASSWORD)
        r = requests.post(
            f"{API}/flowTask/triggerTimeoutScan",
            headers={"Authorization": f"Bearer {user_token}"},
            timeout=HTTP_TIMEOUT,
        )
        body = r.json()
        # Expect code != 200 and a message that mentions "管理员".
        message = body.get("message")
        ok = body.get("code") != 200 and bool(message) and "管理员" in message
        check(label, ok, f"body={body}")
    except Exception as e:
        check(label, False, f"exception={e}")


def _print_summary() -> int:
    """Print the PASS/FAIL summary and return the process exit code."""
    print("\n================ 汇总 ================")
    passed = sum(1 for status, _, _ in checks if status == "PASS")
    failures = [(name, detail) for status, name, detail in checks if status == "FAIL"]
    print(f"PASS: {passed} / TOTAL: {len(checks)}")
    if failures:
        print("\n失败项:")
        for name, detail in failures:
            print(f" - {name}: {detail}")
        return 1
    print("全部通过")
    return 0


def main() -> int:
    """Run every escalation scenario and return 0 on success, 1 otherwise.

    Temporary flows, instances, tasks and logs are removed in a `finally`
    block, so cleanup also runs when a step raises.
    """
    try:
        token = login()
    except Exception as e:
        print(f"[FAIL] login: {e}")
        return 1
    print("[PASS] login OK")

    try:
        db = pymysql.connect(**DB)
    except pymysql.MySQLError as e:
        print(f"[FAIL] db connect: {e}")
        return 1

    created_flows: list[int] = []
    created_instances: list[int] = []
    try:
        flow_map = _create_flows(db, created_flows)
        instance_map = _start_instances(token, flow_map, created_instances)
        task_map = _collect_pending_tasks(db, instance_map)

        # First scan.
        processed = api_post(token, "/flowTask/triggerTimeoutScan")
        check("triggerTimeoutScan 返回处理条数 >=4",
              isinstance(processed, int) and processed >= 4, f"processed={processed}")

        _verify_notify(db, token, task_map["notify"], instance_map["notify"])
        _verify_auto_approve(db, task_map["auto-approve"], instance_map["auto-approve"])
        _verify_auto_reject(db, task_map["auto-reject"], instance_map["auto-reject"])
        _verify_auto_escalate(db, task_map["auto-escalate"], instance_map["auto-escalate"])
        _verify_manual_escalate(db, token, task_map["manual-escalate"], instance_map["manual-escalate"])
        _verify_permission()
    finally:
        try:
            cleanup(db, created_flows, created_instances)
            print("[INFO] 清理临时流程/实例/任务/日志完成")
        except Exception as e:
            # Best effort: a failed cleanup must not mask the test outcome.
            print(f"[WARN] 清理失败: {e}")
        db.close()

    return _print_summary()


if __name__ == "__main__":
    sys.exit(main())