main: 增强 Agent Run 逻辑与消息处理

- 添加流式文本推送,支持 `message.delta` 消息类型
- 优化 Run 主流程,增加工具调用与流式数据发布逻辑
- 更新 `phpunit.xml` 环境变量,支持 Agent 配置项
- 扩展文档,完善工具调用与消息类型说明
This commit is contained in:
2025-12-22 17:51:56 +08:00
parent 59d4831f00
commit 663e15395b
5 changed files with 766 additions and 143 deletions

View File

@@ -88,7 +88,7 @@ AGENT_OPENAI_INCLUDE_USAGE=false
# AgentRunJob 队列执行策略
AGENT_RUN_JOB_TRIES=1 # 队列重试次数
AGENT_RUN_JOB_BACKOFF=3 # 重试退避秒数
AGENT_RUN_JOB_TIMEOUT=360 # Job 超时时间(秒)
AGENT_RUN_JOB_TIMEOUT=600 # Job 超时时间(秒)
# Tool 子 Run 调度与超时
AGENT_TOOL_MAX_CALLS_PER_RUN=1 # 单个父 Run 允许的工具调用次数

410
CLAUDE.md
View File

@@ -36,14 +36,24 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
- `message_id` (UUID, 主键): 消息唯一标识
- `session_id`: 所属会话ID
- `role`: 消息角色 (USER/AGENT/TOOL/SYSTEM)
- `type`: 消息类型 (user.prompt/agent.message/run.status等)
- `type`: 消息类型 (user.prompt/agent.message/message.delta/tool.call/tool.result/run.status/error等)
- `content`: 消息内容 (text)
- `payload`: 附加数据 (jsonb)
- `payload`: 附加数据 (jsonb),包含 run_id、tool_call_id、error_type 等元数据
- `seq`: 会话内序号 (单调递增)
- `reply_to`: 回复的消息ID
- `dedupe_key`: 幂等去重键
- **约束**: `unique(session_id, seq)``unique(session_id, dedupe_key)`
#### 消息类型完整列表
- `user.prompt` (USER): 用户提示
- `agent.message` (AGENT): Agent 完整回复
- `message.delta` (AGENT): 流式文本增量
- `tool.call` (AGENT): 工具调用请求
- `tool.result` (TOOL): 工具执行结果
- `run.status` (SYSTEM): Run 状态RUNNING/DONE/FAILED/CANCELED
- `error` (SYSTEM): 错误信息
- `run.cancel.request` (USER): 取消请求
### 会话状态与门禁规则
- **OPEN**: 正常追加所有消息
@@ -58,12 +68,48 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
3. 检查会话是否已有 RUNNING 状态的 run单会话单任务限制
4. 创建 `run.status=RUNNING` 消息并派发 `AgentRunJob`
5. `RunLoop` 执行 Agent 调用流程:
- `ContextBuilder` 构建上下文
- `AgentProviderInterface` 调用 Agent(当前为 DummyAgentProvider
- `CancelChecker` 检查取消信号
- `OutputSink` 写入 agent.message
- `ContextBuilder` 构建上下文(加载最近 20 条相关消息)
- `AgentProviderInterface::stream()` 流式调用 Agent
- 消费 `Generator<ProviderEvent>` 流:
- `MessageDelta`: 流式文本,写入 `message.delta` 消息
- `ToolCall`: 工具调用,累积后写入 `tool.call` 并分发 `ToolRunJob`
- `Done`: 流结束,写入最终 `agent.message` + `DONE` 状态
- `Error`: 错误,写入 `error` + `FAILED` 状态
- `CancelChecker` 定期检查取消信号
- 工具调用完成后,等待 `tool.result`,继续下一轮 Provider 调用
- `OutputSink` 统一写入消息,保证幂等性
6. 完成后写入 `run.status=DONE/FAILED/CANCELED`
**Provider 选择逻辑**(在 `AppServiceProvider` 中绑定):
- `HttpAgentProvider` 会检查 `AGENT_OPENAI_API_KEY` 环境变量
- 若配置了 OpenAI Key则使用 `OpenAiChatCompletionsAdapter`
- 否则回退到 `DummyAgentProvider`(返回模拟响应)
### 工具系统架构
项目支持 Agent 调用工具Tools采用子 Run 模式:
- **Tool** (`app/Services/Tool/Tool.php`): 工具接口,定义 name、description、parameters、execute 方法
- **ToolRegistry** (`app/Services/Tool/ToolRegistry.php`): 管理已注册工具,生成 OpenAI 兼容工具声明
- **ToolExecutor** (`app/Services/Tool/ToolExecutor.php`): 执行工具,处理超时和结果截断
- **ToolRunDispatcher** (`app/Services/Tool/ToolRunDispatcher.php`): 为每个工具调用创建子 run 并投递 `ToolRunJob`
- **ToolRunJob** (`app/Jobs/ToolRunJob.php`): 队列任务,执行工具并写入 `tool.result` 消息
工具调用流程:
1. Agent 返回 ToolCall 事件
2. RunLoop 累积工具调用,写入 `tool.call` 消息
3. ToolRunDispatcher 为每个工具创建子 run`run.status=RUNNING`
4. ToolRunJob 执行工具,写入 `tool.result` 消息
5. RunLoop 轮询等待所有 `tool.result`(支持超时)
6. 收集工具结果后,继续下一轮 Provider 调用
配置项(`config/agent.php`
- `agent.tools.max_calls_per_run`: 单 run 最多工具调用次数(默认 1
- `agent.tools.wait_timeout_ms`: 等待工具结果超时(默认 15000ms
- `agent.tools.wait_poll_interval_ms`: 轮询间隔(默认 200ms
- `agent.tools.timeout_seconds`: 工具执行超时(默认 15s
- `agent.tools.result_max_bytes`: 结果最大字节数(默认 4096
### 实时消息推送 (SSE)
- **端点**: `GET /api/sessions/{id}/sse?after_seq=123`
@@ -71,26 +117,41 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
1. 先从数据库补发历史消息seq > after_seq
2. 订阅 Redis 频道 `session:{id}:messages` 监听新消息
3. 支持 `Last-Event-ID` 自动续传
4. 检测 seq gap 自动回补
5. 15 秒心跳保活
- **事件格式**: SSE event id 为消息 seq
### 服务层架构
- **ChatService**: 会话和消息的核心业务逻辑
- **ChatService** (`app/Services/ChatService.php`): 会话和消息的核心业务逻辑
- 使用行锁 (`lockForUpdate`) + 事务保证消息 seq 单调递增
- 通过 `dedupe_key` 实现幂等性
- 消息追加后发布 Redis 事件用于 SSE 推送
- 提供 `appendMessage()``listMessagesBySeq()``updateSession()` 等方法
- **RunDispatcher**: Agent Run 调度器
- **RunDispatcher** (`app/Services/RunDispatcher.php`): Agent Run 调度器
- 检查 trigger_message_id 幂等性
- 确保同会话只有一个 RUNNING 状态的 run
- **RunLoop**: Agent 执行循环
- 协调 ContextBuilder、AgentProvider、OutputSink、CancelChecker
- **RunLoop** (`app/Services/RunLoop.php`): Agent 执行循环
- 协调 ContextBuilder、AgentProvider、OutputSink、CancelChecker、ToolRunDispatcher
- 处理工具调用上限(`max_calls_per_run`
- 达到上限后强制 `tool_choice=none` 防止再次触发
- **OutputSink**: 统一的消息写入接口
- **OutputSink** (`app/Services/OutputSink.php`): 统一的消息写入接口
- `appendAgentMessage()`: 写入 agent 回复
- `appendAgentDelta()`: 写入流式文本增量
- `appendRunStatus()`: 写入 run 状态
- `appendError()`: 写入错误信息
- `appendToolCall()`: 写入工具调用
- `appendToolResult()`: 写入工具结果
- **ContextBuilder** (`app/Services/ContextBuilder.php`): 构建 Agent 上下文
- 加载最近 20 条相关消息USER/AGENT/TOOL 角色)
- 按 seq 排序并转换为 AgentContext
- **CancelChecker** (`app/Services/CancelChecker.php`): 检查 run 是否被取消
- 查询 `type='run.cancel.request'` 消息
## 常用开发命令
@@ -162,6 +223,27 @@ docker compose exec app vendor/bin/pint --test
docker compose exec app vendor/bin/pint --dirty
```
### 本地开发(不使用 Docker
如果你想在本地直接运行(需要 PHP 8.2+、PostgreSQL、Redis
```bash
# 安装依赖
composer install
# 启动 Octane 开发服务器
php artisan octane:start --host=0.0.0.0 --port=8000
# 启动队列 worker
php artisan queue:work
# 或启动 Horizon
php artisan horizon
# 查看实时日志
php artisan pail
```
### 队列与任务
```bash
@@ -254,6 +336,91 @@ docker compose exec app php artisan make:test ChatServiceTest --unit --phpunit
- `GET /api/sessions/{session_id}/sse`: SSE 实时消息流
- `POST /api/sessions/{session_id}/runs`: 手动触发 Agent Run
## 项目结构
```
app/
├── Enums/ # 枚举类ChatSessionStatus 等)
├── Exceptions/ # 自定义异常
├── Http/
│ ├── Controllers/ # API 控制器
│ │ ├── ChatSessionController.php # 会话和消息 API
│ │ ├── ChatSessionSseController.php # SSE 实时推送
│ │ ├── RunController.php # Agent Run 手动触发
│ │ ├── AuthController.php # 用户认证
│ │ └── UserController.php # 用户管理
│ ├── Requests/ # Form Request 验证
│ └── Resources/ # API 响应格式化
├── Jobs/ # 队列任务
│ ├── AgentRunJob.php # Agent Run 队列任务
│ └── ToolRunJob.php # 工具执行队列任务
├── Models/ # Eloquent 模型
│ ├── ChatSession.php
│ ├── Message.php
│ └── User.php
├── Providers/ # 服务提供者
│ └── AppServiceProvider.php # 绑定 AgentProviderInterface
└── Services/ # 业务逻辑服务
├── Agent/ # Agent Provider 实现
│ ├── OpenAi/ # OpenAI 适配器
│ │ ├── OpenAiChatCompletionsAdapter.php
│ │ ├── ChatCompletionsRequestBuilder.php
│ │ ├── OpenAiApiClient.php
│ │ ├── OpenAiStreamParser.php
│ │ └── OpenAiEventNormalizer.php
│ ├── AgentProviderInterface.php
│ ├── AgentContext.php
│ ├── ProviderEvent.php
│ ├── ProviderEventType.php
│ ├── ProviderException.php
│ ├── HttpAgentProvider.php
│ └── DummyAgentProvider.php
├── Tool/ # 工具系统
│ ├── Tool.php # 工具接口
│ ├── ToolRegistry.php # 工具注册表
│ ├── ToolExecutor.php # 工具执行器
│ ├── ToolRunDispatcher.php # 工具 Run 分发器
│ ├── ToolCall.php # 工具调用对象
│ ├── ToolResult.php # 工具结果对象
│ └── Tools/ # 具体工具实现
│ └── GetTimeTool.php # 获取时间工具(示例)
├── ChatService.php # 会话和消息核心服务
├── RunDispatcher.php # Run 调度器
├── RunLoop.php # Run 执行循环
├── ContextBuilder.php # 上下文构建器
├── OutputSink.php # 消息写入器
└── CancelChecker.php # 取消检查器
database/
├── migrations/
│ └── 2025_02_14_000003_create_chat_tables.php # 核心表结构
└── factories/ # 测试数据工厂
tests/
├── Feature/
│ ├── ChatSessionTest.php # 会话和消息测试
│ └── AgentRunTest.php # Agent Run 流程测试
└── Unit/
└── OpenAiAdapterTest.php # OpenAI 适配器单元测试
config/
├── agent.php # Agent Provider 和工具配置
├── auth.php # JWT 认证配置
├── queue.php # 队列配置
└── horizon.php # Horizon 队列监控配置
bootstrap/
├── app.php # Laravel 12 应用引导(中间件、路由、异常)
└── providers.php # 服务提供者注册
```
**关键设计原则**
- 所有 Agent Provider 实现 `AgentProviderInterface::stream()` 接口
- 使用 `Generator` 模式流式返回 `ProviderEvent`
- 统一通过 `OutputSink` 写入消息,保证事务性和幂等性
- 工具系统采用子 Run 模式,每个工具调用创建独立 run
- 所有异步操作通过队列AgentRunJob、ToolRunJob执行
## 开发注意事项
### Laravel 12 新特性
@@ -261,27 +428,80 @@ docker compose exec app php artisan make:test ChatServiceTest --unit --phpunit
- 中间件、路由、异常处理在 `bootstrap/app.php` 配置
- 服务提供者在 `bootstrap/providers.php` 注册
- Commands 自动注册(无需手动注册)
- JWT 中间件别名在 `bootstrap/app.php` 中配置为 `auth.jwt`
### 数据库操作规范
- 消息追加必须使用 `ChatService::appendMessage()`,不要直接操作 Message 模型
- 会话状态变更必须通过 `ChatService::updateSession()`
- 所有涉及 seq 递增的操作必须在事务 + 行锁中完成
- **消息追加**必须使用 `ChatService::appendMessage()`,不要直接操作 Message 模型
- 原因:需要行锁 + 事务保证 seq 单调递增,并发布 Redis 事件
- 所有 `OutputSink` 方法最终都调用 `ChatService::appendMessage()`
- **会话状态变更**:必须通过 `ChatService::updateSession()`
- 会自动校验 CLOSED 状态不可重新打开
- **所有涉及 seq 递增的操作**:必须在事务 + 行锁中完成
### Provider 与 Event Stream 开发
- 实现自定义 Provider 时必须实现 `AgentProviderInterface::stream()` 接口
- 使用 `Generator` 模式 yield `ProviderEvent` 对象
- 事件类型:
- `ProviderEvent::messageDelta($content)`: 流式文本增量
- `ProviderEvent::toolCall($toolCallId, $name, $arguments)`: 工具调用
- `ProviderEvent::done($finishReason)`: 流结束
- `ProviderEvent::error($errorCode, $message, $retryable)`: 错误
- 错误处理:抛出 `ProviderException` 包含 errorCode、retryable、httpStatus
- `RunLoop` 会自动处理重试、取消检查、工具调用分发
### 工具开发规范
- 创建新工具:继承 `Tool` 抽象类,实现 `name()``description()``parameters()``execute()` 方法
- 注册工具:在 `AppServiceProvider` 中调用 `ToolRegistry::register($tool)`
- 工具执行:
- `execute()` 方法接收 `array $args`,返回字符串结果
- 超时控制:通过 `AGENT_TOOL_TIMEOUT_SECONDS` 配置
- 结果截断:超过 `AGENT_TOOL_RESULT_MAX_BYTES` 会自动截断并标记
- 工具参数:使用 JSON Schema 格式定义,会自动传递给 OpenAI API
### 测试规范
- 所有测试使用 PHPUnit非 Pest
- Feature 测试必须测试完整的 HTTP 请求流程
- 测试中使用 Factory 创建模型数据
- 修改代码后必须运行相关测试确保通过
- **框架**所有测试使用 PHPUnit非 Pest
- **Feature 测试**必须测试完整的 HTTP 请求流程
- 使用 `RefreshDatabase` trait 在测试间刷新数据
- 使用 `Queue::fake()` 模拟队列
- 使用 `Redis::shouldReceive()` 模拟 Redis 发布
- **测试数据**:使用 Factory 创建模型数据
- **运行测试**:修改代码后必须运行相关测试确保通过
```bash
# 运行所有测试
docker compose exec app php artisan test
# 运行特定测试方法
docker compose exec app php artisan test --filter=testAppendMessageWithDedupe
```
### 队列配置
- 开发环境可使用同步队列`.env` 中设置 `QUEUE_CONNECTION=sync`
- 生产环境使用 Redis 队列 + Horizon 监控
- `AgentRunJob` 在队列中异步执行
- **开发环境**可使用同步队列`.env` 中设置 `QUEUE_CONNECTION=sync`
- 优点:调试方便,错误堆栈清晰
- 缺点:阻塞 HTTP 请求
- **生产环境**:使用 Redis 队列 + Horizon 监控
- `AgentRunJob``ToolRunJob` 在队列中异步执行
- Horizon 仪表板http://localhost:8000/horizon
- **Job 配置**:通过 `config/agent.php` 控制重试次数、退避时间、超时
### 幂等性设计
- 所有可能重复调用的操作都使用 `dedupe_key`
- `RunDispatcher` 通过 `trigger_message_id` 确保不会为同一 prompt 重复创建 run
- SSE 通过 `Last-Event-ID` / `after_seq` 支持断线续传
- **dedupe_key 机制**所有可能重复调用的操作都使用 `dedupe_key`
- 基于 UNIQUE 约束 `unique(session_id, dedupe_key)` 自动去重
- 重复请求返回已有消息(相同 message_id 和 seq
- **Run 幂等**`RunDispatcher` 通过 `trigger_message_id` 的 dedupe_key 确保不会为同一 prompt 重复创建 run
- **SSE 续传**:通过 `Last-Event-ID` / `after_seq` 支持断线续传
- **消息幂等模式**
- `run:{runId}:agent:message` - Agent 最终回复
- `run:{runId}:agent:delta:{index}` - 流式增量
- `run:{runId}:status:{status}` - Run 状态
- `run:{runId}:tool:call:{toolCallId}` - 工具调用
- `run:{runId}:tool:result:{toolCallId}` - 工具结果
### 性能优化建议
- **上下文加载**`ContextBuilder` 只加载最近 20 条消息,可通过配置调整
- **消息分页**`listMessagesBySeq()` 使用 `after_seq` + `limit` 增量拉取
- **索引优化**`(session_id, seq)``(session_id, dedupe_key)` 复合索引加速查询
- **Redis 发布**:消息追加后异步发布,使用 `DB::afterCommit()` 保证顺序
- **SSE 优化**backlog 限制 200 条,心跳 15 秒gap 检测自动回补
## 环境变量关键配置
@@ -294,10 +514,14 @@ DB_CONNECTION=pgsql
DB_HOST=pgsql
DB_PORT=5432
DB_DATABASE=ars_backend
DB_USERNAME=ars
DB_PASSWORD=secret
# Redis
REDIS_CLIENT=phpredis
REDIS_HOST=redis
REDIS_PORT=6379
CACHE_STORE=redis
# 队列
QUEUE_CONNECTION=redis # 或 sync开发用
@@ -308,6 +532,57 @@ AUTH_GUARD=api
# CORS
CORS_ALLOWED_ORIGINS=http://localhost:5173
# OpenAI 兼容 API 配置
AGENT_OPENAI_BASE_URL=https://api.openai.com/v1 # 支持任何 OpenAI 兼容端点
AGENT_OPENAI_API_KEY= # 为空时使用 DummyProvider
AGENT_OPENAI_ORGANIZATION= # 可选
AGENT_OPENAI_PROJECT= # 可选
AGENT_OPENAI_MODEL=gpt-4o-mini
AGENT_OPENAI_TEMPERATURE=0.7
AGENT_OPENAI_TOP_P=1.0
AGENT_OPENAI_INCLUDE_USAGE=false
# Agent Provider HTTP 配置(重试机制)
AGENT_PROVIDER_TIMEOUT=30 # HTTP 请求超时(秒)
AGENT_PROVIDER_CONNECT_TIMEOUT=5 # 连接超时(秒)
AGENT_PROVIDER_RETRY_TIMES=1 # 建立流前重试次数(仅连接失败/429/5xx 且未产出事件时)
AGENT_PROVIDER_RETRY_BACKOFF_MS=500 # 重试退避毫秒(指数退避)
# Agent Run Job 配置
AGENT_RUN_JOB_TRIES=1 # 队列重试次数
AGENT_RUN_JOB_BACKOFF=3 # 重试退避秒数
AGENT_RUN_JOB_TIMEOUT=600 # Job 超时时间(秒)
# 工具系统配置
AGENT_TOOL_MAX_CALLS_PER_RUN=1 # 单个父 Run 允许的工具调用次数
AGENT_TOOL_WAIT_TIMEOUT_MS=15000 # 等待 tool.result 的超时时间(毫秒)
AGENT_TOOL_WAIT_POLL_MS=200 # 等待工具结果轮询间隔(毫秒)
AGENT_TOOL_TIMEOUT_SECONDS=15 # 单个工具执行超时(秒,超出记为 TIMEOUT
AGENT_TOOL_RESULT_MAX_BYTES=4096 # 工具结果最大保存字节数(截断后仍会写入)
AGENT_TOOL_CHOICE=auto # OpenAI tool_choice 选项auto/required 等)
AGENT_TOOL_JOB_TRIES=1 # ToolRunJob 重试次数
AGENT_TOOL_JOB_BACKOFF=3 # ToolRunJob 重试退避秒数
AGENT_TOOL_JOB_TIMEOUT=120 # ToolRunJob 超时时间(秒)
```
## 初始化新环境
```bash
# 1. 复制环境配置
cp .env.example .env
# 2. 生成应用密钥
docker compose exec app php artisan key:generate
# 3. 生成 JWT 密钥
docker compose exec app php artisan jwt:secret
# 4. 运行迁移
docker compose exec app php artisan migrate
# 5. (可选)创建测试用户
docker compose exec app php artisan db:seed
```
## 相关文档
@@ -315,3 +590,90 @@ CORS_ALLOWED_ORIGINS=http://localhost:5173
- API 详细文档:`docs/ChatSession/chat-session-api.md`
- OpenAPI 规范:`docs/ChatSession/chat-session-openapi.yaml`
- 用户管理文档:`docs/User/user-api.md`
## 常见问题排查
### 队列任务不执行
- 检查 Horizon 是否运行:`docker compose ps horizon`
- 查看 Horizon 日志:`docker compose logs -f horizon`
- 检查 Redis 连接:
```bash
docker compose exec app php artisan tinker
> Redis::ping() # 应返回 "PONG"
```
- 查看失败的任务:`docker compose exec app php artisan queue:failed`
- 重试失败任务:`docker compose exec app php artisan queue:retry all`
### SSE 连接断开
- 检查 Nginx/代理是否支持 SSE需要禁用缓冲
- 确认客户端正确处理 `Last-Event-ID` 续传
- 查看 Redis 发布日志
- 测试环境下 SSE 会自动回退到仅返回 backlog无实时推送
### Agent Run 失败
- 查看 `messages` 表中 `type=error` 的消息:
```sql
SELECT message_id, session_id, content, payload
FROM messages
WHERE type = 'error'
ORDER BY created_at DESC
LIMIT 10;
```
- 检查 `payload.error_type``payload.provider``payload.retryable``payload.details`
- 检查 Provider 配置:
```bash
docker compose exec app php artisan config:show agent
```
- 查看实时日志:
```bash
docker compose exec app php artisan pail
# 或查看容器日志
docker compose logs -f app
```
- 测试 OpenAI API Key
```bash
docker compose exec app php artisan tinker
> $provider = app(App\Services\Agent\AgentProviderInterface::class);
> $context = new App\Services\Agent\AgentContext('test', []);
> foreach ($provider->stream($context) as $event) { dump($event); }
```
### 工具调用问题
- 检查工具是否注册:
```bash
docker compose exec app php artisan tinker
> $registry = app(App\Services\Tool\ToolRegistry::class);
> dump($registry->openAiToolsSpec());
```
- 查看 tool.call 和 tool.result 消息:
```sql
SELECT message_id, type, content, payload
FROM messages
WHERE session_id = 'xxx' AND type IN ('tool.call', 'tool.result')
ORDER BY seq;
```
- 检查工具调用上限:配置 `AGENT_TOOL_MAX_CALLS_PER_RUN`
- 工具执行超时:检查 `payload.status` 是否为 `TIMEOUT`
### 数据库迁移问题
- 确保 PostgreSQL 已启动:`docker compose ps pgsql`
- 查看迁移状态:`docker compose exec app php artisan migrate:status`
- 检查数据库连接:
```bash
docker compose exec app php artisan tinker
> DB::connection()->getPdo()
```
- 查看数据库日志:`docker compose logs -f pgsql`
### 消息 seq 不连续或重复
- 检查是否有并发追加消息(应使用行锁 + 事务)
- 确认所有消息追加都通过 `ChatService::appendMessage()`
- 查看 unique 约束冲突日志
### 调试技巧
- **实时日志**`docker compose exec app php artisan pail`
- **Telescope**:访问 http://localhost:8000/telescope 查看请求、查询、队列
- **Tinker REPL**`docker compose exec app php artisan tinker` 交互式调试
- **查看配置**`php artisan config:show agent`
- **查看路由**`php artisan route:list`
- **数据库查询日志**:在 `.env` 中设置 `DB_LOG_QUERIES=true`

View File

@@ -30,13 +30,18 @@ class OutputSink
}
/**
* 追加 Agent 流式文本增量(仅用于 SSE 推送,不落库)。
*
* message.delta 消息只用于实时流式推送,不需要持久化到数据库。
* 最终的完整回复会通过 appendAgentMessage() 落库。
*
* @param array<string, mixed> $meta
*/
public function appendAgentDelta(string $sessionId, string $runId, string $content, int $deltaIndex, array $meta = []): Message
public function appendAgentDelta(string $sessionId, string $runId, string $content, int $deltaIndex, array $meta = []): void
{
$dedupeKey = "run:{$runId}:agent:delta:{$deltaIndex}";
return $this->chatService->appendMessage([
// 1. 创建临时 Message 对象(不保存到数据库)
$message = new Message([
'message_id' => (string) \Illuminate\Support\Str::uuid(),
'session_id' => $sessionId,
'role' => Message::ROLE_AGENT,
'type' => 'message.delta',
@@ -45,8 +50,45 @@ class OutputSink
'run_id' => $runId,
'delta_index' => $deltaIndex,
]),
'dedupe_key' => $dedupeKey,
], $wasDeduped);
'dedupe_key' => "run:{$runId}:agent:delta:{$deltaIndex}",
'seq' => 0, // delta 消息不需要真实的 seq
'created_at' => now(),
]);
// 2. 仅发布 Redis 事件,供 SSE 实时推送
$this->publishDeltaMessage($message);
}
/**
* 发布 delta 消息到 Redis仅用于 SSE 推送)。
*
* 此方法不保存消息到数据库,只发布事件供 SSE 客户端接收。
*/
private function publishDeltaMessage(Message $message): void
{
$root = \Illuminate\Support\Facades\Redis::getFacadeRoot();
$isMocked = $root instanceof \Mockery\MockInterface;
// 如果 Redis 不可用(测试环境),直接返回
if (!class_exists(\Redis::class) && !$isMocked) {
return;
}
$channel = "session:{$message->session_id}:messages";
try {
\Illuminate\Support\Facades\Redis::publish(
$channel,
json_encode($message->toArray(), JSON_UNESCAPED_UNICODE | JSON_INVALID_UTF8_IGNORE)
);
} catch (\Throwable $e) {
logger()->warning('Redis publish failed for delta message', [
'session_id' => $message->session_id,
'run_id' => $message->payload['run_id'] ?? null,
'delta_index' => $message->payload['delta_index'] ?? null,
'error' => $e->getMessage(),
]);
}
}
/**

View File

@@ -37,10 +37,17 @@ class RunLoop
}
/**
* 运行单次 Agent Run run_id 幂等)负责取消检查、Provider 调用和结果落库
* 运行单次 Agent Run run_id 幂等)。
*
* 主流程:
* 1. 检查 run 是否已终止
* 2. 进入主循环,持续调用 Provider 直到完成或失败
* 3. 每轮迭代可能触发工具调用,工具完成后继续下一轮
* 4. 没有工具调用时,写入最终回复并标记 DONE
*/
public function run(string $sessionId, string $runId): void
{
// 1. 幂等性检查:避免重复执行已完成的 run
if ($this->isRunTerminal($sessionId, $runId)) {
return;
}
@@ -48,137 +55,346 @@ class RunLoop
$providerName = $this->resolveProviderName();
$toolCallCount = 0;
// 2. 主循环:持续调用 Provider 直到完成或失败
while (true) {
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
// 2.1 检查用户是否取消
if ($this->checkAndHandleCancel($sessionId, $runId)) {
return;
}
$context = $this->contextBuilder->build($sessionId, $runId);
$providerOptions = [
'should_stop' => fn () => $this->isCanceled($sessionId, $runId),
];
// 2.2 执行一轮 Provider 调用
$iterationResult = $this->executeProviderIteration(
$sessionId,
$runId,
$providerName,
$toolCallCount
);
// 达到工具调用上限后强制关闭后续工具调用,避免再次触发 TOOL_CALL_LIMIT。
if ($toolCallCount >= $this->maxToolCalls) {
$providerOptions['tool_choice'] = 'none';
}
$logOptions = $providerOptions;
unset($logOptions['should_stop']);
logger('agent provider context', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'context' => $context,
'provider_options' => $logOptions,
]);
$startedAt = microtime(true);
logger('agent provider request', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'iteration' => $toolCallCount,
]);
// 单轮 Agent 调用(可能触发工具调用,后续再进下一轮)
$streamState = $this->consumeProviderStream($sessionId, $runId, $context, $providerName, $startedAt, $providerOptions);
if ($streamState['canceled'] || $streamState['failed']) {
// 2.3 处理失败或取消
if ($iterationResult['should_exit']) {
return;
}
if (! empty($streamState['tool_calls'])) {
$toolCallCount += count($streamState['tool_calls']);
if ($toolCallCount > $this->maxToolCalls) {
$this->appendProviderFailure(
$sessionId,
$runId,
'TOOL_CALL_LIMIT',
'Tool call limit reached for this run',
$providerName,
$this->latencyMs($startedAt),
[],
'TOOL_CALL_LIMIT'
);
// 2.4 如果有工具调用,处理工具执行流程
if ($iterationResult['has_tool_calls']) {
$shouldExit = $this->handleToolCalls(
$sessionId,
$runId,
$providerName,
$iterationResult,
$toolCallCount
);
if ($shouldExit) {
return;
}
// 工具调用:先调度子 Run再等待 tool.result随后继续下一轮 Provider 调用
$toolCalls = $this->dispatchToolRuns($sessionId, $runId, $streamState['tool_calls']);
$waitState = $this->awaitToolResults($sessionId, $runId, $toolCalls, $providerName);
if ($waitState['failed'] || $waitState['canceled']) {
return;
}
// 工具结果已写回上下文,继续下一轮 Agent 调用。
// 更新工具调用计数,继续下一轮 Provider 调用
$toolCallCount = $iterationResult['updated_tool_count'];
continue;
}
$latencyMs = $this->latencyMs($startedAt);
logger('agent provider response', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'latency_ms' => $latencyMs,
]);
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
if (! $streamState['received_event']) {
$this->appendProviderFailure(
$sessionId,
$runId,
'EMPTY_STREAM',
'Agent provider returned no events',
$providerName,
$latencyMs,
[],
'EMPTY_STREAM'
);
return;
}
if ($streamState['done_reason'] === null) {
$this->appendProviderFailure(
$sessionId,
$runId,
'STREAM_INCOMPLETE',
'Agent provider stream ended unexpectedly',
$providerName,
$latencyMs,
[],
'STREAM_INCOMPLETE'
);
return;
}
$this->outputSink->appendAgentMessage($sessionId, $runId, $streamState['reply'], [
'provider' => $providerName,
'done_reason' => $streamState['done_reason'],
], "run:{$runId}:agent:message");
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
$this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', [
'dedupe_key' => "run:{$runId}:status:DONE",
]);
// 2.5 没有工具调用,完成 run
$this->completeRun(
$sessionId,
$runId,
$providerName,
$iterationResult['stream_state'],
$iterationResult['latency_ms']
);
return;
}
}
/**
* 检查并处理取消状态。
*
* @return bool 是否已处理取消true 表示已取消并写入状态)
*/
private function checkAndHandleCancel(string $sessionId, string $runId): bool
{
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return true;
}
return false;
}
/**
* 执行一轮 Provider 调用迭代。
*
* 包括:
* - 构建上下文
* - 准备 Provider 选项(工具调用限制、取消回调等)
* - 调用 Provider 流式接口
* - 记录日志
*
* @return array{
* stream_state: array,
* has_tool_calls: bool,
* updated_tool_count: int,
* should_exit: bool,
* latency_ms: int
* }
*/
private function executeProviderIteration(
string $sessionId,
string $runId,
string $providerName,
int $currentToolCallCount
): array {
// 1. 构建上下文和 Provider 选项
$context = $this->contextBuilder->build($sessionId, $runId);
$providerOptions = $this->buildProviderOptions($sessionId, $runId, $currentToolCallCount);
// 2. 记录调用日志
$this->logProviderRequest($sessionId, $runId, $providerName, $context, $providerOptions, $currentToolCallCount);
// 3. 调用 Provider 并消费事件流
$startedAt = microtime(true);
$streamState = $this->consumeProviderStream(
$sessionId,
$runId,
$context,
$providerName,
$startedAt,
$providerOptions
);
$latencyMs = $this->latencyMs($startedAt);
// 4. 检查流式调用是否失败或取消
if ($streamState['canceled'] || $streamState['failed']) {
return [
'stream_state' => $streamState,
'has_tool_calls' => false,
'updated_tool_count' => $currentToolCallCount,
'should_exit' => true,
'latency_ms' => $latencyMs,
];
}
// 5. 检查是否有工具调用
$hasToolCalls = !empty($streamState['tool_calls']);
$updatedToolCount = $currentToolCallCount + count($streamState['tool_calls']);
return [
'stream_state' => $streamState,
'has_tool_calls' => $hasToolCalls,
'updated_tool_count' => $updatedToolCount,
'should_exit' => false,
'latency_ms' => $latencyMs,
];
}
/**
* 构建 Provider 调用选项。
*
* 包括:
* - 取消检查回调
* - 工具调用限制控制
*/
private function buildProviderOptions(string $sessionId, string $runId, int $toolCallCount): array
{
$options = [
'should_stop' => fn () => $this->isCanceled($sessionId, $runId),
];
// 达到工具调用上限后,强制禁用工具调用,避免再次触发 TOOL_CALL_LIMIT 错误
if ($toolCallCount >= $this->maxToolCalls) {
$options['tool_choice'] = 'none';
}
return $options;
}
/**
* 记录 Provider 请求日志。
*/
private function logProviderRequest(
string $sessionId,
string $runId,
string $providerName,
AgentContext $context,
array $providerOptions,
int $iteration
): void {
// 日志选项(移除不可序列化的回调)
$logOptions = $providerOptions;
unset($logOptions['should_stop']);
logger('agent provider context', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'context' => $context,
'provider_options' => $logOptions,
]);
logger('agent provider request', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'iteration' => $iteration,
]);
}
/**
* 处理工具调用流程。
*
* 流程:
* 1. 检查工具调用数量是否超限
* 2. 分发工具子 Run
* 3. 等待工具执行结果
*
* @return bool 是否应该退出主循环(超限、失败或取消时返回 true
*/
private function handleToolCalls(
string $sessionId,
string $runId,
string $providerName,
array $iterationResult,
int $originalToolCallCount
): bool {
$streamState = $iterationResult['stream_state'];
$latencyMs = $iterationResult['latency_ms'];
$updatedToolCount = $iterationResult['updated_tool_count'];
// 1. 检查工具调用数量是否超限
if ($updatedToolCount > $this->maxToolCalls) {
$this->appendProviderFailure(
$sessionId,
$runId,
'TOOL_CALL_LIMIT',
'Tool call limit reached for this run',
$providerName,
$latencyMs,
[],
'TOOL_CALL_LIMIT'
);
return true; // 退出主循环
}
// 2. 分发工具子 Run
$toolCalls = $this->dispatchToolRuns($sessionId, $runId, $streamState['tool_calls']);
// 3. 等待所有工具执行完成
$waitState = $this->awaitToolResults($sessionId, $runId, $toolCalls, $providerName);
// 4. 检查等待过程中是否失败或取消
if ($waitState['failed'] || $waitState['canceled']) {
return true; // 退出主循环
}
// 工具结果已写回上下文,继续下一轮 Agent 调用
return false;
}
/**
* 完成 Run 并写入最终状态。
*
* 流程:
* 1. 验证流式响应的有效性
* 2. 写入最终 agent.message
* 3. 再次检查取消状态
* 4. 写入 run.status = DONE
*/
private function completeRun(
string $sessionId,
string $runId,
string $providerName,
array $streamState,
int $latencyMs
): void {
// 1. 记录响应日志
logger('agent provider response', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'latency_ms' => $latencyMs,
]);
// 2. 再次检查取消状态(在写入最终消息前)
if ($this->checkAndHandleCancel($sessionId, $runId)) {
return;
}
// 3. 验证流式响应的有效性
if (!$this->validateStreamResponse($sessionId, $runId, $providerName, $streamState, $latencyMs)) {
return;
}
// 4. 写入最终 agent.message
$this->outputSink->appendAgentMessage($sessionId, $runId, $streamState['reply'], [
'provider' => $providerName,
'done_reason' => $streamState['done_reason'],
], "run:{$runId}:agent:message");
// 5. 最后一次检查取消状态(在写入 DONE 前)
if ($this->checkAndHandleCancel($sessionId, $runId)) {
return;
}
// 6. 写入 run.status = DONE
$this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', [
'dedupe_key' => "run:{$runId}:status:DONE",
]);
}
/**
* 验证流式响应的有效性。
*
* 检查:
* - 是否收到任何事件(避免空流)
* - 流是否正常结束(有 done_reason
*
* @return bool 是否有效true 表示有效false 表示无效并已写入错误)
*/
private function validateStreamResponse(
string $sessionId,
string $runId,
string $providerName,
array $streamState,
int $latencyMs
): bool {
// 1. 检查是否收到任何事件
if (!$streamState['received_event']) {
$this->appendProviderFailure(
$sessionId,
$runId,
'EMPTY_STREAM',
'Agent provider returned no events',
$providerName,
$latencyMs,
[],
'EMPTY_STREAM'
);
return false;
}
// 2. 检查流是否正常结束
if ($streamState['done_reason'] === null) {
$this->appendProviderFailure(
$sessionId,
$runId,
'STREAM_INCOMPLETE',
'Agent provider stream ended unexpectedly',
$providerName,
$latencyMs,
[],
'STREAM_INCOMPLETE'
);
return false;
}
return true;
}
/**
* 判断指定 run 是否已到终态,避免重复执行。
*/

View File

@@ -31,5 +31,8 @@
<env name="TELESCOPE_ENABLED" value="false"/>
<env name="NIGHTWATCH_ENABLED" value="false"/>
<env name="JWT_SECRET" value="testing_jwt_secret_for_unit_tests_32_chars_min"/>
<env name="AGENT_PROVIDER_ENDPOINT" value="null"/>
<env name="AGENT_OPENAI_BASE_URL" value="null"/>
<env name="AGENT_OPENAI_API_KEY" value="null"/>
</php>
</phpunit>