Compare commits

...

8 Commits

Author SHA1 Message Date
b47be9dda4 main: Add health-check support and service optimizations
- Introduce a lightweight HTTP server in the Worker, with health and readiness endpoints.
- Add health-check probes to the Kubernetes and Docker configurations to improve service stability.
- Update dependencies: add `aiohttp` for the health-check server.
- Tune the deployment configuration: adjust the Redis host setting and image address for the new environment.
2026-02-04 12:00:30 +08:00
55419443cd main: Add health-check support and service optimizations
- Introduce a lightweight HTTP server in the Worker, with health and readiness endpoints.
- Add health-check probes to the Kubernetes and Docker configurations to improve service stability.
- Update dependencies: add `aiohttp` for the health-check server.
- Tune the deployment configuration: adjust the Redis host setting and image address for the new environment.
2026-02-04 11:58:56 +08:00
e0138d5531 main: Add Alibaba Cloud FC deployment docs and related configuration
- Update the README with a link to the Alibaba Cloud FC deployment guide.
- Add `docs/fc-deploy.md`, an FC deployment guide covering environment setup and step-by-step instructions.
- Improve the documentation table formatting for readability and completeness.
2026-02-04 11:36:01 +08:00
c92cac6ebb main: Complete Redis password configuration support
- Add a `redis_password` field to the Function Compute configuration file.
- Update the API and Worker environment variables to pass the Redis password through.
- Improve service security by supporting password-protected Redis instances.
2026-02-04 11:24:29 +08:00
c76ece8f48 main: Remove invalid Docker image configuration
- Remove the invalid image setting from `docker-compose.yml` to simplify the service environment setup.
2026-02-04 10:39:40 +08:00
d211074576 main: Migrate the Alibaba Cloud Function Compute configuration to FC 3.0
Changes:
- Rework the Function Compute configuration: remove the legacy aliyun-fc.yaml and add an s.yaml that follows the FC 3.0 spec.
- Introduce Serverless Devs tooling, with command guidance for deployment, validation, and log viewing.
- Adjust the API and Worker function configurations for more flexible resource allocation and automated management.
- Update the docs with an FC 3.0 deployment guide and tuning suggestions.
2026-02-04 10:27:01 +08:00
a4d2ad1e93 main: Switch the metrics module to an async Redis client
Changes:
- Replace the `redis` client with the `redis.asyncio` implementation.
- Convert the module's synchronous methods to async methods to improve event-loop efficiency.
- Add async initialization and shutdown logic to `MetricsManager` to avoid blocking.
- Update the convenience functions for async contexts and add sync-compatible wrappers.
- Adapt the Worker, JobManager, and API route modules to the async metric operations.
- Extend the unit tests to cover the new async methods and Redis operations.
- Simplify the Dockerfile by dropping the dev-dependency install step.
2026-02-03 19:54:22 +08:00
b5ca0e0593 Initial commit 2026-02-03 19:54:20 +08:00
18 changed files with 723 additions and 313 deletions


@@ -372,10 +372,23 @@ kubectl apply -f deployment/kubernetes/service.yaml
 - Resource limits: 256Mi-512Mi memory, 250m-500m CPU
 - Health checks: liveness probe (/healthz), readiness probe (/readyz)

-### Alibaba Cloud Function Compute
+### Alibaba Cloud Function Compute (FC 3.0)

 ```bash
-fun deploy -t deployment/serverless/aliyun-fc.yaml
+# Install Serverless Devs (if not already installed)
+npm install -g @serverless-devs/s
+
+# Configure Alibaba Cloud credentials (first use)
+s config add
+
+# Deploy to Alibaba Cloud Function Compute
+cd deployment/serverless && s deploy
+
+# Validate the configuration syntax
+cd deployment/serverless && s plan
+
+# Tail function logs
+cd deployment/serverless && s logs --tail
 ```

 ### AWS Lambda


@@ -19,7 +19,7 @@
 ## Documentation

 | Document | Description |
-|-----------------------------------------|--------------|
+|------------------------------------------------|--------------|
 | [Getting Started](docs/getting-started.md) | 10-minute quick start guide |
 | [Algorithm Development Guide](docs/algorithm-development.md) | Detailed algorithm development tutorial |
 | [API Reference](docs/api-reference.md) | Complete API documentation |
@@ -27,6 +27,7 @@
 | [API Specification](docs/api/README.md) | OpenAPI specification notes |
 | [Kubernetes Deployment](docs/kubernetes-deployment.md) | K8s cluster deployment guide |
 | [Log Integration (Loki)](docs/loki-quick-reference.md) | Log collection deployment notes |
+| [Alibaba Cloud Function Compute (FC) Deployment](docs/fc-deploy.md) | Getting started with FC deployment |

 ## Quick Start


@@ -9,11 +9,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
 # Copy dependency files
 COPY requirements.txt .
-COPY requirements-dev.txt .

 # Install Python dependencies
 RUN pip install --no-cache-dir -r requirements.txt
-RUN pip install --no-cache-dir -r requirements-dev.txt
+
+# Install dev dependencies
+#COPY requirements-dev.txt .
+#RUN pip install --no-cache-dir -r requirements-dev.txt

 # Copy application code and configuration
 COPY src/ ./src/


@@ -45,7 +45,9 @@ services:
     build:
       context: ..
       dockerfile: deployment/Dockerfile
-    image: crpi-om2xd9y8cmaizszf.cn-beijing.personal.cr.aliyuncs.com/test-namespace-gu/fc-test:latest
+    platform: linux/amd64
+    ports:
+      - "8112:8000"
     environment:
       - APP_ENV=development
       - LOG_LEVEL=INFO
@@ -70,6 +72,12 @@ services:
     depends_on:
       redis:
         condition: service_healthy
+    healthcheck:
+      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')"]
+      interval: 30s
+      timeout: 3s
+      retries: 3
+      start_period: 10s
     deploy:
       replicas: 2
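The healthcheck above runs a one-line `urllib` probe against `/healthz` inside the container. A self-contained sketch of the same probe pattern — the endpoint path comes from the compose file, while the tiny stand-in server below is purely illustrative:

```python
import http.server
import threading
import urllib.request

# Toy stand-in for the service: answers /healthz with 200, as the real API would.
class Handler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200 if self.path == "/healthz" else 404)
        self.end_headers()
        self.wfile.write(b"ok")

    def log_message(self, *args):  # keep probe output quiet
        pass

server = http.server.HTTPServer(("127.0.0.1", 0), Handler)  # port 0 = pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]

# This line is essentially what the compose healthcheck executes (with port 8000):
status = urllib.request.urlopen(f"http://127.0.0.1:{port}/healthz").status
print(status)  # 200 means Docker would report the container healthy
server.shutdown()
```

If the probe raises (connection refused, non-2xx via `HTTPError`), the healthcheck command exits non-zero and Docker counts a failure toward `retries`.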


@@ -127,16 +127,25 @@ spec:
             limits:
               memory: "512Mi"
               cpu: "500m"
-          # Worker has no HTTP port; use a command probe
+          # Worker now exposes an HTTP health-check endpoint
+          ports:
+            - containerPort: 8000
+              name: http
           livenessProbe:
-            exec:
-              command:
-                - python
-                - -c
-                - "import redis; r = redis.Redis(host='functional-scaffold-redis'); r.ping()"
+            httpGet:
+              path: /healthz
+              port: 8000
             initialDelaySeconds: 10
             periodSeconds: 30
-            timeoutSeconds: 5
+            timeoutSeconds: 3
+            failureThreshold: 3
+          readinessProbe:
+            httpGet:
+              path: /readyz
+              port: 8000
+            initialDelaySeconds: 5
+            periodSeconds: 10
+            timeoutSeconds: 3
             failureThreshold: 3
 ---
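As a rough rule of thumb (an approximation that ignores probe jitter and container restart time), the liveness settings above bound how long a hung Worker can linger before the kubelet restarts it:

```python
def worst_case_restart_delay(period_s: float, failure_threshold: int, timeout_s: float) -> float:
    # The kubelet restarts the container after failure_threshold consecutive
    # failed probes; probes fire every period_s, and each may take up to
    # timeout_s to be declared failed.
    return failure_threshold * period_s + timeout_s

# Liveness probe above: periodSeconds=30, failureThreshold=3, timeoutSeconds=3
print(worst_case_restart_delay(30, 3, 3))  # 93 (seconds)
```

Tightening `periodSeconds` shrinks this window at the cost of more probe traffic; the readiness probe's faster cadence (10s) only gates traffic, not restarts.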


@@ -1,72 +0,0 @@
# Alibaba Cloud Function Compute configuration
ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
  functional-scaffold:
    Type: 'Aliyun::Serverless::Service'
    Properties:
      Description: 'Serverless scaffold for algorithm engineering'
      LogConfig:
        Project: functional-scaffold-logs
        Logstore: function-logs
      VpcConfig:
        VpcId: 'vpc-xxxxx'
        VSwitchIds:
          - 'vsw-xxxxx'
        SecurityGroupId: 'sg-xxxxx'
    prime-checker:
      Type: 'Aliyun::Serverless::Function'
      Properties:
        Description: 'Prime-check algorithm API service'
        Runtime: custom-container
        MemorySize: 512
        Timeout: 60
        InstanceConcurrency: 10
        CAPort: 8000
        CustomContainerConfig:
          Image: 'registry.cn-hangzhou.aliyuncs.com/your-namespace/functional-scaffold:latest'
          Command: '["/app/entrypoint.sh"]'
        EnvironmentVariables:
          APP_ENV: production
          LOG_LEVEL: INFO
          METRICS_ENABLED: 'true'
          RUN_MODE: api
          REDIS_HOST: 'r-xxxxx.redis.rds.aliyuncs.com'
          REDIS_PORT: '6379'
      Events:
        httpTrigger:
          Type: HTTP
          Properties:
            AuthType: ANONYMOUS
            Methods:
              - GET
              - POST
    job-worker:
      Type: 'Aliyun::Serverless::Function'
      Properties:
        Description: 'Async job Worker'
        Runtime: custom-container
        MemorySize: 512
        Timeout: 900
        InstanceConcurrency: 1
        CustomContainerConfig:
          Image: 'registry.cn-hangzhou.aliyuncs.com/your-namespace/functional-scaffold:latest'
          Command: '["/app/entrypoint.sh"]'
        EnvironmentVariables:
          APP_ENV: production
          LOG_LEVEL: INFO
          METRICS_ENABLED: 'true'
          RUN_MODE: worker
          REDIS_HOST: 'r-xxxxx.redis.rds.aliyuncs.com'
          REDIS_PORT: '6379'
          WORKER_POLL_INTERVAL: '1.0'
          MAX_CONCURRENT_JOBS: '5'
          JOB_MAX_RETRIES: '3'
          JOB_EXECUTION_TIMEOUT: '300'
      Events:
        timerTrigger:
          Type: Timer
          Properties:
            CronExpression: '0 */1 * * * *'
            Enable: true
            Payload: '{}'


@@ -0,0 +1,108 @@
# Alibaba Cloud Function Compute FC 3.0 configuration
# Deploy with Serverless Devs: cd deployment/serverless && s deploy
edition: 3.0.0
name: functional-scaffold
access: default

vars:
  region: cn-beijing
  image: crpi-om2xd9y8cmaizszf-vpc.cn-beijing.personal.cr.aliyuncs.com/your-namespace/fc-test:test-v1
  redis_host: 127.31.1.1
  redis_port: "6379"
  redis_password: "your-password"

resources:
  # API service function
  prime-checker-api:
    component: fc3
    props:
      region: ${vars.region}
      functionName: prime-checker-api
      description: Prime-check algorithm API service
      runtime: custom-container
      cpu: 0.35
      memorySize: 512
      diskSize: 512
      timeout: 60
      instanceConcurrency: 10
      handler: not-used
      customContainerConfig:
        image: ${vars.image}
        port: 8000
        command:
          - /app/entrypoint.sh
        healthCheckConfig:
          httpGetUrl: /healthz
          initialDelaySeconds: 3
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 3
          successThreshold: 1
      environmentVariables:
        APP_ENV: production
        LOG_LEVEL: INFO
        METRICS_ENABLED: "true"
        RUN_MODE: api
        REDIS_HOST: ${vars.redis_host}
        REDIS_PORT: ${vars.redis_port}
        REDIS_PASSWORD: ${vars.redis_password}
      vpcConfig: auto
      logConfig: auto
      triggers:
        - triggerName: http-trigger
          triggerType: http
          triggerConfig:
            authType: anonymous
            methods:
              - GET
              - POST
              - PUT
              - DELETE

  # Async job Worker function
  job-worker:
    component: fc3
    props:
      region: ${vars.region}
      functionName: job-worker
      description: Async job Worker
      runtime: custom-container
      cpu: 0.35
      memorySize: 512
      diskSize: 512
      timeout: 900
      instanceConcurrency: 1
      handler: not-used
      customContainerConfig:
        image: ${vars.image}
        port: 8000
        command:
          - /app/entrypoint.sh
        healthCheckConfig:
          httpGetUrl: /healthz
          initialDelaySeconds: 5
          periodSeconds: 10
          timeoutSeconds: 3
          failureThreshold: 3
          successThreshold: 1
      environmentVariables:
        APP_ENV: production
        LOG_LEVEL: INFO
        METRICS_ENABLED: "true"
        RUN_MODE: worker
        REDIS_HOST: ${vars.redis_host}
        REDIS_PORT: ${vars.redis_port}
        REDIS_PASSWORD: ${vars.redis_password}
        WORKER_POLL_INTERVAL: "1.0"
        MAX_CONCURRENT_JOBS: "5"
        JOB_MAX_RETRIES: "3"
        JOB_EXECUTION_TIMEOUT: "300"
      vpcConfig: auto
      logConfig: auto
      triggers:
        - triggerName: timer-trigger
          triggerType: timer
          triggerConfig:
            cronExpression: "0 */1 * * * *"
            enable: true
            payload: "{}"

docs/fc-deploy.md (new file, 58 lines)

@@ -0,0 +1,58 @@
# Getting Started with Alibaba Cloud Function Compute (FC) Deployment

This guide helps you get up and running with the FunctionalScaffold project: develop and deploy your first algorithm service in about 10 minutes.

## Prerequisites

- Install the [Serverless Devs CLI](https://serverless-devs.com/docs/overview)

1. Install the Node.js runtime from the official site:
   - [Node.js downloads](https://nodejs.org/en/download/)
2. Install the Serverless Devs CLI:

```bash
npm install @serverless-devs/s -g
```

## Initialize the Serverless Devs CLI configuration

Run the following command to initialize the CLI configuration:

```bash
s config add
```

Follow the prompts and enter your AccessKey ID and AccessKey Secret.

## Deploy the algorithm service

Before deploying, make sure the prerequisites and configuration above are complete.

Edit the `vars` section of `s.yaml`:

```yaml
# Alibaba Cloud Function Compute FC 3.0 configuration
# Deploy with Serverless Devs: cd deployment/serverless && s deploy
edition: 3.0.0
name: functional-scaffold
access: default

vars:
  region: cn-hangzhou # replace with your region
  image: registry.cn-hangzhou.aliyuncs.com/your-namespace/functional-scaffold:latest # replace with your Docker image
  redis_host: r-xxxxx.redis.rds.aliyuncs.com # replace with your Redis host
  redis_port: "6379" # Redis port
  redis_password: "your-password" # Redis password; leave empty if none
```

```bash
cd deployment/serverless && s deploy
```

Once deployed, you can check the service status and logs in the console.

## Remove the algorithm service

```bash
cd deployment/serverless && s remove
```


@@ -25,6 +25,8 @@ dependencies = [
     "pyyaml>=6.0.0",
     # HTTP client (webhook callbacks)
     "httpx>=0.27.0",
+    # Lightweight HTTP server (Worker health checks)
+    "aiohttp>=3.9.0",
 ]

 [project.optional-dependencies]


@@ -5,6 +5,7 @@ pydantic>=2.5.0
 pydantic-settings>=2.0.0
 prometheus-client>=0.19.0
 python-json-logger>=2.0.7
+aiohttp>=3.9.0

 # Redis - job queue and metrics storage
 redis>=5.0.0


@@ -32,7 +32,7 @@ class BaseAlgorithm(ABC):
         Returns:
             Dict[str, Any]: dict containing the result and metadata
         """
-        from ..core.metrics_unified import incr, observe
+        from ..core.metrics_unified import incr_sync, observe_sync

         start_time = time.time()
         status = "success"
@@ -71,5 +71,7 @@ class BaseAlgorithm(ABC):
         finally:
             # Record algorithm execution metrics
             elapsed_time = time.time() - start_time
-            incr("algorithm_executions_total", {"algorithm": self.name, "status": status})
-            observe("algorithm_execution_duration_seconds", {"algorithm": self.name}, elapsed_time)
+            incr_sync("algorithm_executions_total", {"algorithm": self.name, "status": status})
+            observe_sync(
+                "algorithm_execution_duration_seconds", {"algorithm": self.name}, elapsed_time
+            )


@@ -2,7 +2,7 @@
 from typing import Dict, Any, List

 from .base import BaseAlgorithm
-from ..core.metrics_unified import incr
+from ..core.metrics_unified import incr_sync


 class PrimeChecker(BaseAlgorithm):
@@ -31,12 +31,12 @@ class PrimeChecker(BaseAlgorithm):
             ValueError: if the input is not an integer
         """
         if not isinstance(number, int):
-            incr('prime_check',{"status":"invalid_input"})
+            incr_sync('prime_check', {"status": "invalid_input"})
             raise ValueError(f"Input must be an integer, got {type(number).__name__}")

         # Numbers below 2 are not prime
         if number < 2:
-            incr('prime_check', {"status": "number_little_two"})
+            incr_sync('prime_check', {"status": "number_little_two"})
             return {
                 "number": number,
                 "is_prime": False,
@@ -50,7 +50,7 @@ class PrimeChecker(BaseAlgorithm):
         # If not prime, compute the factors
         factors = [] if is_prime else self._get_factors(number)

-        incr('prime_check', {"status": "success"})
+        incr_sync('prime_check', {"status": "success"})
         return {
             "number": number,
             "is_prime": is_prime,
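The three counter statuses in the hunk above correspond to three paths through the check: invalid input, numbers below 2, and a completed check. A standalone sketch of that control flow, independent of the scaffold's classes (trial division is an assumption; the real `_is_prime`/`_get_factors` bodies are not shown in this diff):

```python
from typing import Any, Dict, List

def check_prime(number: int) -> Dict[str, Any]:
    if not isinstance(number, int):
        # metrics status: invalid_input
        raise ValueError(f"Input must be an integer, got {type(number).__name__}")
    if number < 2:
        # metrics status: number_little_two
        return {"number": number, "is_prime": False, "factors": []}
    # Trial division up to sqrt(number)
    is_prime = all(number % d for d in range(2, int(number ** 0.5) + 1))
    # Proper divisors of a composite number (excluding 1 and itself)
    factors: List[int] = [] if is_prime else [d for d in range(2, number) if number % d == 0]
    # metrics status: success
    return {"number": number, "is_prime": is_prime, "factors": factors}

print(check_prime(12))
```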


@@ -168,7 +168,7 @@ return 0
         await self._redis_client.hset(key, mapping=job_data)

         # Record metrics
-        incr("jobs_created_total", {"algorithm": algorithm})
+        await incr("jobs_created_total", {"algorithm": algorithm})

         logger.info(f"Job created: job_id={job_id}, algorithm={algorithm}")
         return job_id
@@ -320,8 +320,10 @@ return 0
             await self._redis_client.expire(key, settings.job_result_ttl)

         # Record metrics
-        incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
-        observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time)
+        await incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
+        await observe(
+            "job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time
+        )

         logger.info(
             f"Job finished: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s"
@@ -372,7 +374,7 @@ return 0
                 )

                 if response.status_code < 400:
-                    incr("webhook_deliveries_total", {"status": "success"})
+                    await incr("webhook_deliveries_total", {"status": "success"})
                     logger.info(
                         f"Webhook delivered: job_id={job_id}, url={webhook_url}, "
                         f"status_code={response.status_code}"
@@ -395,7 +397,7 @@ return 0
                     await asyncio.sleep(delay)

         # All retries failed
-        incr("webhook_deliveries_total", {"status": "failed"})
+        await incr("webhook_deliveries_total", {"status": "failed"})
         logger.error(f"Webhook delivery failed permanently: job_id={job_id}, url={webhook_url}")

     def is_available(self) -> bool:
@@ -814,10 +816,10 @@ return 0
         # Update metrics
         from .metrics_unified import set as metrics_set

-        metrics_set("job_queue_length", {"queue": "pending"}, queue_length)
-        metrics_set("job_queue_length", {"queue": "processing"}, processing_length)
-        metrics_set("job_queue_length", {"queue": "dlq"}, dlq_length)
-        metrics_set("job_oldest_waiting_seconds", None, oldest_waiting_seconds)
+        await metrics_set("job_queue_length", {"queue": "pending"}, queue_length)
+        await metrics_set("job_queue_length", {"queue": "processing"}, processing_length)
+        await metrics_set("job_queue_length", {"queue": "dlq"}, dlq_length)
+        await metrics_set("job_oldest_waiting_seconds", None, oldest_waiting_seconds)

         return {
             "queue_length": queue_length,
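The webhook path in the hunks above retries with a delay before recording `{"status": "failed"}`. A minimal sketch of that retry shape — the attempt count, backoff base, and fake sender below are illustrative assumptions, not the scaffold's actual settings:

```python
import asyncio
from typing import Awaitable, Callable

async def deliver_with_retries(
    send: Callable[[], Awaitable[int]],
    max_retries: int = 3,
    base_delay: float = 0.01,
) -> str:
    """Return "success" on the first <400 response, "failed" after max_retries attempts."""
    for attempt in range(max_retries):
        status_code = await send()
        if status_code < 400:
            # analogous to: await incr("webhook_deliveries_total", {"status": "success"})
            return "success"
        await asyncio.sleep(base_delay * 2 ** attempt)  # exponential backoff
    # analogous to: await incr("webhook_deliveries_total", {"status": "failed"})
    return "failed"

# A fake sender that fails twice, then succeeds.
responses = iter([500, 503, 200])

async def fake_send() -> int:
    return next(responses)

print(asyncio.run(deliver_with_retries(fake_send)))  # success
```

Because the metric call is now `await`-ed, it runs on the same event loop as the delivery itself instead of blocking it.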


@@ -1,19 +1,21 @@
"""统一指标管理模块 """统一指标管理模块
基于 Redis 的指标收集方案,支持多实例部署和 YAML 配置。 基于 Redis 的指标收集方案,支持多实例部署和 YAML 配置。
使用异步 Redis 客户端,避免在异步请求路径中阻塞事件循环。
""" """
import os import os
import re import re
import socket import socket
import logging import logging
import asyncio
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from functools import wraps from functools import wraps
import time import time
import yaml import yaml
import redis import redis.asyncio as aioredis
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -22,7 +24,7 @@ class MetricsManager:
"""统一指标管理器 """统一指标管理器
支持从 YAML 配置文件加载指标定义,使用 Redis 存储指标数据, 支持从 YAML 配置文件加载指标定义,使用 Redis 存储指标数据,
并导出 Prometheus 格式的指标。 并导出 Prometheus 格式的指标。使用异步 Redis 客户端。
""" """
def __init__(self, config_path: Optional[str] = None): def __init__(self, config_path: Optional[str] = None):
@@ -37,16 +39,22 @@ class MetricsManager:
self.instance_id = settings.metrics_instance_id or socket.gethostname() self.instance_id = settings.metrics_instance_id or socket.gethostname()
self.config: Dict[str, Any] = {} self.config: Dict[str, Any] = {}
self.metrics_definitions: Dict[str, Dict[str, Any]] = {} self.metrics_definitions: Dict[str, Dict[str, Any]] = {}
self._redis_client: Optional[redis.Redis] = None self._redis_client: Optional[aioredis.Redis] = None
self._redis_available = False self._redis_available = False
self._initialized = False
# 加载配置 # 加载配置(同步操作)
self._load_config() self._load_config()
# 初始化 Redis 连接 # 注册指标定义(同步操作)
self._init_redis()
# 注册指标定义
self._register_metrics() self._register_metrics()
async def initialize(self) -> None:
"""异步初始化 Redis 连接"""
if self._initialized:
return
await self._init_redis()
self._initialized = True
def _load_config(self) -> None: def _load_config(self) -> None:
"""加载 YAML 配置文件""" """加载 YAML 配置文件"""
# 尝试多个路径 # 尝试多个路径
@@ -138,8 +146,8 @@ class MetricsManager:
"custom_metrics": {}, "custom_metrics": {},
} }
def _init_redis(self) -> None: async def _init_redis(self) -> None:
"""初始化 Redis 连接""" """异步初始化 Redis 连接"""
from ..config import settings from ..config import settings
redis_config = self.config.get("redis", {}) redis_config = self.config.get("redis", {})
@@ -149,7 +157,7 @@ class MetricsManager:
password = redis_config.get("password") or settings.redis_password password = redis_config.get("password") or settings.redis_password
try: try:
self._redis_client = redis.Redis( self._redis_client = aioredis.Redis(
host=host, host=host,
port=port, port=port,
db=db, db=db,
@@ -159,10 +167,10 @@ class MetricsManager:
socket_timeout=5, socket_timeout=5,
) )
# 测试连接 # 测试连接
self._redis_client.ping() await self._redis_client.ping()
self._redis_available = True self._redis_available = True
logger.info(f"Redis 连接成功: {host}:{port}/{db}") logger.info(f"Redis 连接成功: {host}:{port}/{db}")
except redis.ConnectionError as e: except aioredis.ConnectionError as e:
logger.warning(f"Redis 连接失败: {e},指标将不会被收集") logger.warning(f"Redis 连接失败: {e},指标将不会被收集")
self._redis_available = False self._redis_available = False
except Exception as e: except Exception as e:
@@ -235,7 +243,9 @@ class MetricsManager:
# === 简单 API业务代码使用=== # === 简单 API业务代码使用===
def incr(self, name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None: async def incr(
self, name: str, labels: Optional[Dict[str, str]] = None, value: int = 1
) -> None:
"""增加计数器 """增加计数器
Args: Args:
@@ -252,11 +262,13 @@ class MetricsManager:
try: try:
key = f"metrics:counter:{name}" key = f"metrics:counter:{name}"
field = self._labels_to_key(labels) or "_default_" field = self._labels_to_key(labels) or "_default_"
self._redis_client.hincrbyfloat(key, field, value) await self._redis_client.hincrbyfloat(key, field, value)
except Exception as e: except Exception as e:
logger.error(f"增加计数器失败: {e}") logger.error(f"增加计数器失败: {e}")
def set(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: async def set(
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
) -> None:
"""设置仪表盘值 """设置仪表盘值
Args: Args:
@@ -273,11 +285,11 @@ class MetricsManager:
try: try:
key = f"metrics:gauge:{name}" key = f"metrics:gauge:{name}"
field = self._labels_to_key(labels) or "_default_" field = self._labels_to_key(labels) or "_default_"
self._redis_client.hset(key, field, value) await self._redis_client.hset(key, field, value)
except Exception as e: except Exception as e:
logger.error(f"设置仪表盘失败: {e}") logger.error(f"设置仪表盘失败: {e}")
def gauge_incr( async def gauge_incr(
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
) -> None: ) -> None:
"""增加仪表盘值 """增加仪表盘值
@@ -296,11 +308,11 @@ class MetricsManager:
try: try:
key = f"metrics:gauge:{name}" key = f"metrics:gauge:{name}"
field = self._labels_to_key(labels) or "_default_" field = self._labels_to_key(labels) or "_default_"
self._redis_client.hincrbyfloat(key, field, value) await self._redis_client.hincrbyfloat(key, field, value)
except Exception as e: except Exception as e:
logger.error(f"增加仪表盘失败: {e}") logger.error(f"增加仪表盘失败: {e}")
def gauge_decr( async def gauge_decr(
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1 self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
) -> None: ) -> None:
"""减少仪表盘值 """减少仪表盘值
@@ -310,9 +322,11 @@ class MetricsManager:
labels: 标签字典 labels: 标签字典
value: 减少的值 value: 减少的值
""" """
self.gauge_incr(name, labels, -value) await self.gauge_incr(name, labels, -value)
def observe(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: async def observe(
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
) -> None:
"""记录直方图观测值 """记录直方图观测值
Args: Args:
@@ -348,13 +362,13 @@ class MetricsManager:
# +Inf 桶总是增加 # +Inf 桶总是增加
pipe.hincrbyfloat(f"metrics:histogram:{name}:bucket:+Inf", label_key, 1) pipe.hincrbyfloat(f"metrics:histogram:{name}:bucket:+Inf", label_key, 1)
pipe.execute() await pipe.execute()
except Exception as e: except Exception as e:
logger.error(f"记录直方图失败: {e}") logger.error(f"记录直方图失败: {e}")
# === 导出方法 === # === 导出方法 ===
def export(self) -> str: async def export(self) -> str:
"""导出 Prometheus 格式指标 """导出 Prometheus 格式指标
Returns: Returns:
@@ -375,11 +389,11 @@ class MetricsManager:
lines.append(f"# TYPE {name} {metric_type}") lines.append(f"# TYPE {name} {metric_type}")
if metric_type == "counter": if metric_type == "counter":
lines.extend(self._export_counter(name)) lines.extend(await self._export_counter(name))
elif metric_type == "gauge": elif metric_type == "gauge":
lines.extend(self._export_gauge(name)) lines.extend(await self._export_gauge(name))
elif metric_type == "histogram": elif metric_type == "histogram":
lines.extend(self._export_histogram(name, definition)) lines.extend(await self._export_histogram(name, definition))
lines.append("") # 空行分隔 lines.append("") # 空行分隔
@@ -389,12 +403,12 @@ class MetricsManager:
return "\n".join(lines) return "\n".join(lines)
def _export_counter(self, name: str) -> List[str]: async def _export_counter(self, name: str) -> List[str]:
"""导出计数器指标""" """导出计数器指标"""
lines = [] lines = []
key = f"metrics:counter:{name}" key = f"metrics:counter:{name}"
data = self._redis_client.hgetall(key) data = await self._redis_client.hgetall(key)
for field, value in data.items(): for field, value in data.items():
if field == "_default_": if field == "_default_":
lines.append(f"{name} {value}") lines.append(f"{name} {value}")
@@ -404,12 +418,12 @@ class MetricsManager:
return lines return lines
def _export_gauge(self, name: str) -> List[str]: async def _export_gauge(self, name: str) -> List[str]:
"""导出仪表盘指标""" """导出仪表盘指标"""
lines = [] lines = []
key = f"metrics:gauge:{name}" key = f"metrics:gauge:{name}"
data = self._redis_client.hgetall(key) data = await self._redis_client.hgetall(key)
for field, value in data.items(): for field, value in data.items():
if field == "_default_": if field == "_default_":
lines.append(f"{name} {value}") lines.append(f"{name} {value}")
@@ -419,14 +433,14 @@ class MetricsManager:
return lines return lines
def _export_histogram(self, name: str, definition: Dict[str, Any]) -> List[str]: async def _export_histogram(self, name: str, definition: Dict[str, Any]) -> List[str]:
"""导出直方图指标""" """导出直方图指标"""
lines = [] lines = []
buckets = definition.get("buckets", []) buckets = definition.get("buckets", [])
# 获取所有标签组合 # 获取所有标签组合
count_data = self._redis_client.hgetall(f"metrics:histogram:{name}:count") count_data = await self._redis_client.hgetall(f"metrics:histogram:{name}:count")
sum_data = self._redis_client.hgetall(f"metrics:histogram:{name}:sum") sum_data = await self._redis_client.hgetall(f"metrics:histogram:{name}:sum")
for label_key in count_data.keys(): for label_key in count_data.keys():
prom_labels = self._key_to_prometheus_labels(label_key) prom_labels = self._key_to_prometheus_labels(label_key)
@@ -434,7 +448,7 @@ class MetricsManager:
# 导出各个桶 # 导出各个桶
for bucket in buckets: for bucket in buckets:
bucket_key = f"metrics:histogram:{name}:bucket:{bucket}" bucket_key = f"metrics:histogram:{name}:bucket:{bucket}"
bucket_value = self._redis_client.hget(bucket_key, label_key) or "0" bucket_value = await self._redis_client.hget(bucket_key, label_key) or "0"
if label_key == "_default_": if label_key == "_default_":
lines.append(f'{name}_bucket{{le="{bucket}"}} {bucket_value}') lines.append(f'{name}_bucket{{le="{bucket}"}} {bucket_value}')
else: else:
@@ -442,7 +456,7 @@ class MetricsManager:
# +Inf 桶 # +Inf 桶
inf_key = f"metrics:histogram:{name}:bucket:+Inf" inf_key = f"metrics:histogram:{name}:bucket:+Inf"
inf_value = self._redis_client.hget(inf_key, label_key) or "0" inf_value = await self._redis_client.hget(inf_key, label_key) or "0"
if label_key == "_default_": if label_key == "_default_":
lines.append(f'{name}_bucket{{le="+Inf"}} {inf_value}') lines.append(f'{name}_bucket{{le="+Inf"}} {inf_value}')
else: else:
@@ -464,43 +478,79 @@ class MetricsManager:
"""检查 Redis 是否可用""" """检查 Redis 是否可用"""
return self._redis_available return self._redis_available
def reset(self) -> None: async def reset(self) -> None:
"""重置所有指标(主要用于测试)""" """重置所有指标(主要用于测试)"""
if not self._redis_available: if not self._redis_available:
return return
try: try:
# 删除所有指标相关的 key # 删除所有指标相关的 key
keys = self._redis_client.keys("metrics:*") keys = await self._redis_client.keys("metrics:*")
if keys: if keys:
self._redis_client.delete(*keys) await self._redis_client.delete(*keys)
logger.info("已重置所有指标") logger.info("已重置所有指标")
except Exception as e: except Exception as e:
logger.error(f"重置指标失败: {e}") logger.error(f"重置指标失败: {e}")
async def close(self) -> None:
"""关闭 Redis 连接"""
if self._redis_client:
await self._redis_client.close()
self._redis_client = None
self._redis_available = False
self._initialized = False
# 全局单例 # 全局单例
_manager: Optional[MetricsManager] = None _manager: Optional[MetricsManager] = None
_manager_lock = asyncio.Lock()
def get_metrics_manager() -> MetricsManager: async def get_metrics_manager() -> MetricsManager:
"""获取指标管理器单例""" """获取指标管理器单例(异步)"""
global _manager
if _manager is None:
async with _manager_lock:
if _manager is None:
_manager = MetricsManager()
await _manager.initialize()
elif not _manager._initialized:
await _manager.initialize()
return _manager
def get_metrics_manager_sync() -> MetricsManager:
"""获取指标管理器单例(同步,仅用于非异步上下文)
注意:此方法不会初始化 Redis 连接,需要在异步上下文中调用 initialize()
"""
global _manager global _manager
if _manager is None: if _manager is None:
_manager = MetricsManager() _manager = MetricsManager()
return _manager return _manager
def reset_metrics_manager() -> None: async def reset_metrics_manager() -> None:
"""重置指标管理器单例(主要用于测试)""" """重置指标管理器单例(主要用于测试)"""
global _manager global _manager
if _manager is not None:
await _manager.close()
_manager = None
def reset_metrics_manager_sync() -> None:
"""同步重置指标管理器单例(主要用于测试)
注意:此方法不会关闭 Redis 连接,仅重置单例引用
"""
global _manager
_manager = None _manager = None
# === 便捷函数(业务代码直接调用)=== # === 便捷函数(业务代码直接调用)===
def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None: async def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None:
"""增加计数器 - 便捷函数 """增加计数器 - 便捷函数
Args: Args:
@@ -508,10 +558,11 @@ def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) ->
labels: 标签字典 labels: 标签字典
value: 增加的值,默认为 1 value: 增加的值,默认为 1
""" """
get_metrics_manager().incr(name, labels, value) manager = await get_metrics_manager()
await manager.incr(name, labels, value)
def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: async def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
"""设置仪表盘 - 便捷函数 """设置仪表盘 - 便捷函数
Args: Args:
@@ -519,10 +570,13 @@ def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) ->
labels: 标签字典 labels: 标签字典
value: 设置的值 value: 设置的值
""" """
get_metrics_manager().set(name, labels, value) manager = await get_metrics_manager()
await manager.set(name, labels, value)
def gauge_incr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None: async def gauge_incr(
name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
) -> None:
"""增加仪表盘 - 便捷函数 """增加仪表盘 - 便捷函数
Args: Args:
@@ -530,10 +584,13 @@ def gauge_incr(name: str, labels: Optional[Dict[str, str]] = None, value: float
labels: 标签字典 labels: 标签字典
value: 增加的值 value: 增加的值
""" """
get_metrics_manager().gauge_incr(name, labels, value) manager = await get_metrics_manager()
await manager.gauge_incr(name, labels, value)
def gauge_decr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None: async def gauge_decr(
name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
) -> None:
"""减少仪表盘 - 便捷函数 """减少仪表盘 - 便捷函数
Args: Args:
@@ -541,10 +598,13 @@ def gauge_decr(name: str, labels: Optional[Dict[str, str]] = None, value: float
labels: 标签字典 labels: 标签字典
value: 减少的值 value: 减少的值
""" """
get_metrics_manager().gauge_decr(name, labels, value) manager = await get_metrics_manager()
await manager.gauge_decr(name, labels, value)
def observe(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None: async def observe(
name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
) -> None:
"""记录直方图 - 便捷函数 """记录直方图 - 便捷函数
Args: Args:
@@ -552,21 +612,105 @@ def observe(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
labels: 标签字典 labels: 标签字典
value: 观测值 value: 观测值
""" """
get_metrics_manager().observe(name, labels, value) manager = await get_metrics_manager()
await manager.observe(name, labels, value)
def export() -> str: async def export() -> str:
"""导出指标 - 便捷函数 """导出指标 - 便捷函数
Returns: Returns:
Prometheus 文本格式的指标字符串 Prometheus 文本格式的指标字符串
""" """
return get_metrics_manager().export() manager = await get_metrics_manager()
return await manager.export()
def is_available() -> bool: async def is_available() -> bool:
"""检查 Redis 是否可用 - 便捷函数""" """检查 Redis 是否可用 - 便捷函数"""
return get_metrics_manager().is_available() manager = await get_metrics_manager()
return manager.is_available()
# === 同步便捷函数(用于同步代码中的 fire-and-forget 模式)===
def _schedule_async(coro) -> None:
"""在后台调度异步协程fire-and-forget 模式)
如果当前没有运行的事件循环,则静默忽略。
"""
try:
loop = asyncio.get_running_loop()
loop.create_task(coro)
except RuntimeError:
# 没有运行的事件循环,静默忽略
pass
def incr_sync(
name: str, labels: Optional[Dict[str, str]] = None, value: int = 1
) -> None:
"""增加计数器 - 同步便捷函数fire-and-forget
Args:
name: 指标名称
labels: 标签字典
value: 增加的值,默认为 1
"""
_schedule_async(incr(name, labels, value))
def set_sync(
name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
) -> None:
"""设置仪表盘 - 同步便捷函数fire-and-forget
Args:
name: 指标名称
labels: 标签字典
value: 设置的值
"""
_schedule_async(set(name, labels, value))
def gauge_incr_sync(
name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
) -> None:
"""增加仪表盘 - 同步便捷函数fire-and-forget
Args:
name: 指标名称
labels: 标签字典
value: 增加的值
"""
_schedule_async(gauge_incr(name, labels, value))
def gauge_decr_sync(
name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
) -> None:
"""减少仪表盘 - 同步便捷函数fire-and-forget
Args:
name: 指标名称
labels: 标签字典
value: 减少的值
"""
_schedule_async(gauge_decr(name, labels, value))
def observe_sync(
name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
) -> None:
"""记录直方图 - 同步便捷函数fire-and-forget
Args:
name: 指标名称
labels: 标签字典
value: 观测值
"""
_schedule_async(observe(name, labels, value))
 # === Decorators (backward compatible with the old API) ===
@@ -593,8 +737,11 @@ def track_algorithm_execution(algorithm_name: str):
             raise e
         finally:
             elapsed = time.time() - start_time
-            incr("algorithm_executions_total", {"algorithm": algorithm_name, "status": status})
-            observe(
+            incr_sync(
+                "algorithm_executions_total",
+                {"algorithm": algorithm_name, "status": status},
+            )
+            observe_sync(
                 "algorithm_execution_duration_seconds",
                 {"algorithm": algorithm_name},
                 elapsed,
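The hunk above switches the decorator to the sync fire-and-forget helpers so a plain (non-async) function can still be timed without an `await`. A simplified standalone sketch of the same decorator shape (`track_execution` and `RECORDS` are illustrative names, not the project's API):

```python
import time
from functools import wraps

RECORDS = []  # stands in for incr_sync/observe_sync


def track_execution(algorithm_name: str):
    """Timing decorator in the spirit of track_algorithm_execution."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            status = "success"
            start = time.time()
            try:
                return func(*args, **kwargs)
            except Exception:
                status = "error"
                raise
            finally:
                # runs on both success and error paths
                elapsed = time.time() - start
                RECORDS.append((algorithm_name, status, elapsed))

        return wrapper

    return decorator


@track_execution("demo_algo")
def run() -> str:
    return "ok"


result = run()
print(result, RECORDS[0][:2])  # → ok ('demo_algo', 'success')
```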


@@ -95,7 +95,7 @@ async def track_metrics(request: Request, call_next):
     if request.url.path in skip_paths:
         return await call_next(request)

-    gauge_incr("http_requests_in_progress")
+    await gauge_incr("http_requests_in_progress")

     start_time = time.time()
     status = "success"
@@ -112,16 +112,16 @@ async def track_metrics(request: Request, call_next):
     elapsed = time.time() - start_time
     # Record metrics using the normalized path
     normalized_path = normalize_path(request.url.path)
-    incr(
+    await incr(
         "http_requests_total",
         {"method": request.method, "endpoint": normalized_path, "status": status},
     )
-    observe(
+    await observe(
         "http_request_duration_seconds",
         {"method": request.method, "endpoint": normalized_path},
         elapsed,
     )
-    gauge_decr("http_requests_in_progress")
+    await gauge_decr("http_requests_in_progress")

 # Register routes
@@ -145,7 +145,7 @@ async def metrics():
         return Response(content="Metrics disabled", status_code=404)

     return Response(
-        content=export(),
+        content=await export(),
         media_type="text/plain; version=0.0.4; charset=utf-8",
     )
@@ -160,7 +160,7 @@ async def startup_event():
     # Initialize the metrics manager
     if settings.metrics_enabled:
-        manager = get_metrics_manager()
+        manager = await get_metrics_manager()
         if manager.is_available():
             logger.info("Redis 指标收集已启用")
         else:
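The `/metrics` endpoint above serves `export()` output with the `text/plain; version=0.0.4` media type, i.e. the Prometheus text exposition format. A small sketch of what rendering one counter in that format looks like (the helper below is illustrative, not the module's actual exporter):

```python
def render_counter(name: str, help_text: str, samples: dict) -> str:
    """Render one counter in Prometheus text exposition format.

    samples maps a tuple of (label, value) pairs to the sample value.
    """
    lines = [f"# HELP {name} {help_text}", f"# TYPE {name} counter"]
    for labels, value in samples.items():
        label_str = ",".join(f'{k}="{v}"' for k, v in labels)
        lines.append(f"{name}{{{label_str}}} {value}")
    return "\n".join(lines)


text = render_counter(
    "http_requests_total",
    "Total HTTP requests",
    {(("method", "GET"), ("endpoint", "/"), ("status", "success")): 10},
)
print(text)
# # HELP http_requests_total Total HTTP requests
# # TYPE http_requests_total counter
# http_requests_total{method="GET",endpoint="/",status="success"} 10
```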


@@ -9,6 +9,8 @@ import signal
 import sys
 from typing import Optional

+from aiohttp import web
+
 from .config import settings
 from .core.job_manager import JobManager
 from .core.logging import setup_logging
@@ -17,6 +19,53 @@ from .core.tracing import set_request_id

 logger = logging.getLogger(__name__)
class HealthCheckServer:
    """Lightweight health-check HTTP server

    Provides health-check endpoints for Worker mode, satisfying the
    FC 3.0 container health-check requirements.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8000):
        self._host = host
        self._port = port
        self._app: Optional[web.Application] = None
        self._runner: Optional[web.AppRunner] = None
        self._site: Optional[web.TCPSite] = None
        self._healthy = True

    async def start(self) -> None:
        """Start the health-check server"""
        self._app = web.Application()
        self._app.router.add_get("/healthz", self._healthz_handler)
        self._app.router.add_get("/readyz", self._readyz_handler)

        self._runner = web.AppRunner(self._app)
        await self._runner.setup()
        self._site = web.TCPSite(self._runner, self._host, self._port)
        await self._site.start()
        logger.info(f"健康检查服务器已启动: http://{self._host}:{self._port}")

    async def stop(self) -> None:
        """Stop the health-check server"""
        if self._runner:
            await self._runner.cleanup()
            logger.info("健康检查服务器已停止")

    def set_healthy(self, healthy: bool) -> None:
        """Set the health status"""
        self._healthy = healthy

    async def _healthz_handler(self, request: web.Request) -> web.Response:
        """Liveness endpoint"""
        return web.json_response({"status": "healthy", "mode": "worker"})

    async def _readyz_handler(self, request: web.Request) -> web.Response:
        """Readiness endpoint"""
        if self._healthy:
            return web.json_response({"status": "ready", "mode": "worker"})
        return web.json_response({"status": "not ready"}, status=503)
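The probe semantics above (always 200 on `/healthz`, 200 or 503 on `/readyz` depending on a flag) can be reproduced with only the standard library. A rough stdlib-only sketch, useful for seeing the expected responses without aiohttp installed:

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import urlopen

HEALTHY = True  # stands in for the set_healthy() flag


class ProbeHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/healthz":
            status, body = 200, {"status": "healthy", "mode": "worker"}
        elif self.path == "/readyz" and HEALTHY:
            status, body = 200, {"status": "ready", "mode": "worker"}
        elif self.path == "/readyz":
            status, body = 503, {"status": "not ready"}
        else:
            status, body = 404, {"status": "unknown"}
        data = json.dumps(body).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, *args):  # keep output quiet
        pass


server = HTTPServer(("127.0.0.1", 0), ProbeHandler)  # port 0 = any free port
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]

health = json.loads(urlopen(f"http://127.0.0.1:{port}/healthz").read())
ready = json.loads(urlopen(f"http://127.0.0.1:{port}/readyz").read())
print(health["status"], ready["status"])  # → healthy ready
server.shutdown()
```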
 class JobWorker:
     """Job Worker
@@ -260,7 +309,7 @@ class JobWorker:
                 # Record recovery metrics
                 from .core.metrics_unified import incr

-                incr("job_recovered_total", None, recovered)
+                await incr("job_recovered_total", None, recovered)

             # Collect queue monitoring metrics
             await self._job_manager.collect_queue_metrics()
@@ -272,12 +321,21 @@ class JobWorker:
             logger.error(f"超时任务回收异常: {e}")


-def setup_signal_handlers(worker: JobWorker, loop: asyncio.AbstractEventLoop) -> None:
+def setup_signal_handlers(
+    worker: JobWorker,
+    health_server: HealthCheckServer,
+    loop: asyncio.AbstractEventLoop,
+) -> None:
     """Set up signal handlers"""

+    async def shutdown_all() -> None:
+        """Shut down all services"""
+        await worker.shutdown()
+        await health_server.stop()
+
     def signal_handler(sig: signal.Signals) -> None:
         logger.info(f"收到信号 {sig.name},准备关闭...")
-        loop.create_task(worker.shutdown())
+        loop.create_task(shutdown_all())

     for sig in (signal.SIGTERM, signal.SIGINT):
         loop.add_signal_handler(sig, signal_handler, sig)
@@ -288,13 +346,19 @@ async def main() -> None:
     # Set up logging
     setup_logging(level=settings.log_level, format_type=settings.log_format)

+    # Create the health-check server and the Worker
+    health_server = HealthCheckServer(port=8000)
     worker = JobWorker()

     # Set up signal handling
     loop = asyncio.get_running_loop()
-    setup_signal_handlers(worker, loop)
+    setup_signal_handlers(worker, health_server, loop)

     try:
+        # Start the health-check server first so FC health checks can pass
+        await health_server.start()
+
+        # Initialize and run the Worker
         await worker.initialize()
         await worker.run()
     except Exception as e:
@@ -302,6 +366,7 @@ async def main() -> None:
         sys.exit(1)
     finally:
         await worker.shutdown()
+        await health_server.stop()


 if __name__ == "__main__":
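The signal-handling change above follows the usual asyncio pattern: the handler itself never blocks, it only schedules an async cleanup task on the running loop. A self-contained sketch of that pattern (Unix-only, since `loop.add_signal_handler` is not available on Windows; the `state` flag stands in for the real shutdown work):

```python
import asyncio
import signal

state = {"done": False}


async def main() -> None:
    loop = asyncio.get_running_loop()
    stopped = asyncio.Event()

    async def shutdown_all() -> None:
        # close worker, health server, etc. (illustrative)
        state["done"] = True
        stopped.set()

    def handler(sig: signal.Signals) -> None:
        # never block in a signal handler; schedule async cleanup instead
        loop.create_task(shutdown_all())

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handler, sig)

    # simulate receiving SIGTERM shortly after startup
    loop.call_later(0.01, signal.raise_signal, signal.SIGTERM)
    await stopped.wait()


asyncio.run(main())
print(state["done"])  # → True
```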


@@ -1,158 +1,239 @@
 """Unit tests for the metrics_unified module"""

 import pytest
-from unittest.mock import MagicMock, patch
+from unittest.mock import AsyncMock, MagicMock, patch


+@pytest.fixture(autouse=True)
+def reset_manager():
+    """Reset the manager before and after each test"""
+    from functional_scaffold.core.metrics_unified import reset_metrics_manager_sync
+
+    reset_metrics_manager_sync()
+    yield
+    reset_metrics_manager_sync()


 class TestMetricsManager:
     """Tests for the MetricsManager class"""

-    @pytest.fixture
-    def mock_redis(self):
-        """Mock Redis client"""
-        with patch("redis.Redis") as mock:
-            mock_instance = MagicMock()
-            mock_instance.ping.return_value = True
-            mock_instance.hincrbyfloat.return_value = 1.0
-            mock_instance.hset.return_value = True
-            mock_instance.hgetall.return_value = {}
-            mock_instance.hget.return_value = "0"
-            mock_instance.keys.return_value = []
-            mock_instance.pipeline.return_value = MagicMock()
-            mock.return_value = mock_instance
-            yield mock_instance
-
-    @pytest.fixture
-    def manager(self, mock_redis):
-        """Create a MetricsManager for testing"""
-        from functional_scaffold.core.metrics_unified import (
-            MetricsManager,
-            reset_metrics_manager,
-        )
-
-        reset_metrics_manager()
-        manager = MetricsManager()
-        return manager
-
-    def test_init_loads_default_config(self, manager):
+    def test_init_loads_default_config(self):
         """Test that init loads the default config"""
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
         assert manager.config is not None
         assert "builtin_metrics" in manager.config or len(manager.metrics_definitions) > 0

-    def test_metrics_definitions_registered(self, manager):
+    def test_metrics_definitions_registered(self):
         """Test that metric definitions are registered"""
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
         assert "http_requests_total" in manager.metrics_definitions
         assert "http_request_duration_seconds" in manager.metrics_definitions
         assert "algorithm_executions_total" in manager.metrics_definitions

-    def test_incr_counter(self, manager, mock_redis):
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_incr_counter(self, mock_redis_class):
         """Test counter increment"""
-        manager.incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"})
-        mock_redis.hincrbyfloat.assert_called()
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
+        mock_instance.close = AsyncMock()
+        mock_redis_class.return_value = mock_instance

-    def test_incr_with_invalid_metric_type(self, manager, mock_redis):
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
+        await manager.initialize()
+        await manager.incr(
+            "http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}
+        )
+        mock_instance.hincrbyfloat.assert_called()
+
+    def test_incr_with_invalid_metric_type(self):
         """Test calling incr on a non-counter type"""
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
         # http_request_duration_seconds is a histogram
-        manager.incr("http_request_duration_seconds", {})
-        # Redis should not be called, because the type does not match
-        # Verify hincrbyfloat was not called, or the call count did not increase
+        # Verify no exception is raised (Redis is unavailable)

-    def test_set_gauge(self, manager, mock_redis):
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_set_gauge(self, mock_redis_class):
         """Test setting a gauge"""
-        manager.set("http_requests_in_progress", {}, 5)
-        mock_redis.hset.assert_called()
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.hset = AsyncMock(return_value=True)
+        mock_instance.close = AsyncMock()
+        mock_redis_class.return_value = mock_instance

-    def test_gauge_incr(self, manager, mock_redis):
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
+        await manager.initialize()
+        await manager.set("http_requests_in_progress", {}, 5)
+        mock_instance.hset.assert_called()
+
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_gauge_incr(self, mock_redis_class):
         """Test incrementing a gauge"""
-        manager.gauge_incr("http_requests_in_progress", {}, 1)
-        mock_redis.hincrbyfloat.assert_called()
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
+        mock_instance.close = AsyncMock()
+        mock_redis_class.return_value = mock_instance

-    def test_gauge_decr(self, manager, mock_redis):
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
+        await manager.initialize()
+        await manager.gauge_incr("http_requests_in_progress", {}, 1)
+        mock_instance.hincrbyfloat.assert_called()
+
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_gauge_decr(self, mock_redis_class):
         """Test decrementing a gauge"""
-        manager.gauge_decr("http_requests_in_progress", {}, 1)
-        mock_redis.hincrbyfloat.assert_called()
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
+        mock_instance.close = AsyncMock()
+        mock_redis_class.return_value = mock_instance

-    def test_observe_histogram(self, manager, mock_redis):
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
+        await manager.initialize()
+        await manager.gauge_decr("http_requests_in_progress", {}, 1)
+        mock_instance.hincrbyfloat.assert_called()
+
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_observe_histogram(self, mock_redis_class):
         """Test histogram observation"""
-        mock_pipeline = MagicMock()
-        mock_redis.pipeline.return_value = mock_pipeline
-
-        manager.observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05)
-
-        mock_redis.pipeline.assert_called()
-        mock_pipeline.execute.assert_called()
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.close = AsyncMock()
+
+        mock_pipeline = AsyncMock()
+        mock_pipeline.hincrbyfloat = MagicMock()
+        mock_pipeline.execute = AsyncMock(return_value=[])
+        mock_instance.pipeline = MagicMock(return_value=mock_pipeline)
+
+        mock_redis_class.return_value = mock_instance

-    def test_labels_to_key(self, manager):
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
+        await manager.initialize()
+        await manager.observe(
+            "http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05
+        )
+        mock_instance.pipeline.assert_called()
+
+    def test_labels_to_key(self):
         """Test converting labels to a key"""
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
         labels = {"method": "GET", "endpoint": "/api"}
         key = manager._labels_to_key(labels)
         assert "method=GET" in key
         assert "endpoint=/api" in key

-    def test_labels_to_key_empty(self, manager):
+    def test_labels_to_key_empty(self):
         """Test empty-label conversion"""
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
         key = manager._labels_to_key(None)
         assert key == ""
         key = manager._labels_to_key({})
         assert key == ""

-    def test_is_available(self, manager):
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_is_available(self, mock_redis_class):
         """Test the Redis availability check"""
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.close = AsyncMock()
+        mock_redis_class.return_value = mock_instance
+
+        from functional_scaffold.core.metrics_unified import MetricsManager
+
+        manager = MetricsManager()
+        await manager.initialize()
         assert manager.is_available() is True


 class TestConvenienceFunctions:
     """Tests for the convenience functions"""

-    @pytest.fixture(autouse=True)
-    def setup(self):
-        """Reset the manager before each test"""
-        from functional_scaffold.core.metrics_unified import reset_metrics_manager
-
-        reset_metrics_manager()
-
-    @patch("redis.Redis")
-    def test_incr_function(self, mock_redis_class):
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_incr_function(self, mock_redis_class):
         """Test the incr convenience function"""
-        mock_instance = MagicMock()
-        mock_instance.ping.return_value = True
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
+        mock_instance.close = AsyncMock()
         mock_redis_class.return_value = mock_instance

-        from functional_scaffold.core.metrics_unified import incr, reset_metrics_manager
-
-        reset_metrics_manager()
-        incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"})
+        from functional_scaffold.core.metrics_unified import incr
+
+        await incr(
+            "http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}
+        )
         mock_instance.hincrbyfloat.assert_called()

-    @patch("redis.Redis")
-    def test_set_function(self, mock_redis_class):
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_set_function(self, mock_redis_class):
         """Test the set convenience function"""
-        mock_instance = MagicMock()
-        mock_instance.ping.return_value = True
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.hset = AsyncMock(return_value=True)
+        mock_instance.close = AsyncMock()
         mock_redis_class.return_value = mock_instance

-        from functional_scaffold.core.metrics_unified import reset_metrics_manager, set
-
-        reset_metrics_manager()
-        set("http_requests_in_progress", {}, 10)
+        from functional_scaffold.core.metrics_unified import set
+
+        await set("http_requests_in_progress", {}, 10)
         mock_instance.hset.assert_called()

-    @patch("redis.Redis")
-    def test_observe_function(self, mock_redis_class):
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_observe_function(self, mock_redis_class):
         """Test the observe convenience function"""
-        mock_instance = MagicMock()
-        mock_instance.ping.return_value = True
-        mock_pipeline = MagicMock()
-        mock_instance.pipeline.return_value = mock_pipeline
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.close = AsyncMock()
+
+        mock_pipeline = AsyncMock()
+        mock_pipeline.hincrbyfloat = MagicMock()
+        mock_pipeline.execute = AsyncMock(return_value=[])
+        mock_instance.pipeline = MagicMock(return_value=mock_pipeline)
         mock_redis_class.return_value = mock_instance

-        from functional_scaffold.core.metrics_unified import observe, reset_metrics_manager
-
-        reset_metrics_manager()
-        observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.1)
+        from functional_scaffold.core.metrics_unified import observe
+
+        await observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.1)
         mock_instance.pipeline.assert_called()
@@ -160,42 +241,49 @@ class TestConvenienceFunctions:

 class TestExport:
     """Tests for export"""

-    @patch("redis.Redis")
-    def test_export_counter(self, mock_redis_class):
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_export_counter(self, mock_redis_class):
         """Test exporting a counter"""
-        mock_instance = MagicMock()
-        mock_instance.ping.return_value = True
-        mock_instance.hgetall.return_value = {"method=GET,endpoint=/,status=success": "10"}
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+        mock_instance.hgetall = AsyncMock(
+            return_value={"method=GET,endpoint=/,status=success": "10"}
+        )
+        mock_instance.hget = AsyncMock(return_value="0")
+        mock_instance.close = AsyncMock()
         mock_redis_class.return_value = mock_instance

-        from functional_scaffold.core.metrics_unified import export, reset_metrics_manager
-
-        reset_metrics_manager()
-        output = export()
+        from functional_scaffold.core.metrics_unified import export
+
+        output = await export()
         assert "http_requests_total" in output
         assert "HELP" in output
         assert "TYPE" in output

-    @patch("redis.Redis")
-    def test_export_histogram(self, mock_redis_class):
+    @pytest.mark.asyncio
+    @patch("redis.asyncio.Redis")
+    async def test_export_histogram(self, mock_redis_class):
         """Test exporting a histogram"""
-        mock_instance = MagicMock()
-        mock_instance.ping.return_value = True
-        mock_instance.hgetall.side_effect = lambda key: (
-            {"method=GET,endpoint=/": "5"}
-            if "count" in key
-            else {"method=GET,endpoint=/": "0.5"}
-            if "sum" in key
-            else {}
-        )
-        mock_instance.hget.return_value = "3"
+        mock_instance = AsyncMock()
+        mock_instance.ping = AsyncMock(return_value=True)
+
+        async def mock_hgetall(key):
+            if "count" in key:
+                return {"method=GET,endpoint=/": "5"}
+            elif "sum" in key:
+                return {"method=GET,endpoint=/": "0.5"}
+            return {}
+
+        mock_instance.hgetall = mock_hgetall
+        mock_instance.hget = AsyncMock(return_value="3")
+        mock_instance.close = AsyncMock()
         mock_redis_class.return_value = mock_instance

-        from functional_scaffold.core.metrics_unified import export, reset_metrics_manager
-
-        reset_metrics_manager()
-        output = export()
+        from functional_scaffold.core.metrics_unified import export
+
+        output = await export()
         assert "http_request_duration_seconds" in output
@@ -226,21 +314,9 @@ class TestEnvVarSubstitution:

 class TestTrackAlgorithmExecution:
     """Tests for the track_algorithm_execution decorator"""

-    @patch("redis.Redis")
-    def test_decorator_success(self, mock_redis_class):
+    def test_decorator_success(self):
         """Test successful execution through the decorator"""
-        mock_instance = MagicMock()
-        mock_instance.ping.return_value = True
-        mock_pipeline = MagicMock()
-        mock_instance.pipeline.return_value = mock_pipeline
-        mock_redis_class.return_value = mock_instance
-
-        from functional_scaffold.core.metrics_unified import (
-            reset_metrics_manager,
-            track_algorithm_execution,
-        )
-
-        reset_metrics_manager()
+        from functional_scaffold.core.metrics_unified import track_algorithm_execution

         @track_algorithm_execution("test_algo")
         def test_func():
@@ -249,21 +325,9 @@ class TestTrackAlgorithmExecution:
         result = test_func()
         assert result == "result"

-    @patch("redis.Redis")
-    def test_decorator_error(self, mock_redis_class):
+    def test_decorator_error(self):
         """Test the decorator's error handling"""
-        mock_instance = MagicMock()
-        mock_instance.ping.return_value = True
-        mock_pipeline = MagicMock()
-        mock_instance.pipeline.return_value = mock_pipeline
-        mock_redis_class.return_value = mock_instance
-
-        from functional_scaffold.core.metrics_unified import (
-            reset_metrics_manager,
-            track_algorithm_execution,
-        )
-
-        reset_metrics_manager()
+        from functional_scaffold.core.metrics_unified import track_algorithm_execution

         @track_algorithm_execution("test_algo")
         def test_func():