#!/usr/bin/env python3
"""
Import mysqldump INSERT rows for tables whose column layout differs between the
backup and the target database server (the ...165 host).

Backup columns are mapped to target columns by name (case-insensitively, plus a
small alias table); target columns that have no backup counterpart are filled
with NULL, and INSERT IGNORE is used so duplicate rows are skipped.
Procedure-internal ``INSERT ... (col list) VALUES`` statements are not imported.
"""
from __future__ import annotations

import argparse
import os
import re
import sys
from pathlib import Path

import pymysql
  14. DEFAULT_SOURCE = (
  15. Path(__file__).resolve().parents[4]
  16. / "阿里云数据库备份"
  17. / "aidopdev_20260608_053527.sql"
  18. / "aidopdev_20260608_053527.sql"
  19. )
  20. DEFAULT_DB = dict(
  21. host="123.60.180.165",
  22. port=3306,
  23. user="aidopremote",
  24. password="1234567890aiDOP#",
  25. database="aidopdev",
  26. charset="utf8mb4",
  27. )
  28. TABLES = [
  29. "itemmaster",
  30. "prioritycode",
  31. "purordrctdetail",
  32. "purordrctmaster",
  33. "scm_jhjh_jq",
  34. "scm_shd",
  35. "scm_shdzb",
  36. "nbrdayinfo",
  37. ]
  38. VALUES_INSERT_RE = re.compile(
  39. r"^INSERT\s+INTO\s+[`']?([^`'\s(]+)[`']?\s+VALUES\s",
  40. re.IGNORECASE,
  41. )
  42. # backup column name (lower) -> target column name when names differ
  43. COLUMN_ALIASES: dict[str, str] = {
  44. "expire_alarm_day": "ExpireAlarmDays",
  45. }
  46. def normalize(name: str) -> str:
  47. return name.strip("`'\"").lower()
  48. def backup_create_columns(text: str, table: str) -> list[str]:
  49. match = re.search(
  50. rf"CREATE TABLE `{re.escape(table)}`\s*\((.*?)\)\s*ENGINE",
  51. text,
  52. re.IGNORECASE | re.DOTALL,
  53. )
  54. if not match:
  55. return []
  56. cols: list[str] = []
  57. for line in match.group(1).splitlines():
  58. line = line.strip()
  59. if line.startswith("`"):
  60. cols.append(line.split("`", 2)[1])
  61. return cols
  62. def split_sql_values(values_blob: str) -> list[str]:
  63. rows: list[str] = []
  64. i = 0
  65. n = len(values_blob)
  66. while i < n:
  67. while i < n and values_blob[i] in " \t\r\n,":
  68. i += 1
  69. if i >= n:
  70. break
  71. if values_blob[i] != "(":
  72. break
  73. depth = 0
  74. in_str = False
  75. esc = False
  76. start = i
  77. while i < n:
  78. ch = values_blob[i]
  79. if esc:
  80. esc = False
  81. elif ch == "\\" and in_str:
  82. esc = True
  83. elif ch == "'":
  84. in_str = not in_str
  85. elif not in_str:
  86. if ch == "(":
  87. depth += 1
  88. elif ch == ")":
  89. depth -= 1
  90. if depth == 0:
  91. rows.append(values_blob[start : i + 1])
  92. i += 1
  93. break
  94. i += 1
  95. return rows
  96. def split_row_fields(row_sql: str) -> list[str]:
  97. inner = row_sql.strip()[1:-1]
  98. fields: list[str] = []
  99. cur: list[str] = []
  100. depth = 0
  101. in_str = False
  102. esc = False
  103. for ch in inner:
  104. if esc:
  105. cur.append(ch)
  106. esc = False
  107. continue
  108. if ch == "\\" and in_str:
  109. cur.append(ch)
  110. esc = True
  111. continue
  112. if ch == "'":
  113. in_str = not in_str
  114. cur.append(ch)
  115. continue
  116. if in_str:
  117. cur.append(ch)
  118. continue
  119. if ch == "(":
  120. depth += 1
  121. cur.append(ch)
  122. continue
  123. if ch == ")":
  124. depth -= 1
  125. cur.append(ch)
  126. continue
  127. if ch == "," and depth == 0:
  128. fields.append("".join(cur).strip())
  129. cur = []
  130. continue
  131. cur.append(ch)
  132. if cur:
  133. fields.append("".join(cur).strip())
  134. return fields
  135. def resolve_target_table(cur, lower_name: str) -> str | None:
  136. cur.execute(
  137. "SELECT TABLE_NAME FROM information_schema.TABLES "
  138. "WHERE TABLE_SCHEMA = DATABASE() AND LOWER(TABLE_NAME) = %s",
  139. (lower_name,),
  140. )
  141. row = cur.fetchone()
  142. return row[0] if row else None
  143. def target_columns(cur, table: str) -> list[str]:
  144. cur.execute(
  145. "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
  146. "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s "
  147. "ORDER BY ORDINAL_POSITION",
  148. (table,),
  149. )
  150. return [r[0] for r in cur.fetchall()]
  151. def row_dict_from_backup(backup_cols: list[str], fields: list[str]) -> dict[str, str]:
  152. data: dict[str, str] = {}
  153. for idx, col in enumerate(backup_cols):
  154. if idx < len(fields):
  155. data[col.lower()] = fields[idx]
  156. for src, dst in COLUMN_ALIASES.items():
  157. if src in data and dst.lower() not in data:
  158. data[dst.lower()] = data[src]
  159. return data
  160. def build_insert_sql(actual_table: str, target_cols: list[str], row: dict[str, str]) -> str:
  161. col_sql = ", ".join(f"`{c}`" for c in target_cols)
  162. vals = ", ".join(row.get(c.lower(), "NULL") for c in target_cols)
  163. return f"INSERT IGNORE INTO `{actual_table}` ({col_sql}) VALUES ({vals})"
  164. def iter_values_inserts(text: str, table: str):
  165. pattern = re.compile(
  166. rf"^INSERT\s+INTO\s+[`']?{re.escape(table)}[`']?\s+VALUES\s(.+?);",
  167. re.IGNORECASE | re.MULTILINE | re.DOTALL,
  168. )
  169. for match in pattern.finditer(text):
  170. yield match.group(1).strip()
  171. def main() -> int:
  172. parser = argparse.ArgumentParser(description="Schema-aligned import for mismatched tables")
  173. parser.add_argument("--source", type=Path, default=DEFAULT_SOURCE)
  174. parser.add_argument("--dry-run", action="store_true")
  175. args = parser.parse_args()
  176. if not args.source.is_file():
  177. print(f"[ERROR] Source not found: {args.source}", file=sys.stderr)
  178. return 1
  179. text = args.source.read_text(encoding="utf-8", errors="replace")
  180. conn = pymysql.connect(**DEFAULT_DB, autocommit=False)
  181. cur = conn.cursor()
  182. total_rows = 0
  183. total_inserts = 0
  184. errors: list[str] = []
  185. for lower in TABLES:
  186. actual = resolve_target_table(cur, lower)
  187. if not actual:
  188. errors.append(f"{lower}: missing on target")
  189. continue
  190. backup_cols = backup_create_columns(text, lower)
  191. if not backup_cols:
  192. errors.append(f"{lower}: CREATE TABLE not found in backup")
  193. continue
  194. tgt_cols = target_columns(cur, actual)
  195. stmt_count = 0
  196. row_count = 0
  197. for values_blob in iter_values_inserts(text, lower):
  198. stmt_count += 1
  199. for row_sql in split_sql_values(values_blob):
  200. fields = split_row_fields(row_sql)
  201. row = row_dict_from_backup(backup_cols, fields)
  202. sql = build_insert_sql(actual, tgt_cols, row)
  203. if args.dry_run:
  204. row_count += 1
  205. continue
  206. try:
  207. affected = cur.execute(sql)
  208. row_count += 1
  209. total_rows += max(affected, 0)
  210. except Exception as exc: # noqa: BLE001
  211. conn.rollback()
  212. errors.append(f"{lower}: {exc}")
  213. break
  214. if not args.dry_run:
  215. conn.commit()
  216. total_inserts += stmt_count
  217. print(f"{lower} -> {actual}: statements={stmt_count}, rows={row_count}")
  218. conn.close()
  219. print(f"\nTotal INSERT statements: {total_inserts}, rows processed: {total_rows}")
  220. if errors:
  221. print("\nErrors:")
  222. for e in errors:
  223. print(f" - {e}")
  224. return 2
  225. return 0
  226. if __name__ == "__main__":
  227. raise SystemExit(main())