#!/usr/bin/env python3
"""Full E2E: S1 confirm/refresh + S2 release success + S4 MRP PR + delivery schedule."""
import json
import os
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import date

import pymysql
# --- Target environment ------------------------------------------------------
# Each connection setting may be overridden through an environment variable so
# the script can be pointed at another environment without editing the source.
# The fallbacks are the values this script has always used, so running it with
# no variables set behaves exactly as before.
BASE = os.environ.get("E2E_BASE_URL", "http://127.0.0.1:5005").rstrip("/")

# --- Fixture identifiers used by the scenario --------------------------------
TENANT = 797403760988229          # bound to tenant_id in the SQL below
DOMAIN = "8010"                   # not referenced by the code visible in this file
ORDER_ID = 9106000400000002       # bound to crm_seorderentry.seorder_id
BILL_NO = "MPO482024102300001"    # bound to dwd_ship_trans.order_no
WORK_ORDS = ["M500130841", "M500130842"]  # not referenced by the code visible in this file
RELEASE_WO = "M500130841"         # work order that is reset and then released
SHORTAGE_ITEM = "034DD002"        # item number that gets a srm_purchase supplier row
PO_SAMPLE = "PO-UAT-20260604-01"  # poNumber used by the batch-generate fallback

# NOTE(security): the fallback host/user/password below are real-looking
# credentials committed to source control. They are kept only so existing
# invocations keep working; prefer supplying E2E_DB_* variables and rotate /
# remove the literals once callers have migrated.
CONN = dict(
    host=os.environ.get("E2E_DB_HOST", "123.60.180.165"),
    port=int(os.environ.get("E2E_DB_PORT", "3306")),
    user=os.environ.get("E2E_DB_USER", "aidopremote"),
    password=os.environ.get("E2E_DB_PASSWORD", "1234567890aiDOP#"),
    database=os.environ.get("E2E_DB_NAME", "aidopdev"),
    charset="utf8mb4",
    connect_timeout=20,
    # Rows come back as dicts; the helpers below index them by column name.
    cursorclass=pymysql.cursors.DictCursor,
)
def _decode_json_payload(raw_bytes):
    """Decode a response body to JSON, or wrap the text as ``{"raw": ...}``."""
    # Decode leniently on every path: a body that is not valid UTF-8 should end
    # up in the report as text, not abort the run with UnicodeDecodeError.
    text = raw_bytes.decode("utf-8", errors="replace")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Keep at most 2000 characters so the report stays readable.
        return {"raw": text[:2000]}


