变更内容: - 将 `redis` 客户端替换为 `redis.asyncio` 实现。 - 系统中同步方法调整为异步方法,提升事件循环效率。 - 在 `MetricsManager` 中添加异步初始化及关闭逻辑,避免阻塞问题。 - 更新便捷函数以支持异步上下文,并添加同步模式的兼容方法。 - 调整 Worker、JobManager、API 路由等模块,适配异步指标操作。 - 扩展单元测试,覆盖新增的异步方法及 Redis 操作逻辑。 - 简化 Dockerfile,取消开发依赖安装命令。
338 lines
11 KiB
Python
338 lines
11 KiB
Python
"""metrics_unified 模块单元测试"""
|
|
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_manager():
|
|
"""每个测试前后重置管理器"""
|
|
from functional_scaffold.core.metrics_unified import reset_metrics_manager_sync
|
|
|
|
reset_metrics_manager_sync()
|
|
yield
|
|
reset_metrics_manager_sync()
|
|
|
|
|
|
class TestMetricsManager:
|
|
"""MetricsManager 类测试"""
|
|
|
|
def test_init_loads_default_config(self):
|
|
"""测试初始化加载默认配置"""
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
assert manager.config is not None
|
|
assert "builtin_metrics" in manager.config or len(manager.metrics_definitions) > 0
|
|
|
|
def test_metrics_definitions_registered(self):
|
|
"""测试指标定义已注册"""
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
assert "http_requests_total" in manager.metrics_definitions
|
|
assert "http_request_duration_seconds" in manager.metrics_definitions
|
|
assert "algorithm_executions_total" in manager.metrics_definitions
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_incr_counter(self, mock_redis_class):
|
|
"""测试计数器增加"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
|
|
mock_instance.close = AsyncMock()
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
await manager.initialize()
|
|
|
|
await manager.incr(
|
|
"http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}
|
|
)
|
|
mock_instance.hincrbyfloat.assert_called()
|
|
|
|
def test_incr_with_invalid_metric_type(self):
|
|
"""测试对非计数器类型调用 incr"""
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
# http_request_duration_seconds 是 histogram 类型
|
|
# 验证不会抛出异常(因为 Redis 不可用)
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_set_gauge(self, mock_redis_class):
|
|
"""测试设置仪表盘"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.hset = AsyncMock(return_value=True)
|
|
mock_instance.close = AsyncMock()
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
await manager.initialize()
|
|
|
|
await manager.set("http_requests_in_progress", {}, 5)
|
|
mock_instance.hset.assert_called()
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_gauge_incr(self, mock_redis_class):
|
|
"""测试增加仪表盘"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
|
|
mock_instance.close = AsyncMock()
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
await manager.initialize()
|
|
|
|
await manager.gauge_incr("http_requests_in_progress", {}, 1)
|
|
mock_instance.hincrbyfloat.assert_called()
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_gauge_decr(self, mock_redis_class):
|
|
"""测试减少仪表盘"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
|
|
mock_instance.close = AsyncMock()
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
await manager.initialize()
|
|
|
|
await manager.gauge_decr("http_requests_in_progress", {}, 1)
|
|
mock_instance.hincrbyfloat.assert_called()
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_observe_histogram(self, mock_redis_class):
|
|
"""测试直方图观测"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.close = AsyncMock()
|
|
|
|
mock_pipeline = AsyncMock()
|
|
mock_pipeline.hincrbyfloat = MagicMock()
|
|
mock_pipeline.execute = AsyncMock(return_value=[])
|
|
mock_instance.pipeline = MagicMock(return_value=mock_pipeline)
|
|
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
await manager.initialize()
|
|
|
|
await manager.observe(
|
|
"http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05
|
|
)
|
|
mock_instance.pipeline.assert_called()
|
|
|
|
def test_labels_to_key(self):
|
|
"""测试标签转换为 key"""
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
labels = {"method": "GET", "endpoint": "/api"}
|
|
key = manager._labels_to_key(labels)
|
|
assert "method=GET" in key
|
|
assert "endpoint=/api" in key
|
|
|
|
def test_labels_to_key_empty(self):
|
|
"""测试空标签转换"""
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
key = manager._labels_to_key(None)
|
|
assert key == ""
|
|
|
|
key = manager._labels_to_key({})
|
|
assert key == ""
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_is_available(self, mock_redis_class):
|
|
"""测试 Redis 可用性检查"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.close = AsyncMock()
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
manager = MetricsManager()
|
|
await manager.initialize()
|
|
|
|
assert manager.is_available() is True
|
|
|
|
|
|
class TestConvenienceFunctions:
|
|
"""便捷函数测试"""
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_incr_function(self, mock_redis_class):
|
|
"""测试 incr 便捷函数"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
|
|
mock_instance.close = AsyncMock()
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import incr
|
|
|
|
await incr(
|
|
"http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}
|
|
)
|
|
|
|
mock_instance.hincrbyfloat.assert_called()
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_set_function(self, mock_redis_class):
|
|
"""测试 set 便捷函数"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.hset = AsyncMock(return_value=True)
|
|
mock_instance.close = AsyncMock()
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import set
|
|
|
|
await set("http_requests_in_progress", {}, 10)
|
|
|
|
mock_instance.hset.assert_called()
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_observe_function(self, mock_redis_class):
|
|
"""测试 observe 便捷函数"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.close = AsyncMock()
|
|
|
|
mock_pipeline = AsyncMock()
|
|
mock_pipeline.hincrbyfloat = MagicMock()
|
|
mock_pipeline.execute = AsyncMock(return_value=[])
|
|
mock_instance.pipeline = MagicMock(return_value=mock_pipeline)
|
|
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import observe
|
|
|
|
await observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.1)
|
|
|
|
mock_instance.pipeline.assert_called()
|
|
|
|
|
|
class TestExport:
|
|
"""导出功能测试"""
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_export_counter(self, mock_redis_class):
|
|
"""测试导出计数器"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
mock_instance.hgetall = AsyncMock(
|
|
return_value={"method=GET,endpoint=/,status=success": "10"}
|
|
)
|
|
mock_instance.hget = AsyncMock(return_value="0")
|
|
mock_instance.close = AsyncMock()
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import export
|
|
|
|
output = await export()
|
|
|
|
assert "http_requests_total" in output
|
|
assert "HELP" in output
|
|
assert "TYPE" in output
|
|
|
|
@pytest.mark.asyncio
|
|
@patch("redis.asyncio.Redis")
|
|
async def test_export_histogram(self, mock_redis_class):
|
|
"""测试导出直方图"""
|
|
mock_instance = AsyncMock()
|
|
mock_instance.ping = AsyncMock(return_value=True)
|
|
|
|
async def mock_hgetall(key):
|
|
if "count" in key:
|
|
return {"method=GET,endpoint=/": "5"}
|
|
elif "sum" in key:
|
|
return {"method=GET,endpoint=/": "0.5"}
|
|
return {}
|
|
|
|
mock_instance.hgetall = mock_hgetall
|
|
mock_instance.hget = AsyncMock(return_value="3")
|
|
mock_instance.close = AsyncMock()
|
|
mock_redis_class.return_value = mock_instance
|
|
|
|
from functional_scaffold.core.metrics_unified import export
|
|
|
|
output = await export()
|
|
|
|
assert "http_request_duration_seconds" in output
|
|
|
|
|
|
class TestEnvVarSubstitution:
|
|
"""环境变量替换测试"""
|
|
|
|
def test_substitute_env_vars(self):
|
|
"""测试环境变量替换"""
|
|
import os
|
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
|
|
|
# 设置测试环境变量
|
|
os.environ["TEST_VAR"] = "test_value"
|
|
|
|
manager = MetricsManager.__new__(MetricsManager)
|
|
result = manager._substitute_env_vars("${TEST_VAR:default}")
|
|
assert result == "test_value"
|
|
|
|
# 测试默认值
|
|
result = manager._substitute_env_vars("${NONEXISTENT_VAR:default_value}")
|
|
assert result == "default_value"
|
|
|
|
# 清理
|
|
del os.environ["TEST_VAR"]
|
|
|
|
|
|
class TestTrackAlgorithmExecution:
|
|
"""track_algorithm_execution 装饰器测试"""
|
|
|
|
def test_decorator_success(self):
|
|
"""测试装饰器成功执行"""
|
|
from functional_scaffold.core.metrics_unified import track_algorithm_execution
|
|
|
|
@track_algorithm_execution("test_algo")
|
|
def test_func():
|
|
return "result"
|
|
|
|
result = test_func()
|
|
assert result == "result"
|
|
|
|
def test_decorator_error(self):
|
|
"""测试装饰器错误处理"""
|
|
from functional_scaffold.core.metrics_unified import track_algorithm_execution
|
|
|
|
@track_algorithm_execution("test_algo")
|
|
def test_func():
|
|
raise ValueError("test error")
|
|
|
|
with pytest.raises(ValueError):
|
|
test_func()
|