更新内容: - 编写《并发控制》详细文档,说明任务并发限制的配置、使用和最佳实践。 - 完成《并发控制实现总结》文档,记录设计决策和开发细节。 - 添加《并发控制快速参考》文档,提供配置和常见问题的快速解决方案。
4.8 KiB
4.8 KiB
异步任务并发控制实现总结
变更概述
为异步任务管理器添加了并发控制功能,使用 asyncio.Semaphore 限制同时运行的任务数量,防止系统资源耗尽。
修改的文件
1. src/functional_scaffold/config.py
新增配置项:
max_concurrent_jobs: int = 10 # 最大并发任务数
2. src/functional_scaffold/core/job_manager.py
新增属性:
_semaphore: Optional[asyncio.Semaphore]- 并发控制信号量_max_concurrent_jobs: int- 最大并发数(存储配置值)
修改方法:
__init__()- 初始化 semaphore 和 max_concurrent_jobs 属性initialize()- 创建 Semaphore 实例execute_job()- 使用async with self._semaphore包裹执行逻辑
新增方法:
get_concurrency_status()- 返回并发状态(最大并发数、可用槽位、运行中任务数)
3. src/functional_scaffold/api/models.py
新增模型:
class ConcurrencyStatusResponse(BaseModel):
"""并发状态响应"""
max_concurrent: int
available_slots: int
running_jobs: int
4. src/functional_scaffold/api/routes.py
新增端点:
GET /jobs/concurrency/status
返回当前并发执行状态。
5. tests/test_job_manager.py
新增测试类:
class TestConcurrencyControl:
- test_get_concurrency_status()
- test_get_concurrency_status_without_semaphore()
- test_concurrency_limit()
- test_concurrency_status_api()
修改测试:
test_execute_job()- 添加 semaphore 初始化
工作原理
并发控制流程
创建任务 (POST /jobs)
│
▼
asyncio.create_task(execute_job)
│
▼
检查 Redis 和 semaphore 可用性
│
▼
async with self._semaphore: ← 获取槽位(阻塞直到有可用槽位)
│
├─ 更新状态为 running
├─ 执行算法
├─ 更新状态为 completed/failed
└─ 发送 webhook
│
▼
自动释放槽位
关键设计决策
-
使用 asyncio.Semaphore
- 简单、高效、无需外部依赖
- 自动管理槽位获取和释放
- 支持异步等待
-
在 execute_job 内部使用 semaphore
- 快速失败的检查(Redis 可用性、任务存在性)在 semaphore 外部
- 只有真正要执行的任务才占用槽位
- 任务完成后自动释放(即使发生异常)
-
存储 _max_concurrent_jobs
- Semaphore 不暴露最大值属性
- 需要单独存储以便
get_concurrency_status()使用
测试覆盖
- ✅ 获取并发状态
- ✅ 未初始化时的并发状态
- ✅ 并发限制生效(创建超过限制的任务,验证只有限定数量在运行)
- ✅ API 端点测试
- ✅ 所有现有测试继续通过(60/60)
使用示例
配置并发限制
# 环境变量
export MAX_CONCURRENT_JOBS=20
# 或在 .env 文件
MAX_CONCURRENT_JOBS=20
查询并发状态
curl http://localhost:8000/jobs/concurrency/status
响应:
{
"max_concurrent": 10,
"available_slots": 7,
"running_jobs": 3
}
测试并发控制
# 运行测试脚本
./scripts/test_concurrency.sh
性能影响
优点
- 防止资源耗尽:限制同时运行的任务数
- 可预测的负载:系统负载不会超过配置的限制
- 自动排队:超过限制的任务自动等待
- 零开销:未达到限制时,semaphore 几乎无性能开销
注意事项
- 任务等待:超过限制的任务会等待,可能导致响应延迟
- 内存占用:等待中的任务仍占用内存(协程对象)
- 配置调优:需要根据实际负载调整并发数
监控建议
Prometheus 查询
# 任务创建速率
rate(jobs_created_total[5m])
# 任务完成速率
rate(jobs_completed_total[5m])
# 任务积压(创建 - 完成)
rate(jobs_created_total[5m]) - rate(jobs_completed_total[5m])
Grafana 面板
建议添加以下面板:
- 并发状态时间序列(max_concurrent, available_slots, running_jobs)
- 任务创建/完成速率
- 任务执行时间分布(P50, P95, P99)
未来改进
- 任务超时机制:为长时间运行的任务设置超时
- 优先级队列:支持高优先级任务优先执行
- 动态调整:根据系统负载动态调整并发数
- 任务取消:支持取消等待中或运行中的任务
- 资源限制:更细粒度的 CPU、内存限制
相关文档
测试结果
======================== 60 passed, 7 warnings in 1.53s ========================
所有测试通过,包括 4 个新增的并发控制测试。