#!/usr/bin/env python3
"""E2E-01: unified acceptance orchestration for the legacy Business API migration.

Steps performed by :func:`main`:

1. Scan ``Web/`` for front-end code that still references the old
   ``api/business`` endpoints directly.
2. Run the preparation script and then the S1 / S2-S4 / S1-S4 full / S3 child
   verification scripts one after another.
3. Check that the front-end pages are wired to the key capabilities.
4. Verify that every required ``action_code`` has at least one row reported by
   the action-run-log list endpoint.

The combined report is written to
``doc/_verify_old_api_equivalence_e2e_result.json`` and printed to stdout.
The process exits with 0 when every check passed and 5 otherwise.
"""

from __future__ import annotations

import json
import os
import subprocess
import sys
import urllib.error
import urllib.request
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
DOC = ROOT / "doc"
WEB = ROOT / "Web"
BASE = os.environ.get("AIDOP_API_BASE", "http://127.0.0.1:5005")
# NOTE(review): TENANT is not referenced by the code in this file; the login
# payload hard-codes the same number as a string. Kept for external importers.
TENANT = 797403760988229

# Every one of these action codes must have at least one action-run-log row.
REQUIRED_ACTION_CODES = [
    "S1_ORDER_REVIEW",
    "S1_DELIVERY_CONFIRM",
    "S1_ORDER_REFRESH_PLAN",
    "S2_SCHEDULE_GENERATE",
    "S2_PRIORITY_RECHECK",
    "S2_KITTING_CHECK_BATCH",
    "S2_KITTING_CHECK_SINGLE",
    "S2_WORK_ORDER_RELEASE",
    "S4_MRP_GENERATE",
    "S3_DELIVERY_GENERATE",
    "S4_PROCUREMENT_PIPELINE",
]

# (report name, script path) pairs, executed in this order.
CHILD_SCRIPTS = [
    ("order_review", DOC / "_verify_order_review_e2e.py"),
    ("s2_s4_chain", DOC / "_verify_s2_s4_chain_e2e.py"),
    ("s1_s4_full", DOC / "_verify_s1_s4_full_e2e.py"),
    ("s3_delivery_18step", DOC / "_verify_s3_delivery_18step_e2e.py"),
]
PREP_SCRIPT = DOC / "_prep_close_evidence.py"

# Substrings whose presence in front-end sources counts as a legacy reference.
OLD_URL_PATTERNS = [
    "123.60.180.165:8087",
    "/api/business/",
]


def scan_old_frontend_urls(web_dir: Path | None = None) -> list[str]:
    """Return ``"<relative path>: <pattern>"`` for each legacy URL reference.

    Args:
        web_dir: Directory to scan. Defaults to the module-level ``WEB``.

    Returns:
        One entry per (file, pattern) match. Paths are shown relative to the
        parent of ``web_dir`` (which is ``ROOT`` for the default). An empty
        list is returned when ``web_dir`` does not exist.
    """
    web_dir = WEB if web_dir is None else web_dir
    hits: list[str] = []
    if not web_dir.exists():
        return hits

    skip_dirs = {"node_modules", "dist", ".git", ".pnpm"}
    source_suffixes = {".ts", ".vue", ".js", ".tsx", ".jsx"}
    label_root = web_dir.parent

    for path in web_dir.rglob("*"):
        if not path.is_file():
            continue
        # Only directories *inside* the scanned tree may exclude a file.
        # Testing the absolute parts would silently skip everything when the
        # checkout itself lives under e.g. ".../dist/...".
        if any(part in skip_dirs for part in path.relative_to(web_dir).parts):
            continue
        if path.suffix not in source_suffixes:
            continue
        try:
            text = path.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            # Unreadable files are skipped on purpose (best-effort scan).
            continue
        for pat in OLD_URL_PATTERNS:
            if pat in text:
                hits.append(f"{path.relative_to(label_root)}: {pat}")
    return hits


def http_json(method: str, path: str, body=None, token: str | None = None):
    """Send a JSON request to ``BASE + path`` and return ``(status, payload)``.

    Args:
        method: HTTP method.
        path: Path (and query string) appended to ``BASE``.
        body: Optional object serialised as the JSON request body.
        token: Optional bearer token for the ``Authorization`` header.

    Returns:
        A ``(status_code, payload)`` tuple. ``payload`` is the decoded JSON, or
        ``{"raw": <first 2000 characters>}`` when the body is not valid JSON.
        HTTP error responses are returned the same way instead of raising.

    Raises:
        Only ``urllib.error.HTTPError`` is handled here; other errors raised by
        ``urlopen`` propagate to the caller.
    """
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    data = None if body is None else json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        f"{BASE}{path}", data=data, headers=headers, method=method
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            raw = resp.read().decode("utf-8")
            try:
                return resp.status, json.loads(raw)
            except json.JSONDecodeError:
                return resp.status, {"raw": raw[:2000]}
    except urllib.error.HTTPError as e:
        raw = e.read().decode("utf-8", errors="replace")
        try:
            return e.code, json.loads(raw)
        except json.JSONDecodeError:
            return e.code, {"raw": raw[:2000]}


