| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- #!/usr/bin/env python3
- """165 开发库:补建 L2 宽表 dwd_po_trans / dwd_qc_trans 及 1.0.154 相关迁移(幂等)。"""
from __future__ import annotations

import os
import sys
from pathlib import Path

import pymysql
# Connection settings for the 165 development database; main() passes them as
# keyword arguments to pymysql.connect().
# Every value except the charset can be overridden with an AIDOP_DB_*
# environment variable, so the script can target another instance (or use
# another account) without editing the source.
# NOTE(review): the literal fallbacks keep existing invocations working, but a
# real password is committed here in plain text. Rotate it and drop the
# fallback once callers supply AIDOP_DB_PASSWORD.
DB = dict(
    host=os.environ.get("AIDOP_DB_HOST", "123.60.180.165"),
    port=int(os.environ.get("AIDOP_DB_PORT", "3306")),
    user=os.environ.get("AIDOP_DB_USER", "aidopremote"),
    password=os.environ.get("AIDOP_DB_PASSWORD", "1234567890aiDOP#"),
    database=os.environ.get("AIDOP_DB_NAME", "aidopdev"),
    charset="utf8mb4",
)
# DDL for the L2 wide table dwd_po_trans (purchase transaction detail, per the
# table COMMENT). Because of IF NOT EXISTS the statement does nothing when the
# table is already present, so a table created from an older definition can
# still lack some of these columns; ALTER_COLUMNS adds those individually.
CREATE_DWD_PO_TRANS = """
CREATE TABLE IF NOT EXISTS dwd_po_trans (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
tenant_id BIGINT NOT NULL,
factory_id BIGINT NULL DEFAULT 1,
po_no VARCHAR(50),
po_line VARCHAR(50),
supplier_code VARCHAR(50),
item_code VARCHAR(50),
order_qty DECIMAL(12,3),
received_qty DECIMAL(12,3),
returned_qty DECIMAL(12,3) NULL DEFAULT 0,
shortage_qty DECIMAL(12,3) NULL DEFAULT 0,
due_date DATE NULL,
actual_arrival_date DATE NULL,
risk_level VARCHAR(20) NULL,
trans_date DATE,
source_system VARCHAR(20),
sync_batch_id VARCHAR(100) NULL,
sync_time DATETIME,
INDEX idx_tenant_date (tenant_id, trans_date),
INDEX idx_tenant_supplier (tenant_id, supplier_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='采购交易明细宽表'
"""
# DDL for the L2 wide table dwd_qc_trans (quality-inspection transaction
# detail, per the table COMMENT). The unique key uk_dwd_qc_trans spans
# (tenant_id, trans_date, item_code, supplier_code, batch_no). IF NOT EXISTS
# makes the statement a no-op on an existing table, which is why
# ADD_QC_UNIQUE_KEY exists as a separate statement.
CREATE_DWD_QC_TRANS = """
CREATE TABLE IF NOT EXISTS dwd_qc_trans (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
tenant_id BIGINT NOT NULL,
item_code VARCHAR(50),
supplier_code VARCHAR(50),
batch_no VARCHAR(50),
sample_qty INT,
defect_qty INT,
result ENUM('PASS','FAIL','CONCESSION'),
trans_date DATE,
source_system VARCHAR(20),
sync_time DATETIME,
UNIQUE KEY uk_dwd_qc_trans (tenant_id, trans_date, item_code, supplier_code, batch_no),
INDEX idx_tenant_date (tenant_id, trans_date),
INDEX idx_tenant_item (tenant_id, item_code),
INDEX idx_tenant_supplier (tenant_id, supplier_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='质检交易明细宽表'
"""
# Adds the same unique key that CREATE_DWD_QC_TRANS declares. main() executes
# it only when index_exists() reports uk_dwd_qc_trans missing, presumably for
# tables created from an earlier definition without the key (verify).
# The ALTER fails if existing rows already violate the key; main() reports
# that as a warning rather than aborting.
ADD_QC_UNIQUE_KEY = """
ALTER TABLE dwd_qc_trans
ADD UNIQUE KEY uk_dwd_qc_trans (tenant_id, trans_date, item_code, supplier_code, batch_no)
"""
# (column name, DDL) pairs for the dwd_po_trans extension columns. main() runs
# each DDL only when column_exists() reports the column missing.
# Several AFTER clauses name a column added by an earlier entry (for example
# shortage_qty goes AFTER returned_qty), so the list must be applied in order.
ALTER_COLUMNS = [
    ("factory_id", "ALTER TABLE dwd_po_trans ADD COLUMN factory_id BIGINT NULL DEFAULT 1 AFTER tenant_id"),
    ("po_line", "ALTER TABLE dwd_po_trans ADD COLUMN po_line VARCHAR(50) NULL AFTER po_no"),
    ("returned_qty", "ALTER TABLE dwd_po_trans ADD COLUMN returned_qty DECIMAL(12,3) NULL DEFAULT 0 AFTER received_qty"),
    ("shortage_qty", "ALTER TABLE dwd_po_trans ADD COLUMN shortage_qty DECIMAL(12,3) NULL DEFAULT 0 AFTER returned_qty"),
    ("due_date", "ALTER TABLE dwd_po_trans ADD COLUMN due_date DATE NULL AFTER shortage_qty"),
    ("actual_arrival_date", "ALTER TABLE dwd_po_trans ADD COLUMN actual_arrival_date DATE NULL AFTER due_date"),
    ("risk_level", "ALTER TABLE dwd_po_trans ADD COLUMN risk_level VARCHAR(20) NULL AFTER actual_arrival_date"),
    ("sync_batch_id", "ALTER TABLE dwd_po_trans ADD COLUMN sync_batch_id VARCHAR(100) NULL AFTER source_system"),
]
# Upsert that registers four S4 entities in mdp_entity for the mdp_source row
# with tenant_id = 0 and source_code = 'AIDOPDEV_MYSQL'.
# The derived table v is joined with ON 1=1 (a cross join), so every matching
# source row yields all four entities, and nothing is inserted when no such
# source row exists.
# ON DUPLICATE KEY UPDATE relies on a unique key on mdp_entity that is not
# visible in this file (presumably tenant_id + entity_code; verify).
# NOTE(review): VALUES() inside ON DUPLICATE KEY UPDATE is deprecated from
# MySQL 8.0.20 in favour of row aliases; confirm the server version before
# changing it.
MDP_ENTITY_INSERT = """
INSERT INTO mdp_entity
(tenant_id, source_id, entity_code, entity_name, entity_type, source_table_name, target_table_name, sync_mode, batch_size, status, remark)
SELECT 0, s.id, v.entity_code, v.entity_name, 'TABLE', v.source_table_name, v.target_table_name, 'FULL', 5000, 1, v.remark
FROM mdp_source s
JOIN (
SELECT 'S4_IQC_RECEIPT' AS entity_code, 'S4 IQC收货明细' AS entity_name, 'PurOrdRctDetail' AS source_table_name, 'mdp_stg_s4_iqc' AS target_table_name, 'S4 采购执行 IQC/收货贴源,不重复 S3 采购订单主链' AS remark
UNION ALL SELECT 'S4_SHIPMENT_EXEC', 'S4 发货执行明细', 'scm_shdzb', 'mdp_stg_s4_shipment', 'S4 供应商发货单执行贴源'
UNION ALL SELECT 'S4_RETURN_EXEC', 'S4 采购退货行', 'srm_polist_ds', 'mdp_stg_s4_return', 'S4 交货计划退货数量贴源'
UNION ALL SELECT 'S4_SHORTAGE_EXEC', 'S4 欠料执行', 'dwd_material_shortage', 'mdp_stg_s4_shortage', 'S4 消费 S3 缺料 DWD 快照,不新增第三套工单贴源'
) v ON 1=1
WHERE s.tenant_id = 0 AND s.source_code = 'AIDOPDEV_MYSQL'
ON DUPLICATE KEY UPDATE
source_id = VALUES(source_id),
entity_name = VALUES(entity_name),
source_table_name = VALUES(source_table_name),
target_table_name = VALUES(target_table_name),
remark = VALUES(remark),
update_time = CURRENT_TIMESTAMP
"""
def index_exists(cur, table: str, index_name: str) -> bool:
    """Tell whether ``table`` has an index called ``index_name``.

    The lookup queries ``information_schema.STATISTICS`` restricted to the
    schema returned by ``DATABASE()`` on the cursor's connection.

    Args:
        cur: DB-API cursor offering ``execute`` and ``fetchone``.
        table: Table name to inspect.
        index_name: Index (or key) name to look for.

    Returns:
        True when at least one matching row is found, otherwise False.
    """
    lookup_sql = (
        "\n"
        "SELECT 1 FROM information_schema.STATISTICS\n"
        "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s AND INDEX_NAME = %s\n"
        "LIMIT 1\n"
    )
    cur.execute(lookup_sql, (table, index_name))
    match = cur.fetchone()
    return match is not None
