| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- #!/usr/bin/env python3
- """Verify key table row counts and compare backup vs 165 schema for failed imports."""
from __future__ import annotations

import os
import re
from pathlib import Path

import pymysql
# Location of the SQL dump to inspect. The default is the original
# machine-specific path; set AIDOP_SQL_PATH to point the script elsewhere.
SQL_PATH = Path(
    os.environ.get(
        "AIDOP_SQL_PATH",
        r"D:\Projects\Ai-DOP\阿里云数据库备份\aidopdev_20260608_053527.sql\aidopdev_20260608_053527.sql",
    )
)


def load_db_config(env=None):
    """Build the keyword arguments passed to ``pymysql.connect``.

    Each setting can be overridden through an ``AIDOP_DB_*`` environment
    variable; when a variable is absent the original built-in value is used,
    so existing invocations keep working unchanged.

    Args:
        env: Mapping to read overrides from. Defaults to ``os.environ``.

    Returns:
        A dict with ``host``, ``port`` (int), ``user``, ``password``,
        ``database`` and ``charset`` keys.

    Raises:
        ValueError: If ``AIDOP_DB_PORT`` is set to a non-integer value.
    """
    if env is None:
        env = os.environ
    return dict(
        host=env.get("AIDOP_DB_HOST", "123.60.180.165"),
        port=int(env.get("AIDOP_DB_PORT", 3306)),
        user=env.get("AIDOP_DB_USER", "aidopremote"),
        # NOTE(review): a plaintext credential is kept here only as a
        # backward-compatible default. Prefer supplying AIDOP_DB_PASSWORD and
        # rotating/removing this value from source control.
        password=env.get("AIDOP_DB_PASSWORD", "1234567890aiDOP#"),
        database=env.get("AIDOP_DB_NAME", "aidopdev"),
        charset="utf8mb4",
    )


# Connection settings for the target MySQL server.
DB = load_db_config()

# Tables (lower-case names) whose import failed; main() compares their
# backup definition against the target schema.
FAILED_TABLES = [
    "itemmaster",
    "prioritycode",
    "purordrctdetail",
    "purordrctmaster",
    "scm_jhjh_jq",
    "scm_shd",
    "scm_shdzb",
    "nbrdayinfo",
]

# Tables (lower-case names) whose row counts main() reports.
KEY_TABLES = [
    "sysuser",
    "systenant",
    "ado_smart_ops_kpi_master",
    "ado_smart_ops_layout_item",
    "ado_smart_ops_dashboard_widget",
    "ado_smart_ops_dashboard_page_config",
    "mdp_stg_item",
    "mdp_std_item",
    "mdp_stg_supplier",
    "mdp_stg_purchase_order",
    "mdp_stg_work_order_material",
    "ic_item",
    "dwd_process_outsource_delivery",
    "abpauditlogs",
    "approvalflowtask",
    "syslogop",
    "sysmenu",
    "sysrole",
]
def normalize(name: str) -> str:
    """Return *name* in lower case with leading/trailing backticks removed."""
    unquoted = name.strip("`")
    return unquoted.lower()
def backup_create_columns(text: str, table: str) -> list[str]:
    """List the column names declared for *table* in a SQL dump.

    The ``CREATE TABLE`` statement is located case-insensitively and its body
    is taken up to the first ``)`` that is followed by ``ENGINE``. Every body
    line that begins with a backtick is treated as a column definition.

    Args:
        text: Full text of the SQL dump.
        table: Table name to look for (matched case-insensitively).

    Returns:
        Column names in declaration order, or an empty list when no matching
        ``CREATE TABLE`` statement is found.
    """
    create_re = re.compile(
        rf"CREATE TABLE `{re.escape(table)}`\s*\((.*?)\)\s*ENGINE",
        re.IGNORECASE | re.DOTALL,
    )
    found = create_re.search(text)
    if found is None:
        return []
    definitions = (raw.strip() for raw in found.group(1).splitlines())
    # Key/constraint lines do not start with a backtick, so they are skipped.
    return [entry.split("`", 2)[1] for entry in definitions if entry.startswith("`")]
