变更内容: - 删除 `src` 子目录,将模块引用路径从 `src.functional_scaffold` 更新为 `functional_scaffold`。 - 修改相关代码、文档、测试用例及配置文件中的路径引用,包括 `README.md`、`Dockerfile`、`uvicorn` 启动命令等。 - 优化项目目录结构,提升代码维护性和可读性。
15 KiB
15 KiB
算法开发指南
本文档详细介绍如何在 FunctionalScaffold 框架中开发算法,包括最佳实践、高级特性和常见模式。
算法架构
核心概念
┌─────────────────────────────────────────────────────────────┐
│ HTTP 请求 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ FastAPI 路由层 │
│ - 参数验证 (Pydantic) │
│ - 请求 ID 生成 │
│ - 错误处理 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ BaseAlgorithm.execute() │
│ - 自动计时 │
│ - 指标记录 │
│ - 异常捕获 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ YourAlgorithm.process() │
│ ★ 你只需要实现这个方法 ★ │
└─────────────────────────────────────────────────────────────┘
BaseAlgorithm 基类
所有算法必须继承 BaseAlgorithm 类:
from abc import ABC, abstractmethod
from typing import Any, Dict
class BaseAlgorithm(ABC):
    """Common base class that every algorithm must inherit from.

    Subclasses implement :meth:`process`; callers go through :meth:`execute`.
    """

    def __init__(self):
        # Defaults: initial version string and the concrete class name.
        self.version = "1.0.0"
        self.name = type(self).__name__

    @abstractmethod
    def process(self, *args, **kwargs) -> Dict[str, Any]:
        """Run the algorithm logic. Every subclass must implement this."""

    def execute(self, *args, **kwargs) -> Dict[str, Any]:
        """Execute the algorithm with instrumentation and error handling.

        The body is elided in this guide; the framework takes care of
        timing, logging, metrics and exception handling here.
        """
        # Handled by the framework: timing, logging, metrics, exceptions.
开发算法
基础示例
# functional_scaffold/algorithms/text_processor.py
from typing import Dict, Any
from .base import BaseAlgorithm
class TextProcessor(BaseAlgorithm):
    """Text-processing algorithm supporting upper, lower and reverse."""

    def __init__(self):
        super().__init__()
        self.version = "1.0.0"

    def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
        """Transform ``text`` according to ``operation``.

        Args:
            text: Input text.
            operation: Operation name (``upper``, ``lower`` or ``reverse``).

        Returns:
            A dict holding the original text, the processed text, the
            operation name and the length of the processed text.

        Raises:
            ValueError: If ``operation`` is not one of the supported names.
        """
        # Dispatch table: operation name -> transformation.
        transforms = {
            "upper": lambda s: s.upper(),
            "lower": lambda s: s.lower(),
            "reverse": lambda s: s[::-1],
        }
        if operation not in transforms:
            raise ValueError(f"不支持的操作: {operation}")

        processed = transforms[operation](text)
        return {
            "original": text,
            "processed": processed,
            "operation": operation,
            "length": len(processed),
        }
带模型加载的算法
# functional_scaffold/algorithms/ml_predictor.py
from typing import Dict, Any
import logging
from .base import BaseAlgorithm
logger = logging.getLogger(__name__)
class MLPredictor(BaseAlgorithm):
    """Machine-learning prediction algorithm (mock implementation)."""

    def __init__(self, model_path: str = None):
        super().__init__()
        self.model = None
        # Fall back to the default model file when no path is given.
        self.model_path = model_path or "models/default.pkl"
        self._load_model()

    def _load_model(self):
        """Load the model; called once from ``__init__``."""
        logger.info(f"加载模型: {self.model_path}")
        # A real implementation would load the file, for example:
        #   import joblib
        #   self.model = joblib.load(self.model_path)
        self.model = "mock_model"  # placeholder used by this example
        logger.info("模型加载完成")

    def process(self, features: list) -> Dict[str, Any]:
        """Run a prediction on a feature vector.

        Args:
            features: Feature vector; must not be empty.

        Returns:
            A dict with the input features, the prediction and the model
            version.

        Raises:
            RuntimeError: If the model has not been loaded.
            ValueError: If ``features`` is empty.
        """
        if self.model is None:
            raise RuntimeError("模型未加载")
        # Reject empty input explicitly; the mean below would otherwise fail
        # with an unhelpful ZeroDivisionError.
        if len(features) == 0:
            raise ValueError("特征向量不能为空")

        # Real call would be: self.model.predict([features])[0]
        prediction = sum(features) / len(features)  # placeholder: mean value
        return {
            "features": features,
            "prediction": prediction,
            "model_version": self.version,
        }
