Files
FunctionalScaffold/src/functional_scaffold/api/routes.py
Roog (顾新培) 265e8d1e3d main:支持 Worker 模式运行并优化任务管理
变更内容：
- 在 `Dockerfile` 和 `docker-compose.yml` 中添加 Worker 模式支持，包含运行模式 `RUN_MODE` 的配置。
- 更新 API 路由，改为将任务入队处理，并由 Worker 执行。
- 在 JobManager 中新增任务队列及分布式锁功能，支持任务的入队、出队、执行控制以及重试机制。
- 添加全局并发控制逻辑，避免任务超额运行。
- 扩展单元测试，覆盖任务队列、锁机制和并发控制的各类场景。
- 在 Serverless 配置中分别为 API 和 Worker 添加独立服务定义。

提升任务调度灵活性，增强系统可靠性与扩展性。
2026-02-03 18:38:08 +08:00

349 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""API 路由"""
import logging
import time

from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.concurrency import run_in_threadpool

from .models import (
    InvokeRequest,
    InvokeResponse,
    HealthResponse,
    ReadinessResponse,
    ErrorResponse,
    JobRequest,
    JobCreateResponse,
    JobStatusResponse,
    JobStatus,
    ConcurrencyStatusResponse,
)
from .dependencies import get_request_id
from ..algorithms.prime_checker import PrimeChecker
from ..core.errors import ValidationError, AlgorithmError
from ..core.job_manager import get_job_manager
# Logger named after this module (passed explicitly by keyword).
logger = logging.getLogger(name=__name__)

# Every endpoint in this module registers itself on this router.
router = APIRouter()
@router.post(
    "/invoke",
    response_model=InvokeResponse,
    status_code=status.HTTP_200_OK,
    summary="同步调用算法",
    description="同步调用质数判断算法,立即返回结果",
    responses={
        200: {"description": "成功", "model": InvokeResponse},
        400: {"description": "请求参数错误", "model": ErrorResponse},
        500: {"description": "服务器内部错误", "model": ErrorResponse},
    },
)
async def invoke_algorithm(
    request: InvokeRequest,
    request_id: str = Depends(get_request_id),
):
    """
    Run the prime-check algorithm synchronously and return its result.

    - **number**: the integer to test

    ``PrimeChecker.execute`` is a plain (non-awaitable) call. It is dispatched
    to the threadpool with ``run_in_threadpool`` so that a slow check does not
    stall the event loop. A stalled loop would block every other request on
    this worker, including the ``/healthz`` and ``/readyz`` probes.

    Raises:
        HTTPException: 400 when a ``ValidationError`` is raised; 500 when the
            execution result reports failure (``AlgorithmError``) or on any
            other unexpected exception.
    """
    try:
        logger.info(f"Processing request {request_id} with number={request.number}")
        checker = PrimeChecker()
        # Offload the blocking call. Exceptions raised inside it still
        # propagate through the await into the handlers below.
        execution_result = await run_in_threadpool(checker.execute, request.number)
        if not execution_result["success"]:
            # Turn an unsuccessful result into AlgorithmError so it is
            # reported through the 500 handler below.
            raise AlgorithmError(
                execution_result.get("error", "Algorithm execution failed"),
                details=execution_result.get("metadata", {}),
            )
        return InvokeResponse(
            request_id=request_id,
            status="success",
            result=execution_result["result"],
            metadata=execution_result["metadata"],
        )
    except ValidationError as e:
        logger.warning(f"Validation error for request {request_id}: {e.message}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=e.to_dict(),
        ) from e
    except AlgorithmError as e:
        logger.error(f"Algorithm error for request {request_id}: {e.message}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=e.to_dict(),
        ) from e
    except Exception as e:
        logger.exception(f"Unexpected error for request {request_id}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "error": "INTERNAL_ERROR",
                "message": str(e),
                "request_id": request_id,
            },
        ) from e
@router.get(
    "/healthz",
    response_model=HealthResponse,
    summary="健康检查",
    description="检查服务是否存活",
)
async def health_check():
    """
    Liveness endpoint.

    Always reports ``"healthy"`` together with the current Unix timestamp;
    intended for use as a liveness probe.
    """
    now = time.time()
    return HealthResponse(status="healthy", timestamp=now)
@router.get(
    "/readyz",
    response_model=ReadinessResponse,
    summary="就绪检查",
    description="检查服务是否就绪",
)
async def readiness_check():
    """
    Readiness endpoint, intended for use as a readiness probe.

    Reports ``"ready"`` only when every entry in ``checks`` is truthy,
    otherwise ``"not_ready"``. The current timestamp and the individual check
    results are included in the response.
    """
    # Only the algorithm module is checked today; further dependencies
    # (databases, external services, ...) can be added as extra entries.
    checks = {"algorithm": True}

    if all(checks.values()):
        readiness = "ready"
    else:
        readiness = "not_ready"

    return ReadinessResponse(
        status=readiness,
        timestamp=time.time(),
        checks=checks,
    )
@router.post(
    "/jobs",
    response_model=JobCreateResponse,
    status_code=status.HTTP_202_ACCEPTED,
    summary="创建异步任务",
    description="创建异步任务,立即返回任务 ID任务在后台执行",
    responses={
        202: {"description": "任务已创建", "model": JobCreateResponse},
        400: {"description": "请求参数错误", "model": ErrorResponse},
        404: {"description": "算法不存在", "model": ErrorResponse},
        503: {"description": "服务不可用", "model": ErrorResponse},
    },
)
async def create_job(
    request: JobRequest,
    request_id: str = Depends(get_request_id),
):
    """
    Create an asynchronous job and hand it to the worker queue.

    - **algorithm**: algorithm name (e.g. PrimeChecker)
    - **params**: algorithm parameters
    - **webhook**: optional callback URL invoked when the job finishes

    The job record is created and read back *before* it is enqueued. If the
    read-back fails, nothing has been queued, so a 500 response never hides a
    job that is actually running.

    Returns:
        JobCreateResponse with status PENDING and the stored ``created_at``.

    Raises:
        HTTPException: 503 if the job manager is unavailable, 404 if the
            algorithm is unknown, 500 on any other failure.
    """
    try:
        job_manager = await get_job_manager()

        if not job_manager.is_available():
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail={
                    "error": "SERVICE_UNAVAILABLE",
                    "message": "任务服务暂不可用,请稍后重试",
                    "request_id": request_id,
                },
            )

        available_algorithms = job_manager.get_available_algorithms()
        if request.algorithm not in available_algorithms:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail={
                    "error": "ALGORITHM_NOT_FOUND",
                    "message": f"算法 '{request.algorithm}' 不存在",
                    "details": {"available_algorithms": available_algorithms},
                    "request_id": request_id,
                },
            )

        job_id = await job_manager.create_job(
            algorithm=request.algorithm,
            params=request.params,
            webhook=request.webhook,
            request_id=request_id,
        )

        job_data = await job_manager.get_job(job_id)
        if not job_data:
            # get_job returns a falsy value for a missing record (see the
            # /jobs/{job_id} handler). Fail here, before enqueueing. Otherwise
            # the job would run while the client receives a 500 without the
            # job_id it would need to track it.
            logger.error(f"创建任务后无法读取任务: job_id={job_id}, request_id={request_id}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail={
                    "error": "INTERNAL_ERROR",
                    "message": f"任务 '{job_id}' 创建后无法读取",
                    "request_id": request_id,
                },
            )

        # Hand the job to the queue; a Worker executes it.
        await job_manager.enqueue_job(job_id)
        logger.info(f"异步任务已创建并入队: job_id={job_id}, request_id={request_id}")

        return JobCreateResponse(
            job_id=job_id,
            status=JobStatus.PENDING,
            message="任务已创建",
            created_at=job_data["created_at"],
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"创建任务失败: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "error": "INTERNAL_ERROR",
                "message": str(e),
                "request_id": request_id,
            },
        ) from e
@router.get(
    "/jobs/{job_id}",
    response_model=JobStatusResponse,
    summary="查询任务状态",
    description="查询异步任务的执行状态和结果",
    responses={
        200: {"description": "成功", "model": JobStatusResponse},
        404: {"description": "任务不存在或已过期", "model": ErrorResponse},
        503: {"description": "服务不可用", "model": ErrorResponse},
    },
)
async def get_job_status(job_id: str):
    """
    Look up an asynchronous job's status and, when available, its result.

    - **job_id**: unique job identifier

    Raises:
        HTTPException: 503 if the job manager is unavailable, 404 if the job
            is unknown or has expired, 500 on any other failure.
    """
    try:
        manager = await get_job_manager()

        if not manager.is_available():
            unavailable_detail = {
                "error": "SERVICE_UNAVAILABLE",
                "message": "任务服务暂不可用,请稍后重试",
            }
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail=unavailable_detail,
            )

        record = await manager.get_job(job_id)
        if not record:
            missing_detail = {
                "error": "JOB_NOT_FOUND",
                "message": f"任务 '{job_id}' 不存在或已过期",
            }
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=missing_detail,
            )

        # Fields copied verbatim from the stored record, read in the same
        # order as the response's keyword arguments.
        passthrough_fields = (
            "algorithm",
            "created_at",
            "started_at",
            "completed_at",
            "result",
            "error",
            "metadata",
        )
        return JobStatusResponse(
            job_id=record["job_id"],
            status=JobStatus(record["status"]),
            **{name: record[name] for name in passthrough_fields},
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception(f"查询任务状态失败: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "error": "INTERNAL_ERROR",
                "message": str(exc),
            },
        )
@router.get(
    "/jobs/concurrency/status",
    response_model=ConcurrencyStatusResponse,
    summary="查询并发状态",
    description="查询任务管理器的并发执行状态",
    responses={
        200: {"description": "成功", "model": ConcurrencyStatusResponse},
        503: {"description": "服务不可用", "model": ErrorResponse},
    },
)
async def get_concurrency_status():
    """
    Report the job manager's current concurrency figures.

    The response contains:
    - the maximum number of concurrent jobs
    - the number of currently free slots
    - the number of jobs currently running

    Raises:
        HTTPException: 503 if the job manager is unavailable, 500 on any
            other failure.
    """
    try:
        manager = await get_job_manager()

        if not manager.is_available():
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail={
                    "error": "SERVICE_UNAVAILABLE",
                    "message": "任务管理器不可用",
                },
            )

        snapshot = manager.get_concurrency_status()
        wanted = ("max_concurrent", "available_slots", "running_jobs")
        return ConcurrencyStatusResponse(**{key: snapshot[key] for key in wanted})
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception(f"查询并发状态失败: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "error": "INTERNAL_ERROR",
                "message": str(exc),
            },
        )