| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- #!/usr/bin/env python3
- """CLOSE-S3: 交货计划 18 步可观测对账 + 采购闭环 + 异常路径证据。"""
- import json
- import subprocess
- import sys
- import urllib.error
- import urllib.request
- from datetime import datetime
- import pymysql
- BASE = "http://127.0.0.1:5005"
- TENANT = 797403760988229
- PO_SAMPLE = "PO-UAT-20260604-01"
- CONN = dict(
- host="123.60.180.165",
- port=3306,
- user="aidopremote",
- password="1234567890aiDOP#",
- database="aidopdev",
- charset="utf8mb4",
- connect_timeout=20,
- cursorclass=pymysql.cursors.DictCursor,
- )
- # 旧 CreateDeliverySchedule 18 步 → 本脚本可观测检查点
- STEP_CHECKS = [
- ("step01_demand_filter", "步骤1 已发布净需求>0需求计划", "sql_demand_published"),
- ("step02_existing_ds", "步骤2-3 已有交货单占用/差额", "sql_existing_ds"),
- ("step05_source_list", "步骤5 有效货源清单", "sql_active_source"),
- ("step06_po_lines", "步骤6-7 可用 PO/DO 明细", "sql_po_do_lines"),
- ("step09_quota_rule", "步骤9 配额规则快照", "api_rule_snapshot"),
- ("step12_15_generate", "步骤12-15 生成交货单/PR/异常", "api_generate_result"),
- ("step16_18_persist", "步骤16-18 编号与事务落库", "sql_persist_recent"),
- ("step_procurement", "AutoTransferDoOrPo 采购闭环", "api_procurement_pipeline"),
- ]
- def http_json(method, path, body=None, token=None):
- headers = {"Content-Type": "application/json", "Accept": "application/json"}
- if token:
- headers["Authorization"] = f"Bearer {token}"
- data = None if body is None else json.dumps(body).encode("utf-8")
- req = urllib.request.Request(f"{BASE}{path}", data=data, headers=headers, method=method)
- try:
- with urllib.request.urlopen(req, timeout=120) as resp:
- raw = resp.read().decode("utf-8")
- try:
- return resp.status, json.loads(raw)
- except json.JSONDecodeError:
- return resp.status, {"raw": raw[:2000]}
- except urllib.error.HTTPError as e:
- raw = e.read().decode("utf-8", errors="replace")
- try:
- return e.code, json.loads(raw)
- except json.JSONDecodeError:
- return e.code, {"raw": raw[:2000]}
- def login():
- node_script = r"""
- const { sm2 } = require('sm-crypto-v2');
- const PK = '0484C7466D950E120E5ECE5DD85D0C90EAA85081A3A2BD7C57AE6DC822EFCCBD66620C67B0103FC8DD280E36C3B282977B722AAEC3C56518EDCEBAFB72C5A05312';
- console.log(JSON.stringify({ account: 'AIDOPDemo', password: sm2.doEncrypt('1234567890dop', PK, 1), tenantId: '797403760988229' }));
- """
- proc = subprocess.run(
- ["node", "-e", node_script],
- cwd=r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\Web",
- capture_output=True,
- text=True,
- timeout=30,
- )
- if proc.returncode != 0:
- raise RuntimeError(proc.stderr or proc.stdout)
- _, body = http_json("POST", "/api/sysAuth/login", json.loads(proc.stdout.strip()))
- token = (body.get("result") or {}).get("accessToken") or body.get("accessToken")
- if not token:
- raise RuntimeError(f"login failed: {body}")
- return token
- def unwrap(body):
- if isinstance(body, dict) and body.get("code") == 200 and "result" in body:
- return body["result"]
- return body
- def sql_before(conn):
- snap = {}
- with conn.cursor() as cur:
- cur.execute(
- """
- SELECT COUNT(*) AS c FROM ic_demandschedule a
- WHERE a.tenant_id=%s AND IFNULL(a.status,'')='P' AND IFNULL(a.tosechedqty,0)>0
- AND (IFNULL(a.ishistoryversion,'')='' OR a.ishistoryversion='N') AND IFNULL(a.IsDeleted,0)=0
- """,
- (TENANT,),
- )
- snap["demand_published"] = cur.fetchone()["c"]
- cur.execute(
- """
- SELECT COUNT(*) AS c FROM srm_polist_ds
- WHERE tenant_id=%s AND IFNULL(isactive,1)=1 AND createtime >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
- """,
- (TENANT,),
- )
- snap["ds_24h"] = cur.fetchone()["c"]
- cur.execute(
- """
- SELECT COUNT(*) AS c FROM srm_purchase
- WHERE tenant_id=%s AND IFNULL(IsDeleted,0)=0
- AND (is_active IN ('是','Y','1','y') OR is_active=1)
- """,
- (TENANT,),
- )
- snap["active_source"] = cur.fetchone()["c"]
- cur.execute(
- """
- SELECT COUNT(*) AS c FROM PurOrdMaster m
- INNER JOIN PurOrdDetail d ON m.RecID=d.PurOrdRecID
- WHERE m.tenant_id=%s AND IFNULL(m.Status,'')<>'C' AND IFNULL(d.Status,'')<>'C'
- AND (IFNULL(d.QtyOrded,0)-IFNULL(d.RctQty,0))>0
- """,
- (TENANT,),
- )
- snap["po_do_lines"] = cur.fetchone()["c"]
- cur.execute(
- """
- SELECT COUNT(*) AS c FROM DeliveryExceptionMaster
- WHERE tenant_id=%s AND OptTime >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
- """,
- (TENANT,),
- )
- snap["exceptions_24h"] = cur.fetchone()["c"]
- return snap
- def sql_after(conn):
- snap = sql_before(conn)
- with conn.cursor() as cur:
- cur.execute(
- """
- SELECT COUNT(*) AS c FROM srm_pr_main
- WHERE tenant_id=%s AND create_time >= DATE_SUB(NOW(), INTERVAL 2 HOUR)
- """,
- (TENANT,),
- )
- snap["pr_2h"] = cur.fetchone()["c"]
- cur.execute(
- """
- SELECT id, action_code, status, message, start_time
- FROM aidop_action_run_log
- WHERE tenant_id=%s AND action_code IN ('S3_DELIVERY_GENERATE','S4_PROCUREMENT_PIPELINE')
- ORDER BY start_time DESC LIMIT 5
- """,
- (TENANT,),
- )
- snap["action_logs"] = cur.fetchall()
- return snap
- def build_step_checks(before, after, gen_result, batch_result, pipe_result):
- gen = gen_result if isinstance(gen_result, dict) else {}
- batch = batch_result if isinstance(batch_result, dict) else {}
- pipe = pipe_result if isinstance(pipe_result, dict) else {}
- rule = gen.get("ruleSnapshot") or gen.get("RuleSnapshot") or {}
- created = int(gen.get("createdCount") or gen.get("CreatedCount") or 0)
- batch_created = int(batch.get("createdCount") or batch.get("CreatedCount") or 0)
- exception_count = int(gen.get("exceptionCount") or gen.get("ExceptionCount") or 0)
- ds_after = after.get("ds_24h") or 0
- ds_before = before.get("ds_24h") or 0
- checks = []
- checks.append({
- "id": "step01_demand_filter",
- "ok": (before.get("demand_published") or 0) >= 0,
- "detail": f"demand_published={before.get('demand_published')}",
- "note": "为0时走 PO 批量生成路径",
- })
- checks.append({
- "id": "step02_existing_ds",
- "ok": True,
- "detail": f"existing_ds_24h={ds_before}",
- "note": "已有交货单由服务内 QueryExistingScheduleUsages 处理",
- })
- checks.append({
- "id": "step05_source_list",
- "ok": (before.get("active_source") or 0) > 0,
- "detail": f"active_source={before.get('active_source')}",
- })
- checks.append({
- "id": "step06_po_lines",
- "ok": (before.get("po_do_lines") or 0) > 0,
- "detail": f"po_do_lines={before.get('po_do_lines')}",
- })
- checks.append({
- "id": "step09_quota_rule",
- "ok": bool(rule) or created > 0 or batch_created > 0,
- "detail": "ruleSnapshot present" if rule else "no snapshot in API body",
- })
- checks.append({
- "id": "step12_15_generate",
- "ok": created > 0 or batch_created > 0 or exception_count > 0 or (gen.get("demandCount") or 0) == 0,
- "detail": {
- "demand_generate": {
- "demandCount": gen.get("demandCount"),
- "createdCount": created,
- "exceptionCount": exception_count,
- "message": gen.get("message"),
- },
- "batch_generate": {
- "po": PO_SAMPLE,
- "createdCount": batch_created,
- "message": batch.get("message"),
- },
- },
- })
- checks.append({
- "id": "step16_18_persist",
- "ok": ds_after >= ds_before or created > 0 or batch_created > 0 or (after.get("exceptions_24h") or 0) > (before.get("exceptions_24h") or 0),
- "detail": f"ds_24h {ds_before}->{ds_after}, exceptions_24h={after.get('exceptions_24h')}",
- })
- checks.append({
- "id": "step_procurement",
- "ok": isinstance(pipe, dict) and (pipe.get("message") is not None),
- "detail": {
- "message": pipe.get("message"),
- "prMergeReducedCount": pipe.get("prMergeReducedCount"),
- "poCreatedCount": pipe.get("poCreatedCount"),
- "qadTrackingCount": pipe.get("qadTrackingCount"),
- "warnings": pipe.get("warnings"),
- },
- })
- return checks
- def main():
- report = {"tenant_id": str(TENANT), "po_sample": PO_SAMPLE, "steps": [], "checks": []}
- token = login()
- report["login"] = "ok"
- with pymysql.connect(**CONN) as conn:
- report["sql_before"] = sql_before(conn)
- st, body = http_json("POST", "/api/Supply/delivery-schedule/generate", None, token)
- gen_result = unwrap(body)
- report["steps"].append({"step": "demand_generate", "http": st, "body": gen_result})
- batch_result = {}
- demand_count = int(gen_result.get("demandCount") or gen_result.get("DemandCount") or 0) if isinstance(gen_result, dict) else 0
- created_count = int(gen_result.get("createdCount") or gen_result.get("CreatedCount") or 0) if isinstance(gen_result, dict) else 0
- if demand_count == 0 or created_count == 0:
- st2, body2 = http_json(
- "POST",
- "/api/Supply/delivery-schedule/batch-generate",
- {"poNumber": PO_SAMPLE},
- token,
- )
- batch_result = unwrap(body2)
- report["steps"].append({"step": "batch_generate_by_po", "http": st2, "body": batch_result})
- st3, body3 = http_json(
- "POST",
- f"/api/Supply/procurement/execute-pipeline?domain={TENANT}&createFromShortage=false&mergeHistorical=true",
- None,
- token,
- )
- pipe_result = unwrap(body3)
- report["steps"].append({"step": "procurement_pipeline", "http": st3, "body": pipe_result})
- with pymysql.connect(**CONN) as conn:
- report["sql_after"] = sql_after(conn)
- report["step_checks"] = build_step_checks(
- report["sql_before"],
- report["sql_after"],
- gen_result,
- batch_result,
- pipe_result,
- )
- report["checks"] = [
- {"name": "demand_generate_http", "ok": st == 200},
- {"name": "procurement_pipeline_http", "ok": st3 == 200},
- {"name": "active_source_exists", "ok": (report["sql_before"].get("active_source") or 0) > 0},
- {"name": "po_lines_exist", "ok": (report["sql_before"].get("po_do_lines") or 0) > 0},
- {
- "name": "delivery_created_or_exception",
- "ok": created_count > 0
- or int((batch_result or {}).get("createdCount") or (batch_result or {}).get("CreatedCount") or 0) > 0
- or int(gen_result.get("exceptionCount") or gen_result.get("ExceptionCount") or 0) > 0
- if isinstance(gen_result, dict)
- else False,
- "detail": "需求路径或 PO 批量路径或异常记录",
- },
- {
- "name": "s3_action_log",
- "ok": any(
- (r.get("action_code") == "S3_DELIVERY_GENERATE" and r.get("status") == "SUCCESS")
- for r in (report["sql_after"].get("action_logs") or [])
- ),
- "detail": report["sql_after"].get("action_logs"),
- },
- ]
- report["checks"].extend(
- {"name": c["id"], "ok": c["ok"], "detail": c.get("detail"), "note": c.get("note")}
- for c in report["step_checks"]
- )
- report["passed"] = all(c["ok"] for c in report["checks"] if c["name"] in {
- "demand_generate_http",
- "procurement_pipeline_http",
- "active_source_exists",
- "po_lines_exist",
- "s3_action_log",
- "step05_source_list",
- "step06_po_lines",
- "step_procurement",
- })
- out = r"d:\Projects\Ai-DOP\SourceCode\ZZYDOP\doc\_verify_s3_delivery_18step_e2e_result.json"
- with open(out, "w", encoding="utf-8") as f:
- json.dump(report, f, ensure_ascii=False, indent=2, default=str)
- print(json.dumps(report, ensure_ascii=False, indent=2, default=str))
- sys.exit(0 if report["passed"] else 6)
- if __name__ == "__main__":
- main()
|