Files
FunctionalScaffold/tests/test_job_manager.py
Roog (顾新培) 7b627090f3 main:优化任务管理及队列监控性能
变更内容:
- 优化任务出队逻辑,采用 BLMOVE 提升队列操作的原子性和可靠性。
- 在 JobManager 中新增任务锁续租、超时任务回收、ACK/NACK 状态管理功能。
- 实现任务队列和死信队列监控指标收集,为系统性能分析提供数据支持。
- 扩展 Worker 模块,增加锁续租逻辑及任务回收调度。
- 更新测试用例,覆盖任务管理和队列指标的新增逻辑。
- 补充 metrics.yaml 文件,添加队列相关的监控指标定义。
- 更新依赖,补充 Redis 支持及相关库版本规范。
2026-02-03 18:18:02 +08:00

1171 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""异步任务管理器测试"""
import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi import status
from functional_scaffold.core.job_manager import (
JobManager,
)
class TestJobManager:
"""测试 JobManager 类"""
@pytest.fixture
def mock_redis(self):
"""模拟 Redis 客户端"""
mock = AsyncMock()
mock.ping = AsyncMock(return_value=True)
mock.hset = AsyncMock()
mock.hgetall = AsyncMock(return_value={})
mock.expire = AsyncMock()
mock.close = AsyncMock()
return mock
@pytest.fixture
def mock_http_client(self):
"""模拟 HTTP 客户端"""
mock = AsyncMock()
mock.post = AsyncMock()
mock.aclose = AsyncMock()
return mock
@pytest.mark.asyncio
async def test_generate_job_id(self):
"""测试任务 ID 生成"""
manager = JobManager()
job_id = manager._generate_job_id()
assert len(job_id) == 12
assert all(c in "0123456789abcdef" for c in job_id)
@pytest.mark.asyncio
async def test_get_timestamp(self):
"""测试时间戳生成"""
manager = JobManager()
timestamp = manager._get_timestamp()
assert "T" in timestamp
assert timestamp.endswith("+00:00") or timestamp.endswith("Z")
@pytest.mark.asyncio
async def test_get_available_algorithms(self):
"""测试获取可用算法列表"""
manager = JobManager()
manager._register_algorithms()
algorithms = manager.get_available_algorithms()
assert "PrimeChecker" in algorithms
@pytest.mark.asyncio
async def test_is_available_without_redis(self):
"""测试 Redis 不可用时的状态"""
manager = JobManager()
assert manager.is_available() is False
class TestJobManagerWithMocks:
"""使用 Mock 测试 JobManager"""
@pytest.mark.asyncio
async def test_create_job(self):
"""测试创建任务"""
manager = JobManager()
# 模拟 Redis
mock_redis = AsyncMock()
mock_redis.hset = AsyncMock()
manager._redis_client = mock_redis
manager._register_algorithms()
job_id = await manager.create_job(
algorithm="PrimeChecker",
params={"number": 17},
webhook="https://example.com/callback",
request_id="test-request-id",
)
assert len(job_id) == 12
mock_redis.hset.assert_called_once()
@pytest.mark.asyncio
async def test_create_job_invalid_algorithm(self):
"""测试创建任务时算法不存在"""
manager = JobManager()
mock_redis = AsyncMock()
manager._redis_client = mock_redis
manager._register_algorithms()
with pytest.raises(ValueError, match="不存在"):
await manager.create_job(
algorithm="NonExistentAlgorithm",
params={},
)
@pytest.mark.asyncio
async def test_create_job_redis_unavailable(self):
"""测试 Redis 不可用时创建任务"""
manager = JobManager()
manager._register_algorithms()
with pytest.raises(RuntimeError, match="Redis 不可用"):
await manager.create_job(
algorithm="PrimeChecker",
params={"number": 17},
)
@pytest.mark.asyncio
async def test_get_job(self):
"""测试获取任务信息"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"started_at": "2026-02-02T10:00:01+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": '{"number": 17, "is_prime": true}',
"error": "",
"metadata": '{"elapsed_time": 0.001}',
}
)
manager._redis_client = mock_redis
job_data = await manager.get_job("test-job-id")
assert job_data is not None
assert job_data["job_id"] == "test-job-id"
assert job_data["status"] == "completed"
assert job_data["algorithm"] == "PrimeChecker"
assert job_data["result"]["number"] == 17
assert job_data["result"]["is_prime"] is True
@pytest.mark.asyncio
async def test_get_job_not_found(self):
"""测试获取不存在的任务"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(return_value={})
manager._redis_client = mock_redis
job_data = await manager.get_job("non-existent-job")
assert job_data is None
@pytest.mark.asyncio
async def test_execute_job(self):
"""测试执行任务"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "pending",
"algorithm": "PrimeChecker",
"params": '{"number": 17}',
"webhook": "",
"request_id": "test-request-id",
"created_at": "2026-02-02T10:00:00+00:00",
}
)
mock_redis.hset = AsyncMock()
mock_redis.expire = AsyncMock()
manager._redis_client = mock_redis
manager._register_algorithms()
# 初始化 semaphore
import asyncio
manager._semaphore = asyncio.Semaphore(10)
await manager.execute_job("test-job-id")
# 验证状态更新被调用
assert mock_redis.hset.call_count >= 2 # running + completed
mock_redis.expire.assert_called_once()
class TestJobsAPI:
"""测试 /jobs API 端点"""
def test_create_job_success(self, client):
"""测试成功创建任务"""
with patch(
"functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = True
mock_manager.get_available_algorithms.return_value = ["PrimeChecker"]
mock_manager.create_job = AsyncMock(return_value="abc123def456")
mock_manager.get_job = AsyncMock(
return_value={
"job_id": "abc123def456",
"status": "pending",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
}
)
mock_manager.enqueue_job = AsyncMock(return_value=True)
mock_get_manager.return_value = mock_manager
response = client.post(
"/jobs",
json={
"algorithm": "PrimeChecker",
"params": {"number": 17},
},
)
assert response.status_code == status.HTTP_202_ACCEPTED
data = response.json()
assert data["job_id"] == "abc123def456"
assert data["status"] == "pending"
assert data["message"] == "任务已创建"
def test_create_job_algorithm_not_found(self, client):
"""测试创建任务时算法不存在"""
with patch(
"functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = True
mock_manager.get_available_algorithms.return_value = ["PrimeChecker"]
mock_get_manager.return_value = mock_manager
response = client.post(
"/jobs",
json={
"algorithm": "NonExistentAlgorithm",
"params": {},
},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
data = response.json()
assert data["detail"]["error"] == "ALGORITHM_NOT_FOUND"
def test_create_job_service_unavailable(self, client):
"""测试服务不可用时创建任务"""
with patch(
"functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = False
mock_get_manager.return_value = mock_manager
response = client.post(
"/jobs",
json={
"algorithm": "PrimeChecker",
"params": {"number": 17},
},
)
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
def test_get_job_status_success(self, client):
"""测试成功查询任务状态"""
with patch(
"functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = True
mock_manager.get_job = AsyncMock(
return_value={
"job_id": "abc123def456",
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"started_at": "2026-02-02T10:00:01+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": {"number": 17, "is_prime": True},
"error": None,
"metadata": {"elapsed_time": 0.001},
}
)
mock_get_manager.return_value = mock_manager
response = client.get("/jobs/abc123def456")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert data["job_id"] == "abc123def456"
assert data["status"] == "completed"
assert data["result"]["is_prime"] is True
def test_get_job_status_not_found(self, client):
"""测试查询不存在的任务"""
with patch(
"functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = True
mock_manager.get_job = AsyncMock(return_value=None)
mock_get_manager.return_value = mock_manager
response = client.get("/jobs/non-existent-job")
assert response.status_code == status.HTTP_404_NOT_FOUND
data = response.json()
assert data["detail"]["error"] == "JOB_NOT_FOUND"
def test_get_job_status_service_unavailable(self, client):
"""测试服务不可用时查询任务"""
with patch(
"functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = False
mock_get_manager.return_value = mock_manager
response = client.get("/jobs/abc123def456")
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
class TestWebhook:
"""测试 Webhook 回调"""
@pytest.mark.asyncio
async def test_send_webhook_success(self):
"""测试成功发送 Webhook"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": '{"number": 17, "is_prime": true}',
"error": "",
"metadata": '{"elapsed_time": 0.001}',
}
)
manager._redis_client = mock_redis
mock_response = MagicMock()
mock_response.status_code = 200
mock_http = AsyncMock()
mock_http.post = AsyncMock(return_value=mock_response)
manager._http_client = mock_http
await manager._send_webhook("test-job-id", "https://example.com/callback")
mock_http.post.assert_called_once()
call_args = mock_http.post.call_args
assert call_args[0][0] == "https://example.com/callback"
assert "json" in call_args[1]
@pytest.mark.asyncio
async def test_send_webhook_retry_on_failure(self):
"""测试 Webhook 失败时重试"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "completed",
"algorithm": "PrimeChecker",
"created_at": "2026-02-02T10:00:00+00:00",
"completed_at": "2026-02-02T10:00:02+00:00",
"result": "{}",
"error": "",
"metadata": "{}",
}
)
manager._redis_client = mock_redis
mock_http = AsyncMock()
mock_http.post = AsyncMock(side_effect=Exception("Connection error"))
manager._http_client = mock_http
# 使用较短的重试间隔进行测试
with patch("functional_scaffold.core.job_manager.settings") as mock_settings:
mock_settings.webhook_max_retries = 2
mock_settings.webhook_timeout = 1
await manager._send_webhook("test-job-id", "https://example.com/callback")
# 验证重试次数
assert mock_http.post.call_count == 2
class TestConcurrencyControl:
"""测试并发控制功能"""
@pytest.mark.asyncio
async def test_get_concurrency_status(self):
"""测试获取并发状态"""
manager = JobManager()
# 初始化 semaphore
manager._max_concurrent_jobs = 10
manager._semaphore = asyncio.Semaphore(10)
status = manager.get_concurrency_status()
assert status["max_concurrent"] == 10
assert status["available_slots"] == 10
assert status["running_jobs"] == 0
@pytest.mark.asyncio
async def test_get_concurrency_status_without_semaphore(self):
"""测试未初始化 semaphore 时获取并发状态"""
manager = JobManager()
status = manager.get_concurrency_status()
assert status["max_concurrent"] == 0
assert status["available_slots"] == 0
assert status["running_jobs"] == 0
@pytest.mark.asyncio
async def test_concurrency_limit(self):
"""测试并发限制是否生效"""
manager = JobManager()
# 设置较小的并发限制
manager._max_concurrent_jobs = 2
manager._semaphore = asyncio.Semaphore(2)
# 模拟 Redis
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "pending",
"algorithm": "PrimeChecker",
"params": '{"number": 17}',
"webhook": "",
"request_id": "test-request-id",
"created_at": "2026-02-02T10:00:00+00:00",
}
)
mock_redis.hset = AsyncMock()
mock_redis.expire = AsyncMock()
manager._redis_client = mock_redis
manager._register_algorithms()
# 创建一个慢速任务
async def slow_execute():
async with manager._semaphore:
await asyncio.sleep(0.1)
# 启动 3 个任务
tasks = [asyncio.create_task(slow_execute()) for _ in range(3)]
# 等待一小段时间,让前两个任务获取 semaphore
await asyncio.sleep(0.01)
# 检查并发状态
status = manager.get_concurrency_status()
assert status["running_jobs"] == 2 # 只有 2 个任务在运行
assert status["available_slots"] == 0 # 没有可用槽位
# 等待所有任务完成
await asyncio.gather(*tasks)
# 检查最终状态
status = manager.get_concurrency_status()
assert status["running_jobs"] == 0
assert status["available_slots"] == 2
def test_concurrency_status_api(self, client):
"""测试并发状态 API 端点"""
with patch(
"functional_scaffold.api.routes.get_job_manager", new_callable=AsyncMock
) as mock_get_manager:
mock_manager = MagicMock()
mock_manager.is_available.return_value = True
mock_manager.get_concurrency_status.return_value = {
"max_concurrent": 10,
"available_slots": 8,
"running_jobs": 2,
}
mock_get_manager.return_value = mock_manager
response = client.get("/jobs/concurrency/status")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "max_concurrent" in data
assert "available_slots" in data
assert "running_jobs" in data
assert isinstance(data["max_concurrent"], int)
assert isinstance(data["available_slots"], int)
assert isinstance(data["running_jobs"], int)
class TestJobQueue:
"""测试任务队列功能"""
@pytest.mark.asyncio
async def test_enqueue_job(self):
"""测试任务入队"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.lpush = AsyncMock(return_value=1)
manager._redis_client = mock_redis
result = await manager.enqueue_job("test-job-id")
assert result is True
mock_redis.lpush.assert_called_once()
@pytest.mark.asyncio
async def test_enqueue_job_without_redis(self):
"""测试 Redis 不可用时入队"""
manager = JobManager()
result = await manager.enqueue_job("test-job-id")
assert result is False
@pytest.mark.asyncio
async def test_dequeue_job(self):
"""测试任务出队(使用 BLMOVE"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.blmove = AsyncMock(return_value="test-job-id")
mock_redis.zadd = AsyncMock()
manager._redis_client = mock_redis
result = await manager.dequeue_job(timeout=5)
assert result == "test-job-id"
mock_redis.blmove.assert_called_once()
mock_redis.zadd.assert_called_once()
@pytest.mark.asyncio
async def test_dequeue_job_timeout(self):
"""测试任务出队超时"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.blmove = AsyncMock(return_value=None)
manager._redis_client = mock_redis
result = await manager.dequeue_job(timeout=1)
assert result is None
@pytest.mark.asyncio
async def test_dequeue_job_without_redis(self):
"""测试 Redis 不可用时出队"""
manager = JobManager()
result = await manager.dequeue_job(timeout=1)
assert result is None
class TestDistributedLock:
"""测试分布式锁功能"""
@pytest.mark.asyncio
async def test_acquire_job_lock(self):
"""测试获取任务锁"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.set = AsyncMock(return_value=True)
manager._redis_client = mock_redis
result = await manager.acquire_job_lock("test-job-id")
assert result is not None # 返回 token
assert len(result) == 32 # 16 字节的十六进制字符串
mock_redis.set.assert_called_once()
call_args = mock_redis.set.call_args
assert call_args[0][0] == "job:lock:test-job-id"
assert call_args[1]["nx"] is True
assert "ex" in call_args[1]
@pytest.mark.asyncio
async def test_acquire_job_lock_already_locked(self):
"""测试获取已被锁定的任务锁"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.set = AsyncMock(return_value=None) # 锁已存在
manager._redis_client = mock_redis
result = await manager.acquire_job_lock("test-job-id")
assert result is None
@pytest.mark.asyncio
async def test_release_job_lock(self):
"""测试释放任务锁"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.eval = AsyncMock(return_value=1)
manager._redis_client = mock_redis
result = await manager.release_job_lock("test-job-id", "valid-token")
assert result is True
mock_redis.eval.assert_called_once()
@pytest.mark.asyncio
async def test_release_job_lock_without_redis(self):
"""测试 Redis 不可用时释放锁"""
manager = JobManager()
result = await manager.release_job_lock("test-job-id", "token")
assert result is False
class TestGlobalConcurrency:
"""测试全局并发控制功能"""
@pytest.mark.asyncio
async def test_increment_concurrency(self):
"""测试增加并发计数"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.incr = AsyncMock(return_value=5)
manager._redis_client = mock_redis
result = await manager.increment_concurrency()
assert result == 5
mock_redis.incr.assert_called_once()
@pytest.mark.asyncio
async def test_decrement_concurrency(self):
"""测试减少并发计数"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.decr = AsyncMock(return_value=4)
manager._redis_client = mock_redis
result = await manager.decrement_concurrency()
assert result == 4
mock_redis.decr.assert_called_once()
@pytest.mark.asyncio
async def test_decrement_concurrency_prevent_negative(self):
"""测试防止并发计数变为负数"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.decr = AsyncMock(return_value=-1)
mock_redis.set = AsyncMock()
manager._redis_client = mock_redis
result = await manager.decrement_concurrency()
assert result == 0
mock_redis.set.assert_called_once()
@pytest.mark.asyncio
async def test_get_global_concurrency(self):
"""测试获取全局并发数"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value="7")
manager._redis_client = mock_redis
result = await manager.get_global_concurrency()
assert result == 7
@pytest.mark.asyncio
async def test_get_global_concurrency_empty(self):
"""测试获取空的全局并发数"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=None)
manager._redis_client = mock_redis
result = await manager.get_global_concurrency()
assert result == 0
@pytest.mark.asyncio
async def test_can_execute(self):
"""测试检查是否可执行"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value="5")
manager._redis_client = mock_redis
with patch("functional_scaffold.core.job_manager.settings") as mock_settings:
mock_settings.max_concurrent_jobs = 10
result = await manager.can_execute()
assert result is True
@pytest.mark.asyncio
async def test_can_execute_at_limit(self):
"""测试达到并发限制时"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value="10")
manager._redis_client = mock_redis
with patch("functional_scaffold.core.job_manager.settings") as mock_settings:
mock_settings.max_concurrent_jobs = 10
result = await manager.can_execute()
assert result is False
class TestJobRetry:
"""测试任务重试功能"""
@pytest.mark.asyncio
async def test_get_job_retry_count(self):
"""测试获取任务重试次数"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hget = AsyncMock(return_value="2")
manager._redis_client = mock_redis
result = await manager.get_job_retry_count("test-job-id")
assert result == 2
mock_redis.hget.assert_called_once_with("job:test-job-id", "retry_count")
@pytest.mark.asyncio
async def test_get_job_retry_count_empty(self):
"""测试获取空的重试次数"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hget = AsyncMock(return_value=None)
manager._redis_client = mock_redis
result = await manager.get_job_retry_count("test-job-id")
assert result == 0
@pytest.mark.asyncio
async def test_increment_job_retry(self):
"""测试增加任务重试次数"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.hincrby = AsyncMock()
mock_redis.hget = AsyncMock(return_value="3")
manager._redis_client = mock_redis
result = await manager.increment_job_retry("test-job-id")
assert result == 3
mock_redis.hincrby.assert_called_once_with("job:test-job-id", "retry_count", 1)
class TestTransferDequeue:
"""测试转移式出队功能"""
@pytest.mark.asyncio
async def test_dequeue_job_with_blmove(self):
"""测试使用 BLMOVE 转移式出队"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.blmove = AsyncMock(return_value="test-job-id")
mock_redis.zadd = AsyncMock()
manager._redis_client = mock_redis
result = await manager.dequeue_job(timeout=5)
assert result == "test-job-id"
mock_redis.blmove.assert_called_once()
mock_redis.zadd.assert_called_once()
@pytest.mark.asyncio
async def test_dequeue_job_timeout(self):
"""测试出队超时"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.blmove = AsyncMock(return_value=None)
manager._redis_client = mock_redis
result = await manager.dequeue_job(timeout=1)
assert result is None
mock_redis.zadd.assert_not_called()
class TestTokenBasedLock:
"""测试带 Token 的安全锁"""
@pytest.mark.asyncio
async def test_acquire_job_lock_returns_token(self):
"""测试获取锁返回 token"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.set = AsyncMock(return_value=True)
manager._redis_client = mock_redis
result = await manager.acquire_job_lock("test-job-id")
assert result is not None
assert len(result) == 32 # 16 字节的十六进制字符串
mock_redis.set.assert_called_once()
call_args = mock_redis.set.call_args
assert call_args[0][0] == "job:lock:test-job-id"
assert call_args[1]["nx"] is True
@pytest.mark.asyncio
async def test_acquire_job_lock_already_locked(self):
"""测试获取已被锁定的任务锁"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.set = AsyncMock(return_value=None)
manager._redis_client = mock_redis
result = await manager.acquire_job_lock("test-job-id")
assert result is None
@pytest.mark.asyncio
async def test_release_job_lock_with_token(self):
"""测试使用 token 释放锁"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.eval = AsyncMock(return_value=1)
manager._redis_client = mock_redis
result = await manager.release_job_lock("test-job-id", "valid-token")
assert result is True
mock_redis.eval.assert_called_once()
@pytest.mark.asyncio
async def test_release_job_lock_invalid_token(self):
"""测试使用无效 token 释放锁"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.eval = AsyncMock(return_value=0)
manager._redis_client = mock_redis
result = await manager.release_job_lock("test-job-id", "invalid-token")
assert result is False
@pytest.mark.asyncio
async def test_release_job_lock_without_token(self):
"""测试不使用 token 释放锁(向后兼容)"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.delete = AsyncMock()
manager._redis_client = mock_redis
result = await manager.release_job_lock("test-job-id")
assert result is True
mock_redis.delete.assert_called_once_with("job:lock:test-job-id")
class TestAckNack:
"""测试 ACK/NACK 机制"""
@pytest.mark.asyncio
async def test_ack_job(self):
"""测试确认任务完成"""
manager = JobManager()
mock_pipe = AsyncMock()
mock_pipe.lrem = MagicMock()
mock_pipe.zrem = MagicMock()
mock_pipe.execute = AsyncMock()
mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe)
mock_pipe.__aexit__ = AsyncMock()
mock_redis = AsyncMock()
mock_redis.pipeline = MagicMock(return_value=mock_pipe)
manager._redis_client = mock_redis
result = await manager.ack_job("test-job-id")
assert result is True
mock_pipe.lrem.assert_called_once()
mock_pipe.zrem.assert_called_once()
@pytest.mark.asyncio
async def test_nack_job_requeue(self):
"""测试拒绝任务并重新入队"""
manager = JobManager()
mock_pipe = AsyncMock()
mock_pipe.lrem = MagicMock()
mock_pipe.zrem = MagicMock()
mock_pipe.lpush = MagicMock()
mock_pipe.execute = AsyncMock()
mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe)
mock_pipe.__aexit__ = AsyncMock()
mock_redis = AsyncMock()
mock_redis.pipeline = MagicMock(return_value=mock_pipe)
mock_redis.hget = AsyncMock(return_value="0") # retry_count = 0
manager._redis_client = mock_redis
result = await manager.nack_job("test-job-id", requeue=True)
assert result is True
assert mock_pipe.lpush.call_count == 1
@pytest.mark.asyncio
async def test_nack_job_to_dlq(self):
"""测试拒绝任务进入死信队列"""
manager = JobManager()
mock_pipe = AsyncMock()
mock_pipe.lrem = MagicMock()
mock_pipe.zrem = MagicMock()
mock_pipe.lpush = MagicMock()
mock_pipe.execute = AsyncMock()
mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe)
mock_pipe.__aexit__ = AsyncMock()
mock_redis = AsyncMock()
mock_redis.pipeline = MagicMock(return_value=mock_pipe)
mock_redis.hget = AsyncMock(return_value="5") # retry_count > max_retries
manager._redis_client = mock_redis
with patch("functional_scaffold.core.job_manager.settings") as mock_settings:
mock_settings.job_max_retries = 3
mock_settings.job_processing_key = "job:processing"
mock_settings.job_processing_ts_key = "job:processing:ts"
mock_settings.job_dlq_key = "job:dlq"
mock_settings.job_queue_key = "job:queue"
result = await manager.nack_job("test-job-id", requeue=True)
assert result is True
class TestLockRenewal:
"""测试锁续租功能"""
@pytest.mark.asyncio
async def test_renew_job_lock_success(self):
"""测试锁续租成功"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.eval = AsyncMock(return_value=1)
manager._redis_client = mock_redis
result = await manager.renew_job_lock("test-job-id", "valid-token")
assert result is True
mock_redis.eval.assert_called_once()
@pytest.mark.asyncio
async def test_renew_job_lock_invalid_token(self):
"""测试锁续租失败token 不匹配)"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.eval = AsyncMock(return_value=0)
manager._redis_client = mock_redis
result = await manager.renew_job_lock("test-job-id", "invalid-token")
assert result is False
@pytest.mark.asyncio
async def test_renew_job_lock_without_redis(self):
"""测试 Redis 不可用时续租"""
manager = JobManager()
result = await manager.renew_job_lock("test-job-id", "token")
assert result is False
class TestStaleJobRecovery:
"""测试超时任务回收功能"""
@pytest.mark.asyncio
async def test_recover_stale_jobs_empty(self):
"""测试没有超时任务时的回收"""
manager = JobManager()
mock_redis = AsyncMock()
mock_redis.zrangebyscore = AsyncMock(return_value=[])
manager._redis_client = mock_redis
result = await manager.recover_stale_jobs()
assert result == 0
@pytest.mark.asyncio
async def test_recover_stale_jobs_requeue(self):
"""测试回收超时任务并重新入队"""
manager = JobManager()
mock_pipe = AsyncMock()
mock_pipe.lrem = MagicMock()
mock_pipe.zrem = MagicMock()
mock_pipe.lpush = MagicMock()
mock_pipe.execute = AsyncMock()
mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe)
mock_pipe.__aexit__ = AsyncMock()
mock_redis = AsyncMock()
mock_redis.zrangebyscore = AsyncMock(return_value=["stale-job-1", "stale-job-2"])
mock_redis.hincrby = AsyncMock()
mock_redis.hget = AsyncMock(return_value="1") # retry_count = 1
mock_redis.pipeline = MagicMock(return_value=mock_pipe)
manager._redis_client = mock_redis
with patch("functional_scaffold.core.job_manager.settings") as mock_settings:
mock_settings.job_execution_timeout = 300
mock_settings.job_lock_buffer = 60
mock_settings.job_max_retries = 3
mock_settings.job_processing_key = "job:processing"
mock_settings.job_processing_ts_key = "job:processing:ts"
mock_settings.job_dlq_key = "job:dlq"
mock_settings.job_queue_key = "job:queue"
result = await manager.recover_stale_jobs()
assert result == 2
@pytest.mark.asyncio
async def test_recover_stale_jobs_to_dlq(self):
"""测试回收超时任务进入死信队列"""
manager = JobManager()
mock_pipe = AsyncMock()
mock_pipe.lrem = MagicMock()
mock_pipe.zrem = MagicMock()
mock_pipe.lpush = MagicMock()
mock_pipe.execute = AsyncMock()
mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe)
mock_pipe.__aexit__ = AsyncMock()
mock_redis = AsyncMock()
mock_redis.zrangebyscore = AsyncMock(return_value=["stale-job-1"])
mock_redis.hincrby = AsyncMock()
mock_redis.hget = AsyncMock(return_value="5") # retry_count > max_retries
mock_redis.pipeline = MagicMock(return_value=mock_pipe)
manager._redis_client = mock_redis
with patch("functional_scaffold.core.job_manager.settings") as mock_settings:
mock_settings.job_execution_timeout = 300
mock_settings.job_lock_buffer = 60
mock_settings.job_max_retries = 3
mock_settings.job_processing_key = "job:processing"
mock_settings.job_processing_ts_key = "job:processing:ts"
mock_settings.job_dlq_key = "job:dlq"
mock_settings.job_queue_key = "job:queue"
result = await manager.recover_stale_jobs()
assert result == 1
@pytest.mark.asyncio
async def test_recover_stale_jobs_without_redis(self):
"""测试 Redis 不可用时回收"""
manager = JobManager()
result = await manager.recover_stale_jobs()
assert result == 0
class TestQueueMetrics:
"""测试队列监控指标收集"""
@pytest.mark.asyncio
async def test_collect_queue_metrics(self):
"""测试收集队列指标"""
manager = JobManager()
mock_pipe = AsyncMock()
mock_pipe.llen = MagicMock()
mock_pipe.zrange = MagicMock()
mock_pipe.execute = AsyncMock(return_value=[5, 2, 1, [("job-1", 1000.0)]])
mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe)
mock_pipe.__aexit__ = AsyncMock()
mock_redis = AsyncMock()
mock_redis.pipeline = MagicMock(return_value=mock_pipe)
manager._redis_client = mock_redis
with patch("functional_scaffold.core.job_manager.time") as mock_time:
mock_time.time.return_value = 1060.0 # 60 秒后
with patch("functional_scaffold.core.job_manager.set") as mock_set:
result = await manager.collect_queue_metrics()
assert result["queue_length"] == 5
assert result["processing_length"] == 2
assert result["dlq_length"] == 1
assert result["oldest_waiting_seconds"] == 60.0
@pytest.mark.asyncio
async def test_collect_queue_metrics_empty(self):
"""测试空队列时收集指标"""
manager = JobManager()
mock_pipe = AsyncMock()
mock_pipe.llen = MagicMock()
mock_pipe.zrange = MagicMock()
mock_pipe.execute = AsyncMock(return_value=[0, 0, 0, []])
mock_pipe.__aenter__ = AsyncMock(return_value=mock_pipe)
mock_pipe.__aexit__ = AsyncMock()
mock_redis = AsyncMock()
mock_redis.pipeline = MagicMock(return_value=mock_pipe)
manager._redis_client = mock_redis
with patch("functional_scaffold.core.job_manager.set"):
result = await manager.collect_queue_metrics()
assert result["queue_length"] == 0
assert result["processing_length"] == 0
assert result["dlq_length"] == 0
assert result["oldest_waiting_seconds"] == 0
@pytest.mark.asyncio
async def test_collect_queue_metrics_without_redis(self):
"""测试 Redis 不可用时收集指标"""
manager = JobManager()
result = await manager.collect_queue_metrics()
assert result["queue_length"] == 0
assert result["processing_length"] == 0
assert result["dlq_length"] == 0
assert result["oldest_waiting_seconds"] == 0