| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- """审批流升级机制 E2E 验证(Batch 5)。
- 覆盖:
- 1. 超时自动处理(4 种动作):Notify / AutoApprove / AutoReject / AutoEscalate
- 2. 手动升级:Escalate API
- 3. 新增管理员接口 `/api/flowTask/triggerTimeoutScan` 的权限与幂等性
- 实现方式:
- - 直接 INSERT 临时审批流定义(5 个 BizType,跑完脚本后自动清理)
- - 通过 API 起流程、取 pending task
- - 直接 UPDATE task.CreateTime 将其回拨 2 小时
- - 调 triggerTimeoutScan 立刻执行一次
- - 断言 task/instance/log 表的状态
- """
import json
import os
import sys
import time
import urllib.parse

import pymysql
import pymysql.cursors
import requests
# Service under test. Override with E2E_BASE_URL to target another deployment.
BASE = os.environ.get("E2E_BASE_URL", "http://127.0.0.1:5005")
API = f"{BASE}/api"

# Connection settings passed verbatim to pymysql.connect(**DB).
# NOTE(security): the literal defaults below are kept only so the script still
# runs unchanged when no environment variables are set. Prefer supplying
# E2E_DB_* through the environment, and rotate these credentials, since they
# are present in source control.
DB = dict(
    host=os.environ.get("E2E_DB_HOST", "123.60.180.165"),
    port=int(os.environ.get("E2E_DB_PORT", "3306")),
    user=os.environ.get("E2E_DB_USER", "aidopremote"),
    password=os.environ.get("E2E_DB_PASSWORD", "1234567890aiDOP#"),
    database=os.environ.get("E2E_DB_NAME", "aidopdev"),
    cursorclass=pymysql.cursors.DictCursor,  # rows come back as dicts
    autocommit=True,  # every statement is committed immediately
)

# Default login used by login(); overridable via E2E_ACCOUNT / E2E_PASSWORD.
ACCOUNT = os.environ.get("E2E_ACCOUNT", "superAdmin.NET")
PASSWORD = os.environ.get("E2E_PASSWORD", "1234567890dop")

SUPER_ADMIN_ID = 1300000000101  # superAdmin.NET (initiator + first approver)
ESCALATE_TARGET_ID = 1300000000111  # Admin.NET (escalation target)
ESCALATE_TARGET_NAME = "系统管理员"

# Per-run suffix (epoch seconds) used to build unique BizType values and titles.
SUFFIX = int(time.time())
# Accumulated (status, name, detail) results, in the order they were recorded.
checks: list[tuple[str, str, str]] = []


def check(name: str, ok: bool, detail: str = "") -> None:
    """Record one verification result and echo it to stdout.

    Args:
        name: Human-readable label of the check.
        ok: Truthy for a pass, falsy for a failure.
        detail: Optional extra context; appended to the printed line after
            an em dash when non-empty.
    """
    verdict = "FAIL"
    if ok:
        verdict = "PASS"
    checks.append((verdict, name, detail))

    line = f"[{verdict}] {name}"
    if detail:
        line += f" — {detail}"
    print(line)
- # ───── HTTP ─────
def server_encrypt(plain: str, timeout: float = 30.0) -> str:
    """Ask the server to encrypt *plain* and return the encrypted text.

    The plaintext is URL-quoted (with no characters treated as safe) and sent
    as the last path segment of ``/sysCommon/encryptPlainText``.

    Args:
        plain: Text to encrypt.
        timeout: Seconds to wait for the HTTP call before giving up, so an
            unresponsive server cannot hang the script indefinitely.

    Returns:
        The ``result`` field of the JSON response.

    Raises:
        requests.HTTPError: On a non-2xx HTTP status.
        AssertionError: When the response ``code`` field is not 200.
    """
    quoted = urllib.parse.quote(plain, safe='')
    r = requests.post(f"{API}/sysCommon/encryptPlainText/{quoted}", timeout=timeout)
    r.raise_for_status()
    body = r.json()
    # Raised explicitly (rather than via `assert`) so the check is not
    # stripped when Python runs with -O.
    if body.get("code") != 200:
        raise AssertionError(body)
    return body["result"]
