From 8afff21fae242f1696d9eb46363ac48ac1b83107 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roog=20=28=E9=A1=BE=E6=96=B0=E5=9F=B9=29?= Date: Mon, 2 Feb 2026 17:15:11 +0800 Subject: [PATCH] =?UTF-8?q?main:=E6=96=B0=E5=A2=9E=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E6=96=87=E6=A1=A3=E5=8F=8A=E5=BF=AB=E9=80=9F?= =?UTF-8?q?=E5=8F=82=E8=80=83=E6=8C=87=E5=8D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 更新内容: - 编写《并发控制》详细文档,说明任务并发限制的配置、使用和最佳实践。 - 完成《并发控制实现总结》文档,记录设计决策和开发细节。 - 添加《并发控制快速参考》文档,提供配置和常见问题的快速解决方案。 --- docs/concurrency-control-changelog.md | 204 ++++++++++++++++++++++++++ docs/concurrency-control-quickref.md | 102 +++++++++++++ docs/concurrency-control.md | 204 ++++++++++++++++++++++++++ 3 files changed, 510 insertions(+) create mode 100644 docs/concurrency-control-changelog.md create mode 100644 docs/concurrency-control-quickref.md create mode 100644 docs/concurrency-control.md diff --git a/docs/concurrency-control-changelog.md b/docs/concurrency-control-changelog.md new file mode 100644 index 0000000..f554262 --- /dev/null +++ b/docs/concurrency-control-changelog.md @@ -0,0 +1,204 @@ +# 异步任务并发控制实现总结 + +## 变更概述 + +为异步任务管理器添加了并发控制功能,使用 `asyncio.Semaphore` 限制同时运行的任务数量,防止系统资源耗尽。 + +## 修改的文件 + +### 1. `src/functional_scaffold/config.py` + +**新增配置项:** +```python +max_concurrent_jobs: int = 10 # 最大并发任务数 +``` + +### 2. `src/functional_scaffold/core/job_manager.py` + +**新增属性:** +- `_semaphore: Optional[asyncio.Semaphore]` - 并发控制信号量 +- `_max_concurrent_jobs: int` - 最大并发数(存储配置值) + +**修改方法:** +- `__init__()` - 初始化 semaphore 和 max_concurrent_jobs 属性 +- `initialize()` - 创建 Semaphore 实例 +- `execute_job()` - 使用 `async with self._semaphore` 包裹执行逻辑 + +**新增方法:** +- `get_concurrency_status()` - 返回并发状态(最大并发数、可用槽位、运行中任务数) + +### 3. `src/functional_scaffold/api/models.py` + +**新增模型:** +```python +class ConcurrencyStatusResponse(BaseModel): + """并发状态响应""" + max_concurrent: int + available_slots: int + running_jobs: int +``` + +### 4. `src/functional_scaffold/api/routes.py` + +**新增端点:** +```python +GET /jobs/concurrency/status +``` + +返回当前并发执行状态。 + +### 5. `tests/test_job_manager.py` + +**新增测试类:** +```python +class TestConcurrencyControl: + - test_get_concurrency_status() + - test_get_concurrency_status_without_semaphore() + - test_concurrency_limit() + - test_concurrency_status_api() +``` + +**修改测试:** +- `test_execute_job()` - 添加 semaphore 初始化 + +## 工作原理 + +### 并发控制流程 + +``` +创建任务 (POST /jobs) + │ + ▼ +asyncio.create_task(execute_job) + │ + ▼ +检查 Redis 和 semaphore 可用性 + │ + ▼ +async with self._semaphore: ← 获取槽位(阻塞直到有可用槽位) + │ + ├─ 更新状态为 running + ├─ 执行算法 + ├─ 更新状态为 completed/failed + └─ 发送 webhook + │ + ▼ +自动释放槽位 +``` + +### 关键设计决策 + +1. **使用 asyncio.Semaphore** + - 简单、高效、无需外部依赖 + - 自动管理槽位获取和释放 + - 支持异步等待 + +2. **在 execute_job 内部使用 semaphore** + - 快速失败的检查(Redis 可用性、任务存在性)在 semaphore 外部 + - 只有真正要执行的任务才占用槽位 + - 任务完成后自动释放(即使发生异常) + +3. **存储 _max_concurrent_jobs** + - Semaphore 不暴露最大值属性 + - 需要单独存储以便 `get_concurrency_status()` 使用 + +## 测试覆盖 + +- ✅ 获取并发状态 +- ✅ 未初始化时的并发状态 +- ✅ 并发限制生效(创建超过限制的任务,验证只有限定数量在运行) +- ✅ API 端点测试 +- ✅ 所有现有测试继续通过(60/60) + +## 使用示例 + +### 配置并发限制 + +```bash +# 环境变量 +export MAX_CONCURRENT_JOBS=20 + +# 或在 .env 文件 +MAX_CONCURRENT_JOBS=20 +``` + +### 查询并发状态 + +```bash +curl http://localhost:8000/jobs/concurrency/status +``` + +响应: +```json +{ + "max_concurrent": 10, + "available_slots": 7, + "running_jobs": 3 +} +``` + +### 测试并发控制 + +```bash +# 运行测试脚本 +./scripts/test_concurrency.sh +``` + +## 性能影响 + +### 优点 + +1. **防止资源耗尽**:限制同时运行的任务数 +2. **可预测的负载**:系统负载不会超过配置的限制 +3. **自动排队**:超过限制的任务自动等待 +4. **零开销**:未达到限制时,semaphore 几乎无性能开销 + +### 注意事项 + +1. **任务等待**:超过限制的任务会等待,可能导致响应延迟 +2. **内存占用**:等待中的任务仍占用内存(协程对象) +3. **配置调优**:需要根据实际负载调整并发数 + +## 监控建议 + +### Prometheus 查询 + +```promql +# 任务创建速率 +rate(jobs_created_total[5m]) + +# 任务完成速率 +rate(jobs_completed_total[5m]) + +# 任务积压(创建 - 完成) +rate(jobs_created_total[5m]) - rate(jobs_completed_total[5m]) +``` + +### Grafana 面板 + +建议添加以下面板: +1. 并发状态时间序列(max_concurrent, available_slots, running_jobs) +2. 任务创建/完成速率 +3. 任务执行时间分布(P50, P95, P99) + +## 未来改进 + +1. **任务超时机制**:为长时间运行的任务设置超时 +2. **优先级队列**:支持高优先级任务优先执行 +3. **动态调整**:根据系统负载动态调整并发数 +4. **任务取消**:支持取消等待中或运行中的任务 +5. **资源限制**:更细粒度的 CPU、内存限制 + +## 相关文档 + +- [并发控制详细文档](./concurrency-control.md) +- [异步任务接口实现计划](../plans/giggly-hatching-kite.md) +- [监控指南](./monitoring.md) + +## 测试结果 + +``` +======================== 60 passed, 7 warnings in 1.53s ======================== +``` + +所有测试通过,包括 4 个新增的并发控制测试。 diff --git a/docs/concurrency-control-quickref.md b/docs/concurrency-control-quickref.md new file mode 100644 index 0000000..601f09b --- /dev/null +++ b/docs/concurrency-control-quickref.md @@ -0,0 +1,102 @@ +# 并发控制快速参考 + +## 配置 + +```bash +# 设置最大并发数(默认 10) +export MAX_CONCURRENT_JOBS=20 +``` + +## API + +### 查询并发状态 + +```bash +GET /jobs/concurrency/status +``` + +**响应:** +```json +{ + "max_concurrent": 10, // 最大并发数 + "available_slots": 7, // 可用槽位 + "running_jobs": 3 // 运行中任务数 +} +``` + +## 代码示例 + +### 在 JobManager 中使用 + +```python +# 并发控制自动生效,无需额外代码 +job_manager = await get_job_manager() +job_id = await job_manager.create_job(...) + +# 任务会自动排队,等待可用槽位 +asyncio.create_task(job_manager.execute_job(job_id)) +``` + +### 查询并发状态 + +```python +job_manager = await get_job_manager() +status = job_manager.get_concurrency_status() + +print(f"运行中: {status['running_jobs']}/{status['max_concurrent']}") +print(f"可用槽位: {status['available_slots']}") +``` + +## 监控 + +### 实时监控 + +```bash +# 持续监控并发状态 +watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq' +``` + +### 测试脚本 + +```bash +# 运行并发控制测试 +./scripts/test_concurrency.sh +``` + +## 推荐配置 + +| 任务类型 | 推荐并发数 | +|---------|-----------| +| CPU 密集型 | 核心数 × 1.5 | +| I/O 密集型 | 核心数 × 5-10 | +| 混合型 | 核心数 × 2-3 | + +## 故障排查 + +### 任务一直 pending + +```bash +# 检查并发状态 +curl http://localhost:8000/jobs/concurrency/status + +# 如果 available_slots = 0,说明所有槽位被占用 +# 解决方案: +# 1. 等待当前任务完成 +# 2. 增加并发限制 +# 3. 优化算法性能 +``` + +### 系统资源耗尽 + +```bash +# 降低并发限制 +export MAX_CONCURRENT_JOBS=5 + +# 重启服务 +./scripts/run_dev.sh +``` + +## 相关文档 + +- [详细文档](./concurrency-control.md) +- [实现总结](./concurrency-control-changelog.md) diff --git a/docs/concurrency-control.md b/docs/concurrency-control.md new file mode 100644 index 0000000..9e2c574 --- /dev/null +++ b/docs/concurrency-control.md @@ -0,0 +1,204 @@ +# 异步任务并发控制 + +## 概述 + +为了防止系统资源耗尽和控制负载,任务管理器实现了基于 `asyncio.Semaphore` 的并发控制机制。 + +## 配置 + +在 `config.py` 或环境变量中设置最大并发任务数: + +```python +# config.py +max_concurrent_jobs: int = 10 # 默认值 +``` + +或通过环境变量: + +```bash +export MAX_CONCURRENT_JOBS=20 +``` + +## 工作原理 + +1. **信号量机制**:使用 `asyncio.Semaphore` 限制同时运行的任务数 +2. **自动管理**:任务开始时获取槽位,完成后自动释放 +3. **队列等待**:超过限制的任务会自动等待,直到有可用槽位 + +### 执行流程 + +``` +POST /jobs 创建任务 + │ + ▼ +asyncio.create_task(execute_job) + │ + ▼ +等待获取 semaphore 槽位 + │ + ▼ +async with semaphore: ← 获取槽位 + 执行算法 + 更新状态 + 发送 webhook + │ + ▼ +自动释放槽位 +``` + +## API 端点 + +### 查询并发状态 + +```bash +GET /jobs/concurrency/status +``` + +**响应示例:** + +```json +{ + "max_concurrent": 10, + "available_slots": 7, + "running_jobs": 3 +} +``` + +**字段说明:** + +- `max_concurrent`: 最大并发任务数(配置值) +- `available_slots`: 当前可用槽位数 +- `running_jobs`: 当前正在运行的任务数 + +## 使用示例 + +### 1. 创建多个任务 + +```bash +# 创建 20 个任务 +for i in {1..20}; do + curl -X POST http://localhost:8000/jobs \ + -H "Content-Type: application/json" \ + -d "{\"algorithm\": \"PrimeChecker\", \"params\": {\"number\": $i}}" +done +``` + +### 2. 监控并发状态 + +```bash +# 持续监控并发状态 +watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq' +``` + +输出示例: + +```json +{ + "max_concurrent": 10, + "available_slots": 0, + "running_jobs": 10 +} +``` + +### 3. 调整并发限制 + +```bash +# 重启服务前设置环境变量 +export MAX_CONCURRENT_JOBS=20 +./scripts/run_dev.sh +``` + +## 性能考虑 + +### 选择合适的并发数 + +并发数应根据以下因素确定: + +1. **CPU 核心数**:CPU 密集型任务建议设置为核心数的 1-2 倍 +2. **内存限制**:每个任务的内存占用 × 并发数 < 可用内存 +3. **外部服务限制**:如果调用外部 API,考虑其速率限制 +4. **Redis 连接池**:确保 Redis 连接池大小 ≥ 并发数 + +### 推荐配置 + +| 场景 | 推荐并发数 | 说明 | +|------|-----------|------| +| CPU 密集型(如质数判断) | 核心数 × 1.5 | 充分利用 CPU | +| I/O 密集型(如网络请求) | 核心数 × 5-10 | 等待 I/O 时可切换 | +| 混合型 | 核心数 × 2-3 | 平衡 CPU 和 I/O | +| 内存受限 | 根据内存计算 | 避免 OOM | + +### 示例计算 + +假设: +- 服务器:4 核 8GB 内存 +- 任务类型:I/O 密集型(网络请求) +- 单任务内存:50MB + +``` +最大并发数 = min( + 核心数 × 8 = 32, + 可用内存 / 单任务内存 = 8000MB / 50MB = 160 +) = 32 +``` + +## 监控指标 + +相关 Prometheus 指标: + +```promql +# 任务创建速率 +rate(jobs_created_total[5m]) + +# 任务完成速率 +rate(jobs_completed_total[5m]) + +# 任务执行时间分布 +histogram_quantile(0.95, job_execution_duration_seconds_bucket) +``` + +## 故障排查 + +### 问题:任务一直处于 pending 状态 + +**可能原因:** +1. 所有槽位都被占用 +2. 某些任务执行时间过长 + +**解决方案:** +```bash +# 1. 检查并发状态 +curl http://localhost:8000/jobs/concurrency/status + +# 2. 如果 available_slots = 0,说明所有槽位被占用 +# 3. 检查是否有长时间运行的任务 +# 4. 考虑增加并发限制或优化算法性能 +``` + +### 问题:系统资源耗尽 + +**可能原因:** +并发数设置过高 + +**解决方案:** +```bash +# 降低并发限制 +export MAX_CONCURRENT_JOBS=5 +# 重启服务 +``` + +## 最佳实践 + +1. **监控优先**:部署后持续监控并发状态和系统资源 +2. **逐步调整**:从保守值开始,逐步增加并发数 +3. **压力测试**:在生产环境前进行充分的压力测试 +4. **设置告警**:当 `available_slots = 0` 持续时间过长时告警 +5. **任务超时**:为长时间运行的任务设置超时机制(待实现) + +## 未来改进 + +- [ ] 任务超时机制 +- [ ] 优先级队列 +- [ ] 动态调整并发数 +- [ ] 任务取消功能 +- [ ] 更细粒度的资源控制(CPU、内存限制)