#!/usr/bin/env python3
"""
Demo data generator
===================
Generate 30 days of KPI values for the demo tenant. Baseline values filled in
by the user can be imported from an Excel template; anything left blank is
generated automatically, with parent/child metrics kept numerically related.

Usage:
    # Fully automatic generation
    python3 scripts/gen_demo_data.py > scripts/demo_data.sql

    # With an Excel template (user filled in some baseline values)
    python3 scripts/gen_demo_data.py --template scripts/demo_data_template.xlsx > scripts/demo_data.sql

    # Import
    mysql -h... -u... -p... aidopdev < scripts/demo_data.sql

Dependencies: pip install pymysql openpyxl
"""
import argparse
import os
import random
import sys
from datetime import date, timedelta

DEMO_TENANT_ID = 1300000000888
FACTORY_ID = 1
DAYS = 30
ID_BASE_L1 = 8880000000
ID_BASE_L2 = 8890000000
ID_BASE_L3 = 8900000000

# Connection settings. Each value can be overridden through an environment
# variable; the defaults are the values this script has always used.
# NOTE(review): a real password is committed here as a default - consider
# rotating it and dropping the default once callers set DEMO_DB_PASSWORD.
DB_CONFIG = dict(
    host=os.environ.get("DEMO_DB_HOST", "123.60.180.165"),
    port=int(os.environ.get("DEMO_DB_PORT", "3306")),
    user=os.environ.get("DEMO_DB_USER", "aidopremote"),
    password=os.environ.get("DEMO_DB_PASSWORD", "1234567890aiDOP#"),
    database=os.environ.get("DEMO_DB_NAME", "aidopdev"),
)

# Zero-based template column index for each override field.
TEMPLATE_COLUMNS = (
    ("target", 7),
    ("actual", 8),
    ("yellow_threshold", 9),
    ("red_threshold", 10),
)

# Fixed seed so repeated runs draw the same random sequence.
random.seed(42)


# -- DB ----------------------------------------------------------------
def connect_db():
    """Open a DB connection with pymysql, falling back to mysql.connector.

    Prints an error to stderr and exits with status 1 when neither driver
    can be imported. Only the import is guarded, so an error raised while
    connecting is propagated instead of being mistaken for a missing driver.
    """
    try:
        import pymysql
    except ImportError:
        pymysql = None
    if pymysql is not None:
        return pymysql.connect(**DB_CONFIG)

    try:
        import mysql.connector
    except ImportError:
        print("ERROR: 需要 pymysql 或 mysql-connector-python", file=sys.stderr)
        sys.exit(1)
    return mysql.connector.connect(**DB_CONFIG)


def load_kpi_master(conn):
    """Return the demo tenant's KPI master rows as a list of dicts.

    Each dict has the keys metric_code, module_code, metric_level, direction
    (defaults to "higher_is_better" when empty), unit (defaults to ""),
    parent_id and id. Rows are ordered by SortNo.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            "SELECT MetricCode, ModuleCode, MetricLevel, Direction, Unit, ParentId, Id "
            "FROM ado_smart_ops_kpi_master WHERE TenantId = %s ORDER BY SortNo",
            (DEMO_TENANT_ID,),
        )
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cur.close()
    return [
        dict(
            metric_code=r[0],
            module_code=r[1],
            metric_level=r[2],
            direction=r[3] or "higher_is_better",
            unit=r[4] or "",
            parent_id=r[5],
            id=r[6],
        )
        for r in rows
    ]


# -- Excel template ----------------------------------------------------
def parse_optional_float(value):
    """Convert a template cell to float; None or a blank string means unset.

    Raises ValueError/TypeError (from ``float``) for anything non-numeric.
    """
    if value is None:
        return None
    if isinstance(value, str):
        value = value.strip()
        if not value:
            return None
    return float(value)


def load_template(path):
    """Read the Excel template and return the user-supplied overrides.

    Returns ``{metric_code: {target, actual, yellow_threshold, red_threshold}}``
    containing only the fields that were actually filled in. Rows are read
    from the active sheet starting at row 2; rows whose first cell is empty
    or does not start with "S" are skipped.

    Raises:
        ValueError: a filled-in cell cannot be converted to a number.
    """
    from openpyxl import load_workbook

    wb = load_workbook(path, data_only=True)
    ws = wb.active
    overrides = {}
    for row in ws.iter_rows(min_row=2, values_only=True):
        if not row or not row[0]:
            continue
        code = str(row[0]).strip()
        if not code.startswith("S"):
            continue
        entry = {}
        for field, col in TEMPLATE_COLUMNS:
            # Short rows simply have no value for the trailing columns.
            raw = row[col] if len(row) > col else None
            try:
                value = parse_optional_float(raw)
            except (TypeError, ValueError) as err:
                raise ValueError(
                    f"template metric {code}: column '{field}' is not numeric: {raw!r}"
                ) from err
            if value is not None:
                entry[field] = value
        if entry:
            overrides[code] = entry
    return overrides


# -- Automatic baseline generation --------------------------------------
def auto_target(kpi):
    """Draw a random target value in a range chosen by the metric's unit."""
    # `or ""` also covers a unit that is present but None.
    unit = (kpi.get("unit") or "").strip()
    if unit == "%":
        return round(random.uniform(80, 98), 2)
    elif unit in ("天",):
        return round(random.uniform(2, 15), 2)
    elif "PPM" in unit:
        return round(random.uniform(3000, 8000), 0)
    elif "/" in unit:
        return round(random.uniform(50, 300), 2)
    # NOTE(review): "颗/人" and "家/人" contain "/" and are therefore already
    # taken by the branch above; left as-is to keep generated data unchanged.
    elif unit in ("个", "颗/人", "家/人", "小时"):
        return round(random.uniform(5, 200), 0)
    else:
        return round(random.uniform(50, 200), 2)