def login() -> str:
    """Log in through ``/api/sysAuth/login`` and return the access token.

    The login payload is produced by a small Node.js snippet (run with ``WEB``
    as working directory) that encrypts the password with ``sm-crypto-v2``.

    Raises:
        RuntimeError: If the Node.js helper exits non-zero or the response
            carries no ``accessToken``.
    """
    # Adjacent literals concatenate to exactly the original one-line script.
    # NOTE(review): account, password and public key are hard-coded here.
    node_script = (
        r" const { sm2 } = require('sm-crypto-v2');"
        r" const PK = '0484C7466D950E120E5ECE5DD85D0C90EAA85081A3A2BD7C57AE6DC822EFCCBD66620C67B0103FC8DD280E36C3B282977B722AAEC3C56518EDCEBAFB72C5A05312';"
        r" console.log(JSON.stringify({"
        r" account: 'AIDOPDemo',"
        r" password: sm2.doEncrypt('1234567890dop', PK, 1),"
        r" tenantId: '797403760988229'"
        r" }));"
        r" "
    )
    proc = subprocess.run(
        ["node", "-e", node_script],
        cwd=str(WEB),
        capture_output=True,
        text=True,
        timeout=30,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr or proc.stdout)

    _, body = http_json("POST", "/api/sysAuth/login", json.loads(proc.stdout.strip()))
    # The token is accepted either nested under "result" or at the top level.
    token = (body.get("result") or {}).get("accessToken") or body.get("accessToken")
    if not token:
        raise RuntimeError(f"login failed: {body}")
    return token


def parse_child_report(stdout: str) -> dict | None:
    """Extract the JSON report a child script printed to stdout.

    The whole (stripped) output is tried first. If that is not valid JSON, the
    span from the first ``{`` to the last ``}`` is tried, which tolerates log
    lines before or after the report.

    Returns:
        The decoded JSON value, or ``None`` when nothing could be parsed.
    """
    text = (stdout or "").strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    start = text.find("{")
    end = text.rfind("}")
    if start >= 0 and end > start:
        try:
            return json.loads(text[start : end + 1])
        except json.JSONDecodeError:
            return None
    return None


def scan_frontend_wiring() -> list[dict]:
    """CLOSE-E2E: check that the key capabilities are wired into front-end pages.

    Each check passes when every expected identifier ("needle") occurs in the
    page source; for multi-file checks the files are searched as one combined
    text.

    Returns:
        One dict per check with the keys ``name``, ``file``, ``ok`` and
        ``needles``.
    """

    def file_contains(rel: str, *needles: str) -> bool:
        path = ROOT / rel
        if not path.exists():
            return False
        text = path.read_text(encoding="utf-8", errors="ignore")
        return all(n in text for n in needles)

    def files_contain(rels: list[str], *needles: str) -> bool:
        # Missing files contribute nothing; the needles may be spread over
        # the files that do exist.
        text = "".join(
            (ROOT / rel).read_text(encoding="utf-8", errors="ignore")
            for rel in rels
            if (ROOT / rel).exists()
        )
        return all(n in text for n in needles)

    # (check name, file or list of files, needles, is multi-file check)
    wiring = [
        (
            "S1_order_review_page",
            "Web/src/views/aidop/business/orderList.vue",
            ("reviewSeOrder", "confirmSeOrderDelivery"),
            False,
        ),
        (
            "S2_schedule_page",
            "Web/src/views/aidop/production/workOrderSchedulingList.vue",
            ("productionSchedule",),
            False,
        ),
        (
            "S2_dispatch_page",
            [
                "Web/src/views/aidop/business/workOrderDispatchList.vue",
                "Web/src/views/aidop/business/workOrderDispatchForm.vue",
            ],
            ("onKitCheck", "releaseWorkOrder"),
            True,
        ),
        (
            "S2_single_kitting_page",
            "Web/src/views/aidop/business/workOrderDispatchList.vue",
            ("kittingCheckSingle",),
            False,
        ),
        (
            "S3_delivery_generate_page",
            "Web/src/views/aidop/s3/supply/deliveryScheduleList.vue",
            ("generateDeliverySchedule",),
            False,
        ),
        (
            "S3_procurement_pipeline_page",
            "Web/src/views/aidop/s3/supply/deliveryScheduleList.vue",
            ("executeProcurementPipeline",),
            False,
        ),
        (
            "S4_read_path_page",
            "Web/src/views/aidop/data-platform/actionRunLogs.vue",
            ("fetchS4ReadPathConsistency",),
            False,
        ),
    ]

    checks: list[dict] = []
    for name, rel, needles, multi in wiring:
        if multi:
            ok = files_contain(rel, *needles)
            file_label = " + ".join(rel)
        else:
            ok = file_contains(rel, *needles)
            file_label = rel
        checks.append(
            {"name": name, "file": file_label, "ok": ok, "needles": list(needles)}
        )
    return checks


