main: remove redundant docs and clean up the project directory

Changes:
- Remove redundant docs, including the Grafana guide, metrics comparison, fix summary, and OpenAPI spec notes.
- Streamline the documentation structure and improve the README.
- Clarify the doc hierarchy and consolidate the core guides.
2026-02-02 14:59:34 +08:00
parent 241cffebc2
commit b1077e78e9
23 changed files with 3763 additions and 960 deletions


@@ -7,6 +7,7 @@
## Features
- **Standardized API** - RESTful-style HTTP endpoints
- **Sync/async invocation** - supports synchronous calls and asynchronous jobs
- **Works out of the box** - complete project structure and configuration
- **Auto-generated docs** - Swagger/OpenAPI generated automatically
- **Monitoring** - Prometheus metrics and Grafana dashboards
@@ -16,6 +17,16 @@
- **Full test coverage** - unit and integration tests
- **CI/CD** - GitHub Actions workflows
## Documentation
| Document | Description |
|------|------|
| [Getting Started](docs/getting-started.md) | Get up and running in 10 minutes |
| [Algorithm Development Guide](docs/algorithm-development.md) | In-depth algorithm development tutorial |
| [API Reference](docs/api-reference.md) | Complete API documentation |
| [Monitoring Guide](docs/monitoring.md) | Monitoring and alerting setup |
| [API Spec](docs/api/README.md) | OpenAPI spec notes |
## Quick start
### Prerequisites
@@ -76,7 +87,8 @@ docker-compose up
### Core endpoints
- `POST /invoke` - invoke an algorithm synchronously
- `POST /jobs` - create an async job
- `GET /jobs/{job_id}` - query job status
### Health checks
@@ -89,7 +101,7 @@ docker-compose up
## Example requests
### Synchronous call - prime check
```bash
curl -X POST http://localhost:8000/invoke \
@@ -117,6 +129,18 @@ curl -X POST http://localhost:8000/invoke \
}
```
### Async job
```bash
# Create a job
curl -X POST http://localhost:8000/jobs \
-H "Content-Type: application/json" \
-d '{"algorithm": "PrimeChecker", "params": {"number": 17}}'
# Query job status
curl http://localhost:8000/jobs/{job_id}
```
## Project structure
```
@@ -141,6 +165,8 @@ FunctionalScaffold/
## Development guide
For the full guide, see the [Algorithm Development Guide](docs/algorithm-development.md).
### Adding a new algorithm
1. Create a new algorithm file under `src/functional_scaffold/algorithms/`
@@ -212,6 +238,8 @@ sam deploy --template-file deployment/serverless/aws-lambda.yaml
## Monitoring
For detailed monitoring setup, see the [Monitoring Guide](docs/monitoring.md).
### Prometheus metrics
Visit the `/metrics` endpoint to see the available metrics:
@@ -220,6 +248,8 @@ sam deploy --template-file deployment/serverless/aws-lambda.yaml
- `http_request_duration_seconds` - HTTP request latency
- `algorithm_executions_total` - total algorithm executions
- `algorithm_execution_duration_seconds` - algorithm execution latency
- `jobs_created_total` - total async jobs created
- `jobs_completed_total` - total async jobs completed
### Grafana dashboards


@@ -69,10 +69,33 @@ custom_metrics:
    labels: []
    buckets: [10, 100, 1000, 10000, 100000, 1000000]
  # Async job metrics
  jobs_created:
    name: "jobs_created_total"
    type: counter
    description: "Total async jobs created"
    labels: [algorithm]
  jobs_completed:
    name: "jobs_completed_total"
    type: counter
    description: "Total async jobs completed"
    labels: [algorithm, status]
  job_execution_duration:
    name: "job_execution_duration_seconds"
    type: histogram
    description: "Async job execution time"
    labels: [algorithm]
    buckets: [0.1, 0.5, 1, 5, 10, 30, 60, 120, 300]
  webhook_deliveries:
    name: "webhook_deliveries_total"
    type: counter
    description: "Total webhook deliveries"
    labels: [status]
  prime_check_total:
    name: "prime_check"
    type: counter
    description: "Number of times a problem occurred"
    labels: [status]


@@ -0,0 +1,556 @@
# Algorithm Development Guide
This document explains how to develop algorithms in the FunctionalScaffold framework, covering best practices, advanced features, and common patterns.
## Algorithm architecture
### Core concepts
```
┌─────────────────────────────────────────────────────────────┐
│                        HTTP request                          │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                    FastAPI routing layer                     │
│  - Parameter validation (Pydantic)                           │
│  - Request ID generation                                     │
│  - Error handling                                            │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                   BaseAlgorithm.execute()                    │
│  - Automatic timing                                          │
│  - Metrics recording                                         │
│  - Exception capture                                         │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                   YourAlgorithm.process()                    │
│        ★ The only method you need to implement ★             │
└─────────────────────────────────────────────────────────────┘
```
### The BaseAlgorithm base class
Every algorithm must inherit from `BaseAlgorithm`:
```python
from abc import ABC, abstractmethod
from typing import Any, Dict

class BaseAlgorithm(ABC):
    """Algorithm base class."""

    def __init__(self):
        self.name = self.__class__.__name__
        self.version = "1.0.0"

    @abstractmethod
    def process(self, *args, **kwargs) -> Dict[str, Any]:
        """Algorithm logic - subclasses must implement this."""
        pass

    def execute(self, *args, **kwargs) -> Dict[str, Any]:
        """Run the algorithm - instrumentation and error handling are automatic."""
        # The framework handles timing, logging, metrics, and exceptions here.
        pass
```
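For reference, a minimal sketch of what the wrapper does, assuming the response envelope used throughout this guide (`success`/`result`/`error` plus `metadata` with `elapsed_time`); the real implementation also logs and records Prometheus metrics:
```python
import time
from typing import Any, Dict

# Sketch of BaseAlgorithm.execute(), shown standalone for clarity
def execute(self, *args, **kwargs) -> Dict[str, Any]:
    """Time the call, catch errors, and wrap the output in the standard envelope."""
    start = time.time()
    try:
        result = self.process(*args, **kwargs)
        error, success = None, True
    except Exception as exc:  # the real wrapper also logs and records metrics
        result, error, success = None, str(exc), False
    return {
        "success": success,
        "result": result,
        "error": error,
        "metadata": {
            "algorithm": self.name,
            "version": self.version,
            "elapsed_time": time.time() - start,
        },
    }
```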
## Developing an algorithm
### Basic example
```python
# src/functional_scaffold/algorithms/text_processor.py
from typing import Dict, Any
from .base import BaseAlgorithm

class TextProcessor(BaseAlgorithm):
    """Text processing algorithm."""

    def __init__(self):
        super().__init__()
        self.version = "1.0.0"

    def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
        """
        Process text.

        Args:
            text: input text
            operation: operation type (upper/lower/reverse)

        Returns:
            The processing result.
        """
        if operation == "upper":
            result = text.upper()
        elif operation == "lower":
            result = text.lower()
        elif operation == "reverse":
            result = text[::-1]
        else:
            raise ValueError(f"Unsupported operation: {operation}")
        return {
            "original": text,
            "processed": result,
            "operation": operation,
            "length": len(result)
        }
```
### Algorithm with model loading
```python
# src/functional_scaffold/algorithms/ml_predictor.py
from typing import Dict, Any
import logging
from .base import BaseAlgorithm

logger = logging.getLogger(__name__)

class MLPredictor(BaseAlgorithm):
    """Machine-learning prediction algorithm."""

    def __init__(self, model_path: str = None):
        super().__init__()
        self.model = None
        self.model_path = model_path or "models/default.pkl"
        self._load_model()

    def _load_model(self):
        """Load the model (runs once at initialization)."""
        logger.info(f"Loading model: {self.model_path}")
        # import joblib
        # self.model = joblib.load(self.model_path)
        self.model = "mock_model"  # example placeholder
        logger.info("Model loaded")

    def process(self, features: list) -> Dict[str, Any]:
        """
        Run a prediction.

        Args:
            features: feature vector

        Returns:
            The prediction result.
        """
        if self.model is None:
            raise RuntimeError("Model not loaded")
        # prediction = self.model.predict([features])[0]
        prediction = sum(features) / len(features)  # example placeholder
        return {
            "features": features,
            "prediction": prediction,
            "model_version": self.version
        }
```
### Algorithm that calls an external service
```python
# src/functional_scaffold/algorithms/data_fetcher.py
import asyncio
from typing import Dict, Any
import httpx
from .base import BaseAlgorithm
from ..config import settings

class DataFetcher(BaseAlgorithm):
    """Data-fetching algorithm."""

    def __init__(self):
        super().__init__()
        self.api_base_url = settings.external_api_url or "https://api.example.com"

    async def _fetch_data(self, endpoint: str) -> dict:
        """Fetch data asynchronously."""
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.api_base_url}/{endpoint}")
            response.raise_for_status()
            return response.json()

    def process(self, data_id: str) -> Dict[str, Any]:
        """
        Fetch and process data.

        Args:
            data_id: data ID

        Returns:
            The processed data.
        """
        # Run the async helper from this synchronous method
        data = asyncio.run(self._fetch_data(f"data/{data_id}"))

        # Process the data
        processed = self._transform(data)
        return {
            "data_id": data_id,
            "raw_data": data,
            "processed_data": processed
        }

    def _transform(self, data: dict) -> dict:
        """Transform the data."""
        return {k: v for k, v in data.items() if v is not None}
```
## Defining data models
### Request models
```python
# src/functional_scaffold/api/models.py
from pydantic import BaseModel, Field, ConfigDict, field_validator
from typing import List, Optional

class TextProcessRequest(BaseModel):
    """Text processing request."""
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "text": "Hello World",
                "operation": "upper"
            }
        }
    )
    text: str = Field(..., min_length=1, max_length=10000, description="Input text")
    operation: str = Field("upper", description="Operation type")

    @field_validator("operation")
    @classmethod
    def validate_operation(cls, v):
        allowed = ["upper", "lower", "reverse"]
        if v not in allowed:
            raise ValueError(f"operation must be one of {allowed}")
        return v

class MLPredictRequest(BaseModel):
    """ML prediction request."""
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "features": [1.0, 2.0, 3.0, 4.0]
            }
        }
    )
    features: List[float] = Field(..., min_length=1, description="Feature vector")
```
### Response models
```python
from typing import Any, Dict

class TextProcessResponse(BaseModel):
    """Text processing response."""
    request_id: str = Field(..., description="Request ID")
    status: str = Field(..., description="Processing status")
    result: Dict[str, Any] = Field(..., description="Processing result")
    metadata: Dict[str, Any] = Field(..., description="Metadata")
```
## Registering routes
### Synchronous endpoint
```python
# src/functional_scaffold/api/routes.py
from fastapi import APIRouter, HTTPException, Depends, status
from .models import TextProcessRequest, TextProcessResponse
from .dependencies import get_request_id
from ..algorithms.text_processor import TextProcessor

router = APIRouter()

@router.post(
    "/text/process",
    response_model=TextProcessResponse,
    summary="Process text",
    description="Apply the requested operation to the input text",
)
async def process_text(
    request: TextProcessRequest,
    request_id: str = Depends(get_request_id),
):
    """Text processing endpoint."""
    try:
        processor = TextProcessor()
        result = processor.execute(request.text, request.operation)
        if not result["success"]:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail={"error": "ALGORITHM_ERROR", "message": result["error"]}
            )
        return TextProcessResponse(
            request_id=request_id,
            status="success",
            result=result["result"],
            metadata=result["metadata"]
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"error": "VALIDATION_ERROR", "message": str(e)}
        )
```
### Async job endpoint
Once registered, an algorithm automatically supports async invocation - no extra code needed:
```bash
# Create an async job
curl -X POST http://localhost:8000/jobs \
-H "Content-Type: application/json" \
-d '{
"algorithm": "TextProcessor",
"params": {"text": "hello", "operation": "upper"}
}'
```
## Custom metrics
### Defining metrics
Add them in `config/metrics.yaml`:
```yaml
custom_metrics:
  # Text processing stats
  text_process_total:
    name: "text_process_total"
    type: counter
    description: "Total text-processing operations"
    labels: [operation]
  text_length_histogram:
    name: "text_length_histogram"
    type: histogram
    description: "Distribution of processed text lengths"
    labels: []
    buckets: [10, 50, 100, 500, 1000, 5000, 10000]
```
### Recording metrics
```python
from ..core.metrics_unified import incr, observe

class TextProcessor(BaseAlgorithm):
    def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
        # Count the operation
        incr("text_process_total", {"operation": operation})
        # Record the text length
        observe("text_length_histogram", {}, len(text))
        # ... algorithm logic ...
```
## Error handling
### Custom exceptions
```python
# src/functional_scaffold/core/errors.py
# FunctionalScaffoldError is the project-wide base exception,
# defined earlier in this module.

class AlgorithmError(FunctionalScaffoldError):
    """Algorithm execution error."""
    pass

class ModelNotLoadedError(AlgorithmError):
    """Model-not-loaded error."""
    pass

class InvalidInputError(AlgorithmError):
    """Invalid input error."""
    pass
```
### Using them in an algorithm
```python
from ..core.errors import InvalidInputError, ModelNotLoadedError

class MLPredictor(BaseAlgorithm):
    def process(self, features: list) -> Dict[str, Any]:
        if not features:
            raise InvalidInputError("Feature vector must not be empty")
        if self.model is None:
            raise ModelNotLoadedError("Model not loaded; check the model file")
        # ... algorithm logic ...
```
## Testing
### Unit tests
```python
# tests/test_text_processor.py
import pytest
from src.functional_scaffold.algorithms.text_processor import TextProcessor

class TestTextProcessor:
    """Tests for the text processing algorithm."""

    def test_upper_operation(self):
        """Uppercase conversion."""
        processor = TextProcessor()
        result = processor.process("hello", "upper")
        assert result["processed"] == "HELLO"
        assert result["operation"] == "upper"

    def test_lower_operation(self):
        """Lowercase conversion."""
        processor = TextProcessor()
        result = processor.process("HELLO", "lower")
        assert result["processed"] == "hello"

    def test_reverse_operation(self):
        """Reversal."""
        processor = TextProcessor()
        result = processor.process("hello", "reverse")
        assert result["processed"] == "olleh"

    def test_invalid_operation(self):
        """Invalid operation."""
        processor = TextProcessor()
        with pytest.raises(ValueError, match="Unsupported operation"):
            processor.process("hello", "invalid")

    def test_execute_wrapper(self):
        """The execute() wrapper."""
        processor = TextProcessor()
        result = processor.execute("hello", "upper")
        assert result["success"] is True
        assert result["result"]["processed"] == "HELLO"
        assert "elapsed_time" in result["metadata"]
```
### API integration tests
```python
# tests/test_text_api.py
import pytest
from fastapi import status

class TestTextProcessAPI:
    """Tests for the text processing API."""

    def test_process_text_success(self, client):
        """Successful processing."""
        response = client.post(
            "/text/process",
            json={"text": "hello", "operation": "upper"}
        )
        assert response.status_code == status.HTTP_200_OK
        data = response.json()
        assert data["status"] == "success"
        assert data["result"]["processed"] == "HELLO"

    def test_process_text_invalid_operation(self, client):
        """Invalid operation."""
        response = client.post(
            "/text/process",
            json={"text": "hello", "operation": "invalid"}
        )
        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
```
## Best practices
### 1. Keep process() small
```python
# ✅ Good
def process(self, data):
    validated = self._validate(data)
    transformed = self._transform(validated)
    result = self._compute(transformed)
    return self._format_output(result)

# ❌ Avoid
def process(self, data):
    # 200 lines of code all in one place...
```
### 2. Use type annotations
```python
# ✅ Good
def process(self, text: str, max_length: int = 100) -> Dict[str, Any]:
    ...

# ❌ Avoid
def process(self, text, max_length=100):
    ...
```
### 3. Log sensibly
```python
import logging

logger = logging.getLogger(__name__)

class MyAlgorithm(BaseAlgorithm):
    def process(self, data):
        logger.info(f"Processing started, input size: {len(data)}")
        # ... processing logic ...
        logger.info(f"Processing finished, result size: {len(result)}")
        return result
```
### 4. Manage resources
```python
class ResourceIntensiveAlgorithm(BaseAlgorithm):
    def __init__(self):
        super().__init__()
        self._resource = None

    def _ensure_resource(self):
        """Load the resource lazily."""
        if self._resource is None:
            self._resource = self._load_heavy_resource()

    def process(self, data):
        self._ensure_resource()
        return self._resource.process(data)
```
## See also
- [Getting Started](./getting-started.md)
- [API Reference](./api-reference.md)
- [Monitoring Guide](./monitoring.md)

docs/api-reference.md (new file)

@@ -0,0 +1,441 @@
# API Reference
This document describes every API endpoint exposed by FunctionalScaffold.
## Basics
- **Base URL**: `http://localhost:8000`
- **Content-Type**: `application/json`
- **Authentication**: none in the current version
## Endpoint overview
| Method | Endpoint | Description |
|------|------|------|
| POST | `/invoke` | Invoke an algorithm synchronously |
| POST | `/jobs` | Create an async job |
| GET | `/jobs/{job_id}` | Query job status |
| GET | `/healthz` | Liveness check |
| GET | `/readyz` | Readiness check |
| GET | `/metrics` | Prometheus metrics |
---
## Synchronous invocation
### POST /invoke
Invoke the prime-check algorithm synchronously; the result is returned immediately.
#### Request
```http
POST /invoke
Content-Type: application/json
```
**Request body**
| Field | Type | Required | Description |
|------|------|------|------|
| number | integer | Yes | The integer to test |
**Example**
```json
{
"number": 17
}
```
#### Response
**Success (200 OK)**
```json
{
"request_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "success",
"result": {
"number": 17,
"is_prime": true,
"factors": [],
"algorithm": "trial_division"
},
"metadata": {
"algorithm": "PrimeChecker",
"version": "1.0.0",
"elapsed_time": 0.001
}
}
```
**Error (400 Bad Request)**
```json
{
"error": "VALIDATION_ERROR",
"message": "number must be an integer",
"details": {"field": "number", "value": "abc"},
"request_id": "550e8400-e29b-41d4-a716-446655440000"
}
```
#### Example call
```bash
curl -X POST http://localhost:8000/invoke \
-H "Content-Type: application/json" \
-d '{"number": 17}'
```
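The same call from Python - a minimal sketch using the third-party requests package:
```python
import requests

# Synchronously invoke the prime checker
response = requests.post(
    "http://localhost:8000/invoke",
    json={"number": 17},
    timeout=10,
)
response.raise_for_status()
print(response.json()["result"])  # {"number": 17, "is_prime": true, ...}
```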
---
## Async jobs
### POST /jobs
Create an async job; the job ID is returned immediately while the job runs in the background.
#### Request
```http
POST /jobs
Content-Type: application/json
```
**Request body**
| Field | Type | Required | Description |
|------|------|------|------|
| algorithm | string | Yes | Algorithm name (e.g. PrimeChecker) |
| params | object | Yes | Algorithm parameters |
| webhook | string | No | Callback URL invoked when the job finishes |
**Example**
```json
{
"algorithm": "PrimeChecker",
"params": {"number": 17},
"webhook": "https://example.com/callback"
}
```
#### Response
**Success (202 Accepted)**
```json
{
"job_id": "a1b2c3d4e5f6",
"status": "pending",
"message": "任务已创建",
"created_at": "2026-02-02T10:00:00+00:00"
}
```
**Error (404 Not Found)**
```json
{
"detail": {
"error": "ALGORITHM_NOT_FOUND",
"message": "算法 'NonExistent' 不存在",
"details": {"available_algorithms": ["PrimeChecker"]},
"request_id": "xxx"
}
}
```
**Error (503 Service Unavailable)**
```json
{
"detail": {
"error": "SERVICE_UNAVAILABLE",
"message": "任务服务暂不可用,请稍后重试",
"request_id": "xxx"
}
}
```
#### Example call
```bash
curl -X POST http://localhost:8000/jobs \
-H "Content-Type: application/json" \
-d '{
"algorithm": "PrimeChecker",
"params": {"number": 17},
"webhook": "https://webhook.site/your-uuid"
}'
```
---
### GET /jobs/{job_id}
Query the status and result of an async job.
#### Request
```http
GET /jobs/{job_id}
```
**Path parameters**
| Parameter | Type | Description |
|------|------|------|
| job_id | string | Unique job ID (12 hex characters) |
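The docs only specify the 12-hex-character format; as an illustration, such IDs can be produced with the standard library (the actual generation scheme is an assumption):
```python
import secrets

def new_job_id() -> str:
    """Generate a 12-character hex job ID (6 random bytes)."""
    return secrets.token_hex(6)  # e.g. "a1b2c3d4e5f6"
```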
#### Response
**Success (200 OK) - job running**
```json
{
"job_id": "a1b2c3d4e5f6",
"status": "running",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"started_at": "2026-02-02T10:00:01+00:00",
"completed_at": null,
"result": null,
"error": null,
"metadata": null
}
```
**Success (200 OK) - job completed**
```json
{
"job_id": "a1b2c3d4e5f6",
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"started_at": "2026-02-02T10:00:01+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": {
"number": 17,
"is_prime": true,
"factors": [],
"algorithm": "trial_division"
},
"error": null,
"metadata": {
"algorithm": "PrimeChecker",
"version": "1.0.0",
"elapsed_time": 0.001
}
}
```
**Success (200 OK) - job failed**
```json
{
"job_id": "a1b2c3d4e5f6",
"status": "failed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"started_at": "2026-02-02T10:00:01+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": null,
"error": "Invalid input: number must be positive",
"metadata": null
}
```
**Error (404 Not Found)**
```json
{
"detail": {
"error": "JOB_NOT_FOUND",
"message": "任务 'xxx' 不存在或已过期"
}
}
```
#### Job states
| State | Description |
|------|------|
| pending | Waiting to run |
| running | In progress |
| completed | Finished successfully |
| failed | Failed |
#### Example call
```bash
curl http://localhost:8000/jobs/a1b2c3d4e5f6
```
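Or poll from Python until the job reaches a terminal state - a minimal sketch using the requests package:
```python
import time

import requests

def wait_for_job(job_id: str, interval: float = 1.0, timeout: float = 60.0) -> dict:
    """Poll GET /jobs/{job_id} until the job completes or fails."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        job = requests.get(f"http://localhost:8000/jobs/{job_id}", timeout=10).json()
        if job["status"] in ("completed", "failed"):
            return job
        time.sleep(interval)
    raise TimeoutError(f"job {job_id} did not finish within {timeout}s")
```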
---
### Webhook callbacks
When a job finishes and a webhook URL was provided, the system sends a POST request to that URL.
**Callback request**
```http
POST {webhook_url}
Content-Type: application/json
```
**Callback payload**
```json
{
"job_id": "a1b2c3d4e5f6",
"status": "completed",
"algorithm": "PrimeChecker",
"result": {"number": 17, "is_prime": true},
"error": null,
"metadata": {"elapsed_time": 0.001},
"completed_at": "2026-02-02T10:00:02+00:00"
}
```
**Retry policy**
- Max retries: 3
- Retry intervals: 1s, 5s, 15s (exponential backoff)
- Timeout: 10 seconds
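A minimal sketch of this delivery loop, assuming httpx and the policy above (the real implementation also records `webhook_deliveries_total`):
```python
import asyncio

import httpx

RETRY_DELAYS = [1, 5, 15]  # seconds to wait before each retry, per the policy above

async def deliver_webhook(url: str, payload: dict) -> bool:
    """POST the job payload; on failure retry up to 3 times."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        for attempt in range(1 + len(RETRY_DELAYS)):
            try:
                response = await client.post(url, json=payload)
                response.raise_for_status()
                return True  # delivered
            except httpx.HTTPError:
                if attempt == len(RETRY_DELAYS):
                    return False  # give up after the final retry
                await asyncio.sleep(RETRY_DELAYS[attempt])
    return False
```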
---
## Health check endpoints
### GET /healthz
Liveness endpoint, used for Kubernetes liveness probes.
#### Response
**Success (200 OK)**
```json
{
"status": "healthy",
"timestamp": 1706868000.123
}
```
#### Example call
```bash
curl http://localhost:8000/healthz
```
---
### GET /readyz
Readiness endpoint, used for Kubernetes readiness probes.
#### Response
**Success (200 OK)**
```json
{
"status": "ready",
"timestamp": 1706868000.123,
"checks": {
"algorithm": true
}
}
```
#### Example call
```bash
curl http://localhost:8000/readyz
```
---
## Monitoring endpoint
### GET /metrics
Returns monitoring metrics in Prometheus text format.
#### Response
**Success (200 OK)**
```
Content-Type: text/plain; version=0.0.4; charset=utf-8
# HELP http_requests_total Total HTTP requests
# TYPE http_requests_total counter
http_requests_total{endpoint="/invoke",method="POST",status="success"} 42
# HELP http_request_duration_seconds HTTP request latency
# TYPE http_request_duration_seconds histogram
http_request_duration_seconds_bucket{endpoint="/invoke",method="POST",le="0.01"} 35
...
# HELP algorithm_executions_total Total algorithm executions
# TYPE algorithm_executions_total counter
algorithm_executions_total{algorithm="PrimeChecker",status="success"} 42
# HELP jobs_created_total Total async jobs created
# TYPE jobs_created_total counter
jobs_created_total{algorithm="PrimeChecker"} 10
# HELP jobs_completed_total Total async jobs completed
# TYPE jobs_completed_total counter
jobs_completed_total{algorithm="PrimeChecker",status="completed"} 8
jobs_completed_total{algorithm="PrimeChecker",status="failed"} 2
```
#### Available metrics
| Metric | Type | Labels | Description |
|---------|------|------|------|
| http_requests_total | counter | method, endpoint, status | Total HTTP requests |
| http_request_duration_seconds | histogram | method, endpoint | HTTP request latency |
| http_requests_in_progress | gauge | - | Requests currently in flight |
| algorithm_executions_total | counter | algorithm, status | Total algorithm executions |
| algorithm_execution_duration_seconds | histogram | algorithm | Algorithm execution latency |
| jobs_created_total | counter | algorithm | Total async jobs created |
| jobs_completed_total | counter | algorithm, status | Total async jobs completed |
| job_execution_duration_seconds | histogram | algorithm | Async job execution time |
| webhook_deliveries_total | counter | status | Total webhook deliveries |
#### Example call
```bash
curl http://localhost:8000/metrics
```
---
## Error codes
| Error code | HTTP status | Description |
|--------|------------|------|
| VALIDATION_ERROR | 400 | Request parameter validation failed |
| ALGORITHM_NOT_FOUND | 404 | The requested algorithm does not exist |
| JOB_NOT_FOUND | 404 | Job not found or expired |
| ALGORITHM_ERROR | 500 | Algorithm execution error |
| INTERNAL_ERROR | 500 | Internal server error |
| SERVICE_UNAVAILABLE | 503 | Service temporarily unavailable |
---
## Interactive docs
Once the service is running, interactive API docs are available at:
- **Swagger UI**: http://localhost:8000/docs
- **ReDoc**: http://localhost:8000/redoc
- **OpenAPI JSON**: http://localhost:8000/openapi.json

docs/api/README.md (new file)

@@ -0,0 +1,58 @@
# API Docs
This directory contains API documentation and the auto-generated OpenAPI spec.
## Files
- `openapi.json` - auto-generated OpenAPI 3.0 spec
## Generating the spec
Run the following command to refresh the OpenAPI spec:
```bash
python scripts/export_openapi.py
```
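The shipped script is not reproduced here; a minimal sketch of what such an exporter typically does, assuming the FastAPI app is importable as `src.functional_scaffold.main:app` (the path used by the uvicorn command in the Getting Started guide):
```python
# Illustrative sketch of scripts/export_openapi.py, not the shipped script
import json
from pathlib import Path

from src.functional_scaffold.main import app  # FastAPI application

def main() -> None:
    """Dump the app's OpenAPI schema to docs/api/openapi.json."""
    spec = app.openapi()
    out = Path("docs/api/openapi.json")
    out.write_text(json.dumps(spec, ensure_ascii=False, indent=2))
    print(f"Wrote {out}")

if __name__ == "__main__":
    main()
```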
## Interactive docs
Start the app, then open one of these URLs for interactive documentation:
| Doc type | URL |
|---------|------|
| Swagger UI | http://localhost:8000/docs |
| ReDoc | http://localhost:8000/redoc |
| OpenAPI JSON | http://localhost:8000/openapi.json |
## API endpoints
### Core
| Method | Endpoint | Description |
|------|------|------|
| POST | `/invoke` | Invoke an algorithm synchronously |
| POST | `/jobs` | Create an async job |
| GET | `/jobs/{job_id}` | Query job status |
### Health checks
| Method | Endpoint | Description |
|------|------|------|
| GET | `/healthz` | Liveness check |
| GET | `/readyz` | Readiness check |
### Monitoring
| Method | Endpoint | Description |
|------|------|------|
| GET | `/metrics` | Prometheus metrics |
## Full reference
For the complete API documentation, see the [API Reference](../api-reference.md).
## Notes
- `openapi.json` is auto-generated; do not edit it by hand
- Re-run the export script after any API change
- Make sure the Pydantic models carry complete docstrings and examples


@@ -135,15 +135,148 @@
"tags": [ "tags": [
"Algorithm" "Algorithm"
], ],
"summary": "异步任务接口(预留)", "summary": "创建异步任务",
"description": "异步任务接口,当前版本未实现", "description": "创建异步任务,立即返回任务 ID任务在后台执行",
"operationId": "create_job_jobs_post", "operationId": "create_job_jobs_post",
"responses": { "parameters": [
"501": { {
"description": "Successful Response", "name": "x-request-id",
"in": "header",
"required": false,
"schema": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "X-Request-Id"
}
}
],
"requestBody": {
"required": true,
"content": { "content": {
"application/json": { "application/json": {
"schema": {} "schema": {
"$ref": "#/components/schemas/JobRequest"
}
}
}
},
"responses": {
"202": {
"description": "任务已创建",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/JobCreateResponse"
}
}
}
},
"400": {
"description": "请求参数错误",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"404": {
"description": "算法不存在",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"503": {
"description": "服务不可用",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/jobs/{job_id}": {
"get": {
"tags": [
"Algorithm"
],
"summary": "查询任务状态",
"description": "查询异步任务的执行状态和结果",
"operationId": "get_job_status_jobs__job_id__get",
"parameters": [
{
"name": "job_id",
"in": "path",
"required": true,
"schema": {
"type": "string",
"title": "Job Id"
}
}
],
"responses": {
"200": {
"description": "成功",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/JobStatusResponse"
}
}
}
},
"404": {
"description": "任务不存在或已过期",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"503": {
"description": "服务不可用",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
@@ -330,6 +463,205 @@
"status": "success" "status": "success"
} }
}, },
"JobCreateResponse": {
"properties": {
"job_id": {
"type": "string",
"title": "Job Id",
"description": "任务唯一标识"
},
"status": {
"$ref": "#/components/schemas/JobStatus",
"description": "任务状态"
},
"message": {
"type": "string",
"title": "Message",
"description": "状态消息"
},
"created_at": {
"type": "string",
"title": "Created At",
"description": "创建时间ISO 8601"
}
},
"type": "object",
"required": [
"job_id",
"status",
"message",
"created_at"
],
"title": "JobCreateResponse",
"description": "任务创建响应",
"example": {
"created_at": "2026-02-02T10:00:00Z",
"job_id": "a1b2c3d4e5f6",
"message": "任务已创建",
"status": "pending"
}
},
"JobRequest": {
"properties": {
"algorithm": {
"type": "string",
"title": "Algorithm",
"description": "算法名称"
},
"params": {
"additionalProperties": true,
"type": "object",
"title": "Params",
"description": "算法参数"
},
"webhook": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Webhook",
"description": "回调 URL"
}
},
"type": "object",
"required": [
"algorithm",
"params"
],
"title": "JobRequest",
"description": "异步任务请求",
"example": {
"algorithm": "PrimeChecker",
"params": {
"number": 17
},
"webhook": "https://example.com/callback"
}
},
"JobStatus": {
"type": "string",
"enum": [
"pending",
"running",
"completed",
"failed"
],
"title": "JobStatus",
"description": "任务状态枚举"
},
"JobStatusResponse": {
"properties": {
"job_id": {
"type": "string",
"title": "Job Id",
"description": "任务唯一标识"
},
"status": {
"$ref": "#/components/schemas/JobStatus",
"description": "任务状态"
},
"algorithm": {
"type": "string",
"title": "Algorithm",
"description": "算法名称"
},
"created_at": {
"type": "string",
"title": "Created At",
"description": "创建时间ISO 8601"
},
"started_at": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Started At",
"description": "开始执行时间ISO 8601"
},
"completed_at": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Completed At",
"description": "完成时间ISO 8601"
},
"result": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
],
"title": "Result",
"description": "执行结果(仅完成时返回)"
},
"error": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Error",
"description": "错误信息(仅失败时返回)"
},
"metadata": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
],
"title": "Metadata",
"description": "元数据信息"
}
},
"type": "object",
"required": [
"job_id",
"status",
"algorithm",
"created_at"
],
"title": "JobStatusResponse",
"description": "任务状态查询响应",
"example": {
"algorithm": "PrimeChecker",
"completed_at": "2026-02-02T10:00:02Z",
"created_at": "2026-02-02T10:00:00Z",
"job_id": "a1b2c3d4e5f6",
"metadata": {
"elapsed_time": 0.001
},
"result": {
"is_prime": true,
"number": 17
},
"started_at": "2026-02-02T10:00:01Z",
"status": "completed"
}
},
"ReadinessResponse": { "ReadinessResponse": {
"properties": { "properties": {
"status": { "status": {

docs/getting-started.md (new file)

@@ -0,0 +1,249 @@
# Getting Started
This guide helps algorithm developers get up to speed with the FunctionalScaffold scaffold and ship their first algorithm service in about 10 minutes.
## Core idea
**Algorithm developers only need to write the core algorithm logic.** The framework automatically handles:
- HTTP endpoint wiring
- Parameter validation
- Error handling
- Logging
- Performance metrics
- Health checks
- Containerized deployment
## Environment setup
### 1. Install dependencies
```bash
# Clone the project
git clone <repository-url>
cd FunctionalScaffold

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install dependencies
pip install -e ".[dev]"
```
### 2. Start the service
```bash
# Development mode (auto-reload)
uvicorn src.functional_scaffold.main:app --reload --port 8000
```
### 3. Verify it works
```bash
# Health check
curl http://localhost:8000/healthz

# Call the sample algorithm
curl -X POST http://localhost:8000/invoke \
-H "Content-Type: application/json" \
-d '{"number": 17}'
```
## Adding your first algorithm
### Step 1: Create the algorithm class
Create a new file under `src/functional_scaffold/algorithms/`:
```python
# src/functional_scaffold/algorithms/my_algorithm.py
from typing import Dict, Any
from .base import BaseAlgorithm

class MyAlgorithm(BaseAlgorithm):
    """My algorithm."""

    def process(self, input_data: Any) -> Dict[str, Any]:
        """
        Algorithm logic - the only method you need to implement!

        Args:
            input_data: input data

        Returns:
            Dict[str, Any]: the result
        """
        # Implement your algorithm logic here
        result = self._do_calculation(input_data)
        return {
            "input": input_data,
            "output": result,
            "message": "Processed successfully"
        }

    def _do_calculation(self, data):
        """Internal computation."""
        # Your algorithm implementation
        return data * 2
```
### Step 2: Register the algorithm
Export it from `src/functional_scaffold/algorithms/__init__.py`:
```python
from .base import BaseAlgorithm
from .prime_checker import PrimeChecker
from .my_algorithm import MyAlgorithm  # add this line

__all__ = ["BaseAlgorithm", "PrimeChecker", "MyAlgorithm"]  # add it to the list
```
### Step 3: Add an API endpoint
Add a route in `src/functional_scaffold/api/routes.py`:
```python
from ..algorithms.my_algorithm import MyAlgorithm

@router.post("/my-endpoint")
async def my_endpoint(
    request: MyRequest,  # request model defined in the next step
    request_id: str = Depends(get_request_id)
):
    """My algorithm endpoint."""
    algorithm = MyAlgorithm()
    result = algorithm.execute(request.data)
    if not result["success"]:
        raise HTTPException(status_code=500, detail=result["error"])
    return {
        "request_id": request_id,
        "status": "success",
        "result": result["result"],
        "metadata": result["metadata"]
    }
```
### Step 4: Define the request model
Add it in `src/functional_scaffold/api/models.py`:
```python
class MyRequest(BaseModel):
    """My request model."""
    model_config = ConfigDict(
        json_schema_extra={
            "example": {"data": 10}
        }
    )
    data: int = Field(..., description="Input data")
```
### Step 5: Test it
```bash
curl -X POST http://localhost:8000/my-endpoint \
-H "Content-Type: application/json" \
-d '{"data": 10}'
```
## Using async jobs
For long-running algorithms, use the async job endpoints:
### Create a job
```bash
curl -X POST http://localhost:8000/jobs \
-H "Content-Type: application/json" \
-d '{
"algorithm": "MyAlgorithm",
"params": {"data": 10},
"webhook": "https://your-callback-url.com/notify"
}'
```
Response:
```json
{
"job_id": "a1b2c3d4e5f6",
"status": "pending",
"message": "任务已创建",
"created_at": "2026-02-02T10:00:00Z"
}
```
### Query job status
```bash
curl http://localhost:8000/jobs/a1b2c3d4e5f6
```
Response:
```json
{
"job_id": "a1b2c3d4e5f6",
"status": "completed",
"algorithm": "MyAlgorithm",
"result": {"input": 10, "output": 20},
"metadata": {"elapsed_time": 0.001}
}
```
## Local development tips
### Browse the API docs
With the service running, open:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
### Run the tests
```bash
# Run all tests
pytest tests/ -v

# Run a single test file
pytest tests/test_algorithms.py -v
```
### Format the code
```bash
# Format
black src/ tests/

# Lint
ruff check src/ tests/
```
## Next steps
- [Algorithm Development Guide](./algorithm-development.md) - dive deeper into algorithm development
- [API Reference](./api-reference.md) - complete API documentation
- [Monitoring Guide](./monitoring.md) - monitoring and alerting
- [Deployment Guide](./deployment.md) - production deployment
## FAQ
### Q: How do I handle exceptions in my algorithm?
A: Just raise the exception inside `process()`; the framework catches it and returns a standard error response.
### Q: How do I add custom metrics?
A: Define the metric in `config/metrics.yaml`, then record it in code with `incr()` or `observe()`.
### Q: How do I access external services (databases, OSS, ...)?
A: Add a setting in `config.py` and inject the connection details via environment variables. See the sketch below.
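For example, a minimal sketch of such a setting, assuming `config.py` is built on pydantic-settings (the `external_api_url` field appears in the development guide's DataFetcher example):
```python
# Illustrative sketch of src/functional_scaffold/config.py
from typing import Optional

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """App settings, populated from environment variables."""
    external_api_url: Optional[str] = None  # set via EXTERNAL_API_URL

settings = Settings()
```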
### Q: What if my algorithm needs to load a large model file?
A: Load the model in the algorithm class's `__init__`; the framework initializes it when the container starts.


@@ -1,237 +0,0 @@
# Grafana Dashboard Import and Usage Guide
## Dashboard overview
The new dashboard has 10 panels covering the application's monitoring metrics:
### Row 1: Core performance
1. **HTTP request rate (QPS)** - requests per second, grouped by endpoint and method
2. **HTTP request latency (P50/P95/P99)** - response-time percentiles
### Row 2: Key indicators
3. **Request success rate** - share of successful requests (gauge)
4. **Current concurrent requests** - live concurrency (gauge)
5. **Total HTTP requests** - cumulative request count (stat)
6. **Total algorithm executions** - cumulative algorithm calls (stat)
### Row 3: Algorithm performance
7. **Algorithm execution rate** - executions per second
8. **Algorithm execution latency (P50/P95/P99)** - execution-time percentiles
### Row 4: Distributions
9. **Requests by endpoint** - pie chart of request share per endpoint
10. **Request status distribution** - pie chart of success/failure share
## Import steps
### 1. Configure the Prometheus data source
First make sure the Prometheus data source is set up correctly:
1. Open Grafana: http://localhost:3000
2. Log in (default: admin/admin)
3. Go to **Configuration** → **Data Sources**
4. Click **Add data source**
5. Select **Prometheus**
6. Configure:
- **Name**: `Prometheus` (must be exactly this name)
- **URL**: `http://prometheus:9090` (note: use the service name, not localhost)
- **Access**: Server (default)
7. Click **Save & Test** and confirm the green success message
### 2. Import the dashboard
There are two ways to import it:
#### Option 1: Import the JSON file (recommended)
1. In Grafana's left menu, click **Dashboards** → **Import**
2. Click **Upload JSON file**
3. Select the file: `monitoring/grafana/dashboard.json`
4. On the import page:
- **Name**: FunctionalScaffold monitoring dashboard
- **Folder**: General (or create a new folder)
- **Prometheus**: pick the data source you just configured
5. Click **Import**
#### Option 2: Paste the JSON
1. In Grafana's left menu, click **Dashboards** → **Import**
2. Copy the full contents of `monitoring/grafana/dashboard.json`
3. Paste into the **Import via panel json** box
4. Click **Load**
5. Pick the data source and click **Import**
### 3. Verify the dashboard
After a successful import you should see:
- ✅ All panels render normally
- ✅ Panels with data show charts and values
- ✅ Auto-refresh (5s) shown in the top-right corner
- ✅ A default time range of the last 1 hour
## Generating test data
If the dashboard shows little or no data, run the traffic generator:
```bash
# Start the traffic generator
./scripts/generate_traffic.sh
```
This keeps sending requests to the app to produce monitoring data. After 1-2 minutes the dashboard should fill with charts.
## Dashboard features
### Auto-refresh
The dashboard refreshes automatically, every 5 seconds by default. You can change the interval in the top-right corner:
- 5s (default)
- 10s
- 30s
- 1m
- 5m
### Time range
The last 1 hour is shown by default. You can change the range in the top-right corner:
- Last 5 minutes
- Last 15 minutes
- Last 30 minutes
- Last 1 hour (default)
- Last 3 hours
- Last 6 hours
- Last 12 hours
- Last 24 hours
- or a custom range
### Live mode
The dashboard has **Live** mode enabled (the Live button in the top-right corner) for viewing the latest data in real time.
### Interactions
- **Zoom**: drag-select a region on a time-series chart to zoom in
- **Legend clicks**: click a legend entry to hide/show that series
- **Tooltips**: hover over a chart for exact values
- **Full screen**: click the icon next to a panel title to expand it
## Troubleshooting
### Issue 1: Data source connection fails
**Error**: `dial tcp [::1]:9090: connect: connection refused`
**Fix**:
- Make sure the Prometheus URL is `http://prometheus:9090` (the service name)
- Do not use `http://localhost:9090` (unreachable from inside the container)
### Issue 2: Panels show "No data"
**Possible causes**:
1. The app has not received any requests yet
2. Prometheus has not scraped any data yet
3. The selected time range is wrong
**Fix**:
1. Send some test requests:
```bash
curl -X POST http://localhost:8111/invoke \
-H "Content-Type: application/json" \
-d '{"number": 17}'
```
2. Wait 15-30 seconds for Prometheus to scrape
3. Switch the time range to "Last 5 minutes"
4. Run the traffic generator: `./scripts/generate_traffic.sh`
### Issue 3: Latency charts show "NaN" or nothing
**Cause**: not enough histogram data to compute percentiles
**Fix**:
- Send more requests to accumulate data
- Wait a few minutes for data to build up
- Keep traffic flowing with the generator script
### Issue 4: Data source variable not set correctly
**Error**: panels show "Datasource not found"
**Fix**:
1. Make sure the Prometheus data source is named `Prometheus`
2. Or reselect the data source in the dashboard settings:
- Click the gear icon in the top-right corner (Dashboard settings)
- Open the **Variables** tab
- Edit the `DS_PROMETHEUS` variable
- Pick the correct Prometheus data source
## PromQL queries
The dashboard's main PromQL queries:
### HTTP request rate
```promql
sum(rate(http_requests_total[1m])) by (endpoint, method)
```
### HTTP request latency (P95)
```promql
histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[1m])) by (le, endpoint, method))
```
### Request success rate
```promql
sum(rate(http_requests_total{status="success"}[5m])) / sum(rate(http_requests_total[5m]))
```
### Algorithm execution rate
```promql
sum(rate(algorithm_executions_total[1m])) by (algorithm, status)
```
## Customizing the dashboard
You can adapt the dashboard to your needs:
1. **Add a panel**: click "Add panel" in the top-right corner
2. **Edit a panel**: click the panel title → Edit
3. **Adjust the layout**: drag panels to resize and reposition
4. **Save changes**: click the save icon in the top-right corner
## Exporting and sharing
### Export the dashboard
1. Click the share icon in the top-right corner
2. Open the **Export** tab
3. Click **Save to file** to download the JSON
### Share the dashboard
1. Click the share icon in the top-right corner
2. Open the **Link** tab
3. Copy the link and share it with your team
## Alerting (optional)
You can add alert rules to panels:
1. Edit the panel
2. Switch to the **Alert** tab
3. Click **Create alert rule from this panel**
4. Configure the alert condition and notification channel
## Resources
- Grafana docs: https://grafana.com/docs/
- PromQL basics: https://prometheus.io/docs/prometheus/latest/querying/basics/
- Dashboard best practices: https://grafana.com/docs/grafana/latest/best-practices/
## Support
If you run into problems:
1. Check that Prometheus is up: http://localhost:9090
2. Check the app's metrics endpoint: http://localhost:8111/metrics
3. Check the Grafana logs: `docker-compose logs grafana`
4. Check the Prometheus logs: `docker-compose logs prometheus`


@@ -1,346 +0,0 @@
# Metrics Recording: Option Comparison and Usage Guide
## Background
In multi-instance deployments (Kubernetes, serverless), the original in-memory metrics storage has these problems:
1. **Scattered metrics**: each instance records its own metrics; they cannot be aggregated
2. **Data loss**: metrics vanish when an instance is destroyed
3. **Inaccurate stats**: no globally accurate view of the metrics
## Option comparison
### Option 1: Pushgateway (recommended)
**How it works:** the app pushes metrics to Pushgateway; Prometheus scrapes Pushgateway.
**Pros:**
- ✅ Officially supported by Prometheus, mature ecosystem
- ✅ Simple to implement, small code change
- ✅ Suits short-lived workloads (serverless, batch jobs)
- ✅ Supports persistence; survives restarts
**Cons:**
- ⚠️ Single point of failure (mitigable with an HA deployment)
- ⚠️ Not suited to extremely high-frequency pushes (thousands per second)
**Good for:**
- Serverless functions
- Batch jobs
- Short-lived containers
- Dynamically scaling instance counts
### Option 2: Redis + custom exporter
**How it works:** the app writes metrics to Redis; a custom exporter reads them from Redis and exposes them in Prometheus format.
**Pros:**
- ✅ Flexible; supports custom aggregation logic
- ✅ Redis is fast and handles highly concurrent writes
- ✅ Allows custom metric computations
**Cons:**
- ⚠️ You must build and maintain the exporter yourself
- ⚠️ Adds system complexity
- ⚠️ Redis adds operational cost
**Good for:**
- Custom metric aggregation logic
- Extremely high write rates (tens of thousands per second)
- Real-time metric queries
### Option 3: Standard Prometheus pull (not recommended here)
**How it works:** Prometheus scrapes each instance and aggregates at query time.
**Pros:**
- ✅ The standard Prometheus approach
- ✅ No extra components
**Cons:**
- ❌ Needs service discovery (Kubernetes Service Discovery)
- ❌ Short-lived instances may die before being scraped
- ❌ Data is lost when an instance is destroyed
**Good for:**
- Long-lived services
- Relatively fixed instance counts
- Environments with solid service discovery
## Usage guide
### Option 1: Pushgateway (recommended)
#### 1. Start the services
```bash
cd deployment
docker-compose up -d redis pushgateway prometheus grafana
```
#### 2. Update the code
In `src/functional_scaffold/api/routes.py`:
```python
# Swap the import
from functional_scaffold.core.metrics_pushgateway import (
    track_request,
    track_algorithm_execution,
)

# Usage stays the same
@router.post("/invoke")
@track_request("POST", "/invoke")
async def invoke_algorithm(request: InvokeRequest):
    # ... business logic
```
#### 3. Set environment variables
In the `.env` file:
```bash
PUSHGATEWAY_URL=localhost:9091
METRICS_JOB_NAME=functional_scaffold
INSTANCE_ID=instance-1  # optional, defaults to HOSTNAME
```
#### 4. Verify
```bash
# Inspect the Pushgateway metrics
curl http://localhost:9091/metrics

# Open Prometheus
open http://localhost:9090

# Example query
http_requests_total{job="functional_scaffold"}
```
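For intuition, a minimal sketch of what a push looks like with the official client, assuming `prometheus-client` is installed; the project's `metrics_pushgateway` module presumably wraps something similar:
```python
import os

from prometheus_client import CollectorRegistry, Counter, push_to_gateway

registry = CollectorRegistry()
requests_total = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
    registry=registry,
)

def record_and_push(method: str, endpoint: str, status: str) -> None:
    """Count one request, then push the registry to Pushgateway."""
    requests_total.labels(method=method, endpoint=endpoint, status=status).inc()
    push_to_gateway(
        os.environ.get("PUSHGATEWAY_URL", "localhost:9091"),
        job=os.environ.get("METRICS_JOB_NAME", "functional_scaffold"),
        registry=registry,
    )
```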
### Option 2: Redis + exporter
#### 1. Start the services
```bash
cd deployment
docker-compose up -d redis redis-exporter prometheus grafana
```
#### 2. Update the code
In `src/functional_scaffold/api/routes.py`:
```python
# Swap the import
from functional_scaffold.core.metrics_redis import (
    track_request,
    track_algorithm_execution,
)

# Usage stays the same
@router.post("/invoke")
@track_request("POST", "/invoke")
async def invoke_algorithm(request: InvokeRequest):
    # ... business logic
```
#### 3. Set environment variables
In the `.env` file:
```bash
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_METRICS_DB=0
REDIS_PASSWORD=  # optional
INSTANCE_ID=instance-1  # optional
```
#### 4. Install the Redis dependency
```bash
pip install redis
```
Or add it to `requirements.txt`:
```
redis>=5.0.0
```
#### 5. Verify
```bash
# Inspect the metrics stored in Redis
redis-cli
> HGETALL metrics:request_counter

# Inspect the exporter output
curl http://localhost:8001/metrics

# Open Prometheus
open http://localhost:9090
```
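A minimal sketch of the write side, assuming the redis-py client and the `metrics:request_counter` hash shown above; the label encoding is a hypothetical choice for illustration:
```python
import redis

# Connect using the same settings as the .env example above
r = redis.Redis(host="localhost", port=6379, db=0)

def incr_request_counter(method: str, endpoint: str, status: str) -> None:
    """Atomically bump one labeled counter field in a shared Redis hash."""
    field = f"{method}|{endpoint}|{status}"  # hypothetical label encoding
    r.hincrby("metrics:request_counter", field, 1)
```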
## Performance comparison
| Aspect | Pushgateway | Redis + Exporter | Standard pull |
|------|-------------|------------------|-----------|
| Write latency | ~5ms | ~1ms | N/A |
| Query latency | ~10ms | ~20ms | ~5ms |
| Throughput | ~1000 req/s | ~10000 req/s | ~500 req/s |
| Memory use | Low | Medium | Low |
| Complexity | Low | High | Low |
## Migration steps
### From the original setup to Pushgateway
1. **Install the dependency** (if needed):
```bash
pip install prometheus-client
```
2. **Swap the import**:
```python
# Old code
from functional_scaffold.core.metrics import track_request

# New code
from functional_scaffold.core.metrics_pushgateway import track_request
```
3. **Set the environment variable**:
```bash
export PUSHGATEWAY_URL=localhost:9091
```
4. **Start Pushgateway**:
```bash
docker-compose up -d pushgateway
```
5. **Update the Prometheus config** (already included in `monitoring/prometheus.yml`)
6. **Verify**:
```bash
# Send a request
curl -X POST http://localhost:8000/invoke -d '{"number": 17}'

# Check the metrics
curl http://localhost:9091/metrics | grep http_requests_total
```
### From the original setup to Redis
1. **Install the dependency**:
```bash
pip install redis
```
2. **Swap the import**:
```python
# Old code
from functional_scaffold.core.metrics import track_request

# New code
from functional_scaffold.core.metrics_redis import track_request
```
3. **Set the environment variables**:
```bash
export REDIS_HOST=localhost
export REDIS_PORT=6379
```
4. **Start Redis and the exporter**:
```bash
docker-compose up -d redis redis-exporter
```
5. **Verify**:
```bash
# Send a request
curl -X POST http://localhost:8000/invoke -d '{"number": 17}'

# Check Redis
redis-cli HGETALL metrics:request_counter

# Check the exporter
curl http://localhost:8001/metrics
```
## FAQ
### Q1: Is Pushgateway a single point of failure?
A: It can be mitigated by:
- Running multiple Pushgateway instances behind a load balancer
- Using persistent storage (already configured)
- Falling back to local logs when a push fails
### Q2: How does the Redis option perform?
A: A single Redis instance supports 100k+ QPS, enough for most workloads. For more:
- Use Redis Cluster
- Batch writes (fewer network round trips)
- Use pipelines
### Q3: How do I use this in Kubernetes?
A:
- **Pushgateway**: deploy it as a Service; apps reach it by service name
- **Redis**: use a StatefulSet or a managed Redis service
### Q4: Can metric data be lost?
A:
- **Pushgateway**: persisted; survives restarts
- **Redis**: AOF persistence is configured; survives restarts
- **Standard pull**: lost when the instance is destroyed
### Q5: How do I choose?
Suggestions:
- **Serverless / short-lived** → Pushgateway
- **Very high concurrency / custom logic** → Redis
- **Long-lived / K8s** → standard pull (with service discovery)
## Monitoring and alerting
### Grafana dashboard
Open http://localhost:3000 (admin/admin).
Preconfigured panels:
- Total HTTP requests
- HTTP request latency (P50/P95/P99)
- Algorithm execution count
- Algorithm execution latency
- Error rate
### Alert rules
Configured in `monitoring/alerts/rules.yaml`:
```yaml
groups:
  - name: functional_scaffold
    rules:
      - alert: HighErrorRate
        expr: rate(http_requests_total{status="error"}[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate"
          description: "Error rate above 5%"
```
## References
- [Prometheus Pushgateway docs](https://github.com/prometheus/pushgateway)
- [Prometheus best practices](https://prometheus.io/docs/practices/)
- [Redis documentation](https://redis.io/documentation)


@@ -1,227 +0,0 @@
# Prometheus Metrics Recording: Fix Summary
## Problem
Prometheus was not recording the application's traffic. The `/metrics` endpoint was reachable and every metric type was defined, but none of the metrics had any values.
## Root cause
1. **HTTP request metrics were never recorded**: the route handlers in `api/routes.py` did not use the `@track_request` decorator
2. **Algorithm execution metrics were never recorded**: the `execute()` method in `algorithms/base.py` never called the metrics module
## Fix
### 1. Add an HTTP request metrics middleware
**File**: `src/functional_scaffold/main.py`
**Changes**:
- Import the metrics objects: `request_counter`, `request_latency`, `in_progress_requests`
- Add a `track_metrics` middleware that tracks all HTTP requests automatically
**Why this approach**:
- Automatic: no need to decorate every route by hand
- Uniform: every endpoint records metrics the same way
- Maintainable: new endpoints get metric tracking for free
**Implementation**:
```python
@app.middleware("http")
async def track_metrics(request: Request, call_next):
"""记录所有HTTP请求的指标"""
if not settings.metrics_enabled:
return await call_next(request)
# 跳过 /metrics 端点本身,避免循环记录
if request.url.path == "/metrics":
return await call_next(request)
in_progress_requests.inc()
start_time = time.time()
status = "success"
try:
response = await call_next(request)
if response.status_code >= 400:
status = "error"
return response
except Exception as e:
status = "error"
raise e
finally:
elapsed = time.time() - start_time
request_counter.labels(
method=request.method,
endpoint=request.url.path,
status=status
).inc()
request_latency.labels(
method=request.method,
endpoint=request.url.path
).observe(elapsed)
in_progress_requests.dec()
```
### 2. Record algorithm execution metrics
**File**: `src/functional_scaffold/algorithms/base.py`
**Changes**:
- Import `algorithm_counter` and `algorithm_latency` inside `execute()`
- Record the metrics in the `finally` block
**Implementation**:
```python
def execute(self, *args, **kwargs) -> Dict[str, Any]:
    from ..core.metrics import algorithm_counter, algorithm_latency
    start_time = time.time()
    status = "success"
    try:
        # ... algorithm execution logic ...
    except Exception as e:
        status = "error"
        # ... error handling ...
    finally:
        elapsed_time = time.time() - start_time
        algorithm_counter.labels(algorithm=self.name, status=status).inc()
        algorithm_latency.labels(algorithm=self.name).observe(elapsed_time)
```
## Verification
### 1. The app's /metrics endpoint
After the fix, `/metrics` returns real metric data:
```
# HTTP request metrics
http_requests_total{endpoint="/healthz",method="GET",status="success"} 3.0
http_requests_total{endpoint="/invoke",method="POST",status="success"} 2.0
http_requests_total{endpoint="/readyz",method="GET",status="success"} 1.0

# HTTP request latency
http_request_duration_seconds_sum{endpoint="/invoke",method="POST"} 0.0065615177154541016
http_request_duration_seconds_count{endpoint="/invoke",method="POST"} 2.0

# Algorithm execution metrics
algorithm_executions_total{algorithm="PrimeChecker",status="success"} 2.0
algorithm_execution_duration_seconds_sum{algorithm="PrimeChecker"} 0.00023603439331054688
algorithm_execution_duration_seconds_count{algorithm="PrimeChecker"} 2.0

# Requests currently in flight
http_requests_in_progress 0.0
```
### 2. Prometheus queries
Prometheus scraped and stored the metric data successfully:
```bash
# Query total HTTP requests
curl 'http://localhost:9090/api/v1/query?query=http_requests_total'

# Query total algorithm executions
curl 'http://localhost:9090/api/v1/query?query=algorithm_executions_total'
```
## Available metrics
After the fix, the following metrics are available in Prometheus and Grafana:
### HTTP request metrics
1. **http_requests_total** (Counter)
- Labels: `method`, `endpoint`, `status`
- Description: total HTTP requests
- Use: per-endpoint request volume and success rate
2. **http_request_duration_seconds** (Histogram)
- Labels: `method`, `endpoint`
- Description: HTTP request latency distribution
- Use: response-time analysis, P50/P95/P99 latency
3. **http_requests_in_progress** (Gauge)
- Description: requests currently being handled
- Use: concurrency and load monitoring
### Algorithm execution metrics
1. **algorithm_executions_total** (Counter)
- Labels: `algorithm`, `status`
- Description: total algorithm executions
- Use: call volume and success rate
2. **algorithm_execution_duration_seconds** (Histogram)
- Labels: `algorithm`
- Description: algorithm execution latency distribution
- Use: performance analysis and bottleneck hunting
## Usage examples
### Prometheus queries
```promql
# Requests per second (QPS)
rate(http_requests_total[5m])

# Request success rate
sum(rate(http_requests_total{status="success"}[5m])) / sum(rate(http_requests_total[5m]))

# P95 latency
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))

# Algorithm failure rate
sum(rate(algorithm_executions_total{status="error"}[5m])) / sum(rate(algorithm_executions_total[5m]))
```
### Generating test traffic
Use the bundled script:
```bash
# Start the traffic generator
./scripts/generate_traffic.sh

# Watch the live metrics in another terminal
watch -n 1 'curl -s http://localhost:8111/metrics | grep http_requests_total'
```
## Grafana dashboard
View the metrics in Grafana:
1. Open http://localhost:3000 in a browser
2. Log in (default username/password: admin/admin)
3. Import the dashboard: `monitoring/grafana/dashboard.json`
The dashboard includes these panels:
- Request rate (QPS)
- Request latency (P50/P95/P99)
- Error rate
- Algorithm execution stats
- Concurrent request count
## Notes
1. **Middleware order**: register the metrics middleware after the logging middleware so every request is recorded
2. **/metrics endpoint**: the middleware skips `/metrics` itself to avoid self-recording
3. **Error status**: HTTP status codes >= 400 are labeled `status="error"`
4. **Overhead**: recording metrics costs microseconds and does not affect application performance
## Follow-up ideas
1. **More dimensions**: add labels such as `user_id` or `region` for finer-grained analysis
2. **Custom metrics**: add business metrics as needed (cache hit rate, external API call counts, ...)
3. **Alert rules**: configure Prometheus alert rules to notify on anomalies
4. **Long-term storage**: consider Thanos or Cortex for long-term metric storage and querying
## Related files
- `src/functional_scaffold/main.py` - HTTP request metrics middleware
- `src/functional_scaffold/algorithms/base.py` - algorithm execution metrics
- `src/functional_scaffold/core/metrics.py` - metric definitions
- `monitoring/prometheus.yml` - Prometheus configuration
- `monitoring/grafana/dashboard.json` - Grafana dashboard
- `scripts/generate_traffic.sh` - traffic generator script

docs/monitoring.md (new file)

@@ -0,0 +1,337 @@
# Monitoring Guide
This document describes FunctionalScaffold's monitoring stack: metric collection, visualization, and alerting.
## Architecture
```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ App instance 1  │ │ App instance 2  │ │ App instance N  │
│ /metrics        │ │ /metrics        │ │ /metrics        │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
         │                   │                   │
         │      write metrics to Redis           │
         └───────────────────┼───────────────────┘
              ┌─────────────────────────┐
              │          Redis          │
              │ (aggregated metric store)│
              └────────────┬────────────┘
                           │ read and export
              ┌─────────────────────────┐
              │        Prometheus       │
              │    (scrapes /metrics)   │
              └────────────┬────────────┘
                           │ query
              ┌─────────────────────────┐
              │         Grafana         │
              │     (visualization)     │
              └─────────────────────────┘
```
## Quick start
### Start the monitoring services
```bash
cd deployment
docker-compose up -d redis prometheus grafana
```
### Addresses
| Service | URL | Default credentials |
|------|------|---------|
| App metrics | http://localhost:8000/metrics | - |
| Prometheus | http://localhost:9090 | - |
| Grafana | http://localhost:3000 | admin/admin |
## Metrics
### HTTP request metrics
| Metric | Type | Labels | Description |
|------|------|------|------|
| `http_requests_total` | Counter | method, endpoint, status | Total HTTP requests |
| `http_request_duration_seconds` | Histogram | method, endpoint | HTTP latency distribution |
| `http_requests_in_progress` | Gauge | - | Requests currently in flight |
### Algorithm execution metrics
| Metric | Type | Labels | Description |
|------|------|------|------|
| `algorithm_executions_total` | Counter | algorithm, status | Total algorithm executions |
| `algorithm_execution_duration_seconds` | Histogram | algorithm | Algorithm latency distribution |
### Async job metrics
| Metric | Type | Labels | Description |
|------|------|------|------|
| `jobs_created_total` | Counter | algorithm | Total jobs created |
| `jobs_completed_total` | Counter | algorithm, status | Total jobs completed |
| `job_execution_duration_seconds` | Histogram | algorithm | Job execution time distribution |
| `webhook_deliveries_total` | Counter | status | Total webhook deliveries |
## Prometheus query examples
### Basic queries
```promql
# Requests per second (QPS)
rate(http_requests_total[5m])

# QPS grouped by endpoint
sum(rate(http_requests_total[5m])) by (endpoint)

# Request success rate
sum(rate(http_requests_total{status="success"}[5m]))
/ sum(rate(http_requests_total[5m]))

# Current concurrent requests
http_requests_in_progress
```
### Latency analysis
```promql
# P50 latency
histogram_quantile(0.50, rate(http_request_duration_seconds_bucket[5m]))

# P95 latency
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))

# P99 latency
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))

# Mean latency
rate(http_request_duration_seconds_sum[5m])
/ rate(http_request_duration_seconds_count[5m])
```
### Algorithm analysis
```promql
# Algorithm execution rate
sum(rate(algorithm_executions_total[5m])) by (algorithm)

# Algorithm failure rate
sum(rate(algorithm_executions_total{status="error"}[5m]))
/ sum(rate(algorithm_executions_total[5m]))

# Algorithm P95 latency
histogram_quantile(0.95,
  sum(rate(algorithm_execution_duration_seconds_bucket[5m])) by (le, algorithm)
)
```
### Async job analysis
```promql
# Job creation rate
sum(rate(jobs_created_total[5m])) by (algorithm)

# Job success rate
sum(rate(jobs_completed_total{status="completed"}[5m]))
/ sum(rate(jobs_completed_total[5m]))

# Job backlog (creation rate - completion rate)
sum(rate(jobs_created_total[5m])) - sum(rate(jobs_completed_total[5m]))

# Webhook success rate
sum(rate(webhook_deliveries_total{status="success"}[5m]))
/ sum(rate(webhook_deliveries_total[5m]))
```
## Grafana dashboard
### Importing
1. Open Grafana: http://localhost:3000
2. Log in: admin/admin
3. Go to **Dashboards** → **Import**
4. Upload the file: `monitoring/grafana/dashboard.json`
5. Select the Prometheus data source
6. Click **Import**
### Panels
#### HTTP section
- **HTTP request rate (QPS)** - requests-per-second trend
- **HTTP request latency** - P50/P95/P99 trend
- **Request success rate** - success-rate gauge
- **Current concurrent requests** - live concurrency
- **Total HTTP requests** - cumulative count
- **Request distribution** - pie charts by endpoint/status
#### Algorithm section
- **Algorithm execution rate** - executions per second
- **Algorithm execution latency** - P50/P95/P99
- **Total algorithm executions** - cumulative count
#### Async job section
- **Jobs created** - cumulative created jobs
- **Jobs completed** - cumulative completed jobs
- **Jobs failed** - cumulative failed jobs
- **Job success rate** - success-rate gauge
- **Async job rate** - creation and completion rate trends
- **Async job latency** - P50/P95/P99
- **Job status distribution** - pie chart by status
- **Webhook delivery status** - success/failure split
## Alerting
### Alert rules
Alert rules live in `monitoring/alerts/rules.yaml`:
```yaml
groups:
  - name: functional_scaffold_alerts
    interval: 30s
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(http_requests_total{status="error"}[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Endpoint {{ $labels.endpoint }} error rate is {{ $value }} req/s"

      # High latency
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "Endpoint {{ $labels.endpoint }} P95 latency is {{ $value }}s"

      # Service down
      - alert: ServiceDown
        expr: up{job="functional-scaffold"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service down"
          description: "The FunctionalScaffold service has been down for over 1 minute"

      # High async job failure rate
      - alert: HighJobFailureRate
        expr: rate(jobs_completed_total{status="failed"}[5m]) / rate(jobs_completed_total[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High async job failure rate"
          description: "Async job failure rate for algorithm {{ $labels.algorithm }} exceeds 10%"

      # Job backlog
      - alert: JobBacklog
        expr: sum(rate(jobs_created_total[5m])) - sum(rate(jobs_completed_total[5m])) > 10
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Async job backlog"
          description: "Jobs are being created faster than they complete; a backlog may be building"
```
### Alert severities
| Severity | Description | Response time |
|------|------|---------|
| critical | Severe: service unavailable | Immediately |
| warning | Degraded performance or anomaly | Within 1 hour |
| info | Informational, worth watching | During working hours |
## Custom metrics
### Adding a new metric
1. Define it in `config/metrics.yaml`:
```yaml
custom_metrics:
  my_custom_counter:
    name: "my_custom_counter"
    type: counter
    description: "My custom counter"
    labels: [label1, label2]
  my_custom_histogram:
    name: "my_custom_histogram"
    type: histogram
    description: "My custom histogram"
    labels: [label1]
    buckets: [0.1, 0.5, 1, 5, 10]
```
2. Use it in code:
```python
from functional_scaffold.core.metrics_unified import incr, observe

# Increment the counter
incr("my_custom_counter", {"label1": "value1", "label2": "value2"})

# Record a histogram observation
observe("my_custom_histogram", {"label1": "value1"}, 0.5)
```
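For intuition, a minimal sketch of what such a unified layer might look like on top of `prometheus_client`; this is an assumption about the shape of `metrics_unified`, not its actual implementation:
```python
# Illustrative sketch of a unified metrics helper (not the real module)
from typing import Dict

from prometheus_client import Counter, Histogram

# In the real module these would be built from config/metrics.yaml
_COUNTERS: Dict[str, Counter] = {
    "my_custom_counter": Counter(
        "my_custom_counter", "My custom counter", ["label1", "label2"]
    ),
}
_HISTOGRAMS: Dict[str, Histogram] = {
    "my_custom_histogram": Histogram(
        "my_custom_histogram", "My custom histogram", ["label1"],
        buckets=[0.1, 0.5, 1, 5, 10],
    ),
}

def incr(name: str, labels: Dict[str, str]) -> None:
    """Increment a registered counter by 1."""
    _COUNTERS[name].labels(**labels).inc()

def observe(name: str, labels: Dict[str, str], value: float) -> None:
    """Record one observation on a registered histogram."""
    _HISTOGRAMS[name].labels(**labels).observe(value)
```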
## Troubleshooting
### Metrics not showing
1. Check the app's metrics endpoint:
```bash
curl http://localhost:8000/metrics
```
2. Check the Redis connection:
```bash
redis-cli ping
```
3. Check Prometheus scrape status:
- Open http://localhost:9090/targets
- Confirm the functional-scaffold target is UP
### No data in Grafana
1. Check the data source configuration:
- The URL should be `http://prometheus:9090` (inside the container network)
- Not `http://localhost:9090`
2. Check the time range:
- Make sure the selected range is correct
- Try "Last 5 minutes"
3. Generate test traffic:
```bash
./scripts/generate_traffic.sh
```
### Alerts not firing
1. Check that Prometheus loaded the rules:
- Open http://localhost:9090/rules
- Confirm the rules are listed
2. Check the alert state:
- Open http://localhost:9090/alerts
- See whether the alert is pending or firing
## References
- [Prometheus docs](https://prometheus.io/docs/)
- [Grafana docs](https://grafana.com/docs/)
- [PromQL query language](https://prometheus.io/docs/prometheus/latest/querying/basics/)


@@ -1,107 +0,0 @@
# Swagger Docs
This directory contains the auto-generated OpenAPI spec.
## Generating the spec
Run the following command to generate or refresh the OpenAPI spec:
```bash
python scripts/export_openapi.py
```
This produces `openapi.json`, the complete API spec.
## Viewing the docs
### Online
With the app running, open:
- **Swagger UI**: http://localhost:8000/docs
- **ReDoc**: http://localhost:8000/redoc
### Offline
Open `openapi.json` in Swagger Editor or any other OpenAPI tool.
## API spec
### Endpoints
#### Algorithm
- `POST /invoke` - invoke an algorithm synchronously
- Request body: `{"number": integer}`
- Response: the algorithm result
- `POST /jobs` - async job endpoint (reserved)
- Currently returns 501 Not Implemented
#### Health checks
- `GET /healthz` - liveness check
- Response: `{"status": "healthy", "timestamp": float}`
- `GET /readyz` - readiness check
- Response: `{"status": "ready", "timestamp": float, "checks": {...}}`
#### Monitoring
- `GET /metrics` - Prometheus metrics
- Response: Prometheus text format
### Data models
#### InvokeRequest
```json
{
"number": 17
}
```
#### InvokeResponse
```json
{
"request_id": "uuid",
"status": "success",
"result": {
"number": 17,
"is_prime": true,
"factors": [],
"algorithm": "trial_division"
},
"metadata": {
"algorithm": "PrimeChecker",
"version": "1.0.0",
"elapsed_time": 0.001
}
}
```
#### ErrorResponse
```json
{
"error": "ERROR_CODE",
"message": "Error description",
"details": {},
"request_id": "uuid"
}
```
## Updating the docs
After changing the API, regenerate the spec:
1. Change the code (routes, models, ...)
2. Run `python scripts/export_openapi.py`
3. Commit the updated `openapi.json`
## Notes
- `openapi.json` is auto-generated; do not edit it by hand
- Make all API changes in code, then regenerate the spec
- Make sure the Pydantic models carry complete docstrings and examples


@@ -51,3 +51,43 @@ groups:
        annotations:
          summary: "Algorithm execution latency too high"
          description: "Algorithm {{ $labels.algorithm }} P95 latency is {{ $value }}s"
      # High async job failure rate
      - alert: HighJobFailureRate
        expr: rate(jobs_completed_total{status="failed"}[5m]) / rate(jobs_completed_total[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High async job failure rate"
          description: "Async job failure rate for algorithm {{ $labels.algorithm }} exceeds 10%"
      # High async job latency
      - alert: HighJobLatency
        expr: histogram_quantile(0.95, rate(job_execution_duration_seconds_bucket[5m])) > 60
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Async job latency too high"
          description: "Async job P95 latency for algorithm {{ $labels.algorithm }} is {{ $value }}s"
      # Async job backlog
      - alert: JobBacklog
        expr: sum(rate(jobs_created_total[5m])) - sum(rate(jobs_completed_total[5m])) > 10
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Async job backlog"
          description: "Jobs are being created faster than they complete; a backlog may be building"
      # High webhook delivery failure rate
      - alert: HighWebhookFailureRate
        expr: rate(webhook_deliveries_total{status="failed"}[5m]) / rate(webhook_deliveries_total[5m]) > 0.2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High webhook delivery failure rate"
          description: "Webhook delivery failure rate exceeds 20%"


@@ -765,6 +765,636 @@
],
"title": "Request status distribution",
"type": "piechart"
},
{
"collapsed": false,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 32
},
"id": 100,
"panels": [],
"title": "异步任务监控",
"type": "row"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 4,
"w": 6,
"x": 0,
"y": 33
},
"id": 11,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"values": false,
"calcs": ["lastNotNull"],
"fields": ""
},
"textMode": "auto"
},
"pluginVersion": "9.0.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "sum(jobs_created_total)",
"refId": "A"
}
],
"title": "任务创建总数",
"type": "stat"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 4,
"w": 6,
"x": 6,
"y": 33
},
"id": 12,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"values": false,
"calcs": ["lastNotNull"],
"fields": ""
},
"textMode": "auto"
},
"pluginVersion": "9.0.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "sum(jobs_completed_total{status=\"completed\"})",
"refId": "A"
}
],
"title": "任务完成总数",
"type": "stat"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "red",
"value": null
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 4,
"w": 6,
"x": 12,
"y": 33
},
"id": 13,
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"values": false,
"calcs": ["lastNotNull"],
"fields": ""
},
"textMode": "auto"
},
"pluginVersion": "9.0.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "sum(jobs_completed_total{status=\"failed\"})",
"refId": "A"
}
],
"title": "任务失败总数",
"type": "stat"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"max": 1,
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "red",
"value": null
},
{
"color": "yellow",
"value": 0.9
},
{
"color": "green",
"value": 0.95
}
]
},
"unit": "percentunit"
},
"overrides": []
},
"gridPos": {
"h": 4,
"w": 6,
"x": 18,
"y": 33
},
"id": 14,
"options": {
"orientation": "auto",
"reduceOptions": {
"values": false,
"calcs": ["lastNotNull"],
"fields": ""
},
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"pluginVersion": "9.0.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "sum(rate(jobs_completed_total{status=\"completed\"}[5m])) / sum(rate(jobs_completed_total[5m]))",
"refId": "A"
}
],
"title": "任务成功率",
"type": "gauge"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "任务/秒",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 20,
"gradientMode": "opacity",
"hideFrom": {
"tooltip": false,
"viz": false,
"legend": false
},
"lineInterpolation": "smooth",
"lineWidth": 2,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": true,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
},
"unit": "ops"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 37
},
"id": 15,
"options": {
"legend": {
"calcs": ["mean", "lastNotNull"],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "sum(rate(jobs_created_total[1m])) by (algorithm)",
"legendFormat": "创建 - {{algorithm}}",
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "sum(rate(jobs_completed_total[1m])) by (algorithm, status)",
"legendFormat": "完成 - {{algorithm}} ({{status}})",
"refId": "B"
}
],
"title": "异步任务速率",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "延迟",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 10,
"gradientMode": "none",
"hideFrom": {
"tooltip": false,
"viz": false,
"legend": false
},
"lineInterpolation": "smooth",
"lineWidth": 2,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": true,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "yellow",
"value": 5
},
{
"color": "red",
"value": 30
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 37
},
"id": 16,
"options": {
"legend": {
"calcs": ["mean", "max"],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "histogram_quantile(0.50, sum(rate(job_execution_duration_seconds_bucket[1m])) by (le, algorithm))",
"legendFormat": "P50 - {{algorithm}}",
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "histogram_quantile(0.95, sum(rate(job_execution_duration_seconds_bucket[1m])) by (le, algorithm))",
"legendFormat": "P95 - {{algorithm}}",
"refId": "B"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "histogram_quantile(0.99, sum(rate(job_execution_duration_seconds_bucket[1m])) by (le, algorithm))",
"legendFormat": "P99 - {{algorithm}}",
"refId": "C"
}
],
"title": "异步任务执行延迟 (P50/P95/P99)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"hideFrom": {
"tooltip": false,
"viz": false,
"legend": false
}
},
"mappings": []
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "success"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "green",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "failed"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "red",
"mode": "fixed"
}
}
]
}
]
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 45
},
"id": 17,
"options": {
"legend": {
"displayMode": "table",
"placement": "right",
"showLegend": true,
"values": ["value", "percent"]
},
"pieType": "donut",
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "sum(jobs_completed_total) by (status)",
"legendFormat": "{{status}}",
"refId": "A"
}
],
"title": "任务状态分布",
"type": "piechart"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"hideFrom": {
"tooltip": false,
"viz": false,
"legend": false
}
},
"mappings": []
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "success"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "green",
"mode": "fixed"
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "failed"
},
"properties": [
{
"id": "color",
"value": {
"fixedColor": "red",
"mode": "fixed"
}
}
]
}
]
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 45
},
"id": 18,
"options": {
"legend": {
"displayMode": "table",
"placement": "right",
"showLegend": true,
"values": ["value", "percent"]
},
"pieType": "donut",
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "sum(webhook_deliveries_total) by (status)",
"legendFormat": "{{status}}",
"refId": "A"
}
],
"title": "Webhook 发送状态",
"type": "piechart"
}
],
"refresh": "5s",

View File

@@ -11,3 +11,6 @@ redis>=5.0.0
# YAML 配置解析
pyyaml>=6.0.0
# HTTP 客户端(用于 Webhook 回调)
httpx>=0.27.0
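httpx 在本项目中用于发送 Webhook 回调。下面是一个最小的异步 POST 草图(URL 为占位值),展示与 JobManager 中一致的用法:

```python
import asyncio

import httpx

async def main() -> None:
    # 带超时的异步客户端 + JSON POST,与 Webhook 回调的调用方式一致
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post("https://example.com/callback", json={"job_id": "demo"})
        print(resp.status_code)

asyncio.run(main())
```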

View File

@@ -2,6 +2,7 @@
from typing import Dict, Any, List
from .base import BaseAlgorithm
from ..core.metrics_unified import incr
class PrimeChecker(BaseAlgorithm):
@@ -30,10 +31,12 @@ class PrimeChecker(BaseAlgorithm):
ValueError: 如果输入不是整数
"""
if not isinstance(number, int):
incr('prime_check', {"status": "invalid_input"})
raise ValueError(f"Input must be an integer, got {type(number).__name__}")
# 小于2的数不是质数
if number < 2:
incr('prime_check', {"status": "number_less_than_two"})
return {
"number": number,
"is_prime": False,
@@ -47,7 +50,7 @@ class PrimeChecker(BaseAlgorithm):
# 如果不是质数,计算因数
factors = [] if is_prime else self._get_factors(number)
incr('prime_check', {"status": "success"})
return {
"number": number,
"is_prime": is_prime,

View File

@@ -1,5 +1,6 @@
"""API 数据模型""" """API 数据模型"""
from enum import Enum
from pydantic import BaseModel, Field, ConfigDict from pydantic import BaseModel, Field, ConfigDict
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
@@ -7,13 +8,7 @@ from typing import Any, Dict, Optional
class InvokeRequest(BaseModel): class InvokeRequest(BaseModel):
"""同步调用请求""" """同步调用请求"""
model_config = ConfigDict( model_config = ConfigDict(json_schema_extra={"example": {"number": 17}})
json_schema_extra={
"example": {
"number": 17
}
}
)
number: int = Field(..., description="待判断的整数") number: int = Field(..., description="待判断的整数")
@@ -30,13 +25,13 @@ class InvokeResponse(BaseModel):
"number": 17, "number": 17,
"is_prime": True, "is_prime": True,
"factors": [], "factors": [],
"algorithm": "trial_division" "algorithm": "trial_division",
}, },
"metadata": { "metadata": {
"algorithm": "PrimeChecker", "algorithm": "PrimeChecker",
"version": "1.0.0", "version": "1.0.0",
"elapsed_time": 0.001 "elapsed_time": 0.001,
} },
} }
} }
) )
@@ -71,7 +66,7 @@ class ErrorResponse(BaseModel):
"error": "VALIDATION_ERROR", "error": "VALIDATION_ERROR",
"message": "number must be an integer", "message": "number must be an integer",
"details": {"field": "number", "value": "abc"}, "details": {"field": "number", "value": "abc"},
"request_id": "550e8400-e29b-41d4-a716-446655440000" "request_id": "550e8400-e29b-41d4-a716-446655440000",
} }
} }
) )
@@ -80,3 +75,80 @@ class ErrorResponse(BaseModel):
message: str = Field(..., description="错误消息") message: str = Field(..., description="错误消息")
details: Optional[Dict[str, Any]] = Field(None, description="错误详情") details: Optional[Dict[str, Any]] = Field(None, description="错误详情")
request_id: Optional[str] = Field(None, description="请求ID") request_id: Optional[str] = Field(None, description="请求ID")
class JobStatus(str, Enum):
"""任务状态枚举"""
PENDING = "pending" # 等待执行
RUNNING = "running" # 执行中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 执行失败
class JobRequest(BaseModel):
"""异步任务请求"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"algorithm": "PrimeChecker",
"params": {"number": 17},
"webhook": "https://example.com/callback",
}
}
)
algorithm: str = Field(..., description="算法名称")
params: Dict[str, Any] = Field(..., description="算法参数")
webhook: Optional[str] = Field(None, description="回调 URL(可选)")
class JobCreateResponse(BaseModel):
"""任务创建响应"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"job_id": "a1b2c3d4e5f6",
"status": "pending",
"message": "任务已创建",
"created_at": "2026-02-02T10:00:00Z",
}
}
)
job_id: str = Field(..., description="任务唯一标识")
status: JobStatus = Field(..., description="任务状态")
message: str = Field(..., description="状态消息")
created_at: str = Field(..., description="创建时间(ISO 8601)")
class JobStatusResponse(BaseModel):
"""任务状态查询响应"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"job_id": "a1b2c3d4e5f6",
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00Z",
"started_at": "2026-02-02T10:00:01Z",
"completed_at": "2026-02-02T10:00:02Z",
"result": {"number": 17, "is_prime": True},
"error": None,
"metadata": {"elapsed_time": 0.001},
}
}
)
job_id: str = Field(..., description="任务唯一标识")
status: JobStatus = Field(..., description="任务状态")
algorithm: str = Field(..., description="算法名称")
created_at: str = Field(..., description="创建时间(ISO 8601)")
started_at: Optional[str] = Field(None, description="开始执行时间(ISO 8601)")
completed_at: Optional[str] = Field(None, description="完成时间(ISO 8601)")
result: Optional[Dict[str, Any]] = Field(None, description="执行结果(仅完成时返回)")
error: Optional[str] = Field(None, description="错误信息(仅失败时返回)")
metadata: Optional[Dict[str, Any]] = Field(None, description="元数据信息")
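新增模型的一个简单用法示例(导入路径与测试文件保持一致):

```python
from src.functional_scaffold.api.models import JobRequest, JobStatus

req = JobRequest(algorithm="PrimeChecker", params={"number": 17})
print(req.model_dump_json())
# {"algorithm":"PrimeChecker","params":{"number":17},"webhook":null}
print(JobStatus.PENDING.value)  # pending
```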

View File

@@ -1,7 +1,7 @@
"""API 路由""" """API 路由"""
import asyncio
from fastapi import APIRouter, HTTPException, Depends, status from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.responses import JSONResponse
import time import time
import logging import logging
@@ -11,10 +11,15 @@ from .models import (
HealthResponse, HealthResponse,
ReadinessResponse, ReadinessResponse,
ErrorResponse, ErrorResponse,
JobRequest,
JobCreateResponse,
JobStatusResponse,
JobStatus,
) )
from .dependencies import get_request_id from .dependencies import get_request_id
from ..algorithms.prime_checker import PrimeChecker from ..algorithms.prime_checker import PrimeChecker
from ..core.errors import FunctionalScaffoldError, ValidationError, AlgorithmError from ..core.errors import ValidationError, AlgorithmError
from ..core.job_manager import get_job_manager
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -134,17 +139,156 @@ async def readiness_check():
@router.post(
"/jobs",
-status_code=status.HTTP_501_NOT_IMPLEMENTED,
-summary="异步任务接口(预留)",
-description="异步任务接口,当前版本未实现",
response_model=JobCreateResponse,
status_code=status.HTTP_202_ACCEPTED,
summary="创建异步任务",
description="创建异步任务,立即返回任务 ID,任务在后台执行",
responses={
202: {"description": "任务已创建", "model": JobCreateResponse},
400: {"description": "请求参数错误", "model": ErrorResponse},
404: {"description": "算法不存在", "model": ErrorResponse},
503: {"description": "服务不可用", "model": ErrorResponse},
},
)
-async def create_job():
async def create_job(
request: JobRequest,
request_id: str = Depends(get_request_id),
):
"""
-异步任务接口(预留)
-用于提交长时间运行的任务
创建异步任务
- **algorithm**: 算法名称(如 PrimeChecker)
- **params**: 算法参数
- **webhook**: 任务完成后的回调 URL(可选)
"""
try:
job_manager = await get_job_manager()
# 检查任务管理器是否可用
if not job_manager.is_available():
raise HTTPException(
-status_code=status.HTTP_501_NOT_IMPLEMENTED,
-detail={"error": "NOT_IMPLEMENTED", "message": "Async jobs not implemented yet"},
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail={
"error": "SERVICE_UNAVAILABLE",
"message": "任务服务暂不可用,请稍后重试",
"request_id": request_id,
},
)
# 验证算法存在
available_algorithms = job_manager.get_available_algorithms()
if request.algorithm not in available_algorithms:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={
"error": "ALGORITHM_NOT_FOUND",
"message": f"算法 '{request.algorithm}' 不存在",
"details": {"available_algorithms": available_algorithms},
"request_id": request_id,
},
)
# 创建任务
job_id = await job_manager.create_job(
algorithm=request.algorithm,
params=request.params,
webhook=request.webhook,
request_id=request_id,
)
# 获取任务信息
job_data = await job_manager.get_job(job_id)
# 后台执行任务
asyncio.create_task(job_manager.execute_job(job_id))
logger.info(f"异步任务已创建: job_id={job_id}, request_id={request_id}")
return JobCreateResponse(
job_id=job_id,
status=JobStatus.PENDING,
message="任务已创建",
created_at=job_data["created_at"],
)
except HTTPException:
raise
except Exception as e:
logger.error(f"创建任务失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={
"error": "INTERNAL_ERROR",
"message": str(e),
"request_id": request_id,
},
)
@router.get(
"/jobs/{job_id}",
response_model=JobStatusResponse,
summary="查询任务状态",
description="查询异步任务的执行状态和结果",
responses={
200: {"description": "成功", "model": JobStatusResponse},
404: {"description": "任务不存在或已过期", "model": ErrorResponse},
503: {"description": "服务不可用", "model": ErrorResponse},
},
)
async def get_job_status(job_id: str):
"""
查询任务状态
- **job_id**: 任务唯一标识
"""
try:
job_manager = await get_job_manager()
# 检查任务管理器是否可用
if not job_manager.is_available():
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail={
"error": "SERVICE_UNAVAILABLE",
"message": "任务服务暂不可用,请稍后重试",
},
)
# 获取任务信息
job_data = await job_manager.get_job(job_id)
if not job_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={
"error": "JOB_NOT_FOUND",
"message": f"任务 '{job_id}' 不存在或已过期",
},
)
return JobStatusResponse(
job_id=job_data["job_id"],
status=JobStatus(job_data["status"]),
algorithm=job_data["algorithm"],
created_at=job_data["created_at"],
started_at=job_data["started_at"],
completed_at=job_data["completed_at"],
result=job_data["result"],
error=job_data["error"],
metadata=job_data["metadata"],
)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询任务状态失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={
"error": "INTERNAL_ERROR",
"message": str(e),
},
)
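两个端点组合起来的典型客户端流程是"创建任务后轮询状态直到终态"。以下为草图,假设服务监听 localhost:8000:

```python
import asyncio

import httpx

async def run_job() -> dict:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        resp = await client.post(
            "/jobs", json={"algorithm": "PrimeChecker", "params": {"number": 17}}
        )
        job_id = resp.json()["job_id"]
        while True:
            job = (await client.get(f"/jobs/{job_id}")).json()
            if job["status"] in ("completed", "failed"):
                return job
            await asyncio.sleep(0.5)  # 简单轮询;生产环境可改用 webhook 回调

print(asyncio.run(run_job()))
```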

View File

@@ -49,6 +49,11 @@ class Settings(BaseSettings):
metrics_config_path: str = "config/metrics.yaml"
metrics_instance_id: Optional[str] = None # 默认使用 hostname
# 异步任务配置
job_result_ttl: int = 1800 # 结果缓存时间(秒),默认 30 分钟
webhook_max_retries: int = 3 # Webhook 最大重试次数
webhook_timeout: int = 10 # Webhook 超时时间(秒)
# 全局配置实例
settings = Settings()
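这些字段基于 pydantic 的 BaseSettings,通常可用同名环境变量覆盖(默认大小写不敏感;以下仅为示意):

```python
import os

# 假设在导入配置前设置环境变量
os.environ["JOB_RESULT_TTL"] = "3600"        # 结果缓存改为 1 小时
os.environ["WEBHOOK_MAX_RETRIES"] = "5"

from src.functional_scaffold.config import Settings

print(Settings().job_result_ttl)  # 3600
```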

View File

@@ -0,0 +1,381 @@
"""异步任务管理模块
基于 Redis 的异步任务管理,支持任务创建、执行、状态查询和 Webhook 回调。
"""
import asyncio
import json
import logging
import secrets
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Type
import httpx
import redis.asyncio as aioredis
from ..algorithms.base import BaseAlgorithm
from ..config import settings
from ..core.metrics_unified import incr, observe
logger = logging.getLogger(__name__)
class JobManager:
"""异步任务管理器"""
def __init__(self):
self._redis_client: Optional[aioredis.Redis] = None
self._algorithm_registry: Dict[str, Type[BaseAlgorithm]] = {}
self._http_client: Optional[httpx.AsyncClient] = None
async def initialize(self) -> None:
"""初始化 Redis 连接和 HTTP 客户端"""
# 初始化 Redis 异步连接
try:
self._redis_client = aioredis.Redis(
host=settings.redis_host,
port=settings.redis_port,
db=settings.redis_db,
password=settings.redis_password if settings.redis_password else None,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5,
)
# 测试连接
await self._redis_client.ping()
logger.info(f"任务管理器 Redis 连接成功: {settings.redis_host}:{settings.redis_port}")
except Exception as e:
logger.error(f"任务管理器 Redis 连接失败: {e}")
self._redis_client = None
# 初始化 HTTP 客户端
self._http_client = httpx.AsyncClient(timeout=settings.webhook_timeout)
# 注册算法
self._register_algorithms()
async def shutdown(self) -> None:
"""关闭连接"""
if self._redis_client:
await self._redis_client.close()
logger.info("任务管理器 Redis 连接已关闭")
if self._http_client:
await self._http_client.aclose()
logger.info("任务管理器 HTTP 客户端已关闭")
def _register_algorithms(self) -> None:
"""注册可用的算法类"""
from ..algorithms import __all__ as algorithm_names
from .. import algorithms as algorithms_module
for name in algorithm_names:
cls = getattr(algorithms_module, name, None)
if cls and isinstance(cls, type) and issubclass(cls, BaseAlgorithm):
if cls is not BaseAlgorithm:
self._algorithm_registry[name] = cls
logger.debug(f"已注册算法: {name}")
logger.info(f"已注册 {len(self._algorithm_registry)} 个算法")
def get_available_algorithms(self) -> List[str]:
"""获取可用算法列表"""
return list(self._algorithm_registry.keys())
def _generate_job_id(self) -> str:
"""生成 12 位十六进制任务 ID"""
return secrets.token_hex(6)
def _get_timestamp(self) -> str:
"""获取 ISO 8601 格式时间戳"""
return datetime.now(timezone.utc).isoformat()
async def create_job(
self,
algorithm: str,
params: Dict[str, Any],
webhook: Optional[str] = None,
request_id: Optional[str] = None,
) -> str:
"""创建新任务,返回 job_id
Args:
algorithm: 算法名称
params: 算法参数
webhook: 回调 URL(可选)
request_id: 关联的请求 ID(可选)
Returns:
str: 任务 ID
Raises:
RuntimeError: Redis 不可用时抛出
ValueError: 算法不存在时抛出
"""
if not self._redis_client:
raise RuntimeError("Redis 不可用,无法创建任务")
if algorithm not in self._algorithm_registry:
raise ValueError(f"算法 '{algorithm}' 不存在")
job_id = self._generate_job_id()
created_at = self._get_timestamp()
# 构建任务数据
job_data = {
"status": "pending",
"algorithm": algorithm,
"params": json.dumps(params),
"webhook": webhook or "",
"request_id": request_id or "",
"created_at": created_at,
"started_at": "",
"completed_at": "",
"result": "",
"error": "",
"metadata": "",
}
# 存储到 Redis
key = f"job:{job_id}"
await self._redis_client.hset(key, mapping=job_data)
# 记录指标
incr("jobs_created_total", {"algorithm": algorithm})
logger.info(f"任务已创建: job_id={job_id}, algorithm={algorithm}")
return job_id
async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
"""获取任务信息
Args:
job_id: 任务 ID
Returns:
任务信息字典,不存在时返回 None
"""
if not self._redis_client:
return None
key = f"job:{job_id}"
job_data = await self._redis_client.hgetall(key)
if not job_data:
return None
# 解析 JSON 字段
result = {
"job_id": job_id,
"status": job_data.get("status", ""),
"algorithm": job_data.get("algorithm", ""),
"created_at": job_data.get("created_at", ""),
"started_at": job_data.get("started_at") or None,
"completed_at": job_data.get("completed_at") or None,
"result": None,
"error": job_data.get("error") or None,
"metadata": None,
}
# 解析 result
if job_data.get("result"):
try:
result["result"] = json.loads(job_data["result"])
except json.JSONDecodeError:
result["result"] = None
# 解析 metadata
if job_data.get("metadata"):
try:
result["metadata"] = json.loads(job_data["metadata"])
except json.JSONDecodeError:
result["metadata"] = None
return result
async def execute_job(self, job_id: str) -> None:
"""执行任务(在后台任务中调用)
Args:
job_id: 任务 ID
"""
if not self._redis_client:
logger.error(f"Redis 不可用,无法执行任务: {job_id}")
return
key = f"job:{job_id}"
job_data = await self._redis_client.hgetall(key)
if not job_data:
logger.error(f"任务不存在: {job_id}")
return
algorithm_name = job_data.get("algorithm", "")
webhook_url = job_data.get("webhook", "")
# 解析参数
try:
params = json.loads(job_data.get("params", "{}"))
except json.JSONDecodeError:
params = {}
# 更新状态为 running
started_at = self._get_timestamp()
await self._redis_client.hset(key, mapping={"status": "running", "started_at": started_at})
logger.info(f"开始执行任务: job_id={job_id}, algorithm={algorithm_name}")
import time
start_time = time.time()
status = "completed"
result_data = None
error_msg = None
metadata = None
try:
# 获取算法类并执行
algorithm_cls = self._algorithm_registry.get(algorithm_name)
if not algorithm_cls:
raise ValueError(f"算法 '{algorithm_name}' 不存在")
algorithm = algorithm_cls()
# 根据算法类型传递参数
if algorithm_name == "PrimeChecker":
execution_result = algorithm.execute(params.get("number", 0))
else:
# 通用参数传递
execution_result = algorithm.execute(**params)
if execution_result.get("success"):
result_data = execution_result.get("result", {})
metadata = execution_result.get("metadata", {})
else:
status = "failed"
error_msg = execution_result.get("error", "算法执行失败")
metadata = execution_result.get("metadata", {})
except Exception as e:
status = "failed"
error_msg = str(e)
logger.error(f"任务执行失败: job_id={job_id}, error={e}", exc_info=True)
# 计算执行时间
elapsed_time = time.time() - start_time
completed_at = self._get_timestamp()
# 更新任务状态
update_data = {
"status": status,
"completed_at": completed_at,
"result": json.dumps(result_data) if result_data else "",
"error": error_msg or "",
"metadata": json.dumps(metadata) if metadata else "",
}
await self._redis_client.hset(key, mapping=update_data)
# 设置 TTL
await self._redis_client.expire(key, settings.job_result_ttl)
# 记录指标
incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time)
logger.info(f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s")
# 发送 Webhook 回调
if webhook_url:
await self._send_webhook(job_id, webhook_url)
async def _send_webhook(self, job_id: str, webhook_url: str) -> None:
"""发送 Webhook 回调(带重试)
Args:
job_id: 任务 ID
webhook_url: 回调 URL
"""
if not self._http_client:
logger.warning("HTTP 客户端不可用,无法发送 Webhook")
return
# 获取任务数据
job_data = await self.get_job(job_id)
if not job_data:
logger.error(f"无法获取任务数据用于 Webhook: {job_id}")
return
# 构建回调负载
payload = {
"job_id": job_data["job_id"],
"status": job_data["status"],
"algorithm": job_data["algorithm"],
"result": job_data["result"],
"error": job_data["error"],
"metadata": job_data["metadata"],
"completed_at": job_data["completed_at"],
}
# 重试间隔(指数退避)
retry_delays = [1, 5, 15]
max_retries = settings.webhook_max_retries
for attempt in range(max_retries):
try:
response = await self._http_client.post(
webhook_url,
json=payload,
headers={"Content-Type": "application/json"},
)
if response.status_code < 400:
incr("webhook_deliveries_total", {"status": "success"})
logger.info(
f"Webhook 发送成功: job_id={job_id}, url={webhook_url}, "
f"status_code={response.status_code}"
)
return
else:
logger.warning(
f"Webhook 响应错误: job_id={job_id}, status_code={response.status_code}"
)
except Exception as e:
logger.warning(
f"Webhook 发送失败 (尝试 {attempt + 1}/{max_retries}): "
f"job_id={job_id}, error={e}"
)
# 等待后重试
if attempt < max_retries - 1:
delay = retry_delays[min(attempt, len(retry_delays) - 1)]
await asyncio.sleep(delay)
# 所有重试都失败
incr("webhook_deliveries_total", {"status": "failed"})
logger.error(f"Webhook 发送最终失败: job_id={job_id}, url={webhook_url}")
def is_available(self) -> bool:
"""检查任务管理器是否可用"""
return self._redis_client is not None
# 全局单例
_job_manager: Optional[JobManager] = None
async def get_job_manager() -> JobManager:
"""获取任务管理器单例"""
global _job_manager
if _job_manager is None:
_job_manager = JobManager()
await _job_manager.initialize()
return _job_manager
async def shutdown_job_manager() -> None:
"""关闭任务管理器"""
global _job_manager
if _job_manager is not None:
await _job_manager.shutdown()
_job_manager = None
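一个端到端的直接调用草图(假设本地 Redis 按默认配置可用;API 中 execute_job 由后台任务触发,这里为演示改为同步等待):

```python
import asyncio

from src.functional_scaffold.core.job_manager import get_job_manager, shutdown_job_manager

async def demo() -> None:
    manager = await get_job_manager()
    job_id = await manager.create_job("PrimeChecker", {"number": 97})
    await manager.execute_job(job_id)
    print(await manager.get_job(job_id))
    await shutdown_job_manager()

asyncio.run(demo())
```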

View File

@@ -17,6 +17,7 @@ from .core.metrics_unified import (
gauge_decr,
export,
)
from .core.job_manager import get_job_manager, shutdown_job_manager
# 设置日志
setup_logging(level=settings.log_level, format_type=settings.log_format)
@@ -132,6 +133,16 @@ async def startup_event():
else:
logger.warning("Redis 不可用,指标将不会被收集")
# 初始化任务管理器
try:
job_manager = await get_job_manager()
if job_manager.is_available():
logger.info("异步任务管理器已启用")
else:
logger.warning("Redis 不可用,异步任务功能将不可用")
except Exception as e:
logger.warning(f"任务管理器初始化失败: {e}")
# 关闭事件
@app.on_event("shutdown")
@@ -139,6 +150,13 @@ async def shutdown_event():
"""应用关闭时执行""" """应用关闭时执行"""
logger.info(f"Shutting down {settings.app_name}") logger.info(f"Shutting down {settings.app_name}")
# 关闭任务管理器
try:
await shutdown_job_manager()
logger.info("任务管理器已关闭")
except Exception as e:
logger.warning(f"任务管理器关闭失败: {e}")
if __name__ == "__main__":
import uvicorn
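补充说明:新版 FastAPI 文档已推荐用 lifespan 取代 @app.on_event。若后续迁移,大致形态如下(示意草图,非本次提交内容):

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    await get_job_manager()        # 启动:初始化任务管理器
    yield
    await shutdown_job_manager()   # 关闭:释放 Redis 与 HTTP 连接

app = FastAPI(lifespan=lifespan)
```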

View File

@@ -103,8 +103,5 @@ class TestMetricsEndpoint:
class TestJobsEndpoint:
"""测试异步任务端点"""
-def test_jobs_not_implemented(self, client):
-"""测试异步任务接口(未实现)"""
-response = client.post("/jobs", json={"number": 17})
-assert response.status_code == status.HTTP_501_NOT_IMPLEMENTED
# 详细测试在 test_job_manager.py 中
pass

tests/test_job_manager.py(新文件,+401 行)
View File

@@ -0,0 +1,401 @@
"""异步任务管理器测试"""
import asyncio
import json
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi import status
from src.functional_scaffold.core.job_manager import (
JobManager,
get_job_manager,
shutdown_job_manager,
)
from src.functional_scaffold.api.models import JobStatus
class TestJobManager:
"""测试 JobManager 类"""
@pytest.fixture
def mock_redis(self):
"""模拟 Redis 客户端"""
mock = AsyncMock()
mock.ping = AsyncMock(return_value=True)
mock.hset = AsyncMock()
mock.hgetall = AsyncMock(return_value={})
mock.expire = AsyncMock()
mock.close = AsyncMock()
return mock
@pytest.fixture
def mock_http_client(self):
"""模拟 HTTP 客户端"""
mock = AsyncMock()
mock.post = AsyncMock()
mock.aclose = AsyncMock()
return mock
@pytest.mark.asyncio
async def test_generate_job_id(self):
"""测试任务 ID 生成"""
manager = JobManager()
job_id = manager._generate_job_id()
assert len(job_id) == 12
assert all(c in "0123456789abcdef" for c in job_id)
@pytest.mark.asyncio
async def test_get_timestamp(self):
"""测试时间戳生成"""
manager = JobManager()
timestamp = manager._get_timestamp()
assert "T" in timestamp
assert timestamp.endswith("+00:00") or timestamp.endswith("Z")
@pytest.mark.asyncio
async def test_get_available_algorithms(self):
"""测试获取可用算法列表"""
manager = JobManager()
manager._register_algorithms()
algorithms = manager.get_available_algorithms()
assert "PrimeChecker" in algorithms
@pytest.mark.asyncio
async def test_is_available_without_redis(self):
"""测试 Redis 不可用时的状态"""
manager = JobManager()
assert manager.is_available() is False
class TestJobManagerWithMocks:
"""使用 Mock 测试 JobManager"""
@pytest.mark.asyncio
async def test_create_job(self):
"""测试创建任务"""
manager = JobManager()
# 模拟 Redis
mock_redis = AsyncMock()
mock_redis.hset = AsyncMock()
manager._redis_client = mock_redis
manager._register_algorithms()
job_id = await manager.create_job(
algorithm="PrimeChecker",
params={"number": 17},
webhook="https://example.com/callback",
request_id="test-request-id",
)
assert len(job_id) == 12
mock_redis.hset.assert_called_once()
@pytest.mark.asyncio
async def test_create_job_invalid_algorithm(self):
"""测试创建任务时算法不存在"""
manager = JobManager()
mock_redis = AsyncMock()
manager._redis_client = mock_redis
manager._register_algorithms()
with pytest.raises(ValueError, match="不存在"):
await manager.create_job(
algorithm="NonExistentAlgorithm",
params={},
)
@pytest.mark.asyncio
async def test_create_job_redis_unavailable(self):
"""测试 Redis 不可用时创建任务"""
manager = JobManager()
manager._register_algorithms()
with pytest.raises(RuntimeError, match="Redis 不可用"):
await manager.create_job(
algorithm="PrimeChecker",
params={"number": 17},
)
@pytest.mark.asyncio
async def test_get_job(self):
"""测试获取任务信息"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"started_at": "2026-02-02T10:00:01+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": '{"number": 17, "is_prime": true}',
"error": "",
"metadata": '{"elapsed_time": 0.001}',
}
)
manager._redis_client = mock_redis
job_data = await manager.get_job("test-job-id")
assert job_data is not None
assert job_data["job_id"] == "test-job-id"
assert job_data["status"] == "completed"
assert job_data["algorithm"] == "PrimeChecker"
assert job_data["result"]["number"] == 17
assert job_data["result"]["is_prime"] is True
@pytest.mark.asyncio
async def test_get_job_not_found(self):
"""测试获取不存在的任务"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(return_value={})
manager._redis_client = mock_redis
job_data = await manager.get_job("non-existent-job")
assert job_data is None
@pytest.mark.asyncio
async def test_execute_job(self):
"""测试执行任务"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "pending",
"algorithm": "PrimeChecker",
"params": '{"number": 17}',
"webhook": "",
"request_id": "test-request-id",
"created_at": "2026-02-02T10:00:00+00:00",
}
)
mock_redis.hset = AsyncMock()
mock_redis.expire = AsyncMock()
manager._redis_client = mock_redis
manager._register_algorithms()
await manager.execute_job("test-job-id")
# 验证状态更新被调用
assert mock_redis.hset.call_count >= 2 # running + completed
mock_redis.expire.assert_called_once()
class TestJobsAPI:
"""测试 /jobs API 端点"""
def test_create_job_success(self, client):
"""测试成功创建任务"""
with patch(
"src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = True
mock_manager.get_available_algorithms.return_value = ["PrimeChecker"]
mock_manager.create_job = AsyncMock(return_value="abc123def456")
mock_manager.get_job = AsyncMock(
return_value={
"job_id": "abc123def456",
"status": "pending",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
}
)
mock_manager.execute_job = AsyncMock()
mock_get_manager.return_value = mock_manager
response = client.post(
"/jobs",
json={
"algorithm": "PrimeChecker",
"params": {"number": 17},
},
)
assert response.status_code == status.HTTP_202_ACCEPTED
data = response.json()
assert data["job_id"] == "abc123def456"
assert data["status"] == "pending"
assert data["message"] == "任务已创建"
def test_create_job_algorithm_not_found(self, client):
"""测试创建任务时算法不存在"""
with patch(
"src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = True
mock_manager.get_available_algorithms.return_value = ["PrimeChecker"]
mock_get_manager.return_value = mock_manager
response = client.post(
"/jobs",
json={
"algorithm": "NonExistentAlgorithm",
"params": {},
},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
data = response.json()
assert data["detail"]["error"] == "ALGORITHM_NOT_FOUND"
def test_create_job_service_unavailable(self, client):
"""测试服务不可用时创建任务"""
with patch(
"src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = False
mock_get_manager.return_value = mock_manager
response = client.post(
"/jobs",
json={
"algorithm": "PrimeChecker",
"params": {"number": 17},
},
)
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
def test_get_job_status_success(self, client):
"""测试成功查询任务状态"""
with patch(
"src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = True
mock_manager.get_job = AsyncMock(
return_value={
"job_id": "abc123def456",
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"started_at": "2026-02-02T10:00:01+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": {"number": 17, "is_prime": True},
"error": None,
"metadata": {"elapsed_time": 0.001},
}
)
mock_get_manager.return_value = mock_manager
response = client.get("/jobs/abc123def456")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["job_id"] == "abc123def456"
assert data["status"] == "completed"
assert data["result"]["is_prime"] is True
def test_get_job_status_not_found(self, client):
"""测试查询不存在的任务"""
with patch(
"src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = True
mock_manager.get_job = AsyncMock(return_value=None)
mock_get_manager.return_value = mock_manager
response = client.get("/jobs/non-existent-job")
assert response.status_code == status.HTTP_404_NOT_FOUND
data = response.json()
assert data["detail"]["error"] == "JOB_NOT_FOUND"
def test_get_job_status_service_unavailable(self, client):
"""测试服务不可用时查询任务"""
with patch(
"src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = False
mock_get_manager.return_value = mock_manager
response = client.get("/jobs/abc123def456")
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
class TestWebhook:
"""测试 Webhook 回调"""
@pytest.mark.asyncio
async def test_send_webhook_success(self):
"""测试成功发送 Webhook"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": '{"number": 17, "is_prime": true}',
"error": "",
"metadata": '{"elapsed_time": 0.001}',
}
)
manager._redis_client = mock_redis
mock_response = MagicMock()
mock_response.status_code = 200
mock_http = AsyncMock()
mock_http.post = AsyncMock(return_value=mock_response)
manager._http_client = mock_http
await manager._send_webhook("test-job-id", "https://example.com/callback")
mock_http.post.assert_called_once()
call_args = mock_http.post.call_args
assert call_args[0][0] == "https://example.com/callback"
assert "json" in call_args[1]
@pytest.mark.asyncio
async def test_send_webhook_retry_on_failure(self):
"""测试 Webhook 失败时重试"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": "{}",
"error": "",
"metadata": "{}",
}
)
manager._redis_client = mock_redis
mock_http = AsyncMock()
mock_http.post = AsyncMock(side_effect=Exception("Connection error"))
manager._http_client = mock_http
# 使用较短的重试间隔进行测试
with patch("src.functional_scaffold.core.job_manager.settings") as mock_settings:
mock_settings.webhook_max_retries = 2
mock_settings.webhook_timeout = 1
await manager._send_webhook("test-job-id", "https://example.com/callback")
# 验证重试次数
assert mock_http.post.call_count == 2
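说明:TestJobsAPI 等用例依赖 client fixture,它应由 tests/conftest.py 提供(未包含在本次变更中)。其大致形态可参考如下假设性草图:

```python
import pytest
from fastapi.testclient import TestClient

from src.functional_scaffold.main import app

@pytest.fixture
def client():
    # 返回基于应用实例的同步测试客户端
    return TestClient(app)
```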