#!/usr/bin/env python3 """指标方案测试脚本""" import requests import time import sys from typing import Literal MetricsBackend = Literal["pushgateway", "redis", "memory"] def test_pushgateway(): """测试 Pushgateway 方案""" print("\n=== 测试 Pushgateway 方案 ===\n") # 1. 检查 Pushgateway 是否运行 try: response = requests.get("http://localhost:9091/metrics", timeout=2) print(f"✓ Pushgateway 运行正常 (状态码: {response.status_code})") except Exception as e: print(f"✗ Pushgateway 未运行: {e}") return False # 2. 发送测试请求到应用 print("\n发送测试请求...") for i in range(5): try: response = requests.post( "http://localhost:8000/invoke", json={"number": 17}, timeout=5, ) print(f" 请求 {i+1}: {response.status_code}") time.sleep(0.5) except Exception as e: print(f" 请求 {i+1} 失败: {e}") # 3. 等待指标推送 print("\n等待指标推送...") time.sleep(2) # 4. 检查 Pushgateway 中的指标 try: response = requests.get("http://localhost:9091/metrics", timeout=2) metrics = response.text # 查找关键指标 if "http_requests_total" in metrics: print("✓ 找到 http_requests_total 指标") # 提取指标值 for line in metrics.split("\n"): if "http_requests_total" in line and not line.startswith("#"): print(f" {line}") else: print("✗ 未找到 http_requests_total 指标") if "algorithm_executions_total" in metrics: print("✓ 找到 algorithm_executions_total 指标") for line in metrics.split("\n"): if "algorithm_executions_total" in line and not line.startswith("#"): print(f" {line}") else: print("✗ 未找到 algorithm_executions_total 指标") except Exception as e: print(f"✗ 获取指标失败: {e}") return False # 5. 检查 Prometheus 是否能抓取 print("\n检查 Prometheus...") try: response = requests.get( "http://localhost:9090/api/v1/query", params={"query": "http_requests_total"}, timeout=5, ) data = response.json() if data["status"] == "success" and data["data"]["result"]: print(f"✓ Prometheus 成功抓取指标,找到 {len(data['data']['result'])} 条记录") for result in data["data"]["result"][:3]: print(f" {result['metric']} = {result['value'][1]}") else: print("✗ Prometheus 未找到指标") except Exception as e: print(f"✗ Prometheus 查询失败: {e}") return True def test_redis(): """测试 Redis 方案""" print("\n=== 测试 Redis 方案 ===\n") # 1. 检查 Redis 是否运行 try: import redis client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True) client.ping() print("✓ Redis 运行正常") except ImportError: print("✗ Redis 库未安装,请运行: pip install redis") return False except Exception as e: print(f"✗ Redis 未运行: {e}") return False # 2. 清空测试数据 print("\n清空旧数据...") try: keys = client.keys("metrics:*") if keys: client.delete(*keys) print(f" 删除了 {len(keys)} 个键") except Exception as e: print(f" 清空失败: {e}") # 3. 发送测试请求 print("\n发送测试请求...") for i in range(5): try: response = requests.post( "http://localhost:8000/invoke", json={"number": 17}, timeout=5, ) print(f" 请求 {i+1}: {response.status_code}") time.sleep(0.5) except Exception as e: print(f" 请求 {i+1} 失败: {e}") # 4. 检查 Redis 中的指标 print("\n检查 Redis 指标...") try: # 检查计数器 counter_data = client.hgetall("metrics:request_counter") if counter_data: print(f"✓ 找到 {len(counter_data)} 个请求计数器指标") for key, value in list(counter_data.items())[:5]: if not key.endswith(":timestamp"): print(f" {key} = {value}") else: print("✗ 未找到请求计数器指标") # 检查算法计数器 algo_data = client.hgetall("metrics:algorithm_counter") if algo_data: print(f"✓ 找到 {len(algo_data)} 个算法计数器指标") for key, value in list(algo_data.items())[:5]: if not key.endswith(":timestamp"): print(f" {key} = {value}") else: print("✗ 未找到算法计数器指标") except Exception as e: print(f"✗ 检查 Redis 失败: {e}") return False # 5. 检查 Redis Exporter print("\n检查 Redis Exporter...") try: response = requests.get("http://localhost:8001/metrics", timeout=2) metrics = response.text if "http_requests_total" in metrics: print("✓ Exporter 成功导出 http_requests_total") for line in metrics.split("\n"): if "http_requests_total" in line and not line.startswith("#"): print(f" {line}") break else: print("✗ Exporter 未导出 http_requests_total") except Exception as e: print(f"✗ Redis Exporter 未运行: {e}") return True def test_memory(): """测试原有的内存方案""" print("\n=== 测试内存方案(原有方案)===\n") # 发送测试请求 print("发送测试请求...") for i in range(5): try: response = requests.post( "http://localhost:8000/invoke", json={"number": 17}, timeout=5, ) print(f" 请求 {i+1}: {response.status_code}") time.sleep(0.5) except Exception as e: print(f" 请求 {i+1} 失败: {e}") # 检查应用的 /metrics 端点 print("\n检查应用 /metrics 端点...") try: response = requests.get("http://localhost:8000/metrics", timeout=2) metrics = response.text if "http_requests_total" in metrics: print("✓ 找到 http_requests_total 指标") for line in metrics.split("\n"): if "http_requests_total" in line and not line.startswith("#"): print(f" {line}") break else: print("✗ 未找到指标") except Exception as e: print(f"✗ 获取指标失败: {e}") return False print("\n⚠️ 注意:内存方案在多实例部署时,每个实例的指标是独立的") return True def main(): """主函数""" print("=" * 60) print("FunctionalScaffold 指标方案测试") print("=" * 60) if len(sys.argv) > 1: backend = sys.argv[1] else: print("\n请选择要测试的方案:") print("1. Pushgateway(推荐)") print("2. Redis + Exporter") print("3. Memory(原有方案)") choice = input("\n输入选项 (1/2/3): ").strip() backend_map = {"1": "pushgateway", "2": "redis", "3": "memory"} backend = backend_map.get(choice, "pushgateway") print(f"\n选择的方案: {backend}") # 运行测试 if backend == "pushgateway": success = test_pushgateway() elif backend == "redis": success = test_redis() elif backend == "memory": success = test_memory() else: print(f"未知的方案: {backend}") sys.exit(1) # 输出结果 print("\n" + "=" * 60) if success: print("✓ 测试通过") else: print("✗ 测试失败") print("=" * 60) if __name__ == "__main__": main()