def build_module_status(child_reports: dict[str, dict | None], wiring: list[dict]) -> dict:
    """Summarise, per module S1-S4, how far the migration is connected.

    Args:
        child_reports: Parsed child report per child name (or ``None``); a
            child counts as evidence only when its report has a truthy
            ``passed`` field.
        wiring: Result of :func:`scan_frontend_wiring`.

    Returns:
        ``{"S1": {"status", "evidence", "gaps"}, ...}``. ``status`` is:

        * ``"待补齐"`` - a page-wiring gap exists or there is no evidence;
        * ``"已打通"`` - all required evidence is present and the only gaps
          left are decision items (containing ``"需决策"``);
        * ``"待对账"`` - anything else.
    """
    modules: dict[str, dict] = {
        key: {"status": "待对账", "evidence": [], "gaps": []}
        for key in ("S1", "S2", "S3", "S4")
    }

    def child_passed(name: str) -> bool:
        r = child_reports.get(name)
        return isinstance(r, dict) and bool(r.get("passed"))

    def decision_only(gaps: list[str]) -> bool:
        # True for an empty list as well.
        return all("需决策" in g for g in gaps)

    if child_passed("order_review"):
        modules["S1"]["evidence"].append("order_review_e2e")
    if child_passed("s1_s4_full"):
        modules["S1"]["evidence"].append("s1_s4_full_e2e")
        modules["S4"]["evidence"].append("s1_s4_full_e2e")
    if child_passed("s2_s4_chain"):
        modules["S2"]["evidence"].append("s2_s4_chain_e2e")
        modules["S3"]["evidence"].append("s2_s4_chain_e2e")
        modules["S4"]["evidence"].append("s2_s4_chain_e2e")
    if child_passed("s3_delivery_18step"):
        modules["S3"]["evidence"].append("s3_delivery_18step_e2e")

    wiring_map = {w["name"]: w["ok"] for w in wiring}
    if not wiring_map.get("S1_order_review_page"):
        modules["S1"]["gaps"].append("订单评审页未接线")
    if not wiring_map.get("S2_schedule_page") or not wiring_map.get("S2_dispatch_page"):
        modules["S2"]["gaps"].append("排程/下达页未接线")
    if not wiring_map.get("S2_single_kitting_page"):
        modules["S2"]["gaps"].append("单工单齐套无页面入口")
    if not wiring_map.get("S3_delivery_generate_page"):
        modules["S3"]["gaps"].append("交货计划生成无页面入口")
    if not wiring_map.get("S3_procurement_pipeline_page"):
        modules["S3"]["gaps"].append("采购闭环无页面入口")
    if not wiring_map.get("S4_read_path_page"):
        modules["S4"]["gaps"].append("S4读路径一致性无页面入口")

    # Open decision items are always reported; they do not block "已打通".
    modules["S2"]["gaps"].append("库存占用:需决策")
    modules["S3"]["gaps"].append("真实SAP/QAD:需决策")
    modules["S1"]["gaps"].append("替代料口径:需决策")

    required_evidence = {
        "S1": {"order_review_e2e", "s1_s4_full_e2e"},
        "S2": {"s2_s4_chain_e2e"},
        "S3": {"s3_delivery_18step_e2e"},
        "S4": {"s1_s4_full_e2e", "s2_s4_chain_e2e"},
    }
    for key, mod in modules.items():
        gaps, evidence = mod["gaps"], mod["evidence"]
        if any("未接线" in g or "无页面" in g for g in gaps):
            mod["status"] = "待补齐"
        elif not evidence:
            mod["status"] = "待补齐"
        elif set(evidence) >= required_evidence[key] and decision_only(gaps):
            mod["status"] = "已打通"
        else:
            mod["status"] = "待对账"
    return modules


def _tail_text(value: str | bytes | None, limit: int) -> str:
    """Return the last ``limit`` characters of ``value`` as text ("" for None)."""
    if value is None:
        return ""
    if isinstance(value, bytes):
        value = value.decode("utf-8", errors="replace")
    return value[-limit:]


