| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- #!/usr/bin/env python3
- """E2E-01: 旧 Business API 等价迁移统一验收编排。
- 1. 扫描 Web/ 无旧 api/business 直连
- 2. 串联运行 S1 / S2-S4 / S1-S4 full 子脚本
- 3. 校验 aidop_action_run_log 关键 action_code 存在
- """
- from __future__ import annotations
- import json
- import os
- import subprocess
- import sys
- import urllib.error
- import urllib.request
- from pathlib import Path
# Repository root: this script is expected to live one directory below it.
ROOT = Path(__file__).resolve().parents[1]
# Directory holding the child verification scripts and the result JSON.
DOC = ROOT / "doc"
# Front-end source tree that is scanned for stale URLs and page wiring.
WEB = ROOT / "Web"

# API base URL. An empty AIDOP_API_BASE falls back to the local default, and
# trailing slashes are dropped because request paths already start with "/".
BASE = (os.environ.get("AIDOP_API_BASE") or "http://127.0.0.1:5005").rstrip("/")

# Tenant id of the demo account (the login payload carries the same value).
TENANT = 797403760988229

# Action codes that must each have at least one row in the action run log.
REQUIRED_ACTION_CODES = [
    "S1_ORDER_REVIEW",
    "S1_DELIVERY_CONFIRM",
    "S1_ORDER_REFRESH_PLAN",
    "S2_SCHEDULE_GENERATE",
    "S2_PRIORITY_RECHECK",
    "S2_KITTING_CHECK_BATCH",
    "S2_KITTING_CHECK_SINGLE",
    "S2_WORK_ORDER_RELEASE",
    "S4_MRP_GENERATE",
    "S3_DELIVERY_GENERATE",
    "S4_PROCUREMENT_PIPELINE",
]

# (report name, script path) pairs, executed in this order.
CHILD_SCRIPTS = [
    ("order_review", DOC / "_verify_order_review_e2e.py"),
    ("s2_s4_chain", DOC / "_verify_s2_s4_chain_e2e.py"),
    ("s1_s4_full", DOC / "_verify_s1_s4_full_e2e.py"),
    ("s3_delivery_18step", DOC / "_verify_s3_delivery_18step_e2e.py"),
]

# Preparation script that runs before the child scripts.
PREP_SCRIPT = DOC / "_prep_close_evidence.py"

# Substrings whose presence in front-end sources marks a leftover direct call
# to the old Business API.
OLD_URL_PATTERNS = [
    "123.60.180.165:8087",
    "/api/business/",
]
- def scan_old_frontend_urls() -> list[str]:
- hits: list[str] = []
- if not WEB.exists():
- return hits
- skip_dirs = {"node_modules", "dist", ".git", ".pnpm"}
- for path in WEB.rglob("*"):
- if not path.is_file():
- continue
- if any(part in skip_dirs for part in path.parts):
- continue
- if path.suffix not in {".ts", ".vue", ".js", ".tsx", ".jsx"}:
- continue
- try:
- text = path.read_text(encoding="utf-8", errors="ignore")
- except OSError:
- continue
- for pat in OLD_URL_PATTERNS:
- if pat in text:
- hits.append(f"{path.relative_to(ROOT)}: {pat}")
- return hits
def http_json(method: str, path: str, body=None, token: str | None = None):
    """Send a JSON request to ``BASE + path`` and return ``(status, payload)``.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Request path appended verbatim to ``BASE``.
        body: JSON-serialisable request body, or ``None`` for no body.
        token: Optional bearer token for the ``Authorization`` header.

    Returns:
        A tuple of the HTTP status code and the decoded JSON payload. When the
        response is not valid JSON the payload is ``{"raw": <first 2000
        characters>}``. HTTP error statuses are returned, not raised.

    Raises:
        urllib.error.URLError: On connection-level failures (not an
            ``HTTPError``), as these are not handled here.
    """
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    payload = None if body is None else json.dumps(body).encode("utf-8")
    request = urllib.request.Request(
        f"{BASE}{path}", data=payload, headers=headers, method=method
    )

    try:
        with urllib.request.urlopen(request, timeout=60) as resp:
            status, raw_bytes = resp.status, resp.read()
    except urllib.error.HTTPError as err:
        # An HTTPError doubles as the response object; read it, then close it
        # so the underlying connection is released.
        try:
            status, raw_bytes = err.code, err.read()
        finally:
            err.close()

    # Decode leniently on both paths so a stray byte cannot abort the run.
    text = raw_bytes.decode("utf-8", errors="replace")
    try:
        return status, json.loads(text)
    except json.JSONDecodeError:
        return status, {"raw": text[:2000]}
