# 异步任务并发控制实现总结 ## 变更概述 为异步任务管理器添加了并发控制功能,使用 `asyncio.Semaphore` 限制同时运行的任务数量,防止系统资源耗尽。 ## 修改的文件 ### 1. `src/functional_scaffold/config.py` **新增配置项:** ```python max_concurrent_jobs: int = 10 # 最大并发任务数 ``` ### 2. `src/functional_scaffold/core/job_manager.py` **新增属性:** - `_semaphore: Optional[asyncio.Semaphore]` - 并发控制信号量 - `_max_concurrent_jobs: int` - 最大并发数(存储配置值) **修改方法:** - `__init__()` - 初始化 semaphore 和 max_concurrent_jobs 属性 - `initialize()` - 创建 Semaphore 实例 - `execute_job()` - 使用 `async with self._semaphore` 包裹执行逻辑 **新增方法:** - `get_concurrency_status()` - 返回并发状态(最大并发数、可用槽位、运行中任务数) ### 3. `src/functional_scaffold/api/models.py` **新增模型:** ```python class ConcurrencyStatusResponse(BaseModel): """并发状态响应""" max_concurrent: int available_slots: int running_jobs: int ``` ### 4. `src/functional_scaffold/api/routes.py` **新增端点:** ```python GET /jobs/concurrency/status ``` 返回当前并发执行状态。 ### 5. `tests/test_job_manager.py` **新增测试类:** ```python class TestConcurrencyControl: - test_get_concurrency_status() - test_get_concurrency_status_without_semaphore() - test_concurrency_limit() - test_concurrency_status_api() ``` **修改测试:** - `test_execute_job()` - 添加 semaphore 初始化 ## 工作原理 ### 并发控制流程 ``` 创建任务 (POST /jobs) │ ▼ asyncio.create_task(execute_job) │ ▼ 检查 Redis 和 semaphore 可用性 │ ▼ async with self._semaphore: ← 获取槽位(阻塞直到有可用槽位) │ ├─ 更新状态为 running ├─ 执行算法 ├─ 更新状态为 completed/failed └─ 发送 webhook │ ▼ 自动释放槽位 ``` ### 关键设计决策 1. **使用 asyncio.Semaphore** - 简单、高效、无需外部依赖 - 自动管理槽位获取和释放 - 支持异步等待 2. **在 execute_job 内部使用 semaphore** - 快速失败的检查(Redis 可用性、任务存在性)在 semaphore 外部 - 只有真正要执行的任务才占用槽位 - 任务完成后自动释放(即使发生异常) 3. **存储 _max_concurrent_jobs** - Semaphore 不暴露最大值属性 - 需要单独存储以便 `get_concurrency_status()` 使用 ## 测试覆盖 - ✅ 获取并发状态 - ✅ 未初始化时的并发状态 - ✅ 并发限制生效(创建超过限制的任务,验证只有限定数量在运行) - ✅ API 端点测试 - ✅ 所有现有测试继续通过(60/60) ## 使用示例 ### 配置并发限制 ```bash # 环境变量 export MAX_CONCURRENT_JOBS=20 # 或在 .env 文件 MAX_CONCURRENT_JOBS=20 ``` ### 查询并发状态 ```bash curl http://localhost:8000/jobs/concurrency/status ``` 响应: ```json { "max_concurrent": 10, "available_slots": 7, "running_jobs": 3 } ``` ### 测试并发控制 ```bash # 运行测试脚本 ./scripts/test_concurrency.sh ``` ## 性能影响 ### 优点 1. **防止资源耗尽**:限制同时运行的任务数 2. **可预测的负载**:系统负载不会超过配置的限制 3. **自动排队**:超过限制的任务自动等待 4. **零开销**:未达到限制时,semaphore 几乎无性能开销 ### 注意事项 1. **任务等待**:超过限制的任务会等待,可能导致响应延迟 2. **内存占用**:等待中的任务仍占用内存(协程对象) 3. **配置调优**:需要根据实际负载调整并发数 ## 监控建议 ### Prometheus 查询 ```promql # 任务创建速率 rate(jobs_created_total[5m]) # 任务完成速率 rate(jobs_completed_total[5m]) # 任务积压(创建 - 完成) rate(jobs_created_total[5m]) - rate(jobs_completed_total[5m]) ``` ### Grafana 面板 建议添加以下面板: 1. 并发状态时间序列(max_concurrent, available_slots, running_jobs) 2. 任务创建/完成速率 3. 任务执行时间分布(P50, P95, P99) ## 未来改进 1. **任务超时机制**:为长时间运行的任务设置超时 2. **优先级队列**:支持高优先级任务优先执行 3. **动态调整**:根据系统负载动态调整并发数 4. **任务取消**:支持取消等待中或运行中的任务 5. **资源限制**:更细粒度的 CPU、内存限制 ## 相关文档 - [并发控制详细文档](./concurrency-control.md) - [异步任务接口实现计划](../plans/giggly-hatching-kite.md) - [监控指南](./monitoring.md) ## 测试结果 ``` ======================== 60 passed, 7 warnings in 1.53s ======================== ``` 所有测试通过,包括 4 个新增的并发控制测试。