| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- #!/usr/bin/env python3
- """
- Demo 数据智能生成脚本
- ====================
- 为 Demo 租户生成 30 天 KPI 值数据。支持从 Excel 模板导入用户填入的基准值,
- 未填入的自动生成,并保证父子层级数值逻辑自洽。
- 用法:
- # 全自动生成
- python3 scripts/gen_demo_data.py > scripts/demo_data.sql
- # 使用 Excel 模板(用户填入了部分基准值)
- python3 scripts/gen_demo_data.py --template scripts/demo_data_template.xlsx > scripts/demo_data.sql
- # 导入
- mysql -h... -u... -p... aidopdev < scripts/demo_data.sql
- 依赖: pip install pymysql openpyxl
- """
import argparse
import os
import random
import sys
from datetime import date, timedelta
# Tenant and factory that every generated row is written for.
DEMO_TENANT_ID = 1300000000888
FACTORY_ID = 1
# Number of consecutive days generated per metric.
DAYS = 30
# Starting offsets for the generated row ids, one per KPI level table.
ID_BASE_L1 = 8880000000
ID_BASE_L2 = 8890000000
ID_BASE_L3 = 8900000000


def db_config_from_env(environ=None):
    """Return the DB connection keyword arguments.

    Each setting can be overridden through an environment variable
    (DEMO_DB_HOST, DEMO_DB_PORT, DEMO_DB_USER, DEMO_DB_PASSWORD,
    DEMO_DB_NAME); when a variable is absent the historical built-in
    value is used, so existing invocations keep working unchanged.

    Args:
        environ: mapping to read the overrides from; defaults to
            ``os.environ``.

    Returns:
        dict with the keys ``host``, ``port`` (int), ``user``,
        ``password`` and ``database``.

    Raises:
        ValueError: if DEMO_DB_PORT is set to something ``int()`` rejects.
    """
    env = os.environ if environ is None else environ
    # NOTE(review): the fallback credentials are still committed in source;
    # they should be rotated and removed once callers set the env variables.
    return dict(
        host=env.get("DEMO_DB_HOST", "123.60.180.165"),
        port=int(env.get("DEMO_DB_PORT", 3306)),
        user=env.get("DEMO_DB_USER", "aidopremote"),
        password=env.get("DEMO_DB_PASSWORD", "1234567890aiDOP#"),
        database=env.get("DEMO_DB_NAME", "aidopdev"),
    )


DB_CONFIG = db_config_from_env()

# Fixed seed so the pseudo-random sequence is reproducible between runs.
random.seed(42)
- # ── DB ──────────────────────────────────────────────
def connect_db():
    """Open a MySQL connection using whichever client library is importable.

    pymysql is attempted first, then mysql.connector. An ImportError raised
    by an attempt moves on to the next one. When no attempt succeeds an
    error is printed to stderr and the process exits with status 1.

    Returns:
        The connection object returned by the driver's ``connect``.
    """

    def _via_pymysql():
        import pymysql

        return pymysql.connect(**DB_CONFIG)

    def _via_mysql_connector():
        import mysql.connector

        return mysql.connector.connect(**DB_CONFIG)

    for opener in (_via_pymysql, _via_mysql_connector):
        try:
            return opener()
        except ImportError:
            continue

    print("ERROR: 需要 pymysql 或 mysql-connector-python", file=sys.stderr)
    sys.exit(1)
