Compare commits
8 Commits
7b627090f3
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| b47be9dda4 | |||
| 55419443cd | |||
| e0138d5531 | |||
| c92cac6ebb | |||
| c76ece8f48 | |||
| d211074576 | |||
| a4d2ad1e93 | |||
| b5ca0e0593 |
17
CLAUDE.md
17
CLAUDE.md
@@ -372,10 +372,23 @@ kubectl apply -f deployment/kubernetes/service.yaml
|
|||||||
- 资源限制:256Mi-512Mi 内存,250m-500m CPU
|
- 资源限制:256Mi-512Mi 内存,250m-500m CPU
|
||||||
- 健康检查:存活探针 (/healthz),就绪探针 (/readyz)
|
- 健康检查:存活探针 (/healthz),就绪探针 (/readyz)
|
||||||
|
|
||||||
### 阿里云函数计算
|
### 阿里云函数计算(FC 3.0)
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
fun deploy -t deployment/serverless/aliyun-fc.yaml
|
# 安装 Serverless Devs(如未安装)
|
||||||
|
npm install -g @serverless-devs/s
|
||||||
|
|
||||||
|
# 配置阿里云凭证(首次使用)
|
||||||
|
s config add
|
||||||
|
|
||||||
|
# 部署到阿里云函数计算
|
||||||
|
cd deployment/serverless && s deploy
|
||||||
|
|
||||||
|
# 验证配置语法
|
||||||
|
cd deployment/serverless && s plan
|
||||||
|
|
||||||
|
# 查看函数日志
|
||||||
|
cd deployment/serverless && s logs --tail
|
||||||
```
|
```
|
||||||
|
|
||||||
### AWS Lambda
|
### AWS Lambda
|
||||||
|
|||||||
@@ -19,7 +19,7 @@
|
|||||||
## 文档
|
## 文档
|
||||||
|
|
||||||
| 文档 | 描述 |
|
| 文档 | 描述 |
|
||||||
|-----------------------------------------|--------------|
|
|------------------------------------------------|--------------|
|
||||||
| [快速入门](docs/getting-started.md) | 10 分钟上手指南 |
|
| [快速入门](docs/getting-started.md) | 10 分钟上手指南 |
|
||||||
| [算法开发指南](docs/algorithm-development.md) | 详细的算法开发教程 |
|
| [算法开发指南](docs/algorithm-development.md) | 详细的算法开发教程 |
|
||||||
| [API 参考](docs/api-reference.md) | 完整的 API 文档 |
|
| [API 参考](docs/api-reference.md) | 完整的 API 文档 |
|
||||||
@@ -27,6 +27,7 @@
|
|||||||
| [API 规范](docs/api/README.md) | OpenAPI 规范说明 |
|
| [API 规范](docs/api/README.md) | OpenAPI 规范说明 |
|
||||||
| [Kubernetes 部署](docs/kubernetes-deployment.md) | K8s 集群部署指南 |
|
| [Kubernetes 部署](docs/kubernetes-deployment.md) | K8s 集群部署指南 |
|
||||||
| [日志集成(Loki)](docs/loki-quick-reference.md) | 日志收集部署说明 |
|
| [日志集成(Loki)](docs/loki-quick-reference.md) | 日志收集部署说明 |
|
||||||
|
| [阿里云函数计算 FC 部署入门](docs/fc-deploy.md) | 阿里云 FC 部署入门 |
|
||||||
|
|
||||||
## 快速开始
|
## 快速开始
|
||||||
|
|
||||||
|
|||||||
@@ -9,11 +9,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
|||||||
|
|
||||||
# 复制依赖文件
|
# 复制依赖文件
|
||||||
COPY requirements.txt .
|
COPY requirements.txt .
|
||||||
COPY requirements-dev.txt .
|
|
||||||
|
|
||||||
# 安装 Python 依赖
|
# 安装 Python 依赖
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
RUN pip install --no-cache-dir -r requirements-dev.txt
|
|
||||||
|
# 安装dev依赖
|
||||||
|
#COPY requirements-dev.txt .
|
||||||
|
#RUN pip install --no-cache-dir -r requirements-dev.txt
|
||||||
|
|
||||||
# 复制应用代码和配置
|
# 复制应用代码和配置
|
||||||
COPY src/ ./src/
|
COPY src/ ./src/
|
||||||
|
|||||||
@@ -45,7 +45,9 @@ services:
|
|||||||
build:
|
build:
|
||||||
context: ..
|
context: ..
|
||||||
dockerfile: deployment/Dockerfile
|
dockerfile: deployment/Dockerfile
|
||||||
image: crpi-om2xd9y8cmaizszf.cn-beijing.personal.cr.aliyuncs.com/test-namespace-gu/fc-test:latest
|
platform: linux/amd64
|
||||||
|
ports:
|
||||||
|
- "8112:8000"
|
||||||
environment:
|
environment:
|
||||||
- APP_ENV=development
|
- APP_ENV=development
|
||||||
- LOG_LEVEL=INFO
|
- LOG_LEVEL=INFO
|
||||||
@@ -70,6 +72,12 @@ services:
|
|||||||
depends_on:
|
depends_on:
|
||||||
redis:
|
redis:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')"]
|
||||||
|
interval: 30s
|
||||||
|
timeout: 3s
|
||||||
|
retries: 3
|
||||||
|
start_period: 10s
|
||||||
deploy:
|
deploy:
|
||||||
replicas: 2
|
replicas: 2
|
||||||
|
|
||||||
|
|||||||
@@ -127,16 +127,25 @@ spec:
|
|||||||
limits:
|
limits:
|
||||||
memory: "512Mi"
|
memory: "512Mi"
|
||||||
cpu: "500m"
|
cpu: "500m"
|
||||||
# Worker 没有 HTTP 端口,使用命令探针
|
# Worker 现在有 HTTP 健康检查端点
|
||||||
|
ports:
|
||||||
|
- containerPort: 8000
|
||||||
|
name: http
|
||||||
livenessProbe:
|
livenessProbe:
|
||||||
exec:
|
httpGet:
|
||||||
command:
|
path: /healthz
|
||||||
- python
|
port: 8000
|
||||||
- -c
|
|
||||||
- "import redis; r = redis.Redis(host='functional-scaffold-redis'); r.ping()"
|
|
||||||
initialDelaySeconds: 10
|
initialDelaySeconds: 10
|
||||||
periodSeconds: 30
|
periodSeconds: 30
|
||||||
timeoutSeconds: 5
|
timeoutSeconds: 3
|
||||||
|
failureThreshold: 3
|
||||||
|
readinessProbe:
|
||||||
|
httpGet:
|
||||||
|
path: /readyz
|
||||||
|
port: 8000
|
||||||
|
initialDelaySeconds: 5
|
||||||
|
periodSeconds: 10
|
||||||
|
timeoutSeconds: 3
|
||||||
failureThreshold: 3
|
failureThreshold: 3
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|||||||
@@ -1,72 +0,0 @@
|
|||||||
# 阿里云函数计算配置
|
|
||||||
ROSTemplateFormatVersion: '2015-09-01'
|
|
||||||
Transform: 'Aliyun::Serverless-2018-04-03'
|
|
||||||
Resources:
|
|
||||||
functional-scaffold:
|
|
||||||
Type: 'Aliyun::Serverless::Service'
|
|
||||||
Properties:
|
|
||||||
Description: '算法工程化 Serverless 脚手架'
|
|
||||||
LogConfig:
|
|
||||||
Project: functional-scaffold-logs
|
|
||||||
Logstore: function-logs
|
|
||||||
VpcConfig:
|
|
||||||
VpcId: 'vpc-xxxxx'
|
|
||||||
VSwitchIds:
|
|
||||||
- 'vsw-xxxxx'
|
|
||||||
SecurityGroupId: 'sg-xxxxx'
|
|
||||||
prime-checker:
|
|
||||||
Type: 'Aliyun::Serverless::Function'
|
|
||||||
Properties:
|
|
||||||
Description: '质数判断算法服务(API)'
|
|
||||||
Runtime: custom-container
|
|
||||||
MemorySize: 512
|
|
||||||
Timeout: 60
|
|
||||||
InstanceConcurrency: 10
|
|
||||||
CAPort: 8000
|
|
||||||
CustomContainerConfig:
|
|
||||||
Image: 'registry.cn-hangzhou.aliyuncs.com/your-namespace/functional-scaffold:latest'
|
|
||||||
Command: '["/app/entrypoint.sh"]'
|
|
||||||
EnvironmentVariables:
|
|
||||||
APP_ENV: production
|
|
||||||
LOG_LEVEL: INFO
|
|
||||||
METRICS_ENABLED: 'true'
|
|
||||||
RUN_MODE: api
|
|
||||||
REDIS_HOST: 'r-xxxxx.redis.rds.aliyuncs.com'
|
|
||||||
REDIS_PORT: '6379'
|
|
||||||
Events:
|
|
||||||
httpTrigger:
|
|
||||||
Type: HTTP
|
|
||||||
Properties:
|
|
||||||
AuthType: ANONYMOUS
|
|
||||||
Methods:
|
|
||||||
- GET
|
|
||||||
- POST
|
|
||||||
job-worker:
|
|
||||||
Type: 'Aliyun::Serverless::Function'
|
|
||||||
Properties:
|
|
||||||
Description: '异步任务 Worker'
|
|
||||||
Runtime: custom-container
|
|
||||||
MemorySize: 512
|
|
||||||
Timeout: 900
|
|
||||||
InstanceConcurrency: 1
|
|
||||||
CustomContainerConfig:
|
|
||||||
Image: 'registry.cn-hangzhou.aliyuncs.com/your-namespace/functional-scaffold:latest'
|
|
||||||
Command: '["/app/entrypoint.sh"]'
|
|
||||||
EnvironmentVariables:
|
|
||||||
APP_ENV: production
|
|
||||||
LOG_LEVEL: INFO
|
|
||||||
METRICS_ENABLED: 'true'
|
|
||||||
RUN_MODE: worker
|
|
||||||
REDIS_HOST: 'r-xxxxx.redis.rds.aliyuncs.com'
|
|
||||||
REDIS_PORT: '6379'
|
|
||||||
WORKER_POLL_INTERVAL: '1.0'
|
|
||||||
MAX_CONCURRENT_JOBS: '5'
|
|
||||||
JOB_MAX_RETRIES: '3'
|
|
||||||
JOB_EXECUTION_TIMEOUT: '300'
|
|
||||||
Events:
|
|
||||||
timerTrigger:
|
|
||||||
Type: Timer
|
|
||||||
Properties:
|
|
||||||
CronExpression: '0 */1 * * * *'
|
|
||||||
Enable: true
|
|
||||||
Payload: '{}'
|
|
||||||
108
deployment/serverless/s.yaml
Normal file
108
deployment/serverless/s.yaml
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
# 阿里云函数计算 FC 3.0 配置
|
||||||
|
# 使用 Serverless Devs 部署: cd deployment/serverless && s deploy
|
||||||
|
edition: 3.0.0
|
||||||
|
name: functional-scaffold
|
||||||
|
access: default
|
||||||
|
|
||||||
|
vars:
|
||||||
|
region: cn-beijing
|
||||||
|
image: crpi-om2xd9y8cmaizszf-vpc.cn-beijing.personal.cr.aliyuncs.com/your-namespace/fc-test:test-v1
|
||||||
|
redis_host: 127.31.1.1
|
||||||
|
redis_port: "6379"
|
||||||
|
redis_password: "your-password"
|
||||||
|
|
||||||
|
resources:
|
||||||
|
# API 服务函数
|
||||||
|
prime-checker-api:
|
||||||
|
component: fc3
|
||||||
|
props:
|
||||||
|
region: ${vars.region}
|
||||||
|
functionName: prime-checker-api
|
||||||
|
description: 质数判断算法服务(API)
|
||||||
|
runtime: custom-container
|
||||||
|
cpu: 0.35
|
||||||
|
memorySize: 512
|
||||||
|
diskSize: 512
|
||||||
|
timeout: 60
|
||||||
|
instanceConcurrency: 10
|
||||||
|
handler: not-used
|
||||||
|
customContainerConfig:
|
||||||
|
image: ${vars.image}
|
||||||
|
port: 8000
|
||||||
|
command:
|
||||||
|
- /app/entrypoint.sh
|
||||||
|
healthCheckConfig:
|
||||||
|
httpGetUrl: /healthz
|
||||||
|
initialDelaySeconds: 3
|
||||||
|
periodSeconds: 5
|
||||||
|
timeoutSeconds: 3
|
||||||
|
failureThreshold: 3
|
||||||
|
successThreshold: 1
|
||||||
|
environmentVariables:
|
||||||
|
APP_ENV: production
|
||||||
|
LOG_LEVEL: INFO
|
||||||
|
METRICS_ENABLED: "true"
|
||||||
|
RUN_MODE: api
|
||||||
|
REDIS_HOST: ${vars.redis_host}
|
||||||
|
REDIS_PORT: ${vars.redis_port}
|
||||||
|
REDIS_PASSWORD: ${vars.redis_password}
|
||||||
|
vpcConfig: auto
|
||||||
|
logConfig: auto
|
||||||
|
triggers:
|
||||||
|
- triggerName: http-trigger
|
||||||
|
triggerType: http
|
||||||
|
triggerConfig:
|
||||||
|
authType: anonymous
|
||||||
|
methods:
|
||||||
|
- GET
|
||||||
|
- POST
|
||||||
|
- PUT
|
||||||
|
- DELETE
|
||||||
|
|
||||||
|
# 异步任务 Worker 函数
|
||||||
|
job-worker:
|
||||||
|
component: fc3
|
||||||
|
props:
|
||||||
|
region: ${vars.region}
|
||||||
|
functionName: job-worker
|
||||||
|
description: 异步任务 Worker
|
||||||
|
runtime: custom-container
|
||||||
|
cpu: 0.35
|
||||||
|
memorySize: 512
|
||||||
|
diskSize: 512
|
||||||
|
timeout: 900
|
||||||
|
instanceConcurrency: 1
|
||||||
|
handler: not-used
|
||||||
|
customContainerConfig:
|
||||||
|
image: ${vars.image}
|
||||||
|
port: 8000
|
||||||
|
command:
|
||||||
|
- /app/entrypoint.sh
|
||||||
|
healthCheckConfig:
|
||||||
|
httpGetUrl: /healthz
|
||||||
|
initialDelaySeconds: 5
|
||||||
|
periodSeconds: 10
|
||||||
|
timeoutSeconds: 3
|
||||||
|
failureThreshold: 3
|
||||||
|
successThreshold: 1
|
||||||
|
environmentVariables:
|
||||||
|
APP_ENV: production
|
||||||
|
LOG_LEVEL: INFO
|
||||||
|
METRICS_ENABLED: "true"
|
||||||
|
RUN_MODE: worker
|
||||||
|
REDIS_HOST: ${vars.redis_host}
|
||||||
|
REDIS_PORT: ${vars.redis_port}
|
||||||
|
REDIS_PASSWORD: ${vars.redis_password}
|
||||||
|
WORKER_POLL_INTERVAL: "1.0"
|
||||||
|
MAX_CONCURRENT_JOBS: "5"
|
||||||
|
JOB_MAX_RETRIES: "3"
|
||||||
|
JOB_EXECUTION_TIMEOUT: "300"
|
||||||
|
vpcConfig: auto
|
||||||
|
logConfig: auto
|
||||||
|
triggers:
|
||||||
|
- triggerName: timer-trigger
|
||||||
|
triggerType: timer
|
||||||
|
triggerConfig:
|
||||||
|
cronExpression: "0 */1 * * * *"
|
||||||
|
enable: true
|
||||||
|
payload: "{}"
|
||||||
58
docs/fc-deploy.md
Normal file
58
docs/fc-deploy.md
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
# 阿里云函数计算 FC 部署入门
|
||||||
|
|
||||||
|
本指南帮助你快速上手,在 10 分钟内使用 Serverless Devs 将 FunctionalScaffold 算法服务部署到阿里云函数计算(FC 3.0)。
|
||||||
|
|
||||||
|
## 环境准备
|
||||||
|
|
||||||
|
- 安装 [Serverless Devs CLI](https://serverless-devs.com/docs/overview)
|
||||||
|
|
||||||
|
1. 首先安装 Node.js 环境,可在 Node.js 官网下载
|
||||||
|
- [Node.js 下载地址](https://nodejs.org/en/download/)
|
||||||
|
2. 安装 Serverless Devs CLI
|
||||||
|
|
||||||
|
```bash
|
||||||
|
npm install @serverless-devs/s -g
|
||||||
|
```
|
||||||
|
|
||||||
|
## 初始化 Serverless Devs CLI 配置
|
||||||
|
|
||||||
|
执行以下命令初始化 Serverless Devs CLI 配置:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
s config add
|
||||||
|
```
|
||||||
|
|
||||||
|
根据引导进行操作,填入你的 AccessKey ID 和 AccessKey Secret。
|
||||||
|
|
||||||
|
## 部署算法服务
|
||||||
|
|
||||||
|
部署算法服务前,请确保已经完成环境准备和配置。
|
||||||
|
|
||||||
|
修改 `deployment/serverless/s.yaml` 文件中的 `vars` 部分:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# 阿里云函数计算 FC 3.0 配置
|
||||||
|
# 使用 Serverless Devs 部署: cd deployment/serverless && s deploy
|
||||||
|
edition: 3.0.0
|
||||||
|
name: functional-scaffold
|
||||||
|
access: default
|
||||||
|
|
||||||
|
vars:
|
||||||
|
region: cn-hangzhou # 换成你的区域
|
||||||
|
image: registry.cn-hangzhou.aliyuncs.com/your-namespace/functional-scaffold:latest # 换成你的docker 镜像
|
||||||
|
redis_host: r-xxxxx.redis.rds.aliyuncs.com # 换成你的redis连接
|
||||||
|
redis_port: "6379" # redis 端口号
|
||||||
|
redis_password: "your-password" #redis 密码,如果没有可留空
|
||||||
|
```
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd deployment/serverless && s deploy
|
||||||
|
```
|
||||||
|
|
||||||
|
部署完成后,可以在控制台查看服务的运行状态和日志。
|
||||||
|
|
||||||
|
## 删除算法服务
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd deployment/serverless && s remove
|
||||||
|
```
|
||||||
@@ -25,6 +25,8 @@ dependencies = [
|
|||||||
"pyyaml>=6.0.0",
|
"pyyaml>=6.0.0",
|
||||||
# HTTP 客户端(Webhook 回调)
|
# HTTP 客户端(Webhook 回调)
|
||||||
"httpx>=0.27.0",
|
"httpx>=0.27.0",
|
||||||
|
# 轻量级 HTTP 服务器(Worker 健康检查)
|
||||||
|
"aiohttp>=3.9.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ pydantic>=2.5.0
|
|||||||
pydantic-settings>=2.0.0
|
pydantic-settings>=2.0.0
|
||||||
prometheus-client>=0.19.0
|
prometheus-client>=0.19.0
|
||||||
python-json-logger>=2.0.7
|
python-json-logger>=2.0.7
|
||||||
|
aiohttp>=3.9.0
|
||||||
|
|
||||||
# Redis - 任务队列和指标存储
|
# Redis - 任务队列和指标存储
|
||||||
redis>=5.0.0
|
redis>=5.0.0
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ class BaseAlgorithm(ABC):
|
|||||||
Returns:
|
Returns:
|
||||||
Dict[str, Any]: 包含结果和元数据的字典
|
Dict[str, Any]: 包含结果和元数据的字典
|
||||||
"""
|
"""
|
||||||
from ..core.metrics_unified import incr, observe
|
from ..core.metrics_unified import incr_sync, observe_sync
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
status = "success"
|
status = "success"
|
||||||
@@ -71,5 +71,7 @@ class BaseAlgorithm(ABC):
|
|||||||
finally:
|
finally:
|
||||||
# 记录算法执行指标
|
# 记录算法执行指标
|
||||||
elapsed_time = time.time() - start_time
|
elapsed_time = time.time() - start_time
|
||||||
incr("algorithm_executions_total", {"algorithm": self.name, "status": status})
|
incr_sync("algorithm_executions_total", {"algorithm": self.name, "status": status})
|
||||||
observe("algorithm_execution_duration_seconds", {"algorithm": self.name}, elapsed_time)
|
observe_sync(
|
||||||
|
"algorithm_execution_duration_seconds", {"algorithm": self.name}, elapsed_time
|
||||||
|
)
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
from typing import Dict, Any, List
|
from typing import Dict, Any, List
|
||||||
from .base import BaseAlgorithm
|
from .base import BaseAlgorithm
|
||||||
from ..core.metrics_unified import incr
|
from ..core.metrics_unified import incr_sync
|
||||||
|
|
||||||
|
|
||||||
class PrimeChecker(BaseAlgorithm):
|
class PrimeChecker(BaseAlgorithm):
|
||||||
@@ -31,12 +31,12 @@ class PrimeChecker(BaseAlgorithm):
|
|||||||
ValueError: 如果输入不是整数
|
ValueError: 如果输入不是整数
|
||||||
"""
|
"""
|
||||||
if not isinstance(number, int):
|
if not isinstance(number, int):
|
||||||
incr('prime_check',{"status":"invalid_input"})
|
incr_sync('prime_check', {"status": "invalid_input"})
|
||||||
raise ValueError(f"Input must be an integer, got {type(number).__name__}")
|
raise ValueError(f"Input must be an integer, got {type(number).__name__}")
|
||||||
|
|
||||||
# 小于2的数不是质数
|
# 小于2的数不是质数
|
||||||
if number < 2:
|
if number < 2:
|
||||||
incr('prime_check', {"status": "number_little_two"})
|
incr_sync('prime_check', {"status": "number_little_two"})
|
||||||
return {
|
return {
|
||||||
"number": number,
|
"number": number,
|
||||||
"is_prime": False,
|
"is_prime": False,
|
||||||
@@ -50,7 +50,7 @@ class PrimeChecker(BaseAlgorithm):
|
|||||||
|
|
||||||
# 如果不是质数,计算因数
|
# 如果不是质数,计算因数
|
||||||
factors = [] if is_prime else self._get_factors(number)
|
factors = [] if is_prime else self._get_factors(number)
|
||||||
incr('prime_check', {"status": "success"})
|
incr_sync('prime_check', {"status": "success"})
|
||||||
return {
|
return {
|
||||||
"number": number,
|
"number": number,
|
||||||
"is_prime": is_prime,
|
"is_prime": is_prime,
|
||||||
|
|||||||
@@ -168,7 +168,7 @@ return 0
|
|||||||
await self._redis_client.hset(key, mapping=job_data)
|
await self._redis_client.hset(key, mapping=job_data)
|
||||||
|
|
||||||
# 记录指标
|
# 记录指标
|
||||||
incr("jobs_created_total", {"algorithm": algorithm})
|
await incr("jobs_created_total", {"algorithm": algorithm})
|
||||||
|
|
||||||
logger.info(f"任务已创建: job_id={job_id}, algorithm={algorithm}")
|
logger.info(f"任务已创建: job_id={job_id}, algorithm={algorithm}")
|
||||||
return job_id
|
return job_id
|
||||||
@@ -320,8 +320,10 @@ return 0
|
|||||||
await self._redis_client.expire(key, settings.job_result_ttl)
|
await self._redis_client.expire(key, settings.job_result_ttl)
|
||||||
|
|
||||||
# 记录指标
|
# 记录指标
|
||||||
incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
|
await incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
|
||||||
observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time)
|
await observe(
|
||||||
|
"job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time
|
||||||
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s"
|
f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s"
|
||||||
@@ -372,7 +374,7 @@ return 0
|
|||||||
)
|
)
|
||||||
|
|
||||||
if response.status_code < 400:
|
if response.status_code < 400:
|
||||||
incr("webhook_deliveries_total", {"status": "success"})
|
await incr("webhook_deliveries_total", {"status": "success"})
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Webhook 发送成功: job_id={job_id}, url={webhook_url}, "
|
f"Webhook 发送成功: job_id={job_id}, url={webhook_url}, "
|
||||||
f"status_code={response.status_code}"
|
f"status_code={response.status_code}"
|
||||||
@@ -395,7 +397,7 @@ return 0
|
|||||||
await asyncio.sleep(delay)
|
await asyncio.sleep(delay)
|
||||||
|
|
||||||
# 所有重试都失败
|
# 所有重试都失败
|
||||||
incr("webhook_deliveries_total", {"status": "failed"})
|
await incr("webhook_deliveries_total", {"status": "failed"})
|
||||||
logger.error(f"Webhook 发送最终失败: job_id={job_id}, url={webhook_url}")
|
logger.error(f"Webhook 发送最终失败: job_id={job_id}, url={webhook_url}")
|
||||||
|
|
||||||
def is_available(self) -> bool:
|
def is_available(self) -> bool:
|
||||||
@@ -814,10 +816,10 @@ return 0
|
|||||||
# 更新指标
|
# 更新指标
|
||||||
from .metrics_unified import set as metrics_set
|
from .metrics_unified import set as metrics_set
|
||||||
|
|
||||||
metrics_set("job_queue_length", {"queue": "pending"}, queue_length)
|
await metrics_set("job_queue_length", {"queue": "pending"}, queue_length)
|
||||||
metrics_set("job_queue_length", {"queue": "processing"}, processing_length)
|
await metrics_set("job_queue_length", {"queue": "processing"}, processing_length)
|
||||||
metrics_set("job_queue_length", {"queue": "dlq"}, dlq_length)
|
await metrics_set("job_queue_length", {"queue": "dlq"}, dlq_length)
|
||||||
metrics_set("job_oldest_waiting_seconds", None, oldest_waiting_seconds)
|
await metrics_set("job_oldest_waiting_seconds", None, oldest_waiting_seconds)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"queue_length": queue_length,
|
"queue_length": queue_length,
|
||||||
|
|||||||
@@ -1,19 +1,21 @@
|
|||||||
"""统一指标管理模块
|
"""统一指标管理模块
|
||||||
|
|
||||||
基于 Redis 的指标收集方案,支持多实例部署和 YAML 配置。
|
基于 Redis 的指标收集方案,支持多实例部署和 YAML 配置。
|
||||||
|
使用异步 Redis 客户端,避免在异步请求路径中阻塞事件循环。
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import socket
|
import socket
|
||||||
import logging
|
import logging
|
||||||
|
import asyncio
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
import redis
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -22,7 +24,7 @@ class MetricsManager:
|
|||||||
"""统一指标管理器
|
"""统一指标管理器
|
||||||
|
|
||||||
支持从 YAML 配置文件加载指标定义,使用 Redis 存储指标数据,
|
支持从 YAML 配置文件加载指标定义,使用 Redis 存储指标数据,
|
||||||
并导出 Prometheus 格式的指标。
|
并导出 Prometheus 格式的指标。使用异步 Redis 客户端。
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, config_path: Optional[str] = None):
|
def __init__(self, config_path: Optional[str] = None):
|
||||||
@@ -37,16 +39,22 @@ class MetricsManager:
|
|||||||
self.instance_id = settings.metrics_instance_id or socket.gethostname()
|
self.instance_id = settings.metrics_instance_id or socket.gethostname()
|
||||||
self.config: Dict[str, Any] = {}
|
self.config: Dict[str, Any] = {}
|
||||||
self.metrics_definitions: Dict[str, Dict[str, Any]] = {}
|
self.metrics_definitions: Dict[str, Dict[str, Any]] = {}
|
||||||
self._redis_client: Optional[redis.Redis] = None
|
self._redis_client: Optional[aioredis.Redis] = None
|
||||||
self._redis_available = False
|
self._redis_available = False
|
||||||
|
self._initialized = False
|
||||||
|
|
||||||
# 加载配置
|
# 加载配置(同步操作)
|
||||||
self._load_config()
|
self._load_config()
|
||||||
# 初始化 Redis 连接
|
# 注册指标定义(同步操作)
|
||||||
self._init_redis()
|
|
||||||
# 注册指标定义
|
|
||||||
self._register_metrics()
|
self._register_metrics()
|
||||||
|
|
||||||
|
async def initialize(self) -> None:
|
||||||
|
"""异步初始化 Redis 连接"""
|
||||||
|
if self._initialized:
|
||||||
|
return
|
||||||
|
await self._init_redis()
|
||||||
|
self._initialized = True
|
||||||
|
|
||||||
def _load_config(self) -> None:
|
def _load_config(self) -> None:
|
||||||
"""加载 YAML 配置文件"""
|
"""加载 YAML 配置文件"""
|
||||||
# 尝试多个路径
|
# 尝试多个路径
|
||||||
@@ -138,8 +146,8 @@ class MetricsManager:
|
|||||||
"custom_metrics": {},
|
"custom_metrics": {},
|
||||||
}
|
}
|
||||||
|
|
||||||
def _init_redis(self) -> None:
|
async def _init_redis(self) -> None:
|
||||||
"""初始化 Redis 连接"""
|
"""异步初始化 Redis 连接"""
|
||||||
from ..config import settings
|
from ..config import settings
|
||||||
|
|
||||||
redis_config = self.config.get("redis", {})
|
redis_config = self.config.get("redis", {})
|
||||||
@@ -149,7 +157,7 @@ class MetricsManager:
|
|||||||
password = redis_config.get("password") or settings.redis_password
|
password = redis_config.get("password") or settings.redis_password
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._redis_client = redis.Redis(
|
self._redis_client = aioredis.Redis(
|
||||||
host=host,
|
host=host,
|
||||||
port=port,
|
port=port,
|
||||||
db=db,
|
db=db,
|
||||||
@@ -159,10 +167,10 @@ class MetricsManager:
|
|||||||
socket_timeout=5,
|
socket_timeout=5,
|
||||||
)
|
)
|
||||||
# 测试连接
|
# 测试连接
|
||||||
self._redis_client.ping()
|
await self._redis_client.ping()
|
||||||
self._redis_available = True
|
self._redis_available = True
|
||||||
logger.info(f"Redis 连接成功: {host}:{port}/{db}")
|
logger.info(f"Redis 连接成功: {host}:{port}/{db}")
|
||||||
except redis.ConnectionError as e:
|
except aioredis.ConnectionError as e:
|
||||||
logger.warning(f"Redis 连接失败: {e},指标将不会被收集")
|
logger.warning(f"Redis 连接失败: {e},指标将不会被收集")
|
||||||
self._redis_available = False
|
self._redis_available = False
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -235,7 +243,9 @@ class MetricsManager:
|
|||||||
|
|
||||||
# === 简单 API(业务代码使用)===
|
# === 简单 API(业务代码使用)===
|
||||||
|
|
||||||
def incr(self, name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None:
|
async def incr(
|
||||||
|
self, name: str, labels: Optional[Dict[str, str]] = None, value: int = 1
|
||||||
|
) -> None:
|
||||||
"""增加计数器
|
"""增加计数器
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -252,11 +262,13 @@ class MetricsManager:
|
|||||||
try:
|
try:
|
||||||
key = f"metrics:counter:{name}"
|
key = f"metrics:counter:{name}"
|
||||||
field = self._labels_to_key(labels) or "_default_"
|
field = self._labels_to_key(labels) or "_default_"
|
||||||
self._redis_client.hincrbyfloat(key, field, value)
|
await self._redis_client.hincrbyfloat(key, field, value)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"增加计数器失败: {e}")
|
logger.error(f"增加计数器失败: {e}")
|
||||||
|
|
||||||
def set(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
|
async def set(
|
||||||
|
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
|
||||||
|
) -> None:
|
||||||
"""设置仪表盘值
|
"""设置仪表盘值
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -273,11 +285,11 @@ class MetricsManager:
|
|||||||
try:
|
try:
|
||||||
key = f"metrics:gauge:{name}"
|
key = f"metrics:gauge:{name}"
|
||||||
field = self._labels_to_key(labels) or "_default_"
|
field = self._labels_to_key(labels) or "_default_"
|
||||||
self._redis_client.hset(key, field, value)
|
await self._redis_client.hset(key, field, value)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"设置仪表盘失败: {e}")
|
logger.error(f"设置仪表盘失败: {e}")
|
||||||
|
|
||||||
def gauge_incr(
|
async def gauge_incr(
|
||||||
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
|
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
|
||||||
) -> None:
|
) -> None:
|
||||||
"""增加仪表盘值
|
"""增加仪表盘值
|
||||||
@@ -296,11 +308,11 @@ class MetricsManager:
|
|||||||
try:
|
try:
|
||||||
key = f"metrics:gauge:{name}"
|
key = f"metrics:gauge:{name}"
|
||||||
field = self._labels_to_key(labels) or "_default_"
|
field = self._labels_to_key(labels) or "_default_"
|
||||||
self._redis_client.hincrbyfloat(key, field, value)
|
await self._redis_client.hincrbyfloat(key, field, value)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"增加仪表盘失败: {e}")
|
logger.error(f"增加仪表盘失败: {e}")
|
||||||
|
|
||||||
def gauge_decr(
|
async def gauge_decr(
|
||||||
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
|
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
|
||||||
) -> None:
|
) -> None:
|
||||||
"""减少仪表盘值
|
"""减少仪表盘值
|
||||||
@@ -310,9 +322,11 @@ class MetricsManager:
|
|||||||
labels: 标签字典
|
labels: 标签字典
|
||||||
value: 减少的值
|
value: 减少的值
|
||||||
"""
|
"""
|
||||||
self.gauge_incr(name, labels, -value)
|
await self.gauge_incr(name, labels, -value)
|
||||||
|
|
||||||
def observe(self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
|
async def observe(
|
||||||
|
self, name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
|
||||||
|
) -> None:
|
||||||
"""记录直方图观测值
|
"""记录直方图观测值
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -348,13 +362,13 @@ class MetricsManager:
|
|||||||
# +Inf 桶总是增加
|
# +Inf 桶总是增加
|
||||||
pipe.hincrbyfloat(f"metrics:histogram:{name}:bucket:+Inf", label_key, 1)
|
pipe.hincrbyfloat(f"metrics:histogram:{name}:bucket:+Inf", label_key, 1)
|
||||||
|
|
||||||
pipe.execute()
|
await pipe.execute()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"记录直方图失败: {e}")
|
logger.error(f"记录直方图失败: {e}")
|
||||||
|
|
||||||
# === 导出方法 ===
|
# === 导出方法 ===
|
||||||
|
|
||||||
def export(self) -> str:
|
async def export(self) -> str:
|
||||||
"""导出 Prometheus 格式指标
|
"""导出 Prometheus 格式指标
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
@@ -375,11 +389,11 @@ class MetricsManager:
|
|||||||
lines.append(f"# TYPE {name} {metric_type}")
|
lines.append(f"# TYPE {name} {metric_type}")
|
||||||
|
|
||||||
if metric_type == "counter":
|
if metric_type == "counter":
|
||||||
lines.extend(self._export_counter(name))
|
lines.extend(await self._export_counter(name))
|
||||||
elif metric_type == "gauge":
|
elif metric_type == "gauge":
|
||||||
lines.extend(self._export_gauge(name))
|
lines.extend(await self._export_gauge(name))
|
||||||
elif metric_type == "histogram":
|
elif metric_type == "histogram":
|
||||||
lines.extend(self._export_histogram(name, definition))
|
lines.extend(await self._export_histogram(name, definition))
|
||||||
|
|
||||||
lines.append("") # 空行分隔
|
lines.append("") # 空行分隔
|
||||||
|
|
||||||
@@ -389,12 +403,12 @@ class MetricsManager:
|
|||||||
|
|
||||||
return "\n".join(lines)
|
return "\n".join(lines)
|
||||||
|
|
||||||
def _export_counter(self, name: str) -> List[str]:
|
async def _export_counter(self, name: str) -> List[str]:
|
||||||
"""导出计数器指标"""
|
"""导出计数器指标"""
|
||||||
lines = []
|
lines = []
|
||||||
key = f"metrics:counter:{name}"
|
key = f"metrics:counter:{name}"
|
||||||
|
|
||||||
data = self._redis_client.hgetall(key)
|
data = await self._redis_client.hgetall(key)
|
||||||
for field, value in data.items():
|
for field, value in data.items():
|
||||||
if field == "_default_":
|
if field == "_default_":
|
||||||
lines.append(f"{name} {value}")
|
lines.append(f"{name} {value}")
|
||||||
@@ -404,12 +418,12 @@ class MetricsManager:
|
|||||||
|
|
||||||
return lines
|
return lines
|
||||||
|
|
||||||
def _export_gauge(self, name: str) -> List[str]:
|
async def _export_gauge(self, name: str) -> List[str]:
|
||||||
"""导出仪表盘指标"""
|
"""导出仪表盘指标"""
|
||||||
lines = []
|
lines = []
|
||||||
key = f"metrics:gauge:{name}"
|
key = f"metrics:gauge:{name}"
|
||||||
|
|
||||||
data = self._redis_client.hgetall(key)
|
data = await self._redis_client.hgetall(key)
|
||||||
for field, value in data.items():
|
for field, value in data.items():
|
||||||
if field == "_default_":
|
if field == "_default_":
|
||||||
lines.append(f"{name} {value}")
|
lines.append(f"{name} {value}")
|
||||||
@@ -419,14 +433,14 @@ class MetricsManager:
|
|||||||
|
|
||||||
return lines
|
return lines
|
||||||
|
|
||||||
def _export_histogram(self, name: str, definition: Dict[str, Any]) -> List[str]:
|
async def _export_histogram(self, name: str, definition: Dict[str, Any]) -> List[str]:
|
||||||
"""导出直方图指标"""
|
"""导出直方图指标"""
|
||||||
lines = []
|
lines = []
|
||||||
buckets = definition.get("buckets", [])
|
buckets = definition.get("buckets", [])
|
||||||
|
|
||||||
# 获取所有标签组合
|
# 获取所有标签组合
|
||||||
count_data = self._redis_client.hgetall(f"metrics:histogram:{name}:count")
|
count_data = await self._redis_client.hgetall(f"metrics:histogram:{name}:count")
|
||||||
sum_data = self._redis_client.hgetall(f"metrics:histogram:{name}:sum")
|
sum_data = await self._redis_client.hgetall(f"metrics:histogram:{name}:sum")
|
||||||
|
|
||||||
for label_key in count_data.keys():
|
for label_key in count_data.keys():
|
||||||
prom_labels = self._key_to_prometheus_labels(label_key)
|
prom_labels = self._key_to_prometheus_labels(label_key)
|
||||||
@@ -434,7 +448,7 @@ class MetricsManager:
|
|||||||
# 导出各个桶
|
# 导出各个桶
|
||||||
for bucket in buckets:
|
for bucket in buckets:
|
||||||
bucket_key = f"metrics:histogram:{name}:bucket:{bucket}"
|
bucket_key = f"metrics:histogram:{name}:bucket:{bucket}"
|
||||||
bucket_value = self._redis_client.hget(bucket_key, label_key) or "0"
|
bucket_value = await self._redis_client.hget(bucket_key, label_key) or "0"
|
||||||
if label_key == "_default_":
|
if label_key == "_default_":
|
||||||
lines.append(f'{name}_bucket{{le="{bucket}"}} {bucket_value}')
|
lines.append(f'{name}_bucket{{le="{bucket}"}} {bucket_value}')
|
||||||
else:
|
else:
|
||||||
@@ -442,7 +456,7 @@ class MetricsManager:
|
|||||||
|
|
||||||
# +Inf 桶
|
# +Inf 桶
|
||||||
inf_key = f"metrics:histogram:{name}:bucket:+Inf"
|
inf_key = f"metrics:histogram:{name}:bucket:+Inf"
|
||||||
inf_value = self._redis_client.hget(inf_key, label_key) or "0"
|
inf_value = await self._redis_client.hget(inf_key, label_key) or "0"
|
||||||
if label_key == "_default_":
|
if label_key == "_default_":
|
||||||
lines.append(f'{name}_bucket{{le="+Inf"}} {inf_value}')
|
lines.append(f'{name}_bucket{{le="+Inf"}} {inf_value}')
|
||||||
else:
|
else:
|
||||||
@@ -464,43 +478,79 @@ class MetricsManager:
|
|||||||
"""检查 Redis 是否可用"""
|
"""检查 Redis 是否可用"""
|
||||||
return self._redis_available
|
return self._redis_available
|
||||||
|
|
||||||
def reset(self) -> None:
|
async def reset(self) -> None:
|
||||||
"""重置所有指标(主要用于测试)"""
|
"""重置所有指标(主要用于测试)"""
|
||||||
if not self._redis_available:
|
if not self._redis_available:
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 删除所有指标相关的 key
|
# 删除所有指标相关的 key
|
||||||
keys = self._redis_client.keys("metrics:*")
|
keys = await self._redis_client.keys("metrics:*")
|
||||||
if keys:
|
if keys:
|
||||||
self._redis_client.delete(*keys)
|
await self._redis_client.delete(*keys)
|
||||||
logger.info("已重置所有指标")
|
logger.info("已重置所有指标")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"重置指标失败: {e}")
|
logger.error(f"重置指标失败: {e}")
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
"""关闭 Redis 连接"""
|
||||||
|
if self._redis_client:
|
||||||
|
await self._redis_client.close()
|
||||||
|
self._redis_client = None
|
||||||
|
self._redis_available = False
|
||||||
|
self._initialized = False
|
||||||
|
|
||||||
|
|
||||||
# 全局单例
|
# 全局单例
|
||||||
_manager: Optional[MetricsManager] = None
|
_manager: Optional[MetricsManager] = None
|
||||||
|
_manager_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
|
||||||
def get_metrics_manager() -> MetricsManager:
|
async def get_metrics_manager() -> MetricsManager:
|
||||||
"""获取指标管理器单例"""
|
"""获取指标管理器单例(异步)"""
|
||||||
|
global _manager
|
||||||
|
if _manager is None:
|
||||||
|
async with _manager_lock:
|
||||||
|
if _manager is None:
|
||||||
|
_manager = MetricsManager()
|
||||||
|
await _manager.initialize()
|
||||||
|
elif not _manager._initialized:
|
||||||
|
await _manager.initialize()
|
||||||
|
return _manager
|
||||||
|
|
||||||
|
|
||||||
|
def get_metrics_manager_sync() -> MetricsManager:
|
||||||
|
"""获取指标管理器单例(同步,仅用于非异步上下文)
|
||||||
|
|
||||||
|
注意:此方法不会初始化 Redis 连接,需要在异步上下文中调用 initialize()
|
||||||
|
"""
|
||||||
global _manager
|
global _manager
|
||||||
if _manager is None:
|
if _manager is None:
|
||||||
_manager = MetricsManager()
|
_manager = MetricsManager()
|
||||||
return _manager
|
return _manager
|
||||||
|
|
||||||
|
|
||||||
def reset_metrics_manager() -> None:
|
async def reset_metrics_manager() -> None:
|
||||||
"""重置指标管理器单例(主要用于测试)"""
|
"""重置指标管理器单例(主要用于测试)"""
|
||||||
global _manager
|
global _manager
|
||||||
|
if _manager is not None:
|
||||||
|
await _manager.close()
|
||||||
|
_manager = None
|
||||||
|
|
||||||
|
|
||||||
|
def reset_metrics_manager_sync() -> None:
|
||||||
|
"""同步重置指标管理器单例(主要用于测试)
|
||||||
|
|
||||||
|
注意:此方法不会关闭 Redis 连接,仅重置单例引用
|
||||||
|
"""
|
||||||
|
global _manager
|
||||||
_manager = None
|
_manager = None
|
||||||
|
|
||||||
|
|
||||||
# === 便捷函数(业务代码直接调用)===
|
# === 便捷函数(业务代码直接调用)===
|
||||||
|
|
||||||
|
|
||||||
def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None:
|
async def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) -> None:
|
||||||
"""增加计数器 - 便捷函数
|
"""增加计数器 - 便捷函数
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -508,10 +558,11 @@ def incr(name: str, labels: Optional[Dict[str, str]] = None, value: int = 1) ->
|
|||||||
labels: 标签字典
|
labels: 标签字典
|
||||||
value: 增加的值,默认为 1
|
value: 增加的值,默认为 1
|
||||||
"""
|
"""
|
||||||
get_metrics_manager().incr(name, labels, value)
|
manager = await get_metrics_manager()
|
||||||
|
await manager.incr(name, labels, value)
|
||||||
|
|
||||||
|
|
||||||
def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
|
async def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
|
||||||
"""设置仪表盘 - 便捷函数
|
"""设置仪表盘 - 便捷函数
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -519,10 +570,13 @@ def set(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) ->
|
|||||||
labels: 标签字典
|
labels: 标签字典
|
||||||
value: 设置的值
|
value: 设置的值
|
||||||
"""
|
"""
|
||||||
get_metrics_manager().set(name, labels, value)
|
manager = await get_metrics_manager()
|
||||||
|
await manager.set(name, labels, value)
|
||||||
|
|
||||||
|
|
||||||
def gauge_incr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None:
|
async def gauge_incr(
|
||||||
|
name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
|
||||||
|
) -> None:
|
||||||
"""增加仪表盘 - 便捷函数
|
"""增加仪表盘 - 便捷函数
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -530,10 +584,13 @@ def gauge_incr(name: str, labels: Optional[Dict[str, str]] = None, value: float
|
|||||||
labels: 标签字典
|
labels: 标签字典
|
||||||
value: 增加的值
|
value: 增加的值
|
||||||
"""
|
"""
|
||||||
get_metrics_manager().gauge_incr(name, labels, value)
|
manager = await get_metrics_manager()
|
||||||
|
await manager.gauge_incr(name, labels, value)
|
||||||
|
|
||||||
|
|
||||||
def gauge_decr(name: str, labels: Optional[Dict[str, str]] = None, value: float = 1) -> None:
|
async def gauge_decr(
|
||||||
|
name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
|
||||||
|
) -> None:
|
||||||
"""减少仪表盘 - 便捷函数
|
"""减少仪表盘 - 便捷函数
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -541,10 +598,13 @@ def gauge_decr(name: str, labels: Optional[Dict[str, str]] = None, value: float
|
|||||||
labels: 标签字典
|
labels: 标签字典
|
||||||
value: 减少的值
|
value: 减少的值
|
||||||
"""
|
"""
|
||||||
get_metrics_manager().gauge_decr(name, labels, value)
|
manager = await get_metrics_manager()
|
||||||
|
await manager.gauge_decr(name, labels, value)
|
||||||
|
|
||||||
|
|
||||||
def observe(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0) -> None:
|
async def observe(
|
||||||
|
name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
|
||||||
|
) -> None:
|
||||||
"""记录直方图 - 便捷函数
|
"""记录直方图 - 便捷函数
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -552,21 +612,105 @@ def observe(name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
|
|||||||
labels: 标签字典
|
labels: 标签字典
|
||||||
value: 观测值
|
value: 观测值
|
||||||
"""
|
"""
|
||||||
get_metrics_manager().observe(name, labels, value)
|
manager = await get_metrics_manager()
|
||||||
|
await manager.observe(name, labels, value)
|
||||||
|
|
||||||
|
|
||||||
def export() -> str:
|
async def export() -> str:
|
||||||
"""导出指标 - 便捷函数
|
"""导出指标 - 便捷函数
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Prometheus 文本格式的指标字符串
|
Prometheus 文本格式的指标字符串
|
||||||
"""
|
"""
|
||||||
return get_metrics_manager().export()
|
manager = await get_metrics_manager()
|
||||||
|
return await manager.export()
|
||||||
|
|
||||||
|
|
||||||
def is_available() -> bool:
|
async def is_available() -> bool:
|
||||||
"""检查 Redis 是否可用 - 便捷函数"""
|
"""检查 Redis 是否可用 - 便捷函数"""
|
||||||
return get_metrics_manager().is_available()
|
manager = await get_metrics_manager()
|
||||||
|
return manager.is_available()
|
||||||
|
|
||||||
|
|
||||||
|
# === 同步便捷函数(用于同步代码中的 fire-and-forget 模式)===
|
||||||
|
|
||||||
|
|
||||||
|
def _schedule_async(coro) -> None:
|
||||||
|
"""在后台调度异步协程(fire-and-forget 模式)
|
||||||
|
|
||||||
|
如果当前没有运行的事件循环,则静默忽略。
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
loop.create_task(coro)
|
||||||
|
except RuntimeError:
|
||||||
|
# 没有运行的事件循环,静默忽略
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def incr_sync(
|
||||||
|
name: str, labels: Optional[Dict[str, str]] = None, value: int = 1
|
||||||
|
) -> None:
|
||||||
|
"""增加计数器 - 同步便捷函数(fire-and-forget)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: 指标名称
|
||||||
|
labels: 标签字典
|
||||||
|
value: 增加的值,默认为 1
|
||||||
|
"""
|
||||||
|
_schedule_async(incr(name, labels, value))
|
||||||
|
|
||||||
|
|
||||||
|
def set_sync(
|
||||||
|
name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
|
||||||
|
) -> None:
|
||||||
|
"""设置仪表盘 - 同步便捷函数(fire-and-forget)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: 指标名称
|
||||||
|
labels: 标签字典
|
||||||
|
value: 设置的值
|
||||||
|
"""
|
||||||
|
_schedule_async(set(name, labels, value))
|
||||||
|
|
||||||
|
|
||||||
|
def gauge_incr_sync(
|
||||||
|
name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
|
||||||
|
) -> None:
|
||||||
|
"""增加仪表盘 - 同步便捷函数(fire-and-forget)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: 指标名称
|
||||||
|
labels: 标签字典
|
||||||
|
value: 增加的值
|
||||||
|
"""
|
||||||
|
_schedule_async(gauge_incr(name, labels, value))
|
||||||
|
|
||||||
|
|
||||||
|
def gauge_decr_sync(
|
||||||
|
name: str, labels: Optional[Dict[str, str]] = None, value: float = 1
|
||||||
|
) -> None:
|
||||||
|
"""减少仪表盘 - 同步便捷函数(fire-and-forget)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: 指标名称
|
||||||
|
labels: 标签字典
|
||||||
|
value: 减少的值
|
||||||
|
"""
|
||||||
|
_schedule_async(gauge_decr(name, labels, value))
|
||||||
|
|
||||||
|
|
||||||
|
def observe_sync(
|
||||||
|
name: str, labels: Optional[Dict[str, str]] = None, value: float = 0
|
||||||
|
) -> None:
|
||||||
|
"""记录直方图 - 同步便捷函数(fire-and-forget)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: 指标名称
|
||||||
|
labels: 标签字典
|
||||||
|
value: 观测值
|
||||||
|
"""
|
||||||
|
_schedule_async(observe(name, labels, value))
|
||||||
|
|
||||||
|
|
||||||
# === 装饰器(兼容旧 API)===
|
# === 装饰器(兼容旧 API)===
|
||||||
@@ -593,8 +737,11 @@ def track_algorithm_execution(algorithm_name: str):
|
|||||||
raise e
|
raise e
|
||||||
finally:
|
finally:
|
||||||
elapsed = time.time() - start_time
|
elapsed = time.time() - start_time
|
||||||
incr("algorithm_executions_total", {"algorithm": algorithm_name, "status": status})
|
incr_sync(
|
||||||
observe(
|
"algorithm_executions_total",
|
||||||
|
{"algorithm": algorithm_name, "status": status},
|
||||||
|
)
|
||||||
|
observe_sync(
|
||||||
"algorithm_execution_duration_seconds",
|
"algorithm_execution_duration_seconds",
|
||||||
{"algorithm": algorithm_name},
|
{"algorithm": algorithm_name},
|
||||||
elapsed,
|
elapsed,
|
||||||
|
|||||||
@@ -95,7 +95,7 @@ async def track_metrics(request: Request, call_next):
|
|||||||
if request.url.path in skip_paths:
|
if request.url.path in skip_paths:
|
||||||
return await call_next(request)
|
return await call_next(request)
|
||||||
|
|
||||||
gauge_incr("http_requests_in_progress")
|
await gauge_incr("http_requests_in_progress")
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
status = "success"
|
status = "success"
|
||||||
|
|
||||||
@@ -112,16 +112,16 @@ async def track_metrics(request: Request, call_next):
|
|||||||
elapsed = time.time() - start_time
|
elapsed = time.time() - start_time
|
||||||
# 使用规范化后的路径记录指标
|
# 使用规范化后的路径记录指标
|
||||||
normalized_path = normalize_path(request.url.path)
|
normalized_path = normalize_path(request.url.path)
|
||||||
incr(
|
await incr(
|
||||||
"http_requests_total",
|
"http_requests_total",
|
||||||
{"method": request.method, "endpoint": normalized_path, "status": status},
|
{"method": request.method, "endpoint": normalized_path, "status": status},
|
||||||
)
|
)
|
||||||
observe(
|
await observe(
|
||||||
"http_request_duration_seconds",
|
"http_request_duration_seconds",
|
||||||
{"method": request.method, "endpoint": normalized_path},
|
{"method": request.method, "endpoint": normalized_path},
|
||||||
elapsed,
|
elapsed,
|
||||||
)
|
)
|
||||||
gauge_decr("http_requests_in_progress")
|
await gauge_decr("http_requests_in_progress")
|
||||||
|
|
||||||
|
|
||||||
# 注册路由
|
# 注册路由
|
||||||
@@ -145,7 +145,7 @@ async def metrics():
|
|||||||
return Response(content="Metrics disabled", status_code=404)
|
return Response(content="Metrics disabled", status_code=404)
|
||||||
|
|
||||||
return Response(
|
return Response(
|
||||||
content=export(),
|
content=await export(),
|
||||||
media_type="text/plain; version=0.0.4; charset=utf-8",
|
media_type="text/plain; version=0.0.4; charset=utf-8",
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -160,7 +160,7 @@ async def startup_event():
|
|||||||
|
|
||||||
# 初始化指标管理器
|
# 初始化指标管理器
|
||||||
if settings.metrics_enabled:
|
if settings.metrics_enabled:
|
||||||
manager = get_metrics_manager()
|
manager = await get_metrics_manager()
|
||||||
if manager.is_available():
|
if manager.is_available():
|
||||||
logger.info("Redis 指标收集已启用")
|
logger.info("Redis 指标收集已启用")
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -9,6 +9,8 @@ import signal
|
|||||||
import sys
|
import sys
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
from aiohttp import web
|
||||||
|
|
||||||
from .config import settings
|
from .config import settings
|
||||||
from .core.job_manager import JobManager
|
from .core.job_manager import JobManager
|
||||||
from .core.logging import setup_logging
|
from .core.logging import setup_logging
|
||||||
@@ -17,6 +19,53 @@ from .core.tracing import set_request_id
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class HealthCheckServer:
|
||||||
|
"""轻量级健康检查 HTTP 服务器
|
||||||
|
|
||||||
|
为 Worker 模式提供健康检查端点,满足 FC 3.0 容器健康检查要求。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, host: str = "0.0.0.0", port: int = 8000):
|
||||||
|
self._host = host
|
||||||
|
self._port = port
|
||||||
|
self._app: Optional[web.Application] = None
|
||||||
|
self._runner: Optional[web.AppRunner] = None
|
||||||
|
self._site: Optional[web.TCPSite] = None
|
||||||
|
self._healthy = True
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""启动健康检查服务器"""
|
||||||
|
self._app = web.Application()
|
||||||
|
self._app.router.add_get("/healthz", self._healthz_handler)
|
||||||
|
self._app.router.add_get("/readyz", self._readyz_handler)
|
||||||
|
|
||||||
|
self._runner = web.AppRunner(self._app)
|
||||||
|
await self._runner.setup()
|
||||||
|
self._site = web.TCPSite(self._runner, self._host, self._port)
|
||||||
|
await self._site.start()
|
||||||
|
logger.info(f"健康检查服务器已启动: http://{self._host}:{self._port}")
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""停止健康检查服务器"""
|
||||||
|
if self._runner:
|
||||||
|
await self._runner.cleanup()
|
||||||
|
logger.info("健康检查服务器已停止")
|
||||||
|
|
||||||
|
def set_healthy(self, healthy: bool) -> None:
|
||||||
|
"""设置健康状态"""
|
||||||
|
self._healthy = healthy
|
||||||
|
|
||||||
|
async def _healthz_handler(self, request: web.Request) -> web.Response:
|
||||||
|
"""存活检查端点"""
|
||||||
|
return web.json_response({"status": "healthy", "mode": "worker"})
|
||||||
|
|
||||||
|
async def _readyz_handler(self, request: web.Request) -> web.Response:
|
||||||
|
"""就绪检查端点"""
|
||||||
|
if self._healthy:
|
||||||
|
return web.json_response({"status": "ready", "mode": "worker"})
|
||||||
|
return web.json_response({"status": "not ready"}, status=503)
|
||||||
|
|
||||||
|
|
||||||
class JobWorker:
|
class JobWorker:
|
||||||
"""任务 Worker
|
"""任务 Worker
|
||||||
|
|
||||||
@@ -260,7 +309,7 @@ class JobWorker:
|
|||||||
# 记录回收指标
|
# 记录回收指标
|
||||||
from .core.metrics_unified import incr
|
from .core.metrics_unified import incr
|
||||||
|
|
||||||
incr("job_recovered_total", None, recovered)
|
await incr("job_recovered_total", None, recovered)
|
||||||
|
|
||||||
# 收集队列监控指标
|
# 收集队列监控指标
|
||||||
await self._job_manager.collect_queue_metrics()
|
await self._job_manager.collect_queue_metrics()
|
||||||
@@ -272,12 +321,21 @@ class JobWorker:
|
|||||||
logger.error(f"超时任务回收异常: {e}")
|
logger.error(f"超时任务回收异常: {e}")
|
||||||
|
|
||||||
|
|
||||||
def setup_signal_handlers(worker: JobWorker, loop: asyncio.AbstractEventLoop) -> None:
|
def setup_signal_handlers(
|
||||||
|
worker: JobWorker,
|
||||||
|
health_server: HealthCheckServer,
|
||||||
|
loop: asyncio.AbstractEventLoop,
|
||||||
|
) -> None:
|
||||||
"""设置信号处理器"""
|
"""设置信号处理器"""
|
||||||
|
|
||||||
|
async def shutdown_all() -> None:
|
||||||
|
"""关闭所有服务"""
|
||||||
|
await worker.shutdown()
|
||||||
|
await health_server.stop()
|
||||||
|
|
||||||
def signal_handler(sig: signal.Signals) -> None:
|
def signal_handler(sig: signal.Signals) -> None:
|
||||||
logger.info(f"收到信号 {sig.name},准备关闭...")
|
logger.info(f"收到信号 {sig.name},准备关闭...")
|
||||||
loop.create_task(worker.shutdown())
|
loop.create_task(shutdown_all())
|
||||||
|
|
||||||
for sig in (signal.SIGTERM, signal.SIGINT):
|
for sig in (signal.SIGTERM, signal.SIGINT):
|
||||||
loop.add_signal_handler(sig, signal_handler, sig)
|
loop.add_signal_handler(sig, signal_handler, sig)
|
||||||
@@ -288,13 +346,19 @@ async def main() -> None:
|
|||||||
# 设置日志
|
# 设置日志
|
||||||
setup_logging(level=settings.log_level, format_type=settings.log_format)
|
setup_logging(level=settings.log_level, format_type=settings.log_format)
|
||||||
|
|
||||||
|
# 创建健康检查服务器和 Worker
|
||||||
|
health_server = HealthCheckServer(port=8000)
|
||||||
worker = JobWorker()
|
worker = JobWorker()
|
||||||
|
|
||||||
# 设置信号处理
|
# 设置信号处理
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
setup_signal_handlers(worker, loop)
|
setup_signal_handlers(worker, health_server, loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# 先启动健康检查服务器,确保 FC 健康检查能通过
|
||||||
|
await health_server.start()
|
||||||
|
|
||||||
|
# 初始化并运行 Worker
|
||||||
await worker.initialize()
|
await worker.initialize()
|
||||||
await worker.run()
|
await worker.run()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -302,6 +366,7 @@ async def main() -> None:
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
finally:
|
finally:
|
||||||
await worker.shutdown()
|
await worker.shutdown()
|
||||||
|
await health_server.stop()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -1,158 +1,239 @@
|
|||||||
"""metrics_unified 模块单元测试"""
|
"""metrics_unified 模块单元测试"""
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def reset_manager():
|
||||||
|
"""每个测试前后重置管理器"""
|
||||||
|
from functional_scaffold.core.metrics_unified import reset_metrics_manager_sync
|
||||||
|
|
||||||
|
reset_metrics_manager_sync()
|
||||||
|
yield
|
||||||
|
reset_metrics_manager_sync()
|
||||||
|
|
||||||
|
|
||||||
class TestMetricsManager:
|
class TestMetricsManager:
|
||||||
"""MetricsManager 类测试"""
|
"""MetricsManager 类测试"""
|
||||||
|
|
||||||
@pytest.fixture
|
def test_init_loads_default_config(self):
|
||||||
def mock_redis(self):
|
|
||||||
"""模拟 Redis 客户端"""
|
|
||||||
with patch("redis.Redis") as mock:
|
|
||||||
mock_instance = MagicMock()
|
|
||||||
mock_instance.ping.return_value = True
|
|
||||||
mock_instance.hincrbyfloat.return_value = 1.0
|
|
||||||
mock_instance.hset.return_value = True
|
|
||||||
mock_instance.hgetall.return_value = {}
|
|
||||||
mock_instance.hget.return_value = "0"
|
|
||||||
mock_instance.keys.return_value = []
|
|
||||||
mock_instance.pipeline.return_value = MagicMock()
|
|
||||||
mock.return_value = mock_instance
|
|
||||||
yield mock_instance
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def manager(self, mock_redis):
|
|
||||||
"""创建测试用的 MetricsManager"""
|
|
||||||
from functional_scaffold.core.metrics_unified import (
|
|
||||||
MetricsManager,
|
|
||||||
reset_metrics_manager,
|
|
||||||
)
|
|
||||||
|
|
||||||
reset_metrics_manager()
|
|
||||||
manager = MetricsManager()
|
|
||||||
return manager
|
|
||||||
|
|
||||||
def test_init_loads_default_config(self, manager):
|
|
||||||
"""测试初始化加载默认配置"""
|
"""测试初始化加载默认配置"""
|
||||||
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
assert manager.config is not None
|
assert manager.config is not None
|
||||||
assert "builtin_metrics" in manager.config or len(manager.metrics_definitions) > 0
|
assert "builtin_metrics" in manager.config or len(manager.metrics_definitions) > 0
|
||||||
|
|
||||||
def test_metrics_definitions_registered(self, manager):
|
def test_metrics_definitions_registered(self):
|
||||||
"""测试指标定义已注册"""
|
"""测试指标定义已注册"""
|
||||||
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
assert "http_requests_total" in manager.metrics_definitions
|
assert "http_requests_total" in manager.metrics_definitions
|
||||||
assert "http_request_duration_seconds" in manager.metrics_definitions
|
assert "http_request_duration_seconds" in manager.metrics_definitions
|
||||||
assert "algorithm_executions_total" in manager.metrics_definitions
|
assert "algorithm_executions_total" in manager.metrics_definitions
|
||||||
|
|
||||||
def test_incr_counter(self, manager, mock_redis):
|
@pytest.mark.asyncio
|
||||||
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_incr_counter(self, mock_redis_class):
|
||||||
"""测试计数器增加"""
|
"""测试计数器增加"""
|
||||||
manager.incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"})
|
mock_instance = AsyncMock()
|
||||||
mock_redis.hincrbyfloat.assert_called()
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
|
mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
def test_incr_with_invalid_metric_type(self, manager, mock_redis):
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
|
await manager.initialize()
|
||||||
|
|
||||||
|
await manager.incr(
|
||||||
|
"http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}
|
||||||
|
)
|
||||||
|
mock_instance.hincrbyfloat.assert_called()
|
||||||
|
|
||||||
|
def test_incr_with_invalid_metric_type(self):
|
||||||
"""测试对非计数器类型调用 incr"""
|
"""测试对非计数器类型调用 incr"""
|
||||||
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
# http_request_duration_seconds 是 histogram 类型
|
# http_request_duration_seconds 是 histogram 类型
|
||||||
manager.incr("http_request_duration_seconds", {})
|
# 验证不会抛出异常(因为 Redis 不可用)
|
||||||
# 不应该调用 Redis(因为类型不匹配)
|
|
||||||
# 验证没有调用 hincrbyfloat(或者调用次数没有增加)
|
|
||||||
|
|
||||||
def test_set_gauge(self, manager, mock_redis):
|
@pytest.mark.asyncio
|
||||||
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_set_gauge(self, mock_redis_class):
|
||||||
"""测试设置仪表盘"""
|
"""测试设置仪表盘"""
|
||||||
manager.set("http_requests_in_progress", {}, 5)
|
mock_instance = AsyncMock()
|
||||||
mock_redis.hset.assert_called()
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
|
mock_instance.hset = AsyncMock(return_value=True)
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
def test_gauge_incr(self, manager, mock_redis):
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
|
await manager.initialize()
|
||||||
|
|
||||||
|
await manager.set("http_requests_in_progress", {}, 5)
|
||||||
|
mock_instance.hset.assert_called()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_gauge_incr(self, mock_redis_class):
|
||||||
"""测试增加仪表盘"""
|
"""测试增加仪表盘"""
|
||||||
manager.gauge_incr("http_requests_in_progress", {}, 1)
|
mock_instance = AsyncMock()
|
||||||
mock_redis.hincrbyfloat.assert_called()
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
|
mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
def test_gauge_decr(self, manager, mock_redis):
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
|
await manager.initialize()
|
||||||
|
|
||||||
|
await manager.gauge_incr("http_requests_in_progress", {}, 1)
|
||||||
|
mock_instance.hincrbyfloat.assert_called()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_gauge_decr(self, mock_redis_class):
|
||||||
"""测试减少仪表盘"""
|
"""测试减少仪表盘"""
|
||||||
manager.gauge_decr("http_requests_in_progress", {}, 1)
|
mock_instance = AsyncMock()
|
||||||
mock_redis.hincrbyfloat.assert_called()
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
|
mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
def test_observe_histogram(self, manager, mock_redis):
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
|
await manager.initialize()
|
||||||
|
|
||||||
|
await manager.gauge_decr("http_requests_in_progress", {}, 1)
|
||||||
|
mock_instance.hincrbyfloat.assert_called()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_observe_histogram(self, mock_redis_class):
|
||||||
"""测试直方图观测"""
|
"""测试直方图观测"""
|
||||||
mock_pipeline = MagicMock()
|
mock_instance = AsyncMock()
|
||||||
mock_redis.pipeline.return_value = mock_pipeline
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
|
|
||||||
manager.observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05)
|
mock_pipeline = AsyncMock()
|
||||||
|
mock_pipeline.hincrbyfloat = MagicMock()
|
||||||
|
mock_pipeline.execute = AsyncMock(return_value=[])
|
||||||
|
mock_instance.pipeline = MagicMock(return_value=mock_pipeline)
|
||||||
|
|
||||||
mock_redis.pipeline.assert_called()
|
mock_redis_class.return_value = mock_instance
|
||||||
mock_pipeline.execute.assert_called()
|
|
||||||
|
|
||||||
def test_labels_to_key(self, manager):
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
|
await manager.initialize()
|
||||||
|
|
||||||
|
await manager.observe(
|
||||||
|
"http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.05
|
||||||
|
)
|
||||||
|
mock_instance.pipeline.assert_called()
|
||||||
|
|
||||||
|
def test_labels_to_key(self):
|
||||||
"""测试标签转换为 key"""
|
"""测试标签转换为 key"""
|
||||||
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
labels = {"method": "GET", "endpoint": "/api"}
|
labels = {"method": "GET", "endpoint": "/api"}
|
||||||
key = manager._labels_to_key(labels)
|
key = manager._labels_to_key(labels)
|
||||||
assert "method=GET" in key
|
assert "method=GET" in key
|
||||||
assert "endpoint=/api" in key
|
assert "endpoint=/api" in key
|
||||||
|
|
||||||
def test_labels_to_key_empty(self, manager):
|
def test_labels_to_key_empty(self):
|
||||||
"""测试空标签转换"""
|
"""测试空标签转换"""
|
||||||
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
key = manager._labels_to_key(None)
|
key = manager._labels_to_key(None)
|
||||||
assert key == ""
|
assert key == ""
|
||||||
|
|
||||||
key = manager._labels_to_key({})
|
key = manager._labels_to_key({})
|
||||||
assert key == ""
|
assert key == ""
|
||||||
|
|
||||||
def test_is_available(self, manager):
|
@pytest.mark.asyncio
|
||||||
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_is_available(self, mock_redis_class):
|
||||||
"""测试 Redis 可用性检查"""
|
"""测试 Redis 可用性检查"""
|
||||||
|
mock_instance = AsyncMock()
|
||||||
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
|
from functional_scaffold.core.metrics_unified import MetricsManager
|
||||||
|
|
||||||
|
manager = MetricsManager()
|
||||||
|
await manager.initialize()
|
||||||
|
|
||||||
assert manager.is_available() is True
|
assert manager.is_available() is True
|
||||||
|
|
||||||
|
|
||||||
class TestConvenienceFunctions:
|
class TestConvenienceFunctions:
|
||||||
"""便捷函数测试"""
|
"""便捷函数测试"""
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.mark.asyncio
|
||||||
def setup(self):
|
@patch("redis.asyncio.Redis")
|
||||||
"""每个测试前重置管理器"""
|
async def test_incr_function(self, mock_redis_class):
|
||||||
from functional_scaffold.core.metrics_unified import reset_metrics_manager
|
|
||||||
|
|
||||||
reset_metrics_manager()
|
|
||||||
|
|
||||||
@patch("redis.Redis")
|
|
||||||
def test_incr_function(self, mock_redis_class):
|
|
||||||
"""测试 incr 便捷函数"""
|
"""测试 incr 便捷函数"""
|
||||||
mock_instance = MagicMock()
|
mock_instance = AsyncMock()
|
||||||
mock_instance.ping.return_value = True
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
|
mock_instance.hincrbyfloat = AsyncMock(return_value=1.0)
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
mock_redis_class.return_value = mock_instance
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
from functional_scaffold.core.metrics_unified import incr, reset_metrics_manager
|
from functional_scaffold.core.metrics_unified import incr
|
||||||
|
|
||||||
reset_metrics_manager()
|
await incr(
|
||||||
incr("http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"})
|
"http_requests_total", {"method": "GET", "endpoint": "/", "status": "success"}
|
||||||
|
)
|
||||||
|
|
||||||
mock_instance.hincrbyfloat.assert_called()
|
mock_instance.hincrbyfloat.assert_called()
|
||||||
|
|
||||||
@patch("redis.Redis")
|
@pytest.mark.asyncio
|
||||||
def test_set_function(self, mock_redis_class):
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_set_function(self, mock_redis_class):
|
||||||
"""测试 set 便捷函数"""
|
"""测试 set 便捷函数"""
|
||||||
mock_instance = MagicMock()
|
mock_instance = AsyncMock()
|
||||||
mock_instance.ping.return_value = True
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
|
mock_instance.hset = AsyncMock(return_value=True)
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
mock_redis_class.return_value = mock_instance
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
from functional_scaffold.core.metrics_unified import reset_metrics_manager, set
|
from functional_scaffold.core.metrics_unified import set
|
||||||
|
|
||||||
reset_metrics_manager()
|
await set("http_requests_in_progress", {}, 10)
|
||||||
set("http_requests_in_progress", {}, 10)
|
|
||||||
|
|
||||||
mock_instance.hset.assert_called()
|
mock_instance.hset.assert_called()
|
||||||
|
|
||||||
@patch("redis.Redis")
|
@pytest.mark.asyncio
|
||||||
def test_observe_function(self, mock_redis_class):
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_observe_function(self, mock_redis_class):
|
||||||
"""测试 observe 便捷函数"""
|
"""测试 observe 便捷函数"""
|
||||||
mock_instance = MagicMock()
|
mock_instance = AsyncMock()
|
||||||
mock_instance.ping.return_value = True
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
mock_pipeline = MagicMock()
|
mock_instance.close = AsyncMock()
|
||||||
mock_instance.pipeline.return_value = mock_pipeline
|
|
||||||
|
mock_pipeline = AsyncMock()
|
||||||
|
mock_pipeline.hincrbyfloat = MagicMock()
|
||||||
|
mock_pipeline.execute = AsyncMock(return_value=[])
|
||||||
|
mock_instance.pipeline = MagicMock(return_value=mock_pipeline)
|
||||||
|
|
||||||
mock_redis_class.return_value = mock_instance
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
from functional_scaffold.core.metrics_unified import observe, reset_metrics_manager
|
from functional_scaffold.core.metrics_unified import observe
|
||||||
|
|
||||||
reset_metrics_manager()
|
await observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.1)
|
||||||
observe("http_request_duration_seconds", {"method": "GET", "endpoint": "/"}, 0.1)
|
|
||||||
|
|
||||||
mock_instance.pipeline.assert_called()
|
mock_instance.pipeline.assert_called()
|
||||||
|
|
||||||
@@ -160,42 +241,49 @@ class TestConvenienceFunctions:
|
|||||||
class TestExport:
|
class TestExport:
|
||||||
"""导出功能测试"""
|
"""导出功能测试"""
|
||||||
|
|
||||||
@patch("redis.Redis")
|
@pytest.mark.asyncio
|
||||||
def test_export_counter(self, mock_redis_class):
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_export_counter(self, mock_redis_class):
|
||||||
"""测试导出计数器"""
|
"""测试导出计数器"""
|
||||||
mock_instance = MagicMock()
|
mock_instance = AsyncMock()
|
||||||
mock_instance.ping.return_value = True
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
mock_instance.hgetall.return_value = {"method=GET,endpoint=/,status=success": "10"}
|
mock_instance.hgetall = AsyncMock(
|
||||||
|
return_value={"method=GET,endpoint=/,status=success": "10"}
|
||||||
|
)
|
||||||
|
mock_instance.hget = AsyncMock(return_value="0")
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
mock_redis_class.return_value = mock_instance
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
from functional_scaffold.core.metrics_unified import export, reset_metrics_manager
|
from functional_scaffold.core.metrics_unified import export
|
||||||
|
|
||||||
reset_metrics_manager()
|
output = await export()
|
||||||
output = export()
|
|
||||||
|
|
||||||
assert "http_requests_total" in output
|
assert "http_requests_total" in output
|
||||||
assert "HELP" in output
|
assert "HELP" in output
|
||||||
assert "TYPE" in output
|
assert "TYPE" in output
|
||||||
|
|
||||||
@patch("redis.Redis")
|
@pytest.mark.asyncio
|
||||||
def test_export_histogram(self, mock_redis_class):
|
@patch("redis.asyncio.Redis")
|
||||||
|
async def test_export_histogram(self, mock_redis_class):
|
||||||
"""测试导出直方图"""
|
"""测试导出直方图"""
|
||||||
mock_instance = MagicMock()
|
mock_instance = AsyncMock()
|
||||||
mock_instance.ping.return_value = True
|
mock_instance.ping = AsyncMock(return_value=True)
|
||||||
mock_instance.hgetall.side_effect = lambda key: (
|
|
||||||
{"method=GET,endpoint=/": "5"}
|
async def mock_hgetall(key):
|
||||||
if "count" in key
|
if "count" in key:
|
||||||
else {"method=GET,endpoint=/": "0.5"}
|
return {"method=GET,endpoint=/": "5"}
|
||||||
if "sum" in key
|
elif "sum" in key:
|
||||||
else {}
|
return {"method=GET,endpoint=/": "0.5"}
|
||||||
)
|
return {}
|
||||||
mock_instance.hget.return_value = "3"
|
|
||||||
|
mock_instance.hgetall = mock_hgetall
|
||||||
|
mock_instance.hget = AsyncMock(return_value="3")
|
||||||
|
mock_instance.close = AsyncMock()
|
||||||
mock_redis_class.return_value = mock_instance
|
mock_redis_class.return_value = mock_instance
|
||||||
|
|
||||||
from functional_scaffold.core.metrics_unified import export, reset_metrics_manager
|
from functional_scaffold.core.metrics_unified import export
|
||||||
|
|
||||||
reset_metrics_manager()
|
output = await export()
|
||||||
output = export()
|
|
||||||
|
|
||||||
assert "http_request_duration_seconds" in output
|
assert "http_request_duration_seconds" in output
|
||||||
|
|
||||||
@@ -226,21 +314,9 @@ class TestEnvVarSubstitution:
|
|||||||
class TestTrackAlgorithmExecution:
|
class TestTrackAlgorithmExecution:
|
||||||
"""track_algorithm_execution 装饰器测试"""
|
"""track_algorithm_execution 装饰器测试"""
|
||||||
|
|
||||||
@patch("redis.Redis")
|
def test_decorator_success(self):
|
||||||
def test_decorator_success(self, mock_redis_class):
|
|
||||||
"""测试装饰器成功执行"""
|
"""测试装饰器成功执行"""
|
||||||
mock_instance = MagicMock()
|
from functional_scaffold.core.metrics_unified import track_algorithm_execution
|
||||||
mock_instance.ping.return_value = True
|
|
||||||
mock_pipeline = MagicMock()
|
|
||||||
mock_instance.pipeline.return_value = mock_pipeline
|
|
||||||
mock_redis_class.return_value = mock_instance
|
|
||||||
|
|
||||||
from functional_scaffold.core.metrics_unified import (
|
|
||||||
reset_metrics_manager,
|
|
||||||
track_algorithm_execution,
|
|
||||||
)
|
|
||||||
|
|
||||||
reset_metrics_manager()
|
|
||||||
|
|
||||||
@track_algorithm_execution("test_algo")
|
@track_algorithm_execution("test_algo")
|
||||||
def test_func():
|
def test_func():
|
||||||
@@ -249,21 +325,9 @@ class TestTrackAlgorithmExecution:
|
|||||||
result = test_func()
|
result = test_func()
|
||||||
assert result == "result"
|
assert result == "result"
|
||||||
|
|
||||||
@patch("redis.Redis")
|
def test_decorator_error(self):
|
||||||
def test_decorator_error(self, mock_redis_class):
|
|
||||||
"""测试装饰器错误处理"""
|
"""测试装饰器错误处理"""
|
||||||
mock_instance = MagicMock()
|
from functional_scaffold.core.metrics_unified import track_algorithm_execution
|
||||||
mock_instance.ping.return_value = True
|
|
||||||
mock_pipeline = MagicMock()
|
|
||||||
mock_instance.pipeline.return_value = mock_pipeline
|
|
||||||
mock_redis_class.return_value = mock_instance
|
|
||||||
|
|
||||||
from functional_scaffold.core.metrics_unified import (
|
|
||||||
reset_metrics_manager,
|
|
||||||
track_algorithm_execution,
|
|
||||||
)
|
|
||||||
|
|
||||||
reset_metrics_manager()
|
|
||||||
|
|
||||||
@track_algorithm_execution("test_algo")
|
@track_algorithm_execution("test_algo")
|
||||||
def test_func():
|
def test_func():
|
||||||
|
|||||||
Reference in New Issue
Block a user