更新内容: - 编写《并发控制》详细文档,说明任务并发限制的配置、使用和最佳实践。 - 完成《并发控制实现总结》文档,记录设计决策和开发细节。 - 添加《并发控制快速参考》文档,提供配置和常见问题的快速解决方案。
205 lines
4.2 KiB
Markdown
205 lines
4.2 KiB
Markdown
# 异步任务并发控制
|
||
|
||
## 概述
|
||
|
||
为了防止系统资源耗尽和控制负载,任务管理器实现了基于 `asyncio.Semaphore` 的并发控制机制。
|
||
|
||
## 配置
|
||
|
||
在 `config.py` 或环境变量中设置最大并发任务数:
|
||
|
||
```python
|
||
# config.py
|
||
max_concurrent_jobs: int = 10 # 默认值
|
||
```
|
||
|
||
或通过环境变量:
|
||
|
||
```bash
|
||
export MAX_CONCURRENT_JOBS=20
|
||
```
|
||
|
||
## 工作原理
|
||
|
||
1. **信号量机制**:使用 `asyncio.Semaphore` 限制同时运行的任务数
|
||
2. **自动管理**:任务开始时获取槽位,完成后自动释放
|
||
3. **队列等待**:超过限制的任务会自动等待,直到有可用槽位
|
||
|
||
### 执行流程
|
||
|
||
```
|
||
POST /jobs 创建任务
|
||
│
|
||
▼
|
||
asyncio.create_task(execute_job)
|
||
│
|
||
▼
|
||
等待获取 semaphore 槽位
|
||
│
|
||
▼
|
||
async with semaphore: ← 获取槽位
|
||
执行算法
|
||
更新状态
|
||
发送 webhook
|
||
│
|
||
▼
|
||
自动释放槽位
|
||
```
|
||
|
||
## API 端点
|
||
|
||
### 查询并发状态
|
||
|
||
```bash
|
||
GET /jobs/concurrency/status
|
||
```
|
||
|
||
**响应示例:**
|
||
|
||
```json
|
||
{
|
||
"max_concurrent": 10,
|
||
"available_slots": 7,
|
||
"running_jobs": 3
|
||
}
|
||
```
|
||
|
||
**字段说明:**
|
||
|
||
- `max_concurrent`: 最大并发任务数(配置值)
|
||
- `available_slots`: 当前可用槽位数
|
||
- `running_jobs`: 当前正在运行的任务数
|
||
|
||
## 使用示例
|
||
|
||
### 1. 创建多个任务
|
||
|
||
```bash
|
||
# 创建 20 个任务
|
||
for i in {1..20}; do
|
||
curl -X POST http://localhost:8000/jobs \
|
||
-H "Content-Type: application/json" \
|
||
-d "{\"algorithm\": \"PrimeChecker\", \"params\": {\"number\": $i}}"
|
||
done
|
||
```
|
||
|
||
### 2. 监控并发状态
|
||
|
||
```bash
|
||
# 持续监控并发状态
|
||
watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq'
|
||
```
|
||
|
||
输出示例:
|
||
|
||
```json
|
||
{
|
||
"max_concurrent": 10,
|
||
"available_slots": 0,
|
||
"running_jobs": 10
|
||
}
|
||
```
|
||
|
||
### 3. 调整并发限制
|
||
|
||
```bash
|
||
# 重启服务前设置环境变量
|
||
export MAX_CONCURRENT_JOBS=20
|
||
./scripts/run_dev.sh
|
||
```
|
||
|
||
## 性能考虑
|
||
|
||
### 选择合适的并发数
|
||
|
||
并发数应根据以下因素确定:
|
||
|
||
1. **CPU 核心数**:CPU 密集型任务建议设置为核心数的 1-2 倍
|
||
2. **内存限制**:每个任务的内存占用 × 并发数 < 可用内存
|
||
3. **外部服务限制**:如果调用外部 API,考虑其速率限制
|
||
4. **Redis 连接池**:确保 Redis 连接池大小 ≥ 并发数
|
||
|
||
### 推荐配置
|
||
|
||
| 场景 | 推荐并发数 | 说明 |
|
||
|------|-----------|------|
|
||
| CPU 密集型(如质数判断) | 核心数 × 1.5 | 充分利用 CPU |
|
||
| I/O 密集型(如网络请求) | 核心数 × 5-10 | 等待 I/O 时可切换 |
|
||
| 混合型 | 核心数 × 2-3 | 平衡 CPU 和 I/O |
|
||
| 内存受限 | 根据内存计算 | 避免 OOM |
|
||
|
||
### 示例计算
|
||
|
||
假设:
|
||
- 服务器:4 核 8GB 内存
|
||
- 任务类型:I/O 密集型(网络请求)
|
||
- 单任务内存:50MB
|
||
|
||
```
|
||
最大并发数 = min(
|
||
核心数 × 8 = 32,
|
||
可用内存 / 单任务内存 = 8000MB / 50MB = 160
|
||
) = 32
|
||
```
|
||
|
||
## 监控指标
|
||
|
||
相关 Prometheus 指标:
|
||
|
||
```promql
|
||
# 任务创建速率
|
||
rate(jobs_created_total[5m])
|
||
|
||
# 任务完成速率
|
||
rate(jobs_completed_total[5m])
|
||
|
||
# 任务执行时间分布
|
||
histogram_quantile(0.95, job_execution_duration_seconds_bucket)
|
||
```
|
||
|
||
## 故障排查
|
||
|
||
### 问题:任务一直处于 pending 状态
|
||
|
||
**可能原因:**
|
||
1. 所有槽位都被占用
|
||
2. 某些任务执行时间过长
|
||
|
||
**解决方案:**
|
||
```bash
|
||
# 1. 检查并发状态
|
||
curl http://localhost:8000/jobs/concurrency/status
|
||
|
||
# 2. 如果 available_slots = 0,说明所有槽位被占用
|
||
# 3. 检查是否有长时间运行的任务
|
||
# 4. 考虑增加并发限制或优化算法性能
|
||
```
|
||
|
||
### 问题:系统资源耗尽
|
||
|
||
**可能原因:**
|
||
并发数设置过高
|
||
|
||
**解决方案:**
|
||
```bash
|
||
# 降低并发限制
|
||
export MAX_CONCURRENT_JOBS=5
|
||
# 重启服务
|
||
```
|
||
|
||
## 最佳实践
|
||
|
||
1. **监控优先**:部署后持续监控并发状态和系统资源
|
||
2. **逐步调整**:从保守值开始,逐步增加并发数
|
||
3. **压力测试**:在生产环境前进行充分的压力测试
|
||
4. **设置告警**:当 `available_slots = 0` 持续时间过长时告警
|
||
5. **任务超时**:为长时间运行的任务设置超时机制(待实现)
|
||
|
||
## 未来改进
|
||
|
||
- [ ] 任务超时机制
|
||
- [ ] 优先级队列
|
||
- [ ] 动态调整并发数
|
||
- [ ] 任务取消功能
|
||
- [ ] 更细粒度的资源控制(CPU、内存限制)
|