Files
FunctionalScaffold/docs/concurrency-control.md
Roog (顾新培) 8afff21fae main:新增并发控制文档及快速参考指南
更新内容:
- 编写《并发控制》详细文档,说明任务并发限制的配置、使用和最佳实践。
- 完成《并发控制实现总结》文档,记录设计决策和开发细节。
- 添加《并发控制快速参考》文档,提供配置和常见问题的快速解决方案。
2026-02-02 17:15:11 +08:00

4.2 KiB
Raw Permalink Blame History

异步任务并发控制

概述

为了防止系统资源耗尽和控制负载,任务管理器实现了基于 asyncio.Semaphore 的并发控制机制。

配置

config.py 或环境变量中设置最大并发任务数:

# config.py
max_concurrent_jobs: int = 10  # 默认值

或通过环境变量:

export MAX_CONCURRENT_JOBS=20

工作原理

  1. 信号量机制:使用 asyncio.Semaphore 限制同时运行的任务数
  2. 自动管理:任务开始时获取槽位,完成后自动释放
  3. 队列等待:超过限制的任务会自动等待,直到有可用槽位

执行流程

POST /jobs 创建任务
       │
       ▼
asyncio.create_task(execute_job)
       │
       ▼
等待获取 semaphore 槽位
       │
       ▼
async with semaphore:  ← 获取槽位
    执行算法
    更新状态
    发送 webhook
       │
       ▼
自动释放槽位

API 端点

查询并发状态

GET /jobs/concurrency/status

响应示例:

{
  "max_concurrent": 10,
  "available_slots": 7,
  "running_jobs": 3
}

字段说明:

  • max_concurrent: 最大并发任务数(配置值)
  • available_slots: 当前可用槽位数
  • running_jobs: 当前正在运行的任务数

使用示例

1. 创建多个任务

# 创建 20 个任务
for i in {1..20}; do
  curl -X POST http://localhost:8000/jobs \
    -H "Content-Type: application/json" \
    -d "{\"algorithm\": \"PrimeChecker\", \"params\": {\"number\": $i}}"
done

2. 监控并发状态

# 持续监控并发状态
watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq'

输出示例:

{
  "max_concurrent": 10,
  "available_slots": 0,
  "running_jobs": 10
}

3. 调整并发限制

# 重启服务前设置环境变量
export MAX_CONCURRENT_JOBS=20
./scripts/run_dev.sh

性能考虑

选择合适的并发数

并发数应根据以下因素确定:

  1. CPU 核心数CPU 密集型任务建议设置为核心数的 1-2 倍
  2. 内存限制:每个任务的内存占用 × 并发数 < 可用内存
  3. 外部服务限制:如果调用外部 API考虑其速率限制
  4. Redis 连接池:确保 Redis 连接池大小 ≥ 并发数

推荐配置

场景 推荐并发数 说明
CPU 密集型(如质数判断) 核心数 × 1.5 充分利用 CPU
I/O 密集型(如网络请求) 核心数 × 5-10 等待 I/O 时可切换
混合型 核心数 × 2-3 平衡 CPU 和 I/O
内存受限 根据内存计算 避免 OOM

示例计算

假设:

  • 服务器4 核 8GB 内存
  • 任务类型I/O 密集型(网络请求)
  • 单任务内存50MB
最大并发数 = min(
    核心数 × 8 = 32,
    可用内存 / 单任务内存 = 8000MB / 50MB = 160
) = 32

监控指标

相关 Prometheus 指标:

# 任务创建速率
rate(jobs_created_total[5m])

# 任务完成速率
rate(jobs_completed_total[5m])

# 任务执行时间分布
histogram_quantile(0.95, job_execution_duration_seconds_bucket)

故障排查

问题:任务一直处于 pending 状态

可能原因:

  1. 所有槽位都被占用
  2. 某些任务执行时间过长

解决方案:

# 1. 检查并发状态
curl http://localhost:8000/jobs/concurrency/status

# 2. 如果 available_slots = 0说明所有槽位被占用
# 3. 检查是否有长时间运行的任务
# 4. 考虑增加并发限制或优化算法性能

问题:系统资源耗尽

可能原因: 并发数设置过高

解决方案:

# 降低并发限制
export MAX_CONCURRENT_JOBS=5
# 重启服务

最佳实践

  1. 监控优先:部署后持续监控并发状态和系统资源
  2. 逐步调整:从保守值开始,逐步增加并发数
  3. 压力测试:在生产环境前进行充分的压力测试
  4. 设置告警:当 available_slots = 0 持续时间过长时告警
  5. 任务超时:为长时间运行的任务设置超时机制(待实现)

未来改进

  • 任务超时机制
  • 优先级队列
  • 动态调整并发数
  • 任务取消功能
  • 更细粒度的资源控制CPU、内存限制