#!/usr/bin/env python3
"""
Merge the data rows of a mysqldump backup into MySQL, skipping rows that
already exist (INSERT IGNORE).

Use case: the target database already holds part of the data and has to be
topped up from a backup without producing primary-key / unique-key duplicates.

Connects to Dev 165 / aidopdev by default (same as the development database
in Database.json).
Never executes DROP TABLE / CREATE TABLE / CREATE DATABASE; the table
structure is not overwritten.

Usage examples:
  python doc/db/import_sql_dump_merge.py --dry-run
  python doc/db/import_sql_dump_merge.py
  python doc/db/import_sql_dump_merge.py --skip-tables abpauditlogs,abpauditlogactions
"""

from __future__ import annotations

import argparse
import os
import re
import sys
import time
from collections import defaultdict
from pathlib import Path

try:
    import pymysql
except ImportError:  # --dry-run only scans the dump and needs no database driver
    pymysql = None


def _default_source() -> Path:
    """Return the default dump path, located relative to this script.

    The original layout expects the backup folder five directory levels above
    this file.  The index is clamped so that a shallower install location
    does not raise IndexError at import time.
    """
    ancestors = Path(__file__).resolve().parents
    base = ancestors[min(4, len(ancestors) - 1)]
    return (
        base
        / "阿里云数据库备份"
        / "aidopdev_20260608_053527.sql"
        / "aidopdev_20260608_053527.sql"
    )


DEFAULT_SOURCE = _default_source()

# NOTE(review): credentials are committed in source.  The password can be
# overridden with the AIDOP_DB_PASSWORD environment variable or --password;
# the literal is kept only as a backward-compatible fallback.
DEFAULT_DB = dict(
    host="123.60.180.165",
    port=3306,
    user="aidopremote",
    password=os.environ.get("AIDOP_DB_PASSWORD", "1234567890aiDOP#"),
    database="aidopdev",
    charset="utf8mb4",
)

# Statements starting with one of these prefixes are never executed.
SKIP_PREFIXES = (
    "DROP TABLE",
    "DROP DATABASE",
    "CREATE DATABASE",
    "CREATE TABLE",
    "ALTER TABLE",
    "LOCK TABLES",
    "UNLOCK TABLES",
    "/*!40000 ALTER TABLE",
    "/*!40101 SET @saved_cs_client",
)

# Session-level statements that are executed as-is (best effort).
SAFE_PREFIXES = (
    "SET NAMES",
    "SET TIME_ZONE",
    "SET @OLD_",
    "SET SQL_MODE",
    "SET FOREIGN_KEY_CHECKS",
    "SET UNIQUE_CHECKS",
    "SET CHARACTER_SET_CLIENT",
    "SET CHARACTER_SET_RESULTS",
    "SET COLLATION_CONNECTION",
    "SET SQL_NOTES",
    "/*!40101 SET @OLD_",
    "/*!40103 SET @OLD_",
    "/*!40014 SET @OLD_",
    "/*!40111 SET @OLD_",
    "/*!50503 SET NAMES",
)

# Captures the (optionally backtick-quoted) table name of an INSERT statement.
INSERT_RE = re.compile(
    r"^INSERT\s+(?:IGNORE\s+)?INTO\s+[`]?([^`\s(]+)[`]?",
    re.IGNORECASE,
)


def parse_args() -> argparse.Namespace:
    """Parse the command-line options from ``sys.argv``."""
    parser = argparse.ArgumentParser(description="Merge mysqldump data into MySQL with INSERT IGNORE")
    parser.add_argument(
        "--source",
        type=Path,
        default=DEFAULT_SOURCE,
        help=f"SQL dump file path (default: {DEFAULT_SOURCE})",
    )
    parser.add_argument("--host", default=DEFAULT_DB["host"])
    parser.add_argument("--port", type=int, default=DEFAULT_DB["port"])
    parser.add_argument("--user", default=DEFAULT_DB["user"])
    parser.add_argument("--password", default=DEFAULT_DB["password"])
    parser.add_argument("--database", default=DEFAULT_DB["database"])
    parser.add_argument("--dry-run", action="store_true", help="Only scan and report, do not execute")
    parser.add_argument(
        "--only-tables",
        default="",
        help="Comma-separated table allowlist (lowercase). Empty = all tables with INSERT",
    )
    parser.add_argument(
        "--skip-tables",
        default="",
        help="Comma-separated tables to skip (lowercase)",
    )
    parser.add_argument(
        "--batch-commit",
        type=int,
        default=1,
        help="Commit every N INSERT statements (default 1)",
    )
    return parser.parse_args()


def normalize_table(name: str) -> str:
    """Return *name* without surrounding backticks, lower-cased."""
    return name.strip("`").lower()


def should_skip_statement(stmt: str) -> bool:
    """Return True for ``USE`` and for statements matching SKIP_PREFIXES.

    The comparison is case-insensitive.
    """
    upper = stmt.lstrip().upper()
    if upper.startswith("USE "):
        return True
    return upper.startswith(tuple(prefix.upper() for prefix in SKIP_PREFIXES))