# Matches either one complete single-quoted SQL string literal (allowing
# backslash escapes and doubled quotes) or a bare semicolon. Consuming whole
# literals lets the terminator search skip semicolons stored inside values.
_STRING_OR_SEMICOLON = re.compile(r"'[^'\\]*(?:(?:\\.|'')[^'\\]*)*'|;", re.DOTALL)


def _statement_end(text: str, start: int) -> int:
    """Return the index of the first ``;`` outside a string literal at or after *start*, or -1."""
    for token in _STRING_OR_SEMICOLON.finditer(text, start):
        # Index instead of token.group() to avoid copying large string literals.
        if text[token.start()] == ";":
            return token.start()
    return -1


def _first_tuple_values(values_part: str) -> list[str] | None:
    """Split the first parenthesised tuple of *values_part* into its top-level values.

    *values_part* must start with ``(``. Returns ``None`` when the tuple is
    never closed (e.g. truncated input).
    """
    vals: list[str] = []
    cur: list[str] = []
    depth = 0
    in_str = False
    esc = False
    for ch in values_part:
        if esc:
            # Character following a backslash inside a string: take verbatim.
            cur.append(ch)
            esc = False
            continue
        if in_str:
            cur.append(ch)
            if ch == "\\":
                esc = True
            elif ch == "'":
                # A doubled quote ('') closes and immediately reopens the string.
                in_str = False
            continue
        if ch == "'":
            in_str = True
            cur.append(ch)
            continue
        if ch == "(":
            depth += 1
            if depth > 1:
                # Nested parentheses (function calls) belong to the value.
                cur.append(ch)
            continue
        if ch == ")":
            depth -= 1
            if depth == 0:
                if cur:
                    vals.append("".join(cur).strip())
                return vals
            cur.append(ch)
            continue
        if ch == "," and depth == 1:
            vals.append("".join(cur).strip())
            cur = []
            continue
        cur.append(ch)
    return None


def first_insert_info(text: str, table: str) -> tuple[str | None, int | None, list[str] | None]:
    """Inspect the first ``INSERT INTO `table``` statement of a SQL dump.

    The statement is delimited by the first semicolon that is not inside a
    single-quoted string literal, so values containing ``;`` no longer
    truncate the statement.

    Args:
        text: Full text of the SQL dump.
        table: Exact table name as written between backticks in the dump.

    Returns:
        A ``(snippet, value_count, named_cols)`` tuple:

        * ``snippet`` - first 120 characters of the statement after the table
          name (and after the column list, when one is present).
        * ``value_count`` - number of top-level values in the first row, or
          ``None`` when there is no ``VALUES (`` clause or the first tuple is
          not balanced.
        * ``named_cols`` - column names from an explicit column list, or
          ``None`` when the statement has none.

        ``(None, None, None)`` is returned when no terminated statement for
        *table* exists.
    """
    header = re.compile(rf"^INSERT INTO `{re.escape(table)}`", re.MULTILINE)
    match = header.search(text)
    if not match:
        return None, None, None
    stop = _statement_end(text, match.end())
    if stop < 0:
        return None, None, None
    tail = text[match.end():stop].strip()

    named_cols: list[str] | None = None
    if tail.startswith("(") and ")" in tail:
        head, rest = tail[1:].split(")", 1)
        # A leading group containing VALUES is not a column list.
        if "VALUES" not in head.upper():
            named_cols = [c.strip().strip("`") for c in head.split(",")]
            tail = rest.strip()

    # Case-insensitive search on the original text: upper-casing first can
    # change the string length and yield a misaligned index.
    keyword = re.search("VALUES", tail, re.IGNORECASE)
    if keyword is None:
        return tail[:120], None, named_cols
    values_part = tail[keyword.end():].strip()
    if not values_part.startswith("("):
        return tail[:120], None, named_cols

    vals = _first_tuple_values(values_part)
    return tail[:120], (len(vals) if vals is not None else None), named_cols
