| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- #!/usr/bin/env python3
- """E2E: S1 order review -> resource check -> work order generation."""
- import json
- import sys
- import urllib.error
- import urllib.request
- try:
- import pymysql
- except ImportError:
- print(json.dumps({"error": "pymysql not installed"}, ensure_ascii=False))
- sys.exit(1)
# Backend under test and the fixed sample order this E2E run targets.
BASE = "http://127.0.0.1:5005"
TENANT = 797403760988229
BILL_NO = "MPO482024102300001"

# Keyword arguments for ``pymysql.connect``; rows are returned as dicts.
# NOTE(review): host and credentials are hard-coded here — consider loading
# them from the environment before sharing this script.
CONN = {
    "host": "123.60.180.165",
    "port": 3306,
    "user": "aidopremote",
    "password": "1234567890aiDOP#",
    "database": "aidopdev",
    "charset": "utf8mb4",
    "connect_timeout": 20,
    "cursorclass": pymysql.cursors.DictCursor,
}
def http_json(method, path, body=None, token=None):
    """Send a JSON request to the backend at ``BASE`` and decode the reply.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Path (plus optional query string) appended to ``BASE``.
        body: Optional JSON-serialisable payload; ``None`` sends no body.
        token: Optional bearer token for the ``Authorization`` header.

    Returns:
        A ``(status, payload)`` tuple. ``payload`` is the decoded JSON
        document, ``{}`` for an empty successful response, or
        ``{"raw": <first 2000 characters>}`` when the body is not valid JSON.

    Raises:
        urllib.error.URLError: when the request fails without producing an
            HTTP response (other transport errors also propagate).
    """
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    data = None if body is None else json.dumps(body).encode("utf-8")
    req = urllib.request.Request(f"{BASE}{path}", data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            status = resp.status
            raw = resp.read().decode("utf-8", errors="replace")
        if not raw:
            # An empty successful response is reported as an empty document.
            return status, {}
    except urllib.error.HTTPError as e:
        # HTTP error responses still carry a status and (usually) a body.
        status = e.code
        raw = e.read().decode("utf-8", errors="replace")
    # Both paths share one tolerant decode: a non-JSON body (for example an
    # HTML page) is returned truncated instead of raising JSONDecodeError.
    try:
        return status, json.loads(raw)
    except json.JSONDecodeError:
        return status, {"raw": raw[:2000]}
def login_sm2():
    """Log in to the backend and return the access token.

    A short Node.js snippet (using ``sm-crypto-v2``) SM2-encrypts the password
    and prints the login payload as JSON; that payload is then posted to
    ``/api/sysAuth/login``.

    Returns:
        The token found under ``result.accessToken``, ``accessToken`` or
        ``result.token`` of the login response (first non-empty one).

    Raises:
        RuntimeError: if the Node helper exits with a non-zero code, or the
            login response contains no token.
    """
    import subprocess

    node_script = r"""
const { sm2 } = require('sm-crypto-v2');
const PK = '0484C7466D950E120E5ECE5DD85D0C90EAA85081A3A2BD7C57AE6DC822EFCCBD66620C67B0103FC8DD280E36C3B282977B722AAEC3C56518EDCEBAFB72C5A05312';
const encPwd = sm2.doEncrypt('1234567890dop', PK, 1);
console.log(JSON.stringify({ account: 'AIDOPDemo', password: encPwd, tenantId: '797403760988229' }));
"""
    # Run from the Web project directory so Node can resolve sm-crypto-v2.
    completed = subprocess.run(
        ["node", "-e", node_script],
        cwd=r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\Web",
        capture_output=True,
        text=True,
        timeout=30,
    )
    if completed.returncode:
        raise RuntimeError(f"sm2 encrypt failed: {completed.stderr}")

    login_payload = json.loads(completed.stdout.strip())
    _status, reply = http_json("POST", "/api/sysAuth/login", login_payload)

    # The token may sit inside the "result" envelope or at the top level.
    result = reply.get("result") or {}
    token = result.get("accessToken") or reply.get("accessToken") or result.get("token")
    if token:
        return token
    raise RuntimeError(f"login failed: {json.dumps(reply, ensure_ascii=False)[:500]}")
def q(cur, sql, params=None):
    """Execute ``sql`` on ``cur`` and return all fetched rows.

    A missing or empty ``params`` is passed to ``execute`` as an empty tuple.
    """
    bound = params if params else ()
    cur.execute(sql, bound)
    rows = cur.fetchall()
    return rows
def q1(cur, sql, params=None):
    """Execute ``sql`` via ``q`` and return the first row, or ``{}`` if none."""
    matches = q(cur, sql, params)
    if not matches:
        return {}
    return matches[0]
def unwrap(body):
    """Return ``body["result"]`` for a code-200 envelope, else ``body`` itself.

    ``body`` counts as an envelope only when it is a dict whose ``"code"`` is
    200 and which contains a ``"result"`` key (even if its value is ``None``).
    """
    is_envelope = isinstance(body, dict) and body.get("code") == 200 and "result" in body
    return body["result"] if is_envelope else body
def api_ok(status, body):
    """Decide whether an API call is treated as successful.

    Returns ``False`` for any HTTP status other than 200. Otherwise returns
    ``True`` when the body reports ``code == 200`` and ``type == "success"``,
    or when the unwrapped payload is a dict with a truthy ``"actionCode"`` or
    ``"message"``.
    """
    if status == 200:
        envelope_ok = body.get("code") == 200 and body.get("type") == "success"
        if envelope_ok:
            return True
        inner = unwrap(body)
        if isinstance(inner, dict):
            return bool(inner.get("actionCode") or inner.get("message"))
    return False
def snapshot_db(cur, order_id=None):
    """Collect a read-only snapshot of the rows this E2E flow inspects.

    Args:
        cur: Database cursor; rows are accessed by column name.
        order_id: Order id to use for the entry lookup. When falsy, the ``Id``
            of the order found by ``TENANT`` / ``BILL_NO`` is used instead.

    Returns:
        A dict with the order row, its entries, related work orders and
        examine results (plus a line count), whether the action-log table
        exists and its latest ``S1_`` rows, ItemMaster / ic_bom counts for the
        two sample items, recent ``S1_MDP_SYNC_TRANSFORM`` runs, and the
        number of ``dwd_ship_trans`` rows for the order.
    """
    sample_items = ("3121C0035", "1A0C885")
    tenant_bill = (TENANT, BILL_NO)
    snap = {}

    order_row = q1(
        cur,
        """
        SELECT Id, bill_no, tenant_id, status
        FROM crm_seorder
        WHERE tenant_id = %s AND bill_no = %s AND IsDeleted = 0
        """,
        tenant_bill,
    )
    snap["order"] = order_row

    oid = order_id or order_row.get("Id")
    if oid:
        snap["entries"] = q(
            cur,
            """
            SELECT Id, entry_seq, item_number, qty, plan_date, sys_capacity_date, progress, bom_number
            FROM crm_seorderentry
            WHERE seorder_id = %s AND tenant_id = %s AND IsDeleted = 0
            ORDER BY entry_seq
            """,
            (oid, TENANT),
        )
    else:
        snap["entries"] = []

    entry_ids = [entry["Id"] for entry in snap["entries"]]
    if not entry_ids:
        snap["work_orders"] = []
        snap["examine_results"] = []
        snap["examine_line_count"] = 0
    else:
        # One %s placeholder per entry id for the IN (...) clauses below.
        fmt = ",".join(["%s"] * len(entry_ids))
        in_params = [TENANT, *entry_ids]
        snap["work_orders"] = q(
            cur,
            f"""
            SELECT RecID, WorkOrd, ItemNum, Status, BusinessID, tenant_id
            FROM WorkOrdMaster
            WHERE tenant_id = %s AND BusinessID IN ({fmt})
            ORDER BY WorkOrd
            """,
            in_params,
        )
        snap["examine_results"] = q(
            cur,
            f"""
            SELECT Id, sentry_id, morder_no, need_qty, kitting_times, IsDeleted, create_time
            FROM b_examine_result
            WHERE tenant_id = %s AND sentry_id IN ({fmt})
            ORDER BY create_time DESC
            LIMIT 10
            """,
            in_params,
        )
        snap["examine_lines"] = q(
            cur,
            f"""
            SELECT COUNT(*) AS cnt
            FROM b_bom_child_examine b
            INNER JOIN b_examine_result e ON e.Id = b.examine_id
            WHERE e.tenant_id = %s AND e.sentry_id IN ({fmt}) AND e.IsDeleted = 0 AND b.is_use = 1
            """,
            in_params,
        )
        first_line_row = snap["examine_lines"][0] or {}
        snap["examine_line_count"] = first_line_row.get("cnt", 0)

    # Only query the action log when its table exists in the schema.
    table_probe = q1(
        cur,
        """
        SELECT COUNT(*) AS cnt
        FROM information_schema.tables
        WHERE table_schema = 'aidopdev' AND table_name = 'aidop_action_run_log'
        """,
    )
    snap["action_log_table"] = table_probe.get("cnt", 0)
    snap["recent_action_logs"] = (
        q(
            cur,
            """
            SELECT id, action_code, biz_no, status, message, start_time
            FROM aidop_action_run_log
            WHERE tenant_id = %s AND action_code LIKE 'S1_%%'
            ORDER BY start_time DESC
            LIMIT 5
            """,
            (TENANT,),
        )
        if snap["action_log_table"]
        else []
    )

    snap["item_master"] = {
        item: q1(
            cur, "SELECT COUNT(*) AS cnt FROM ItemMaster WHERE ItemNum = %s", (item,)
        ).get("cnt", 0)
        for item in sample_items
    }
    snap["ic_bom"] = {
        item: q1(
            cur,
            """
            SELECT COUNT(*) AS cnt FROM ic_bom
            WHERE tenant_id = %s AND item_number = %s AND IsDeleted = 0
            """,
            (TENANT, item),
        ).get("cnt", 0)
        for item in sample_items
    }

    snap["s1_mdp_runs"] = q(
        cur,
        """
        SELECT id, job_code, status, batch_id, start_time, end_time
        FROM mdp_transform_run_log
        WHERE job_code = 'S1_MDP_SYNC_TRANSFORM'
        AND start_time >= DATE_SUB(NOW(), INTERVAL 6 HOUR)
        ORDER BY start_time DESC
        LIMIT 5
        """,
    )

    ship_row = q1(
        cur,
        """
        SELECT COUNT(*) AS cnt FROM dwd_ship_trans
        WHERE tenant_id = %s AND order_no = %s
        """,
        tenant_bill,
    )
    snap["dwd_ship_rows"] = ship_row.get("cnt", 0)
    return snap
def prepare_review_state(conn, order_id):
    """Put the order's entry rows back into a reviewable state.

    Sets ``progress = '0'`` (and refreshes ``update_time``) on every
    non-deleted entry of the order for ``TENANT``, then commits, so the E2E
    flow can be run repeatedly.

    Returns:
        The cursor's ``rowcount`` after the UPDATE.
    """
    reset_sql = """
        UPDATE crm_seorderentry
        SET progress = '0', update_time = NOW()
        WHERE seorder_id = %s AND tenant_id = %s AND IsDeleted = 0
        """
    with conn.cursor() as cur:
        cur.execute(reset_sql, (order_id, TENANT))
        affected = cur.rowcount
    conn.commit()
    return affected
def _emit_report(report):
    """Print the report as indented JSON; non-JSON values are rendered with ``str``."""
    print(json.dumps(report, ensure_ascii=False, indent=2, default=str))


def _build_checks(status, review_body, after):
    """Evaluate the post-review assertions and return them as a list of check dicts."""
    review_result = unwrap(review_body)
    # unwrap() may hand back None or another non-dict (e.g. "result": null);
    # normalise it so the .get() calls below cannot raise and lose the report.
    if not isinstance(review_result, dict):
        review_result = {}

    work_orders = after.get("work_orders") or []
    examine_results = after.get("examine_results") or []
    return [
        {
            "name": "review_api_success",
            "ok": api_ok(status, review_body),
            "detail": review_result.get("message") or review_body.get("message"),
        },
        {
            "name": "work_orders_exist",
            "ok": len(work_orders) >= 2,
            "detail": [w.get("WorkOrd") for w in work_orders],
        },
        {
            "name": "resource_check_rows",
            "ok": (review_result.get("resourceCheckCount") or 0) >= 1
            or len(examine_results) >= 1,
            "detail": {
                "api_resourceCheckCount": review_result.get("resourceCheckCount"),
                "db_examine_count": len(examine_results),
                "db_examine_line_count": after.get("examine_line_count"),
            },
        },
        {
            "name": "action_log_table",
            "ok": (after.get("action_log_table") or 0) >= 1,
            "detail": "aidop_action_run_log exists",
        },
        {
            "name": "item_master_prereq",
            "ok": all(v >= 1 for v in (after.get("item_master") or {}).values()),
            "detail": after.get("item_master"),
        },
        {
            "name": "s1_mdp_recent_run",
            "ok": len(after.get("s1_mdp_runs") or []) >= 1,
            "detail": after.get("s1_mdp_runs"),
        },
        {
            "name": "dwd_ship_trans_order",
            "ok": (after.get("dwd_ship_rows") or 0) >= 2,
            "detail": f"order_no={BILL_NO}, rows={after.get('dwd_ship_rows')}",
            "note": "主样例订单行 2 行,DWD 应 >=2",
        },
    ]


def main():
    """Run the S1 E2E flow (review -> resource check -> work orders) and print a JSON report.

    Steps: backend health probe, login, order lookup through the API, DB
    snapshot, reset of entry progress, second snapshot, review API call,
    final snapshot, then the assertions.

    Exit codes:
        0: every check passed.
        1: backend health request or login failed.
        2: the order was not returned by the list API.
        3: at least one check failed.
    """
    report = {"bill_no": BILL_NO, "tenant_id": str(TENANT), "steps": []}

    # 0) backend health
    try:
        status, _ = http_json("GET", "/api/sysAuth/captcha")
        report["backend"] = {"ok": status == 200, "http": status}
    except Exception as e:
        report["backend"] = {"ok": False, "error": str(e)}
        _emit_report(report)
        sys.exit(1)

    # Report a login failure in the JSON output instead of an uncaught
    # traceback, mirroring the health-check handling above.
    try:
        token = login_sm2()
    except Exception as e:
        report["login"] = "failed"
        report["login_error"] = str(e)
        _emit_report(report)
        sys.exit(1)
    report["login"] = "ok"

    # 1) list order via API
    status, list_body = http_json(
        "GET",
        f"/api/Order/seorder/list?billNo={BILL_NO}&page=1&pageSize=20",
        token=token,
    )
    result = list_body.get("result") or list_body
    rows = result.get("list") or []
    report["steps"].append(
        {
            "step": "list_order",
            "http": status,
            "total": result.get("total"),
            "order_id": rows[0]["id"] if rows else None,
        }
    )
    if not rows:
        _emit_report(report)
        sys.exit(2)
    order_id = rows[0]["id"]

    with pymysql.connect(**CONN) as conn:
        with conn.cursor() as cur:
            report["before"] = snapshot_db(cur, order_id)
        report["e2e_prep"] = {"entry_progress_reset": prepare_review_state(conn, order_id)}
        with conn.cursor() as cur:
            report["before_after_prep"] = snapshot_db(cur, order_id)

    # 2) call review API
    status, review_body = http_json(
        "POST",
        "/api/Order/seorder/review",
        body={"ids": [order_id]},
        token=token,
    )
    report["steps"].append(
        {
            "step": "review_api",
            "http": status,
            "code": review_body.get("code"),
            "type": review_body.get("type"),
            "message": review_body.get("message"),
            "result": review_body.get("result"),
        }
    )

    # A fresh connection is opened for the post-review snapshot.
    with pymysql.connect(**CONN) as conn:
        with conn.cursor() as cur:
            report["after"] = snapshot_db(cur, order_id)

    # 3) assertions
    checks = _build_checks(status, review_body, report["after"])
    report["checks"] = checks
    report["passed"] = all(c["ok"] for c in checks)
    _emit_report(report)
    sys.exit(0 if report["passed"] else 3)
# Run the E2E flow only when this file is executed as a script.
if __name__ == "__main__":
    main()
|