def is_safe_session(stmt: str) -> bool:
    """Return True when *stmt* starts with one of SAFE_PREFIXES.

    Compared case-insensitively, consistent with ``should_skip_statement``.
    """
    upper = stmt.lstrip().upper()
    return upper.startswith(tuple(prefix.upper() for prefix in SAFE_PREFIXES))


def to_insert_ignore(stmt: str) -> str:
    """Turn a leading ``INSERT INTO`` into ``INSERT IGNORE INTO``.

    Statements that already start with ``INSERT IGNORE INTO`` are returned
    unchanged.
    """
    if re.match(r"^INSERT\s+IGNORE\s+INTO", stmt, re.IGNORECASE):
        return stmt
    return re.sub(r"^INSERT\s+INTO", "INSERT IGNORE INTO", stmt, count=1, flags=re.IGNORECASE)


def rewrite_insert_table(stmt: str, actual_table: str) -> str:
    """Return *stmt* as INSERT IGNORE targeting the backtick-quoted *actual_table*.

    A replacement function is used instead of a replacement template so that
    backslashes in the table name are not interpreted as regex escapes, and
    embedded backticks are doubled to keep the identifier quoting valid.
    """
    sql = to_insert_ignore(stmt)
    quoted = "`" + actual_table.replace("`", "``") + "`"
    return re.sub(
        r"^(INSERT\s+(?:IGNORE\s+)?INTO\s+)[`']?[^`'\s(]+[`']?",
        lambda match: match.group(1) + quoted,
        sql,
        count=1,
        flags=re.IGNORECASE,
    )


def _is_blank_or_comment(line: str) -> bool:
    """Return True for an empty line or a ``--`` SQL comment line."""
    stripped = line.strip()
    if not stripped:
        return True
    return stripped.startswith("--") and (len(stripped) == 2 or stripped[2].isspace())


def iter_statements(path: Path):
    """Yield the statements of the dump at *path*, one stripped string each.

    A statement ends at the first line whose right-stripped text ends with
    ``;``.  Blank lines and ``--`` comment lines found between statements are
    dropped; otherwise they would be glued in front of the next statement and
    defeat the prefix checks applied by the callers.  Lines inside a statement
    are never filtered.  Any unterminated trailing text is yielded last.
    """
    buffer: list[str] = []
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        for line in handle:
            if not buffer and _is_blank_or_comment(line):
                continue
            buffer.append(line)
            if not line.rstrip().endswith(";"):
                continue
            stmt = "".join(buffer).strip()
            buffer.clear()
            if stmt:
                yield stmt
    if buffer:
        tail = "".join(buffer).strip()
        if tail:
            yield tail


def is_values_only_insert(stmt: str) -> bool:
    """Only import mysqldump data rows; skip procedure fragments with column lists."""
    stripped = stmt.lstrip()
    return bool(
        re.match(
            r"^INSERT\s+(?:IGNORE\s+)?INTO\s+[`']?[^`'\s(]+[`']?\s+VALUES\s",
            stripped,
            re.IGNORECASE,
        )
    )


def extract_insert_table(stmt: str) -> str | None:
    """Return the normalized table name of an INSERT statement, else None."""
    match = INSERT_RE.match(stmt.strip())
    return normalize_table(match.group(1)) if match else None


def connect_db(args: argparse.Namespace):
    """Open a PyMySQL connection (autocommit off) from the parsed arguments.

    Raises:
        RuntimeError: if the pymysql package could not be imported.
    """
    if pymysql is None:
        raise RuntimeError("pymysql is not installed")
    return pymysql.connect(
        host=args.host,
        port=args.port,
        user=args.user,
        password=args.password,
        database=args.database,
        charset="utf8mb4",
        autocommit=False,
        connect_timeout=30,
        read_timeout=600,
        write_timeout=600,
    )


