main:新增 Worker 支持及任务管理优化
变更内容: - 添加 Worker 进程模块,支持基于 Redis 的任务管理及分布式锁。 - 增加 `entrypoint.sh` 启动脚本,支持根据 `RUN_MODE` 自动运行 API 或 Worker。 - 优化 `docker-compose.yml` 配置,添加镜像及平台支持。 - 在 JobManager 中集成 `request_id` 上下文传递,改进日志追踪功能。 - 扩展单元测试,提升测试覆盖率。
This commit is contained in:
@@ -16,6 +16,7 @@ import redis.asyncio as aioredis
|
||||
from ..algorithms.base import BaseAlgorithm
|
||||
from ..config import settings
|
||||
from ..core.metrics_unified import incr, observe
|
||||
from ..core.tracing import set_request_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -176,6 +177,7 @@ class JobManager:
|
||||
"job_id": job_id,
|
||||
"status": job_data.get("status", ""),
|
||||
"algorithm": job_data.get("algorithm", ""),
|
||||
"request_id": job_data.get("request_id") or None,
|
||||
"created_at": job_data.get("created_at", ""),
|
||||
"started_at": job_data.get("started_at") or None,
|
||||
"completed_at": job_data.get("completed_at") or None,
|
||||
@@ -223,6 +225,11 @@ class JobManager:
|
||||
|
||||
algorithm_name = job_data.get("algorithm", "")
|
||||
webhook_url = job_data.get("webhook", "")
|
||||
request_id = job_data.get("request_id", "")
|
||||
|
||||
# 设置 request_id 上下文,确保日志中包含 request_id
|
||||
if request_id:
|
||||
set_request_id(request_id)
|
||||
|
||||
# 解析参数
|
||||
try:
|
||||
@@ -234,7 +241,9 @@ class JobManager:
|
||||
async with self._semaphore:
|
||||
# 更新状态为 running
|
||||
started_at = self._get_timestamp()
|
||||
await self._redis_client.hset(key, mapping={"status": "running", "started_at": started_at})
|
||||
await self._redis_client.hset(
|
||||
key, mapping={"status": "running", "started_at": started_at}
|
||||
)
|
||||
|
||||
logger.info(f"开始执行任务: job_id={job_id}, algorithm={algorithm_name}")
|
||||
|
||||
@@ -295,7 +304,9 @@ class JobManager:
|
||||
incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
|
||||
observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time)
|
||||
|
||||
logger.info(f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s")
|
||||
logger.info(
|
||||
f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s"
|
||||
)
|
||||
|
||||
# 发送 Webhook 回调
|
||||
if webhook_url:
|
||||
|
||||
Reference in New Issue
Block a user