def column_exists(cur, table: str, column: str) -> bool:
    """Tell whether ``table`` has a column called ``column``.

    The lookup queries ``information_schema.COLUMNS`` restricted to the schema
    returned by ``DATABASE()`` on the cursor's connection.

    Args:
        cur: DB-API cursor offering ``execute`` and ``fetchone``.
        table: Table name to inspect.
        column: Column name to look for.

    Returns:
        True when at least one matching row is found, otherwise False.
    """
    lookup_sql = (
        "\n"
        "SELECT 1 FROM information_schema.COLUMNS\n"
        "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s AND COLUMN_NAME = %s\n"
        "LIMIT 1\n"
    )
    cur.execute(lookup_sql, (table, column))
    match = cur.fetchone()
    return match is not None
def table_exists(cur, table: str) -> bool:
    """Tell whether a table called ``table`` exists in the current schema.

    The lookup queries ``information_schema.TABLES`` restricted to the schema
    returned by ``DATABASE()`` on the cursor's connection.

    Args:
        cur: DB-API cursor offering ``execute`` and ``fetchone``.
        table: Table name to look for.

    Returns:
        True when at least one matching row is found, otherwise False.
    """
    lookup_sql = (
        "\n"
        "SELECT 1 FROM information_schema.TABLES\n"
        "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s\n"
        "LIMIT 1\n"
    )
    cur.execute(lookup_sql, (table,))
    match = cur.fetchone()
    return match is not None
