From 1ea0623a79bd140055dae66f70c9a1a6bcbb41e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roog=20=28=E9=A1=BE=E6=96=B0=E5=9F=B9=29?= Date: Tue, 3 Feb 2026 15:13:11 +0800 Subject: [PATCH] =?UTF-8?q?main:=E6=96=B0=E5=A2=9E=20Worker=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=8F=8A=E4=BB=BB=E5=8A=A1=E7=AE=A1=E7=90=86=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 变更内容: - 添加 Worker 进程模块,支持基于 Redis 的任务管理及分布式锁。 - 增加 `entrypoint.sh` 启动脚本,支持根据 `RUN_MODE` 自动运行 API 或 Worker。 - 优化 `docker-compose.yml` 配置,添加镜像及平台支持。 - 在 JobManager 中集成 `request_id` 上下文传递,改进日志追踪功能。 - 扩展单元测试,提升测试覆盖率。 --- deployment/entrypoint.sh | 12 ++ src/functional_scaffold/core/job_manager.py | 15 +- src/functional_scaffold/worker.py | 197 ++++++++++++++++++++ tests/test_job_manager.py | 5 +- 4 files changed, 223 insertions(+), 6 deletions(-) create mode 100644 deployment/entrypoint.sh create mode 100644 src/functional_scaffold/worker.py diff --git a/deployment/entrypoint.sh b/deployment/entrypoint.sh new file mode 100644 index 0000000..cd90ab6 --- /dev/null +++ b/deployment/entrypoint.sh @@ -0,0 +1,12 @@ +#!/bin/bash +# 启动脚本:根据 RUN_MODE 环境变量选择启动 API 或 Worker + +set -e + +if [ "$RUN_MODE" = "worker" ]; then + echo "启动 Worker 模式..." + exec python -m functional_scaffold.worker +else + echo "启动 API 模式..." + exec uvicorn functional_scaffold.main:app --host 0.0.0.0 --port 8000 +fi diff --git a/src/functional_scaffold/core/job_manager.py b/src/functional_scaffold/core/job_manager.py index 68581a1..44c27c2 100644 --- a/src/functional_scaffold/core/job_manager.py +++ b/src/functional_scaffold/core/job_manager.py @@ -16,6 +16,7 @@ import redis.asyncio as aioredis from ..algorithms.base import BaseAlgorithm from ..config import settings from ..core.metrics_unified import incr, observe +from ..core.tracing import set_request_id logger = logging.getLogger(__name__) @@ -176,6 +177,7 @@ class JobManager: "job_id": job_id, "status": job_data.get("status", ""), "algorithm": job_data.get("algorithm", ""), + "request_id": job_data.get("request_id") or None, "created_at": job_data.get("created_at", ""), "started_at": job_data.get("started_at") or None, "completed_at": job_data.get("completed_at") or None, @@ -223,6 +225,11 @@ class JobManager: algorithm_name = job_data.get("algorithm", "") webhook_url = job_data.get("webhook", "") + request_id = job_data.get("request_id", "") + + # 设置 request_id 上下文,确保日志中包含 request_id + if request_id: + set_request_id(request_id) # 解析参数 try: @@ -234,7 +241,9 @@ class JobManager: async with self._semaphore: # 更新状态为 running started_at = self._get_timestamp() - await self._redis_client.hset(key, mapping={"status": "running", "started_at": started_at}) + await self._redis_client.hset( + key, mapping={"status": "running", "started_at": started_at} + ) logger.info(f"开始执行任务: job_id={job_id}, algorithm={algorithm_name}") @@ -295,7 +304,9 @@ class JobManager: incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status}) observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time) - logger.info(f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s") + logger.info( + f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s" + ) # 发送 Webhook 回调 if webhook_url: diff --git a/src/functional_scaffold/worker.py b/src/functional_scaffold/worker.py new file mode 100644 index 0000000..90b9993 --- /dev/null +++ b/src/functional_scaffold/worker.py @@ -0,0 +1,197 @@ +"""Worker 进程模块 + +基于 Redis 队列的任务 Worker,支持分布式锁和全局并发控制。 +""" + +import asyncio +import logging +import signal +import sys +from typing import Optional + +from .config import settings +from .core.job_manager import JobManager +from .core.logging import setup_logging +from .core.tracing import set_request_id + +logger = logging.getLogger(__name__) + + +class JobWorker: + """任务 Worker + + 从 Redis 队列获取任务并执行,支持: + - 分布式锁防止重复执行 + - 全局并发控制 + - 任务重试机制 + - 优雅关闭 + """ + + def __init__(self): + self._job_manager: Optional[JobManager] = None + self._running: bool = False + self._current_job_id: Optional[str] = None + + async def initialize(self) -> None: + """初始化 Worker""" + self._job_manager = JobManager() + await self._job_manager.initialize() + logger.info("Worker 初始化完成") + + async def shutdown(self) -> None: + """关闭 Worker""" + logger.info("Worker 正在关闭...") + self._running = False + + # 等待当前任务完成 + if self._current_job_id: + logger.info(f"等待当前任务完成: {self._current_job_id}") + + if self._job_manager: + await self._job_manager.shutdown() + + logger.info("Worker 已关闭") + + async def run(self) -> None: + """运行 Worker 主循环""" + self._running = True + logger.info( + f"Worker 启动,轮询间隔: {settings.worker_poll_interval}s," + f"最大并发: {settings.max_concurrent_jobs}" + ) + + while self._running: + try: + await self._process_next_job() + except Exception as e: + logger.error(f"Worker 循环异常: {e}", exc_info=True) + await asyncio.sleep(settings.worker_poll_interval) + + async def _process_next_job(self) -> None: + """处理下一个任务""" + if not self._job_manager: + logger.error("JobManager 未初始化") + await asyncio.sleep(settings.worker_poll_interval) + return + + # 从队列获取任务 + job_id = await self._job_manager.dequeue_job(timeout=int(settings.worker_poll_interval)) + + if not job_id: + return + + # 获取任务信息以提取 request_id + job_data = await self._job_manager.get_job(job_id) + if job_data: + request_id = job_data.get("request_id") or job_id + set_request_id(request_id) + else: + set_request_id(job_id) + + logger.info(f"从队列获取任务: {job_id}") + + # 尝试获取分布式锁 + if not await self._job_manager.acquire_job_lock(job_id): + logger.warning(f"无法获取任务锁,任务可能正在被其他 Worker 执行: {job_id}") + return + + try: + # 检查全局并发限制 + if not await self._job_manager.can_execute(): + logger.info(f"达到并发限制,任务重新入队: {job_id}") + await self._job_manager.enqueue_job(job_id) + return + + # 增加并发计数 + await self._job_manager.increment_concurrency() + self._current_job_id = job_id + + try: + # 执行任务 + await self._execute_with_retry(job_id) + finally: + # 减少并发计数 + await self._job_manager.decrement_concurrency() + self._current_job_id = None + + finally: + # 释放分布式锁 + await self._job_manager.release_job_lock(job_id) + + async def _execute_with_retry(self, job_id: str) -> None: + """执行任务(带重试机制)""" + if not self._job_manager: + return + + try: + # 执行任务 + await asyncio.wait_for( + self._job_manager.execute_job(job_id), + timeout=settings.job_execution_timeout, + ) + except asyncio.TimeoutError: + logger.error(f"任务执行超时: {job_id}") + await self._handle_job_failure(job_id, "任务执行超时") + except Exception as e: + logger.error(f"任务执行异常: {job_id}, error={e}", exc_info=True) + await self._handle_job_failure(job_id, str(e)) + + async def _handle_job_failure(self, job_id: str, error: str) -> None: + """处理任务失败""" + if not self._job_manager: + return + + retry_count = await self._job_manager.increment_job_retry(job_id) + + if retry_count < settings.job_max_retries: + logger.info(f"任务将重试 ({retry_count}/{settings.job_max_retries}): {job_id}") + # 重新入队 + await self._job_manager.enqueue_job(job_id) + else: + logger.error(f"任务达到最大重试次数,标记为失败: {job_id}") + # 更新任务状态为失败 + if self._job_manager._redis_client: + key = f"job:{job_id}" + await self._job_manager._redis_client.hset( + key, + mapping={ + "status": "failed", + "error": f"达到最大重试次数 ({settings.job_max_retries}): {error}", + }, + ) + + +def setup_signal_handlers(worker: JobWorker, loop: asyncio.AbstractEventLoop) -> None: + """设置信号处理器""" + + def signal_handler(sig: signal.Signals) -> None: + logger.info(f"收到信号 {sig.name},准备关闭...") + loop.create_task(worker.shutdown()) + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, signal_handler, sig) + + +async def main() -> None: + """Worker 入口函数""" + # 设置日志 + setup_logging(level=settings.log_level, format_type=settings.log_format) + + worker = JobWorker() + + # 设置信号处理 + loop = asyncio.get_running_loop() + setup_signal_handlers(worker, loop) + + try: + await worker.initialize() + await worker.run() + except Exception as e: + logger.error(f"Worker 异常退出: {e}", exc_info=True) + sys.exit(1) + finally: + await worker.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 2423963..e7374e5 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -1,17 +1,13 @@ """异步任务管理器测试""" import asyncio -import json import pytest from unittest.mock import AsyncMock, MagicMock, patch from fastapi import status from functional_scaffold.core.job_manager import ( JobManager, - get_job_manager, - shutdown_job_manager, ) -from functional_scaffold.api.models import JobStatus class TestJobManager: @@ -188,6 +184,7 @@ class TestJobManagerWithMocks: # 初始化 semaphore import asyncio + manager._semaphore = asyncio.Semaphore(10) await manager.execute_job("test-job-id")