def auto_actual(target, direction):
    """Draw an actual value as ``target`` times a direction-dependent factor."""
    if direction == "higher_is_better":
        factor = random.uniform(0.82, 1.05)
    else:
        factor = random.uniform(0.90, 1.25)
    return round(target * factor, 4)


# -- Parent/child consistency --------------------------------------------
def resolve_baselines(masters, overrides):
    """Decide the ``(target, actual)`` baseline of every metric.

    Priority: user-supplied values > derived from the parent / averaged from
    the children > fully automatic.

    Args:
        masters: metric dicts as returned by ``load_kpi_master``.
        overrides: per-metric user values as returned by ``load_template``.

    Returns:
        ``{metric_code: (target, actual)}`` for every metric in ``masters``.
    """
    children_of = {}
    for m in masters:
        pid = m["parent_id"]
        if pid:
            children_of.setdefault(pid, []).append(m)

    baselines = {}

    def has_user_value(code):
        """True when the user supplied a target or an actual for ``code``."""
        ov = overrides.get(code, {})
        return ov.get("target") is not None or ov.get("actual") is not None

    def infer_target_from_actual(actual, direction):
        """Back out a plausible target from a user-supplied actual."""
        if direction == "higher_is_better":
            return round(actual / random.uniform(0.85, 1.02), 4)
        else:
            return round(actual / random.uniform(0.95, 1.20), 4)

    def propagate_to_children(p_target, p_actual, kids):
        """Derive baselines for unresolved, non-overridden children.

        Every child except the last gets the parent value with up to +/-8%
        noise. The last child is set so that the values of the siblings
        resolved so far plus its own sum to ``n * parent``, floored at 70%
        (target) / 50% (actual) of the parent value.
        """
        if not kids:
            return
        n = len(kids)
        for i, k in enumerate(kids):
            kcode = k["metric_code"]
            if kcode in baselines:
                continue
            # Children with user values are resolved by get_or_gen instead.
            if has_user_value(kcode):
                continue
            if i < n - 1:
                noise_t = random.uniform(-0.08, 0.08)
                noise_a = random.uniform(-0.08, 0.08)
                ct = p_target * (1 + noise_t)
                ca = p_actual * (1 + noise_a)
            else:
                # NOTE(review): siblings skipped above and not yet resolved
                # are not part of `already`, while `n` still counts them.
                already = [
                    baselines[sib["metric_code"]]
                    for sib in kids[:i]
                    if sib["metric_code"] in baselines
                ]
                if already:
                    sum_t = sum(b[0] for b in already)
                    sum_a = sum(b[1] for b in already)
                    ct = max(p_target * n - sum_t, p_target * 0.7)
                    ca = max(p_actual * n - sum_a, p_actual * 0.5)
                else:
                    ct = p_target
                    ca = p_actual
            baselines[kcode] = (round(ct, 4), round(ca, 4))
            # Grandchildren are derived from the unrounded child values.
            propagate_to_children(ct, ca, children_of.get(k["id"], []))

    def get_or_gen(m):
        """Return the baseline of ``m``, computing and caching it if needed."""
        code = m["metric_code"]
        if code in baselines:
            return baselines[code]

        ov = overrides.get(code, {})
        user_target = ov.get("target")
        user_actual = ov.get("actual")
        kids = children_of.get(m["id"], [])

        if user_target is not None and user_actual is not None:
            baselines[code] = (user_target, user_actual)
            propagate_to_children(user_target, user_actual, kids)
            return baselines[code]

        if user_target is not None:
            actual = auto_actual(user_target, m["direction"])
            baselines[code] = (user_target, actual)
            propagate_to_children(user_target, actual, kids)
            return baselines[code]

        if user_actual is not None:
            tgt = infer_target_from_actual(user_actual, m["direction"])
            baselines[code] = (tgt, user_actual)
            propagate_to_children(tgt, user_actual, kids)
            return baselines[code]

        if kids:
            # Children are resolved first; the parent becomes their plain
            # average only when at least one direct child has user values.
            kid_baselines = [get_or_gen(k) for k in kids]
            if any(has_user_value(k["metric_code"]) for k in kids):
                avg_t = sum(b[0] for b in kid_baselines) / len(kid_baselines)
                avg_a = sum(b[1] for b in kid_baselines) / len(kid_baselines)
                baselines[code] = (round(avg_t, 4), round(avg_a, 4))
                return baselines[code]

        # NOTE(review): when this is reached with children, they were all
        # resolved just above, so the propagate call below changes nothing
        # and the parent is generated independently of them.
        tgt = auto_target(m)
        actual = auto_actual(tgt, m["direction"])
        baselines[code] = (tgt, actual)
        propagate_to_children(tgt, actual, kids)
        return baselines[code]

    # Level-1 metrics first, then whatever is still unresolved.
    for m in masters:
        if m["metric_level"] == 1:
            get_or_gen(m)
    for m in masters:
        if m["metric_code"] not in baselines:
            get_or_gen(m)

    return baselines


