main:重构指标系统并切换为 Redis 方案
变更内容: - 重构指标系统实现,支持基于 Redis 的多实例指标管理。 - 替换原有的 Pushgateway 和 Redis Exporter 方案。 - 更新 Prometheus 配置,适配新的指标抓取方式。 - 添加 Redis 指标相关配置和告警规则文件。 - 更新 Dockerfile 和 docker-compose 文件,移除多余服务,精简配置。 - 编写 `metrics_unified.py` 模块及单元测试。 - 修复部分代码中的冗余和格式问题。
This commit is contained in:
@@ -32,7 +32,7 @@ class BaseAlgorithm(ABC):
|
||||
Returns:
|
||||
Dict[str, Any]: 包含结果和元数据的字典
|
||||
"""
|
||||
from ..core.metrics import algorithm_counter, algorithm_latency
|
||||
from ..core.metrics_unified import incr, observe
|
||||
|
||||
start_time = time.time()
|
||||
status = "success"
|
||||
@@ -42,9 +42,7 @@ class BaseAlgorithm(ABC):
|
||||
result = self.process(*args, **kwargs)
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
logger.info(
|
||||
f"Algorithm {self.name} completed successfully in {elapsed_time:.3f}s"
|
||||
)
|
||||
logger.info(f"Algorithm {self.name} completed successfully in {elapsed_time:.3f}s")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
@@ -73,5 +71,5 @@ class BaseAlgorithm(ABC):
|
||||
finally:
|
||||
# 记录算法执行指标
|
||||
elapsed_time = time.time() - start_time
|
||||
algorithm_counter.labels(algorithm=self.name, status=status).inc()
|
||||
algorithm_latency.labels(algorithm=self.name).observe(elapsed_time)
|
||||
incr("algorithm_executions_total", {"algorithm": self.name, "status": status})
|
||||
observe("algorithm_execution_duration_seconds", {"algorithm": self.name}, elapsed_time)
|
||||
|
||||
@@ -8,10 +8,7 @@ from typing import Optional
|
||||
class Settings(BaseSettings):
|
||||
"""应用配置"""
|
||||
|
||||
model_config = ConfigDict(
|
||||
env_file=".env",
|
||||
case_sensitive=False
|
||||
)
|
||||
model_config = ConfigDict(env_file=".env", case_sensitive=False)
|
||||
|
||||
# 应用信息
|
||||
app_name: str = "FunctionalScaffold"
|
||||
@@ -42,6 +39,16 @@ class Settings(BaseSettings):
|
||||
|
||||
database_url: Optional[str] = None
|
||||
|
||||
# Redis 配置
|
||||
redis_host: str = "localhost"
|
||||
redis_port: int = 6379
|
||||
redis_db: int = 0
|
||||
redis_password: Optional[str] = None
|
||||
|
||||
# 指标配置
|
||||
metrics_config_path: str = "config/metrics.yaml"
|
||||
metrics_instance_id: Optional[str] = None # 默认使用 hostname
|
||||
|
||||
|
||||
# 全局配置实例
|
||||
settings = Settings()
|
||||
|
||||
605
src/functional_scaffold/core/metrics_unified.py
Normal file
605
src/functional_scaffold/core/metrics_unified.py
Normal file
@@ -0,0 +1,605 @@
|
||||
"""统一指标管理模块
|
||||
|
||||
基于 Redis 的指标收集方案,支持多实例部署和 YAML 配置。
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
from functools import wraps
|
||||
import time
|
||||
|
||||
import yaml
|
||||
import redis
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MetricsManager:
    """Unified metrics manager backed by Redis.

    Metric definitions are loaded from a YAML configuration file (or a
    built-in default), metric values are stored in Redis hashes, and
    ``export`` renders everything in the Prometheus text format.

    Redis layout used by this class:

    * ``metrics:counter:<name>`` / ``metrics:gauge:<name>`` - hash mapping a
      label key (``k1=v1,k2=v2`` or ``_default_``) to the current value.
    * ``metrics:histogram:<name>:count`` / ``:sum`` / ``:bucket:<le>`` - one
      hash per series component, keyed the same way.
    """

    def __init__(self, config_path: Optional[str] = None):
        """Load the configuration, connect to Redis and register metrics.

        Args:
            config_path: Path of the YAML configuration file. Falls back to
                ``settings.metrics_config_path`` when omitted.
        """
        from ..config import settings

        self.config_path = config_path or settings.metrics_config_path
        # Stored for callers; no method of this class reads it.
        self.instance_id = settings.metrics_instance_id or socket.gethostname()
        self.config: Dict[str, Any] = {}
        self.metrics_definitions: Dict[str, Dict[str, Any]] = {}
        self._redis_client: Optional[redis.Redis] = None
        self._redis_available = False

        self._load_config()
        self._init_redis()
        self._register_metrics()

    def _load_config(self) -> None:
        """Load the YAML configuration, or fall back to the default one.

        The configured path is tried as given, relative to the current
        working directory, and relative to the directory four levels above
        this module. The first existing candidate is used.
        """
        candidates = [
            Path(self.config_path),
            Path.cwd() / self.config_path,
            Path(__file__).parent.parent.parent.parent / self.config_path,
        ]

        for path in candidates:
            if path.exists():
                with open(path, "r", encoding="utf-8") as f:
                    content = f.read()
                # Expand ${VAR:default} placeholders before parsing the YAML.
                content = self._substitute_env_vars(content)
                self.config = yaml.safe_load(content) or {}
                logger.info(f"已加载指标配置文件: {path}")
                return

        logger.warning(f"未找到指标配置文件: {self.config_path},使用默认配置")
        self.config = self._get_default_config()

    def _substitute_env_vars(self, content: str) -> str:
        """Replace ``${VAR_NAME:default_value}`` placeholders in *content*.

        A placeholder resolves to the environment variable when it is set,
        otherwise to the default after the colon, otherwise to an empty
        string.
        """
        pattern = r"\$\{([^}:]+)(?::([^}]*))?\}"

        def replacer(match):
            var_name = match.group(1)
            default_value = match.group(2) or ""
            return os.environ.get(var_name, default_value)

        return re.sub(pattern, replacer, content)

    def _get_default_config(self) -> Dict[str, Any]:
        """Return the configuration used when no YAML file is found."""
        return {
            "redis": {
                "host": "localhost",
                "port": 6379,
                "db": 0,
                "password": "",
            },
            "global": {
                "prefix": "functional_scaffold",
                "instance_label": True,
            },
            "builtin_metrics": {
                "http_requests": {
                    "enabled": True,
                    "name": "http_requests_total",
                    "type": "counter",
                    "description": "HTTP 请求总数",
                    "labels": ["method", "endpoint", "status"],
                },
                "http_latency": {
                    "enabled": True,
                    "name": "http_request_duration_seconds",
                    "type": "histogram",
                    "description": "HTTP 请求延迟",
                    "labels": ["method", "endpoint"],
                    "buckets": [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
                },
                "http_in_progress": {
                    "enabled": True,
                    "name": "http_requests_in_progress",
                    "type": "gauge",
                    "description": "当前进行中的 HTTP 请求数",
                    "labels": [],
                },
                "algorithm_executions": {
                    "enabled": True,
                    "name": "algorithm_executions_total",
                    "type": "counter",
                    "description": "算法执行总数",
                    "labels": ["algorithm", "status"],
                },
                "algorithm_latency": {
                    "enabled": True,
                    "name": "algorithm_execution_duration_seconds",
                    "type": "histogram",
                    "description": "算法执行延迟",
                    "labels": ["algorithm"],
                    "buckets": [0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60],
                },
            },
            "custom_metrics": {},
        }

    @staticmethod
    def _configured(value: Any, fallback: Any) -> Any:
        """Return *value* unless it is unset (``None`` or an empty string).

        Unlike ``value or fallback`` this keeps an explicit ``0``, so a
        configured ``db: 0`` is not replaced by the settings value.
        """
        if value is None or value == "":
            return fallback
        return value

    def _init_redis(self) -> None:
        """Create the Redis client and probe the connection with ``PING``.

        Values from the ``redis`` section of the configuration take
        precedence; unset entries fall back to the application settings.
        Any failure is logged and leaves metrics collection disabled.
        """
        from ..config import settings

        # ``or {}`` also covers a ``redis:`` key whose YAML value is null.
        redis_config = self.config.get("redis") or {}
        host = self._configured(redis_config.get("host"), settings.redis_host)
        port = int(self._configured(redis_config.get("port"), settings.redis_port))
        db = int(self._configured(redis_config.get("db"), settings.redis_db))
        password = self._configured(redis_config.get("password"), settings.redis_password)

        try:
            self._redis_client = redis.Redis(
                host=host,
                port=port,
                db=db,
                password=password if password else None,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5,
            )
            self._redis_client.ping()
            self._redis_available = True
            logger.info(f"Redis 连接成功: {host}:{port}/{db}")
        except redis.ConnectionError as e:
            logger.warning(f"Redis 连接失败: {e},指标将不会被收集")
            self._redis_available = False
        except Exception as e:
            logger.warning(f"Redis 初始化异常: {e},指标将不会被收集")
            self._redis_available = False

    def _register_definition(self, key: str, metric_def: Dict[str, Any]) -> None:
        """Store one metric definition under its configured name.

        The configuration key is used as the name when ``name`` is absent.
        """
        name = metric_def.get("name", key)
        self.metrics_definitions[name] = {
            "type": metric_def.get("type", "counter"),
            "description": metric_def.get("description", ""),
            "labels": metric_def.get("labels") or [],
            "buckets": metric_def.get("buckets") or [],
        }

    def _register_metrics(self) -> None:
        """Register built-in metrics that are enabled and all custom metrics."""
        # ``or {}`` tolerates sections (or entries) that are null in the YAML.
        builtin = self.config.get("builtin_metrics") or {}
        for key, metric_def in builtin.items():
            metric_def = metric_def or {}
            if metric_def.get("enabled", True):
                self._register_definition(key, metric_def)

        custom = self.config.get("custom_metrics") or {}
        for key, metric_def in custom.items():
            self._register_definition(key, metric_def or {})

        logger.info(f"已注册 {len(self.metrics_definitions)} 个指标定义")

    def _labels_to_key(self, labels: Optional[Dict[str, str]]) -> str:
        """Serialize a label dict to ``k1=v1,k2=v2`` with keys sorted.

        Returns an empty string when *labels* is empty or ``None``.
        """
        if not labels:
            return ""
        sorted_items = sorted(labels.items())
        return ",".join(f"{k}={v}" for k, v in sorted_items)

    def _key_to_prometheus_labels(self, key: str) -> str:
        """Convert a stored label key to quoted Prometheus label syntax.

        Input:  ``endpoint=/healthz,method=GET,status=success``
        Output: ``endpoint="/healthz",method="GET",status="success"``

        A fragment without ``=`` can only come from a label value that
        itself contained a comma, so it is re-attached to the preceding
        value instead of being dropped.
        """
        if not key or key == "_default_":
            return ""

        pairs: List[List[str]] = []
        for fragment in key.split(","):
            if "=" in fragment:
                label, value = fragment.split("=", 1)
                pairs.append([label, value])
            elif pairs:
                pairs[-1][1] += "," + fragment

        parts = []
        for label, value in pairs:
            # Escape backslash, double quote and newline inside the value.
            value = value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
            parts.append(f'{label}="{value}"')
        return ",".join(parts)

    def _validate_metric(self, name: str, expected_type: str) -> bool:
        """Return True when *name* is registered with type *expected_type*.

        A warning is logged for unknown metrics and for type mismatches.
        """
        if name not in self.metrics_definitions:
            logger.warning(f"指标 '{name}' 未在配置中定义")
            return False
        if self.metrics_definitions[name]["type"] != expected_type:
            logger.warning(
                f"指标 '{name}' 类型不匹配: 期望 {expected_type}, "
                f"实际 {self.metrics_definitions[name]['type']}"
            )
            return False
        return True

    # === Simple API (used by business code) ===

    def incr(self, name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None:
        """Increment a counter.

        Does nothing when Redis is unavailable or the metric is not a
        registered counter. Redis errors are logged, not raised.

        Args:
            name: Metric name.
            labels: Label dictionary.
            value: Amount to add; defaults to 1.
        """
        if not self._redis_available:
            return

        if not self._validate_metric(name, "counter"):
            return

        try:
            key = f"metrics:counter:{name}"
            field = self._labels_to_key(labels) or "_default_"
            self._redis_client.hincrbyfloat(key, field, value)
        except Exception as e:
            logger.error(f"增加计数器失败: {e}")

    def set(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
        """Set a gauge to an absolute value.

        Does nothing when Redis is unavailable or the metric is not a
        registered gauge. Redis errors are logged, not raised.

        Args:
            name: Metric name.
            labels: Label dictionary.
            value: Value to store.
        """
        if not self._redis_available:
            return

        if not self._validate_metric(name, "gauge"):
            return

        try:
            key = f"metrics:gauge:{name}"
            field = self._labels_to_key(labels) or "_default_"
            self._redis_client.hset(key, field, value)
        except Exception as e:
            logger.error(f"设置仪表盘失败: {e}")

    def gauge_incr(
        self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
    ) -> None:
        """Increase a gauge by *value*.

        Does nothing when Redis is unavailable or the metric is not a
        registered gauge. Redis errors are logged, not raised.

        Args:
            name: Metric name.
            labels: Label dictionary.
            value: Amount to add.
        """
        if not self._redis_available:
            return

        if not self._validate_metric(name, "gauge"):
            return

        try:
            key = f"metrics:gauge:{name}"
            field = self._labels_to_key(labels) or "_default_"
            self._redis_client.hincrbyfloat(key, field, value)
        except Exception as e:
            logger.error(f"增加仪表盘失败: {e}")

    def gauge_decr(
        self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
    ) -> None:
        """Decrease a gauge by *value* (delegates to ``gauge_incr``).

        Args:
            name: Metric name.
            labels: Label dictionary.
            value: Amount to subtract.
        """
        self.gauge_incr(name, labels, -value)

    def observe(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
        """Record one histogram observation.

        Updates count, sum, every bucket whose upper bound is >= *value*,
        and the ``+Inf`` bucket through a single pipeline. Does nothing when
        Redis is unavailable or the metric is not a registered histogram.
        Redis errors are logged, not raised.

        Args:
            name: Metric name.
            labels: Label dictionary.
            value: Observed value.
        """
        if not self._redis_available:
            return

        if not self._validate_metric(name, "histogram"):
            return

        try:
            label_key = self._labels_to_key(labels) or "_default_"
            buckets = self.metrics_definitions[name].get("buckets", [])

            pipe = self._redis_client.pipeline()
            pipe.hincrbyfloat(f"metrics:histogram:{name}:count", label_key, 1)
            pipe.hincrbyfloat(f"metrics:histogram:{name}:sum", label_key, value)

            # Buckets are cumulative: bump every bound the value fits under.
            for bucket in buckets:
                if value <= bucket:
                    bucket_key = f"metrics:histogram:{name}:bucket:{bucket}"
                    pipe.hincrbyfloat(bucket_key, label_key, 1)

            pipe.hincrbyfloat(f"metrics:histogram:{name}:bucket:+Inf", label_key, 1)

            pipe.execute()
        except Exception as e:
            logger.error(f"记录直方图失败: {e}")

    # === Export ===

    def export(self) -> str:
        """Render all registered metrics in the Prometheus text format.

        Returns:
            The exposition text. When Redis is unavailable or the export
            fails, a single comment line describing the problem.
        """
        if not self._redis_available:
            return "# Redis 不可用,无法导出指标\n"

        lines: List[str] = []

        try:
            for name, definition in self.metrics_definitions.items():
                metric_type = definition["type"]
                description = definition["description"]

                lines.append(f"# HELP {name} {description}")
                lines.append(f"# TYPE {name} {metric_type}")

                if metric_type == "counter":
                    lines.extend(self._export_counter(name))
                elif metric_type == "gauge":
                    lines.extend(self._export_gauge(name))
                elif metric_type == "histogram":
                    lines.extend(self._export_histogram(name, definition))

                lines.append("")  # blank separator line between metrics

        except Exception as e:
            logger.error(f"导出指标失败: {e}")
            return f"# 导出指标失败: {e}\n"

        return "\n".join(lines)

    def _export_simple(self, name: str, metric_type: str) -> List[str]:
        """Render every sample of a counter or gauge hash."""
        data = self._redis_client.hgetall(f"metrics:{metric_type}:{name}")
        lines = []
        for field, value in data.items():
            if field == "_default_":
                lines.append(f"{name} {value}")
            else:
                prom_labels = self._key_to_prometheus_labels(field)
                lines.append(f"{name}{{{prom_labels}}} {value}")
        return lines

    def _export_counter(self, name: str) -> List[str]:
        """Render the samples of counter *name*."""
        return self._export_simple(name, "counter")

    def _export_gauge(self, name: str) -> List[str]:
        """Render the samples of gauge *name*."""
        return self._export_simple(name, "gauge")

    def _export_histogram(self, name: str, definition: Dict[str, Any]) -> List[str]:
        """Render the bucket, count and sum samples of histogram *name*.

        Each bucket hash is read once with ``HGETALL`` instead of issuing
        one ``HGET`` per (label set, bucket) pair.
        """
        prefix = f"metrics:histogram:{name}"

        # The ``count`` hash lists every label set that has been observed.
        count_data = self._redis_client.hgetall(f"{prefix}:count")
        if not count_data:
            return []
        sum_data = self._redis_client.hgetall(f"{prefix}:sum")

        bounds = [str(bucket) for bucket in definition.get("buckets") or []]
        bounds.append("+Inf")
        bucket_data = {
            bound: self._redis_client.hgetall(f"{prefix}:bucket:{bound}") for bound in bounds
        }

        lines = []
        for label_key, count_value in count_data.items():
            prom_labels = self._key_to_prometheus_labels(label_key)
            unlabeled = label_key == "_default_"

            for bound in bounds:
                # A bucket never incremented for this label set reads as 0.
                bucket_value = bucket_data[bound].get(label_key) or "0"
                if unlabeled:
                    lines.append(f'{name}_bucket{{le="{bound}"}} {bucket_value}')
                else:
                    lines.append(f'{name}_bucket{{{prom_labels},le="{bound}"}} {bucket_value}')

            sum_value = sum_data.get(label_key, "0")
            if unlabeled:
                lines.append(f"{name}_count {count_value}")
                lines.append(f"{name}_sum {sum_value}")
            else:
                lines.append(f"{name}_count{{{prom_labels}}} {count_value}")
                lines.append(f"{name}_sum{{{prom_labels}}} {sum_value}")

        return lines

    def is_available(self) -> bool:
        """Return whether the Redis connection was established."""
        return self._redis_available

    def reset(self) -> None:
        """Delete every ``metrics:*`` key (mainly intended for tests).

        Keys are enumerated with ``SCAN`` rather than the blocking ``KEYS``
        command. Redis errors are logged, not raised.
        """
        if not self._redis_available:
            return

        try:
            keys = list(self._redis_client.scan_iter(match="metrics:*"))
            if keys:
                self._redis_client.delete(*keys)
            logger.info("已重置所有指标")
        except Exception as e:
            logger.error(f"重置指标失败: {e}")
|
||||
|
||||
|
||||
# 全局单例
|
||||
_manager: Optional[MetricsManager] = None
|
||||
|
||||
|
||||
def get_metrics_manager() -> MetricsManager:
    """Return the module-wide ``MetricsManager``, creating it on first use."""
    global _manager
    if _manager is not None:
        return _manager
    _manager = MetricsManager()
    return _manager
|
||||
|
||||
|
||||
def reset_metrics_manager() -> None:
    """Forget the cached manager instance (mainly intended for tests).

    Only the module-level reference is cleared; nothing is deleted in Redis.
    """
    global _manager
    _manager = None
|
||||
|
||||
|
||||
# === 便捷函数(业务代码直接调用)===
|
||||
|
||||
|
||||
def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None:
    """Increment a counter on the shared metrics manager.

    Args:
        name: Metric name.
        labels: Label dictionary.
        value: Amount to add; defaults to 1.
    """
    manager = get_metrics_manager()
    manager.incr(name, labels, value)
|
||||
|
||||
|
||||
def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
    """Set a gauge on the shared metrics manager.

    NOTE: this module-level name shadows the builtin ``set`` within this
    module.

    Args:
        name: Metric name.
        labels: Label dictionary.
        value: Value to store.
    """
    manager = get_metrics_manager()
    manager.set(name, labels, value)
|
||||
|
||||
|
||||
def gauge_incr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None:
    """Increase a gauge on the shared metrics manager.

    Args:
        name: Metric name.
        labels: Label dictionary.
        value: Amount to add.
    """
    manager = get_metrics_manager()
    manager.gauge_incr(name, labels, value)
|
||||
|
||||
|
||||
def gauge_decr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None:
    """Decrease a gauge on the shared metrics manager.

    Args:
        name: Metric name.
        labels: Label dictionary.
        value: Amount to subtract.
    """
    manager = get_metrics_manager()
    manager.gauge_decr(name, labels, value)
|
||||
|
||||
|
||||
def observe(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
    """Record a histogram observation on the shared metrics manager.

    Args:
        name: Metric name.
        labels: Label dictionary.
        value: Observed value.
    """
    manager = get_metrics_manager()
    manager.observe(name, labels, value)
|
||||
|
||||
|
||||
def export() -> str:
    """Render all metrics of the shared metrics manager.

    Returns:
        The metrics as Prometheus text-format output.
    """
    manager = get_metrics_manager()
    return manager.export()
|
||||
|
||||
|
||||
def is_available() -> bool:
    """Report whether the shared metrics manager has a usable Redis connection."""
    manager = get_metrics_manager()
    return manager.is_available()
|
||||
|
||||
|
||||
# === 装饰器(兼容旧 API)===
|
||||
|
||||
|
||||
def track_algorithm_execution(algorithm_name: str):
    """Decorator that records execution count and latency of an algorithm.

    After every call of the wrapped function, whether it returned or raised,
    the wrapper increments ``algorithm_executions_total`` (labels
    ``algorithm`` and ``status``) and observes the elapsed seconds on
    ``algorithm_execution_duration_seconds`` (label ``algorithm``).

    Args:
        algorithm_name: Value used for the ``algorithm`` label.

    Returns:
        A decorator that wraps a callable and returns its result unchanged.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so clock adjustments cannot skew
            # (or negate) the measured duration the way time.time() can.
            start_time = time.perf_counter()
            status = "success"

            try:
                return func(*args, **kwargs)
            except Exception:
                status = "error"
                # Bare raise re-raises the active exception without adding
                # this frame to its traceback.
                raise
            finally:
                elapsed = time.perf_counter() - start_time
                incr("algorithm_executions_total", {"algorithm": algorithm_name, "status": status})
                observe(
                    "algorithm_execution_duration_seconds",
                    {"algorithm": algorithm_name},
                    elapsed,
                )

        return wrapper

    return decorator
|
||||
@@ -3,14 +3,20 @@
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import Response
|
||||
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
|
||||
import logging
|
||||
import time
|
||||
|
||||
from .api import router
|
||||
from .config import settings
|
||||
from .core.logging import setup_logging
|
||||
from .core.metrics import metrics_registry, request_counter, request_latency, in_progress_requests
|
||||
from .core.metrics_unified import (
|
||||
get_metrics_manager,
|
||||
incr,
|
||||
observe,
|
||||
gauge_incr,
|
||||
gauge_decr,
|
||||
export,
|
||||
)
|
||||
|
||||
# 设置日志
|
||||
setup_logging(level=settings.log_level, format_type=settings.log_format)
|
||||
@@ -57,7 +63,7 @@ async def track_metrics(request: Request, call_next):
|
||||
if request.url.path == "/metrics":
|
||||
return await call_next(request)
|
||||
|
||||
in_progress_requests.inc()
|
||||
gauge_incr("http_requests_in_progress")
|
||||
start_time = time.time()
|
||||
status = "success"
|
||||
|
||||
@@ -72,16 +78,16 @@ async def track_metrics(request: Request, call_next):
|
||||
raise e
|
||||
finally:
|
||||
elapsed = time.time() - start_time
|
||||
request_counter.labels(
|
||||
method=request.method,
|
||||
endpoint=request.url.path,
|
||||
status=status
|
||||
).inc()
|
||||
request_latency.labels(
|
||||
method=request.method,
|
||||
endpoint=request.url.path
|
||||
).observe(elapsed)
|
||||
in_progress_requests.dec()
|
||||
incr(
|
||||
"http_requests_total",
|
||||
{"method": request.method, "endpoint": request.url.path, "status": status},
|
||||
)
|
||||
observe(
|
||||
"http_request_duration_seconds",
|
||||
{"method": request.method, "endpoint": request.url.path},
|
||||
elapsed,
|
||||
)
|
||||
gauge_decr("http_requests_in_progress")
|
||||
|
||||
|
||||
# 注册路由
|
||||
@@ -105,8 +111,8 @@ async def metrics():
|
||||
return Response(content="Metrics disabled", status_code=404)
|
||||
|
||||
return Response(
|
||||
content=generate_latest(metrics_registry),
|
||||
media_type=CONTENT_TYPE_LATEST,
|
||||
content=export(),
|
||||
media_type="text/plain; version=0.0.4; charset=utf-8",
|
||||
)
|
||||
|
||||
|
||||
@@ -118,6 +124,14 @@ async def startup_event():
|
||||
logger.info(f"Environment: {settings.app_env}")
|
||||
logger.info(f"Metrics enabled: {settings.metrics_enabled}")
|
||||
|
||||
# 初始化指标管理器
|
||||
if settings.metrics_enabled:
|
||||
manager = get_metrics_manager()
|
||||
if manager.is_available():
|
||||
logger.info("Redis 指标收集已启用")
|
||||
else:
|
||||
logger.warning("Redis 不可用,指标将不会被收集")
|
||||
|
||||
|
||||
# 关闭事件
|
||||
@app.on_event("shutdown")
|
||||
|
||||
Reference in New Issue
Block a user