main:添加核心文件并初始化项目

新增内容:
- 创建基础项目结构。
- 添加 `.gitignore` 和 `.dockerignore` 文件。
- 编写 `pyproject.toml` 和依赖文件。
- 添加算法模块及示例算法。
- 实现核心功能模块(日志、错误处理、指标)。
- 添加开发和运行所需的相关脚本文件及文档。
This commit is contained in:
2026-02-02 10:46:01 +08:00
commit 31af5e2286
54 changed files with 5726 additions and 0 deletions

262
scripts/test_metrics.py Executable file
View File

@@ -0,0 +1,262 @@
#!/usr/bin/env python3
"""指标方案测试脚本"""
import requests
import time
import sys
from typing import Literal
MetricsBackend = Literal["pushgateway", "redis", "memory"]
def test_pushgateway():
"""测试 Pushgateway 方案"""
print("\n=== 测试 Pushgateway 方案 ===\n")
# 1. 检查 Pushgateway 是否运行
try:
response = requests.get("http://localhost:9091/metrics", timeout=2)
print(f"✓ Pushgateway 运行正常 (状态码: {response.status_code})")
except Exception as e:
print(f"✗ Pushgateway 未运行: {e}")
return False
# 2. 发送测试请求到应用
print("\n发送测试请求...")
for i in range(5):
try:
response = requests.post(
"http://localhost:8000/invoke",
json={"number": 17},
timeout=5,
)
print(f" 请求 {i+1}: {response.status_code}")
time.sleep(0.5)
except Exception as e:
print(f" 请求 {i+1} 失败: {e}")
# 3. 等待指标推送
print("\n等待指标推送...")
time.sleep(2)
# 4. 检查 Pushgateway 中的指标
try:
response = requests.get("http://localhost:9091/metrics", timeout=2)
metrics = response.text
# 查找关键指标
if "http_requests_total" in metrics:
print("✓ 找到 http_requests_total 指标")
# 提取指标值
for line in metrics.split("\n"):
if "http_requests_total" in line and not line.startswith("#"):
print(f" {line}")
else:
print("✗ 未找到 http_requests_total 指标")
if "algorithm_executions_total" in metrics:
print("✓ 找到 algorithm_executions_total 指标")
for line in metrics.split("\n"):
if "algorithm_executions_total" in line and not line.startswith("#"):
print(f" {line}")
else:
print("✗ 未找到 algorithm_executions_total 指标")
except Exception as e:
print(f"✗ 获取指标失败: {e}")
return False
# 5. 检查 Prometheus 是否能抓取
print("\n检查 Prometheus...")
try:
response = requests.get(
"http://localhost:9090/api/v1/query",
params={"query": "http_requests_total"},
timeout=5,
)
data = response.json()
if data["status"] == "success" and data["data"]["result"]:
print(f"✓ Prometheus 成功抓取指标,找到 {len(data['data']['result'])} 条记录")
for result in data["data"]["result"][:3]:
print(f" {result['metric']} = {result['value'][1]}")
else:
print("✗ Prometheus 未找到指标")
except Exception as e:
print(f"✗ Prometheus 查询失败: {e}")
return True
def test_redis():
"""测试 Redis 方案"""
print("\n=== 测试 Redis 方案 ===\n")
# 1. 检查 Redis 是否运行
try:
import redis
client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
client.ping()
print("✓ Redis 运行正常")
except ImportError:
print("✗ Redis 库未安装,请运行: pip install redis")
return False
except Exception as e:
print(f"✗ Redis 未运行: {e}")
return False
# 2. 清空测试数据
print("\n清空旧数据...")
try:
keys = client.keys("metrics:*")
if keys:
client.delete(*keys)
print(f" 删除了 {len(keys)} 个键")
except Exception as e:
print(f" 清空失败: {e}")
# 3. 发送测试请求
print("\n发送测试请求...")
for i in range(5):
try:
response = requests.post(
"http://localhost:8000/invoke",
json={"number": 17},
timeout=5,
)
print(f" 请求 {i+1}: {response.status_code}")
time.sleep(0.5)
except Exception as e:
print(f" 请求 {i+1} 失败: {e}")
# 4. 检查 Redis 中的指标
print("\n检查 Redis 指标...")
try:
# 检查计数器
counter_data = client.hgetall("metrics:request_counter")
if counter_data:
print(f"✓ 找到 {len(counter_data)} 个请求计数器指标")
for key, value in list(counter_data.items())[:5]:
if not key.endswith(":timestamp"):
print(f" {key} = {value}")
else:
print("✗ 未找到请求计数器指标")
# 检查算法计数器
algo_data = client.hgetall("metrics:algorithm_counter")
if algo_data:
print(f"✓ 找到 {len(algo_data)} 个算法计数器指标")
for key, value in list(algo_data.items())[:5]:
if not key.endswith(":timestamp"):
print(f" {key} = {value}")
else:
print("✗ 未找到算法计数器指标")
except Exception as e:
print(f"✗ 检查 Redis 失败: {e}")
return False
# 5. 检查 Redis Exporter
print("\n检查 Redis Exporter...")
try:
response = requests.get("http://localhost:8001/metrics", timeout=2)
metrics = response.text
if "http_requests_total" in metrics:
print("✓ Exporter 成功导出 http_requests_total")
for line in metrics.split("\n"):
if "http_requests_total" in line and not line.startswith("#"):
print(f" {line}")
break
else:
print("✗ Exporter 未导出 http_requests_total")
except Exception as e:
print(f"✗ Redis Exporter 未运行: {e}")
return True
def test_memory():
"""测试原有的内存方案"""
print("\n=== 测试内存方案(原有方案)===\n")
# 发送测试请求
print("发送测试请求...")
for i in range(5):
try:
response = requests.post(
"http://localhost:8000/invoke",
json={"number": 17},
timeout=5,
)
print(f" 请求 {i+1}: {response.status_code}")
time.sleep(0.5)
except Exception as e:
print(f" 请求 {i+1} 失败: {e}")
# 检查应用的 /metrics 端点
print("\n检查应用 /metrics 端点...")
try:
response = requests.get("http://localhost:8000/metrics", timeout=2)
metrics = response.text
if "http_requests_total" in metrics:
print("✓ 找到 http_requests_total 指标")
for line in metrics.split("\n"):
if "http_requests_total" in line and not line.startswith("#"):
print(f" {line}")
break
else:
print("✗ 未找到指标")
except Exception as e:
print(f"✗ 获取指标失败: {e}")
return False
print("\n⚠️ 注意:内存方案在多实例部署时,每个实例的指标是独立的")
return True
def main():
"""主函数"""
print("=" * 60)
print("FunctionalScaffold 指标方案测试")
print("=" * 60)
if len(sys.argv) > 1:
backend = sys.argv[1]
else:
print("\n请选择要测试的方案:")
print("1. Pushgateway推荐")
print("2. Redis + Exporter")
print("3. Memory原有方案")
choice = input("\n输入选项 (1/2/3): ").strip()
backend_map = {"1": "pushgateway", "2": "redis", "3": "memory"}
backend = backend_map.get(choice, "pushgateway")
print(f"\n选择的方案: {backend}")
# 运行测试
if backend == "pushgateway":
success = test_pushgateway()
elif backend == "redis":
success = test_redis()
elif backend == "memory":
success = test_memory()
else:
print(f"未知的方案: {backend}")
sys.exit(1)
# 输出结果
print("\n" + "=" * 60)
if success:
print("✓ 测试通过")
else:
print("✗ 测试失败")
print("=" * 60)
if __name__ == "__main__":
main()