| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- #!/usr/bin/env python3
- """CLOSE 证据准备:S1 MDP 刷新 + S3 净需求样例(幂等,可重复跑)。"""
- import json
- import subprocess
- import sys
- import time
- import urllib.error
- import urllib.request
- from datetime import datetime, timedelta
- import pymysql
- BASE = "http://127.0.0.1:5005"
- TENANT = 797403760988229
- BILL_NO = "MPO482024102300001"
- DEMAND_MARKER = "CLOSE_UAT_S3_DEMAND"
- DEMAND_ID = 9106000999000001
- CONN = dict(
- host="123.60.180.165",
- port=3306,
- user="aidopremote",
- password="1234567890aiDOP#",
- database="aidopdev",
- charset="utf8mb4",
- connect_timeout=20,
- cursorclass=pymysql.cursors.DictCursor,
- )
- def http_json(method, path, body=None, token=None, timeout=300):
- headers = {"Content-Type": "application/json", "Accept": "application/json"}
- if token:
- headers["Authorization"] = f"Bearer {token}"
- data = None if body is None else json.dumps(body).encode("utf-8")
- req = urllib.request.Request(f"{BASE}{path}", data=data, headers=headers, method=method)
- try:
- with urllib.request.urlopen(req, timeout=timeout) as resp:
- raw = resp.read().decode("utf-8")
- try:
- return resp.status, json.loads(raw)
- except json.JSONDecodeError:
- return resp.status, {"raw": raw[:2000]}
- except urllib.error.HTTPError as e:
- raw = e.read().decode("utf-8", errors="replace")
- try:
- return e.code, json.loads(raw)
- except json.JSONDecodeError:
- return e.code, {"raw": raw[:2000]}
- def login():
- node_script = r"""
- const { sm2 } = require('sm-crypto-v2');
- const PK = '0484C7466D950E120E5ECE5DD85D0C90EAA85081A3A2BD7C57AE6DC822EFCCBD66620C67B0103FC8DD280E36C3B282977B722AAEC3C56518EDCEBAFB72C5A05312';
- console.log(JSON.stringify({ account: 'AIDOPDemo', password: sm2.doEncrypt('1234567890dop', PK, 1), tenantId: '797403760988229' }));
- """
- proc = subprocess.run(
- ["node", "-e", node_script],
- cwd=r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\Web",
- capture_output=True,
- text=True,
- timeout=30,
- )
- if proc.returncode != 0:
- raise RuntimeError(proc.stderr or proc.stdout)
- _, body = http_json("POST", "/api/sysAuth/login", json.loads(proc.stdout.strip()))
- token = (body.get("result") or {}).get("accessToken") or body.get("accessToken")
- if not token:
- raise RuntimeError(f"login failed: {body}")
- return token
- def unwrap(body):
- if isinstance(body, dict) and body.get("code") == 200 and "result" in body:
- return body["result"]
- if isinstance(body, dict) and body.get("ok") is True:
- return body
- return body
- def count_dwd(conn):
- with conn.cursor() as c:
- c.execute(
- "SELECT COUNT(*) AS c FROM dwd_ship_trans WHERE tenant_id=%s AND order_no=%s",
- (TENANT, BILL_NO),
- )
- return c.fetchone()["c"]
- def count_demand(conn):
- with conn.cursor() as c:
- c.execute(
- """
- SELECT COUNT(*) AS c FROM ic_demandschedule
- WHERE tenant_id=%s AND IFNULL(status,'')='P' AND IFNULL(tosechedqty,0)>0
- AND (IFNULL(ishistoryversion,'')='' OR ishistoryversion='N') AND IFNULL(IsDeleted,0)=0
- """,
- (TENANT,),
- )
- return c.fetchone()["c"]
- def find_seed_item(conn):
- with conn.cursor() as c:
- c.execute(
- """
- SELECT sp.number AS itemnum, sp.supplier_number, im.PurMfg
- FROM srm_purchase sp
- INNER JOIN ItemMaster im ON im.ItemNum = sp.number
- WHERE sp.tenant_id=%s AND IFNULL(sp.IsDeleted,0)=0
- AND IFNULL(sp.quota_rate,0)>0
- AND IFNULL(sp.supplier_number,'')<>''
- AND IFNULL(im.PurMfg,'')='P'
- ORDER BY sp.number
- LIMIT 1
- """,
- (TENANT,),
- )
- return c.fetchone()
- def ensure_demand_seed(conn):
- existing = count_demand(conn)
- if existing > 0:
- return {"action": "skip", "reason": f"demand_published={existing}"}
- item = find_seed_item(conn)
- if not item:
- return {"action": "failed", "reason": "no item with source+PO open qty"}
- need_date = (datetime.now() + timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
- qty = 10.0
- with conn.cursor() as c:
- c.execute("SELECT Id FROM ic_demandschedule WHERE Id=%s", (DEMAND_ID,))
- if c.fetchone():
- c.execute(
- """
- UPDATE ic_demandschedule
- SET itemnum=%s, status='P', toschedqty=%s, sechedqty=%s,
- requestdate=%s, arrivaldate=%s, ishistoryversion='N',
- IsDeleted=0, remarks=%s, update_time=NOW()
- WHERE Id=%s AND tenant_id=%s
- """,
- (item["itemnum"], qty, qty, need_date, need_date, DEMAND_MARKER, DEMAND_ID, TENANT),
- )
- else:
- c.execute(
- """
- INSERT INTO ic_demandschedule
- (Id, itemnum, status, toschedqty, sechedqty, requestdate, arrivaldate,
- ishistoryversion, remarks, tenant_id, IsDeleted, create_time, update_time)
- VALUES (%s,%s,'P',%s,%s,%s,%s,'N',%s,%s,0,NOW(),NOW())
- """,
- (DEMAND_ID, item["itemnum"], qty, qty, need_date, need_date, DEMAND_MARKER, TENANT),
- )
- conn.commit()
- return {"action": "seeded", "itemnum": item["itemnum"], "qty": qty, "demand_id": DEMAND_ID}
- def refresh_s1_mdp(token):
- st, body = http_json("POST", "/api/AidopKanban/s1-mdp/refresh", None, token, timeout=600)
- return st, unwrap(body)
- def wait_dwd(conn, target=2, attempts=12, sleep_s=5):
- for i in range(attempts):
- c = count_dwd(conn)
- if c >= target:
- return c, i + 1
- time.sleep(sleep_s)
- return count_dwd(conn), attempts
- def main():
- report = {"tenant_id": str(TENANT), "bill_no": BILL_NO, "steps": []}
- token = login()
- report["login"] = "ok"
- with pymysql.connect(**CONN) as conn:
- report["before"] = {
- "dwd_ship_rows": count_dwd(conn),
- "demand_published": count_demand(conn),
- }
- st, mdp = refresh_s1_mdp(token)
- report["steps"].append({"step": "s1_mdp_refresh", "http": st, "body": mdp})
- dwd_after_mdp, tries = wait_dwd(conn, target=2)
- report["steps"].append({"step": "wait_dwd_ship", "rows": dwd_after_mdp, "attempts": tries})
- demand_result = ensure_demand_seed(conn)
- report["steps"].append({"step": "seed_s3_demand", **demand_result})
- report["after"] = {
- "dwd_ship_rows": count_dwd(conn),
- "demand_published": count_demand(conn),
- }
- report["checks"] = [
- {"name": "s1_mdp_http", "ok": report["steps"][0]["http"] == 200},
- {"name": "dwd_ship_main_ge2", "ok": report["after"]["dwd_ship_rows"] >= 2},
- {"name": "demand_published_gt0", "ok": report["after"]["demand_published"] > 0},
- ]
- report["passed"] = all(c["ok"] for c in report["checks"])
- out = r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\doc\_prep_close_evidence_result.json"
- with open(out, "w", encoding="utf-8") as f:
- json.dump(report, f, ensure_ascii=False, indent=2, default=str)
- print(json.dumps(report, ensure_ascii=False, indent=2, default=str))
- sys.exit(0 if report["passed"] else 7)
- if __name__ == "__main__":
- main()
|