diff --git a/.env.example b/.env.example index f024ce6..e6c410c 100644 --- a/.env.example +++ b/.env.example @@ -88,7 +88,7 @@ AGENT_OPENAI_INCLUDE_USAGE=false # AgentRunJob 队列执行策略 AGENT_RUN_JOB_TRIES=1 # 队列重试次数 AGENT_RUN_JOB_BACKOFF=3 # 重试退避秒数 -AGENT_RUN_JOB_TIMEOUT=360 # Job 超时时间(秒) +AGENT_RUN_JOB_TIMEOUT=600 # Job 超时时间(秒) # Tool 子 Run 调度与超时 AGENT_TOOL_MAX_CALLS_PER_RUN=1 # 单个父 Run 允许的工具调用次数 diff --git a/CLAUDE.md b/CLAUDE.md index 791f6bf..414c190 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -36,14 +36,24 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co - `message_id` (UUID, 主键): 消息唯一标识 - `session_id`: 所属会话ID - `role`: 消息角色 (USER/AGENT/TOOL/SYSTEM) -- `type`: 消息类型 (user.prompt/agent.message/run.status等) +- `type`: 消息类型 (user.prompt/agent.message/message.delta/tool.call/tool.result/run.status/error等) - `content`: 消息内容 (text) -- `payload`: 附加数据 (jsonb) +- `payload`: 附加数据 (jsonb),包含 run_id、tool_call_id、error_type 等元数据 - `seq`: 会话内序号 (单调递增) - `reply_to`: 回复的消息ID - `dedupe_key`: 幂等去重键 - **约束**: `unique(session_id, seq)` 和 `unique(session_id, dedupe_key)` +#### 消息类型完整列表 +- `user.prompt` (USER): 用户提示 +- `agent.message` (AGENT): Agent 完整回复 +- `message.delta` (AGENT): 流式文本增量 +- `tool.call` (AGENT): 工具调用请求 +- `tool.result` (TOOL): 工具执行结果 +- `run.status` (SYSTEM): Run 状态(RUNNING/DONE/FAILED/CANCELED) +- `error` (SYSTEM): 错误信息 +- `run.cancel.request` (USER): 取消请求 + ### 会话状态与门禁规则 - **OPEN**: 正常追加所有消息 @@ -58,12 +68,48 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co 3. 检查会话是否已有 RUNNING 状态的 run(单会话单任务限制) 4. 创建 `run.status=RUNNING` 消息并派发 `AgentRunJob` 5. `RunLoop` 执行 Agent 调用流程: - - `ContextBuilder` 构建上下文 - - `AgentProviderInterface` 调用 Agent(当前为 DummyAgentProvider) - - `CancelChecker` 检查取消信号 - - `OutputSink` 写入 agent.message + - `ContextBuilder` 构建上下文(加载最近 20 条相关消息) + - `AgentProviderInterface::stream()` 流式调用 Agent + - 消费 `Generator` 流: + - `MessageDelta`: 流式文本,写入 `message.delta` 消息 + - `ToolCall`: 工具调用,累积后写入 `tool.call` 并分发 `ToolRunJob` + - `Done`: 流结束,写入最终 `agent.message` + `DONE` 状态 + - `Error`: 错误,写入 `error` + `FAILED` 状态 + - `CancelChecker` 定期检查取消信号 + - 工具调用完成后,等待 `tool.result`,继续下一轮 Provider 调用 + - `OutputSink` 统一写入消息,保证幂等性 6. 完成后写入 `run.status=DONE/FAILED/CANCELED` +**Provider 选择逻辑**(在 `AppServiceProvider` 中绑定): +- `HttpAgentProvider` 会检查 `AGENT_OPENAI_API_KEY` 环境变量 +- 若配置了 OpenAI Key,则使用 `OpenAiChatCompletionsAdapter` +- 否则回退到 `DummyAgentProvider`(返回模拟响应) + +### 工具系统架构 + +项目支持 Agent 调用工具(Tools),采用子 Run 模式: + +- **Tool** (`app/Services/Tool/Tool.php`): 工具接口,定义 name、description、parameters、execute 方法 +- **ToolRegistry** (`app/Services/Tool/ToolRegistry.php`): 管理已注册工具,生成 OpenAI 兼容工具声明 +- **ToolExecutor** (`app/Services/Tool/ToolExecutor.php`): 执行工具,处理超时和结果截断 +- **ToolRunDispatcher** (`app/Services/Tool/ToolRunDispatcher.php`): 为每个工具调用创建子 run 并投递 `ToolRunJob` +- **ToolRunJob** (`app/Jobs/ToolRunJob.php`): 队列任务,执行工具并写入 `tool.result` 消息 + +工具调用流程: +1. Agent 返回 ToolCall 事件 +2. RunLoop 累积工具调用,写入 `tool.call` 消息 +3. ToolRunDispatcher 为每个工具创建子 run(`run.status=RUNNING`) +4. ToolRunJob 执行工具,写入 `tool.result` 消息 +5. RunLoop 轮询等待所有 `tool.result`(支持超时) +6. 收集工具结果后,继续下一轮 Provider 调用 + +配置项(`config/agent.php`): +- `agent.tools.max_calls_per_run`: 单 run 最多工具调用次数(默认 1) +- `agent.tools.wait_timeout_ms`: 等待工具结果超时(默认 15000ms) +- `agent.tools.wait_poll_interval_ms`: 轮询间隔(默认 200ms) +- `agent.tools.timeout_seconds`: 工具执行超时(默认 15s) +- `agent.tools.result_max_bytes`: 结果最大字节数(默认 4096) + ### 实时消息推送 (SSE) - **端点**: `GET /api/sessions/{id}/sse?after_seq=123` @@ -71,26 +117,41 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co 1. 先从数据库补发历史消息(seq > after_seq) 2. 订阅 Redis 频道 `session:{id}:messages` 监听新消息 3. 支持 `Last-Event-ID` 自动续传 + 4. 检测 seq gap 自动回补 + 5. 15 秒心跳保活 - **事件格式**: SSE event id 为消息 seq ### 服务层架构 -- **ChatService**: 会话和消息的核心业务逻辑 +- **ChatService** (`app/Services/ChatService.php`): 会话和消息的核心业务逻辑 - 使用行锁 (`lockForUpdate`) + 事务保证消息 seq 单调递增 - 通过 `dedupe_key` 实现幂等性 - 消息追加后发布 Redis 事件用于 SSE 推送 + - 提供 `appendMessage()`、`listMessagesBySeq()`、`updateSession()` 等方法 -- **RunDispatcher**: Agent Run 调度器 +- **RunDispatcher** (`app/Services/RunDispatcher.php`): Agent Run 调度器 - 检查 trigger_message_id 幂等性 - 确保同会话只有一个 RUNNING 状态的 run -- **RunLoop**: Agent 执行循环 - - 协调 ContextBuilder、AgentProvider、OutputSink、CancelChecker +- **RunLoop** (`app/Services/RunLoop.php`): Agent 执行循环 + - 协调 ContextBuilder、AgentProvider、OutputSink、CancelChecker、ToolRunDispatcher + - 处理工具调用上限(`max_calls_per_run`) + - 达到上限后强制 `tool_choice=none` 防止再次触发 -- **OutputSink**: 统一的消息写入接口 +- **OutputSink** (`app/Services/OutputSink.php`): 统一的消息写入接口 - `appendAgentMessage()`: 写入 agent 回复 + - `appendAgentDelta()`: 写入流式文本增量 - `appendRunStatus()`: 写入 run 状态 - `appendError()`: 写入错误信息 + - `appendToolCall()`: 写入工具调用 + - `appendToolResult()`: 写入工具结果 + +- **ContextBuilder** (`app/Services/ContextBuilder.php`): 构建 Agent 上下文 + - 加载最近 20 条相关消息(USER/AGENT/TOOL 角色) + - 按 seq 排序并转换为 AgentContext + +- **CancelChecker** (`app/Services/CancelChecker.php`): 检查 run 是否被取消 + - 查询 `type='run.cancel.request'` 消息 ## 常用开发命令 @@ -162,6 +223,27 @@ docker compose exec app vendor/bin/pint --test docker compose exec app vendor/bin/pint --dirty ``` +### 本地开发(不使用 Docker) + +如果你想在本地直接运行(需要 PHP 8.2+、PostgreSQL、Redis): + +```bash +# 安装依赖 +composer install + +# 启动 Octane 开发服务器 +php artisan octane:start --host=0.0.0.0 --port=8000 + +# 启动队列 worker +php artisan queue:work + +# 或启动 Horizon +php artisan horizon + +# 查看实时日志 +php artisan pail +``` + ### 队列与任务 ```bash @@ -254,6 +336,91 @@ docker compose exec app php artisan make:test ChatServiceTest --unit --phpunit - `GET /api/sessions/{session_id}/sse`: SSE 实时消息流 - `POST /api/sessions/{session_id}/runs`: 手动触发 Agent Run +## 项目结构 + +``` +app/ +├── Enums/ # 枚举类(ChatSessionStatus 等) +├── Exceptions/ # 自定义异常 +├── Http/ +│ ├── Controllers/ # API 控制器 +│ │ ├── ChatSessionController.php # 会话和消息 API +│ │ ├── ChatSessionSseController.php # SSE 实时推送 +│ │ ├── RunController.php # Agent Run 手动触发 +│ │ ├── AuthController.php # 用户认证 +│ │ └── UserController.php # 用户管理 +│ ├── Requests/ # Form Request 验证 +│ └── Resources/ # API 响应格式化 +├── Jobs/ # 队列任务 +│ ├── AgentRunJob.php # Agent Run 队列任务 +│ └── ToolRunJob.php # 工具执行队列任务 +├── Models/ # Eloquent 模型 +│ ├── ChatSession.php +│ ├── Message.php +│ └── User.php +├── Providers/ # 服务提供者 +│ └── AppServiceProvider.php # 绑定 AgentProviderInterface +└── Services/ # 业务逻辑服务 + ├── Agent/ # Agent Provider 实现 + │ ├── OpenAi/ # OpenAI 适配器 + │ │ ├── OpenAiChatCompletionsAdapter.php + │ │ ├── ChatCompletionsRequestBuilder.php + │ │ ├── OpenAiApiClient.php + │ │ ├── OpenAiStreamParser.php + │ │ └── OpenAiEventNormalizer.php + │ ├── AgentProviderInterface.php + │ ├── AgentContext.php + │ ├── ProviderEvent.php + │ ├── ProviderEventType.php + │ ├── ProviderException.php + │ ├── HttpAgentProvider.php + │ └── DummyAgentProvider.php + ├── Tool/ # 工具系统 + │ ├── Tool.php # 工具接口 + │ ├── ToolRegistry.php # 工具注册表 + │ ├── ToolExecutor.php # 工具执行器 + │ ├── ToolRunDispatcher.php # 工具 Run 分发器 + │ ├── ToolCall.php # 工具调用对象 + │ ├── ToolResult.php # 工具结果对象 + │ └── Tools/ # 具体工具实现 + │ └── GetTimeTool.php # 获取时间工具(示例) + ├── ChatService.php # 会话和消息核心服务 + ├── RunDispatcher.php # Run 调度器 + ├── RunLoop.php # Run 执行循环 + ├── ContextBuilder.php # 上下文构建器 + ├── OutputSink.php # 消息写入器 + └── CancelChecker.php # 取消检查器 + +database/ +├── migrations/ +│ └── 2025_02_14_000003_create_chat_tables.php # 核心表结构 +└── factories/ # 测试数据工厂 + +tests/ +├── Feature/ +│ ├── ChatSessionTest.php # 会话和消息测试 +│ └── AgentRunTest.php # Agent Run 流程测试 +└── Unit/ + └── OpenAiAdapterTest.php # OpenAI 适配器单元测试 + +config/ +├── agent.php # Agent Provider 和工具配置 +├── auth.php # JWT 认证配置 +├── queue.php # 队列配置 +└── horizon.php # Horizon 队列监控配置 + +bootstrap/ +├── app.php # Laravel 12 应用引导(中间件、路由、异常) +└── providers.php # 服务提供者注册 +``` + +**关键设计原则**: +- 所有 Agent Provider 实现 `AgentProviderInterface::stream()` 接口 +- 使用 `Generator` 模式流式返回 `ProviderEvent` +- 统一通过 `OutputSink` 写入消息,保证事务性和幂等性 +- 工具系统采用子 Run 模式,每个工具调用创建独立 run +- 所有异步操作通过队列(AgentRunJob、ToolRunJob)执行 + ## 开发注意事项 ### Laravel 12 新特性 @@ -261,27 +428,80 @@ docker compose exec app php artisan make:test ChatServiceTest --unit --phpunit - 中间件、路由、异常处理在 `bootstrap/app.php` 配置 - 服务提供者在 `bootstrap/providers.php` 注册 - Commands 自动注册(无需手动注册) +- JWT 中间件别名在 `bootstrap/app.php` 中配置为 `auth.jwt` ### 数据库操作规范 -- 消息追加必须使用 `ChatService::appendMessage()`,不要直接操作 Message 模型 -- 会话状态变更必须通过 `ChatService::updateSession()` -- 所有涉及 seq 递增的操作必须在事务 + 行锁中完成 +- **消息追加**:必须使用 `ChatService::appendMessage()`,不要直接操作 Message 模型 + - 原因:需要行锁 + 事务保证 seq 单调递增,并发布 Redis 事件 + - 所有 `OutputSink` 方法最终都调用 `ChatService::appendMessage()` +- **会话状态变更**:必须通过 `ChatService::updateSession()` + - 会自动校验 CLOSED 状态不可重新打开 +- **所有涉及 seq 递增的操作**:必须在事务 + 行锁中完成 + +### Provider 与 Event Stream 开发 +- 实现自定义 Provider 时必须实现 `AgentProviderInterface::stream()` 接口 +- 使用 `Generator` 模式 yield `ProviderEvent` 对象 +- 事件类型: + - `ProviderEvent::messageDelta($content)`: 流式文本增量 + - `ProviderEvent::toolCall($toolCallId, $name, $arguments)`: 工具调用 + - `ProviderEvent::done($finishReason)`: 流结束 + - `ProviderEvent::error($errorCode, $message, $retryable)`: 错误 +- 错误处理:抛出 `ProviderException` 包含 errorCode、retryable、httpStatus +- `RunLoop` 会自动处理重试、取消检查、工具调用分发 + +### 工具开发规范 +- 创建新工具:继承 `Tool` 抽象类,实现 `name()`、`description()`、`parameters()`、`execute()` 方法 +- 注册工具:在 `AppServiceProvider` 中调用 `ToolRegistry::register($tool)` +- 工具执行: + - `execute()` 方法接收 `array $args`,返回字符串结果 + - 超时控制:通过 `AGENT_TOOL_TIMEOUT_SECONDS` 配置 + - 结果截断:超过 `AGENT_TOOL_RESULT_MAX_BYTES` 会自动截断并标记 +- 工具参数:使用 JSON Schema 格式定义,会自动传递给 OpenAI API ### 测试规范 -- 所有测试使用 PHPUnit(非 Pest) -- Feature 测试必须测试完整的 HTTP 请求流程 -- 测试中使用 Factory 创建模型数据 -- 修改代码后必须运行相关测试确保通过 +- **框架**:所有测试使用 PHPUnit(非 Pest) +- **Feature 测试**:必须测试完整的 HTTP 请求流程 + - 使用 `RefreshDatabase` trait 在测试间刷新数据库 + - 使用 `Queue::fake()` 模拟队列 + - 使用 `Redis::shouldReceive()` 模拟 Redis 发布 +- **测试数据**:使用 Factory 创建模型数据 +- **运行测试**:修改代码后必须运行相关测试确保通过 + ```bash + # 运行所有测试 + docker compose exec app php artisan test + + # 运行特定测试方法 + docker compose exec app php artisan test --filter=testAppendMessageWithDedupe + ``` ### 队列配置 -- 开发环境可使用同步队列:`.env` 中设置 `QUEUE_CONNECTION=sync` -- 生产环境使用 Redis 队列 + Horizon 监控 -- `AgentRunJob` 在队列中异步执行 +- **开发环境**:可使用同步队列,`.env` 中设置 `QUEUE_CONNECTION=sync` + - 优点:调试方便,错误堆栈清晰 + - 缺点:阻塞 HTTP 请求 +- **生产环境**:使用 Redis 队列 + Horizon 监控 + - `AgentRunJob` 和 `ToolRunJob` 在队列中异步执行 + - Horizon 仪表板:http://localhost:8000/horizon +- **Job 配置**:通过 `config/agent.php` 控制重试次数、退避时间、超时 ### 幂等性设计 -- 所有可能重复调用的操作都使用 `dedupe_key` -- `RunDispatcher` 通过 `trigger_message_id` 确保不会为同一 prompt 重复创建 run -- SSE 通过 `Last-Event-ID` / `after_seq` 支持断线续传 +- **dedupe_key 机制**:所有可能重复调用的操作都使用 `dedupe_key` + - 基于 UNIQUE 约束 `unique(session_id, dedupe_key)` 自动去重 + - 重复请求返回已有消息(相同 message_id 和 seq) +- **Run 幂等**:`RunDispatcher` 通过 `trigger_message_id` 的 dedupe_key 确保不会为同一 prompt 重复创建 run +- **SSE 续传**:通过 `Last-Event-ID` / `after_seq` 支持断线续传 +- **消息幂等模式**: + - `run:{runId}:agent:message` - Agent 最终回复 + - `run:{runId}:agent:delta:{index}` - 流式增量 + - `run:{runId}:status:{status}` - Run 状态 + - `run:{runId}:tool:call:{toolCallId}` - 工具调用 + - `run:{runId}:tool:result:{toolCallId}` - 工具结果 + +### 性能优化建议 +- **上下文加载**:`ContextBuilder` 只加载最近 20 条消息,可通过配置调整 +- **消息分页**:`listMessagesBySeq()` 使用 `after_seq` + `limit` 增量拉取 +- **索引优化**:`(session_id, seq)` 和 `(session_id, dedupe_key)` 复合索引加速查询 +- **Redis 发布**:消息追加后异步发布,使用 `DB::afterCommit()` 保证顺序 +- **SSE 优化**:backlog 限制 200 条,心跳 15 秒,gap 检测自动回补 ## 环境变量关键配置 @@ -294,10 +514,14 @@ DB_CONNECTION=pgsql DB_HOST=pgsql DB_PORT=5432 DB_DATABASE=ars_backend +DB_USERNAME=ars +DB_PASSWORD=secret # Redis +REDIS_CLIENT=phpredis REDIS_HOST=redis REDIS_PORT=6379 +CACHE_STORE=redis # 队列 QUEUE_CONNECTION=redis # 或 sync(开发用) @@ -308,6 +532,57 @@ AUTH_GUARD=api # CORS CORS_ALLOWED_ORIGINS=http://localhost:5173 + +# OpenAI 兼容 API 配置 +AGENT_OPENAI_BASE_URL=https://api.openai.com/v1 # 支持任何 OpenAI 兼容端点 +AGENT_OPENAI_API_KEY= # 为空时使用 DummyProvider +AGENT_OPENAI_ORGANIZATION= # 可选 +AGENT_OPENAI_PROJECT= # 可选 +AGENT_OPENAI_MODEL=gpt-4o-mini +AGENT_OPENAI_TEMPERATURE=0.7 +AGENT_OPENAI_TOP_P=1.0 +AGENT_OPENAI_INCLUDE_USAGE=false + +# Agent Provider HTTP 配置(重试机制) +AGENT_PROVIDER_TIMEOUT=30 # HTTP 请求超时(秒) +AGENT_PROVIDER_CONNECT_TIMEOUT=5 # 连接超时(秒) +AGENT_PROVIDER_RETRY_TIMES=1 # 建立流前重试次数(仅连接失败/429/5xx 且未产出事件时) +AGENT_PROVIDER_RETRY_BACKOFF_MS=500 # 重试退避毫秒(指数退避) + +# Agent Run Job 配置 +AGENT_RUN_JOB_TRIES=1 # 队列重试次数 +AGENT_RUN_JOB_BACKOFF=3 # 重试退避秒数 +AGENT_RUN_JOB_TIMEOUT=600 # Job 超时时间(秒) + +# 工具系统配置 +AGENT_TOOL_MAX_CALLS_PER_RUN=1 # 单个父 Run 允许的工具调用次数 +AGENT_TOOL_WAIT_TIMEOUT_MS=15000 # 等待 tool.result 的超时时间(毫秒) +AGENT_TOOL_WAIT_POLL_MS=200 # 等待工具结果轮询间隔(毫秒) +AGENT_TOOL_TIMEOUT_SECONDS=15 # 单个工具执行超时(秒,超出记为 TIMEOUT) +AGENT_TOOL_RESULT_MAX_BYTES=4096 # 工具结果最大保存字节数(截断后仍会写入) +AGENT_TOOL_CHOICE=auto # OpenAI tool_choice 选项(auto/required 等) +AGENT_TOOL_JOB_TRIES=1 # ToolRunJob 重试次数 +AGENT_TOOL_JOB_BACKOFF=3 # ToolRunJob 重试退避秒数 +AGENT_TOOL_JOB_TIMEOUT=120 # ToolRunJob 超时时间(秒) +``` + +## 初始化新环境 + +```bash +# 1. 复制环境配置 +cp .env.example .env + +# 2. 生成应用密钥 +docker compose exec app php artisan key:generate + +# 3. 生成 JWT 密钥 +docker compose exec app php artisan jwt:secret + +# 4. 运行迁移 +docker compose exec app php artisan migrate + +# 5. (可选)创建测试用户 +docker compose exec app php artisan db:seed ``` ## 相关文档 @@ -315,3 +590,90 @@ CORS_ALLOWED_ORIGINS=http://localhost:5173 - API 详细文档:`docs/ChatSession/chat-session-api.md` - OpenAPI 规范:`docs/ChatSession/chat-session-openapi.yaml` - 用户管理文档:`docs/User/user-api.md` + +## 常见问题排查 + +### 队列任务不执行 +- 检查 Horizon 是否运行:`docker compose ps horizon` +- 查看 Horizon 日志:`docker compose logs -f horizon` +- 检查 Redis 连接: + ```bash + docker compose exec app php artisan tinker + > Redis::ping() # 应返回 "PONG" + ``` +- 查看失败的任务:`docker compose exec app php artisan queue:failed` +- 重试失败任务:`docker compose exec app php artisan queue:retry all` + +### SSE 连接断开 +- 检查 Nginx/代理是否支持 SSE(需要禁用缓冲) +- 确认客户端正确处理 `Last-Event-ID` 续传 +- 查看 Redis 发布日志 +- 测试环境下 SSE 会自动回退到仅返回 backlog(无实时推送) + +### Agent Run 失败 +- 查看 `messages` 表中 `type=error` 的消息: + ```sql + SELECT message_id, session_id, content, payload + FROM messages + WHERE type = 'error' + ORDER BY created_at DESC + LIMIT 10; + ``` +- 检查 `payload.error_type`、`payload.provider`、`payload.retryable`、`payload.details` +- 检查 Provider 配置: + ```bash + docker compose exec app php artisan config:show agent + ``` +- 查看实时日志: + ```bash + docker compose exec app php artisan pail + # 或查看容器日志 + docker compose logs -f app + ``` +- 测试 OpenAI API Key: + ```bash + docker compose exec app php artisan tinker + > $provider = app(App\Services\Agent\AgentProviderInterface::class); + > $context = new App\Services\Agent\AgentContext('test', []); + > foreach ($provider->stream($context) as $event) { dump($event); } + ``` + +### 工具调用问题 +- 检查工具是否注册: + ```bash + docker compose exec app php artisan tinker + > $registry = app(App\Services\Tool\ToolRegistry::class); + > dump($registry->openAiToolsSpec()); + ``` +- 查看 tool.call 和 tool.result 消息: + ```sql + SELECT message_id, type, content, payload + FROM messages + WHERE session_id = 'xxx' AND type IN ('tool.call', 'tool.result') + ORDER BY seq; + ``` +- 检查工具调用上限:配置 `AGENT_TOOL_MAX_CALLS_PER_RUN` +- 工具执行超时:检查 `payload.status` 是否为 `TIMEOUT` + +### 数据库迁移问题 +- 确保 PostgreSQL 已启动:`docker compose ps pgsql` +- 查看迁移状态:`docker compose exec app php artisan migrate:status` +- 检查数据库连接: + ```bash + docker compose exec app php artisan tinker + > DB::connection()->getPdo() + ``` +- 查看数据库日志:`docker compose logs -f pgsql` + +### 消息 seq 不连续或重复 +- 检查是否有并发追加消息(应使用行锁 + 事务) +- 确认所有消息追加都通过 `ChatService::appendMessage()` +- 查看 unique 约束冲突日志 + +### 调试技巧 +- **实时日志**:`docker compose exec app php artisan pail` +- **Telescope**:访问 http://localhost:8000/telescope 查看请求、查询、队列 +- **Tinker REPL**:`docker compose exec app php artisan tinker` 交互式调试 +- **查看配置**:`php artisan config:show agent` +- **查看路由**:`php artisan route:list` +- **数据库查询日志**:在 `.env` 中设置 `DB_LOG_QUERIES=true` diff --git a/app/Services/OutputSink.php b/app/Services/OutputSink.php index 7ad856e..a23a064 100644 --- a/app/Services/OutputSink.php +++ b/app/Services/OutputSink.php @@ -30,13 +30,18 @@ class OutputSink } /** + * 追加 Agent 流式文本增量(仅用于 SSE 推送,不落库)。 + * + * message.delta 消息只用于实时流式推送,不需要持久化到数据库。 + * 最终的完整回复会通过 appendAgentMessage() 落库。 + * * @param array $meta */ - public function appendAgentDelta(string $sessionId, string $runId, string $content, int $deltaIndex, array $meta = []): Message + public function appendAgentDelta(string $sessionId, string $runId, string $content, int $deltaIndex, array $meta = []): void { - $dedupeKey = "run:{$runId}:agent:delta:{$deltaIndex}"; - - return $this->chatService->appendMessage([ + // 1. 创建临时 Message 对象(不保存到数据库) + $message = new Message([ + 'message_id' => (string) \Illuminate\Support\Str::uuid(), 'session_id' => $sessionId, 'role' => Message::ROLE_AGENT, 'type' => 'message.delta', @@ -45,8 +50,45 @@ class OutputSink 'run_id' => $runId, 'delta_index' => $deltaIndex, ]), - 'dedupe_key' => $dedupeKey, - ], $wasDeduped); + 'dedupe_key' => "run:{$runId}:agent:delta:{$deltaIndex}", + 'seq' => 0, // delta 消息不需要真实的 seq + 'created_at' => now(), + ]); + + // 2. 仅发布 Redis 事件,供 SSE 实时推送 + $this->publishDeltaMessage($message); + } + + /** + * 发布 delta 消息到 Redis(仅用于 SSE 推送)。 + * + * 此方法不保存消息到数据库,只发布事件供 SSE 客户端接收。 + */ + private function publishDeltaMessage(Message $message): void + { + $root = \Illuminate\Support\Facades\Redis::getFacadeRoot(); + $isMocked = $root instanceof \Mockery\MockInterface; + + // 如果 Redis 不可用(测试环境),直接返回 + if (!class_exists(\Redis::class) && !$isMocked) { + return; + } + + $channel = "session:{$message->session_id}:messages"; + + try { + \Illuminate\Support\Facades\Redis::publish( + $channel, + json_encode($message->toArray(), JSON_UNESCAPED_UNICODE | JSON_INVALID_UTF8_IGNORE) + ); + } catch (\Throwable $e) { + logger()->warning('Redis publish failed for delta message', [ + 'session_id' => $message->session_id, + 'run_id' => $message->payload['run_id'] ?? null, + 'delta_index' => $message->payload['delta_index'] ?? null, + 'error' => $e->getMessage(), + ]); + } } /** diff --git a/app/Services/RunLoop.php b/app/Services/RunLoop.php index 3dca373..26e1dc1 100644 --- a/app/Services/RunLoop.php +++ b/app/Services/RunLoop.php @@ -37,10 +37,17 @@ class RunLoop } /** - * 运行单次 Agent Run(按 run_id 幂等),负责取消检查、Provider 调用和结果落库。 + * 运行单次 Agent Run(按 run_id 幂等)。 + * + * 主流程: + * 1. 检查 run 是否已终止 + * 2. 进入主循环,持续调用 Provider 直到完成或失败 + * 3. 每轮迭代可能触发工具调用,工具完成后继续下一轮 + * 4. 没有工具调用时,写入最终回复并标记 DONE */ public function run(string $sessionId, string $runId): void { + // 1. 幂等性检查:避免重复执行已完成的 run if ($this->isRunTerminal($sessionId, $runId)) { return; } @@ -48,137 +55,346 @@ class RunLoop $providerName = $this->resolveProviderName(); $toolCallCount = 0; + // 2. 主循环:持续调用 Provider 直到完成或失败 while (true) { - if ($this->isCanceled($sessionId, $runId)) { - $this->appendCanceled($sessionId, $runId); + // 2.1 检查用户是否取消 + if ($this->checkAndHandleCancel($sessionId, $runId)) { return; } - $context = $this->contextBuilder->build($sessionId, $runId); - $providerOptions = [ - 'should_stop' => fn () => $this->isCanceled($sessionId, $runId), - ]; + // 2.2 执行一轮 Provider 调用 + $iterationResult = $this->executeProviderIteration( + $sessionId, + $runId, + $providerName, + $toolCallCount + ); - // 达到工具调用上限后强制关闭后续工具调用,避免再次触发 TOOL_CALL_LIMIT。 - if ($toolCallCount >= $this->maxToolCalls) { - $providerOptions['tool_choice'] = 'none'; - } - $logOptions = $providerOptions; - unset($logOptions['should_stop']); - logger('agent provider context', [ - 'sessionId' => $sessionId, - 'runId' => $runId, - 'provider' => $providerName, - 'context' => $context, - 'provider_options' => $logOptions, - ]); - $startedAt = microtime(true); - - logger('agent provider request', [ - 'sessionId' => $sessionId, - 'runId' => $runId, - 'provider' => $providerName, - 'iteration' => $toolCallCount, - ]); - - // 单轮 Agent 调用(可能触发工具调用,后续再进下一轮) - $streamState = $this->consumeProviderStream($sessionId, $runId, $context, $providerName, $startedAt, $providerOptions); - - if ($streamState['canceled'] || $streamState['failed']) { + // 2.3 处理失败或取消 + if ($iterationResult['should_exit']) { return; } - if (! empty($streamState['tool_calls'])) { - $toolCallCount += count($streamState['tool_calls']); - - if ($toolCallCount > $this->maxToolCalls) { - $this->appendProviderFailure( - $sessionId, - $runId, - 'TOOL_CALL_LIMIT', - 'Tool call limit reached for this run', - $providerName, - $this->latencyMs($startedAt), - [], - 'TOOL_CALL_LIMIT' - ); + // 2.4 如果有工具调用,处理工具执行流程 + if ($iterationResult['has_tool_calls']) { + $shouldExit = $this->handleToolCalls( + $sessionId, + $runId, + $providerName, + $iterationResult, + $toolCallCount + ); + if ($shouldExit) { return; } - // 工具调用:先调度子 Run,再等待 tool.result,随后继续下一轮 Provider 调用。 - $toolCalls = $this->dispatchToolRuns($sessionId, $runId, $streamState['tool_calls']); - - $waitState = $this->awaitToolResults($sessionId, $runId, $toolCalls, $providerName); - - if ($waitState['failed'] || $waitState['canceled']) { - return; - } - - // 工具结果已写回上下文,继续下一轮 Agent 调用。 + // 更新工具调用计数,继续下一轮 Provider 调用 + $toolCallCount = $iterationResult['updated_tool_count']; continue; } - $latencyMs = $this->latencyMs($startedAt); - - logger('agent provider response', [ - 'sessionId' => $sessionId, - 'runId' => $runId, - 'provider' => $providerName, - 'latency_ms' => $latencyMs, - ]); - - if ($this->isCanceled($sessionId, $runId)) { - $this->appendCanceled($sessionId, $runId); - return; - } - - if (! $streamState['received_event']) { - $this->appendProviderFailure( - $sessionId, - $runId, - 'EMPTY_STREAM', - 'Agent provider returned no events', - $providerName, - $latencyMs, - [], - 'EMPTY_STREAM' - ); - return; - } - - if ($streamState['done_reason'] === null) { - $this->appendProviderFailure( - $sessionId, - $runId, - 'STREAM_INCOMPLETE', - 'Agent provider stream ended unexpectedly', - $providerName, - $latencyMs, - [], - 'STREAM_INCOMPLETE' - ); - return; - } - - $this->outputSink->appendAgentMessage($sessionId, $runId, $streamState['reply'], [ - 'provider' => $providerName, - 'done_reason' => $streamState['done_reason'], - ], "run:{$runId}:agent:message"); - - if ($this->isCanceled($sessionId, $runId)) { - $this->appendCanceled($sessionId, $runId); - return; - } - - $this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', [ - 'dedupe_key' => "run:{$runId}:status:DONE", - ]); + // 2.5 没有工具调用,完成 run + $this->completeRun( + $sessionId, + $runId, + $providerName, + $iterationResult['stream_state'], + $iterationResult['latency_ms'] + ); return; } } + /** + * 检查并处理取消状态。 + * + * @return bool 是否已处理取消(true 表示已取消并写入状态) + */ + private function checkAndHandleCancel(string $sessionId, string $runId): bool + { + if ($this->isCanceled($sessionId, $runId)) { + $this->appendCanceled($sessionId, $runId); + return true; + } + + return false; + } + + /** + * 执行一轮 Provider 调用迭代。 + * + * 包括: + * - 构建上下文 + * - 准备 Provider 选项(工具调用限制、取消回调等) + * - 调用 Provider 流式接口 + * - 记录日志 + * + * @return array{ + * stream_state: array, + * has_tool_calls: bool, + * updated_tool_count: int, + * should_exit: bool, + * latency_ms: int + * } + */ + private function executeProviderIteration( + string $sessionId, + string $runId, + string $providerName, + int $currentToolCallCount + ): array { + // 1. 构建上下文和 Provider 选项 + $context = $this->contextBuilder->build($sessionId, $runId); + $providerOptions = $this->buildProviderOptions($sessionId, $runId, $currentToolCallCount); + + // 2. 记录调用日志 + $this->logProviderRequest($sessionId, $runId, $providerName, $context, $providerOptions, $currentToolCallCount); + + // 3. 调用 Provider 并消费事件流 + $startedAt = microtime(true); + $streamState = $this->consumeProviderStream( + $sessionId, + $runId, + $context, + $providerName, + $startedAt, + $providerOptions + ); + + $latencyMs = $this->latencyMs($startedAt); + + // 4. 检查流式调用是否失败或取消 + if ($streamState['canceled'] || $streamState['failed']) { + return [ + 'stream_state' => $streamState, + 'has_tool_calls' => false, + 'updated_tool_count' => $currentToolCallCount, + 'should_exit' => true, + 'latency_ms' => $latencyMs, + ]; + } + + // 5. 检查是否有工具调用 + $hasToolCalls = !empty($streamState['tool_calls']); + $updatedToolCount = $currentToolCallCount + count($streamState['tool_calls']); + + return [ + 'stream_state' => $streamState, + 'has_tool_calls' => $hasToolCalls, + 'updated_tool_count' => $updatedToolCount, + 'should_exit' => false, + 'latency_ms' => $latencyMs, + ]; + } + + /** + * 构建 Provider 调用选项。 + * + * 包括: + * - 取消检查回调 + * - 工具调用限制控制 + */ + private function buildProviderOptions(string $sessionId, string $runId, int $toolCallCount): array + { + $options = [ + 'should_stop' => fn () => $this->isCanceled($sessionId, $runId), + ]; + + // 达到工具调用上限后,强制禁用工具调用,避免再次触发 TOOL_CALL_LIMIT 错误 + if ($toolCallCount >= $this->maxToolCalls) { + $options['tool_choice'] = 'none'; + } + + return $options; + } + + /** + * 记录 Provider 请求日志。 + */ + private function logProviderRequest( + string $sessionId, + string $runId, + string $providerName, + AgentContext $context, + array $providerOptions, + int $iteration + ): void { + // 日志选项(移除不可序列化的回调) + $logOptions = $providerOptions; + unset($logOptions['should_stop']); + + logger('agent provider context', [ + 'sessionId' => $sessionId, + 'runId' => $runId, + 'provider' => $providerName, + 'context' => $context, + 'provider_options' => $logOptions, + ]); + + logger('agent provider request', [ + 'sessionId' => $sessionId, + 'runId' => $runId, + 'provider' => $providerName, + 'iteration' => $iteration, + ]); + } + + /** + * 处理工具调用流程。 + * + * 流程: + * 1. 检查工具调用数量是否超限 + * 2. 分发工具子 Run + * 3. 等待工具执行结果 + * + * @return bool 是否应该退出主循环(超限、失败或取消时返回 true) + */ + private function handleToolCalls( + string $sessionId, + string $runId, + string $providerName, + array $iterationResult, + int $originalToolCallCount + ): bool { + $streamState = $iterationResult['stream_state']; + $latencyMs = $iterationResult['latency_ms']; + $updatedToolCount = $iterationResult['updated_tool_count']; + + // 1. 检查工具调用数量是否超限 + if ($updatedToolCount > $this->maxToolCalls) { + $this->appendProviderFailure( + $sessionId, + $runId, + 'TOOL_CALL_LIMIT', + 'Tool call limit reached for this run', + $providerName, + $latencyMs, + [], + 'TOOL_CALL_LIMIT' + ); + + return true; // 退出主循环 + } + + // 2. 分发工具子 Run + $toolCalls = $this->dispatchToolRuns($sessionId, $runId, $streamState['tool_calls']); + + // 3. 等待所有工具执行完成 + $waitState = $this->awaitToolResults($sessionId, $runId, $toolCalls, $providerName); + + // 4. 检查等待过程中是否失败或取消 + if ($waitState['failed'] || $waitState['canceled']) { + return true; // 退出主循环 + } + + // 工具结果已写回上下文,继续下一轮 Agent 调用 + return false; + } + + /** + * 完成 Run 并写入最终状态。 + * + * 流程: + * 1. 验证流式响应的有效性 + * 2. 写入最终 agent.message + * 3. 再次检查取消状态 + * 4. 写入 run.status = DONE + */ + private function completeRun( + string $sessionId, + string $runId, + string $providerName, + array $streamState, + int $latencyMs + ): void { + // 1. 记录响应日志 + logger('agent provider response', [ + 'sessionId' => $sessionId, + 'runId' => $runId, + 'provider' => $providerName, + 'latency_ms' => $latencyMs, + ]); + + // 2. 再次检查取消状态(在写入最终消息前) + if ($this->checkAndHandleCancel($sessionId, $runId)) { + return; + } + + // 3. 验证流式响应的有效性 + if (!$this->validateStreamResponse($sessionId, $runId, $providerName, $streamState, $latencyMs)) { + return; + } + + // 4. 写入最终 agent.message + $this->outputSink->appendAgentMessage($sessionId, $runId, $streamState['reply'], [ + 'provider' => $providerName, + 'done_reason' => $streamState['done_reason'], + ], "run:{$runId}:agent:message"); + + // 5. 最后一次检查取消状态(在写入 DONE 前) + if ($this->checkAndHandleCancel($sessionId, $runId)) { + return; + } + + // 6. 写入 run.status = DONE + $this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', [ + 'dedupe_key' => "run:{$runId}:status:DONE", + ]); + } + + /** + * 验证流式响应的有效性。 + * + * 检查: + * - 是否收到任何事件(避免空流) + * - 流是否正常结束(有 done_reason) + * + * @return bool 是否有效(true 表示有效,false 表示无效并已写入错误) + */ + private function validateStreamResponse( + string $sessionId, + string $runId, + string $providerName, + array $streamState, + int $latencyMs + ): bool { + // 1. 检查是否收到任何事件 + if (!$streamState['received_event']) { + $this->appendProviderFailure( + $sessionId, + $runId, + 'EMPTY_STREAM', + 'Agent provider returned no events', + $providerName, + $latencyMs, + [], + 'EMPTY_STREAM' + ); + + return false; + } + + // 2. 检查流是否正常结束 + if ($streamState['done_reason'] === null) { + $this->appendProviderFailure( + $sessionId, + $runId, + 'STREAM_INCOMPLETE', + 'Agent provider stream ended unexpectedly', + $providerName, + $latencyMs, + [], + 'STREAM_INCOMPLETE' + ); + + return false; + } + + return true; + } + /** * 判断指定 run 是否已到终态,避免重复执行。 */ diff --git a/phpunit.xml b/phpunit.xml index 356a8fa..44bd6c4 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -31,5 +31,8 @@ + + +