# 算法开发指南

本文档详细介绍如何在 FunctionalScaffold 框架中开发算法,包括最佳实践、高级特性和常见模式。

## 算法架构

### 核心概念

```
┌─────────────────────────────────────────────────────────────┐
│                          HTTP 请求                          │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                       FastAPI 路由层                        │
│  - 参数验证 (Pydantic)                                      │
│  - 请求 ID 生成                                             │
│  - 错误处理                                                 │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                   BaseAlgorithm.execute()                   │
│  - 自动计时                                                 │
│  - 指标记录                                                 │
│  - 异常捕获                                                 │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                   YourAlgorithm.process()                   │
│  ★ 你只需要实现这个方法 ★                                 │
└─────────────────────────────────────────────────────────────┘
```

### BaseAlgorithm 基类

所有算法必须继承 `BaseAlgorithm` 类:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseAlgorithm(ABC):
    """算法基类"""

    def __init__(self):
        self.name = self.__class__.__name__
        self.version = "1.0.0"

    @abstractmethod
    def process(self, *args, **kwargs) -> Dict[str, Any]:
        """算法处理逻辑 - 子类必须实现"""
        pass

    def execute(self, *args, **kwargs) -> Dict[str, Any]:
        """执行算法 - 自动处理埋点和错误"""
        # 框架自动处理:计时、日志、指标、异常
        # 返回的字典包含 success、result、metadata(含 elapsed_time),失败时还包含 error;
        # 具体用法见下文的路由与测试示例
        pass
```

## 开发算法

### 基础示例

```python
# src/functional_scaffold/algorithms/text_processor.py
from typing import Dict, Any

from .base import BaseAlgorithm


class TextProcessor(BaseAlgorithm):
    """文本处理算法"""

    def __init__(self):
        super().__init__()
        self.version = "1.0.0"

    def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
        """
        处理文本

        Args:
            text: 输入文本
            operation: 操作类型 (upper/lower/reverse)

        Returns:
            处理结果
        """
        if operation == "upper":
            result = text.upper()
        elif operation == "lower":
            result = text.lower()
        elif operation == "reverse":
            result = text[::-1]
        else:
            raise ValueError(f"不支持的操作: {operation}")

        return {
            "original": text,
            "processed": result,
            "operation": operation,
            "length": len(result)
        }
```

### 带模型加载的算法

```python
# 
# src/functional_scaffold/algorithms/ml_predictor.py
from typing import Dict, Any, Optional
import logging

from .base import BaseAlgorithm

logger = logging.getLogger(__name__)


class MLPredictor(BaseAlgorithm):
    """机器学习预测算法"""

    def __init__(self, model_path: Optional[str] = None):
        super().__init__()
        self.model = None
        self.model_path = model_path or "models/default.pkl"
        self._load_model()

    def _load_model(self):
        """加载模型(在初始化时执行一次)"""
        logger.info("加载模型: %s", self.model_path)
        # import joblib
        # self.model = joblib.load(self.model_path)
        self.model = "mock_model"  # 示例
        logger.info("模型加载完成")

    def process(self, features: list) -> Dict[str, Any]:
        """
        执行预测

        Args:
            features: 特征向量

        Returns:
            预测结果
        """
        if self.model is None:
            raise RuntimeError("模型未加载")

        # prediction = self.model.predict([features])[0]
        prediction = sum(features) / len(features)  # 示例

        return {
            "features": features,
            "prediction": prediction,
            "model_version": self.version
        }
```

> 提示:模型在 `__init__` 中加载,每创建一个 `MLPredictor` 实例就会重新加载一次模型。请在服务内复用同一个实例(例如在模块级创建),不要在每个请求中新建实例。

### 带外部服务调用的算法

```python
# src/functional_scaffold/algorithms/data_fetcher.py
from typing import Dict, Any
import httpx

from .base import BaseAlgorithm
from ..config import settings


class DataFetcher(BaseAlgorithm):
    """数据获取算法"""

    def __init__(self):
        super().__init__()
        self.api_base_url = settings.external_api_url or "https://api.example.com"

    def _fetch_data(self, endpoint: str) -> dict:
        """同步获取数据"""
        with httpx.Client() as client:
            response = client.get(f"{self.api_base_url}/{endpoint}")
            response.raise_for_status()
            return response.json()

    def process(self, data_id: str) -> Dict[str, Any]:
        """
        获取并处理数据

        Args:
            data_id: 数据 ID

        Returns:
            处理后的数据
        """
        # process() 是同步方法,直接使用同步的 httpx.Client 即可
        data = self._fetch_data(f"data/{data_id}")

        # 处理数据
        processed = self._transform(data)

        return {
            "data_id": data_id,
            "raw_data": data,
            "processed_data": processed
        }

    def _transform(self, data: dict) -> dict:
        """数据转换"""
        return {k: v for k, v in data.items() if v is not None}
