Compare commits

...

5 Commits

Author SHA1 Message Date
9e0ba8e74f main:删除 Grafana 仪表板配置文件
更新内容:
- 移除 `dashboard.json` 文件,清理不再需要的 Grafana 仪表板配置。
- 简化项目目录结构,删除多余的监控配置以优化维护。
2026-02-02 18:40:16 +08:00
8afff21fae main:新增并发控制文档及快速参考指南
更新内容:
- 编写《并发控制》详细文档,说明任务并发限制的配置、使用和最佳实践。
- 完成《并发控制实现总结》文档,记录设计决策和开发细节。
- 添加《并发控制快速参考》文档,提供配置和常见问题的快速解决方案。
2026-02-02 17:15:11 +08:00
9b6642635b main:新增中间件测试用例
变更内容:
- 为路径规范化函数添加单元测试,验证 /jobs 等路径的行为。
- 为指标中间件编写测试,包括健康检查端点跳过和普通端点的指标记录。
- 检查任务路径规范化逻辑并验证规范化后的路径是否正确。
2026-02-02 17:12:07 +08:00
87ed8c071c main:新增并发控制功能
变更内容:
- 增加 `max_concurrent_jobs` 配置项,支持设置最大并发任务数。
- 为 `JobManager` 添加信号量控制实现任务并发限制。
- 新增获取任务并发状态的接口 `/jobs/concurrency/status`。
- 编写并发控制功能相关的测试用
2026-02-02 17:11:52 +08:00
57b276d038 main:删除指标脚本并优化指标记录逻辑
变更内容:
- 删除 `start_metrics.sh` 脚本,精简项目结构,移除不再需要的启动逻辑。
- 优化 HTTP 请求指标记录,新增健康检查端点过滤和路径参数规范化功能。
- 更新文档,添加指标过滤及路径规范化的详细说明。
- 提高 Prometheus 指标的性能和可维护性,避免标签基数爆炸。
2026-02-02 15:53:00 +08:00
31 changed files with 3142 additions and 190 deletions

View File

@@ -19,13 +19,14 @@
## 文档 ## 文档
| 文档 | 描述 | | 文档 | 描述 |
|------|------| |-----------------------------------------|--------------|
| [快速入门](docs/getting-started.md) | 10 分钟上手指南 | | [快速入门](docs/getting-started.md) | 10 分钟上手指南 |
| [算法开发指南](docs/algorithm-development.md) | 详细的算法开发教程 | | [算法开发指南](docs/algorithm-development.md) | 详细的算法开发教程 |
| [API 参考](docs/api-reference.md) | 完整的 API 文档 | | [API 参考](docs/api-reference.md) | 完整的 API 文档 |
| [监控指南](docs/monitoring.md) | 监控和告警配置 | | [监控指南](docs/monitoring.md) | 监控和告警配置 |
| [API 规范](docs/api/README.md) | OpenAPI 规范说明 | | [API 规范](docs/api/README.md) | OpenAPI 规范说明 |
| [日志集成(Loki)](docs/loki-quick-reference.md) | 日志收集部署说明 |
## 快速开始 ## 快速开始

View File

@@ -17,9 +17,16 @@ services:
- REDIS_DB=0 - REDIS_DB=0
# 指标配置文件路径 # 指标配置文件路径
- METRICS_CONFIG_PATH=config/metrics.yaml - METRICS_CONFIG_PATH=config/metrics.yaml
# 日志文件配置
- LOG_FILE_ENABLED=false
- LOG_FILE_PATH=/var/log/app/app.log
volumes: volumes:
- ../src:/app/src - ../src:/app/src
- ../config:/app/config - ../config:/app/config
- app_logs:/var/log/app
labels:
logging: "promtail"
logging_jobname: "functional-scaffold-app"
restart: unless-stopped restart: unless-stopped
depends_on: depends_on:
redis: redis:
@@ -69,12 +76,47 @@ services:
- GF_SECURITY_ADMIN_PASSWORD=admin - GF_SECURITY_ADMIN_PASSWORD=admin
volumes: volumes:
- grafana_data:/var/lib/grafana - grafana_data:/var/lib/grafana
- ../monitoring/grafana:/etc/grafana/provisioning - ../monitoring/grafana/datasources:/etc/grafana/provisioning/datasources
- ../monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
restart: unless-stopped restart: unless-stopped
depends_on: depends_on:
- prometheus - prometheus
- loki
loki:
image: grafana/loki:2.9.3
ports:
- "3100:3100"
volumes:
- ../monitoring/loki.yaml:/etc/loki/local-config.yaml
- loki_data:/loki
command: -config.file=/etc/loki/local-config.yaml
restart: unless-stopped
healthcheck:
test: ["CMD", "wget", "--spider", "-q", "http://localhost:3100/ready"]
interval: 10s
timeout: 3s
retries: 3
promtail:
ports:
- "9080:9080"
image: grafana/promtail:3.0.0
volumes:
- ../monitoring/promtail.yaml:/etc/promtail/config.yml
# Docker stdio 收集
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
# Log 文件收集(备用)
- app_logs:/var/log/app:ro
command: -config.file=/etc/promtail/config.yml
restart: unless-stopped
depends_on:
- loki
volumes: volumes:
prometheus_data: prometheus_data:
grafana_data: grafana_data:
redis_data: redis_data:
loki_data:
app_logs:

View File

@@ -0,0 +1,204 @@
# 异步任务并发控制实现总结
## 变更概述
为异步任务管理器添加了并发控制功能,使用 `asyncio.Semaphore` 限制同时运行的任务数量,防止系统资源耗尽。
## 修改的文件
### 1. `src/functional_scaffold/config.py`
**新增配置项:**
```python
max_concurrent_jobs: int = 10 # 最大并发任务数
```
### 2. `src/functional_scaffold/core/job_manager.py`
**新增属性:**
- `_semaphore: Optional[asyncio.Semaphore]` - 并发控制信号量
- `_max_concurrent_jobs: int` - 最大并发数(存储配置值)
**修改方法:**
- `__init__()` - 初始化 semaphore 和 max_concurrent_jobs 属性
- `initialize()` - 创建 Semaphore 实例
- `execute_job()` - 使用 `async with self._semaphore` 包裹执行逻辑
**新增方法:**
- `get_concurrency_status()` - 返回并发状态(最大并发数、可用槽位、运行中任务数)
### 3. `src/functional_scaffold/api/models.py`
**新增模型:**
```python
class ConcurrencyStatusResponse(BaseModel):
"""并发状态响应"""
max_concurrent: int
available_slots: int
running_jobs: int
```
### 4. `src/functional_scaffold/api/routes.py`
**新增端点:**
```python
GET /jobs/concurrency/status
```
返回当前并发执行状态。
### 5. `tests/test_job_manager.py`
**新增测试类:**
```python
class TestConcurrencyControl:
- test_get_concurrency_status()
- test_get_concurrency_status_without_semaphore()
- test_concurrency_limit()
- test_concurrency_status_api()
```
**修改测试:**
- `test_execute_job()` - 添加 semaphore 初始化
## 工作原理
### 并发控制流程
```
创建任务 (POST /jobs)
asyncio.create_task(execute_job)
检查 Redis 和 semaphore 可用性
async with self._semaphore: ← 获取槽位(阻塞直到有可用槽位)
├─ 更新状态为 running
├─ 执行算法
├─ 更新状态为 completed/failed
└─ 发送 webhook
自动释放槽位
```
### 关键设计决策
1. **使用 asyncio.Semaphore**
- 简单、高效、无需外部依赖
- 自动管理槽位获取和释放
- 支持异步等待
2. **在 execute_job 内部使用 semaphore**
- 快速失败的检查Redis 可用性、任务存在性)在 semaphore 外部
- 只有真正要执行的任务才占用槽位
- 任务完成后自动释放(即使发生异常)
3. **存储 _max_concurrent_jobs**
- Semaphore 不暴露最大值属性
- 需要单独存储以便 `get_concurrency_status()` 使用
## 测试覆盖
- ✅ 获取并发状态
- ✅ 未初始化时的并发状态
- ✅ 并发限制生效(创建超过限制的任务,验证只有限定数量在运行)
- ✅ API 端点测试
- ✅ 所有现有测试继续通过60/60
## 使用示例
### 配置并发限制
```bash
# 环境变量
export MAX_CONCURRENT_JOBS=20
# 或在 .env 文件
MAX_CONCURRENT_JOBS=20
```
### 查询并发状态
```bash
curl http://localhost:8000/jobs/concurrency/status
```
响应:
```json
{
"max_concurrent": 10,
"available_slots": 7,
"running_jobs": 3
}
```
### 测试并发控制
```bash
# 运行测试脚本
./scripts/test_concurrency.sh
```
## 性能影响
### 优点
1. **防止资源耗尽**:限制同时运行的任务数
2. **可预测的负载**:系统负载不会超过配置的限制
3. **自动排队**:超过限制的任务自动等待
4. **零开销**未达到限制时semaphore 几乎无性能开销
### 注意事项
1. **任务等待**:超过限制的任务会等待,可能导致响应延迟
2. **内存占用**:等待中的任务仍占用内存(协程对象)
3. **配置调优**:需要根据实际负载调整并发数
## 监控建议
### Prometheus 查询
```promql
# 任务创建速率
rate(jobs_created_total[5m])
# 任务完成速率
rate(jobs_completed_total[5m])
# 任务积压(创建 - 完成)
rate(jobs_created_total[5m]) - rate(jobs_completed_total[5m])
```
### Grafana 面板
建议添加以下面板:
1. 并发状态时间序列max_concurrent, available_slots, running_jobs
2. 任务创建/完成速率
3. 任务执行时间分布P50, P95, P99
## 未来改进
1. **任务超时机制**:为长时间运行的任务设置超时
2. **优先级队列**:支持高优先级任务优先执行
3. **动态调整**:根据系统负载动态调整并发数
4. **任务取消**:支持取消等待中或运行中的任务
5. **资源限制**:更细粒度的 CPU、内存限制
## 相关文档
- [并发控制详细文档](./concurrency-control.md)
- [异步任务接口实现计划](../plans/giggly-hatching-kite.md)
- [监控指南](./monitoring.md)
## 测试结果
```
======================== 60 passed, 7 warnings in 1.53s ========================
```
所有测试通过,包括 4 个新增的并发控制测试。

View File

@@ -0,0 +1,102 @@
# 并发控制快速参考
## 配置
```bash
# 设置最大并发数(默认 10
export MAX_CONCURRENT_JOBS=20
```
## API
### 查询并发状态
```bash
GET /jobs/concurrency/status
```
**响应:**
```json
{
"max_concurrent": 10, // 最大并发数
"available_slots": 7, // 可用槽位
"running_jobs": 3 // 运行中任务数
}
```
## 代码示例
### 在 JobManager 中使用
```python
# 并发控制自动生效,无需额外代码
job_manager = await get_job_manager()
job_id = await job_manager.create_job(...)
# 任务会自动排队,等待可用槽位
asyncio.create_task(job_manager.execute_job(job_id))
```
### 查询并发状态
```python
job_manager = await get_job_manager()
status = job_manager.get_concurrency_status()
print(f"运行中: {status['running_jobs']}/{status['max_concurrent']}")
print(f"可用槽位: {status['available_slots']}")
```
## 监控
### 实时监控
```bash
# 持续监控并发状态
watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq'
```
### 测试脚本
```bash
# 运行并发控制测试
./scripts/test_concurrency.sh
```
## 推荐配置
| 任务类型 | 推荐并发数 |
|---------|-----------|
| CPU 密集型 | 核心数 × 1.5 |
| I/O 密集型 | 核心数 × 5-10 |
| 混合型 | 核心数 × 2-3 |
## 故障排查
### 任务一直 pending
```bash
# 检查并发状态
curl http://localhost:8000/jobs/concurrency/status
# 如果 available_slots = 0说明所有槽位被占用
# 解决方案:
# 1. 等待当前任务完成
# 2. 增加并发限制
# 3. 优化算法性能
```
### 系统资源耗尽
```bash
# 降低并发限制
export MAX_CONCURRENT_JOBS=5
# 重启服务
./scripts/run_dev.sh
```
## 相关文档
- [详细文档](./concurrency-control.md)
- [实现总结](./concurrency-control-changelog.md)

