| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- #!/usr/bin/env python3
- """E2E: S2 schedule -> kitting -> release -> S4 MRP/procurement (after S1 review)."""
- import json
- import subprocess
- import sys
- import urllib.error
- import urllib.request
- from datetime import date, timedelta
- try:
- import pymysql
- except ImportError:
- print(json.dumps({"error": "pymysql not installed"}, ensure_ascii=False))
- sys.exit(1)
# Base URL that every API path in this script is appended to.
BASE = "http://127.0.0.1:5005"

# Tenant id used both in SQL filters (tenant_id=%s) and in API query strings.
TENANT = 797403760988229

# Domain code sent with the sync-routing request and echoed in the report.
DOMAIN = "8010"

# Bill number; only echoed in the report header.
BILL_NO = "MPO482024102300001"

# Work orders exercised by the run: index 0 is released, index 1 is re-checked.
WORK_ORDS = ["M500130841", "M500130842"]

# Keyword arguments passed straight to pymysql.connect(); DictCursor makes every
# fetched row a dict keyed by column name.
# NOTE(review): host and credentials are hard-coded in source — consider moving
# them to environment variables or a local, untracked config file.
CONN = {
    "host": "123.60.180.165",
    "port": 3306,
    "user": "aidopremote",
    "password": "1234567890aiDOP#",
    "database": "aidopdev",
    "charset": "utf8mb4",
    "connect_timeout": 20,
    "cursorclass": pymysql.cursors.DictCursor,
}
def _parse_json_or_raw(payload):
    """Decode response bytes and parse them as JSON.

    Undecodable bytes are replaced rather than raising, so a badly encoded
    body can never abort the run. When the text is not valid JSON, the first
    2000 characters are returned under the ``"raw"`` key instead.
    """
    text = payload.decode("utf-8", errors="replace")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"raw": text[:2000]}