def resolve_table(cur, lower_name: str) -> str | None:
    """Look up the stored name of a table in the current database.

    Args:
        cur: DB-API cursor; the query uses the ``%s`` parameter style.
        lower_name: Lower-cased table name to compare against
            ``LOWER(TABLE_NAME)``.

    Returns:
        The ``TABLE_NAME`` of the first row returned, or ``None`` when the
        query yields no row.
    """
    query = (
        "SELECT TABLE_NAME FROM information_schema.TABLES "
        "WHERE TABLE_SCHEMA = DATABASE() AND LOWER(TABLE_NAME) = %s"
    )
    cur.execute(query, (lower_name,))
    hit = cur.fetchone()
    if not hit:
        return None
    return hit[0]
def target_columns(cur, table: str) -> list[str]:
    """Return the column names of *table* in the current database.

    Args:
        cur: DB-API cursor; the query uses the ``%s`` parameter style.
        table: Table name, compared to ``TABLE_NAME`` as given.

    Returns:
        Column names ordered by ``ORDINAL_POSITION``; empty when the query
        returns no rows.
    """
    query = (
        "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
        "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s "
        "ORDER BY ORDINAL_POSITION"
    )
    cur.execute(query, (table,))
    rows = cur.fetchall()
    return [record[0] for record in rows]
def _report_row_counts(cur) -> None:
    """Print the row count of every table in KEY_TABLES, or MISSING if absent."""
    print("=== Key business table row counts (165) ===")
    for lower in KEY_TABLES:
        actual = resolve_table(cur, lower)
        if not actual:
            print(f"  {lower}: MISSING")
            continue
        # Identifiers cannot be bound as parameters; double any backtick so the
        # name stays a single quoted identifier.
        quoted = actual.replace("`", "``")
        cur.execute(f"SELECT COUNT(*) FROM `{quoted}`")
        print(f"  {actual}: {cur.fetchone()[0]:,}")


def _report_schema_diff(cur, text: str) -> None:
    """Print backup-vs-target column differences for every table in FAILED_TABLES."""
    print("\n=== Failed import tables: schema diff ===")
    for lower in FAILED_TABLES:
        actual = resolve_table(cur, lower)
        backup_cols = backup_create_columns(text, lower)
        _, val_count, named_cols = first_insert_info(text, lower)
        target_cols = target_columns(cur, actual) if actual else []
        print(f"\n[{lower} -> {actual or 'MISSING'}]")
        print(f"  backup CREATE columns: {len(backup_cols)}")
        print(f"  backup INSERT value count (1st row): {val_count}")
        if named_cols:
            print(f"  backup INSERT named columns ({len(named_cols)}): {named_cols[:8]}...")
        print(f"  target columns: {len(target_cols)}")
        if not (backup_cols and target_cols):
            continue
        # Compare case-insensitively; only the first 12 names are shown.
        backup_set = {c.lower() for c in backup_cols}
        target_set = {c.lower() for c in target_cols}
        only_backup = sorted(backup_set - target_set)
        only_target = sorted(target_set - backup_set)
        if only_backup:
            print(f"  only in backup: {only_backup[:12]}{'...' if len(only_backup)>12 else ''}")
        if only_target:
            print(f"  only on 165: {only_target[:12]}{'...' if len(only_target)>12 else ''}")


def main() -> None:
    """Print key-table row counts and a schema diff for the failed-import tables.

    Reads the dump at SQL_PATH (UTF-8, undecodable bytes replaced), connects
    with the settings in DB, and writes both reports to stdout. The cursor and
    connection are released even when a query raises.
    """
    text = SQL_PATH.read_text(encoding="utf-8", errors="replace")
    conn = pymysql.connect(**DB)
    try:
        with conn.cursor() as cur:
            _report_row_counts(cur)
            _report_schema_diff(cur, text)
    finally:
        conn.close()


if __name__ == "__main__":
    main()
|