#!/usr/bin/env python3
"""CLOSE evidence preparation: S1 MDP refresh + S3 net-demand sample.

Idempotent; safe to run repeatedly. The script:

1. logs in to the backend at ``BASE`` (the password is SM2-encrypted by a
   small Node.js helper),
2. snapshots the row counts it cares about,
3. triggers the S1 MDP refresh endpoint and waits for ``dwd_ship_trans`` rows
   for ``BILL_NO`` to appear,
4. seeds one published demand row in ``ic_demandschedule`` if none exists,
5. writes a JSON report to disk, prints it, and exits 0 when every check
   passed or 7 otherwise.
"""
import json
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timedelta

import pymysql

# NOTE(review): the endpoint, database credentials, login password and the
# Windows paths used below are committed in source. Consider moving them to
# environment variables or a secret store.
BASE = "http://127.0.0.1:5005"
TENANT = 797403760988229
BILL_NO = "MPO482024102300001"
DEMAND_MARKER = "CLOSE_UAT_S3_DEMAND"
DEMAND_ID = 9106000999000001
CONN = dict(
    host="123.60.180.165",
    port=3306,
    user="aidopremote",
    password="1234567890aiDOP#",
    database="aidopdev",
    charset="utf8mb4",
    connect_timeout=20,
    # Rows come back as dicts; the helpers below index results by column name.
    cursorclass=pymysql.cursors.DictCursor,
)


def _decode_body(raw_bytes):
    """Decode an HTTP body as JSON, falling back to ``{"raw": <first 2000 chars>}``.

    Undecodable bytes are replaced rather than raising, so a malformed
    response never masks the HTTP status the caller wants to report.
    """
    text = raw_bytes.decode("utf-8", errors="replace")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"raw": text[:2000]}


