run_dwd_po_trans_migration.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. #!/usr/bin/env python3
"""Dev database 165: back-fill the L2 wide tables dwd_po_trans / dwd_qc_trans and apply the related 1.0.154 migrations (idempotent)."""
from __future__ import annotations

import os
import sys
from pathlib import Path

import pymysql
  7. DB = dict(
  8. host="123.60.180.165",
  9. port=3306,
  10. user="aidopremote",
  11. password="1234567890aiDOP#",
  12. database="aidopdev",
  13. charset="utf8mb4",
  14. )
# DDL for the purchase transaction detail wide table ``dwd_po_trans``.
# ``IF NOT EXISTS`` makes the statement a no-op when the table is already
# present, so it can be re-run safely. The column list already contains the
# extension columns that ALTER_COLUMNS adds individually to older tables.
# Two secondary indexes are declared: (tenant_id, trans_date) and
# (tenant_id, supplier_code). No unique key is defined on this table.
CREATE_DWD_PO_TRANS = """
CREATE TABLE IF NOT EXISTS dwd_po_trans (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
tenant_id BIGINT NOT NULL,
factory_id BIGINT NULL DEFAULT 1,
po_no VARCHAR(50),
po_line VARCHAR(50),
supplier_code VARCHAR(50),
item_code VARCHAR(50),
order_qty DECIMAL(12,3),
received_qty DECIMAL(12,3),
returned_qty DECIMAL(12,3) NULL DEFAULT 0,
shortage_qty DECIMAL(12,3) NULL DEFAULT 0,
due_date DATE NULL,
actual_arrival_date DATE NULL,
risk_level VARCHAR(20) NULL,
trans_date DATE,
source_system VARCHAR(20),
sync_batch_id VARCHAR(100) NULL,
sync_time DATETIME,
INDEX idx_tenant_date (tenant_id, trans_date),
INDEX idx_tenant_supplier (tenant_id, supplier_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='采购交易明细宽表'
"""
# DDL for the quality-inspection transaction detail wide table
# ``dwd_qc_trans``. ``IF NOT EXISTS`` makes the statement safe to re-run.
# ``result`` is restricted to PASS / FAIL / CONCESSION by the ENUM type.
# The unique key ``uk_dwd_qc_trans`` spans
# (tenant_id, trans_date, item_code, supplier_code, batch_no); the same key
# is added separately by ADD_QC_UNIQUE_KEY for tables that lack it.
CREATE_DWD_QC_TRANS = """
CREATE TABLE IF NOT EXISTS dwd_qc_trans (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
tenant_id BIGINT NOT NULL,
item_code VARCHAR(50),
supplier_code VARCHAR(50),
batch_no VARCHAR(50),
sample_qty INT,
defect_qty INT,
result ENUM('PASS','FAIL','CONCESSION'),
trans_date DATE,
source_system VARCHAR(20),
sync_time DATETIME,
UNIQUE KEY uk_dwd_qc_trans (tenant_id, trans_date, item_code, supplier_code, batch_no),
INDEX idx_tenant_date (tenant_id, trans_date),
INDEX idx_tenant_item (tenant_id, item_code),
INDEX idx_tenant_supplier (tenant_id, supplier_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='质检交易明细宽表'
"""
# Adds the ``uk_dwd_qc_trans`` unique key (same column list as in
# CREATE_DWD_QC_TRANS) to an existing ``dwd_qc_trans`` table.
# NOTE(review): presumably needed for tables created before the key was part
# of the CREATE statement; the ALTER will fail if existing rows contain
# duplicates on these columns — confirm against the target data.
ADD_QC_UNIQUE_KEY = """
ALTER TABLE dwd_qc_trans
ADD UNIQUE KEY uk_dwd_qc_trans (tenant_id, trans_date, item_code, supplier_code, batch_no)
"""
  62. ALTER_COLUMNS = [
  63. ("factory_id", "ALTER TABLE dwd_po_trans ADD COLUMN factory_id BIGINT NULL DEFAULT 1 AFTER tenant_id"),
  64. ("po_line", "ALTER TABLE dwd_po_trans ADD COLUMN po_line VARCHAR(50) NULL AFTER po_no"),
  65. ("returned_qty", "ALTER TABLE dwd_po_trans ADD COLUMN returned_qty DECIMAL(12,3) NULL DEFAULT 0 AFTER received_qty"),
  66. ("shortage_qty", "ALTER TABLE dwd_po_trans ADD COLUMN shortage_qty DECIMAL(12,3) NULL DEFAULT 0 AFTER returned_qty"),
  67. ("due_date", "ALTER TABLE dwd_po_trans ADD COLUMN due_date DATE NULL AFTER shortage_qty"),
  68. ("actual_arrival_date", "ALTER TABLE dwd_po_trans ADD COLUMN actual_arrival_date DATE NULL AFTER due_date"),
  69. ("risk_level", "ALTER TABLE dwd_po_trans ADD COLUMN risk_level VARCHAR(20) NULL AFTER actual_arrival_date"),
  70. ("sync_batch_id", "ALTER TABLE dwd_po_trans ADD COLUMN sync_batch_id VARCHAR(100) NULL AFTER source_system"),
  71. ]
