main:新增 Worker 支持及任务管理优化
变更内容: - 添加 Worker 进程模块,支持基于 Redis 的任务管理及分布式锁。 - 增加 `entrypoint.sh` 启动脚本,支持根据 `RUN_MODE` 自动运行 API 或 Worker。 - 优化 `docker-compose.yml` 配置,添加镜像及平台支持。 - 在 JobManager 中集成 `request_id` 上下文传递,改进日志追踪功能。 - 扩展单元测试,提升测试覆盖率。
This commit is contained in:
@@ -16,6 +16,7 @@ import redis.asyncio as aioredis
|
||||
from ..algorithms.base import BaseAlgorithm
|
||||
from ..config import settings
|
||||
from ..core.metrics_unified import incr, observe
|
||||
from ..core.tracing import set_request_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -176,6 +177,7 @@ class JobManager:
|
||||
"job_id": job_id,
|
||||
"status": job_data.get("status", ""),
|
||||
"algorithm": job_data.get("algorithm", ""),
|
||||
"request_id": job_data.get("request_id") or None,
|
||||
"created_at": job_data.get("created_at", ""),
|
||||
"started_at": job_data.get("started_at") or None,
|
||||
"completed_at": job_data.get("completed_at") or None,
|
||||
@@ -223,6 +225,11 @@ class JobManager:
|
||||
|
||||
algorithm_name = job_data.get("algorithm", "")
|
||||
webhook_url = job_data.get("webhook", "")
|
||||
request_id = job_data.get("request_id", "")
|
||||
|
||||
# 设置 request_id 上下文,确保日志中包含 request_id
|
||||
if request_id:
|
||||
set_request_id(request_id)
|
||||
|
||||
# 解析参数
|
||||
try:
|
||||
@@ -234,7 +241,9 @@ class JobManager:
|
||||
async with self._semaphore:
|
||||
# 更新状态为 running
|
||||
started_at = self._get_timestamp()
|
||||
await self._redis_client.hset(key, mapping={"status": "running", "started_at": started_at})
|
||||
await self._redis_client.hset(
|
||||
key, mapping={"status": "running", "started_at": started_at}
|
||||
)
|
||||
|
||||
logger.info(f"开始执行任务: job_id={job_id}, algorithm={algorithm_name}")
|
||||
|
||||
@@ -295,7 +304,9 @@ class JobManager:
|
||||
incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
|
||||
observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time)
|
||||
|
||||
logger.info(f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s")
|
||||
logger.info(
|
||||
f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s"
|
||||
)
|
||||
|
||||
# 发送 Webhook 回调
|
||||
if webhook_url:
|
||||
|
||||
197
src/functional_scaffold/worker.py
Normal file
197
src/functional_scaffold/worker.py
Normal file
@@ -0,0 +1,197 @@
|
||||
"""Worker 进程模块
|
||||
|
||||
基于 Redis 队列的任务 Worker,支持分布式锁和全局并发控制。
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from .config import settings
|
||||
from .core.job_manager import JobManager
|
||||
from .core.logging import setup_logging
|
||||
from .core.tracing import set_request_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JobWorker:
|
||||
"""任务 Worker
|
||||
|
||||
从 Redis 队列获取任务并执行,支持:
|
||||
- 分布式锁防止重复执行
|
||||
- 全局并发控制
|
||||
- 任务重试机制
|
||||
- 优雅关闭
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._job_manager: Optional[JobManager] = None
|
||||
self._running: bool = False
|
||||
self._current_job_id: Optional[str] = None
|
||||
|
||||
async def initialize(self) -> None:
|
||||
"""初始化 Worker"""
|
||||
self._job_manager = JobManager()
|
||||
await self._job_manager.initialize()
|
||||
logger.info("Worker 初始化完成")
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
"""关闭 Worker"""
|
||||
logger.info("Worker 正在关闭...")
|
||||
self._running = False
|
||||
|
||||
# 等待当前任务完成
|
||||
if self._current_job_id:
|
||||
logger.info(f"等待当前任务完成: {self._current_job_id}")
|
||||
|
||||
if self._job_manager:
|
||||
await self._job_manager.shutdown()
|
||||
|
||||
logger.info("Worker 已关闭")
|
||||
|
||||
async def run(self) -> None:
|
||||
"""运行 Worker 主循环"""
|
||||
self._running = True
|
||||
logger.info(
|
||||
f"Worker 启动,轮询间隔: {settings.worker_poll_interval}s,"
|
||||
f"最大并发: {settings.max_concurrent_jobs}"
|
||||
)
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
await self._process_next_job()
|
||||
except Exception as e:
|
||||
logger.error(f"Worker 循环异常: {e}", exc_info=True)
|
||||
await asyncio.sleep(settings.worker_poll_interval)
|
||||
|
||||
async def _process_next_job(self) -> None:
|
||||
"""处理下一个任务"""
|
||||
if not self._job_manager:
|
||||
logger.error("JobManager 未初始化")
|
||||
await asyncio.sleep(settings.worker_poll_interval)
|
||||
return
|
||||
|
||||
# 从队列获取任务
|
||||
job_id = await self._job_manager.dequeue_job(timeout=int(settings.worker_poll_interval))
|
||||
|
||||
if not job_id:
|
||||
return
|
||||
|
||||
# 获取任务信息以提取 request_id
|
||||
job_data = await self._job_manager.get_job(job_id)
|
||||
if job_data:
|
||||
request_id = job_data.get("request_id") or job_id
|
||||
set_request_id(request_id)
|
||||
else:
|
||||
set_request_id(job_id)
|
||||
|
||||
logger.info(f"从队列获取任务: {job_id}")
|
||||
|
||||
# 尝试获取分布式锁
|
||||
if not await self._job_manager.acquire_job_lock(job_id):
|
||||
logger.warning(f"无法获取任务锁,任务可能正在被其他 Worker 执行: {job_id}")
|
||||
return
|
||||
|
||||
try:
|
||||
# 检查全局并发限制
|
||||
if not await self._job_manager.can_execute():
|
||||
logger.info(f"达到并发限制,任务重新入队: {job_id}")
|
||||
await self._job_manager.enqueue_job(job_id)
|
||||
return
|
||||
|
||||
# 增加并发计数
|
||||
await self._job_manager.increment_concurrency()
|
||||
self._current_job_id = job_id
|
||||
|
||||
try:
|
||||
# 执行任务
|
||||
await self._execute_with_retry(job_id)
|
||||
finally:
|
||||
# 减少并发计数
|
||||
await self._job_manager.decrement_concurrency()
|
||||
self._current_job_id = None
|
||||
|
||||
finally:
|
||||
# 释放分布式锁
|
||||
await self._job_manager.release_job_lock(job_id)
|
||||
|
||||
async def _execute_with_retry(self, job_id: str) -> None:
|
||||
"""执行任务(带重试机制)"""
|
||||
if not self._job_manager:
|
||||
return
|
||||
|
||||
try:
|
||||
# 执行任务
|
||||
await asyncio.wait_for(
|
||||
self._job_manager.execute_job(job_id),
|
||||
timeout=settings.job_execution_timeout,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"任务执行超时: {job_id}")
|
||||
await self._handle_job_failure(job_id, "任务执行超时")
|
||||
except Exception as e:
|
||||
logger.error(f"任务执行异常: {job_id}, error={e}", exc_info=True)
|
||||
await self._handle_job_failure(job_id, str(e))
|
||||
|
||||
async def _handle_job_failure(self, job_id: str, error: str) -> None:
|
||||
"""处理任务失败"""
|
||||
if not self._job_manager:
|
||||
return
|
||||
|
||||
retry_count = await self._job_manager.increment_job_retry(job_id)
|
||||
|
||||
if retry_count < settings.job_max_retries:
|
||||
logger.info(f"任务将重试 ({retry_count}/{settings.job_max_retries}): {job_id}")
|
||||
# 重新入队
|
||||
await self._job_manager.enqueue_job(job_id)
|
||||
else:
|
||||
logger.error(f"任务达到最大重试次数,标记为失败: {job_id}")
|
||||
# 更新任务状态为失败
|
||||
if self._job_manager._redis_client:
|
||||
key = f"job:{job_id}"
|
||||
await self._job_manager._redis_client.hset(
|
||||
key,
|
||||
mapping={
|
||||
"status": "failed",
|
||||
"error": f"达到最大重试次数 ({settings.job_max_retries}): {error}",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def setup_signal_handlers(worker: JobWorker, loop: asyncio.AbstractEventLoop) -> None:
|
||||
"""设置信号处理器"""
|
||||
|
||||
def signal_handler(sig: signal.Signals) -> None:
|
||||
logger.info(f"收到信号 {sig.name},准备关闭...")
|
||||
loop.create_task(worker.shutdown())
|
||||
|
||||
for sig in (signal.SIGTERM, signal.SIGINT):
|
||||
loop.add_signal_handler(sig, signal_handler, sig)
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
"""Worker 入口函数"""
|
||||
# 设置日志
|
||||
setup_logging(level=settings.log_level, format_type=settings.log_format)
|
||||
|
||||
worker = JobWorker()
|
||||
|
||||
# 设置信号处理
|
||||
loop = asyncio.get_running_loop()
|
||||
setup_signal_handlers(worker, loop)
|
||||
|
||||
try:
|
||||
await worker.initialize()
|
||||
await worker.run()
|
||||
except Exception as e:
|
||||
logger.error(f"Worker 异常退出: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
finally:
|
||||
await worker.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user