_verify_order_review_e2e.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. #!/usr/bin/env python3
  2. """E2E: S1 order review -> resource check -> work order generation."""
  3. import json
  4. import sys
  5. import urllib.error
  6. import urllib.request
  7. try:
  8. import pymysql
  9. except ImportError:
  10. print(json.dumps({"error": "pymysql not installed"}, ensure_ascii=False))
  11. sys.exit(1)
  12. BASE = "http://127.0.0.1:5005"
  13. TENANT = 797403760988229
  14. BILL_NO = "MPO482024102300001"
  15. CONN = dict(
  16. host="123.60.180.165",
  17. port=3306,
  18. user="aidopremote",
  19. password="1234567890aiDOP#",
  20. database="aidopdev",
  21. charset="utf8mb4",
  22. connect_timeout=20,
  23. cursorclass=pymysql.cursors.DictCursor,
  24. )
  25. def http_json(method, path, body=None, token=None):
  26. headers = {"Content-Type": "application/json", "Accept": "application/json"}
  27. if token:
  28. headers["Authorization"] = f"Bearer {token}"
  29. data = None if body is None else json.dumps(body).encode("utf-8")
  30. req = urllib.request.Request(f"{BASE}{path}", data=data, headers=headers, method=method)
  31. try:
  32. with urllib.request.urlopen(req, timeout=60) as resp:
  33. raw = resp.read().decode("utf-8")
  34. return resp.status, json.loads(raw) if raw else {}
  35. except urllib.error.HTTPError as e:
  36. raw = e.read().decode("utf-8", errors="replace")
  37. try:
  38. body = json.loads(raw)
  39. except json.JSONDecodeError:
  40. body = {"raw": raw[:2000]}
  41. return e.code, body
  42. def login_sm2():
  43. """Login via Node sm-crypto helper."""
  44. import subprocess
  45. node_script = r"""
  46. const { sm2 } = require('sm-crypto-v2');
  47. const PK = '0484C7466D950E120E5ECE5DD85D0C90EAA85081A3A2BD7C57AE6DC822EFCCBD66620C67B0103FC8DD280E36C3B282977B722AAEC3C56518EDCEBAFB72C5A05312';
  48. const encPwd = sm2.doEncrypt('1234567890dop', PK, 1);
  49. console.log(JSON.stringify({ account: 'AIDOPDemo', password: encPwd, tenantId: '797403760988229' }));
  50. """
  51. proc = subprocess.run(
  52. ["node", "-e", node_script],
  53. cwd=r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\Web",
  54. capture_output=True,
  55. text=True,
  56. timeout=30,
  57. )
  58. if proc.returncode != 0:
  59. raise RuntimeError(f"sm2 encrypt failed: {proc.stderr}")
  60. payload = json.loads(proc.stdout.strip())
  61. status, body = http_json("POST", "/api/sysAuth/login", payload)
  62. token = (
  63. (body.get("result") or {}).get("accessToken")
  64. or body.get("accessToken")
  65. or (body.get("result") or {}).get("token")
  66. )
  67. if not token:
  68. raise RuntimeError(f"login failed: {json.dumps(body, ensure_ascii=False)[:500]}")
  69. return token
  70. def q(cur, sql, params=None):
  71. cur.execute(sql, params or ())
  72. return cur.fetchall()
  73. def q1(cur, sql, params=None):
  74. rows = q(cur, sql, params)
  75. return rows[0] if rows else {}
  76. def unwrap(body):
  77. if isinstance(body, dict) and body.get("code") == 200 and "result" in body:
  78. return body["result"]
  79. return body
  80. def api_ok(status, body):
  81. if status != 200:
  82. return False
  83. if body.get("code") == 200 and body.get("type") == "success":
  84. return True
  85. payload = unwrap(body)
  86. return isinstance(payload, dict) and bool(payload.get("actionCode") or payload.get("message"))
  87. def snapshot_db(cur, order_id=None):
  88. snap = {}
  89. snap["order"] = q1(
  90. cur,
  91. """
  92. SELECT Id, bill_no, tenant_id, status
  93. FROM crm_seorder
  94. WHERE tenant_id = %s AND bill_no = %s AND IsDeleted = 0
  95. """,
  96. (TENANT, BILL_NO),
  97. )
  98. oid = order_id or snap["order"].get("Id")
  99. snap["entries"] = q(
  100. cur,
  101. """
  102. SELECT Id, entry_seq, item_number, qty, plan_date, sys_capacity_date, progress, bom_number
  103. FROM crm_seorderentry
  104. WHERE seorder_id = %s AND tenant_id = %s AND IsDeleted = 0
  105. ORDER BY entry_seq
  106. """,
  107. (oid, TENANT),
  108. ) if oid else []
  109. entry_ids = [r["Id"] for r in snap["entries"]]
  110. if entry_ids:
  111. fmt = ",".join(["%s"] * len(entry_ids))
  112. snap["work_orders"] = q(
  113. cur,
  114. f"""
  115. SELECT RecID, WorkOrd, ItemNum, Status, BusinessID, tenant_id
  116. FROM WorkOrdMaster
  117. WHERE tenant_id = %s AND BusinessID IN ({fmt})
  118. ORDER BY WorkOrd
  119. """,
  120. [TENANT, *entry_ids],
  121. )
  122. snap["examine_results"] = q(
  123. cur,
  124. f"""
  125. SELECT Id, sentry_id, morder_no, need_qty, kitting_times, IsDeleted, create_time
  126. FROM b_examine_result
  127. WHERE tenant_id = %s AND sentry_id IN ({fmt})
  128. ORDER BY create_time DESC
  129. LIMIT 10
  130. """,
  131. [TENANT, *entry_ids],
  132. )
  133. snap["examine_lines"] = q(
  134. cur,
  135. f"""
  136. SELECT COUNT(*) AS cnt
  137. FROM b_bom_child_examine b
  138. INNER JOIN b_examine_result e ON e.Id = b.examine_id
  139. WHERE e.tenant_id = %s AND e.sentry_id IN ({fmt}) AND e.IsDeleted = 0 AND b.is_use = 1
  140. """,
  141. [TENANT, *entry_ids],
  142. )
  143. snap["examine_line_count"] = (snap["examine_lines"][0] or {}).get("cnt", 0)
  144. else:
  145. snap["work_orders"] = []
  146. snap["examine_results"] = []
  147. snap["examine_line_count"] = 0
  148. snap["action_log_table"] = q1(
  149. cur,
  150. """
  151. SELECT COUNT(*) AS cnt
  152. FROM information_schema.tables
  153. WHERE table_schema = 'aidopdev' AND table_name = 'aidop_action_run_log'
  154. """,
  155. ).get("cnt", 0)
  156. snap["recent_action_logs"] = []
  157. if snap["action_log_table"]:
  158. snap["recent_action_logs"] = q(
  159. cur,
  160. """
  161. SELECT id, action_code, biz_no, status, message, start_time
  162. FROM aidop_action_run_log
  163. WHERE tenant_id = %s AND action_code LIKE 'S1_%%'
  164. ORDER BY start_time DESC
  165. LIMIT 5
  166. """,
  167. (TENANT,),
  168. )
  169. snap["item_master"] = {}
  170. for item in ("3121C0035", "1A0C885"):
  171. snap["item_master"][item] = q1(
  172. cur, "SELECT COUNT(*) AS cnt FROM ItemMaster WHERE ItemNum = %s", (item,)
  173. ).get("cnt", 0)
  174. snap["ic_bom"] = {}
  175. for item in ("3121C0035", "1A0C885"):
  176. snap["ic_bom"][item] = q1(
  177. cur,
  178. """
  179. SELECT COUNT(*) AS cnt FROM ic_bom
  180. WHERE tenant_id = %s AND item_number = %s AND IsDeleted = 0
  181. """,
  182. (TENANT, item),
  183. ).get("cnt", 0)
  184. snap["s1_mdp_runs"] = q(
  185. cur,
  186. """
  187. SELECT id, job_code, status, batch_id, start_time, end_time
  188. FROM mdp_transform_run_log
  189. WHERE job_code = 'S1_MDP_SYNC_TRANSFORM'
  190. AND start_time >= DATE_SUB(NOW(), INTERVAL 6 HOUR)
  191. ORDER BY start_time DESC
  192. LIMIT 5
  193. """,
  194. )
  195. snap["dwd_ship_rows"] = q1(
  196. cur,
  197. """
  198. SELECT COUNT(*) AS cnt FROM dwd_ship_trans
  199. WHERE tenant_id = %s AND order_no = %s
  200. """,
  201. (TENANT, BILL_NO),
  202. ).get("cnt", 0)
  203. return snap
  204. def prepare_review_state(conn, order_id):
  205. """将订单行置为可评审态(progress 0/1),便于重复跑 E2E。"""
  206. with conn.cursor() as cur:
  207. cur.execute(
  208. """
  209. UPDATE crm_seorderentry
  210. SET progress = '0', update_time = NOW()
  211. WHERE seorder_id = %s AND tenant_id = %s AND IsDeleted = 0
  212. """,
  213. (order_id, TENANT),
  214. )
  215. rows = cur.rowcount
  216. conn.commit()
  217. return rows
  218. def main():
  219. report = {"bill_no": BILL_NO, "tenant_id": str(TENANT), "steps": []}
  220. # 0) backend health
  221. try:
  222. status, body = http_json("GET", "/api/sysAuth/captcha")
  223. report["backend"] = {"ok": status == 200, "http": status}
  224. except Exception as e:
  225. report["backend"] = {"ok": False, "error": str(e)}
  226. print(json.dumps(report, ensure_ascii=False, indent=2, default=str))
  227. sys.exit(1)
  228. token = login_sm2()
  229. report["login"] = "ok"
  230. # 1) list order via API
  231. status, list_body = http_json(
  232. "GET",
  233. f"/api/Order/seorder/list?billNo={BILL_NO}&page=1&pageSize=20",
  234. token=token,
  235. )
  236. result = list_body.get("result") or list_body
  237. rows = result.get("list") or []
  238. report["steps"].append(
  239. {
  240. "step": "list_order",
  241. "http": status,
  242. "total": result.get("total"),
  243. "order_id": rows[0]["id"] if rows else None,
  244. }
  245. )
  246. if not rows:
  247. print(json.dumps(report, ensure_ascii=False, indent=2, default=str))
  248. sys.exit(2)
  249. order_id = rows[0]["id"]
  250. with pymysql.connect(**CONN) as conn:
  251. with conn.cursor() as cur:
  252. report["before"] = snapshot_db(cur, order_id)
  253. report["e2e_prep"] = {"entry_progress_reset": prepare_review_state(conn, order_id)}
  254. with conn.cursor() as cur:
  255. report["before_after_prep"] = snapshot_db(cur, order_id)
  256. # 2) call review API
  257. status, review_body = http_json(
  258. "POST",
  259. "/api/Order/seorder/review",
  260. body={"ids": [order_id]},
  261. token=token,
  262. )
  263. report["steps"].append(
  264. {
  265. "step": "review_api",
  266. "http": status,
  267. "code": review_body.get("code"),
  268. "type": review_body.get("type"),
  269. "message": review_body.get("message"),
  270. "result": review_body.get("result"),
  271. }
  272. )
  273. with pymysql.connect(**CONN) as conn:
  274. with conn.cursor() as cur:
  275. report["after"] = snapshot_db(cur, order_id)
  276. # 3) assertions
  277. review_result = unwrap(review_body)
  278. after = report["after"]
  279. checks = []
  280. checks.append(
  281. {
  282. "name": "review_api_success",
  283. "ok": api_ok(status, review_body),
  284. "detail": review_result.get("message") or review_body.get("message"),
  285. }
  286. )
  287. checks.append(
  288. {
  289. "name": "work_orders_exist",
  290. "ok": len(after.get("work_orders") or []) >= 2,
  291. "detail": [w.get("WorkOrd") for w in (after.get("work_orders") or [])],
  292. }
  293. )
  294. checks.append(
  295. {
  296. "name": "resource_check_rows",
  297. "ok": (review_result.get("resourceCheckCount") or 0) >= 1
  298. or len(after.get("examine_results") or []) >= 1,
  299. "detail": {
  300. "api_resourceCheckCount": review_result.get("resourceCheckCount"),
  301. "db_examine_count": len(after.get("examine_results") or []),
  302. "db_examine_line_count": after.get("examine_line_count"),
  303. },
  304. }
  305. )
  306. checks.append(
  307. {
  308. "name": "action_log_table",
  309. "ok": (after.get("action_log_table") or 0) >= 1,
  310. "detail": "aidop_action_run_log exists",
  311. }
  312. )
  313. checks.append(
  314. {
  315. "name": "item_master_prereq",
  316. "ok": all(v >= 1 for v in (after.get("item_master") or {}).values()),
  317. "detail": after.get("item_master"),
  318. }
  319. )
  320. checks.append(
  321. {
  322. "name": "s1_mdp_recent_run",
  323. "ok": len(after.get("s1_mdp_runs") or []) >= 1,
  324. "detail": after.get("s1_mdp_runs"),
  325. }
  326. )
  327. checks.append(
  328. {
  329. "name": "dwd_ship_trans_order",
  330. "ok": (after.get("dwd_ship_rows") or 0) >= 2,
  331. "detail": f"order_no={BILL_NO}, rows={after.get('dwd_ship_rows')}",
  332. "note": "主样例订单行 2 行,DWD 应 >=2",
  333. }
  334. )
  335. report["checks"] = checks
  336. report["passed"] = all(c["ok"] for c in checks)
  337. print(json.dumps(report, ensure_ascii=False, indent=2, default=str))
  338. sys.exit(0 if report["passed"] else 3)
  339. if __name__ == "__main__":
  340. main()