def login(account: str = ACCOUNT, password: str = PASSWORD, timeout: float = 30.0) -> str:
    """Log in through ``/sysAuth/login`` and return the access token.

    The password is first passed through ``server_encrypt`` and the encrypted
    value is what gets posted.

    Args:
        account: Account name to log in with.
        password: Plaintext password of that account.
        timeout: Seconds to wait for the login HTTP call before giving up.

    Returns:
        ``result.accessToken`` from the JSON response.

    Raises:
        requests.HTTPError: On a non-2xx HTTP status.
        AssertionError: When the response ``code`` field is not 200.
    """
    enc = server_encrypt(password)
    r = requests.post(
        f"{API}/sysAuth/login",
        json={"account": account, "password": enc},
        timeout=timeout,
    )
    r.raise_for_status()
    body = r.json()
    # Raised explicitly (rather than via `assert`) so the check is not
    # stripped when Python runs with -O.
    if body.get("code") != 200:
        raise AssertionError(body)
    return body["result"]["accessToken"]
def api_post(
    token: str,
    path: str,
    payload: dict | None = None,
    timeout: float = 30.0,
) -> dict | list | str | int | float | bool | None:
    """POST a JSON payload to ``{API}{path}`` with a bearer token.

    Args:
        token: Access token placed in the ``Authorization: Bearer`` header.
        path: Path appended to ``API`` (e.g. ``"/flowTask/escalate"``).
        payload: JSON body; ``None`` (or any falsy value) sends ``{}``.
        timeout: Seconds to wait for the HTTP call before giving up.

    Returns:
        The ``result`` field of the JSON response, or ``None`` when absent.
        The value is whatever JSON type the endpoint produced; callers in
        this script receive both integers and objects from it.

    Raises:
        requests.HTTPError: On a non-2xx HTTP status.
        AssertionError: When the response ``code`` field is not 200.
    """
    r = requests.post(
        f"{API}{path}",
        json=payload or {},
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        timeout=timeout,
    )
    r.raise_for_status()
    body = r.json()
    # Raised explicitly (rather than via `assert`) so a failed call cannot be
    # silently accepted when Python runs with -O.
    if body.get("code") != 200:
        raise AssertionError(f"{path} failed: {body}")
    return body.get("result")
