From 7c8b96927df437fb8a375401b1d191ae3dbf9df7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roog=20=28=E9=A1=BE=E6=96=B0=E5=9F=B9=29?= Date: Tue, 3 Feb 2026 18:18:02 +0800 Subject: [PATCH] =?UTF-8?q?main:=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=8F=8A=E9=98=9F=E5=88=97=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 变更内容: - 优化任务出队逻辑,采用 BLMOVE 提升队列操作的原子性和可靠性。 - 在 JobManager 中新增任务锁续租、超时任务回收、ACK/NACK 状态管理功能。 - 实现任务队列和死信队列监控指标收集,为系统性能分析提供数据支持。 - 扩展 Worker 模块,增加锁续租逻辑及任务回收调度。 - 更新测试用例,覆盖任务管理和队列指标的新增逻辑。 - 补充 metrics.yaml 文件,添加队列相关的监控指标定义。 - 更新依赖,补充 Redis 支持及相关库版本规范。 --- config/metrics.yaml | 20 + monitoring/grafana/dashboards/dashboard.json | 498 +++++++++++++++++++ pyproject.toml | 7 +- requirements.txt | 6 +- src/functional_scaffold/config.py | 14 +- src/functional_scaffold/core/job_manager.py | 276 +++++++++- src/functional_scaffold/worker.py | 133 ++++- tests/test_job_manager.py | 410 ++++++++++++++- 8 files changed, 1318 insertions(+), 46 deletions(-) diff --git a/config/metrics.yaml b/config/metrics.yaml index 695708f..67990bf 100644 --- a/config/metrics.yaml +++ b/config/metrics.yaml @@ -94,6 +94,26 @@ custom_metrics: type: counter description: "Webhook 回调发送总数" labels: [status] + + # 队列监控指标 + job_queue_length: + name: "job_queue_length" + type: gauge + description: "待处理任务队列长度" + labels: [queue] + + job_oldest_waiting_seconds: + name: "job_oldest_waiting_seconds" + type: gauge + description: "最长任务等待时间(秒)" + labels: [] + + job_recovered_total: + name: "job_recovered_total" + type: counter + description: "回收的超时任务总数" + labels: [] + prime_check_total: name: "prime_check" type: counter diff --git a/monitoring/grafana/dashboards/dashboard.json b/monitoring/grafana/dashboards/dashboard.json index 748e211..2b54a21 100644 --- a/monitoring/grafana/dashboards/dashboard.json +++ b/monitoring/grafana/dashboards/dashboard.json @@ -1395,6 +1395,504 @@ ], "title": "Webhook 发送状态", "type": "piechart" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 53 + }, + "id": 200, + "panels": [], + "title": "队列监控", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "任务数", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "opacity", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 50 + }, + { + "color": "red", + "value": 100 + } + ] + }, + "unit": "short" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "pending" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "blue", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "processing" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "orange", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "dlq" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "red", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 54 + }, + "id": 19, + "options": { + "legend": { + "calcs": ["mean", "last", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "job_queue_length", + "legendFormat": "{{queue}}", + "refId": "A" + } + ], + "title": "队列长度趋势", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "秒", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 20, + "gradientMode": "opacity", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "line" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 60 + }, + { + "color": "red", + "value": 300 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 54 + }, + "id": 20, + "options": { + "legend": { + "calcs": ["mean", "last", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "job_oldest_waiting_seconds", + "legendFormat": "最长等待时间", + "refId": "A" + } + ], + "title": "最长任务等待时间", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 10 + }, + { + "color": "red", + "value": 50 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 62 + }, + "id": 21, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "values": false, + "calcs": ["last"], + "fields": "" + }, + "textMode": "auto" + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "job_queue_length{queue=\"pending\"}", + "refId": "A" + } + ], + "title": "待处理队列", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 5 + }, + { + "color": "red", + "value": 10 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 62 + }, + "id": 22, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "values": false, + "calcs": ["last"], + "fields": "" + }, + "textMode": "auto" + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "job_queue_length{queue=\"processing\"}", + "refId": "A" + } + ], + "title": "处理中队列", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 12, + "y": 62 + }, + "id": 23, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "values": false, + "calcs": ["last"], + "fields": "" + }, + "textMode": "auto" + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "job_queue_length{queue=\"dlq\"}", + "refId": "A" + } + ], + "title": "死信队列", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 18, + "y": 62 + }, + "id": 24, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "values": false, + "calcs": ["last"], + "fields": "" + }, + "textMode": "auto" + }, + "pluginVersion": "9.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "expr": "sum(job_recovered_total) or vector(0)", + "refId": "A" + } + ], + "title": "回收任务总数", + "type": "stat" } ], "refresh": "5s", diff --git a/pyproject.toml b/pyproject.toml index 6d96702..e8373ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,12 @@ dependencies = [ "pydantic-settings>=2.0.0", "prometheus-client>=0.19.0", "python-json-logger>=2.0.7", + # Redis - 任务队列和指标存储 + "redis>=5.0.0", + # YAML 配置解析 + "pyyaml>=6.0.0", + # HTTP 客户端(Webhook 回调) + "httpx>=0.27.0", ] [project.optional-dependencies] @@ -26,7 +32,6 @@ dev = [ "pytest>=7.4.0", "pytest-asyncio>=0.21.0", "pytest-cov>=4.1.0", - "httpx>=0.26.0", "black>=23.12.0", "ruff>=0.1.0", ] diff --git a/requirements.txt b/requirements.txt index 2da63da..6337e8e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +# 核心依赖 - 与 pyproject.toml 保持同步 fastapi>=0.109.0 uvicorn[standard]>=0.27.0 pydantic>=2.5.0 @@ -5,12 +6,11 @@ pydantic-settings>=2.0.0 prometheus-client>=0.19.0 python-json-logger>=2.0.7 -# 指标存储方案(可选,根据选择的方案安装) -# 方案2:Redis 方案需要 +# Redis - 任务队列和指标存储 redis>=5.0.0 # YAML 配置解析 pyyaml>=6.0.0 -# HTTP 客户端(用于 Webhook 回调) +# HTTP 客户端(Webhook 回调) httpx>=0.27.0 diff --git a/src/functional_scaffold/config.py b/src/functional_scaffold/config.py index 7b782d7..595ffcc 100644 --- a/src/functional_scaffold/config.py +++ b/src/functional_scaffold/config.py @@ -58,13 +58,25 @@ class Settings(BaseSettings): max_concurrent_jobs: int = 10 # 最大并发任务数 # Worker 配置 - worker_poll_interval: float = 1.0 # Worker 轮询间隔(秒) + worker_poll_interval: float = 0.1 # Worker 轮询间隔(秒) job_queue_key: str = "job:queue" # 任务队列 Redis Key job_concurrency_key: str = "job:concurrency" # 全局并发计数器 Redis Key job_lock_ttl: int = 300 # 任务锁 TTL(秒) job_max_retries: int = 3 # 任务最大重试次数 job_execution_timeout: int = 300 # 任务执行超时(秒) + # 处理队列配置 + job_processing_key: str = "job:processing" # 处理中队列 + job_processing_ts_key: str = "job:processing:ts" # 处理时间戳 ZSET + job_dlq_key: str = "job:dlq" # 死信队列 + + # 锁配置扩展 + job_lock_buffer: int = 60 # 锁 TTL 缓冲时间(秒) + + # 回收器配置 + job_sweeper_enabled: bool = True # 启用回收器 + job_sweeper_interval: int = 60 # 回收扫描间隔(秒) + # 全局配置实例 settings = Settings() diff --git a/src/functional_scaffold/core/job_manager.py b/src/functional_scaffold/core/job_manager.py index 44c27c2..10d5891 100644 --- a/src/functional_scaffold/core/job_manager.py +++ b/src/functional_scaffold/core/job_manager.py @@ -7,6 +7,7 @@ import asyncio import json import logging import secrets +import time from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Type @@ -24,6 +25,24 @@ logger = logging.getLogger(__name__) class JobManager: """异步任务管理器""" + # Lua 脚本:安全释放锁(验证 token) + RELEASE_LOCK_SCRIPT = """ +local current = redis.call('GET', KEYS[1]) +if current == ARGV[1] then + return redis.call('DEL', KEYS[1]) +end +return 0 +""" + + # Lua 脚本:锁续租(验证 token 后延长 TTL) + RENEW_LOCK_SCRIPT = """ +local current = redis.call('GET', KEYS[1]) +if current == ARGV[1] then + return redis.call('EXPIRE', KEYS[1], ARGV[2]) +end +return 0 +""" + def __init__(self): self._redis_client: Optional[aioredis.Redis] = None self._algorithm_registry: Dict[str, Type[BaseAlgorithm]] = {} @@ -405,7 +424,10 @@ class JobManager: return False async def dequeue_job(self, timeout: int = 5) -> Optional[str]: - """从队列获取任务(阻塞式) + """从队列获取任务(阻塞式,转移式出队) + + 使用 BLMOVE 原子性地将任务从 job:queue 移动到 job:processing, + 防止 Worker 崩溃时任务丢失。 Args: timeout: 阻塞超时时间(秒) @@ -417,44 +439,54 @@ class JobManager: return None try: - result = await self._redis_client.brpop(settings.job_queue_key, timeout=timeout) - if result: - # brpop 返回 (key, value) 元组 - return result[1] - return None + # 使用 BLMOVE 原子性转移任务 + job_id = await self._redis_client.blmove( + settings.job_queue_key, # 源: job:queue + settings.job_processing_key, # 目标: job:processing + timeout, + "RIGHT", + "LEFT", + ) + if job_id: + # 记录出队时间戳到 ZSET + await self._redis_client.zadd(settings.job_processing_ts_key, {job_id: time.time()}) + logger.debug(f"任务已转移到处理队列: {job_id}") + return job_id except Exception as e: logger.error(f"任务出队失败: error={e}") return None - async def acquire_job_lock(self, job_id: str) -> bool: - """获取任务执行锁(分布式锁) + async def acquire_job_lock(self, job_id: str) -> Optional[str]: + """获取任务执行锁(分布式锁,带 Token) Args: job_id: 任务 ID Returns: - bool: 是否成功获取锁 + Optional[str]: 成功时返回锁 token,失败返回 None """ if not self._redis_client: - return False + return None lock_key = f"job:lock:{job_id}" + lock_token = secrets.token_hex(16) # 随机 token + lock_ttl = settings.job_execution_timeout + settings.job_lock_buffer try: - acquired = await self._redis_client.set( - lock_key, "locked", nx=True, ex=settings.job_lock_ttl - ) + acquired = await self._redis_client.set(lock_key, lock_token, nx=True, ex=lock_ttl) if acquired: logger.debug(f"获取任务锁成功: job_id={job_id}") - return acquired is not None + return lock_token + return None except Exception as e: logger.error(f"获取任务锁失败: job_id={job_id}, error={e}") - return False + return None - async def release_job_lock(self, job_id: str) -> bool: - """释放任务执行锁 + async def release_job_lock(self, job_id: str, lock_token: Optional[str] = None) -> bool: + """释放任务执行锁(使用 Lua 脚本验证 token) Args: job_id: 任务 ID + lock_token: 锁 token(用于验证所有权) Returns: bool: 是否成功释放锁 @@ -464,9 +496,22 @@ class JobManager: lock_key = f"job:lock:{job_id}" try: - await self._redis_client.delete(lock_key) - logger.debug(f"释放任务锁成功: job_id={job_id}") - return True + if lock_token: + # 使用 Lua 脚本安全释放锁 + result = await self._redis_client.eval( + self.RELEASE_LOCK_SCRIPT, 1, lock_key, lock_token + ) + if result == 1: + logger.debug(f"释放任务锁成功: job_id={job_id}") + return True + else: + logger.warning(f"释放任务锁失败(token 不匹配): job_id={job_id}") + return False + else: + # 向后兼容:无 token 时直接删除 + await self._redis_client.delete(lock_key) + logger.debug(f"释放任务锁成功(无 token 验证): job_id={job_id}") + return True except Exception as e: logger.error(f"释放任务锁失败: job_id={job_id}, error={e}") return False @@ -572,6 +617,136 @@ class JobManager: logger.error(f"增加重试次数失败: job_id={job_id}, error={e}") return 0 + async def ack_job(self, job_id: str) -> bool: + """确认任务完成(从处理队列移除) + + Args: + job_id: 任务 ID + + Returns: + bool: 是否成功确认 + """ + if not self._redis_client: + return False + + try: + async with self._redis_client.pipeline(transaction=True) as pipe: + pipe.lrem(settings.job_processing_key, 1, job_id) + pipe.zrem(settings.job_processing_ts_key, job_id) + await pipe.execute() + logger.debug(f"任务已确认完成: job_id={job_id}") + return True + except Exception as e: + logger.error(f"确认任务失败: job_id={job_id}, error={e}") + return False + + async def nack_job(self, job_id: str, requeue: bool = True) -> bool: + """拒绝任务(从处理队列移除,根据重试次数决定重新入队或进死信队列) + + Args: + job_id: 任务 ID + requeue: 是否尝试重新入队 + + Returns: + bool: 是否成功处理 + """ + if not self._redis_client: + return False + + try: + retry_count = await self.get_job_retry_count(job_id) + async with self._redis_client.pipeline(transaction=True) as pipe: + pipe.lrem(settings.job_processing_key, 1, job_id) + pipe.zrem(settings.job_processing_ts_key, job_id) + if requeue and retry_count < settings.job_max_retries: + pipe.lpush(settings.job_queue_key, job_id) + logger.info(f"任务重新入队: job_id={job_id}, retry_count={retry_count}") + else: + pipe.lpush(settings.job_dlq_key, job_id) + logger.warning(f"任务进入死信队列: job_id={job_id}, retry_count={retry_count}") + await pipe.execute() + return True + except Exception as e: + logger.error(f"拒绝任务失败: job_id={job_id}, error={e}") + return False + + async def renew_job_lock(self, job_id: str, lock_token: str) -> bool: + """续租任务锁(延长 TTL) + + Args: + job_id: 任务 ID + lock_token: 锁 token + + Returns: + bool: 是否成功续租 + """ + if not self._redis_client: + return False + + lock_key = f"job:lock:{job_id}" + lock_ttl = settings.job_execution_timeout + settings.job_lock_buffer + try: + result = await self._redis_client.eval( + self.RENEW_LOCK_SCRIPT, 1, lock_key, lock_token, lock_ttl + ) + if result == 1: + logger.debug(f"锁续租成功: job_id={job_id}") + return True + else: + logger.warning(f"锁续租失败(token 不匹配或锁已过期): job_id={job_id}") + return False + except Exception as e: + logger.error(f"锁续租失败: job_id={job_id}, error={e}") + return False + + async def recover_stale_jobs(self) -> int: + """回收超时任务 + + 扫描 job:processing:ts ZSET,找出超时的任务, + 根据重试次数决定重新入队或进死信队列。 + + Returns: + int: 回收的任务数量 + """ + if not self._redis_client: + return 0 + + timeout = settings.job_execution_timeout + settings.job_lock_buffer + cutoff = time.time() - timeout + + try: + # 获取超时任务列表 + stale_jobs = await self._redis_client.zrangebyscore( + settings.job_processing_ts_key, "-inf", cutoff + ) + + recovered = 0 + for job_id in stale_jobs: + # 增加重试次数 + await self.increment_job_retry(job_id) + retry_count = await self.get_job_retry_count(job_id) + + async with self._redis_client.pipeline(transaction=True) as pipe: + pipe.lrem(settings.job_processing_key, 1, job_id) + pipe.zrem(settings.job_processing_ts_key, job_id) + if retry_count < settings.job_max_retries: + pipe.lpush(settings.job_queue_key, job_id) + logger.info(f"超时任务重新入队: job_id={job_id}, retry_count={retry_count}") + else: + pipe.lpush(settings.job_dlq_key, job_id) + logger.warning( + f"超时任务进入死信队列: job_id={job_id}, retry_count={retry_count}" + ) + await pipe.execute() + recovered += 1 + + if recovered > 0: + logger.info(f"回收超时任务完成: 共 {recovered} 个") + return recovered + except Exception as e: + logger.error(f"回收超时任务失败: error={e}") + return 0 + def get_concurrency_status(self) -> Dict[str, int]: """获取并发状态 @@ -598,6 +773,67 @@ class JobManager: "running_jobs": running_jobs, } + async def collect_queue_metrics(self) -> Dict[str, Any]: + """收集队列监控指标 + + Returns: + Dict[str, Any]: 包含以下键的字典 + - queue_length: 待处理队列长度 + - processing_length: 处理中队列长度 + - dlq_length: 死信队列长度 + - oldest_waiting_seconds: 最长等待时间(秒) + """ + if not self._redis_client: + return { + "queue_length": 0, + "processing_length": 0, + "dlq_length": 0, + "oldest_waiting_seconds": 0, + } + + try: + # 使用 pipeline 批量获取队列长度 + async with self._redis_client.pipeline(transaction=False) as pipe: + pipe.llen(settings.job_queue_key) + pipe.llen(settings.job_processing_key) + pipe.llen(settings.job_dlq_key) + pipe.zrange(settings.job_processing_ts_key, 0, 0, withscores=True) + results = await pipe.execute() + + queue_length = results[0] or 0 + processing_length = results[1] or 0 + dlq_length = results[2] or 0 + + # 计算最长等待时间 + oldest_waiting_seconds = 0 + if results[3]: + # results[3] 是 [(job_id, timestamp), ...] 格式 + oldest_ts = results[3][0][1] + oldest_waiting_seconds = time.time() - oldest_ts + + # 更新指标 + from .metrics_unified import set as metrics_set + + metrics_set("job_queue_length", {"queue": "pending"}, queue_length) + metrics_set("job_queue_length", {"queue": "processing"}, processing_length) + metrics_set("job_queue_length", {"queue": "dlq"}, dlq_length) + metrics_set("job_oldest_waiting_seconds", None, oldest_waiting_seconds) + + return { + "queue_length": queue_length, + "processing_length": processing_length, + "dlq_length": dlq_length, + "oldest_waiting_seconds": oldest_waiting_seconds, + } + except Exception as e: + logger.error(f"收集队列指标失败: error={e}") + return { + "queue_length": 0, + "processing_length": 0, + "dlq_length": 0, + "oldest_waiting_seconds": 0, + } + # 全局单例 _job_manager: Optional[JobManager] = None diff --git a/src/functional_scaffold/worker.py b/src/functional_scaffold/worker.py index 90b9993..ea6a050 100644 --- a/src/functional_scaffold/worker.py +++ b/src/functional_scaffold/worker.py @@ -24,6 +24,8 @@ class JobWorker: - 分布式锁防止重复执行 - 全局并发控制 - 任务重试机制 + - 锁续租机制 + - 超时任务回收 - 优雅关闭 """ @@ -31,6 +33,9 @@ class JobWorker: self._job_manager: Optional[JobManager] = None self._running: bool = False self._current_job_id: Optional[str] = None + self._current_lock_token: Optional[str] = None + self._lock_renewal_task: Optional[asyncio.Task] = None + self._sweeper_task: Optional[asyncio.Task] = None async def initialize(self) -> None: """初始化 Worker""" @@ -43,6 +48,22 @@ class JobWorker: logger.info("Worker 正在关闭...") self._running = False + # 取消回收器任务 + if self._sweeper_task and not self._sweeper_task.done(): + self._sweeper_task.cancel() + try: + await self._sweeper_task + except asyncio.CancelledError: + pass + + # 取消锁续租任务 + if self._lock_renewal_task and not self._lock_renewal_task.done(): + self._lock_renewal_task.cancel() + try: + await self._lock_renewal_task + except asyncio.CancelledError: + pass + # 等待当前任务完成 if self._current_job_id: logger.info(f"等待当前任务完成: {self._current_job_id}") @@ -60,6 +81,11 @@ class JobWorker: f"最大并发: {settings.max_concurrent_jobs}" ) + # 启动超时任务回收器 + if settings.job_sweeper_enabled: + self._sweeper_task = asyncio.create_task(self._sweeper_loop()) + logger.info(f"超时任务回收器已启动,扫描间隔: {settings.job_sweeper_interval}s") + while self._running: try: await self._process_next_job() @@ -74,7 +100,7 @@ class JobWorker: await asyncio.sleep(settings.worker_poll_interval) return - # 从队列获取任务 + # 从队列获取任务(转移式出队) job_id = await self._job_manager.dequeue_job(timeout=int(settings.worker_poll_interval)) if not job_id: @@ -90,16 +116,23 @@ class JobWorker: logger.info(f"从队列获取任务: {job_id}") - # 尝试获取分布式锁 - if not await self._job_manager.acquire_job_lock(job_id): + # 尝试获取分布式锁(返回 token) + lock_token = await self._job_manager.acquire_job_lock(job_id) + if not lock_token: logger.warning(f"无法获取任务锁,任务可能正在被其他 Worker 执行: {job_id}") + # 任务留在 processing 队列,等待回收器处理 return + self._current_lock_token = lock_token + + # 启动锁续租协程 + self._lock_renewal_task = asyncio.create_task(self._lock_renewal_loop(job_id, lock_token)) + try: # 检查全局并发限制 if not await self._job_manager.can_execute(): - logger.info(f"达到并发限制,任务重新入队: {job_id}") - await self._job_manager.enqueue_job(job_id) + logger.info(f"达到并发限制,任务 NACK 重新入队: {job_id}") + await self._job_manager.nack_job(job_id, requeue=True) return # 增加并发计数 @@ -108,20 +141,39 @@ class JobWorker: try: # 执行任务 - await self._execute_with_retry(job_id) + success = await self._execute_with_retry(job_id) + if success: + await self._job_manager.ack_job(job_id) + else: + await self._job_manager.increment_job_retry(job_id) + await self._job_manager.nack_job(job_id, requeue=True) finally: # 减少并发计数 await self._job_manager.decrement_concurrency() self._current_job_id = None finally: - # 释放分布式锁 - await self._job_manager.release_job_lock(job_id) + # 停止锁续租 + if self._lock_renewal_task and not self._lock_renewal_task.done(): + self._lock_renewal_task.cancel() + try: + await self._lock_renewal_task + except asyncio.CancelledError: + pass + self._lock_renewal_task = None - async def _execute_with_retry(self, job_id: str) -> None: - """执行任务(带重试机制)""" + # 释放分布式锁 + await self._job_manager.release_job_lock(job_id, lock_token) + self._current_lock_token = None + + async def _execute_with_retry(self, job_id: str) -> bool: + """执行任务(带重试机制) + + Returns: + bool: 任务是否成功执行 + """ if not self._job_manager: - return + return False try: # 执行任务 @@ -129,12 +181,15 @@ class JobWorker: self._job_manager.execute_job(job_id), timeout=settings.job_execution_timeout, ) + return True except asyncio.TimeoutError: logger.error(f"任务执行超时: {job_id}") await self._handle_job_failure(job_id, "任务执行超时") + return False except Exception as e: logger.error(f"任务执行异常: {job_id}, error={e}", exc_info=True) await self._handle_job_failure(job_id, str(e)) + return False async def _handle_job_failure(self, job_id: str, error: str) -> None: """处理任务失败""" @@ -160,6 +215,62 @@ class JobWorker: }, ) + async def _lock_renewal_loop(self, job_id: str, lock_token: str) -> None: + """锁续租协程 + + 定期续租任务锁,防止长任务执行时锁过期。 + + Args: + job_id: 任务 ID + lock_token: 锁 token + """ + # 续租间隔为锁 TTL 的一半 + interval = (settings.job_execution_timeout + settings.job_lock_buffer) / 2 + while True: + try: + await asyncio.sleep(interval) + if not self._job_manager: + break + if not await self._job_manager.renew_job_lock(job_id, lock_token): + logger.error(f"锁续租失败,可能已被其他进程获取: {job_id}") + break + logger.debug(f"锁续租成功: {job_id}") + except asyncio.CancelledError: + logger.debug(f"锁续租协程已取消: {job_id}") + break + except Exception as e: + logger.error(f"锁续租异常: {job_id}, error={e}") + break + + async def _sweeper_loop(self) -> None: + """超时任务回收协程 + + 定期扫描处理中队列,回收超时任务,并收集队列监控指标。 + """ + while self._running: + try: + await asyncio.sleep(settings.job_sweeper_interval) + if not self._job_manager: + continue + + # 回收超时任务 + recovered = await self._job_manager.recover_stale_jobs() + if recovered > 0: + logger.info(f"回收超时任务: {recovered} 个") + # 记录回收指标 + from .core.metrics_unified import incr + + incr("job_recovered_total", None, recovered) + + # 收集队列监控指标 + await self._job_manager.collect_queue_metrics() + + except asyncio.CancelledError: + logger.debug("超时任务回收协程已取消") + break + except Exception as e: + logger.error(f"超时任务回收异常: {e}") + def setup_signal_handlers(worker: JobWorker, loop: asyncio.AbstractEventLoop) -> None: """设置信号处理器""" diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index e7374e5..86c1c83 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -536,17 +536,19 @@ class TestJobQueue: @pytest.mark.asyncio async def test_dequeue_job(self): - """测试任务出队""" + """测试任务出队(使用 BLMOVE)""" manager = JobManager() mock_redis = AsyncMock() - mock_redis.brpop = AsyncMock(return_value=("job:queue", "test-job-id")) + mock_redis.blmove = AsyncMock(return_value="test-job-id") + mock_redis.zadd = AsyncMock() manager._redis_client = mock_redis result = await manager.dequeue_job(timeout=5) assert result == "test-job-id" - mock_redis.brpop.assert_called_once() + mock_redis.blmove.assert_called_once() + mock_redis.zadd.assert_called_once() @pytest.mark.asyncio async def test_dequeue_job_timeout(self): @@ -554,7 +556,7 @@ class TestJobQueue: manager = JobManager() mock_redis = AsyncMock() - mock_redis.brpop = AsyncMock(return_value=None) + mock_redis.blmove = AsyncMock(return_value=None) manager._redis_client = mock_redis result = await manager.dequeue_job(timeout=1) @@ -585,7 +587,8 @@ class TestDistributedLock: result = await manager.acquire_job_lock("test-job-id") - assert result is True + assert result is not None # 返回 token + assert len(result) == 32 # 16 字节的十六进制字符串 mock_redis.set.assert_called_once() call_args = mock_redis.set.call_args assert call_args[0][0] == "job:lock:test-job-id" @@ -603,7 +606,7 @@ class TestDistributedLock: result = await manager.acquire_job_lock("test-job-id") - assert result is False + assert result is None @pytest.mark.asyncio async def test_release_job_lock(self): @@ -611,20 +614,20 @@ class TestDistributedLock: manager = JobManager() mock_redis = AsyncMock() - mock_redis.delete = AsyncMock(return_value=1) + mock_redis.eval = AsyncMock(return_value=1) manager._redis_client = mock_redis - result = await manager.release_job_lock("test-job-id") + result = await manager.release_job_lock("test-job-id", "valid-token") assert result is True - mock_redis.delete.assert_called_once_with("job:lock:test-job-id") + mock_redis.eval.assert_called_once() @pytest.mark.asyncio async def test_release_job_lock_without_redis(self): """测试 Redis 不可用时释放锁""" manager = JobManager() - result = await manager.release_job_lock("test-job-id") + result = await manager.release_job_lock("test-job-id", "token") assert result is False @@ -778,3 +781,390 @@ class TestJobRetry: assert result == 3 mock_redis.hincrby.assert_called_once_with("job:test-job-id", "retry_count", 1) + + +class TestTransferDequeue: + """测试转移式出队功能""" + + @pytest.mark.asyncio + async def test_dequeue_job_with_blmove(self): + """测试使用 BLMOVE 转移式出队""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.blmove = AsyncMock(return_value="test-job-id") + mock_redis.zadd = AsyncMock() + manager._redis_client = mock_redis + + result = await manager.dequeue_job(timeout=5) + + assert result == "test-job-id" + mock_redis.blmove.assert_called_once() + mock_redis.zadd.assert_called_once() + + @pytest.mark.asyncio + async def test_dequeue_job_timeout(self): + """测试出队超时""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.blmove = AsyncMock(return_value=None) + manager._redis_client = mock_redis + + result = await manager.dequeue_job(timeout=1) + + assert result is None + mock_redis.zadd.assert_not_called() + + +class TestTokenBasedLock: + """测试带 Token 的安全锁""" + + @pytest.mark.asyncio + async def test_acquire_job_lock_returns_token(self): + """测试获取锁返回 token""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.set = AsyncMock(return_value=True) + manager._redis_client = mock_redis + + result = await manager.acquire_job_lock("test-job-id") + + assert result is not None + assert len(result) == 32 # 16 字节的十六进制字符串 + mock_redis.set.assert_called_once() + call_args = mock_redis.set.call_args + assert call_args[0][0] == "job:lock:test-job-id" + assert call_args[1]["nx"] is True + + @pytest.mark.asyncio + async def test_acquire_job_lock_already_locked(self): + """测试获取已被锁定的任务锁""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.set = AsyncMock(return_value=None) + manager._redis_client = mock_redis + + result = await manager.acquire_job_lock("test-job-id") + + assert result is None + + @pytest.mark.asyncio + async def test_release_job_lock_with_token(self): + """测试使用 token 释放锁""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.eval = AsyncMock(return_value=1) + manager._redis_client = mock_redis + + result = await manager.release_job_lock("test-job-id", "valid-token") + + assert result is True + mock_redis.eval.assert_called_once() + + @pytest.mark.asyncio + async def test_release_job_lock_invalid_token(self): + """测试使用无效 token 释放锁""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.eval = AsyncMock(return_value=0) + manager._redis_client = mock_redis + + result = await manager.release_job_lock("test-job-id", "invalid-token") + + assert result is False + + @pytest.mark.asyncio + async def test_release_job_lock_without_token(self): + """测试不使用 token 释放锁(向后兼容)""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.delete = AsyncMock() + manager._redis_client = mock_redis + + result = await manager.release_job_lock("test-job-id") + + assert result is True + mock_redis.delete.assert_called_once_with("job:lock:test-job-id") + + +class TestAckNack: + """测试 ACK/NACK 机制""" + + @pytest.mark.asyncio + async def test_ack_job(self): + """测试确认任务完成""" + manager = JobManager() + + mock_pipe = AsyncMock() + mock_pipe.lrem = MagicMock() + mock_pipe.zrem = MagicMock() + mock_pipe.execute = AsyncMock() + mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe) + mock_pipe.__aexit__ = AsyncMock() + + mock_redis = AsyncMock() + mock_redis.pipeline = MagicMock(return_value=mock_pipe) + manager._redis_client = mock_redis + + result = await manager.ack_job("test-job-id") + + assert result is True + mock_pipe.lrem.assert_called_once() + mock_pipe.zrem.assert_called_once() + + @pytest.mark.asyncio + async def test_nack_job_requeue(self): + """测试拒绝任务并重新入队""" + manager = JobManager() + + mock_pipe = AsyncMock() + mock_pipe.lrem = MagicMock() + mock_pipe.zrem = MagicMock() + mock_pipe.lpush = MagicMock() + mock_pipe.execute = AsyncMock() + mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe) + mock_pipe.__aexit__ = AsyncMock() + + mock_redis = AsyncMock() + mock_redis.pipeline = MagicMock(return_value=mock_pipe) + mock_redis.hget = AsyncMock(return_value="0") # retry_count = 0 + manager._redis_client = mock_redis + + result = await manager.nack_job("test-job-id", requeue=True) + + assert result is True + assert mock_pipe.lpush.call_count == 1 + + @pytest.mark.asyncio + async def test_nack_job_to_dlq(self): + """测试拒绝任务进入死信队列""" + manager = JobManager() + + mock_pipe = AsyncMock() + mock_pipe.lrem = MagicMock() + mock_pipe.zrem = MagicMock() + mock_pipe.lpush = MagicMock() + mock_pipe.execute = AsyncMock() + mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe) + mock_pipe.__aexit__ = AsyncMock() + + mock_redis = AsyncMock() + mock_redis.pipeline = MagicMock(return_value=mock_pipe) + mock_redis.hget = AsyncMock(return_value="5") # retry_count > max_retries + manager._redis_client = mock_redis + + with patch("functional_scaffold.core.job_manager.settings") as mock_settings: + mock_settings.job_max_retries = 3 + mock_settings.job_processing_key = "job:processing" + mock_settings.job_processing_ts_key = "job:processing:ts" + mock_settings.job_dlq_key = "job:dlq" + mock_settings.job_queue_key = "job:queue" + + result = await manager.nack_job("test-job-id", requeue=True) + + assert result is True + + +class TestLockRenewal: + """测试锁续租功能""" + + @pytest.mark.asyncio + async def test_renew_job_lock_success(self): + """测试锁续租成功""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.eval = AsyncMock(return_value=1) + manager._redis_client = mock_redis + + result = await manager.renew_job_lock("test-job-id", "valid-token") + + assert result is True + mock_redis.eval.assert_called_once() + + @pytest.mark.asyncio + async def test_renew_job_lock_invalid_token(self): + """测试锁续租失败(token 不匹配)""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.eval = AsyncMock(return_value=0) + manager._redis_client = mock_redis + + result = await manager.renew_job_lock("test-job-id", "invalid-token") + + assert result is False + + @pytest.mark.asyncio + async def test_renew_job_lock_without_redis(self): + """测试 Redis 不可用时续租""" + manager = JobManager() + + result = await manager.renew_job_lock("test-job-id", "token") + + assert result is False + + +class TestStaleJobRecovery: + """测试超时任务回收功能""" + + @pytest.mark.asyncio + async def test_recover_stale_jobs_empty(self): + """测试没有超时任务时的回收""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.zrangebyscore = AsyncMock(return_value=[]) + manager._redis_client = mock_redis + + result = await manager.recover_stale_jobs() + + assert result == 0 + + @pytest.mark.asyncio + async def test_recover_stale_jobs_requeue(self): + """测试回收超时任务并重新入队""" + manager = JobManager() + + mock_pipe = AsyncMock() + mock_pipe.lrem = MagicMock() + mock_pipe.zrem = MagicMock() + mock_pipe.lpush = MagicMock() + mock_pipe.execute = AsyncMock() + mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe) + mock_pipe.__aexit__ = AsyncMock() + + mock_redis = AsyncMock() + mock_redis.zrangebyscore = AsyncMock(return_value=["stale-job-1", "stale-job-2"]) + mock_redis.hincrby = AsyncMock() + mock_redis.hget = AsyncMock(return_value="1") # retry_count = 1 + mock_redis.pipeline = MagicMock(return_value=mock_pipe) + manager._redis_client = mock_redis + + with patch("functional_scaffold.core.job_manager.settings") as mock_settings: + mock_settings.job_execution_timeout = 300 + mock_settings.job_lock_buffer = 60 + mock_settings.job_max_retries = 3 + mock_settings.job_processing_key = "job:processing" + mock_settings.job_processing_ts_key = "job:processing:ts" + mock_settings.job_dlq_key = "job:dlq" + mock_settings.job_queue_key = "job:queue" + + result = await manager.recover_stale_jobs() + + assert result == 2 + + @pytest.mark.asyncio + async def test_recover_stale_jobs_to_dlq(self): + """测试回收超时任务进入死信队列""" + manager = JobManager() + + mock_pipe = AsyncMock() + mock_pipe.lrem = MagicMock() + mock_pipe.zrem = MagicMock() + mock_pipe.lpush = MagicMock() + mock_pipe.execute = AsyncMock() + mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe) + mock_pipe.__aexit__ = AsyncMock() + + mock_redis = AsyncMock() + mock_redis.zrangebyscore = AsyncMock(return_value=["stale-job-1"]) + mock_redis.hincrby = AsyncMock() + mock_redis.hget = AsyncMock(return_value="5") # retry_count > max_retries + mock_redis.pipeline = MagicMock(return_value=mock_pipe) + manager._redis_client = mock_redis + + with patch("functional_scaffold.core.job_manager.settings") as mock_settings: + mock_settings.job_execution_timeout = 300 + mock_settings.job_lock_buffer = 60 + mock_settings.job_max_retries = 3 + mock_settings.job_processing_key = "job:processing" + mock_settings.job_processing_ts_key = "job:processing:ts" + mock_settings.job_dlq_key = "job:dlq" + mock_settings.job_queue_key = "job:queue" + + result = await manager.recover_stale_jobs() + + assert result == 1 + + @pytest.mark.asyncio + async def test_recover_stale_jobs_without_redis(self): + """测试 Redis 不可用时回收""" + manager = JobManager() + + result = await manager.recover_stale_jobs() + + assert result == 0 + + +class TestQueueMetrics: + """测试队列监控指标收集""" + + @pytest.mark.asyncio + async def test_collect_queue_metrics(self): + """测试收集队列指标""" + manager = JobManager() + + mock_pipe = AsyncMock() + mock_pipe.llen = MagicMock() + mock_pipe.zrange = MagicMock() + mock_pipe.execute = AsyncMock(return_value=[5, 2, 1, [("job-1", 1000.0)]]) + mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe) + mock_pipe.__aexit__ = AsyncMock() + + mock_redis = AsyncMock() + mock_redis.pipeline = MagicMock(return_value=mock_pipe) + manager._redis_client = mock_redis + + with patch("functional_scaffold.core.job_manager.time") as mock_time: + mock_time.time.return_value = 1060.0 # 60 秒后 + + with patch("functional_scaffold.core.job_manager.set") as mock_set: + result = await manager.collect_queue_metrics() + + assert result["queue_length"] == 5 + assert result["processing_length"] == 2 + assert result["dlq_length"] == 1 + assert result["oldest_waiting_seconds"] == 60.0 + + @pytest.mark.asyncio + async def test_collect_queue_metrics_empty(self): + """测试空队列时收集指标""" + manager = JobManager() + + mock_pipe = AsyncMock() + mock_pipe.llen = MagicMock() + mock_pipe.zrange = MagicMock() + mock_pipe.execute = AsyncMock(return_value=[0, 0, 0, []]) + mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe) + mock_pipe.__aexit__ = AsyncMock() + + mock_redis = AsyncMock() + mock_redis.pipeline = MagicMock(return_value=mock_pipe) + manager._redis_client = mock_redis + + with patch("functional_scaffold.core.job_manager.set"): + result = await manager.collect_queue_metrics() + + assert result["queue_length"] == 0 + assert result["processing_length"] == 0 + assert result["dlq_length"] == 0 + assert result["oldest_waiting_seconds"] == 0 + + @pytest.mark.asyncio + async def test_collect_queue_metrics_without_redis(self): + """测试 Redis 不可用时收集指标""" + manager = JobManager() + + result = await manager.collect_queue_metrics() + + assert result["queue_length"] == 0 + assert result["processing_length"] == 0 + assert result["dlq_length"] == 0 + assert result["oldest_waiting_seconds"] == 0