| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- # -*- coding: utf-8 -*-
- """
- 将「每周详细计划_时间」中同一周内多责任人的行拆成「一人一行」。
- - 识别「\\n【某某】」分段(与此前合并彭熙玉格式一致)
- - 否则从 E 列拆:多个 @、分号、顿号、逗号等
- - 解除 A2:A4 等合并后重写表体;表头第 1 行保留
- - 日报「20260413」中「二、项目周进度」(约 G–N 列、第 4 行起)按同一时间块
- 与日期范围(时间页 B 列)对齐,一人一行;原多行重复填充的会合并为与时间页相同的行数
- """
- from __future__ import annotations
- import re
- import shutil
- from datetime import datetime
- from pathlib import Path
- import openpyxl
- from openpyxl.utils import range_boundaries
# Workbook that is rewritten in place (a timestamped backup is taken first).
SRC: Path = Path(r"d:\Projects\Ai-DOP\AI-DOP项目总计划_20260413.xlsx")

# Sheet holding the per-week detailed plan (columns A-H are processed).
SHEET: str = "每周详细计划_时间"

# Daily-report sheet whose weekly-progress area is synchronised afterwards.
SHEET_DAILY: str = "20260413"

# Weekly-progress area of the daily report: the milestone columns A-F on the
# left are left untouched; columns G (week) .. N (risk) on the right mirror
# columns A-H of the time sheet.  Only rows MIN..MAX (inclusive) are scanned.
DAILY_WEEK_ROW_MIN: int = 4
DAILY_WEEK_ROW_MAX: int = 13
def cell_tl(ws, row: int, col: int) -> tuple[int, int]:
    """Return the (row, col) of the top-left cell of the merge containing (row, col).

    Every merged range of ``ws`` is inspected in iteration order; the first
    range that covers the coordinate wins.  When the coordinate is not part of
    any merged range it is returned unchanged.
    """
    for merged in ws.merged_cells.ranges:
        left, top, right, bottom = range_boundaries(str(merged))
        in_rows = top <= row <= bottom
        in_cols = left <= col <= right
        if in_rows and in_cols:
            return top, left
    return row, col
def get_cell(ws, row: int, col: int):
    """Read a cell value, resolving merged cells to their top-left anchor."""
    anchor = cell_tl(ws, row, col)
    return ws.cell(*anchor).value
def set_cell(ws, row: int, col: int, value):
    """Write ``value`` to a cell, redirecting to the top-left anchor of a merge."""
    anchor = cell_tl(ws, row, col)
    ws.cell(*anchor).value = value
def read_row(ws, r: int) -> dict[str, object | None]:
    """Return columns A-H of row ``r`` as a dict keyed by column letter.

    Values are read through ``get_cell``, so merged cells yield the value of
    their top-left anchor.
    """
    return {
        letter: get_cell(ws, r, index)
        for index, letter in enumerate("ABCDEFGH", start=1)
    }
def unmerge_all(ws):
    """Unmerge every merged range on the worksheet.

    The range coordinates are snapshotted first because unmerging mutates the
    collection being iterated.
    """
    coords = [str(merged) for merged in ws.merged_cells.ranges]
    for coord in coords:
        ws.unmerge_cells(coord)
def has_bracket_split(text: str) -> bool:
    """Tell whether ``text`` contains a newline followed by an opening ``【``.

    That sequence is the marker that starts a per-person segment.  Empty or
    falsy input yields ``False``.
    """
    if not text:
        return False
    return "\n【" in text