def run_child(name: str, script: Path, timeout: float = 300) -> dict:
    """Run one child verification script with the current interpreter.

    Args:
        name: Label stored in the result.
        script: Path of the script to execute (working directory is ``ROOT``).
        timeout: Seconds to wait before the child is killed.

    Returns:
        A JSON-serialisable dict. It always contains ``name`` and ``ok``. A
        missing script or a timeout yields ``ok=False`` plus ``error``; a
        completed run adds ``exit_code``, ``parsed``, ``stdout_tail`` and
        ``stderr_tail``, plus ``child_passed`` / ``child_checks`` when the
        child printed a JSON object.
    """
    if not script.exists():
        return {"name": name, "ok": False, "error": f"missing script: {script}"}

    try:
        proc = subprocess.run(
            [sys.executable, str(script)],
            cwd=str(ROOT),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        # A hung child must fail its own check, not abort the whole run
        # before the report is written.
        return {
            "name": name,
            "ok": False,
            "error": f"timeout after {timeout}s: {script}",
            "stdout_tail": _tail_text(exc.stdout, 4000),
            "stderr_tail": _tail_text(exc.stderr, 2000),
        }

    parsed = parse_child_report(proc.stdout or "")
    result = {
        "name": name,
        "ok": proc.returncode == 0,
        "exit_code": proc.returncode,
        "parsed": parsed,
        "stdout_tail": _tail_text(proc.stdout, 4000),
        "stderr_tail": _tail_text(proc.stderr, 2000),
    }
    if isinstance(parsed, dict):
        result["child_passed"] = parsed.get("passed")
        result["child_checks"] = parsed.get("checks")
    return result


def check_action_logs(token: str) -> dict:
    """Check that each required action code has at least one log row.

    Args:
        token: Bearer token returned by :func:`login`.

    Returns:
        ``{"checks": [...], "ok": bool}`` where each check records the action
        code, the HTTP status and the ``total`` read from the response.
    """
    checks = []
    for code in REQUIRED_ACTION_CODES:
        st, body = http_json(
            "GET",
            f"/api/Aidop/action-run-log/list?actionCode={code}&page=1&pageSize=1",
            token=token,
        )
        total = 0
        if isinstance(body, dict):
            total = body.get("total") or 0
        checks.append(
            {"action_code": code, "http": st, "total": total, "ok": st == 200 and total > 0}
        )
    return {"checks": checks, "ok": all(c["ok"] for c in checks)}


def main() -> None:
    """Run all acceptance steps, write the JSON report and exit (0 or 5)."""
    report = {
        "matrix_doc": "doc/plan/旧Business API等价验收矩阵.md",
        "steps": [],
        "checks": [],
    }

    url_hits = scan_old_frontend_urls()
    no_old_urls = len(url_hits) == 0
    report["steps"].append({"step": "old_url_scan", "hits": url_hits, "ok": no_old_urls})
    report["checks"].append(
        {"name": "no_old_frontend_url", "ok": no_old_urls, "detail": url_hits}
    )

    prep_result = run_child("prep_close_evidence", PREP_SCRIPT)
    report["steps"].append({"step": "prep_close_evidence", **prep_result})
    report["checks"].append(
        {"name": "prep_close_evidence", "ok": prep_result.get("ok", False)}
    )

    child_results = []
    child_reports: dict[str, dict | None] = {}
    for name, script in CHILD_SCRIPTS:
        result = run_child(name, script)
        child_results.append(result)
        parsed = result.get("parsed")
        child_reports[name] = parsed if isinstance(parsed, dict) else None
        report["steps"].append({"step": f"child_{name}", **result})
        report["checks"].append({"name": f"child_{name}", "ok": result.get("ok", False)})

    wiring = scan_frontend_wiring()
    wiring_ok = all(c["ok"] for c in wiring)
    report["steps"].append({"step": "frontend_wiring", "checks": wiring, "ok": wiring_ok})
    report["checks"].append(
        {
            "name": "frontend_wiring",
            "ok": wiring_ok,
            "detail": [c for c in wiring if not c["ok"]],
        }
    )

    report["module_status"] = build_module_status(child_reports, wiring)
    report["gaps"] = {k: v.get("gaps", []) for k, v in report["module_status"].items()}

    # Top-level boundary: any login / HTTP failure is recorded as a failed
    # check so that the report is still produced.
    try:
        token = login()
        log_check = check_action_logs(token)
        report["steps"].append({"step": "action_log_codes", **log_check})
        report["checks"].append(
            {
                "name": "action_log_codes",
                "ok": log_check["ok"],
                "detail": [c for c in log_check["checks"] if not c["ok"]],
            }
        )
    except Exception as ex:
        report["steps"].append({"step": "action_log_codes", "ok": False, "error": str(ex)})
        report["checks"].append({"name": "action_log_codes", "ok": False, "detail": str(ex)})

    report["passed"] = all(c["ok"] for c in report["checks"])
    out = DOC / "_verify_old_api_equivalence_e2e_result.json"
    rendered = json.dumps(report, ensure_ascii=False, indent=2)
    out.write_text(rendered, encoding="utf-8")
    print(rendered)
    sys.exit(0 if report["passed"] else 5)


if __name__ == "__main__":
    main()