Files
FunctionalScaffold/scripts/test_metrics.py
Roog (顾新培) 5921f71756 main:添加核心文件并初始化项目
新增内容:
- 创建基础项目结构。
- 添加 `.gitignore` 和 `.dockerignore` 文件。
- 编写 `pyproject.toml` 和依赖文件。
- 添加算法模块及示例算法。
- 实现核心功能模块(日志、错误处理、指标)。
- 添加开发和运行所需的相关脚本文件及文档。
2026-02-03 18:38:08 +08:00

263 lines
8.0 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""指标方案测试脚本"""
import requests
import time
import sys
from typing import Literal
MetricsBackend = Literal["pushgateway", "redis", "memory"]
def test_pushgateway():
"""测试 Pushgateway 方案"""
print("\n=== 测试 Pushgateway 方案 ===\n")
# 1. 检查 Pushgateway 是否运行
try:
response = requests.get("http://localhost:9091/metrics", timeout=2)
print(f"✓ Pushgateway 运行正常 (状态码: {response.status_code})")
except Exception as e:
print(f"✗ Pushgateway 未运行: {e}")
return False
# 2. 发送测试请求到应用
print("\n发送测试请求...")
for i in range(5):
try:
response = requests.post(
"http://localhost:8000/invoke",
json={"number": 17},
timeout=5,
)
print(f" 请求 {i+1}: {response.status_code}")
time.sleep(0.5)
except Exception as e:
print(f" 请求 {i+1} 失败: {e}")
# 3. 等待指标推送
print("\n等待指标推送...")
time.sleep(2)
# 4. 检查 Pushgateway 中的指标
try:
response = requests.get("http://localhost:9091/metrics", timeout=2)
metrics = response.text
# 查找关键指标
if "http_requests_total" in metrics:
print("✓ 找到 http_requests_total 指标")
# 提取指标值
for line in metrics.split("\n"):
if "http_requests_total" in line and not line.startswith("#"):
print(f" {line}")
else:
print("✗ 未找到 http_requests_total 指标")
if "algorithm_executions_total" in metrics:
print("✓ 找到 algorithm_executions_total 指标")
for line in metrics.split("\n"):
if "algorithm_executions_total" in line and not line.startswith("#"):
print(f" {line}")
else:
print("✗ 未找到 algorithm_executions_total 指标")
except Exception as e:
print(f"✗ 获取指标失败: {e}")
return False
# 5. 检查 Prometheus 是否能抓取
print("\n检查 Prometheus...")
try:
response = requests.get(
"http://localhost:9090/api/v1/query",
params={"query": "http_requests_total"},
timeout=5,
)
data = response.json()
if data["status"] == "success" and data["data"]["result"]:
print(f"✓ Prometheus 成功抓取指标,找到 {len(data['data']['result'])} 条记录")
for result in data["data"]["result"][:3]:
print(f" {result['metric']} = {result['value'][1]}")
else:
print("✗ Prometheus 未找到指标")
except Exception as e:
print(f"✗ Prometheus 查询失败: {e}")
return True
def test_redis():
"""测试 Redis 方案"""
print("\n=== 测试 Redis 方案 ===\n")
# 1. 检查 Redis 是否运行
try:
import redis
client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
client.ping()
print("✓ Redis 运行正常")
except ImportError:
print("✗ Redis 库未安装,请运行: pip install redis")
return False
except Exception as e:
print(f"✗ Redis 未运行: {e}")
return False
# 2. 清空测试数据
print("\n清空旧数据...")
try:
keys = client.keys("metrics:*")
if keys:
client.delete(*keys)
print(f" 删除了 {len(keys)} 个键")
except Exception as e:
print(f" 清空失败: {e}")
# 3. 发送测试请求
print("\n发送测试请求...")
for i in range(5):
try:
response = requests.post(
"http://localhost:8000/invoke",
json={"number": 17},
timeout=5,
)
print(f" 请求 {i+1}: {response.status_code}")
time.sleep(0.5)
except Exception as e:
print(f" 请求 {i+1} 失败: {e}")
# 4. 检查 Redis 中的指标
print("\n检查 Redis 指标...")
try:
# 检查计数器
counter_data = client.hgetall("metrics:request_counter")
if counter_data:
print(f"✓ 找到 {len(counter_data)} 个请求计数器指标")
for key, value in list(counter_data.items())[:5]:
if not key.endswith(":timestamp"):
print(f" {key} = {value}")
else:
print("✗ 未找到请求计数器指标")
# 检查算法计数器
algo_data = client.hgetall("metrics:algorithm_counter")
if algo_data:
print(f"✓ 找到 {len(algo_data)} 个算法计数器指标")
for key, value in list(algo_data.items())[:5]:
if not key.endswith(":timestamp"):
print(f" {key} = {value}")
else:
print("✗ 未找到算法计数器指标")
except Exception as e:
print(f"✗ 检查 Redis 失败: {e}")
return False
# 5. 检查 Redis Exporter
print("\n检查 Redis Exporter...")
try:
response = requests.get("http://localhost:8001/metrics", timeout=2)
metrics = response.text
if "http_requests_total" in metrics:
print("✓ Exporter 成功导出 http_requests_total")
for line in metrics.split("\n"):
if "http_requests_total" in line and not line.startswith("#"):
print(f" {line}")
break
else:
print("✗ Exporter 未导出 http_requests_total")
except Exception as e:
print(f"✗ Redis Exporter 未运行: {e}")
return True
def test_memory():
"""测试原有的内存方案"""
print("\n=== 测试内存方案(原有方案)===\n")
# 发送测试请求
print("发送测试请求...")
for i in range(5):
try:
response = requests.post(
"http://localhost:8000/invoke",
json={"number": 17},
timeout=5,
)
print(f" 请求 {i+1}: {response.status_code}")
time.sleep(0.5)
except Exception as e:
print(f" 请求 {i+1} 失败: {e}")
# 检查应用的 /metrics 端点
print("\n检查应用 /metrics 端点...")
try:
response = requests.get("http://localhost:8000/metrics", timeout=2)
metrics = response.text
if "http_requests_total" in metrics:
print("✓ 找到 http_requests_total 指标")
for line in metrics.split("\n"):
if "http_requests_total" in line and not line.startswith("#"):
print(f" {line}")
break
else:
print("✗ 未找到指标")
except Exception as e:
print(f"✗ 获取指标失败: {e}")
return False
print("\n⚠️ 注意:内存方案在多实例部署时,每个实例的指标是独立的")
return True
def main():
"""主函数"""
print("=" * 60)
print("FunctionalScaffold 指标方案测试")
print("=" * 60)
if len(sys.argv) > 1:
backend = sys.argv[1]
else:
print("\n请选择要测试的方案:")
print("1. Pushgateway推荐")
print("2. Redis + Exporter")
print("3. Memory原有方案")
choice = input("\n输入选项 (1/2/3): ").strip()
backend_map = {"1": "pushgateway", "2": "redis", "3": "memory"}
backend = backend_map.get(choice, "pushgateway")
print(f"\n选择的方案: {backend}")
# 运行测试
if backend == "pushgateway":
success = test_pushgateway()
elif backend == "redis":
success = test_redis()
elif backend == "memory":
success = test_memory()
else:
print(f"未知的方案: {backend}")
sys.exit(1)
# 输出结果
print("\n" + "=" * 60)
if success:
print("✓ 测试通过")
else:
print("✗ 测试失败")
print("=" * 60)
if __name__ == "__main__":
main()