- def split_bracket_segments(s: str | None) -> list[tuple[str, str]]:
- """返回 [(责任人简称, 该段正文), ...],第一段为「主行」键 __primary__。"""
- if s is None:
- return [("__primary__", "")]
- t = str(s).strip()
- if not t:
- return [("__primary__", "")]
- if "\n【" not in t:
- return [("__primary__", t)]
- parts = re.split(r"\n【([^】]+)】", t)
- out: list[tuple[str, str]] = []
- head = parts[0].strip()
- out.append(("__primary__", head))
- rest = parts[1:]
- for i in range(0, len(rest), 2):
- name = rest[i].strip()
- body = rest[i + 1].strip() if i + 1 < len(rest) else ""
- out.append((name, body))
- return out
def primary_display_name(seg: list[tuple[str, str]]) -> str:
    """Derive a display name for the primary (first) segment.

    Lookup order on the body of ``seg[0]``:

    1. the token following the first ``@`` (optional whitespace after ``@``),
       ending at whitespace, a semicolon, a comma or an opening parenthesis;
    2. otherwise a leading run of 2-4 CJK characters;
    3. otherwise the fallback label ``"主责"``.

    The delimiter class accepts both the ASCII and the full-width forms of
    ``;``, ``,`` and ``(``.  The previous class listed the ASCII characters
    twice, so a name followed by full-width punctuation was captured together
    with the trailing text.
    """
    head = seg[0][1] if seg else ""
    # \uff1b / \uff0c / \uff08 are the full-width ';' ',' '('.
    mention = re.search(r"@\s*([^\s;\uff1b,\uff0c\n(\uff08]+)", head)
    if mention:
        return mention.group(1).strip()
    leading = re.match(r"^([\u4e00-\u9fa5]{2,4})", head)
    if leading:
        return leading.group(1).strip()
    return "主责"
def people_from_e_plain(e: str) -> list[str]:
    """Extract person names from column-E text that has no ``【name】`` markers.

    Strategies are tried in order and the first that applies wins:

    1. two or more ``@name`` mentions;
    2. semicolon-separated parts (``@name`` or a leading 2-4 CJK characters);
    3. ``、``-separated parts, only for short text (at most 40 characters).

    Returns an empty list when nothing applies.  Names never come back empty.

    Fixes over the previous version:

    * the parenthetical stripper was the regex ``([^)]*)``, which is a capture
      group matching every run of non-``)`` characters and therefore erased
      the whole name; it now removes only ``(...)`` annotations;
    * ASCII and full-width forms of ``;``, ``,`` and parentheses are accepted
      (the previous classes listed the ASCII characters twice);
    * a mention ends at the next ``@`` or ``、`` so adjacent mentions are not
      fused into one name;
    * empty fragments and a leading ``@`` are dropped in the ``、`` branch.
    """
    if not e:
        return []
    text = e.strip()

    # \uff08 / \uff09 are the full-width parentheses.
    paren_note = r"[(\uff08][^)\uff09]*[)\uff09]"
    cjk_name = r"^([\u4e00-\u9fa5]{2,4})"

    # 1) several @mentions (\uff1b / \uff0c are the full-width ';' and ',').
    mentions = re.findall(r"@\s*([^\s@;\uff1b,\uff0c、\n]+)", text)
    if len(mentions) >= 2:
        cleaned = (re.sub(paren_note, "", item).strip() for item in mentions)
        return [name for name in cleaned if name]

    # 2) semicolon-separated list
    if re.search(r"[;\uff1b]", text):
        names: list[str] = []
        for part in re.split(r"[;\uff1b]", text):
            part = part.strip()
            if not part:
                continue
            at = re.match(r"@\s*([^\s(\uff08]+)", part)
            if at:
                # The capture already stops at an opening parenthesis.
                names.append(at.group(1).strip())
                continue
            plain = re.match(cjk_name, part)
            if plain:
                names.append(plain.group(1))
        return [name for name in names if name]

    # 3) enumeration comma, typical for two people in a short cell
    if "、" in text and len(text) <= 40:
        names = []
        for part in text.split("、"):
            part = part.strip().lstrip("@").strip()
            if not part:
                continue
            plain = re.match(cjk_name, part)
            names.append(plain.group(1) if plain else part[:6])
        return names

    return []
