main:新增并发控制功能
变更内容: - 增加 `max_concurrent_jobs` 配置项,支持设置最大并发任务数。 - 为 `JobManager` 添加信号量控制实现任务并发限制。 - 新增获取任务并发状态的接口 `/jobs/concurrency/status`。 - 编写并发控制功能相关的测试用
This commit is contained in:
@@ -186,6 +186,10 @@ class TestJobManagerWithMocks:
|
||||
manager._redis_client = mock_redis
|
||||
manager._register_algorithms()
|
||||
|
||||
# 初始化 semaphore
|
||||
import asyncio
|
||||
manager._semaphore = asyncio.Semaphore(10)
|
||||
|
||||
await manager.execute_job("test-job-id")
|
||||
|
||||
# 验证状态更新被调用
|
||||
@@ -399,3 +403,97 @@ class TestWebhook:
|
||||
|
||||
# 验证重试次数
|
||||
assert mock_http.post.call_count == 2
|
||||
|
||||
|
||||
class TestConcurrencyControl:
|
||||
"""测试并发控制功能"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_concurrency_status(self):
|
||||
"""测试获取并发状态"""
|
||||
manager = JobManager()
|
||||
|
||||
# 初始化 semaphore
|
||||
manager._max_concurrent_jobs = 10
|
||||
manager._semaphore = asyncio.Semaphore(10)
|
||||
|
||||
status = manager.get_concurrency_status()
|
||||
|
||||
assert status["max_concurrent"] == 10
|
||||
assert status["available_slots"] == 10
|
||||
assert status["running_jobs"] == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_concurrency_status_without_semaphore(self):
|
||||
"""测试未初始化 semaphore 时获取并发状态"""
|
||||
manager = JobManager()
|
||||
|
||||
status = manager.get_concurrency_status()
|
||||
|
||||
assert status["max_concurrent"] == 0
|
||||
assert status["available_slots"] == 0
|
||||
assert status["running_jobs"] == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrency_limit(self):
|
||||
"""测试并发限制是否生效"""
|
||||
manager = JobManager()
|
||||
|
||||
# 设置较小的并发限制
|
||||
manager._max_concurrent_jobs = 2
|
||||
manager._semaphore = asyncio.Semaphore(2)
|
||||
|
||||
# 模拟 Redis
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.hgetall = AsyncMock(
|
||||
return_value={
|
||||
"status": "pending",
|
||||
"algorithm": "PrimeChecker",
|
||||
"params": '{"number": 17}',
|
||||
"webhook": "",
|
||||
"request_id": "test-request-id",
|
||||
"created_at": "2026-02-02T10:00:00+00:00",
|
||||
}
|
||||
)
|
||||
mock_redis.hset = AsyncMock()
|
||||
mock_redis.expire = AsyncMock()
|
||||
manager._redis_client = mock_redis
|
||||
manager._register_algorithms()
|
||||
|
||||
# 创建一个慢速任务
|
||||
async def slow_execute():
|
||||
async with manager._semaphore:
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# 启动 3 个任务
|
||||
tasks = [asyncio.create_task(slow_execute()) for _ in range(3)]
|
||||
|
||||
# 等待一小段时间,让前两个任务获取 semaphore
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
# 检查并发状态
|
||||
status = manager.get_concurrency_status()
|
||||
assert status["running_jobs"] == 2 # 只有 2 个任务在运行
|
||||
assert status["available_slots"] == 0 # 没有可用槽位
|
||||
|
||||
# 等待所有任务完成
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
# 检查最终状态
|
||||
status = manager.get_concurrency_status()
|
||||
assert status["running_jobs"] == 0
|
||||
assert status["available_slots"] == 2
|
||||
|
||||
def test_concurrency_status_api(self, client):
|
||||
"""测试并发状态 API 端点"""
|
||||
response = client.get("/jobs/concurrency/status")
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
data = response.json()
|
||||
|
||||
assert "max_concurrent" in data
|
||||
assert "available_slots" in data
|
||||
assert "running_jobs" in data
|
||||
assert isinstance(data["max_concurrent"], int)
|
||||
assert isinstance(data["available_slots"], int)
|
||||
assert isinstance(data["running_jobs"], int)
|
||||
|
||||
Reference in New Issue
Block a user