| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
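#!/usr/bin/env python3
"""
Import mysqldump INSERT rows for tables whose column layout differs between the backup
and the target database on host 165 (123.60.180.165).

Backup columns are mapped to target columns by name (case-insensitively, plus a few
explicit renames). Target columns that the backup lacks are filled with NULL, and
INSERT IGNORE is used so rows that collide with an existing key are skipped.

Only plain ``INSERT INTO <table> VALUES ...`` statements are imported. INSERTs with an
explicit column list (such as those inside stored procedure bodies) are ignored.
"""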
from __future__ import annotations

import argparse
import os
import re
import sys
from pathlib import Path

import pymysql
# Default location of the backup dump. The dump file sits inside a directory that has
# the same name as the file.
_SCRIPT_PATH = Path(__file__).resolve()
# The backup folder is expected four directories above this script's folder. If the
# script has been moved somewhere shallower, fall back to its own folder instead of
# raising IndexError at import time. In that case the file check in main() reports the
# missing default, and an explicit --source still works.
_BACKUP_ROOT = (
    _SCRIPT_PATH.parents[4] if len(_SCRIPT_PATH.parents) > 4 else _SCRIPT_PATH.parent
)
DEFAULT_SOURCE = (
    _BACKUP_ROOT
    / "阿里云数据库备份"  # "Alibaba Cloud database backups"
    / "aidopdev_20260608_053527.sql"
    / "aidopdev_20260608_053527.sql"
)
# Connection settings for the target MySQL server. Each value can be overridden through
# an environment variable, so a different server or account can be used without
# editing this file. The fallbacks reproduce the original hard-coded settings.
# NOTE(security): the fallback password appears to be a live credential committed to
# source. Consider rotating it and supplying it only via AIDOP_DB_PASSWORD.
DEFAULT_DB = dict(
    host=os.environ.get("AIDOP_DB_HOST", "123.60.180.165"),
    port=int(os.environ.get("AIDOP_DB_PORT", "3306")),
    user=os.environ.get("AIDOP_DB_USER", "aidopremote"),
    password=os.environ.get("AIDOP_DB_PASSWORD", "1234567890aiDOP#"),
    database=os.environ.get("AIDOP_DB_NAME", "aidopdev"),
    charset="utf8mb4",
)
# Lower-case names of the backup tables to import. main() resolves each one to the
# actual (case-preserving) table name on the target before inserting.
TABLES: list[str] = [
    "itemmaster", "prioritycode",
    "purordrctdetail", "purordrctmaster",
    "scm_jhjh_jq", "scm_shd", "scm_shdzb",
    "nbrdayinfo",
]
# Matches the header of a plain ``INSERT INTO <table> VALUES`` statement (one without
# an explicit column list) and captures the table name. Not referenced elsewhere in
# this script.
VALUES_INSERT_RE = re.compile(r"^INSERT\s+INTO\s+[`']?([^`'\s(]+)[`']?\s+VALUES\s", re.I)

# Renames for backup columns that are called something else on the target.
# Keys are lower-case backup column names; values are the target column names.
COLUMN_ALIASES: dict[str, str] = {"expire_alarm_day": "ExpireAlarmDays"}
def normalize(name: str) -> str:
    """Lower-case *name* and trim any backticks or quote characters from both ends.

    Not referenced elsewhere in this script.
    """
    lowered = name.lower()
    return lowered.strip("`'\"")
def backup_create_columns(text: str, table: str) -> list[str]:
    """Return the column names declared by the backup's CREATE TABLE for *table*.

    The table name is matched case-insensitively and must be backtick-quoted. Inside
    the table body, only lines that start with a backtick are treated as column
    definitions, so PRIMARY KEY / KEY / CONSTRAINT lines are skipped. This assumes the
    one-definition-per-line layout that mysqldump writes.

    Returns:
        Column names in declaration order, or an empty list when no matching
        CREATE TABLE statement is found.
    """
    ddl = re.search(
        rf"CREATE TABLE `{re.escape(table)}`\s*\((.*?)\)\s*ENGINE",
        text,
        flags=re.IGNORECASE | re.DOTALL,
    )
    if ddl is None:
        return []
    definitions = (raw.strip() for raw in ddl.group(1).splitlines())
    return [item.split("`", 2)[1] for item in definitions if item.startswith("`")]
def split_sql_values(values_blob: str) -> list[str]:
    """Split the payload of an extended INSERT into its ``(...)`` row tuples.

    *values_blob* is the text after ``VALUES``, e.g. ``(1,'a'),(2,'b')``. Rows may be
    separated by commas and whitespace. While looking for a row's closing parenthesis,
    parentheses inside single-quoted string literals (including backslash-escaped
    quotes) are ignored.

    Scanning stops quietly at the first character that cannot start a row. A final row
    whose closing parenthesis never appears is dropped.

    Returns:
        Each row's original text, including its outer parentheses.
    """
    separators = " \t\r\n,"
    size = len(values_blob)
    found: list[str] = []
    cursor = 0
    while True:
        while cursor < size and values_blob[cursor] in separators:
            cursor += 1
        if cursor >= size or values_blob[cursor] != "(":
            return found
        nesting = 0
        quoted = False
        escaped = False
        for pos in range(cursor, size):
            c = values_blob[pos]
            if escaped:
                # The character after a backslash inside a string is taken literally.
                escaped = False
            elif quoted:
                if c == "\\":
                    escaped = True
                elif c == "'":
                    quoted = False
            elif c == "'":
                quoted = True
            elif c == "(":
                nesting += 1
            elif c == ")":
                nesting -= 1
                if nesting == 0:
                    found.append(values_blob[cursor : pos + 1])
                    cursor = pos + 1
                    break
        else:
            # Ran off the end inside a row: the unterminated row is discarded.
            return found
def split_row_fields(row_sql: str) -> list[str]:
    """Split one ``(...)`` row tuple into its individual SQL literal strings.

    The outer parentheses are removed without being checked. The remainder is split on
    commas that are neither inside a single-quoted string (backslash escapes honoured)
    nor inside nested parentheses. Each field is whitespace-stripped but otherwise kept
    verbatim, quotes included. An empty trailing segment (text ending in ``,``) does not
    produce a field.
    """
    body = row_sql.strip()[1:-1]
    out: list[str] = []
    field_start = 0
    nesting = 0
    quoted = False
    escaped = False
    for pos, c in enumerate(body):
        if escaped:
            escaped = False
        elif quoted:
            if c == "\\":
                escaped = True
            elif c == "'":
                quoted = False
        elif c == "'":
            quoted = True
        elif c == "(":
            nesting += 1
        elif c == ")":
            nesting -= 1
        elif c == "," and nesting == 0:
            out.append(body[field_start:pos].strip())
            field_start = pos + 1
    tail = body[field_start:]
    if tail:
        out.append(tail.strip())
    return out
def resolve_target_table(cur, lower_name: str) -> str | None:
    """Find the real, case-preserving name of a table in the connection's current schema.

    Args:
        cur: An open cursor on the target connection.
        lower_name: Lower-case table name; compared against LOWER(TABLE_NAME).

    Returns:
        The TABLE_NAME reported by information_schema, or None if nothing matches. If
        several tables differ only by case, whichever row the server returns first wins.
    """
    lookup_sql = (
        "SELECT TABLE_NAME FROM information_schema.TABLES "
        "WHERE TABLE_SCHEMA = DATABASE() AND LOWER(TABLE_NAME) = %s"
    )
    cur.execute(lookup_sql, (lower_name,))
    hit = cur.fetchone()
    if not hit:
        return None
    return hit[0]
def target_columns(cur, table: str) -> list[str]:
    """List the columns of *table* in the current schema, in their ordinal order.

    Args:
        cur: An open cursor on the target connection.
        table: Exact table name as stored in information_schema.
    """
    column_sql = (
        "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
        "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s "
        "ORDER BY ORDINAL_POSITION"
    )
    cur.execute(column_sql, (table,))
    fetched = cur.fetchall()
    return [entry[0] for entry in fetched]
- def row_dict_from_backup(backup_cols: list[str], fields: list[str]) -> dict[str, str]:
- data: dict[str, str] = {}
- for idx, col in enumerate(backup_cols):
- if idx < len(fields):
- data[col.lower()] = fields[idx]
- for src, dst in COLUMN_ALIASES.items():
- if src in data and dst.lower() not in data:
- data[dst.lower()] = data[src]
- return data
def build_insert_sql(actual_table: str, target_cols: list[str], row: dict[str, str]) -> str:
    """Render an ``INSERT IGNORE`` that lists every target column explicitly.

    Each target column takes the literal stored under its lower-cased name in *row*,
    or ``NULL`` when the backup has no value for it.

    NOTE(review): values are SQL literals copied verbatim from the dump and are not
    escaped here. Only feed this from dumps you trust.
    """
    quoted_cols: list[str] = []
    literals: list[str] = []
    for column in target_cols:
        quoted_cols.append(f"`{column}`")
        literals.append(row.get(column.lower(), "NULL"))
    column_list = ", ".join(quoted_cols)
    value_list = ", ".join(literals)
    return f"INSERT IGNORE INTO `{actual_table}` ({column_list}) VALUES ({value_list})"
- def iter_values_inserts(text: str, table: str):
- pattern = re.compile(
- rf"^INSERT\s+INTO\s+[`']?{re.escape(table)}[`']?\s+VALUES\s(.+?);",
- re.IGNORECASE | re.MULTILINE | re.DOTALL,
- )
- for match in pattern.finditer(text):
- yield match.group(1).strip()
def _import_rows(
    cur,
    text: str,
    lower: str,
    actual: str,
    backup_cols: list[str],
    tgt_cols: list[str],
    *,
    dry_run: bool,
) -> tuple[int, int, int, Exception | None]:
    """Replay every backup row of one table into *actual*, stopping at the first failure.

    Returns:
        (statements_seen, rows_processed, rows_inserted, error). ``error`` is the
        exception raised by the failing INSERT, or None if every row succeeded. In
        dry-run mode nothing is executed and rows_inserted stays 0.
    """
    stmt_count = 0
    row_count = 0
    inserted = 0
    for values_blob in iter_values_inserts(text, lower):
        stmt_count += 1
        for row_sql in split_sql_values(values_blob):
            row = row_dict_from_backup(backup_cols, split_row_fields(row_sql))
            sql = build_insert_sql(actual, tgt_cols, row)
            if not dry_run:
                try:
                    affected = cur.execute(sql)
                except Exception as exc:  # noqa: BLE001 - reported per table, not fatal
                    return stmt_count, row_count, inserted, exc
                # INSERT IGNORE reports 0 affected rows for a skipped duplicate.
                inserted += max(affected, 0)
            row_count += 1
    return stmt_count, row_count, inserted, None


def main() -> int:
    """Import every table in TABLES from the backup dump into the target database.

    Each table is committed as its own transaction. If any row of a table fails, the
    whole table is rolled back and its remaining statements are skipped, so a table is
    never left partially imported. Processing then continues with the next table.

    Returns:
        0 on success, 1 if the backup file does not exist, 2 if any table failed.
    """
    parser = argparse.ArgumentParser(description="Schema-aligned import for mismatched tables")
    parser.add_argument("--source", type=Path, default=DEFAULT_SOURCE)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    if not args.source.is_file():
        print(f"[ERROR] Source not found: {args.source}", file=sys.stderr)
        return 1
    # NOTE(review): invalid UTF-8 bytes become U+FFFD here and would be imported as such.
    text = args.source.read_text(encoding="utf-8", errors="replace")

    total_statements = 0
    total_processed = 0
    total_inserted = 0
    errors: list[str] = []
    conn = pymysql.connect(**DEFAULT_DB, autocommit=False)
    try:
        cur = conn.cursor()
        for lower in TABLES:
            actual = resolve_target_table(cur, lower)
            if not actual:
                errors.append(f"{lower}: missing on target")
                continue
            backup_cols = backup_create_columns(text, lower)
            if not backup_cols:
                errors.append(f"{lower}: CREATE TABLE not found in backup")
                continue
            tgt_cols = target_columns(cur, actual)
            stmt_count, row_count, inserted, exc = _import_rows(
                cur, text, lower, actual, backup_cols, tgt_cols, dry_run=args.dry_run
            )
            if exc is not None:
                # Discard everything sent for this table, so that a failed table is
                # never partially committed.
                conn.rollback()
                errors.append(f"{lower}: {exc}")
                print(f"{lower} -> {actual}: FAILED after {row_count} rows, rolled back")
                continue
            if not args.dry_run:
                conn.commit()
            total_statements += stmt_count
            total_processed += row_count
            total_inserted += inserted
            print(f"{lower} -> {actual}: statements={stmt_count}, rows={row_count}")
    finally:
        # Close even if a metadata query or rollback raises.
        conn.close()

    print(
        f"\nTotal INSERT statements: {total_statements}, rows processed: {total_processed}, "
        f"rows inserted: {total_inserted}"
    )
    if args.dry_run:
        print("(dry run: no rows were written)")
    if errors:
        print("\nErrors:")
        for e in errors:
            print(f" - {e}")
        return 2
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|