def value_for_person_bracket(col_val: str | None, person: str, segs: list[tuple[str, str]]) -> str | None:
    """Return the part of one cell's text that belongs to ``person``.

    Args:
        col_val: Raw cell text; ``None`` or blank yields ``None``.
        person: ``"__primary__"`` for the text before the first ``\\n【``
            marker, otherwise the label whose ``\\n【label】`` segment is wanted.
        segs: Unused; kept so existing callers keep working.

    Returns:
        The stripped segment text, or ``None`` when the segment is missing or
        empty.

    A labelled segment now ends at the next ``\\n【label】`` marker.  Previously
    it ran to the end of the cell, so with three or more people every row
    except the last also contained the following people's text.
    """
    if col_val is None:
        return None
    text = str(col_val).strip()
    if not text:
        return None

    if person == "__primary__":
        # Without a marker, split() returns the whole (already stripped) text.
        return text.split("\n【", 1)[0].strip() or None

    _, found, tail = text.partition(f"\n【{person}】")
    if not found:
        return None
    next_marker = re.search(r"\n【[^】]+】", tail)
    if next_marker:
        tail = tail[: next_marker.start()]
    return tail.strip() or None
def expand_row(row: dict[str, object | None], week_in_a: bool) -> list[dict[str, object | None]]:
    """Expand one A-H row into one row per person (legacy variant).

    Args:
        row: Mapping of column letter (``"A"``..``"H"``) to cell value.
        week_in_a: Whether this row carries the week label (W0/W1/...) in
            column A; only the first produced row keeps it.

    Returns:
        A list of row dicts.  When any of columns C-H contains a ``\\n【name】``
        marker, people come from column E's segments and each marked column is
        cut per person.  Otherwise names are taken from column E with
        ``people_from_e_plain``; with zero or one name a copy of the row is
        returned unchanged.

    Changes over the previous body: the first build loop, whose result was
    discarded and rebuilt right after, is removed together with its unused
    locals; the ``@name`` default no longer uses a conditional with two
    identical branches; the per-person excerpt of column E stops at the next
    ``@`` and at a full-width semicolon, matching ``expand_row_fixed``.
    """
    combined = "\n".join(str(row.get(col) or "") for col in "CDEFGH")

    if has_bracket_split(combined):
        segs = split_bracket_segments(str(row.get("E") or ""))
        people = [primary_display_name(segs)]
        people += [label for label, _ in segs if label != "__primary__"]

        rows_out: list[dict[str, object | None]] = []
        for i, pname in enumerate(people):
            # The first person owns the text before the first marker.
            seg_key = "__primary__" if i == 0 else pname
            nr: dict[str, object | None] = {
                "A": row.get("A") if i == 0 and week_in_a else None,
                "B": row.get("B"),
            }
            for col in "CDEFGH":
                raw = row.get(col)
                if raw is not None and has_bracket_split(str(raw)):
                    nr[col] = value_for_person_bracket(str(raw), seg_key, segs)
                else:
                    # Columns without markers are copied to every person.
                    nr[col] = raw
            rows_out.append(nr)
        return rows_out

    full_e = str(row.get("E") or "")
    plain = people_from_e_plain(full_e)
    if len(plain) <= 1:
        return [dict(row)]

    rows_out = []
    for i, pname in enumerate(plain):
        nr = {col: row.get(col) for col in "ABCDEFGH"}
        nr["A"] = row.get("A") if i == 0 and week_in_a else None
        # Default to a bare mention; prefer the sentence that names the person.
        nr["E"] = f"@{pname}"
        if pname in full_e:
            # \uff1b is the full-width semicolon.
            chunk = re.search(rf"{re.escape(pname)}[^;\uff1b\n@]*", full_e)
            if chunk:
                nr["E"] = chunk.group(0).strip()
        rows_out.append(nr)
    return rows_out
def collect_bracket_names(row: dict[str, object | None]) -> list[str]:
    """List the distinct ``\\n【label】`` labels found in columns C-H.

    Labels are stripped, blank ones are ignored, and the result keeps
    first-seen order (columns scanned C to H, left to right within a cell).
    """
    ordered: dict[str, None] = {}
    for letter in "CDEFGH":
        text = str(row.get(letter) or "")
        for label in re.findall(r"\n【([^】]+)】", text):
            label = label.strip()
            if label:
                ordered.setdefault(label, None)
    return list(ordered)