```

> 注意:不要在 `process()` 中用 `asyncio.new_event_loop()` + `loop.run_until_complete()` 调用异步函数。
>
> - 如果调用 `execute()` 的线程中已有正在运行的事件循环(例如在 `async def` 路由中直接调用),`run_until_complete()` 会抛出 `RuntimeError: Cannot run the event loop while another loop is running`。
> - 同步算法中请直接使用同步的 `httpx.Client`。

## 数据模型定义

### 请求模型

```python
# 
# src/functional_scaffold/api/models.py
from pydantic import BaseModel, Field, ConfigDict, field_validator
from typing import Any, Dict, List, Optional


class TextProcessRequest(BaseModel):
    """文本处理请求"""

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "text": "Hello World",
                "operation": "upper"
            }
        }
    )

    text: str = Field(..., min_length=1, max_length=10000, description="输入文本")
    operation: str = Field("upper", description="操作类型")

    @field_validator("operation")
    @classmethod
    def validate_operation(cls, v):
        allowed = ["upper", "lower", "reverse"]
        if v not in allowed:
            raise ValueError(f"operation 必须是 {allowed} 之一")
        return v


class MLPredictRequest(BaseModel):
    """ML 预测请求"""

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "features": [1.0, 2.0, 3.0, 4.0]
            }
        }
    )

    features: List[float] = Field(..., min_length=1, description="特征向量")
```

### 响应模型

```python
# 与请求模型位于同一文件 models.py(Dict、Any 已在文件顶部导入)
class TextProcessResponse(BaseModel):
    """文本处理响应"""

    request_id: str = Field(..., description="请求 ID")
    status: str = Field(..., description="处理状态")
    result: Dict[str, Any] = Field(..., description="处理结果")
    metadata: Dict[str, Any] = Field(..., description="元数据")
```

## 路由注册

### 同步接口

```python
# src/functional_scaffold/api/routes.py
from fastapi import APIRouter, HTTPException, Depends, status

from .models import TextProcessRequest, TextProcessResponse
from .dependencies import get_request_id
from ..algorithms.text_processor import TextProcessor

router = APIRouter()


@router.post(
    "/text/process",
    response_model=TextProcessResponse,
    summary="文本处理",
    description="对输入文本执行指定操作",
)
def process_text(
    request: TextProcessRequest,
    request_id: str = Depends(get_request_id),
):
    """文本处理端点"""
    # 使用普通 def 而非 async def:FastAPI 会在线程池中运行该函数,
    # 同步(可能耗时)的 execute() 不会阻塞事件循环
    try:
        processor = TextProcessor()
        result = processor.execute(request.text, request.operation)

        if not result["success"]:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail={"error": "ALGORITHM_ERROR", "message": result["error"]}
            )

        return TextProcessResponse(
            request_id=request_id,
            status="success",
            result=result["result"],
            metadata=result["metadata"]
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"error": "VALIDATION_ERROR", "message": str(e)}
        )
```

### 异步任务接口

算法注册后自动支持异步调用,无需额外代码:

```bash
# 创建异步任务
curl -X POST http://localhost:8000/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "algorithm": "TextProcessor",
    "params": {"text": "hello", "operation": "upper"}
  }'
```

## 自定义指标

### 定义指标

在 `config/metrics.yaml` 中添加:

```yaml
custom_metrics:
  # 文本处理统计
  text_process_total:
    name: "text_process_total"
    type: counter
    description: "文本处理总数"
    labels: [operation]

  text_length_histogram:
    name: "text_length_histogram"
    type: histogram
    description: "处理文本长度分布"
    labels: []
    buckets: [10, 50, 100, 500, 1000, 5000, 10000]
```

### 记录指标

```python
from ..core.metrics_unified import incr, observe


class TextProcessor(BaseAlgorithm):
    def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
        # 记录操作计数
        incr("text_process_total", {"operation": operation})

        # 记录文本长度
        observe("text_length_histogram", {}, len(text))

        # ... 算法逻辑 ...
