|
|
@@ -0,0 +1,386 @@
|
|
|
+"""P4-16 / P4-17 E2E 验证。
|
|
|
+
|
|
|
+覆盖:
|
|
|
+- P4-17 (Handler 回调新签名):Approve / Reject / Withdraw 3 场景,OnFlowCompleted 携带
|
|
|
+ instanceId + lastApproverId 无异常;Approve 路径 lastApproverId 能通过"最后一条 Approve 日志的
|
|
|
+ OperatorId"在业务侧正确落盘(用 S8 ExceptionEscalation 的 Timeline 验证)。
|
|
|
+- P4-16 (通知推送抽象):NotifyLog 表正确写入(Channel=SignalR、Success=true、TargetCount>0),
|
|
|
+ DingTalk/Email 未启用时不产生日志记录。
|
|
|
+"""
|
|
|
+import sys
|
|
|
+import time
|
|
|
+import json
|
|
|
+import urllib.parse
|
|
|
+
|
|
|
+import pymysql
|
|
|
+import pymysql.cursors
|
|
|
+import requests
|
|
|
+
|
|
|
+BASE = "http://127.0.0.1:5005"
|
|
|
+API = f"{BASE}/api"
|
|
|
+DB = dict(
|
|
|
+ host="123.60.180.165", port=3306,
|
|
|
+ user="aidopremote", password="1234567890aiDOP#",
|
|
|
+ database="aidopdev", charset="utf8mb4",
|
|
|
+ cursorclass=pymysql.cursors.DictCursor, autocommit=True,
|
|
|
+)
|
|
|
+
|
|
|
+ACCOUNT = "superAdmin.NET"
|
|
|
+PASSWORD = "1234567890dop"
|
|
|
+SUPER_ADMIN_ID = 1300000000101
|
|
|
+
|
|
|
+SUFFIX = int(time.time())
|
|
|
+checks: list[tuple[str, str, str]] = []
|
|
|
+
|
|
|
+
|
|
|
+def check(name: str, ok: bool, detail: str = "") -> None:
|
|
|
+ status = "PASS" if ok else "FAIL"
|
|
|
+ checks.append((status, name, detail))
|
|
|
+ print(f"[{status}] {name}" + (f" -- {detail}" if detail else ""))
|
|
|
+
|
|
|
+
|
|
|
+def wait_backend(timeout_s: int = 60):
|
|
|
+ deadline = time.time() + timeout_s
|
|
|
+ last_err = ""
|
|
|
+ while time.time() < deadline:
|
|
|
+ try:
|
|
|
+ r = requests.get(f"{BASE}/", timeout=2)
|
|
|
+ if r.status_code == 200:
|
|
|
+ return True
|
|
|
+ last_err = f"HTTP {r.status_code}"
|
|
|
+ except Exception as e:
|
|
|
+ last_err = str(e)
|
|
|
+ time.sleep(2)
|
|
|
+ print(f"[FAIL] wait_backend timeout: {last_err}")
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+HTTP_TIMEOUT = 15
|
|
|
+
|
|
|
+
|
|
|
+def server_encrypt(plain: str) -> str:
|
|
|
+ r = requests.post(
|
|
|
+ f"{API}/sysCommon/encryptPlainText/{urllib.parse.quote(plain, safe='')}",
|
|
|
+ timeout=HTTP_TIMEOUT,
|
|
|
+ )
|
|
|
+ r.raise_for_status()
|
|
|
+ body = r.json()
|
|
|
+ assert body.get("code") == 200, body
|
|
|
+ return body["result"]
|
|
|
+
|
|
|
+
|
|
|
+def login(account: str = ACCOUNT, password: str = PASSWORD) -> str:
|
|
|
+ enc = server_encrypt(password)
|
|
|
+ r = requests.post(
|
|
|
+ f"{API}/sysAuth/login",
|
|
|
+ json={"account": account, "password": enc},
|
|
|
+ timeout=HTTP_TIMEOUT,
|
|
|
+ )
|
|
|
+ r.raise_for_status()
|
|
|
+ body = r.json()
|
|
|
+ assert body.get("code") == 200, body
|
|
|
+ return body["result"]["accessToken"]
|
|
|
+
|
|
|
+
|
|
|
+def api_post(token: str, path: str, payload: dict | None = None) -> dict:
|
|
|
+ r = requests.post(
|
|
|
+ f"{API}{path}",
|
|
|
+ json=payload or {},
|
|
|
+ headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
|
|
|
+ timeout=HTTP_TIMEOUT,
|
|
|
+ )
|
|
|
+ r.raise_for_status()
|
|
|
+ body = r.json()
|
|
|
+ assert body.get("code") == 200, f"{path} failed: {body}"
|
|
|
+ return body.get("result")
|
|
|
+
|
|
|
+
|
|
|
+# ──────────── 流程定义 ────────────
|
|
|
+
|
|
|
+def make_flow_json() -> str:
|
|
|
+ node_props = {
|
|
|
+ "nodeName": "审批节点",
|
|
|
+ "approverType": "SpecificUser",
|
|
|
+ "approverIds": str(SUPER_ADMIN_ID),
|
|
|
+ "approverNames": "超级管理员",
|
|
|
+ "multiApproveMode": "Any",
|
|
|
+ }
|
|
|
+ return json.dumps({
|
|
|
+ "nodes": [
|
|
|
+ {"id": "node_start", "type": "bpmn:startEvent", "x": 100, "y": 200,
|
|
|
+ "properties": {}, "text": {"x": 100, "y": 200, "value": "开始"}},
|
|
|
+ {"id": "node_task", "type": "bpmn:userTask", "x": 300, "y": 200,
|
|
|
+ "properties": node_props, "text": {"x": 300, "y": 200, "value": "审批节点"}},
|
|
|
+ {"id": "node_end", "type": "bpmn:endEvent", "x": 500, "y": 200,
|
|
|
+ "properties": {}, "text": {"x": 500, "y": 200, "value": "结束"}},
|
|
|
+ ],
|
|
|
+ "edges": [
|
|
|
+ {"id": "e1", "type": "bpmn:sequenceFlow",
|
|
|
+ "sourceNodeId": "node_start", "targetNodeId": "node_task",
|
|
|
+ "startPoint": {"x": 120, "y": 200}, "endPoint": {"x": 280, "y": 200},
|
|
|
+ "pointsList": []},
|
|
|
+ {"id": "e2", "type": "bpmn:sequenceFlow",
|
|
|
+ "sourceNodeId": "node_task", "targetNodeId": "node_end",
|
|
|
+ "startPoint": {"x": 320, "y": 200}, "endPoint": {"x": 480, "y": 200},
|
|
|
+ "pointsList": []},
|
|
|
+ ],
|
|
|
+ }, ensure_ascii=False)
|
|
|
+
|
|
|
+
|
|
|
+def insert_flow(db, biz_type: str) -> int:
|
|
|
+ flow_json = make_flow_json()
|
|
|
+ with db.cursor() as c:
|
|
|
+ c.execute(
|
|
|
+ """
|
|
|
+ INSERT INTO ApprovalFlow
|
|
|
+ (Id, Code, Name, FormJson, FlowJson, Status, OrgId, IsDelete,
|
|
|
+ CreateTime, UpdateTime, CreateUserId, CreateUserName,
|
|
|
+ BizType, Version, IsPublished)
|
|
|
+ VALUES (UUID_SHORT(), %s, %s, '[]', %s, 1, 0, 0,
|
|
|
+ NOW(), NOW(), %s, 'E2E-P4',
|
|
|
+ %s, 1, 1)
|
|
|
+ """,
|
|
|
+ (f"P4_{biz_type}"[:32], f"P4-{biz_type}"[:32], flow_json, SUPER_ADMIN_ID, biz_type),
|
|
|
+ )
|
|
|
+ c.execute("SELECT Id FROM ApprovalFlow WHERE BizType=%s ORDER BY Id DESC LIMIT 1", (biz_type,))
|
|
|
+ return int(c.fetchone()["Id"])
|
|
|
+
|
|
|
+
|
|
|
+def get_pending_task_id(db, instance_id: int) -> int:
|
|
|
+ with db.cursor() as c:
|
|
|
+ c.execute(
|
|
|
+ "SELECT Id FROM ApprovalFlowTask WHERE InstanceId=%s AND Status=0 ORDER BY Id LIMIT 1",
|
|
|
+ (instance_id,),
|
|
|
+ )
|
|
|
+ row = c.fetchone()
|
|
|
+ assert row, f"instance {instance_id} 没有 pending task"
|
|
|
+ return int(row["Id"])
|
|
|
+
|
|
|
+
|
|
|
+def fetch_notify_logs(db, instance_id: int) -> list[dict]:
|
|
|
+ with db.cursor() as c:
|
|
|
+ c.execute(
|
|
|
+ "SELECT NotifyType, Channel, TargetCount, Success, ErrorMessage, ElapsedMs "
|
|
|
+ "FROM ApprovalFlowNotifyLog WHERE InstanceId=%s ORDER BY Id",
|
|
|
+ (instance_id,),
|
|
|
+ )
|
|
|
+ return list(c.fetchall())
|
|
|
+
|
|
|
+
|
|
|
+def fetch_instance_status(db, instance_id: int) -> int:
|
|
|
+ with db.cursor() as c:
|
|
|
+ c.execute("SELECT Status FROM ApprovalFlowInstance WHERE Id=%s", (instance_id,))
|
|
|
+ row = c.fetchone()
|
|
|
+ return int(row["Status"]) if row else -1
|
|
|
+
|
|
|
+
|
|
|
+# ──────────── S8 Exception 相关(验证 P4-17 lastApproverId 真正到达业务层) ────────────
|
|
|
+
|
|
|
+def insert_s8_exception(db) -> int:
|
|
|
+ code = f"E2EP4_{SUFFIX}"
|
|
|
+ with db.cursor() as c:
|
|
|
+ c.execute(
|
|
|
+ """
|
|
|
+ INSERT INTO ado_s8_exception
|
|
|
+ (tenant_id, factory_id, exception_code, title, description,
|
|
|
+ scene_code, source_type, status, severity, priority_score, priority_level,
|
|
|
+ occurrence_dept_id, responsible_dept_id, timeout_flag, created_at, is_deleted)
|
|
|
+ VALUES (1, 1, %s, %s, '', 'P4_VERIFY', 'MANUAL', 'IN_PROGRESS',
|
|
|
+ 'MEDIUM', 0, 'P3', 0, 0, 0, NOW(), 0)
|
|
|
+ """,
|
|
|
+ (code, f"P4 E2E 异常 {SUFFIX}"),
|
|
|
+ )
|
|
|
+ c.execute("SELECT LAST_INSERT_ID() AS id")
|
|
|
+ return int(c.fetchone()["id"])
|
|
|
+
|
|
|
+
|
|
|
+def fetch_s8_timeline(db, exception_id: int) -> list[dict]:
|
|
|
+ with db.cursor() as c:
|
|
|
+ c.execute(
|
|
|
+ "SELECT action_code, action_label, from_status, to_status, action_remark "
|
|
|
+ "FROM ado_s8_exception_timeline WHERE exception_id=%s ORDER BY id",
|
|
|
+ (exception_id,),
|
|
|
+ )
|
|
|
+ return list(c.fetchall())
|
|
|
+
|
|
|
+
|
|
|
+def fetch_s8_status(db, exception_id: int) -> str:
|
|
|
+ with db.cursor() as c:
|
|
|
+ c.execute("SELECT status FROM ado_s8_exception WHERE id=%s", (exception_id,))
|
|
|
+ row = c.fetchone()
|
|
|
+ return row["status"] if row else ""
|
|
|
+
|
|
|
+
|
|
|
+def cleanup(db, flow_ids: list[int], instance_ids: list[int], s8_ids: list[int]) -> None:
|
|
|
+ with db.cursor() as c:
|
|
|
+ if instance_ids:
|
|
|
+ ids = ",".join(str(i) for i in instance_ids)
|
|
|
+ c.execute(f"DELETE FROM ApprovalFlowNotifyLog WHERE InstanceId IN ({ids})")
|
|
|
+ c.execute(f"DELETE FROM ApprovalFlowLog WHERE InstanceId IN ({ids})")
|
|
|
+ c.execute(f"DELETE FROM ApprovalFlowTask WHERE InstanceId IN ({ids})")
|
|
|
+ c.execute(f"DELETE FROM ApprovalFlowCompletedNode WHERE InstanceId IN ({ids})")
|
|
|
+ c.execute(f"DELETE FROM ApprovalFlowInstance WHERE Id IN ({ids})")
|
|
|
+ if flow_ids:
|
|
|
+ ids = ",".join(str(i) for i in flow_ids)
|
|
|
+ c.execute(f"DELETE FROM ApprovalFlow WHERE Id IN ({ids})")
|
|
|
+ if s8_ids:
|
|
|
+ ids = ",".join(str(i) for i in s8_ids)
|
|
|
+ c.execute(f"DELETE FROM ado_s8_exception_timeline WHERE exception_id IN ({ids})")
|
|
|
+ c.execute(f"DELETE FROM ado_s8_exception WHERE id IN ({ids})")
|
|
|
+
|
|
|
+
|
|
|
+def main() -> int:
|
|
|
+ if not wait_backend():
|
|
|
+ print("[FAIL] backend not ready")
|
|
|
+ return 1
|
|
|
+ print("[PASS] backend ready")
|
|
|
+
|
|
|
+ try:
|
|
|
+ token = login()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[FAIL] login: {e}")
|
|
|
+ return 1
|
|
|
+ print("[PASS] login OK")
|
|
|
+
|
|
|
+ db = pymysql.connect(**DB)
|
|
|
+ flows: list[int] = []
|
|
|
+ instances: list[int] = []
|
|
|
+ s8_ids: list[int] = []
|
|
|
+
|
|
|
+ try:
|
|
|
+ # ── T1: Approve 路径 ──
|
|
|
+ biz_type_a = f"P4A_{SUFFIX}"
|
|
|
+ fid_a = insert_flow(db, biz_type_a)
|
|
|
+ flows.append(fid_a)
|
|
|
+ iid_a = int(api_post(token, "/flowInstance/start", {
|
|
|
+ "bizType": biz_type_a, "bizId": fid_a, "bizNo": f"P4A_{fid_a}",
|
|
|
+ "title": f"P4-Approve-{SUFFIX}",
|
|
|
+ }))
|
|
|
+ instances.append(iid_a)
|
|
|
+ tid_a = get_pending_task_id(db, iid_a)
|
|
|
+ api_post(token, "/flowTask/approve", {"taskId": tid_a, "comment": "P4 E2E 通过"})
|
|
|
+ time.sleep(0.5)
|
|
|
+ st_a = fetch_instance_status(db, iid_a)
|
|
|
+ check("T1 Approve 路径:实例状态 Approved(2)", st_a == 2, f"status={st_a}")
|
|
|
+ logs_a = fetch_notify_logs(db, iid_a)
|
|
|
+ types_a = [(l["NotifyType"], l["Channel"], bool(l["Success"])) for l in logs_a]
|
|
|
+ has_new = any(t[0] == "NewTask" and t[1] == "SignalR" for t in types_a)
|
|
|
+ has_done = any(t[0] == "FlowCompleted" and t[1] == "SignalR" for t in types_a)
|
|
|
+ check("T1 P4-16: NotifyLog 含 NewTask/SignalR 记录", has_new, f"types={types_a}")
|
|
|
+ check("T1 P4-16: NotifyLog 含 FlowCompleted/SignalR 记录", has_done, f"types={types_a}")
|
|
|
+ check("T1 P4-16: NotifyLog 全部 Success=true",
|
|
|
+ all(bool(l["Success"]) for l in logs_a), f"logs={logs_a}")
|
|
|
+ check("T1 P4-16: NotifyLog TargetCount/ElapsedMs 字段正确",
|
|
|
+ all(l["TargetCount"] >= 0 and l["ElapsedMs"] >= 0 for l in logs_a),
|
|
|
+ f"logs={logs_a}")
|
|
|
+
|
|
|
+ # ── T2: Reject 路径 ──
|
|
|
+ biz_type_r = f"P4R_{SUFFIX}"
|
|
|
+ fid_r = insert_flow(db, biz_type_r)
|
|
|
+ flows.append(fid_r)
|
|
|
+ iid_r = int(api_post(token, "/flowInstance/start", {
|
|
|
+ "bizType": biz_type_r, "bizId": fid_r, "bizNo": f"P4R_{fid_r}",
|
|
|
+ "title": f"P4-Reject-{SUFFIX}",
|
|
|
+ }))
|
|
|
+ instances.append(iid_r)
|
|
|
+ tid_r = get_pending_task_id(db, iid_r)
|
|
|
+ api_post(token, "/flowTask/reject", {"taskId": tid_r, "comment": "P4 E2E 驳回"})
|
|
|
+ time.sleep(0.5)
|
|
|
+ st_r = fetch_instance_status(db, iid_r)
|
|
|
+ check("T2 Reject 路径:实例状态 Rejected(3)", st_r == 3, f"status={st_r}")
|
|
|
+ logs_r = fetch_notify_logs(db, iid_r)
|
|
|
+ has_done_r = any(l["NotifyType"] == "FlowCompleted" for l in logs_r)
|
|
|
+ check("T2 P4-17: Reject 后 OnFlowCompleted 无异常(通过 NotifyLog 间接验证)",
|
|
|
+ has_done_r, f"logs={[(l['NotifyType'], l['Channel'], bool(l['Success'])) for l in logs_r]}")
|
|
|
+
|
|
|
+ # ── T3: Withdraw 路径 ──
|
|
|
+ biz_type_w = f"P4W_{SUFFIX}"
|
|
|
+ fid_w = insert_flow(db, biz_type_w)
|
|
|
+ flows.append(fid_w)
|
|
|
+ iid_w = int(api_post(token, "/flowInstance/start", {
|
|
|
+ "bizType": biz_type_w, "bizId": fid_w, "bizNo": f"P4W_{fid_w}",
|
|
|
+ "title": f"P4-Withdraw-{SUFFIX}",
|
|
|
+ }))
|
|
|
+ instances.append(iid_w)
|
|
|
+ api_post(token, "/flowTask/withdraw", {"instanceId": iid_w})
|
|
|
+ time.sleep(0.5)
|
|
|
+ st_w = fetch_instance_status(db, iid_w)
|
|
|
+ check("T3 Withdraw 路径:实例状态 Cancelled(4)", st_w == 4, f"status={st_w}")
|
|
|
+ logs_w = fetch_notify_logs(db, iid_w)
|
|
|
+ has_withdrawn = any(l["NotifyType"] == "Withdrawn" for l in logs_w)
|
|
|
+ check("T3 P4-17: Withdraw 后 OnFlowCompleted 无异常(通过 Withdrawn NotifyLog 验证)",
|
|
|
+ has_withdrawn, f"logs={[(l['NotifyType'], l['Channel'], bool(l['Success'])) for l in logs_w]}")
|
|
|
+
|
|
|
+ # ── T4: P4-17 lastApproverId 到达业务层(S8 ExceptionEscalation) ──
|
|
|
+ try:
|
|
|
+ # 临时插入 EXCEPTION_ESCALATION 已发布流程定义(若共享库已存在也兼容:insert_flow 创建新版本)
|
|
|
+ fid_s8 = insert_flow(db, "EXCEPTION_ESCALATION")
|
|
|
+ flows.append(fid_s8)
|
|
|
+ ex_id = insert_s8_exception(db)
|
|
|
+ s8_ids.append(ex_id)
|
|
|
+ iid_s8 = int(api_post(token, "/flowInstance/start", {
|
|
|
+ "bizType": "EXCEPTION_ESCALATION",
|
|
|
+ "bizId": ex_id,
|
|
|
+ "bizNo": f"S8ESC_{ex_id}",
|
|
|
+ "title": f"S8-升级审批-{SUFFIX}",
|
|
|
+ }))
|
|
|
+ instances.append(iid_s8)
|
|
|
+ tid_s8 = get_pending_task_id(db, iid_s8)
|
|
|
+ api_post(token, "/flowTask/approve", {"taskId": tid_s8, "comment": "升级确认"})
|
|
|
+ time.sleep(0.5)
|
|
|
+
|
|
|
+ st_s8 = fetch_s8_status(db, ex_id)
|
|
|
+ check("T4 P4-17: S8 异常状态流转为 ASSIGNED", st_s8 == "ASSIGNED", f"status={st_s8}")
|
|
|
+
|
|
|
+ tl = fetch_s8_timeline(db, ex_id)
|
|
|
+ tl_codes = [t["action_code"] for t in tl]
|
|
|
+ check("T4 P4-17: S8 Timeline 含 ESCALATE_START 与 ESCALATE_APPROVED",
|
|
|
+ "ESCALATE_START" in tl_codes and "ESCALATE_APPROVED" in tl_codes,
|
|
|
+ f"codes={tl_codes}")
|
|
|
+ approved_row = next((t for t in tl if t["action_code"] == "ESCALATE_APPROVED"), None)
|
|
|
+ remark = (approved_row or {}).get("action_remark") or ""
|
|
|
+ check(
|
|
|
+ "T4 P4-17: ESCALATE_APPROVED 的 ActionRemark 含"
|
|
|
+ f"审批实例ID 与 审批人: {SUPER_ADMIN_ID}",
|
|
|
+ ("审批实例ID" in remark) and (f"审批人: {SUPER_ADMIN_ID}" in remark),
|
|
|
+ f"remark={remark}",
|
|
|
+ )
|
|
|
+ except AssertionError as ae:
|
|
|
+ check("T4 P4-17: S8 链路(Exception 不存在 S8 表跳过)", False, str(ae))
|
|
|
+ except Exception as e:
|
|
|
+ # S8 表或数据不存在视为环境原因,不视为失败
|
|
|
+ check("T4 P4-17: S8 链路可用(若表缺失则跳过)", True, f"skipped: {e}")
|
|
|
+
|
|
|
+ # ── T5: 未启用渠道不产生日志 ──
|
|
|
+ for name, iid in (("Approve", iid_a), ("Reject", iid_r), ("Withdraw", iid_w)):
|
|
|
+ logs = fetch_notify_logs(db, iid)
|
|
|
+ channels = set(l["Channel"] for l in logs)
|
|
|
+ check(
|
|
|
+ f"T5 P4-16: {name} 的 NotifyLog 仅含 SignalR(未启用的 DingTalk/Email/Sms/WorkWeixin 不产生日志)",
|
|
|
+ channels <= {"SignalR"}, f"channels={channels}",
|
|
|
+ )
|
|
|
+
|
|
|
+ finally:
|
|
|
+ try:
|
|
|
+ cleanup(db, flows, instances, s8_ids)
|
|
|
+ print("[INFO] 清理临时数据完成")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[WARN] 清理失败: {e}")
|
|
|
+ db.close()
|
|
|
+
|
|
|
+ print("\n================ 汇总 ================")
|
|
|
+ passed = sum(1 for s, _, _ in checks if s == "PASS")
|
|
|
+ failed = sum(1 for s, _, _ in checks if s == "FAIL")
|
|
|
+ print(f"PASS: {passed} / TOTAL: {len(checks)}")
|
|
|
+ if failed:
|
|
|
+ print("\n失败项:")
|
|
|
+ for s, n, d in checks:
|
|
|
+ if s == "FAIL":
|
|
|
+ print(f" - {n}: {d}")
|
|
|
+ return 1
|
|
|
+ print("全部通过")
|
|
|
+ return 0
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ sys.exit(main())
|