def login() -> str:
    """Log in with the demo account and return the access token.

    The password is SM2-encrypted by a short Node script that requires
    ``sm-crypto-v2`` and runs with ``WEB`` as its working directory; the
    resulting payload is posted to ``/api/sysAuth/login``.

    Returns:
        The access token, read from ``result.accessToken`` or, failing that,
        from a top-level ``accessToken``.

    Raises:
        RuntimeError: If the Node helper exits non-zero or prints something
            that is not JSON, or if the response carries no access token.
    """
    # NOTE(review): account, password and public key are hard-coded here;
    # consider moving them to environment variables or a secrets store.
    node_script = r"""
const { sm2 } = require('sm-crypto-v2');
const PK = '0484C7466D950E120E5ECE5DD85D0C90EAA85081A3A2BD7C57AE6DC822EFCCBD66620C67B0103FC8DD280E36C3B282977B722AAEC3C56518EDCEBAFB72C5A05312';
console.log(JSON.stringify({ account: 'AIDOPDemo', password: sm2.doEncrypt('1234567890dop', PK, 1), tenantId: '797403760988229' }));
"""
    proc = subprocess.run(
        ["node", "-e", node_script],
        cwd=str(WEB),
        capture_output=True,
        text=True,
        timeout=30,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr or proc.stdout)

    try:
        credentials = json.loads(proc.stdout.strip())
    except json.JSONDecodeError as err:
        raise RuntimeError("login helper did not print a JSON payload") from err

    status, body = http_json("POST", "/api/sysAuth/login", credentials)

    # Accept both the enveloped shape ({"result": {...}}) and a flat one, and
    # tolerate bodies or envelopes that are not dicts.
    token = None
    if isinstance(body, dict):
        envelope = body.get("result")
        if isinstance(envelope, dict):
            token = envelope.get("accessToken")
        token = token or body.get("accessToken")
    if not token:
        raise RuntimeError(f"login failed: HTTP {status}: {body}")
    return token