- # ───── 流程定义生成 ─────
def make_flow_json(timeout_action: str | None,
                   enable_manual: bool = False) -> str:
    """Serialize a single-node flow definition: start -> approval -> end.

    Args:
        timeout_action: Value stored as the approval node's ``timeoutAction``
            together with ``timeoutHours = 1``. When ``None`` (or empty) no
            timeout properties are written, which is what the manual
            escalation scenario uses.
        enable_manual: When true, writes ``enableManualEscalation`` and the
            escalation approver properties onto the approval node.

    Returns:
        A JSON string (non-ASCII characters kept as-is) with ``nodes`` and
        ``edges`` lists.
    """
    row_y = 200  # all nodes and edge endpoints sit on the same horizontal line

    def _node(node_id: str, node_type: str, x: int, label: str, properties: dict) -> dict:
        # One diagram node; the text label is anchored at the node position.
        return {
            "id": node_id,
            "type": node_type,
            "x": x,
            "y": row_y,
            "properties": properties,
            "text": {"x": x, "y": row_y, "value": label},
        }

    def _edge(edge_id: str, source: str, target: str, start_x: int, end_x: int) -> dict:
        # One sequence flow between two nodes.
        return {
            "id": edge_id,
            "type": "bpmn:sequenceFlow",
            "sourceNodeId": source,
            "targetNodeId": target,
            "startPoint": {"x": start_x, "y": row_y},
            "endPoint": {"x": end_x, "y": row_y},
            "pointsList": [],
        }

    approval = {
        "nodeName": "审批节点",
        "approverType": "SpecificUser",
        "approverIds": str(SUPER_ADMIN_ID),
        "approverNames": "超级管理员",
        "multiApproveMode": "Any",
    }
    if timeout_action:
        approval.update(timeoutHours=1, timeoutAction=timeout_action)
    # An escalation target is needed both for automatic and manual escalation.
    if enable_manual or timeout_action == "AutoEscalate":
        approval.update(
            escalationApproverType="SpecificUser",
            escalationApproverIds=str(ESCALATE_TARGET_ID),
            escalationApproverNames=ESCALATE_TARGET_NAME,
        )
    if enable_manual:
        approval.update(enableManualEscalation=True)

    definition = {
        "nodes": [
            _node("node_start", "bpmn:startEvent", 100, "开始", {}),
            _node("node_task", "bpmn:userTask", 300, "审批节点", approval),
            _node("node_end", "bpmn:endEvent", 500, "结束", {}),
        ],
        "edges": [
            _edge("e1", "node_start", "node_task", 120, 280),
            _edge("e2", "node_task", "node_end", 320, 480),
        ],
    }
    return json.dumps(definition, ensure_ascii=False)
- # ───── 数据库辅助 ─────
def insert_flow(db: pymysql.connections.Connection, name: str, biz_type: str,
                timeout_action: str | None, enable_manual: bool = False) -> int:
    """Insert one temporary ApprovalFlow row and return its Id.

    The row's Id is generated by MySQL ``UUID_SHORT()``, so it is read back
    afterwards by selecting the highest Id for the given ``BizType``.

    Args:
        db: Open connection whose cursors return dict rows.
        name: Scenario name; used (truncated to 32 chars) in the flow Name.
        biz_type: Value for the BizType column; also used (truncated to
            32 chars) to build the flow Code.
        timeout_action: Forwarded to ``make_flow_json``.
        enable_manual: Forwarded to ``make_flow_json``.

    Returns:
        Id of the most recent ApprovalFlow row with this ``biz_type``.
    """
    insert_sql = """
            INSERT INTO ApprovalFlow
            (Id, Code, Name, FormJson, FlowJson, Status, OrgId, IsDelete,
            CreateTime, UpdateTime, CreateUserId, CreateUserName,
            BizType, Version, IsPublished)
            VALUES (UUID_SHORT(), %s, %s, '[]', %s, 1, 0, 0,
            NOW(), NOW(), %s, 'E2E-Escalation',
            %s, 1, 1)
            """
    lookup_sql = "SELECT Id FROM ApprovalFlow WHERE BizType=%s ORDER BY Id DESC LIMIT 1"

    code = f"ESC_{biz_type}"[:32]
    display_name = f"升级E2E-{name}"[:32]
    definition = make_flow_json(timeout_action, enable_manual)

    with db.cursor() as cur:
        cur.execute(insert_sql, (code, display_name, definition, SUPER_ADMIN_ID, biz_type))
        cur.execute(lookup_sql, (biz_type,))
        newest = cur.fetchone()
    return int(newest["Id"])
def fetch_first_pending_task(db: pymysql.connections.Connection, instance_id: int) -> dict | None:
    """Return the lowest-Id task with ``Status=0`` for an instance.

    Args:
        db: Open connection whose cursors return dict rows.
        instance_id: Value matched against ``ApprovalFlowTask.InstanceId``.

    Returns:
        A row with ``Id``, ``AssigneeId``, ``Status`` and ``CreateTime``,
        or ``None`` when no such task exists.
    """
    query = (
        "SELECT Id, AssigneeId, Status, CreateTime FROM ApprovalFlowTask "
        "WHERE InstanceId=%s AND Status=0 ORDER BY Id LIMIT 1"
    )
    with db.cursor() as cur:
        cur.execute(query, (instance_id,))
        oldest = cur.fetchone()
    return oldest
