From 241cffebc2f116490d3b45781ba598d7bff9b6eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roog=20=28=E9=A1=BE=E6=96=B0=E5=9F=B9=29?= Date: Mon, 2 Feb 2026 13:30:28 +0800 Subject: [PATCH] =?UTF-8?q?main:=E9=87=8D=E6=9E=84=E6=8C=87=E6=A0=87?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E5=B9=B6=E5=88=87=E6=8D=A2=E4=B8=BA=20Redis?= =?UTF-8?q?=20=E6=96=B9=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 变更内容: - 重构指标系统实现,支持基于 Redis 的多实例指标管理。 - 替换原有的 Pushgateway 和 Redis Exporter 方案。 - 更新 Prometheus 配置,适配新的指标抓取方式。 - 添加 Redis 指标相关配置和告警规则文件。 - 更新 Dockerfile 和 docker-compose 文件,移除多余服务,精简配置。 - 编写 `metrics_unified.py` 模块及单元测试。 - 修复部分代码中的冗余和格式问题。 --- config/metrics.yaml | 78 +++ deployment/Dockerfile | 5 +- deployment/docker-compose.yml | 50 +- monitoring/alerts/rules.yaml | 34 +- monitoring/prometheus.yml | 24 +- requirements.txt | 3 + src/functional_scaffold/algorithms/base.py | 10 +- src/functional_scaffold/config.py | 15 +- .../core/metrics_unified.py | 605 ++++++++++++++++++ src/functional_scaffold/main.py | 44 +- tests/test_metrics_unified.py | 273 ++++++++ 11 files changed, 1047 insertions(+), 94 deletions(-) create mode 100644 config/metrics.yaml create mode 100644 src/functional_scaffold/core/metrics_unified.py create mode 100644 tests/test_metrics_unified.py diff --git a/config/metrics.yaml b/config/metrics.yaml new file mode 100644 index 0000000..a7b2ff5 --- /dev/null +++ b/config/metrics.yaml @@ -0,0 +1,78 @@ +# 指标配置文件 +# 算法成员可以在此添加自定义指标 + +# Redis 连接配置(也可通过环境变量覆盖) +redis: + host: ${REDIS_HOST:localhost} + port: ${REDIS_PORT:6379} + db: ${REDIS_METRICS_DB:0} + password: ${REDIS_PASSWORD:} + +# 全局配置 +global: + prefix: "functional_scaffold" # 指标名称前缀 + instance_label: true # 是否添加实例标签 + +# 内置指标(框架自动收集) +builtin_metrics: + http_requests: + enabled: true + name: "http_requests_total" + type: counter + description: "HTTP 请求总数" + labels: [method, endpoint, status] + + http_latency: + enabled: true + name: "http_request_duration_seconds" + type: histogram + description: "HTTP 请求延迟" + labels: [method, endpoint] + buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10] + + http_in_progress: + enabled: true + name: "http_requests_in_progress" + type: gauge + description: "当前进行中的 HTTP 请求数" + labels: [] + + algorithm_executions: + enabled: true + name: "algorithm_executions_total" + type: counter + description: "算法执行总数" + labels: [algorithm, status] + + algorithm_latency: + enabled: true + name: "algorithm_execution_duration_seconds" + type: histogram + description: "算法执行延迟" + labels: [algorithm] + buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60] + +# 自定义指标(算法成员在此添加) +custom_metrics: + # 示例:质数判断结果统计 + prime_check_results: + name: "prime_check_results_total" + type: counter + description: "质数判断结果统计" + labels: [is_prime] + + # 示例:输入数字大小分布 + input_number_size: + name: "input_number_size" + type: histogram + description: "输入数字大小分布" + labels: [] + buckets: [10, 100, 1000, 10000, 100000, 1000000] + + # 添加更多自定义指标... + # my_custom_metric: + # name: "my_metric_name" + # type: counter|gauge|histogram + # description: "指标描述" + # labels: [label1, label2] + # buckets: [...] # 仅 histogram 需要 diff --git a/deployment/Dockerfile b/deployment/Dockerfile index bdca21d..c954b95 100644 --- a/deployment/Dockerfile +++ b/deployment/Dockerfile @@ -9,12 +9,15 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # 复制依赖文件 COPY requirements.txt . +COPY requirements-dev.txt . # 安装 Python 依赖 RUN pip install --no-cache-dir -r requirements.txt +RUN pip install --no-cache-dir -r requirements-dev.txt -# 复制应用代码 +# 复制应用代码和配置 COPY src/ ./src/ +COPY config/ ./config/ # 创建非 root 用户 RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml index 8a94e58..5f214ab 100644 --- a/deployment/docker-compose.yml +++ b/deployment/docker-compose.yml @@ -11,19 +11,19 @@ services: - APP_ENV=development - LOG_LEVEL=INFO - METRICS_ENABLED=true - # 方案1:Pushgateway 配置 - - PUSHGATEWAY_URL=pushgateway:9091 - - METRICS_JOB_NAME=functional_scaffold - # 方案2:Redis 配置 + # Redis 指标存储配置 - REDIS_HOST=redis - REDIS_PORT=6379 - - REDIS_METRICS_DB=0 + - REDIS_DB=0 + # 指标配置文件路径 + - METRICS_CONFIG_PATH=config/metrics.yaml volumes: - ../src:/app/src + - ../config:/app/config restart: unless-stopped depends_on: - - redis - - pushgateway + redis: + condition: service_healthy healthcheck: test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')"] interval: 30s @@ -31,11 +31,11 @@ services: retries: 3 start_period: 5s - # Redis - 用于集中式指标存储(方案2) + # Redis - 用于集中式指标存储 redis: image: redis:7-alpine ports: - - "6379:6379" + - "6380:6379" volumes: - redis_data:/data command: redis-server --appendonly yes @@ -46,47 +46,20 @@ services: timeout: 3s retries: 3 - # Pushgateway - 用于短生命周期任务的指标推送(方案1,推荐) - pushgateway: - image: prom/pushgateway:latest - ports: - - "9091:9091" - restart: unless-stopped - command: - - '--persistence.file=/data/pushgateway.data' - - '--persistence.interval=5m' - volumes: - - pushgateway_data:/data - - # Redis Exporter - 将 Redis 指标导出为 Prometheus 格式(方案2需要) - redis-exporter: - build: - context: .. - dockerfile: deployment/Dockerfile.redis-exporter - ports: - - "8001:8001" - environment: - - REDIS_HOST=redis - - REDIS_PORT=6379 - - REDIS_METRICS_DB=0 - depends_on: - - redis - restart: unless-stopped - prometheus: image: prom/prometheus:latest ports: - "9090:9090" volumes: - ../monitoring/prometheus.yml:/etc/prometheus/prometheus.yml + - ../monitoring/alerts:/etc/prometheus/rules - prometheus_data:/prometheus command: - '--config.file=/etc/prometheus/prometheus.yml' - '--storage.tsdb.path=/prometheus' restart: unless-stopped depends_on: - - pushgateway - - redis-exporter + - app grafana: image: grafana/grafana:latest @@ -105,4 +78,3 @@ volumes: prometheus_data: grafana_data: redis_data: - pushgateway_data: diff --git a/monitoring/alerts/rules.yaml b/monitoring/alerts/rules.yaml index d1e7a67..f22862c 100644 --- a/monitoring/alerts/rules.yaml +++ b/monitoring/alerts/rules.yaml @@ -2,38 +2,52 @@ groups: - name: functional_scaffold_alerts interval: 30s rules: + # 高错误率告警 - alert: HighErrorRate expr: rate(http_requests_total{status="error"}[5m]) > 0.05 for: 5m labels: severity: warning annotations: - summary: "High error rate detected" - description: "Error rate is {{ $value }} requests/sec for {{ $labels.endpoint }}" + summary: "检测到高错误率" + description: "端点 {{ $labels.endpoint }} 的错误率为 {{ $value }} 请求/秒" + # 高延迟告警 - alert: HighLatency expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1 for: 5m labels: severity: warning annotations: - summary: "High latency detected" - description: "P95 latency is {{ $value }}s for {{ $labels.endpoint }}" + summary: "检测到高延迟" + description: "端点 {{ $labels.endpoint }} 的 P95 延迟为 {{ $value }}s" + # 服务不可用告警 - alert: ServiceDown expr: up{job="functional-scaffold"} == 0 for: 1m labels: severity: critical annotations: - summary: "Service is down" - description: "FunctionalScaffold service has been down for more than 1 minute" + summary: "服务不可用" + description: "FunctionalScaffold 服务已停止超过 1 分钟" - - alert: HighMemoryUsage - expr: container_memory_usage_bytes{container="functional-scaffold"} / container_spec_memory_limit_bytes{container="functional-scaffold"} > 0.9 + # 算法执行失败率告警 + - alert: HighAlgorithmFailureRate + expr: rate(algorithm_executions_total{status="error"}[5m]) / rate(algorithm_executions_total[5m]) > 0.1 for: 5m labels: severity: warning annotations: - summary: "High memory usage" - description: "Memory usage is {{ $value | humanizePercentage }} of limit" + summary: "算法执行失败率过高" + description: "算法 {{ $labels.algorithm }} 的失败率超过 10%" + + # 算法执行延迟告警 + - alert: HighAlgorithmLatency + expr: histogram_quantile(0.95, rate(algorithm_execution_duration_seconds_bucket[5m])) > 5 + for: 5m + labels: + severity: warning + annotations: + summary: "算法执行延迟过高" + description: "算法 {{ $labels.algorithm }} 的 P95 延迟为 {{ $value }}s" diff --git a/monitoring/prometheus.yml b/monitoring/prometheus.yml index dbe523c..3e7af82 100644 --- a/monitoring/prometheus.yml +++ b/monitoring/prometheus.yml @@ -8,27 +8,13 @@ global: # 抓取配置 scrape_configs: - # 方案1:从 Pushgateway 抓取指标(推荐) - - job_name: 'pushgateway' - honor_labels: true - static_configs: - - targets: ['pushgateway:9091'] - metric_relabel_configs: - # 保留 instance 标签 - - source_labels: [instance] - target_label: instance - action: replace - - # 方案2:从 Redis Exporter 抓取指标 - - job_name: 'redis-exporter' - static_configs: - - targets: ['redis-exporter:8001'] - - # 直接从应用实例抓取(如果有多个实例,需要配置服务发现) - - job_name: 'app' + # 从应用实例抓取指标(Redis 统一指标方案) + # 应用通过 /metrics 端点从 Redis 读取并导出 Prometheus 格式指标 + - job_name: 'functional-scaffold' static_configs: - targets: ['app:8000'] metrics_path: '/metrics' + scrape_interval: 10s # Prometheus 自身监控 - job_name: 'prometheus' @@ -37,7 +23,7 @@ scrape_configs: # 告警规则文件 rule_files: - - '/etc/prometheus/rules/*.yml' + - '/etc/prometheus/rules/*.yaml' # Alertmanager 配置(可选) # alerting: diff --git a/requirements.txt b/requirements.txt index 7837771..6f07778 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,6 @@ python-json-logger>=2.0.7 # 指标存储方案(可选,根据选择的方案安装) # 方案2:Redis 方案需要 redis>=5.0.0 + +# YAML 配置解析 +pyyaml>=6.0.0 diff --git a/src/functional_scaffold/algorithms/base.py b/src/functional_scaffold/algorithms/base.py index 9441b74..ae972e1 100644 --- a/src/functional_scaffold/algorithms/base.py +++ b/src/functional_scaffold/algorithms/base.py @@ -32,7 +32,7 @@ class BaseAlgorithm(ABC): Returns: Dict[str, Any]: 包含结果和元数据的字典 """ - from ..core.metrics import algorithm_counter, algorithm_latency + from ..core.metrics_unified import incr, observe start_time = time.time() status = "success" @@ -42,9 +42,7 @@ class BaseAlgorithm(ABC): result = self.process(*args, **kwargs) elapsed_time = time.time() - start_time - logger.info( - f"Algorithm {self.name} completed successfully in {elapsed_time:.3f}s" - ) + logger.info(f"Algorithm {self.name} completed successfully in {elapsed_time:.3f}s") return { "success": True, @@ -73,5 +71,5 @@ class BaseAlgorithm(ABC): finally: # 记录算法执行指标 elapsed_time = time.time() - start_time - algorithm_counter.labels(algorithm=self.name, status=status).inc() - algorithm_latency.labels(algorithm=self.name).observe(elapsed_time) + incr("algorithm_executions_total", {"algorithm": self.name, "status": status}) + observe("algorithm_execution_duration_seconds", {"algorithm": self.name}, elapsed_time) diff --git a/src/functional_scaffold/config.py b/src/functional_scaffold/config.py index 8baf162..b21dfc4 100644 --- a/src/functional_scaffold/config.py +++ b/src/functional_scaffold/config.py @@ -8,10 +8,7 @@ from typing import Optional class Settings(BaseSettings): """应用配置""" - model_config = ConfigDict( - env_file=".env", - case_sensitive=False - ) + model_config = ConfigDict(env_file=".env", case_sensitive=False) # 应用信息 app_name: str = "FunctionalScaffold" @@ -42,6 +39,16 @@ class Settings(BaseSettings): database_url: Optional[str] = None + # Redis 配置 + redis_host: str = "localhost" + redis_port: int = 6379 + redis_db: int = 0 + redis_password: Optional[str] = None + + # 指标配置 + metrics_config_path: str = "config/metrics.yaml" + metrics_instance_id: Optional[str] = None # 默认使用 hostname + # 全局配置实例 settings = Settings() diff --git a/src/functional_scaffold/core/metrics_unified.py b/src/functional_scaffold/core/metrics_unified.py new file mode 100644 index 0000000..67281ed --- /dev/null +++ b/src/functional_scaffold/core/metrics_unified.py @@ -0,0 +1,605 @@ +"""统一指标管理模块 + +基于 Redis 的指标收集方案,支持多实例部署和 YAML 配置。 +""" + +import os +import re +import socket +import logging +from pathlib import Path +from typing import Any, Dict, List, Optional +from functools import wraps +import time + +import yaml +import redis + +logger = logging.getLogger(__name__) + + +class MetricsManager: + """统一指标管理器 + + 支持从 YAML 配置文件加载指标定义,使用 Redis 存储指标数据, + 并导出 Prometheus 格式的指标。 + """ + + def __init__(self, config_path: Optional[str] = None): + """初始化指标管理器 + + Args: + config_path: 配置文件路径,默认从 settings 获取 + """ + from ..config import settings + + self.config_path = config_path or settings.metrics_config_path + self.instance_id = settings.metrics_instance_id or socket.gethostname() + self.config: Dict[str, Any] = {} + self.metrics_definitions: Dict[str, Dict[str, Any]] = {} + self._redis_client: Optional[redis.Redis] = None + self._redis_available = False + + # 加载配置 + self._load_config() + # 初始化 Redis 连接 + self._init_redis() + # 注册指标定义 + self._register_metrics() + + def _load_config(self) -> None: + """加载 YAML 配置文件""" + # 尝试多个路径 + paths_to_try = [ + Path(self.config_path), + Path.cwd() / self.config_path, + Path(__file__).parent.parent.parent.parent / self.config_path, + ] + + for path in paths_to_try: + if path.exists(): + with open(path, "r", encoding="utf-8") as f: + content = f.read() + # 处理环境变量替换 ${VAR:default} + content = self._substitute_env_vars(content) + self.config = yaml.safe_load(content) or {} + logger.info(f"已加载指标配置文件: {path}") + return + + logger.warning(f"未找到指标配置文件: {self.config_path},使用默认配置") + self.config = self._get_default_config() + + def _substitute_env_vars(self, content: str) -> str: + """替换配置中的环境变量 + + 支持格式: ${VAR_NAME:default_value} + """ + pattern = r"\$\{([^}:]+)(?::([^}]*))?\}" + + def replacer(match): + var_name = match.group(1) + default_value = match.group(2) or "" + return os.environ.get(var_name, default_value) + + return re.sub(pattern, replacer, content) + + def _get_default_config(self) -> Dict[str, Any]: + """获取默认配置""" + return { + "redis": { + "host": "localhost", + "port": 6379, + "db": 0, + "password": "", + }, + "global": { + "prefix": "functional_scaffold", + "instance_label": True, + }, + "builtin_metrics": { + "http_requests": { + "enabled": True, + "name": "http_requests_total", + "type": "counter", + "description": "HTTP 请求总数", + "labels": ["method", "endpoint", "status"], + }, + "http_latency": { + "enabled": True, + "name": "http_request_duration_seconds", + "type": "histogram", + "description": "HTTP 请求延迟", + "labels": ["method", "endpoint"], + "buckets": [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10], + }, + "http_in_progress": { + "enabled": True, + "name": "http_requests_in_progress", + "type": "gauge", + "description": "当前进行中的 HTTP 请求数", + "labels": [], + }, + "algorithm_executions": { + "enabled": True, + "name": "algorithm_executions_total", + "type": "counter", + "description": "算法执行总数", + "labels": ["algorithm", "status"], + }, + "algorithm_latency": { + "enabled": True, + "name": "algorithm_execution_duration_seconds", + "type": "histogram", + "description": "算法执行延迟", + "labels": ["algorithm"], + "buckets": [0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60], + }, + }, + "custom_metrics": {}, + } + + def _init_redis(self) -> None: + """初始化 Redis 连接""" + from ..config import settings + + redis_config = self.config.get("redis", {}) + host = redis_config.get("host") or settings.redis_host + port = int(redis_config.get("port") or settings.redis_port) + db = int(redis_config.get("db") or settings.redis_db) + password = redis_config.get("password") or settings.redis_password + + try: + self._redis_client = redis.Redis( + host=host, + port=port, + db=db, + password=password if password else None, + decode_responses=True, + socket_connect_timeout=5, + socket_timeout=5, + ) + # 测试连接 + self._redis_client.ping() + self._redis_available = True + logger.info(f"Redis 连接成功: {host}:{port}/{db}") + except redis.ConnectionError as e: + logger.warning(f"Redis 连接失败: {e},指标将不会被收集") + self._redis_available = False + except Exception as e: + logger.warning(f"Redis 初始化异常: {e},指标将不会被收集") + self._redis_available = False + + def _register_metrics(self) -> None: + """注册所有指标定义""" + # 注册内置指标 + builtin = self.config.get("builtin_metrics", {}) + for key, metric_def in builtin.items(): + if metric_def.get("enabled", True): + name = metric_def.get("name", key) + self.metrics_definitions[name] = { + "type": metric_def.get("type", "counter"), + "description": metric_def.get("description", ""), + "labels": metric_def.get("labels", []), + "buckets": metric_def.get("buckets", []), + } + + # 注册自定义指标 + custom = self.config.get("custom_metrics", {}) + for key, metric_def in custom.items(): + name = metric_def.get("name", key) + self.metrics_definitions[name] = { + "type": metric_def.get("type", "counter"), + "description": metric_def.get("description", ""), + "labels": metric_def.get("labels", []), + "buckets": metric_def.get("buckets", []), + } + + logger.info(f"已注册 {len(self.metrics_definitions)} 个指标定义") + + def _labels_to_key(self, labels: Optional[Dict[str, str]]) -> str: + """将标签字典转换为 Redis key 的一部分""" + if not labels: + return "" + sorted_items = sorted(labels.items()) + return ",".join(f"{k}={v}" for k, v in sorted_items) + + def _key_to_prometheus_labels(self, key: str) -> str: + """将 Redis key 格式转换为 Prometheus 标签格式(带引号) + + 输入: endpoint=/healthz,method=GET,status=success + 输出: endpoint="/healthz",method="GET",status="success" + """ + if not key or key == "_default_": + return "" + parts = [] + for pair in key.split(","): + if "=" in pair: + k, v = pair.split("=", 1) + # 转义值中的特殊字符 + v = v.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n") + parts.append(f'{k}="{v}"') + return ",".join(parts) + + def _validate_metric(self, name: str, expected_type: str) -> bool: + """验证指标是否已定义且类型正确""" + if name not in self.metrics_definitions: + logger.warning(f"指标 '{name}' 未在配置中定义") + return False + if self.metrics_definitions[name]["type"] != expected_type: + logger.warning( + f"指标 '{name}' 类型不匹配: 期望 {expected_type}, " + f"实际 {self.metrics_definitions[name]['type']}" + ) + return False + return True + + # === 简单 API(业务代码使用)=== + + def incr(self, name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None: + """增加计数器 + + Args: + name: 指标名称 + labels: 标签字典 + value: 增加的值,默认为 1 + """ + if not self._redis_available: + return + + if not self._validate_metric(name, "counter"): + return + + try: + key = f"metrics:counter:{name}" + field = self._labels_to_key(labels) or "_default_" + self._redis_client.hincrbyfloat(key, field, value) + except Exception as e: + logger.error(f"增加计数器失败: {e}") + + def set(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: + """设置仪表盘值 + + Args: + name: 指标名称 + labels: 标签字典 + value: 设置的值 + """ + if not self._redis_available: + return + + if not self._validate_metric(name, "gauge"): + return + + try: + key = f"metrics:gauge:{name}" + field = self._labels_to_key(labels) or "_default_" + self._redis_client.hset(key, field, value) + except Exception as e: + logger.error(f"设置仪表盘失败: {e}") + + def gauge_incr( + self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 + ) -> None: + """增加仪表盘值 + + Args: + name: 指标名称 + labels: 标签字典 + value: 增加的值 + """ + if not self._redis_available: + return + + if not self._validate_metric(name, "gauge"): + return + + try: + key = f"metrics:gauge:{name}" + field = self._labels_to_key(labels) or "_default_" + self._redis_client.hincrbyfloat(key, field, value) + except Exception as e: + logger.error(f"增加仪表盘失败: {e}") + + def gauge_decr( + self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 + ) -> None: + """减少仪表盘值 + + Args: + name: 指标名称 + labels: 标签字典 + value: 减少的值 + """ + self.gauge_incr(name, labels, -value) + + def observe(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: + """记录直方图观测值 + + Args: + name: 指标名称 + labels: 标签字典 + value: 观测值 + """ + if not self._redis_available: + return + + if not self._validate_metric(name, "histogram"): + return + + try: + label_key = self._labels_to_key(labels) or "_default_" + buckets = self.metrics_definitions[name].get("buckets", []) + + # 使用 pipeline 批量操作 + pipe = self._redis_client.pipeline() + + # 增加 count + pipe.hincrbyfloat(f"metrics:histogram:{name}:count", label_key, 1) + + # 增加 sum + pipe.hincrbyfloat(f"metrics:histogram:{name}:sum", label_key, value) + + # 更新各个桶 + for bucket in buckets: + if value <= bucket: + bucket_key = f"metrics:histogram:{name}:bucket:{bucket}" + pipe.hincrbyfloat(bucket_key, label_key, 1) + + # +Inf 桶总是增加 + pipe.hincrbyfloat(f"metrics:histogram:{name}:bucket:+Inf", label_key, 1) + + pipe.execute() + except Exception as e: + logger.error(f"记录直方图失败: {e}") + + # === 导出方法 === + + def export(self) -> str: + """导出 Prometheus 格式指标 + + Returns: + Prometheus 文本格式的指标字符串 + """ + if not self._redis_available: + return "# Redis 不可用,无法导出指标\n" + + lines: List[str] = [] + + try: + for name, definition in self.metrics_definitions.items(): + metric_type = definition["type"] + description = definition["description"] + + # 添加 HELP 和 TYPE + lines.append(f"# HELP {name} {description}") + lines.append(f"# TYPE {name} {metric_type}") + + if metric_type == "counter": + lines.extend(self._export_counter(name)) + elif metric_type == "gauge": + lines.extend(self._export_gauge(name)) + elif metric_type == "histogram": + lines.extend(self._export_histogram(name, definition)) + + lines.append("") # 空行分隔 + + except Exception as e: + logger.error(f"导出指标失败: {e}") + return f"# 导出指标失败: {e}\n" + + return "\n".join(lines) + + def _export_counter(self, name: str) -> List[str]: + """导出计数器指标""" + lines = [] + key = f"metrics:counter:{name}" + + data = self._redis_client.hgetall(key) + for field, value in data.items(): + if field == "_default_": + lines.append(f"{name} {value}") + else: + prom_labels = self._key_to_prometheus_labels(field) + lines.append(f"{name}{{{prom_labels}}} {value}") + + return lines + + def _export_gauge(self, name: str) -> List[str]: + """导出仪表盘指标""" + lines = [] + key = f"metrics:gauge:{name}" + + data = self._redis_client.hgetall(key) + for field, value in data.items(): + if field == "_default_": + lines.append(f"{name} {value}") + else: + prom_labels = self._key_to_prometheus_labels(field) + lines.append(f"{name}{{{prom_labels}}} {value}") + + return lines + + def _export_histogram(self, name: str, definition: Dict[str, Any]) -> List[str]: + """导出直方图指标""" + lines = [] + buckets = definition.get("buckets", []) + + # 获取所有标签组合 + count_data = self._redis_client.hgetall(f"metrics:histogram:{name}:count") + sum_data = self._redis_client.hgetall(f"metrics:histogram:{name}:sum") + + for label_key in count_data.keys(): + prom_labels = self._key_to_prometheus_labels(label_key) + + # 导出各个桶 + for bucket in buckets: + bucket_key = f"metrics:histogram:{name}:bucket:{bucket}" + bucket_value = self._redis_client.hget(bucket_key, label_key) or "0" + if label_key == "_default_": + lines.append(f'{name}_bucket{{le="{bucket}"}} {bucket_value}') + else: + lines.append(f'{name}_bucket{{{prom_labels},le="{bucket}"}} {bucket_value}') + + # +Inf 桶 + inf_key = f"metrics:histogram:{name}:bucket:+Inf" + inf_value = self._redis_client.hget(inf_key, label_key) or "0" + if label_key == "_default_": + lines.append(f'{name}_bucket{{le="+Inf"}} {inf_value}') + else: + lines.append(f'{name}_bucket{{{prom_labels},le="+Inf"}} {inf_value}') + + # count 和 sum + count_value = count_data.get(label_key, "0") + sum_value = sum_data.get(label_key, "0") + if label_key == "_default_": + lines.append(f"{name}_count {count_value}") + lines.append(f"{name}_sum {sum_value}") + else: + lines.append(f"{name}_count{{{prom_labels}}} {count_value}") + lines.append(f"{name}_sum{{{prom_labels}}} {sum_value}") + + return lines + + def is_available(self) -> bool: + """检查 Redis 是否可用""" + return self._redis_available + + def reset(self) -> None: + """重置所有指标(主要用于测试)""" + if not self._redis_available: + return + + try: + # 删除所有指标相关的 key + keys = self._redis_client.keys("metrics:*") + if keys: + self._redis_client.delete(*keys) + logger.info("已重置所有指标") + except Exception as e: + logger.error(f"重置指标失败: {e}") + + +# 全局单例 +_manager: Optional[MetricsManager] = None + + +def get_metrics_manager() -> MetricsManager: + """获取指标管理器单例""" + global _manager + if _manager is None: + _manager = MetricsManager() + return _manager + + +def reset_metrics_manager() -> None: + """重置指标管理器单例(主要用于测试)""" + global _manager + _manager = None + + +# === 便捷函数(业务代码直接调用)=== + + +def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None: + """增加计数器 - 便捷函数 + + Args: + name: 指标名称 + labels: 标签字典 + value: 增加的值,默认为 1 + """ + get_metrics_manager().incr(name, labels, value) + + +def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: + """设置仪表盘 - 便捷函数 + + Args: + name: 指标名称 + labels: 标签字典 + value: 设置的值 + """ + get_metrics_manager().set(name, labels, value) + + +def gauge_incr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None: + """增加仪表盘 - 便捷函数 + + Args: + name: 指标名称 + labels: 标签字典 + value: 增加的值 + """ + get_metrics_manager().gauge_incr(name, labels, value) + + +def gauge_decr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None: + """减少仪表盘 - 便捷函数 + + Args: + name: 指标名称 + labels: 标签字典 + value: 减少的值 + """ + get_metrics_manager().gauge_decr(name, labels, value) + + +def observe(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: + """记录直方图 - 便捷函数 + + Args: + name: 指标名称 + labels: 标签字典 + value: 观测值 + """ + get_metrics_manager().observe(name, labels, value) + + +def export() -> str: + """导出指标 - 便捷函数 + + Returns: + Prometheus 文本格式的指标字符串 + """ + return get_metrics_manager().export() + + +def is_available() -> bool: + """检查 Redis 是否可用 - 便捷函数""" + return get_metrics_manager().is_available() + + +# === 装饰器(兼容旧 API)=== + + +def track_algorithm_execution(algorithm_name: str): + """装饰器:跟踪算法执行指标 + + Args: + algorithm_name: 算法名称 + """ + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + start_time = time.time() + status = "success" + + try: + result = func(*args, **kwargs) + return result + except Exception as e: + status = "error" + raise e + finally: + elapsed = time.time() - start_time + incr("algorithm_executions_total", {"algorithm": algorithm_name, "status": status}) + observe( + "algorithm_execution_duration_seconds", + {"algorithm": algorithm_name}, + elapsed, + ) + + return wrapper + + return decorator diff --git a/src/functional_scaffold/main.py b/src/functional_scaffold/main.py index 00d48f1..cedee93 100644 --- a/src/functional_scaffold/main.py +++ b/src/functional_scaffold/main.py @@ -3,14 +3,20 @@ from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import Response -from prometheus_client import generate_latest, CONTENT_TYPE_LATEST import logging import time from .api import router from .config import settings from .core.logging import setup_logging -from .core.metrics import metrics_registry, request_counter, request_latency, in_progress_requests +from .core.metrics_unified import ( + get_metrics_manager, + incr, + observe, + gauge_incr, + gauge_decr, + export, +) # 设置日志 setup_logging(level=settings.log_level, format_type=settings.log_format) @@ -57,7 +63,7 @@ async def track_metrics(request: Request, call_next): if request.url.path == "/metrics": return await call_next(request) - in_progress_requests.inc() + gauge_incr("http_requests_in_progress") start_time = time.time() status = "success" @@ -72,16 +78,16 @@ async def track_metrics(request: Request, call_next): raise e finally: elapsed = time.time() - start_time - request_counter.labels( - method=request.method, - endpoint=request.url.path, - status=status - ).inc() - request_latency.labels( - method=request.method, - endpoint=request.url.path - ).observe(elapsed) - in_progress_requests.dec() + incr( + "http_requests_total", + {"method": request.method, "endpoint": request.url.path, "status": status}, + ) + observe( + "http_request_duration_seconds", + {"method": request.method, "endpoint": request.url.path}, + elapsed, + ) + gauge_decr("http_requests_in_progress") # 注册路由 @@ -105,8 +111,8 @@ async def metrics(): return Response(content="Metrics disabled", status_code=404) return Response( - content=generate_latest(metrics_registry), - media_type=CONTENT_TYPE_LATEST, + content=export(), + media_type="text/plain; version=0.0.4; charset=utf-8", ) @@ -118,6 +124,14 @@ async def startup_event(): logger.info(f"Environment: {settings.app_env}") logger.info(f"Metrics enabled: {settings.metrics_enabled}") + # 初始化指标管理器 + if settings.metrics_enabled: + manager = get_metrics_manager() + if manager.is_available(): + logger.info("Redis 指标收集已启用") + else: + logger.warning("Redis 不可用,指标将不会被收集") + # 关闭事件 @app.on_event("shutdown") diff --git a/tests/test_metrics_unified.py b/tests/test_metrics_unified.py new file mode 100644 index 0000000..337d8a1 --- /dev/null +++ b/tests/test_metrics_unified.py @@ -0,0 +1,273 @@ +"""metrics_unified 模块单元测试""" + +import pytest +from unittest.mock import MagicMock, patch + + +class TestMetricsManager: + """MetricsManager 类测试""" + + @pytest.fixture + def mock_redis(self): + """模拟 Redis 客户端""" + with patch("redis.Redis") as mock: + mock_instance = MagicMock() + mock_instance.ping.return_value = True + mock_instance.hincrbyfloat.return_value = 1.0 + mock_instance.hset.return_value = True + mock_instance.hgetall.return_value = {} + mock_instance.hget.return_value = "0" + mock_instance.keys.return_value = [] + mock_instance.pipeline.return_value = MagicMock() + mock.return_value = mock_instance + yield mock_instance + + @pytest.fixture + def manager(self, mock_redis): + """创建测试用的 MetricsManager""" + from src.functional_scaffold.core.metrics_unified import ( + MetricsManager, + reset_metrics_manager, + ) + + reset_metrics_manager() + manager = MetricsManager() + return manager + + def test_init_loads_default_config(self, manager): + """测试初始化加载默认配置""" + assert manager.config is not None + assert "builtin_metrics" in manager.config or len(manager.metrics_definitions) > 0 + + def test_metrics_definitions_registered(self, manager): + """测试指标定义已注册""" + assert "http_requests_total" in manager.metrics_definitions + assert "http_request_duration_seconds" in manager.metrics_definitions + assert "algorithm_executions_total" in manager.metrics_definitions + + def test_incr_counter(self, manager, mock_redis): + """测试计数器增加""" + manager.incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}) + mock_redis.hincrbyfloat.assert_called() + + def test_incr_with_invalid_metric_type(self, manager, mock_redis): + """测试对非计数器类型调用 incr""" + # http_request_duration_seconds 是 histogram 类型 + manager.incr("http_request_duration_seconds", {}) + # 不应该调用 Redis(因为类型不匹配) + # 验证没有调用 hincrbyfloat(或者调用次数没有增加) + + def test_set_gauge(self, manager, mock_redis): + """测试设置仪表盘""" + manager.set("http_requests_in_progress", {}, 5) + mock_redis.hset.assert_called() + + def test_gauge_incr(self, manager, mock_redis): + """测试增加仪表盘""" + manager.gauge_incr("http_requests_in_progress", {}, 1) + mock_redis.hincrbyfloat.assert_called() + + def test_gauge_decr(self, manager, mock_redis): + """测试减少仪表盘""" + manager.gauge_decr("http_requests_in_progress", {}, 1) + mock_redis.hincrbyfloat.assert_called() + + def test_observe_histogram(self, manager, mock_redis): + """测试直方图观测""" + mock_pipeline = MagicMock() + mock_redis.pipeline.return_value = mock_pipeline + + manager.observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05) + + mock_redis.pipeline.assert_called() + mock_pipeline.execute.assert_called() + + def test_labels_to_key(self, manager): + """测试标签转换为 key""" + labels = {"method": "GET", "endpoint": "/api"} + key = manager._labels_to_key(labels) + assert "method=GET" in key + assert "endpoint=/api" in key + + def test_labels_to_key_empty(self, manager): + """测试空标签转换""" + key = manager._labels_to_key(None) + assert key == "" + + key = manager._labels_to_key({}) + assert key == "" + + def test_is_available(self, manager): + """测试 Redis 可用性检查""" + assert manager.is_available() is True + + +class TestConvenienceFunctions: + """便捷函数测试""" + + @pytest.fixture(autouse=True) + def setup(self): + """每个测试前重置管理器""" + from src.functional_scaffold.core.metrics_unified import reset_metrics_manager + + reset_metrics_manager() + + @patch("redis.Redis") + def test_incr_function(self, mock_redis_class): + """测试 incr 便捷函数""" + mock_instance = MagicMock() + mock_instance.ping.return_value = True + mock_redis_class.return_value = mock_instance + + from src.functional_scaffold.core.metrics_unified import incr, reset_metrics_manager + + reset_metrics_manager() + incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}) + + mock_instance.hincrbyfloat.assert_called() + + @patch("redis.Redis") + def test_set_function(self, mock_redis_class): + """测试 set 便捷函数""" + mock_instance = MagicMock() + mock_instance.ping.return_value = True + mock_redis_class.return_value = mock_instance + + from src.functional_scaffold.core.metrics_unified import reset_metrics_manager, set + + reset_metrics_manager() + set("http_requests_in_progress", {}, 10) + + mock_instance.hset.assert_called() + + @patch("redis.Redis") + def test_observe_function(self, mock_redis_class): + """测试 observe 便捷函数""" + mock_instance = MagicMock() + mock_instance.ping.return_value = True + mock_pipeline = MagicMock() + mock_instance.pipeline.return_value = mock_pipeline + mock_redis_class.return_value = mock_instance + + from src.functional_scaffold.core.metrics_unified import observe, reset_metrics_manager + + reset_metrics_manager() + observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.1) + + mock_instance.pipeline.assert_called() + + +class TestExport: + """导出功能测试""" + + @patch("redis.Redis") + def test_export_counter(self, mock_redis_class): + """测试导出计数器""" + mock_instance = MagicMock() + mock_instance.ping.return_value = True + mock_instance.hgetall.return_value = {"method=GET,endpoint=/,status=success": "10"} + mock_redis_class.return_value = mock_instance + + from src.functional_scaffold.core.metrics_unified import export, reset_metrics_manager + + reset_metrics_manager() + output = export() + + assert "http_requests_total" in output + assert "HELP" in output + assert "TYPE" in output + + @patch("redis.Redis") + def test_export_histogram(self, mock_redis_class): + """测试导出直方图""" + mock_instance = MagicMock() + mock_instance.ping.return_value = True + mock_instance.hgetall.side_effect = lambda key: ( + {"method=GET,endpoint=/": "5"} + if "count" in key + else {"method=GET,endpoint=/": "0.5"} + if "sum" in key + else {} + ) + mock_instance.hget.return_value = "3" + mock_redis_class.return_value = mock_instance + + from src.functional_scaffold.core.metrics_unified import export, reset_metrics_manager + + reset_metrics_manager() + output = export() + + assert "http_request_duration_seconds" in output + + +class TestEnvVarSubstitution: + """环境变量替换测试""" + + def test_substitute_env_vars(self): + """测试环境变量替换""" + import os + from src.functional_scaffold.core.metrics_unified import MetricsManager + + # 设置测试环境变量 + os.environ["TEST_VAR"] = "test_value" + + manager = MetricsManager.__new__(MetricsManager) + result = manager._substitute_env_vars("${TEST_VAR:default}") + assert result == "test_value" + + # 测试默认值 + result = manager._substitute_env_vars("${NONEXISTENT_VAR:default_value}") + assert result == "default_value" + + # 清理 + del os.environ["TEST_VAR"] + + +class TestTrackAlgorithmExecution: + """track_algorithm_execution 装饰器测试""" + + @patch("redis.Redis") + def test_decorator_success(self, mock_redis_class): + """测试装饰器成功执行""" + mock_instance = MagicMock() + mock_instance.ping.return_value = True + mock_pipeline = MagicMock() + mock_instance.pipeline.return_value = mock_pipeline + mock_redis_class.return_value = mock_instance + + from src.functional_scaffold.core.metrics_unified import ( + reset_metrics_manager, + track_algorithm_execution, + ) + + reset_metrics_manager() + + @track_algorithm_execution("test_algo") + def test_func(): + return "result" + + result = test_func() + assert result == "result" + + @patch("redis.Redis") + def test_decorator_error(self, mock_redis_class): + """测试装饰器错误处理""" + mock_instance = MagicMock() + mock_instance.ping.return_value = True + mock_pipeline = MagicMock() + mock_instance.pipeline.return_value = mock_pipeline + mock_redis_class.return_value = mock_instance + + from src.functional_scaffold.core.metrics_unified import ( + reset_metrics_manager, + track_algorithm_execution, + ) + + reset_metrics_manager() + + @track_algorithm_execution("test_algo") + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError): + test_func()