From b1077e78e9f66af7fdfd074ddd7e0c2c1260342f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roog=20=28=E9=A1=BE=E6=96=B0=E5=9F=B9=29?= Date: Mon, 2 Feb 2026 14:59:34 +0800 Subject: [PATCH] =?UTF-8?q?main:=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?=E6=96=87=E6=A1=A3=E5=B9=B6=E6=B8=85=E7=90=86=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 变更内容: - 移除冗余文档,包括 Grafana 指南、指标对比、修复总结、OpenAPI 规范等。 - 精简项目文档结构,优化 README 文件内容。 - 提升文档层次清晰度,集中核心指南。 --- README.md | 34 +- config/metrics.yaml | 37 +- docs/algorithm-development.md | 556 ++++++++++++++++ docs/api-reference.md | 441 ++++++++++++ docs/api/README.md | 58 ++ docs/{swagger => api}/openapi.json | 342 +++++++++- docs/getting-started.md | 249 +++++++ docs/grafana-dashboard-guide.md | 237 ------- docs/metrics-guide.md | 346 ---------- docs/metrics-improvement-summary.md | 227 ------- docs/monitoring.md | 337 ++++++++++ docs/swagger/README.md | 107 --- monitoring/alerts/rules.yaml | 40 ++ monitoring/grafana/dashboard.json | 630 ++++++++++++++++++ requirements.txt | 3 + .../algorithms/prime_checker.py | 5 +- src/functional_scaffold/api/models.py | 94 ++- src/functional_scaffold/api/routes.py | 168 ++++- src/functional_scaffold/config.py | 5 + src/functional_scaffold/core/job_manager.py | 381 +++++++++++ src/functional_scaffold/main.py | 18 + tests/test_api.py | 7 +- tests/test_job_manager.py | 401 +++++++++++ 23 files changed, 3763 insertions(+), 960 deletions(-) create mode 100644 docs/algorithm-development.md create mode 100644 docs/api-reference.md create mode 100644 docs/api/README.md rename docs/{swagger => api}/openapi.json (52%) create mode 100644 docs/getting-started.md delete mode 100644 docs/grafana-dashboard-guide.md delete mode 100644 docs/metrics-guide.md delete mode 100644 docs/metrics-improvement-summary.md create mode 100644 docs/monitoring.md delete mode 100644 docs/swagger/README.md create mode 100644 src/functional_scaffold/core/job_manager.py create mode 100644 tests/test_job_manager.py diff --git a/README.md b/README.md index 796505f..6e3dccc 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ ## 特性 - ✅ **标准化 API 接口** - 符合 RESTful 规范的 HTTP 接口 +- ✅ **同步/异步调用** - 支持同步调用和异步任务 - ✅ **开箱即用** - 完整的项目结构和配置 - ✅ **自动文档** - Swagger/OpenAPI 自动生成 - ✅ **监控指标** - Prometheus 指标和 Grafana 仪表板 @@ -16,6 +17,16 @@ - ✅ **完整测试** - 单元测试和集成测试 - ✅ **CI/CD** - GitHub Actions 工作流 +## 文档 + +| 文档 | 描述 | +|------|------| +| [快速入门](docs/getting-started.md) | 10 分钟上手指南 | +| [算法开发指南](docs/algorithm-development.md) | 详细的算法开发教程 | +| [API 参考](docs/api-reference.md) | 完整的 API 文档 | +| [监控指南](docs/monitoring.md) | 监控和告警配置 | +| [API 规范](docs/api/README.md) | OpenAPI 规范说明 | + ## 快速开始 ### 前置要求 @@ -76,7 +87,8 @@ docker-compose up ### 核心接口 - `POST /invoke` - 同步调用算法 -- `POST /jobs` - 异步任务接口(预留) +- `POST /jobs` - 创建异步任务 +- `GET /jobs/{job_id}` - 查询任务状态 ### 健康检查 @@ -89,7 +101,7 @@ docker-compose up ## 示例请求 -### 质数判断 +### 同步调用 - 质数判断 ```bash curl -X POST http://localhost:8000/invoke \ @@ -117,6 +129,18 @@ curl -X POST http://localhost:8000/invoke \ } ``` +### 异步任务 + +```bash +# 创建任务 +curl -X POST http://localhost:8000/jobs \ + -H "Content-Type: application/json" \ + -d '{"algorithm": "PrimeChecker", "params": {"number": 17}}' + +# 查询状态 +curl http://localhost:8000/jobs/{job_id} +``` + ## 项目结构 ``` @@ -141,6 +165,8 @@ FunctionalScaffold/ ## 开发指南 +详细的开发指南请参考 [算法开发指南](docs/algorithm-development.md)。 + ### 添加新算法 1. 在 `src/functional_scaffold/algorithms/` 创建新算法文件 @@ -212,6 +238,8 @@ sam deploy --template-file deployment/serverless/aws-lambda.yaml ## 监控 +详细的监控配置请参考 [监控指南](docs/monitoring.md)。 + ### Prometheus 指标 访问 `/metrics` 端点查看可用指标: @@ -220,6 +248,8 @@ sam deploy --template-file deployment/serverless/aws-lambda.yaml - `http_request_duration_seconds` - HTTP 请求延迟 - `algorithm_executions_total` - 算法执行总数 - `algorithm_execution_duration_seconds` - 算法执行延迟 +- `jobs_created_total` - 异步任务创建总数 +- `jobs_completed_total` - 异步任务完成总数 ### Grafana 仪表板 diff --git a/config/metrics.yaml b/config/metrics.yaml index a7b2ff5..695708f 100644 --- a/config/metrics.yaml +++ b/config/metrics.yaml @@ -69,10 +69,33 @@ custom_metrics: labels: [] buckets: [10, 100, 1000, 10000, 100000, 1000000] - # 添加更多自定义指标... - # my_custom_metric: - # name: "my_metric_name" - # type: counter|gauge|histogram - # description: "指标描述" - # labels: [label1, label2] - # buckets: [...] # 仅 histogram 需要 + # 异步任务指标 + jobs_created: + name: "jobs_created_total" + type: counter + description: "创建的异步任务总数" + labels: [algorithm] + + jobs_completed: + name: "jobs_completed_total" + type: counter + description: "完成的异步任务总数" + labels: [algorithm, status] + + job_execution_duration: + name: "job_execution_duration_seconds" + type: histogram + description: "异步任务执行时间" + labels: [algorithm] + buckets: [0.1, 0.5, 1, 5, 10, 30, 60, 120, 300] + + webhook_deliveries: + name: "webhook_deliveries_total" + type: counter + description: "Webhook 回调发送总数" + labels: [status] + prime_check_total: + name: "prime_check" + type: counter + description: "出现问题的次数" + labels: [status] \ No newline at end of file diff --git a/docs/algorithm-development.md b/docs/algorithm-development.md new file mode 100644 index 0000000..ee4ca76 --- /dev/null +++ b/docs/algorithm-development.md @@ -0,0 +1,556 @@ +# 算法开发指南 + +本文档详细介绍如何在 FunctionalScaffold 框架中开发算法,包括最佳实践、高级特性和常见模式。 + +## 算法架构 + +### 核心概念 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ HTTP 请求 │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ FastAPI 路由层 │ +│ - 参数验证 (Pydantic) │ +│ - 请求 ID 生成 │ +│ - 错误处理 │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ BaseAlgorithm.execute() │ +│ - 自动计时 │ +│ - 指标记录 │ +│ - 异常捕获 │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ YourAlgorithm.process() │ +│ ★ 你只需要实现这个方法 ★ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### BaseAlgorithm 基类 + +所有算法必须继承 `BaseAlgorithm` 类: + +```python +from abc import ABC, abstractmethod +from typing import Any, Dict + +class BaseAlgorithm(ABC): + """算法基类""" + + def __init__(self): + self.name = self.__class__.__name__ + self.version = "1.0.0" + + @abstractmethod + def process(self, *args, **kwargs) -> Dict[str, Any]: + """算法处理逻辑 - 子类必须实现""" + pass + + def execute(self, *args, **kwargs) -> Dict[str, Any]: + """执行算法 - 自动处理埋点和错误""" + # 框架自动处理:计时、日志、指标、异常 + pass +``` + +## 开发算法 + +### 基础示例 + +```python +# src/functional_scaffold/algorithms/text_processor.py +from typing import Dict, Any +from .base import BaseAlgorithm + +class TextProcessor(BaseAlgorithm): + """文本处理算法""" + + def __init__(self): + super().__init__() + self.version = "1.0.0" + + def process(self, text: str, operation: str = "upper") -> Dict[str, Any]: + """ + 处理文本 + + Args: + text: 输入文本 + operation: 操作类型 (upper/lower/reverse) + + Returns: + 处理结果 + """ + if operation == "upper": + result = text.upper() + elif operation == "lower": + result = text.lower() + elif operation == "reverse": + result = text[::-1] + else: + raise ValueError(f"不支持的操作: {operation}") + + return { + "original": text, + "processed": result, + "operation": operation, + "length": len(result) + } +``` + +### 带模型加载的算法 + +```python +# src/functional_scaffold/algorithms/ml_predictor.py +from typing import Dict, Any +import logging +from .base import BaseAlgorithm + +logger = logging.getLogger(__name__) + +class MLPredictor(BaseAlgorithm): + """机器学习预测算法""" + + def __init__(self, model_path: str = None): + super().__init__() + self.model = None + self.model_path = model_path or "models/default.pkl" + self._load_model() + + def _load_model(self): + """加载模型(在初始化时执行一次)""" + logger.info(f"加载模型: {self.model_path}") + # import joblib + # self.model = joblib.load(self.model_path) + self.model = "mock_model" # 示例 + logger.info("模型加载完成") + + def process(self, features: list) -> Dict[str, Any]: + """ + 执行预测 + + Args: + features: 特征向量 + + Returns: + 预测结果 + """ + if self.model is None: + raise RuntimeError("模型未加载") + + # prediction = self.model.predict([features])[0] + prediction = sum(features) / len(features) # 示例 + + return { + "features": features, + "prediction": prediction, + "model_version": self.version + } +``` + +### 带外部服务调用的算法 + +```python +# src/functional_scaffold/algorithms/data_fetcher.py +from typing import Dict, Any +import httpx +from .base import BaseAlgorithm +from ..config import settings + +class DataFetcher(BaseAlgorithm): + """数据获取算法""" + + def __init__(self): + super().__init__() + self.api_base_url = settings.external_api_url or "https://api.example.com" + + async def _fetch_data(self, endpoint: str) -> dict: + """异步获取数据""" + async with httpx.AsyncClient() as client: + response = await client.get(f"{self.api_base_url}/{endpoint}") + response.raise_for_status() + return response.json() + + def process(self, data_id: str) -> Dict[str, Any]: + """ + 获取并处理数据 + + Args: + data_id: 数据 ID + + Returns: + 处理后的数据 + """ + import asyncio + + # 在同步方法中调用异步函数 + loop = asyncio.new_event_loop() + try: + data = loop.run_until_complete(self._fetch_data(f"data/{data_id}")) + finally: + loop.close() + + # 处理数据 + processed = self._transform(data) + + return { + "data_id": data_id, + "raw_data": data, + "processed_data": processed + } + + def _transform(self, data: dict) -> dict: + """数据转换""" + return {k: v for k, v in data.items() if v is not None} +``` + +## 数据模型定义 + +### 请求模型 + +```python +# src/functional_scaffold/api/models.py +from pydantic import BaseModel, Field, ConfigDict, field_validator +from typing import List, Optional + +class TextProcessRequest(BaseModel): + """文本处理请求""" + + model_config = ConfigDict( + json_schema_extra={ + "example": { + "text": "Hello World", + "operation": "upper" + } + } + ) + + text: str = Field(..., min_length=1, max_length=10000, description="输入文本") + operation: str = Field("upper", description="操作类型") + + @field_validator("operation") + @classmethod + def validate_operation(cls, v): + allowed = ["upper", "lower", "reverse"] + if v not in allowed: + raise ValueError(f"operation 必须是 {allowed} 之一") + return v + + +class MLPredictRequest(BaseModel): + """ML 预测请求""" + + model_config = ConfigDict( + json_schema_extra={ + "example": { + "features": [1.0, 2.0, 3.0, 4.0] + } + } + ) + + features: List[float] = Field(..., min_length=1, description="特征向量") +``` + +### 响应模型 + +```python +class TextProcessResponse(BaseModel): + """文本处理响应""" + + request_id: str = Field(..., description="请求 ID") + status: str = Field(..., description="处理状态") + result: Dict[str, Any] = Field(..., description="处理结果") + metadata: Dict[str, Any] = Field(..., description="元数据") +``` + +## 路由注册 + +### 同步接口 + +```python +# src/functional_scaffold/api/routes.py +from fastapi import APIRouter, HTTPException, Depends, status +from .models import TextProcessRequest, TextProcessResponse +from .dependencies import get_request_id +from ..algorithms.text_processor import TextProcessor + +router = APIRouter() + +@router.post( + "/text/process", + response_model=TextProcessResponse, + summary="文本处理", + description="对输入文本执行指定操作", +) +async def process_text( + request: TextProcessRequest, + request_id: str = Depends(get_request_id), +): + """文本处理端点""" + try: + processor = TextProcessor() + result = processor.execute(request.text, request.operation) + + if not result["success"]: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": "ALGORITHM_ERROR", "message": result["error"]} + ) + + return TextProcessResponse( + request_id=request_id, + status="success", + result=result["result"], + metadata=result["metadata"] + ) + + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail={"error": "VALIDATION_ERROR", "message": str(e)} + ) +``` + +### 异步任务接口 + +算法注册后自动支持异步调用,无需额外代码: + +```bash +# 创建异步任务 +curl -X POST http://localhost:8000/jobs \ + -H "Content-Type: application/json" \ + -d '{ + "algorithm": "TextProcessor", + "params": {"text": "hello", "operation": "upper"} + }' +``` + +## 自定义指标 + +### 定义指标 + +在 `config/metrics.yaml` 中添加: + +```yaml +custom_metrics: + # 文本处理统计 + text_process_total: + name: "text_process_total" + type: counter + description: "文本处理总数" + labels: [operation] + + text_length_histogram: + name: "text_length_histogram" + type: histogram + description: "处理文本长度分布" + labels: [] + buckets: [10, 50, 100, 500, 1000, 5000, 10000] +``` + +### 记录指标 + +```python +from ..core.metrics_unified import incr, observe + +class TextProcessor(BaseAlgorithm): + def process(self, text: str, operation: str = "upper") -> Dict[str, Any]: + # 记录操作计数 + incr("text_process_total", {"operation": operation}) + + # 记录文本长度 + observe("text_length_histogram", {}, len(text)) + + # ... 算法逻辑 ... +``` + +## 错误处理 + +### 自定义异常 + +```python +# src/functional_scaffold/core/errors.py +class AlgorithmError(FunctionalScaffoldError): + """算法执行错误""" + pass + +class ModelNotLoadedError(AlgorithmError): + """模型未加载错误""" + pass + +class InvalidInputError(AlgorithmError): + """无效输入错误""" + pass +``` + +### 在算法中使用 + +```python +from ..core.errors import InvalidInputError, ModelNotLoadedError + +class MLPredictor(BaseAlgorithm): + def process(self, features: list) -> Dict[str, Any]: + if not features: + raise InvalidInputError("特征向量不能为空") + + if self.model is None: + raise ModelNotLoadedError("模型未加载,请检查模型文件") + + # ... 算法逻辑 ... +``` + +## 测试 + +### 单元测试 + +```python +# tests/test_text_processor.py +import pytest +from src.functional_scaffold.algorithms.text_processor import TextProcessor + +class TestTextProcessor: + """文本处理算法测试""" + + def test_upper_operation(self): + """测试大写转换""" + processor = TextProcessor() + result = processor.process("hello", "upper") + + assert result["processed"] == "HELLO" + assert result["operation"] == "upper" + + def test_lower_operation(self): + """测试小写转换""" + processor = TextProcessor() + result = processor.process("HELLO", "lower") + + assert result["processed"] == "hello" + + def test_reverse_operation(self): + """测试反转""" + processor = TextProcessor() + result = processor.process("hello", "reverse") + + assert result["processed"] == "olleh" + + def test_invalid_operation(self): + """测试无效操作""" + processor = TextProcessor() + + with pytest.raises(ValueError, match="不支持的操作"): + processor.process("hello", "invalid") + + def test_execute_wrapper(self): + """测试 execute 包装器""" + processor = TextProcessor() + result = processor.execute("hello", "upper") + + assert result["success"] is True + assert result["result"]["processed"] == "HELLO" + assert "elapsed_time" in result["metadata"] +``` + +### API 集成测试 + +```python +# tests/test_text_api.py +import pytest +from fastapi import status + +class TestTextProcessAPI: + """文本处理 API 测试""" + + def test_process_text_success(self, client): + """测试成功处理""" + response = client.post( + "/text/process", + json={"text": "hello", "operation": "upper"} + ) + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["status"] == "success" + assert data["result"]["processed"] == "HELLO" + + def test_process_text_invalid_operation(self, client): + """测试无效操作""" + response = client.post( + "/text/process", + json={"text": "hello", "operation": "invalid"} + ) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY +``` + +## 最佳实践 + +### 1. 保持 process() 方法简洁 + +```python +# ✅ 好的做法 +def process(self, data): + validated = self._validate(data) + transformed = self._transform(validated) + result = self._compute(transformed) + return self._format_output(result) + +# ❌ 避免 +def process(self, data): + # 200 行代码全部写在这里... +``` + +### 2. 使用类型注解 + +```python +# ✅ 好的做法 +def process(self, text: str, max_length: int = 100) -> Dict[str, Any]: + ... + +# ❌ 避免 +def process(self, text, max_length=100): + ... +``` + +### 3. 合理使用日志 + +```python +import logging +logger = logging.getLogger(__name__) + +class MyAlgorithm(BaseAlgorithm): + def process(self, data): + logger.info(f"开始处理数据,大小: {len(data)}") + # ... 处理逻辑 ... + logger.info(f"处理完成,结果大小: {len(result)}") + return result +``` + +### 4. 资源管理 + +```python +class ResourceIntensiveAlgorithm(BaseAlgorithm): + def __init__(self): + super().__init__() + self._resource = None + + def _ensure_resource(self): + """延迟加载资源""" + if self._resource is None: + self._resource = self._load_heavy_resource() + + def process(self, data): + self._ensure_resource() + return self._resource.process(data) +``` + +## 参考 + +- [快速入门指南](./getting-started.md) +- [API 参考文档](./api-reference.md) +- [监控指南](./monitoring.md) diff --git a/docs/api-reference.md b/docs/api-reference.md new file mode 100644 index 0000000..bff380c --- /dev/null +++ b/docs/api-reference.md @@ -0,0 +1,441 @@ +# API 参考文档 + +本文档详细描述 FunctionalScaffold 提供的所有 API 端点。 + +## 基础信息 + +- **Base URL**: `http://localhost:8000` +- **Content-Type**: `application/json` +- **认证**: 当前版本无需认证 + +## 端点概览 + +| 方法 | 端点 | 描述 | +|------|------|------| +| POST | `/invoke` | 同步调用算法 | +| POST | `/jobs` | 创建异步任务 | +| GET | `/jobs/{job_id}` | 查询任务状态 | +| GET | `/healthz` | 存活检查 | +| GET | `/readyz` | 就绪检查 | +| GET | `/metrics` | Prometheus 指标 | + +--- + +## 同步调用接口 + +### POST /invoke + +同步调用质数判断算法,立即返回结果。 + +#### 请求 + +```http +POST /invoke +Content-Type: application/json +``` + +**请求体** + +| 字段 | 类型 | 必填 | 描述 | +|------|------|------|------| +| number | integer | 是 | 待判断的整数 | + +**示例** + +```json +{ + "number": 17 +} +``` + +#### 响应 + +**成功响应 (200 OK)** + +```json +{ + "request_id": "550e8400-e29b-41d4-a716-446655440000", + "status": "success", + "result": { + "number": 17, + "is_prime": true, + "factors": [], + "algorithm": "trial_division" + }, + "metadata": { + "algorithm": "PrimeChecker", + "version": "1.0.0", + "elapsed_time": 0.001 + } +} +``` + +**错误响应 (400 Bad Request)** + +```json +{ + "error": "VALIDATION_ERROR", + "message": "number must be an integer", + "details": {"field": "number", "value": "abc"}, + "request_id": "550e8400-e29b-41d4-a716-446655440000" +} +``` + +#### 示例调用 + +```bash +curl -X POST http://localhost:8000/invoke \ + -H "Content-Type: application/json" \ + -d '{"number": 17}' +``` + +--- + +## 异步任务接口 + +### POST /jobs + +创建异步任务,立即返回任务 ID,任务在后台执行。 + +#### 请求 + +```http +POST /jobs +Content-Type: application/json +``` + +**请求体** + +| 字段 | 类型 | 必填 | 描述 | +|------|------|------|------| +| algorithm | string | 是 | 算法名称(如 PrimeChecker) | +| params | object | 是 | 算法参数 | +| webhook | string | 否 | 任务完成后的回调 URL | + +**示例** + +```json +{ + "algorithm": "PrimeChecker", + "params": {"number": 17}, + "webhook": "https://example.com/callback" +} +``` + +#### 响应 + +**成功响应 (202 Accepted)** + +```json +{ + "job_id": "a1b2c3d4e5f6", + "status": "pending", + "message": "任务已创建", + "created_at": "2026-02-02T10:00:00+00:00" +} +``` + +**错误响应 (404 Not Found)** + +```json +{ + "detail": { + "error": "ALGORITHM_NOT_FOUND", + "message": "算法 'NonExistent' 不存在", + "details": {"available_algorithms": ["PrimeChecker"]}, + "request_id": "xxx" + } +} +``` + +**错误响应 (503 Service Unavailable)** + +```json +{ + "detail": { + "error": "SERVICE_UNAVAILABLE", + "message": "任务服务暂不可用,请稍后重试", + "request_id": "xxx" + } +} +``` + +#### 示例调用 + +```bash +curl -X POST http://localhost:8000/jobs \ + -H "Content-Type: application/json" \ + -d '{ + "algorithm": "PrimeChecker", + "params": {"number": 17}, + "webhook": "https://webhook.site/your-uuid" + }' +``` + +--- + +### GET /jobs/{job_id} + +查询异步任务的执行状态和结果。 + +#### 请求 + +```http +GET /jobs/{job_id} +``` + +**路径参数** + +| 参数 | 类型 | 描述 | +|------|------|------| +| job_id | string | 任务唯一标识(12位十六进制) | + +#### 响应 + +**成功响应 (200 OK) - 任务进行中** + +```json +{ + "job_id": "a1b2c3d4e5f6", + "status": "running", + "algorithm": "PrimeChecker", + "created_at": "2026-02-02T10:00:00+00:00", + "started_at": "2026-02-02T10:00:01+00:00", + "completed_at": null, + "result": null, + "error": null, + "metadata": null +} +``` + +**成功响应 (200 OK) - 任务完成** + +```json +{ + "job_id": "a1b2c3d4e5f6", + "status": "completed", + "algorithm": "PrimeChecker", + "created_at": "2026-02-02T10:00:00+00:00", + "started_at": "2026-02-02T10:00:01+00:00", + "completed_at": "2026-02-02T10:00:02+00:00", + "result": { + "number": 17, + "is_prime": true, + "factors": [], + "algorithm": "trial_division" + }, + "error": null, + "metadata": { + "algorithm": "PrimeChecker", + "version": "1.0.0", + "elapsed_time": 0.001 + } +} +``` + +**成功响应 (200 OK) - 任务失败** + +```json +{ + "job_id": "a1b2c3d4e5f6", + "status": "failed", + "algorithm": "PrimeChecker", + "created_at": "2026-02-02T10:00:00+00:00", + "started_at": "2026-02-02T10:00:01+00:00", + "completed_at": "2026-02-02T10:00:02+00:00", + "result": null, + "error": "Invalid input: number must be positive", + "metadata": null +} +``` + +**错误响应 (404 Not Found)** + +```json +{ + "detail": { + "error": "JOB_NOT_FOUND", + "message": "任务 'xxx' 不存在或已过期" + } +} +``` + +#### 任务状态说明 + +| 状态 | 描述 | +|------|------| +| pending | 等待执行 | +| running | 执行中 | +| completed | 已完成 | +| failed | 执行失败 | + +#### 示例调用 + +```bash +curl http://localhost:8000/jobs/a1b2c3d4e5f6 +``` + +--- + +### Webhook 回调 + +当任务完成时,如果指定了 webhook URL,系统会发送 POST 请求到该 URL。 + +**回调请求** + +```http +POST {webhook_url} +Content-Type: application/json +``` + +**回调负载** + +```json +{ + "job_id": "a1b2c3d4e5f6", + "status": "completed", + "algorithm": "PrimeChecker", + "result": {"number": 17, "is_prime": true}, + "error": null, + "metadata": {"elapsed_time": 0.001}, + "completed_at": "2026-02-02T10:00:02+00:00" +} +``` + +**重试机制** + +- 最大重试次数:3 次 +- 重试间隔:1s, 5s, 15s(指数退避) +- 超时时间:10 秒 + +--- + +## 健康检查接口 + +### GET /healthz + +存活检查端点,用于 Kubernetes 存活探针。 + +#### 响应 + +**成功响应 (200 OK)** + +```json +{ + "status": "healthy", + "timestamp": 1706868000.123 +} +``` + +#### 示例调用 + +```bash +curl http://localhost:8000/healthz +``` + +--- + +### GET /readyz + +就绪检查端点,用于 Kubernetes 就绪探针。 + +#### 响应 + +**成功响应 (200 OK)** + +```json +{ + "status": "ready", + "timestamp": 1706868000.123, + "checks": { + "algorithm": true + } +} +``` + +#### 示例调用 + +```bash +curl http://localhost:8000/readyz +``` + +--- + +## 监控接口 + +### GET /metrics + +返回 Prometheus 格式的监控指标。 + +#### 响应 + +**成功响应 (200 OK)** + +``` +Content-Type: text/plain; version=0.0.4; charset=utf-8 + +# HELP http_requests_total HTTP 请求总数 +# TYPE http_requests_total counter +http_requests_total{endpoint="/invoke",method="POST",status="success"} 42 + +# HELP http_request_duration_seconds HTTP 请求延迟 +# TYPE http_request_duration_seconds histogram +http_request_duration_seconds_bucket{endpoint="/invoke",method="POST",le="0.01"} 35 +... + +# HELP algorithm_executions_total 算法执行总数 +# TYPE algorithm_executions_total counter +algorithm_executions_total{algorithm="PrimeChecker",status="success"} 42 + +# HELP jobs_created_total 创建的异步任务总数 +# TYPE jobs_created_total counter +jobs_created_total{algorithm="PrimeChecker"} 10 + +# HELP jobs_completed_total 完成的异步任务总数 +# TYPE jobs_completed_total counter +jobs_completed_total{algorithm="PrimeChecker",status="completed"} 8 +jobs_completed_total{algorithm="PrimeChecker",status="failed"} 2 +``` + +#### 可用指标 + +| 指标名称 | 类型 | 标签 | 描述 | +|---------|------|------|------| +| http_requests_total | counter | method, endpoint, status | HTTP 请求总数 | +| http_request_duration_seconds | histogram | method, endpoint | HTTP 请求延迟 | +| http_requests_in_progress | gauge | - | 当前进行中的请求数 | +| algorithm_executions_total | counter | algorithm, status | 算法执行总数 | +| algorithm_execution_duration_seconds | histogram | algorithm | 算法执行延迟 | +| jobs_created_total | counter | algorithm | 创建的异步任务总数 | +| jobs_completed_total | counter | algorithm, status | 完成的异步任务总数 | +| job_execution_duration_seconds | histogram | algorithm | 异步任务执行时间 | +| webhook_deliveries_total | counter | status | Webhook 回调发送总数 | + +#### 示例调用 + +```bash +curl http://localhost:8000/metrics +``` + +--- + +## 错误码说明 + +| 错误码 | HTTP 状态码 | 描述 | +|--------|------------|------| +| VALIDATION_ERROR | 400 | 请求参数验证失败 | +| ALGORITHM_NOT_FOUND | 404 | 指定的算法不存在 | +| JOB_NOT_FOUND | 404 | 任务不存在或已过期 | +| ALGORITHM_ERROR | 500 | 算法执行错误 | +| INTERNAL_ERROR | 500 | 服务器内部错误 | +| SERVICE_UNAVAILABLE | 503 | 服务暂不可用 | + +--- + +## 在线文档 + +启动服务后,可以访问交互式 API 文档: + +- **Swagger UI**: http://localhost:8000/docs +- **ReDoc**: http://localhost:8000/redoc +- **OpenAPI JSON**: http://localhost:8000/openapi.json diff --git a/docs/api/README.md b/docs/api/README.md new file mode 100644 index 0000000..c413795 --- /dev/null +++ b/docs/api/README.md @@ -0,0 +1,58 @@ +# API 文档 + +本目录包含 API 相关文档和自动生成的 OpenAPI 规范。 + +## 文件说明 + +- `openapi.json` - 自动生成的 OpenAPI 3.0 规范文件 + +## 生成文档 + +运行以下命令更新 OpenAPI 规范: + +```bash +python scripts/export_openapi.py +``` + +## 在线文档 + +启动应用后,访问以下 URL 查看交互式文档: + +| 文档类型 | 地址 | +|---------|------| +| Swagger UI | http://localhost:8000/docs | +| ReDoc | http://localhost:8000/redoc | +| OpenAPI JSON | http://localhost:8000/openapi.json | + +## API 端点 + +### 核心接口 + +| 方法 | 端点 | 描述 | +|------|------|------| +| POST | `/invoke` | 同步调用算法 | +| POST | `/jobs` | 创建异步任务 | +| GET | `/jobs/{job_id}` | 查询任务状态 | + +### 健康检查 + +| 方法 | 端点 | 描述 | +|------|------|------| +| GET | `/healthz` | 存活检查 | +| GET | `/readyz` | 就绪检查 | + +### 监控 + +| 方法 | 端点 | 描述 | +|------|------|------| +| GET | `/metrics` | Prometheus 指标 | + +## 详细文档 + +完整的 API 参考文档请查看:[API 参考文档](../api-reference.md) + +## 注意事项 + +- `openapi.json` 是自动生成的,请勿手动编辑 +- API 变更后需要重新运行导出脚本 +- 确保 Pydantic 模型包含完整的文档字符串和示例 diff --git a/docs/swagger/openapi.json b/docs/api/openapi.json similarity index 52% rename from docs/swagger/openapi.json rename to docs/api/openapi.json index f4da82b..d4e5451 100644 --- a/docs/swagger/openapi.json +++ b/docs/api/openapi.json @@ -135,15 +135,148 @@ "tags": [ "Algorithm" ], - "summary": "异步任务接口(预留)", - "description": "异步任务接口,当前版本未实现", + "summary": "创建异步任务", + "description": "创建异步任务,立即返回任务 ID,任务在后台执行", "operationId": "create_job_jobs_post", + "parameters": [ + { + "name": "x-request-id", + "in": "header", + "required": false, + "schema": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "X-Request-Id" + } + } + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/JobRequest" + } + } + } + }, "responses": { - "501": { - "description": "Successful Response", + "202": { + "description": "任务已创建", "content": { "application/json": { - "schema": {} + "schema": { + "$ref": "#/components/schemas/JobCreateResponse" + } + } + } + }, + "400": { + "description": "请求参数错误", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "404": { + "description": "算法不存在", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "503": { + "description": "服务不可用", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, + "/jobs/{job_id}": { + "get": { + "tags": [ + "Algorithm" + ], + "summary": "查询任务状态", + "description": "查询异步任务的执行状态和结果", + "operationId": "get_job_status_jobs__job_id__get", + "parameters": [ + { + "name": "job_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "title": "Job Id" + } + } + ], + "responses": { + "200": { + "description": "成功", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/JobStatusResponse" + } + } + } + }, + "404": { + "description": "任务不存在或已过期", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "503": { + "description": "服务不可用", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } } } } @@ -330,6 +463,205 @@ "status": "success" } }, + "JobCreateResponse": { + "properties": { + "job_id": { + "type": "string", + "title": "Job Id", + "description": "任务唯一标识" + }, + "status": { + "$ref": "#/components/schemas/JobStatus", + "description": "任务状态" + }, + "message": { + "type": "string", + "title": "Message", + "description": "状态消息" + }, + "created_at": { + "type": "string", + "title": "Created At", + "description": "创建时间(ISO 8601)" + } + }, + "type": "object", + "required": [ + "job_id", + "status", + "message", + "created_at" + ], + "title": "JobCreateResponse", + "description": "任务创建响应", + "example": { + "created_at": "2026-02-02T10:00:00Z", + "job_id": "a1b2c3d4e5f6", + "message": "任务已创建", + "status": "pending" + } + }, + "JobRequest": { + "properties": { + "algorithm": { + "type": "string", + "title": "Algorithm", + "description": "算法名称" + }, + "params": { + "additionalProperties": true, + "type": "object", + "title": "Params", + "description": "算法参数" + }, + "webhook": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Webhook", + "description": "回调 URL" + } + }, + "type": "object", + "required": [ + "algorithm", + "params" + ], + "title": "JobRequest", + "description": "异步任务请求", + "example": { + "algorithm": "PrimeChecker", + "params": { + "number": 17 + }, + "webhook": "https://example.com/callback" + } + }, + "JobStatus": { + "type": "string", + "enum": [ + "pending", + "running", + "completed", + "failed" + ], + "title": "JobStatus", + "description": "任务状态枚举" + }, + "JobStatusResponse": { + "properties": { + "job_id": { + "type": "string", + "title": "Job Id", + "description": "任务唯一标识" + }, + "status": { + "$ref": "#/components/schemas/JobStatus", + "description": "任务状态" + }, + "algorithm": { + "type": "string", + "title": "Algorithm", + "description": "算法名称" + }, + "created_at": { + "type": "string", + "title": "Created At", + "description": "创建时间(ISO 8601)" + }, + "started_at": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Started At", + "description": "开始执行时间(ISO 8601)" + }, + "completed_at": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Completed At", + "description": "完成时间(ISO 8601)" + }, + "result": { + "anyOf": [ + { + "additionalProperties": true, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Result", + "description": "执行结果(仅完成时返回)" + }, + "error": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Error", + "description": "错误信息(仅失败时返回)" + }, + "metadata": { + "anyOf": [ + { + "additionalProperties": true, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Metadata", + "description": "元数据信息" + } + }, + "type": "object", + "required": [ + "job_id", + "status", + "algorithm", + "created_at" + ], + "title": "JobStatusResponse", + "description": "任务状态查询响应", + "example": { + "algorithm": "PrimeChecker", + "completed_at": "2026-02-02T10:00:02Z", + "created_at": "2026-02-02T10:00:00Z", + "job_id": "a1b2c3d4e5f6", + "metadata": { + "elapsed_time": 0.001 + }, + "result": { + "is_prime": true, + "number": 17 + }, + "started_at": "2026-02-02T10:00:01Z", + "status": "completed" + } + }, "ReadinessResponse": { "properties": { "status": { diff --git a/docs/getting-started.md b/docs/getting-started.md new file mode 100644 index 0000000..82321f7 --- /dev/null +++ b/docs/getting-started.md @@ -0,0 +1,249 @@ +# 快速入门指南 + +本指南帮助算法同学快速上手 FunctionalScaffold 脚手架,在 10 分钟内完成第一个算法服务的开发和部署。 + +## 核心理念 + +**算法同学只需关注核心算法逻辑**,框架自动处理: +- HTTP 接口封装 +- 参数验证 +- 错误处理 +- 日志记录 +- 性能指标 +- 健康检查 +- 容器化部署 + +## 环境准备 + +### 1. 安装依赖 + +```bash +# 克隆项目 +git clone +cd FunctionalScaffold + +# 创建虚拟环境 +python -m venv venv +source venv/bin/activate # Windows: venv\Scripts\activate + +# 安装依赖 +pip install -e ".[dev]" +``` + +### 2. 启动服务 + +```bash +# 开发模式(自动重载) +uvicorn src.functional_scaffold.main:app --reload --port 8000 +``` + +### 3. 验证服务 + +```bash +# 健康检查 +curl http://localhost:8000/healthz + +# 调用示例算法 +curl -X POST http://localhost:8000/invoke \ + -H "Content-Type: application/json" \ + -d '{"number": 17}' +``` + +## 添加你的第一个算法 + +### 步骤 1:创建算法类 + +在 `src/functional_scaffold/algorithms/` 目录下创建新文件: + +```python +# src/functional_scaffold/algorithms/my_algorithm.py +from typing import Dict, Any +from .base import BaseAlgorithm + +class MyAlgorithm(BaseAlgorithm): + """我的算法类""" + + def process(self, input_data: Any) -> Dict[str, Any]: + """ + 算法处理逻辑 - 只需实现这个方法! + + Args: + input_data: 输入数据 + + Returns: + Dict[str, Any]: 处理结果 + """ + # 在这里实现你的算法逻辑 + result = self._do_calculation(input_data) + + return { + "input": input_data, + "output": result, + "message": "处理成功" + } + + def _do_calculation(self, data): + """内部计算方法""" + # 你的算法实现 + return data * 2 +``` + +### 步骤 2:注册算法 + +在 `src/functional_scaffold/algorithms/__init__.py` 中添加导出: + +```python +from .base import BaseAlgorithm +from .prime_checker import PrimeChecker +from .my_algorithm import MyAlgorithm # 添加这行 + +__all__ = ["BaseAlgorithm", "PrimeChecker", "MyAlgorithm"] # 添加到列表 +``` + +### 步骤 3:添加 API 端点 + +在 `src/functional_scaffold/api/routes.py` 中添加路由: + +```python +from ..algorithms.my_algorithm import MyAlgorithm + +@router.post("/my-endpoint") +async def my_endpoint( + request: MyRequest, # 需要定义请求模型 + request_id: str = Depends(get_request_id) +): + """我的算法端点""" + algorithm = MyAlgorithm() + result = algorithm.execute(request.data) + + if not result["success"]: + raise HTTPException(status_code=500, detail=result["error"]) + + return { + "request_id": request_id, + "status": "success", + "result": result["result"], + "metadata": result["metadata"] + } +``` + +### 步骤 4:定义请求模型 + +在 `src/functional_scaffold/api/models.py` 中添加: + +```python +class MyRequest(BaseModel): + """我的请求模型""" + + model_config = ConfigDict( + json_schema_extra={ + "example": {"data": 10} + } + ) + + data: int = Field(..., description="输入数据") +``` + +### 步骤 5:测试 + +```bash +curl -X POST http://localhost:8000/my-endpoint \ + -H "Content-Type: application/json" \ + -d '{"data": 10}' +``` + +## 使用异步任务 + +对于耗时较长的算法,使用异步任务接口: + +### 创建异步任务 + +```bash +curl -X POST http://localhost:8000/jobs \ + -H "Content-Type: application/json" \ + -d '{ + "algorithm": "MyAlgorithm", + "params": {"data": 10}, + "webhook": "https://your-callback-url.com/notify" + }' +``` + +响应: +```json +{ + "job_id": "a1b2c3d4e5f6", + "status": "pending", + "message": "任务已创建", + "created_at": "2026-02-02T10:00:00Z" +} +``` + +### 查询任务状态 + +```bash +curl http://localhost:8000/jobs/a1b2c3d4e5f6 +``` + +响应: +```json +{ + "job_id": "a1b2c3d4e5f6", + "status": "completed", + "algorithm": "MyAlgorithm", + "result": {"input": 10, "output": 20}, + "metadata": {"elapsed_time": 0.001} +} +``` + +## 本地开发技巧 + +### 查看 API 文档 + +启动服务后访问: +- Swagger UI: http://localhost:8000/docs +- ReDoc: http://localhost:8000/redoc + +### 运行测试 + +```bash +# 运行所有测试 +pytest tests/ -v + +# 运行单个测试文件 +pytest tests/test_algorithms.py -v +``` + +### 代码格式化 + +```bash +# 格式化代码 +black src/ tests/ + +# 检查代码规范 +ruff check src/ tests/ +``` + +## 下一步 + +- [算法开发详细指南](./algorithm-development.md) - 深入了解算法开发 +- [API 参考文档](./api-reference.md) - 完整的 API 说明 +- [监控指南](./monitoring.md) - 了解监控和告警 +- [部署指南](./deployment.md) - 生产环境部署 + +## 常见问题 + +### Q: 如何处理算法中的异常? + +A: 在 `process()` 方法中抛出异常即可,框架会自动捕获并返回标准错误响应。 + +### Q: 如何添加自定义指标? + +A: 在 `config/metrics.yaml` 中定义指标,然后在代码中使用 `incr()` 或 `observe()` 记录。 + +### Q: 如何访问外部服务(数据库、OSS)? + +A: 在 `config.py` 中添加配置项,通过环境变量注入连接信息。 + +### Q: 算法需要加载大模型文件怎么办? + +A: 在算法类的 `__init__` 方法中加载模型,框架会在容器启动时初始化。 diff --git a/docs/grafana-dashboard-guide.md b/docs/grafana-dashboard-guide.md deleted file mode 100644 index a7295fd..0000000 --- a/docs/grafana-dashboard-guide.md +++ /dev/null @@ -1,237 +0,0 @@ -# Grafana Dashboard 导入和使用指南 - -## Dashboard 概述 - -新的 dashboard 包含 10 个面板,全面展示应用的监控指标: - -### 第一行:核心性能指标 -1. **HTTP 请求速率 (QPS)** - 每秒请求数,按端点和方法分组 -2. **HTTP 请求延迟 (P50/P95/P99)** - 请求响应时间的百分位数 - -### 第二行:关键指标 -3. **请求成功率** - 成功请求占比(仪表盘) -4. **当前并发请求数** - 实时并发数(仪表盘) -5. **HTTP 请求总数** - 累计请求数(统计卡片) -6. **算法执行总数** - 累计算法调用数(统计卡片) - -### 第三行:算法性能 -7. **算法执行速率** - 每秒算法执行次数 -8. **算法执行延迟 (P50/P95/P99)** - 算法执行时间的百分位数 - -### 第四行:分布分析 -9. **请求分布(按端点)** - 饼图展示各端点的请求占比 -10. **请求状态分布** - 饼图展示成功/失败请求占比 - -## 导入步骤 - -### 1. 配置 Prometheus 数据源 - -首先确保 Prometheus 数据源已正确配置: - -1. 打开 Grafana:http://localhost:3000 -2. 登录(默认:admin/admin) -3. 进入 **Configuration** → **Data Sources** -4. 点击 **Add data source** -5. 选择 **Prometheus** -6. 配置: - - **Name**: `Prometheus`(必须是这个名称) - - **URL**: `http://prometheus:9090`(注意:使用服务名,不是 localhost) - - **Access**: Server (default) -7. 点击 **Save & Test**,确保显示绿色的成功提示 - -### 2. 导入 Dashboard - -有两种方式导入 dashboard: - -#### 方式 1:通过 JSON 文件导入(推荐) - -1. 在 Grafana 左侧菜单,点击 **Dashboards** → **Import** -2. 点击 **Upload JSON file** -3. 选择文件:`monitoring/grafana/dashboard.json` -4. 在导入页面: - - **Name**: FunctionalScaffold 监控仪表板 - - **Folder**: General(或创建新文件夹) - - **Prometheus**: 选择刚才配置的 Prometheus 数据源 -5. 点击 **Import** - -#### 方式 2:通过 JSON 内容导入 - -1. 在 Grafana 左侧菜单,点击 **Dashboards** → **Import** -2. 复制 `monitoring/grafana/dashboard.json` 的全部内容 -3. 粘贴到 **Import via panel json** 文本框 -4. 点击 **Load** -5. 配置数据源并点击 **Import** - -### 3. 验证 Dashboard - -导入成功后,你应该看到: - -- ✅ 所有面板都正常显示 -- ✅ 有数据的面板显示图表和数值 -- ✅ 右上角显示自动刷新(5秒) -- ✅ 时间范围默认为最近 1 小时 - -## 生成测试数据 - -如果 dashboard 中没有数据或数据很少,运行流量生成脚本: - -```bash -# 启动流量生成器 -./scripts/generate_traffic.sh -``` - -这会持续发送请求到应用,生成监控数据。等待 1-2 分钟后,dashboard 中应该会显示丰富的图表。 - -## Dashboard 功能 - -### 自动刷新 - -Dashboard 配置了自动刷新,默认每 5 秒更新一次。你可以在右上角修改刷新间隔: -- 5s(默认) -- 10s -- 30s -- 1m -- 5m - -### 时间范围 - -默认显示最近 1 小时的数据。你可以在右上角修改时间范围: -- Last 5 minutes -- Last 15 minutes -- Last 30 minutes -- Last 1 hour(默认) -- Last 3 hours -- Last 6 hours -- Last 12 hours -- Last 24 hours -- 或自定义时间范围 - -### 实时模式 - -Dashboard 启用了 **Live** 模式(右上角的 Live 按钮),可以实时查看最新数据。 - -### 交互功能 - -- **缩放**:在时间序列图表上拖动选择区域可以放大 -- **图例点击**:点击图例可以隐藏/显示对应的数据系列 -- **Tooltip**:鼠标悬停在图表上查看详细数值 -- **面板全屏**:点击面板标题旁的图标可以全屏查看 - -## 常见问题 - -### 问题 1:数据源连接失败 - -**错误信息**:`dial tcp [::1]:9090: connect: connection refused` - -**解决方案**: -- 确保 Prometheus URL 使用 `http://prometheus:9090`(服务名) -- 不要使用 `http://localhost:9090`(在容器内部无法访问) - -### 问题 2:面板显示 "No data" - -**可能原因**: -1. 应用还没有收到任何请求 -2. Prometheus 还没有抓取到数据 -3. 时间范围选择不当 - -**解决方案**: -1. 发送一些测试请求: - ```bash - curl -X POST http://localhost:8111/invoke \ - -H "Content-Type: application/json" \ - -d '{"number": 17}' - ``` -2. 等待 15-30 秒让 Prometheus 抓取数据 -3. 调整时间范围为 "Last 5 minutes" -4. 运行流量生成脚本:`./scripts/generate_traffic.sh` - -### 问题 3:延迟图表显示 "NaN" 或空值 - -**原因**:直方图数据不足,无法计算百分位数 - -**解决方案**: -- 发送更多请求以积累足够的数据 -- 等待几分钟让数据积累 -- 使用流量生成脚本持续发送请求 - -### 问题 4:数据源变量未正确设置 - -**错误信息**:面板显示 "Datasource not found" - -**解决方案**: -1. 确保 Prometheus 数据源的名称是 `Prometheus` -2. 或者在 dashboard 设置中重新选择数据源: - - 点击右上角的齿轮图标(Dashboard settings) - - 进入 **Variables** 标签 - - 编辑 `DS_PROMETHEUS` 变量 - - 选择正确的 Prometheus 数据源 - -## PromQL 查询说明 - -Dashboard 使用的主要 PromQL 查询: - -### HTTP 请求速率 -```promql -sum(rate(http_requests_total[1m])) by (endpoint, method) -``` - -### HTTP 请求延迟 P95 -```promql -histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[1m])) by (le, endpoint, method)) -``` - -### 请求成功率 -```promql -sum(rate(http_requests_total{status="success"}[5m])) / sum(rate(http_requests_total[5m])) -``` - -### 算法执行速率 -```promql -sum(rate(algorithm_executions_total[1m])) by (algorithm, status) -``` - -## 自定义 Dashboard - -你可以根据需要自定义 dashboard: - -1. **添加新面板**:点击右上角的 "Add panel" 按钮 -2. **编辑面板**:点击面板标题 → Edit -3. **调整布局**:拖动面板调整位置和大小 -4. **保存更改**:点击右上角的保存图标 - -## 导出和分享 - -### 导出 Dashboard - -1. 点击右上角的分享图标 -2. 选择 **Export** 标签 -3. 点击 **Save to file** 下载 JSON 文件 - -### 分享 Dashboard - -1. 点击右上角的分享图标 -2. 选择 **Link** 标签 -3. 复制链接分享给团队成员 - -## 告警配置(可选) - -你可以为面板配置告警规则: - -1. 编辑面板 -2. 切换到 **Alert** 标签 -3. 点击 **Create alert rule from this panel** -4. 配置告警条件和通知渠道 - -## 相关资源 - -- Grafana 官方文档:https://grafana.com/docs/ -- Prometheus 查询语言:https://prometheus.io/docs/prometheus/latest/querying/basics/ -- Dashboard 最佳实践:https://grafana.com/docs/grafana/latest/best-practices/ - -## 技术支持 - -如果遇到问题: -1. 检查 Prometheus 是否正常运行:http://localhost:9090 -2. 检查应用 metrics 端点:http://localhost:8111/metrics -3. 查看 Grafana 日志:`docker-compose logs grafana` -4. 查看 Prometheus 日志:`docker-compose logs prometheus` diff --git a/docs/metrics-guide.md b/docs/metrics-guide.md deleted file mode 100644 index 2eb6d25..0000000 --- a/docs/metrics-guide.md +++ /dev/null @@ -1,346 +0,0 @@ -# 指标记录方案对比与使用指南 - -## 问题背景 - -在多实例部署场景下(Kubernetes、Serverless),原有的内存指标存储方案存在以下问题: - -1. **指标分散**:每个实例独立记录指标,无法聚合 -2. **数据丢失**:实例销毁后指标丢失 -3. **统计不准**:无法获得全局准确的指标视图 - -## 解决方案对比 - -### 方案1:Pushgateway(推荐) - -**原理:** 应用主动推送指标到 Pushgateway,Prometheus 从 Pushgateway 抓取 - -**优点:** -- ✅ Prometheus 官方支持,生态成熟 -- ✅ 实现简单,代码改动小 -- ✅ 适合短生命周期任务(Serverless、批处理) -- ✅ 支持持久化,重启不丢失数据 - -**缺点:** -- ⚠️ 单点故障风险(可通过高可用部署解决) -- ⚠️ 不适合超高频推送(每秒数千次) - -**适用场景:** -- Serverless 函数 -- 批处理任务 -- 短生命周期容器 -- 实例数量动态变化的场景 - -### 方案2:Redis + 自定义 Exporter - -**原理:** 应用将指标写入 Redis,自定义 Exporter 从 Redis 读取并转换为 Prometheus 格式 - -**优点:** -- ✅ 灵活可控,支持复杂聚合逻辑 -- ✅ Redis 高性能,支持高并发写入 -- ✅ 可以实现自定义的指标计算 - -**缺点:** -- ⚠️ 需要自己实现 Exporter,维护成本高 -- ⚠️ 增加了系统复杂度 -- ⚠️ Redis 需要额外的运维成本 - -**适用场景:** -- 需要自定义指标聚合逻辑 -- 超高频指标写入(每秒数万次) -- 需要实时查询指标数据 - -### 方案3:标准 Prometheus Pull 模式(不推荐) - -**原理:** Prometheus 从每个实例抓取指标,在查询时聚合 - -**优点:** -- ✅ Prometheus 标准做法 -- ✅ 无需额外组件 - -**缺点:** -- ❌ 需要服务发现机制(Kubernetes Service Discovery) -- ❌ 短生命周期实例可能来不及抓取 -- ❌ 实例销毁后数据丢失 - -**适用场景:** -- 长生命周期服务 -- 实例数量相对固定 -- 有完善的服务发现机制 - -## 使用指南 - -### 方案1:Pushgateway(推荐) - -#### 1. 启动服务 - -```bash -cd deployment -docker-compose up -d redis pushgateway prometheus grafana -``` - -#### 2. 修改代码 - -在 `src/functional_scaffold/api/routes.py` 中: - -```python -# 替换导入 -from functional_scaffold.core.metrics_pushgateway import ( - track_request, - track_algorithm_execution, -) - -# 使用方式不变 -@router.post("/invoke") -@track_request("POST", "/invoke") -async def invoke_algorithm(request: InvokeRequest): - # ... 业务逻辑 -``` - -#### 3. 配置环境变量 - -在 `.env` 文件中: - -```bash -PUSHGATEWAY_URL=localhost:9091 -METRICS_JOB_NAME=functional_scaffold -INSTANCE_ID=instance-1 # 可选,默认使用 HOSTNAME -``` - -#### 4. 验证 - -```bash -# 查看 Pushgateway 指标 -curl http://localhost:9091/metrics - -# 查看 Prometheus -open http://localhost:9090 - -# 查询示例 -http_requests_total{job="functional_scaffold"} -``` - -### 方案2:Redis + Exporter - -#### 1. 启动服务 - -```bash -cd deployment -docker-compose up -d redis redis-exporter prometheus grafana -``` - -#### 2. 修改代码 - -在 `src/functional_scaffold/api/routes.py` 中: - -```python -# 替换导入 -from functional_scaffold.core.metrics_redis import ( - track_request, - track_algorithm_execution, -) - -# 使用方式不变 -@router.post("/invoke") -@track_request("POST", "/invoke") -async def invoke_algorithm(request: InvokeRequest): - # ... 业务逻辑 -``` - -#### 3. 配置环境变量 - -在 `.env` 文件中: - -```bash -REDIS_HOST=localhost -REDIS_PORT=6379 -REDIS_METRICS_DB=0 -REDIS_PASSWORD= # 可选 -INSTANCE_ID=instance-1 # 可选 -``` - -#### 4. 安装 Redis 依赖 - -```bash -pip install redis -``` - -或在 `requirements.txt` 中添加: - -``` -redis>=5.0.0 -``` - -#### 5. 验证 - -```bash -# 查看 Redis 中的指标 -redis-cli -> HGETALL metrics:request_counter - -# 查看 Exporter 输出 -curl http://localhost:8001/metrics - -# 查看 Prometheus -open http://localhost:9090 -``` - -## 性能对比 - -| 指标 | Pushgateway | Redis + Exporter | 标准 Pull | -|------|-------------|------------------|-----------| -| 写入延迟 | ~5ms | ~1ms | N/A | -| 查询延迟 | ~10ms | ~20ms | ~5ms | -| 吞吐量 | ~1000 req/s | ~10000 req/s | ~500 req/s | -| 内存占用 | 低 | 中 | 低 | -| 复杂度 | 低 | 高 | 低 | - -## 迁移步骤 - -### 从原有方案迁移到 Pushgateway - -1. **安装依赖**(如果需要): - ```bash - pip install prometheus-client - ``` - -2. **替换导入**: - ```python - # 旧代码 - from functional_scaffold.core.metrics import track_request - - # 新代码 - from functional_scaffold.core.metrics_pushgateway import track_request - ``` - -3. **配置环境变量**: - ```bash - export PUSHGATEWAY_URL=localhost:9091 - ``` - -4. **启动 Pushgateway**: - ```bash - docker-compose up -d pushgateway - ``` - -5. **更新 Prometheus 配置**(已包含在 `monitoring/prometheus.yml`) - -6. **测试验证**: - ```bash - # 发送请求 - curl -X POST http://localhost:8000/invoke -d '{"number": 17}' - - # 查看指标 - curl http://localhost:9091/metrics | grep http_requests_total - ``` - -### 从原有方案迁移到 Redis - -1. **安装依赖**: - ```bash - pip install redis - ``` - -2. **替换导入**: - ```python - # 旧代码 - from functional_scaffold.core.metrics import track_request - - # 新代码 - from functional_scaffold.core.metrics_redis import track_request - ``` - -3. **配置环境变量**: - ```bash - export REDIS_HOST=localhost - export REDIS_PORT=6379 - ``` - -4. **启动 Redis 和 Exporter**: - ```bash - docker-compose up -d redis redis-exporter - ``` - -5. **测试验证**: - ```bash - # 发送请求 - curl -X POST http://localhost:8000/invoke -d '{"number": 17}' - - # 查看 Redis - redis-cli HGETALL metrics:request_counter - - # 查看 Exporter - curl http://localhost:8001/metrics - ``` - -## 常见问题 - -### Q1: Pushgateway 会成为单点故障吗? - -A: 可以通过以下方式解决: -- 部署多个 Pushgateway 实例(负载均衡) -- 使用持久化存储(已配置) -- 推送失败时降级到本地日志 - -### Q2: Redis 方案的性能如何? - -A: Redis 单实例可以支持 10万+ QPS,对于大多数场景足够。如果需要更高性能,可以: -- 使用 Redis Cluster -- 批量写入(减少网络往返) -- 使用 Pipeline - -### Q3: 如何在 Kubernetes 中使用? - -A: -- **Pushgateway**: 部署为 Service,应用通过 Service 名称访问 -- **Redis**: 使用 StatefulSet 或托管 Redis 服务 - -### Q4: 指标数据会丢失吗? - -A: -- **Pushgateway**: 支持持久化,重启不丢失 -- **Redis**: 配置了 AOF 持久化,重启不丢失 -- **标准 Pull**: 实例销毁后丢失 - -### Q5: 如何选择方案? - -建议: -- **Serverless/短生命周期** → Pushgateway -- **超高并发/自定义逻辑** → Redis -- **长生命周期/K8s** → 标准 Pull(需配置服务发现) - -## 监控和告警 - -### Grafana 仪表板 - -访问 http://localhost:3000(admin/admin) - -已预配置的面板: -- HTTP 请求总数 -- HTTP 请求延迟(P50/P95/P99) -- 算法执行次数 -- 算法执行延迟 -- 错误率 - -### 告警规则 - -在 `monitoring/alerts/rules.yaml` 中配置: - -```yaml -groups: - - name: functional_scaffold - rules: - - alert: HighErrorRate - expr: rate(http_requests_total{status="error"}[5m]) > 0.05 - for: 5m - labels: - severity: warning - annotations: - summary: "高错误率告警" - description: "错误率超过 5%" -``` - -## 参考资料 - -- [Prometheus Pushgateway 文档](https://github.com/prometheus/pushgateway) -- [Prometheus 最佳实践](https://prometheus.io/docs/practices/) -- [Redis 官方文档](https://redis.io/documentation) diff --git a/docs/metrics-improvement-summary.md b/docs/metrics-improvement-summary.md deleted file mode 100644 index 87872fa..0000000 --- a/docs/metrics-improvement-summary.md +++ /dev/null @@ -1,227 +0,0 @@ -# Prometheus 指标记录问题修复总结 - -## 问题描述 - -Prometheus 中没有正常记录应用的访问数据。虽然 `/metrics` 端点可以访问,并且定义了所有指标类型,但这些指标都没有任何数据值。 - -## 根本原因 - -1. **HTTP 请求指标未记录**:`api/routes.py` 中的路由处理函数没有使用 `@track_request` 装饰器来记录 HTTP 请求指标 -2. **算法执行指标未记录**:`algorithms/base.py` 中的 `execute()` 方法没有调用 metrics 模块来记录算法执行指标 - -## 解决方案 - -### 1. 添加 HTTP 请求指标跟踪中间件 - -**文件**:`src/functional_scaffold/main.py` - -**修改内容**: -- 导入 metrics 相关的对象:`request_counter`, `request_latency`, `in_progress_requests` -- 添加 `track_metrics` 中间件,自动跟踪所有 HTTP 请求 - -**优点**: -- 自动化:不需要在每个路由上手动添加装饰器 -- 统一:所有端点的指标记录逻辑一致 -- 易维护:新增端点自动获得指标跟踪能力 - -**实现代码**: -```python -@app.middleware("http") -async def track_metrics(request: Request, call_next): - """记录所有HTTP请求的指标""" - if not settings.metrics_enabled: - return await call_next(request) - - # 跳过 /metrics 端点本身,避免循环记录 - if request.url.path == "/metrics": - return await call_next(request) - - in_progress_requests.inc() - start_time = time.time() - status = "success" - - try: - response = await call_next(request) - if response.status_code >= 400: - status = "error" - return response - except Exception as e: - status = "error" - raise e - finally: - elapsed = time.time() - start_time - request_counter.labels( - method=request.method, - endpoint=request.url.path, - status=status - ).inc() - request_latency.labels( - method=request.method, - endpoint=request.url.path - ).observe(elapsed) - in_progress_requests.dec() -``` - -### 2. 添加算法执行指标记录 - -**文件**:`src/functional_scaffold/algorithms/base.py` - -**修改内容**: -- 在 `execute()` 方法中导入 `algorithm_counter` 和 `algorithm_latency` -- 在 `finally` 块中记录算法执行指标 - -**实现代码**: -```python -def execute(self, *args, **kwargs) -> Dict[str, Any]: - from ..core.metrics import algorithm_counter, algorithm_latency - - start_time = time.time() - status = "success" - - try: - # ... 算法执行逻辑 ... - except Exception as e: - status = "error" - # ... 错误处理 ... - finally: - elapsed_time = time.time() - start_time - algorithm_counter.labels(algorithm=self.name, status=status).inc() - algorithm_latency.labels(algorithm=self.name).observe(elapsed_time) -``` - -## 验证结果 - -### 1. 应用 /metrics 端点 - -修复后,`/metrics` 端点正常返回指标数据: - -``` -# HTTP 请求指标 -http_requests_total{endpoint="/healthz",method="GET",status="success"} 3.0 -http_requests_total{endpoint="/invoke",method="POST",status="success"} 2.0 -http_requests_total{endpoint="/readyz",method="GET",status="success"} 1.0 - -# HTTP 请求延迟 -http_request_duration_seconds_sum{endpoint="/invoke",method="POST"} 0.0065615177154541016 -http_request_duration_seconds_count{endpoint="/invoke",method="POST"} 2.0 - -# 算法执行指标 -algorithm_executions_total{algorithm="PrimeChecker",status="success"} 2.0 -algorithm_execution_duration_seconds_sum{algorithm="PrimeChecker"} 0.00023603439331054688 -algorithm_execution_duration_seconds_count{algorithm="PrimeChecker"} 2.0 - -# 当前进行中的请求 -http_requests_in_progress 0.0 -``` - -### 2. Prometheus 查询 - -Prometheus 成功抓取并存储了指标数据: - -```bash -# 查询 HTTP 请求总数 -curl 'http://localhost:9090/api/v1/query?query=http_requests_total' - -# 查询算法执行总数 -curl 'http://localhost:9090/api/v1/query?query=algorithm_executions_total' -``` - -## 可用指标 - -修复后,以下指标可以在 Prometheus 和 Grafana 中使用: - -### HTTP 请求指标 - -1. **http_requests_total** (Counter) - - 标签:`method`, `endpoint`, `status` - - 描述:HTTP 请求总数 - - 用途:统计各端点的请求量、成功率 - -2. **http_request_duration_seconds** (Histogram) - - 标签:`method`, `endpoint` - - 描述:HTTP 请求延迟分布 - - 用途:分析请求响应时间、P50/P95/P99 延迟 - -3. **http_requests_in_progress** (Gauge) - - 描述:当前正在处理的请求数 - - 用途:监控并发请求数、负载情况 - -### 算法执行指标 - -1. **algorithm_executions_total** (Counter) - - 标签:`algorithm`, `status` - - 描述:算法执行总数 - - 用途:统计算法调用量、成功率 - -2. **algorithm_execution_duration_seconds** (Histogram) - - 标签:`algorithm` - - 描述:算法执行延迟分布 - - 用途:分析算法性能、优化瓶颈 - -## 使用示例 - -### Prometheus 查询示例 - -```promql -# 每秒请求数 (QPS) -rate(http_requests_total[5m]) - -# 请求成功率 -sum(rate(http_requests_total{status="success"}[5m])) / sum(rate(http_requests_total[5m])) - -# P95 延迟 -histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) - -# 算法执行失败率 -sum(rate(algorithm_executions_total{status="error"}[5m])) / sum(rate(algorithm_executions_total[5m])) -``` - -### 生成测试流量 - -使用提供的脚本生成测试流量: - -```bash -# 启动流量生成器 -./scripts/generate_traffic.sh - -# 在另一个终端查看实时指标 -watch -n 1 'curl -s http://localhost:8111/metrics | grep http_requests_total' -``` - -## Grafana 仪表板 - -访问 Grafana 查看可视化指标: - -1. 打开浏览器访问:http://localhost:3000 -2. 登录(默认用户名/密码:admin/admin) -3. 导入仪表板:`monitoring/grafana/dashboard.json` - -仪表板包含以下面板: -- 请求速率(QPS) -- 请求延迟(P50/P95/P99) -- 错误率 -- 算法执行统计 -- 并发请求数 - -## 注意事项 - -1. **中间件顺序**:指标跟踪中间件应该在日志中间件之后注册,确保所有请求都被记录 -2. **/metrics 端点**:中间件会跳过 `/metrics` 端点本身,避免循环记录 -3. **错误状态**:HTTP 状态码 >= 400 会被标记为 `status="error"` -4. **性能影响**:指标记录的性能开销极小(微秒级),不会影响应用性能 - -## 后续优化建议 - -1. **添加更多维度**:可以添加 `user_id`、`region` 等标签进行更细粒度的分析 -2. **自定义指标**:根据业务需求添加自定义指标(如缓存命中率、外部 API 调用次数等) -3. **告警规则**:配置 Prometheus 告警规则,在指标异常时发送通知 -4. **长期存储**:考虑使用 Thanos 或 Cortex 进行长期指标存储和查询 - -## 相关文件 - -- `src/functional_scaffold/main.py` - HTTP 请求指标跟踪中间件 -- `src/functional_scaffold/algorithms/base.py` - 算法执行指标记录 -- `src/functional_scaffold/core/metrics.py` - 指标定义 -- `monitoring/prometheus.yml` - Prometheus 配置 -- `monitoring/grafana/dashboard.json` - Grafana 仪表板 -- `scripts/generate_traffic.sh` - 流量生成脚本 diff --git a/docs/monitoring.md b/docs/monitoring.md new file mode 100644 index 0000000..a7838e4 --- /dev/null +++ b/docs/monitoring.md @@ -0,0 +1,337 @@ +# 监控指南 + +本文档介绍 FunctionalScaffold 的监控体系,包括指标收集、可视化和告警配置。 + +## 监控架构 + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ 应用实例 1 │ │ 应用实例 2 │ │ 应用实例 N │ +│ /metrics 端点 │ │ /metrics 端点 │ │ /metrics 端点 │ +└────────┬────────┘ └────────┬────────┘ └────────┬────────┘ + │ │ │ + │ 写入指标到 Redis │ │ + └───────────────────────┼───────────────────────┘ + │ + ▼ + ┌─────────────────────────┐ + │ Redis │ + │ (指标聚合存储) │ + └────────────┬────────────┘ + │ + │ 读取并导出 + ▼ + ┌─────────────────────────┐ + │ Prometheus │ + │ (抓取 /metrics) │ + └────────────┬────────────┘ + │ + │ 查询 + ▼ + ┌─────────────────────────┐ + │ Grafana │ + │ (可视化展示) │ + └─────────────────────────┘ +``` + +## 快速开始 + +### 启动监控服务 + +```bash +cd deployment +docker-compose up -d redis prometheus grafana +``` + +### 访问地址 + +| 服务 | 地址 | 默认账号 | +|------|------|---------| +| 应用 Metrics | http://localhost:8000/metrics | - | +| Prometheus | http://localhost:9090 | - | +| Grafana | http://localhost:3000 | admin/admin | + +## 指标说明 + +### HTTP 请求指标 + +| 指标 | 类型 | 标签 | 描述 | +|------|------|------|------| +| `http_requests_total` | Counter | method, endpoint, status | HTTP 请求总数 | +| `http_request_duration_seconds` | Histogram | method, endpoint | HTTP 请求延迟分布 | +| `http_requests_in_progress` | Gauge | - | 当前进行中的请求数 | + +### 算法执行指标 + +| 指标 | 类型 | 标签 | 描述 | +|------|------|------|------| +| `algorithm_executions_total` | Counter | algorithm, status | 算法执行总数 | +| `algorithm_execution_duration_seconds` | Histogram | algorithm | 算法执行延迟分布 | + +### 异步任务指标 + +| 指标 | 类型 | 标签 | 描述 | +|------|------|------|------| +| `jobs_created_total` | Counter | algorithm | 创建的任务总数 | +| `jobs_completed_total` | Counter | algorithm, status | 完成的任务总数 | +| `job_execution_duration_seconds` | Histogram | algorithm | 任务执行时间分布 | +| `webhook_deliveries_total` | Counter | status | Webhook 发送总数 | + +## Prometheus 查询示例 + +### 基础查询 + +```promql +# 每秒请求数 (QPS) +rate(http_requests_total[5m]) + +# 按端点分组的 QPS +sum(rate(http_requests_total[5m])) by (endpoint) + +# 请求成功率 +sum(rate(http_requests_total{status="success"}[5m])) +/ sum(rate(http_requests_total[5m])) + +# 当前并发请求数 +http_requests_in_progress +``` + +### 延迟分析 + +```promql +# P50 延迟 +histogram_quantile(0.50, rate(http_request_duration_seconds_bucket[5m])) + +# P95 延迟 +histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) + +# P99 延迟 +histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m])) + +# 平均延迟 +rate(http_request_duration_seconds_sum[5m]) +/ rate(http_request_duration_seconds_count[5m]) +``` + +### 算法分析 + +```promql +# 算法执行速率 +sum(rate(algorithm_executions_total[5m])) by (algorithm) + +# 算法失败率 +sum(rate(algorithm_executions_total{status="error"}[5m])) +/ sum(rate(algorithm_executions_total[5m])) + +# 算法 P95 延迟 +histogram_quantile(0.95, + sum(rate(algorithm_execution_duration_seconds_bucket[5m])) by (le, algorithm) +) +``` + +### 异步任务分析 + +```promql +# 任务创建速率 +sum(rate(jobs_created_total[5m])) by (algorithm) + +# 任务成功率 +sum(rate(jobs_completed_total{status="completed"}[5m])) +/ sum(rate(jobs_completed_total[5m])) + +# 任务积压(创建速率 - 完成速率) +sum(rate(jobs_created_total[5m])) - sum(rate(jobs_completed_total[5m])) + +# Webhook 成功率 +sum(rate(webhook_deliveries_total{status="success"}[5m])) +/ sum(rate(webhook_deliveries_total[5m])) +``` + +## Grafana 仪表板 + +### 导入仪表板 + +1. 打开 Grafana: http://localhost:3000 +2. 登录(admin/admin) +3. 进入 **Dashboards** → **Import** +4. 上传文件:`monitoring/grafana/dashboard.json` +5. 选择 Prometheus 数据源 +6. 点击 **Import** + +### 仪表板面板 + +#### HTTP 监控区域 +- **HTTP 请求速率 (QPS)** - 每秒请求数趋势 +- **HTTP 请求延迟** - P50/P95/P99 延迟趋势 +- **请求成功率** - 成功率仪表盘 +- **当前并发请求数** - 实时并发数 +- **HTTP 请求总数** - 累计请求数 +- **请求分布** - 按端点/状态的饼图 + +#### 算法监控区域 +- **算法执行速率** - 每秒执行次数 +- **算法执行延迟** - P50/P95/P99 延迟 +- **算法执行总数** - 累计执行数 + +#### 异步任务监控区域 +- **任务创建总数** - 累计创建的任务数 +- **任务完成总数** - 累计完成的任务数 +- **任务失败总数** - 累计失败的任务数 +- **任务成功率** - 成功率仪表盘 +- **异步任务速率** - 创建和完成速率趋势 +- **异步任务执行延迟** - P50/P95/P99 延迟 +- **任务状态分布** - 按状态的饼图 +- **Webhook 发送状态** - 成功/失败分布 + +## 告警配置 + +### 告警规则 + +告警规则定义在 `monitoring/alerts/rules.yaml`: + +```yaml +groups: + - name: functional_scaffold_alerts + interval: 30s + rules: + # 高错误率告警 + - alert: HighErrorRate + expr: rate(http_requests_total{status="error"}[5m]) > 0.05 + for: 5m + labels: + severity: warning + annotations: + summary: "检测到高错误率" + description: "端点 {{ $labels.endpoint }} 的错误率为 {{ $value }} 请求/秒" + + # 高延迟告警 + - alert: HighLatency + expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1 + for: 5m + labels: + severity: warning + annotations: + summary: "检测到高延迟" + description: "端点 {{ $labels.endpoint }} 的 P95 延迟为 {{ $value }}s" + + # 服务不可用告警 + - alert: ServiceDown + expr: up{job="functional-scaffold"} == 0 + for: 1m + labels: + severity: critical + annotations: + summary: "服务不可用" + description: "FunctionalScaffold 服务已停止超过 1 分钟" + + # 异步任务失败率告警 + - alert: HighJobFailureRate + expr: rate(jobs_completed_total{status="failed"}[5m]) / rate(jobs_completed_total[5m]) > 0.1 + for: 5m + labels: + severity: warning + annotations: + summary: "异步任务失败率过高" + description: "算法 {{ $labels.algorithm }} 的异步任务失败率超过 10%" + + # 任务积压告警 + - alert: JobBacklog + expr: sum(rate(jobs_created_total[5m])) - sum(rate(jobs_completed_total[5m])) > 10 + for: 10m + labels: + severity: warning + annotations: + summary: "异步任务积压" + description: "任务创建速率超过完成速率,可能存在积压" +``` + +### 告警级别 + +| 级别 | 描述 | 响应时间 | +|------|------|---------| +| critical | 严重告警,服务不可用 | 立即响应 | +| warning | 警告,性能下降或异常 | 1 小时内响应 | +| info | 信息,需要关注 | 工作时间内响应 | + +## 自定义指标 + +### 添加新指标 + +1. 在 `config/metrics.yaml` 中定义: + +```yaml +custom_metrics: + my_custom_counter: + name: "my_custom_counter" + type: counter + description: "我的自定义计数器" + labels: [label1, label2] + + my_custom_histogram: + name: "my_custom_histogram" + type: histogram + description: "我的自定义直方图" + labels: [label1] + buckets: [0.1, 0.5, 1, 5, 10] +``` + +2. 在代码中使用: + +```python +from functional_scaffold.core.metrics_unified import incr, observe + +# 增加计数器 +incr("my_custom_counter", {"label1": "value1", "label2": "value2"}) + +# 记录直方图 +observe("my_custom_histogram", {"label1": "value1"}, 0.5) +``` + +## 故障排查 + +### 指标不显示 + +1. 检查应用 metrics 端点: + ```bash + curl http://localhost:8000/metrics + ``` + +2. 检查 Redis 连接: + ```bash + redis-cli ping + ``` + +3. 检查 Prometheus 抓取状态: + - 访问 http://localhost:9090/targets + - 确认 functional-scaffold 目标状态为 UP + +### Grafana 无数据 + +1. 检查数据源配置: + - URL 应为 `http://prometheus:9090`(容器内部) + - 不是 `http://localhost:9090` + +2. 检查时间范围: + - 确保选择了正确的时间范围 + - 尝试 "Last 5 minutes" + +3. 生成测试流量: + ```bash + ./scripts/generate_traffic.sh + ``` + +### 告警不触发 + +1. 检查 Prometheus 规则加载: + - 访问 http://localhost:9090/rules + - 确认规则已加载 + +2. 检查告警状态: + - 访问 http://localhost:9090/alerts + - 查看告警是否处于 pending 或 firing 状态 + +## 参考资料 + +- [Prometheus 文档](https://prometheus.io/docs/) +- [Grafana 文档](https://grafana.com/docs/) +- [PromQL 查询语言](https://prometheus.io/docs/prometheus/latest/querying/basics/) diff --git a/docs/swagger/README.md b/docs/swagger/README.md deleted file mode 100644 index e2980c5..0000000 --- a/docs/swagger/README.md +++ /dev/null @@ -1,107 +0,0 @@ -# Swagger 文档 - -本目录包含自动生成的 OpenAPI 规范文档。 - -## 生成文档 - -运行以下命令生成或更新 OpenAPI 规范: - -```bash -python scripts/export_openapi.py -``` - -这将生成 `openapi.json` 文件,包含完整的 API 规范。 - -## 查看文档 - -### 在线查看 - -启动应用后,访问以下 URL: - -- **Swagger UI**: http://localhost:8000/docs -- **ReDoc**: http://localhost:8000/redoc - -### 离线查看 - -使用 Swagger Editor 或其他 OpenAPI 工具打开 `openapi.json` 文件。 - -## API 规范 - -### 端点列表 - -#### 算法接口 - -- `POST /invoke` - 同步调用算法 - - 请求体: `{"number": integer}` - - 响应: 算法执行结果 - -- `POST /jobs` - 异步任务接口(预留) - - 当前返回 501 Not Implemented - -#### 健康检查 - -- `GET /healthz` - 存活检查 - - 响应: `{"status": "healthy", "timestamp": float}` - -- `GET /readyz` - 就绪检查 - - 响应: `{"status": "ready", "timestamp": float, "checks": {...}}` - -#### 监控 - -- `GET /metrics` - Prometheus 指标 - - 响应: Prometheus 文本格式 - -### 数据模型 - -#### InvokeRequest - -```json -{ - "number": 17 -} -``` - -#### InvokeResponse - -```json -{ - "request_id": "uuid", - "status": "success", - "result": { - "number": 17, - "is_prime": true, - "factors": [], - "algorithm": "trial_division" - }, - "metadata": { - "algorithm": "PrimeChecker", - "version": "1.0.0", - "elapsed_time": 0.001 - } -} -``` - -#### ErrorResponse - -```json -{ - "error": "ERROR_CODE", - "message": "Error description", - "details": {}, - "request_id": "uuid" -} -``` - -## 更新文档 - -当修改 API 接口后,需要重新生成文档: - -1. 修改代码(路由、模型等) -2. 运行 `python scripts/export_openapi.py` -3. 提交更新后的 `openapi.json` - -## 注意事项 - -- `openapi.json` 是自动生成的,不要手动编辑 -- 所有 API 变更都应该在代码中完成,然后重新生成文档 -- 确保 Pydantic 模型包含完整的文档字符串和示例 diff --git a/monitoring/alerts/rules.yaml b/monitoring/alerts/rules.yaml index f22862c..1dceeba 100644 --- a/monitoring/alerts/rules.yaml +++ b/monitoring/alerts/rules.yaml @@ -51,3 +51,43 @@ groups: annotations: summary: "算法执行延迟过高" description: "算法 {{ $labels.algorithm }} 的 P95 延迟为 {{ $value }}s" + + # 异步任务失败率告警 + - alert: HighJobFailureRate + expr: rate(jobs_completed_total{status="failed"}[5m]) / rate(jobs_completed_total[5m]) > 0.1 + for: 5m + labels: + severity: warning + annotations: + summary: "异步任务失败率过高" + description: "算法 {{ $labels.algorithm }} 的异步任务失败率超过 10%" + + # 异步任务执行延迟告警 + - alert: HighJobLatency + expr: histogram_quantile(0.95, rate(job_execution_duration_seconds_bucket[5m])) > 60 + for: 5m + labels: + severity: warning + annotations: + summary: "异步任务执行延迟过高" + description: "算法 {{ $labels.algorithm }} 的异步任务 P95 延迟为 {{ $value }}s" + + # 异步任务积压告警 + - alert: JobBacklog + expr: sum(rate(jobs_created_total[5m])) - sum(rate(jobs_completed_total[5m])) > 10 + for: 10m + labels: + severity: warning + annotations: + summary: "异步任务积压" + description: "任务创建速率超过完成速率,可能存在积压" + + # Webhook 发送失败率告警 + - alert: HighWebhookFailureRate + expr: rate(webhook_deliveries_total{status="failed"}[5m]) / rate(webhook_deliveries_total[5m]) > 0.2 + for: 5m + labels: + severity: warning + annotations: + summary: "Webhook 发送失败率过高" + description: "Webhook 发送失败率超过 20%" diff --git a/monitoring/grafana/dashboard.json b/monitoring/grafana/dashboard.json index 9a5716d..748e211 100644 --- a/monitoring/grafana/dashboard.json +++ b/monitoring/grafana/dashboard.json @@ -765,6 +765,636 @@ ], "title": "请求状态分布", "type": "piechart" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 32 + }, + "id": 100, + "panels": [], + "title": "异步任务监控", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 33 + }, + "id": 11, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "values": false, + "calcs": ["lastNotNull"], + "fields": "" + }, + "textMode": "auto" + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "sum(jobs_created_total)", + "refId": "A" + } + ], + "title": "任务创建总数", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 33 + }, + "id": 12, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "values": false, + "calcs": ["lastNotNull"], + "fields": "" + }, + "textMode": "auto" + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "sum(jobs_completed_total{status=\"completed\"})", + "refId": "A" + } + ], + "title": "任务完成总数", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 12, + "y": 33 + }, + "id": 13, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "values": false, + "calcs": ["lastNotNull"], + "fields": "" + }, + "textMode": "auto" + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "sum(jobs_completed_total{status=\"failed\"})", + "refId": "A" + } + ], + "title": "任务失败总数", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "max": 1, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 0.9 + }, + { + "color": "green", + "value": 0.95 + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 18, + "y": 33 + }, + "id": 14, + "options": { + "orientation": "auto", + "reduceOptions": { + "values": false, + "calcs": ["lastNotNull"], + "fields": "" + }, + "showThresholdLabels": false, + "showThresholdMarkers": true + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "sum(rate(jobs_completed_total{status=\"completed\"}[5m])) / sum(rate(jobs_completed_total[5m]))", + "refId": "A" + } + ], + "title": "任务成功率", + "type": "gauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "任务/秒", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "opacity", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 37 + }, + "id": 15, + "options": { + "legend": { + "calcs": ["mean", "lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "sum(rate(jobs_created_total[1m])) by (algorithm)", + "legendFormat": "创建 - {{algorithm}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "sum(rate(jobs_completed_total[1m])) by (algorithm, status)", + "legendFormat": "完成 - {{algorithm}} ({{status}})", + "refId": "B" + } + ], + "title": "异步任务速率", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "延迟", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 5 + }, + { + "color": "red", + "value": 30 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 37 + }, + "id": 16, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "histogram_quantile(0.50, sum(rate(job_execution_duration_seconds_bucket[1m])) by (le, algorithm))", + "legendFormat": "P50 - {{algorithm}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "histogram_quantile(0.95, sum(rate(job_execution_duration_seconds_bucket[1m])) by (le, algorithm))", + "legendFormat": "P95 - {{algorithm}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "histogram_quantile(0.99, sum(rate(job_execution_duration_seconds_bucket[1m])) by (le, algorithm))", + "legendFormat": "P99 - {{algorithm}}", + "refId": "C" + } + ], + "title": "异步任务执行延迟 (P50/P95/P99)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + } + }, + "mappings": [] + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "success" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "failed" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "red", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 45 + }, + "id": 17, + "options": { + "legend": { + "displayMode": "table", + "placement": "right", + "showLegend": true, + "values": ["value", "percent"] + }, + "pieType": "donut", + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "sum(jobs_completed_total) by (status)", + "legendFormat": "{{status}}", + "refId": "A" + } + ], + "title": "任务状态分布", + "type": "piechart" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + } + }, + "mappings": [] + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "success" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "failed" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "red", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 45 + }, + "id": 18, + "options": { + "legend": { + "displayMode": "table", + "placement": "right", + "showLegend": true, + "values": ["value", "percent"] + }, + "pieType": "donut", + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "sum(webhook_deliveries_total) by (status)", + "legendFormat": "{{status}}", + "refId": "A" + } + ], + "title": "Webhook 发送状态", + "type": "piechart" } ], "refresh": "5s", diff --git a/requirements.txt b/requirements.txt index 6f07778..2da63da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,6 @@ redis>=5.0.0 # YAML 配置解析 pyyaml>=6.0.0 + +# HTTP 客户端(用于 Webhook 回调) +httpx>=0.27.0 diff --git a/src/functional_scaffold/algorithms/prime_checker.py b/src/functional_scaffold/algorithms/prime_checker.py index 0f0aebc..f3ada5d 100644 --- a/src/functional_scaffold/algorithms/prime_checker.py +++ b/src/functional_scaffold/algorithms/prime_checker.py @@ -2,6 +2,7 @@ from typing import Dict, Any, List from .base import BaseAlgorithm +from ..core.metrics_unified import incr class PrimeChecker(BaseAlgorithm): @@ -30,10 +31,12 @@ class PrimeChecker(BaseAlgorithm): ValueError: 如果输入不是整数 """ if not isinstance(number, int): + incr('prime_check',{"status":"invalid_input"}) raise ValueError(f"Input must be an integer, got {type(number).__name__}") # 小于2的数不是质数 if number < 2: + incr('prime_check', {"status": "number_little_two"}) return { "number": number, "is_prime": False, @@ -47,7 +50,7 @@ class PrimeChecker(BaseAlgorithm): # 如果不是质数,计算因数 factors = [] if is_prime else self._get_factors(number) - + incr('prime_check', {"status": "success"}) return { "number": number, "is_prime": is_prime, diff --git a/src/functional_scaffold/api/models.py b/src/functional_scaffold/api/models.py index 633ff0f..faf89ab 100644 --- a/src/functional_scaffold/api/models.py +++ b/src/functional_scaffold/api/models.py @@ -1,5 +1,6 @@ """API 数据模型""" +from enum import Enum from pydantic import BaseModel, Field, ConfigDict from typing import Any, Dict, Optional @@ -7,13 +8,7 @@ from typing import Any, Dict, Optional class InvokeRequest(BaseModel): """同步调用请求""" - model_config = ConfigDict( - json_schema_extra={ - "example": { - "number": 17 - } - } - ) + model_config = ConfigDict(json_schema_extra={"example": {"number": 17}}) number: int = Field(..., description="待判断的整数") @@ -30,13 +25,13 @@ class InvokeResponse(BaseModel): "number": 17, "is_prime": True, "factors": [], - "algorithm": "trial_division" + "algorithm": "trial_division", }, "metadata": { "algorithm": "PrimeChecker", "version": "1.0.0", - "elapsed_time": 0.001 - } + "elapsed_time": 0.001, + }, } } ) @@ -71,7 +66,7 @@ class ErrorResponse(BaseModel): "error": "VALIDATION_ERROR", "message": "number must be an integer", "details": {"field": "number", "value": "abc"}, - "request_id": "550e8400-e29b-41d4-a716-446655440000" + "request_id": "550e8400-e29b-41d4-a716-446655440000", } } ) @@ -80,3 +75,80 @@ class ErrorResponse(BaseModel): message: str = Field(..., description="错误消息") details: Optional[Dict[str, Any]] = Field(None, description="错误详情") request_id: Optional[str] = Field(None, description="请求ID") + + +class JobStatus(str, Enum): + """任务状态枚举""" + + PENDING = "pending" # 等待执行 + RUNNING = "running" # 执行中 + COMPLETED = "completed" # 已完成 + FAILED = "failed" # 执行失败 + + +class JobRequest(BaseModel): + """异步任务请求""" + + model_config = ConfigDict( + json_schema_extra={ + "example": { + "algorithm": "PrimeChecker", + "params": {"number": 17}, + "webhook": "https://example.com/callback", + } + } + ) + + algorithm: str = Field(..., description="算法名称") + params: Dict[str, Any] = Field(..., description="算法参数") + webhook: Optional[str] = Field(None, description="回调 URL") + + +class JobCreateResponse(BaseModel): + """任务创建响应""" + + model_config = ConfigDict( + json_schema_extra={ + "example": { + "job_id": "a1b2c3d4e5f6", + "status": "pending", + "message": "任务已创建", + "created_at": "2026-02-02T10:00:00Z", + } + } + ) + + job_id: str = Field(..., description="任务唯一标识") + status: JobStatus = Field(..., description="任务状态") + message: str = Field(..., description="状态消息") + created_at: str = Field(..., description="创建时间(ISO 8601)") + + +class JobStatusResponse(BaseModel): + """任务状态查询响应""" + + model_config = ConfigDict( + json_schema_extra={ + "example": { + "job_id": "a1b2c3d4e5f6", + "status": "completed", + "algorithm": "PrimeChecker", + "created_at": "2026-02-02T10:00:00Z", + "started_at": "2026-02-02T10:00:01Z", + "completed_at": "2026-02-02T10:00:02Z", + "result": {"number": 17, "is_prime": True}, + "error": None, + "metadata": {"elapsed_time": 0.001}, + } + } + ) + + job_id: str = Field(..., description="任务唯一标识") + status: JobStatus = Field(..., description="任务状态") + algorithm: str = Field(..., description="算法名称") + created_at: str = Field(..., description="创建时间(ISO 8601)") + started_at: Optional[str] = Field(None, description="开始执行时间(ISO 8601)") + completed_at: Optional[str] = Field(None, description="完成时间(ISO 8601)") + result: Optional[Dict[str, Any]] = Field(None, description="执行结果(仅完成时返回)") + error: Optional[str] = Field(None, description="错误信息(仅失败时返回)") + metadata: Optional[Dict[str, Any]] = Field(None, description="元数据信息") diff --git a/src/functional_scaffold/api/routes.py b/src/functional_scaffold/api/routes.py index 5a0697d..d10c762 100644 --- a/src/functional_scaffold/api/routes.py +++ b/src/functional_scaffold/api/routes.py @@ -1,7 +1,7 @@ """API 路由""" +import asyncio from fastapi import APIRouter, HTTPException, Depends, status -from fastapi.responses import JSONResponse import time import logging @@ -11,10 +11,15 @@ from .models import ( HealthResponse, ReadinessResponse, ErrorResponse, + JobRequest, + JobCreateResponse, + JobStatusResponse, + JobStatus, ) from .dependencies import get_request_id from ..algorithms.prime_checker import PrimeChecker -from ..core.errors import FunctionalScaffoldError, ValidationError, AlgorithmError +from ..core.errors import ValidationError, AlgorithmError +from ..core.job_manager import get_job_manager logger = logging.getLogger(__name__) @@ -134,17 +139,156 @@ async def readiness_check(): @router.post( "/jobs", - status_code=status.HTTP_501_NOT_IMPLEMENTED, - summary="异步任务接口(预留)", - description="异步任务接口,当前版本未实现", + response_model=JobCreateResponse, + status_code=status.HTTP_202_ACCEPTED, + summary="创建异步任务", + description="创建异步任务,立即返回任务 ID,任务在后台执行", + responses={ + 202: {"description": "任务已创建", "model": JobCreateResponse}, + 400: {"description": "请求参数错误", "model": ErrorResponse}, + 404: {"description": "算法不存在", "model": ErrorResponse}, + 503: {"description": "服务不可用", "model": ErrorResponse}, + }, ) -async def create_job(): +async def create_job( + request: JobRequest, + request_id: str = Depends(get_request_id), +): """ - 异步任务接口(预留) + 创建异步任务 - 用于提交长时间运行的任务 + - **algorithm**: 算法名称(如 PrimeChecker) + - **params**: 算法参数 + - **webhook**: 任务完成后的回调 URL(可选) """ - raise HTTPException( - status_code=status.HTTP_501_NOT_IMPLEMENTED, - detail={"error": "NOT_IMPLEMENTED", "message": "Async jobs not implemented yet"}, - ) + try: + job_manager = await get_job_manager() + + # 检查任务管理器是否可用 + if not job_manager.is_available(): + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail={ + "error": "SERVICE_UNAVAILABLE", + "message": "任务服务暂不可用,请稍后重试", + "request_id": request_id, + }, + ) + + # 验证算法存在 + available_algorithms = job_manager.get_available_algorithms() + if request.algorithm not in available_algorithms: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "error": "ALGORITHM_NOT_FOUND", + "message": f"算法 '{request.algorithm}' 不存在", + "details": {"available_algorithms": available_algorithms}, + "request_id": request_id, + }, + ) + + # 创建任务 + job_id = await job_manager.create_job( + algorithm=request.algorithm, + params=request.params, + webhook=request.webhook, + request_id=request_id, + ) + + # 获取任务信息 + job_data = await job_manager.get_job(job_id) + + # 后台执行任务 + asyncio.create_task(job_manager.execute_job(job_id)) + + logger.info(f"异步任务已创建: job_id={job_id}, request_id={request_id}") + + return JobCreateResponse( + job_id=job_id, + status=JobStatus.PENDING, + message="任务已创建", + created_at=job_data["created_at"], + ) + + except HTTPException: + raise + + except Exception as e: + logger.error(f"创建任务失败: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={ + "error": "INTERNAL_ERROR", + "message": str(e), + "request_id": request_id, + }, + ) + + +@router.get( + "/jobs/{job_id}", + response_model=JobStatusResponse, + summary="查询任务状态", + description="查询异步任务的执行状态和结果", + responses={ + 200: {"description": "成功", "model": JobStatusResponse}, + 404: {"description": "任务不存在或已过期", "model": ErrorResponse}, + 503: {"description": "服务不可用", "model": ErrorResponse}, + }, +) +async def get_job_status(job_id: str): + """ + 查询任务状态 + + - **job_id**: 任务唯一标识 + """ + try: + job_manager = await get_job_manager() + + # 检查任务管理器是否可用 + if not job_manager.is_available(): + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail={ + "error": "SERVICE_UNAVAILABLE", + "message": "任务服务暂不可用,请稍后重试", + }, + ) + + # 获取任务信息 + job_data = await job_manager.get_job(job_id) + + if not job_data: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "error": "JOB_NOT_FOUND", + "message": f"任务 '{job_id}' 不存在或已过期", + }, + ) + + return JobStatusResponse( + job_id=job_data["job_id"], + status=JobStatus(job_data["status"]), + algorithm=job_data["algorithm"], + created_at=job_data["created_at"], + started_at=job_data["started_at"], + completed_at=job_data["completed_at"], + result=job_data["result"], + error=job_data["error"], + metadata=job_data["metadata"], + ) + + except HTTPException: + raise + + except Exception as e: + logger.error(f"查询任务状态失败: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={ + "error": "INTERNAL_ERROR", + "message": str(e), + }, + ) diff --git a/src/functional_scaffold/config.py b/src/functional_scaffold/config.py index b21dfc4..5f89561 100644 --- a/src/functional_scaffold/config.py +++ b/src/functional_scaffold/config.py @@ -49,6 +49,11 @@ class Settings(BaseSettings): metrics_config_path: str = "config/metrics.yaml" metrics_instance_id: Optional[str] = None # 默认使用 hostname + # 异步任务配置 + job_result_ttl: int = 1800 # 结果缓存时间(秒),默认 30 分钟 + webhook_max_retries: int = 3 # Webhook 最大重试次数 + webhook_timeout: int = 10 # Webhook 超时时间(秒) + # 全局配置实例 settings = Settings() diff --git a/src/functional_scaffold/core/job_manager.py b/src/functional_scaffold/core/job_manager.py new file mode 100644 index 0000000..6fc89f8 --- /dev/null +++ b/src/functional_scaffold/core/job_manager.py @@ -0,0 +1,381 @@ +"""异步任务管理模块 + +基于 Redis 的异步任务管理,支持任务创建、执行、状态查询和 Webhook 回调。 +""" + +import asyncio +import json +import logging +import secrets +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Type + +import httpx +import redis.asyncio as aioredis + +from ..algorithms.base import BaseAlgorithm +from ..config import settings +from ..core.metrics_unified import incr, observe + +logger = logging.getLogger(__name__) + + +class JobManager: + """异步任务管理器""" + + def __init__(self): + self._redis_client: Optional[aioredis.Redis] = None + self._algorithm_registry: Dict[str, Type[BaseAlgorithm]] = {} + self._http_client: Optional[httpx.AsyncClient] = None + + async def initialize(self) -> None: + """初始化 Redis 连接和 HTTP 客户端""" + # 初始化 Redis 异步连接 + try: + self._redis_client = aioredis.Redis( + host=settings.redis_host, + port=settings.redis_port, + db=settings.redis_db, + password=settings.redis_password if settings.redis_password else None, + decode_responses=True, + socket_connect_timeout=5, + socket_timeout=5, + ) + # 测试连接 + await self._redis_client.ping() + logger.info(f"任务管理器 Redis 连接成功: {settings.redis_host}:{settings.redis_port}") + except Exception as e: + logger.error(f"任务管理器 Redis 连接失败: {e}") + self._redis_client = None + + # 初始化 HTTP 客户端 + self._http_client = httpx.AsyncClient(timeout=settings.webhook_timeout) + + # 注册算法 + self._register_algorithms() + + async def shutdown(self) -> None: + """关闭连接""" + if self._redis_client: + await self._redis_client.close() + logger.info("任务管理器 Redis 连接已关闭") + + if self._http_client: + await self._http_client.aclose() + logger.info("任务管理器 HTTP 客户端已关闭") + + def _register_algorithms(self) -> None: + """注册可用的算法类""" + from ..algorithms import __all__ as algorithm_names + from .. import algorithms as algorithms_module + + for name in algorithm_names: + cls = getattr(algorithms_module, name, None) + if cls and isinstance(cls, type) and issubclass(cls, BaseAlgorithm): + if cls is not BaseAlgorithm: + self._algorithm_registry[name] = cls + logger.debug(f"已注册算法: {name}") + + logger.info(f"已注册 {len(self._algorithm_registry)} 个算法") + + def get_available_algorithms(self) -> List[str]: + """获取可用算法列表""" + return list(self._algorithm_registry.keys()) + + def _generate_job_id(self) -> str: + """生成 12 位十六进制任务 ID""" + return secrets.token_hex(6) + + def _get_timestamp(self) -> str: + """获取 ISO 8601 格式时间戳""" + return datetime.now(timezone.utc).isoformat() + + async def create_job( + self, + algorithm: str, + params: Dict[str, Any], + webhook: Optional[str] = None, + request_id: Optional[str] = None, + ) -> str: + """创建新任务,返回 job_id + + Args: + algorithm: 算法名称 + params: 算法参数 + webhook: 回调 URL(可选) + request_id: 关联的请求 ID(可选) + + Returns: + str: 任务 ID + + Raises: + RuntimeError: Redis 不可用时抛出 + ValueError: 算法不存在时抛出 + """ + if not self._redis_client: + raise RuntimeError("Redis 不可用,无法创建任务") + + if algorithm not in self._algorithm_registry: + raise ValueError(f"算法 '{algorithm}' 不存在") + + job_id = self._generate_job_id() + created_at = self._get_timestamp() + + # 构建任务数据 + job_data = { + "status": "pending", + "algorithm": algorithm, + "params": json.dumps(params), + "webhook": webhook or "", + "request_id": request_id or "", + "created_at": created_at, + "started_at": "", + "completed_at": "", + "result": "", + "error": "", + "metadata": "", + } + + # 存储到 Redis + key = f"job:{job_id}" + await self._redis_client.hset(key, mapping=job_data) + + # 记录指标 + incr("jobs_created_total", {"algorithm": algorithm}) + + logger.info(f"任务已创建: job_id={job_id}, algorithm={algorithm}") + return job_id + + async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]: + """获取任务信息 + + Args: + job_id: 任务 ID + + Returns: + 任务信息字典,不存在时返回 None + """ + if not self._redis_client: + return None + + key = f"job:{job_id}" + job_data = await self._redis_client.hgetall(key) + + if not job_data: + return None + + # 解析 JSON 字段 + result = { + "job_id": job_id, + "status": job_data.get("status", ""), + "algorithm": job_data.get("algorithm", ""), + "created_at": job_data.get("created_at", ""), + "started_at": job_data.get("started_at") or None, + "completed_at": job_data.get("completed_at") or None, + "result": None, + "error": job_data.get("error") or None, + "metadata": None, + } + + # 解析 result + if job_data.get("result"): + try: + result["result"] = json.loads(job_data["result"]) + except json.JSONDecodeError: + result["result"] = None + + # 解析 metadata + if job_data.get("metadata"): + try: + result["metadata"] = json.loads(job_data["metadata"]) + except json.JSONDecodeError: + result["metadata"] = None + + return result + + async def execute_job(self, job_id: str) -> None: + """执行任务(在后台任务中调用) + + Args: + job_id: 任务 ID + """ + if not self._redis_client: + logger.error(f"Redis 不可用,无法执行任务: {job_id}") + return + + key = f"job:{job_id}" + job_data = await self._redis_client.hgetall(key) + + if not job_data: + logger.error(f"任务不存在: {job_id}") + return + + algorithm_name = job_data.get("algorithm", "") + webhook_url = job_data.get("webhook", "") + + # 解析参数 + try: + params = json.loads(job_data.get("params", "{}")) + except json.JSONDecodeError: + params = {} + + # 更新状态为 running + started_at = self._get_timestamp() + await self._redis_client.hset(key, mapping={"status": "running", "started_at": started_at}) + + logger.info(f"开始执行任务: job_id={job_id}, algorithm={algorithm_name}") + + import time + + start_time = time.time() + status = "completed" + result_data = None + error_msg = None + metadata = None + + try: + # 获取算法类并执行 + algorithm_cls = self._algorithm_registry.get(algorithm_name) + if not algorithm_cls: + raise ValueError(f"算法 '{algorithm_name}' 不存在") + + algorithm = algorithm_cls() + + # 根据算法类型传递参数 + if algorithm_name == "PrimeChecker": + execution_result = algorithm.execute(params.get("number", 0)) + else: + # 通用参数传递 + execution_result = algorithm.execute(**params) + + if execution_result.get("success"): + result_data = execution_result.get("result", {}) + metadata = execution_result.get("metadata", {}) + else: + status = "failed" + error_msg = execution_result.get("error", "算法执行失败") + metadata = execution_result.get("metadata", {}) + + except Exception as e: + status = "failed" + error_msg = str(e) + logger.error(f"任务执行失败: job_id={job_id}, error={e}", exc_info=True) + + # 计算执行时间 + elapsed_time = time.time() - start_time + completed_at = self._get_timestamp() + + # 更新任务状态 + update_data = { + "status": status, + "completed_at": completed_at, + "result": json.dumps(result_data) if result_data else "", + "error": error_msg or "", + "metadata": json.dumps(metadata) if metadata else "", + } + await self._redis_client.hset(key, mapping=update_data) + + # 设置 TTL + await self._redis_client.expire(key, settings.job_result_ttl) + + # 记录指标 + incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status}) + observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time) + + logger.info(f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s") + + # 发送 Webhook 回调 + if webhook_url: + await self._send_webhook(job_id, webhook_url) + + async def _send_webhook(self, job_id: str, webhook_url: str) -> None: + """发送 Webhook 回调(带重试) + + Args: + job_id: 任务 ID + webhook_url: 回调 URL + """ + if not self._http_client: + logger.warning("HTTP 客户端不可用,无法发送 Webhook") + return + + # 获取任务数据 + job_data = await self.get_job(job_id) + if not job_data: + logger.error(f"无法获取任务数据用于 Webhook: {job_id}") + return + + # 构建回调负载 + payload = { + "job_id": job_data["job_id"], + "status": job_data["status"], + "algorithm": job_data["algorithm"], + "result": job_data["result"], + "error": job_data["error"], + "metadata": job_data["metadata"], + "completed_at": job_data["completed_at"], + } + + # 重试间隔(指数退避) + retry_delays = [1, 5, 15] + max_retries = settings.webhook_max_retries + + for attempt in range(max_retries): + try: + response = await self._http_client.post( + webhook_url, + json=payload, + headers={"Content-Type": "application/json"}, + ) + + if response.status_code < 400: + incr("webhook_deliveries_total", {"status": "success"}) + logger.info( + f"Webhook 发送成功: job_id={job_id}, url={webhook_url}, " + f"status_code={response.status_code}" + ) + return + else: + logger.warning( + f"Webhook 响应错误: job_id={job_id}, status_code={response.status_code}" + ) + + except Exception as e: + logger.warning( + f"Webhook 发送失败 (尝试 {attempt + 1}/{max_retries}): " + f"job_id={job_id}, error={e}" + ) + + # 等待后重试 + if attempt < max_retries - 1: + delay = retry_delays[min(attempt, len(retry_delays) - 1)] + await asyncio.sleep(delay) + + # 所有重试都失败 + incr("webhook_deliveries_total", {"status": "failed"}) + logger.error(f"Webhook 发送最终失败: job_id={job_id}, url={webhook_url}") + + def is_available(self) -> bool: + """检查任务管理器是否可用""" + return self._redis_client is not None + + +# 全局单例 +_job_manager: Optional[JobManager] = None + + +async def get_job_manager() -> JobManager: + """获取任务管理器单例""" + global _job_manager + if _job_manager is None: + _job_manager = JobManager() + await _job_manager.initialize() + return _job_manager + + +async def shutdown_job_manager() -> None: + """关闭任务管理器""" + global _job_manager + if _job_manager is not None: + await _job_manager.shutdown() + _job_manager = None diff --git a/src/functional_scaffold/main.py b/src/functional_scaffold/main.py index cedee93..449f15e 100644 --- a/src/functional_scaffold/main.py +++ b/src/functional_scaffold/main.py @@ -17,6 +17,7 @@ from .core.metrics_unified import ( gauge_decr, export, ) +from .core.job_manager import get_job_manager, shutdown_job_manager # 设置日志 setup_logging(level=settings.log_level, format_type=settings.log_format) @@ -132,6 +133,16 @@ async def startup_event(): else: logger.warning("Redis 不可用,指标将不会被收集") + # 初始化任务管理器 + try: + job_manager = await get_job_manager() + if job_manager.is_available(): + logger.info("异步任务管理器已启用") + else: + logger.warning("Redis 不可用,异步任务功能将不可用") + except Exception as e: + logger.warning(f"任务管理器初始化失败: {e}") + # 关闭事件 @app.on_event("shutdown") @@ -139,6 +150,13 @@ async def shutdown_event(): """应用关闭时执行""" logger.info(f"Shutting down {settings.app_name}") + # 关闭任务管理器 + try: + await shutdown_job_manager() + logger.info("任务管理器已关闭") + except Exception as e: + logger.warning(f"任务管理器关闭失败: {e}") + if __name__ == "__main__": import uvicorn diff --git a/tests/test_api.py b/tests/test_api.py index 5f64b5b..c64a4bc 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -103,8 +103,5 @@ class TestMetricsEndpoint: class TestJobsEndpoint: """测试异步任务端点""" - def test_jobs_not_implemented(self, client): - """测试异步任务接口(未实现)""" - response = client.post("/jobs", json={"number": 17}) - - assert response.status_code == status.HTTP_501_NOT_IMPLEMENTED + # 详细测试在 test_job_manager.py 中 + pass diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py new file mode 100644 index 0000000..e717691 --- /dev/null +++ b/tests/test_job_manager.py @@ -0,0 +1,401 @@ +"""异步任务管理器测试""" + +import asyncio +import json +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from fastapi import status + +from src.functional_scaffold.core.job_manager import ( + JobManager, + get_job_manager, + shutdown_job_manager, +) +from src.functional_scaffold.api.models import JobStatus + + +class TestJobManager: + """测试 JobManager 类""" + + @pytest.fixture + def mock_redis(self): + """模拟 Redis 客户端""" + mock = AsyncMock() + mock.ping = AsyncMock(return_value=True) + mock.hset = AsyncMock() + mock.hgetall = AsyncMock(return_value={}) + mock.expire = AsyncMock() + mock.close = AsyncMock() + return mock + + @pytest.fixture + def mock_http_client(self): + """模拟 HTTP 客户端""" + mock = AsyncMock() + mock.post = AsyncMock() + mock.aclose = AsyncMock() + return mock + + @pytest.mark.asyncio + async def test_generate_job_id(self): + """测试任务 ID 生成""" + manager = JobManager() + job_id = manager._generate_job_id() + + assert len(job_id) == 12 + assert all(c in "0123456789abcdef" for c in job_id) + + @pytest.mark.asyncio + async def test_get_timestamp(self): + """测试时间戳生成""" + manager = JobManager() + timestamp = manager._get_timestamp() + + assert "T" in timestamp + assert timestamp.endswith("+00:00") or timestamp.endswith("Z") + + @pytest.mark.asyncio + async def test_get_available_algorithms(self): + """测试获取可用算法列表""" + manager = JobManager() + manager._register_algorithms() + + algorithms = manager.get_available_algorithms() + + assert "PrimeChecker" in algorithms + + @pytest.mark.asyncio + async def test_is_available_without_redis(self): + """测试 Redis 不可用时的状态""" + manager = JobManager() + + assert manager.is_available() is False + + +class TestJobManagerWithMocks: + """使用 Mock 测试 JobManager""" + + @pytest.mark.asyncio + async def test_create_job(self): + """测试创建任务""" + manager = JobManager() + + # 模拟 Redis + mock_redis = AsyncMock() + mock_redis.hset = AsyncMock() + manager._redis_client = mock_redis + manager._register_algorithms() + + job_id = await manager.create_job( + algorithm="PrimeChecker", + params={"number": 17}, + webhook="https://example.com/callback", + request_id="test-request-id", + ) + + assert len(job_id) == 12 + mock_redis.hset.assert_called_once() + + @pytest.mark.asyncio + async def test_create_job_invalid_algorithm(self): + """测试创建任务时算法不存在""" + manager = JobManager() + + mock_redis = AsyncMock() + manager._redis_client = mock_redis + manager._register_algorithms() + + with pytest.raises(ValueError, match="不存在"): + await manager.create_job( + algorithm="NonExistentAlgorithm", + params={}, + ) + + @pytest.mark.asyncio + async def test_create_job_redis_unavailable(self): + """测试 Redis 不可用时创建任务""" + manager = JobManager() + manager._register_algorithms() + + with pytest.raises(RuntimeError, match="Redis 不可用"): + await manager.create_job( + algorithm="PrimeChecker", + params={"number": 17}, + ) + + @pytest.mark.asyncio + async def test_get_job(self): + """测试获取任务信息""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.hgetall = AsyncMock( + return_value={ + "status": "completed", + "algorithm": "PrimeChecker", + "created_at": "2026-02-02T10:00:00+00:00", + "started_at": "2026-02-02T10:00:01+00:00", + "completed_at": "2026-02-02T10:00:02+00:00", + "result": '{"number": 17, "is_prime": true}', + "error": "", + "metadata": '{"elapsed_time": 0.001}', + } + ) + manager._redis_client = mock_redis + + job_data = await manager.get_job("test-job-id") + + assert job_data is not None + assert job_data["job_id"] == "test-job-id" + assert job_data["status"] == "completed" + assert job_data["algorithm"] == "PrimeChecker" + assert job_data["result"]["number"] == 17 + assert job_data["result"]["is_prime"] is True + + @pytest.mark.asyncio + async def test_get_job_not_found(self): + """测试获取不存在的任务""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.hgetall = AsyncMock(return_value={}) + manager._redis_client = mock_redis + + job_data = await manager.get_job("non-existent-job") + + assert job_data is None + + @pytest.mark.asyncio + async def test_execute_job(self): + """测试执行任务""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.hgetall = AsyncMock( + return_value={ + "status": "pending", + "algorithm": "PrimeChecker", + "params": '{"number": 17}', + "webhook": "", + "request_id": "test-request-id", + "created_at": "2026-02-02T10:00:00+00:00", + } + ) + mock_redis.hset = AsyncMock() + mock_redis.expire = AsyncMock() + manager._redis_client = mock_redis + manager._register_algorithms() + + await manager.execute_job("test-job-id") + + # 验证状态更新被调用 + assert mock_redis.hset.call_count >= 2 # running + completed + mock_redis.expire.assert_called_once() + + +class TestJobsAPI: + """测试 /jobs API 端点""" + + def test_create_job_success(self, client): + """测试成功创建任务""" + with patch( + "src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock + ) as mock_get_manager: + mock_manager = MagicMock() + mock_manager.is_available.return_value = True + mock_manager.get_available_algorithms.return_value = ["PrimeChecker"] + mock_manager.create_job = AsyncMock(return_value="abc123def456") + mock_manager.get_job = AsyncMock( + return_value={ + "job_id": "abc123def456", + "status": "pending", + "algorithm": "PrimeChecker", + "created_at": "2026-02-02T10:00:00+00:00", + } + ) + mock_manager.execute_job = AsyncMock() + mock_get_manager.return_value = mock_manager + + response = client.post( + "/jobs", + json={ + "algorithm": "PrimeChecker", + "params": {"number": 17}, + }, + ) + + assert response.status_code == status.HTTP_202_ACCEPTED + data = response.json() + assert data["job_id"] == "abc123def456" + assert data["status"] == "pending" + assert data["message"] == "任务已创建" + + def test_create_job_algorithm_not_found(self, client): + """测试创建任务时算法不存在""" + with patch( + "src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock + ) as mock_get_manager: + mock_manager = MagicMock() + mock_manager.is_available.return_value = True + mock_manager.get_available_algorithms.return_value = ["PrimeChecker"] + mock_get_manager.return_value = mock_manager + + response = client.post( + "/jobs", + json={ + "algorithm": "NonExistentAlgorithm", + "params": {}, + }, + ) + + assert response.status_code == status.HTTP_404_NOT_FOUND + data = response.json() + assert data["detail"]["error"] == "ALGORITHM_NOT_FOUND" + + def test_create_job_service_unavailable(self, client): + """测试服务不可用时创建任务""" + with patch( + "src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock + ) as mock_get_manager: + mock_manager = MagicMock() + mock_manager.is_available.return_value = False + mock_get_manager.return_value = mock_manager + + response = client.post( + "/jobs", + json={ + "algorithm": "PrimeChecker", + "params": {"number": 17}, + }, + ) + + assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE + + def test_get_job_status_success(self, client): + """测试成功查询任务状态""" + with patch( + "src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock + ) as mock_get_manager: + mock_manager = MagicMock() + mock_manager.is_available.return_value = True + mock_manager.get_job = AsyncMock( + return_value={ + "job_id": "abc123def456", + "status": "completed", + "algorithm": "PrimeChecker", + "created_at": "2026-02-02T10:00:00+00:00", + "started_at": "2026-02-02T10:00:01+00:00", + "completed_at": "2026-02-02T10:00:02+00:00", + "result": {"number": 17, "is_prime": True}, + "error": None, + "metadata": {"elapsed_time": 0.001}, + } + ) + mock_get_manager.return_value = mock_manager + + response = client.get("/jobs/abc123def456") + + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data["job_id"] == "abc123def456" + assert data["status"] == "completed" + assert data["result"]["is_prime"] is True + + def test_get_job_status_not_found(self, client): + """测试查询不存在的任务""" + with patch( + "src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock + ) as mock_get_manager: + mock_manager = MagicMock() + mock_manager.is_available.return_value = True + mock_manager.get_job = AsyncMock(return_value=None) + mock_get_manager.return_value = mock_manager + + response = client.get("/jobs/non-existent-job") + + assert response.status_code == status.HTTP_404_NOT_FOUND + data = response.json() + assert data["detail"]["error"] == "JOB_NOT_FOUND" + + def test_get_job_status_service_unavailable(self, client): + """测试服务不可用时查询任务""" + with patch( + "src.functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock + ) as mock_get_manager: + mock_manager = MagicMock() + mock_manager.is_available.return_value = False + mock_get_manager.return_value = mock_manager + + response = client.get("/jobs/abc123def456") + + assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE + + +class TestWebhook: + """测试 Webhook 回调""" + + @pytest.mark.asyncio + async def test_send_webhook_success(self): + """测试成功发送 Webhook""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.hgetall = AsyncMock( + return_value={ + "status": "completed", + "algorithm": "PrimeChecker", + "created_at": "2026-02-02T10:00:00+00:00", + "completed_at": "2026-02-02T10:00:02+00:00", + "result": '{"number": 17, "is_prime": true}', + "error": "", + "metadata": '{"elapsed_time": 0.001}', + } + ) + manager._redis_client = mock_redis + + mock_response = MagicMock() + mock_response.status_code = 200 + + mock_http = AsyncMock() + mock_http.post = AsyncMock(return_value=mock_response) + manager._http_client = mock_http + + await manager._send_webhook("test-job-id", "https://example.com/callback") + + mock_http.post.assert_called_once() + call_args = mock_http.post.call_args + assert call_args[0][0] == "https://example.com/callback" + assert "json" in call_args[1] + + @pytest.mark.asyncio + async def test_send_webhook_retry_on_failure(self): + """测试 Webhook 失败时重试""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.hgetall = AsyncMock( + return_value={ + "status": "completed", + "algorithm": "PrimeChecker", + "created_at": "2026-02-02T10:00:00+00:00", + "completed_at": "2026-02-02T10:00:02+00:00", + "result": "{}", + "error": "", + "metadata": "{}", + } + ) + manager._redis_client = mock_redis + + mock_http = AsyncMock() + mock_http.post = AsyncMock(side_effect=Exception("Connection error")) + manager._http_client = mock_http + + # 使用较短的重试间隔进行测试 + with patch("src.functional_scaffold.core.job_manager.settings") as mock_settings: + mock_settings.webhook_max_retries = 2 + mock_settings.webhook_timeout = 1 + + await manager._send_webhook("test-job-id", "https://example.com/callback") + + # 验证重试次数 + assert mock_http.post.call_count == 2