- def parse_child_report(stdout: str) -> dict | None:
- text = (stdout or "").strip()
- if not text:
- return None
- try:
- return json.loads(text)
- except json.JSONDecodeError:
- start = text.find("{")
- end = text.rfind("}")
- if start >= 0 and end > start:
- try:
- return json.loads(text[start : end + 1])
- except json.JSONDecodeError:
- return None
- return None
def scan_frontend_wiring() -> list[dict]:
    """CLOSE-E2E: check that key capabilities are wired into front-end pages.

    Each wiring entry names one page file (or a list of files) and the
    identifiers that must all occur in it. For a multi-file entry the files'
    contents are concatenated and searched as one text.

    Returns:
        One dict per entry with keys ``name``, ``file``, ``ok`` and
        ``needles``.
    """

    def read_if_present(rel: str) -> str | None:
        # None means "file missing"; an empty file still yields "".
        target = ROOT / rel
        if not target.exists():
            return None
        return target.read_text(encoding="utf-8", errors="ignore")

    def file_contains(rel: str, *needles: str) -> bool:
        content = read_if_present(rel)
        if content is None:
            return False
        return all(needle in content for needle in needles)

    def files_contain(rels: list[str], *needles: str) -> bool:
        pieces = (read_if_present(rel) for rel in rels)
        merged = "".join(piece for piece in pieces if piece is not None)
        return all(needle in merged for needle in needles)

    # (check name, file or list of files, required identifiers, multi-file?)
    wiring = [
        ("S1_order_review_page", "Web/src/views/aidop/business/orderList.vue", ("reviewSeOrder", "confirmSeOrderDelivery"), False),
        ("S2_schedule_page", "Web/src/views/aidop/production/workOrderSchedulingList.vue", ("productionSchedule",), False),
        (
            "S2_dispatch_page",
            [
                "Web/src/views/aidop/business/workOrderDispatchList.vue",
                "Web/src/views/aidop/business/workOrderDispatchForm.vue",
            ],
            ("onKitCheck", "releaseWorkOrder"),
            True,
        ),
        ("S2_single_kitting_page", "Web/src/views/aidop/business/workOrderDispatchList.vue", ("kittingCheckSingle",), False),
        ("S3_delivery_generate_page", "Web/src/views/aidop/s3/supply/deliveryScheduleList.vue", ("generateDeliverySchedule",), False),
        ("S3_procurement_pipeline_page", "Web/src/views/aidop/s3/supply/deliveryScheduleList.vue", ("executeProcurementPipeline",), False),
        ("S4_read_path_page", "Web/src/views/aidop/data-platform/actionRunLogs.vue", ("fetchS4ReadPathConsistency",), False),
    ]

    results: list[dict] = []
    for name, target, needles, multi in wiring:
        if multi:
            found = files_contain(target, *needles)
            label = " + ".join(target)
        else:
            found = file_contains(target, *needles)
            label = target
        results.append(
            {"name": name, "file": label, "ok": found, "needles": list(needles)}
        )
    return results