def http_json(method, path, body=None, token=None):
    """Send a JSON request to ``BASE + path`` and return ``(status, payload)``.

    Args:
        method: HTTP verb, e.g. ``"POST"``.
        path: URL path (including any query string) appended to ``BASE``.
        body: Optional JSON-serialisable object; ``None`` sends no request body.
        token: Optional bearer token placed in the ``Authorization`` header.

    Returns:
        A ``(status_code, payload)`` tuple. ``payload`` is the parsed JSON
        body, or ``{"raw": <first 2000 chars>}`` when the body is not JSON.
        HTTP error statuses are returned the same way instead of being raised.

    Raises:
        urllib.error.URLError: for transport-level failures (connection
            refused, timeout, ...), which are not handled here.
    """
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    data = None if body is None else json.dumps(body).encode("utf-8")
    req = urllib.request.Request(f"{BASE}{path}", data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return resp.status, _decode_json_payload(resp.read())
    except urllib.error.HTTPError as e:
        # HTTPError doubles as the response object; close it once it is read.
        with e:
            return e.code, _decode_json_payload(e.read())
def login():
    """Log in through ``/api/sysAuth/login`` and return the access token.

    The login payload (account, encrypted password, tenant id) is produced by a
    short Node.js script that calls ``sm2.doEncrypt`` from ``sm-crypto-v2``.
    The subprocess runs inside the Web project directory, presumably so that
    ``require('sm-crypto-v2')`` resolves from its ``node_modules`` (TODO confirm).

    Returns:
        The ``accessToken`` string, taken from ``body["result"]`` when present
        and otherwise from the top level of the response.

    Raises:
        RuntimeError: if the Node.js script exits non-zero, or if the response
            carries no access token.
    """
    node_script = r"""
const { sm2 } = require('sm-crypto-v2');
const PK = '0484C7466D950E120E5ECE5DD85D0C90EAA85081A3A2BD7C57AE6DC822EFCCBD66620C67B0103FC8DD280E36C3B282977B722AAEC3C56518EDCEBAFB72C5A05312';
console.log(JSON.stringify({ account: 'AIDOPDemo', password: sm2.doEncrypt('1234567890dop', PK, 1), tenantId: '797403760988229' }));
"""
    proc = subprocess.run(
        ["node", "-e", node_script],
        cwd=r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\Web",
        capture_output=True,
        text=True,
        timeout=30,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr)

    payload = json.loads(proc.stdout.strip())
    status, body = http_json("POST", "/api/sysAuth/login", payload)

    # The response may be a non-object (or hold a non-object "result") when the
    # server answers with an error page or a bare value; treat that as "no
    # token" rather than crashing with AttributeError.
    token = None
    if isinstance(body, dict):
        result = body.get("result")
        if isinstance(result, dict):
            token = result.get("accessToken")
        token = token or body.get("accessToken")
    if not token:
        # Include the HTTP status so a 401/500 is distinguishable from a
        # 200 response that merely lacks the token.
        raise RuntimeError(f"login failed (HTTP {status}): {body}")
    return token
def unwrap(body):
    """Return the ``result`` member of a successful API envelope.

    An envelope is a dict whose ``code`` equals 200 and that contains a
    ``result`` key. Anything else (other codes, missing ``result``, non-dict
    values) is returned unchanged.
    """
    if not isinstance(body, dict):
        return body
    is_success_envelope = body.get("code") == 200 and "result" in body
    return body["result"] if is_success_envelope else body
def pick_int(obj, *keys, default=0):
    """Return the first of ``obj[key]`` (in *keys* order) that converts to ``int``.

    Args:
        obj: Mapping to read from; any non-dict value yields *default*.
        *keys: Candidate keys, tried in order (e.g. camelCase then PascalCase).
        default: Value returned when no key yields an integer.

    Returns:
        ``int(obj[key])`` for the first key that is present, not ``None`` and
        convertible; otherwise *default*.
    """
    if not isinstance(obj, dict):
        return default
    for key in keys:
        value = obj.get(key)
        if value is None:
            # Missing key and explicit null are treated alike.
            continue
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers float infinity, which json.loads can produce
            # from an "Infinity" literal; fall through to the next key.
            continue
    return default
def ensure_supplier_for_shortage_item(conn):
    """E2E data prep: give SHORTAGE_ITEM a ``srm_purchase`` supplier row (test tenant only).

    The new row copies its supplier columns from any existing ``srm_purchase``
    row of the tenant that has ``supplier_id > 0``.

    Returns:
        ``"already_exists"`` if the item already has a non-deleted row,
        ``"no_ic_item"`` if the item is not found in ``ic_item``,
        ``"no_template"`` if the tenant has no row to copy from,
        ``"inserted"`` after a row was inserted and committed.
    """
    existing_sql = "SELECT COUNT(*) AS c FROM srm_purchase WHERE tenant_id=%s AND number=%s AND IFNULL(IsDeleted,0)=0"
    item_sql = "SELECT Id, name FROM ic_item WHERE tenant_id=%s AND number=%s AND IsDeleted=0 LIMIT 1"
    template_sql = "SELECT * FROM srm_purchase WHERE tenant_id=%s AND IFNULL(IsDeleted,0)=0 AND supplier_id>0 LIMIT 1"
    next_id_sql = "SELECT IFNULL(MAX(Id),0)+1 AS nid FROM srm_purchase"
    insert_sql = """
        INSERT INTO srm_purchase (
            Id, icitem_id, icitem_name, number, supplier_id, supplier_name, supplier_number,
            supplier_type, currency_type, lead_time, quota_rate, purchase_unit, IsRequireGoods,
            tenant_id, company_id, factory_id, is_active, effective_date, expiring_date,
            quota_priority, create_time, update_time, IsDeleted
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, %s,
            %s, NOW(), NOW(), 0
        )
    """
    item_key = (TENANT, SHORTAGE_ITEM)

    with conn.cursor() as cursor:
        cursor.execute(existing_sql, item_key)
        if cursor.fetchone()["c"] > 0:
            return "already_exists"

        cursor.execute(item_sql, item_key)
        item = cursor.fetchone()
        if not item:
            return "no_ic_item"

        cursor.execute(template_sql, (TENANT,))
        template = cursor.fetchone()
        if not template:
            return "no_template"

        def from_template(column, fallback):
            # ``or`` also replaces 0 / "" from the template with the fallback.
            return template.get(column) or fallback

        # NOTE(review): MAX(Id)+1 is read and then used in a separate
        # statement; confirm nothing else inserts into srm_purchase while
        # this script runs.
        cursor.execute(next_id_sql)
        new_id = cursor.fetchone()["nid"]

        row = (
            new_id,
            item["Id"],
            item.get("name") or SHORTAGE_ITEM,
            SHORTAGE_ITEM,
            template["supplier_id"],
            template["supplier_name"],
            template["supplier_number"],
            from_template("supplier_type", "标准"),
            from_template("currency_type", 1),
            from_template("lead_time", 7),
            from_template("quota_rate", 100),
            from_template("purchase_unit", "PCS"),
            from_template("IsRequireGoods", 0),
            TENANT,
            from_template("company_id", 1000),
            from_template("factory_id", 2400),
            from_template("is_active", "是"),
            template.get("effective_date"),
            template.get("expiring_date"),
            from_template("quota_priority", 0),
        )
        cursor.execute(insert_sql, row)

    conn.commit()
    return "inserted"
def prepare_e2e_state(conn):
    """E2E precondition: delivery confirmation needs progress=2; release needs Status=p and no shortage.

    Resets the fixture rows touched by a previous run and commits:
    the order entries of ORDER_ID get ``progress = '2'``, the RELEASE_WO rows
    in ``WorkOrdMaster`` and ``WorkOrdRouting`` get ``Status = 'p'`` (the
    master row also has ``ReleaseDate`` / ``LotSerial`` cleared), and the
    ``NbrMaster`` rows of type ``'SM'`` for that work order are deleted.

    Returns:
        A dict of affected-row counts: ``entry_progress_set``, ``wo_reset``
        and ``pick_bills_deleted``.
    """
    work_order_key = (TENANT, RELEASE_WO)

    with conn.cursor() as cursor:

        def affected(sql, params):
            """Execute one statement and return its rowcount."""
            cursor.execute(sql, params)
            return cursor.rowcount

        entry_rows = affected(
            """
            UPDATE crm_seorderentry
            SET progress = '2', update_time = NOW()
            WHERE seorder_id = %s AND IsDeleted = 0
            """,
            (ORDER_ID,),
        )
        wo_reset = affected(
            """
            UPDATE WorkOrdMaster
            SET Status = 'p', ReleaseDate = NULL, LotSerial = NULL, UpdateTime = NOW()
            WHERE tenant_id = %s AND WorkOrd = %s
            """,
            work_order_key,
        )
        # The routing reset is not reported, so its rowcount is discarded.
        affected(
            """
            UPDATE WorkOrdRouting
            SET Status = 'p', UpdateTime = NOW()
            WHERE tenant_id = %s AND WorkOrd = %s
            """,
            work_order_key,
        )
        pick_deleted = affected(
            """
            DELETE FROM NbrMaster
            WHERE tenant_id = %s AND WorkOrd = %s AND Type = 'SM'
            """,
            work_order_key,
        )

    conn.commit()
    return {
        "entry_progress_set": entry_rows,
        "wo_reset": wo_reset,
        "pick_bills_deleted": pick_deleted,
    }
def clear_shortage_for_release(conn, work_ord):
    """E2E: zero the resource-check shortage of a work order to exercise the release success path.

    Sets ``lack_qty`` and ``self_lack_qty`` to 0 on the in-use
    ``b_bom_child_examine`` rows joined to the work order's
    ``b_examine_result``, marks the matching ``mes_morder`` row's
    ``MaterialSituation`` as ``'齐套'``, and commits.

    Args:
        conn: Open database connection.
        work_ord: Work order number, matched against ``morder_no``.

    Returns:
        The rowcount of the first UPDATE (the shortage rows).
    """
    zero_shortage_sql = """
        UPDATE b_bom_child_examine bce
        INNER JOIN b_examine_result ber ON ber.Id = bce.examine_id
        SET bce.lack_qty = 0, bce.self_lack_qty = 0, bce.update_time = NOW()
        WHERE ber.tenant_id = %s AND ber.morder_no = %s AND ber.IsDeleted = 0 AND bce.is_use = 1
    """
    mark_complete_sql = """
        UPDATE mes_morder SET MaterialSituation = '齐套', update_time = NOW()
        WHERE tenant_id = %s AND morder_no = %s AND IsDeleted = 0
    """
    params = (TENANT, work_ord)

    with conn.cursor() as cursor:
        cursor.execute(zero_shortage_sql, params)
        # Capture before the next execute overwrites cursor.rowcount.
        cleared_rows = cursor.rowcount
        cursor.execute(mark_complete_sql, params)

    conn.commit()
    return cleared_rows
def snapshot(conn):
    """Read the database state that the final checks compare, as a dict.

    Keys of the returned dict:
        entries: ``entry_seq`` / ``progress`` of every entry of ORDER_ID.
        release_wo: ``WorkOrd`` / ``Status`` row of RELEASE_WO, or ``None``.
        pick_bills: number of ``NbrMaster`` rows of type ``'SM'`` for RELEASE_WO.
        recent_pr: number of the tenant's ``srm_pr_main`` rows created in the last 2 hours.
        s1_mdp_runs: up to 3 latest ``S1_MDP_SYNC_TRANSFORM`` run-log rows from the last 6 hours.
        dwd_ship_rows: number of ``dwd_ship_trans`` rows for BILL_NO.
    """
    with conn.cursor() as cursor:

        def all_rows(sql, params=None):
            cursor.execute(sql, params)
            return cursor.fetchall()

        def first_row(sql, params=None):
            cursor.execute(sql, params)
            return cursor.fetchone()

        entries = all_rows(
            "SELECT entry_seq, progress FROM crm_seorderentry WHERE seorder_id=%s ORDER BY entry_seq",
            (ORDER_ID,),
        )
        release_wo = first_row(
            "SELECT WorkOrd, Status FROM WorkOrdMaster WHERE WorkOrd=%s AND tenant_id=%s",
            (RELEASE_WO, TENANT),
        )
        pick_bills = first_row(
            "SELECT COUNT(*) AS c FROM NbrMaster WHERE tenant_id=%s AND WorkOrd=%s AND Type='SM'",
            (TENANT, RELEASE_WO),
        )["c"]
        recent_pr = first_row(
            """
            SELECT COUNT(*) AS c FROM srm_pr_main
            WHERE tenant_id=%s AND create_time >= DATE_SUB(NOW(), INTERVAL 2 HOUR)
            """,
            (TENANT,),
        )["c"]
        # This query takes no parameters (it is not filtered by tenant).
        s1_mdp_runs = all_rows(
            """
            SELECT id, job_code, status, batch_id, start_time
            FROM mdp_transform_run_log
            WHERE job_code='S1_MDP_SYNC_TRANSFORM' AND start_time >= DATE_SUB(NOW(), INTERVAL 6 HOUR)
            ORDER BY start_time DESC LIMIT 3
            """
        )
        dwd_ship_rows = first_row(
            "SELECT COUNT(*) AS c FROM dwd_ship_trans WHERE tenant_id=%s AND order_no=%s",
            (TENANT, BILL_NO),
        )["c"]

    return {
        "entries": entries,
        "release_wo": release_wo,
        "pick_bills": pick_bills,
        "recent_pr": recent_pr,
        "s1_mdp_runs": s1_mdp_runs,
        "dwd_ship_rows": dwd_ship_rows,
    }
def main():
    """Run the full E2E chain, write the JSON report, and exit 0 (pass) or 5 (fail).

    Sequence: log in, prepare and snapshot the database, call the
    confirm-delivery, MRP, release, delivery-schedule and refresh-plan
    endpoints, snapshot again, then evaluate the checks. The report is written
    to a fixed JSON file and also printed to stdout.
    """
    report = {"steps": [], "checks": []}
    token = login()
    report["login"] = "ok"

    def run_step(step, path, payload=None):
        """POST to *path*, record the step in the report, return the unwrapped body."""
        status, raw = http_json("POST", path, payload, token)
        result = unwrap(raw)
        report["steps"].append({"step": step, "http": status, "body": result})
        return result

    with pymysql.connect(**CONN) as conn:
        report["supplier_seed"] = ensure_supplier_for_shortage_item(conn)
        report["e2e_prep"] = prepare_e2e_state(conn)
        report["before"] = snapshot(conn)

    # S1: delivery-date confirmation
    run_step("confirm_delivery", "/api/Order/seorder/confirm-delivery", {"ids": [ORDER_ID]})

    # S4: MRP run (shortage -> purchase requisition)
    # NOTE(review): the query parameter is named ``domain`` but is given
    # TENANT, while the module constant DOMAIN is unused; confirm intended.
    mrp = run_step("mrp_with_pr", f"/api/WorkOrder/material-requirement/generate?domain={TENANT}")

    # S2: release success path (shortages are cleared first)
    with pymysql.connect(**CONN) as conn:
        report["shortage_cleared_rows"] = clear_shortage_for_release(conn, RELEASE_WO)
    release_request = {
        "workOrd": RELEASE_WO,
        "tenantId": TENANT,
        "ordDate": date.today().isoformat(),
        "lotSerial": "E2E-FULL-001",
    }
    run_step("release_success", "/api/WorkOrder/dispatch/release", release_request)

    # S3: delivery schedule generation (demand path; falls back to the PO
    # batch endpoint when the demand path created nothing)
    schedule = run_step("delivery_schedule_generate", "/api/Supply/delivery-schedule/generate")
    if pick_int(schedule, "createdCount", "CreatedCount") <= 0:
        run_step(
            "delivery_schedule_batch_po",
            "/api/Supply/delivery-schedule/batch-generate",
            {"poNumber": PO_SAMPLE},
        )

    # S1: level-3 plan refresh
    run_step(
        "refresh_plan",
        f"/api/Order/seorder/{ORDER_ID}/refresh-plan",
        {"reason": "E2E full chain refresh"},
    )

    with pymysql.connect(**CONN) as conn:
        report["after"] = snapshot(conn)

    # HTTP status of the first recorded occurrence of each step name.
    first_status = {}
    for entry in report["steps"]:
        first_status.setdefault(entry["step"], entry["http"])

    def ok(step, code=200):
        """True when *step* was recorded and its HTTP status equals *code*."""
        return first_status.get(step) == code

    before = report["before"]
    after = report["after"]
    pr_created = pick_int(mrp, "prCreatedCount", "PrCreatedCount")
    recent_pr = after.get("recent_pr") or 0
    schedule_ok = ok("delivery_schedule_generate")

    # NOTE(review): ``recent_pr`` counts every PR of the last 2 hours, so
    # ``mrp_pr_created`` can also pass on rows from an earlier run; confirm
    # that leniency is wanted.
    report["checks"] = [
        {"name": "confirm_delivery", "ok": ok("confirm_delivery")},
        {
            "name": "mrp_pr_created",
            "ok": pr_created > 0 or recent_pr > 0,
            "detail": f"prCreatedCount={pr_created}, recent_pr={recent_pr}",
        },
        {"name": "release_success", "ok": ok("release_success")},
        {
            "name": "pick_bill_created",
            "ok": (after.get("pick_bills") or 0) > 0,
            "detail": after.get("pick_bills"),
        },
        {
            "name": "work_order_status_r",
            "ok": (after.get("release_wo") or {}).get("Status") == "r",
            "detail": after.get("release_wo"),
        },
        {
            "name": "work_order_was_reset_to_p",
            "ok": (before.get("release_wo") or {}).get("Status") == "p",
            "detail": before.get("release_wo"),
        },
        {"name": "delivery_schedule", "ok": schedule_ok},
        {
            "name": "delivery_schedule_or_batch",
            "ok": schedule_ok or ok("delivery_schedule_batch_po"),
            "detail": "需求生成或 PO 批量生成至少一路 HTTP 200",
        },
        {"name": "refresh_plan", "ok": ok("refresh_plan")},
        {
            "name": "s1_mdp_after_chain",
            "ok": len(after.get("s1_mdp_runs") or []) >= 1,
            "detail": after.get("s1_mdp_runs"),
        },
        {
            "name": "dwd_ship_trans_main",
            "ok": (after.get("dwd_ship_rows") or 0) >= 2,
            "detail": f"order_no={BILL_NO}, rows={after.get('dwd_ship_rows')}",
        },
    ]
    report["passed"] = all(check["ok"] for check in report["checks"])

    # default=str serialises values json cannot encode natively
    # (e.g. datetime columns in the snapshots).
    rendered = {"ensure_ascii": False, "indent": 2, "default": str}
    out = r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\doc\_verify_s1_s4_full_e2e_result.json"
    with open(out, "w", encoding="utf-8") as f:
        json.dump(report, f, **rendered)
    print(json.dumps(report, **rendered))

    exit_code = 0 if report["passed"] else 5
    sys.exit(exit_code)
# Run the E2E chain only when this file is executed as a script; importing the
# module must not trigger logins or database writes.
if __name__ == "__main__":
    main()