def _ensure_po_table(conn, cur) -> None:
    """Create dwd_po_trans when absent, then add any missing extension columns."""
    print("1) CREATE TABLE dwd_po_trans (if missing)...")
    cur.execute(CREATE_DWD_PO_TRANS)
    conn.commit()
    print(f" table exists: {table_exists(cur, 'dwd_po_trans')}")

    print("2) Ensure extension columns (idempotent)...")
    # Order matters: later entries are positioned AFTER columns added earlier.
    for col, ddl in ALTER_COLUMNS:
        if column_exists(cur, "dwd_po_trans", col):
            print(f" skip column {col} (already exists)")
            continue
        cur.execute(ddl)
        conn.commit()
        print(f" added column {col}")


def _register_s4_entities(conn, cur) -> None:
    """Upsert the S4 mdp_entity rows when the AIDOPDEV_MYSQL source is registered."""
    print("3) Register S4 mdp_entity rows...")
    cur.execute("SELECT COUNT(*) FROM mdp_source WHERE tenant_id=0 AND source_code='AIDOPDEV_MYSQL'")
    if cur.fetchone()[0] == 0:
        print(" [WARN] mdp_source AIDOPDEV_MYSQL not found; skip mdp_entity insert")
        return
    cur.execute(MDP_ENTITY_INSERT)
    conn.commit()
    cur.execute(
        "SELECT entity_code FROM mdp_entity WHERE entity_code IN ('S4_IQC_RECEIPT','S4_SHIPMENT_EXEC','S4_RETURN_EXEC','S4_SHORTAGE_EXEC') ORDER BY entity_code"
    )
    rows = [r[0] for r in cur.fetchall()]
    print(f" mdp_entity rows: {rows}")


def _ensure_qc_table(conn, cur) -> None:
    """Create dwd_qc_trans when absent and make sure its unique key exists."""
    print("4) CREATE TABLE dwd_qc_trans (if missing)...")
    cur.execute(CREATE_DWD_QC_TRANS)
    conn.commit()
    print(f" table exists: {table_exists(cur, 'dwd_qc_trans')}")

    if index_exists(cur, "dwd_qc_trans", "uk_dwd_qc_trans"):
        print(" skip unique key uk_dwd_qc_trans (already exists)")
        return
    # Best effort: the ALTER fails when existing rows violate the key, and
    # that must not abort the rest of the migration.
    try:
        cur.execute(ADD_QC_UNIQUE_KEY)
        conn.commit()
        print(" added unique key uk_dwd_qc_trans")
    except Exception as exc:  # noqa: BLE001
        conn.rollback()
        print(f" [WARN] could not add uk_dwd_qc_trans: {exc}")


def _print_summary(cur) -> None:
    """Print column and row counts for both wide tables."""
    stats = []
    # Table names come from this fixed tuple, never from external input, so
    # interpolating them into the COUNT(*) statement is safe (identifiers
    # cannot be bound as query parameters).
    for table in ("dwd_po_trans", "dwd_qc_trans"):
        cur.execute(
            "SELECT COUNT(*) FROM information_schema.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME=%s",
            (table,),
        )
        col_count = cur.fetchone()[0]
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        row_count = cur.fetchone()[0]
        stats.append((table, col_count, row_count))

    # Report only after every query succeeded, so "Done." is never printed
    # ahead of a failure.
    print("\nDone.")
    for table, col_count, row_count in stats:
        print(f" {table}: columns={col_count}, rows={row_count}")


def main() -> int:
    """Run the idempotent migration against the database described by ``DB``.

    Steps: create dwd_po_trans and its extension columns, register the S4
    mdp_entity rows, create dwd_qc_trans with its unique key, then print a
    summary of both tables.

    Returns:
        0 on success, 1 when connecting or any non-best-effort step fails
        (the error is written to stderr).
    """
    # Connect inside a try so an unreachable database yields exit code 1
    # instead of an uncaught traceback.
    try:
        conn = pymysql.connect(**DB, autocommit=False)
    except Exception as exc:  # noqa: BLE001
        print(f"[ERROR] {exc}", file=sys.stderr)
        return 1

    try:
        # The context manager closes the cursor once the steps finish.
        with conn.cursor() as cur:
            _ensure_po_table(conn, cur)
            _register_s4_entities(conn, cur)
            _ensure_qc_table(conn, cur)
            _print_summary(cur)
        return 0
    except Exception as exc:  # noqa: BLE001
        # Report first so the root cause is visible even if rollback fails.
        print(f"[ERROR] {exc}", file=sys.stderr)
        # MySQL commits DDL implicitly, so this only undoes pending DML
        # (the mdp_entity upsert).
        conn.rollback()
        return 1
    finally:
        conn.close()
# Script entry point: run the migration and exit with its return code.
if __name__ == "__main__":
    sys.exit(main())
|