- def build_module_status(child_reports: dict[str, dict | None], wiring: list[dict]) -> dict:
- """汇总 S1-S4 打通状态:passed / 待对账 / 待补齐 / 需决策。"""
- modules: dict[str, dict] = {
- "S1": {"status": "待对账", "evidence": [], "gaps": []},
- "S2": {"status": "待对账", "evidence": [], "gaps": []},
- "S3": {"status": "待对账", "evidence": [], "gaps": []},
- "S4": {"status": "待对账", "evidence": [], "gaps": []},
- }
- def child_passed(name: str) -> bool:
- r = child_reports.get(name)
- return isinstance(r, dict) and bool(r.get("passed"))
- if child_passed("order_review"):
- modules["S1"]["evidence"].append("order_review_e2e")
- if child_passed("s1_s4_full"):
- modules["S1"]["evidence"].append("s1_s4_full_e2e")
- modules["S4"]["evidence"].append("s1_s4_full_e2e")
- if child_passed("s2_s4_chain"):
- modules["S2"]["evidence"].append("s2_s4_chain_e2e")
- modules["S3"]["evidence"].append("s2_s4_chain_e2e")
- modules["S4"]["evidence"].append("s2_s4_chain_e2e")
- if child_passed("s3_delivery_18step"):
- modules["S3"]["evidence"].append("s3_delivery_18step_e2e")
- wiring_map = {w["name"]: w["ok"] for w in wiring}
- if not wiring_map.get("S1_order_review_page"):
- modules["S1"]["gaps"].append("订单评审页未接线")
- if not wiring_map.get("S2_schedule_page") or not wiring_map.get("S2_dispatch_page"):
- modules["S2"]["gaps"].append("排程/下达页未接线")
- if not wiring_map.get("S2_single_kitting_page"):
- modules["S2"]["gaps"].append("单工单齐套无页面入口")
- if not wiring_map.get("S3_delivery_generate_page"):
- modules["S3"]["gaps"].append("交货计划生成无页面入口")
- if not wiring_map.get("S3_procurement_pipeline_page"):
- modules["S3"]["gaps"].append("采购闭环无页面入口")
- if not wiring_map.get("S4_read_path_page"):
- modules["S4"]["gaps"].append("S4读路径一致性无页面入口")
- modules["S2"]["gaps"].append("库存占用:需决策")
- modules["S3"]["gaps"].append("真实SAP/QAD:需决策")
- modules["S1"]["gaps"].append("替代料口径:需决策")
- decision_only = lambda gaps: not gaps or all("需决策" in g for g in gaps)
- required_evidence = {
- "S1": {"order_review_e2e", "s1_s4_full_e2e"},
- "S2": {"s2_s4_chain_e2e"},
- "S3": {"s3_delivery_18step_e2e"},
- "S4": {"s1_s4_full_e2e", "s2_s4_chain_e2e"},
- }
- for key, mod in modules.items():
- if mod["gaps"] and any("未接线" in g or "无页面" in g for g in mod["gaps"]):
- mod["status"] = "待补齐"
- elif not mod["evidence"]:
- mod["status"] = "待补齐"
- elif set(mod["evidence"]) >= required_evidence[key] and decision_only(mod["gaps"]):
- mod["status"] = "已打通"
- elif mod["evidence"]:
- mod["status"] = "待对账"
- return modules
def run_child(name: str, script: Path) -> dict:
    """Run one child verification script and summarise its outcome.

    The script runs under the current interpreter with ``ROOT`` as working
    directory and a 300-second timeout.

    Args:
        name: Label stored in the result.
        script: Path of the script to execute.

    Returns:
        A dict that always has ``name`` and ``ok``. After a completed run it
        also has ``exit_code``, ``parsed`` (the report recovered from stdout),
        ``stdout_tail`` and ``stderr_tail``, plus ``child_passed`` and
        ``child_checks`` when a report dict was parsed. A missing script, a
        timeout or a launch failure yields ``ok=False`` with an ``error``
        message and never raises, so one bad child cannot abort the run.
    """
    if not script.exists():
        return {"name": name, "ok": False, "error": f"missing script: {script}"}

    def tail(stream, limit: int) -> str:
        # Output attached to TimeoutExpired may be bytes even in text mode.
        if isinstance(stream, bytes):
            stream = stream.decode("utf-8", errors="replace")
        return (stream or "")[-limit:]

    try:
        proc = subprocess.run(
            [sys.executable, str(script)],
            cwd=str(ROOT),
            capture_output=True,
            text=True,
            timeout=300,
        )
    except subprocess.TimeoutExpired as err:
        return {
            "name": name,
            "ok": False,
            "exit_code": None,
            "parsed": None,
            "error": f"timeout after {err.timeout}s",
            "stdout_tail": tail(err.stdout, 4000),
            "stderr_tail": tail(err.stderr, 2000),
        }
    except OSError as err:
        return {"name": name, "ok": False, "error": f"failed to start: {err}"}

    parsed = parse_child_report(proc.stdout or "")
    result = {
        "name": name,
        "ok": proc.returncode == 0,
        "exit_code": proc.returncode,
        "parsed": parsed,
        "stdout_tail": tail(proc.stdout, 4000),
        "stderr_tail": tail(proc.stderr, 2000),
    }
    if isinstance(parsed, dict):
        result["child_passed"] = parsed.get("passed")
        result["child_checks"] = parsed.get("checks")
    return result