带外部服务调用的算法
# functional_scaffold/algorithms/data_fetcher.py
from typing import Dict, Any
import httpx
from .base import BaseAlgorithm
from ..config import settings
class DataFetcher(BaseAlgorithm):
    """Algorithm that fetches a record from an external HTTP API."""

    def __init__(self):
        super().__init__()
        self.api_base_url = settings.external_api_url or "https://api.example.com"

    async def _fetch_data(self, endpoint: str) -> dict:
        """GET ``{api_base_url}/{endpoint}`` and return the decoded JSON body.

        Propagates whatever ``response.raise_for_status()`` raises.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.api_base_url}/{endpoint}")
            response.raise_for_status()
            return response.json()

    def process(self, data_id: str) -> Dict[str, Any]:
        """Fetch the record identified by ``data_id`` and transform it.

        Args:
            data_id: Data ID.

        Returns:
            A dict with the data ID, the raw payload and the transformed
            payload.
        """
        import asyncio
        import concurrent.futures

        def fetch() -> dict:
            # Drive the coroutine on a private event loop owned by the
            # calling thread, and always close that loop afterwards.
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(self._fetch_data(f"data/{data_id}"))
            finally:
                loop.close()

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running in this thread: fetch directly.
            data = fetch()
        else:
            # A loop is already running in this thread (e.g. execute() was
            # called from an async route handler). run_until_complete() would
            # raise RuntimeError here, so run the private loop in a worker
            # thread. NOTE: this still blocks the caller until the fetch ends.
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                data = pool.submit(fetch).result()

        processed = self._transform(data)
        return {
            "data_id": data_id,
            "raw_data": data,
            "processed_data": processed,
        }

    def _transform(self, data: dict) -> dict:
        """Return a copy of ``data`` without the entries whose value is None."""
        return {k: v for k, v in data.items() if v is not None}
数据模型定义
请求模型
# functional_scaffold/api/models.py
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ConfigDict, Field, field_validator
class TextProcessRequest(BaseModel):
    """Request body for the text-processing endpoint."""

    model_config = ConfigDict(
        json_schema_extra={
            "example": {"text": "Hello World", "operation": "upper"},
        }
    )

    # Input text, between 1 and 10000 characters long.
    text: str = Field(..., min_length=1, max_length=10000, description="输入文本")
    # Operation name; defaults to "upper".
    operation: str = Field("upper", description="操作类型")

    @field_validator("operation")
    @classmethod
    def validate_operation(cls, v):
        """Accept only the supported operation names."""
        allowed = ["upper", "lower", "reverse"]
        if v in allowed:
            return v
        raise ValueError(f"operation 必须是 {allowed} 之一")
class MLPredictRequest(BaseModel):
    """Request body for the ML prediction endpoint."""

    model_config = ConfigDict(
        json_schema_extra={"example": {"features": [1.0, 2.0, 3.0, 4.0]}}
    )

    # Feature vector; at least one element is required.
    features: List[float] = Field(..., min_length=1, description="特征向量")
响应模型
class TextProcessResponse(BaseModel):
    """Response body returned by the text-processing endpoint."""

    # Identifier of the request this response belongs to.
    request_id: str = Field(..., description="请求 ID")
    # Processing status.
    status: str = Field(..., description="处理状态")
    # Algorithm output.
    result: Dict[str, Any] = Field(..., description="处理结果")
    # Metadata attached to the run.
    metadata: Dict[str, Any] = Field(..., description="元数据")
路由注册
同步接口
# functional_scaffold/api/routes.py
from fastapi import APIRouter, HTTPException, Depends, status
from .models import TextProcessRequest, TextProcessResponse
from .dependencies import get_request_id
from ..algorithms.text_processor import TextProcessor
router = APIRouter()
@router.post(
    "/text/process",
    response_model=TextProcessResponse,
    summary="文本处理",
    description="对输入文本执行指定操作",
)
async def process_text(
    request: TextProcessRequest,
    request_id: str = Depends(get_request_id),
):
    """Text-processing endpoint.

    Runs ``TextProcessor.execute`` on the request payload. A result flagged
    as unsuccessful becomes an HTTP 500; a ``ValueError`` raised anywhere in
    the handler body becomes an HTTP 400.
    """
    try:
        outcome = TextProcessor().execute(request.text, request.operation)

        if outcome["success"]:
            return TextProcessResponse(
                request_id=request_id,
                status="success",
                result=outcome["result"],
                metadata=outcome["metadata"],
            )

        # The algorithm reported a failure: surface it as a server error.
        failure = {"error": "ALGORITHM_ERROR", "message": outcome["error"]}
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure,
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"error": "VALIDATION_ERROR", "message": str(e)},
        )
异步任务接口
算法注册后自动支持异步调用,无需额外代码:
# 创建异步任务
curl -X POST http://localhost:8000/jobs \
-H "Content-Type: application/json" \
-d '{
"algorithm": "TextProcessor",
"params": {"text": "hello", "operation": "upper"}
}'
自定义指标
定义指标
在 config/metrics.yaml 中添加:
custom_metrics:
# 文本处理统计
text_process_total:
name: "text_process_total"
type: counter
description: "文本处理总数"
labels: [operation]
text_length_histogram:
name: "text_length_histogram"
type: histogram
description: "处理文本长度分布"
labels: []
buckets: [10, 50, 100, 500, 1000, 5000, 10000]
记录指标
from ..core.metrics_unified import incr, observe
class TextProcessor(BaseAlgorithm):
def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
# 记录操作计数
incr("text_process_total", {"operation": operation})
# 记录文本长度
observe("text_length_histogram", {}, len(text))
# ... 算法逻辑 ...
错误处理
自定义异常
# functional_scaffold/core/errors.py
class AlgorithmError(FunctionalScaffoldError):
    """Error raised while executing an algorithm."""
    pass
class ModelNotLoadedError(AlgorithmError):
    """Error raised when the model has not been loaded."""
    pass
class InvalidInputError(AlgorithmError):
    """Error raised for invalid input."""
    pass
在算法中使用
from ..core.errors import InvalidInputError, ModelNotLoadedError
class MLPredictor(BaseAlgorithm):
def process(self, features: list) -> Dict[str, Any]:
if not features:
raise InvalidInputError("特征向量不能为空")
if self.model is None:
raise ModelNotLoadedError("模型未加载,请检查模型文件")
# ... 算法逻辑 ...
测试
单元测试
# tests/test_text_processor.py
import pytest
from functional_scaffold.algorithms.text_processor import TextProcessor
class TestTextProcessor:
    """Unit tests for the TextProcessor algorithm."""

    def test_upper_operation(self):
        """Upper-casing returns the converted text and echoes the operation."""
        outcome = TextProcessor().process("hello", "upper")
        assert outcome["processed"] == "HELLO"
        assert outcome["operation"] == "upper"

    def test_lower_operation(self):
        """Lower-casing returns the converted text."""
        outcome = TextProcessor().process("HELLO", "lower")
        assert outcome["processed"] == "hello"

    def test_reverse_operation(self):
        """Reversing returns the text back to front."""
        outcome = TextProcessor().process("hello", "reverse")
        assert outcome["processed"] == "olleh"

    def test_invalid_operation(self):
        """An unsupported operation raises ValueError."""
        algorithm = TextProcessor()
        with pytest.raises(ValueError, match="不支持的操作"):
            algorithm.process("hello", "invalid")

    def test_execute_wrapper(self):
        """execute() wraps the result with success flag and metadata."""
        wrapped = TextProcessor().execute("hello", "upper")
        assert wrapped["success"] is True
        assert wrapped["result"]["processed"] == "HELLO"
        assert "elapsed_time" in wrapped["metadata"]
API 集成测试
# tests/test_text_api.py
import pytest
from fastapi import status
class TestTextProcessAPI:
    """Integration tests for the text-processing API."""

    def test_process_text_success(self, client):
        """A valid request returns 200 with the processed text."""
        payload = {"text": "hello", "operation": "upper"}
        response = client.post("/text/process", json=payload)

        assert response.status_code == status.HTTP_200_OK
        body = response.json()
        assert body["status"] == "success"
        assert body["result"]["processed"] == "HELLO"

    def test_process_text_invalid_operation(self, client):
        """An unsupported operation is rejected with 422."""
        payload = {"text": "hello", "operation": "invalid"}
        response = client.post("/text/process", json=payload)

        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
最佳实践
1. 保持 process() 方法简洁
# ✅ 好的做法
def process(self, data):
validated = self._validate(data)
transformed = self._transform(validated)
result = self._compute(transformed)
return self._format_output(result)
# ❌ 避免
def process(self, data):
# 200 行代码全部写在这里...
2. 使用类型注解
# ✅ 好的做法
def process(self, text: str, max_length: int = 100) -> Dict[str, Any]:
...
# ❌ 避免
def process(self, text, max_length=100):
...
3. 合理使用日志
import logging
logger = logging.getLogger(__name__)
class MyAlgorithm(BaseAlgorithm):
def process(self, data):
logger.info(f"开始处理数据,大小: {len(data)}")
# ... 处理逻辑 ...
logger.info(f"处理完成,结果大小: {len(result)}")
return result
4. 资源管理
class ResourceIntensiveAlgorithm(BaseAlgorithm):
    """Example of an algorithm that loads its heavy resource lazily."""

    def __init__(self):
        super().__init__()
        self._resource = None  # filled in by _ensure_resource()

    def _ensure_resource(self):
        """Load the heavy resource if it has not been loaded yet."""
        if self._resource is not None:
            return
        self._resource = self._load_heavy_resource()

    def process(self, data):
        """Make sure the resource is available, then delegate to it."""
        self._ensure_resource()
        resource = self._resource
        return resource.process(data)