#!/usr/bin/env python3 """ Import mysqldump INSERT rows for tables whose column layout differs between backup and 165. Maps backup column names to target columns (by name), fills missing target columns with NULL, uses INSERT IGNORE to skip duplicates. Also skips procedure-internal INSERT ... (col list) statements. """ from __future__ import annotations import argparse import re import sys from pathlib import Path import pymysql DEFAULT_SOURCE = ( Path(__file__).resolve().parents[4] / "阿里云数据库备份" / "aidopdev_20260608_053527.sql" / "aidopdev_20260608_053527.sql" ) DEFAULT_DB = dict( host="123.60.180.165", port=3306, user="aidopremote", password="1234567890aiDOP#", database="aidopdev", charset="utf8mb4", ) TABLES = [ "itemmaster", "prioritycode", "purordrctdetail", "purordrctmaster", "scm_jhjh_jq", "scm_shd", "scm_shdzb", "nbrdayinfo", ] VALUES_INSERT_RE = re.compile( r"^INSERT\s+INTO\s+[`']?([^`'\s(]+)[`']?\s+VALUES\s", re.IGNORECASE, ) # backup column name (lower) -> target column name when names differ COLUMN_ALIASES: dict[str, str] = { "expire_alarm_day": "ExpireAlarmDays", } def normalize(name: str) -> str: return name.strip("`'\"").lower() def backup_create_columns(text: str, table: str) -> list[str]: match = re.search( rf"CREATE TABLE `{re.escape(table)}`\s*\((.*?)\)\s*ENGINE", text, re.IGNORECASE | re.DOTALL, ) if not match: return [] cols: list[str] = [] for line in match.group(1).splitlines(): line = line.strip() if line.startswith("`"): cols.append(line.split("`", 2)[1]) return cols def split_sql_values(values_blob: str) -> list[str]: rows: list[str] = [] i = 0 n = len(values_blob) while i < n: while i < n and values_blob[i] in " \t\r\n,": i += 1 if i >= n: break if values_blob[i] != "(": break depth = 0 in_str = False esc = False start = i while i < n: ch = values_blob[i] if esc: esc = False elif ch == "\\" and in_str: esc = True elif ch == "'": in_str = not in_str elif not in_str: if ch == "(": depth += 1 elif ch == ")": depth -= 1 if depth == 0: rows.append(values_blob[start : i + 1]) i += 1 break i += 1 return rows def split_row_fields(row_sql: str) -> list[str]: inner = row_sql.strip()[1:-1] fields: list[str] = [] cur: list[str] = [] depth = 0 in_str = False esc = False for ch in inner: if esc: cur.append(ch) esc = False continue if ch == "\\" and in_str: cur.append(ch) esc = True continue if ch == "'": in_str = not in_str cur.append(ch) continue if in_str: cur.append(ch) continue if ch == "(": depth += 1 cur.append(ch) continue if ch == ")": depth -= 1 cur.append(ch) continue if ch == "," and depth == 0: fields.append("".join(cur).strip()) cur = [] continue cur.append(ch) if cur: fields.append("".join(cur).strip()) return fields def resolve_target_table(cur, lower_name: str) -> str | None: cur.execute( "SELECT TABLE_NAME FROM information_schema.TABLES " "WHERE TABLE_SCHEMA = DATABASE() AND LOWER(TABLE_NAME) = %s", (lower_name,), ) row = cur.fetchone() return row[0] if row else None def target_columns(cur, table: str) -> list[str]: cur.execute( "SELECT COLUMN_NAME FROM information_schema.COLUMNS " "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s " "ORDER BY ORDINAL_POSITION", (table,), ) return [r[0] for r in cur.fetchall()] def row_dict_from_backup(backup_cols: list[str], fields: list[str]) -> dict[str, str]: data: dict[str, str] = {} for idx, col in enumerate(backup_cols): if idx < len(fields): data[col.lower()] = fields[idx] for src, dst in COLUMN_ALIASES.items(): if src in data and dst.lower() not in data: data[dst.lower()] = data[src] return data def build_insert_sql(actual_table: str, target_cols: list[str], row: dict[str, str]) -> str: col_sql = ", ".join(f"`{c}`" for c in target_cols) vals = ", ".join(row.get(c.lower(), "NULL") for c in target_cols) return f"INSERT IGNORE INTO `{actual_table}` ({col_sql}) VALUES ({vals})" def iter_values_inserts(text: str, table: str): pattern = re.compile( rf"^INSERT\s+INTO\s+[`']?{re.escape(table)}[`']?\s+VALUES\s(.+?);", re.IGNORECASE | re.MULTILINE | re.DOTALL, ) for match in pattern.finditer(text): yield match.group(1).strip() def main() -> int: parser = argparse.ArgumentParser(description="Schema-aligned import for mismatched tables") parser.add_argument("--source", type=Path, default=DEFAULT_SOURCE) parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() if not args.source.is_file(): print(f"[ERROR] Source not found: {args.source}", file=sys.stderr) return 1 text = args.source.read_text(encoding="utf-8", errors="replace") conn = pymysql.connect(**DEFAULT_DB, autocommit=False) cur = conn.cursor() total_rows = 0 total_inserts = 0 errors: list[str] = [] for lower in TABLES: actual = resolve_target_table(cur, lower) if not actual: errors.append(f"{lower}: missing on target") continue backup_cols = backup_create_columns(text, lower) if not backup_cols: errors.append(f"{lower}: CREATE TABLE not found in backup") continue tgt_cols = target_columns(cur, actual) stmt_count = 0 row_count = 0 for values_blob in iter_values_inserts(text, lower): stmt_count += 1 for row_sql in split_sql_values(values_blob): fields = split_row_fields(row_sql) row = row_dict_from_backup(backup_cols, fields) sql = build_insert_sql(actual, tgt_cols, row) if args.dry_run: row_count += 1 continue try: affected = cur.execute(sql) row_count += 1 total_rows += max(affected, 0) except Exception as exc: # noqa: BLE001 conn.rollback() errors.append(f"{lower}: {exc}") break if not args.dry_run: conn.commit() total_inserts += stmt_count print(f"{lower} -> {actual}: statements={stmt_count}, rows={row_count}") conn.close() print(f"\nTotal INSERT statements: {total_inserts}, rows processed: {total_rows}") if errors: print("\nErrors:") for e in errors: print(f" - {e}") return 2 return 0 if __name__ == "__main__": raise SystemExit(main())