def expand_row_fixed(row: dict[str, object | None], has_week_cell: bool) -> list[dict[str, object | None]]:
    """Expand one A-H row into one row per person.

    Args:
        row: Mapping of column letter (``"A"``..``"H"``) to cell value.
        has_week_cell: Whether this physical row shows a week label in column
            A; only the first produced row keeps column A.

    Returns:
        A list of row dicts.

        * If any of columns C-H contains a ``\\n【name】`` marker, people are
          the primary name (from column E) followed by every label found in
          column E and then in the other columns.  Marked columns are cut per
          person, unmarked columns are copied.  With fewer than two people a
          copy of the row is returned.
        * Otherwise names come from ``people_from_e_plain``; with zero or one
          name a copy of the row is returned.  Each produced row gets
          ``@name`` in column E, replaced by the excerpt of the original E
          text that starts at the name when the name occurs there.

    The excerpt now also stops at a full-width semicolon.  The previous
    character class listed the ASCII ``;`` twice, so excerpts kept a trailing
    full-width semicolon or ran past it.
    """
    combined = "\n".join(str(row.get(col) or "") for col in "CDEFGH")

    if has_bracket_split(combined):
        segs_e = split_bracket_segments(str(row.get("E") or ""))
        labels_in_e = [label for label, _ in segs_e if label != "__primary__"]
        # Labels explicit in E come first, then any seen only in other columns.
        names: list[str] = []
        for label in labels_in_e + collect_bracket_names(row):
            if label not in names:
                names.append(label)

        base_person = primary_display_name(segs_e)
        order = [base_person] + [n for n in names if n != base_person]
        if len(order) < 2:
            return [dict(row)]

        out_rows: list[dict[str, object | None]] = []
        for i, pname in enumerate(order):
            # The first person owns the text before the first marker.
            seg_key = "__primary__" if i == 0 else pname
            nr: dict[str, object | None] = {
                "A": row.get("A") if (i == 0 and has_week_cell) else None,
                "B": row.get("B"),
            }
            for col in "CDEFGH":
                raw = row.get(col)
                if raw is not None and has_bracket_split(str(raw)):
                    nr[col] = value_for_person_bracket(str(raw), seg_key, segs_e)
                else:
                    nr[col] = raw
            out_rows.append(nr)
        return out_rows

    full_e = str(row.get("E") or "")
    plain = people_from_e_plain(full_e)
    if len(plain) <= 1:
        return [dict(row)]

    out_rows = []
    for i, pname in enumerate(plain):
        nr = {col: row.get(col) for col in "ABCDEFGH"}
        nr["A"] = row.get("A") if (i == 0 and has_week_cell) else None
        nr["E"] = f"@{pname}"
        if pname in full_e:
            # \uff1b is the full-width semicolon.
            chunk = re.search(rf"{re.escape(pname)}[^;\uff1b\n@]*", full_e)
            if chunk:
                nr["E"] = chunk.group(0).strip()
        out_rows.append(nr)
    return out_rows
def write_row(ws, r: int, data: dict[str, object | None]):
    """Write ``data`` (keyed by column letter A-H) into row ``r`` of ``ws``.

    Letters missing from ``data`` are written as ``None``.  Cells are addressed
    directly, without merged-cell resolution.
    """
    letters = "ABCDEFGH"
    for offset in range(len(letters)):
        ws.cell(r, offset + 1).value = data.get(letters[offset])
def norm_week(val) -> str | None:
    """Return the upper-cased week label (``W<digits>``) at the start of ``val``.

    The value is stringified and stripped first.  ``None`` is returned for
    ``None`` input or when the text does not start with ``W``/``w`` followed by
    digits and a word boundary.
    """
    if val is None:
        return None
    hit = re.match(r"^(W\d+)\b", str(val).strip(), re.I)
    if hit is None:
        return None
    return hit.group(1).upper()