def http_json(method, path, body=None, token=None, timeout=300):
    """Send a JSON request to ``BASE + path`` and return ``(status, payload)``.

    Args:
        method: HTTP verb, e.g. ``"POST"``.
        path: URL path appended to ``BASE``.
        body: Optional JSON-serialisable request body; ``None`` sends no body.
        token: Optional bearer token for the ``Authorization`` header.
        timeout: Socket timeout in seconds.

    Returns:
        A tuple of the HTTP status code and the parsed JSON payload. HTTP
        error responses are returned the same way instead of raising. A body
        that is not valid JSON is returned as ``{"raw": ...}`` (truncated).

    Raises:
        urllib.error.URLError: on connection-level failures (not HTTP errors).
    """
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    data = None if body is None else json.dumps(body).encode("utf-8")
    req = urllib.request.Request(
        f"{BASE}{path}", data=data, headers=headers, method=method
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            status = resp.status
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # 4xx/5xx still carry a body worth reporting.
        status = e.code
        raw = e.read()
    return status, _decode_body(raw)


def login():
    """Log in and return the access token.

    The login payload is produced by a Node.js one-liner that SM2-encrypts
    the password with ``sm-crypto-v2``; it is run from the Web project
    directory so that ``require`` can resolve the package.

    Raises:
        RuntimeError: if the Node helper fails or no token is returned.
    """
    node_script = r"""
const { sm2 } = require('sm-crypto-v2');
const PK = '0484C7466D950E120E5ECE5DD85D0C90EAA85081A3A2BD7C57AE6DC822EFCCBD66620C67B0103FC8DD280E36C3B282977B722AAEC3C56518EDCEBAFB72C5A05312';
console.log(JSON.stringify({
  account: 'AIDOPDemo',
  password: sm2.doEncrypt('1234567890dop', PK, 1),
  tenantId: '797403760988229'
}));
"""
    proc = subprocess.run(
        ["node", "-e", node_script],
        cwd=r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\Web",
        capture_output=True,
        text=True,
        timeout=30,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr or proc.stdout)
    _, body = http_json("POST", "/api/sysAuth/login", json.loads(proc.stdout.strip()))

    # The token may sit under "result" or at the top level. Guard the shapes
    # so an unexpected response yields "login failed" rather than an
    # AttributeError.
    token = None
    if isinstance(body, dict):
        inner = body.get("result")
        if isinstance(inner, dict):
            token = inner.get("accessToken")
        token = token or body.get("accessToken")
    if not token:
        raise RuntimeError(f"login failed: {body}")
    return token


def unwrap(body):
    """Return ``body["result"]`` for a ``code == 200`` envelope, else ``body`` unchanged."""
    if isinstance(body, dict) and body.get("code") == 200 and "result" in body:
        return body["result"]
    return body


def count_dwd(conn):
    """Count ``dwd_ship_trans`` rows for ``TENANT`` and ``BILL_NO``."""
    with conn.cursor() as c:
        c.execute(
            "SELECT COUNT(*) AS c FROM dwd_ship_trans WHERE tenant_id=%s AND order_no=%s",
            (TENANT, BILL_NO),
        )
        return c.fetchone()["c"]


def count_demand(conn):
    """Count published, current, non-deleted demand rows with quantity still to schedule."""
    with conn.cursor() as c:
        # NOTE(review): this filter uses column `tosechedqty`, while the
        # INSERT/UPDATE in ensure_demand_seed write `toschedqty`. One of the
        # two spellings is presumably wrong — confirm against the schema.
        c.execute(
            """
            SELECT COUNT(*) AS c
            FROM ic_demandschedule
            WHERE tenant_id=%s
              AND IFNULL(status,'')='P'
              AND IFNULL(tosechedqty,0)>0
              AND (IFNULL(ishistoryversion,'')='' OR ishistoryversion='N')
              AND IFNULL(IsDeleted,0)=0
            """,
            (TENANT,),
        )
        return c.fetchone()["c"]


def find_seed_item(conn):
    """Return the first item usable for seeding, or ``None``.

    Picks the lowest-numbered ``srm_purchase`` row of the tenant that is not
    deleted, has a positive ``quota_rate`` and a supplier number, and whose
    ``ItemMaster.PurMfg`` is ``'P'``.
    """
    with conn.cursor() as c:
        c.execute(
            """
            SELECT sp.number AS itemnum, sp.supplier_number, im.PurMfg
            FROM srm_purchase sp
            INNER JOIN ItemMaster im ON im.ItemNum = sp.number
            WHERE sp.tenant_id=%s
              AND IFNULL(sp.IsDeleted,0)=0
              AND IFNULL(sp.quota_rate,0)>0
              AND IFNULL(sp.supplier_number,'')<>''
              AND IFNULL(im.PurMfg,'')='P'
            ORDER BY sp.number
            LIMIT 1
            """,
            (TENANT,),
        )
        return c.fetchone()


def ensure_demand_seed(conn):
    """Make sure at least one published demand row exists for the tenant.

    Returns:
        A dict describing what happened: ``{"action": "skip", ...}`` when
        published demand already exists, ``{"action": "failed", ...}`` when no
        row could be seeded, or ``{"action": "seeded", ...}`` after the row
        with ``DEMAND_ID`` was inserted or updated and committed.
    """
    existing = count_demand(conn)
    if existing > 0:
        return {"action": "skip", "reason": f"demand_published={existing}"}

    item = find_seed_item(conn)
    if not item:
        return {"action": "failed", "reason": "no item with source+PO open qty"}

    need_date = (datetime.now() + timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
    qty = 10.0
    with conn.cursor() as c:
        c.execute(
            "SELECT tenant_id AS owner_tenant FROM ic_demandschedule WHERE Id=%s",
            (DEMAND_ID,),
        )
        row = c.fetchone()
        if row is not None and str(row["owner_tenant"]) != str(TENANT):
            # The UPDATE below is tenant-scoped and would match zero rows;
            # report that instead of claiming the row was seeded.
            return {
                "action": "failed",
                "reason": (
                    f"demand id {DEMAND_ID} exists under tenant "
                    f"{row['owner_tenant']}, not {TENANT}"
                ),
            }

        # NOTE(review): `toschedqty` here vs `tosechedqty` in count_demand —
        # confirm the real column name against the schema.
        if row is not None:
            c.execute(
                """
                UPDATE ic_demandschedule
                SET itemnum=%s, status='P', toschedqty=%s, sechedqty=%s,
                    requestdate=%s, arrivaldate=%s, ishistoryversion='N',
                    IsDeleted=0, remarks=%s, update_time=NOW()
                WHERE Id=%s AND tenant_id=%s
                """,
                (item["itemnum"], qty, qty, need_date, need_date, DEMAND_MARKER,
                 DEMAND_ID, TENANT),
            )
        else:
            c.execute(
                """
                INSERT INTO ic_demandschedule
                    (Id, itemnum, status, toschedqty, sechedqty, requestdate,
                     arrivaldate, ishistoryversion, remarks, tenant_id,
                     IsDeleted, create_time, update_time)
                VALUES (%s,%s,'P',%s,%s,%s,%s,'N',%s,%s,0,NOW(),NOW())
                """,
                (DEMAND_ID, item["itemnum"], qty, qty, need_date, need_date,
                 DEMAND_MARKER, TENANT),
            )
    conn.commit()
    return {
        "action": "seeded",
        "itemnum": item["itemnum"],
        "qty": qty,
        "demand_id": DEMAND_ID,
    }


def refresh_s1_mdp(token):
    """Trigger the S1 MDP refresh endpoint; return ``(status, unwrapped_body)``."""
    st, body = http_json(
        "POST", "/api/AidopKanban/s1-mdp/refresh", None, token, timeout=600
    )
    return st, unwrap(body)


def wait_dwd(conn, target=2, attempts=12, sleep_s=5):
    """Poll ``count_dwd`` until it reaches ``target`` or attempts run out.

    Args:
        conn: Open database connection.
        target: Minimum row count that ends the wait.
        attempts: Maximum number of polls followed by a sleep.
        sleep_s: Seconds to sleep after an unsuccessful poll.

    Returns:
        ``(row_count, attempts_used)``. When the target is never reached, one
        last count is taken after the final sleep and ``attempts`` is returned.
    """
    for i in range(attempts):
        # End the current transaction before reading. With autocommit off,
        # MySQL's default REPEATABLE READ isolation would otherwise keep
        # serving the snapshot taken by the first read on this connection,
        # hiding rows written by the backend refresh.
        conn.commit()
        c = count_dwd(conn)
        if c >= target:
            return c, i + 1
        time.sleep(sleep_s)
    conn.commit()
    return count_dwd(conn), attempts


def main():
    """Run the preparation flow, write the JSON report, and exit 0 or 7."""
    report = {"tenant_id": str(TENANT), "bill_no": BILL_NO, "steps": []}
    token = login()
    report["login"] = "ok"

    with pymysql.connect(**CONN) as conn:
        report["before"] = {
            "dwd_ship_rows": count_dwd(conn),
            "demand_published": count_demand(conn),
        }

        st, mdp = refresh_s1_mdp(token)
        report["steps"].append({"step": "s1_mdp_refresh", "http": st, "body": mdp})

        dwd_after_mdp, tries = wait_dwd(conn, target=2)
        report["steps"].append(
            {"step": "wait_dwd_ship", "rows": dwd_after_mdp, "attempts": tries}
        )

        demand_result = ensure_demand_seed(conn)
        report["steps"].append({"step": "seed_s3_demand", **demand_result})

        # Start a fresh transaction so the "after" counts are not read from
        # an older snapshot.
        conn.commit()
        report["after"] = {
            "dwd_ship_rows": count_dwd(conn),
            "demand_published": count_demand(conn),
        }

    report["checks"] = [
        {"name": "s1_mdp_http", "ok": report["steps"][0]["http"] == 200},
        {"name": "dwd_ship_main_ge2", "ok": report["after"]["dwd_ship_rows"] >= 2},
        {"name": "demand_published_gt0", "ok": report["after"]["demand_published"] > 0},
    ]
    report["passed"] = all(c["ok"] for c in report["checks"])

    out = r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\doc\_prep_close_evidence_result.json"
    with open(out, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2, default=str)
    print(json.dumps(report, ensure_ascii=False, indent=2, default=str))
    sys.exit(0 if report["passed"] else 7)


if __name__ == "__main__":
    main()