_probe_tenant_isolation.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # -*- coding: utf-8 -*-
  2. """对比两个租户在 AiDOP 核心表中的数据差异,验证租户隔离是否落地。"""
from __future__ import annotations

import os
import sys

import pymysql
  6. sys.stdout.reconfigure(encoding='utf-8')
  7. CONN = dict(
  8. host='123.60.180.165', port=3306,
  9. user='aidopremote', password='1234567890aiDOP#',
  10. database='aidopdev', charset='utf8mb4', autocommit=True,
  11. )
  12. # Admin.NET 框架默认租户 ID(SqlSugarConst.DefaultTenantId)
  13. DEFAULT_TENANT = 1300000000001
  14. DEMO_TENANT = 1300000000888
  15. TENANTS = [
  16. ("superAdmin 默认租户", DEFAULT_TENANT),
  17. ("DemoAdmin Demo 租户", DEMO_TENANT),
  18. ]
  19. def table_exists(cur, table: str) -> bool:
  20. cur.execute(
  21. "SELECT COUNT(*) AS c FROM information_schema.tables "
  22. "WHERE table_schema=DATABASE() AND table_name=%s",
  23. (table,),
  24. )
  25. row = cur.fetchone()
  26. return bool(row and row.get('c', 0) > 0)
  27. def count_by_tenant(cur, table: str, tenant_col: str, tenant_id: int, extra_where: str = "") -> int:
  28. sql = f"SELECT COUNT(*) AS c FROM `{table}` WHERE `{tenant_col}` = %s"
  29. if extra_where:
  30. sql += f" AND {extra_where}"
  31. cur.execute(sql, (tenant_id,))
  32. row = cur.fetchone()
  33. return int(row['c']) if row else 0
  34. def sample_s4_l1(cur, tenant_id: int, limit: int = 3):
  35. # 最新 biz_date 的 S4 L1 值
  36. cur.execute(
  37. """
  38. SELECT biz_date, metric_code, metric_value, target_value, status_color
  39. FROM ado_s9_kpi_value_l1_day
  40. WHERE tenant_id = %s AND module_code = 'S4' AND is_deleted = 0
  41. AND biz_date = (
  42. SELECT MAX(biz_date) FROM ado_s9_kpi_value_l1_day
  43. WHERE tenant_id = %s AND module_code = 'S4' AND is_deleted = 0
  44. )
  45. ORDER BY metric_code
  46. LIMIT %s
  47. """,
  48. (tenant_id, tenant_id, limit),
  49. )
  50. return cur.fetchall()
  51. def list_kpi_master_s4(cur, tenant_id: int, limit: int = 3):
  52. cur.execute(
  53. """
  54. SELECT MetricCode, MetricName, MetricLevel, YellowThreshold, RedThreshold
  55. FROM ado_smart_ops_kpi_master
  56. WHERE TenantId = %s AND ModuleCode = 'S4' AND MetricLevel = 1
  57. ORDER BY SortNo
  58. LIMIT %s
  59. """,
  60. (tenant_id, limit),
  61. )
  62. return cur.fetchall()
  63. def main() -> None:
  64. conn = pymysql.connect(**CONN)
  65. with conn.cursor(pymysql.cursors.DictCursor) as cur:
  66. tables = [
  67. ("ado_smart_ops_kpi_master", "TenantId", ""),
  68. ("ado_smart_ops_layout_item", "TenantId", ""),
  69. ("ado_smart_ops_home_module", "TenantId", ""),
  70. ("ado_s9_kpi_value_l1_day", "tenant_id", "is_deleted=0"),
  71. ("ado_s9_kpi_value_l2_day", "tenant_id", "is_deleted=0"),
  72. ("ado_s9_kpi_value_l3_day", "tenant_id", "is_deleted=0"),
  73. ]
  74. print("=" * 88)
  75. print(f"{'表':<32} {'过滤':<18} " + " ".join(f"{n:<24}" for n, _ in TENANTS))
  76. print("-" * 88)
  77. for t, tcol, extra in tables:
  78. if not table_exists(cur, t):
  79. print(f"{t:<32} [表不存在]")
  80. continue
  81. counts = [count_by_tenant(cur, t, tcol, tid, extra) for _, tid in TENANTS]
  82. extra_lbl = extra or "-"
  83. print(f"{t:<32} {extra_lbl:<18} " + " ".join(f"{c:<24}" for c in counts))
  84. # S4 KpiMaster 指标名称对比(第一个 L1 指标)
  85. print("\n" + "=" * 88)
  86. print("S4 L1 KpiMaster(前 3 条):")
  87. for name, tid in TENANTS:
  88. print(f"\n [{name}] TenantId={tid}")
  89. rows = list_kpi_master_s4(cur, tid, 3)
  90. if not rows:
  91. print(" (无数据)")
  92. for r in rows:
  93. print(f" {r['MetricCode']:<20} | {r['MetricName']:<24} | 黄={r['YellowThreshold']} 红={r['RedThreshold']}")
  94. # S4 L1 最新日值对比
  95. print("\n" + "=" * 88)
  96. print("S4 L1 日表最新 biz_date 的前 3 条(目的:两个租户数值应不相同):")
  97. for name, tid in TENANTS:
  98. print(f"\n [{name}] TenantId={tid}")
  99. rows = sample_s4_l1(cur, tid, 3)
  100. if not rows:
  101. print(" (无数据)")
  102. for r in rows:
  103. print(f" {str(r['biz_date']):<12} {r['metric_code']:<20} 实际={r['metric_value']} 目标={r['target_value']} 颜色={r['status_color']}")
  104. conn.close()
  105. if __name__ == "__main__":
  106. main()