main:优化任务管理及队列监控性能

变更内容:
- 优化任务出队逻辑,采用 BLMOVE 提升队列操作的原子性和可靠性。
- 在 JobManager 中新增任务锁续租、超时任务回收、ACK/NACK 状态管理功能。
- 实现任务队列和死信队列监控指标收集,为系统性能分析提供数据支持。
- 扩展 Worker 模块,增加锁续租逻辑及任务回收调度。
- 更新测试用例,覆盖任务管理和队列指标的新增逻辑。
- 补充 metrics.yaml 文件,添加队列相关的监控指标定义。
- 更新依赖,补充 Redis 支持及相关库版本规范。
This commit is contained in:
2026-02-03 18:18:02 +08:00
parent 73bd66813c
commit 7b627090f3
8 changed files with 1318 additions and 46 deletions

View File

@@ -58,13 +58,25 @@ class Settings(BaseSettings):
max_concurrent_jobs: int = 10 # 最大并发任务数
# Worker 配置
worker_poll_interval: float = 1.0 # Worker 轮询间隔(秒)
worker_poll_interval: float = 0.1 # Worker 轮询间隔(秒)
job_queue_key: str = "job:queue" # 任务队列 Redis Key
job_concurrency_key: str = "job:concurrency" # 全局并发计数器 Redis Key
job_lock_ttl: int = 300 # 任务锁 TTL
job_max_retries: int = 3 # 任务最大重试次数
job_execution_timeout: int = 300 # 任务执行超时(秒)
# 处理队列配置
job_processing_key: str = "job:processing" # 处理中队列
job_processing_ts_key: str = "job:processing:ts" # 处理时间戳 ZSET
job_dlq_key: str = "job:dlq" # 死信队列
# 锁配置扩展
job_lock_buffer: int = 60 # 锁 TTL 缓冲时间(秒)
# 回收器配置
job_sweeper_enabled: bool = True # 启用回收器
job_sweeper_interval: int = 60 # 回收扫描间隔(秒)
# 全局配置实例
settings = Settings()

View File