```

## 错误处理

### 自定义异常

```python
# src/functional_scaffold/core/errors.py
class AlgorithmError(FunctionalScaffoldError):
    """算法执行错误"""
    pass


class ModelNotLoadedError(AlgorithmError):
    """模型未加载错误"""
    pass


class InvalidInputError(AlgorithmError):
    """无效输入错误"""
    pass
```

### 在算法中使用

```python
from ..core.errors import InvalidInputError, ModelNotLoadedError


class MLPredictor(BaseAlgorithm):
    def process(self, features: list) -> Dict[str, Any]:
        if not features:
            raise InvalidInputError("特征向量不能为空")

        if self.model is None:
            raise ModelNotLoadedError("模型未加载,请检查模型文件")

        # ... 算法逻辑 ...
```

## 测试

### 单元测试

```python
# tests/test_text_processor.py
import pytest

from functional_scaffold.algorithms.text_processor import TextProcessor


class TestTextProcessor:
    """文本处理算法测试"""

    def test_upper_operation(self):
        """测试大写转换"""
        processor = TextProcessor()
        result = processor.process("hello", "upper")

        assert result["processed"] == "HELLO"
        assert result["operation"] == "upper"

    def test_lower_operation(self):
        """测试小写转换"""
        processor = TextProcessor()
        result = processor.process("HELLO", "lower")

        assert result["processed"] == "hello"

    def test_reverse_operation(self):
        """测试反转"""
        processor = TextProcessor()
        result = processor.process("hello", "reverse")

        assert result["processed"] == "olleh"

    def test_invalid_operation(self):
        """测试无效操作"""
        processor = TextProcessor()

        with pytest.raises(ValueError, match="不支持的操作"):
            processor.process("hello", "invalid")

    def test_execute_wrapper(self):
        """测试 execute 包装器"""
        processor = TextProcessor()
        result = processor.execute("hello", "upper")

        assert result["success"] is True
        assert result["result"]["processed"] == "HELLO"
        assert "elapsed_time" in result["metadata"]
```

### API 集成测试

以下测试使用 `client` fixture。该 fixture 需在 `tests/conftest.py` 中定义,例如返回一个基于 `fastapi.testclient.TestClient` 的客户端。

```python
# tests/test_text_api.py
import pytest
from fastapi import status


class TestTextProcessAPI:
    """文本处理 API 测试"""

    def test_process_text_success(self, client):
        """测试成功处理"""
        response = client.post(
            "/text/process",
            json={"text": "hello", "operation": "upper"}
        )

        assert response.status_code == status.HTTP_200_OK
        data = response.json()
        assert data["status"] == "success"
        assert data["result"]["processed"] == "HELLO"

    def test_process_text_invalid_operation(self, client):
        """测试无效操作"""
        # 无效的 operation 会先被请求模型的 field_validator 拒绝,
        # 请求不会进入路由函数,因此返回 422 而不是 400
        response = client.post(
            "/text/process",
            json={"text": "hello", "operation": "invalid"}
        )

        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
```

## 最佳实践

### 1.
保持 process() 方法简洁

```python
# ✅ 好的做法
def process(self, data):
    validated = self._validate(data)
    transformed = self._transform(validated)
    result = self._compute(transformed)
    return self._format_output(result)

# ❌ 避免
def process(self, data):
    # 200 行代码全部写在这里...
```

### 2. 使用类型注解

```python
# ✅ 好的做法
def process(self, text: str, max_length: int = 100) -> Dict[str, Any]:
    ...

# ❌ 避免
def process(self, text, max_length=100):
    ...
```

### 3. 合理使用日志

```python
import logging

logger = logging.getLogger(__name__)


class MyAlgorithm(BaseAlgorithm):
    def process(self, data):
        # 使用 %-占位符而非 f-string:日志级别被过滤时不会产生格式化开销
        logger.info("开始处理数据,大小: %d", len(data))
        # ... 处理逻辑 ...
        logger.info("处理完成,结果大小: %d", len(result))
        return result
```

### 4. 资源管理

```python
class ResourceIntensiveAlgorithm(BaseAlgorithm):
    def __init__(self):
        super().__init__()
        self._resource = None

    def _ensure_resource(self):
        """延迟加载资源"""
        if self._resource is None:
            self._resource = self._load_heavy_resource()

    def process(self, data):
        self._ensure_resource()
        return self._resource.process(data)
```

## 参考

- [快速入门指南](./getting-started.md)
- [API 参考文档](./api-reference.md)
- [监控指南](./monitoring.md)