Files
FunctionalScaffold/tests/test_metrics_unified.py
Roog (顾新培) a4d2ad1e93 main:采用异步 Redis 客户端优化指标管理模块
变更内容:
- 将 `redis` 客户端替换为 `redis.asyncio` 实现。
- 系统中同步方法调整为异步方法,提升事件循环效率。
- 在 `MetricsManager` 中添加异步初始化及关闭逻辑,避免阻塞问题。
- 更新便捷函数以支持异步上下文,并添加同步模式的兼容方法。
- 调整 Worker、JobManager、API 路由等模块,适配异步指标操作。
- 扩展单元测试,覆盖新增的异步方法及 Redis 操作逻辑。
- 简化 Dockerfile,取消开发依赖安装命令。
2026-02-03 19:54:22 +08:00

338 lines
11 KiB
Python

"""metrics_unified 模块单元测试"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
@pytest.fixture(autouse=True)
def reset_manager():
    """Reset the metrics manager before and after every test.

    The fixture is ``autouse``, so it wraps each test in this module: the
    synchronous reset helper is called once before the test body runs and
    once more after it finishes.
    """
    from functional_scaffold.core.metrics_unified import (
        reset_metrics_manager_sync as _reset,
    )

    _reset()  # setup
    yield
    _reset()  # teardown
class TestMetricsManager:
    """Tests for the ``MetricsManager`` class."""

    @staticmethod
    def _install_redis_mock(mock_redis_class, **overrides):
        """Create a mocked async Redis client and attach it to the patched class.

        Args:
            mock_redis_class: The mock injected by ``@patch("redis.asyncio.Redis")``.
            **overrides: Extra attributes to set on the client (for example
                ``hset=AsyncMock(...)``), one per Redis command the test needs.

        Returns:
            The ``AsyncMock`` client that ``mock_redis_class()`` will return.
        """
        client = AsyncMock()
        client.ping = AsyncMock(return_value=True)
        client.close = AsyncMock()
        for attr, value in overrides.items():
            setattr(client, attr, value)
        mock_redis_class.return_value = client
        return client

    def test_init_loads_default_config(self):
        """A freshly constructed manager exposes a config and metric definitions."""
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()

        assert manager.config is not None
        assert "builtin_metrics" in manager.config or len(manager.metrics_definitions) > 0

    def test_metrics_definitions_registered(self):
        """The built-in metric names are present in ``metrics_definitions``."""
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()

        assert "http_requests_total" in manager.metrics_definitions
        assert "http_request_duration_seconds" in manager.metrics_definitions
        assert "algorithm_executions_total" in manager.metrics_definitions

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_incr_counter(self, mock_redis_class):
        """Incrementing a counter should call ``hincrbyfloat`` on the client."""
        client = self._install_redis_mock(
            mock_redis_class, hincrbyfloat=AsyncMock(return_value=1.0)
        )
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()
        await manager.initialize()
        await manager.incr(
            "http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}
        )

        client.hincrbyfloat.assert_called()

    @pytest.mark.asyncio
    async def test_incr_with_invalid_metric_type(self):
        """Calling ``incr`` on a non-counter metric must not raise."""
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()
        # http_request_duration_seconds is a histogram, not a counter.
        # The manager is intentionally not initialized, so no Redis client is
        # attached; the original test documented that the call must not raise
        # in this state but never actually made the call.
        # NOTE(review): confirm incr() on an uninitialized manager does not
        # try to open a real Redis connection.
        await manager.incr(
            "http_request_duration_seconds", {"method": "GET", "endpoint": "/"}
        )

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_set_gauge(self, mock_redis_class):
        """Setting a gauge should call ``hset`` on the client."""
        client = self._install_redis_mock(
            mock_redis_class, hset=AsyncMock(return_value=True)
        )
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()
        await manager.initialize()
        await manager.set("http_requests_in_progress", {}, 5)

        client.hset.assert_called()

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_gauge_incr(self, mock_redis_class):
        """Incrementing a gauge should call ``hincrbyfloat`` on the client."""
        client = self._install_redis_mock(
            mock_redis_class, hincrbyfloat=AsyncMock(return_value=1.0)
        )
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()
        await manager.initialize()
        await manager.gauge_incr("http_requests_in_progress", {}, 1)

        client.hincrbyfloat.assert_called()

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_gauge_decr(self, mock_redis_class):
        """Decrementing a gauge should call ``hincrbyfloat`` on the client."""
        client = self._install_redis_mock(
            mock_redis_class, hincrbyfloat=AsyncMock(return_value=1.0)
        )
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()
        await manager.initialize()
        await manager.gauge_decr("http_requests_in_progress", {}, 1)

        client.hincrbyfloat.assert_called()

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_observe_histogram(self, mock_redis_class):
        """Observing a histogram value should open a Redis pipeline."""
        # pipeline() itself is a plain (sync) call returning the pipeline
        # object; queuing hincrbyfloat is sync, only execute() is awaited.
        pipeline = AsyncMock()
        pipeline.hincrbyfloat = MagicMock()
        pipeline.execute = AsyncMock(return_value=[])
        client = self._install_redis_mock(
            mock_redis_class, pipeline=MagicMock(return_value=pipeline)
        )
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()
        await manager.initialize()
        await manager.observe(
            "http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05
        )

        client.pipeline.assert_called()

    def test_labels_to_key(self):
        """Each label appears as ``name=value`` in the generated key."""
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()
        key = manager._labels_to_key({"method": "GET", "endpoint": "/api"})

        assert "method=GET" in key
        assert "endpoint=/api" in key

    def test_labels_to_key_empty(self):
        """``None`` and an empty dict both map to the empty key."""
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()

        assert manager._labels_to_key(None) == ""
        assert manager._labels_to_key({}) == ""

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_is_available(self, mock_redis_class):
        """After a successful ``initialize`` the manager reports availability."""
        self._install_redis_mock(mock_redis_class)
        from functional_scaffold.core.metrics_unified import MetricsManager

        manager = MetricsManager()
        await manager.initialize()

        assert manager.is_available() is True
class TestConvenienceFunctions:
    """Tests for the module-level convenience functions."""

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_incr_function(self, mock_redis_class):
        """The ``incr`` helper should call ``hincrbyfloat`` on the client."""
        redis_stub = AsyncMock(
            ping=AsyncMock(return_value=True),
            hincrbyfloat=AsyncMock(return_value=1.0),
            close=AsyncMock(),
        )
        mock_redis_class.return_value = redis_stub

        from functional_scaffold.core.metrics_unified import incr

        counter_labels = {"method": "GET", "endpoint": "/", "status": "success"}
        await incr("http_requests_total", counter_labels)

        redis_stub.hincrbyfloat.assert_called()

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_set_function(self, mock_redis_class):
        """The ``set`` helper should call ``hset`` on the client."""
        redis_stub = AsyncMock(
            ping=AsyncMock(return_value=True),
            hset=AsyncMock(return_value=True),
            close=AsyncMock(),
        )
        mock_redis_class.return_value = redis_stub

        # Aliased locally so the builtin ``set`` is not shadowed in this scope.
        from functional_scaffold.core.metrics_unified import set as set_metric

        await set_metric("http_requests_in_progress", {}, 10)

        redis_stub.hset.assert_called()

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_observe_function(self, mock_redis_class):
        """The ``observe`` helper should open a Redis pipeline."""
        pipe_stub = AsyncMock(
            hincrbyfloat=MagicMock(),
            execute=AsyncMock(return_value=[]),
        )
        redis_stub = AsyncMock(
            ping=AsyncMock(return_value=True),
            close=AsyncMock(),
            pipeline=MagicMock(return_value=pipe_stub),
        )
        mock_redis_class.return_value = redis_stub

        from functional_scaffold.core.metrics_unified import observe

        histogram_labels = {"method": "GET", "endpoint": "/"}
        await observe("http_request_duration_seconds", histogram_labels, 0.1)

        redis_stub.pipeline.assert_called()
class TestExport:
    """Tests for the metrics ``export`` function."""

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_export_counter(self, mock_redis_class):
        """Exported text contains the counter name plus HELP and TYPE markers."""
        stored_counter = {"method=GET,endpoint=/,status=success": "10"}
        redis_stub = AsyncMock(
            ping=AsyncMock(return_value=True),
            hgetall=AsyncMock(return_value=stored_counter),
            hget=AsyncMock(return_value="0"),
            close=AsyncMock(),
        )
        mock_redis_class.return_value = redis_stub

        from functional_scaffold.core.metrics_unified import export

        rendered = await export()

        for expected in ("http_requests_total", "HELP", "TYPE"):
            assert expected in rendered

    @pytest.mark.asyncio
    @patch("redis.asyncio.Redis")
    async def test_export_histogram(self, mock_redis_class):
        """Exported text contains the histogram metric name."""

        async def fake_hgetall(key):
            # Answer by substring of the requested hash key; "count" is
            # checked before "sum", anything else yields an empty hash.
            if "count" in key:
                return {"method=GET,endpoint=/": "5"}
            if "sum" in key:
                return {"method=GET,endpoint=/": "0.5"}
            return {}

        redis_stub = AsyncMock(
            ping=AsyncMock(return_value=True),
            hget=AsyncMock(return_value="3"),
            close=AsyncMock(),
        )
        # Assigned as a plain coroutine function (not wrapped in AsyncMock).
        redis_stub.hgetall = fake_hgetall
        mock_redis_class.return_value = redis_stub

        from functional_scaffold.core.metrics_unified import export

        rendered = await export()

        assert "http_request_duration_seconds" in rendered
class TestEnvVarSubstitution:
    """Tests for ``${VAR:default}`` environment-variable substitution."""

    def test_substitute_env_vars(self):
        """A set variable yields its value; an unset one yields the default."""
        import os

        from functional_scaffold.core.metrics_unified import MetricsManager

        # __new__ creates the instance without running __init__.
        manager = MetricsManager.__new__(MetricsManager)

        # patch.dict snapshots os.environ and restores it on exit, even if an
        # assertion fails, so the test neither leaks TEST_VAR nor clobbers a
        # value that was already present in the host environment.
        with patch.dict(os.environ, {"TEST_VAR": "test_value"}):
            # Make sure the fallback path is taken regardless of the host
            # environment; the removal is undone when the context exits.
            os.environ.pop("NONEXISTENT_VAR", None)

            result = manager._substitute_env_vars("${TEST_VAR:default}")
            assert result == "test_value"

            result = manager._substitute_env_vars("${NONEXISTENT_VAR:default_value}")
            assert result == "default_value"
class TestTrackAlgorithmExecution:
    """Tests for the ``track_algorithm_execution`` decorator."""

    def test_decorator_success(self):
        """A decorated function still returns its own result."""
        from functional_scaffold.core.metrics_unified import (
            track_algorithm_execution as track,
        )

        def test_func():
            return "result"

        # Manual application is equivalent to the ``@track("test_algo")`` form.
        wrapped = track("test_algo")(test_func)

        assert wrapped() == "result"

    def test_decorator_error(self):
        """An exception raised by the decorated function propagates to the caller."""
        from functional_scaffold.core.metrics_unified import (
            track_algorithm_execution as track,
        )

        def test_func():
            raise ValueError("test error")

        wrapped = track("test_algo")(test_func)

        with pytest.raises(ValueError):
            wrapped()