def http_json(method, path, body=None, token=None):
    """Send a JSON request to ``BASE + path`` and return ``(status, parsed_body)``.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: URL path (including any query string) appended to ``BASE``.
        body: JSON-serialisable payload, or ``None`` to send no request body.
        token: Optional bearer token placed in the ``Authorization`` header.

    Returns:
        A ``(http_status, payload)`` tuple. ``payload`` is the decoded JSON
        document, or ``{"raw": <first 2000 chars>}`` when the response is not
        valid JSON. HTTP error responses (4xx/5xx) are returned the same way
        rather than raised.

    Raises:
        urllib.error.URLError: For transport-level failures (for example a
            refused connection); only ``HTTPError`` is converted to a result.
    """
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    data = None if body is None else json.dumps(body).encode("utf-8")
    req = urllib.request.Request(f"{BASE}{path}", data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return resp.status, _parse_json_or_raw(resp.read())
    except urllib.error.HTTPError as e:
        # Error responses still carry a body worth reporting; parse it the
        # same way as a successful one.
        return e.code, _parse_json_or_raw(e.read())
def login():
    """Log in to the API and return the access token.

    The password is SM2-encrypted by a short Node.js script (using the
    ``sm-crypto-v2`` package) whose JSON output becomes the login request
    body, which is then POSTed to ``/api/sysAuth/login``.

    Returns:
        The ``accessToken`` string, read from ``result.accessToken`` or, as a
        fallback, from the top-level ``accessToken`` field of the response.

    Raises:
        RuntimeError: If the Node.js script exits non-zero (message is its
            stderr) or the login response contains no access token.
    """
    node_script = r"""
const { sm2 } = require('sm-crypto-v2');
const PK = '0484C7466D950E120E5ECE5DD85D0C90EAA85081A3A2BD7C57AE6DC822EFCCBD66620C67B0103FC8DD280E36C3B282977B722AAEC3C56518EDCEBAFB72C5A05312';
console.log(JSON.stringify({ account: 'AIDOPDemo', password: sm2.doEncrypt('1234567890dop', PK, 1), tenantId: '797403760988229' }));
"""
    # NOTE(review): cwd is a machine-specific Windows path — presumably the
    # directory whose node_modules provides sm-crypto-v2; confirm before
    # running elsewhere.
    proc = subprocess.run(
        ["node", "-e", node_script],
        cwd=r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\Web",
        capture_output=True,
        text=True,
        timeout=30,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr)

    credentials = json.loads(proc.stdout.strip())
    status, body = http_json("POST", "/api/sysAuth/login", credentials)

    # The response may be any JSON value; only look for the token in dicts so
    # an unexpected shape is reported as a login failure, not an AttributeError.
    token = None
    if isinstance(body, dict):
        result = body.get("result")
        if isinstance(result, dict):
            token = result.get("accessToken")
        token = token or body.get("accessToken")
    if not token:
        raise RuntimeError(f"login failed (HTTP {status}): {body}")
    return token
def unwrap(body):
    """Strip the ``{"code": 200, "result": ...}`` envelope from a response.

    Returns ``body["result"]`` when *body* is a dict that has a ``"result"``
    key and whose ``"code"`` equals 200; any other value is returned unchanged.
    """
    if not isinstance(body, dict):
        return body
    is_success_envelope = "result" in body and body.get("code") == 200
    return body["result"] if is_success_envelope else body
def db_snapshot(work_ords):
    """Collect database facts about *work_ords* for before/after comparison.

    Args:
        work_ords: Non-empty collection of work order numbers.

    Returns:
        A dict with these keys:

        * ``work_orders`` — WorkOrd/Domain/Status rows from ``WorkOrdMaster``.
        * ``routing_count`` — number of matching ``WorkOrdRouting`` rows.
        * ``schedule_rows`` — number of ``PeriodSequenceDet`` rows with
          ``IsActive`` = 1.
        * ``mes_morder`` — morder_no/MaterialSituation rows from ``mes_morder``.
        * ``pick_bills`` — number of ``NbrMaster`` rows of type ``'SM'``.
        * ``action_logs`` — the 12 most recent ``aidop_action_run_log`` rows
          for the tenant.
        * ``recent_pr`` — number of ``srm_pr_main`` rows created within the
          last hour.
        * ``pick_bill_lines`` — number of ``NbrDetail`` rows joined to type
          ``'SM'`` ``NbrMaster`` rows of the first work order.

    Raises:
        ValueError: If *work_ords* is empty (the ``IN (...)`` clauses would be
            invalid SQL).
    """
    work_ords = list(work_ords)
    if not work_ords:
        raise ValueError("work_ords must not be empty")

    # One "%s" placeholder per work order; values are always passed as query
    # parameters, never interpolated into the SQL text.
    fmt = ",".join(["%s"] * len(work_ords))
    in_params = [TENANT, *work_ords]

    snap = {}
    with pymysql.connect(**CONN) as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT WorkOrd, Domain, Status FROM WorkOrdMaster WHERE tenant_id=%s AND WorkOrd IN ({fmt})",
                in_params,
            )
            snap["work_orders"] = cur.fetchall()

            cur.execute(
                f"SELECT COUNT(*) AS c FROM WorkOrdRouting WHERE tenant_id=%s AND WorkOrd IN ({fmt})",
                in_params,
            )
            snap["routing_count"] = cur.fetchone()["c"]

            cur.execute(
                f"SELECT COUNT(*) AS c FROM PeriodSequenceDet WHERE tenant_id=%s AND WorkOrds IN ({fmt}) AND IFNULL(IsActive,0)=1",
                in_params,
            )
            snap["schedule_rows"] = cur.fetchone()["c"]

            cur.execute(
                f"SELECT morder_no, MaterialSituation FROM mes_morder WHERE tenant_id=%s AND morder_no IN ({fmt})",
                in_params,
            )
            snap["mes_morder"] = cur.fetchall()

            cur.execute(
                f"SELECT COUNT(*) AS c FROM NbrMaster WHERE tenant_id=%s AND Type='SM' AND WorkOrd IN ({fmt})",
                in_params,
            )
            snap["pick_bills"] = cur.fetchone()["c"]

            cur.execute(
                "SELECT action_code, status, message, start_time FROM aidop_action_run_log WHERE tenant_id=%s ORDER BY start_time DESC LIMIT 12",
                (TENANT,),
            )
            snap["action_logs"] = cur.fetchall()

            cur.execute(
                "SELECT COUNT(*) AS c FROM srm_pr_main WHERE tenant_id=%s AND create_time >= DATE_SUB(NOW(), INTERVAL 1 HOUR)",
                (TENANT,),
            )
            snap["recent_pr"] = cur.fetchone()["c"]

            # Use the caller's first work order (not the module-level list) so
            # the snapshot always describes the orders it was asked about.
            cur.execute(
                "SELECT COUNT(*) AS c FROM NbrDetail nd INNER JOIN NbrMaster nm ON nm.RecID=nd.NbrRecID "
                "WHERE nm.tenant_id=%s AND nm.WorkOrd=%s AND nm.Type='SM'",
                (TENANT, work_ords[0]),
            )
            snap["pick_bill_lines"] = cur.fetchone()["c"]
    return snap
