main:重构指标系统并切换为 Redis 方案

变更内容:
- 重构指标系统实现,支持基于 Redis 的多实例指标管理。
- 替换原有的 Pushgateway 和 Redis Exporter 方案。
- 更新 Prometheus 配置,适配新的指标抓取方式。
- 添加 Redis 指标相关配置和告警规则文件。
- 更新 Dockerfile 和 docker-compose 文件,移除多余服务,精简配置。
- 编写 `metrics_unified.py` 模块及单元测试。
- 修复部分代码中的冗余和格式问题。
This commit is contained in:
2026-02-02 13:30:28 +08:00
parent 31af5e2286
commit 241cffebc2
11 changed files with 1047 additions and 94 deletions

78
config/metrics.yaml Normal file
View File

@@ -0,0 +1,78 @@
# 指标配置文件
# 算法成员可以在此添加自定义指标
# Redis 连接配置(也可通过环境变量覆盖)
redis:
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}
db: ${REDIS_METRICS_DB:0}
password: ${REDIS_PASSWORD:}
# 全局配置
global:
prefix: "functional_scaffold" # 指标名称前缀
instance_label: true # 是否添加实例标签
# 内置指标(框架自动收集)
builtin_metrics:
http_requests:
enabled: true
name: "http_requests_total"
type: counter
description: "HTTP 请求总数"
labels: [method, endpoint, status]
http_latency:
enabled: true
name: "http_request_duration_seconds"
type: histogram
description: "HTTP 请求延迟"
labels: [method, endpoint]
buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
http_in_progress:
enabled: true
name: "http_requests_in_progress"
type: gauge
description: "当前进行中的 HTTP 请求数"
labels: []
algorithm_executions:
enabled: true
name: "algorithm_executions_total"
type: counter
description: "算法执行总数"
labels: [algorithm, status]
algorithm_latency:
enabled: true
name: "algorithm_execution_duration_seconds"
type: histogram
description: "算法执行延迟"
labels: [algorithm]
buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60]
# 自定义指标(算法成员在此添加)
custom_metrics:
# 示例:质数判断结果统计
prime_check_results:
name: "prime_check_results_total"
type: counter
description: "质数判断结果统计"
labels: [is_prime]
# 示例:输入数字大小分布
input_number_size:
name: "input_number_size"
type: histogram
description: "输入数字大小分布"
labels: []
buckets: [10, 100, 1000, 10000, 100000, 1000000]
# 添加更多自定义指标...
# my_custom_metric:
# name: "my_metric_name"
# type: counter|gauge|histogram
# description: "指标描述"
# labels: [label1, label2]
# buckets: [...] # 仅 histogram 需要

View File

@@ -9,12 +9,15 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
# 复制依赖文件
COPY requirements.txt .
COPY requirements-dev.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir -r requirements-dev.txt
# 复制应用代码
# 复制应用代码和配置
COPY src/ ./src/
COPY config/ ./config/
# 创建非 root 用户
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app

View File

@@ -11,19 +11,19 @@ services:
- APP_ENV=development
- LOG_LEVEL=INFO
- METRICS_ENABLED=true
# 方案1Pushgateway 配置
- PUSHGATEWAY_URL=pushgateway:9091
- METRICS_JOB_NAME=functional_scaffold
# 方案2Redis 配置
# Redis 指标存储配置
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_METRICS_DB=0
- REDIS_DB=0
# 指标配置文件路径
- METRICS_CONFIG_PATH=config/metrics.yaml
volumes:
- ../src:/app/src
- ../config:/app/config
restart: unless-stopped
depends_on:
- redis
- pushgateway
redis:
condition: service_healthy
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')"]
interval: 30s
@@ -31,11 +31,11 @@ services:
retries: 3
start_period: 5s
# Redis - 用于集中式指标存储方案2
# Redis - 用于集中式指标存储
redis:
image: redis:7-alpine
ports:
- "6379:6379"
- "6380:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
@@ -46,47 +46,20 @@ services:
timeout: 3s
retries: 3
# Pushgateway - 用于短生命周期任务的指标推送方案1推荐
pushgateway:
image: prom/pushgateway:latest
ports:
- "9091:9091"
restart: unless-stopped
command:
- '--persistence.file=/data/pushgateway.data'
- '--persistence.interval=5m'
volumes:
- pushgateway_data:/data
# Redis Exporter - 将 Redis 指标导出为 Prometheus 格式方案2需要
redis-exporter:
build:
context: ..
dockerfile: deployment/Dockerfile.redis-exporter
ports:
- "8001:8001"
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_METRICS_DB=0
depends_on:
- redis
restart: unless-stopped
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ../monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- ../monitoring/alerts:/etc/prometheus/rules
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
restart: unless-stopped
depends_on:
- pushgateway
- redis-exporter
- app
grafana:
image: grafana/grafana:latest
@@ -105,4 +78,3 @@ volumes:
prometheus_data:
grafana_data:
redis_data:
pushgateway_data:

