| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- #!/usr/bin/env python3
- """
- 将 mysqldump 备份中的数据合并导入 MySQL,跳过已存在行(INSERT IGNORE)。
- 适用场景:目标库已有部分数据,需要从备份补全而不产生主键/唯一键重复。
- 默认连接 Dev 165 / aidopdev(与 Database.json 开发库一致)。
- 不会执行 DROP TABLE / CREATE TABLE / CREATE DATABASE,不覆盖表结构。
- 用法示例:
- python doc/db/import_sql_dump_merge.py --dry-run
- python doc/db/import_sql_dump_merge.py
- python doc/db/import_sql_dump_merge.py --skip-tables abpauditlogs,abpauditlogactions
- """
from __future__ import annotations

import argparse
import os
import re
import sys
import time
from collections import defaultdict
from pathlib import Path

import pymysql
# Default dump location, resolved relative to this script: the backup folder
# "阿里云数据库备份" inside the script's 5th ancestor directory (parents[4]).
# The .sql name appears twice -- presumably the backup was unpacked into a
# directory of the same name (TODO confirm).
_SCRIPT_PATH = Path(__file__).resolve()
DEFAULT_SOURCE = (
    # A script placed fewer than five levels deep has no parents[4]. In that
    # case fall back to the filesystem root (the last parent), so main()
    # reports "Source file not found" instead of the import failing with
    # IndexError -- even when --source was supplied.
    _SCRIPT_PATH.parents[min(4, len(_SCRIPT_PATH.parents) - 1)]
    / "阿里云数据库备份"
    / "aidopdev_20260608_053527.sql"
    / "aidopdev_20260608_053527.sql"
)
# Default connection: the Dev 165 / aidopdev development database.
# NOTE(review): a plaintext password is committed here. Prefer exporting
# AIDOP_DB_PASSWORD (or passing --password) and removing the literal fallback.
DEFAULT_DB = dict(
    host="123.60.180.165",
    port=3306,
    user="aidopremote",
    # Reading the secret from the environment keeps it out of shell history
    # and process listings; the literal is kept only for backward compatibility.
    password=os.environ.get("AIDOP_DB_PASSWORD", "1234567890aiDOP#"),
    database="aidopdev",
    charset="utf8mb4",
)
# Statement prefixes that are never executed against the target.
# Covered: schema changes (DROP/CREATE/ALTER), table locks, mysqldump's
# version-gated "/*!40000 ALTER TABLE" wrapper, and the "@saved_cs_client"
# charset bookkeeping. should_skip_statement() compares these upper-cased
# against the left-stripped statement, so matching is case-insensitive.
SKIP_PREFIXES = (
    "DROP TABLE",
    "DROP DATABASE",
    "CREATE DATABASE",
    "CREATE TABLE",
    "ALTER TABLE",
    "LOCK TABLES",
    "UNLOCK TABLES",
    "/*!40000 ALTER TABLE",
    "/*!40101 SET @saved_cs_client",
)
# Session-level SET statements that are replayed on the target connection.
# Entries are matched as prefixes of the left-stripped statement. mysqldump
# wraps most of these in version comments (/*!NNNNN ... */), so both the bare
# and the wrapped forms are listed.
SAFE_PREFIXES = (
    "SET NAMES",
    "SET TIME_ZONE",
    "SET @OLD_",
    "SET SQL_MODE",
    "SET FOREIGN_KEY_CHECKS",
    "SET UNIQUE_CHECKS",
    "SET CHARACTER_SET_CLIENT",
    "SET CHARACTER_SET_RESULTS",
    "SET COLLATION_CONNECTION",
    "SET SQL_NOTES",
    "/*!40101 SET @OLD_",
    "/*!40103 SET @OLD_",
    "/*!40014 SET @OLD_",
    "/*!40111 SET @OLD_",
    "/*!50503 SET NAMES",
    # With its default --tz-utc, mysqldump writes TIMESTAMP values in UTC and
    # emits "/*!40103 SET TIME_ZONE='+00:00' */;". The bare "SET TIME_ZONE"
    # entry above never matches that wrapped form. Without replaying it, the
    # values would be re-interpreted in the server's session time zone.
    "/*!40103 SET TIME_ZONE",
)
# Matches the head of "INSERT [IGNORE] INTO <table>" (case-insensitive,
# anchored at the start of the string). Group 1 captures the table name
# without surrounding backticks. What follows the table name (VALUES, a column
# list, SELECT, ...) is not checked here.
INSERT_RE = re.compile(
    r"^INSERT\s+(?:IGNORE\s+)?INTO\s+[`]?([^`\s(]+)[`]?",
    re.IGNORECASE,
)
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``.

    ``--source`` defaults to DEFAULT_SOURCE; each connection option
    (host/port/user/password/database) defaults to the matching DEFAULT_DB
    entry, with ``--port`` converted to ``int``.
    """
    parser = argparse.ArgumentParser(
        description="Merge mysqldump data into MySQL with INSERT IGNORE"
    )
    parser.add_argument(
        "--source",
        type=Path,
        default=DEFAULT_SOURCE,
        help=f"SQL dump file path (default: {DEFAULT_SOURCE})",
    )

    # Connection overrides, registered in the same order as before so the
    # --help listing is unchanged.
    for key in ("host", "port", "user", "password", "database"):
        converter = {"type": int} if key == "port" else {}
        parser.add_argument(f"--{key}", default=DEFAULT_DB[key], **converter)

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only scan and report, do not execute",
    )
    parser.add_argument(
        "--only-tables",
        default="",
        help="Comma-separated table allowlist (lowercase). Empty = all tables with INSERT",
    )
    parser.add_argument(
        "--skip-tables",
        default="",
        help="Comma-separated tables to skip (lowercase)",
    )
    parser.add_argument(
        "--batch-commit",
        type=int,
        default=1,
        help="Commit every N INSERT statements (default 1)",
    )
    namespace = parser.parse_args()
    return namespace
def normalize_table(name: str) -> str:
    """Return the lookup key for a table name: outer backticks removed, lowercased."""
    unquoted = name.strip("`")
    return unquoted.lower()
def should_skip_statement(stmt: str) -> bool:
    """Return True when *stmt* must never be sent to the target database.

    A statement is skipped when it starts with ``USE `` or with any entry of
    SKIP_PREFIXES. Leading whitespace is ignored and the comparison is
    case-insensitive.
    """
    blocked = ("USE ",) + tuple(prefix.upper() for prefix in SKIP_PREFIXES)
    return stmt.lstrip().upper().startswith(blocked)
def is_safe_session(stmt: str) -> bool:
    """Return True if *stmt* is a session statement listed in SAFE_PREFIXES.

    Leading whitespace is ignored. Matching is case-insensitive, consistent
    with should_skip_statement(). A lowercase ``set foreign_key_checks=0`` in
    a hand-edited dump is therefore replayed instead of silently dropped.
    """
    head = stmt.lstrip().upper()
    return head.startswith(tuple(prefix.upper() for prefix in SAFE_PREFIXES))
def to_insert_ignore(stmt: str) -> str:
    """Rewrite a leading ``INSERT INTO`` as ``INSERT IGNORE INTO``.

    Only the first occurrence at the very start is replaced, case-insensitively.
    A statement that already reads ``INSERT IGNORE INTO`` cannot match the
    pattern, because IGNORE sits between INSERT and INTO, so it is returned
    unchanged. Anything else is also returned unchanged.
    """
    return re.sub(
        r"^INSERT\s+INTO",
        "INSERT IGNORE INTO",
        stmt,
        count=1,
        flags=re.IGNORECASE,
    )
def rewrite_insert_table(stmt: str, actual_table: str) -> str:
    """Turn *stmt* into an ``INSERT IGNORE`` that targets *actual_table*.

    The dump's table token after ``INTO`` (optionally wrapped in backticks or
    single quotes) is replaced by the server-side spelling of the name, quoted
    with backticks. Whitespace and casing of the ``INSERT ... INTO`` head are
    preserved.

    Args:
        stmt: An INSERT statement from the dump.
        actual_table: The table name exactly as it exists on the target.

    Returns:
        The rewritten SQL text.
    """
    sql = to_insert_ignore(stmt)
    # MySQL escapes a backtick inside a quoted identifier by doubling it.
    quoted = "`" + actual_table.replace("`", "``") + "`"
    # A callable replacement is inserted literally. A template string would
    # interpret backslashes / group references appearing in the table name.
    return re.sub(
        r"^(INSERT\s+(?:IGNORE\s+)?INTO\s+)[`']?[^`'\s(]+[`']?",
        lambda match: match.group(1) + quoted,
        sql,
        count=1,
        flags=re.IGNORECASE,
    )
def iter_statements(path: Path):
    """Yield the SQL statements of a mysqldump file one at a time.

    Lines are accumulated until one whose right-trimmed text ends with the
    current delimiter (``;`` initially). Statements are yielded stripped.

    - ``DELIMITER xx`` lines switch the delimiter and are yielded as-is.
      mysqldump uses ``;;`` around stored routines and triggers, so their
      bodies are yielded whole instead of being split at the inner ``;``.
      Otherwise a plain ``INSERT ... VALUES`` inside a routine body could be
      mistaken for a data row.
    - Blank lines and full-line ``--`` comments between statements are
      dropped, so a yielded statement never starts with dump header comments.
    - A trailing fragment without a delimiter is yielded last.

    The file is read lazily as UTF-8 (undecodable bytes replaced) and is
    closed when the generator finishes or is closed.
    """
    delimiter = ";"
    buffer: list[str] = []
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        for line in handle:
            stripped = line.strip()
            if not buffer:
                # Between statements: comments and blank lines carry no SQL.
                if not stripped or stripped == "--" or stripped.startswith("-- "):
                    continue
                if stripped.upper().startswith("DELIMITER "):
                    delimiter = stripped[len("DELIMITER "):].strip() or ";"
                    yield stripped
                    continue
            buffer.append(line)
            if not line.rstrip().endswith(delimiter):
                continue
            stmt = "".join(buffer).strip()
            buffer.clear()
            if stmt:
                yield stmt
    if buffer:
        tail = "".join(buffer).strip()
        if tail:
            yield tail
def is_values_only_insert(stmt: str) -> bool:
    """Return True only for ``INSERT [IGNORE] INTO <table> VALUES ...`` statements.

    This is the shape mysqldump uses for data rows. Inserts with a column list
    or a SELECT source (e.g. fragments of stored procedures) are rejected.
    Leading whitespace is ignored and keywords match case-insensitively.
    """
    data_row_head = r"^INSERT\s+(?:IGNORE\s+)?INTO\s+[`']?[^`'\s(]+[`']?\s+VALUES\s"
    return re.match(data_row_head, stmt.lstrip(), re.IGNORECASE) is not None
def extract_insert_table(stmt: str) -> str | None:
    """Return the normalized target table of an INSERT statement.

    Returns None when *stmt* (after stripping surrounding whitespace) does not
    start with ``INSERT [IGNORE] INTO <table>``.
    """
    found = INSERT_RE.match(stmt.strip())
    if found is None:
        return None
    return normalize_table(found.group(1))
def connect_db(args: argparse.Namespace):
    """Open a pymysql connection using the parsed CLI *args*.

    Autocommit is disabled so that main() decides when to commit. The charset
    is fixed to utf8mb4. Timeouts are in seconds: 30 to connect, 600 for
    reads and writes.
    """
    settings = {
        "host": args.host,
        "port": args.port,
        "user": args.user,
        "password": args.password,
        "database": args.database,
        "charset": "utf8mb4",
        "autocommit": False,
        "connect_timeout": 30,
        "read_timeout": 600,
        "write_timeout": 600,
    }
    return pymysql.connect(**settings)
def load_target_tables(conn) -> dict[str, str]:
    """Map each table of the connection's current database to its real name.

    Keys are normalize_table() forms (lowercase); values are TABLE_NAME exactly
    as information_schema reports it. If two names normalize to the same key,
    the later row wins.
    """
    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE()"
        )
        return {normalize_table(row[0]): row[0] for row in cursor.fetchall()}
def _print_limited(title: str, messages: list[str], limit: int = 30) -> None:
    """Print ``<title> (<count>):`` followed by at most *limit* messages; no-op if empty."""
    if not messages:
        return
    print(f"\n{title} ({len(messages)}):")
    for msg in messages[:limit]:
        print(f" - {msg}")
    if len(messages) > limit:
        print(f" ... and {len(messages) - limit} more")


def _print_summary(
    stats: dict[str, int],
    table_rows_attempted: dict[str, int],
    warnings: list[str],
    errors: list[str],
    elapsed: float,
) -> None:
    """Print the end-of-run report: counters, busiest tables, warnings, errors."""
    print("\n=== Summary ===")
    print(f"Elapsed: {elapsed:.1f}s")
    for key in sorted(stats):
        print(f" {key}: {stats[key]}")
    if table_rows_attempted:
        print("\nINSERT statements by table (top 20):")
        # Most statements first; ties broken alphabetically by table name.
        ranked = sorted(table_rows_attempted.items(), key=lambda x: (-x[1], x[0]))
        for table, count in ranked[:20]:
            print(f" {table}: {count}")
        if len(ranked) > 20:
            print(f" ... and {len(ranked) - 20} more tables")
    _print_limited("Warnings", warnings)
    _print_limited("Errors", errors)


def main() -> int:
    """Merge the data rows of ``--source`` into the target database.

    The dump is handled statement by statement:
    - DDL and lock statements are skipped.
    - Whitelisted session ``SET`` statements are replayed.
    - Data-only ``INSERT ... VALUES`` statements are rewritten to
      ``INSERT IGNORE`` against the server's spelling of the table name.

    With ``--dry-run`` no connection is opened and nothing is executed.

    Returns:
        0 on success, 1 if the source file does not exist, 2 if any INSERT
        failed (failures are listed in the printed summary).
    """
    args = parse_args()
    source: Path = args.source
    if not source.is_file():
        print(f"[ERROR] Source file not found: {source}", file=sys.stderr)
        return 1

    only_tables = {t.strip().lower() for t in args.only_tables.split(",") if t.strip()}
    skip_tables = {t.strip().lower() for t in args.skip_tables.split(",") if t.strip()}
    stats: defaultdict[str, int] = defaultdict(int)
    table_rows_attempted: defaultdict[str, int] = defaultdict(int)
    errors: list[str] = []
    warnings: list[str] = []
    # Tables already reported as missing. An extended-insert dump has many
    # INSERTs per table, so warn once per table, not once per statement.
    missing_tables: set[str] = set()

    print(f"Source: {source}")
    print(f"Target: {args.user}@{args.host}:{args.port}/{args.database}")
    print(f"Mode: {'DRY-RUN' if args.dry_run else 'EXECUTE'}")
    if only_tables:
        print(f"Only tables: {', '.join(sorted(only_tables))}")
    if skip_tables:
        print(f"Skip tables: {', '.join(sorted(skip_tables))}")

    conn = None if args.dry_run else connect_db(args)
    target_tables: dict[str, str] = {}
    try:
        # Inside try/finally so the connection is closed even if this fails.
        if conn is not None:
            target_tables = load_target_tables(conn)
            print(f"Target tables in database: {len(target_tables)}")

        started = time.time()
        # INSERTs executed since the last commit, and the rows they reported.
        # rollback() discards all of them, so both are needed to correct the
        # counters when a later statement in the same batch fails.
        pending_since_commit = 0
        pending_rows = 0
        for stmt in iter_statements(source):
            if should_skip_statement(stmt):
                stats["skipped_ddl"] += 1
                continue

            table = extract_insert_table(stmt)
            if table:
                if not is_values_only_insert(stmt):
                    stats["skipped_non_values_insert"] += 1
                    continue
                if (only_tables and table not in only_tables) or table in skip_tables:
                    stats["skipped_filter"] += 1
                    continue
                if not args.dry_run and table not in target_tables:
                    stats["skipped_missing_table"] += 1
                    if table not in missing_tables:
                        missing_tables.add(table)
                        warnings.append(f"missing table on target: {table}")
                    continue
                stats["insert_statements"] += 1
                table_rows_attempted[table] += 1
                if args.dry_run:
                    continue

                sql = rewrite_insert_table(stmt, target_tables[table])
                try:
                    with conn.cursor() as cur:
                        affected = max(cur.execute(sql), 0)
                    stats["insert_executed"] += 1
                    stats["rows_affected"] += affected
                    pending_since_commit += 1
                    pending_rows += affected
                    if pending_since_commit >= args.batch_commit:
                        conn.commit()
                        pending_since_commit = 0
                        pending_rows = 0
                except Exception as exc:  # noqa: BLE001 - record and keep importing
                    conn.rollback()
                    stats["insert_failed"] += 1
                    errors.append(f"{table}: {exc}")
                    if pending_since_commit:
                        # The rollback also discarded earlier, still-uncommitted
                        # statements of this batch; stop counting their rows.
                        stats["insert_rolled_back"] += pending_since_commit
                        stats["rows_affected"] -= pending_rows
                        errors.append(
                            f"rolled back {pending_since_commit} uncommitted INSERT "
                            f"statement(s) of the current batch after failure in {table}"
                        )
                    pending_since_commit = 0
                    pending_rows = 0
                continue

            if is_safe_session(stmt):
                if args.dry_run:
                    stats["session_setup"] += 1
                    continue
                try:
                    with conn.cursor() as cur:
                        cur.execute(stmt)
                    stats["session_setup"] += 1
                except Exception as exc:  # noqa: BLE001 - session setup is best effort
                    stats["session_setup_ignored"] += 1
                    warnings.append(f"session statement ignored ({exc}): {stmt[:120]}")
                continue

            stats["skipped_other"] += 1

        if conn is not None and pending_since_commit:
            conn.commit()
    finally:
        if conn is not None:
            conn.close()

    elapsed = time.time() - started
    _print_summary(stats, table_rows_attempted, warnings, errors, elapsed)
    return 2 if errors else 0
if __name__ == "__main__":
    # Use main()'s return value (0/1/2) as the process exit status.
    sys.exit(main())
|