def rollback_task_time(db: pymysql.connections.Connection, task_id: int, hours: int = 2) -> None:
    """Set a task's ``CreateTime`` to *hours* hours before the database's NOW().

    Args:
        db: Open connection.
        task_id: ``ApprovalFlowTask.Id`` of the row to update.
        hours: How far back to move ``CreateTime``.
    """
    statement = "UPDATE ApprovalFlowTask SET CreateTime = DATE_SUB(NOW(), INTERVAL %s HOUR) WHERE Id=%s"
    args = (hours, task_id)
    with db.cursor() as cur:
        cur.execute(statement, args)
def fetch_task_state(db: pymysql.connections.Connection, task_id: int) -> dict | None:
    """Load the state columns of one ApprovalFlowTask row.

    Args:
        db: Open connection whose cursors return dict rows.
        task_id: ``ApprovalFlowTask.Id`` to look up.

    Returns:
        A row with ``Id``, ``Status``, ``Comment`` and ``ActionTime``,
        or ``None`` when the task does not exist.
    """
    query = "SELECT Id, Status, Comment, ActionTime FROM ApprovalFlowTask WHERE Id=%s"
    with db.cursor() as cur:
        cur.execute(query, (task_id,))
        task_row = cur.fetchone()
    return task_row
def fetch_instance_state(db: pymysql.connections.Connection, instance_id: int) -> dict | None:
    """Load the state columns of one ApprovalFlowInstance row.

    Args:
        db: Open connection whose cursors return dict rows.
        instance_id: ``ApprovalFlowInstance.Id`` to look up.

    Returns:
        A row with ``Id``, ``Status``, ``EndTime`` and ``CurrentNodeId``,
        or ``None`` when the instance does not exist.
    """
    query = "SELECT Id, Status, EndTime, CurrentNodeId FROM ApprovalFlowInstance WHERE Id=%s"
    with db.cursor() as cur:
        cur.execute(query, (instance_id,))
        instance_row = cur.fetchone()
    return instance_row
def fetch_pending_tasks(db: pymysql.connections.Connection, instance_id: int, exclude_task_id: int | None = None) -> list[dict]:
    """List the tasks with ``Status=0`` for an instance.

    Args:
        db: Open connection whose cursors return dict rows.
        instance_id: Value matched against ``ApprovalFlowTask.InstanceId``.
        exclude_task_id: When given, the task with this Id is left out of
            the result. ``None`` means no exclusion.

    Returns:
        Rows with ``Id``, ``AssigneeId`` and ``AssigneeName`` (possibly empty).
    """
    sql = "SELECT Id, AssigneeId, AssigneeName FROM ApprovalFlowTask WHERE InstanceId=%s AND Status=0"
    params: list = [instance_id]
    # Compare against None explicitly so that an Id of 0 is still excluded
    # instead of being mistaken for "no exclusion".
    if exclude_task_id is not None:
        sql += " AND Id<>%s"
        params.append(exclude_task_id)
    with db.cursor() as c:
        c.execute(sql, tuple(params))
        return list(c.fetchall())
def fetch_log_actions(db: pymysql.connections.Connection, instance_id: int) -> list[int]:
    """Return the ``Action`` codes logged for an instance, ordered by log Id.

    Args:
        db: Open connection whose cursors return dict rows.
        instance_id: Value matched against ``ApprovalFlowLog.InstanceId``.

    Returns:
        One integer per ApprovalFlowLog row; empty when nothing was logged.
    """
    query = "SELECT Action FROM ApprovalFlowLog WHERE InstanceId=%s ORDER BY Id"
    with db.cursor() as cur:
        cur.execute(query, (instance_id,))
        log_rows = cur.fetchall()
    return [int(entry["Action"]) for entry in log_rows]