def step(name, status, body, note=""):
    """Build one report entry describing the outcome of an API call.

    The entry maps ``"step"`` to *name*, ``"http"`` to *status*, ``"note"``
    to *note* and ``"body"`` to *body*, in that key order.
    """
    entry = {"step": name, "http": status}
    entry["note"] = note
    entry["body"] = body
    return entry
def main():
    """Run the S2 -> S4 end-to-end flow, print a JSON report, and exit.

    The flow is: log in, snapshot the database, call the scheduling, kitting,
    release, MRP and procurement endpoints in order, snapshot the database
    again, and evaluate a list of checks. The process exits with status 0
    when every non-optional check passes and 4 otherwise.
    """
    report = {
        "bill_no": BILL_NO,
        "work_orders": WORK_ORDS,
        "domain": DOMAIN,
        "tenant_id": str(TENANT),
        "steps": [],
    }
    token = login()
    report["login"] = "ok"
    report["before"] = db_snapshot(WORK_ORDS)

    def run_step(name, method, path, payload=None):
        """Call one endpoint with the session token and record it in the report."""
        st, body = http_json(method, path, payload, token)
        report["steps"].append(step(name, st, unwrap(body)))

    # S2: sync routing per work order
    for wo in WORK_ORDS:
        run_step(
            f"sync_routing_{wo}",
            "POST",
            "/api/Production/scheduling/sync-routing",
            {"workOrd": wo, "domain": DOMAIN},
        )

    # S2: generate schedule — use tenant id as domain query to avoid 8010 being parsed as tenant
    run_step("schedule_generate", "POST", f"/api/Production/scheduling/generate?domain={TENANT}")

    # S2: batch kitting check
    run_step(
        "kitting_check_batch",
        "POST",
        f"/api/WorkOrder/dispatch/kitting-check?domain={TENANT}&userAccount=AIDOPDemo",
    )

    # S2: single work order kitting (writes S2_KITTING_CHECK_SINGLE)
    run_step(
        "kitting_check_single",
        "POST",
        f"/api/WorkOrder/dispatch/work-order-kitting-check?workord={WORK_ORDS[1]}&domain={TENANT}&userAccount=AIDOPDemo",
    )

    # S2: priority adjust + recheck (writes S2_PRIORITY_RECHECK)
    run_step(
        "priority_recheck",
        "POST",
        "/api/Production/scheduling/update-priority-and-recheck",
        {
            "workord": WORK_ORDS[1],
            "domain": str(TENANT),
            "priority": "5",
            "userAccount": "AIDOPDemo",
        },
    )

    # S2: qty adjust + recheck
    run_step(
        "qty_recheck",
        "POST",
        "/api/Production/scheduling/update-priority-and-recheck",
        {
            "workord": WORK_ORDS[1],
            "domain": str(TENANT),
            "qty": "120",
            "userAccount": "AIDOPDemo",
        },
    )

    # S2: due date adjust + recheck
    due = (date.today() + timedelta(days=14)).isoformat()
    run_step(
        "due_recheck",
        "POST",
        "/api/Production/scheduling/update-priority-and-recheck",
        {
            "workord": WORK_ORDS[1],
            "domain": str(TENANT),
            "instockdate": due,
            "userAccount": "AIDOPDemo",
        },
    )

    # S2: try release first work order (may fail if shortage)
    today = date.today().isoformat()
    st, body = http_json(
        "POST",
        "/api/WorkOrder/dispatch/release",
        {"workOrd": WORK_ORDS[0], "tenantId": TENANT, "ordDate": today, "lotSerial": "E2E-TEST-001"},
        token,
    )
    release_note = "expected_fail_if_shortage" if st >= 400 else "ok"
    report["steps"].append(step("release_work_order", st, unwrap(body), release_note))

    # S4: MRP / material requirement
    run_step("mrp_generate", "POST", f"/api/WorkOrder/material-requirement/generate?domain={TENANT}")

    # S4: procurement pipeline (no new shortage PR if already ran in MRP)
    run_step(
        "procurement_pipeline",
        "POST",
        f"/api/Supply/procurement/execute-pipeline?domain={TENANT}&createFromShortage=false",
    )

    # action logs API
    st, body = http_json(
        "GET",
        "/api/Aidop/action-run-log/list?page=1&pageSize=15",
        token=token,
    )
    # On an error response unwrap() may hand back a list, None or a raw-text
    # dict; degrade to an empty summary instead of crashing before the report
    # is printed.
    logs = unwrap(body)
    if not isinstance(logs, dict):
        logs = {}
    log_rows = logs.get("list")
    if not isinstance(log_rows, list):
        log_rows = []
    log_codes = [r.get("actionCode") for r in log_rows[:8] if isinstance(r, dict)]
    report["steps"].append(step("action_log_list", st, {"total": logs.get("total"), "codes": log_codes}))

    report["after"] = db_snapshot(WORK_ORDS)
    before, after = report["before"], report["after"]

    # Step names are unique, so a name -> HTTP status map answers every
    # "did step X return 200" question below.
    http_by_step = {s["step"]: s["http"] for s in report["steps"]}

    def returned_200(name):
        """Return True when the named step was recorded with HTTP 200."""
        return http_by_step.get(name) == 200

    all_sync_ok = all(
        s["http"] == 200 for s in report["steps"] if s["step"].startswith("sync_routing_")
    )
    released = returned_200("release_work_order")
    pick_bill_lines = after.get("pick_bill_lines") or 0

    checks = [
        {
            "name": "routing_synced",
            "ok": all_sync_ok and after.get("routing_count", 0) > 0,
            "detail": f"sync HTTP ok, routing rows={after.get('routing_count')}",
        },
        {
            "name": "schedule_rows",
            "ok": after.get("schedule_rows", 0) > 0,
            "detail": f"PeriodSequenceDet active rows={after.get('schedule_rows')}",
        },
        {
            "name": "kitting_api",
            "ok": returned_200("kitting_check_batch"),
            "detail": "batch kitting HTTP 200",
        },
        {
            "name": "mrp_api",
            "ok": returned_200("mrp_generate"),
            "detail": "MRP HTTP 200",
        },
        {
            "name": "procurement_api",
            "ok": returned_200("procurement_pipeline"),
            "detail": "procurement pipeline HTTP 200",
        },
        {
            "name": "priority_recheck",
            "ok": returned_200("priority_recheck"),
            "detail": "仅优先级调整",
        },
        {
            "name": "qty_recheck",
            "ok": returned_200("qty_recheck"),
            "detail": "数量变化触发资源重检",
        },
        {
            "name": "due_recheck",
            "ok": returned_200("due_recheck"),
            "detail": "交期变化触发资源重检",
        },
        {
            "name": "pick_bill_lines",
            # Pick-bill lines are only required once the release succeeded;
            # a release rejected for shortage must not fail this check.
            "ok": pick_bill_lines > 0 if released else True,
            "detail": f"NbrDetail lines={after.get('pick_bill_lines')}",
            "note": "下达成功时应有领料单行",
        },
        {
            "name": "action_logs_written",
            # NOTE(review): both snapshots are capped at 12 rows, so this
            # comparison is weak once 12 or more log rows already exist.
            "ok": len(after.get("action_logs") or []) >= len(before.get("action_logs") or []),
            "detail": [f"{r['action_code']}:{r['status']}" for r in (after.get("action_logs") or [])[:6]],
        },
        {
            "name": "release_work_order",
            "ok": released,
            "optional": True,
            "detail": "strict kitting: may fail when lack_qty>0",
            "http": http_by_step.get("release_work_order"),
        },
    ]
    report["checks"] = checks

    required = [c for c in checks if not c.get("optional")]
    report["passed"] = all(c["ok"] for c in required)

    # default=str lets non-JSON values in the DB snapshots (e.g. datetimes)
    # be printed.
    print(json.dumps(report, ensure_ascii=False, indent=2, default=str))
    sys.exit(0 if report["passed"] else 4)


if __name__ == "__main__":
    main()
|