# -- Daily values ---------------------------------------------------------
def gen_daily_values(baseline_actual, direction, days=DAYS):
    """Generate ``days`` daily values scattered around ``baseline_actual``.

    Each value is the baseline scaled by ``1 + gauss(0, 0.03)``, clamped at 0
    and rounded to 4 decimals. ``direction`` is accepted but not used.
    """
    return [
        round(max(0, baseline_actual * (1 + random.gauss(0, 0.03))), 4)
        for _ in range(days)
    ]


def status_color(val, target, direction, yellow_th=None, red_th=None):
    """Classify ``val`` against ``target`` as "green", "yellow" or "red".

    ``yellow_th`` is a percentage of the target (default 95 for
    "higher_is_better", 110 otherwise). A zero target is always "green".
    ``red_th`` is accepted but not used: anything beyond the yellow band
    is "red".
    """
    if target == 0:
        return "green"
    if direction == "higher_is_better":
        y_pct = (yellow_th or 95) / 100.0
        if val >= target:
            return "green"
        elif val >= target * y_pct:
            return "yellow"
        else:
            return "red"
    else:
        y_pct = (yellow_th or 110) / 100.0
        if val <= target:
            return "green"
        elif val <= target * y_pct:
            return "yellow"
        else:
            return "red"


def trend_flag(vals, idx):
    """Return "up", "down" or "flat" for ``vals[idx]`` versus the day before.

    A relative change of more than 2% in either direction counts as a
    trend; the first day is always "flat".
    """
    if idx == 0:
        return "flat"
    prev = vals[idx - 1]
    cur = vals[idx]
    if prev == 0:
        # Relative change is undefined; any positive value counts as "up".
        return "up" if cur > 0 else "flat"
    change = (cur - prev) / prev
    if change > 0.02:
        return "up"
    elif change < -0.02:
        return "down"
    return "flat"


def esc(s):
    """Render ``s`` as a quoted SQL string literal, or NULL for None."""
    if s is None:
        return "NULL"
    return "'" + str(s).replace("\\", "\\\\").replace("'", "\\'") + "'"