- # ───── 场景辅助 ─────
def start(token: str, biz_type: str, biz_id: int, title: str) -> int:
    """Start a flow instance through ``/flowInstance/start``.

    Args:
        token: Access token for the call.
        biz_type: Sent as ``bizType``.
        biz_id: Sent as ``bizId``; also used to build ``bizNo`` (``E2E_<id>``).
        title: Sent as ``title``.

    Returns:
        The API ``result`` converted to ``int``.
    """
    request_body = {
        "bizType": biz_type,
        "bizId": biz_id,
        "bizNo": f"E2E_{biz_id}",
        "title": title,
    }
    created = api_post(token, "/flowInstance/start", request_body)
    return int(created)
- # ───── 清理 ─────
def cleanup(db: pymysql.connections.Connection, flow_ids: list[int], instance_ids: list[int]) -> None:
    """Delete the rows created by this script.

    For the given instances, rows are removed from the log, task and
    completed-node tables before the instance rows themselves; the flow
    definitions are removed last. Empty id lists are skipped.

    Args:
        db: Open connection.
        flow_ids: ``ApprovalFlow.Id`` values to delete.
        instance_ids: ``ApprovalFlowInstance.Id`` values to delete, together
            with their dependent rows.
    """
    # (table, column holding the instance id), in deletion order.
    instance_scoped = (
        ("ApprovalFlowLog", "InstanceId"),
        ("ApprovalFlowTask", "InstanceId"),
        ("ApprovalFlowCompletedNode", "InstanceId"),
        ("ApprovalFlowInstance", "Id"),
    )
    with db.cursor() as cur:
        if instance_ids:
            id_list = ",".join(map(str, instance_ids))
            for table, column in instance_scoped:
                cur.execute(f"DELETE FROM {table} WHERE {column} IN ({id_list})")
        if flow_ids:
            id_list = ",".join(map(str, flow_ids))
            cur.execute(f"DELETE FROM ApprovalFlow WHERE Id IN ({id_list})")
