Files
FunctionalScaffold/docs/concurrency-control-changelog.md
Roog (顾新培) 3e1d850954 main:新增并发控制文档及快速参考指南
更新内容:
- 编写《并发控制》详细文档,说明任务并发限制的配置、使用和最佳实践。
- 完成《并发控制实现总结》文档,记录设计决策和开发细节。
- 添加《并发控制快速参考》文档,提供配置和常见问题的快速解决方案。
2026-02-03 18:38:08 +08:00

205 lines
4.8 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 异步任务并发控制实现总结
## 变更概述
为异步任务管理器添加了并发控制功能,使用 `asyncio.Semaphore` 限制同时运行的任务数量,防止系统资源耗尽。
## 修改的文件
### 1. `src/functional_scaffold/config.py`
**新增配置项:**
```python
max_concurrent_jobs: int = 10 # 最大并发任务数
```
### 2. `src/functional_scaffold/core/job_manager.py`
**新增属性:**
- `_semaphore: Optional[asyncio.Semaphore]` - 并发控制信号量
- `_max_concurrent_jobs: int` - 最大并发数(存储配置值)
**修改方法:**
- `__init__()` - 初始化 semaphore 和 max_concurrent_jobs 属性
- `initialize()` - 创建 Semaphore 实例
- `execute_job()` - 使用 `async with self._semaphore` 包裹执行逻辑
**新增方法:**
- `get_concurrency_status()` - 返回并发状态(最大并发数、可用槽位、运行中任务数)
### 3. `src/functional_scaffold/api/models.py`
**新增模型:**
```python
class ConcurrencyStatusResponse(BaseModel):
"""并发状态响应"""
max_concurrent: int
available_slots: int
running_jobs: int
```
### 4. `src/functional_scaffold/api/routes.py`
**新增端点:**
```python
GET /jobs/concurrency/status
```
返回当前并发执行状态。
### 5. `tests/test_job_manager.py`
**新增测试类:**
```python
class TestConcurrencyControl:
- test_get_concurrency_status()
- test_get_concurrency_status_without_semaphore()
- test_concurrency_limit()
- test_concurrency_status_api()
```
**修改测试:**
- `test_execute_job()` - 添加 semaphore 初始化
## 工作原理
### 并发控制流程
```
创建任务 (POST /jobs)
asyncio.create_task(execute_job)
检查 Redis 和 semaphore 可用性
async with self._semaphore: ← 获取槽位(阻塞直到有可用槽位)
├─ 更新状态为 running
├─ 执行算法
├─ 更新状态为 completed/failed
└─ 发送 webhook
自动释放槽位
```
### 关键设计决策
1. **使用 asyncio.Semaphore**
- 简单、高效、无需外部依赖
- 自动管理槽位获取和释放
- 支持异步等待
2. **在 execute_job 内部使用 semaphore**
- 快速失败的检查Redis 可用性、任务存在性)在 semaphore 外部
- 只有真正要执行的任务才占用槽位
- 任务完成后自动释放(即使发生异常)
3. **存储 _max_concurrent_jobs**
- Semaphore 不暴露最大值属性
- 需要单独存储以便 `get_concurrency_status()` 使用
## 测试覆盖
- ✅ 获取并发状态
- ✅ 未初始化时的并发状态
- ✅ 并发限制生效(创建超过限制的任务,验证只有限定数量在运行)
- ✅ API 端点测试
- ✅ 所有现有测试继续通过60/60
## 使用示例
### 配置并发限制
```bash
# 环境变量
export MAX_CONCURRENT_JOBS=20
# 或在 .env 文件
MAX_CONCURRENT_JOBS=20
```
### 查询并发状态
```bash
curl http://localhost:8000/jobs/concurrency/status
```
响应:
```json
{
"max_concurrent": 10,
"available_slots": 7,
"running_jobs": 3
}
```
### 测试并发控制
```bash
# 运行测试脚本
./scripts/test_concurrency.sh
```
## 性能影响
### 优点
1. **防止资源耗尽**:限制同时运行的任务数
2. **可预测的负载**:系统负载不会超过配置的限制
3. **自动排队**:超过限制的任务自动等待
4. **零开销**未达到限制时semaphore 几乎无性能开销
### 注意事项
1. **任务等待**:超过限制的任务会等待,可能导致响应延迟
2. **内存占用**:等待中的任务仍占用内存(协程对象)
3. **配置调优**:需要根据实际负载调整并发数
## 监控建议
### Prometheus 查询
```promql
# 任务创建速率
rate(jobs_created_total[5m])
# 任务完成速率
rate(jobs_completed_total[5m])
# 任务积压(创建 - 完成)
rate(jobs_created_total[5m]) - rate(jobs_completed_total[5m])
```
### Grafana 面板
建议添加以下面板:
1. 并发状态时间序列max_concurrent, available_slots, running_jobs
2. 任务创建/完成速率
3. 任务执行时间分布P50, P95, P99
## 未来改进
1. **任务超时机制**:为长时间运行的任务设置超时
2. **优先级队列**:支持高优先级任务优先执行
3. **动态调整**:根据系统负载动态调整并发数
4. **任务取消**:支持取消等待中或运行中的任务
5. **资源限制**:更细粒度的 CPU、内存限制
## 相关文档
- [并发控制详细文档](./concurrency-control.md)
- [异步任务接口实现计划](../plans/giggly-hatching-kite.md)
- [监控指南](./monitoring.md)
## 测试结果
```
======================== 60 passed, 7 warnings in 1.53s ========================
```
所有测试通过,包括 4 个新增的并发控制测试。