def check_action_logs(token: str) -> dict:
    """Verify that every required action code has at least one run-log row.

    One list request (page size 1) is sent per entry in
    ``REQUIRED_ACTION_CODES``.

    Args:
        token: Bearer token used for the requests.

    Returns:
        ``{"checks": [...], "ok": bool}``. Each check holds ``action_code``,
        ``http``, ``total`` and ``ok`` (HTTP 200 and ``total > 0``); the
        overall ``ok`` requires every check to pass.
    """

    def extract_total(body) -> int:
        """Return the reported row count, or 0 when none can be read."""
        if not isinstance(body, dict):
            return 0
        candidates = [body.get("total")]
        # NOTE(review): assumes the endpoint may wrap its payload in
        # "result", as the login response can; confirm against the API.
        envelope = body.get("result")
        if isinstance(envelope, dict):
            candidates.append(envelope.get("total"))
        for value in candidates:
            try:
                count = int(value)
            except (TypeError, ValueError):
                # Missing or non-numeric total: try the next candidate.
                continue
            if count > 0:
                return count
        return 0

    checks = []
    for code in REQUIRED_ACTION_CODES:
        status, body = http_json(
            "GET",
            f"/api/Aidop/action-run-log/list?actionCode={code}&page=1&pageSize=1",
            token=token,
        )
        total = extract_total(body)
        checks.append(
            {
                "action_code": code,
                "http": status,
                "total": total,
                "ok": status == 200 and total > 0,
            }
        )
    return {"checks": checks, "ok": all(c["ok"] for c in checks)}
def main():
    """Run the full acceptance flow, persist the report and exit.

    Steps, in order: old-URL scan, preparation script, child scripts,
    front-end wiring scan, module status summary, action-log check. The report
    is written to ``doc/_verify_old_api_equivalence_e2e_result.json`` and
    printed; the exit code is 0 when every check passed and 5 otherwise.
    """
    report = {
        "matrix_doc": "doc/plan/旧Business API等价验收矩阵.md",
        "steps": [],
        "checks": [],
    }
    steps = report["steps"]
    checks = report["checks"]

    # 1. No direct references to the old API may remain in the front end.
    stale_urls = scan_old_frontend_urls()
    urls_clean = len(stale_urls) == 0
    steps.append({"step": "old_url_scan", "hits": stale_urls, "ok": urls_clean})
    checks.append({"name": "no_old_frontend_url", "ok": urls_clean, "detail": stale_urls})

    # 2. Preparation script.
    prep = run_child("prep_close_evidence", PREP_SCRIPT)
    steps.append({"step": "prep_close_evidence", **prep})
    checks.append({"name": "prep_close_evidence", "ok": prep.get("ok", False)})

    # 3. Child scripts; keep only reports that parsed to a dict.
    child_reports: dict[str, dict | None] = {}
    for child_name, script in CHILD_SCRIPTS:
        outcome = run_child(child_name, script)
        parsed = outcome.get("parsed")
        child_reports[child_name] = parsed if isinstance(parsed, dict) else None
        steps.append({"step": f"child_{child_name}", **outcome})
        checks.append({"name": f"child_{child_name}", "ok": outcome.get("ok", False)})

    # 4. Front-end wiring.
    wiring = scan_frontend_wiring()
    unwired = [entry for entry in wiring if not entry["ok"]]
    wiring_ok = all(entry["ok"] for entry in wiring)
    steps.append({"step": "frontend_wiring", "checks": wiring, "ok": wiring_ok})
    checks.append({"name": "frontend_wiring", "ok": wiring_ok, "detail": unwired})

    # 5. Per-module summary and its gap lists.
    report["module_status"] = build_module_status(child_reports, wiring)
    report["gaps"] = {
        module: status.get("gaps", [])
        for module, status in report["module_status"].items()
    }

    # 6. Action-log check; any failure here is recorded, not raised.
    try:
        log_check = check_action_logs(login())
    except Exception as ex:
        steps.append({"step": "action_log_codes", "ok": False, "error": str(ex)})
        checks.append({"name": "action_log_codes", "ok": False, "detail": str(ex)})
    else:
        steps.append({"step": "action_log_codes", **log_check})
        checks.append(
            {
                "name": "action_log_codes",
                "ok": log_check["ok"],
                "detail": [c for c in log_check["checks"] if not c["ok"]],
            }
        )

    report["passed"] = all(check["ok"] for check in checks)

    rendered = json.dumps(report, ensure_ascii=False, indent=2)
    out = DOC / "_verify_old_api_equivalence_e2e_result.json"
    out.write_text(rendered, encoding="utf-8")
    print(rendered)
    sys.exit(0 if report["passed"] else 5)


if __name__ == "__main__":
    main()
|