def load_target_tables(conn) -> dict[str, str]:
    """Return lowercase table name -> actual TABLE_NAME on server."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT TABLE_NAME FROM information_schema.TABLES "
            "WHERE TABLE_SCHEMA = DATABASE()"
        )
        mapping: dict[str, str] = {}
        for row in cur.fetchall():
            actual = row[0]
            mapping[normalize_table(actual)] = actual
    return mapping


def _parse_table_list(raw: str) -> set[str]:
    """Split a comma-separated option value into a set of lowercase names."""
    return {item.strip().lower() for item in raw.split(",") if item.strip()}


def _print_capped(title: str, messages: list[str], limit: int = 30) -> None:
    """Print *title* with the message count, then at most *limit* messages."""
    print(f"\n{title} ({len(messages)}):")
    for msg in messages[:limit]:
        print(f"  - {msg}")
    if len(messages) > limit:
        print(f"  ... and {len(messages) - limit} more")


def _print_summary(stats, table_rows_attempted, warnings, errors, elapsed: float) -> None:
    """Print counters, the 20 busiest tables, warnings and errors."""
    print("\n=== Summary ===")
    print(f"Elapsed: {elapsed:.1f}s")
    for key in sorted(stats):
        print(f"  {key}: {stats[key]}")

    if table_rows_attempted:
        print("\nINSERT statements by table (top 20):")
        ranked = sorted(table_rows_attempted.items(), key=lambda x: (-x[1], x[0]))
        for table, count in ranked[:20]:
            print(f"  {table}: {count}")
        if len(table_rows_attempted) > 20:
            print(f"  ... and {len(table_rows_attempted) - 20} more tables")

    if warnings:
        _print_capped("Warnings", warnings)
    if errors:
        _print_capped("Errors", errors)


def main() -> int:
    """Run the merge import.

    Returns:
        0 on success, 1 when the source file or the database driver is
        missing, 2 when at least one INSERT statement failed.
    """
    args = parse_args()
    source: Path = args.source
    if not source.is_file():
        print(f"[ERROR] Source file not found: {source}", file=sys.stderr)
        return 1
    if not args.dry_run and pymysql is None:
        print("[ERROR] pymysql is not installed; install it or use --dry-run", file=sys.stderr)
        return 1

    only_tables = _parse_table_list(args.only_tables)
    skip_tables = _parse_table_list(args.skip_tables)

    stats = defaultdict(int)
    table_rows_attempted = defaultdict(int)
    errors: list[str] = []
    warnings: list[str] = []

    print(f"Source: {source}")
    print(f"Target: {args.user}@{args.host}:{args.port}/{args.database}")
    print(f"Mode: {'DRY-RUN' if args.dry_run else 'EXECUTE'}")
    if only_tables:
        print(f"Only tables: {', '.join(sorted(only_tables))}")
    if skip_tables:
        print(f"Skip tables: {', '.join(sorted(skip_tables))}")

    conn = None
    target_tables: dict[str, str] = {}
    if not args.dry_run:
        conn = connect_db(args)
        target_tables = load_target_tables(conn)
        print(f"Target tables in database: {len(target_tables)}")

    started = time.monotonic()
    # Statements/rows executed since the last commit.  They are credited to
    # the statistics only once committed, so a rollback cannot leave
    # discarded work counted as done.
    pending_statements = 0
    pending_rows = 0

    try:
        for stmt in iter_statements(source):
            if should_skip_statement(stmt):
                stats["skipped_ddl"] += 1
                continue

            table = extract_insert_table(stmt)
            if table:
                if not is_values_only_insert(stmt):
                    stats["skipped_non_values_insert"] += 1
                    continue
                if only_tables and table not in only_tables:
                    stats["skipped_filter"] += 1
                    continue
                if table in skip_tables:
                    stats["skipped_filter"] += 1
                    continue
                if not args.dry_run and table not in target_tables:
                    stats["skipped_missing_table"] += 1
                    warnings.append(f"missing table on target: {table}")
                    continue

                stats["insert_statements"] += 1
                table_rows_attempted[table] += 1
                if args.dry_run:
                    continue

                sql = rewrite_insert_table(stmt, target_tables[table])
                try:
                    with conn.cursor() as cur:
                        affected = cur.execute(sql)
                    pending_statements += 1
                    pending_rows += max(affected, 0)
                    if pending_statements >= args.batch_commit:
                        conn.commit()
                        stats["insert_executed"] += pending_statements
                        stats["rows_affected"] += pending_rows
                        pending_statements = 0
                        pending_rows = 0
                except Exception as exc:  # noqa: BLE001
                    conn.rollback()
                    if pending_statements:
                        # With --batch-commit > 1 the rollback also drops the
                        # earlier uncommitted statements of this batch (on
                        # transactional storage engines); report them rather
                        # than counting them as executed.
                        stats["insert_rolled_back"] += pending_statements
                        warnings.append(
                            f"rollback after failure on {table} discarded "
                            f"{pending_statements} uncommitted INSERT statement(s)"
                        )
                    pending_statements = 0
                    pending_rows = 0
                    stats["insert_failed"] += 1
                    errors.append(f"{table}: {exc}")
                continue

            if is_safe_session(stmt):
                if args.dry_run:
                    stats["session_setup"] += 1
                    continue
                try:
                    with conn.cursor() as cur:
                        cur.execute(stmt)
                    stats["session_setup"] += 1
                except Exception as exc:  # noqa: BLE001
                    # Best effort: a failed session statement must not stop
                    # the import, but it is recorded instead of vanishing.
                    stats["session_setup_ignored"] += 1
                    warnings.append(f"session statement ignored: {exc}")
                continue

            stats["skipped_other"] += 1

        if conn is not None and pending_statements:
            conn.commit()
            stats["insert_executed"] += pending_statements
            stats["rows_affected"] += pending_rows
    finally:
        if conn is not None:
            conn.close()

    elapsed = time.monotonic() - started
    _print_summary(stats, table_rows_attempted, warnings, errors, elapsed)
    return 2 if errors else 0


if __name__ == "__main__":
    raise SystemExit(main())