From 8c4ad80dab1954fbc83101acb9d14bf97a868e1c Mon Sep 17 00:00:00 2001 From: ROOG Date: Fri, 19 Dec 2025 02:35:37 +0800 Subject: [PATCH] =?UTF-8?q?main:=20=E5=BC=95=E5=85=A5=20AgentProvider=20?= =?UTF-8?q?=E6=B5=81=E5=BC=8F=E4=BA=8B=E4=BB=B6=E4=B8=8E=20OpenAI=20?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=E9=80=82=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 增加流式事件流支持,Provider 输出 `message.delta` 等事件 - 实现 OpenAI 兼容适配器,包括 RequestBuilder、ApiClient 等模块 - 更新 Agent Run 逻辑,支持流式增量写入与模型完成状态管理 - 扩展配置项 `agent.openai.*`,支持模型、密钥等配置 - 优化文档,完善流式事件与消息类型说明 - 增加单元测试,覆盖 Provider 和 OpenAI 适配相关逻辑 - 更新环境变量与配置示例,支持新功能 --- .env.example | 22 ++ AGENTS.md | 8 +- README.md | 26 +- app/Providers/AppServiceProvider.php | 2 +- app/Providers/TelescopeServiceProvider.php | 4 +- app/Services/Agent/AgentContext.php | 17 ++ .../Agent/AgentPlatformAdapterInterface.php | 14 + app/Services/Agent/AgentProviderInterface.php | 4 +- app/Services/Agent/DummyAgentProvider.php | 14 +- app/Services/Agent/HttpAgentProvider.php | 97 +------ .../OpenAi/ChatCompletionsRequestBuilder.php | 109 ++++++++ app/Services/Agent/OpenAi/OpenAiApiClient.php | 87 +++++++ .../OpenAi/OpenAiChatCompletionsAdapter.php | 102 ++++++++ .../Agent/OpenAi/OpenAiEventNormalizer.php | 50 ++++ .../Agent/OpenAi/OpenAiStreamParser.php | 75 ++++++ app/Services/Agent/ProviderEvent.php | 60 +++++ app/Services/Agent/ProviderEventType.php | 13 + app/Services/ContextBuilder.php | 20 +- app/Services/OutputSink.php | 20 ++ app/Services/RunLoop.php | 241 ++++++++++++++++-- config/agent.php | 14 +- config/telescope.php | 2 +- docs/ChatSession/chat-session-api.md | 11 +- docs/ChatSession/chat-session-openapi.yaml | 9 + docs/agent-provider-update.md | 10 + tests/Feature/AgentRunTest.php | 27 +- tests/Unit/OpenAiAdapterTest.php | 114 +++++++++ 27 files changed, 1006 insertions(+), 166 deletions(-) create mode 100644 app/Services/Agent/AgentContext.php create mode 100644 app/Services/Agent/AgentPlatformAdapterInterface.php create mode 100644 app/Services/Agent/OpenAi/ChatCompletionsRequestBuilder.php create mode 100644 app/Services/Agent/OpenAi/OpenAiApiClient.php create mode 100644 app/Services/Agent/OpenAi/OpenAiChatCompletionsAdapter.php create mode 100644 app/Services/Agent/OpenAi/OpenAiEventNormalizer.php create mode 100644 app/Services/Agent/OpenAi/OpenAiStreamParser.php create mode 100644 app/Services/Agent/ProviderEvent.php create mode 100644 app/Services/Agent/ProviderEventType.php create mode 100644 docs/agent-provider-update.md create mode 100644 tests/Unit/OpenAiAdapterTest.php diff --git a/.env.example b/.env.example index beee24b..dd5d964 100644 --- a/.env.example +++ b/.env.example @@ -67,3 +67,25 @@ AWS_BUCKET= AWS_USE_PATH_STYLE_ENDPOINT=false VITE_APP_NAME="${APP_NAME}" + +# Agent Provider HTTP(为空则走 Dummy/OpenAI) +AGENT_PROVIDER_ENDPOINT=true +AGENT_PROVIDER_TIMEOUT=30 # HTTP 请求超时(秒) +AGENT_PROVIDER_CONNECT_TIMEOUT=5 # 连接超时(秒) +AGENT_PROVIDER_RETRY_TIMES=1 # 建立流前重试次数(仅连接失败/429/5xx 且未产出事件时) +AGENT_PROVIDER_RETRY_BACKOFF_MS=500 # 重试退避毫秒(指数退避) + +# OpenAI-compatible Chat Completions(填充后启用;否则回退 Dummy) +AGENT_OPENAI_BASE_URL=https://open.bigmodel.cn/api/paas/v4/ +AGENT_OPENAI_API_KEY= +AGENT_OPENAI_ORGANIZATION= # 可选 +AGENT_OPENAI_PROJECT= # 可选 +AGENT_OPENAI_MODEL=gpt-4o-mini +AGENT_OPENAI_TEMPERATURE=0.7 +AGENT_OPENAI_TOP_P=1.0 +AGENT_OPENAI_INCLUDE_USAGE=false + +# AgentRunJob 队列执行策略 +AGENT_RUN_JOB_TRIES=1 # 队列重试次数 +AGENT_RUN_JOB_BACKOFF=3 # 重试退避秒数 +AGENT_RUN_JOB_TIMEOUT=360 # Job 超时时间(秒) diff --git a/AGENTS.md b/AGENTS.md index dce5628..80c9ee3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -4,7 +4,7 @@ # Laravel Boost Guidelines The Laravel Boost guidelines are specifically curated by Laravel maintainers for this application. These guidelines should be followed closely to enhance the user's satisfaction building Laravel applications. - +注意我们使用中文作为自然语言。 ## Foundational Context This application is a Laravel application and its main Laravel ecosystems package & versions are below. You are an expert with them all. Ensure you abide by these specific packages & versions. @@ -219,7 +219,7 @@ protected function isAccessible(User $user, ?string $path = null): bool ### Running Tests - Run the minimal number of tests, using an appropriate filter, before finalizing. -- To run all tests: `docker compose run --rm app php artisan test`. -- To run all tests in a file: `docker compose run --rm app php artisan test tests/Feature/ExampleTest.php`. -- To filter on a particular test name: `docker compose run --rm app php artisan test --filter=testName` (recommended after making a change to a related file). +- To run all tests: `docker compose exec --rm app php artisan test`. +- To run all tests in a file: `docker compose exec --rm app php artisan test tests/Feature/ExampleTest.php`. +- To filter on a particular test name: `docker compose exec --rm app php artisan test --filter=testName` (recommended after making a change to a related file). diff --git a/README.md b/README.md index 0b6a4b7..083f2d7 100644 --- a/README.md +++ b/README.md @@ -39,15 +39,23 @@ docker compose exec app php artisan test --testsuite=Feature ``` ### Agent Provider 配置(可选) -`config/agent.php` 读取以下环境变量(默认值已内置): -- `AGENT_PROVIDER_ENDPOINT` -- `AGENT_PROVIDER_TIMEOUT`(默认 30) -- `AGENT_PROVIDER_CONNECT_TIMEOUT`(默认 5) -- `AGENT_PROVIDER_RETRY_TIMES`(默认 1) -- `AGENT_PROVIDER_RETRY_BACKOFF_MS`(默认 500) -- `AGENT_RUN_JOB_TRIES`(默认 1) -- `AGENT_RUN_JOB_BACKOFF`(默认 5) -- `AGENT_RUN_JOB_TIMEOUT`(默认 120) +`config/agent.php` 读取以下环境变量(默认值已内置),用于控制 HTTP 调用、OpenAI 直连以及队列重试: +- `AGENT_PROVIDER_ENDPOINT`:自定义 HTTP Provider 入口(为空时回退 Dummy 或 OpenAI 适配器) +- `AGENT_PROVIDER_TIMEOUT`(默认 30):Provider HTTP 请求超时时间(秒) +- `AGENT_PROVIDER_CONNECT_TIMEOUT`(默认 5):Provider 连接超时时间(秒) +- `AGENT_PROVIDER_RETRY_TIMES`(默认 1):建立流前的重试次数(仅连接失败/429/5xx 且尚未产出事件时重试) +- `AGENT_PROVIDER_RETRY_BACKOFF_MS`(默认 500):重试退避(毫秒,指数退避) +- `AGENT_OPENAI_BASE_URL`(默认 https://api.openai.com/v1):OpenAI-compatible Chat Completions 基础地址 +- `AGENT_OPENAI_API_KEY`:OpenAI API Key(为空则使用 DummyProvider) +- `AGENT_OPENAI_ORGANIZATION`:OpenAI Organization header,可选 +- `AGENT_OPENAI_PROJECT`:OpenAI Project header,可选 +- `AGENT_OPENAI_MODEL`(默认 gpt-4o-mini):模型名称 +- `AGENT_OPENAI_TEMPERATURE`(默认 0.7):采样温度 +- `AGENT_OPENAI_TOP_P`(默认 1.0):Top-p 采样 +- `AGENT_OPENAI_INCLUDE_USAGE`(默认 false):是否请求流式返回 usage 统计 +- `AGENT_RUN_JOB_TRIES`(默认 1):AgentRunJob 队列重试次数 +- `AGENT_RUN_JOB_BACKOFF`(默认 3):AgentRunJob 重试退避秒数 +- `AGENT_RUN_JOB_TIMEOUT`(默认 360):AgentRunJob 超时时间(秒) ## 🔑 API 能力一览(MVP-1.1 + Archive/GetMessage/SSE) - 会话:`POST /api/sessions`,`GET /api/sessions`(分页/状态/关键词),`GET /api/sessions/{id}`,`PATCH /api/sessions/{id}`(重命名/状态,CLOSED 不可重开),`POST /api/sessions/{id}/archive`(幂等归档→CLOSED)。 diff --git a/app/Providers/AppServiceProvider.php b/app/Providers/AppServiceProvider.php index 4c3fa5a..f39075c 100644 --- a/app/Providers/AppServiceProvider.php +++ b/app/Providers/AppServiceProvider.php @@ -12,7 +12,7 @@ class AppServiceProvider extends ServiceProvider public function register(): void { $this->app->bind(\App\Services\Agent\AgentProviderInterface::class, function () { - return new \App\Services\Agent\HttpAgentProvider(); + return $this->app->make(\App\Services\Agent\HttpAgentProvider::class); }); } diff --git a/app/Providers/TelescopeServiceProvider.php b/app/Providers/TelescopeServiceProvider.php index 22711b4..aaced42 100644 --- a/app/Providers/TelescopeServiceProvider.php +++ b/app/Providers/TelescopeServiceProvider.php @@ -47,9 +47,7 @@ class TelescopeServiceProvider extends TelescopeApplicationServiceProvider protected function gate(): void { Gate::define('viewTelescope', function ($user) { - return in_array($user->email, [ - // - ]); + return true; }); } } diff --git a/app/Services/Agent/AgentContext.php b/app/Services/Agent/AgentContext.php new file mode 100644 index 0000000..608028e --- /dev/null +++ b/app/Services/Agent/AgentContext.php @@ -0,0 +1,17 @@ + $messages + */ + public function __construct( + public string $runId, + public string $sessionId, + public string $systemPrompt, + public array $messages, + ) { + } +} diff --git a/app/Services/Agent/AgentPlatformAdapterInterface.php b/app/Services/Agent/AgentPlatformAdapterInterface.php new file mode 100644 index 0000000..e98b4e6 --- /dev/null +++ b/app/Services/Agent/AgentPlatformAdapterInterface.php @@ -0,0 +1,14 @@ + $options + * @return \Generator + */ + public function stream(AgentContext $context, array $options = []): \Generator; + + public function name(): string; +} diff --git a/app/Services/Agent/AgentProviderInterface.php b/app/Services/Agent/AgentProviderInterface.php index 3a3e5ef..d11b5b3 100644 --- a/app/Services/Agent/AgentProviderInterface.php +++ b/app/Services/Agent/AgentProviderInterface.php @@ -5,8 +5,8 @@ namespace App\Services\Agent; interface AgentProviderInterface { /** - * @param array $context * @param array $options + * @return \Generator */ - public function generate(array $context, array $options = []): string; + public function stream(AgentContext $context, array $options = []): \Generator; } diff --git a/app/Services/Agent/DummyAgentProvider.php b/app/Services/Agent/DummyAgentProvider.php index 094bd4a..5e8d0b5 100644 --- a/app/Services/Agent/DummyAgentProvider.php +++ b/app/Services/Agent/DummyAgentProvider.php @@ -5,12 +5,11 @@ namespace App\Services\Agent; class DummyAgentProvider implements AgentProviderInterface { /** - * @param array $context * @param array $options */ - public function generate(array $context, array $options = []): string + public function stream(AgentContext $context, array $options = []): \Generator { - $messages = $context['messages'] ?? []; + $messages = $context->messages; $lastUser = null; foreach (array_reverse($messages) as $msg) { if (($msg['role'] ?? '') === 'USER' && ($msg['type'] ?? '') === 'user.prompt') { @@ -21,6 +20,13 @@ class DummyAgentProvider implements AgentProviderInterface $summary = $lastUser ? mb_substr($lastUser, 0, 80) : 'no user prompt'; - return sprintf("Dummy-Agent: 我的当前回复的条目为 -> %s \n 我的上下文是: %s", $summary, json_encode($context['messages'])); + $content = sprintf( + "Dummy-Agent: 我的当前回复的条目为 -> %s \n 我的上下文是: %s", + $summary, + json_encode($context->messages) + ); + + yield ProviderEvent::messageDelta($content); + yield ProviderEvent::done('dummy'); } } diff --git a/app/Services/Agent/HttpAgentProvider.php b/app/Services/Agent/HttpAgentProvider.php index 7d32609..d179589 100644 --- a/app/Services/Agent/HttpAgentProvider.php +++ b/app/Services/Agent/HttpAgentProvider.php @@ -2,105 +2,28 @@ namespace App\Services\Agent; -use Illuminate\Http\Client\ConnectionException; -use Illuminate\Support\Facades\Http; +use App\Services\Agent\OpenAi\OpenAiChatCompletionsAdapter; class HttpAgentProvider implements AgentProviderInterface { - protected string $endpoint; - protected int $timeoutSeconds; - protected int $connectTimeoutSeconds; - protected int $retryTimes; - protected int $retryBackoffMs; + private readonly bool $enabled; - public function __construct(?string $endpoint = null) + public function __construct(private readonly OpenAiChatCompletionsAdapter $adapter) { - $this->endpoint = $endpoint ?? config('agent.provider.endpoint', ''); - $this->timeoutSeconds = (int) config('agent.provider.timeout_seconds', 30); - $this->connectTimeoutSeconds = (int) config('agent.provider.connect_timeout_seconds', 5); - $this->retryTimes = (int) config('agent.provider.retry_times', 1); - $this->retryBackoffMs = (int) config('agent.provider.retry_backoff_ms', 500); + $baseUrl = (string) config('agent.openai.base_url', ''); + $apiKey = (string) config('agent.openai.api_key', ''); + $this->enabled = trim($baseUrl) !== '' && trim($apiKey) !== ''; } /** - * @param array $context * @param array $options */ - public function generate(array $context, array $options = []): string + public function stream(AgentContext $context, array $options = []): \Generator { - if (empty($this->endpoint)) { - // placeholder to avoid accidental outbound calls when未配置 - return (new DummyAgentProvider())->generate($context, $options); + if (! $this->enabled) { + return (new DummyAgentProvider())->stream($context, $options); } - $payload = [ - 'context' => $context, - 'options' => $options, - ]; - - $attempts = $this->retryTimes + 1; - $lastException = null; - $lastResponseBody = null; - $lastStatus = null; - - for ($attempt = 1; $attempt <= $attempts; $attempt++) { - try { - $response = Http::connectTimeout($this->connectTimeoutSeconds) - ->timeout($this->timeoutSeconds) - ->post($this->endpoint, $payload); - - $lastStatus = $response->status(); - $lastResponseBody = $response->body(); - - if ($response->successful()) { - $data = $response->json(); - - return is_string($data) ? $data : ($data['content'] ?? ''); - } - - $retryable = $lastStatus === 429 || $lastStatus >= 500; - if ($retryable && $attempt < $attempts) { - usleep($this->retryBackoffMs * 1000); - continue; - } - - throw new ProviderException( - 'HTTP_ERROR', - 'Agent provider failed', - $retryable, - $lastStatus, - $lastResponseBody - ); - } catch (ConnectionException $exception) { - $lastException = $exception; - if ($attempt < $attempts) { - usleep($this->retryBackoffMs * 1000); - continue; - } - } catch (\Throwable $exception) { - $lastException = $exception; - break; - } - } - - $rawMessage = $lastException ? $lastException->getMessage() : $lastResponseBody; - - if ($lastException instanceof ConnectionException) { - throw new ProviderException( - 'CONNECTION_FAILED', - 'Agent provider connection failed', - true, - $lastStatus, - $rawMessage - ); - } - - throw new ProviderException( - 'UNKNOWN_ERROR', - 'Agent provider error', - false, - $lastStatus, - $rawMessage - ); + return $this->adapter->stream($context, $options); } } diff --git a/app/Services/Agent/OpenAi/ChatCompletionsRequestBuilder.php b/app/Services/Agent/OpenAi/ChatCompletionsRequestBuilder.php new file mode 100644 index 0000000..c1c2d78 --- /dev/null +++ b/app/Services/Agent/OpenAi/ChatCompletionsRequestBuilder.php @@ -0,0 +1,109 @@ +model = $this->model ?? (string) config('agent.openai.model', 'gpt-4o-mini'); + $this->temperature = $this->temperature ?? (float) config('agent.openai.temperature', 0.7); + $this->topP = $this->topP ?? (float) config('agent.openai.top_p', 1.0); + $this->includeUsage = $this->includeUsage ?? (bool) config('agent.openai.include_usage', false); + } + + /** + * Builds an OpenAI-compatible Chat Completions payload from AgentContext. + * + * @param array $options + * @return array + */ + public function build(AgentContext $context, array $options = []): array + { + $payload = [ + 'model' => (string) ($options['model'] ?? $this->model), + 'messages' => $this->buildMessages($context), + 'stream' => true, + ]; + + if (array_key_exists('temperature', $options)) { + $payload['temperature'] = (float) $options['temperature']; + } else { + $payload['temperature'] = (float) $this->temperature; + } + + if (array_key_exists('top_p', $options)) { + $payload['top_p'] = (float) $options['top_p']; + } else { + $payload['top_p'] = (float) $this->topP; + } + + if (array_key_exists('max_tokens', $options)) { + $payload['max_tokens'] = (int) $options['max_tokens']; + } + + if (array_key_exists('stop', $options)) { + $payload['stop'] = $options['stop']; + } + + if (array_key_exists('stream_options', $options)) { + $payload['stream_options'] = $options['stream_options']; + } elseif ($this->includeUsage) { + $payload['stream_options'] = ['include_usage' => true]; + } + + if (array_key_exists('response_format', $options)) { + $payload['response_format'] = $options['response_format']; + } + + return $payload; + } + + /** + * @return array + */ + private function buildMessages(AgentContext $context): array + { + $messages = []; + + if ($context->systemPrompt !== '') { + $messages[] = [ + 'role' => 'system', + 'content' => $context->systemPrompt, + ]; + } + + foreach ($context->messages as $message) { + $role = $this->mapRole((string) ($message['role'] ?? '')); + $content = $message['content'] ?? null; + + if (! $role || ! is_string($content) || $content === '') { + continue; + } + + $messages[] = [ + 'role' => $role, + 'content' => $content, + ]; + } + + return $messages; + } + + private function mapRole(string $role): ?string + { + return match ($role) { + Message::ROLE_USER => 'user', + Message::ROLE_AGENT => 'assistant', + Message::ROLE_SYSTEM => 'system', + default => null, + }; + } +} diff --git a/app/Services/Agent/OpenAi/OpenAiApiClient.php b/app/Services/Agent/OpenAi/OpenAiApiClient.php new file mode 100644 index 0000000..6e6f85f --- /dev/null +++ b/app/Services/Agent/OpenAi/OpenAiApiClient.php @@ -0,0 +1,87 @@ +baseUrl = $this->baseUrl ?? (string) config('agent.openai.base_url', ''); + $this->apiKey = $this->apiKey ?? (string) config('agent.openai.api_key', ''); + $this->organization = $this->organization ?? (string) config('agent.openai.organization', ''); + $this->project = $this->project ?? (string) config('agent.openai.project', ''); + $this->timeoutSeconds = $this->timeoutSeconds ?? (int) config('agent.provider.timeout_seconds', 30); + $this->connectTimeoutSeconds = $this->connectTimeoutSeconds ?? (int) config('agent.provider.connect_timeout_seconds', 5); + } + + /** + * Opens a streaming response for the Chat Completions endpoint. + * + * @param array $payload + */ + public function openStream(array $payload): ResponseInterface + { + $baseUrl = trim((string) $this->baseUrl); + $apiKey = trim((string) $this->apiKey); + + if ($baseUrl === '' || $apiKey === '') { + throw new ProviderException('CONFIG_MISSING', 'Agent provider configuration missing', false); + } + + $endpoint = rtrim($baseUrl, '/').'/chat/completions'; + $headers = [ + 'Authorization' => 'Bearer '.$apiKey, + 'Accept' => 'text/event-stream', + ]; + + if (trim((string) $this->organization) !== '') { + $headers['OpenAI-Organization'] = (string) $this->organization; + } + + if (trim((string) $this->project) !== '') { + $headers['OpenAI-Project'] = (string) $this->project; + } + + try { + $response = Http::withHeaders($headers) + ->connectTimeout($this->connectTimeoutSeconds) + ->timeout($this->timeoutSeconds) + ->withOptions(['stream' => true]) + ->post($endpoint, $payload); + } catch (ConnectionException $exception) { + throw new ProviderException( + 'CONNECTION_FAILED', + 'Agent provider connection failed', + true, + null, + $exception->getMessage() + ); + } + + $status = $response->status(); + + if ($status < 200 || $status >= 300) { + $retryable = $status === 429 || $status >= 500; + throw new ProviderException( + 'HTTP_ERROR', + 'Agent provider failed', + $retryable, + $status, + $response->body() + ); + } + + return $response->toPsrResponse(); + } +} diff --git a/app/Services/Agent/OpenAi/OpenAiChatCompletionsAdapter.php b/app/Services/Agent/OpenAi/OpenAiChatCompletionsAdapter.php new file mode 100644 index 0000000..3e58a9a --- /dev/null +++ b/app/Services/Agent/OpenAi/OpenAiChatCompletionsAdapter.php @@ -0,0 +1,102 @@ +retryTimes = $this->retryTimes ?? (int) config('agent.provider.retry_times', 1); + $this->retryBackoffMs = $this->retryBackoffMs ?? (int) config('agent.provider.retry_backoff_ms', 500); + } + + /** + * Streams OpenAI-compatible chat completions and yields normalized events. + * + * @param array $options + * @return \Generator + */ + public function stream(AgentContext $context, array $options = []): \Generator + { + $payload = $this->requestBuilder->build($context, $options); + $attempts = $this->retryTimes + 1; + $attempt = 1; + $backoffMs = $this->retryBackoffMs; + $hasYielded = false; + $shouldStop = $options['should_stop'] ?? null; + + while (true) { + try { + $response = $this->apiClient->openStream($payload); + $stream = $response->getBody(); + + try { + foreach ($this->streamParser->parse($stream, is_callable($shouldStop) ? $shouldStop : null) as $chunk) { + $events = $this->eventNormalizer->normalize($chunk); + foreach ($events as $event) { + $hasYielded = true; + yield $event; + if ($event->type === ProviderEventType::Done || $event->type === ProviderEventType::Error) { + return; + } + } + } + } finally { + $stream->close(); + } + + if (! $hasYielded) { + if (is_callable($shouldStop) && $shouldStop()) { + return; + } + yield ProviderEvent::error('EMPTY_STREAM', 'Agent provider returned empty stream'); + } + + return; + } catch (ProviderException $exception) { + if (! $hasYielded && is_callable($shouldStop) && $shouldStop()) { + return; + } + + if (! $hasYielded && $exception->retryable && $attempt < $attempts) { + usleep($backoffMs * 1000); + $attempt++; + $backoffMs *= 2; + continue; + } + + yield ProviderEvent::error($exception->errorCode, $exception->getMessage(), [ + 'retryable' => $exception->retryable, + 'http_status' => $exception->httpStatus, + 'raw_message' => $exception->rawMessage, + ]); + + return; + } catch (\Throwable $exception) { + if (! $hasYielded && is_callable($shouldStop) && $shouldStop()) { + return; + } + + yield ProviderEvent::error('UNKNOWN_ERROR', $exception->getMessage()); + return; + } + } + } + + public function name(): string + { + return 'openai.chat.completions'; + } +} diff --git a/app/Services/Agent/OpenAi/OpenAiEventNormalizer.php b/app/Services/Agent/OpenAi/OpenAiEventNormalizer.php new file mode 100644 index 0000000..8fe31be --- /dev/null +++ b/app/Services/Agent/OpenAi/OpenAiEventNormalizer.php @@ -0,0 +1,50 @@ + + */ + public function normalize(string $payload): array + { + if (trim($payload) === '[DONE]') { + return [ProviderEvent::done('done')]; + } + + $decoded = json_decode($payload, true); + + if (! is_array($decoded)) { + return [ProviderEvent::error('INVALID_JSON', 'Agent provider returned invalid JSON', [ + 'raw' => $payload, + ])]; + } + + $events = []; + $choices = $decoded['choices'] ?? []; + $firstChoice = is_array($choices) ? ($choices[0] ?? null) : null; + $delta = is_array($firstChoice) ? ($firstChoice['delta'] ?? null) : null; + + if (is_array($delta)) { + $content = $delta['content'] ?? null; + if (is_string($content) && $content !== '') { + $events[] = ProviderEvent::messageDelta($content); + } + } + + if (is_array($firstChoice) && array_key_exists('finish_reason', $firstChoice) && $firstChoice['finish_reason'] !== null) { + $events[] = ProviderEvent::done((string) $firstChoice['finish_reason']); + } + + if (isset($decoded['usage']) && is_array($decoded['usage'])) { + $events[] = ProviderEvent::usage($decoded['usage']); + } + + return $events; + } +} diff --git a/app/Services/Agent/OpenAi/OpenAiStreamParser.php b/app/Services/Agent/OpenAi/OpenAiStreamParser.php new file mode 100644 index 0000000..9f2df86 --- /dev/null +++ b/app/Services/Agent/OpenAi/OpenAiStreamParser.php @@ -0,0 +1,75 @@ + + */ + public function parse(StreamInterface $stream, ?callable $shouldStop = null): \Generator + { + $buffer = ''; + $eventData = ''; + + while (! $stream->eof()) { + if ($shouldStop && $shouldStop()) { + break; + } + + $chunk = $stream->read($this->chunkSize); + + if ($chunk === '') { + usleep(10000); + continue; + } + + $buffer .= $chunk; + + while (($pos = strpos($buffer, "\n")) !== false) { + $line = substr($buffer, 0, $pos); + $buffer = substr($buffer, $pos + 1); + $line = rtrim($line, "\r"); + + if ($line === '') { + if ($eventData !== '') { + yield $eventData; + $eventData = ''; + } + continue; + } + + if (str_starts_with($line, 'data:')) { + $data = ltrim(substr($line, 5)); + if ($eventData !== '') { + $eventData .= "\n"; + } + $eventData .= $data; + } + } + } + + if ($buffer !== '') { + $line = rtrim($buffer, "\r"); + if (str_starts_with($line, 'data:')) { + $data = ltrim(substr($line, 5)); + if ($eventData !== '') { + $eventData .= "\n"; + } + $eventData .= $data; + } + } + + if ($eventData !== '') { + yield $eventData; + } + } +} diff --git a/app/Services/Agent/ProviderEvent.php b/app/Services/Agent/ProviderEvent.php new file mode 100644 index 0000000..fdc249b --- /dev/null +++ b/app/Services/Agent/ProviderEvent.php @@ -0,0 +1,60 @@ + $payload + */ + public function __construct( + public ProviderEventType $type, + public array $payload = [], + ) { + } + + public static function messageDelta(string $text): self + { + return new self(ProviderEventType::MessageDelta, ['text' => $text]); + } + + /** + * @param array $payload + */ + public static function toolCall(array $payload): self + { + return new self(ProviderEventType::ToolCall, $payload); + } + + /** + * @param array $payload + */ + public static function toolDelta(array $payload): self + { + return new self(ProviderEventType::ToolDelta, $payload); + } + + /** + * @param array $usage + */ + public static function usage(array $usage): self + { + return new self(ProviderEventType::Usage, $usage); + } + + public static function done(?string $reason = null): self + { + return new self(ProviderEventType::Done, ['reason' => $reason]); + } + + /** + * @param array $meta + */ + public static function error(string $code, string $message, array $meta = []): self + { + return new self(ProviderEventType::Error, array_merge([ + 'code' => $code, + 'message' => $message, + ], $meta)); + } +} diff --git a/app/Services/Agent/ProviderEventType.php b/app/Services/Agent/ProviderEventType.php new file mode 100644 index 0000000..5c3e15f --- /dev/null +++ b/app/Services/Agent/ProviderEventType.php @@ -0,0 +1,13 @@ + - */ - public function build(string $sessionId, string $runId): array + public function build(string $sessionId, string $runId): AgentContext { $messages = $this->loadRecentMessages($sessionId); - return [ - 'run_id' => $runId, - 'session_id' => $sessionId, - 'system_prompt' => 'You are an agent inside ARS. Respond concisely in plain text.', - 'messages' => $messages->map(function (Message $message) { + return new AgentContext( + $runId, + $sessionId, + 'You are an agent inside ARS. Respond concisely in plain text.', + $messages->map(function (Message $message) { return [ 'message_id' => $message->message_id, 'role' => $message->role, @@ -30,8 +28,8 @@ class ContextBuilder 'content' => $message->content, 'seq' => $message->seq, ]; - })->values()->all(), - ]; + })->values()->all() + ); } private function loadRecentMessages(string $sessionId): Collection diff --git a/app/Services/OutputSink.php b/app/Services/OutputSink.php index aff0714..44294f4 100644 --- a/app/Services/OutputSink.php +++ b/app/Services/OutputSink.php @@ -27,6 +27,26 @@ class OutputSink ]); } + /** + * @param array $meta + */ + public function appendAgentDelta(string $sessionId, string $runId, string $content, int $deltaIndex, array $meta = []): Message + { + $dedupeKey = "run:{$runId}:agent:delta:{$deltaIndex}"; + + return $this->chatService->appendMessage([ + 'session_id' => $sessionId, + 'role' => Message::ROLE_AGENT, + 'type' => 'message.delta', + 'content' => $content, + 'payload' => array_merge($meta, [ + 'run_id' => $runId, + 'delta_index' => $deltaIndex, + ]), + 'dedupe_key' => $dedupeKey, + ]); + } + /** * @param array $meta */ diff --git a/app/Services/RunLoop.php b/app/Services/RunLoop.php index 9bac9db..4c93d30 100644 --- a/app/Services/RunLoop.php +++ b/app/Services/RunLoop.php @@ -3,10 +3,17 @@ namespace App\Services; use App\Services\Agent\AgentProviderInterface; +use App\Services\Agent\AgentContext; use App\Services\Agent\DummyAgentProvider; +use App\Services\Agent\ProviderEventType; use App\Services\Agent\ProviderException; use App\Models\Message; +/** + * Agent Run 主循环: + * - 构建上下文,消费 Provider 事件流(Streaming) + * - 处理取消、错误、增量输出、终态写回 + */ class RunLoop { private const TERMINAL_STATUSES = ['DONE', 'FAILED', 'CANCELED']; @@ -19,20 +26,23 @@ class RunLoop ) { } + /** + * 运行单次 Agent Run(按 run_id 幂等),负责取消检查、Provider 调用和结果落库。 + */ public function run(string $sessionId, string $runId): void { if ($this->isRunTerminal($sessionId, $runId)) { return; } - if ($this->cancelChecker->isCanceled($sessionId, $runId)) { + if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return; } $context = $this->contextBuilder->build($sessionId, $runId); - if ($this->cancelChecker->isCanceled($sessionId, $runId)) { + if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return; } @@ -46,27 +56,13 @@ class RunLoop 'provider' => $providerName, ]); - try { - $reply = $this->provider->generate($context); - } catch (ProviderException $exception) { - $latencyMs = (int) ((microtime(true) - $startedAt) * 1000); + $streamState = $this->consumeProviderStream($sessionId, $runId, $context, $providerName, $startedAt); - $this->outputSink->appendError($sessionId, $runId, $exception->errorCode, $exception->getMessage(), [ - 'retryable' => $exception->retryable, - 'http_status' => $exception->httpStatus, - 'provider' => $providerName, - 'latency_ms' => $latencyMs, - 'raw_message' => $exception->rawMessage, - ], "run:{$runId}:error:provider"); - - $this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', [ - 'error' => $exception->getMessage(), - 'dedupe_key' => "run:{$runId}:status:FAILED", - ]); - - throw $exception; + if ($streamState['canceled'] || $streamState['failed']) { + return; } - $latencyMs = (int) ((microtime(true) - $startedAt) * 1000); + + $latencyMs = $this->latencyMs($startedAt); logger('agent provider response', [ 'sessionId' => $sessionId, @@ -75,16 +71,45 @@ class RunLoop 'latency_ms' => $latencyMs, ]); - if ($this->cancelChecker->isCanceled($sessionId, $runId)) { + if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return; } - $this->outputSink->appendAgentMessage($sessionId, $runId, $reply, [ + if (! $streamState['received_event']) { + $this->appendProviderFailure( + $sessionId, + $runId, + 'EMPTY_STREAM', + 'Agent provider returned no events', + $providerName, + $latencyMs, + [], + 'EMPTY_STREAM' + ); + return; + } + + if ($streamState['done_reason'] === null) { + $this->appendProviderFailure( + $sessionId, + $runId, + 'STREAM_INCOMPLETE', + 'Agent provider stream ended unexpectedly', + $providerName, + $latencyMs, + [], + 'STREAM_INCOMPLETE' + ); + return; + } + + $this->outputSink->appendAgentMessage($sessionId, $runId, $streamState['reply'], [ 'provider' => $providerName, + 'done_reason' => $streamState['done_reason'], ], "run:{$runId}:agent:message"); - if ($this->cancelChecker->isCanceled($sessionId, $runId)) { + if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return; } @@ -94,6 +119,9 @@ class RunLoop ]); } + /** + * 判断指定 run 是否已到终态,避免重复执行。 + */ private function isRunTerminal(string $sessionId, string $runId): bool { $latestStatus = Message::query() @@ -108,6 +136,9 @@ class RunLoop return in_array($status, self::TERMINAL_STATUSES, true); } + /** + * 取消时写入终态 CANCELED(幂等)。 + */ private function appendCanceled(string $sessionId, string $runId): void { $this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [ @@ -115,6 +146,168 @@ class RunLoop ]); } + /** + * 消费 Provider Streaming 事件流: + * - message.delta:落增量并累计最终回复 + * - done:记录结束理由 + * - error/异常:写入 error + FAILED + * - cancel:即时中断并写 CANCELED + * @return array{reply: string, done_reason: ?string, received_event: bool, failed: bool, canceled: bool} + */ + private function consumeProviderStream( + string $sessionId, + string $runId, + AgentContext $context, + string $providerName, + float $startedAt + ): array { + $reply = ''; + $deltaIndex = 0; + $doneReason = null; + $receivedEvent = false; + + try { + foreach ($this->provider->stream($context, [ + 'should_stop' => fn () => $this->isCanceled($sessionId, $runId), + ]) as $event) { + $receivedEvent = true; + + if ($this->isCanceled($sessionId, $runId)) { + $this->appendCanceled($sessionId, $runId); + return $this->streamState($reply, $doneReason, $receivedEvent, false, true); + } + + // 文本增量:持续写 message.delta 并拼接最终回复 + if ($event->type === ProviderEventType::MessageDelta) { + $text = (string) ($event->payload['text'] ?? ''); + if ($text !== '') { + $reply .= $text; + $deltaIndex++; + $this->outputSink->appendAgentDelta($sessionId, $runId, $text, $deltaIndex, [ + 'provider' => $providerName, + ]); + } + continue; + } + + // 流结束 + if ($event->type === ProviderEventType::Done) { + $doneReason = $event->payload['reason'] ?? null; + break; + } + + // Provider 内部错误事件 + if ($event->type === ProviderEventType::Error) { + $latencyMs = $this->latencyMs($startedAt); + $code = (string) ($event->payload['code'] ?? 'PROVIDER_ERROR'); + $message = (string) ($event->payload['message'] ?? 'Agent provider error'); + + $this->appendProviderFailure( + $sessionId, + $runId, + $code, + $message, + $providerName, + $latencyMs, + [ + 'retryable' => $event->payload['retryable'] ?? null, + 'http_status' => $event->payload['http_status'] ?? null, + 'raw_message' => $event->payload['raw_message'] ?? null, + ] + ); + + return $this->streamState($reply, $doneReason, $receivedEvent, true, false); + } + } + } catch (ProviderException $exception) { + $latencyMs = $this->latencyMs($startedAt); + + $this->appendProviderFailure( + $sessionId, + $runId, + $exception->errorCode, + $exception->getMessage(), + $providerName, + $latencyMs, + [ + 'retryable' => $exception->retryable, + 'http_status' => $exception->httpStatus, + 'raw_message' => $exception->rawMessage, + ] + ); + + return $this->streamState($reply, $doneReason, $receivedEvent, true, false); + } + + return $this->streamState($reply, $doneReason, $receivedEvent, false, false); + } + + /** + * 统一落库 Provider 错误与 FAILED 终态。 + * + * @param array $meta + */ + private function appendProviderFailure( + string $sessionId, + string $runId, + string $code, + string $message, + string $providerName, + int $latencyMs, + array $meta = [], + ?string $statusError = null + ): void { + $this->outputSink->appendError($sessionId, $runId, $code, $message, array_merge($meta, [ + 'provider' => $providerName, + 'latency_ms' => $latencyMs, + ]), "run:{$runId}:error:provider"); + + $this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', [ + 'error' => $statusError ?? $message, + 'dedupe_key' => "run:{$runId}:status:FAILED", + ]); + } + + /** + * 封装流式状态返回,便于上层判断。 + * + * @return array{reply: string, done_reason: ?string, received_event: bool, failed: bool, canceled: bool} + */ + private function streamState( + string $reply, + ?string $doneReason, + bool $receivedEvent, + bool $failed, + bool $canceled + ): array { + return [ + 'reply' => $reply, + 'done_reason' => $doneReason, + 'received_event' => $receivedEvent, + 'failed' => $failed, + 'canceled' => $canceled, + ]; + } + + /** + * 计算耗时(毫秒)。 + */ + private function latencyMs(float $startedAt): int + { + return (int) ((microtime(true) - $startedAt) * 1000); + } + + /** + * 统一取消判断,便于 mock。 + */ + private function isCanceled(string $sessionId, string $runId): bool + { + return $this->cancelChecker->isCanceled($sessionId, $runId); + } + + /** + * 返回 Provider 名称(Dummy 使用短名)。 + */ private function resolveProviderName(): string { if ($this->provider instanceof DummyAgentProvider) { diff --git a/config/agent.php b/config/agent.php index 69a34b7..b6ce106 100644 --- a/config/agent.php +++ b/config/agent.php @@ -8,9 +8,19 @@ return [ 'retry_times' => env('AGENT_PROVIDER_RETRY_TIMES', 1), 'retry_backoff_ms' => env('AGENT_PROVIDER_RETRY_BACKOFF_MS', 500), ], + 'openai' => [ + 'base_url' => env('AGENT_OPENAI_BASE_URL', 'https://api.openai.com/v1'), + 'api_key' => env('AGENT_OPENAI_API_KEY', ''), + 'organization' => env('AGENT_OPENAI_ORGANIZATION', ''), + 'project' => env('AGENT_OPENAI_PROJECT', ''), + 'model' => env('AGENT_OPENAI_MODEL', 'gpt-4o-mini'), + 'temperature' => env('AGENT_OPENAI_TEMPERATURE', 0.7), + 'top_p' => env('AGENT_OPENAI_TOP_P', 1.0), + 'include_usage' => env('AGENT_OPENAI_INCLUDE_USAGE', false), + ], 'job' => [ 'tries' => env('AGENT_RUN_JOB_TRIES', 1), - 'backoff_seconds' => env('AGENT_RUN_JOB_BACKOFF', 5), - 'timeout_seconds' => env('AGENT_RUN_JOB_TIMEOUT', 120), + 'backoff_seconds' => env('AGENT_RUN_JOB_BACKOFF', 3), + 'timeout_seconds' => env('AGENT_RUN_JOB_TIMEOUT', 360), ], ]; diff --git a/config/telescope.php b/config/telescope.php index a31216b..28874fd 100644 --- a/config/telescope.php +++ b/config/telescope.php @@ -16,7 +16,7 @@ return [ | */ - 'enabled' => env('TELESCOPE_ENABLED', false), + 'enabled' => env('TELESCOPE_ENABLED', true), /* |-------------------------------------------------------------------------- diff --git a/docs/ChatSession/chat-session-api.md b/docs/ChatSession/chat-session-api.md index 3f24862..2fa5426 100644 --- a/docs/ChatSession/chat-session-api.md +++ b/docs/ChatSession/chat-session-api.md @@ -9,13 +9,15 @@ - 2025-02-14:MVP-1.1 增加会话列表、会话更新(重命名/状态变更),列表附带最后一条消息摘要。 - 2025-02-15:Agent Run MVP-0 —— RunDispatcher + AgentRunJob + DummyProvider;自动在 user.prompt 后触发一次 Run,落地 run.status / agent.message。 - 2025-12-18:Agent Run 可靠性增强 —— 并发幂等、终态去重、取消语义加强、Provider 超时/重试/错误归一,SSE gap 回补与心跳。 +- 2025-12-19:AgentProvider Streaming 接入 —— ProviderEvent 统一事件流,新增 message.delta 输出与 OpenAI-compatible 适配器。 -## 本次变更摘要(2025-12-18) +## 本次变更摘要(2025-12-19) - RunDispatcher 并发幂等:同 trigger_message_id 只产生一个 RUNNING,且仅新建时 dispatch。 - RunLoop/OutputSink 幂等:agent.message 与 run.status 采用 dedupe_key;重复执行不重复写。 - Cancel 强化:多检查点取消,确保不落 agent.message 且落 CANCELED 终态。 - Provider 可靠性:超时/重试/429/5xx,错误落库包含 retryable/http_status/provider/latency_ms。 - SSE 可靠性:gap 触发回补,心跳保活,publish 异常不影响主流程。 +- Streaming:AgentProvider 以事件流产出 message.delta,RunLoop 汇总后写入 agent.message。 ## 领域模型 - `ChatSession`:`session_id`(UUID)、`session_name`、`status`(`OPEN`/`LOCKED`/`CLOSED`)、`last_seq` @@ -51,7 +53,7 @@ | 字段 | 必填 | 类型 | 说明 | | --- | --- | --- | --- | | role | 是 | enum | `USER|AGENT|TOOL|SYSTEM` | - | type | 是 | string(≤64) | 如 `user.prompt`/`agent.message` 等 | + | type | 是 | string(≤64) | 如 `user.prompt`/`agent.message`/`message.delta` 等 | | content | 否 | string | 文本内容 | | payload | 否 | object | jsonb 结构 | | reply_to | 否 | uuid | 引用消息 | @@ -163,8 +165,8 @@ - 终态检测:若已 DONE/FAILED/CANCELED 则直接返回。 - Cancel 检查:存在 `run.cancel.request`(payload.run_id) 则写入 `run.status=CANCELED`,不产出 agent.message。 - ContextBuilder:提取最近 20 条 USER/AGENT 消息(type in user.prompt/agent.message),seq 升序提供给 Provider。 - - Provider 返回一次性文本回复(内置超时/重试/退避)。 - - OutputSink 依次写入:`agent.message`(payload 含 run_id, provider,dedupe_key=`run:{run_id}:agent:message`)、`run.status=DONE`(dedupe_key=`run:{run_id}:status:DONE`)。 + - Provider 以 Streaming 事件流产出文本增量(message.delta)。 + - OutputSink 持续写入 `message.delta`,最终写入 `agent.message`(payload 含 run_id, provider,dedupe_key=`run:{run_id}:agent:message`)与 `run.status=DONE`(dedupe_key=`run:{run_id}:status:DONE`)。 6. 异常:ProviderException 写入 `error` + `run.status=FAILED`(dedupe),error payload 包含 retryable/http_status/provider/latency_ms。 ### Run 相关消息类型(落库即真相源) @@ -172,6 +174,7 @@ | --- | --- | --- | --- | | run.status | SYSTEM | run_id, status(RUNNING/DONE/CANCELED/FAILED), trigger_message_id?, error? | Run 生命周期事件,CLOSED 状态下允许写入 | | agent.message | AGENT | run_id, provider | Provider 的一次性回复 | +| message.delta | AGENT | run_id, delta_index | Provider 的增量输出(Streaming) | | run.cancel.request | USER/SYSTEM | run_id | CancelChecker 依据该事件判断是否中止 | | error | SYSTEM | run_id, message, retryable?, http_status?, provider?, latency_ms?, raw_message? | 任务异常时落库 | diff --git a/docs/ChatSession/chat-session-openapi.yaml b/docs/ChatSession/chat-session-openapi.yaml index be44c98..d49e044 100644 --- a/docs/ChatSession/chat-session-openapi.yaml +++ b/docs/ChatSession/chat-session-openapi.yaml @@ -433,6 +433,7 @@ components: oneOf: - $ref: '#/components/schemas/RunStatusPayload' - $ref: '#/components/schemas/AgentMessagePayload' + - $ref: '#/components/schemas/MessageDeltaPayload' - $ref: '#/components/schemas/RunCancelPayload' - $ref: '#/components/schemas/RunErrorPayload' - type: object @@ -506,6 +507,14 @@ components: format: uuid provider: type: string + MessageDeltaPayload: + type: object + properties: + run_id: + type: string + format: uuid + delta_index: + type: integer RunCancelPayload: type: object properties: diff --git a/docs/agent-provider-update.md b/docs/agent-provider-update.md new file mode 100644 index 0000000..251156c --- /dev/null +++ b/docs/agent-provider-update.md @@ -0,0 +1,10 @@ +# AgentProvider Streaming 变更摘要(2025-12-19) + +- 引入 ProviderEvent 事件流与 AgentContext,Provider 以 Generator 输出 message.delta/done/error +- 新增 OpenAI-compatible 适配器:RequestBuilder、ApiClient、StreamParser、EventNormalizer +- RunLoop/OutputSink 支持增量落库:message.delta + agent.message + run.status +- 新增配置项 `agent.openai.*`,用于 base_url/api_key/model 等 +- 文档已补充 message.delta 的 payload 与消息类型说明 + +## 验证 +- `docker compose exec app php artisan test` diff --git a/tests/Feature/AgentRunTest.php b/tests/Feature/AgentRunTest.php index 1360e56..4b46b79 100644 --- a/tests/Feature/AgentRunTest.php +++ b/tests/Feature/AgentRunTest.php @@ -4,8 +4,9 @@ namespace Tests\Feature; use App\Jobs\AgentRunJob; use App\Models\Message; +use App\Services\Agent\AgentContext; use App\Services\Agent\AgentProviderInterface; -use App\Services\Agent\ProviderException; +use App\Services\Agent\ProviderEvent; use App\Services\CancelChecker; use App\Services\ChatService; use App\Services\RunDispatcher; @@ -204,14 +205,17 @@ class AgentRunTest extends TestCase $this->assertTrue($messages->contains(fn ($m) => $m->type === 'run.status' && ($m->payload['status'] ?? null) === 'CANCELED')); } - public function test_provider_exception_writes_error_and_failed_status(): void + public function test_provider_error_event_writes_error_and_failed_status(): void { Queue::fake(); $this->app->bind(AgentProviderInterface::class, function () { return new class implements AgentProviderInterface { - public function generate(array $context, array $options = []): string + public function stream(AgentContext $context, array $options = []): \Generator { - throw new ProviderException('HTTP_ERROR', 'provider failed', true, 500, 'boom'); + yield ProviderEvent::error('HTTP_ERROR', 'provider failed', [ + 'retryable' => true, + 'http_status' => 500, + ]); } }; }); @@ -229,16 +233,11 @@ class AgentRunTest extends TestCase $runId = $dispatcher->dispatchForPrompt($session->session_id, $prompt->message_id); - try { - (new AgentRunJob($session->session_id, $runId))->handle( - app(RunLoop::class), - app(OutputSink::class), - app(CancelChecker::class) - ); - $this->fail('Expected provider exception'); - } catch (ProviderException $exception) { - $this->assertSame('HTTP_ERROR', $exception->errorCode); - } + (new AgentRunJob($session->session_id, $runId))->handle( + app(RunLoop::class), + app(OutputSink::class), + app(CancelChecker::class) + ); $messages = Message::query() ->where('session_id', $session->session_id) diff --git a/tests/Unit/OpenAiAdapterTest.php b/tests/Unit/OpenAiAdapterTest.php new file mode 100644 index 0000000..cbc0888 --- /dev/null +++ b/tests/Unit/OpenAiAdapterTest.php @@ -0,0 +1,114 @@ +set('agent.openai.model', 'test-model'); + config()->set('agent.openai.temperature', 0.2); + config()->set('agent.openai.top_p', 0.9); + config()->set('agent.openai.include_usage', true); + + $context = new AgentContext('run-1', 'session-1', 'system prompt', [ + [ + 'message_id' => 'm1', + 'role' => Message::ROLE_USER, + 'type' => 'user.prompt', + 'content' => 'hello', + 'seq' => 1, + ], + [ + 'message_id' => 'm2', + 'role' => Message::ROLE_AGENT, + 'type' => 'agent.message', + 'content' => 'hi', + 'seq' => 2, + ], + ]); + + $payload = (new ChatCompletionsRequestBuilder())->build($context); + + $this->assertSame('test-model', $payload['model']); + $this->assertTrue($payload['stream']); + $this->assertSame(0.2, $payload['temperature']); + $this->assertSame(0.9, $payload['top_p']); + $this->assertSame(['include_usage' => true], $payload['stream_options']); + $this->assertSame([ + ['role' => 'system', 'content' => 'system prompt'], + ['role' => 'user', 'content' => 'hello'], + ['role' => 'assistant', 'content' => 'hi'], + ], $payload['messages']); + } + + public function test_event_normalizer_maps_delta_and_done(): void + { + $normalizer = new OpenAiEventNormalizer(); + + $delta = json_encode([ + 'choices' => [ + [ + 'delta' => ['content' => 'Hi'], + 'finish_reason' => null, + ], + ], + ]); + + $events = $normalizer->normalize($delta); + $this->assertCount(1, $events); + $this->assertSame(ProviderEventType::MessageDelta, $events[0]->type); + $this->assertSame('Hi', $events[0]->payload['text']); + + $done = json_encode([ + 'choices' => [ + [ + 'delta' => [], + 'finish_reason' => 'stop', + ], + ], + ]); + + $events = $normalizer->normalize($done); + $this->assertCount(1, $events); + $this->assertSame(ProviderEventType::Done, $events[0]->type); + $this->assertSame('stop', $events[0]->payload['reason']); + } + + public function test_event_normalizer_handles_invalid_json(): void + { + $normalizer = new OpenAiEventNormalizer(); + $events = $normalizer->normalize('{invalid'); + + $this->assertCount(1, $events); + $this->assertSame(ProviderEventType::Error, $events[0]->type); + $this->assertSame('INVALID_JSON', $events[0]->payload['code']); + } + + public function test_event_normalizer_handles_done_marker(): void + { + $normalizer = new OpenAiEventNormalizer(); + $events = $normalizer->normalize('[DONE]'); + + $this->assertCount(1, $events); + $this->assertSame(ProviderEventType::Done, $events[0]->type); + } + + public function test_stream_parser_splits_sse_events(): void + { + $stream = Utils::streamFor("data: {\"id\":1}\n\ndata: [DONE]\n\n"); + $parser = new OpenAiStreamParser(5); + $chunks = iterator_to_array($parser->parse($stream)); + + $this->assertSame(['{"id":1}', '[DONE]'], $chunks); + } +}