新增内容: - 创建基础项目结构。 - 添加 `.gitignore` 和 `.dockerignore` 文件。 - 编写 `pyproject.toml` 和依赖文件。 - 添加算法模块及示例算法。 - 实现核心功能模块(日志、错误处理、指标)。 - 添加开发和运行所需的相关脚本文件及文档。
263 lines
8.0 KiB
Python
Executable File
263 lines
8.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""指标方案测试脚本"""
|
||
|
||
import requests
|
||
import time
|
||
import sys
|
||
from typing import Literal
|
||
|
||
MetricsBackend = Literal["pushgateway", "redis", "memory"]
|
||
|
||
|
||
def test_pushgateway():
|
||
"""测试 Pushgateway 方案"""
|
||
print("\n=== 测试 Pushgateway 方案 ===\n")
|
||
|
||
# 1. 检查 Pushgateway 是否运行
|
||
try:
|
||
response = requests.get("http://localhost:9091/metrics", timeout=2)
|
||
print(f"✓ Pushgateway 运行正常 (状态码: {response.status_code})")
|
||
except Exception as e:
|
||
print(f"✗ Pushgateway 未运行: {e}")
|
||
return False
|
||
|
||
# 2. 发送测试请求到应用
|
||
print("\n发送测试请求...")
|
||
for i in range(5):
|
||
try:
|
||
response = requests.post(
|
||
"http://localhost:8000/invoke",
|
||
json={"number": 17},
|
||
timeout=5,
|
||
)
|
||
print(f" 请求 {i+1}: {response.status_code}")
|
||
time.sleep(0.5)
|
||
except Exception as e:
|
||
print(f" 请求 {i+1} 失败: {e}")
|
||
|
||
# 3. 等待指标推送
|
||
print("\n等待指标推送...")
|
||
time.sleep(2)
|
||
|
||
# 4. 检查 Pushgateway 中的指标
|
||
try:
|
||
response = requests.get("http://localhost:9091/metrics", timeout=2)
|
||
metrics = response.text
|
||
|
||
# 查找关键指标
|
||
if "http_requests_total" in metrics:
|
||
print("✓ 找到 http_requests_total 指标")
|
||
# 提取指标值
|
||
for line in metrics.split("\n"):
|
||
if "http_requests_total" in line and not line.startswith("#"):
|
||
print(f" {line}")
|
||
else:
|
||
print("✗ 未找到 http_requests_total 指标")
|
||
|
||
if "algorithm_executions_total" in metrics:
|
||
print("✓ 找到 algorithm_executions_total 指标")
|
||
for line in metrics.split("\n"):
|
||
if "algorithm_executions_total" in line and not line.startswith("#"):
|
||
print(f" {line}")
|
||
else:
|
||
print("✗ 未找到 algorithm_executions_total 指标")
|
||
|
||
except Exception as e:
|
||
print(f"✗ 获取指标失败: {e}")
|
||
return False
|
||
|
||
# 5. 检查 Prometheus 是否能抓取
|
||
print("\n检查 Prometheus...")
|
||
try:
|
||
response = requests.get(
|
||
"http://localhost:9090/api/v1/query",
|
||
params={"query": "http_requests_total"},
|
||
timeout=5,
|
||
)
|
||
data = response.json()
|
||
if data["status"] == "success" and data["data"]["result"]:
|
||
print(f"✓ Prometheus 成功抓取指标,找到 {len(data['data']['result'])} 条记录")
|
||
for result in data["data"]["result"][:3]:
|
||
print(f" {result['metric']} = {result['value'][1]}")
|
||
else:
|
||
print("✗ Prometheus 未找到指标")
|
||
except Exception as e:
|
||
print(f"✗ Prometheus 查询失败: {e}")
|
||
|
||
return True
|
||
|
||
|
||
def test_redis():
|
||
"""测试 Redis 方案"""
|
||
print("\n=== 测试 Redis 方案 ===\n")
|
||
|
||
# 1. 检查 Redis 是否运行
|
||
try:
|
||
import redis
|
||
|
||
client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
|
||
client.ping()
|
||
print("✓ Redis 运行正常")
|
||
except ImportError:
|
||
print("✗ Redis 库未安装,请运行: pip install redis")
|
||
return False
|
||
except Exception as e:
|
||
print(f"✗ Redis 未运行: {e}")
|
||
return False
|
||
|
||
# 2. 清空测试数据
|
||
print("\n清空旧数据...")
|
||
try:
|
||
keys = client.keys("metrics:*")
|
||
if keys:
|
||
client.delete(*keys)
|
||
print(f" 删除了 {len(keys)} 个键")
|
||
except Exception as e:
|
||
print(f" 清空失败: {e}")
|
||
|
||
# 3. 发送测试请求
|
||
print("\n发送测试请求...")
|
||
for i in range(5):
|
||
try:
|
||
response = requests.post(
|
||
"http://localhost:8000/invoke",
|
||
json={"number": 17},
|
||
timeout=5,
|
||
)
|
||
print(f" 请求 {i+1}: {response.status_code}")
|
||
time.sleep(0.5)
|
||
except Exception as e:
|
||
print(f" 请求 {i+1} 失败: {e}")
|
||
|
||
# 4. 检查 Redis 中的指标
|
||
print("\n检查 Redis 指标...")
|
||
try:
|
||
# 检查计数器
|
||
counter_data = client.hgetall("metrics:request_counter")
|
||
if counter_data:
|
||
print(f"✓ 找到 {len(counter_data)} 个请求计数器指标")
|
||
for key, value in list(counter_data.items())[:5]:
|
||
if not key.endswith(":timestamp"):
|
||
print(f" {key} = {value}")
|
||
else:
|
||
print("✗ 未找到请求计数器指标")
|
||
|
||
# 检查算法计数器
|
||
algo_data = client.hgetall("metrics:algorithm_counter")
|
||
if algo_data:
|
||
print(f"✓ 找到 {len(algo_data)} 个算法计数器指标")
|
||
for key, value in list(algo_data.items())[:5]:
|
||
if not key.endswith(":timestamp"):
|
||
print(f" {key} = {value}")
|
||
else:
|
||
print("✗ 未找到算法计数器指标")
|
||
|
||
except Exception as e:
|
||
print(f"✗ 检查 Redis 失败: {e}")
|
||
return False
|
||
|
||
# 5. 检查 Redis Exporter
|
||
print("\n检查 Redis Exporter...")
|
||
try:
|
||
response = requests.get("http://localhost:8001/metrics", timeout=2)
|
||
metrics = response.text
|
||
|
||
if "http_requests_total" in metrics:
|
||
print("✓ Exporter 成功导出 http_requests_total")
|
||
for line in metrics.split("\n"):
|
||
if "http_requests_total" in line and not line.startswith("#"):
|
||
print(f" {line}")
|
||
break
|
||
else:
|
||
print("✗ Exporter 未导出 http_requests_total")
|
||
|
||
except Exception as e:
|
||
print(f"✗ Redis Exporter 未运行: {e}")
|
||
|
||
return True
|
||
|
||
|
||
def test_memory():
|
||
"""测试原有的内存方案"""
|
||
print("\n=== 测试内存方案(原有方案)===\n")
|
||
|
||
# 发送测试请求
|
||
print("发送测试请求...")
|
||
for i in range(5):
|
||
try:
|
||
response = requests.post(
|
||
"http://localhost:8000/invoke",
|
||
json={"number": 17},
|
||
timeout=5,
|
||
)
|
||
print(f" 请求 {i+1}: {response.status_code}")
|
||
time.sleep(0.5)
|
||
except Exception as e:
|
||
print(f" 请求 {i+1} 失败: {e}")
|
||
|
||
# 检查应用的 /metrics 端点
|
||
print("\n检查应用 /metrics 端点...")
|
||
try:
|
||
response = requests.get("http://localhost:8000/metrics", timeout=2)
|
||
metrics = response.text
|
||
|
||
if "http_requests_total" in metrics:
|
||
print("✓ 找到 http_requests_total 指标")
|
||
for line in metrics.split("\n"):
|
||
if "http_requests_total" in line and not line.startswith("#"):
|
||
print(f" {line}")
|
||
break
|
||
else:
|
||
print("✗ 未找到指标")
|
||
|
||
except Exception as e:
|
||
print(f"✗ 获取指标失败: {e}")
|
||
return False
|
||
|
||
print("\n⚠️ 注意:内存方案在多实例部署时,每个实例的指标是独立的")
|
||
return True
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("=" * 60)
|
||
print("FunctionalScaffold 指标方案测试")
|
||
print("=" * 60)
|
||
|
||
if len(sys.argv) > 1:
|
||
backend = sys.argv[1]
|
||
else:
|
||
print("\n请选择要测试的方案:")
|
||
print("1. Pushgateway(推荐)")
|
||
print("2. Redis + Exporter")
|
||
print("3. Memory(原有方案)")
|
||
choice = input("\n输入选项 (1/2/3): ").strip()
|
||
|
||
backend_map = {"1": "pushgateway", "2": "redis", "3": "memory"}
|
||
backend = backend_map.get(choice, "pushgateway")
|
||
|
||
print(f"\n选择的方案: {backend}")
|
||
|
||
# 运行测试
|
||
if backend == "pushgateway":
|
||
success = test_pushgateway()
|
||
elif backend == "redis":
|
||
success = test_redis()
|
||
elif backend == "memory":
|
||
success = test_memory()
|
||
else:
|
||
print(f"未知的方案: {backend}")
|
||
sys.exit(1)
|
||
|
||
# 输出结果
|
||
print("\n" + "=" * 60)
|
||
if success:
|
||
print("✓ 测试通过")
|
||
else:
|
||
print("✗ 测试失败")
|
||
print("=" * 60)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|