split_weekly_time_by_person.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. # -*- coding: utf-8 -*-
  2. """
  3. 将「每周详细计划_时间」中同一周内多责任人的行拆成「一人一行」。
  4. - 识别「\\n【某某】」分段(与此前合并彭熙玉格式一致)
  5. - 否则从 E 列拆:多个 @、分号、顿号、逗号等
  6. - 解除 A2:A4 等合并后重写表体;表头第 1 行保留
  7. - 日报「20260413」中「二、项目周进度」(约 G–N 列、第 4 行起)按同一时间块
  8. 与日期范围(时间页 B 列)对齐,一人一行;原多行重复填充的会合并为与时间页相同的行数
  9. """
  10. from __future__ import annotations
  11. import re
  12. import shutil
  13. from datetime import datetime
  14. from pathlib import Path
  15. import openpyxl
  16. from openpyxl.utils import range_boundaries
# Workbook that main() backs up and then rewrites in place.
SRC = Path(r"d:\Projects\Ai-DOP\AI-DOP项目总计划_20260413.xlsx")
# Name of the weekly-plan ("time") sheet whose body rows are split per person.
SHEET = "每周详细计划_时间"
# Name of the daily-report sheet that is re-synchronised from the time sheet.
SHEET_DAILY = "20260413"
# Weekly-progress area of the daily report: the milestone columns A-F on the
# left are left untouched; columns G (week) .. N (risk) on the right
# correspond to columns A-H of the time sheet.
# First and last daily-report rows (inclusive) that belong to that area.
DAILY_WEEK_ROW_MIN = 4
DAILY_WEEK_ROW_MAX = 13
  23. def cell_tl(ws, row: int, col: int) -> tuple[int, int]:
  24. for m in ws.merged_cells.ranges:
  25. min_col, min_row, max_col, max_row = range_boundaries(str(m))
  26. if min_row <= row <= max_row and min_col <= col <= max_col:
  27. return min_row, min_col
  28. return row, col
  29. def get_cell(ws, row: int, col: int):
  30. rr, cc = cell_tl(ws, row, col)
  31. return ws.cell(rr, cc).value
  32. def set_cell(ws, row: int, col: int, value):
  33. rr, cc = cell_tl(ws, row, col)
  34. ws.cell(rr, cc).value = value
  35. def read_row(ws, r: int) -> dict[str, object | None]:
  36. return {chr(64 + i): get_cell(ws, r, i) for i in range(1, 9)}
  37. def unmerge_all(ws):
  38. mlist = list(ws.merged_cells.ranges)
  39. for m in mlist:
  40. ws.unmerge_cells(str(m))
  41. def has_bracket_split(text: str) -> bool:
  42. return bool(text) and "\n【" in text
  43. def split_bracket_segments(s: str | None) -> list[tuple[str, str]]:
  44. """返回 [(责任人简称, 该段正文), ...],第一段为「主行」键 __primary__。"""
  45. if s is None:
  46. return [("__primary__", "")]
  47. t = str(s).strip()
  48. if not t:
  49. return [("__primary__", "")]
  50. if "\n【" not in t:
  51. return [("__primary__", t)]
  52. parts = re.split(r"\n【([^】]+)】", t)
  53. out: list[tuple[str, str]] = []
  54. head = parts[0].strip()
  55. out.append(("__primary__", head))
  56. rest = parts[1:]
  57. for i in range(0, len(rest), 2):
  58. name = rest[i].strip()
  59. body = rest[i + 1].strip() if i + 1 < len(rest) else ""
  60. out.append((name, body))
  61. return out
  62. def primary_display_name(seg: list[tuple[str, str]]) -> str:
  63. h = seg[0][1] if seg else ""
  64. m = re.search(r"@\s*([^\s;;,,\n(]+)", h)
  65. if m:
  66. return m.group(1).strip()
  67. m2 = re.match(r"^([\u4e00-\u9fa5]{2,4})", h)
  68. if m2:
  69. return m2.group(1).strip()
  70. return "主责"
  71. def people_from_e_plain(e: str) -> list[str]:
  72. """无【】时从 E 拆人名。"""
  73. if not e:
  74. return []
  75. t = e.strip()
  76. # 多个 @xxx
  77. ats = re.findall(r"@\s*([^\s;;,,\n]+)", t)
  78. if len(ats) >= 2:
  79. cleaned = []
  80. for a in ats:
  81. a = re.sub(r"([^)]*)", "", a)
  82. cleaned.append(a.strip())
  83. return cleaned
  84. # 分号
  85. if re.search(r"[;;]", t):
  86. parts = re.split(r"[;;]", t)
  87. names = []
  88. for p in parts:
  89. p = p.strip()
  90. if not p:
  91. continue
  92. m = re.match(r"@\s*([^\s(]+)", p)
  93. if m:
  94. names.append(re.sub(r"([^)]*)", "", m.group(1)).strip())
  95. else:
  96. m2 = re.match(r"^([\u4e00-\u9fa5]{2,4})", p)
  97. if m2:
  98. names.append(m2.group(1))
  99. return [n for n in names if n]
  100. # 顿号(两人常见)
  101. if "、" in t and len(t) <= 40:
  102. parts = t.split("、")
  103. if len(parts) >= 2:
  104. out = []
  105. for i, p in enumerate(parts):
  106. p = p.strip()
  107. m2 = re.match(r"^([\u4e00-\u9fa5]{2,4})", p)
  108. if m2:
  109. out.append(m2.group(1))
  110. else:
  111. out.append(p[:6])
  112. return out
  113. return []
  114. def value_for_person_bracket(col_val: str | None, person: str, segs: list[tuple[str, str]]) -> str | None:
  115. if col_val is None:
  116. return None
  117. s = str(col_val).strip()
  118. if not s:
  119. return None
  120. if person == "__primary__":
  121. if "\n【" in s:
  122. return s.split("\n【", 1)[0].strip() or None
  123. return s or None
  124. marker = f"\n【{person}】"
  125. if marker in s:
  126. return s.split(marker, 1)[1].strip() or None
  127. return None
  128. def expand_row(row: dict[str, object | None], week_in_a: bool) -> list[dict[str, object | None]]:
  129. """返回 1..n 行(每人一行)。week_in_a:本行是否在 A 列带有周次(W0/W1/…)。"""
  130. cols_cd = {k: row.get(k) for k in "CDEFGH"}
  131. combined = "\n".join(str(v or "") for v in cols_cd.values())
  132. people: list[str] = []
  133. if has_bracket_split(combined):
  134. segs = split_bracket_segments(str(row.get("E") or ""))
  135. names = [k for k, _ in segs if k != "__primary__"]
  136. base_name = primary_display_name(segs)
  137. people = [base_name] + names
  138. rows_out: list[dict[str, object | None]] = []
  139. for i, pname in enumerate(people):
  140. nr: dict[str, object | None] = {}
  141. if i == 0 and week_in_a and row.get("A"):
  142. nr["A"] = row.get("A")
  143. else:
  144. nr["A"] = None
  145. nr["B"] = row.get("B")
  146. for col in "CDEFGH":
  147. raw = row.get(col)
  148. if col in "CDEFGH":
  149. if has_bracket_split(str(raw or "")):
  150. key = "__primary__" if i == 0 else pname
  151. # i==0 用 __primary__ 段;其余用对应【pname】
  152. if i == 0:
  153. seg_key = "__primary__"
  154. else:
  155. seg_key = pname
  156. nr[col] = value_for_person_bracket(str(raw) if raw else None, seg_key, segs)
  157. else:
  158. nr[col] = raw
  159. else:
  160. nr[col] = raw
  161. # E 无【时上面已处理;有【时 E 已分段写入
  162. if i == 0 and not has_bracket_split(str(row.get("E") or "")):
  163. pass
  164. rows_out.append(nr)
  165. # 修正:上面循环里 col 包含了 E 两次逻辑混乱,重写清晰版
  166. rows_out = []
  167. for i, pname in enumerate(people):
  168. nr: dict[str, object | None] = {}
  169. nr["A"] = (row.get("A") if i == 0 and week_in_a else None)
  170. nr["B"] = row.get("B")
  171. for col in "CDEFGH":
  172. raw = row.get(col)
  173. if raw is None:
  174. nr[col] = None
  175. continue
  176. s = str(raw)
  177. if has_bracket_split(s):
  178. if i == 0:
  179. nr[col] = value_for_person_bracket(s, "__primary__", segs)
  180. else:
  181. nr[col] = value_for_person_bracket(s, pname, segs)
  182. else:
  183. nr[col] = raw
  184. rows_out.append(nr)
  185. return rows_out
  186. plain = people_from_e_plain(str(row.get("E") or ""))
  187. if len(plain) <= 1:
  188. return [dict(row)]
  189. rows_out = []
  190. for i, pname in enumerate(plain):
  191. nr = {k: row.get(k) for k in "ABCDEFGH"}
  192. nr["A"] = row.get("A") if i == 0 and week_in_a else None
  193. nr["B"] = row.get("B")
  194. # 每人一行:E 只保留与该人相关的一句;无结构时整段 D/F 复制
  195. nr["E"] = f"@{pname}" if not str(row.get("E") or "").startswith("@") else f"@{pname}"
  196. # 若原文本含该人说明,尽量截取
  197. full_e = str(row.get("E") or "")
  198. if pname in full_e:
  199. m = re.search(rf"{re.escape(pname)}[^;;\n]*", full_e)
  200. if m:
  201. nr["E"] = m.group(0).strip()
  202. rows_out.append(nr)
  203. return rows_out
  204. def collect_bracket_names(row: dict[str, object | None]) -> list[str]:
  205. found: list[str] = []
  206. for col in "CDEFGH":
  207. t = str(row.get(col) or "")
  208. for m in re.finditer(r"\n【([^】]+)】", t):
  209. n = m.group(1).strip()
  210. if n and n not in found:
  211. found.append(n)
  212. return found
  213. def expand_row_fixed(row: dict[str, object | None], has_week_cell: bool) -> list[dict[str, object | None]]:
  214. """has_week_cell:本物理行在 A 列是否出现周次(含合并左上角)。"""
  215. cols = {k: row.get(k) for k in "CDEFGH"}
  216. combined = "\n".join(str(v or "") for v in cols.values())
  217. if has_bracket_split(combined):
  218. segs_e = split_bracket_segments(str(row.get("E") or ""))
  219. names_from_e = [k for k, _ in segs_e if k != "__primary__"]
  220. names_from_cols = collect_bracket_names(row)
  221. # 合并顺序:先 E 中显式标签,再补齐其他列出现过的标签
  222. names: list[str] = []
  223. for n in names_from_e + names_from_cols:
  224. if n not in names:
  225. names.append(n)
  226. base_person = primary_display_name(segs_e)
  227. order = [base_person] + [n for n in names if n != base_person]
  228. if len(order) < 2:
  229. return [dict(row)]
  230. out_rows: list[dict[str, object | None]] = []
  231. for i, pname in enumerate(order):
  232. nr: dict[str, object | None] = {}
  233. nr["A"] = row.get("A") if (i == 0 and has_week_cell) else None
  234. nr["B"] = row.get("B")
  235. for col in "CDEFGH":
  236. raw = row.get(col)
  237. if raw is None:
  238. nr[col] = None
  239. continue
  240. s = str(raw)
  241. if has_bracket_split(s):
  242. if i == 0:
  243. nr[col] = value_for_person_bracket(s, "__primary__", segs_e)
  244. else:
  245. nr[col] = value_for_person_bracket(s, pname, segs_e)
  246. else:
  247. nr[col] = raw
  248. out_rows.append(nr)
  249. return out_rows
  250. plain = people_from_e_plain(str(row.get("E") or ""))
  251. if len(plain) <= 1:
  252. return [dict(row)]
  253. out_rows = []
  254. for i, pname in enumerate(plain):
  255. nr = {k: row.get(k) for k in "ABCDEFGH"}
  256. nr["A"] = row.get("A") if (i == 0 and has_week_cell) else None
  257. nr["B"] = row.get("B")
  258. nr["E"] = f"@{pname}"
  259. full_e = str(row.get("E") or "")
  260. if pname in full_e:
  261. chunk = re.search(rf"{re.escape(pname)}[^;;\n@]*", full_e)
  262. if chunk:
  263. nr["E"] = chunk.group(0).strip()
  264. out_rows.append(nr)
  265. return out_rows
  266. def write_row(ws, r: int, data: dict[str, object | None]):
  267. for i, L in enumerate("ABCDEFGH", start=1):
  268. ws.cell(r, i).value = data.get(L)
  269. def norm_week(val) -> str | None:
  270. if val is None:
  271. return None
  272. s = str(val).strip()
  273. m = re.match(r"^(W\d+)\b", s, re.I)
  274. return m.group(1).upper() if m else None
  275. def norm_date_range(v) -> str:
  276. if v is None:
  277. return ""
  278. if hasattr(v, "strftime") and not isinstance(v, str):
  279. return v.strftime("%Y/%m/%d")
  280. s = str(v).strip().replace("-", "/")
  281. s = re.sub(r"\s*-\s*", " - ", s)
  282. s = re.sub(r"\s+", " ", s)
  283. return s
  284. def gn_tuple_daily(ws, r: int) -> tuple:
  285. return tuple(get_cell(ws, r, c) for c in range(7, 15))
  286. def daily_week_anchor_row(ws_daily) -> int | None:
  287. for r in range(DAILY_WEEK_ROW_MIN, DAILY_WEEK_ROW_MAX + 1):
  288. if norm_week(get_cell(ws_daily, r, 7)):
  289. return r
  290. return None
  291. def daily_week_span_width(ws_daily, anchor: int) -> int:
  292. w0 = norm_week(get_cell(ws_daily, anchor, 7))
  293. if not w0:
  294. return 1
  295. i0 = norm_date_range(get_cell(ws_daily, anchor, 9))
  296. sig0 = gn_tuple_daily(ws_daily, anchor)
  297. span = 1
  298. r = anchor
  299. while r < DAILY_WEEK_ROW_MAX:
  300. r2 = r + 1
  301. g2 = get_cell(ws_daily, r2, 7)
  302. i2 = norm_date_range(get_cell(ws_daily, r2, 9))
  303. sig2 = gn_tuple_daily(ws_daily, r2)
  304. if norm_week(g2) == w0 and sig2 == sig0:
  305. span += 1
  306. r = r2
  307. continue
  308. g2s = str(g2).strip() if g2 is not None else ""
  309. if not g2s and i2 == i0 and bool(i0):
  310. span += 1
  311. r = r2
  312. continue
  313. break
  314. return span
  315. def find_time_sheet_rows_by_date_b(time_ws, date_ref) -> list[int]:
  316. want = norm_date_range(date_ref)
  317. if not want:
  318. return []
  319. rows: list[int] = []
  320. started = False
  321. for r in range(2, time_ws.max_row + 1):
  322. b = get_cell(time_ws, r, 2)
  323. if norm_date_range(b) == want:
  324. rows.append(r)
  325. started = True
  326. elif started:
  327. break
  328. return rows
  329. def write_time_row_to_daily(ws_daily, dr: int, letters: dict[str, object | None]):
  330. set_cell(ws_daily, dr, 7, letters.get("A"))
  331. set_cell(ws_daily, dr, 8, letters.get("H"))
  332. set_cell(ws_daily, dr, 9, letters.get("B"))
  333. set_cell(ws_daily, dr, 10, letters.get("C"))
  334. set_cell(ws_daily, dr, 11, letters.get("D"))
  335. set_cell(ws_daily, dr, 12, letters.get("E"))
  336. set_cell(ws_daily, dr, 13, letters.get("F"))
  337. set_cell(ws_daily, dr, 14, letters.get("G"))
  338. def clear_daily_week_cols(ws_daily, dr: int):
  339. for c in range(7, 15):
  340. set_cell(ws_daily, dr, c, None)
  341. def sync_daily_20260413_from_time_sheet(wb):
  342. if SHEET_DAILY not in wb.sheetnames:
  343. return
  344. ws_t = wb[SHEET]
  345. ws_d = wb[SHEET_DAILY]
  346. anchor = daily_week_anchor_row(ws_d)
  347. if anchor is None:
  348. return
  349. span = daily_week_span_width(ws_d, anchor)
  350. date_ref = get_cell(ws_d, anchor, 9)
  351. time_rows = find_time_sheet_rows_by_date_b(ws_t, date_ref)
  352. if not time_rows:
  353. return
  354. room = DAILY_WEEK_ROW_MAX - anchor + 1
  355. n_write = min(len(time_rows), room)
  356. for idx in range(n_write):
  357. dr = anchor + idx
  358. write_time_row_to_daily(ws_d, dr, read_row(ws_t, time_rows[idx]))
  359. for dr in range(anchor + n_write, anchor + span):
  360. if DAILY_WEEK_ROW_MIN <= dr <= DAILY_WEEK_ROW_MAX:
  361. clear_daily_week_cols(ws_d, dr)
  362. def main():
  363. if not SRC.exists():
  364. raise SystemExit(f"missing: {SRC}")
  365. backup = SRC.with_name(SRC.stem + "_before_split_person_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".xlsx")
  366. shutil.copy2(SRC, backup)
  367. wb = openpyxl.load_workbook(SRC)
  368. ws = wb[SHEET]
  369. unmerge_all(ws)
  370. header = read_row(ws, 1)
  371. body: list[dict[str, object | None]] = []
  372. max_r = ws.max_row
  373. for r in range(2, max_r + 1):
  374. rowd = read_row(ws, r)
  375. if not any(v is not None and str(v).strip() for v in rowd.values()):
  376. continue
  377. has_week = bool(rowd.get("A")) and str(rowd.get("A")).strip() != ""
  378. expanded = expand_row_fixed(rowd, has_week)
  379. body.extend(expanded)
  380. # 清空第 2 行及以下
  381. if ws.max_row >= 2:
  382. ws.delete_rows(2, ws.max_row - 1)
  383. write_row(ws, 1, header)
  384. for idx, rowd in enumerate(body, start=2):
  385. write_row(ws, idx, rowd)
  386. sync_daily_20260413_from_time_sheet(wb)
  387. wb.save(SRC)
  388. print("saved", SRC)
  389. print("backup", backup)
  390. if __name__ == "__main__":
  391. main()