变更内容: - 移除冗余文档,包括 Grafana 指南、指标对比、修复总结、OpenAPI 规范等。 - 精简项目文档结构,优化 README 文件内容。 - 提升文档层次清晰度,集中核心指南。
557 lines · 15 KiB · Markdown
# 算法开发指南
|
|
|
|
本文档详细介绍如何在 FunctionalScaffold 框架中开发算法,包括最佳实践、高级特性和常见模式。
|
|
|
|
## 算法架构
|
|
|
|
### 核心概念
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────┐
|
|
│ HTTP 请求 │
|
|
└─────────────────────────────────────────────────────────────┘
|
|
│
|
|
▼
|
|
┌─────────────────────────────────────────────────────────────┐
|
|
│ FastAPI 路由层 │
|
|
│ - 参数验证 (Pydantic) │
|
|
│ - 请求 ID 生成 │
|
|
│ - 错误处理 │
|
|
└─────────────────────────────────────────────────────────────┘
|
|
│
|
|
▼
|
|
┌─────────────────────────────────────────────────────────────┐
|
|
│ BaseAlgorithm.execute() │
|
|
│ - 自动计时 │
|
|
│ - 指标记录 │
|
|
│ - 异常捕获 │
|
|
└─────────────────────────────────────────────────────────────┘
|
|
│
|
|
▼
|
|
┌─────────────────────────────────────────────────────────────┐
|
|
│ YourAlgorithm.process() │
|
|
│ ★ 你只需要实现这个方法 ★ │
|
|
└─────────────────────────────────────────────────────────────┘
|
|
```
|
|
|
|
### BaseAlgorithm 基类
|
|
|
|
所有算法必须继承 `BaseAlgorithm` 类:
|
|
|
|
```python
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any, Dict
|
|
|
|
class BaseAlgorithm(ABC):
|
|
"""算法基类"""
|
|
|
|
def __init__(self):
|
|
self.name = self.__class__.__name__
|
|
self.version = "1.0.0"
|
|
|
|
@abstractmethod
|
|
def process(self, *args, **kwargs) -> Dict[str, Any]:
|
|
"""算法处理逻辑 - 子类必须实现"""
|
|
pass
|
|
|
|
def execute(self, *args, **kwargs) -> Dict[str, Any]:
|
|
"""执行算法 - 自动处理埋点和错误"""
|
|
# 框架自动处理:计时、日志、指标、异常
|
|
pass
|
|
```
|
|
|
|
## 开发算法
|
|
|
|
### 基础示例
|
|
|
|
```python
|
|
# src/functional_scaffold/algorithms/text_processor.py
|
|
from typing import Dict, Any
|
|
from .base import BaseAlgorithm
|
|
|
|
class TextProcessor(BaseAlgorithm):
|
|
"""文本处理算法"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.version = "1.0.0"
|
|
|
|
def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
|
|
"""
|
|
处理文本
|
|
|
|
Args:
|
|
text: 输入文本
|
|
operation: 操作类型 (upper/lower/reverse)
|
|
|
|
Returns:
|
|
处理结果
|
|
"""
|
|
if operation == "upper":
|
|
result = text.upper()
|
|
elif operation == "lower":
|
|
result = text.lower()
|
|
elif operation == "reverse":
|
|
result = text[::-1]
|
|
else:
|
|
raise ValueError(f"不支持的操作: {operation}")
|
|
|
|
return {
|
|
"original": text,
|
|
"processed": result,
|
|
"operation": operation,
|
|
"length": len(result)
|
|
}
|
|
```
|
|
|
|
### 带模型加载的算法
|
|
|
|
```python
|
|
# src/functional_scaffold/algorithms/ml_predictor.py
|
|
from typing import Dict, Any
|
|
import logging
|
|
from .base import BaseAlgorithm
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MLPredictor(BaseAlgorithm):
|
|
"""机器学习预测算法"""
|
|
|
|
def __init__(self, model_path: str = None):
|
|
super().__init__()
|
|
self.model = None
|
|
self.model_path = model_path or "models/default.pkl"
|
|
self._load_model()
|
|
|
|
def _load_model(self):
|
|
"""加载模型(在初始化时执行一次)"""
|
|
logger.info(f"加载模型: {self.model_path}")
|
|
# import joblib
|
|
# self.model = joblib.load(self.model_path)
|
|
self.model = "mock_model" # 示例
|
|
logger.info("模型加载完成")
|
|
|
|
def process(self, features: list) -> Dict[str, Any]:
|
|
"""
|
|
执行预测
|
|
|
|
Args:
|
|
features: 特征向量
|
|
|
|
Returns:
|
|
预测结果
|
|
"""
|
|
if self.model is None:
|
|
raise RuntimeError("模型未加载")
|
|
|
|
# prediction = self.model.predict([features])[0]
|
|
prediction = sum(features) / len(features) # 示例
|
|
|
|
return {
|
|
"features": features,
|
|
"prediction": prediction,
|
|
"model_version": self.version
|
|
}
|
|
```
|
|
|
|
### 带外部服务调用的算法
|
|
|
|
```python
|
|
# src/functional_scaffold/algorithms/data_fetcher.py
|
|
from typing import Dict, Any
|
|
import httpx
|
|
from .base import BaseAlgorithm
|
|
from ..config import settings
|
|
|
|
class DataFetcher(BaseAlgorithm):
|
|
"""数据获取算法"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.api_base_url = settings.external_api_url or "https://api.example.com"
|
|
|
|
async def _fetch_data(self, endpoint: str) -> dict:
|
|
"""异步获取数据"""
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.get(f"{self.api_base_url}/{endpoint}")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def process(self, data_id: str) -> Dict[str, Any]:
|
|
"""
|
|
获取并处理数据
|
|
|
|
Args:
|
|
data_id: 数据 ID
|
|
|
|
Returns:
|
|
处理后的数据
|
|
"""
|
|
import asyncio
|
|
|
|
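# 在同步方法中调用异步函数。
# 注意:若当前线程已有事件循环在运行(例如在 async 路由中直接调用 execute()),
# run_until_complete() 会抛出 RuntimeError;此时应改在工作线程中执行。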
|
|
loop = asyncio.new_event_loop()
|
|
try:
|
|
data = loop.run_until_complete(self._fetch_data(f"data/{data_id}"))
|
|
finally:
|
|
loop.close()
|
|
|
|
# 处理数据
|
|
processed = self._transform(data)
|
|
|
|
return {
|
|
"data_id": data_id,
|
|
"raw_data": data,
|
|
"processed_data": processed
|
|
}
|
|
|
|
def _transform(self, data: dict) -> dict:
|
|
"""数据转换"""
|
|
return {k: v for k, v in data.items() if v is not None}
|
|
```
|
|
|
|
## 数据模型定义
|
|
|
|
### 请求模型
|
|
|
|
```python
|
|
# src/functional_scaffold/api/models.py
|
|
from pydantic import BaseModel, Field, ConfigDict, field_validator
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
class TextProcessRequest(BaseModel):
|
|
"""文本处理请求"""
|
|
|
|
model_config = ConfigDict(
|
|
json_schema_extra={
|
|
"example": {
|
|
"text": "Hello World",
|
|
"operation": "upper"
|
|
}
|
|
}
|
|
)
|
|
|
|
text: str = Field(..., min_length=1, max_length=10000, description="输入文本")
|
|
operation: str = Field("upper", description="操作类型")
|
|
|
|
@field_validator("operation")
|
|
@classmethod
|
|
def validate_operation(cls, v):
|
|
allowed = ["upper", "lower", "reverse"]
|
|
if v not in allowed:
|
|
raise ValueError(f"operation 必须是 {allowed} 之一")
|
|
return v
|
|
|
|
|
|
class MLPredictRequest(BaseModel):
|
|
"""ML 预测请求"""
|
|
|
|
model_config = ConfigDict(
|
|
json_schema_extra={
|
|
"example": {
|
|
"features": [1.0, 2.0, 3.0, 4.0]
|
|
}
|
|
}
|
|
)
|
|
|
|
features: List[float] = Field(..., min_length=1, description="特征向量")
|
|
```
|
|
|
|
### 响应模型
|
|
|
|
```python
|
|
class TextProcessResponse(BaseModel):
|
|
"""文本处理响应"""
|
|
|
|
request_id: str = Field(..., description="请求 ID")
|
|
status: str = Field(..., description="处理状态")
|
|
result: Dict[str, Any] = Field(..., description="处理结果")
|
|
metadata: Dict[str, Any] = Field(..., description="元数据")
|
|
```
|
|
|
|
## 路由注册
|
|
|
|
### 同步接口
|
|
|
|
```python
|
|
# src/functional_scaffold/api/routes.py
|
|
from fastapi import APIRouter, HTTPException, Depends, status
|
|
from .models import TextProcessRequest, TextProcessResponse
|
|
from .dependencies import get_request_id
|
|
from ..algorithms.text_processor import TextProcessor
|
|
|
|
router = APIRouter()
|
|
|
|
@router.post(
|
|
"/text/process",
|
|
response_model=TextProcessResponse,
|
|
summary="文本处理",
|
|
description="对输入文本执行指定操作",
|
|
)
|
|
async def process_text(
|
|
request: TextProcessRequest,
|
|
request_id: str = Depends(get_request_id),
|
|
):
|
|
"""文本处理端点"""
|
|
try:
|
|
processor = TextProcessor()
|
|
result = processor.execute(request.text, request.operation)
|
|
|
|
if not result["success"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail={"error": "ALGORITHM_ERROR", "message": result["error"]}
|
|
)
|
|
|
|
return TextProcessResponse(
|
|
request_id=request_id,
|
|
status="success",
|
|
result=result["result"],
|
|
metadata=result["metadata"]
|
|
)
|
|
|
|
except ValueError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail={"error": "VALIDATION_ERROR", "message": str(e)}
|
|
)
|
|
```
|
|
|
|
### 异步任务接口
|
|
|
|
算法注册后自动支持异步调用,无需额外代码:
|
|
|
|
```bash
|
|
# 创建异步任务
|
|
curl -X POST http://localhost:8000/jobs \
|
|
-H "Content-Type: application/json" \
|
|
-d '{
|
|
"algorithm": "TextProcessor",
|
|
"params": {"text": "hello", "operation": "upper"}
|
|
}'
|
|
```
|
|
|
|
## 自定义指标
|
|
|
|
### 定义指标
|
|
|
|
在 `config/metrics.yaml` 中添加:
|
|
|
|
```yaml
|
|
custom_metrics:
|
|
# 文本处理统计
|
|
text_process_total:
|
|
name: "text_process_total"
|
|
type: counter
|
|
description: "文本处理总数"
|
|
labels: [operation]
|
|
|
|
text_length_histogram:
|
|
name: "text_length_histogram"
|
|
type: histogram
|
|
description: "处理文本长度分布"
|
|
labels: []
|
|
buckets: [10, 50, 100, 500, 1000, 5000, 10000]
|
|
```
|
|
|
|
### 记录指标
|
|
|
|
```python
|
|
from ..core.metrics_unified import incr, observe
|
|
|
|
class TextProcessor(BaseAlgorithm):
|
|
def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
|
|
# 记录操作计数
|
|
incr("text_process_total", {"operation": operation})
|
|
|
|
# 记录文本长度
|
|
observe("text_length_histogram", {}, len(text))
|
|
|
|
# ... 算法逻辑 ...
|
|
```
|
|
|
|
## 错误处理
|
|
|
|
### 自定义异常
|
|
|
|
```python
|
|
# src/functional_scaffold/core/errors.py
|
|
class AlgorithmError(FunctionalScaffoldError):
|
|
"""算法执行错误"""
|
|
pass
|
|
|
|
class ModelNotLoadedError(AlgorithmError):
|
|
"""模型未加载错误"""
|
|
pass
|
|
|
|
class InvalidInputError(AlgorithmError):
|
|
"""无效输入错误"""
|
|
pass
|
|
```
|
|
|
|
### 在算法中使用
|
|
|
|
```python
|
|
from ..core.errors import InvalidInputError, ModelNotLoadedError
|
|
|
|
class MLPredictor(BaseAlgorithm):
|
|
def process(self, features: list) -> Dict[str, Any]:
|
|
if not features:
|
|
raise InvalidInputError("特征向量不能为空")
|
|
|
|
if self.model is None:
|
|
raise ModelNotLoadedError("模型未加载,请检查模型文件")
|
|
|
|
# ... 算法逻辑 ...
|
|
```
|
|
|
|
## 测试
|
|
|
|
### 单元测试
|
|
|
|
```python
|
|
# tests/test_text_processor.py
|
|
import pytest
|
|
from src.functional_scaffold.algorithms.text_processor import TextProcessor
|
|
|
|
class TestTextProcessor:
|
|
"""文本处理算法测试"""
|
|
|
|
def test_upper_operation(self):
|
|
"""测试大写转换"""
|
|
processor = TextProcessor()
|
|
result = processor.process("hello", "upper")
|
|
|
|
assert result["processed"] == "HELLO"
|
|
assert result["operation"] == "upper"
|
|
|
|
def test_lower_operation(self):
|
|
"""测试小写转换"""
|
|
processor = TextProcessor()
|
|
result = processor.process("HELLO", "lower")
|
|
|
|
assert result["processed"] == "hello"
|
|
|
|
def test_reverse_operation(self):
|
|
"""测试反转"""
|
|
processor = TextProcessor()
|
|
result = processor.process("hello", "reverse")
|
|
|
|
assert result["processed"] == "olleh"
|
|
|
|
def test_invalid_operation(self):
|
|
"""测试无效操作"""
|
|
processor = TextProcessor()
|
|
|
|
with pytest.raises(ValueError, match="不支持的操作"):
|
|
processor.process("hello", "invalid")
|
|
|
|
def test_execute_wrapper(self):
|
|
"""测试 execute 包装器"""
|
|
processor = TextProcessor()
|
|
result = processor.execute("hello", "upper")
|
|
|
|
assert result["success"] is True
|
|
assert result["result"]["processed"] == "HELLO"
|
|
assert "elapsed_time" in result["metadata"]
|
|
```
|
|
|
|
### API 集成测试
|
|
|
|
```python
|
|
# tests/test_text_api.py
|
|
import pytest
|
|
from fastapi import status
|
|
|
|
class TestTextProcessAPI:
|
|
"""文本处理 API 测试"""
|
|
|
|
def test_process_text_success(self, client):
|
|
"""测试成功处理"""
|
|
response = client.post(
|
|
"/text/process",
|
|
json={"text": "hello", "operation": "upper"}
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
data = response.json()
|
|
assert data["status"] == "success"
|
|
assert data["result"]["processed"] == "HELLO"
|
|
|
|
def test_process_text_invalid_operation(self, client):
|
|
"""测试无效操作"""
|
|
response = client.post(
|
|
"/text/process",
|
|
json={"text": "hello", "operation": "invalid"}
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
|
|
```
|
|
|
|
## 最佳实践
|
|
|
|
### 1. 保持 process() 方法简洁
|
|
|
|
```python
|
|
# ✅ 好的做法
|
|
def process(self, data):
|
|
validated = self._validate(data)
|
|
transformed = self._transform(validated)
|
|
result = self._compute(transformed)
|
|
return self._format_output(result)
|
|
|
|
# ❌ 避免
|
|
def process(self, data):
|
|
# 200 行代码全部写在这里...
|
|
```
|
|
|
|
### 2. 使用类型注解
|
|
|
|
```python
|
|
# ✅ 好的做法
|
|
def process(self, text: str, max_length: int = 100) -> Dict[str, Any]:
|
|
...
|
|
|
|
# ❌ 避免
|
|
def process(self, text, max_length=100):
|
|
...
|
|
```
|
|
|
|
### 3. 合理使用日志
|
|
|
|
```python
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MyAlgorithm(BaseAlgorithm):
|
|
def process(self, data):
|
|
logger.info(f"开始处理数据,大小: {len(data)}")
|
|
# ... 处理逻辑 ...
|
|
logger.info(f"处理完成,结果大小: {len(result)}")
|
|
return result
|
|
```
|
|
|
|
### 4. 资源管理
|
|
|
|
```python
|
|
class ResourceIntensiveAlgorithm(BaseAlgorithm):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._resource = None
|
|
|
|
def _ensure_resource(self):
|
|
"""延迟加载资源"""
|
|
if self._resource is None:
|
|
self._resource = self._load_heavy_resource()
|
|
|
|
def process(self, data):
|
|
self._ensure_resource()
|
|
return self._resource.process(data)
|
|
```
|
|
|
|
## 参考
|
|
|
|
- [快速入门指南](./getting-started.md)
|
|
- [API 参考文档](./api-reference.md)
|
|
- [监控指南](./monitoring.md)
|