def norm_date_range(v) -> str:
    """Normalise a date or date-range cell into a comparison key.

    * ``None`` gives ``""``.
    * Objects with ``strftime`` (dates, datetimes) give ``YYYY/MM/DD``.
    * Anything else is stringified, stripped, has every ``-`` turned into
      ``/``, whitespace around ``/`` removed and remaining whitespace runs
      collapsed to one space.

    The result is meant for equality checks between sheets, not for display.

    Previously a ``re.sub(r"\\s*-\\s*", " - ", ...)`` ran after all hyphens had
    already been replaced, so it never matched and ``"a - b"`` did not compare
    equal to ``"a-b"``.  Spacing around the separator is now really
    canonicalised.  Keys that were equal before are still equal.
    """
    if v is None:
        return ""
    if hasattr(v, "strftime") and not isinstance(v, str):
        return v.strftime("%Y/%m/%d")
    key = str(v).strip().replace("-", "/")
    # Spacing around the separator must not affect equality.
    key = re.sub(r"\s*/\s*", "/", key)
    return re.sub(r"\s+", " ", key)
def gn_tuple_daily(ws, r: int) -> tuple:
    """Return the values of columns G..N (7..14) of row ``r`` as a tuple.

    Values are read through ``get_cell``, so merged cells resolve to their
    anchor value.
    """
    first_col, last_col = 7, 14
    values = [get_cell(ws, r, col) for col in range(first_col, last_col + 1)]
    return tuple(values)
def daily_week_anchor_row(ws_daily) -> int | None:
    """Find the first row in the weekly-progress area with a week label in column G.

    Rows ``DAILY_WEEK_ROW_MIN``..``DAILY_WEEK_ROW_MAX`` (inclusive) are scanned
    top-down; ``None`` is returned when no row qualifies.
    """
    scan = range(DAILY_WEEK_ROW_MIN, DAILY_WEEK_ROW_MAX + 1)
    labelled = (row for row in scan if norm_week(get_cell(ws_daily, row, 7)))
    return next(labelled, None)
def daily_week_span_width(ws_daily, anchor: int) -> int:
    """Count how many consecutive rows, starting at ``anchor``, form one week block.

    A following row extends the block when either

    * its column G carries the same week label and its G..N values equal the
      anchor row's (a duplicated fill), or
    * its column G is blank and its column I date key equals the anchor's
      non-empty date key.

    Scanning stops at the first row that does neither, or at
    ``DAILY_WEEK_ROW_MAX``.  Returns 1 when the anchor row has no week label.
    """
    week = norm_week(get_cell(ws_daily, anchor, 7))
    if not week:
        return 1

    anchor_dates = norm_date_range(get_cell(ws_daily, anchor, 9))
    anchor_sig = gn_tuple_daily(ws_daily, anchor)

    span = 1
    for row in range(anchor + 1, DAILY_WEEK_ROW_MAX + 1):
        g_val = get_cell(ws_daily, row, 7)
        row_dates = norm_date_range(get_cell(ws_daily, row, 9))
        row_sig = gn_tuple_daily(ws_daily, row)

        duplicated = norm_week(g_val) == week and row_sig == anchor_sig
        g_text = "" if g_val is None else str(g_val).strip()
        continuation = (not g_text) and row_dates == anchor_dates and bool(anchor_dates)

        if not (duplicated or continuation):
            break
        span += 1
    return span
def find_time_sheet_rows_by_date_b(time_ws, date_ref) -> list[int]:
    """Return the first contiguous run of rows whose column B matches ``date_ref``.

    Both sides are compared through ``norm_date_range``.  Rows are scanned from
    row 2 to ``time_ws.max_row``; the scan stops at the first non-matching row
    after at least one match.  An empty list is returned when ``date_ref``
    normalises to an empty key or nothing matches.
    """
    wanted = norm_date_range(date_ref)
    if not wanted:
        return []

    matched: list[int] = []
    for row in range(2, time_ws.max_row + 1):
        same = norm_date_range(get_cell(time_ws, row, 2)) == wanted
        if same:
            matched.append(row)
        elif matched:
            # The run has ended; later matches are ignored.
            break
    return matched
