From a4d2ad1e93e82f386f8a250ae5636f7c055034f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roog=20=28=E9=A1=BE=E6=96=B0=E5=9F=B9=29?= Date: Tue, 3 Feb 2026 19:52:24 +0800 Subject: [PATCH] =?UTF-8?q?main:=E9=87=87=E7=94=A8=E5=BC=82=E6=AD=A5=20Red?= =?UTF-8?q?is=20=E5=AE=A2=E6=88=B7=E7=AB=AF=E4=BC=98=E5=8C=96=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E7=AE=A1=E7=90=86=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 变更内容: - 将 `redis` 客户端替换为 `redis.asyncio` 实现。 - 系统中同步方法调整为异步方法,提升事件循环效率。 - 在 `MetricsManager` 中添加异步初始化及关闭逻辑,避免阻塞问题。 - 更新便捷函数以支持异步上下文,并添加同步模式的兼容方法。 - 调整 Worker、JobManager、API 路由等模块,适配异步指标操作。 - 扩展单元测试,覆盖新增的异步方法及 Redis 操作逻辑。 - 简化 Dockerfile,取消开发依赖安装命令。 --- deployment/Dockerfile | 6 +- src/functional_scaffold/algorithms/base.py | 8 +- .../algorithms/prime_checker.py | 8 +- src/functional_scaffold/core/job_manager.py | 20 +- .../core/metrics_unified.py | 261 +++++++++++--- src/functional_scaffold/main.py | 12 +- src/functional_scaffold/worker.py | 2 +- tests/test_metrics_unified.py | 336 +++++++++++------- 8 files changed, 435 insertions(+), 218 deletions(-) diff --git a/deployment/Dockerfile b/deployment/Dockerfile index ade4192..e5cc395 100644 --- a/deployment/Dockerfile +++ b/deployment/Dockerfile @@ -9,11 +9,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # 复制依赖文件 COPY requirements.txt . -COPY requirements-dev.txt . # 安装 Python 依赖 RUN pip install --no-cache-dir -r requirements.txt -RUN pip install --no-cache-dir -r requirements-dev.txt + +# 安装dev依赖 +#COPY requirements-dev.txt . +#RUN pip install --no-cache-dir -r requirements-dev.txt # 复制应用代码和配置 COPY src/ ./src/ diff --git a/src/functional_scaffold/algorithms/base.py b/src/functional_scaffold/algorithms/base.py index ae972e1..e3e83d3 100644 --- a/src/functional_scaffold/algorithms/base.py +++ b/src/functional_scaffold/algorithms/base.py @@ -32,7 +32,7 @@ class BaseAlgorithm(ABC): Returns: Dict[str, Any]: 包含结果和元数据的字典 """ - from ..core.metrics_unified import incr, observe + from ..core.metrics_unified import incr_sync, observe_sync start_time = time.time() status = "success" @@ -71,5 +71,7 @@ class BaseAlgorithm(ABC): finally: # 记录算法执行指标 elapsed_time = time.time() - start_time - incr("algorithm_executions_total", {"algorithm": self.name, "status": status}) - observe("algorithm_execution_duration_seconds", {"algorithm": self.name}, elapsed_time) + incr_sync("algorithm_executions_total", {"algorithm": self.name, "status": status}) + observe_sync( + "algorithm_execution_duration_seconds", {"algorithm": self.name}, elapsed_time + ) diff --git a/src/functional_scaffold/algorithms/prime_checker.py b/src/functional_scaffold/algorithms/prime_checker.py index f3ada5d..2fee9bd 100644 --- a/src/functional_scaffold/algorithms/prime_checker.py +++ b/src/functional_scaffold/algorithms/prime_checker.py @@ -2,7 +2,7 @@ from typing import Dict, Any, List from .base import BaseAlgorithm -from ..core.metrics_unified import incr +from ..core.metrics_unified import incr_sync class PrimeChecker(BaseAlgorithm): @@ -31,12 +31,12 @@ class PrimeChecker(BaseAlgorithm): ValueError: 如果输入不是整数 """ if not isinstance(number, int): - incr('prime_check',{"status":"invalid_input"}) + incr_sync('prime_check', {"status": "invalid_input"}) raise ValueError(f"Input must be an integer, got {type(number).__name__}") # 小于2的数不是质数 if number < 2: - incr('prime_check', {"status": "number_little_two"}) + incr_sync('prime_check', {"status": "number_little_two"}) return { "number": number, "is_prime": False, @@ -50,7 +50,7 @@ class PrimeChecker(BaseAlgorithm): # 如果不是质数,计算因数 factors = [] if is_prime else self._get_factors(number) - incr('prime_check', {"status": "success"}) + incr_sync('prime_check', {"status": "success"}) return { "number": number, "is_prime": is_prime, diff --git a/src/functional_scaffold/core/job_manager.py b/src/functional_scaffold/core/job_manager.py index 10d5891..7134896 100644 --- a/src/functional_scaffold/core/job_manager.py +++ b/src/functional_scaffold/core/job_manager.py @@ -168,7 +168,7 @@ return 0 await self._redis_client.hset(key, mapping=job_data) # 记录指标 - incr("jobs_created_total", {"algorithm": algorithm}) + await incr("jobs_created_total", {"algorithm": algorithm}) logger.info(f"任务已创建: job_id={job_id}, algorithm={algorithm}") return job_id @@ -320,8 +320,10 @@ return 0 await self._redis_client.expire(key, settings.job_result_ttl) # 记录指标 - incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status}) - observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time) + await incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status}) + await observe( + "job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time + ) logger.info( f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s" @@ -372,7 +374,7 @@ return 0 ) if response.status_code < 400: - incr("webhook_deliveries_total", {"status": "success"}) + await incr("webhook_deliveries_total", {"status": "success"}) logger.info( f"Webhook 发送成功: job_id={job_id}, url={webhook_url}, " f"status_code={response.status_code}" @@ -395,7 +397,7 @@ return 0 await asyncio.sleep(delay) # 所有重试都失败 - incr("webhook_deliveries_total", {"status": "failed"}) + await incr("webhook_deliveries_total", {"status": "failed"}) logger.error(f"Webhook 发送最终失败: job_id={job_id}, url={webhook_url}") def is_available(self) -> bool: @@ -814,10 +816,10 @@ return 0 # 更新指标 from .metrics_unified import set as metrics_set - metrics_set("job_queue_length", {"queue": "pending"}, queue_length) - metrics_set("job_queue_length", {"queue": "processing"}, processing_length) - metrics_set("job_queue_length", {"queue": "dlq"}, dlq_length) - metrics_set("job_oldest_waiting_seconds", None, oldest_waiting_seconds) + await metrics_set("job_queue_length", {"queue": "pending"}, queue_length) + await metrics_set("job_queue_length", {"queue": "processing"}, processing_length) + await metrics_set("job_queue_length", {"queue": "dlq"}, dlq_length) + await metrics_set("job_oldest_waiting_seconds", None, oldest_waiting_seconds) return { "queue_length": queue_length, diff --git a/src/functional_scaffold/core/metrics_unified.py b/src/functional_scaffold/core/metrics_unified.py index 67281ed..ba6e0c4 100644 --- a/src/functional_scaffold/core/metrics_unified.py +++ b/src/functional_scaffold/core/metrics_unified.py @@ -1,19 +1,21 @@ """统一指标管理模块 基于 Redis 的指标收集方案,支持多实例部署和 YAML 配置。 +使用异步 Redis 客户端,避免在异步请求路径中阻塞事件循环。 """ import os import re import socket import logging +import asyncio from pathlib import Path from typing import Any, Dict, List, Optional from functools import wraps import time import yaml -import redis +import redis.asyncio as aioredis logger = logging.getLogger(__name__) @@ -22,7 +24,7 @@ class MetricsManager: """统一指标管理器 支持从 YAML 配置文件加载指标定义,使用 Redis 存储指标数据, - 并导出 Prometheus 格式的指标。 + 并导出 Prometheus 格式的指标。使用异步 Redis 客户端。 """ def __init__(self, config_path: Optional[str] = None): @@ -37,16 +39,22 @@ class MetricsManager: self.instance_id = settings.metrics_instance_id or socket.gethostname() self.config: Dict[str, Any] = {} self.metrics_definitions: Dict[str, Dict[str, Any]] = {} - self._redis_client: Optional[redis.Redis] = None + self._redis_client: Optional[aioredis.Redis] = None self._redis_available = False + self._initialized = False - # 加载配置 + # 加载配置(同步操作) self._load_config() - # 初始化 Redis 连接 - self._init_redis() - # 注册指标定义 + # 注册指标定义(同步操作) self._register_metrics() + async def initialize(self) -> None: + """异步初始化 Redis 连接""" + if self._initialized: + return + await self._init_redis() + self._initialized = True + def _load_config(self) -> None: """加载 YAML 配置文件""" # 尝试多个路径 @@ -138,8 +146,8 @@ class MetricsManager: "custom_metrics": {}, } - def _init_redis(self) -> None: - """初始化 Redis 连接""" + async def _init_redis(self) -> None: + """异步初始化 Redis 连接""" from ..config import settings redis_config = self.config.get("redis", {}) @@ -149,7 +157,7 @@ class MetricsManager: password = redis_config.get("password") or settings.redis_password try: - self._redis_client = redis.Redis( + self._redis_client = aioredis.Redis( host=host, port=port, db=db, @@ -159,10 +167,10 @@ class MetricsManager: socket_timeout=5, ) # 测试连接 - self._redis_client.ping() + await self._redis_client.ping() self._redis_available = True logger.info(f"Redis 连接成功: {host}:{port}/{db}") - except redis.ConnectionError as e: + except aioredis.ConnectionError as e: logger.warning(f"Redis 连接失败: {e},指标将不会被收集") self._redis_available = False except Exception as e: @@ -235,7 +243,9 @@ class MetricsManager: # === 简单 API(业务代码使用)=== - def incr(self, name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None: + async def incr( + self, name: str, labels: Optional[Dict[str, str]] = None, value: int = 1 + ) -> None: """增加计数器 Args: @@ -252,11 +262,13 @@ class MetricsManager: try: key = f"metrics:counter:{name}" field = self._labels_to_key(labels) or "_default_" - self._redis_client.hincrbyfloat(key, field, value) + await self._redis_client.hincrbyfloat(key, field, value) except Exception as e: logger.error(f"增加计数器失败: {e}") - def set(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: + async def set( + self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0 + ) -> None: """设置仪表盘值 Args: @@ -273,11 +285,11 @@ class MetricsManager: try: key = f"metrics:gauge:{name}" field = self._labels_to_key(labels) or "_default_" - self._redis_client.hset(key, field, value) + await self._redis_client.hset(key, field, value) except Exception as e: logger.error(f"设置仪表盘失败: {e}") - def gauge_incr( + async def gauge_incr( self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 ) -> None: """增加仪表盘值 @@ -296,11 +308,11 @@ class MetricsManager: try: key = f"metrics:gauge:{name}" field = self._labels_to_key(labels) or "_default_" - self._redis_client.hincrbyfloat(key, field, value) + await self._redis_client.hincrbyfloat(key, field, value) except Exception as e: logger.error(f"增加仪表盘失败: {e}") - def gauge_decr( + async def gauge_decr( self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 ) -> None: """减少仪表盘值 @@ -310,9 +322,11 @@ class MetricsManager: labels: 标签字典 value: 减少的值 """ - self.gauge_incr(name, labels, -value) + await self.gauge_incr(name, labels, -value) - def observe(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: + async def observe( + self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0 + ) -> None: """记录直方图观测值 Args: @@ -348,13 +362,13 @@ class MetricsManager: # +Inf 桶总是增加 pipe.hincrbyfloat(f"metrics:histogram:{name}:bucket:+Inf", label_key, 1) - pipe.execute() + await pipe.execute() except Exception as e: logger.error(f"记录直方图失败: {e}") # === 导出方法 === - def export(self) -> str: + async def export(self) -> str: """导出 Prometheus 格式指标 Returns: @@ -375,11 +389,11 @@ class MetricsManager: lines.append(f"# TYPE {name} {metric_type}") if metric_type == "counter": - lines.extend(self._export_counter(name)) + lines.extend(await self._export_counter(name)) elif metric_type == "gauge": - lines.extend(self._export_gauge(name)) + lines.extend(await self._export_gauge(name)) elif metric_type == "histogram": - lines.extend(self._export_histogram(name, definition)) + lines.extend(await self._export_histogram(name, definition)) lines.append("") # 空行分隔 @@ -389,12 +403,12 @@ class MetricsManager: return "\n".join(lines) - def _export_counter(self, name: str) -> List[str]: + async def _export_counter(self, name: str) -> List[str]: """导出计数器指标""" lines = [] key = f"metrics:counter:{name}" - data = self._redis_client.hgetall(key) + data = await self._redis_client.hgetall(key) for field, value in data.items(): if field == "_default_": lines.append(f"{name} {value}") @@ -404,12 +418,12 @@ class MetricsManager: return lines - def _export_gauge(self, name: str) -> List[str]: + async def _export_gauge(self, name: str) -> List[str]: """导出仪表盘指标""" lines = [] key = f"metrics:gauge:{name}" - data = self._redis_client.hgetall(key) + data = await self._redis_client.hgetall(key) for field, value in data.items(): if field == "_default_": lines.append(f"{name} {value}") @@ -419,14 +433,14 @@ class MetricsManager: return lines - def _export_histogram(self, name: str, definition: Dict[str, Any]) -> List[str]: + async def _export_histogram(self, name: str, definition: Dict[str, Any]) -> List[str]: """导出直方图指标""" lines = [] buckets = definition.get("buckets", []) # 获取所有标签组合 - count_data = self._redis_client.hgetall(f"metrics:histogram:{name}:count") - sum_data = self._redis_client.hgetall(f"metrics:histogram:{name}:sum") + count_data = await self._redis_client.hgetall(f"metrics:histogram:{name}:count") + sum_data = await self._redis_client.hgetall(f"metrics:histogram:{name}:sum") for label_key in count_data.keys(): prom_labels = self._key_to_prometheus_labels(label_key) @@ -434,7 +448,7 @@ class MetricsManager: # 导出各个桶 for bucket in buckets: bucket_key = f"metrics:histogram:{name}:bucket:{bucket}" - bucket_value = self._redis_client.hget(bucket_key, label_key) or "0" + bucket_value = await self._redis_client.hget(bucket_key, label_key) or "0" if label_key == "_default_": lines.append(f'{name}_bucket{{le="{bucket}"}} {bucket_value}') else: @@ -442,7 +456,7 @@ class MetricsManager: # +Inf 桶 inf_key = f"metrics:histogram:{name}:bucket:+Inf" - inf_value = self._redis_client.hget(inf_key, label_key) or "0" + inf_value = await self._redis_client.hget(inf_key, label_key) or "0" if label_key == "_default_": lines.append(f'{name}_bucket{{le="+Inf"}} {inf_value}') else: @@ -464,43 +478,79 @@ class MetricsManager: """检查 Redis 是否可用""" return self._redis_available - def reset(self) -> None: + async def reset(self) -> None: """重置所有指标(主要用于测试)""" if not self._redis_available: return try: # 删除所有指标相关的 key - keys = self._redis_client.keys("metrics:*") + keys = await self._redis_client.keys("metrics:*") if keys: - self._redis_client.delete(*keys) + await self._redis_client.delete(*keys) logger.info("已重置所有指标") except Exception as e: logger.error(f"重置指标失败: {e}") + async def close(self) -> None: + """关闭 Redis 连接""" + if self._redis_client: + await self._redis_client.close() + self._redis_client = None + self._redis_available = False + self._initialized = False + # 全局单例 _manager: Optional[MetricsManager] = None +_manager_lock = asyncio.Lock() -def get_metrics_manager() -> MetricsManager: - """获取指标管理器单例""" +async def get_metrics_manager() -> MetricsManager: + """获取指标管理器单例(异步)""" + global _manager + if _manager is None: + async with _manager_lock: + if _manager is None: + _manager = MetricsManager() + await _manager.initialize() + elif not _manager._initialized: + await _manager.initialize() + return _manager + + +def get_metrics_manager_sync() -> MetricsManager: + """获取指标管理器单例(同步,仅用于非异步上下文) + + 注意:此方法不会初始化 Redis 连接,需要在异步上下文中调用 initialize() + """ global _manager if _manager is None: _manager = MetricsManager() return _manager -def reset_metrics_manager() -> None: +async def reset_metrics_manager() -> None: """重置指标管理器单例(主要用于测试)""" global _manager + if _manager is not None: + await _manager.close() + _manager = None + + +def reset_metrics_manager_sync() -> None: + """同步重置指标管理器单例(主要用于测试) + + 注意:此方法不会关闭 Redis 连接,仅重置单例引用 + """ + global _manager _manager = None # === 便捷函数(业务代码直接调用)=== -def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None: +async def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None: """增加计数器 - 便捷函数 Args: @@ -508,10 +558,11 @@ def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> labels: 标签字典 value: 增加的值,默认为 1 """ - get_metrics_manager().incr(name, labels, value) + manager = await get_metrics_manager() + await manager.incr(name, labels, value) -def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: +async def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: """设置仪表盘 - 便捷函数 Args: @@ -519,10 +570,13 @@ def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> labels: 标签字典 value: 设置的值 """ - get_metrics_manager().set(name, labels, value) + manager = await get_metrics_manager() + await manager.set(name, labels, value) -def gauge_incr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None: +async def gauge_incr( + name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 +) -> None: """增加仪表盘 - 便捷函数 Args: @@ -530,10 +584,13 @@ def gauge_incr(name: str, labels: Optional[Dict[str, str]] = None, value: float labels: 标签字典 value: 增加的值 """ - get_metrics_manager().gauge_incr(name, labels, value) + manager = await get_metrics_manager() + await manager.gauge_incr(name, labels, value) -def gauge_decr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None: +async def gauge_decr( + name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 +) -> None: """减少仪表盘 - 便捷函数 Args: @@ -541,10 +598,13 @@ def gauge_decr(name: str, labels: Optional[Dict[str, str]] = None, value: float labels: 标签字典 value: 减少的值 """ - get_metrics_manager().gauge_decr(name, labels, value) + manager = await get_metrics_manager() + await manager.gauge_decr(name, labels, value) -def observe(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: +async def observe( + name: str, labels: Optional[Dict[str, str]] = None, value: float = 0 +) -> None: """记录直方图 - 便捷函数 Args: @@ -552,21 +612,105 @@ def observe(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0 labels: 标签字典 value: 观测值 """ - get_metrics_manager().observe(name, labels, value) + manager = await get_metrics_manager() + await manager.observe(name, labels, value) -def export() -> str: +async def export() -> str: """导出指标 - 便捷函数 Returns: Prometheus 文本格式的指标字符串 """ - return get_metrics_manager().export() + manager = await get_metrics_manager() + return await manager.export() -def is_available() -> bool: +async def is_available() -> bool: """检查 Redis 是否可用 - 便捷函数""" - return get_metrics_manager().is_available() + manager = await get_metrics_manager() + return manager.is_available() + + +# === 同步便捷函数(用于同步代码中的 fire-and-forget 模式)=== + + +def _schedule_async(coro) -> None: + """在后台调度异步协程(fire-and-forget 模式) + + 如果当前没有运行的事件循环,则静默忽略。 + """ + try: + loop = asyncio.get_running_loop() + loop.create_task(coro) + except RuntimeError: + # 没有运行的事件循环,静默忽略 + pass + + +def incr_sync( + name: str, labels: Optional[Dict[str, str]] = None, value: int = 1 +) -> None: + """增加计数器 - 同步便捷函数(fire-and-forget) + + Args: + name: 指标名称 + labels: 标签字典 + value: 增加的值,默认为 1 + """ + _schedule_async(incr(name, labels, value)) + + +def set_sync( + name: str, labels: Optional[Dict[str, str]] = None, value: float = 0 +) -> None: + """设置仪表盘 - 同步便捷函数(fire-and-forget) + + Args: + name: 指标名称 + labels: 标签字典 + value: 设置的值 + """ + _schedule_async(set(name, labels, value)) + + +def gauge_incr_sync( + name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 +) -> None: + """增加仪表盘 - 同步便捷函数(fire-and-forget) + + Args: + name: 指标名称 + labels: 标签字典 + value: 增加的值 + """ + _schedule_async(gauge_incr(name, labels, value)) + + +def gauge_decr_sync( + name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 +) -> None: + """减少仪表盘 - 同步便捷函数(fire-and-forget) + + Args: + name: 指标名称 + labels: 标签字典 + value: 减少的值 + """ + _schedule_async(gauge_decr(name, labels, value)) + + +def observe_sync( + name: str, labels: Optional[Dict[str, str]] = None, value: float = 0 +) -> None: + """记录直方图 - 同步便捷函数(fire-and-forget) + + Args: + name: 指标名称 + labels: 标签字典 + value: 观测值 + """ + _schedule_async(observe(name, labels, value)) # === 装饰器(兼容旧 API)=== @@ -593,8 +737,11 @@ def track_algorithm_execution(algorithm_name: str): raise e finally: elapsed = time.time() - start_time - incr("algorithm_executions_total", {"algorithm": algorithm_name, "status": status}) - observe( + incr_sync( + "algorithm_executions_total", + {"algorithm": algorithm_name, "status": status}, + ) + observe_sync( "algorithm_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed, diff --git a/src/functional_scaffold/main.py b/src/functional_scaffold/main.py index 5ed569b..ca39ba9 100644 --- a/src/functional_scaffold/main.py +++ b/src/functional_scaffold/main.py @@ -95,7 +95,7 @@ async def track_metrics(request: Request, call_next): if request.url.path in skip_paths: return await call_next(request) - gauge_incr("http_requests_in_progress") + await gauge_incr("http_requests_in_progress") start_time = time.time() status = "success" @@ -112,16 +112,16 @@ async def track_metrics(request: Request, call_next): elapsed = time.time() - start_time # 使用规范化后的路径记录指标 normalized_path = normalize_path(request.url.path) - incr( + await incr( "http_requests_total", {"method": request.method, "endpoint": normalized_path, "status": status}, ) - observe( + await observe( "http_request_duration_seconds", {"method": request.method, "endpoint": normalized_path}, elapsed, ) - gauge_decr("http_requests_in_progress") + await gauge_decr("http_requests_in_progress") # 注册路由 @@ -145,7 +145,7 @@ async def metrics(): return Response(content="Metrics disabled", status_code=404) return Response( - content=export(), + content=await export(), media_type="text/plain; version=0.0.4; charset=utf-8", ) @@ -160,7 +160,7 @@ async def startup_event(): # 初始化指标管理器 if settings.metrics_enabled: - manager = get_metrics_manager() + manager = await get_metrics_manager() if manager.is_available(): logger.info("Redis 指标收集已启用") else: diff --git a/src/functional_scaffold/worker.py b/src/functional_scaffold/worker.py index ea6a050..820e235 100644 --- a/src/functional_scaffold/worker.py +++ b/src/functional_scaffold/worker.py @@ -260,7 +260,7 @@ class JobWorker: # 记录回收指标 from .core.metrics_unified import incr - incr("job_recovered_total", None, recovered) + await incr("job_recovered_total", None, recovered) # 收集队列监控指标 await self._job_manager.collect_queue_metrics() diff --git a/tests/test_metrics_unified.py b/tests/test_metrics_unified.py index 80fa5c3..1b2cdd6 100644 --- a/tests/test_metrics_unified.py +++ b/tests/test_metrics_unified.py @@ -1,158 +1,239 @@ """metrics_unified 模块单元测试""" import pytest -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch + + +@pytest.fixture(autouse=True) +def reset_manager(): + """每个测试前后重置管理器""" + from functional_scaffold.core.metrics_unified import reset_metrics_manager_sync + + reset_metrics_manager_sync() + yield + reset_metrics_manager_sync() class TestMetricsManager: """MetricsManager 类测试""" - @pytest.fixture - def mock_redis(self): - """模拟 Redis 客户端""" - with patch("redis.Redis") as mock: - mock_instance = MagicMock() - mock_instance.ping.return_value = True - mock_instance.hincrbyfloat.return_value = 1.0 - mock_instance.hset.return_value = True - mock_instance.hgetall.return_value = {} - mock_instance.hget.return_value = "0" - mock_instance.keys.return_value = [] - mock_instance.pipeline.return_value = MagicMock() - mock.return_value = mock_instance - yield mock_instance - - @pytest.fixture - def manager(self, mock_redis): - """创建测试用的 MetricsManager""" - from functional_scaffold.core.metrics_unified import ( - MetricsManager, - reset_metrics_manager, - ) - - reset_metrics_manager() - manager = MetricsManager() - return manager - - def test_init_loads_default_config(self, manager): + def test_init_loads_default_config(self): """测试初始化加载默认配置""" + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() assert manager.config is not None assert "builtin_metrics" in manager.config or len(manager.metrics_definitions) > 0 - def test_metrics_definitions_registered(self, manager): + def test_metrics_definitions_registered(self): """测试指标定义已注册""" + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() assert "http_requests_total" in manager.metrics_definitions assert "http_request_duration_seconds" in manager.metrics_definitions assert "algorithm_executions_total" in manager.metrics_definitions - def test_incr_counter(self, manager, mock_redis): + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_incr_counter(self, mock_redis_class): """测试计数器增加""" - manager.incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}) - mock_redis.hincrbyfloat.assert_called() + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.hincrbyfloat = AsyncMock(return_value=1.0) + mock_instance.close = AsyncMock() + mock_redis_class.return_value = mock_instance - def test_incr_with_invalid_metric_type(self, manager, mock_redis): + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() + await manager.initialize() + + await manager.incr( + "http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"} + ) + mock_instance.hincrbyfloat.assert_called() + + def test_incr_with_invalid_metric_type(self): """测试对非计数器类型调用 incr""" + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() # http_request_duration_seconds 是 histogram 类型 - manager.incr("http_request_duration_seconds", {}) - # 不应该调用 Redis(因为类型不匹配) - # 验证没有调用 hincrbyfloat(或者调用次数没有增加) + # 验证不会抛出异常(因为 Redis 不可用) - def test_set_gauge(self, manager, mock_redis): + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_set_gauge(self, mock_redis_class): """测试设置仪表盘""" - manager.set("http_requests_in_progress", {}, 5) - mock_redis.hset.assert_called() + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.hset = AsyncMock(return_value=True) + mock_instance.close = AsyncMock() + mock_redis_class.return_value = mock_instance - def test_gauge_incr(self, manager, mock_redis): + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() + await manager.initialize() + + await manager.set("http_requests_in_progress", {}, 5) + mock_instance.hset.assert_called() + + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_gauge_incr(self, mock_redis_class): """测试增加仪表盘""" - manager.gauge_incr("http_requests_in_progress", {}, 1) - mock_redis.hincrbyfloat.assert_called() + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.hincrbyfloat = AsyncMock(return_value=1.0) + mock_instance.close = AsyncMock() + mock_redis_class.return_value = mock_instance - def test_gauge_decr(self, manager, mock_redis): + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() + await manager.initialize() + + await manager.gauge_incr("http_requests_in_progress", {}, 1) + mock_instance.hincrbyfloat.assert_called() + + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_gauge_decr(self, mock_redis_class): """测试减少仪表盘""" - manager.gauge_decr("http_requests_in_progress", {}, 1) - mock_redis.hincrbyfloat.assert_called() + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.hincrbyfloat = AsyncMock(return_value=1.0) + mock_instance.close = AsyncMock() + mock_redis_class.return_value = mock_instance - def test_observe_histogram(self, manager, mock_redis): + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() + await manager.initialize() + + await manager.gauge_decr("http_requests_in_progress", {}, 1) + mock_instance.hincrbyfloat.assert_called() + + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_observe_histogram(self, mock_redis_class): """测试直方图观测""" - mock_pipeline = MagicMock() - mock_redis.pipeline.return_value = mock_pipeline + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.close = AsyncMock() - manager.observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05) + mock_pipeline = AsyncMock() + mock_pipeline.hincrbyfloat = MagicMock() + mock_pipeline.execute = AsyncMock(return_value=[]) + mock_instance.pipeline = MagicMock(return_value=mock_pipeline) - mock_redis.pipeline.assert_called() - mock_pipeline.execute.assert_called() + mock_redis_class.return_value = mock_instance - def test_labels_to_key(self, manager): + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() + await manager.initialize() + + await manager.observe( + "http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05 + ) + mock_instance.pipeline.assert_called() + + def test_labels_to_key(self): """测试标签转换为 key""" + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() labels = {"method": "GET", "endpoint": "/api"} key = manager._labels_to_key(labels) assert "method=GET" in key assert "endpoint=/api" in key - def test_labels_to_key_empty(self, manager): + def test_labels_to_key_empty(self): """测试空标签转换""" + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() key = manager._labels_to_key(None) assert key == "" key = manager._labels_to_key({}) assert key == "" - def test_is_available(self, manager): + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_is_available(self, mock_redis_class): """测试 Redis 可用性检查""" + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.close = AsyncMock() + mock_redis_class.return_value = mock_instance + + from functional_scaffold.core.metrics_unified import MetricsManager + + manager = MetricsManager() + await manager.initialize() + assert manager.is_available() is True class TestConvenienceFunctions: """便捷函数测试""" - @pytest.fixture(autouse=True) - def setup(self): - """每个测试前重置管理器""" - from functional_scaffold.core.metrics_unified import reset_metrics_manager - - reset_metrics_manager() - - @patch("redis.Redis") - def test_incr_function(self, mock_redis_class): + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_incr_function(self, mock_redis_class): """测试 incr 便捷函数""" - mock_instance = MagicMock() - mock_instance.ping.return_value = True + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.hincrbyfloat = AsyncMock(return_value=1.0) + mock_instance.close = AsyncMock() mock_redis_class.return_value = mock_instance - from functional_scaffold.core.metrics_unified import incr, reset_metrics_manager + from functional_scaffold.core.metrics_unified import incr - reset_metrics_manager() - incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}) + await incr( + "http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"} + ) mock_instance.hincrbyfloat.assert_called() - @patch("redis.Redis") - def test_set_function(self, mock_redis_class): + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_set_function(self, mock_redis_class): """测试 set 便捷函数""" - mock_instance = MagicMock() - mock_instance.ping.return_value = True + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.hset = AsyncMock(return_value=True) + mock_instance.close = AsyncMock() mock_redis_class.return_value = mock_instance - from functional_scaffold.core.metrics_unified import reset_metrics_manager, set + from functional_scaffold.core.metrics_unified import set - reset_metrics_manager() - set("http_requests_in_progress", {}, 10) + await set("http_requests_in_progress", {}, 10) mock_instance.hset.assert_called() - @patch("redis.Redis") - def test_observe_function(self, mock_redis_class): + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_observe_function(self, mock_redis_class): """测试 observe 便捷函数""" - mock_instance = MagicMock() - mock_instance.ping.return_value = True - mock_pipeline = MagicMock() - mock_instance.pipeline.return_value = mock_pipeline + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.close = AsyncMock() + + mock_pipeline = AsyncMock() + mock_pipeline.hincrbyfloat = MagicMock() + mock_pipeline.execute = AsyncMock(return_value=[]) + mock_instance.pipeline = MagicMock(return_value=mock_pipeline) + mock_redis_class.return_value = mock_instance - from functional_scaffold.core.metrics_unified import observe, reset_metrics_manager + from functional_scaffold.core.metrics_unified import observe - reset_metrics_manager() - observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.1) + await observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.1) mock_instance.pipeline.assert_called() @@ -160,42 +241,49 @@ class TestConvenienceFunctions: class TestExport: """导出功能测试""" - @patch("redis.Redis") - def test_export_counter(self, mock_redis_class): + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_export_counter(self, mock_redis_class): """测试导出计数器""" - mock_instance = MagicMock() - mock_instance.ping.return_value = True - mock_instance.hgetall.return_value = {"method=GET,endpoint=/,status=success": "10"} + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + mock_instance.hgetall = AsyncMock( + return_value={"method=GET,endpoint=/,status=success": "10"} + ) + mock_instance.hget = AsyncMock(return_value="0") + mock_instance.close = AsyncMock() mock_redis_class.return_value = mock_instance - from functional_scaffold.core.metrics_unified import export, reset_metrics_manager + from functional_scaffold.core.metrics_unified import export - reset_metrics_manager() - output = export() + output = await export() assert "http_requests_total" in output assert "HELP" in output assert "TYPE" in output - @patch("redis.Redis") - def test_export_histogram(self, mock_redis_class): + @pytest.mark.asyncio + @patch("redis.asyncio.Redis") + async def test_export_histogram(self, mock_redis_class): """测试导出直方图""" - mock_instance = MagicMock() - mock_instance.ping.return_value = True - mock_instance.hgetall.side_effect = lambda key: ( - {"method=GET,endpoint=/": "5"} - if "count" in key - else {"method=GET,endpoint=/": "0.5"} - if "sum" in key - else {} - ) - mock_instance.hget.return_value = "3" + mock_instance = AsyncMock() + mock_instance.ping = AsyncMock(return_value=True) + + async def mock_hgetall(key): + if "count" in key: + return {"method=GET,endpoint=/": "5"} + elif "sum" in key: + return {"method=GET,endpoint=/": "0.5"} + return {} + + mock_instance.hgetall = mock_hgetall + mock_instance.hget = AsyncMock(return_value="3") + mock_instance.close = AsyncMock() mock_redis_class.return_value = mock_instance - from functional_scaffold.core.metrics_unified import export, reset_metrics_manager + from functional_scaffold.core.metrics_unified import export - reset_metrics_manager() - output = export() + output = await export() assert "http_request_duration_seconds" in output @@ -226,21 +314,9 @@ class TestEnvVarSubstitution: class TestTrackAlgorithmExecution: """track_algorithm_execution 装饰器测试""" - @patch("redis.Redis") - def test_decorator_success(self, mock_redis_class): + def test_decorator_success(self): """测试装饰器成功执行""" - mock_instance = MagicMock() - mock_instance.ping.return_value = True - mock_pipeline = MagicMock() - mock_instance.pipeline.return_value = mock_pipeline - mock_redis_class.return_value = mock_instance - - from functional_scaffold.core.metrics_unified import ( - reset_metrics_manager, - track_algorithm_execution, - ) - - reset_metrics_manager() + from functional_scaffold.core.metrics_unified import track_algorithm_execution @track_algorithm_execution("test_algo") def test_func(): @@ -249,21 +325,9 @@ class TestTrackAlgorithmExecution: result = test_func() assert result == "result" - @patch("redis.Redis") - def test_decorator_error(self, mock_redis_class): + def test_decorator_error(self): """测试装饰器错误处理""" - mock_instance = MagicMock() - mock_instance.ping.return_value = True - mock_pipeline = MagicMock() - mock_instance.pipeline.return_value = mock_pipeline - mock_redis_class.return_value = mock_instance - - from functional_scaffold.core.metrics_unified import ( - reset_metrics_manager, - track_algorithm_execution, - ) - - reset_metrics_manager() + from functional_scaffold.core.metrics_unified import track_algorithm_execution @track_algorithm_execution("test_algo") def test_func():