- # ───── main ─────
def main() -> int:
    """Run the escalation E2E scenarios and print a pass/fail summary.

    Steps: log in, insert five temporary flow definitions, start one instance
    per definition, move the CreateTime of the four timeout tasks two hours
    back, trigger the timeout scan, then verify task/instance/log state for
    each scenario, the manual escalation API, and that a non-admin account
    is refused by triggerTimeoutScan. Temporary rows are deleted in a
    ``finally`` block whether or not the scenarios complete.

    Returns:
        0 when every recorded check passed; 1 when login failed or at least
        one check failed. An exception raised inside the scenarios (failed
        assert, API error) propagates after cleanup has run.
    """
    try:
        token = login()
    except Exception as e:
        print(f"[FAIL] login: {e}")
        return 1
    print("[PASS] login OK")
    db = pymysql.connect(**DB)
    created_flows: list[int] = []
    created_instances: list[int] = []
    try:
        # -- Pre-insert 5 flow definitions (one BizType each) --
        cases = [
            ("notify", f"EscE2E_N_{SUFFIX}", "Notify", False),
            ("auto-approve", f"EscE2E_A_{SUFFIX}", "AutoApprove", False),
            ("auto-reject", f"EscE2E_R_{SUFFIX}", "AutoReject", False),
            ("auto-escalate", f"EscE2E_E_{SUFFIX}", "AutoEscalate", False),
            ("manual-escalate", f"EscE2E_M_{SUFFIX}", None, True),
        ]
        flow_map: dict[str, tuple[int, str]] = {}
        for name, biz_type, ta, em in cases:
            fid = insert_flow(db, name, biz_type, ta, em)
            created_flows.append(fid)
            flow_map[name] = (fid, biz_type)
        check("预建 5 条临时审批流定义", len(created_flows) == 5, f"flow_ids={created_flows}")

        # -- Start 5 instances --
        instance_map: dict[str, int] = {}
        for name, (fid, biz_type) in flow_map.items():
            iid = start(token, biz_type, biz_id=fid, title=f"E2E-{name}-{SUFFIX}")
            instance_map[name] = iid
            created_instances.append(iid)
        check("5 个实例成功发起", len(instance_map) == 5, f"instance_ids={created_instances}")

        # -- Fetch the pending task of each instance and move CreateTime back
        #    (only the 4 timeout scenarios; manual escalation is left as-is) --
        task_map: dict[str, int] = {}
        for name in ("notify", "auto-approve", "auto-reject", "auto-escalate", "manual-escalate"):
            t = fetch_first_pending_task(db, instance_map[name])
            assert t, f"{name} 未生成 Pending task"
            task_map[name] = int(t["Id"])
            if name in ("notify", "auto-approve", "auto-reject", "auto-escalate"):
                rollback_task_time(db, task_map[name], hours=2)
        check("4 个超时场景 task.CreateTime 已回拨 2 小时", True)

        # -- First scan trigger --
        processed = api_post(token, "/flowTask/triggerTimeoutScan")
        check("triggerTimeoutScan 返回处理条数 >=4", isinstance(processed, int) and processed >= 4, f"processed={processed}")

        # -- Notify assertions --
        ns = fetch_task_state(db, task_map["notify"])
        assert ns
        check("Notify: task 保持 Pending(0)", ns["Status"] == 0, f"status={ns['Status']}")
        logs_n = fetch_log_actions(db, instance_map["notify"])
        check("Notify: 产生 AutoTimeout(10) 日志", 10 in logs_n, f"logs={logs_n}")

        # Idempotence: a second scan must not write another Notify log entry.
        # The scan's return value is not needed here, only its side effects.
        api_post(token, "/flowTask/triggerTimeoutScan")
        logs_n2 = fetch_log_actions(db, instance_map["notify"])
        notify_count_1 = logs_n.count(10)
        notify_count_2 = logs_n2.count(10)
        check("Notify 幂等:二次扫描不重复写日志", notify_count_1 == notify_count_2,
              f"first={notify_count_1} second={notify_count_2}")

        # -- AutoApprove assertions --
        aa = fetch_task_state(db, task_map["auto-approve"])
        assert aa
        check("AutoApprove: task 变 Approved(1)", aa["Status"] == 1, f"status={aa['Status']}")
        ai = fetch_instance_state(db, instance_map["auto-approve"])
        assert ai
        # The only approval node passes and its successor is the endEvent,
        # so the instance is expected to be Approved(2).
        check("AutoApprove: instance 变 Approved(2)", ai["Status"] == 2, f"status={ai['Status']}")
        logs_a = fetch_log_actions(db, instance_map["auto-approve"])
        check("AutoApprove: 产生 AutoTimeout(10) 日志", 10 in logs_a, f"logs={logs_a}")

        # -- AutoReject assertions --
        ar = fetch_task_state(db, task_map["auto-reject"])
        assert ar
        check("AutoReject: task 变 Rejected(2)", ar["Status"] == 2, f"status={ar['Status']}")
        ri = fetch_instance_state(db, instance_map["auto-reject"])
        assert ri
        check("AutoReject: instance 变 Rejected(3)", ri["Status"] == 3, f"status={ri['Status']}")
        check("AutoReject: EndTime 已写入", ri["EndTime"] is not None)

        # -- AutoEscalate assertions --
        ae = fetch_task_state(db, task_map["auto-escalate"])
        assert ae
        check("AutoEscalate: 原 task 变 Escalated(6)", ae["Status"] == 6, f"status={ae['Status']}")
        new_tasks = fetch_pending_tasks(db, instance_map["auto-escalate"], exclude_task_id=task_map["auto-escalate"])
        has_target = any(int(t["AssigneeId"]) == ESCALATE_TARGET_ID for t in new_tasks)
        check("AutoEscalate: 升级目标生成新 Pending 任务",
              has_target and len(new_tasks) >= 1,
              f"newTasks={[(t['Id'], t['AssigneeId'], t['AssigneeName']) for t in new_tasks]}")
        logs_e = fetch_log_actions(db, instance_map["auto-escalate"])
        check("AutoEscalate: 产生 AutoTimeout(10) 日志", 10 in logs_e, f"logs={logs_e}")
        ei = fetch_instance_state(db, instance_map["auto-escalate"])
        assert ei
        check("AutoEscalate: instance 仍 Running(1)", ei["Status"] == 1, f"status={ei['Status']}")

        # -- Manual escalation: call /flowTask/escalate --
        manual_task_id = task_map["manual-escalate"]
        # First verify that getEscalationConfig reports enabled=true.
        cfg_r = requests.get(
            f"{API}/flowTask/getEscalationConfig",
            params={"taskId": manual_task_id},
            headers={"Authorization": f"Bearer {token}"},
            timeout=30,  # never block forever on an unresponsive server
        )
        cfg = cfg_r.json().get("result") or {}
        check("Manual: getEscalationConfig 返回 enabled=true",
              cfg.get("enabled") is True, f"cfg={cfg}")
        api_post(token, "/flowTask/escalate", {"taskId": manual_task_id, "comment": "E2E 手动升级"})
        mt = fetch_task_state(db, manual_task_id)
        assert mt
        check("Manual: 原 task 变 Escalated(6)", mt["Status"] == 6, f"status={mt['Status']}")
        new_manual_tasks = fetch_pending_tasks(db, instance_map["manual-escalate"], exclude_task_id=manual_task_id)
        has_manual_target = any(int(t["AssigneeId"]) == ESCALATE_TARGET_ID for t in new_manual_tasks)
        check("Manual: 升级目标生成新 Pending 任务", has_manual_target,
              f"newTasks={[(t['Id'], t['AssigneeId'], t['AssigneeName']) for t in new_manual_tasks]}")
        logs_m = fetch_log_actions(db, instance_map["manual-escalate"])
        check("Manual: 产生 Escalate(11) 日志", 11 in logs_m, f"logs={logs_m}")

        # -- A non-admin must not be able to trigger the scan
        #    (Demo01 = NormalUser 777) --
        try:
            user_token = login("Demo01", "1234567890dop")
            r = requests.post(
                f"{API}/flowTask/triggerTimeoutScan",
                headers={"Authorization": f"Bearer {user_token}"},
                timeout=30,  # never block forever on an unresponsive server
            )
            body = r.json()
            # Expect code != 200 and a message mentioning "管理员".
            # bool() keeps the recorded value a real boolean even when the
            # message is missing or empty.
            ok = bool(
                body.get("code") != 200
                and body.get("message")
                and "管理员" in body["message"]
            )
            check("权限: 非管理员调用 triggerTimeoutScan 被拒", ok, f"body={body}")
        except Exception as e:
            check("权限: 非管理员调用 triggerTimeoutScan 被拒", False, f"exception={e}")
    finally:
        # Best-effort cleanup: a failure here is reported but must not mask
        # the scenario outcome.
        try:
            cleanup(db, created_flows, created_instances)
            print("[INFO] 清理临时流程/实例/任务/日志完成")
        except Exception as e:
            print(f"[WARN] 清理失败: {e}")
        db.close()

    # Summary
    print("\n================ 汇总 ================")
    passed = sum(1 for s, _, _ in checks if s == "PASS")
    failed = sum(1 for s, _, _ in checks if s == "FAIL")
    print(f"PASS: {passed} / TOTAL: {len(checks)}")
    if failed:
        print("\n失败项:")
        for s, n, d in checks:
            if s == "FAIL":
                print(f" - {n}: {d}")
        return 1
    print("全部通过")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|