# _verify_escalation.py (16 KB)
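"""E2E verification of the approval-flow escalation mechanism (Batch 5).

Coverage:
1. Automatic timeout handling (4 actions): Notify / AutoApprove / AutoReject / AutoEscalate
2. Manual escalation: the Escalate API
3. Permissions and idempotence of the new admin endpoint `/api/flowTask/triggerTimeoutScan`

Approach:
- INSERT temporary approval-flow definitions directly (5 BizTypes, cleaned up automatically once the script has run)
- Start the flows through the API, then fetch each pending task
- UPDATE task.CreateTime directly to move it back 2 hours
- Call triggerTimeoutScan to run one scan immediately
- Assert on the state of the task / instance / log tables
"""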
import json
import os
import sys
import time
import urllib.parse

import pymysql
import pymysql.cursors
import requests
  20. BASE = "http://127.0.0.1:5005"
  21. API = f"{BASE}/api"
  22. DB = dict(
  23. host="123.60.180.165",
  24. port=3306,
  25. user="aidopremote",
  26. password="1234567890aiDOP#",
  27. database="aidopdev",
  28. cursorclass=pymysql.cursors.DictCursor,
  29. autocommit=True,
  30. )
  31. ACCOUNT = "superAdmin.NET"
  32. PASSWORD = "1234567890dop"
  33. SUPER_ADMIN_ID = 1300000000101 # superAdmin.NET(发起人 + 首任审批人)
  34. ESCALATE_TARGET_ID = 1300000000111 # Admin.NET(升级目标)
  35. ESCALATE_TARGET_NAME = "系统管理员"
  36. SUFFIX = int(time.time())
  37. checks: list[tuple[str, str, str]] = []
  38. def check(name: str, ok: bool, detail: str = "") -> None:
  39. status = "PASS" if ok else "FAIL"
  40. checks.append((status, name, detail))
  41. print(f"[{status}] {name}" + (f" — {detail}" if detail else ""))
  42. # ───── HTTP ─────
  43. def server_encrypt(plain: str) -> str:
  44. r = requests.post(f"{API}/sysCommon/encryptPlainText/{urllib.parse.quote(plain, safe='')}")
  45. r.raise_for_status()
  46. body = r.json()
  47. assert body.get("code") == 200, body
  48. return body["result"]
  49. def login(account: str = ACCOUNT, password: str = PASSWORD) -> str:
  50. enc = server_encrypt(password)
  51. r = requests.post(f"{API}/sysAuth/login", json={"account": account, "password": enc})
  52. r.raise_for_status()
  53. body = r.json()
  54. assert body.get("code") == 200, body
  55. return body["result"]["accessToken"]
  56. def api_post(token: str, path: str, payload: dict | None = None) -> dict:
  57. r = requests.post(
  58. f"{API}{path}",
  59. json=payload or {},
  60. headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
  61. )
  62. r.raise_for_status()
  63. body = r.json()
  64. assert body.get("code") == 200, f"{path} failed: {body}"
  65. return body.get("result")
  66. # ───── 流程定义生成 ─────
  67. def make_flow_json(timeout_action: str | None,
  68. enable_manual: bool = False) -> str:
  69. """生成单节点流程:开始 → 审批 → 结束。
  70. `timeout_action` 为 None 时不设置超时,用于"手动升级"场景。
  71. """
  72. node_props = {
  73. "nodeName": "审批节点",
  74. "approverType": "SpecificUser",
  75. "approverIds": str(SUPER_ADMIN_ID),
  76. "approverNames": "超级管理员",
  77. "multiApproveMode": "Any",
  78. }
  79. if timeout_action:
  80. node_props["timeoutHours"] = 1
  81. node_props["timeoutAction"] = timeout_action
  82. if timeout_action == "AutoEscalate" or enable_manual:
  83. node_props["escalationApproverType"] = "SpecificUser"
  84. node_props["escalationApproverIds"] = str(ESCALATE_TARGET_ID)
  85. node_props["escalationApproverNames"] = ESCALATE_TARGET_NAME
  86. if enable_manual:
  87. node_props["enableManualEscalation"] = True
  88. return json.dumps({
  89. "nodes": [
  90. {"id": "node_start", "type": "bpmn:startEvent", "x": 100, "y": 200,
  91. "properties": {}, "text": {"x": 100, "y": 200, "value": "开始"}},
  92. {"id": "node_task", "type": "bpmn:userTask", "x": 300, "y": 200,
  93. "properties": node_props, "text": {"x": 300, "y": 200, "value": "审批节点"}},
  94. {"id": "node_end", "type": "bpmn:endEvent", "x": 500, "y": 200,
  95. "properties": {}, "text": {"x": 500, "y": 200, "value": "结束"}},
  96. ],
  97. "edges": [
  98. {"id": "e1", "type": "bpmn:sequenceFlow",
  99. "sourceNodeId": "node_start", "targetNodeId": "node_task",
  100. "startPoint": {"x": 120, "y": 200}, "endPoint": {"x": 280, "y": 200},
  101. "pointsList": []},
  102. {"id": "e2", "type": "bpmn:sequenceFlow",
  103. "sourceNodeId": "node_task", "targetNodeId": "node_end",
  104. "startPoint": {"x": 320, "y": 200}, "endPoint": {"x": 480, "y": 200},
  105. "pointsList": []},
  106. ],
  107. }, ensure_ascii=False)
  108. # ───── 数据库辅助 ─────
  109. def insert_flow(db: pymysql.connections.Connection, name: str, biz_type: str,
  110. timeout_action: str | None, enable_manual: bool = False) -> int:
  111. flow_json = make_flow_json(timeout_action, enable_manual)
  112. with db.cursor() as c:
  113. c.execute(
  114. """
  115. INSERT INTO ApprovalFlow
  116. (Id, Code, Name, FormJson, FlowJson, Status, OrgId, IsDelete,
  117. CreateTime, UpdateTime, CreateUserId, CreateUserName,
  118. BizType, Version, IsPublished)
  119. VALUES (UUID_SHORT(), %s, %s, '[]', %s, 1, 0, 0,
  120. NOW(), NOW(), %s, 'E2E-Escalation',
  121. %s, 1, 1)
  122. """,
  123. (f"ESC_{biz_type}"[:32], f"升级E2E-{name}"[:32], flow_json,
  124. SUPER_ADMIN_ID, biz_type),
  125. )
  126. c.execute("SELECT Id FROM ApprovalFlow WHERE BizType=%s ORDER BY Id DESC LIMIT 1", (biz_type,))
  127. row = c.fetchone()
  128. return int(row["Id"])
  129. def fetch_first_pending_task(db: pymysql.connections.Connection, instance_id: int) -> dict | None:
  130. with db.cursor() as c:
  131. c.execute(
  132. "SELECT Id, AssigneeId, Status, CreateTime FROM ApprovalFlowTask "
  133. "WHERE InstanceId=%s AND Status=0 ORDER BY Id LIMIT 1",
  134. (instance_id,),
  135. )
  136. return c.fetchone()
  137. def rollback_task_time(db: pymysql.connections.Connection, task_id: int, hours: int = 2) -> None:
  138. with db.cursor() as c:
  139. c.execute(
  140. "UPDATE ApprovalFlowTask SET CreateTime = DATE_SUB(NOW(), INTERVAL %s HOUR) WHERE Id=%s",
  141. (hours, task_id),
  142. )
  143. def fetch_task_state(db: pymysql.connections.Connection, task_id: int) -> dict | None:
  144. with db.cursor() as c:
  145. c.execute("SELECT Id, Status, Comment, ActionTime FROM ApprovalFlowTask WHERE Id=%s", (task_id,))
  146. return c.fetchone()
  147. def fetch_instance_state(db: pymysql.connections.Connection, instance_id: int) -> dict | None:
  148. with db.cursor() as c:
  149. c.execute("SELECT Id, Status, EndTime, CurrentNodeId FROM ApprovalFlowInstance WHERE Id=%s", (instance_id,))
  150. return c.fetchone()
  151. def fetch_pending_tasks(db: pymysql.connections.Connection, instance_id: int, exclude_task_id: int | None = None) -> list[dict]:
  152. with db.cursor() as c:
  153. sql = "SELECT Id, AssigneeId, AssigneeName FROM ApprovalFlowTask WHERE InstanceId=%s AND Status=0"
  154. params: list = [instance_id]
  155. if exclude_task_id:
  156. sql += " AND Id<>%s"
  157. params.append(exclude_task_id)
  158. c.execute(sql, tuple(params))
  159. return list(c.fetchall())
  160. def fetch_log_actions(db: pymysql.connections.Connection, instance_id: int) -> list[int]:
  161. with db.cursor() as c:
  162. c.execute("SELECT Action FROM ApprovalFlowLog WHERE InstanceId=%s ORDER BY Id", (instance_id,))
  163. return [int(r["Action"]) for r in c.fetchall()]
  164. # ───── 场景辅助 ─────
  165. def start(token: str, biz_type: str, biz_id: int, title: str) -> int:
  166. return int(api_post(token, "/flowInstance/start", {
  167. "bizType": biz_type,
  168. "bizId": biz_id,
  169. "bizNo": f"E2E_{biz_id}",
  170. "title": title,
  171. }))
  172. # ───── 清理 ─────
  173. def cleanup(db: pymysql.connections.Connection, flow_ids: list[int], instance_ids: list[int]) -> None:
  174. with db.cursor() as c:
  175. if instance_ids:
  176. ids = ",".join(str(i) for i in instance_ids)
  177. c.execute(f"DELETE FROM ApprovalFlowLog WHERE InstanceId IN ({ids})")
  178. c.execute(f"DELETE FROM ApprovalFlowTask WHERE InstanceId IN ({ids})")
  179. c.execute(f"DELETE FROM ApprovalFlowCompletedNode WHERE InstanceId IN ({ids})")
  180. c.execute(f"DELETE FROM ApprovalFlowInstance WHERE Id IN ({ids})")
  181. if flow_ids:
  182. ids = ",".join(str(i) for i in flow_ids)
  183. c.execute(f"DELETE FROM ApprovalFlow WHERE Id IN ({ids})")
  184. # ───── main ─────
  185. def main() -> int:
  186. try:
  187. token = login()
  188. except Exception as e:
  189. print(f"[FAIL] login: {e}")
  190. return 1
  191. print(f"[PASS] login OK")
  192. db = pymysql.connect(**DB)
  193. created_flows: list[int] = []
  194. created_instances: list[int] = []
  195. try:
  196. # ── 预插 5 条流程定义(每条 1 个 BizType) ──
  197. cases = [
  198. ("notify", f"EscE2E_N_{SUFFIX}", "Notify", False),
  199. ("auto-approve", f"EscE2E_A_{SUFFIX}", "AutoApprove", False),
  200. ("auto-reject", f"EscE2E_R_{SUFFIX}", "AutoReject", False),
  201. ("auto-escalate", f"EscE2E_E_{SUFFIX}", "AutoEscalate", False),
  202. ("manual-escalate", f"EscE2E_M_{SUFFIX}", None, True),
  203. ]
  204. flow_map: dict[str, tuple[int, str]] = {}
  205. for name, biz_type, ta, em in cases:
  206. fid = insert_flow(db, name, biz_type, ta, em)
  207. created_flows.append(fid)
  208. flow_map[name] = (fid, biz_type)
  209. check("预建 5 条临时审批流定义", len(created_flows) == 5, f"flow_ids={created_flows}")
  210. # ── 起 5 个实例 ──
  211. instance_map: dict[str, int] = {}
  212. for name, (fid, biz_type) in flow_map.items():
  213. iid = start(token, biz_type, biz_id=fid, title=f"E2E-{name}-{SUFFIX}")
  214. instance_map[name] = iid
  215. created_instances.append(iid)
  216. check("5 个实例成功发起", len(instance_map) == 5, f"instance_ids={created_instances}")
  217. # ── 取 Pending task 并回拨 CreateTime(仅超时 4 个,手动升级不回拨) ──
  218. task_map: dict[str, int] = {}
  219. for name in ("notify", "auto-approve", "auto-reject", "auto-escalate", "manual-escalate"):
  220. t = fetch_first_pending_task(db, instance_map[name])
  221. assert t, f"{name} 未生成 Pending task"
  222. task_map[name] = int(t["Id"])
  223. if name in ("notify", "auto-approve", "auto-reject", "auto-escalate"):
  224. rollback_task_time(db, task_map[name], hours=2)
  225. check("4 个超时场景 task.CreateTime 已回拨 2 小时", True)
  226. # ── 第一次触发扫描 ──
  227. processed = api_post(token, "/flowTask/triggerTimeoutScan")
  228. check("triggerTimeoutScan 返回处理条数 >=4", isinstance(processed, int) and processed >= 4, f"processed={processed}")
  229. # ── 断言 Notify ──
  230. ns = fetch_task_state(db, task_map["notify"])
  231. assert ns
  232. check("Notify: task 保持 Pending(0)", ns["Status"] == 0, f"status={ns['Status']}")
  233. logs_n = fetch_log_actions(db, instance_map["notify"])
  234. check("Notify: 产生 AutoTimeout(10) 日志", 10 in logs_n, f"logs={logs_n}")
  235. # 幂等性:再触发一次扫描,Notify 不应再次写日志
  236. processed2 = api_post(token, "/flowTask/triggerTimeoutScan")
  237. logs_n2 = fetch_log_actions(db, instance_map["notify"])
  238. notify_count_1 = logs_n.count(10)
  239. notify_count_2 = logs_n2.count(10)
  240. check("Notify 幂等:二次扫描不重复写日志", notify_count_1 == notify_count_2,
  241. f"first={notify_count_1} second={notify_count_2}")
  242. # ── 断言 AutoApprove ──
  243. aa = fetch_task_state(db, task_map["auto-approve"])
  244. assert aa
  245. check("AutoApprove: task 变 Approved(1)", aa["Status"] == 1, f"status={aa['Status']}")
  246. ai = fetch_instance_state(db, instance_map["auto-approve"])
  247. assert ai
  248. # 单审批节点通过 → 实例 Approved(2)(因为后继是 endEvent)
  249. check("AutoApprove: instance 变 Approved(2)", ai["Status"] == 2, f"status={ai['Status']}")
  250. logs_a = fetch_log_actions(db, instance_map["auto-approve"])
  251. check("AutoApprove: 产生 AutoTimeout(10) 日志", 10 in logs_a, f"logs={logs_a}")
  252. # ── 断言 AutoReject ──
  253. ar = fetch_task_state(db, task_map["auto-reject"])
  254. assert ar
  255. check("AutoReject: task 变 Rejected(2)", ar["Status"] == 2, f"status={ar['Status']}")
  256. ri = fetch_instance_state(db, instance_map["auto-reject"])
  257. assert ri
  258. check("AutoReject: instance 变 Rejected(3)", ri["Status"] == 3, f"status={ri['Status']}")
  259. check("AutoReject: EndTime 已写入", ri["EndTime"] is not None)
  260. # ── 断言 AutoEscalate ──
  261. ae = fetch_task_state(db, task_map["auto-escalate"])
  262. assert ae
  263. check("AutoEscalate: 原 task 变 Escalated(6)", ae["Status"] == 6, f"status={ae['Status']}")
  264. new_tasks = fetch_pending_tasks(db, instance_map["auto-escalate"], exclude_task_id=task_map["auto-escalate"])
  265. has_target = any(int(t["AssigneeId"]) == ESCALATE_TARGET_ID for t in new_tasks)
  266. check("AutoEscalate: 升级目标生成新 Pending 任务",
  267. has_target and len(new_tasks) >= 1,
  268. f"newTasks={[(t['Id'], t['AssigneeId'], t['AssigneeName']) for t in new_tasks]}")
  269. logs_e = fetch_log_actions(db, instance_map["auto-escalate"])
  270. check("AutoEscalate: 产生 AutoTimeout(10) 日志", 10 in logs_e, f"logs={logs_e}")
  271. ei = fetch_instance_state(db, instance_map["auto-escalate"])
  272. assert ei
  273. check("AutoEscalate: instance 仍 Running(1)", ei["Status"] == 1, f"status={ei['Status']}")
  274. # ── 手动升级:调用 /flowTask/escalate ──
  275. manual_task_id = task_map["manual-escalate"]
  276. # 先验证 GetEscalationConfig 返回 enabled=true
  277. cfg_r = requests.get(
  278. f"{API}/flowTask/getEscalationConfig",
  279. params={"taskId": manual_task_id},
  280. headers={"Authorization": f"Bearer {token}"},
  281. )
  282. cfg = cfg_r.json().get("result") or {}
  283. check("Manual: getEscalationConfig 返回 enabled=true",
  284. cfg.get("enabled") is True, f"cfg={cfg}")
  285. api_post(token, "/flowTask/escalate", {"taskId": manual_task_id, "comment": "E2E 手动升级"})
  286. mt = fetch_task_state(db, manual_task_id)
  287. assert mt
  288. check("Manual: 原 task 变 Escalated(6)", mt["Status"] == 6, f"status={mt['Status']}")
  289. new_manual_tasks = fetch_pending_tasks(db, instance_map["manual-escalate"], exclude_task_id=manual_task_id)
  290. has_manual_target = any(int(t["AssigneeId"]) == ESCALATE_TARGET_ID for t in new_manual_tasks)
  291. check("Manual: 升级目标生成新 Pending 任务", has_manual_target,
  292. f"newTasks={[(t['Id'], t['AssigneeId'], t['AssigneeName']) for t in new_manual_tasks]}")
  293. logs_m = fetch_log_actions(db, instance_map["manual-escalate"])
  294. check("Manual: 产生 Escalate(11) 日志", 11 in logs_m, f"logs={logs_m}")
  295. # ── 非管理员无权触发扫描(Demo01 = NormalUser 777) ──
  296. try:
  297. user_token = login("Demo01", "1234567890dop")
  298. r = requests.post(
  299. f"{API}/flowTask/triggerTimeoutScan",
  300. headers={"Authorization": f"Bearer {user_token}"},
  301. )
  302. body = r.json()
  303. # 期望 code != 200 且 message 含"管理员"
  304. ok = body.get("code") != 200 and body.get("message") and "管理员" in body["message"]
  305. check("权限: 非管理员调用 triggerTimeoutScan 被拒", ok, f"body={body}")
  306. except Exception as e:
  307. check("权限: 非管理员调用 triggerTimeoutScan 被拒", False, f"exception={e}")
  308. finally:
  309. try:
  310. cleanup(db, created_flows, created_instances)
  311. print("[INFO] 清理临时流程/实例/任务/日志完成")
  312. except Exception as e:
  313. print(f"[WARN] 清理失败: {e}")
  314. db.close()
  315. # 汇总
  316. print("\n================ 汇总 ================")
  317. passed = sum(1 for s, _, _ in checks if s == "PASS")
  318. failed = sum(1 for s, _, _ in checks if s == "FAIL")
  319. print(f"PASS: {passed} / TOTAL: {len(checks)}")
  320. if failed:
  321. print("\n失败项:")
  322. for s, n, d in checks:
  323. if s == "FAIL":
  324. print(f" - {n}: {d}")
  325. return 1
  326. print("全部通过")
  327. return 0
  328. if __name__ == "__main__":
  329. sys.exit(main())