def load_kpi_master(conn):
    """Load the demo tenant's KPI master rows, ordered by SortNo.

    Args:
        conn: an open DB-API connection (must provide ``cursor()``).

    Returns:
        A list of dicts with the keys ``metric_code``, ``module_code``,
        ``metric_level``, ``direction``, ``unit``, ``parent_id`` and ``id``.
        A falsy Direction becomes ``"higher_is_better"`` and a falsy Unit
        becomes ``""``.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            "SELECT MetricCode, ModuleCode, MetricLevel, Direction, Unit, ParentId, Id "
            "FROM ado_smart_ops_kpi_master WHERE TenantId = %s ORDER BY SortNo",
            (DEMO_TENANT_ID,),
        )
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cur.close()

    return [
        dict(
            metric_code=r[0],
            module_code=r[1],
            metric_level=r[2],
            direction=r[3] or "higher_is_better",
            unit=r[4] or "",
            parent_id=r[5],
            id=r[6],
        )
        for r in rows
    ]
- # ── Excel 模板读取 ──────────────────────────────────
# (override key, zero-based column index) for the template's value columns.
_TEMPLATE_COLUMNS = (
    ("target", 7),
    ("actual", 8),
    ("yellow_threshold", 9),
    ("red_threshold", 10),
)


def _cell_to_float(value, code, field):
    """Convert one template cell to float; None or a blank string means 'not filled in'."""
    if value is None:
        return None
    if isinstance(value, str):
        value = value.strip()
        if not value:
            return None
    try:
        return float(value)
    except ValueError as err:
        raise ValueError(
            f"template row {code!r}: {field} value {value!r} is not numeric"
        ) from err


def _parse_template_rows(rows):
    """Build the overrides mapping from an iterable of row value tuples."""
    overrides = {}
    for row in rows:
        if not row or not row[0]:
            continue
        code = str(row[0]).strip()
        # Only rows whose first cell starts with "S" are treated as metrics.
        if not code.startswith("S"):
            continue
        entry = {}
        for field, col in _TEMPLATE_COLUMNS:
            cell = row[col] if len(row) > col else None
            number = _cell_to_float(cell, code, field)
            if number is not None:
                entry[field] = number
        if entry:
            overrides[code] = entry
    return overrides


def load_template(path):
    """Read the Excel template and return the user-supplied overrides.

    The active sheet is read from row 2 onwards. The first cell of a row is
    the metric code; columns 8-11 (indices 7-10) hold target, actual,
    yellow threshold and red threshold. Empty cells are skipped, and a row
    with no filled value produces no entry.

    Args:
        path: path of the .xlsx template.

    Returns:
        ``{metric_code: {target, actual, yellow_threshold, red_threshold}}``
        where each inner key is present only when the cell was filled in.

    Raises:
        ValueError: if a filled cell holds text that is not a number.
    """
    from openpyxl import load_workbook

    wb = load_workbook(path, data_only=True)
    try:
        ws = wb.active
        return _parse_template_rows(ws.iter_rows(min_row=2, values_only=True))
    finally:
        # Release the workbook even when a row fails to parse.
        wb.close()
- # ── 自动生成基准值 ──────────────────────────────────
def auto_target(kpi):
    """Pick a random target value whose range depends on the metric's unit.

    Args:
        kpi: metric dict; only its ``unit`` entry is read (missing -> "").

    Returns:
        A rounded float drawn from the range configured for the unit.
    """
    unit = kpi.get("unit", "").strip()
    if unit == "%":
        return round(random.uniform(80, 98), 2)
    if unit == "天":
        return round(random.uniform(2, 15), 2)
    if "PPM" in unit:
        return round(random.uniform(3000, 8000), 0)
    # The explicit count units must be tested before the generic "/" rule:
    # "颗/人" and "家/人" contain a slash and would otherwise never get here.
    if unit in ("个", "颗/人", "家/人", "小时"):
        return round(random.uniform(5, 200), 0)
    if "/" in unit:
        return round(random.uniform(50, 300), 2)
    return round(random.uniform(50, 200), 2)
def auto_actual(target, direction):
    """Derive a random actual value from a target.

    The target is multiplied by a factor drawn from 0.82-1.05 when
    ``direction`` is "higher_is_better", otherwise from 0.90-1.25, and the
    product is rounded to 4 decimals.
    """
    if direction == "higher_is_better":
        low, high = 0.82, 1.05
    else:
        low, high = 0.90, 1.25
    return round(target * random.uniform(low, high), 4)
- # ── 父子自洽逻辑 ──────────────────────────────────
def resolve_baselines(masters, overrides):
    """Decide the (target, actual) baseline of every metric.

    Priority per metric:
      1. values the user filled in (``overrides``); a missing half of the
         pair is derived randomly from the given half;
      2. if any metric below it carries a user value, the plain average of
         its direct children's baselines;
      3. otherwise a fully automatic value.
    In cases 1 and 3 the value is then pushed down to the children that have
    no user value of their own: each gets the parent's value with up to
    +/-8 % noise, and the child in last position is balanced so that the
    average of the derived siblings equals the parent (subject to a floor
    of 70 % of the parent target and 50 % of the parent actual).

    Args:
        masters: metric dicts with ``metric_code``, ``metric_level``,
            ``direction``, ``parent_id`` and ``id``.
        overrides: ``{metric_code: {"target": ..., "actual": ...}}``.

    Returns:
        ``{metric_code: (target, actual)}`` for every entry of ``masters``.
    """
    # Parent id -> direct children, in the order they appear in `masters`.
    children_of = {}
    for m in masters:
        pid = m["parent_id"]
        if pid:
            children_of.setdefault(pid, []).append(m)

    baselines = {}

    def _has_user_value(m):
        ov = overrides.get(m["metric_code"], {})
        return ov.get("target") is not None or ov.get("actual") is not None

    def _subtree_has_user_value(m):
        return _has_user_value(m) or any(
            _subtree_has_user_value(k) for k in children_of.get(m["id"], [])
        )

    def _infer_target_from_actual(actual, direction):
        if direction == "higher_is_better":
            return round(actual / random.uniform(0.85, 1.02), 4)
        return round(actual / random.uniform(0.95, 1.20), 4)

    def _propagate_to_children(p_target, p_actual, kids):
        n = len(kids)
        for i, k in enumerate(kids):
            kcode = k["metric_code"]
            # Never overwrite a resolved child or one the user specified.
            if kcode in baselines or _has_user_value(k):
                continue
            if i < n - 1:
                noise_t = random.uniform(-0.08, 0.08)
                noise_a = random.uniform(-0.08, 0.08)
                ct = p_target * (1 + noise_t)
                ca = p_actual * (1 + noise_a)
            else:
                already = [
                    baselines[s["metric_code"]]
                    for s in kids[:i]
                    if s["metric_code"] in baselines
                ]
                if already:
                    # Balance against the siblings that actually have a
                    # value; counting skipped siblings would inflate the
                    # last child far above the parent.
                    share = len(already) + 1
                    ct = p_target * share - sum(b[0] for b in already)
                    ca = p_actual * share - sum(b[1] for b in already)
                    ct = max(ct, p_target * 0.7)
                    ca = max(ca, p_actual * 0.5)
                else:
                    ct, ca = p_target, p_actual
            baselines[kcode] = (round(ct, 4), round(ca, 4))
            _propagate_to_children(ct, ca, children_of.get(k["id"], []))

    def get_or_gen(m):
        code = m["metric_code"]
        if code in baselines:
            return baselines[code]

        ov = overrides.get(code, {})
        user_target = ov.get("target")
        user_actual = ov.get("actual")
        kids = children_of.get(m["id"], [])

        if user_target is not None and user_actual is not None:
            tgt, actual = user_target, user_actual
        elif user_target is not None:
            tgt = user_target
            actual = auto_actual(user_target, m["direction"])
        elif user_actual is not None:
            tgt = _infer_target_from_actual(user_actual, m["direction"])
            actual = user_actual
        elif kids and any(_subtree_has_user_value(k) for k in kids):
            # Aggregate upwards. The children are resolved only in this
            # branch; resolving them unconditionally would leave nothing
            # for the top-down propagation to fill in.
            kid_baselines = [get_or_gen(k) for k in kids]
            avg_t = sum(b[0] for b in kid_baselines) / len(kid_baselines)
            avg_a = sum(b[1] for b in kid_baselines) / len(kid_baselines)
            baselines[code] = (round(avg_t, 4), round(avg_a, 4))
            return baselines[code]
        else:
            tgt = auto_target(m)
            actual = auto_actual(tgt, m["direction"])

        baselines[code] = (tgt, actual)
        _propagate_to_children(tgt, actual, kids)
        return baselines[code]

    # Resolve level-1 metrics first so their values flow downwards, then
    # pick up whatever is still unresolved (e.g. user-specified children).
    for m in masters:
        if m["metric_level"] == 1:
            get_or_gen(m)
    for m in masters:
        if m["metric_code"] not in baselines:
            get_or_gen(m)
    return baselines
- # ── 日值生成 ──────────────────────────────────────
def gen_daily_values(baseline_actual, direction, days=DAYS):
    """Generate ``days`` daily values scattered around ``baseline_actual``.

    Each value is the baseline scaled by (1 + Gaussian noise with sigma
    0.03), floored at 0 and rounded to 4 decimals. ``direction`` is accepted
    but not used by the computation.
    """
    return [
        round(max(0, baseline_actual * (1 + random.gauss(0, 0.03))), 4)
        for _ in range(days)
    ]
def status_color(val, target, direction, yellow_th=None, red_th=None):
    """Classify a value against its target as "green", "yellow" or "red".

    A zero target is always green. For "higher_is_better" the value is green
    at or above the target and yellow at or above ``yellow_th`` percent of
    it (default 95); for any other direction it is green at or below the
    target and yellow at or below ``yellow_th`` percent of it (default
    110). Everything else is red.

    NOTE(review): ``red_th`` is accepted but never consulted when choosing
    the colour - confirm whether a separate red boundary was intended.
    """
    if target == 0:
        return "green"

    higher = direction == "higher_is_better"
    yellow_ratio = (yellow_th or (95 if higher else 110)) / 100.0

    if higher:
        if val >= target:
            return "green"
        return "yellow" if val >= target * yellow_ratio else "red"

    if val <= target:
        return "green"
    return "yellow" if val <= target * yellow_ratio else "red"
def trend_flag(vals, idx):
    """Compare ``vals[idx]`` with the previous entry and label the move.

    Returns "flat" for the first entry. When the previous value is 0 the
    result is "up" if the current value is positive, else "flat". Otherwise
    a relative change above +2 % is "up", below -2 % is "down", and
    anything in between is "flat".
    """
    if idx == 0:
        return "flat"

    before, now = vals[idx - 1], vals[idx]
    if before == 0:
        return "up" if now > 0 else "flat"

    delta = (now - before) / before
    if delta > 0.02:
        return "up"
    if delta < -0.02:
        return "down"
    return "flat"
def esc(s):
    """Render a value as a single-quoted SQL string literal.

    ``None`` becomes the bare keyword NULL. Anything else is converted with
    ``str()``; backslashes and single quotes are each prefixed with a
    backslash before the result is wrapped in single quotes.
    """
    if s is None:
        return "NULL"
    # One pass over the text; each character is mapped independently, so the
    # backslash added in front of a quote is not itself doubled.
    escaped = str(s).translate(str.maketrans({"\\": "\\\\", "'": "\\'"}))
    return "'" + escaped + "'"
- # ── 主流程 ────────────────────────────────────────
def main():
    """Generate the demo SQL script and print it to stdout.

    Steps: parse the command line, load the KPI master rows from the
    database, optionally read user overrides from the Excel template,
    resolve a baseline per metric, generate DAYS daily values ending today,
    then print DELETE statements followed by one INSERT per level table.
    Diagnostics go to stderr; the process exits with status 1 when the
    tenant has no KPI master rows.
    """
    parser = argparse.ArgumentParser(description="Demo 数据智能生成")
    parser.add_argument(
        "--template", "-t",
        help="Excel 模板路径(含用户填入的基准值),不提供则全自动生成",
    )
    args = parser.parse_args()

    conn = connect_db()
    try:
        masters = load_kpi_master(conn)
    finally:
        # Close the connection even when the master query fails.
        conn.close()

    if not masters:
        print(f"-- ERROR: 租户 {DEMO_TENANT_ID} 无 KpiMaster 数据", file=sys.stderr)
        sys.exit(1)

    overrides = {}
    if args.template:
        overrides = load_template(args.template)
        print(f"-- 从模板读取了 {len(overrides)} 条用户填入的基准值", file=sys.stderr)

    baselines = resolve_baselines(masters, overrides)

    today = date.today()
    start_date = today - timedelta(days=DAYS - 1)

    l1s = [m for m in masters if m["metric_level"] == 1]
    l2s = [m for m in masters if m["metric_level"] == 2]
    l3s = [m for m in masters if m["metric_level"] == 3]

    daily_vals = {}
    targets = {}
    for m in masters:
        code = m["metric_code"]
        tgt, actual_base = baselines[code]
        targets[code] = tgt
        daily_vals[code] = gen_daily_values(actual_base, m["direction"])

    print("-- ============================================================")
    print(f"-- Demo data for tenant {DEMO_TENANT_ID}, {DAYS} days")
    print(f"-- Metrics: L1={len(l1s)}, L2={len(l2s)}, L3={len(l3s)}")
    if overrides:
        print(f"-- User overrides: {len(overrides)} metrics from template")
    print("-- ============================================================")
    print()

    print(f"DELETE FROM ado_s9_kpi_value_l1_day WHERE tenant_id = {DEMO_TENANT_ID};")
    print(f"DELETE FROM ado_s9_kpi_value_l2_day WHERE tenant_id = {DEMO_TENANT_ID};")
    print(f"DELETE FROM ado_s9_kpi_value_l3_day WHERE tenant_id = {DEMO_TENANT_ID};")
    print()

    def emit_inserts(level_list, table, id_base):
        """Print one multi-row INSERT for the metrics of a single level."""
        if not level_list:
            return
        print(
            f"INSERT INTO `{table}` "
            f"(id, tenant_id, factory_id, biz_date, module_code, metric_code, "
            f"metric_value, target_value, status_color, trend_flag, is_deleted, is_active) VALUES"
        )
        rows = []
        seq = 0
        for m in level_list:
            code = m["metric_code"]
            tgt = targets[code]
            vals = daily_vals[code]
            ov = overrides.get(code, {})
            y_th = ov.get("yellow_threshold")
            r_th = ov.get("red_threshold")
            # Codes come from the database, so quote them through esc()
            # like the other text columns instead of raw interpolation.
            module_sql = esc(m["module_code"])
            code_sql = esc(code)
            for i in range(DAYS):
                seq += 1
                d = start_date + timedelta(days=i)
                v = vals[i]
                sc = status_color(v, tgt, m["direction"], y_th, r_th)
                tf = trend_flag(vals, i)
                row_id = id_base + seq
                rows.append(
                    f"({row_id},{DEMO_TENANT_ID},{FACTORY_ID},'{d}',"
                    f"{module_sql},{code_sql},{v},{tgt},{esc(sc)},{esc(tf)},0,1)"
                )
        for idx, r in enumerate(rows):
            sep = "," if idx < len(rows) - 1 else ";"
            print(f" {r}{sep}")
        print()

    emit_inserts(l1s, "ado_s9_kpi_value_l1_day", ID_BASE_L1)
    emit_inserts(l2s, "ado_s9_kpi_value_l2_day", ID_BASE_L2)
    emit_inserts(l3s, "ado_s9_kpi_value_l3_day", ID_BASE_L3)

    print("-- Done.")


if __name__ == "__main__":
    main()
|