# Upsert of the four S4 entity definitions (IQC receipt, shipment execution,
# return execution, shortage execution) into ``mdp_entity``.
# The four literal rows (derived table ``v``) are cross-joined (``ON 1=1``)
# with the ``mdp_source`` row whose tenant_id is 0 and source_code is
# 'AIDOPDEV_MYSQL', so nothing is inserted when that source row is missing.
# On a duplicate key the descriptive fields and ``update_time`` are
# refreshed instead of inserting a second row.
# NOTE(review): the ON DUPLICATE KEY branch only fires if ``mdp_entity`` has a
# unique key covering these rows (its DDL is not in this file) — confirm.
MDP_ENTITY_INSERT = """
INSERT INTO mdp_entity
(tenant_id, source_id, entity_code, entity_name, entity_type, source_table_name, target_table_name, sync_mode, batch_size, status, remark)
SELECT 0, s.id, v.entity_code, v.entity_name, 'TABLE', v.source_table_name, v.target_table_name, 'FULL', 5000, 1, v.remark
FROM mdp_source s
JOIN (
SELECT 'S4_IQC_RECEIPT' AS entity_code, 'S4 IQC收货明细' AS entity_name, 'PurOrdRctDetail' AS source_table_name, 'mdp_stg_s4_iqc' AS target_table_name, 'S4 采购执行 IQC/收货贴源,不重复 S3 采购订单主链' AS remark
UNION ALL SELECT 'S4_SHIPMENT_EXEC', 'S4 发货执行明细', 'scm_shdzb', 'mdp_stg_s4_shipment', 'S4 供应商发货单执行贴源'
UNION ALL SELECT 'S4_RETURN_EXEC', 'S4 采购退货行', 'srm_polist_ds', 'mdp_stg_s4_return', 'S4 交货计划退货数量贴源'
UNION ALL SELECT 'S4_SHORTAGE_EXEC', 'S4 欠料执行', 'dwd_material_shortage', 'mdp_stg_s4_shortage', 'S4 消费 S3 缺料 DWD 快照,不新增第三套工单贴源'
) v ON 1=1
WHERE s.tenant_id = 0 AND s.source_code = 'AIDOPDEV_MYSQL'
ON DUPLICATE KEY UPDATE
source_id = VALUES(source_id),
entity_name = VALUES(entity_name),
source_table_name = VALUES(source_table_name),
target_table_name = VALUES(target_table_name),
remark = VALUES(remark),
update_time = CURRENT_TIMESTAMP
"""
  92. def index_exists(cur, table: str, index_name: str) -> bool:
  93. cur.execute(
  94. """
  95. SELECT 1 FROM information_schema.STATISTICS
  96. WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s AND INDEX_NAME = %s
  97. LIMIT 1
  98. """,
  99. (table, index_name),
  100. )
  101. return cur.fetchone() is not None
  102. def column_exists(cur, table: str, column: str) -> bool:
  103. cur.execute(
  104. """
  105. SELECT 1 FROM information_schema.COLUMNS
  106. WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s AND COLUMN_NAME = %s
  107. LIMIT 1
  108. """,
  109. (table, column),
  110. )
  111. return cur.fetchone() is not None
  112. def table_exists(cur, table: str) -> bool:
  113. cur.execute(
  114. """
  115. SELECT 1 FROM information_schema.TABLES
  116. WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s
  117. LIMIT 1
  118. """,
  119. (table,),
  120. )
  121. return cur.fetchone() is not None
  122. def main() -> int:
  123. conn = pymysql.connect(**DB, autocommit=False)
  124. cur = conn.cursor()
  125. try:
  126. print("1) CREATE TABLE dwd_po_trans (if missing)...")
  127. cur.execute(CREATE_DWD_PO_TRANS)
  128. conn.commit()
  129. print(f" table exists: {table_exists(cur, 'dwd_po_trans')}")
  130. print("2) Ensure extension columns (idempotent)...")
  131. for col, ddl in ALTER_COLUMNS:
  132. if column_exists(cur, "dwd_po_trans", col):
  133. print(f" skip column {col} (already exists)")
  134. continue
  135. cur.execute(ddl)
  136. conn.commit()
  137. print(f" added column {col}")
  138. print("3) Register S4 mdp_entity rows...")
  139. cur.execute("SELECT COUNT(*) FROM mdp_source WHERE tenant_id=0 AND source_code='AIDOPDEV_MYSQL'")
  140. source_count = cur.fetchone()[0]
  141. if source_count == 0:
  142. print(" [WARN] mdp_source AIDOPDEV_MYSQL not found; skip mdp_entity insert")
  143. else:
  144. cur.execute(MDP_ENTITY_INSERT)
  145. conn.commit()
  146. cur.execute(
  147. "SELECT entity_code FROM mdp_entity WHERE entity_code IN ('S4_IQC_RECEIPT','S4_SHIPMENT_EXEC','S4_RETURN_EXEC','S4_SHORTAGE_EXEC') ORDER BY entity_code"
  148. )
  149. rows = [r[0] for r in cur.fetchall()]
  150. print(f" mdp_entity rows: {rows}")
  151. print("4) CREATE TABLE dwd_qc_trans (if missing)...")
  152. cur.execute(CREATE_DWD_QC_TRANS)
  153. conn.commit()
  154. print(f" table exists: {table_exists(cur, 'dwd_qc_trans')}")
  155. if table_exists(cur, "dwd_qc_trans") and not index_exists(cur, "dwd_qc_trans", "uk_dwd_qc_trans"):
  156. try:
  157. cur.execute(ADD_QC_UNIQUE_KEY)
  158. conn.commit()
  159. print(" added unique key uk_dwd_qc_trans")
  160. except Exception as exc: # noqa: BLE001
  161. conn.rollback()
  162. print(f" [WARN] could not add uk_dwd_qc_trans: {exc}")
  163. else:
  164. print(" skip unique key uk_dwd_qc_trans (already exists)")
  165. cur.execute(
  166. "SELECT COUNT(*) FROM information_schema.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='dwd_po_trans'"
  167. )
  168. po_col_count = cur.fetchone()[0]
  169. cur.execute("SELECT COUNT(*) FROM dwd_po_trans")
  170. po_row_count = cur.fetchone()[0]
  171. cur.execute(
  172. "SELECT COUNT(*) FROM information_schema.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='dwd_qc_trans'"
  173. )
  174. qc_col_count = cur.fetchone()[0]
  175. cur.execute("SELECT COUNT(*) FROM dwd_qc_trans")
  176. qc_row_count = cur.fetchone()[0]
  177. print(f"\nDone.")
  178. print(f" dwd_po_trans: columns={po_col_count}, rows={po_row_count}")
  179. print(f" dwd_qc_trans: columns={qc_col_count}, rows={qc_row_count}")
  180. return 0
  181. except Exception as exc: # noqa: BLE001
  182. conn.rollback()
  183. print(f"[ERROR] {exc}", file=sys.stderr)
  184. return 1
  185. finally:
  186. conn.close()
  187. if __name__ == "__main__":
  188. raise SystemExit(main())