Compare commits
5 commits: b1077e78e9 ... 9e0ba8e74f

| SHA1 |
|---|
| 9e0ba8e74f |
| 8afff21fae |
| 9b6642635b |
| 87ed8c071c |
| 57b276d038 |

README.md (15 lines changed)
```diff
@@ -19,13 +19,14 @@
 ## Documentation
 
-| Document | Description |
-|------|------|
-| [Getting Started](docs/getting-started.md) | 10-minute quick-start guide |
-| [Algorithm Development Guide](docs/algorithm-development.md) | Detailed algorithm development tutorial |
-| [API Reference](docs/api-reference.md) | Complete API documentation |
-| [Monitoring Guide](docs/monitoring.md) | Monitoring and alerting configuration |
-| [API Specification](docs/api/README.md) | OpenAPI specification notes |
+| Document | Description |
+|-----------------------------------------|--------------|
+| [Getting Started](docs/getting-started.md) | 10-minute quick-start guide |
+| [Algorithm Development Guide](docs/algorithm-development.md) | Detailed algorithm development tutorial |
+| [API Reference](docs/api-reference.md) | Complete API documentation |
+| [Monitoring Guide](docs/monitoring.md) | Monitoring and alerting configuration |
+| [API Specification](docs/api/README.md) | OpenAPI specification notes |
+| [Log Integration (Loki)](docs/loki-quick-reference.md) | Log collection deployment notes |
 
 ## Quick Start
```
```diff
@@ -17,9 +17,16 @@ services:
       - REDIS_DB=0
       # Path to the metrics config file
       - METRICS_CONFIG_PATH=config/metrics.yaml
+      # Log file configuration
+      - LOG_FILE_ENABLED=false
+      - LOG_FILE_PATH=/var/log/app/app.log
     volumes:
       - ../src:/app/src
       - ../config:/app/config
+      - app_logs:/var/log/app
+    labels:
+      logging: "promtail"
+      logging_jobname: "functional-scaffold-app"
     restart: unless-stopped
     depends_on:
       redis:
```

@@ -69,12 +76,47 @@ services:
```yaml
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ../monitoring/grafana:/etc/grafana/provisioning
      - ../monitoring/grafana/datasources:/etc/grafana/provisioning/datasources
      - ../monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
    restart: unless-stopped
    depends_on:
      - prometheus
      - loki

  loki:
    image: grafana/loki:2.9.3
    ports:
      - "3100:3100"
    volumes:
      - ../monitoring/loki.yaml:/etc/loki/local-config.yaml
      - loki_data:/loki
    command: -config.file=/etc/loki/local-config.yaml
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "wget", "--spider", "-q", "http://localhost:3100/ready"]
      interval: 10s
      timeout: 3s
      retries: 3

  promtail:
    image: grafana/promtail:3.0.0
    ports:
      - "9080:9080"
    volumes:
      - ../monitoring/promtail.yaml:/etc/promtail/config.yml
      # Docker stdio collection
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
      # Log file collection (fallback)
      - app_logs:/var/log/app:ro
    command: -config.file=/etc/promtail/config.yml
    restart: unless-stopped
    depends_on:
      - loki

volumes:
  prometheus_data:
  grafana_data:
  redis_data:
  loki_data:
  app_logs:
```
docs/concurrency-control-changelog.md (new file, 204 lines)
@@ -0,0 +1,204 @@

# Async Job Concurrency Control: Implementation Summary

## Change Overview

Added concurrency control to the async job manager: an `asyncio.Semaphore` caps the number of jobs running at the same time, preventing system resource exhaustion.

## Modified Files

### 1. `src/functional_scaffold/config.py`

**New setting:**
```python
max_concurrent_jobs: int = 10  # maximum number of concurrent jobs
```

### 2. `src/functional_scaffold/core/job_manager.py`

**New attributes:**
- `_semaphore: Optional[asyncio.Semaphore]` - the concurrency-control semaphore
- `_max_concurrent_jobs: int` - maximum concurrency (stores the configured value)

**Modified methods:**
- `__init__()` - initializes the semaphore and max_concurrent_jobs attributes
- `initialize()` - creates the Semaphore instance
- `execute_job()` - wraps the execution logic in `async with self._semaphore`

**New method:**
- `get_concurrency_status()` - returns the concurrency status (maximum concurrency, available slots, number of running jobs)
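A minimal sketch of the pattern described above. The names follow this summary, but the real `JobManager` carries more state and error handling, so treat this as an illustration rather than the project's actual code:

```python
import asyncio
from typing import Optional

class JobManager:
    """Illustrative skeleton of the semaphore-based concurrency control."""

    def __init__(self, max_concurrent_jobs: int = 10) -> None:
        # Semaphore exposes no "maximum" attribute, so the limit is stored separately.
        self._max_concurrent_jobs = max_concurrent_jobs
        self._semaphore: Optional[asyncio.Semaphore] = None
        self._running = 0

    def initialize(self) -> None:
        # Created in initialize() rather than __init__() so it is set up
        # alongside the running event loop.
        self._semaphore = asyncio.Semaphore(self._max_concurrent_jobs)

    def get_concurrency_status(self) -> dict:
        return {
            "max_concurrent": self._max_concurrent_jobs,
            "available_slots": self._max_concurrent_jobs - self._running,
            "running_jobs": self._running,
        }

    async def execute_job(self, job_id: str) -> None:
        # Fast-fail checks (Redis availability, job existence) would go here,
        # outside the semaphore, so jobs that cannot run never occupy a slot.
        async with self._semaphore:  # blocks until a slot is free
            self._running += 1
            try:
                await asyncio.sleep(0.01)  # placeholder for the real algorithm
            finally:
                self._running -= 1  # count stays correct even on exceptions
```

Because the `async with` block releases the slot on every exit path, no explicit release call is needed.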

### 3. `src/functional_scaffold/api/models.py`

**New model:**
```python
class ConcurrencyStatusResponse(BaseModel):
    """Concurrency status response"""
    max_concurrent: int
    available_slots: int
    running_jobs: int
```

### 4. `src/functional_scaffold/api/routes.py`

**New endpoint:**
```python
GET /jobs/concurrency/status
```

Returns the current concurrency execution status.

### 5. `tests/test_job_manager.py`

**New test class:**
```python
class TestConcurrencyControl:
    - test_get_concurrency_status()
    - test_get_concurrency_status_without_semaphore()
    - test_concurrency_limit()
    - test_concurrency_status_api()
```

**Modified test:**
- `test_execute_job()` - adds semaphore initialization

## How It Works

### Concurrency Control Flow

```
Create job (POST /jobs)
        │
        ▼
asyncio.create_task(execute_job)
        │
        ▼
Check Redis and semaphore availability
        │
        ▼
async with self._semaphore:  ← acquire slot (blocks until one is free)
        │
        ├─ update status to running
        ├─ run the algorithm
        ├─ update status to completed/failed
        └─ send webhook
        │
        ▼
Slot released automatically
```

### Key Design Decisions

1. **Use asyncio.Semaphore**
   - Simple, efficient, no external dependencies
   - Slot acquisition and release are managed automatically
   - Supports asynchronous waiting

2. **Apply the semaphore inside execute_job**
   - Fast-fail checks (Redis availability, job existence) stay outside the semaphore
   - Only jobs that will actually run occupy a slot
   - The slot is released automatically when the job finishes (even on exceptions)

3. **Store _max_concurrent_jobs separately**
   - Semaphore does not expose a maximum-value attribute
   - The value must be kept separately for `get_concurrency_status()` to use

## Test Coverage

- ✅ Fetching concurrency status
- ✅ Concurrency status before initialization
- ✅ Concurrency limit enforced (create more jobs than the limit, verify only the allowed number run)
- ✅ API endpoint test
- ✅ All existing tests still pass (60/60)

## Usage Examples

### Configure the concurrency limit

```bash
# Environment variable
export MAX_CONCURRENT_JOBS=20

# Or in the .env file
MAX_CONCURRENT_JOBS=20
```

### Query concurrency status

```bash
curl http://localhost:8000/jobs/concurrency/status
```

Response:
```json
{
  "max_concurrent": 10,
  "available_slots": 7,
  "running_jobs": 3
}
```

### Test concurrency control

```bash
# Run the test script
./scripts/test_concurrency.sh
```

## Performance Impact

### Benefits

1. **Prevents resource exhaustion**: caps the number of jobs running at once
2. **Predictable load**: system load cannot exceed the configured limit
3. **Automatic queueing**: jobs beyond the limit wait automatically
4. **Near-zero overhead**: below the limit, the semaphore costs almost nothing

### Caveats

1. **Job waiting**: jobs beyond the limit wait, which can increase response latency
2. **Memory usage**: waiting jobs still hold memory (coroutine objects)
3. **Tuning required**: the concurrency limit must be adjusted to the actual workload

## Monitoring Suggestions

### Prometheus Queries

```promql
# Job creation rate
rate(jobs_created_total[5m])

# Job completion rate
rate(jobs_completed_total[5m])

# Job backlog (created - completed)
rate(jobs_created_total[5m]) - rate(jobs_completed_total[5m])
```

### Grafana Panels

Suggested panels:
1. Concurrency status time series (max_concurrent, available_slots, running_jobs)
2. Job creation/completion rates
3. Job execution time distribution (P50, P95, P99)

## Future Improvements

1. **Job timeouts**: set timeouts for long-running jobs
2. **Priority queue**: let high-priority jobs run first
3. **Dynamic adjustment**: adapt the concurrency limit to system load
4. **Job cancellation**: cancel waiting or running jobs
5. **Resource limits**: finer-grained CPU and memory limits

## Related Documents

- [Concurrency control reference](./concurrency-control.md)
- [Async job interface implementation plan](../plans/giggly-hatching-kite.md)
- [Monitoring guide](./monitoring.md)

## Test Results

```
======================== 60 passed, 7 warnings in 1.53s ========================
```

All tests pass, including the 4 new concurrency-control tests.
docs/concurrency-control-quickref.md (new file, 102 lines)
@@ -0,0 +1,102 @@

# Concurrency Control Quick Reference

## Configuration

```bash
# Set the maximum concurrency (default 10)
export MAX_CONCURRENT_JOBS=20
```

## API

### Query concurrency status

```bash
GET /jobs/concurrency/status
```

**Response:**
```json
{
  "max_concurrent": 10,   // maximum concurrency
  "available_slots": 7,   // available slots
  "running_jobs": 3       // number of running jobs
}
```

## Code Examples

### Using the JobManager

```python
# Concurrency control applies automatically; no extra code is needed
job_manager = await get_job_manager()
job_id = await job_manager.create_job(...)

# The job queues automatically, waiting for a free slot
asyncio.create_task(job_manager.execute_job(job_id))
```

### Querying concurrency status

```python
job_manager = await get_job_manager()
status = job_manager.get_concurrency_status()

print(f"Running: {status['running_jobs']}/{status['max_concurrent']}")
print(f"Available slots: {status['available_slots']}")
```

## Monitoring

### Live monitoring

```bash
# Continuously watch the concurrency status
watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq'
```

### Test script

```bash
# Run the concurrency-control tests
./scripts/test_concurrency.sh
```

## Recommended Settings

| Workload type | Recommended concurrency |
|---------|-----------|
| CPU-bound | cores × 1.5 |
| I/O-bound | cores × 5-10 |
| Mixed | cores × 2-3 |
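The rules of thumb in the table can be turned into a tiny helper. This is a sketch: the multipliers are the table's heuristics (using the midpoint of the I/O range), not measured values, so tune the result against real load:

```python
import os

def recommended_concurrency(workload: str) -> int:
    """Map the table's heuristics to a starting value for MAX_CONCURRENT_JOBS."""
    cores = os.cpu_count() or 1
    # "cpu" and "mixed" follow the table; "io" uses the midpoint of 5-10.
    factors = {"cpu": 1.5, "io": 7.5, "mixed": 2.5}
    return max(1, int(cores * factors[workload]))

# Example: on a 4-core machine, "cpu" gives 6, "io" gives 30, "mixed" gives 10.
```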

## Troubleshooting

### Jobs stuck in pending

```bash
# Check the concurrency status
curl http://localhost:8000/jobs/concurrency/status

# If available_slots = 0, all slots are occupied
# Remedies:
# 1. Wait for current jobs to finish
# 2. Raise the concurrency limit
# 3. Optimize algorithm performance
```

### System resources exhausted

```bash
# Lower the concurrency limit
export MAX_CONCURRENT_JOBS=5

# Restart the service
./scripts/run_dev.sh
```

## Related Documents

- [Reference](./concurrency-control.md)
- [Implementation summary](./concurrency-control-changelog.md)
docs/concurrency-control.md (new file, 204 lines)
@@ -0,0 +1,204 @@

# Async Job Concurrency Control

## Overview

To prevent resource exhaustion and keep load under control, the job manager implements concurrency control based on `asyncio.Semaphore`.

## Configuration

Set the maximum number of concurrent jobs in `config.py` or via an environment variable:

```python
# config.py
max_concurrent_jobs: int = 10  # default
```

Or via an environment variable:

```bash
export MAX_CONCURRENT_JOBS=20
```

## How It Works

1. **Semaphore mechanism**: `asyncio.Semaphore` limits the number of jobs running at once
2. **Automatic management**: a job acquires a slot when it starts and releases it automatically when it finishes
3. **Queueing**: jobs beyond the limit wait automatically until a slot frees up

### Execution Flow

```
POST /jobs creates a job
        │
        ▼
asyncio.create_task(execute_job)
        │
        ▼
Wait to acquire a semaphore slot
        │
        ▼
async with semaphore:  ← slot acquired
    run the algorithm
    update status
    send webhook
        │
        ▼
Slot released automatically
```
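The queueing behaviour in the flow above can be observed with a few lines of standalone asyncio. This is a self-contained demo, not project code:

```python
import asyncio

async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:  # acquire a slot; waits if none are free
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # simulate the job's work
        state["running"] -= 1

async def main() -> int:
    sem = asyncio.Semaphore(3)  # limit: 3 concurrent jobs
    state = {"running": 0, "peak": 0}
    # Ten jobs are created at once, but at most 3 run at any moment.
    await asyncio.gather(*(worker(sem, state) for _ in range(10)))
    return state["peak"]

if __name__ == "__main__":
    print(asyncio.run(main()))  # peak concurrency, never above 3
```

The remaining seven workers simply block inside `async with sem` until a slot is released, which is exactly what "automatic queueing" means for jobs beyond the limit.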

## API Endpoints

### Query concurrency status

```bash
GET /jobs/concurrency/status
```

**Example response:**

```json
{
  "max_concurrent": 10,
  "available_slots": 7,
  "running_jobs": 3
}
```

**Fields:**

- `max_concurrent`: maximum number of concurrent jobs (configured value)
- `available_slots`: currently available slots
- `running_jobs`: jobs currently running

## Usage Examples

### 1. Create multiple jobs

```bash
# Create 20 jobs
for i in {1..20}; do
  curl -X POST http://localhost:8000/jobs \
    -H "Content-Type: application/json" \
    -d "{\"algorithm\": \"PrimeChecker\", \"params\": {\"number\": $i}}"
done
```

### 2. Monitor concurrency status

```bash
# Continuously watch the concurrency status
watch -n 1 'curl -s http://localhost:8000/jobs/concurrency/status | jq'
```

Example output:

```json
{
  "max_concurrent": 10,
  "available_slots": 0,
  "running_jobs": 10
}
```

### 3. Adjust the concurrency limit

```bash
# Set the environment variable before restarting the service
export MAX_CONCURRENT_JOBS=20
./scripts/run_dev.sh
```

## Performance Considerations

### Choosing a concurrency limit

The limit should be based on:

1. **CPU cores**: for CPU-bound jobs, 1-2× the core count
2. **Memory**: per-job memory × concurrency must stay below available memory
3. **External service limits**: if jobs call external APIs, respect their rate limits
4. **Redis connection pool**: the pool size should be ≥ the concurrency limit

### Recommended Settings

| Scenario | Recommended concurrency | Notes |
|------|-----------|------|
| CPU-bound (e.g. primality checks) | cores × 1.5 | Fully utilize the CPU |
| I/O-bound (e.g. network requests) | cores × 5-10 | Tasks switch while waiting on I/O |
| Mixed | cores × 2-3 | Balance CPU and I/O |
| Memory-constrained | Derive from memory | Avoid OOM |

### Worked Example

Assume:
- Server: 4 cores, 8 GB RAM
- Workload: I/O-bound (network requests)
- Per-job memory: 50 MB

```
max concurrency = min(
    cores × 8 = 32,
    available memory / per-job memory = 8000MB / 50MB = 160
) = 32
```

## Monitoring Metrics

Relevant Prometheus metrics:

```promql
# Job creation rate
rate(jobs_created_total[5m])

# Job completion rate
rate(jobs_completed_total[5m])

# Job execution time distribution
histogram_quantile(0.95, job_execution_duration_seconds_bucket)
```

## Troubleshooting

### Problem: jobs stay in pending

**Possible causes:**
1. All slots are occupied
2. Some jobs run for too long

**Remedies:**
```bash
# 1. Check the concurrency status
curl http://localhost:8000/jobs/concurrency/status

# 2. If available_slots = 0, all slots are occupied
# 3. Check for long-running jobs
# 4. Consider raising the limit or optimizing algorithm performance
```

### Problem: system resources exhausted

**Possible cause:**
The concurrency limit is set too high.

**Remedy:**
```bash
# Lower the concurrency limit
export MAX_CONCURRENT_JOBS=5
# Restart the service
```

## Best Practices

1. **Monitor first**: keep watching concurrency status and system resources after deployment
2. **Adjust gradually**: start with a conservative value and raise it step by step
3. **Load test**: run thorough load tests before going to production
4. **Set alerts**: alert when `available_slots = 0` persists for too long
5. **Job timeouts**: set timeouts for long-running jobs (not yet implemented)

## Future Improvements

- [ ] Job timeout mechanism
- [ ] Priority queue
- [ ] Dynamic concurrency adjustment
- [ ] Job cancellation
- [ ] Finer-grained resource control (CPU, memory limits)
docs/loki-implementation-summary.md (new file, 238 lines)
@@ -0,0 +1,238 @@

# Loki Log Collection Integration: Implementation Summary

## Completed

Grafana Loki log collection has been integrated into the FunctionalScaffold project.

## New Files

### 1. Monitoring configuration

| File | Description |
|------|------|
| `monitoring/loki.yaml` | Loki service config (7-day retention, 10 MB/s rate limit) |
| `monitoring/promtail.yaml` | Promtail collection config (supports both Docker stdio and file modes) |

### 2. Grafana provisioning

| File | Description |
|------|------|
| `monitoring/grafana/datasources/prometheus.yaml` | Auto-provisioned Prometheus data source |
| `monitoring/grafana/datasources/loki.yaml` | Auto-provisioned Loki data source |
| `monitoring/grafana/dashboards/provider.yaml` | Dashboard auto-loading config |
| `monitoring/grafana/dashboards/logs-dashboard.json` | Log monitoring dashboard |
| `monitoring/grafana/dashboards/dashboard.json` | Existing monitoring dashboard (moved) |

### 3. Documentation and scripts

| File | Description |
|------|------|
| `docs/loki-integration.md` | Full Loki usage guide (query examples, troubleshooting, etc.) |
| `scripts/verify_loki.sh` | Loki integration verification script |

## Modified Files

### 1. Docker Compose configuration

**File**: `deployment/docker-compose.yml`

**Changes**:
- Added the `loki` service (port 3100)
- Added the `promtail` service (port 9080)
- Updated the `app` service:
  - Added log-file environment variables
  - Added the `app_logs` volume mount
  - Added Promtail labels
- Updated the `grafana` service:
  - Restructured the provisioning volume mounts
  - Added a dependency on Loki
- Added the `loki_data` and `app_logs` volumes

### 2. Application code

**File**: `src/functional_scaffold/core/logging.py`

**Changes**:
- Added a `file_path` parameter
- Implemented `RotatingFileHandler` (100 MB, 5 backups)
- Supports writing to console and file simultaneously

**File**: `src/functional_scaffold/config.py`

**Changes**:
- Added the `log_file_enabled` setting (default False)
- Added the `log_file_path` setting (default `/var/log/app/app.log`)

**File**: `src/functional_scaffold/main.py`

**Changes**:
- Updated the `setup_logging()` call to pass the file path parameter

## Architecture Highlights

### 1. Dual-mode log collection

**Mode 1: Docker stdio collection (default)**
- ✅ No application code changes
- ✅ Automatically collects container standard output
- ✅ Minimal performance impact
- ✅ Recommended for production

**Mode 2: File collection (fallback)**
- ✅ Logs persist to a file
- ✅ Supports log rotation
- ✅ Suitable when local log files are needed
- ⚙️ Requires `LOG_FILE_ENABLED=true`

### 2. Automated configuration

- ✅ Grafana data sources load automatically
- ✅ Dashboards load automatically
- ✅ No manual configuration

### 3. Structured logging

- ✅ JSON-formatted logs
- ✅ Fields extracted automatically (level, logger, request_id, etc.)
- ✅ Queryable with LogQL

## Usage

### Quick start

```bash
cd deployment
docker-compose up -d
```

### Service URLs

- **Grafana**: http://localhost:3000 (admin/admin)
- **Loki API**: http://localhost:3100
- **Promtail**: http://localhost:9080

### Viewing logs

**Option 1: Grafana logs dashboard**
1. Open http://localhost:3000
2. Open the "Log Monitoring" dashboard

**Option 2: Grafana Explore**
1. Open http://localhost:3000/explore
2. Select the Loki data source
3. Enter the query: `{job="functional-scaffold-app"}`

### Verifying the integration

```bash
./scripts/verify_loki.sh
```

## LogQL Query Examples

```logql
# All logs
{job="functional-scaffold-app"}

# Error logs
{job="functional-scaffold-app", level="ERROR"}

# Filter by request_id
{job="functional-scaffold-app"} | json | request_id = "abc123"

# Log volume
sum by (level) (count_over_time({job="functional-scaffold-app"}[5m]))
```

## Configuration Notes

### Log retention

7 days by default; adjust in `monitoring/loki.yaml`:

```yaml
limits_config:
  retention_period: 168h  # 7 days
```

### File log mode

Enable in `deployment/docker-compose.yml`:

```yaml
environment:
  - LOG_FILE_ENABLED=true
  - LOG_FILE_PATH=/var/log/app/app.log
```

### Log level

Adjust in `deployment/docker-compose.yml`:

```yaml
environment:
  - LOG_LEVEL=INFO  # DEBUG, INFO, WARNING, ERROR, CRITICAL
```

## Monitoring

With Loki integrated, Grafana shows:

- **Log stream**: live log stream
- **Log volume trend**: by time and level
- **Log level distribution**: INFO/WARNING/ERROR breakdown
- **Error logs**: ERROR level only

## Troubleshooting

### No logs visible

1. Check service status: `docker-compose ps`
2. Check Promtail logs: `docker-compose logs promtail`
3. Verify container labels: `docker inspect <container> | grep Labels`
4. Query the Loki API: `curl http://localhost:3100/loki/api/v1/label/job/values`

### Docker socket permission problems

```bash
sudo chmod 666 /var/run/docker.sock
```

### Excessive log volume

1. Shorten retention to 3 days
2. Lower the ingestion rate limit
3. Add log drop rules

For detailed troubleshooting, see `docs/loki-integration.md`.

## Performance Impact

- **CPU**: < 5% overhead
- **Memory**: Loki ~200 MB, Promtail ~50 MB
- **Disk**: depends on log volume, roughly 1-5 GB per 7 days
- **Network**: local traffic only, negligible

## Next Steps

Optional enhancements:

1. **Alerting rules**: configure log-based alerts
2. **Log export**: periodically export logs to object storage
3. **Multi-tenancy**: configure Loki multi-tenant mode
4. **Long-term storage**: use S3/OSS as backend storage

## References

- Full usage guide: `docs/loki-integration.md`
- Loki documentation: https://grafana.com/docs/loki/latest/
- LogQL query language: https://grafana.com/docs/loki/latest/logql/

## Summary

✅ **Done**: Loki log collection is integrated
✅ **Tested**: verify with `./scripts/verify_loki.sh`
✅ **Documented**: complete usage and troubleshooting docs
✅ **Production-ready**: dual-mode collection, flexible configuration

The integration is complete; Loki is ready for log collection and analysis.
docs/loki-integration.md (new file, 564 lines)
@@ -0,0 +1,564 @@

# Loki Log Collection Integration Guide

## Overview

This project integrates the Grafana Loki log collection system, with two collection modes:

1. **Docker stdio collection** (recommended) - collect logs from container stdout/stderr
2. **Log file collection** (fallback) - collect logs from log files

## Architecture

```
App container (stdout/stderr)
        ↓
Docker Engine
        ↓
Promtail (log collector)
        ↓
Loki (log storage)
        ↓
Grafana (visualization)
```

## Quick Start

### 1. Start the services

```bash
cd deployment
docker-compose up -d
```

This starts the following services:
- **app**: application service (port 8111)
- **loki**: log storage (port 3100)
- **promtail**: log collection (port 9080)
- **grafana**: visualization (port 3000)
- **prometheus**: metrics collection (port 9090)
- **redis**: cache (port 6380)

### 2. Open Grafana

1. Open http://localhost:3000 in a browser
2. Log in with the default credentials:
   - Username: `admin`
   - Password: `admin`
3. Change the password after the first login

### 3. View logs

#### Option 1: the preconfigured logs dashboard

1. In the Grafana sidebar, click **Dashboards**
2. Select the **Log Monitoring** dashboard
3. Review the panels:
   - **Log stream (live)**: real-time log stream
   - **Log volume trend (by level)**: time-series chart
   - **Log level distribution**: counts per level
   - **Error logs**: ERROR level only

#### Option 2: Explore

1. In the Grafana sidebar, click **Explore** (compass icon)
2. Select the **Loki** data source
3. Enter a LogQL query (see below)

## LogQL Query Examples

### Basic queries

```logql
# All application logs
{job="functional-scaffold-app"}

# Logs at a specific level
{job="functional-scaffold-app", level="ERROR"}
{job="functional-scaffold-app", level="INFO"}

# Logs from a specific container
{container="functional-scaffold-app-1"}
```

### Text filters

```logql
# Contains text
{job="functional-scaffold-app"} |= "request_id"

# Does not contain text
{job="functional-scaffold-app"} != "healthz"

# Regex match
{job="functional-scaffold-app"} |~ "error|exception"

# Regex non-match
{job="functional-scaffold-app"} !~ "debug|trace"
```

### JSON field extraction

```logql
# Extract the request_id field
{job="functional-scaffold-app"} | json | request_id != ""

# Extract and filter a specific request_id
{job="functional-scaffold-app"} | json | request_id = "abc123"

# Extract the logger field
{job="functional-scaffold-app"} | json | logger = "functional_scaffold.api.routes"
```

### Aggregations

```logql
# Count log lines
count_over_time({job="functional-scaffold-app"}[5m])

# Count by level
sum by (level) (count_over_time({job="functional-scaffold-app"}[5m]))

# Error rate
sum(rate({job="functional-scaffold-app", level="ERROR"}[5m]))
/
sum(rate({job="functional-scaffold-app"}[5m]))
```

## Collection Modes

### Mode 1: Docker stdio collection (default, recommended)

**Characteristics:**
- No application code changes
- Automatically collects container stdout/stderr
- Minimal performance impact
- Simple configuration

**How it works:**
1. The app writes logs to stdout/stderr
2. Docker Engine captures them
3. Promtail reads the logs via the Docker API
4. Logs are shipped to Loki for storage

**Configuration:**
- The app container needs these labels:
  ```yaml
  labels:
    logging: "promtail"
    logging_jobname: "functional-scaffold-app"
  ```

### Mode 2: Log file collection (fallback)

**Characteristics:**
- Logs persist to a file
- Supports log rotation
- Suitable when local log files are needed

**Enabling:**

1. Edit `deployment/docker-compose.yml`:
   ```yaml
   environment:
     - LOG_FILE_ENABLED=true
     - LOG_FILE_PATH=/var/log/app/app.log
   ```

2. Restart the service:
   ```bash
   docker-compose up -d app
   ```

**Log file settings:**
- Maximum file size: 100 MB
- Backups kept: 5
- Total storage: at most 500 MB

## Configuration Reference

### Loki (monitoring/loki.yaml)

```yaml
limits_config:
  retention_period: 168h       # keep logs for 7 days
  ingestion_rate_mb: 10        # ingestion rate limit, 10 MB/s
  ingestion_burst_size_mb: 20  # burst size, 20 MB
```

**Tunable parameters:**
- `retention_period`: log retention time (default 7 days)
- `ingestion_rate_mb`: per-second ingestion rate limit
- `ingestion_burst_size_mb`: burst allowance

### Promtail (monitoring/promtail.yaml)

**Docker stdio collection:**
```yaml
scrape_configs:
  - job_name: docker
    docker_sd_configs:
      - host: unix:///var/run/docker.sock
        filters:
          - name: label
            values: ["logging=promtail"]
```

**File collection:**
```yaml
scrape_configs:
  - job_name: app_files
    static_configs:
      - targets:
          - localhost
        labels:
          job: functional-scaffold-app-files
          __path__: /var/log/app/*.log
```

## Verification and Testing

### 1. Check service status

```bash
# List all services
docker-compose ps

# Check Loki health
curl http://localhost:3100/ready

# Check Promtail health
curl http://localhost:9080/ready
```

### 2. Generate test logs

```bash
# Send a test request
curl -X POST http://localhost:8111/invoke \
  -H "Content-Type: application/json" \
  -d '{"algorithm": "PrimeChecker", "params": {"number": 17}}'
```

### 3. Query logs

```bash
# Query via the Loki API
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
  --data-urlencode 'query={job="functional-scaffold-app"}' \
  --data-urlencode 'limit=10' \
  | jq '.data.result'
```
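The same query can be issued from Python with the standard library alone. This sketch only builds the request URL; the `/loki/api/v1/query_range` path and the `query`/`limit` parameters are the same ones the curl command above uses, and the base address is this project's local deployment:

```python
from urllib.parse import urlencode

def loki_query_url(base: str, query: str, limit: int = 10) -> str:
    """Build a query_range URL equivalent to the curl example above."""
    params = urlencode({"query": query, "limit": limit})
    return f"{base}/loki/api/v1/query_range?{params}"

url = loki_query_url("http://localhost:3100", '{job="functional-scaffold-app"}')
# urllib.request.urlopen(url) would return the same JSON that curl pipes into jq.
```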

### 4. Verify in Grafana

1. Open http://localhost:3000/explore
2. Select the Loki data source
3. Enter the query: `{job="functional-scaffold-app"}`
4. Application logs should appear

## Troubleshooting

### Problem 1: no logs visible

**Checks:**

1. Confirm Promtail is running:
   ```bash
   docker-compose ps promtail
   ```

2. Check Promtail logs:
   ```bash
   docker-compose logs promtail
   ```

3. Confirm the app container has the right labels:
   ```bash
   docker inspect functional-scaffold-app-1 | grep -A 5 Labels
   ```

4. Check whether Loki has received logs:
   ```bash
   curl -G -s "http://localhost:3100/loki/api/v1/label/job/values" | jq
   ```

### Problem 2: Promtail cannot access the Docker socket

**Error:**
```
permission denied while trying to connect to the Docker daemon socket
```

**Remedy:**

On macOS/Linux, make sure the Docker socket permissions are correct:
```bash
sudo chmod 666 /var/run/docker.sock
```

Or add the Promtail container to the docker group (Linux):
```yaml
promtail:
  user: root
  group_add:
    - docker
```

### Problem 3: excessive log volume

**Symptoms:**
- Loki responds slowly
- Disk space runs out

**Remedies:**

1. Shorten the retention period:
   ```yaml
   # monitoring/loki.yaml
   limits_config:
     retention_period: 72h  # 3 days
   ```

2. Lower the ingestion rate limit:
   ```yaml
   limits_config:
     ingestion_rate_mb: 5  # lower to 5 MB/s
   ```

3. Add log filtering:
   ```yaml
   # monitoring/promtail.yaml
   pipeline_stages:
     - match:
         selector: '{job="functional-scaffold-app"}'
         stages:
           - drop:
               expression: ".*healthz.*"  # drop health-check logs
   ```

### Problem 4: no logs in file mode

**Checks:**

1. Confirm file logging is enabled:
   ```bash
   docker-compose exec app env | grep LOG_FILE
   ```

2. Check that the log file exists:
   ```bash
   docker-compose exec app ls -lh /var/log/app/
   ```

3. Check that Promtail can access the log file:
   ```bash
   docker-compose exec promtail ls -lh /var/log/app/
   ```

## Performance Tuning

### 1. Reduce log volume

**At the application level:**
- Raise the log level to WARNING or ERROR
- Filter out unnecessary logs (e.g. health checks)

```yaml
# docker-compose.yml
environment:
  - LOG_LEVEL=WARNING
```

**At the Promtail level:**
```yaml
# monitoring/promtail.yaml
pipeline_stages:
  - drop:
      expression: ".*healthz.*"
      drop_counter_reason: "healthcheck"
```

### 2. Optimize query performance

**Filter with labels:**
```logql
# Good: label filter (fast)
{job="functional-scaffold-app", level="ERROR"}

# Bad: text filter (slow)
{job="functional-scaffold-app"} |= "ERROR"
```

**Limit the time range:**
```logql
# Query the last 5 minutes
{job="functional-scaffold-app"}[5m]

# Avoid very long ranges
{job="functional-scaffold-app"}[7d]  # slow
```

### 3. Storage optimization

**Clean up old data regularly:**
```bash
# Loki purges automatically based on retention_period
# Manual cleanup is also possible
docker-compose exec loki rm -rf /loki/chunks/*
```

**Monitor disk usage:**
```bash
docker-compose exec loki du -sh /loki/chunks
```

## Advanced Features

### 1. Alerting rules

Configure alerting rules in Loki (requires the Loki Ruler):

```yaml
# monitoring/loki-rules.yaml
groups:
  - name: error_alerts
    interval: 1m
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate({job="functional-scaffold-app", level="ERROR"}[5m]))
          /
          sum(rate({job="functional-scaffold-app"}[5m]))
          > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate"
          description: "Application error rate exceeds 5%"
```

### 2. Log export

**Export as JSON:**
```bash
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
  --data-urlencode 'query={job="functional-scaffold-app"}' \
  --data-urlencode 'start=2024-01-01T00:00:00Z' \
  --data-urlencode 'end=2024-01-02T00:00:00Z' \
  | jq '.data.result' > logs.json
```

**Export as text:**
```bash
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
  --data-urlencode 'query={job="functional-scaffold-app"}' \
  | jq -r '.data.result[].values[][1]' > logs.txt
```

### 3. Combining with Prometheus

Show logs and metrics together in one Grafana dashboard:

```json
{
  "panels": [
    {
      "title": "Error rate and error logs",
      "targets": [
        {
          "datasource": "Prometheus",
          "expr": "rate(http_requests_total{status=\"error\"}[5m])"
        },
        {
          "datasource": "Loki",
          "expr": "{job=\"functional-scaffold-app\", level=\"ERROR\"}"
        }
      ]
    }
  ]
}
```

## Best Practices

### 1. Log format

**Use structured (JSON) logs:**
```python
logger.info("Handling request", extra={
    "request_id": "abc123",
    "user_id": "user456",
    "duration": 0.123
})
```

**Output:**
```json
{
  "asctime": "2024-01-01 12:00:00,000",
  "name": "functional_scaffold.api.routes",
  "levelname": "INFO",
  "message": "Handling request",
  "request_id": "abc123",
  "user_id": "user456",
  "duration": 0.123
}
```

### 2. Label strategy

**Good labels:**
- Low cardinality (few distinct values)
- Used for filtering and grouping
- Examples: `level`, `logger`, `container`

**Bad labels:**
- High cardinality (many distinct values)
- Examples: `request_id`, `user_id`, `timestamp`

**Correct approach:**
```logql
# Filter with labels
{job="functional-scaffold-app", level="ERROR"}

# Extract high-cardinality fields with JSON
{job="functional-scaffold-app"} | json | request_id = "abc123"
```

### 3. Query optimization

**Use time ranges:**
```logql
{job="functional-scaffold-app"}[5m]  # last 5 minutes
```

**Limit returned rows:**
```logql
{job="functional-scaffold-app"} | limit 100
```

**Aggregate to reduce data:**
```logql
sum by (level) (count_over_time({job="functional-scaffold-app"}[5m]))
```

## References

- [Loki documentation](https://grafana.com/docs/loki/latest/)
- [LogQL query language](https://grafana.com/docs/loki/latest/logql/)
- [Promtail configuration](https://grafana.com/docs/loki/latest/clients/promtail/configuration/)
- [Grafana Explore](https://grafana.com/docs/grafana/latest/explore/)

## Summary

This project's Loki integration provides:

✅ **Works out of the box** - logs are collected with no extra configuration
✅ **Dual-mode support** - Docker stdio (default) and file collection
✅ **Automated provisioning** - data sources and dashboards load automatically
✅ **Structured logs** - JSON format with field extraction
✅ **High performance** - low resource usage, fast queries
✅ **Easy to extend** - custom labels and filter rules

See the troubleshooting section or the official documentation if problems arise.
237 docs/loki-quick-reference.md Normal file
@@ -0,0 +1,237 @@

# Loki Quick Reference

## Common Commands

### Service Management

```bash
# Start all services
cd deployment && docker-compose up -d

# Check service status
docker-compose ps

# Tail logs
docker-compose logs -f loki
docker-compose logs -f promtail

# Restart services
docker-compose restart loki promtail

# Stop services
docker-compose down
```

### Health Checks

```bash
# Loki
curl http://localhost:3100/ready

# Promtail
curl http://localhost:9080/ready

# Verification script
./scripts/verify_loki.sh
```

## Common LogQL Queries

### Basic Queries

```logql
# All logs
{job="functional-scaffold-app"}

# Error logs
{job="functional-scaffold-app", level="ERROR"}

# Range selector (use inside aggregations such as count_over_time)
{job="functional-scaffold-app"}[5m]
```

### Text Filtering

```logql
# Contains text
{job="functional-scaffold-app"} |= "error"

# Does not contain text
{job="functional-scaffold-app"} != "healthz"

# Regex match
{job="functional-scaffold-app"} |~ "error|exception"
```

### JSON Extraction

```logql
# Extract request_id
{job="functional-scaffold-app"} | json | request_id != ""

# Filter by request_id
{job="functional-scaffold-app"} | json | request_id = "abc123"
```

### Aggregations

```logql
# Log count
count_over_time({job="functional-scaffold-app"}[5m])

# Count by level
sum by (level) (count_over_time({job="functional-scaffold-app"}[5m]))

# Error rate
sum(rate({job="functional-scaffold-app", level="ERROR"}[5m]))
/
sum(rate({job="functional-scaffold-app"}[5m]))
```

## API Queries

### Query Logs

```bash
# Query recent logs
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
  --data-urlencode 'query={job="functional-scaffold-app"}' \
  --data-urlencode 'limit=10' \
  | jq '.data.result'

# Query error logs
curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
  --data-urlencode 'query={job="functional-scaffold-app", level="ERROR"}' \
  | jq '.data.result'
```

### Query Labels

```bash
# List all values of the job label
curl -s "http://localhost:3100/loki/api/v1/label/job/values" | jq

# List all values of the level label
curl -s "http://localhost:3100/loki/api/v1/label/level/values" | jq
```
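The same requests can be built from Python. A small stdlib-only sketch that constructs the `query_range` URL used in the curl examples above (the base URL is an assumption taken from the compose setup; issuing the request and parsing `.data.result` is left to whichever HTTP client the project uses):

```python
from urllib.parse import urlencode

LOKI_URL = "http://localhost:3100"  # assumed default from the docker-compose setup

def build_query_range_url(query: str, limit: int = 10) -> str:
    """Build a /loki/api/v1/query_range URL equivalent to the curl examples."""
    params = urlencode({"query": query, "limit": limit})
    return f"{LOKI_URL}/loki/api/v1/query_range?{params}"
```

For example, `build_query_range_url('{job="functional-scaffold-app", level="ERROR"}')` yields the URL behind the "query error logs" curl command.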

## Configuration Switches

### Enable File Logging

Edit `deployment/docker-compose.yml`:

```yaml
environment:
  - LOG_FILE_ENABLED=true
```

Restart the service:

```bash
docker-compose up -d app
```

### Adjust the Log Level

Edit `deployment/docker-compose.yml`:

```yaml
environment:
  - LOG_LEVEL=WARNING  # DEBUG, INFO, WARNING, ERROR, CRITICAL
```

### Change the Retention Period

Edit `monitoring/loki.yaml`:

```yaml
limits_config:
  retention_period: 72h  # 3 days
```

Restart Loki:

```bash
docker-compose restart loki
```

## Endpoints

| Service | Address | Credentials |
|---------|---------|-------------|
| Grafana | http://localhost:3000 | admin/admin |
| Loki API | http://localhost:3100 | - |
| Promtail | http://localhost:9080 | - |
| Prometheus | http://localhost:9090 | - |
| App | http://localhost:8111 | - |

## Troubleshooting

### No Logs Showing Up

```bash
# 1. Check the Promtail logs
docker-compose logs promtail | tail -50

# 2. Check container labels
docker inspect deployment-app-1 | grep -A 5 Labels

# 3. Query Loki
curl -s "http://localhost:3100/loki/api/v1/label/job/values" | jq
```

### Docker Socket Permissions

```bash
sudo chmod 666 /var/run/docker.sock
```

### Clear Log Data

```bash
# Stop Loki
docker-compose stop loki

# Remove data (exec needs a running container, so run a one-off container instead)
docker-compose run --rm --entrypoint sh loki -c 'rm -rf /loki/chunks/*'

# Start Loki again
docker-compose start loki
```

## Performance Tuning

### Reduce Log Volume

```yaml
# docker-compose.yml
environment:
  - LOG_LEVEL=WARNING  # record only warnings and errors
```

### Filter Health-Check Logs

Edit `monitoring/promtail.yaml`:

```yaml
pipeline_stages:
  - drop:
      expression: ".*healthz.*"
```

### Bound Query Ranges

```logql
# Good: bounded range (inside an aggregation, or via the time picker)
{job="functional-scaffold-app"}[5m]

# Bad: unbounded query over all time
{job="functional-scaffold-app"}
```

## Related Documents

- Full documentation: `docs/loki-integration.md`
- Implementation summary: `docs/loki-implementation-summary.md`
- Verification script: `scripts/verify_loki.sh`

126 docs/metrics-filtering-changelog.md Normal file
@@ -0,0 +1,126 @@

# Metrics Filtering and Path Normalization

## Summary of Changes

This change refines how HTTP request metrics are recorded, in two ways:

### 1. Skip Health-Check Endpoints

The following endpoints are no longer recorded in Prometheus metrics:
- `/metrics` - the metrics endpoint itself
- `/healthz` - liveness probe
- `/readyz` - readiness probe

**Rationale**: these endpoints are hit frequently (Kubernetes probes, Prometheus scrapes) but carry little business-monitoring value, so they mostly add noise.

### 2. Path Parameter Normalization

Endpoints with path parameters are normalized to a template form:

| Original path | Normalized |
|---------------|------------|
| `GET /jobs/a1b2c3d4e5f6` | `GET /jobs/{job_id}` |
| `GET /jobs/xyz123456789` | `GET /jobs/{job_id}` |

**Rationale**: distinct path-parameter values would otherwise create an unbounded set of label values, causing cardinality explosion and degrading Prometheus performance.

## Implementation Details

### Code Changes

**File: `src/functional_scaffold/main.py`**

1. Add a `normalize_path()` function:
```python
def normalize_path(path: str) -> str:
    """Normalize a path, replacing path parameters with template placeholders."""
    if path.startswith("/jobs/") and len(path) > 6:
        return "/jobs/{job_id}"
    return path
```

2. Update the `track_metrics` middleware:
```python
# Skip endpoints that should not be recorded
skip_paths = {"/metrics", "/readyz", "/healthz"}
if request.url.path in skip_paths:
    return await call_next(request)

# Record metrics against the normalized path
normalized_path = normalize_path(request.url.path)
incr("http_requests_total",
     {"method": request.method, "endpoint": normalized_path, "status": status})
```

### Test Coverage

**File: `tests/test_middleware.py`**

Six new test cases:
- `test_normalize_jobs_path` - job path normalization
- `test_normalize_other_paths` - other paths left unchanged
- `test_normalize_jobs_root` - the bare `/jobs` path
- `test_skip_health_endpoints` - health-check endpoints are skipped
- `test_record_normal_endpoints` - ordinary endpoints are recorded
- `test_normalize_job_path` - integration test for job path normalization

All tests pass: ✅ 56/56 passed

## Verification

### Manual Testing

Run the provided test script:
```bash
./scripts/test_metrics_filtering.sh
```

### Expected Results

After hitting the `/metrics` endpoint, you should see:

✅ **Metrics that should appear:**
```
http_requests_total{method="POST",endpoint="/invoke",status="success"} 1
http_requests_total{method="GET",endpoint="/jobs/{job_id}",status="error"} 2
```

❌ **Metrics that should not appear:**
```
http_requests_total{method="GET",endpoint="/healthz",...}
http_requests_total{method="GET",endpoint="/readyz",...}
http_requests_total{method="GET",endpoint="/metrics",...}
http_requests_total{method="GET",endpoint="/jobs/a1b2c3d4e5f6",...}
```

## Extensibility

To add more normalization rules, extend `normalize_path()`:

```python
def normalize_path(path: str) -> str:
    """Normalize a path, replacing path parameters with template placeholders."""
    # Job paths
    if path.startswith("/jobs/") and len(path) > 6:
        return "/jobs/{job_id}"

    # User paths (example)
    if path.startswith("/users/") and len(path) > 7:
        return "/users/{user_id}"

    # Everything else is unchanged
    return path
```
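Once several parameterized routes exist, the chain of `startswith` checks can be replaced by a table of rules. This is a hypothetical variant, not project code; the `/users/` rule is the same illustrative example as above:

```python
import re

# (pattern, template) pairs; add a row per parameterized route
_RULES = [
    (re.compile(r"^/jobs/[^/]+$"), "/jobs/{job_id}"),
    (re.compile(r"^/users/[^/]+$"), "/users/{user_id}"),  # hypothetical route
]

def normalize_path(path: str) -> str:
    """Replace path parameters with template placeholders to cap label cardinality."""
    for pattern, template in _RULES:
        if pattern.match(path):
            return template
    return path
```

The behavior matches the function above for the documented cases, including leaving the bare `/jobs/` path unchanged, while keeping all rules in one list.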

## Impact

- ✅ No change to existing functionality
- ✅ No change to API behavior
- ✅ Only the metrics-recording logic is affected
- ✅ Backwards compatible
- ✅ All tests pass

## Related Documents

- [Monitoring guide](../docs/monitoring.md) - metric descriptions updated
- [Test script](../scripts/test_metrics_filtering.sh) - manual verification script

@@ -61,6 +61,19 @@ docker-compose up -d redis prometheus grafana

| `http_request_duration_seconds` | Histogram | method, endpoint | HTTP request latency distribution |
| `http_requests_in_progress` | Gauge | - | Number of requests currently in flight |

**Notes:**

1. **Skipped endpoints**: the following endpoints are not recorded, to reduce noise:
   - `/metrics` - the metrics endpoint itself
   - `/healthz` - liveness probe
   - `/readyz` - readiness probe

2. **Path normalization**: endpoints with path parameters are normalized to a template form:
   - `GET /jobs/a1b2c3d4e5f6` → `GET /jobs/{job_id}`
   - `GET /jobs/xyz123456789` → `GET /jobs/{job_id}`

   This prevents distinct path-parameter values from creating excess label values and blowing up metric cardinality.

### Algorithm Execution Metrics

| Metric | Type | Labels | Description |

258 monitoring/README.md Normal file
@@ -0,0 +1,258 @@

# Monitoring Directory

This directory contains all monitoring and log-collection configuration.

## Layout

```
monitoring/
├── alerts/                      # Prometheus alerting rules
│   └── rules.yaml               # Alert rule definitions
├── grafana/                     # Grafana configuration
│   ├── datasources/             # Auto-provisioned data sources
│   │   ├── prometheus.yaml      # Prometheus data source
│   │   └── loki.yaml            # Loki data source
│   └── dashboards/              # Auto-loaded dashboards
│       ├── provider.yaml        # Dashboard provider configuration
│       ├── dashboard.json       # Metrics dashboard
│       └── logs-dashboard.json  # Logs dashboard
├── loki.yaml                    # Loki log storage configuration
├── promtail.yaml                # Promtail log collection configuration
└── prometheus.yml               # Prometheus scrape configuration
```

## Configuration Files

### Prometheus

**File**: `prometheus.yml`

Prometheus scrape configuration:
- Scrape interval: 5 seconds
- Target: the app service's `/metrics` endpoint
- Alert rules: loaded from the `alerts/` directory

### Loki

**File**: `loki.yaml`

Loki log storage configuration:
- Storage: local filesystem
- Log retention: 7 days
- Ingestion rate limit: 10MB/s
- Automatic compaction and cleanup

**Key settings**:
```yaml
limits_config:
  retention_period: 168h  # 7 days
  ingestion_rate_mb: 10   # 10MB/s
```

### Promtail

**File**: `promtail.yaml`

Promtail log collection configuration supports two modes:

**Mode 1: Docker stdio collection (default)**
- Discovers containers automatically via the Docker API
- Filters for containers labeled `logging=promtail`
- Parses JSON logs automatically

**Mode 2: File collection (fallback)**
- Reads log files from `/var/log/app/*.log`
- Supports log rotation
- Requires `LOG_FILE_ENABLED=true`

### Grafana Provisioning

**Data sources** (`grafana/datasources/`)

Automatically provisioned Grafana data sources:
- `prometheus.yaml`: Prometheus data source (default)
- `loki.yaml`: Loki data source

**Dashboards** (`grafana/dashboards/`)

Automatically loaded Grafana dashboards:
- `provider.yaml`: dashboard provider configuration
- `dashboard.json`: metrics dashboard (HTTP requests, algorithm execution, etc.)
- `logs-dashboard.json`: logs dashboard (log stream, error logs, etc.)

### Alert Rules

**File**: `alerts/rules.yaml`

Prometheus alert rules, including:
- High error rate
- High latency
- Service unavailable

## Changing the Configuration

### Adjust Log Retention

Edit `loki.yaml`:

```yaml
limits_config:
  retention_period: 72h  # 3 days
```

Restart Loki:

```bash
cd deployment
docker-compose restart loki
```

### Adjust the Scrape Interval

Edit `prometheus.yml`:

```yaml
global:
  scrape_interval: 10s  # 10 seconds
```

Restart Prometheus:

```bash
cd deployment
docker-compose restart prometheus
```

### Add an Alert Rule

Edit `alerts/rules.yaml` and add the new rule:

```yaml
groups:
  - name: my_alerts
    rules:
      - alert: MyAlert
        expr: my_metric > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "My alert"
```

Restart Prometheus:

```bash
cd deployment
docker-compose restart prometheus
```

### Add a Dashboard

1. Build the dashboard in the Grafana UI
2. Export it as JSON
3. Save it to `grafana/dashboards/my-dashboard.json`
4. Restart Grafana (or wait for the automatic reload)

```bash
cd deployment
docker-compose restart grafana
```

## Verifying the Configuration

### Prometheus

```bash
# Open the Prometheus UI
open http://localhost:9090

# Check target status
open http://localhost:9090/targets

# Check alert rules
open http://localhost:9090/alerts
```

### Loki

```bash
# Check Loki health
curl http://localhost:3100/ready

# Query labels
curl -s "http://localhost:3100/loki/api/v1/label/job/values" | jq
```

### Grafana

```bash
# Open the Grafana UI
open http://localhost:3000

# List data sources
curl -s -u admin:admin http://localhost:3000/api/datasources | jq

# List dashboards
curl -s -u admin:admin http://localhost:3000/api/search | jq
```

## Troubleshooting

### Prometheus Cannot Scrape Metrics

1. Check that the app service is running: `docker-compose ps app`
2. Check the metrics endpoint: `curl http://localhost:8111/metrics`
3. Check the Prometheus logs: `docker-compose logs prometheus`

### Loki Receives No Logs

1. Check that Promtail is running: `docker-compose ps promtail`
2. Check the Promtail logs: `docker-compose logs promtail`
3. Check container labels: `docker inspect <container> | grep Labels`

### Grafana Data Sources Not Loaded

1. Check the provisioning mounts: `docker-compose config | grep grafana -A 10`
2. Check the Grafana logs: `docker-compose logs grafana`
3. Restart Grafana manually: `docker-compose restart grafana`

## Related Documents

- [Loki integration](../docs/loki-integration.md) - full Loki documentation
- [Loki quick reference](../docs/loki-quick-reference.md) - common commands and queries
- [Loki implementation summary](../docs/loki-implementation-summary.md) - implementation details and architecture
- [Prometheus documentation](https://prometheus.io/docs/)
- [Loki documentation](https://grafana.com/docs/loki/latest/)
- [Grafana documentation](https://grafana.com/docs/grafana/latest/)

## Performance Recommendations

### Controlling Log Volume

- Raise the log level to WARNING or ERROR
- Filter out unnecessary logs (e.g. health checks)
- Shorten the retention period

### Metrics

- Increase the scrape interval (e.g. 15s or 30s)
- Reduce metric cardinality (avoid high-cardinality labels)
- Clean up old data periodically

### Storage

- Monitor disk usage: `docker-compose exec loki du -sh /loki`
- Back up important data regularly
- Consider object storage (S3/OSS) as a backend

## Summary

This directory provides a complete monitoring and log-collection setup:

✅ **Prometheus** - metrics collection and alerting
✅ **Loki** - log storage and querying
✅ **Promtail** - log collection
✅ **Grafana** - visualization and dashboards

All configuration is auto-loaded; no manual setup is required.

292 monitoring/grafana/dashboards/logs-dashboard.json Normal file
@@ -0,0 +1,292 @@

{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": {
          "type": "grafana",
          "uid": "-- Grafana --"
        },
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": null,
  "links": [],
  "liveNow": false,
  "panels": [
    {
      "datasource": {
        "type": "loki",
        "uid": "Loki"
      },
      "gridPos": {
        "h": 10,
        "w": 24,
        "x": 0,
        "y": 0
      },
      "id": 1,
      "options": {
        "dedupStrategy": "none",
        "enableLogDetails": true,
        "prettifyLogMessage": false,
        "showCommonLabels": false,
        "showLabels": false,
        "showTime": true,
        "sortOrder": "Descending",
        "wrapLogMessage": false
      },
      "targets": [
        {
          "datasource": {
            "type": "loki",
            "uid": "Loki"
          },
          "editorMode": "code",
          "expr": "{job=\"functional-scaffold-app\"}",
          "queryType": "range",
          "refId": "A"
        }
      ],
      "title": "日志流 (实时)",
      "type": "logs"
    },
    {
      "datasource": {
        "type": "loki",
        "uid": "Loki"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "palette-classic"
          },
          "custom": {
            "axisCenteredZero": false,
            "axisColorMode": "text",
            "axisLabel": "",
            "axisPlacement": "auto",
            "barAlignment": 0,
            "drawStyle": "line",
            "fillOpacity": 10,
            "gradientMode": "none",
            "hideFrom": {
              "tooltip": false,
              "viz": false,
              "legend": false
            },
            "lineInterpolation": "linear",
            "lineWidth": 1,
            "pointSize": 5,
            "scaleDistribution": {
              "type": "linear"
            },
            "showPoints": "never",
            "spanNulls": false,
            "stacking": {
              "group": "A",
              "mode": "none"
            },
            "thresholdsStyle": {
              "mode": "off"
            }
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": null
              }
            ]
          },
          "unit": "short"
        },
        "overrides": []
      },
      "gridPos": {
        "h": 8,
        "w": 12,
        "x": 0,
        "y": 10
      },
      "id": 2,
      "options": {
        "legend": {
          "calcs": [],
          "displayMode": "list",
          "placement": "bottom",
          "showLegend": true
        },
        "tooltip": {
          "mode": "single",
          "sort": "none"
        }
      },
      "targets": [
        {
          "datasource": {
            "type": "loki",
            "uid": "Loki"
          },
          "editorMode": "code",
          "expr": "sum by (level) (count_over_time({job=\"functional-scaffold-app\"}[1m]))",
          "queryType": "range",
          "refId": "A"
        }
      ],
      "title": "日志量趋势(按级别)",
      "type": "timeseries"
    },
    {
      "datasource": {
        "type": "loki",
        "uid": "Loki"
      },
      "fieldConfig": {
        "defaults": {
          "color": {
            "mode": "thresholds"
          },
          "mappings": [],
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {
                "color": "green",
                "value": null
              },
              {
                "color": "yellow",
                "value": 10
              },
              {
                "color": "red",
                "value": 50
              }
            ]
          }
        },
        "overrides": []
      },
      "gridPos": {
        "h": 8,
        "w": 12,
        "x": 12,
        "y": 10
      },
      "id": 3,
      "options": {
        "orientation": "auto",
        "reduceOptions": {
          "values": false,
          "calcs": [
            "lastNotNull"
          ],
          "fields": ""
        },
        "showThresholdLabels": false,
        "showThresholdMarkers": true
      },
      "pluginVersion": "9.5.3",
      "targets": [
        {
          "datasource": {
            "type": "loki",
            "uid": "Loki"
          },
          "editorMode": "code",
          "expr": "sum by (level) (count_over_time({job=\"functional-scaffold-app\"}[$__range]))",
          "queryType": "range",
          "refId": "A"
        }
      ],
      "title": "日志级别分布",
      "type": "gauge"
    },
    {
      "datasource": {
        "type": "loki",
        "uid": "Loki"
      },
      "gridPos": {
        "h": 10,
        "w": 24,
        "x": 0,
        "y": 18
      },
      "id": 4,
      "options": {
        "dedupStrategy": "none",
        "enableLogDetails": true,
        "prettifyLogMessage": false,
        "showCommonLabels": false,
        "showLabels": false,
        "showTime": true,
        "sortOrder": "Descending",
        "wrapLogMessage": false
      },
      "targets": [
        {
          "datasource": {
            "type": "loki",
            "uid": "Loki"
          },
          "editorMode": "code",
          "expr": "{job=\"functional-scaffold-app\", level=\"ERROR\"}",
          "queryType": "range",
          "refId": "A"
        }
      ],
      "title": "错误日志",
      "type": "logs"
    }
  ],
  "refresh": "5s",
  "schemaVersion": 38,
  "style": "dark",
  "tags": ["logs", "loki"],
  "templating": {
    "list": [
      {
        "current": {
          "selected": false,
          "text": "",
          "value": ""
        },
        "hide": 0,
        "label": "Request ID",
        "name": "request_id",
        "options": [
          {
            "selected": true,
            "text": "",
            "value": ""
          }
        ],
        "query": "",
        "skipUrlSync": false,
        "type": "textbox"
      }
    ]
  },
  "time": {
    "from": "now-15m",
    "to": "now"
  },
  "timepicker": {},
  "timezone": "",
  "title": "日志监控",
  "uid": "logs-dashboard",
  "version": 0,
  "weekStart": ""
}
13 monitoring/grafana/dashboards/provider.yaml Normal file
@@ -0,0 +1,13 @@

apiVersion: 1

providers:
  - name: 'default'
    orgId: 1
    folder: ''
    type: file
    disableDeletion: false
    updateIntervalSeconds: 10
    allowUiUpdates: true
    options:
      path: /etc/grafana/provisioning/dashboards
      foldersFromFilesStructure: true
11 monitoring/grafana/datasources/loki.yaml Normal file
@@ -0,0 +1,11 @@

apiVersion: 1

datasources:
  - name: Loki
    type: loki
    access: proxy
    url: http://loki:3100
    isDefault: false
    editable: false
    jsonData:
      maxLines: 1000
11 monitoring/grafana/datasources/prometheus.yaml Normal file
@@ -0,0 +1,11 @@

apiVersion: 1

datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
    editable: false
    jsonData:
      timeInterval: "5s"
39 monitoring/loki.yaml Normal file
@@ -0,0 +1,39 @@

auth_enabled: false

server:
  http_listen_port: 3100
  grpc_listen_port: 9096

common:
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks
      rules_directory: /loki/rules
  replication_factor: 1
  ring:
    instance_addr: 127.0.0.1
    kvstore:
      store: inmemory

schema_config:
  configs:
    - from: 2020-10-24
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 24h

limits_config:
  retention_period: 168h  # 7 days
  ingestion_rate_mb: 10
  ingestion_burst_size_mb: 20

compactor:
  working_directory: /loki/compactor
  shared_store: filesystem
  compaction_interval: 10m
  retention_enabled: true
  retention_delete_delay: 2h
71 monitoring/promtail.yaml Normal file
@@ -0,0 +1,71 @@

server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  # Mode 1: Docker stdio collection (primary)
  - job_name: docker
    docker_sd_configs:
      - host: unix:///var/run/docker.sock
        refresh_interval: 5s
        filters:
          - name: label
            values: ["logging=promtail"]
    relabel_configs:
      - source_labels: ['__meta_docker_container_name']
        regex: '/(.*)'
        target_label: 'container'
      - source_labels: ['__meta_docker_container_label_logging_jobname']
        target_label: 'job'
      - source_labels: ['__meta_docker_container_id']
        target_label: '__path__'
        replacement: '/var/lib/docker/containers/$1/*.log'
    pipeline_stages:
      - json:
          expressions:
            log: log
            stream: stream
            time: time
      - json:
          source: log
          expressions:
            level: levelname
            logger: name
            message: message
            request_id: request_id
      - labels:
          level:
          logger:
      - output:
          source: log

  # Mode 2: log file collection (fallback)
  - job_name: app_files
    static_configs:
      - targets:
          - localhost
        labels:
          job: functional-scaffold-app-files
          __path__: /var/log/app/*.log
    pipeline_stages:
      - json:
          expressions:
            timestamp: asctime
            level: levelname
            logger: name
            message: message
            request_id: request_id
      - timestamp:
          source: timestamp
          format: "2006-01-02 15:04:05,000"
      - labels:
          level:
          logger:
      - output:
          source: message
@@ -1,114 +0,0 @@

#!/bin/bash
# Quick-start script for the metrics options

set -e

# Color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

echo "=========================================="
echo "FunctionalScaffold metrics quick-start"
echo "=========================================="

# Check for docker-compose
if ! command -v docker-compose &> /dev/null; then
    echo -e "${RED}Error: docker-compose is not installed${NC}"
    exit 1
fi

# Choose an option
echo ""
echo "Choose a metrics option:"
echo "1. Pushgateway (recommended, suited to serverless)"
echo "2. Redis + exporter (suited to high concurrency)"
echo "3. Start both (for comparison testing)"
echo ""
read -p "Enter option (1/2/3): " choice

cd "$(dirname "$0")/../deployment"

case $choice in
    1)
        echo -e "${GREEN}Starting the Pushgateway option...${NC}"
        docker-compose up -d redis pushgateway prometheus grafana
        echo ""
        echo -e "${GREEN}✓ Pushgateway option started${NC}"
        echo ""
        echo "Service addresses:"
        echo "  - Pushgateway: http://localhost:9091"
        echo "  - Prometheus: http://localhost:9090"
        echo "  - Grafana: http://localhost:3000 (admin/admin)"
        echo ""
        echo "Next steps:"
        echo "  1. Change the import: from functional_scaffold.core.metrics_pushgateway import ..."
        echo "  2. Set the environment variable: PUSHGATEWAY_URL=localhost:9091"
        echo "  3. Start the app: ./scripts/run_dev.sh"
        echo "  4. Run the tests: python scripts/test_metrics.py pushgateway"
        ;;
    2)
        echo -e "${GREEN}Starting the Redis option...${NC}"

        # Check the redis dependency
        if ! python -c "import redis" 2>/dev/null; then
            echo -e "${YELLOW}Warning: the redis library is not installed${NC}"
            echo "Installing redis..."
            pip install redis
        fi

        docker-compose up -d redis redis-exporter prometheus grafana
        echo ""
        echo -e "${GREEN}✓ Redis option started${NC}"
        echo ""
        echo "Service addresses:"
        echo "  - Redis: localhost:6379"
        echo "  - Redis Exporter: http://localhost:8001/metrics"
        echo "  - Prometheus: http://localhost:9090"
        echo "  - Grafana: http://localhost:3000 (admin/admin)"
        echo ""
        echo "Next steps:"
        echo "  1. Change the import: from functional_scaffold.core.metrics_redis import ..."
        echo "  2. Set the environment variables: REDIS_HOST=localhost REDIS_PORT=6379"
        echo "  3. Start the app: ./scripts/run_dev.sh"
        echo "  4. Run the tests: python scripts/test_metrics.py redis"
        ;;
    3)
        echo -e "${GREEN}Starting all services...${NC}"

        # Check the redis dependency
        if ! python -c "import redis" 2>/dev/null; then
            echo -e "${YELLOW}Warning: the redis library is not installed${NC}"
            echo "Installing redis..."
            pip install redis
        fi

        docker-compose up -d
        echo ""
        echo -e "${GREEN}✓ All services started${NC}"
        echo ""
        echo "Service addresses:"
        echo "  - App: http://localhost:8000"
        echo "  - Pushgateway: http://localhost:9091"
        echo "  - Redis: localhost:6379"
        echo "  - Redis Exporter: http://localhost:8001/metrics"
        echo "  - Prometheus: http://localhost:9090"
        echo "  - Grafana: http://localhost:3000 (admin/admin)"
        echo ""
        echo "Next steps:"
        echo "  1. Read the docs: cat docs/metrics-guide.md"
        echo "  2. Run the tests: python scripts/test_metrics.py"
        ;;
    *)
        echo -e "${RED}Invalid option${NC}"
        exit 1
        ;;
esac

echo ""
echo "=========================================="
echo "View logs: docker-compose logs -f"
echo "Stop services: docker-compose down"
echo "Read the docs: cat ../docs/metrics-guide.md"
echo "=========================================="
104 scripts/test_concurrency.sh Executable file
@@ -0,0 +1,104 @@

#!/bin/bash
# Concurrency-control test script

set -e

BASE_URL="http://localhost:8000"

echo "=== Async job concurrency-control test ==="
echo ""

# 1. Check that the service is running
echo "1. Checking service status..."
if ! curl -s "${BASE_URL}/healthz" > /dev/null; then
    echo "❌ Service is not running; start it first"
    exit 1
fi
echo "✅ Service is up"
echo ""

# 2. Query the initial concurrency status
echo "2. Querying initial concurrency status..."
curl -s "${BASE_URL}/jobs/concurrency/status" | jq '.'
echo ""

# 3. Create multiple jobs
echo "3. Creating 15 jobs (to exercise the concurrency limit)..."
JOB_IDS=()
for i in {1..15}; do
    # Use larger numbers so each job runs longer
    NUMBER=$((10000 + i * 1000))
    RESPONSE=$(curl -s -X POST "${BASE_URL}/jobs" \
        -H "Content-Type: application/json" \
        -d "{\"algorithm\": \"PrimeChecker\", \"params\": {\"number\": ${NUMBER}}}")

    JOB_ID=$(echo "$RESPONSE" | jq -r '.job_id')
    JOB_IDS+=("$JOB_ID")
    echo "  Created job ${i}/15: job_id=${JOB_ID}"

    # Brief delay to avoid hammering the API
    sleep 0.1
done
echo ""

# 4. Query the concurrency status immediately (several jobs should be running)
echo "4. Querying concurrency status (jobs in flight)..."
for i in {1..5}; do
    echo "  Query ${i}:"
    STATUS=$(curl -s "${BASE_URL}/jobs/concurrency/status")
    echo "    $(echo "$STATUS" | jq -c '.')"
    sleep 1
done
echo ""

# 5. Wait for all jobs to finish
echo "5. Waiting for jobs to finish..."
COMPLETED=0
TOTAL=${#JOB_IDS[@]}

while [ $COMPLETED -lt $TOTAL ]; do
    COMPLETED=0
    for JOB_ID in "${JOB_IDS[@]}"; do
        STATUS=$(curl -s "${BASE_URL}/jobs/${JOB_ID}" | jq -r '.status')
        if [ "$STATUS" = "completed" ] || [ "$STATUS" = "failed" ]; then
            # Note: ((COMPLETED++)) would exit under `set -e` when the old value is 0
            COMPLETED=$((COMPLETED + 1))
        fi
    done

    echo "  Progress: ${COMPLETED}/${TOTAL} jobs finished"

    # Show the current concurrency status
    CONCURRENCY=$(curl -s "${BASE_URL}/jobs/concurrency/status")
    echo "  Concurrency status: $(echo "$CONCURRENCY" | jq -c '.')"

    if [ $COMPLETED -lt $TOTAL ]; then
        sleep 2
    fi
done
echo ""

# 6. Query the final concurrency status
echo "6. Querying final concurrency status..."
curl -s "${BASE_URL}/jobs/concurrency/status" | jq '.'
echo ""

# 7. Job result statistics
echo "7. Job result statistics..."
COMPLETED_COUNT=0
FAILED_COUNT=0

for JOB_ID in "${JOB_IDS[@]}"; do
    STATUS=$(curl -s "${BASE_URL}/jobs/${JOB_ID}" | jq -r '.status')
    if [ "$STATUS" = "completed" ]; then
        COMPLETED_COUNT=$((COMPLETED_COUNT + 1))
    elif [ "$STATUS" = "failed" ]; then
        FAILED_COUNT=$((FAILED_COUNT + 1))
    fi
done

echo "  Total jobs: ${TOTAL}"
echo "  Succeeded: ${COMPLETED_COUNT}"
echo "  Failed: ${FAILED_COUNT}"
echo ""

echo "=== Test complete ==="
39 scripts/test_metrics_filtering.sh Executable file
@@ -0,0 +1,39 @@
#!/bin/bash
# Test metrics filtering and path normalization

echo "=== Testing metrics filtering and path normalization ==="
echo ""

# The service is assumed to be running already
BASE_URL="http://localhost:8000"

echo "1. Hit the health-check endpoints (should be skipped; no metrics recorded)"
curl -s "$BASE_URL/healthz" > /dev/null
curl -s "$BASE_URL/readyz" > /dev/null
echo "  ✓ Accessed /healthz and /readyz"
echo ""

echo "2. Hit a normal endpoint (metrics should be recorded)"
curl -s -X POST "$BASE_URL/invoke" \
  -H "Content-Type: application/json" \
  -d '{"number": 17}' > /dev/null
echo "  ✓ Accessed POST /invoke"
echo ""

echo "3. Hit job endpoints (should be normalized to /jobs/{job_id})"
curl -s "$BASE_URL/jobs/a1b2c3d4e5f6" > /dev/null
curl -s "$BASE_URL/jobs/xyz123456789" > /dev/null
echo "  ✓ Accessed GET /jobs/a1b2c3d4e5f6 and GET /jobs/xyz123456789"
echo ""

echo "4. Inspect the metrics output"
echo "  Looking for http_requests_total:"
curl -s "$BASE_URL/metrics" | grep 'http_requests_total{' | grep -v '#'
echo ""
echo "  Expected results:"
echo "  - entries with endpoint=\"/invoke\""
echo "  - entries with endpoint=\"/jobs/{job_id}\" (rather than concrete job_ids)"
echo "  - no entries with endpoint=\"/healthz\" or endpoint=\"/readyz\""
echo "  - no entries with endpoint=\"/metrics\""
echo ""
echo "=== Test complete ==="
100 scripts/verify_loki.sh Executable file
@@ -0,0 +1,100 @@
#!/bin/bash
# Loki integration verification script

set -e

echo "========================================="
echo "Loki log collection verification"
echo "========================================="
echo ""

# Color definitions
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Check service status
echo "1. Checking service status..."
echo "-------------------"
docker-compose ps

echo ""
echo "2. Checking Loki health..."
echo "-------------------"
if curl -s http://localhost:3100/ready | grep -q "ready"; then
  echo -e "${GREEN}✓ Loki is healthy${NC}"
else
  echo -e "${RED}✗ Loki is unhealthy${NC}"
  exit 1
fi

echo ""
echo "3. Checking Promtail health..."
echo "-------------------"
if curl -s http://localhost:9080/ready | grep -q "ready"; then
  echo -e "${GREEN}✓ Promtail is healthy${NC}"
else
  echo -e "${RED}✗ Promtail is unhealthy${NC}"
  exit 1
fi

echo ""
echo "4. Generating a test log entry..."
echo "-------------------"
curl -X POST http://localhost:8111/invoke \
  -H "Content-Type: application/json" \
  -d '{"algorithm": "PrimeChecker", "params": {"number": 17}}' \
  -s -o /dev/null -w "HTTP Status: %{http_code}\n"

echo ""
echo "5. Waiting for log collection (5s)..."
sleep 5

echo ""
echo "6. Querying Loki for logs..."
echo "-------------------"
LOGS=$(curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
  --data-urlencode 'query={job="functional-scaffold-app"}' \
  --data-urlencode 'limit=5')

if echo "$LOGS" | jq -e '.data.result | length > 0' > /dev/null 2>&1; then
  echo -e "${GREEN}✓ Logs found${NC}"
  echo ""
  echo "Most recent log entries:"
  echo "$LOGS" | jq -r '.data.result[0].values[-1][1]' | head -3
else
  echo -e "${YELLOW}⚠ No logs found yet; collection may need more time${NC}"
fi

echo ""
echo "7. Checking Grafana data sources..."
echo "-------------------"
DATASOURCES=$(curl -s -u admin:admin http://localhost:3000/api/datasources)
if echo "$DATASOURCES" | jq -e '.[] | select(.name == "Loki")' > /dev/null 2>&1; then
  echo -e "${GREEN}✓ Loki data source configured${NC}"
else
  echo -e "${RED}✗ Loki data source missing${NC}"
fi

if echo "$DATASOURCES" | jq -e '.[] | select(.name == "Prometheus")' > /dev/null 2>&1; then
  echo -e "${GREEN}✓ Prometheus data source configured${NC}"
else
  echo -e "${RED}✗ Prometheus data source missing${NC}"
fi

echo ""
echo "========================================="
echo "Verification complete!"
echo "========================================="
echo ""
echo "Endpoints:"
echo "  - Grafana:  http://localhost:3000 (admin/admin)"
echo "  - Loki:     http://localhost:3100"
echo "  - Promtail: http://localhost:9080"
echo ""
echo "To view logs:"
echo "  1. Open Grafana Explore: http://localhost:3000/explore"
echo "  2. Select the Loki data source"
echo "  3. Run the query: {job=\"functional-scaffold-app\"}"
echo ""
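The `jq` expression above (`.data.result[0].values[-1][1]`) digs the newest log line out of Loki's `query_range` streams response, where each stream carries a list of `[timestamp_ns, line]` pairs. The same extraction can be sketched in Python; the sample payload below is fabricated for illustration and only mirrors that response shape.

```python
import json

def latest_log_lines(query_range_json: str, limit: int = 3) -> list:
    """Extract the newest log lines from a Loki /loki/api/v1/query_range response."""
    payload = json.loads(query_range_json)
    result = payload.get("data", {}).get("result", [])
    if not result:
        return []
    # Each stream's "values" is a list of [timestamp_ns, line] pairs, oldest first.
    values = result[0].get("values", [])
    return [line for _, line in values[-limit:]]

# Fabricated sample mirroring the shape the script reads with jq:
sample = json.dumps({
    "status": "success",
    "data": {"resultType": "streams", "result": [{
        "stream": {"job": "functional-scaffold-app"},
        "values": [
            ["1700000000000000000", '{"message": "Request: POST /invoke"}'],
            ["1700000001000000000", '{"message": "Response: 200"}'],
        ],
    }]},
})
print(latest_log_lines(sample))
```

An empty `result` list (no matching streams yet) yields `[]`, which corresponds to the script's "no logs found yet" branch.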
@@ -2,7 +2,7 @@

from fastapi import Header, HTTPException
from typing import Optional
-from ..core.tracing import set_request_id, generate_request_id
+from ..core.tracing import set_request_id, generate_request_id, get_request_id as get_current_request_id


async def get_request_id(x_request_id: Optional[str] = Header(None)) -> str:
@@ -15,6 +15,12 @@ async def get_request_id(x_request_id: Optional[str] = Header(None)) -> str:

    Returns:
        str: the request ID
    """
+    # First check whether the ContextVar already holds a request_id (set by the middleware)
+    existing_request_id = get_current_request_id()
+    if existing_request_id:
+        return existing_request_id
+
+    # Otherwise take it from the request header, or generate a new one
    request_id = x_request_id or generate_request_id()
    set_request_id(request_id)
    return request_id
@@ -152,3 +152,21 @@ class JobStatusResponse(BaseModel):
    result: Optional[Dict[str, Any]] = Field(None, description="Execution result (only present when completed)")
    error: Optional[str] = Field(None, description="Error message (only present when failed)")
    metadata: Optional[Dict[str, Any]] = Field(None, description="Metadata")


class ConcurrencyStatusResponse(BaseModel):
    """Concurrency status response"""

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "max_concurrent": 10,
                "available_slots": 7,
                "running_jobs": 3,
            }
        }
    )

    max_concurrent: int = Field(..., description="Maximum number of concurrent jobs")
    available_slots: int = Field(..., description="Currently available slots")
    running_jobs: int = Field(..., description="Jobs currently running")
@@ -15,6 +15,7 @@ from .models import (
    JobCreateResponse,
    JobStatusResponse,
    JobStatus,
    ConcurrencyStatusResponse,
)
from .dependencies import get_request_id
from ..algorithms.prime_checker import PrimeChecker
@@ -292,3 +293,57 @@ async def get_job_status(job_id: str):
                "message": str(e),
            },
        )


@router.get(
    "/jobs/concurrency/status",
    response_model=ConcurrencyStatusResponse,
    summary="Query concurrency status",
    description="Query the job manager's concurrent-execution status",
    responses={
        200: {"description": "Success", "model": ConcurrencyStatusResponse},
        503: {"description": "Service unavailable", "model": ErrorResponse},
    },
)
async def get_concurrency_status():
    """
    Query the concurrency status

    Returns the job manager's current concurrency state:
    - maximum number of concurrent jobs
    - currently available slots
    - jobs currently running
    """
    try:
        job_manager = await get_job_manager()

        # Make sure the job manager is available
        if not job_manager.is_available():
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail={
                    "error": "SERVICE_UNAVAILABLE",
                    "message": "Job manager unavailable",
                },
            )

        concurrency_status = job_manager.get_concurrency_status()

        return ConcurrencyStatusResponse(
            max_concurrent=concurrency_status["max_concurrent"],
            available_slots=concurrency_status["available_slots"],
            running_jobs=concurrency_status["running_jobs"],
        )

    except HTTPException:
        raise

    except Exception as e:
        logger.error(f"Failed to query concurrency status: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "error": "INTERNAL_ERROR",
                "message": str(e),
            },
        )
@@ -23,6 +23,8 @@ class Settings(BaseSettings):
    # Logging configuration
    log_level: str = "INFO"
    log_format: str = "json"
    log_file_enabled: bool = False
    log_file_path: str = "/var/log/app/app.log"

    # Metrics configuration
    metrics_enabled: bool = True
@@ -53,6 +55,7 @@ class Settings(BaseSettings):
    job_result_ttl: int = 1800  # Result cache TTL in seconds; default 30 minutes
    webhook_max_retries: int = 3  # Maximum webhook retry attempts
    webhook_timeout: int = 10  # Webhook timeout in seconds
    max_concurrent_jobs: int = 10  # Maximum number of concurrent jobs


# Global settings instance
@@ -27,6 +27,8 @@ class JobManager:
        self._redis_client: Optional[aioredis.Redis] = None
        self._algorithm_registry: Dict[str, Type[BaseAlgorithm]] = {}
        self._http_client: Optional[httpx.AsyncClient] = None
        self._semaphore: Optional[asyncio.Semaphore] = None
        self._max_concurrent_jobs: int = 0

    async def initialize(self) -> None:
        """Initialize the Redis connection and HTTP client"""
@@ -51,6 +53,11 @@ class JobManager:
        # Initialize the HTTP client
        self._http_client = httpx.AsyncClient(timeout=settings.webhook_timeout)

        # Initialize the concurrency-control semaphore
        self._max_concurrent_jobs = settings.max_concurrent_jobs
        self._semaphore = asyncio.Semaphore(self._max_concurrent_jobs)
        logger.info(f"Job concurrency limit set: {self._max_concurrent_jobs}")

        # Register the algorithms
        self._register_algorithms()

@@ -203,6 +210,10 @@ class JobManager:
            logger.error(f"Redis unavailable; cannot execute job: {job_id}")
            return

        if not self._semaphore:
            logger.error(f"Concurrency control not initialized; cannot execute job: {job_id}")
            return

        key = f"job:{job_id}"
        job_data = await self._redis_client.hgetall(key)
@@ -219,74 +230,76 @@ class JobManager:
        except json.JSONDecodeError:
            params = {}

-        # Update the status to running
-        started_at = self._get_timestamp()
-        await self._redis_client.hset(key, mapping={"status": "running", "started_at": started_at})
+        # Use the semaphore to limit concurrency
+        async with self._semaphore:
+            # Update the status to running
+            started_at = self._get_timestamp()
+            await self._redis_client.hset(key, mapping={"status": "running", "started_at": started_at})

-        logger.info(f"Starting job: job_id={job_id}, algorithm={algorithm_name}")
+            logger.info(f"Starting job: job_id={job_id}, algorithm={algorithm_name}")

-        import time
+            import time

-        start_time = time.time()
-        status = "completed"
-        result_data = None
-        error_msg = None
-        metadata = None
+            start_time = time.time()
+            status = "completed"
+            result_data = None
+            error_msg = None
+            metadata = None

-        try:
-            # Look up the algorithm class and execute it
-            algorithm_cls = self._algorithm_registry.get(algorithm_name)
-            if not algorithm_cls:
-                raise ValueError(f"Algorithm '{algorithm_name}' does not exist")
+            try:
+                # Look up the algorithm class and execute it
+                algorithm_cls = self._algorithm_registry.get(algorithm_name)
+                if not algorithm_cls:
+                    raise ValueError(f"Algorithm '{algorithm_name}' does not exist")

-            algorithm = algorithm_cls()
+                algorithm = algorithm_cls()

-            # Pass parameters according to the algorithm type
-            if algorithm_name == "PrimeChecker":
-                execution_result = algorithm.execute(params.get("number", 0))
-            else:
-                # Generic parameter passing
-                execution_result = algorithm.execute(**params)
+                # Pass parameters according to the algorithm type
+                if algorithm_name == "PrimeChecker":
+                    execution_result = algorithm.execute(params.get("number", 0))
+                else:
+                    # Generic parameter passing
+                    execution_result = algorithm.execute(**params)

-            if execution_result.get("success"):
-                result_data = execution_result.get("result", {})
-                metadata = execution_result.get("metadata", {})
-            else:
-                status = "failed"
-                error_msg = execution_result.get("error", "Algorithm execution failed")
-                metadata = execution_result.get("metadata", {})
+                if execution_result.get("success"):
+                    result_data = execution_result.get("result", {})
+                    metadata = execution_result.get("metadata", {})
+                else:
+                    status = "failed"
+                    error_msg = execution_result.get("error", "Algorithm execution failed")
+                    metadata = execution_result.get("metadata", {})

-        except Exception as e:
-            status = "failed"
-            error_msg = str(e)
-            logger.error(f"Job execution failed: job_id={job_id}, error={e}", exc_info=True)
+            except Exception as e:
+                status = "failed"
+                error_msg = str(e)
+                logger.error(f"Job execution failed: job_id={job_id}, error={e}", exc_info=True)

-        # Compute the execution time
-        elapsed_time = time.time() - start_time
-        completed_at = self._get_timestamp()
+            # Compute the execution time
+            elapsed_time = time.time() - start_time
+            completed_at = self._get_timestamp()

-        # Update the job state
-        update_data = {
-            "status": status,
-            "completed_at": completed_at,
-            "result": json.dumps(result_data) if result_data else "",
-            "error": error_msg or "",
-            "metadata": json.dumps(metadata) if metadata else "",
-        }
-        await self._redis_client.hset(key, mapping=update_data)
+            # Update the job state
+            update_data = {
+                "status": status,
+                "completed_at": completed_at,
+                "result": json.dumps(result_data) if result_data else "",
+                "error": error_msg or "",
+                "metadata": json.dumps(metadata) if metadata else "",
+            }
+            await self._redis_client.hset(key, mapping=update_data)

-        # Set the TTL
-        await self._redis_client.expire(key, settings.job_result_ttl)
+            # Set the TTL
+            await self._redis_client.expire(key, settings.job_result_ttl)

-        # Record metrics
-        incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
-        observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time)
+            # Record metrics
+            incr("jobs_completed_total", {"algorithm": algorithm_name, "status": status})
+            observe("job_execution_duration_seconds", {"algorithm": algorithm_name}, elapsed_time)

-        logger.info(f"Job finished: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s")
+            logger.info(f"Job finished: job_id={job_id}, status={status}, elapsed={elapsed_time:.3f}s")

-        # Send the webhook callback
-        if webhook_url:
-            await self._send_webhook(job_id, webhook_url)
+            # Send the webhook callback
+            if webhook_url:
+                await self._send_webhook(job_id, webhook_url)
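The hunk above wraps job execution in `async with self._semaphore:` so that at most `max_concurrent_jobs` jobs hold the semaphore at once; extra jobs queue at the `async with` until a slot frees up. A minimal standalone sketch of the same pattern, with dummy sleeps standing in for the scaffold's algorithm execution:

```python
import asyncio

async def run_with_limit(n_jobs: int, max_concurrent: int) -> int:
    """Run n_jobs dummy jobs under a semaphore; return the peak concurrency observed."""
    sem = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0

    async def job():
        nonlocal running, peak
        async with sem:           # blocks here when all slots are taken
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulate work
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak

peak = asyncio.run(run_with_limit(n_jobs=15, max_concurrent=2))
print(peak)  # 2 — never exceeds the semaphore's limit
```

Because `running` is only touched inside the critical section, it can never exceed the number of semaphore holders, which is exactly the invariant the `running_jobs` figure in the concurrency-status endpoint reports.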
    async def _send_webhook(self, job_id: str, webhook_url: str) -> None:
        """Send the webhook callback (with retries)
@@ -359,6 +372,32 @@ class JobManager:
        """Check whether the job manager is available"""
        return self._redis_client is not None

    def get_concurrency_status(self) -> Dict[str, int]:
        """Get the concurrency status

        Returns:
            Dict[str, int]: a dict with the keys
                - max_concurrent: maximum number of concurrent jobs
                - available_slots: available slots
                - running_jobs: jobs currently running
        """
        if not self._semaphore:
            return {
                "max_concurrent": 0,
                "available_slots": 0,
                "running_jobs": 0,
            }

        max_concurrent = self._max_concurrent_jobs
        available_slots = self._semaphore._value
        running_jobs = max_concurrent - available_slots

        return {
            "max_concurrent": max_concurrent,
            "available_slots": available_slots,
            "running_jobs": running_jobs,
        }


# Global singleton
_job_manager: Optional[JobManager] = None
@@ -2,14 +2,39 @@

import logging
import sys
from pathlib import Path
from typing import Optional
from logging.handlers import RotatingFileHandler
from pythonjsonlogger.json import JsonFormatter

from .tracing import get_request_id


class RequestIdFilter(logging.Filter):
    """Filter that automatically attaches a request_id to every log record"""

    def filter(self, record: logging.LogRecord) -> bool:
        """
        Add a request_id field to the log record

        Args:
            record: the log record

        Returns:
            bool: always True (no records are filtered out)
        """
        # Fetch the request_id from the ContextVar
        request_id = get_request_id()
        # Attach it to the record; fall back to "-" when absent
        record.request_id = request_id if request_id else "-"
        return True


def setup_logging(
    level: str = "INFO",
    format_type: str = "json",
    logger_name: Optional[str] = None,
    file_path: Optional[str] = None,
) -> logging.Logger:
    """
    Configure the logging system
@@ -18,6 +43,7 @@ def setup_logging(
        level: log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        format_type: log format ('json' or 'text')
        logger_name: logger name; None means the root logger
        file_path: log file path; None disables file output

    Returns:
        logging.Logger: the configured logger
@@ -28,23 +54,45 @@ def setup_logging(
    # Clear existing handlers
    logger.handlers.clear()

-    # Create the console handler
-    handler = logging.StreamHandler(sys.stdout)
-    handler.setLevel(getattr(logging, level.upper()))
-
    # Set the format
    if format_type == "json":
        formatter = JsonFormatter(
-            "%(asctime)s %(name)s %(levelname)s %(message)s",
+            "%(asctime)s %(name)s %(levelname)s %(message)s %(request_id)s",
            timestamp=True,
        )
    else:
        formatter = logging.Formatter(
-            "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+            "%(asctime)s - %(name)s - %(levelname)s - [%(request_id)s] - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
+    # Create the RequestIdFilter
+    request_id_filter = RequestIdFilter()
+
+    # Create the console handler
+    console_handler = logging.StreamHandler(sys.stdout)
+    console_handler.setLevel(getattr(logging, level.upper()))
+    console_handler.setFormatter(formatter)
+    console_handler.addFilter(request_id_filter)
+    logger.addHandler(console_handler)
+
+    # Create the file handler (when a file path is given)
+    if file_path:
+        # Make sure the log directory exists
+        log_dir = Path(file_path).parent
+        log_dir.mkdir(parents=True, exist_ok=True)
+
+        # Create a RotatingFileHandler:
+        # 100 MB max per file, 5 backups kept
+        file_handler = RotatingFileHandler(
+            file_path,
+            maxBytes=100 * 1024 * 1024,  # 100MB
+            backupCount=5,
+            encoding="utf-8",
+        )
+        file_handler.setLevel(getattr(logging, level.upper()))
+        file_handler.setFormatter(formatter)
+        file_handler.addFilter(request_id_filter)
+        logger.addHandler(file_handler)

    return logger
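The `RequestIdFilter` above relies on two properties of `logging.Filter`: a filter may mutate the record it receives (adding attributes that formatters can then reference, here `%(request_id)s`), and returning `True` keeps the record. A self-contained sketch of that idea, where `request_id_var` is purely illustrative and stands in for the scaffold's `tracing` module:

```python
import logging
from contextvars import ContextVar

# Illustrative stand-in for the project's tracing ContextVar.
request_id_var: ContextVar = ContextVar("request_id", default="")

class RequestIdFilter(logging.Filter):
    """Copy the current request id from the ContextVar onto every record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get() or "-"
        return True  # annotate only; never drop records

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("[%(request_id)s] %(levelname)s %(message)s"))
handler.addFilter(RequestIdFilter())

log = logging.getLogger("request-id-sketch")
log.addHandler(handler)
log.setLevel(logging.INFO)

request_id_var.set("req-123")
log.info("hello")  # emitted as "[req-123] INFO hello"
```

Attaching the filter to each handler (rather than the logger) is what the diff does too: records from any logger that propagate to those handlers get annotated, so `%(request_id)s` never raises a missing-key formatting error.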
@@ -9,6 +9,7 @@ import time
from .api import router
from .config import settings
from .core.logging import setup_logging
+from .core.tracing import generate_request_id, set_request_id, get_request_id
from .core.metrics_unified import (
    get_metrics_manager,
    incr,
@@ -20,7 +21,11 @@ from .core.metrics_unified import (
from .core.job_manager import get_job_manager, shutdown_job_manager

# Set up logging
-setup_logging(level=settings.log_level, format_type=settings.log_format)
+setup_logging(
+    level=settings.log_level,
+    format_type=settings.log_format,
+    file_path=settings.log_file_path if settings.log_file_enabled else None,
+)
logger = logging.getLogger(__name__)

# Create the FastAPI application
@@ -47,12 +52,37 @@ app.add_middleware(
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log every HTTP request"""
    # Take the request_id from the header, or generate a new one
    request_id = request.headers.get("x-request-id") or generate_request_id()
    set_request_id(request_id)

    logger.info(f"Request: {request.method} {request.url.path}")
    response = await call_next(request)
    logger.info(f"Response: {response.status_code}")
    return response


def normalize_path(path: str) -> str:
    """
    Normalize a path by replacing path parameters with their template form

    Args:
        path: the raw path

    Returns:
        the normalized path

    Examples:
        /jobs/a1b2c3d4e5f6 -> /jobs/{job_id}
        /invoke -> /invoke
    """
    # Match the /jobs/{anything} pattern
    if path.startswith("/jobs/") and len(path) > 6:
        return "/jobs/{job_id}"

    return path
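One caveat worth noting: as written, the prefix check collapses every path under `/jobs/`, so the fixed route `/jobs/concurrency/status` introduced in this same change is also recorded under the `/jobs/{job_id}` metric label. A hedged alternative (a sketch, not the scaffold's code) collapses exactly one trailing segment, so nested fixed routes keep their own label:

```python
import re

# Only collapse a single path segment after /jobs/, so nested fixed routes
# such as /jobs/concurrency/status keep a distinct metric label.
_JOB_ID_SEGMENT = re.compile(r"^/jobs/[^/]+$")

def normalize_path_strict(path: str) -> str:
    return "/jobs/{job_id}" if _JOB_ID_SEGMENT.match(path) else path

print(normalize_path_strict("/jobs/a1b2c3d4e5f6"))        # /jobs/{job_id}
print(normalize_path_strict("/jobs/concurrency/status"))  # /jobs/concurrency/status
print(normalize_path_strict("/invoke"))                   # /invoke
```

Either way, mapping id-bearing paths onto one template label is what keeps the cardinality of `http_requests_total` bounded as job ids accumulate.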
# Metrics-tracking middleware
@app.middleware("http")
async def track_metrics(request: Request, call_next):
@@ -60,8 +90,9 @@ async def track_metrics(request: Request, call_next):
    if not settings.metrics_enabled:
        return await call_next(request)

-    # Skip the /metrics endpoint itself, to avoid recording it recursively
-    if request.url.path == "/metrics":
+    # Skip endpoints that should not be recorded
+    skip_paths = {"/metrics", "/readyz", "/healthz"}
+    if request.url.path in skip_paths:
        return await call_next(request)

    gauge_incr("http_requests_in_progress")
@@ -79,13 +110,15 @@ async def track_metrics(request: Request, call_next):
        raise e
    finally:
        elapsed = time.time() - start_time
+        # Record metrics with the normalized path
+        normalized_path = normalize_path(request.url.path)
        incr(
            "http_requests_total",
-            {"method": request.method, "endpoint": request.url.path, "status": status},
+            {"method": request.method, "endpoint": normalized_path, "status": status},
        )
        observe(
            "http_request_duration_seconds",
-            {"method": request.method, "endpoint": request.url.path},
+            {"method": request.method, "endpoint": normalized_path},
            elapsed,
        )
        gauge_decr("http_requests_in_progress")
@@ -186,6 +186,10 @@ class TestJobManagerWithMocks:
        manager._redis_client = mock_redis
        manager._register_algorithms()

        # Initialize the semaphore
        import asyncio
        manager._semaphore = asyncio.Semaphore(10)

        await manager.execute_job("test-job-id")

        # Verify the status update was called
@@ -399,3 +403,97 @@ class TestWebhook:

        # Verify the retry count
        assert mock_http.post.call_count == 2


class TestConcurrencyControl:
    """Tests for the concurrency-control feature"""

    @pytest.mark.asyncio
    async def test_get_concurrency_status(self):
        """Getting the concurrency status"""
        manager = JobManager()

        # Initialize the semaphore
        manager._max_concurrent_jobs = 10
        manager._semaphore = asyncio.Semaphore(10)

        status = manager.get_concurrency_status()

        assert status["max_concurrent"] == 10
        assert status["available_slots"] == 10
        assert status["running_jobs"] == 0

    @pytest.mark.asyncio
    async def test_get_concurrency_status_without_semaphore(self):
        """Getting the concurrency status before the semaphore is initialized"""
        manager = JobManager()

        status = manager.get_concurrency_status()

        assert status["max_concurrent"] == 0
        assert status["available_slots"] == 0
        assert status["running_jobs"] == 0

    @pytest.mark.asyncio
    async def test_concurrency_limit(self):
        """The concurrency limit is actually enforced"""
        manager = JobManager()

        # Use a small concurrency limit
        manager._max_concurrent_jobs = 2
        manager._semaphore = asyncio.Semaphore(2)

        # Mock Redis
        mock_redis = AsyncMock()
        mock_redis.hgetall = AsyncMock(
            return_value={
                "status": "pending",
                "algorithm": "PrimeChecker",
                "params": '{"number": 17}',
                "webhook": "",
                "request_id": "test-request-id",
                "created_at": "2026-02-02T10:00:00+00:00",
            }
        )
        mock_redis.hset = AsyncMock()
        mock_redis.expire = AsyncMock()
        manager._redis_client = mock_redis
        manager._register_algorithms()

        # Create a slow task
        async def slow_execute():
            async with manager._semaphore:
                await asyncio.sleep(0.1)

        # Start 3 tasks
        tasks = [asyncio.create_task(slow_execute()) for _ in range(3)]

        # Give the first two tasks a moment to acquire the semaphore
        await asyncio.sleep(0.01)

        # Check the concurrency status
        status = manager.get_concurrency_status()
        assert status["running_jobs"] == 2  # only 2 tasks running
        assert status["available_slots"] == 0  # no free slots

        # Wait for all tasks to finish
        await asyncio.gather(*tasks)

        # Check the final status
        status = manager.get_concurrency_status()
        assert status["running_jobs"] == 0
        assert status["available_slots"] == 2

    def test_concurrency_status_api(self, client):
        """The concurrency-status API endpoint"""
        response = client.get("/jobs/concurrency/status")

        assert response.status_code == status.HTTP_200_OK
        data = response.json()

        assert "max_concurrent" in data
        assert "available_slots" in data
        assert "running_jobs" in data
        assert isinstance(data["max_concurrent"], int)
        assert isinstance(data["available_slots"], int)
        assert isinstance(data["running_jobs"], int)
97 tests/test_middleware.py Normal file
@@ -0,0 +1,97 @@
"""Middleware tests"""

import pytest
from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient

from src.functional_scaffold.main import app, normalize_path


class TestNormalizePath:
    """Tests for the path-normalization function"""

    def test_normalize_jobs_path(self):
        """/jobs/{job_id} paths are normalized"""
        assert normalize_path("/jobs/a1b2c3d4e5f6") == "/jobs/{job_id}"
        assert normalize_path("/jobs/123456789012") == "/jobs/{job_id}"
        assert normalize_path("/jobs/xyz") == "/jobs/{job_id}"

    def test_normalize_other_paths(self):
        """Other paths stay unchanged"""
        assert normalize_path("/invoke") == "/invoke"
        assert normalize_path("/healthz") == "/healthz"
        assert normalize_path("/readyz") == "/readyz"
        assert normalize_path("/metrics") == "/metrics"
        assert normalize_path("/docs") == "/docs"

    def test_normalize_jobs_root(self):
        """The bare /jobs root path"""
        assert normalize_path("/jobs") == "/jobs"


class TestMetricsMiddleware:
    """Tests for the metrics middleware"""

    @patch("src.functional_scaffold.main.incr")
    @patch("src.functional_scaffold.main.observe")
    @patch("src.functional_scaffold.main.gauge_incr")
    @patch("src.functional_scaffold.main.gauge_decr")
    def test_skip_health_endpoints(self, mock_gauge_decr, mock_gauge_incr, mock_observe, mock_incr):
        """Health-check endpoints are skipped"""
        client = TestClient(app)

        # Hit the health-check endpoints
        client.get("/healthz")
        client.get("/readyz")
        client.get("/metrics")

        # Verify no metrics were recorded
        mock_incr.assert_not_called()
        mock_observe.assert_not_called()
        mock_gauge_incr.assert_not_called()
        mock_gauge_decr.assert_not_called()

    @patch("src.functional_scaffold.main.incr")
    @patch("src.functional_scaffold.main.observe")
    @patch("src.functional_scaffold.main.gauge_incr")
    @patch("src.functional_scaffold.main.gauge_decr")
    def test_record_normal_endpoints(self, mock_gauge_decr, mock_gauge_incr, mock_observe, mock_incr):
        """Normal endpoints are recorded"""
        client = TestClient(app)

        # Hit a normal endpoint
        client.post("/invoke", json={"number": 17})

        # Verify metrics were recorded
        mock_gauge_incr.assert_called_once()
        mock_gauge_decr.assert_called_once()
        mock_incr.assert_called_once()
        mock_observe.assert_called_once()

        # Verify the correct endpoint path was used
        incr_call_args = mock_incr.call_args
        assert incr_call_args[0][1]["endpoint"] == "/invoke"

    @patch("src.functional_scaffold.main.incr")
    @patch("src.functional_scaffold.main.observe")
    @patch("src.functional_scaffold.main.gauge_incr")
    @patch("src.functional_scaffold.main.gauge_decr")
    @patch("src.functional_scaffold.core.job_manager.get_job_manager")
    def test_normalize_job_path(self, mock_get_manager, mock_gauge_decr, mock_gauge_incr, mock_observe, mock_incr):
        """Job paths are normalized"""
        # Mock the job manager
        mock_manager = MagicMock()
        mock_manager.get_job.return_value = None
        mock_get_manager.return_value = mock_manager

        client = TestClient(app)

        # Hit a job endpoint (returns 404, but the middleware should still record a metric)
        client.get("/jobs/a1b2c3d4e5f6")

        # Verify a metric was recorded
        mock_incr.assert_called_once()

        # Verify the normalized path was used
        incr_call_args = mock_incr.call_args
        assert incr_call_args[0][1]["endpoint"] == "/jobs/{job_id}"