From 265e8d1e3dea84120a656d030fc51cc38360dd30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roog=20=28=E9=A1=BE=E6=96=B0=E5=9F=B9=29?= Date: Tue, 3 Feb 2026 13:29:32 +0800 Subject: [PATCH] =?UTF-8?q?main:=E6=94=AF=E6=8C=81=20Worker=20=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E8=BF=90=E8=A1=8C=E5=B9=B6=E4=BC=98=E5=8C=96=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 变更内容: - 在 `Dockerfile` 和 `docker-compose.yml` 中添加 Worker 模式支持,包含运行模式 `RUN_MODE` 的配置。 - 更新 API 路由,改为将任务入队处理,并由 Worker 执行。 - 在 JobManager 中新增任务队列及分布式锁功能,支持任务的入队、出队、执行控制以及重试机制。 - 添加全局并发控制逻辑,避免任务超额运行。 - 扩展单元测试,覆盖任务队列、锁机制和并发控制的各类场景。 - 在 Serverless 配置中分别为 API 和 Worker 添加独立服务定义。 提升任务调度灵活性,增强系统可靠性与扩展性。 --- deployment/Dockerfile | 16 +- deployment/docker-compose.yml | 33 +++ deployment/serverless/aliyun-fc.yaml | 36 ++- src/functional_scaffold/api/routes.py | 7 +- src/functional_scaffold/config.py | 8 + src/functional_scaffold/core/job_manager.py | 189 ++++++++++++ tests/test_job_manager.py | 304 +++++++++++++++++++- 7 files changed, 572 insertions(+), 21 deletions(-) diff --git a/deployment/Dockerfile b/deployment/Dockerfile index 244aa5b..911fd88 100644 --- a/deployment/Dockerfile +++ b/deployment/Dockerfile @@ -30,9 +30,15 @@ USER appuser # 暴露端口 EXPOSE 8000 -# 健康检查 -HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ - CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')" +# 运行模式:api(默认)或 worker +ENV RUN_MODE=api -# 启动命令 -CMD ["uvicorn", "functional_scaffold.main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file +# 健康检查(仅对 API 模式有效) +HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ + CMD if [ "$RUN_MODE" = "api" ]; then python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')"; else exit 0; fi + +# 启动脚本 +COPY --chown=appuser:appuser deployment/entrypoint.sh /app/entrypoint.sh +RUN chmod +x /app/entrypoint.sh + +CMD ["/app/entrypoint.sh"] \ No newline at end of file diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml index a09ba3a..f4f4d02 100644 --- a/deployment/docker-compose.yml +++ b/deployment/docker-compose.yml @@ -11,6 +11,7 @@ services: - APP_ENV=development - LOG_LEVEL=INFO - METRICS_ENABLED=true + - RUN_MODE=api # Redis 指标存储配置 - REDIS_HOST=redis - REDIS_PORT=6379 @@ -38,6 +39,38 @@ services: retries: 3 start_period: 5s + # Worker 服务 - 处理异步任务 + worker: + build: + context: .. + dockerfile: deployment/Dockerfile + environment: + - APP_ENV=development + - LOG_LEVEL=INFO + - METRICS_ENABLED=true + - RUN_MODE=worker + # Redis 配置 + - REDIS_HOST=redis + - REDIS_PORT=6379 + - REDIS_DB=0 + # Worker 配置 + - WORKER_POLL_INTERVAL=1.0 + - MAX_CONCURRENT_JOBS=10 + - JOB_MAX_RETRIES=3 + - JOB_EXECUTION_TIMEOUT=300 + volumes: + - ../src:/app/src + - ../config:/app/config + labels: + logging: "promtail" + logging_jobname: "functional-scaffold-worker" + restart: unless-stopped + depends_on: + redis: + condition: service_healthy + deploy: + replicas: 2 + # Redis - 用于集中式指标存储 redis: image: redis:7-alpine diff --git a/deployment/serverless/aliyun-fc.yaml b/deployment/serverless/aliyun-fc.yaml index 5ad0028..c87c1dc 100644 --- a/deployment/serverless/aliyun-fc.yaml +++ b/deployment/serverless/aliyun-fc.yaml @@ -17,7 +17,7 @@ Resources: prime-checker: Type: 'Aliyun::Serverless::Function' Properties: - Description: '质数判断算法服务' + Description: '质数判断算法服务(API)' Runtime: custom-container MemorySize: 512 Timeout: 60 @@ -25,11 +25,14 @@ Resources: CAPort: 8000 CustomContainerConfig: Image: 'registry.cn-hangzhou.aliyuncs.com/your-namespace/functional-scaffold:latest' - Command: '["uvicorn", "functional_scaffold.main:app", "--host", "0.0.0.0", "--port", "8000"]' + Command: '["/app/entrypoint.sh"]' EnvironmentVariables: APP_ENV: production LOG_LEVEL: INFO METRICS_ENABLED: 'true' + RUN_MODE: api + REDIS_HOST: 'r-xxxxx.redis.rds.aliyuncs.com' + REDIS_PORT: '6379' Events: httpTrigger: Type: HTTP @@ -38,3 +41,32 @@ Resources: Methods: - GET - POST + job-worker: + Type: 'Aliyun::Serverless::Function' + Properties: + Description: '异步任务 Worker' + Runtime: custom-container + MemorySize: 512 + Timeout: 900 + InstanceConcurrency: 1 + CustomContainerConfig: + Image: 'registry.cn-hangzhou.aliyuncs.com/your-namespace/functional-scaffold:latest' + Command: '["/app/entrypoint.sh"]' + EnvironmentVariables: + APP_ENV: production + LOG_LEVEL: INFO + METRICS_ENABLED: 'true' + RUN_MODE: worker + REDIS_HOST: 'r-xxxxx.redis.rds.aliyuncs.com' + REDIS_PORT: '6379' + WORKER_POLL_INTERVAL: '1.0' + MAX_CONCURRENT_JOBS: '5' + JOB_MAX_RETRIES: '3' + JOB_EXECUTION_TIMEOUT: '300' + Events: + timerTrigger: + Type: Timer + Properties: + CronExpression: '0 */1 * * * *' + Enable: true + Payload: '{}' diff --git a/src/functional_scaffold/api/routes.py b/src/functional_scaffold/api/routes.py index 980828c..de50f48 100644 --- a/src/functional_scaffold/api/routes.py +++ b/src/functional_scaffold/api/routes.py @@ -1,6 +1,5 @@ """API 路由""" -import asyncio from fastapi import APIRouter, HTTPException, Depends, status import time import logging @@ -200,10 +199,10 @@ async def create_job( # 获取任务信息 job_data = await job_manager.get_job(job_id) - # 后台执行任务 - asyncio.create_task(job_manager.execute_job(job_id)) + # 任务入队,由 Worker 执行 + await job_manager.enqueue_job(job_id) - logger.info(f"异步任务已创建: job_id={job_id}, request_id={request_id}") + logger.info(f"异步任务已创建并入队: job_id={job_id}, request_id={request_id}") return JobCreateResponse( job_id=job_id, diff --git a/src/functional_scaffold/config.py b/src/functional_scaffold/config.py index f1c19f5..7b782d7 100644 --- a/src/functional_scaffold/config.py +++ b/src/functional_scaffold/config.py @@ -57,6 +57,14 @@ class Settings(BaseSettings): webhook_timeout: int = 10 # Webhook 超时时间(秒) max_concurrent_jobs: int = 10 # 最大并发任务数 + # Worker 配置 + worker_poll_interval: float = 1.0 # Worker 轮询间隔(秒) + job_queue_key: str = "job:queue" # 任务队列 Redis Key + job_concurrency_key: str = "job:concurrency" # 全局并发计数器 Redis Key + job_lock_ttl: int = 300 # 任务锁 TTL(秒) + job_max_retries: int = 3 # 任务最大重试次数 + job_execution_timeout: int = 300 # 任务执行超时(秒) + # 全局配置实例 settings = Settings() diff --git a/src/functional_scaffold/core/job_manager.py b/src/functional_scaffold/core/job_manager.py index fcb86fd..68581a1 100644 --- a/src/functional_scaffold/core/job_manager.py +++ b/src/functional_scaffold/core/job_manager.py @@ -372,6 +372,195 @@ class JobManager: """检查任务管理器是否可用""" return self._redis_client is not None + async def enqueue_job(self, job_id: str) -> bool: + """将任务加入队列 + + Args: + job_id: 任务 ID + + Returns: + bool: 是否成功入队 + """ + if not self._redis_client: + logger.error(f"Redis 不可用,无法入队任务: {job_id}") + return False + + try: + await self._redis_client.lpush(settings.job_queue_key, job_id) + logger.info(f"任务已入队: job_id={job_id}") + return True + except Exception as e: + logger.error(f"任务入队失败: job_id={job_id}, error={e}") + return False + + async def dequeue_job(self, timeout: int = 5) -> Optional[str]: + """从队列获取任务(阻塞式) + + Args: + timeout: 阻塞超时时间(秒) + + Returns: + Optional[str]: 任务 ID,超时返回 None + """ + if not self._redis_client: + return None + + try: + result = await self._redis_client.brpop(settings.job_queue_key, timeout=timeout) + if result: + # brpop 返回 (key, value) 元组 + return result[1] + return None + except Exception as e: + logger.error(f"任务出队失败: error={e}") + return None + + async def acquire_job_lock(self, job_id: str) -> bool: + """获取任务执行锁(分布式锁) + + Args: + job_id: 任务 ID + + Returns: + bool: 是否成功获取锁 + """ + if not self._redis_client: + return False + + lock_key = f"job:lock:{job_id}" + try: + acquired = await self._redis_client.set( + lock_key, "locked", nx=True, ex=settings.job_lock_ttl + ) + if acquired: + logger.debug(f"获取任务锁成功: job_id={job_id}") + return acquired is not None + except Exception as e: + logger.error(f"获取任务锁失败: job_id={job_id}, error={e}") + return False + + async def release_job_lock(self, job_id: str) -> bool: + """释放任务执行锁 + + Args: + job_id: 任务 ID + + Returns: + bool: 是否成功释放锁 + """ + if not self._redis_client: + return False + + lock_key = f"job:lock:{job_id}" + try: + await self._redis_client.delete(lock_key) + logger.debug(f"释放任务锁成功: job_id={job_id}") + return True + except Exception as e: + logger.error(f"释放任务锁失败: job_id={job_id}, error={e}") + return False + + async def increment_concurrency(self) -> int: + """增加全局并发计数 + + Returns: + int: 增加后的并发数 + """ + if not self._redis_client: + return 0 + + try: + count = await self._redis_client.incr(settings.job_concurrency_key) + return count + except Exception as e: + logger.error(f"增加并发计数失败: error={e}") + return 0 + + async def decrement_concurrency(self) -> int: + """减少全局并发计数 + + Returns: + int: 减少后的并发数 + """ + if not self._redis_client: + return 0 + + try: + count = await self._redis_client.decr(settings.job_concurrency_key) + # 防止计数变为负数 + if count < 0: + await self._redis_client.set(settings.job_concurrency_key, 0) + return 0 + return count + except Exception as e: + logger.error(f"减少并发计数失败: error={e}") + return 0 + + async def get_global_concurrency(self) -> int: + """获取当前全局并发数 + + Returns: + int: 当前并发数 + """ + if not self._redis_client: + return 0 + + try: + count = await self._redis_client.get(settings.job_concurrency_key) + return int(count) if count else 0 + except Exception as e: + logger.error(f"获取并发计数失败: error={e}") + return 0 + + async def can_execute(self) -> bool: + """检查是否可以执行新任务(全局并发控制) + + Returns: + bool: 是否可以执行 + """ + current = await self.get_global_concurrency() + return current < settings.max_concurrent_jobs + + async def get_job_retry_count(self, job_id: str) -> int: + """获取任务重试次数 + + Args: + job_id: 任务 ID + + Returns: + int: 重试次数 + """ + if not self._redis_client: + return 0 + + key = f"job:{job_id}" + try: + retry_count = await self._redis_client.hget(key, "retry_count") + return int(retry_count) if retry_count else 0 + except Exception: + return 0 + + async def increment_job_retry(self, job_id: str) -> int: + """增加任务重试次数 + + Args: + job_id: 任务 ID + + Returns: + int: 增加后的重试次数 + """ + if not self._redis_client: + return 0 + + key = f"job:{job_id}" + try: + await self._redis_client.hincrby(key, "retry_count", 1) + retry_count = await self._redis_client.hget(key, "retry_count") + return int(retry_count) if retry_count else 1 + except Exception as e: + logger.error(f"增加重试次数失败: job_id={job_id}, error={e}") + return 0 + def get_concurrency_status(self) -> Dict[str, int]: """获取并发状态 diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 6b2bbff..2423963 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -217,7 +217,7 @@ class TestJobsAPI: "created_at": "2026-02-02T10:00:00+00:00", } ) - mock_manager.execute_job = AsyncMock() + mock_manager.enqueue_job = AsyncMock(return_value=True) mock_get_manager.return_value = mock_manager response = client.post( @@ -486,14 +486,298 @@ class TestConcurrencyControl: def test_concurrency_status_api(self, client): """测试并发状态 API 端点""" - response = client.get("/jobs/concurrency/status") + with patch( + "functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock + ) as mock_get_manager: + mock_manager = MagicMock() + mock_manager.is_available.return_value = True + mock_manager.get_concurrency_status.return_value = { + "max_concurrent": 10, + "available_slots": 8, + "running_jobs": 2, + } + mock_get_manager.return_value = mock_manager - assert response.status_code == status.HTTP_200_OK - data = response.json() + response = client.get("/jobs/concurrency/status") - assert "max_concurrent" in data - assert "available_slots" in data - assert "running_jobs" in data - assert isinstance(data["max_concurrent"], int) - assert isinstance(data["available_slots"], int) - assert isinstance(data["running_jobs"], int) + assert response.status_code == status.HTTP_200_OK + data = response.json() + + assert "max_concurrent" in data + assert "available_slots" in data + assert "running_jobs" in data + assert isinstance(data["max_concurrent"], int) + assert isinstance(data["available_slots"], int) + assert isinstance(data["running_jobs"], int) + + +class TestJobQueue: + """测试任务队列功能""" + + @pytest.mark.asyncio + async def test_enqueue_job(self): + """测试任务入队""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.lpush = AsyncMock(return_value=1) + manager._redis_client = mock_redis + + result = await manager.enqueue_job("test-job-id") + + assert result is True + mock_redis.lpush.assert_called_once() + + @pytest.mark.asyncio + async def test_enqueue_job_without_redis(self): + """测试 Redis 不可用时入队""" + manager = JobManager() + + result = await manager.enqueue_job("test-job-id") + + assert result is False + + @pytest.mark.asyncio + async def test_dequeue_job(self): + """测试任务出队""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.brpop = AsyncMock(return_value=("job:queue", "test-job-id")) + manager._redis_client = mock_redis + + result = await manager.dequeue_job(timeout=5) + + assert result == "test-job-id" + mock_redis.brpop.assert_called_once() + + @pytest.mark.asyncio + async def test_dequeue_job_timeout(self): + """测试任务出队超时""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.brpop = AsyncMock(return_value=None) + manager._redis_client = mock_redis + + result = await manager.dequeue_job(timeout=1) + + assert result is None + + @pytest.mark.asyncio + async def test_dequeue_job_without_redis(self): + """测试 Redis 不可用时出队""" + manager = JobManager() + + result = await manager.dequeue_job(timeout=1) + + assert result is None + + +class TestDistributedLock: + """测试分布式锁功能""" + + @pytest.mark.asyncio + async def test_acquire_job_lock(self): + """测试获取任务锁""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.set = AsyncMock(return_value=True) + manager._redis_client = mock_redis + + result = await manager.acquire_job_lock("test-job-id") + + assert result is True + mock_redis.set.assert_called_once() + call_args = mock_redis.set.call_args + assert call_args[0][0] == "job:lock:test-job-id" + assert call_args[1]["nx"] is True + assert "ex" in call_args[1] + + @pytest.mark.asyncio + async def test_acquire_job_lock_already_locked(self): + """测试获取已被锁定的任务锁""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.set = AsyncMock(return_value=None) # 锁已存在 + manager._redis_client = mock_redis + + result = await manager.acquire_job_lock("test-job-id") + + assert result is False + + @pytest.mark.asyncio + async def test_release_job_lock(self): + """测试释放任务锁""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.delete = AsyncMock(return_value=1) + manager._redis_client = mock_redis + + result = await manager.release_job_lock("test-job-id") + + assert result is True + mock_redis.delete.assert_called_once_with("job:lock:test-job-id") + + @pytest.mark.asyncio + async def test_release_job_lock_without_redis(self): + """测试 Redis 不可用时释放锁""" + manager = JobManager() + + result = await manager.release_job_lock("test-job-id") + + assert result is False + + +class TestGlobalConcurrency: + """测试全局并发控制功能""" + + @pytest.mark.asyncio + async def test_increment_concurrency(self): + """测试增加并发计数""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.incr = AsyncMock(return_value=5) + manager._redis_client = mock_redis + + result = await manager.increment_concurrency() + + assert result == 5 + mock_redis.incr.assert_called_once() + + @pytest.mark.asyncio + async def test_decrement_concurrency(self): + """测试减少并发计数""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.decr = AsyncMock(return_value=4) + manager._redis_client = mock_redis + + result = await manager.decrement_concurrency() + + assert result == 4 + mock_redis.decr.assert_called_once() + + @pytest.mark.asyncio + async def test_decrement_concurrency_prevent_negative(self): + """测试防止并发计数变为负数""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.decr = AsyncMock(return_value=-1) + mock_redis.set = AsyncMock() + manager._redis_client = mock_redis + + result = await manager.decrement_concurrency() + + assert result == 0 + mock_redis.set.assert_called_once() + + @pytest.mark.asyncio + async def test_get_global_concurrency(self): + """测试获取全局并发数""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value="7") + manager._redis_client = mock_redis + + result = await manager.get_global_concurrency() + + assert result == 7 + + @pytest.mark.asyncio + async def test_get_global_concurrency_empty(self): + """测试获取空的全局并发数""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value=None) + manager._redis_client = mock_redis + + result = await manager.get_global_concurrency() + + assert result == 0 + + @pytest.mark.asyncio + async def test_can_execute(self): + """测试检查是否可执行""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value="5") + manager._redis_client = mock_redis + + with patch("functional_scaffold.core.job_manager.settings") as mock_settings: + mock_settings.max_concurrent_jobs = 10 + + result = await manager.can_execute() + + assert result is True + + @pytest.mark.asyncio + async def test_can_execute_at_limit(self): + """测试达到并发限制时""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value="10") + manager._redis_client = mock_redis + + with patch("functional_scaffold.core.job_manager.settings") as mock_settings: + mock_settings.max_concurrent_jobs = 10 + + result = await manager.can_execute() + + assert result is False + + +class TestJobRetry: + """测试任务重试功能""" + + @pytest.mark.asyncio + async def test_get_job_retry_count(self): + """测试获取任务重试次数""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.hget = AsyncMock(return_value="2") + manager._redis_client = mock_redis + + result = await manager.get_job_retry_count("test-job-id") + + assert result == 2 + mock_redis.hget.assert_called_once_with("job:test-job-id", "retry_count") + + @pytest.mark.asyncio + async def test_get_job_retry_count_empty(self): + """测试获取空的重试次数""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.hget = AsyncMock(return_value=None) + manager._redis_client = mock_redis + + result = await manager.get_job_retry_count("test-job-id") + + assert result == 0 + + @pytest.mark.asyncio + async def test_increment_job_retry(self): + """测试增加任务重试次数""" + manager = JobManager() + + mock_redis = AsyncMock() + mock_redis.hincrby = AsyncMock() + mock_redis.hget = AsyncMock(return_value="3") + manager._redis_client = mock_redis + + result = await manager.increment_job_retry("test-job-id") + + assert result == 3 + mock_redis.hincrby.assert_called_once_with("job:test-job-id", "retry_count", 1)