Files
FunctionalScaffold/docs/algorithm-development.md
Roog (顾新培) 8ca2f64f7e main:移除 src 目录结构,更新模块引用路径
变更内容:
- 删除 `src` 子目录,将模块引用路径从 `src.functional_scaffold` 更新为 `functional_scaffold`。
- 修改相关代码、文档、测试用例及配置文件中的路径引用,包括 `README.md`、`Dockerfile`、`uvicorn` 启动命令等。
- 优化项目目录结构,提升代码维护性和可读性。
2026-02-03 11:29:37 +08:00

15 KiB

算法开发指南

本文档详细介绍如何在 FunctionalScaffold 框架中开发算法,包括最佳实践、高级特性和常见模式。

算法架构

核心概念

┌─────────────────────────────────────────────────────────────┐
│                      HTTP 请求                               │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    FastAPI 路由层                            │
│  - 参数验证 (Pydantic)                                       │
│  - 请求 ID 生成                                              │
│  - 错误处理                                                  │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    BaseAlgorithm.execute()                   │
│  - 自动计时                                                  │
│  - 指标记录                                                  │
│  - 异常捕获                                                  │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    YourAlgorithm.process()                   │
│  ★ 你只需要实现这个方法 ★                                    │
└─────────────────────────────────────────────────────────────┘

BaseAlgorithm 基类

所有算法必须继承 BaseAlgorithm 类:

import logging
import time
from abc import ABC, abstractmethod
from typing import Any, Dict

logger = logging.getLogger(__name__)


class BaseAlgorithm(ABC):
    """Base class for all algorithms.

    Subclasses implement ``process``; callers invoke ``execute``, which wraps
    ``process`` with timing and exception capture. This is a simplified
    sketch: the framework's own ``execute`` may also record metrics.
    """

    def __init__(self):
        self.name = self.__class__.__name__
        self.version = "1.0.0"

    @abstractmethod
    def process(self, *args, **kwargs) -> Dict[str, Any]:
        """Algorithm logic - must be implemented by subclasses."""

    def execute(self, *args, **kwargs) -> Dict[str, Any]:
        """Run ``process`` with timing and error capture.

        Returns:
            On success: ``{"success": True, "result": <process() output>,
            "metadata": {...}}``.
            On failure: ``{"success": False, "error": <exception message>,
            "metadata": {...}}``.
            ``metadata["elapsed_time"]`` is the wall-clock duration in seconds.
        """
        start = time.perf_counter()
        try:
            result = self.process(*args, **kwargs)
        except Exception as exc:  # boundary: report the failure instead of raising
            logger.exception("Algorithm %s failed", self.name)
            return {
                "success": False,
                "error": str(exc),
                "metadata": {"elapsed_time": time.perf_counter() - start},
            }
        return {
            "success": True,
            "result": result,
            "metadata": {"elapsed_time": time.perf_counter() - start},
        }

开发算法

基础示例

# functional_scaffold/algorithms/text_processor.py
from typing import Any, Dict

from .base import BaseAlgorithm


class TextProcessor(BaseAlgorithm):
    """Text-processing algorithm: upper-case, lower-case or reverse a string."""

    def __init__(self):
        super().__init__()
        self.version = "1.0.0"

    def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
        """Apply ``operation`` to ``text``.

        Args:
            text: Input text.
            operation: One of ``upper`` / ``lower`` / ``reverse``.

        Returns:
            The original text, the processed text, the operation name and the
            length of the processed text.

        Raises:
            ValueError: If ``operation`` is not supported.
        """
        processed = self._apply(text, operation)
        return {
            "original": text,
            "processed": processed,
            "operation": operation,
            "length": len(processed),
        }

    @staticmethod
    def _apply(text: str, operation: str) -> str:
        """Return ``text`` transformed by ``operation``, or raise ValueError."""
        if operation == "upper":
            return text.upper()
        if operation == "lower":
            return text.lower()
        if operation == "reverse":
            return text[::-1]
        raise ValueError(f"不支持的操作: {operation}")

带模型加载的算法

# functional_scaffold/algorithms/ml_predictor.py
import logging
from typing import Any, Dict, List, Optional

from .base import BaseAlgorithm

logger = logging.getLogger(__name__)


class MLPredictor(BaseAlgorithm):
    """Machine-learning prediction algorithm.

    The model is loaded once in ``__init__`` so every ``process`` call reuses
    it instead of paying the load cost per request.
    """

    def __init__(self, model_path: Optional[str] = None):
        super().__init__()
        self.model = None
        self.model_path = model_path or "models/default.pkl"
        self._load_model()

    def _load_model(self) -> None:
        """Load the model (runs once, during initialisation)."""
        logger.info("加载模型: %s", self.model_path)
        # Real implementation, e.g.:
        #     import joblib
        #     self.model = joblib.load(self.model_path)
        # joblib uses pickle, so only load model files from trusted locations.
        self.model = "mock_model"  # placeholder for the example
        logger.info("模型加载完成")

    def process(self, features: List[float]) -> Dict[str, Any]:
        """Run a prediction.

        Args:
            features: Feature vector.

        Returns:
            Dict with the input ``features``, the ``prediction`` and the
            ``model_version``.

        Raises:
            RuntimeError: If the model has not been loaded.
            ValueError: If ``features`` is empty.
        """
        if self.model is None:
            raise RuntimeError("模型未加载")
        if not features:
            # Guard the mean below against ZeroDivisionError. process() may be
            # called with params that never went through MLPredictRequest
            # validation (e.g. async jobs).
            raise ValueError("特征向量不能为空")

        # prediction = self.model.predict([features])[0]
        prediction = sum(features) / len(features)  # placeholder for the example

        return {
            "features": features,
            "prediction": prediction,
            "model_version": self.version,
        }

带外部服务调用的算法

# functional_scaffold/algorithms/data_fetcher.py
from typing import Any, Dict
from urllib.parse import quote

import httpx

from ..config import settings
from .base import BaseAlgorithm


class DataFetcher(BaseAlgorithm):
    """Fetch a record from an external HTTP API and drop its null fields."""

    def __init__(self):
        super().__init__()
        self.api_base_url = settings.external_api_url or "https://api.example.com"

    def _fetch_data(self, endpoint: str) -> dict:
        """GET ``endpoint`` relative to the API base URL and return the JSON body.

        A synchronous client is used on purpose: ``process`` is synchronous,
        and driving a fresh event loop with ``run_until_complete`` raises
        RuntimeError when the calling thread is already running one (e.g.
        inside an ``async def`` FastAPI endpoint).

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx response.
        """
        with httpx.Client() as client:
            response = client.get(f"{self.api_base_url}/{endpoint}")
            response.raise_for_status()
            return response.json()

    def process(self, data_id: str) -> Dict[str, Any]:
        """Fetch and clean the record identified by ``data_id``.

        Args:
            data_id: Record ID. It comes from the caller, so it is
                percent-encoded to prevent it from altering the request path.

        Returns:
            The ID, the raw payload, and the payload without ``None`` values.
        """
        safe_id = quote(data_id, safe="")
        data = self._fetch_data(f"data/{safe_id}")

        processed = self._transform(data)

        return {
            "data_id": data_id,
            "raw_data": data,
            "processed_data": processed,
        }

    def _transform(self, data: dict) -> dict:
        """Return a copy of ``data`` without keys whose value is ``None``."""
        return {k: v for k, v in data.items() if v is not None}

数据模型定义

请求模型

# functional_scaffold/api/models.py
from typing import List, Optional

from pydantic import BaseModel, ConfigDict, Field, field_validator


class TextProcessRequest(BaseModel):
    """Request body for the text-processing endpoint."""

    model_config = ConfigDict(
        json_schema_extra={"example": {"text": "Hello World", "operation": "upper"}}
    )

    text: str = Field(
        default=...,
        min_length=1,
        max_length=10000,
        description="输入文本",
    )
    operation: str = Field(default="upper", description="操作类型")

    @field_validator("operation")
    @classmethod
    def validate_operation(cls, v):
        """Accept only the operations TextProcessor supports."""
        allowed = ["upper", "lower", "reverse"]
        if v in allowed:
            return v
        raise ValueError(f"operation 必须是 {allowed} 之一")


class MLPredictRequest(BaseModel):
    """Request body for the ML prediction endpoint."""

    model_config = ConfigDict(
        json_schema_extra={"example": {"features": [1.0, 2.0, 3.0, 4.0]}}
    )

    features: List[float] = Field(default=..., min_length=1, description="特征向量")

响应模型

from typing import Any, Dict

from pydantic import BaseModel, Field


class TextProcessResponse(BaseModel):
    """Response body for the text-processing endpoint."""

    request_id: str = Field(..., description="请求 ID")
    status: str = Field(..., description="处理状态")
    result: Dict[str, Any] = Field(..., description="处理结果")
    metadata: Dict[str, Any] = Field(..., description="元数据")

路由注册

同步接口

# functional_scaffold/api/routes.py
from fastapi import APIRouter, Depends, HTTPException, status

from ..algorithms.text_processor import TextProcessor
from .dependencies import get_request_id
from .models import TextProcessRequest, TextProcessResponse

router = APIRouter()


@router.post(
    "/text/process",
    response_model=TextProcessResponse,
    summary="文本处理",
    description="对输入文本执行指定操作",
)
def process_text(
    request: TextProcessRequest,
    request_id: str = Depends(get_request_id),
):
    """Text-processing endpoint.

    Declared with plain ``def`` rather than ``async def`` because
    ``TextProcessor.execute`` is synchronous. FastAPI runs plain-``def``
    endpoints in a worker thread pool, so a slow algorithm does not block the
    event loop that serves all other requests.

    Raises:
        HTTPException: 400 if the algorithm raises ``ValueError``; 500 if
            ``execute`` reports ``success`` as false.
    """
    processor = TextProcessor()
    # Keep the try narrow: pydantic's ValidationError subclasses ValueError, so
    # wrapping the response construction would misreport server bugs as 400.
    try:
        result = processor.execute(request.text, request.operation)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"error": "VALIDATION_ERROR", "message": str(e)},
        ) from e

    if not result["success"]:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": "ALGORITHM_ERROR", "message": result["error"]},
        )

    return TextProcessResponse(
        request_id=request_id,
        status="success",
        result=result["result"],
        metadata=result["metadata"],
    )

异步任务接口

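算法注册后自动支持异步调用,无需额外代码。如下例所示,`algorithm` 填写算法类名(如 `TextProcessor`),`params` 的键与该算法 `process()` 的参数名一致(如 `text`、`operation`):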

# Create an asynchronous job for the TextProcessor algorithm
curl --request POST http://localhost:8000/jobs \
  --header "Content-Type: application/json" \
  --data '{
    "algorithm": "TextProcessor",
    "params": {"text": "hello", "operation": "upper"}
  }'

自定义指标

定义指标

在 `config/metrics.yaml` 中添加:

custom_metrics:
  # Counter of text-processing calls, labelled by operation
  text_process_total:
    name: text_process_total
    type: counter
    description: 文本处理总数
    labels:
      - operation

  # Histogram of input text lengths
  text_length_histogram:
    name: text_length_histogram
    type: histogram
    description: 处理文本长度分布
    labels: []
    buckets: [10, 50, 100, 500, 1000, 5000, 10000]

记录指标

from typing import Any, Dict

from ..core.metrics_unified import incr, observe
from .base import BaseAlgorithm


class TextProcessor(BaseAlgorithm):
    def process(self, text: str, operation: str = "upper") -> Dict[str, Any]:
        # Validate before recording. `operation` is used as a metric label, and
        # each distinct label value typically becomes its own time series.
        # Recording unvalidated input (e.g. raw async-job params) would allow
        # unbounded label values and would also count failed calls.
        if operation not in ("upper", "lower", "reverse"):
            raise ValueError(f"不支持的操作: {operation}")

        # Count calls per operation
        incr("text_process_total", {"operation": operation})

        # Record the input text length
        observe("text_length_histogram", {}, len(text))

        # ... algorithm logic ...

错误处理

自定义异常

# functional_scaffold/core/errors.py
# NOTE(review): assumes FunctionalScaffoldError is the project's root exception,
# defined elsewhere in this module - confirm.


class AlgorithmError(FunctionalScaffoldError):
    """Raised when an algorithm fails during execution."""


class ModelNotLoadedError(AlgorithmError):
    """Raised when an algorithm needs a model that has not been loaded."""


class InvalidInputError(AlgorithmError):
    """Raised when an algorithm receives input it cannot process."""

在算法中使用

from ..core.errors import InvalidInputError, ModelNotLoadedError


class MLPredictor(BaseAlgorithm):
    def process(self, features: list) -> Dict[str, Any]:
        """Check the input and model state before predicting.

        Raises:
            InvalidInputError: If ``features`` is empty.
            ModelNotLoadedError: If no model has been loaded.
        """
        has_features = bool(features)
        if not has_features:
            raise InvalidInputError("特征向量不能为空")

        model_missing = self.model is None
        if model_missing:
            raise ModelNotLoadedError("模型未加载,请检查模型文件")

        # ... algorithm logic ...

测试

单元测试

# tests/test_text_processor.py
import pytest

from functional_scaffold.algorithms.text_processor import TextProcessor


@pytest.fixture
def processor():
    """Provide a fresh TextProcessor for each test."""
    return TextProcessor()


class TestTextProcessor:
    """Unit tests for the TextProcessor algorithm."""

    def test_upper_operation(self, processor):
        """`upper` upper-cases the text and echoes the operation name."""
        result = processor.process("hello", "upper")
        assert result["processed"] == "HELLO"
        assert result["operation"] == "upper"

    def test_lower_operation(self, processor):
        """`lower` lower-cases the text."""
        assert processor.process("HELLO", "lower")["processed"] == "hello"

    def test_reverse_operation(self, processor):
        """`reverse` reverses the text."""
        assert processor.process("hello", "reverse")["processed"] == "olleh"

    def test_invalid_operation(self, processor):
        """An unknown operation raises ValueError."""
        with pytest.raises(ValueError, match="不支持的操作"):
            processor.process("hello", "invalid")

    def test_execute_wrapper(self, processor):
        """execute() wraps the result and adds timing metadata."""
        outcome = processor.execute("hello", "upper")
        assert outcome["success"] is True
        assert outcome["result"]["processed"] == "HELLO"
        assert "elapsed_time" in outcome["metadata"]

API 集成测试

# tests/test_text_api.py
import pytest
from fastapi import status

ENDPOINT = "/text/process"


class TestTextProcessAPI:
    """API tests for the text endpoint (`client` presumably comes from conftest.py)."""

    def test_process_text_success(self, client):
        """A valid request returns 200 with the processed text."""
        payload = {"text": "hello", "operation": "upper"}
        response = client.post(ENDPOINT, json=payload)

        assert response.status_code == status.HTTP_200_OK
        body = response.json()
        assert body["status"] == "success"
        assert body["result"]["processed"] == "HELLO"

    def test_process_text_invalid_operation(self, client):
        """An unsupported operation is rejected by request validation."""
        payload = {"text": "hello", "operation": "invalid"}
        response = client.post(ENDPOINT, json=payload)

        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

最佳实践

1. 保持 process() 方法简洁

# ✅ Good: process() only orchestrates small, well-named steps
def process(self, data):
    validated = self._validate(data)
    transformed = self._transform(validated)
    result = self._compute(transformed)
    return self._format_output(result)

# ❌ Avoid: one monolithic method
def process(self, data):
    # 200 lines of code all written here...
    ...

2. 使用类型注解

# ✅ Good: parameters and return value are annotated
def process(self, text: str, max_length: int = 100) -> Dict[str, Any]:
    ...

# ❌ Avoid: no annotations, so readers and type checkers must guess the types
def process(self, text, max_length=100):
    ...

3. 合理使用日志

import logging

logger = logging.getLogger(__name__)


class MyAlgorithm(BaseAlgorithm):
    def process(self, data):
        """Log the input/output sizes around the processing step."""
        # Pass values as arguments rather than using f-strings: the message is
        # only formatted if the record is actually emitted at this level.
        logger.info("开始处理数据,大小: %s", len(data))
        result = self._run(data)  # placeholder for the real processing logic
        logger.info("处理完成,结果大小: %s", len(result))
        return result

4. 资源管理

class ResourceIntensiveAlgorithm(BaseAlgorithm):
    """Algorithm whose heavy resource is loaded lazily, on first use."""

    def __init__(self):
        super().__init__()
        # Filled in by _ensure_resource() on the first process() call.
        self._resource = None

    def _ensure_resource(self):
        """Load the heavy resource unless it is already loaded.

        Not guarded by a lock, so concurrent first calls may each load it.
        """
        if self._resource is not None:
            return
        self._resource = self._load_heavy_resource()

    def process(self, data):
        """Process ``data`` with the lazily loaded resource."""
        self._ensure_resource()
        resource = self._resource
        return resource.process(data)

参考