# 异步任务并发控制 ## 概述 为了防止系统资源耗尽和控制负载,任务管理器实现了基于 `asyncio.Semaphore` 的并发控制机制。 ## 配置 在 `config.py` 或环境变量中设置最大并发任务数: ```python # config.py max_concurrent_jobs: int = 10 # 默认值 ``` 或通过环境变量: ```bash export MAX_CONCURRENT_JOBS=20 ``` ## 工作原理 1. **信号量机制**:使用 `asyncio.Semaphore` 限制同时运行的任务数 2. **自动管理**:任务开始时获取槽位,完成后自动释放 3. **队列等待**:超过限制的任务会自动等待,直到有可用槽位 ### 执行流程 ``` POST /jobs 创建任务 │ ▼ asyncio.create_task(execute_job) │ ▼ 等待获取 semaphore 槽位 │ ▼ async with semaphore: ← 获取槽位 执行算法 更新状态 发送 webhook │ ▼ 自动释放槽位 ``` ## API 端点 ### 查询并发状态 ```bash GET /jobs/concurrency/status ``` **响应示例:** ```json { "max_concurrent": 10, "available_slots": 7, "running_jobs": 3 } ``` **字段说明:** - `max_concurrent`: 最大并发任务数(配置值) - `available_slots`: 当前可用槽位数 - `running_jobs`: 当前正在运行的任务数 ## 使用示例 ### 1. 创建多个任务 ```bash # 创建 20 个任务 for i in {1..20}; do curl -X POST http://localhost:8000/jobs \ -H "Content-Type: application/json" \ -d "{\"algorithm\": \"PrimeChecker\", \"params\": {\"number\": $i}}" done ``` ### 2. 监控并发状态 ```bash # 持续监控并发状态 watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq' ``` 输出示例: ```json { "max_concurrent": 10, "available_slots": 0, "running_jobs": 10 } ``` ### 3. 调整并发限制 ```bash # 重启服务前设置环境变量 export MAX_CONCURRENT_JOBS=20 ./scripts/run_dev.sh ``` ## 性能考虑 ### 选择合适的并发数 并发数应根据以下因素确定: 1. **CPU 核心数**:CPU 密集型任务建议设置为核心数的 1-2 倍 2. **内存限制**:每个任务的内存占用 × 并发数 < 可用内存 3. **外部服务限制**:如果调用外部 API,考虑其速率限制 4. **Redis 连接池**:确保 Redis 连接池大小 ≥ 并发数 ### 推荐配置 | 场景 | 推荐并发数 | 说明 | |------|-----------|------| | CPU 密集型(如质数判断) | 核心数 × 1.5 | 充分利用 CPU | | I/O 密集型(如网络请求) | 核心数 × 5-10 | 等待 I/O 时可切换 | | 混合型 | 核心数 × 2-3 | 平衡 CPU 和 I/O | | 内存受限 | 根据内存计算 | 避免 OOM | ### 示例计算 假设: - 服务器:4 核 8GB 内存 - 任务类型:I/O 密集型(网络请求) - 单任务内存:50MB ``` 最大并发数 = min( 核心数 × 8 = 32, 可用内存 / 单任务内存 = 8000MB / 50MB = 160 ) = 32 ``` ## 监控指标 相关 Prometheus 指标: ```promql # 任务创建速率 rate(jobs_created_total[5m]) # 任务完成速率 rate(jobs_completed_total[5m]) # 任务执行时间分布 histogram_quantile(0.95, job_execution_duration_seconds_bucket) ``` ## 故障排查 ### 问题:任务一直处于 pending 状态 **可能原因:** 1. 所有槽位都被占用 2. 某些任务执行时间过长 **解决方案:** ```bash # 1. 检查并发状态 curl http://localhost:8000/jobs/concurrency/status # 2. 如果 available_slots = 0,说明所有槽位被占用 # 3. 检查是否有长时间运行的任务 # 4. 考虑增加并发限制或优化算法性能 ``` ### 问题:系统资源耗尽 **可能原因:** 并发数设置过高 **解决方案:** ```bash # 降低并发限制 export MAX_CONCURRENT_JOBS=5 # 重启服务 ``` ## 最佳实践 1. **监控优先**:部署后持续监控并发状态和系统资源 2. **逐步调整**:从保守值开始,逐步增加并发数 3. **压力测试**:在生产环境前进行充分的压力测试 4. **设置告警**:当 `available_slots = 0` 持续时间过长时告警 5. **任务超时**:为长时间运行的任务设置超时机制(待实现) ## 未来改进 - [ ] 任务超时机制 - [ ] 优先级队列 - [ ] 动态调整并发数 - [ ] 任务取消功能 - [ ] 更细粒度的资源控制(CPU、内存限制)