204
docs/concurrency-control.md Normal file
View File

@@ -0,0 +1,204 @@
# 异步任务并发控制
## 概述
为了防止系统资源耗尽和控制负载,任务管理器实现了基于 `asyncio.Semaphore` 的并发控制机制。
## 配置
`config.py` 或环境变量中设置最大并发任务数:
```python
# config.py
max_concurrent_jobs: int = 10 # 默认值
```
或通过环境变量:
```bash
export MAX_CONCURRENT_JOBS=20
```
## 工作原理
1. **信号量机制**:使用 `asyncio.Semaphore` 限制同时运行的任务数
2. **自动管理**:任务开始时获取槽位,完成后自动释放
3. **队列等待**:超过限制的任务会自动等待,直到有可用槽位
### 执行流程
```
POST /jobs 创建任务
asyncio.create_task(execute_job)
等待获取 semaphore 槽位
async with semaphore: ← 获取槽位
执行算法
更新状态
发送 webhook
自动释放槽位
```
## API 端点
### 查询并发状态
```bash
GET /jobs/concurrency/status
```
**响应示例:**
```json
{
"max_concurrent": 10,
"available_slots": 7,
"running_jobs": 3
}
```
**字段说明:**
- `max_concurrent`: 最大并发任务数(配置值)
- `available_slots`: 当前可用槽位数
- `running_jobs`: 当前正在运行的任务数
## 使用示例
### 1. 创建多个任务
```bash
# 创建 20 个任务
for i in {1..20}; do
curl -X POST http://localhost:8000/jobs \
-H "Content-Type: application/json" \
-d "{\"algorithm\": \"PrimeChecker\", \"params\": {\"number\": $i}}"
done
```
### 2. 监控并发状态
```bash
# 持续监控并发状态
watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq'
```
输出示例:
```json
{
"max_concurrent": 10,
"available_slots": 0,
"running_jobs": 10
}
```
### 3. 调整并发限制
```bash
# 重启服务前设置环境变量
export MAX_CONCURRENT_JOBS=20
./scripts/run_dev.sh
```
## 性能考虑
### 选择合适的并发数
并发数应根据以下因素确定:
1. **CPU 核心数**CPU 密集型任务建议设置为核心数的 1-2 倍
2. **内存限制**:每个任务的内存占用 × 并发数 < 可用内存
3. **外部服务限制**如果调用外部 API考虑其速率限制
4. **Redis 连接池**确保 Redis 连接池大小 并发数
### 推荐配置
| 场景 | 推荐并发数 | 说明 |
|------|-----------|------|
| CPU 密集型如质数判断 | 核心数 × 1.5 | 充分利用 CPU |
| I/O 密集型如网络请求 | 核心数 × 5-10 | 等待 I/O 时可切换 |
| 混合型 | 核心数 × 2-3 | 平衡 CPU I/O |
| 内存受限 | 根据内存计算 | 避免 OOM |
### 示例计算
假设
- 服务器4 8GB 内存
- 任务类型I/O 密集型网络请求
- 单任务内存50MB
```
最大并发数 = min(
核心数 × 8 = 32,
可用内存 / 单任务内存 = 8000MB / 50MB = 160
) = 32
```
## 监控指标
相关 Prometheus 指标
```promql
# 任务创建速率
rate(jobs_created_total[5m])
# 任务完成速率
rate(jobs_completed_total[5m])
# 任务执行时间分布
histogram_quantile(0.95, job_execution_duration_seconds_bucket)
```
## 故障排查
### 问题:任务一直处于 pending 状态
**可能原因:**
1. 所有槽位都被占用
2. 某些任务执行时间过长
**解决方案:**
```bash
# 1. 检查并发状态
curl http://localhost:8000/jobs/concurrency/status
# 2. 如果 available_slots = 0说明所有槽位被占用
# 3. 检查是否有长时间运行的任务
# 4. 考虑增加并发限制或优化算法性能
```
### 问题:系统资源耗尽
**可能原因:**
并发数设置过高
**解决方案:**
```bash
# 降低并发限制
export MAX_CONCURRENT_JOBS=5
# 重启服务
```
## 最佳实践
1. **监控优先**部署后持续监控并发状态和系统资源
2. **逐步调整**从保守值开始逐步增加并发数
3. **压力测试**在生产环境前进行充分的压力测试
4. **设置告警** `available_slots = 0` 持续时间过长时告警
5. **任务超时**为长时间运行的任务设置超时机制待实现
## 未来改进
- [ ] 任务超时机制
- [ ] 优先级队列
- [ ] 动态调整并发数
- [ ] 任务取消功能
- [ ] 更细粒度的资源控制CPU内存限制

View File