def write_time_row_to_daily(ws_daily, dr: int, letters: dict[str, object | None]):
    """Copy one time-sheet row into columns G..N of daily-report row ``dr``.

    Target columns 7..14 receive the time-sheet letters A, H, B, C, D, E, F, G
    in that order (H is placed second, right after the week column).  Writes go
    through ``set_cell``, so merged targets resolve to their anchor cell.
    """
    source_order = "AHBCDEFG"
    for target_col, key in zip(range(7, 15), source_order):
        set_cell(ws_daily, dr, target_col, letters.get(key))
def clear_daily_week_cols(ws_daily, dr: int):
    """Blank columns G..N (7..14) of daily-report row ``dr`` via ``set_cell``."""
    col = 7
    while col <= 14:
        set_cell(ws_daily, dr, col, None)
        col += 1
def sync_daily_20260413_from_time_sheet(wb):
    """Mirror the current week's rows from the time sheet into the daily report.

    Steps:

    1. do nothing if the daily sheet is missing, has no week anchor row, or no
       time-sheet rows share the anchor's date key (column I vs. column B);
    2. write the matching time-sheet rows into G..N starting at the anchor
       row, never past ``DAILY_WEEK_ROW_MAX`` (surplus rows are dropped);
    3. clear G..N of the rows that belonged to the old block but were not
       rewritten.
    """
    if SHEET_DAILY not in wb.sheetnames:
        return
    time_ws, daily_ws = wb[SHEET], wb[SHEET_DAILY]

    first_row = daily_week_anchor_row(daily_ws)
    if first_row is None:
        return

    # The old block height is measured before anything is overwritten.
    old_span = daily_week_span_width(daily_ws, first_row)
    source_rows = find_time_sheet_rows_by_date_b(
        time_ws, get_cell(daily_ws, first_row, 9)
    )
    if not source_rows:
        return

    capacity = DAILY_WEEK_ROW_MAX - first_row + 1
    kept = source_rows[:capacity]
    for offset, src_row in enumerate(kept):
        write_time_row_to_daily(daily_ws, first_row + offset, read_row(time_ws, src_row))

    for stale in range(first_row + len(kept), first_row + old_span):
        if DAILY_WEEK_ROW_MIN <= stale <= DAILY_WEEK_ROW_MAX:
            clear_daily_week_cols(daily_ws, stale)
def main():
    """Split multi-person rows of the time sheet and sync the daily report.

    The workbook at ``SRC`` is backed up next to itself (timestamped name),
    then the time sheet is unmerged, its body rows are expanded to one row per
    person and rewritten below the preserved header row, the daily report is
    synchronised, and the workbook is saved in place.

    Raises:
        SystemExit: if ``SRC`` does not exist.
    """
    if not SRC.exists():
        raise SystemExit(f"missing: {SRC}")

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup = SRC.with_name(f"{SRC.stem}_before_split_person_{stamp}.xlsx")
    shutil.copy2(SRC, backup)

    wb = openpyxl.load_workbook(SRC)
    ws = wb[SHEET]
    # Unmerge first so each physical row shows only its own cell values.
    unmerge_all(ws)

    header = read_row(ws, 1)
    body: list[dict[str, object | None]] = []
    last_row = ws.max_row
    for r in range(2, last_row + 1):
        rowd = read_row(ws, r)
        is_blank = not any(v is not None and str(v).strip() for v in rowd.values())
        if is_blank:
            continue
        week_val = rowd.get("A")
        has_week = bool(week_val) and str(week_val).strip() != ""
        body.extend(expand_row_fixed(rowd, has_week))

    # Drop everything from row 2 down, then rewrite header and body.
    if ws.max_row >= 2:
        ws.delete_rows(2, ws.max_row - 1)
    write_row(ws, 1, header)
    for target_row, rowd in enumerate(body, start=2):
        write_row(ws, target_row, rowd)

    sync_daily_20260413_from_time_sheet(wb)
    wb.save(SRC)
    print("saved", SRC)
    print("backup", backup)


if __name__ == "__main__":
    main()
|