@@ -7,6 +7,7 @@ import asyncio
import json
import logging
import secrets
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Type
@@ -24,6 +25,24 @@ logger = logging.getLogger(__name__)
class JobManager:
"""异步任务管理器"""
# Lua 脚本:安全释放锁(验证 token
RELEASE_LOCK_SCRIPT = """
local current = redis.call('GET', KEYS[1])
if current == ARGV[1] then
return redis.call('DEL', KEYS[1])
end
return 0
"""
# Lua 脚本:锁续租(验证 token 后延长 TTL
RENEW_LOCK_SCRIPT = """
local current = redis.call('GET', KEYS[1])
if current == ARGV[1] then
return redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return 0
"""
def __init__(self):
self._redis_client: Optional[aioredis.Redis] = None
self._algorithm_registry: Dict[str, Type[BaseAlgorithm]] = {}
@@ -405,7 +424,10 @@ class JobManager:
return False
async def dequeue_job(self, timeout: int = 5) -> Optional[str]:
"""从队列获取任务(阻塞式)
"""从队列获取任务(阻塞式,转移式出队
使用 BLMOVE 原子性地将任务从 job:queue 移动到 job:processing
防止 Worker 崩溃时任务丢失。
Args:
timeout: 阻塞超时时间(秒)
@@ -417,44 +439,54 @@ class JobManager:
return None
try:
result = await self._redis_client.brpop(settings.job_queue_key, timeout=timeout)
if result:
# brpop 返回 (key, value) 元组
return result[1]
return None
# 使用 BLMOVE 原子性转移任务
job_id = await self._redis_client.blmove(
settings.job_queue_key, # 源: job:queue
settings.job_processing_key, # 目标: job:processing
timeout,
"RIGHT",
"LEFT",
)
if job_id:
# 记录出队时间戳到 ZSET
await self._redis_client.zadd(settings.job_processing_ts_key, {job_id: time.time()})
logger.debug(f"任务已转移到处理队列: {job_id}")
return job_id
except Exception as e:
logger.error(f"任务出队失败: error={e}")
return None
async def acquire_job_lock(self, job_id: str) -> bool:
"""获取任务执行锁(分布式锁)
async def acquire_job_lock(self, job_id: str) -> Optional[str]:
"""获取任务执行锁(分布式锁,带 Token
Args:
job_id: 任务 ID
Returns:
bool: 是否成功获取锁
Optional[str]: 成功时返回锁 token失败返回 None
"""
if not self._redis_client:
return False
return None
lock_key = f"job:lock:{job_id}"
lock_token = secrets.token_hex(16) # 随机 token
lock_ttl = settings.job_execution_timeout + settings.job_lock_buffer
try:
acquired = await self._redis_client.set(
lock_key, "locked", nx=True, ex=settings.job_lock_ttl
)
acquired = await self._redis_client.set(lock_key, lock_token, nx=True, ex=lock_ttl)
if acquired:
logger.debug(f"获取任务锁成功: job_id={job_id}")
return acquired is not None
return lock_token
return None
except Exception as e:
logger.error(f"获取任务锁失败: job_id={job_id}, error={e}")
return False
return None
async def release_job_lock(self, job_id: str) -> bool:
"""释放任务执行锁
async def release_job_lock(self, job_id: str, lock_token: Optional[str] = None) -> bool:
"""释放任务执行锁(使用 Lua 脚本验证 token
Args:
job_id: 任务 ID
lock_token: 锁 token用于验证所有权
Returns:
bool: 是否成功释放锁
@@ -464,9 +496,22 @@ class JobManager:
lock_key = f"job:lock:{job_id}"
try:
await self._redis_client.delete(lock_key)
logger.debug(f"释放任务锁成功: job_id={job_id}")
return True
if lock_token:
# 使用 Lua 脚本安全释放锁
result = await self._redis_client.eval(
self.RELEASE_LOCK_SCRIPT, 1, lock_key, lock_token
)
if result == 1:
logger.debug(f"释放任务锁成功: job_id={job_id}")
return True
else:
logger.warning(f"释放任务锁失败token 不匹配): job_id={job_id}")
return False
else:
# 向后兼容:无 token 时直接删除
await self._redis_client.delete(lock_key)
logger.debug(f"释放任务锁成功(无 token 验证): job_id={job_id}")
return True
except Exception as e:
logger.error(f"释放任务锁失败: job_id={job_id}, error={e}")
return False
@@ -572,6 +617,136 @@ class JobManager:
logger.error(f"增加重试次数失败: job_id={job_id}, error={e}")
return 0
async def ack_job(self, job_id: str) -> bool:
"""确认任务完成(从处理队列移除)
Args:
job_id: 任务 ID
Returns:
bool: 是否成功确认
"""
if not self._redis_client:
return False
try:
async with self._redis_client.pipeline(transaction=True) as pipe:
pipe.lrem(settings.job_processing_key, 1, job_id)
pipe.zrem(settings.job_processing_ts_key, job_id)
await pipe.execute()
logger.debug(f"任务已确认完成: job_id={job_id}")
return True
except Exception as e:
logger.error(f"确认任务失败: job_id={job_id}, error={e}")
return False
async def nack_job(self, job_id: str, requeue: bool = True) -> bool:
"""拒绝任务(从处理队列移除,根据重试次数决定重新入队或进死信队列)
Args:
job_id: 任务 ID
requeue: 是否尝试重新入队
Returns:
bool: 是否成功处理
"""
if not self._redis_client:
return False
try:
retry_count = await self.get_job_retry_count(job_id)
async with self._redis_client.pipeline(transaction=True) as pipe:
pipe.lrem(settings.job_processing_key, 1, job_id)
pipe.zrem(settings.job_processing_ts_key, job_id)
if requeue and retry_count < settings.job_max_retries:
pipe.lpush(settings.job_queue_key, job_id)
logger.info(f"任务重新入队: job_id={job_id}, retry_count={retry_count}")
else:
pipe.lpush(settings.job_dlq_key, job_id)
logger.warning(f"任务进入死信队列: job_id={job_id}, retry_count={retry_count}")
await pipe.execute()
return True
except Exception as e:
logger.error(f"拒绝任务失败: job_id={job_id}, error={e}")
return False
async def renew_job_lock(self, job_id: str, lock_token: str) -> bool:
"""续租任务锁(延长 TTL
Args:
job_id: 任务 ID
lock_token: 锁 token
Returns:
bool: 是否成功续租
"""
if not self._redis_client:
return False
lock_key = f"job:lock:{job_id}"
lock_ttl = settings.job_execution_timeout + settings.job_lock_buffer
try:
result = await self._redis_client.eval(
self.RENEW_LOCK_SCRIPT, 1, lock_key, lock_token, lock_ttl
)
if result == 1:
logger.debug(f"锁续租成功: job_id={job_id}")
return True
else:
logger.warning(f"锁续租失败token 不匹配或锁已过期): job_id={job_id}")
return False
except Exception as e:
logger.error(f"锁续租失败: job_id={job_id}, error={e}")
return False
async def recover_stale_jobs(self) -> int:
"""回收超时任务
扫描 job:processing:ts ZSET找出超时的任务
根据重试次数决定重新入队或进死信队列。
Returns:
int: 回收的任务数量
"""
if not self._redis_client:
return 0
timeout = settings.job_execution_timeout + settings.job_lock_buffer
cutoff = time.time() - timeout
try:
# 获取超时任务列表
stale_jobs = await self._redis_client.zrangebyscore(
settings.job_processing_ts_key, "-inf", cutoff
)
recovered = 0
for job_id in stale_jobs:
# 增加重试次数
await self.increment_job_retry(job_id)
retry_count = await self.get_job_retry_count(job_id)
async with self._redis_client.pipeline(transaction=True) as pipe:
pipe.lrem(settings.job_processing_key, 1, job_id)
pipe.zrem(settings.job_processing_ts_key, job_id)
if retry_count < settings.job_max_retries:
pipe.lpush(settings.job_queue_key, job_id)
logger.info(f"超时任务重新入队: job_id={job_id}, retry_count={retry_count}")
else:
pipe.lpush(settings.job_dlq_key, job_id)
logger.warning(
f"超时任务进入死信队列: job_id={job_id}, retry_count={retry_count}"
)
await pipe.execute()
recovered += 1
if recovered > 0:
logger.info(f"回收超时任务完成: 共 {recovered}")
return recovered
except Exception as e:
logger.error(f"回收超时任务失败: error={e}")
return 0
def get_concurrency_status(self) -> Dict[str, int]:
"""获取并发状态
@@ -598,6 +773,67 @@ class JobManager:
"running_jobs": running_jobs,
}
async def collect_queue_metrics(self) -> Dict[str, Any]:
"""收集队列监控指标
Returns:
Dict[str, Any]: 包含以下键的字典
- queue_length: 待处理队列长度
- processing_length: 处理中队列长度
- dlq_length: 死信队列长度
- oldest_waiting_seconds: 最长等待时间(秒)
"""
if not self._redis_client:
return {
"queue_length": 0,
"processing_length": 0,
"dlq_length": 0,
"oldest_waiting_seconds": 0,
}
try:
# 使用 pipeline 批量获取队列长度
async with self._redis_client.pipeline(transaction=False) as pipe:
pipe.llen(settings.job_queue_key)
pipe.llen(settings.job_processing_key)
pipe.llen(settings.job_dlq_key)
pipe.zrange(settings.job_processing_ts_key, 0, 0, withscores=True)
results = await pipe.execute()
queue_length = results[0] or 0
processing_length = results[1] or 0
dlq_length = results[2] or 0
# 计算最长等待时间
oldest_waiting_seconds = 0
if results[3]:
# results[3] 是 [(job_id, timestamp), ...] 格式
oldest_ts = results[3][0][1]
oldest_waiting_seconds = time.time() - oldest_ts
# 更新指标
from .metrics_unified import set as metrics_set
metrics_set("job_queue_length", {"queue": "pending"}, queue_length)
metrics_set("job_queue_length", {"queue": "processing"}, processing_length)
metrics_set("job_queue_length", {"queue": "dlq"}, dlq_length)
metrics_set("job_oldest_waiting_seconds", None, oldest_waiting_seconds)
return {
"queue_length": queue_length,
"processing_length": processing_length,
"dlq_length": dlq_length,
"oldest_waiting_seconds": oldest_waiting_seconds,
}
except Exception as e:
logger.error(f"收集队列指标失败: error={e}")
return {
"queue_length": 0,
"processing_length": 0,
"dlq_length": 0,
"oldest_waiting_seconds": 0,
}
# 全局单例
_job_manager: Optional[JobManager] = None

View File

@@ -24,6 +24,8 @@ class JobWorker:
- 分布式锁防止重复执行
- 全局并发控制
- 任务重试机制
- 锁续租机制
- 超时任务回收
- 优雅关闭
"""
@@ -31,6 +33,9 @@ class JobWorker:
self._job_manager: Optional[JobManager] = None
self._running: bool = False
self._current_job_id: Optional[str] = None
self._current_lock_token: Optional[str] = None
self._lock_renewal_task: Optional[asyncio.Task] = None
self._sweeper_task: Optional[asyncio.Task] = None
async def initialize(self) -> None:
"""初始化 Worker"""
@@ -43,6 +48,22 @@ class JobWorker:
logger.info("Worker 正在关闭...")
self._running = False
# 取消回收器任务
if self._sweeper_task and not self._sweeper_task.done():
self._sweeper_task.cancel()
try:
await self._sweeper_task
except asyncio.CancelledError:
pass
# 取消锁续租任务
if self._lock_renewal_task and not self._lock_renewal_task.done():
self._lock_renewal_task.cancel()
try:
await self._lock_renewal_task
except asyncio.CancelledError:
pass
# 等待当前任务完成
if self._current_job_id:
logger.info(f"等待当前任务完成: {self._current_job_id}")
@@ -60,6 +81,11 @@ class JobWorker:
f"最大并发: {settings.max_concurrent_jobs}"
)
# 启动超时任务回收器
if settings.job_sweeper_enabled:
self._sweeper_task = asyncio.create_task(self._sweeper_loop())
logger.info(f"超时任务回收器已启动,扫描间隔: {settings.job_sweeper_interval}s")
while self._running:
try:
await self._process_next_job()
@@ -74,7 +100,7 @@ class JobWorker:
await asyncio.sleep(settings.worker_poll_interval)
return
# 从队列获取任务
# 从队列获取任务(转移式出队)
job_id = await self._job_manager.dequeue_job(timeout=int(settings.worker_poll_interval))
if not job_id:
@@ -90,16 +116,23 @@ class JobWorker:
logger.info(f"从队列获取任务: {job_id}")
# 尝试获取分布式锁
if not await self._job_manager.acquire_job_lock(job_id):
# 尝试获取分布式锁(返回 token
lock_token = await self._job_manager.acquire_job_lock(job_id)
if not lock_token:
logger.warning(f"无法获取任务锁,任务可能正在被其他 Worker 执行: {job_id}")
# 任务留在 processing 队列,等待回收器处理
return
self._current_lock_token = lock_token
# 启动锁续租协程
self._lock_renewal_task = asyncio.create_task(self._lock_renewal_loop(job_id, lock_token))
try:
# 检查全局并发限制
if not await self._job_manager.can_execute():
logger.info(f"达到并发限制,任务重新入队: {job_id}")
await self._job_manager.enqueue_job(job_id)
logger.info(f"达到并发限制,任务 NACK 重新入队: {job_id}")
await self._job_manager.nack_job(job_id, requeue=True)
return
# 增加并发计数
@@ -108,20 +141,39 @@ class JobWorker:
try:
# 执行任务
await self._execute_with_retry(job_id)
success = await self._execute_with_retry(job_id)
if success:
await self._job_manager.ack_job(job_id)
else:
await self._job_manager.increment_job_retry(job_id)
await self._job_manager.nack_job(job_id, requeue=True)
finally:
# 减少并发计数
await self._job_manager.decrement_concurrency()
self._current_job_id = None
finally:
# 释放分布式锁
await self._job_manager.release_job_lock(job_id)
# 停止锁续租
if self._lock_renewal_task and not self._lock_renewal_task.done():
self._lock_renewal_task.cancel()
try:
await self._lock_renewal_task
except asyncio.CancelledError:
pass
self._lock_renewal_task = None
async def _execute_with_retry(self, job_id: str) -> None:
"""执行任务(带重试机制)"""
# 释放分布式锁
await self._job_manager.release_job_lock(job_id, lock_token)
self._current_lock_token = None
async def _execute_with_retry(self, job_id: str) -> bool:
"""执行任务(带重试机制)
Returns:
bool: 任务是否成功执行
"""
if not self._job_manager:
return
return False
try:
# 执行任务
@@ -129,12 +181,15 @@ class JobWorker:
self._job_manager.execute_job(job_id),
timeout=settings.job_execution_timeout,
)
return True
except asyncio.TimeoutError:
logger.error(f"任务执行超时: {job_id}")
await self._handle_job_failure(job_id, "任务执行超时")
return False
except Exception as e:
logger.error(f"任务执行异常: {job_id}, error={e}", exc_info=True)
await self._handle_job_failure(job_id, str(e))
return False
async def _handle_job_failure(self, job_id: str, error: str) -> None:
"""处理任务失败"""
@@ -160,6 +215,62 @@ class JobWorker:
},
)
async def _lock_renewal_loop(self, job_id: str, lock_token: str) -> None:
"""锁续租协程
定期续租任务锁,防止长任务执行时锁过期。
Args:
job_id: 任务 ID
lock_token: 锁 token
"""
# 续租间隔为锁 TTL 的一半
interval = (settings.job_execution_timeout + settings.job_lock_buffer) / 2
while True:
try:
await asyncio.sleep(interval)
if not self._job_manager:
break
if not await self._job_manager.renew_job_lock(job_id, lock_token):
logger.error(f"锁续租失败,可能已被其他进程获取: {job_id}")
break
logger.debug(f"锁续租成功: {job_id}")
except asyncio.CancelledError:
logger.debug(f"锁续租协程已取消: {job_id}")
break
except Exception as e:
logger.error(f"锁续租异常: {job_id}, error={e}")
break
async def _sweeper_loop(self) -> None:
"""超时任务回收协程
定期扫描处理中队列,回收超时任务,并收集队列监控指标。
"""
while self._running:
try:
await asyncio.sleep(settings.job_sweeper_interval)
if not self._job_manager:
continue
# 回收超时任务
recovered = await self._job_manager.recover_stale_jobs()
if recovered > 0:
logger.info(f"回收超时任务: {recovered}")
# 记录回收指标
from .core.metrics_unified import incr
incr("job_recovered_total", None, recovered)
# 收集队列监控指标
await self._job_manager.collect_queue_metrics()
except asyncio.CancelledError:
logger.debug("超时任务回收协程已取消")
break
except Exception as e:
logger.error(f"超时任务回收异常: {e}")
def setup_signal_handlers(worker: JobWorker, loop: asyncio.AbstractEventLoop) -> None:
"""设置信号处理器"""