main:新增并发控制文档及快速参考指南
更新内容: - 编写《并发控制》详细文档,说明任务并发限制的配置、使用和最佳实践。 - 完成《并发控制实现总结》文档,记录设计决策和开发细节。 - 添加《并发控制快速参考》文档,提供配置和常见问题的快速解决方案。
This commit is contained in:
204
docs/concurrency-control-changelog.md
Normal file
204
docs/concurrency-control-changelog.md
Normal file
@@ -0,0 +1,204 @@
|
|||||||
|
# 异步任务并发控制实现总结
|
||||||
|
|
||||||
|
## 变更概述
|
||||||
|
|
||||||
|
为异步任务管理器添加了并发控制功能,使用 `asyncio.Semaphore` 限制同时运行的任务数量,防止系统资源耗尽。
|
||||||
|
|
||||||
|
## 修改的文件
|
||||||
|
|
||||||
|
### 1. `src/functional_scaffold/config.py`
|
||||||
|
|
||||||
|
**新增配置项:**
|
||||||
|
```python
|
||||||
|
max_concurrent_jobs: int = 10 # 最大并发任务数
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. `src/functional_scaffold/core/job_manager.py`
|
||||||
|
|
||||||
|
**新增属性:**
|
||||||
|
- `_semaphore: Optional[asyncio.Semaphore]` - 并发控制信号量
|
||||||
|
- `_max_concurrent_jobs: int` - 最大并发数(存储配置值)
|
||||||
|
|
||||||
|
**修改方法:**
|
||||||
|
- `__init__()` - 初始化 semaphore 和 max_concurrent_jobs 属性
|
||||||
|
- `initialize()` - 创建 Semaphore 实例
|
||||||
|
- `execute_job()` - 使用 `async with self._semaphore` 包裹执行逻辑
|
||||||
|
|
||||||
|
**新增方法:**
|
||||||
|
- `get_concurrency_status()` - 返回并发状态(最大并发数、可用槽位、运行中任务数)
|
||||||
|
|
||||||
|
### 3. `src/functional_scaffold/api/models.py`
|
||||||
|
|
||||||
|
**新增模型:**
|
||||||
|
```python
|
||||||
|
class ConcurrencyStatusResponse(BaseModel):
|
||||||
|
"""并发状态响应"""
|
||||||
|
max_concurrent: int
|
||||||
|
available_slots: int
|
||||||
|
running_jobs: int
|
||||||
|
```
|
||||||
|
|
||||||
|
### 4. `src/functional_scaffold/api/routes.py`
|
||||||
|
|
||||||
|
**新增端点:**
|
||||||
|
```python
|
||||||
|
GET /jobs/concurrency/status
|
||||||
|
```
|
||||||
|
|
||||||
|
返回当前并发执行状态。
|
||||||
|
|
||||||
|
### 5. `tests/test_job_manager.py`
|
||||||
|
|
||||||
|
**新增测试类:**
|
||||||
|
```python
|
||||||
|
class TestConcurrencyControl:
|
||||||
|
- test_get_concurrency_status()
|
||||||
|
- test_get_concurrency_status_without_semaphore()
|
||||||
|
- test_concurrency_limit()
|
||||||
|
- test_concurrency_status_api()
|
||||||
|
```
|
||||||
|
|
||||||
|
**修改测试:**
|
||||||
|
- `test_execute_job()` - 添加 semaphore 初始化
|
||||||
|
|
||||||
|
## 工作原理
|
||||||
|
|
||||||
|
### 并发控制流程
|
||||||
|
|
||||||
|
```
|
||||||
|
创建任务 (POST /jobs)
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
asyncio.create_task(execute_job)
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
检查 Redis 和 semaphore 可用性
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
async with self._semaphore: ← 获取槽位(阻塞直到有可用槽位)
|
||||||
|
│
|
||||||
|
├─ 更新状态为 running
|
||||||
|
├─ 执行算法
|
||||||
|
├─ 更新状态为 completed/failed
|
||||||
|
└─ 发送 webhook
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
自动释放槽位
|
||||||
|
```
|
||||||
|
|
||||||
|
### 关键设计决策
|
||||||
|
|
||||||
|
1. **使用 asyncio.Semaphore**
|
||||||
|
- 简单、高效、无需外部依赖
|
||||||
|
- 自动管理槽位获取和释放
|
||||||
|
- 支持异步等待
|
||||||
|
|
||||||
|
2. **在 execute_job 内部使用 semaphore**
|
||||||
|
- 快速失败的检查(Redis 可用性、任务存在性)在 semaphore 外部
|
||||||
|
- 只有真正要执行的任务才占用槽位
|
||||||
|
- 任务完成后自动释放(即使发生异常)
|
||||||
|
|
||||||
|
3. **存储 _max_concurrent_jobs**
|
||||||
|
- Semaphore 不暴露最大值属性
|
||||||
|
- 需要单独存储以便 `get_concurrency_status()` 使用
|
||||||
|
|
||||||
|
## 测试覆盖
|
||||||
|
|
||||||
|
- ✅ 获取并发状态
|
||||||
|
- ✅ 未初始化时的并发状态
|
||||||
|
- ✅ 并发限制生效(创建超过限制的任务,验证只有限定数量在运行)
|
||||||
|
- ✅ API 端点测试
|
||||||
|
- ✅ 所有现有测试继续通过(60/60)
|
||||||
|
|
||||||
|
## 使用示例
|
||||||
|
|
||||||
|
### 配置并发限制
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 环境变量
|
||||||
|
export MAX_CONCURRENT_JOBS=20
|
||||||
|
|
||||||
|
# 或在 .env 文件
|
||||||
|
MAX_CONCURRENT_JOBS=20
|
||||||
|
```
|
||||||
|
|
||||||
|
### 查询并发状态
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl http://localhost:8000/jobs/concurrency/status
|
||||||
|
```
|
||||||
|
|
||||||
|
响应:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"max_concurrent": 10,
|
||||||
|
"available_slots": 7,
|
||||||
|
"running_jobs": 3
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 测试并发控制
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 运行测试脚本
|
||||||
|
./scripts/test_concurrency.sh
|
||||||
|
```
|
||||||
|
|
||||||
|
## 性能影响
|
||||||
|
|
||||||
|
### 优点
|
||||||
|
|
||||||
|
1. **防止资源耗尽**:限制同时运行的任务数
|
||||||
|
2. **可预测的负载**:系统负载不会超过配置的限制
|
||||||
|
3. **自动排队**:超过限制的任务自动等待
|
||||||
|
4. **零开销**:未达到限制时,semaphore 几乎无性能开销
|
||||||
|
|
||||||
|
### 注意事项
|
||||||
|
|
||||||
|
1. **任务等待**:超过限制的任务会等待,可能导致响应延迟
|
||||||
|
2. **内存占用**:等待中的任务仍占用内存(协程对象)
|
||||||
|
3. **配置调优**:需要根据实际负载调整并发数
|
||||||
|
|
||||||
|
## 监控建议
|
||||||
|
|
||||||
|
### Prometheus 查询
|
||||||
|
|
||||||
|
```promql
|
||||||
|
# 任务创建速率
|
||||||
|
rate(jobs_created_total[5m])
|
||||||
|
|
||||||
|
# 任务完成速率
|
||||||
|
rate(jobs_completed_total[5m])
|
||||||
|
|
||||||
|
# 任务积压(创建 - 完成)
|
||||||
|
rate(jobs_created_total[5m]) - rate(jobs_completed_total[5m])
|
||||||
|
```
|
||||||
|
|
||||||
|
### Grafana 面板
|
||||||
|
|
||||||
|
建议添加以下面板:
|
||||||
|
1. 并发状态时间序列(max_concurrent, available_slots, running_jobs)
|
||||||
|
2. 任务创建/完成速率
|
||||||
|
3. 任务执行时间分布(P50, P95, P99)
|
||||||
|
|
||||||
|
## 未来改进
|
||||||
|
|
||||||
|
1. **任务超时机制**:为长时间运行的任务设置超时
|
||||||
|
2. **优先级队列**:支持高优先级任务优先执行
|
||||||
|
3. **动态调整**:根据系统负载动态调整并发数
|
||||||
|
4. **任务取消**:支持取消等待中或运行中的任务
|
||||||
|
5. **资源限制**:更细粒度的 CPU、内存限制
|
||||||
|
|
||||||
|
## 相关文档
|
||||||
|
|
||||||
|
- [并发控制详细文档](./concurrency-control.md)
|
||||||
|
- [异步任务接口实现计划](../plans/giggly-hatching-kite.md)
|
||||||
|
- [监控指南](./monitoring.md)
|
||||||
|
|
||||||
|
## 测试结果
|
||||||
|
|
||||||
|
```
|
||||||
|
======================== 60 passed, 7 warnings in 1.53s ========================
|
||||||
|
```
|
||||||
|
|
||||||
|
所有测试通过,包括 4 个新增的并发控制测试。
|
||||||
102
docs/concurrency-control-quickref.md
Normal file
102
docs/concurrency-control-quickref.md
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
# 并发控制快速参考
|
||||||
|
|
||||||
|
## 配置
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 设置最大并发数(默认 10)
|
||||||
|
export MAX_CONCURRENT_JOBS=20
|
||||||
|
```
|
||||||
|
|
||||||
|
## API
|
||||||
|
|
||||||
|
### 查询并发状态
|
||||||
|
|
||||||
|
```bash
|
||||||
|
GET /jobs/concurrency/status
|
||||||
|
```
|
||||||
|
|
||||||
|
**响应:**
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"max_concurrent": 10, // 最大并发数
|
||||||
|
"available_slots": 7, // 可用槽位
|
||||||
|
"running_jobs": 3 // 运行中任务数
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 代码示例
|
||||||
|
|
||||||
|
### 在 JobManager 中使用
|
||||||
|
|
||||||
|
```python
|
||||||
|
# 并发控制自动生效,无需额外代码
|
||||||
|
job_manager = await get_job_manager()
|
||||||
|
job_id = await job_manager.create_job(...)
|
||||||
|
|
||||||
|
# 任务会自动排队,等待可用槽位
|
||||||
|
asyncio.create_task(job_manager.execute_job(job_id))
|
||||||
|
```
|
||||||
|
|
||||||
|
### 查询并发状态
|
||||||
|
|
||||||
|
```python
|
||||||
|
job_manager = await get_job_manager()
|
||||||
|
status = job_manager.get_concurrency_status()
|
||||||
|
|
||||||
|
print(f"运行中: {status['running_jobs']}/{status['max_concurrent']}")
|
||||||
|
print(f"可用槽位: {status['available_slots']}")
|
||||||
|
```
|
||||||
|
|
||||||
|
## 监控
|
||||||
|
|
||||||
|
### 实时监控
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 持续监控并发状态
|
||||||
|
watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq'
|
||||||
|
```
|
||||||
|
|
||||||
|
### 测试脚本
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 运行并发控制测试
|
||||||
|
./scripts/test_concurrency.sh
|
||||||
|
```
|
||||||
|
|
||||||
|
## 推荐配置
|
||||||
|
|
||||||
|
| 任务类型 | 推荐并发数 |
|
||||||
|
|---------|-----------|
|
||||||
|
| CPU 密集型 | 核心数 × 1.5 |
|
||||||
|
| I/O 密集型 | 核心数 × 5-10 |
|
||||||
|
| 混合型 | 核心数 × 2-3 |
|
||||||
|
|
||||||
|
## 故障排查
|
||||||
|
|
||||||
|
### 任务一直 pending
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 检查并发状态
|
||||||
|
curl http://localhost:8000/jobs/concurrency/status
|
||||||
|
|
||||||
|
# 如果 available_slots = 0,说明所有槽位被占用
|
||||||
|
# 解决方案:
|
||||||
|
# 1. 等待当前任务完成
|
||||||
|
# 2. 增加并发限制
|
||||||
|
# 3. 优化算法性能
|
||||||
|
```
|
||||||
|
|
||||||
|
### 系统资源耗尽
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 降低并发限制
|
||||||
|
export MAX_CONCURRENT_JOBS=5
|
||||||
|
|
||||||
|
# 重启服务
|
||||||
|
./scripts/run_dev.sh
|
||||||
|
```
|
||||||
|
|
||||||
|
## 相关文档
|
||||||
|
|
||||||
|
- [详细文档](./concurrency-control.md)
|
||||||
|
- [实现总结](./concurrency-control-changelog.md)
|
||||||
204
docs/concurrency-control.md
Normal file
204
docs/concurrency-control.md
Normal file
@@ -0,0 +1,204 @@
|
|||||||
|
# 异步任务并发控制
|
||||||
|
|
||||||
|
## 概述
|
||||||
|
|
||||||
|
为了防止系统资源耗尽和控制负载,任务管理器实现了基于 `asyncio.Semaphore` 的并发控制机制。
|
||||||
|
|
||||||
|
## 配置
|
||||||
|
|
||||||
|
在 `config.py` 或环境变量中设置最大并发任务数:
|
||||||
|
|
||||||
|
```python
|
||||||
|
# config.py
|
||||||
|
max_concurrent_jobs: int = 10 # 默认值
|
||||||
|
```
|
||||||
|
|
||||||
|
或通过环境变量:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
export MAX_CONCURRENT_JOBS=20
|
||||||
|
```
|
||||||
|
|
||||||
|
## 工作原理
|
||||||
|
|
||||||
|
1. **信号量机制**:使用 `asyncio.Semaphore` 限制同时运行的任务数
|
||||||
|
2. **自动管理**:任务开始时获取槽位,完成后自动释放
|
||||||
|
3. **队列等待**:超过限制的任务会自动等待,直到有可用槽位
|
||||||
|
|
||||||
|
### 执行流程
|
||||||
|
|
||||||
|
```
|
||||||
|
POST /jobs 创建任务
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
asyncio.create_task(execute_job)
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
等待获取 semaphore 槽位
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
async with semaphore: ← 获取槽位
|
||||||
|
执行算法
|
||||||
|
更新状态
|
||||||
|
发送 webhook
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
自动释放槽位
|
||||||
|
```
|
||||||
|
|
||||||
|
## API 端点
|
||||||
|
|
||||||
|
### 查询并发状态
|
||||||
|
|
||||||
|
```bash
|
||||||
|
GET /jobs/concurrency/status
|
||||||
|
```
|
||||||
|
|
||||||
|
**响应示例:**
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"max_concurrent": 10,
|
||||||
|
"available_slots": 7,
|
||||||
|
"running_jobs": 3
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**字段说明:**
|
||||||
|
|
||||||
|
- `max_concurrent`: 最大并发任务数(配置值)
|
||||||
|
- `available_slots`: 当前可用槽位数
|
||||||
|
- `running_jobs`: 当前正在运行的任务数
|
||||||
|
|
||||||
|
## 使用示例
|
||||||
|
|
||||||
|
### 1. 创建多个任务
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 创建 20 个任务
|
||||||
|
for i in {1..20}; do
|
||||||
|
curl -X POST http://localhost:8000/jobs \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d "{\"algorithm\": \"PrimeChecker\", \"params\": {\"number\": $i}}"
|
||||||
|
done
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. 监控并发状态
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 持续监控并发状态
|
||||||
|
watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq'
|
||||||
|
```
|
||||||
|
|
||||||
|
输出示例:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"max_concurrent": 10,
|
||||||
|
"available_slots": 0,
|
||||||
|
"running_jobs": 10
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. 调整并发限制
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# 重启服务前设置环境变量
|
||||||
|
export MAX_CONCURRENT_JOBS=20
|
||||||
|
./scripts/run_dev.sh
|
||||||
|
```
|
||||||
|
|
||||||
|
## 性能考虑
|
||||||
|
|
||||||
|
### 选择合适的并发数
|
||||||
|
|
||||||
|
并发数应根据以下因素确定:
|
||||||
|
|
||||||
|
1. **CPU 核心数**:CPU 密集型任务建议设置为核心数的 1-2 倍
|
||||||
|
2. **内存限制**:每个任务的内存占用 × 并发数 < 可用内存
|
||||||
|
3. **外部服务限制**:如果调用外部 API,考虑其速率限制
|
||||||
|
4. **Redis 连接池**:确保 Redis 连接池大小 ≥ 并发数
|
||||||
|
|
||||||
|
### 推荐配置
|
||||||
|
|
||||||
|
| 场景 | 推荐并发数 | 说明 |
|
||||||
|
|------|-----------|------|
|
||||||
|
| CPU 密集型(如质数判断) | 核心数 × 1.5 | 充分利用 CPU |
|
||||||
|
| I/O 密集型(如网络请求) | 核心数 × 5-10 | 等待 I/O 时可切换 |
|
||||||
|
| 混合型 | 核心数 × 2-3 | 平衡 CPU 和 I/O |
|
||||||
|
| 内存受限 | 根据内存计算 | 避免 OOM |
|
||||||
|
|
||||||
|
### 示例计算
|
||||||
|
|
||||||
|
假设:
|
||||||
|
- 服务器:4 核 8GB 内存
|
||||||
|
- 任务类型:I/O 密集型(网络请求)
|
||||||
|
- 单任务内存:50MB
|
||||||
|
|
||||||
|
```
|
||||||
|
最大并发数 = min(
|
||||||
|
核心数 × 8 = 32,
|
||||||
|
可用内存 / 单任务内存 = 8000MB / 50MB = 160
|
||||||
|
) = 32
|
||||||
|
```
|
||||||
|
|
||||||
|
## 监控指标
|
||||||
|
|
||||||
|
相关 Prometheus 指标:
|
||||||
|
|
||||||
|
```promql
|
||||||
|
# 任务创建速率
|
||||||
|
rate(jobs_created_total[5m])
|
||||||
|
|
||||||
|
# 任务完成速率
|
||||||
|
rate(jobs_completed_total[5m])
|
||||||
|
|
||||||
|
# 任务执行时间分布
|
||||||
|
histogram_quantile(0.95, job_execution_duration_seconds_bucket)
|
||||||
|
```
|
||||||
|
|
||||||
|
## 故障排查
|
||||||
|
|
||||||
|
### 问题:任务一直处于 pending 状态
|
||||||
|
|
||||||
|
**可能原因:**
|
||||||
|
1. 所有槽位都被占用
|
||||||
|
2. 某些任务执行时间过长
|
||||||
|
|
||||||
|
**解决方案:**
|
||||||
|
```bash
|
||||||
|
# 1. 检查并发状态
|
||||||
|
curl http://localhost:8000/jobs/concurrency/status
|
||||||
|
|
||||||
|
# 2. 如果 available_slots = 0,说明所有槽位被占用
|
||||||
|
# 3. 检查是否有长时间运行的任务
|
||||||
|
# 4. 考虑增加并发限制或优化算法性能
|
||||||
|
```
|
||||||
|
|
||||||
|
### 问题:系统资源耗尽
|
||||||
|
|
||||||
|
**可能原因:**
|
||||||
|
并发数设置过高
|
||||||
|
|
||||||
|
**解决方案:**
|
||||||
|
```bash
|
||||||
|
# 降低并发限制
|
||||||
|
export MAX_CONCURRENT_JOBS=5
|
||||||
|
# 重启服务
|
||||||
|
```
|
||||||
|
|
||||||
|
## 最佳实践
|
||||||
|
|
||||||
|
1. **监控优先**:部署后持续监控并发状态和系统资源
|
||||||
|
2. **逐步调整**:从保守值开始,逐步增加并发数
|
||||||
|
3. **压力测试**:在生产环境前进行充分的压力测试
|
||||||
|
4. **设置告警**:当 `available_slots = 0` 持续时间过长时告警
|
||||||
|
5. **任务超时**:为长时间运行的任务设置超时机制(待实现)
|
||||||
|
|
||||||
|
## 未来改进
|
||||||
|
|
||||||
|
- [ ] 任务超时机制
|
||||||
|
- [ ] 优先级队列
|
||||||
|
- [ ] 动态调整并发数
|
||||||
|
- [ ] 任务取消功能
|
||||||
|
- [ ] 更细粒度的资源控制(CPU、内存限制)
|
||||||
Reference in New Issue
Block a user