View File

@@ -2,38 +2,52 @@ groups:
- name: functional_scaffold_alerts
interval: 30s
rules:
# 高错误率告警
- alert: HighErrorRate
expr: rate(http_requests_total{status="error"}[5m]) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value }} requests/sec for {{ $labels.endpoint }}"
summary: "检测到高错误率"
description: "端点 {{ $labels.endpoint }} 的错误率为 {{ $value }} 请求/秒"
# 高延迟告警
- alert: HighLatency
expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1
for: 5m
labels:
severity: warning
annotations:
summary: "High latency detected"
description: "P95 latency is {{ $value }}s for {{ $labels.endpoint }}"
summary: "检测到高延迟"
description: "端点 {{ $labels.endpoint }} 的 P95 延迟为 {{ $value }}s"
# 服务不可用告警
- alert: ServiceDown
expr: up{job="functional-scaffold"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Service is down"
description: "FunctionalScaffold service has been down for more than 1 minute"
summary: "服务不可用"
description: "FunctionalScaffold 服务已停止超过 1 分钟"
- alert: HighMemoryUsage
expr: container_memory_usage_bytes{container="functional-scaffold"} / container_spec_memory_limit_bytes{container="functional-scaffold"} > 0.9
# 算法执行失败率告警
- alert: HighAlgorithmFailureRate
expr: rate(algorithm_executions_total{status="error"}[5m]) / rate(algorithm_executions_total[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "High memory usage"
description: "Memory usage is {{ $value | humanizePercentage }} of limit"
summary: "算法执行失败率过高"
description: "算法 {{ $labels.algorithm }} 的失败率超过 10%"
# 算法执行延迟告警
- alert: HighAlgorithmLatency
expr: histogram_quantile(0.95, rate(algorithm_execution_duration_seconds_bucket[5m])) > 5
for: 5m
labels:
severity: warning
annotations:
summary: "算法执行延迟过高"
description: "算法 {{ $labels.algorithm }} 的 P95 延迟为 {{ $value }}s"

View File

@@ -8,27 +8,13 @@ global:
# 抓取配置
scrape_configs:
# 方案1从 Pushgateway 抓取指标(推荐
- job_name: 'pushgateway'
honor_labels: true
static_configs:
- targets: ['pushgateway:9091']
metric_relabel_configs:
# 保留 instance 标签
- source_labels: [instance]
target_label: instance
action: replace
# 方案2从 Redis Exporter 抓取指标
- job_name: 'redis-exporter'
static_configs:
- targets: ['redis-exporter:8001']
# 直接从应用实例抓取(如果有多个实例,需要配置服务发现)
- job_name: 'app'
# 从应用实例抓取指标Redis 统一指标方案
# 应用通过 /metrics 端点从 Redis 读取并导出 Prometheus 格式指标
- job_name: 'functional-scaffold'
static_configs:
- targets: ['app:8000']
metrics_path: '/metrics'
scrape_interval: 10s
# Prometheus 自身监控
- job_name: 'prometheus'
@@ -37,7 +23,7 @@ scrape_configs:
# 告警规则文件
rule_files:
- '/etc/prometheus/rules/*.yml'
- '/etc/prometheus/rules/*.yaml'
# Alertmanager 配置(可选)
# alerting:

View File

@@ -8,3 +8,6 @@ python-json-logger>=2.0.7
# 指标存储方案(可选,根据选择的方案安装)
# 方案2Redis 方案需要
redis>=5.0.0
# YAML 配置解析
pyyaml>=6.0.0

View File

@@ -32,7 +32,7 @@ class BaseAlgorithm(ABC):
Returns:
Dict[str, Any]: 包含结果和元数据的字典
"""
from ..core.metrics import algorithm_counter, algorithm_latency
from ..core.metrics_unified import incr, observe
start_time = time.time()
status = "success"
@@ -42,9 +42,7 @@ class BaseAlgorithm(ABC):
result = self.process(*args, **kwargs)
elapsed_time = time.time() - start_time
logger.info(
f"Algorithm {self.name} completed successfully in {elapsed_time:.3f}s"
)
logger.info(f"Algorithm {self.name} completed successfully in {elapsed_time:.3f}s")
return {
"success": True,
@@ -73,5 +71,5 @@ class BaseAlgorithm(ABC):
finally:
# 记录算法执行指标
elapsed_time = time.time() - start_time
algorithm_counter.labels(algorithm=self.name, status=status).inc()
algorithm_latency.labels(algorithm=self.name).observe(elapsed_time)
incr("algorithm_executions_total", {"algorithm": self.name, "status": status})
observe("algorithm_execution_duration_seconds", {"algorithm": self.name}, elapsed_time)

View File

@@ -8,10 +8,7 @@ from typing import Optional
class Settings(BaseSettings):
"""应用配置"""
model_config = ConfigDict(
env_file=".env",
case_sensitive=False
)
model_config = ConfigDict(env_file=".env", case_sensitive=False)
# 应用信息
app_name: str = "FunctionalScaffold"
@@ -42,6 +39,16 @@ class Settings(BaseSettings):
database_url: Optional[str] = None
# Redis 配置
redis_host: str = "localhost"
redis_port: int = 6379
redis_db: int = 0
redis_password: Optional[str] = None
# 指标配置
metrics_config_path: str = "config/metrics.yaml"
metrics_instance_id: Optional[str] = None # 默认使用 hostname
# 全局配置实例
settings = Settings()

View File

@@ -0,0 +1,605 @@
"""统一指标管理模块
基于 Redis 的指标收集方案,支持多实例部署和 YAML 配置。
"""
import os
import re
import socket
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional
from functools import wraps
import time
import yaml
import redis
logger = logging.getLogger(__name__)
class MetricsManager:
"""统一指标管理器
支持从 YAML 配置文件加载指标定义,使用 Redis 存储指标数据,
并导出 Prometheus 格式的指标。
"""
def __init__(self, config_path: Optional[str] = None):
"""初始化指标管理器
Args:
config_path: 配置文件路径,默认从 settings 获取
"""
from ..config import settings
self.config_path = config_path or settings.metrics_config_path
self.instance_id = settings.metrics_instance_id or socket.gethostname()
self.config: Dict[str, Any] = {}
self.metrics_definitions: Dict[str, Dict[str, Any]] = {}
self._redis_client: Optional[redis.Redis] = None
self._redis_available = False
# 加载配置
self._load_config()
# 初始化 Redis 连接
self._init_redis()
# 注册指标定义
self._register_metrics()
def _load_config(self) -> None:
"""加载 YAML 配置文件"""
# 尝试多个路径
paths_to_try = [
Path(self.config_path),
Path.cwd() / self.config_path,
Path(__file__).parent.parent.parent.parent / self.config_path,
]
for path in paths_to_try:
if path.exists():
with open(path, "r", encoding="utf-8") as f:
content = f.read()
# 处理环境变量替换 ${VAR:default}
content = self._substitute_env_vars(content)
self.config = yaml.safe_load(content) or {}
logger.info(f"已加载指标配置文件: {path}")
return
logger.warning(f"未找到指标配置文件: {self.config_path},使用默认配置")
self.config = self._get_default_config()
def _substitute_env_vars(self, content: str) -> str:
"""替换配置中的环境变量
支持格式: ${VAR_NAME:default_value}
"""
pattern = r"\$\{([^}:]+)(?::([^}]*))?\}"
def replacer(match):
var_name = match.group(1)
default_value = match.group(2) or ""
return os.environ.get(var_name, default_value)
return re.sub(pattern, replacer, content)
def _get_default_config(self) -> Dict[str, Any]:
"""获取默认配置"""
return {
"redis": {
"host": "localhost",
"port": 6379,
"db": 0,
"password": "",
},
"global": {
"prefix": "functional_scaffold",
"instance_label": True,
},
"builtin_metrics": {
"http_requests": {
"enabled": True,
"name": "http_requests_total",
"type": "counter",
"description": "HTTP 请求总数",
"labels": ["method", "endpoint", "status"],
},
"http_latency": {
"enabled": True,
"name": "http_request_duration_seconds",
"type": "histogram",
"description": "HTTP 请求延迟",
"labels": ["method", "endpoint"],
"buckets": [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
},
"http_in_progress": {
"enabled": True,
"name": "http_requests_in_progress",
"type": "gauge",
"description": "当前进行中的 HTTP 请求数",
"labels": [],
},
"algorithm_executions": {
"enabled": True,
"name": "algorithm_executions_total",
"type": "counter",
"description": "算法执行总数",
"labels": ["algorithm", "status"],
},
"algorithm_latency": {
"enabled": True,
"name": "algorithm_execution_duration_seconds",
"type": "histogram",
"description": "算法执行延迟",
"labels": ["algorithm"],
"buckets": [0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60],
},
},
"custom_metrics": {},
}
def _init_redis(self) -> None:
"""初始化 Redis 连接"""
from ..config import settings
redis_config = self.config.get("redis", {})
host = redis_config.get("host") or settings.redis_host
port = int(redis_config.get("port") or settings.redis_port)
db = int(redis_config.get("db") or settings.redis_db)
password = redis_config.get("password") or settings.redis_password
try:
self._redis_client = redis.Redis(
host=host,
port=port,
db=db,
password=password if password else None,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
)
# 测试连接
self._redis_client.ping()
self._redis_available = True
logger.info(f"Redis 连接成功: {host}:{port}/{db}")
except redis.ConnectionError as e:
logger.warning(f"Redis 连接失败: {e},指标将不会被收集")
self._redis_available = False
except Exception as e:
logger.warning(f"Redis 初始化异常: {e},指标将不会被收集")
self._redis_available = False
def _register_metrics(self) -> None:
"""注册所有指标定义"""
# 注册内置指标
builtin = self.config.get("builtin_metrics", {})
for key, metric_def in builtin.items():
if metric_def.get("enabled", True):
name = metric_def.get("name", key)
self.metrics_definitions[name] = {
"type": metric_def.get("type", "counter"),
"description": metric_def.get("description", ""),
"labels": metric_def.get("labels", []),
"buckets": metric_def.get("buckets", []),
}
# 注册自定义指标
custom = self.config.get("custom_metrics", {})
for key, metric_def in custom.items():
name = metric_def.get("name", key)
self.metrics_definitions[name] = {
"type": metric_def.get("type", "counter"),
"description": metric_def.get("description", ""),
"labels": metric_def.get("labels", []),
"buckets": metric_def.get("buckets", []),
}
logger.info(f"已注册 {len(self.metrics_definitions)} 个指标定义")
def _labels_to_key(self, labels: Optional[Dict[str, str]]) -> str:
"""将标签字典转换为 Redis key 的一部分"""
if not labels:
return ""
sorted_items = sorted(labels.items())
return ",".join(f"{k}={v}" for k, v in sorted_items)
def _key_to_prometheus_labels(self, key: str) -> str:
"""将 Redis key 格式转换为 Prometheus 标签格式(带引号)
输入: endpoint=/healthz,method=GET,status=success
输出: endpoint="/healthz",method="GET",status="success"
"""
if not key or key == "_default_":
return ""
parts = []
for pair in key.split(","):
if "=" in pair:
k, v = pair.split("=", 1)
# 转义值中的特殊字符
v = v.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
parts.append(f'{k}="{v}"')
return ",".join(parts)
def _validate_metric(self, name: str, expected_type: str) -> bool:
"""验证指标是否已定义且类型正确"""
if name not in self.metrics_definitions:
logger.warning(f"指标 '{name}' 未在配置中定义")
return False
if self.metrics_definitions[name]["type"] != expected_type:
logger.warning(
f"指标 '{name}' 类型不匹配: 期望 {expected_type}, "
f"实际 {self.metrics_definitions[name]['type']}"
)
return False
return True
# === 简单 API业务代码使用===
def incr(self, name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None:
"""增加计数器
Args:
name: 指标名称
labels: 标签字典
value: 增加的值,默认为 1
"""
if not self._redis_available:
return
if not self._validate_metric(name, "counter"):
return
try:
key = f"metrics:counter:{name}"
field = self._labels_to_key(labels) or "_default_"
self._redis_client.hincrbyfloat(key, field, value)
except Exception as e:
logger.error(f"增加计数器失败: {e}")
def set(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
"""设置仪表盘值
Args:
name: 指标名称
labels: 标签字典
value: 设置的值
"""
if not self._redis_available:
return
if not self._validate_metric(name, "gauge"):
return
try:
key = f"metrics:gauge:{name}"
field = self._labels_to_key(labels) or "_default_"
self._redis_client.hset(key, field, value)
except Exception as e:
logger.error(f"设置仪表盘失败: {e}")
def gauge_incr(
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
) -> None:
"""增加仪表盘值
Args:
name: 指标名称
labels: 标签字典
value: 增加的值
"""
if not self._redis_available:
return
if not self._validate_metric(name, "gauge"):
return
try:
key = f"metrics:gauge:{name}"
field = self._labels_to_key(labels) or "_default_"
self._redis_client.hincrbyfloat(key, field, value)
except Exception as e:
logger.error(f"增加仪表盘失败: {e}")
def gauge_decr(
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
) -> None:
"""减少仪表盘值
Args:
name: 指标名称
labels: 标签字典
value: 减少的值
"""
self.gauge_incr(name, labels, -value)
def observe(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
"""记录直方图观测值
Args:
name: 指标名称
labels: 标签字典
value: 观测值
"""
if not self._redis_available:
return
if not self._validate_metric(name, "histogram"):
return
try:
label_key = self._labels_to_key(labels) or "_default_"
buckets = self.metrics_definitions[name].get("buckets", [])
# 使用 pipeline 批量操作
pipe = self._redis_client.pipeline()
# 增加 count
pipe.hincrbyfloat(f"metrics:histogram:{name}:count", label_key, 1)
# 增加 sum
pipe.hincrbyfloat(f"metrics:histogram:{name}:sum", label_key, value)
# 更新各个桶
for bucket in buckets:
if value <= bucket:
bucket_key = f"metrics:histogram:{name}:bucket:{bucket}"
pipe.hincrbyfloat(bucket_key, label_key, 1)
# +Inf 桶总是增加
pipe.hincrbyfloat(f"metrics:histogram:{name}:bucket:+Inf", label_key, 1)
pipe.execute()
except Exception as e:
logger.error(f"记录直方图失败: {e}")
# === 导出方法 ===
def export(self) -> str:
"""导出 Prometheus 格式指标
Returns:
Prometheus 文本格式的指标字符串
"""
if not self._redis_available:
return "# Redis 不可用,无法导出指标\n"
lines: List[str] = []
try:
for name, definition in self.metrics_definitions.items():
metric_type = definition["type"]
description = definition["description"]
# 添加 HELP 和 TYPE
lines.append(f"# HELP {name} {description}")
lines.append(f"# TYPE {name} {metric_type}")
if metric_type == "counter":
lines.extend(self._export_counter(name))
elif metric_type == "gauge":
lines.extend(self._export_gauge(name))
elif metric_type == "histogram":
lines.extend(self._export_histogram(name, definition))
lines.append("") # 空行分隔
except Exception as e:
logger.error(f"导出指标失败: {e}")
return f"# 导出指标失败: {e}\n"
return "\n".join(lines)
def _export_counter(self, name: str) -> List[str]:
"""导出计数器指标"""
lines = []
key = f"metrics:counter:{name}"
data = self._redis_client.hgetall(key)
for field, value in data.items():
if field == "_default_":
lines.append(f"{name} {value}")
else:
prom_labels = self._key_to_prometheus_labels(field)
lines.append(f"{name}{{{prom_labels}}} {value}")
return lines
def _export_gauge(self, name: str) -> List[str]:
"""导出仪表盘指标"""
lines = []
key = f"metrics:gauge:{name}"
data = self._redis_client.hgetall(key)
for field, value in data.items():
if field == "_default_":
lines.append(f"{name} {value}")
else:
prom_labels = self._key_to_prometheus_labels(field)
lines.append(f"{name}{{{prom_labels}}} {value}")
return lines
def _export_histogram(self, name: str, definition: Dict[str, Any]) -> List[str]:
"""导出直方图指标"""
lines = []
buckets = definition.get("buckets", [])
# 获取所有标签组合
count_data = self._redis_client.hgetall(f"metrics:histogram:{name}:count")
sum_data = self._redis_client.hgetall(f"metrics:histogram:{name}:sum")
for label_key in count_data.keys():
prom_labels = self._key_to_prometheus_labels(label_key)
# 导出各个桶
for bucket in buckets:
bucket_key = f"metrics:histogram:{name}:bucket:{bucket}"
bucket_value = self._redis_client.hget(bucket_key, label_key) or "0"
if label_key == "_default_":
lines.append(f'{name}_bucket{{le="{bucket}"}} {bucket_value}')
else:
lines.append(f'{name}_bucket{{{prom_labels},le="{bucket}"}} {bucket_value}')
# +Inf 桶
inf_key = f"metrics:histogram:{name}:bucket:+Inf"
inf_value = self._redis_client.hget(inf_key, label_key) or "0"
if label_key == "_default_":
lines.append(f'{name}_bucket{{le="+Inf"}} {inf_value}')
else:
lines.append(f'{name}_bucket{{{prom_labels},le="+Inf"}} {inf_value}')
# count 和 sum
count_value = count_data.get(label_key, "0")
sum_value = sum_data.get(label_key, "0")
if label_key == "_default_":
lines.append(f"{name}_count {count_value}")
lines.append(f"{name}_sum {sum_value}")
else:
lines.append(f"{name}_count{{{prom_labels}}} {count_value}")
lines.append(f"{name}_sum{{{prom_labels}}} {sum_value}")
return lines
def is_available(self) -> bool:
"""检查 Redis 是否可用"""
return self._redis_available
def reset(self) -> None:
"""重置所有指标(主要用于测试)"""
if not self._redis_available:
return
try:
# 删除所有指标相关的 key
keys = self._redis_client.keys("metrics:*")
if keys:
self._redis_client.delete(*keys)
logger.info("已重置所有指标")
except Exception as e:
logger.error(f"重置指标失败: {e}")
# 全局单例
_manager: Optional[MetricsManager] = None
def get_metrics_manager() -> MetricsManager:
"""获取指标管理器单例"""
global _manager
if _manager is None:
_manager = MetricsManager()
return _manager
def reset_metrics_manager() -> None:
"""重置指标管理器单例(主要用于测试)"""
global _manager
_manager = None
# === 便捷函数(业务代码直接调用)===
def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None:
"""增加计数器 - 便捷函数
Args:
name: 指标名称
labels: 标签字典
value: 增加的值,默认为 1
"""
get_metrics_manager().incr(name, labels, value)
def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
"""设置仪表盘 - 便捷函数
Args:
name: 指标名称
labels: 标签字典
value: 设置的值
"""
get_metrics_manager().set(name, labels, value)
def gauge_incr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None:
"""增加仪表盘 - 便捷函数
Args:
name: 指标名称
labels: 标签字典
value: 增加的值
"""
get_metrics_manager().gauge_incr(name, labels, value)
def gauge_decr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None:
"""减少仪表盘 - 便捷函数
Args:
name: 指标名称
labels: 标签字典
value: 减少的值
"""
get_metrics_manager().gauge_decr(name, labels, value)
def observe(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
"""记录直方图 - 便捷函数
Args:
name: 指标名称
labels: 标签字典
value: 观测值
"""
get_metrics_manager().observe(name, labels, value)
def export() -> str:
"""导出指标 - 便捷函数
Returns:
Prometheus 文本格式的指标字符串
"""
return get_metrics_manager().export()
def is_available() -> bool:
"""检查 Redis 是否可用 - 便捷函数"""
return get_metrics_manager().is_available()
# === 装饰器(兼容旧 API===
def track_algorithm_execution(algorithm_name: str):
"""装饰器:跟踪算法执行指标
Args:
algorithm_name: 算法名称
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
status = "success"
try:
result = func(*args, **kwargs)
return result
except Exception as e:
status = "error"
raise e
finally:
elapsed = time.time() - start_time
incr("algorithm_executions_total", {"algorithm": algorithm_name, "status": status})
observe(
"algorithm_execution_duration_seconds",
{"algorithm": algorithm_name},
elapsed,
)
return wrapper
return decorator

View File

@@ -3,14 +3,20 @@
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
import logging
import time
from .api import router
from .config import settings
from .core.logging import setup_logging
from .core.metrics import metrics_registry, request_counter, request_latency, in_progress_requests
from .core.metrics_unified import (
get_metrics_manager,
incr,
observe,
gauge_incr,
gauge_decr,
export,
)
# 设置日志
setup_logging(level=settings.log_level, format_type=settings.log_format)
@@ -57,7 +63,7 @@ async def track_metrics(request: Request, call_next):
if request.url.path == "/metrics":
return await call_next(request)
in_progress_requests.inc()
gauge_incr("http_requests_in_progress")
start_time = time.time()
status = "success"
@@ -72,16 +78,16 @@ async def track_metrics(request: Request, call_next):
raise e
finally:
elapsed = time.time() - start_time
request_counter.labels(
method=request.method,
endpoint=request.url.path,
status=status
).inc()
request_latency.labels(
method=request.method,
endpoint=request.url.path
).observe(elapsed)
in_progress_requests.dec()
incr(
"http_requests_total",
{"method": request.method, "endpoint": request.url.path, "status": status},
)
observe(
"http_request_duration_seconds",
{"method": request.method, "endpoint": request.url.path},
elapsed,
)
gauge_decr("http_requests_in_progress")
# 注册路由
@@ -105,8 +111,8 @@ async def metrics():
return Response(content="Metrics disabled", status_code=404)
return Response(
content=generate_latest(metrics_registry),
media_type=CONTENT_TYPE_LATEST,
content=export(),
media_type="text/plain; version=0.0.4; charset=utf-8",
)
@@ -118,6 +124,14 @@ async def startup_event():
logger.info(f"Environment: {settings.app_env}")
logger.info(f"Metrics enabled: {settings.metrics_enabled}")
# 初始化指标管理器
if settings.metrics_enabled:
manager = get_metrics_manager()
if manager.is_available():
logger.info("Redis 指标收集已启用")
else:
logger.warning("Redis 不可用,指标将不会被收集")
# 关闭事件
@app.on_event("shutdown")

View File

@@ -0,0 +1,273 @@
"""metrics_unified 模块单元测试"""
import pytest
from unittest.mock import MagicMock, patch
class TestMetricsManager:
"""MetricsManager 类测试"""
@pytest.fixture
def mock_redis(self):
"""模拟 Redis 客户端"""
with patch("redis.Redis") as mock:
mock_instance = MagicMock()
mock_instance.ping.return_value = True
mock_instance.hincrbyfloat.return_value = 1.0
mock_instance.hset.return_value = True
mock_instance.hgetall.return_value = {}
mock_instance.hget.return_value = "0"
mock_instance.keys.return_value = []
mock_instance.pipeline.return_value = MagicMock()
mock.return_value = mock_instance
yield mock_instance
@pytest.fixture
def manager(self, mock_redis):
"""创建测试用的 MetricsManager"""
from src.functional_scaffold.core.metrics_unified import (
MetricsManager,
reset_metrics_manager,
)
reset_metrics_manager()
manager = MetricsManager()
return manager
def test_init_loads_default_config(self, manager):
"""测试初始化加载默认配置"""
assert manager.config is not None
assert "builtin_metrics" in manager.config or len(manager.metrics_definitions) > 0
def test_metrics_definitions_registered(self, manager):
"""测试指标定义已注册"""
assert "http_requests_total" in manager.metrics_definitions
assert "http_request_duration_seconds" in manager.metrics_definitions
assert "algorithm_executions_total" in manager.metrics_definitions
def test_incr_counter(self, manager, mock_redis):
"""测试计数器增加"""
manager.incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"})
mock_redis.hincrbyfloat.assert_called()
def test_incr_with_invalid_metric_type(self, manager, mock_redis):
"""测试对非计数器类型调用 incr"""
# http_request_duration_seconds 是 histogram 类型
manager.incr("http_request_duration_seconds", {})
# 不应该调用 Redis因为类型不匹配
# 验证没有调用 hincrbyfloat或者调用次数没有增加
def test_set_gauge(self, manager, mock_redis):
"""测试设置仪表盘"""
manager.set("http_requests_in_progress", {}, 5)
mock_redis.hset.assert_called()
def test_gauge_incr(self, manager, mock_redis):
"""测试增加仪表盘"""
manager.gauge_incr("http_requests_in_progress", {}, 1)
mock_redis.hincrbyfloat.assert_called()
def test_gauge_decr(self, manager, mock_redis):
"""测试减少仪表盘"""
manager.gauge_decr("http_requests_in_progress", {}, 1)
mock_redis.hincrbyfloat.assert_called()
def test_observe_histogram(self, manager, mock_redis):
"""测试直方图观测"""
mock_pipeline = MagicMock()
mock_redis.pipeline.return_value = mock_pipeline
manager.observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05)
mock_redis.pipeline.assert_called()
mock_pipeline.execute.assert_called()
def test_labels_to_key(self, manager):
"""测试标签转换为 key"""
labels = {"method": "GET", "endpoint": "/api"}
key = manager._labels_to_key(labels)
assert "method=GET" in key
assert "endpoint=/api" in key
def test_labels_to_key_empty(self, manager):
"""测试空标签转换"""
key = manager._labels_to_key(None)
assert key == ""
key = manager._labels_to_key({})
assert key == ""
def test_is_available(self, manager):
"""测试 Redis 可用性检查"""
assert manager.is_available() is True
class TestConvenienceFunctions:
"""便捷函数测试"""
@pytest.fixture(autouse=True)
def setup(self):
"""每个测试前重置管理器"""
from src.functional_scaffold.core.metrics_unified import reset_metrics_manager
reset_metrics_manager()
@patch("redis.Redis")
def test_incr_function(self, mock_redis_class):
"""测试 incr 便捷函数"""
mock_instance = MagicMock()
mock_instance.ping.return_value = True
mock_redis_class.return_value = mock_instance
from src.functional_scaffold.core.metrics_unified import incr, reset_metrics_manager
reset_metrics_manager()
incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"})
mock_instance.hincrbyfloat.assert_called()
@patch("redis.Redis")
def test_set_function(self, mock_redis_class):
"""测试 set 便捷函数"""
mock_instance = MagicMock()
mock_instance.ping.return_value = True
mock_redis_class.return_value = mock_instance
from src.functional_scaffold.core.metrics_unified import reset_metrics_manager, set
reset_metrics_manager()
set("http_requests_in_progress", {}, 10)
mock_instance.hset.assert_called()
@patch("redis.Redis")
def test_observe_function(self, mock_redis_class):
"""测试 observe 便捷函数"""
mock_instance = MagicMock()
mock_instance.ping.return_value = True
mock_pipeline = MagicMock()
mock_instance.pipeline.return_value = mock_pipeline
mock_redis_class.return_value = mock_instance
from src.functional_scaffold.core.metrics_unified import observe, reset_metrics_manager
reset_metrics_manager()
observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.1)
mock_instance.pipeline.assert_called()
class TestExport:
"""导出功能测试"""
@patch("redis.Redis")
def test_export_counter(self, mock_redis_class):
"""测试导出计数器"""
mock_instance = MagicMock()
mock_instance.ping.return_value = True
mock_instance.hgetall.return_value = {"method=GET,endpoint=/,status=success": "10"}
mock_redis_class.return_value = mock_instance
from src.functional_scaffold.core.metrics_unified import export, reset_metrics_manager
reset_metrics_manager()
output = export()
assert "http_requests_total" in output
assert "HELP" in output
assert "TYPE" in output
@patch("redis.Redis")
def test_export_histogram(self, mock_redis_class):
"""测试导出直方图"""
mock_instance = MagicMock()
mock_instance.ping.return_value = True
mock_instance.hgetall.side_effect = lambda key: (
{"method=GET,endpoint=/": "5"}
if "count" in key
else {"method=GET,endpoint=/": "0.5"}
if "sum" in key
else {}
)
mock_instance.hget.return_value = "3"
mock_redis_class.return_value = mock_instance
from src.functional_scaffold.core.metrics_unified import export, reset_metrics_manager
reset_metrics_manager()
output = export()
assert "http_request_duration_seconds" in output
class TestEnvVarSubstitution:
"""环境变量替换测试"""
def test_substitute_env_vars(self):
"""测试环境变量替换"""
import os
from src.functional_scaffold.core.metrics_unified import MetricsManager
# 设置测试环境变量
os.environ["TEST_VAR"] = "test_value"
manager = MetricsManager.__new__(MetricsManager)
result = manager._substitute_env_vars("${TEST_VAR:default}")
assert result == "test_value"
# 测试默认值
result = manager._substitute_env_vars("${NONEXISTENT_VAR:default_value}")
assert result == "default_value"
# 清理
del os.environ["TEST_VAR"]
class TestTrackAlgorithmExecution:
"""track_algorithm_execution 装饰器测试"""
@patch("redis.Redis")
def test_decorator_success(self, mock_redis_class):
"""测试装饰器成功执行"""
mock_instance = MagicMock()
mock_instance.ping.return_value = True
mock_pipeline = MagicMock()
mock_instance.pipeline.return_value = mock_pipeline
mock_redis_class.return_value = mock_instance
from src.functional_scaffold.core.metrics_unified import (
reset_metrics_manager,
track_algorithm_execution,
)
reset_metrics_manager()
@track_algorithm_execution("test_algo")
def test_func():
return "result"
result = test_func()
assert result == "result"
@patch("redis.Redis")
def test_decorator_error(self, mock_redis_class):
"""测试装饰器错误处理"""
mock_instance = MagicMock()
mock_instance.ping.return_value = True
mock_pipeline = MagicMock()
mock_instance.pipeline.return_value = mock_pipeline
mock_redis_class.return_value = mock_instance
from src.functional_scaffold.core.metrics_unified import (
reset_metrics_manager,
track_algorithm_execution,
)
reset_metrics_manager()
@track_algorithm_execution("test_algo")
def test_func():
raise ValueError("test error")
with pytest.raises(ValueError):
test_func()