def build_value_row(row_id, biz_date, metric, value, target, color, trend):
    """Format one VALUES tuple for a KPI value table.

    ``metric`` supplies "module_code" and "metric_code"; both go through
    ``esc`` so quotes or backslashes in them cannot break the statement.
    """
    return (
        f"({row_id},{DEMO_TENANT_ID},{FACTORY_ID},'{biz_date}',"
        f"{esc(metric['module_code'])},{esc(metric['metric_code'])},"
        f"{value},{target},{esc(color)},{esc(trend)},0,1)"
    )


# -- Main flow ------------------------------------------------------------
def main():
    """Load the KPI master data and print the demo SQL script to stdout."""
    parser = argparse.ArgumentParser(description="Demo 数据智能生成")
    parser.add_argument(
        "--template",
        "-t",
        help="Excel 模板路径(含用户填入的基准值),不提供则全自动生成",
    )
    args = parser.parse_args()

    conn = connect_db()
    try:
        masters = load_kpi_master(conn)
    finally:
        # Close the connection even when the query fails.
        conn.close()

    if not masters:
        print(f"-- ERROR: 租户 {DEMO_TENANT_ID} 无 KpiMaster 数据", file=sys.stderr)
        sys.exit(1)

    overrides = {}
    if args.template:
        overrides = load_template(args.template)
        print(f"-- 从模板读取了 {len(overrides)} 条用户填入的基准值", file=sys.stderr)

    baselines = resolve_baselines(masters, overrides)

    # The generated window ends today and covers DAYS days.
    today = date.today()
    start_date = today - timedelta(days=DAYS - 1)

    l1s = [m for m in masters if m["metric_level"] == 1]
    l2s = [m for m in masters if m["metric_level"] == 2]
    l3s = [m for m in masters if m["metric_level"] == 3]

    daily_vals = {}
    targets = {}
    for m in masters:
        code = m["metric_code"]
        tgt, actual_base = baselines[code]
        targets[code] = tgt
        daily_vals[code] = gen_daily_values(actual_base, m["direction"])

    print("-- ============================================================")
    print(f"-- Demo data for tenant {DEMO_TENANT_ID}, {DAYS} days")
    print(f"-- Metrics: L1={len(l1s)}, L2={len(l2s)}, L3={len(l3s)}")
    if overrides:
        print(f"-- User overrides: {len(overrides)} metrics from template")
    print("-- ============================================================")
    print()

    print(f"DELETE FROM ado_s9_kpi_value_l1_day WHERE tenant_id = {DEMO_TENANT_ID};")
    print(f"DELETE FROM ado_s9_kpi_value_l2_day WHERE tenant_id = {DEMO_TENANT_ID};")
    print(f"DELETE FROM ado_s9_kpi_value_l3_day WHERE tenant_id = {DEMO_TENANT_ID};")
    print()

    def emit_inserts(level_list, table, id_base):
        """Print one multi-row INSERT for every metric in ``level_list``."""
        rows = []
        seq = 0
        for m in level_list:
            code = m["metric_code"]
            tgt = targets[code]
            vals = daily_vals[code]
            ov = overrides.get(code, {})
            y_th = ov.get("yellow_threshold")
            r_th = ov.get("red_threshold")
            for i, v in enumerate(vals[:DAYS]):
                seq += 1
                rows.append(
                    build_value_row(
                        id_base + seq,
                        start_date + timedelta(days=i),
                        m,
                        v,
                        tgt,
                        status_color(v, tgt, m["direction"], y_th, r_th),
                        trend_flag(vals, i),
                    )
                )
        if not rows:
            # Nothing to insert; avoid emitting an INSERT without VALUES.
            return
        print(
            f"INSERT INTO `{table}` "
            f"(id, tenant_id, factory_id, biz_date, module_code, metric_code, "
            f"metric_value, target_value, status_color, trend_flag, is_deleted, is_active) VALUES"
        )
        last = len(rows) - 1
        for idx, r in enumerate(rows):
            sep = "," if idx < last else ";"
            print(f" {r}{sep}")
        print()

    emit_inserts(l1s, "ado_s9_kpi_value_l1_day", ID_BASE_L1)
    emit_inserts(l2s, "ado_s9_kpi_value_l2_day", ID_BASE_L2)
    emit_inserts(l3s, "ado_s9_kpi_value_l3_day", ID_BASE_L3)

    print("-- Done.")


if __name__ == "__main__":
    main()