@@ -0,0 +1,238 @@
# Loki 日志收集系统集成 - 实施总结
## 实施完成
已成功集成 Grafana Loki 日志收集系统到 FunctionalScaffold 项目。
## 新增文件
### 1. 监控配置文件
| 文件 | 说明 |
|------|------|
| `monitoring/loki.yaml` | Loki 服务配置7天保留期10MB/s速率限制|
| `monitoring/promtail.yaml` | Promtail 日志采集配置(支持 Docker stdio 和文件两种模式)|
### 2. Grafana Provisioning
| 文件 | 说明 |
|------|------|
| `monitoring/grafana/datasources/prometheus.yaml` | Prometheus 数据源自动配置 |
| `monitoring/grafana/datasources/loki.yaml` | Loki 数据源自动配置 |
| `monitoring/grafana/dashboards/provider.yaml` | Dashboard 自动加载配置 |
| `monitoring/grafana/dashboards/logs-dashboard.json` | 日志监控仪表板 |
| `monitoring/grafana/dashboards/dashboard.json` | 原有监控仪表板(已移动)|
### 3. 文档和脚本
| 文件 | 说明 |
|------|------|
| `docs/loki-integration.md` | Loki 使用完整文档(包含查询示例、故障排查等)|
| `scripts/verify_loki.sh` | Loki 集成验证脚本 |
## 修改文件
### 1. Docker Compose 配置
**文件**: `deployment/docker-compose.yml`
**变更**:
- 添加 `loki` 服务(端口 3100
- 添加 `promtail` 服务(端口 9080
- 更新 `app` 服务:
- 添加日志文件配置环境变量
- 添加 `app_logs` 卷挂载
- 添加 Promtail 标签
- 更新 `grafana` 服务:
- 修改 provisioning 卷挂载结构
- 添加对 Loki 的依赖
- 添加 `loki_data``app_logs`
### 2. 应用代码
**文件**: `src/functional_scaffold/core/logging.py`
**变更**:
- 添加 `file_path` 参数支持
- 实现 `RotatingFileHandler`100MB5个备份
- 支持同时输出到控制台和文件
**文件**: `src/functional_scaffold/config.py`
**变更**:
- 添加 `log_file_enabled` 配置(默认 False
- 添加 `log_file_path` 配置(默认 `/var/log/app/app.log`
**文件**: `src/functional_scaffold/main.py`
**变更**:
- 更新 `setup_logging()` 调用,传入文件路径参数
## 架构特点
### 1. 双模式日志收集
**模式 1: Docker stdio 收集(默认)**
- ✅ 无需修改应用代码
- ✅ 自动收集容器标准输出
- ✅ 性能影响极小
- ✅ 推荐用于生产环境
**模式 2: 文件收集(备用)**
- ✅ 日志持久化到文件
- ✅ 支持日志轮转
- ✅ 适合需要本地日志的场景
- ⚙️ 需要设置 `LOG_FILE_ENABLED=true`
### 2. 自动化配置
- ✅ Grafana 数据源自动加载
- ✅ Dashboard 自动加载
- ✅ 无需手动配置
### 3. 结构化日志
- ✅ JSON 格式日志
- ✅ 自动提取字段level, logger, request_id 等)
- ✅ 支持 LogQL 查询
## 使用方式
### 快速启动
```bash
cd deployment
docker-compose up -d
```
### 访问服务
- **Grafana**: http://localhost:3000 (admin/admin)
- **Loki API**: http://localhost:3100
- **Promtail**: http://localhost:9080
### 查看日志
**方式 1: Grafana 日志仪表板**
1. 访问 http://localhost:3000
2. 进入 "日志监控" 仪表板
**方式 2: Grafana Explore**
1. 访问 http://localhost:3000/explore
2. 选择 Loki 数据源
3. 输入查询: `{job="functional-scaffold-app"}`
### 验证集成
```bash
./scripts/verify_loki.sh
```
## LogQL 查询示例
```logql
# 查询所有日志
{job="functional-scaffold-app"}
# 查询错误日志
{job="functional-scaffold-app", level="ERROR"}
# 按 request_id 过滤
{job="functional-scaffold-app"} | json | request_id = "abc123"
# 统计日志量
sum by (level) (count_over_time({job="functional-scaffold-app"}[5m]))
```
## 配置说明
### 日志保留期
默认 7 天,可在 `monitoring/loki.yaml` 中修改:
```yaml
limits_config:
retention_period: 168h # 7 天
```
### 日志文件模式
`deployment/docker-compose.yml` 中启用:
```yaml
environment:
- LOG_FILE_ENABLED=true
- LOG_FILE_PATH=/var/log/app/app.log
```
### 日志级别
`deployment/docker-compose.yml` 中调整:
```yaml
environment:
- LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR, CRITICAL
```
## 监控指标
Loki 集成后,可以在 Grafana 中查看:
- **日志流**: 实时日志流
- **日志量趋势**: 按时间和级别统计
- **日志级别分布**: INFO/WARNING/ERROR 分布
- **错误日志**: 只显示 ERROR 级别
## 故障排查
### 看不到日志
1. 检查服务状态: `docker-compose ps`
2. 查看 Promtail 日志: `docker-compose logs promtail`
3. 验证容器标签: `docker inspect <container> | grep Labels`
4. 查询 Loki API: `curl http://localhost:3100/loki/api/v1/label/job/values`
### Docker socket 权限问题
```bash
sudo chmod 666 /var/run/docker.sock
```
### 日志量过大
1. 调整保留期为 3 天
2. 降低摄入速率限制
3. 添加日志过滤规则
详细故障排查请参考 `docs/loki-integration.md`
## 性能影响
- **CPU**: < 5% 额外开销
- **内存**: Loki ~200MB, Promtail ~50MB
- **磁盘**: 取决于日志量7天约 1-5GB
- **网络**: 本地通信影响极小
## 下一步
可选的增强功能
1. **告警规则**: 配置基于日志的告警
2. **日志导出**: 定期导出日志到对象存储
3. **多租户**: 配置 Loki 多租户模式
4. **长期存储**: 配置 S3/OSS 作为后端存储
## 参考文档
- 完整使用文档: `docs/loki-integration.md`
- Loki 官方文档: https://grafana.com/docs/loki/latest/
- LogQL 查询语言: https://grafana.com/docs/loki/latest/logql/
## 总结
**完成**: Loki 日志收集系统已成功集成
**测试**: 可通过 `./scripts/verify_loki.sh` 验证
**文档**: 提供完整的使用和故障排查文档
**生产就绪**: 支持双模式收集配置灵活
集成已完成可以开始使用 Loki 进行日志收集和分析

564
docs/loki-integration.md Normal file
View File

@@ -0,0 +1,564 @@
# Loki 日志收集系统集成文档
## 概述
本项目已集成 Grafana Loki 日志收集系统,支持两种日志收集模式:
1. **Docker stdio 收集**(推荐)- 从容器标准输出/错误收集日志
2. **Log 文件收集**(备用)- 从日志文件收集日志
## 架构
```
应用容器 (stdout/stderr)
Docker Engine
Promtail (日志采集器)
Loki (日志存储)
Grafana (可视化)
```
## 快速开始
### 1. 启动服务
```bash
cd deployment
docker-compose up -d
```
这将启动以下服务:
- **app**: 应用服务 (端口 8111)
- **loki**: 日志存储服务 (端口 3100)
- **promtail**: 日志采集服务 (端口 9080)
- **grafana**: 可视化服务 (端口 3000)
- **prometheus**: 指标收集服务 (端口 9090)
- **redis**: 缓存服务 (端口 6380)
### 2. 访问 Grafana
1. 打开浏览器访问 http://localhost:3000
2. 使用默认凭据登录:
- 用户名: `admin`
- 密码: `admin`
3. 首次登录后建议修改密码
### 3. 查看日志
#### 方式 1: 使用预配置的日志仪表板
1. 在 Grafana 左侧菜单点击 **Dashboards**
2. 选择 **日志监控** 仪表板
3. 查看以下面板:
- **日志流 (实时)**: 实时日志流
- **日志量趋势(按级别)**: 时间序列图表
- **日志级别分布**: 按级别统计
- **错误日志**: 只显示 ERROR 级别日志
#### 方式 2: 使用 Explore 功能
1. 在 Grafana 左侧菜单点击 **Explore** (指南针图标)
2. 选择 **Loki** 数据源
3. 输入 LogQL 查询语句(见下文)
## LogQL 查询示例
### 基础查询
```logql
# 查询所有应用日志
{job="functional-scaffold-app"}
# 查询特定级别的日志
{job="functional-scaffold-app", level="ERROR"}
{job="functional-scaffold-app", level="INFO"}
# 查询特定容器的日志
{container="functional-scaffold-app-1"}
```
### 文本过滤
```logql
# 包含特定文本
{job="functional-scaffold-app"} |= "request_id"
# 不包含特定文本
{job="functional-scaffold-app"} != "healthz"
# 正则表达式匹配
{job="functional-scaffold-app"} |~ "error|exception"
# 正则表达式不匹配
{job="functional-scaffold-app"} !~ "debug|trace"
```
### JSON 字段提取
```logql
# 提取 request_id 字段
{job="functional-scaffold-app"} | json | request_id != ""
# 提取并过滤特定 request_id
{job="functional-scaffold-app"} | json | request_id = "abc123"
# 提取 logger 字段
{job="functional-scaffold-app"} | json | logger = "functional_scaffold.api.routes"
```
### 聚合查询
```logql
# 统计日志数量
count_over_time({job="functional-scaffold-app"}[5m])
# 按级别统计
sum by (level) (count_over_time({job="functional-scaffold-app"}[5m]))
# 计算错误率
sum(rate({job="functional-scaffold-app", level="ERROR"}[5m]))
/
sum(rate({job="functional-scaffold-app"}[5m]))
```
## 日志收集模式
### 模式 1: Docker stdio 收集(默认,推荐)
**特点:**
- 无需修改应用代码
- 自动收集容器标准输出/错误
- 性能影响极小
- 配置简单
**工作原理:**
1. 应用将日志输出到 stdout/stderr
2. Docker Engine 捕获日志
3. Promtail 通过 Docker API 读取日志
4. 日志发送到 Loki 存储
**配置:**
- 应用容器需要添加标签:
```yaml
labels:
logging: "promtail"
logging_jobname: "functional-scaffold-app"
```
### 模式 2: Log 文件收集(备用)
**特点:**
- 日志持久化到文件
- 支持日志轮转
- 适合需要本地日志文件的场景
**启用方式:**
1. 修改 `deployment/docker-compose.yml`
```yaml
environment:
- LOG_FILE_ENABLED=true
- LOG_FILE_PATH=/var/log/app/app.log
```
2. 重启服务:
```bash
docker-compose up -d app
```
**日志文件配置:**
- 最大文件大小: 100MB
- 保留备份数: 5 个
- 总存储空间: 最多 500MB
## 配置说明
### Loki 配置 (monitoring/loki.yaml)
```yaml
limits_config:
retention_period: 168h # 日志保留 7 天
ingestion_rate_mb: 10 # 摄入速率限制 10MB/s
ingestion_burst_size_mb: 20 # 突发大小 20MB
```
**可调整参数:**
- `retention_period`: 日志保留时间(默认 7 天)
- `ingestion_rate_mb`: 每秒摄入速率限制
- `ingestion_burst_size_mb`: 突发流量大小
### Promtail 配置 (monitoring/promtail.yaml)
**Docker stdio 收集配置:**
```yaml
scrape_configs:
- job_name: docker
docker_sd_configs:
- host: unix:///var/run/docker.sock
filters:
- name: label
values: ["logging=promtail"]
```
**文件收集配置:**
```yaml
scrape_configs:
- job_name: app_files
static_configs:
- targets:
- localhost
labels:
job: functional-scaffold-app-files
__path__: /var/log/app/*.log
```
## 验证和测试
### 1. 检查服务状态
```bash
# 查看所有服务
docker-compose ps
# 检查 Loki 健康状态
curl http://localhost:3100/ready
# 检查 Promtail 健康状态
curl http://localhost:9080/ready
```
### 2. 生成测试日志
```bash
# 发送测试请求
curl -X POST http://localhost:8111/invoke \
-H "Content-Type: application/json" \
-d '{"algorithm": "PrimeChecker", "params": {"number": 17}}'
```
### 3. 查询日志
```bash
# 使用 Loki API 查询
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
--data-urlencode 'query={job="functional-scaffold-app"}' \
--data-urlencode 'limit=10' \
| jq '.data.result'
```
### 4. 在 Grafana 中验证
1. 访问 http://localhost:3000/explore
2. 选择 Loki 数据源
3. 输入查询: `{job="functional-scaffold-app"}`
4. 应该能看到应用日志
## 故障排查
### 问题 1: 看不到日志
**检查步骤:**
1. 确认 Promtail 正在运行:
```bash
docker-compose ps promtail
```
2. 检查 Promtail 日志:
```bash
docker-compose logs promtail
```
3. 确认应用容器有正确的标签:
```bash
docker inspect functional-scaffold-app-1 | grep -A 5 Labels
```
4. 检查 Loki 是否接收到日志:
```bash
curl -G -s "http://localhost:3100/loki/api/v1/label/job/values" | jq
```
### 问题 2: Promtail 无法访问 Docker socket
**错误信息:**
```
permission denied while trying to connect to the Docker daemon socket
```
**解决方案:**
在 macOS/Linux 上,确保 Docker socket 权限正确:
```bash
sudo chmod 666 /var/run/docker.sock
```
或者将 Promtail 容器添加到 docker 组Linux
```yaml
promtail:
user: root
group_add:
- docker
```
### 问题 3: 日志量过大
**症状:**
- Loki 响应缓慢
- 磁盘空间不足
**解决方案:**
1. 调整日志保留期:
```yaml
# monitoring/loki.yaml
limits_config:
retention_period: 72h # 改为 3 天
```
2. 增加摄入速率限制:
```yaml
limits_config:
ingestion_rate_mb: 5 # 降低到 5MB/s
```
3. 添加日志过滤:
```yaml
# monitoring/promtail.yaml
pipeline_stages:
- match:
selector: '{job="functional-scaffold-app"}'
stages:
- drop:
expression: ".*healthz.*" # 丢弃健康检查日志
```
### 问题 4: 文件模式下看不到日志
**检查步骤:**
1. 确认文件日志已启用:
```bash
docker-compose exec app env | grep LOG_FILE
```
2. 检查日志文件是否存在:
```bash
docker-compose exec app ls -lh /var/log/app/
```
3. 检查 Promtail 是否能访问日志文件:
```bash
docker-compose exec promtail ls -lh /var/log/app/
```
## 性能优化
### 1. 减少日志量
**在应用层面:**
- 调整日志级别为 WARNING 或 ERROR
- 过滤掉不必要的日志(如健康检查)
```yaml
# docker-compose.yml
environment:
- LOG_LEVEL=WARNING
```
**在 Promtail 层面:**
```yaml
# monitoring/promtail.yaml
pipeline_stages:
- drop:
expression: ".*healthz.*"
drop_counter_reason: "healthcheck"
```
### 2. 优化查询性能
**使用标签过滤:**
```logql
# 好:使用标签过滤(快)
{job="functional-scaffold-app", level="ERROR"}
# 差:使用文本过滤(慢)
{job="functional-scaffold-app"} |= "ERROR"
```
**限制时间范围:**
```logql
# 查询最近 5 分钟
{job="functional-scaffold-app"}[5m]
# 避免查询过长时间范围
{job="functional-scaffold-app"}[7d] # 慢
```
### 3. 存储优化
**定期清理旧数据:**
```bash
# Loki 会自动根据 retention_period 清理
# 也可以手动清理
docker-compose exec loki rm -rf /loki/chunks/*
```
**监控磁盘使用:**
```bash
docker-compose exec loki du -sh /loki/chunks
```
## 高级功能
### 1. 告警规则
在 Loki 中配置告警规则(需要 Loki Ruler
```yaml
# monitoring/loki-rules.yaml
groups:
- name: error_alerts
interval: 1m
rules:
- alert: HighErrorRate
expr: |
sum(rate({job="functional-scaffold-app", level="ERROR"}[5m]))
/
sum(rate({job="functional-scaffold-app"}[5m]))
> 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "错误率过高"
description: "应用错误率超过 5%"
```
### 2. 日志导出
**导出为 JSON**
```bash
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
--data-urlencode 'query={job="functional-scaffold-app"}' \
--data-urlencode 'start=2024-01-01T00:00:00Z' \
--data-urlencode 'end=2024-01-02T00:00:00Z' \
| jq '.data.result' > logs.json
```
**导出为文本:**
```bash
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
--data-urlencode 'query={job="functional-scaffold-app"}' \
| jq -r '.data.result[].values[][1]' > logs.txt
```
### 3. 与 Prometheus 集成
在 Grafana 仪表板中同时显示日志和指标:
```json
{
"panels": [
{
"title": "错误率和错误日志",
"targets": [
{
"datasource": "Prometheus",
"expr": "rate(http_requests_total{status=\"error\"}[5m])"
},
{
"datasource": "Loki",
"expr": "{job=\"functional-scaffold-app\", level=\"ERROR\"}"
}
]
}
]
}
```
## 最佳实践
### 1. 日志格式
**使用结构化日志JSON**
```python
logger.info("处理请求", extra={
"request_id": "abc123",
"user_id": "user456",
"duration": 0.123
})
```
**输出:**
```json
{
"asctime": "2024-01-01 12:00:00,000",
"name": "functional_scaffold.api.routes",
"levelname": "INFO",
"message": "处理请求",
"request_id": "abc123",
"user_id": "user456",
"duration": 0.123
}
```
### 2. 标签策略
**好的标签:**
- 低基数(值的种类少)
- 用于过滤和分组
- 例如:`level`, `logger`, `container`
**不好的标签:**
- 高基数(值的种类多)
- 例如:`request_id`, `user_id`, `timestamp`
**正确做法:**
```logql
# 使用标签过滤
{job="functional-scaffold-app", level="ERROR"}
# 使用 JSON 提取高基数字段
{job="functional-scaffold-app"} | json | request_id = "abc123"
```
### 3. 查询优化
**使用时间范围:**
```logql
{job="functional-scaffold-app"}[5m] # 最近 5 分钟
```
**限制返回行数:**
```logql
{job="functional-scaffold-app"} | limit 100
```
**使用聚合减少数据量:**
```logql
sum by (level) (count_over_time({job="functional-scaffold-app"}[5m]))
```
## 参考资料
- [Loki 官方文档](https://grafana.com/docs/loki/latest/)
- [LogQL 查询语言](https://grafana.com/docs/loki/latest/logql/)
- [Promtail 配置](https://grafana.com/docs/loki/latest/clients/promtail/configuration/)
- [Grafana Explore](https://grafana.com/docs/grafana/latest/explore/)
## 总结
本项目的 Loki 集成提供了:
**开箱即用** - 无需额外配置即可收集日志
**双模式支持** - Docker stdio默认和文件收集
**自动化配置** - 数据源和仪表板自动加载
**结构化日志** - JSON 格式,支持字段提取
**高性能** - 低资源占用,快速查询
**易于扩展** - 支持自定义标签和过滤规则
如有问题,请参考故障排查章节或查阅官方文档。

View File

@@ -0,0 +1,237 @@
# Loki 快速参考
## 常用命令
### 服务管理
```bash
# 启动所有服务
cd deployment && docker-compose up -d
# 查看服务状态
docker-compose ps
# 查看日志
docker-compose logs -f loki
docker-compose logs -f promtail
# 重启服务
docker-compose restart loki promtail
# 停止服务
docker-compose down
```
### 健康检查
```bash
# Loki
curl http://localhost:3100/ready
# Promtail
curl http://localhost:9080/ready
# 验证脚本
./scripts/verify_loki.sh
```
## 常用 LogQL 查询
### 基础查询
```logql
# 所有日志
{job="functional-scaffold-app"}
# 错误日志
{job="functional-scaffold-app", level="ERROR"}
# 特定时间范围
{job="functional-scaffold-app"}[5m]
```
### 文本过滤
```logql
# 包含文本
{job="functional-scaffold-app"} |= "error"
# 不包含文本
{job="functional-scaffold-app"} != "healthz"
# 正则匹配
{job="functional-scaffold-app"} |~ "error|exception"
```
### JSON 提取
```logql
# 提取 request_id
{job="functional-scaffold-app"} | json | request_id != ""
# 按 request_id 过滤
{job="functional-scaffold-app"} | json | request_id = "abc123"
```
### 聚合统计
```logql
# 日志数量
count_over_time({job="functional-scaffold-app"}[5m])
# 按级别统计
sum by (level) (count_over_time({job="functional-scaffold-app"}[5m]))
# 错误率
sum(rate({job="functional-scaffold-app", level="ERROR"}[5m]))
/
sum(rate({job="functional-scaffold-app"}[5m]))
```
## API 查询
### 查询日志
```bash
# 查询最近的日志
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
--data-urlencode 'query={job="functional-scaffold-app"}' \
--data-urlencode 'limit=10' \
| jq '.data.result'
# 查询错误日志
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
--data-urlencode 'query={job="functional-scaffold-app", level="ERROR"}' \
| jq '.data.result'
```
### 查询标签
```bash
# 查询所有 job 标签值
curl -s "http://localhost:3100/loki/api/v1/label/job/values" | jq
# 查询所有 level 标签值
curl -s "http://localhost:3100/loki/api/v1/label/level/values" | jq
```
## 配置切换
### 启用文件日志
编辑 `deployment/docker-compose.yml`:
```yaml
environment:
- LOG_FILE_ENABLED=true
```
重启服务:
```bash
docker-compose up -d app
```
### 调整日志级别
编辑 `deployment/docker-compose.yml`:
```yaml
environment:
- LOG_LEVEL=WARNING # DEBUG, INFO, WARNING, ERROR, CRITICAL
```
### 修改保留期
编辑 `monitoring/loki.yaml`:
```yaml
limits_config:
retention_period: 72h # 改为 3 天
```
重启 Loki:
```bash
docker-compose restart loki
```
## 访问地址
| 服务 | 地址 | 凭据 |
|------|------|------|
| Grafana | http://localhost:3000 | admin/admin |
| Loki API | http://localhost:3100 | - |
| Promtail | http://localhost:9080 | - |
| Prometheus | http://localhost:9090 | - |
| App | http://localhost:8111 | - |
## 故障排查
### 看不到日志
```bash
# 1. 检查 Promtail 日志
docker-compose logs promtail | tail -50
# 2. 检查容器标签
docker inspect deployment-app-1 | grep -A 5 Labels
# 3. 查询 Loki
curl -s "http://localhost:3100/loki/api/v1/label/job/values" | jq
```
### Docker socket 权限
```bash
sudo chmod 666 /var/run/docker.sock
```
### 清理日志数据
```bash
# 停止 Loki
docker-compose stop loki
# 清理数据
docker-compose exec loki rm -rf /loki/chunks/*
# 重启 Loki
docker-compose start loki
```
## 性能优化
### 减少日志量
```yaml
# docker-compose.yml
environment:
- LOG_LEVEL=WARNING # 只记录警告和错误
```
### 过滤健康检查日志
编辑 `monitoring/promtail.yaml`:
```yaml
pipeline_stages:
- drop:
expression: ".*healthz.*"
```
### 限制查询范围
```logql
# 好:限制时间范围
{job="functional-scaffold-app"}[5m]
# 差:查询所有时间
{job="functional-scaffold-app"}
```
## 文档链接
- 完整文档: `docs/loki-integration.md`
- 实施总结: `docs/loki-implementation-summary.md`
- 验证脚本: `scripts/verify_loki.sh`

View File

@@ -0,0 +1,126 @@
# 指标过滤和路径规范化
## 变更说明
本次修改优化了 HTTP 请求指标的记录逻辑,主要包括两个方面:
### 1. 跳过健康检查端点
以下端点不再记录到 Prometheus 指标中:
- `/metrics` - 指标端点本身
- `/healthz` - 存活检查
- `/readyz` - 就绪检查
**原因**:这些端点通常被频繁调用(如 Kubernetes 健康检查、Prometheus 抓取),但对业务监控意义不大,会产生大量噪音数据。
### 2. 路径参数规范化
带有路径参数的端点会被规范化为模板形式:
| 原始路径 | 规范化后 |
|---------|---------|
| `GET /jobs/a1b2c3d4e5f6` | `GET /jobs/{job_id}` |
| `GET /jobs/xyz123456789` | `GET /jobs/{job_id}` |
**原因**避免因为不同的路径参数值产生过多的指标标签导致指标基数爆炸cardinality explosion影响 Prometheus 性能。
## 实现细节
### 代码修改
**文件:`src/functional_scaffold/main.py`**
1. 添加 `normalize_path()` 函数:
```python
def normalize_path(path: str) -> str:
"""规范化路径,将路径参数替换为模板形式"""
if path.startswith("/jobs/") and len(path) > 6:
return "/jobs/{job_id}"
return path
```
2. 修改 `track_metrics` 中间件:
```python
# 跳过不需要记录指标的端点
skip_paths = {"/metrics", "/readyz", "/healthz"}
if request.url.path in skip_paths:
return await call_next(request)
# 使用规范化后的路径记录指标
normalized_path = normalize_path(request.url.path)
incr("http_requests_total",
{"method": request.method, "endpoint": normalized_path, "status": status})
```
### 测试覆盖
**文件:`tests/test_middleware.py`**
新增 6 个测试用例:
- `test_normalize_jobs_path` - 测试任务路径规范化
- `test_normalize_other_paths` - 测试其他路径保持不变
- `test_normalize_jobs_root` - 测试 /jobs 根路径
- `test_skip_health_endpoints` - 测试跳过健康检查端点
- `test_record_normal_endpoints` - 测试记录普通端点
- `test_normalize_job_path` - 测试规范化任务路径的集成测试
所有测试通过:✅ 56/56 passed
## 验证方法
### 手动测试
使用提供的测试脚本:
```bash
./scripts/test_metrics_filtering.sh
```
### 预期结果
访问 `/metrics` 端点后,应该看到:
**应该出现的指标:**
```
http_requests_total{method="POST",endpoint="/invoke",status="success"} 1
http_requests_total{method="GET",endpoint="/jobs/{job_id}",status="error"} 2
```
**不应该出现的指标:**
```
http_requests_total{method="GET",endpoint="/healthz",...}
http_requests_total{method="GET",endpoint="/readyz",...}
http_requests_total{method="GET",endpoint="/metrics",...}
http_requests_total{method="GET",endpoint="/jobs/a1b2c3d4e5f6",...}
```
## 扩展性
如果需要添加更多路径规范化规则,只需修改 `normalize_path()` 函数:
```python
def normalize_path(path: str) -> str:
"""规范化路径,将路径参数替换为模板形式"""
# 任务路径
if path.startswith("/jobs/") and len(path) > 6:
return "/jobs/{job_id}"
# 用户路径(示例)
if path.startswith("/users/") and len(path) > 7:
return "/users/{user_id}"
# 其他路径保持不变
return path
```
## 影响范围
- ✅ 不影响现有功能
- ✅ 不影响 API 行为
- ✅ 仅影响指标记录逻辑
- ✅ 向后兼容
- ✅ 所有测试通过
## 相关文档
- [监控指南](../docs/monitoring.md) - 已更新指标说明
- [测试脚本](../scripts/test_metrics_filtering.sh) - 手动验证脚本

View File

@@ -61,6 +61,19 @@ docker-compose up -d redis prometheus grafana
| `http_request_duration_seconds` | Histogram | method, endpoint | HTTP 请求延迟分布 | | `http_request_duration_seconds` | Histogram | method, endpoint | HTTP 请求延迟分布 |
| `http_requests_in_progress` | Gauge | - | 当前进行中的请求数 | | `http_requests_in_progress` | Gauge | - | 当前进行中的请求数 |
**注意事项:**
1. **跳过的端点**:以下端点不会被记录到指标中,以减少噪音:
- `/metrics` - 指标端点本身
- `/healthz` - 存活检查
- `/readyz` - 就绪检查
2. **路径规范化**:带有路径参数的端点会被规范化为模板形式:
- `GET /jobs/a1b2c3d4e5f6``GET /jobs/{job_id}`
- `GET /jobs/xyz123456789``GET /jobs/{job_id}`
这样可以避免因为不同的路径参数值产生过多的指标标签,导致指标基数爆炸。
### 算法执行指标 ### 算法执行指标
| 指标 | 类型 | 标签 | 描述 | | 指标 | 类型 | 标签 | 描述 |

258
monitoring/README.md Normal file
View File

@@ -0,0 +1,258 @@
# Monitoring 目录说明
本目录包含所有监控和日志收集相关的配置文件。
## 目录结构
```
monitoring/
├── alerts/ # Prometheus 告警规则
│ └── rules.yaml # 告警规则配置
├── grafana/ # Grafana 配置
│ ├── datasources/ # 数据源自动配置
│ │ ├── prometheus.yaml # Prometheus 数据源
│ │ └── loki.yaml # Loki 数据源
│ └── dashboards/ # 仪表板自动加载
│ ├── provider.yaml # Dashboard provider 配置
│ ├── dashboard.json # 指标监控仪表板
│ └── logs-dashboard.json # 日志监控仪表板
├── loki.yaml # Loki 日志存储配置
├── promtail.yaml # Promtail 日志采集配置
└── prometheus.yml # Prometheus 指标收集配置
```
## 配置文件说明
### Prometheus 配置
**文件**: `prometheus.yml`
Prometheus 指标收集配置,包括:
- 抓取间隔: 5 秒
- 目标: app 服务的 `/metrics` 端点
- 告警规则: 从 `alerts/` 目录加载
### Loki 配置
**文件**: `loki.yaml`
Loki 日志存储配置,包括:
- 存储方式: 本地文件系统
- 日志保留期: 7 天
- 摄入速率限制: 10MB/s
- 自动压缩和清理
**关键配置**:
```yaml
limits_config:
retention_period: 168h # 7 天
ingestion_rate_mb: 10 # 10MB/s
```
### Promtail 配置
**文件**: `promtail.yaml`
Promtail 日志采集配置,支持两种模式:
**模式 1: Docker stdio 收集(默认)**
- 通过 Docker API 自动发现容器
- 过滤带有 `logging=promtail` 标签的容器
- 自动解析 JSON 日志
**模式 2: 文件收集(备用)**
-`/var/log/app/*.log` 读取日志文件
- 支持日志轮转
- 需要设置 `LOG_FILE_ENABLED=true`
### Grafana Provisioning
**数据源** (`grafana/datasources/`)
自动配置 Grafana 数据源:
- `prometheus.yaml`: Prometheus 数据源(默认)
- `loki.yaml`: Loki 数据源
**仪表板** (`grafana/dashboards/`)
自动加载 Grafana 仪表板:
- `provider.yaml`: Dashboard provider 配置
- `dashboard.json`: 指标监控仪表板HTTP 请求、算法执行等)
- `logs-dashboard.json`: 日志监控仪表板(日志流、错误日志等)
### 告警规则
**文件**: `alerts/rules.yaml`
Prometheus 告警规则,包括:
- 高错误率告警
- 高延迟告警
- 服务不可用告警
## 修改配置
### 调整日志保留期
编辑 `loki.yaml`:
```yaml
limits_config:
retention_period: 72h # 改为 3 天
```
重启 Loki:
```bash
cd deployment
docker-compose restart loki
```
### 调整指标抓取间隔
编辑 `prometheus.yml`:
```yaml
global:
scrape_interval: 10s # 改为 10 秒
```
重启 Prometheus:
```bash
cd deployment
docker-compose restart prometheus
```
### 添加新的告警规则
编辑 `alerts/rules.yaml`,添加新规则:
```yaml
groups:
- name: my_alerts
rules:
- alert: MyAlert
expr: my_metric > 100
for: 5m
labels:
severity: warning
annotations:
summary: "我的告警"
```
重启 Prometheus:
```bash
cd deployment
docker-compose restart prometheus
```
### 添加新的仪表板
1. 在 Grafana UI 中创建仪表板
2. 导出为 JSON
3. 保存到 `grafana/dashboards/my-dashboard.json`
4. 重启 Grafana或等待自动重载
```bash
cd deployment
docker-compose restart grafana
```
## 验证配置
### 检查 Prometheus 配置
```bash
# 访问 Prometheus UI
open http://localhost:9090
# 检查目标状态
open http://localhost:9090/targets
# 检查告警规则
open http://localhost:9090/alerts
```
### 检查 Loki 配置
```bash
# 检查 Loki 健康状态
curl http://localhost:3100/ready
# 查询标签
curl -s "http://localhost:3100/loki/api/v1/label/job/values" | jq
```
### 检查 Grafana 配置
```bash
# 访问 Grafana UI
open http://localhost:3000
# 检查数据源
curl -s -u admin:admin http://localhost:3000/api/datasources | jq
# 检查仪表板
curl -s -u admin:admin http://localhost:3000/api/search | jq
```
## 故障排查
### Prometheus 无法抓取指标
1. 检查 app 服务是否运行: `docker-compose ps app`
2. 检查 metrics 端点: `curl http://localhost:8111/metrics`
3. 查看 Prometheus 日志: `docker-compose logs prometheus`
### Loki 无法接收日志
1. 检查 Promtail 是否运行: `docker-compose ps promtail`
2. 查看 Promtail 日志: `docker-compose logs promtail`
3. 检查容器标签: `docker inspect <container> | grep Labels`
### Grafana 数据源未加载
1. 检查 provisioning 目录挂载: `docker-compose config | grep grafana -A 10`
2. 查看 Grafana 日志: `docker-compose logs grafana`
3. 手动重启 Grafana: `docker-compose restart grafana`
## 相关文档
- [Loki 集成文档](../docs/loki-integration.md) - 完整的 Loki 使用文档
- [Loki 快速参考](../docs/loki-quick-reference.md) - 常用命令和查询
- [Loki 实施总结](../docs/loki-implementation-summary.md) - 实施细节和架构说明
- [Prometheus 官方文档](https://prometheus.io/docs/)
- [Loki 官方文档](https://grafana.com/docs/loki/latest/)
- [Grafana 官方文档](https://grafana.com/docs/grafana/latest/)
## 性能建议
### 日志量控制
- 调整日志级别为 WARNING 或 ERROR
- 过滤掉不必要的日志(如健康检查)
- 减少日志保留期
### 指标优化
- 增加抓取间隔(如 15s 或 30s
- 减少指标基数(避免高基数标签)
- 定期清理旧数据
### 存储优化
- 监控磁盘使用: `docker-compose exec loki du -sh /loki`
- 定期备份重要数据
- 考虑使用对象存储S3/OSS作为后端
## 总结
本目录包含完整的监控和日志收集配置:
**Prometheus** - 指标收集和告警
**Loki** - 日志存储和查询
**Promtail** - 日志采集
**Grafana** - 可视化和仪表板
所有配置都支持自动加载,无需手动配置。

View File

@@ -0,0 +1,292 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": null,
"links": [],
"liveNow": false,
"panels": [
{
"datasource": {
"type": "loki",
"uid": "Loki"
},
"gridPos": {
"h": 10,
"w": 24,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": true,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"targets": [
{
"datasource": {
"type": "loki",
"uid": "Loki"
},
"editorMode": "code",
"expr": "{job=\"functional-scaffold-app\"}",
"queryType": "range",
"refId": "A"
}
],
"title": "日志流 (实时)",
"type": "logs"
},
{
"datasource": {
"type": "loki",
"uid": "Loki"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 10,
"gradientMode": "none",
"hideFrom": {
"tooltip": false,
"viz": false,
"legend": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 10
},
"id": 2,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "loki",
"uid": "Loki"
},
"editorMode": "code",
"expr": "sum by (level) (count_over_time({job=\"functional-scaffold-app\"}[1m]))",
"queryType": "range",
"refId": "A"
}
],
"title": "日志量趋势(按级别)",
"type": "timeseries"
},
{
"datasource": {
"type": "loki",
"uid": "Loki"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "yellow",
"value": 10
},
{
"color": "red",
"value": 50
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 10
},
"id": 3,
"options": {
"orientation": "auto",
"reduceOptions": {
"values": false,
"calcs": [
"lastNotNull"
],
"fields": ""
},
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"pluginVersion": "9.5.3",
"targets": [
{
"datasource": {
"type": "loki",
"uid": "Loki"
},
"editorMode": "code",
"expr": "sum by (level) (count_over_time({job=\"functional-scaffold-app\"}[$__range]))",
"queryType": "range",
"refId": "A"
}
],
"title": "日志级别分布",
"type": "gauge"
},
{
"datasource": {
"type": "loki",
"uid": "Loki"
},
"gridPos": {
"h": 10,
"w": 24,
"x": 0,
"y": 18
},
"id": 4,
"options": {
"dedupStrategy": "none",
"enableLogDetails": true,
"prettifyLogMessage": false,
"showCommonLabels": false,
"showLabels": false,
"showTime": true,
"sortOrder": "Descending",
"wrapLogMessage": false
},
"targets": [
{
"datasource": {
"type": "loki",
"uid": "Loki"
},
"editorMode": "code",
"expr": "{job=\"functional-scaffold-app\", level=\"ERROR\"}",
"queryType": "range",
"refId": "A"
}
],
"title": "错误日志",
"type": "logs"
}
],
"refresh": "5s",
"schemaVersion": 38,
"style": "dark",
"tags": ["logs", "loki"],
"templating": {
"list": [
{
"current": {
"selected": false,
"text": "",
"value": ""
},
"hide": 0,
"label": "Request ID",
"name": "request_id",
"options": [
{
"selected": true,
"text": "",
"value": ""
}
],
"query": "",
"skipUrlSync": false,
"type": "textbox"
}
]
},
"time": {
"from": "now-15m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "日志监控",
"uid": "logs-dashboard",
"version": 0,
"weekStart": ""
}

View File

@@ -0,0 +1,13 @@
apiVersion: 1
providers:
- name: 'default'
orgId: 1
folder: ''
type: file
disableDeletion: false
updateIntervalSeconds: 10
allowUiUpdates: true
options:
path: /etc/grafana/provisioning/dashboards
foldersFromFilesStructure: true

View File

@@ -0,0 +1,11 @@
apiVersion: 1
datasources:
- name: Loki
type: loki
access: proxy
url: http://loki:3100
isDefault: false
editable: false
jsonData:
maxLines: 1000

View File

@@ -0,0 +1,11 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
editable: false
jsonData:
timeInterval: "5s"

39
monitoring/loki.yaml Normal file
View File

@@ -0,0 +1,39 @@
auth_enabled: false
server:
http_listen_port: 3100
grpc_listen_port: 9096
common:
path_prefix: /loki
storage:
filesystem:
chunks_directory: /loki/chunks
rules_directory: /loki/rules
replication_factor: 1
ring:
instance_addr: 127.0.0.1
kvstore:
store: inmemory
schema_config:
configs:
- from: 2020-10-24
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
limits_config:
retention_period: 168h # 7 天
ingestion_rate_mb: 10
ingestion_burst_size_mb: 20
compactor:
working_directory: /loki/compactor
shared_store: filesystem
compaction_interval: 10m
retention_enabled: true
retention_delete_delay: 2h

71
monitoring/promtail.yaml Normal file
View File

@@ -0,0 +1,71 @@
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
# 场景 1: Docker stdio 收集(主要方式)
- job_name: docker
docker_sd_configs:
- host: unix:///var/run/docker.sock
refresh_interval: 5s
filters:
- name: label
values: ["logging=promtail"]
relabel_configs:
- source_labels: ['__meta_docker_container_name']
regex: '/(.*)'
target_label: 'container'
- source_labels: ['__meta_docker_container_label_logging_jobname']
target_label: 'job'
- source_labels: ['__meta_docker_container_id']
target_label: '__path__'
replacement: '/var/lib/docker/containers/$1/*.log'
pipeline_stages:
- json:
expressions:
log: log
stream: stream
time: time
- json:
source: log
expressions:
level: levelname
logger: name
message: message
request_id: request_id
- labels:
level:
logger:
- output:
source: log
# 场景 2: Log 文件收集(备用)
- job_name: app_files
static_configs:
- targets:
- localhost
labels:
job: functional-scaffold-app-files
__path__: /var/log/app/*.log
pipeline_stages:
- json:
expressions:
timestamp: asctime
level: levelname
logger: name
message: message
request_id: request_id
- timestamp:
source: timestamp
format: "2006-01-02 15:04:05,000"
- labels:
level:
logger:
- output:
source: message

View File

@@ -1,114 +0,0 @@
#!/bin/bash
# 指标方案快速启动脚本
set -e
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
echo "=========================================="
echo "FunctionalScaffold 指标方案启动脚本"
echo "=========================================="
# 检查 docker-compose
if ! command -v docker-compose &> /dev/null; then
echo -e "${RED}错误: docker-compose 未安装${NC}"
exit 1
fi
# 选择方案
echo ""
echo "请选择指标方案:"
echo "1. Pushgateway推荐适合 Serverless"
echo "2. Redis + Exporter适合高并发"
echo "3. 两者都启动(用于对比测试)"
echo ""
read -p "输入选项 (1/2/3): " choice
cd "$(dirname "$0")/../deployment"
case $choice in
1)
echo -e "${GREEN}启动 Pushgateway 方案...${NC}"
docker-compose up -d redis pushgateway prometheus grafana
echo ""
echo -e "${GREEN}✓ Pushgateway 方案已启动${NC}"
echo ""
echo "服务地址:"
echo " - Pushgateway: http://localhost:9091"
echo " - Prometheus: http://localhost:9090"
echo " - Grafana: http://localhost:3000 (admin/admin)"
echo ""
echo "下一步:"
echo " 1. 修改代码导入: from functional_scaffold.core.metrics_pushgateway import ..."
echo " 2. 配置环境变量: PUSHGATEWAY_URL=localhost:9091"
echo " 3. 启动应用: ./scripts/run_dev.sh"
echo " 4. 运行测试: python scripts/test_metrics.py pushgateway"
;;
2)
echo -e "${GREEN}启动 Redis 方案...${NC}"
# 检查 redis 依赖
if ! python -c "import redis" 2>/dev/null; then
echo -e "${YELLOW}警告: redis 库未安装${NC}"
echo "正在安装 redis..."
pip install redis
fi
docker-compose up -d redis redis-exporter prometheus grafana
echo ""
echo -e "${GREEN}✓ Redis 方案已启动${NC}"
echo ""
echo "服务地址:"
echo " - Redis: localhost:6379"
echo " - Redis Exporter: http://localhost:8001/metrics"
echo " - Prometheus: http://localhost:9090"
echo " - Grafana: http://localhost:3000 (admin/admin)"
echo ""
echo "下一步:"
echo " 1. 修改代码导入: from functional_scaffold.core.metrics_redis import ..."
echo " 2. 配置环境变量: REDIS_HOST=localhost REDIS_PORT=6379"
echo " 3. 启动应用: ./scripts/run_dev.sh"
echo " 4. 运行测试: python scripts/test_metrics.py redis"
;;
3)
echo -e "${GREEN}启动所有服务...${NC}"
# 检查 redis 依赖
if ! python -c "import redis" 2>/dev/null; then
echo -e "${YELLOW}警告: redis 库未安装${NC}"
echo "正在安装 redis..."
pip install redis
fi
docker-compose up -d
echo ""
echo -e "${GREEN}✓ 所有服务已启动${NC}"
echo ""
echo "服务地址:"
echo " - 应用: http://localhost:8000"
echo " - Pushgateway: http://localhost:9091"
echo " - Redis: localhost:6379"
echo " - Redis Exporter: http://localhost:8001/metrics"
echo " - Prometheus: http://localhost:9090"
echo " - Grafana: http://localhost:3000 (admin/admin)"
echo ""
echo "下一步:"
echo " 1. 查看文档: cat docs/metrics-guide.md"
echo " 2. 运行测试: python scripts/test_metrics.py"
;;
*)
echo -e "${RED}无效的选项${NC}"
exit 1
;;
esac
echo ""
echo "=========================================="
echo "查看日志: docker-compose logs -f"
echo "停止服务: docker-compose down"
echo "查看文档: cat ../docs/metrics-guide.md"
echo "=========================================="

104
scripts/test_concurrency.sh Executable file
View File

@@ -0,0 +1,104 @@
#!/bin/bash
# 并发控制测试脚本
set -e
BASE_URL="http://localhost:8000"
echo "=== 异步任务并发控制测试 ==="
echo ""
# 1. 检查服务是否运行
echo "1. 检查服务状态..."
if ! curl -s "${BASE_URL}/healthz" > /dev/null; then
echo "❌ 服务未运行,请先启动服务"
exit 1
fi
echo "✅ 服务正常运行"
echo ""
# 2. 查询初始并发状态
echo "2. 查询初始并发状态..."
curl -s "${BASE_URL}/jobs/concurrency/status" | jq '.'
echo ""
# 3. 创建多个任务
echo "3. 创建 15 个任务(测试并发限制)..."
JOB_IDS=()
for i in {1..15}; do
# 使用较大的质数,让任务执行时间更长
NUMBER=$((10000 + i * 1000))
RESPONSE=$(curl -s -X POST "${BASE_URL}/jobs" \
-H "Content-Type: application/json" \
-d "{\"algorithm\": \"PrimeChecker\", \"params\": {\"number\": ${NUMBER}}}")
JOB_ID=$(echo "$RESPONSE" | jq -r '.job_id')
JOB_IDS+=("$JOB_ID")
echo " 创建任务 ${i}/15: job_id=${JOB_ID}"
# 短暂延迟,避免请求过快
sleep 0.1
done
echo ""
# 4. 立即查询并发状态(应该看到多个任务在运行)
echo "4. 查询并发状态(任务执行中)..."
for i in {1..5}; do
echo "${i} 次查询:"
STATUS=$(curl -s "${BASE_URL}/jobs/concurrency/status")
echo " $(echo "$STATUS" | jq -c '.')"
sleep 1
done
echo ""
# 5. 等待所有任务完成
echo "5. 等待任务完成..."
COMPLETED=0
TOTAL=${#JOB_IDS[@]}
while [ $COMPLETED -lt $TOTAL ]; do
COMPLETED=0
for JOB_ID in "${JOB_IDS[@]}"; do
STATUS=$(curl -s "${BASE_URL}/jobs/${JOB_ID}" | jq -r '.status')
if [ "$STATUS" = "completed" ] || [ "$STATUS" = "failed" ]; then
((COMPLETED++))
fi
done
echo " 进度: ${COMPLETED}/${TOTAL} 任务完成"
# 显示当前并发状态
CONCURRENCY=$(curl -s "${BASE_URL}/jobs/concurrency/status")
echo " 并发状态: $(echo "$CONCURRENCY" | jq -c '.')"
if [ $COMPLETED -lt $TOTAL ]; then
sleep 2
fi
done
echo ""
# 6. 查询最终并发状态
echo "6. 查询最终并发状态..."
curl -s "${BASE_URL}/jobs/concurrency/status" | jq '.'
echo ""
# 7. 显示任务结果统计
echo "7. 任务结果统计..."
COMPLETED_COUNT=0
FAILED_COUNT=0
for JOB_ID in "${JOB_IDS[@]}"; do
STATUS=$(curl -s "${BASE_URL}/jobs/${JOB_ID}" | jq -r '.status')
if [ "$STATUS" = "completed" ]; then
((COMPLETED_COUNT++))
elif [ "$STATUS" = "failed" ]; then
((FAILED_COUNT++))
fi
done
echo " 总任务数: ${TOTAL}"
echo " 成功: ${COMPLETED_COUNT}"
echo " 失败: ${FAILED_COUNT}"
echo ""
echo "=== 测试完成 ==="

View File

@@ -0,0 +1,39 @@
#!/bin/bash
# 测试指标过滤和路径规范化
echo "=== 测试指标过滤和路径规范化 ==="
echo ""
# 启动服务(假设已经在运行)
BASE_URL="http://localhost:8000"
echo "1. 访问健康检查端点(应该被跳过,不记录指标)"
curl -s "$BASE_URL/healthz" > /dev/null
curl -s "$BASE_URL/readyz" > /dev/null
echo " ✓ 已访问 /healthz 和 /readyz"
echo ""
echo "2. 访问普通端点(应该记录指标)"
curl -s -X POST "$BASE_URL/invoke" \
-H "Content-Type: application/json" \
-d '{"number": 17}' > /dev/null
echo " ✓ 已访问 POST /invoke"
echo ""
echo "3. 访问任务端点(应该规范化为 /jobs/{job_id}"
curl -s "$BASE_URL/jobs/a1b2c3d4e5f6" > /dev/null
curl -s "$BASE_URL/jobs/xyz123456789" > /dev/null
echo " ✓ 已访问 GET /jobs/a1b2c3d4e5f6 和 GET /jobs/xyz123456789"
echo ""
echo "4. 查看指标输出"
echo " 查找 http_requests_total 指标:"
curl -s "$BASE_URL/metrics" | grep 'http_requests_total{' | grep -v '#'
echo ""
echo " 预期结果:"
echo " - 应该看到 endpoint=\"/invoke\" 的记录"
echo " - 应该看到 endpoint=\"/jobs/{job_id}\" 的记录(而不是具体的 job_id"
echo " - 不应该看到 endpoint=\"/healthz\" 或 endpoint=\"/readyz\" 的记录"
echo " - 不应该看到 endpoint=\"/metrics\" 的记录"
echo ""
echo "=== 测试完成 ==="

100
scripts/verify_loki.sh Executable file
View File

@@ -0,0 +1,100 @@
#!/bin/bash
# Loki 集成验证脚本
set -e
echo "========================================="
echo "Loki 日志收集系统验证"
echo "========================================="
echo ""
# 颜色定义
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# 检查服务状态
echo "1. 检查服务状态..."
echo "-------------------"
docker-compose ps
echo ""
echo "2. 检查 Loki 健康状态..."
echo "-------------------"
if curl -s http://localhost:3100/ready | grep -q "ready"; then
echo -e "${GREEN}✓ Loki 服务正常${NC}"
else
echo -e "${RED}✗ Loki 服务异常${NC}"
exit 1
fi
echo ""
echo "3. 检查 Promtail 健康状态..."
echo "-------------------"
if curl -s http://localhost:9080/ready | grep -q "ready"; then
echo -e "${GREEN}✓ Promtail 服务正常${NC}"
else
echo -e "${RED}✗ Promtail 服务异常${NC}"
exit 1
fi
echo ""
echo "4. 生成测试日志..."
echo "-------------------"
curl -X POST http://localhost:8111/invoke \
-H "Content-Type: application/json" \
-d '{"algorithm": "PrimeChecker", "params": {"number": 17}}' \
-s -o /dev/null -w "HTTP Status: %{http_code}\n"
echo ""
echo "5. 等待日志收集 (5秒)..."
sleep 5
echo ""
echo "6. 查询 Loki 日志..."
echo "-------------------"
LOGS=$(curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
--data-urlencode 'query={job="functional-scaffold-app"}' \
--data-urlencode 'limit=5')
if echo "$LOGS" | jq -e '.data.result | length > 0' > /dev/null 2>&1; then
echo -e "${GREEN}✓ 成功查询到日志${NC}"
echo ""
echo "最近的日志条目:"
echo "$LOGS" | jq -r '.data.result[0].values[-1][1]' | head -3
else
echo -e "${YELLOW}⚠ 暂时没有查询到日志,可能需要等待更长时间${NC}"
fi
echo ""
echo "7. 检查 Grafana 数据源..."
echo "-------------------"
DATASOURCES=$(curl -s -u admin:admin http://localhost:3000/api/datasources)
if echo "$DATASOURCES" | jq -e '.[] | select(.name == "Loki")' > /dev/null 2>&1; then
echo -e "${GREEN}✓ Loki 数据源已配置${NC}"
else
echo -e "${RED}✗ Loki 数据源未配置${NC}"
fi
if echo "$DATASOURCES" | jq -e '.[] | select(.name == "Prometheus")' > /dev/null 2>&1; then
echo -e "${GREEN}✓ Prometheus 数据源已配置${NC}"
else
echo -e "${RED}✗ Prometheus 数据源未配置${NC}"
fi
echo ""
echo "========================================="
echo "验证完成!"
echo "========================================="
echo ""
echo "访问地址:"
echo " - Grafana: http://localhost:3000 (admin/admin)"
echo " - Loki: http://localhost:3100"
echo " - Promtail: http://localhost:9080"
echo ""
echo "查看日志:"
echo " 1. 访问 Grafana Explore: http://localhost:3000/explore"
echo " 2. 选择 Loki 数据源"
echo " 3. 输入查询: {job=\"functional-scaffold-app\"}"
echo ""

View File

@@ -2,7 +2,7 @@
from fastapi import Header, HTTPException from fastapi import Header, HTTPException
from typing import Optional from typing import Optional
from ..core.tracing import set_request_id, generate_request_id from ..core.tracing import set_request_id, generate_request_id, get_request_id as get_current_request_id
async def get_request_id(x_request_id: Optional[str] = Header(None)) -> str: async def get_request_id(x_request_id: Optional[str] = Header(None)) -> str:
@@ -15,6 +15,12 @@ async def get_request_id(x_request_id: Optional[str] = Header(None)) -> str:
Returns: Returns:
str: 请求ID str: 请求ID
""" """
# 先检查 ContextVar 中是否已经有 request_id由中间件设置
existing_request_id = get_current_request_id()
if existing_request_id:
return existing_request_id
# 如果没有,则从请求头获取或生成新的
request_id = x_request_id or generate_request_id() request_id = x_request_id or generate_request_id()
set_request_id(request_id) set_request_id(request_id)
return request_id return request_id

View File

@@ -152,3 +152,21 @@ class JobStatusResponse(BaseModel):
result: Optional[Dict[str, Any]] = Field(None, description="执行结果(仅完成时返回)") result: Optional[Dict[str, Any]] = Field(None, description="执行结果(仅完成时返回)")
error: Optional[str] = Field(None, description="错误信息(仅失败时返回)") error: Optional[str] = Field(None, description="错误信息(仅失败时返回)")
metadata: Optional[Dict[str, Any]] = Field(None, description="元数据信息") metadata: Optional[Dict[str, Any]] = Field(None, description="元数据信息")
class ConcurrencyStatusResponse(BaseModel):
"""并发状态响应"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"max_concurrent": 10,
"available_slots": 7,
"running_jobs": 3,
}
}
)
max_concurrent: int = Field(..., description="最大并发任务数")
available_slots: int = Field(..., description="当前可用槽位数")
running_jobs: int = Field(..., description="当前运行中的任务数")

View File

@@ -15,6 +15,7 @@ from .models import (
JobCreateResponse, JobCreateResponse,
JobStatusResponse, JobStatusResponse,
JobStatus, JobStatus,
ConcurrencyStatusResponse,
) )
from .dependencies import get_request_id from .dependencies import get_request_id
from ..algorithms.prime_checker import PrimeChecker from ..algorithms.prime_checker import PrimeChecker
@@ -292,3 +293,57 @@ async def get_job_status(job_id: str):
"message": str(e), "message": str(e),
}, },
) )
@router.get(
"/jobs/concurrency/status",
response_model=ConcurrencyStatusResponse,
summary="查询并发状态",
description="查询任务管理器的并发执行状态",
responses={
200: {"description": "成功", "model": ConcurrencyStatusResponse},
503: {"description": "服务不可用", "model": ErrorResponse},
},
)
async def get_concurrency_status():
"""
查询并发状态
返回当前任务管理器的并发执行状态,包括:
- 最大并发任务数
- 当前可用槽位数
- 当前运行中的任务数
"""
try:
job_manager = await get_job_manager()
# 检查任务管理器是否可用
if not job_manager.is_available():
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail={
"error": "SERVICE_UNAVAILABLE",
"message": "任务管理器不可用",
},
)
concurrency_status = job_manager.get_concurrency_status()
return ConcurrencyStatusResponse(
max_concurrent=concurrency_status["max_concurrent"],
available_slots=concurrency_status["available_slots"],
running_jobs=concurrency_status["running_jobs"],
)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询并发状态失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={
"error": "INTERNAL_ERROR",
"message": str(e),
},
)

View File

@@ -23,6 +23,8 @@ class Settings(BaseSettings):
# 日志配置 # 日志配置
log_level: str = "INFO" log_level: str = "INFO"
log_format: str = "json" log_format: str = "json"
log_file_enabled: bool = False
log_file_path: str = "/var/log/app/app.log"
# 指标配置 # 指标配置
metrics_enabled: bool = True metrics_enabled: bool = True
@@ -53,6 +55,7 @@ class Settings(BaseSettings):
job_result_ttl: int = 1800 # 结果缓存时间(秒),默认 30 分钟 job_result_ttl: int = 1800 # 结果缓存时间(秒),默认 30 分钟
webhook_max_retries: int = 3 # Webhook 最大重试次数 webhook_max_retries: int = 3 # Webhook 最大重试次数
webhook_timeout: int = 10 # Webhook 超时时间(秒) webhook_timeout: int = 10 # Webhook 超时时间(秒)
max_concurrent_jobs: int = 10 # 最大并发任务数
# 全局配置实例 # 全局配置实例

View File

@@ -27,6 +27,8 @@ class JobManager:
self._redis_client: Optional[aioredis.Redis] = None self._redis_client: Optional[aioredis.Redis] = None
self._algorithm_registry: Dict[str, Type[BaseAlgorithm]] = {} self._algorithm_registry: Dict[str, Type[BaseAlgorithm]] = {}
self._http_client: Optional[httpx.AsyncClient] = None self._http_client: Optional[httpx.AsyncClient] = None
self._semaphore: Optional[asyncio.Semaphore] = None
self._max_concurrent_jobs: int = 0
async def initialize(self) -> None: async def initialize(self) -> None:
"""初始化 Redis 连接和 HTTP 客户端""" """初始化 Redis 连接和 HTTP 客户端"""
@@ -51,6 +53,11 @@ class JobManager:
# 初始化 HTTP 客户端 # 初始化 HTTP 客户端
self._http_client = httpx.AsyncClient(timeout=settings.webhook_timeout) self._http_client = httpx.AsyncClient(timeout=settings.webhook_timeout)
# 初始化并发控制信号量
self._max_concurrent_jobs = settings.max_concurrent_jobs
self._semaphore = asyncio.Semaphore(self._max_concurrent_jobs)
logger.info(f"任务并发限制已设置: {self._max_concurrent_jobs}")
# 注册算法 # 注册算法
self._register_algorithms() self._register_algorithms()
@@ -203,6 +210,10 @@ class JobManager:
logger.error(f"Redis 不可用,无法执行任务: {job_id}") logger.error(f"Redis 不可用,无法执行任务: {job_id}")
return return
if not self._semaphore:
logger.error(f"并发控制未初始化,无法执行任务: {job_id}")
return
key = f"job:{job_id}" key = f"job:{job_id}"
job_data = await self._redis_client.hgetall(key) job_data = await self._redis_client.hgetall(key)
@@ -219,74 +230,76 @@ class JobManager:
except json.JSONDecodeError: except json.JSONDecodeError:
params = {} params = {}
# 更新状态为 running # 使用信号量控制并发
started_at = self._get_timestamp() async with self._semaphore:
await self._redis_client.hset(key, mapping={"status": "running", "started_at": started_at}) # 更新状态为 running
started_at = self._get_timestamp()
await self._redis_client.hset(key, mapping={"status": "running", "started_at": started_at})
logger.info(f"开始执行任务: job_id={job_id}, algorithm={algorithm_name}") logger.info(f"开始执行任务: job_id={job_id}, algorithm={algorithm_name}")
import time import time
start_time = time.time() start_time = time.time()
status = "completed" status = "completed"
result_data = None result_data = None
error_msg = None error_msg = None
metadata = None metadata = None
try: try:
# 获取算法类并执行 # 获取算法类并执行
algorithm_cls = self._algorithm_registry.get(algorithm_name) algorithm_cls = self._algorithm_registry.get(algorithm_name)
if not algorithm_cls: if not algorithm_cls:
raise ValueError(f"算法 '{algorithm_name}' 不存在") raise ValueError(f"算法 '{algorithm_name}' 不存在")
algorithm = algorithm_cls() algorithm = algorithm_cls()
# 根据算法类型传递参数 # 根据算法类型传递参数
if algorithm_name == "PrimeChecker": if algorithm_name == "PrimeChecker":
execution_result = algorithm.execute(params.get("number", 0)) execution_result = algorithm.execute(params.get("number", 0))
else: else:
# 通用参数传递 # 通用参数传递
execution_result = algorithm.execute(**params) execution_result = algorithm.execute(**params)
if execution_result.get("success"): if execution_result.get("success"):
result_data = execution_result.get("result", {}) result_data = execution_result.get("result", {})
metadata = execution_result.get("metadata", {}) metadata = execution_result.get("metadata", {})
else: else:
status = "failed"
error_msg = execution_result.get("error", "算法执行失败")
metadata = execution_result.get("metadata", {})
except Exception as e:
status = "failed" status = "failed"
error_msg = execution_result.get("error", "算法执行失败") error_msg = str(e)
metadata = execution_result.get("metadata", {}) logger.error(f"任务执行失败: job_id={job_id}, error={e}", exc_info=True)
except Exception as e: # 计算执行时间
status = "failed" elapsed_time = time.time() - start_time
error_msg = str(e) completed_at = self._get_timestamp()
logger.error(f"任务执行失败: job_id={job_id}, error={e}", exc_info=True)
# 计算执行时间 # 更新任务状态
elapsed_time = time.time() - start_time update_data = {
completed_at = self._get_timestamp() "status": status,
"completed_at": completed_at,
"result": json.dumps(result_data) if result_data else "",
"error": error_msg or "",
"metadata": json.dumps(metadata) if metadata else "",
}
await self._redis_client.hset(key, mapping=update_data)
# 更新任务状态 # 设置 TTL
update_data = { await self._redis_client.expire(key, settings.job_result_ttl)
"status": status,
"completed_at": completed_at,
"result": json.dumps(result_data) if result_data else "",
"error": error_msg or "",
"metadata": json.dumps(metadata) if metadata else "",
}
await self._redis_client.hset(key, mapping=update_data)
# 设置 TTL # 记录指标
await self._redis_client.expire(key, settings.job_result_ttl) incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time)
# 记录指标 logger.info(f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s")
incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time)
logger.info(f"任务执行完成: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s") # 发送 Webhook 回调
if webhook_url:
# 发送 Webhook 回调 await self._send_webhook(job_id, webhook_url)
if webhook_url:
await self._send_webhook(job_id, webhook_url)
async def _send_webhook(self, job_id: str, webhook_url: str) -> None: async def _send_webhook(self, job_id: str, webhook_url: str) -> None:
"""发送 Webhook 回调(带重试) """发送 Webhook 回调(带重试)
@@ -359,6 +372,32 @@ class JobManager:
"""检查任务管理器是否可用""" """检查任务管理器是否可用"""
return self._redis_client is not None return self._redis_client is not None
def get_concurrency_status(self) -> Dict[str, int]:
"""获取并发状态
Returns:
Dict[str, int]: 包含以下键的字典
- max_concurrent: 最大并发数
- available_slots: 可用槽位数
- running_jobs: 当前运行中的任务数
"""
if not self._semaphore:
return {
"max_concurrent": 0,
"available_slots": 0,
"running_jobs": 0,
}
max_concurrent = self._max_concurrent_jobs
available_slots = self._semaphore._value
running_jobs = max_concurrent - available_slots
return {
"max_concurrent": max_concurrent,
"available_slots": available_slots,
"running_jobs": running_jobs,
}
# 全局单例 # 全局单例
_job_manager: Optional[JobManager] = None _job_manager: Optional[JobManager] = None

View File

@@ -2,14 +2,39 @@
import logging import logging
import sys import sys
from pathlib import Path
from typing import Optional from typing import Optional
from logging.handlers import RotatingFileHandler
from pythonjsonlogger.json import JsonFormatter from pythonjsonlogger.json import JsonFormatter
from .tracing import get_request_id
class RequestIdFilter(logging.Filter):
"""自动添加 request_id 到日志记录的过滤器"""
def filter(self, record: logging.LogRecord) -> bool:
"""
为日志记录添加 request_id 字段
Args:
record: 日志记录
Returns:
bool: 总是返回 True不过滤任何日志
"""
# 从 ContextVar 中获取 request_id
request_id = get_request_id()
# 添加到日志记录中,如果没有则设置为 None
record.request_id = request_id if request_id else "-"
return True
def setup_logging( def setup_logging(
level: str = "INFO", level: str = "INFO",
format_type: str = "json", format_type: str = "json",
logger_name: Optional[str] = None, logger_name: Optional[str] = None,
file_path: Optional[str] = None,
) -> logging.Logger: ) -> logging.Logger:
""" """
配置日志系统 配置日志系统
@@ -18,6 +43,7 @@ def setup_logging(
level: 日志级别 (DEBUG, INFO, WARNING, ERROR, CRITICAL) level: 日志级别 (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format_type: 日志格式 ('json''text') format_type: 日志格式 ('json''text')
logger_name: 日志器名称None表示根日志器 logger_name: 日志器名称None表示根日志器
file_path: 日志文件路径None表示不写入文件
Returns: Returns:
logging.Logger: 配置好的日志器 logging.Logger: 配置好的日志器
@@ -28,23 +54,45 @@ def setup_logging(
# 清除现有处理器 # 清除现有处理器
logger.handlers.clear() logger.handlers.clear()
# 创建控制台处理器
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(getattr(logging, level.upper()))
# 设置格式 # 设置格式
if format_type == "json": if format_type == "json":
formatter = JsonFormatter( formatter = JsonFormatter(
"%(asctime)s %(name)s %(levelname)s %(message)s", "%(asctime)s %(name)s %(levelname)s %(message)s %(request_id)s",
timestamp=True, timestamp=True,
) )
else: else:
formatter = logging.Formatter( formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%(asctime)s - %(name)s - %(levelname)s - [%(request_id)s] - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S", datefmt="%Y-%m-%d %H:%M:%S",
) )
handler.setFormatter(formatter) # 创建 RequestIdFilter
logger.addHandler(handler) request_id_filter = RequestIdFilter()
# 创建控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(getattr(logging, level.upper()))
console_handler.setFormatter(formatter)
console_handler.addFilter(request_id_filter)
logger.addHandler(console_handler)
# 创建文件处理器(如果指定了文件路径)
if file_path:
# 确保日志目录存在
log_dir = Path(file_path).parent
log_dir.mkdir(parents=True, exist_ok=True)
# 创建 RotatingFileHandler
# 最大 100MB保留 5 个备份
file_handler = RotatingFileHandler(
file_path,
maxBytes=100 * 1024 * 1024, # 100MB
backupCount=5,
encoding="utf-8",
)
file_handler.setLevel(getattr(logging, level.upper()))
file_handler.setFormatter(formatter)
file_handler.addFilter(request_id_filter)
logger.addHandler(file_handler)
return logger return logger

View File

@@ -9,6 +9,7 @@ import time
from .api import router from .api import router
from .config import settings from .config import settings
from .core.logging import setup_logging from .core.logging import setup_logging
from .core.tracing import generate_request_id, set_request_id, get_request_id
from .core.metrics_unified import ( from .core.metrics_unified import (
get_metrics_manager, get_metrics_manager,
incr, incr,
@@ -20,7 +21,11 @@ from .core.metrics_unified import (
from .core.job_manager import get_job_manager, shutdown_job_manager from .core.job_manager import get_job_manager, shutdown_job_manager
# 设置日志 # 设置日志
setup_logging(level=settings.log_level, format_type=settings.log_format) setup_logging(
level=settings.log_level,
format_type=settings.log_format,
file_path=settings.log_file_path if settings.log_file_enabled else None,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# 创建 FastAPI 应用 # 创建 FastAPI 应用
@@ -47,12 +52,37 @@ app.add_middleware(
@app.middleware("http") @app.middleware("http")
async def log_requests(request: Request, call_next): async def log_requests(request: Request, call_next):
"""记录所有HTTP请求""" """记录所有HTTP请求"""
# 从请求头获取或生成 request_id
request_id = request.headers.get("x-request-id") or generate_request_id()
set_request_id(request_id)
logger.info(f"Request: {request.method} {request.url.path}") logger.info(f"Request: {request.method} {request.url.path}")
response = await call_next(request) response = await call_next(request)
logger.info(f"Response: {response.status_code}") logger.info(f"Response: {response.status_code}")
return response return response
def normalize_path(path: str) -> str:
"""
规范化路径,将路径参数替换为模板形式
Args:
path: 原始路径
Returns:
规范化后的路径
Examples:
/jobs/a1b2c3d4e5f6 -> /jobs/{job_id}
/invoke -> /invoke
"""
# 匹配 /jobs/{任意字符串} 模式
if path.startswith("/jobs/") and len(path) > 6:
return "/jobs/{job_id}"
return path
# 指标跟踪中间件 # 指标跟踪中间件
@app.middleware("http") @app.middleware("http")
async def track_metrics(request: Request, call_next): async def track_metrics(request: Request, call_next):
@@ -60,8 +90,9 @@ async def track_metrics(request: Request, call_next):
if not settings.metrics_enabled: if not settings.metrics_enabled:
return await call_next(request) return await call_next(request)
# 跳过 /metrics 端点本身,避免循环记录 # 跳过不需要记录指标的端点
if request.url.path == "/metrics": skip_paths = {"/metrics", "/readyz", "/healthz"}
if request.url.path in skip_paths:
return await call_next(request) return await call_next(request)
gauge_incr("http_requests_in_progress") gauge_incr("http_requests_in_progress")
@@ -79,13 +110,15 @@ async def track_metrics(request: Request, call_next):
raise e raise e
finally: finally:
elapsed = time.time() - start_time elapsed = time.time() - start_time
# 使用规范化后的路径记录指标
normalized_path = normalize_path(request.url.path)
incr( incr(
"http_requests_total", "http_requests_total",
{"method": request.method, "endpoint": request.url.path, "status": status}, {"method": request.method, "endpoint": normalized_path, "status": status},
) )
observe( observe(
"http_request_duration_seconds", "http_request_duration_seconds",
{"method": request.method, "endpoint": request.url.path}, {"method": request.method, "endpoint": normalized_path},
elapsed, elapsed,
) )
gauge_decr("http_requests_in_progress") gauge_decr("http_requests_in_progress")

View File

@@ -186,6 +186,10 @@ class TestJobManagerWithMocks:
manager._redis_client = mock_redis manager._redis_client = mock_redis
manager._register_algorithms() manager._register_algorithms()
# 初始化 semaphore
import asyncio
manager._semaphore = asyncio.Semaphore(10)
await manager.execute_job("test-job-id") await manager.execute_job("test-job-id")
# 验证状态更新被调用 # 验证状态更新被调用
@@ -399,3 +403,97 @@ class TestWebhook:
# 验证重试次数 # 验证重试次数
assert mock_http.post.call_count == 2 assert mock_http.post.call_count == 2
class TestConcurrencyControl:
"""测试并发控制功能"""
@pytest.mark.asyncio
async def test_get_concurrency_status(self):
"""测试获取并发状态"""
manager = JobManager()
# 初始化 semaphore
manager._max_concurrent_jobs = 10
manager._semaphore = asyncio.Semaphore(10)
status = manager.get_concurrency_status()
assert status["max_concurrent"] == 10
assert status["available_slots"] == 10
assert status["running_jobs"] == 0
@pytest.mark.asyncio
async def test_get_concurrency_status_without_semaphore(self):
"""测试未初始化 semaphore 时获取并发状态"""
manager = JobManager()
status = manager.get_concurrency_status()
assert status["max_concurrent"] == 0
assert status["available_slots"] == 0
assert status["running_jobs"] == 0
@pytest.mark.asyncio
async def test_concurrency_limit(self):
"""测试并发限制是否生效"""
manager = JobManager()
# 设置较小的并发限制
manager._max_concurrent_jobs = 2
manager._semaphore = asyncio.Semaphore(2)
# 模拟 Redis
mock_redis = AsyncMock()
mock_redis.hgetall = AsyncMock(
return_value={
"status": "pending",
"algorithm": "PrimeChecker",
"params": '{"number": 17}',
"webhook": "",
"request_id": "test-request-id",
"created_at": "2026-02-02T10:00:00+00:00",
}
)
mock_redis.hset = AsyncMock()
mock_redis.expire = AsyncMock()
manager._redis_client = mock_redis
manager._register_algorithms()
# 创建一个慢速任务
async def slow_execute():
async with manager._semaphore:
await asyncio.sleep(0.1)
# 启动 3 个任务
tasks = [asyncio.create_task(slow_execute()) for _ in range(3)]
# 等待一小段时间,让前两个任务获取 semaphore
await asyncio.sleep(0.01)
# 检查并发状态
status = manager.get_concurrency_status()
assert status["running_jobs"] == 2 # 只有 2 个任务在运行
assert status["available_slots"] == 0 # 没有可用槽位
# 等待所有任务完成
await asyncio.gather(*tasks)
# 检查最终状态
status = manager.get_concurrency_status()
assert status["running_jobs"] == 0
assert status["available_slots"] == 2
def test_concurrency_status_api(self, client):
"""测试并发状态 API 端点"""
response = client.get("/jobs/concurrency/status")
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "max_concurrent" in data
assert "available_slots" in data
assert "running_jobs" in data
assert isinstance(data["max_concurrent"], int)
assert isinstance(data["available_slots"], int)
assert isinstance(data["running_jobs"], int)

97
tests/test_middleware.py Normal file
View File

@@ -0,0 +1,97 @@
"""中间件测试"""
import pytest
from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient
from src.functional_scaffold.main import app, normalize_path
class TestNormalizePath:
"""测试路径规范化函数"""
def test_normalize_jobs_path(self):
"""测试 /jobs/{job_id} 路径规范化"""
assert normalize_path("/jobs/a1b2c3d4e5f6") == "/jobs/{job_id}"
assert normalize_path("/jobs/123456789012") == "/jobs/{job_id}"
assert normalize_path("/jobs/xyz") == "/jobs/{job_id}"
def test_normalize_other_paths(self):
"""测试其他路径保持不变"""
assert normalize_path("/invoke") == "/invoke"
assert normalize_path("/healthz") == "/healthz"
assert normalize_path("/readyz") == "/readyz"
assert normalize_path("/metrics") == "/metrics"
assert normalize_path("/docs") == "/docs"
def test_normalize_jobs_root(self):
"""测试 /jobs 根路径"""
assert normalize_path("/jobs") == "/jobs"
class TestMetricsMiddleware:
"""测试指标中间件"""
@patch("src.functional_scaffold.main.incr")
@patch("src.functional_scaffold.main.observe")
@patch("src.functional_scaffold.main.gauge_incr")
@patch("src.functional_scaffold.main.gauge_decr")
def test_skip_health_endpoints(self, mock_gauge_decr, mock_gauge_incr, mock_observe, mock_incr):
"""测试跳过健康检查端点"""
client = TestClient(app)
# 访问健康检查端点
client.get("/healthz")
client.get("/readyz")
client.get("/metrics")
# 验证没有记录指标
mock_incr.assert_not_called()
mock_observe.assert_not_called()
mock_gauge_incr.assert_not_called()
mock_gauge_decr.assert_not_called()
@patch("src.functional_scaffold.main.incr")
@patch("src.functional_scaffold.main.observe")
@patch("src.functional_scaffold.main.gauge_incr")
@patch("src.functional_scaffold.main.gauge_decr")
def test_record_normal_endpoints(self, mock_gauge_decr, mock_gauge_incr, mock_observe, mock_incr):
"""测试记录普通端点"""
client = TestClient(app)
# 访问普通端点
client.post("/invoke", json={"number": 17})
# 验证记录了指标
mock_gauge_incr.assert_called_once()
mock_gauge_decr.assert_called_once()
mock_incr.assert_called_once()
mock_observe.assert_called_once()
# 验证使用了正确的端点路径
incr_call_args = mock_incr.call_args
assert incr_call_args[0][1]["endpoint"] == "/invoke"
@patch("src.functional_scaffold.main.incr")
@patch("src.functional_scaffold.main.observe")
@patch("src.functional_scaffold.main.gauge_incr")
@patch("src.functional_scaffold.main.gauge_decr")
@patch("src.functional_scaffold.core.job_manager.get_job_manager")
def test_normalize_job_path(self, mock_get_manager, mock_gauge_decr, mock_gauge_incr, mock_observe, mock_incr):
"""测试规范化任务路径"""
# Mock job manager
mock_manager = MagicMock()
mock_manager.get_job.return_value = None
mock_get_manager.return_value = mock_manager
client = TestClient(app)
# 访问任务端点(会返回 404但中间件应该记录指标
client.get("/jobs/a1b2c3d4e5f6")
# 验证记录了指标
mock_incr.assert_called_once()
# 验证使用了规范化后的路径
incr_call_args = mock_incr.call_args
assert incr_call_args[0][1]["endpoint"] == "/jobs/{job_id}"