main: 引入 AgentProvider 流式事件与 OpenAI 兼容适配

- 增加流式事件流支持,Provider 输出 `message.delta` 等事件
- 实现 OpenAI 兼容适配器,包括 RequestBuilder、ApiClient 等模块
- 更新 Agent Run 逻辑,支持流式增量写入与模型完成状态管理
- 扩展配置项 `agent.openai.*`,支持模型、密钥等配置
- 优化文档,完善流式事件与消息类型说明
- 增加单元测试,覆盖 Provider 和 OpenAI 适配相关逻辑
- 更新环境变量与配置示例,支持新功能
This commit is contained in:
2025-12-19 02:35:37 +08:00
parent 56523c1f0a
commit 8c4ad80dab
27 changed files with 1006 additions and 166 deletions

View File

@@ -0,0 +1,17 @@
<?php
namespace App\Services\Agent;
final class AgentContext
{
/**
* @param array<int, array{message_id: string, role: string, type: string, content: ?string, seq: int}> $messages
*/
public function __construct(
public string $runId,
public string $sessionId,
public string $systemPrompt,
public array $messages,
) {
}
}

View File

@@ -0,0 +1,14 @@
<?php
namespace App\Services\Agent;
interface AgentPlatformAdapterInterface
{
/**
* @param array<string, mixed> $options
* @return \Generator<int, \App\Services\Agent\ProviderEvent>
*/
public function stream(AgentContext $context, array $options = []): \Generator;
public function name(): string;
}

View File

@@ -5,8 +5,8 @@ namespace App\Services\Agent;
interface AgentProviderInterface
{
/**
* @param array<string, mixed> $context
* @param array<string, mixed> $options
* @return \Generator<int, \App\Services\Agent\ProviderEvent>
*/
public function generate(array $context, array $options = []): string;
public function stream(AgentContext $context, array $options = []): \Generator;
}

View File

@@ -5,12 +5,11 @@ namespace App\Services\Agent;
class DummyAgentProvider implements AgentProviderInterface
{
/**
* @param array<string, mixed> $context
* @param array<string, mixed> $options
*/
public function generate(array $context, array $options = []): string
public function stream(AgentContext $context, array $options = []): \Generator
{
$messages = $context['messages'] ?? [];
$messages = $context->messages;
$lastUser = null;
foreach (array_reverse($messages) as $msg) {
if (($msg['role'] ?? '') === 'USER' && ($msg['type'] ?? '') === 'user.prompt') {
@@ -21,6 +20,13 @@ class DummyAgentProvider implements AgentProviderInterface
$summary = $lastUser ? mb_substr($lastUser, 0, 80) : 'no user prompt';
return sprintf("Dummy-Agent: 我的当前回复的条目为 -> %s \n 我的上下文是: %s", $summary, json_encode($context['messages']));
$content = sprintf(
"Dummy-Agent: 我的当前回复的条目为 -> %s \n 我的上下文是: %s",
$summary,
json_encode($context->messages)
);
yield ProviderEvent::messageDelta($content);
yield ProviderEvent::done('dummy');
}
}

View File

@@ -2,105 +2,28 @@
namespace App\Services\Agent;
use Illuminate\Http\Client\ConnectionException;
use Illuminate\Support\Facades\Http;
use App\Services\Agent\OpenAi\OpenAiChatCompletionsAdapter;
class HttpAgentProvider implements AgentProviderInterface
{
protected string $endpoint;
protected int $timeoutSeconds;
protected int $connectTimeoutSeconds;
protected int $retryTimes;
protected int $retryBackoffMs;
private readonly bool $enabled;
public function __construct(?string $endpoint = null)
public function __construct(private readonly OpenAiChatCompletionsAdapter $adapter)
{
$this->endpoint = $endpoint ?? config('agent.provider.endpoint', '');
$this->timeoutSeconds = (int) config('agent.provider.timeout_seconds', 30);
$this->connectTimeoutSeconds = (int) config('agent.provider.connect_timeout_seconds', 5);
$this->retryTimes = (int) config('agent.provider.retry_times', 1);
$this->retryBackoffMs = (int) config('agent.provider.retry_backoff_ms', 500);
$baseUrl = (string) config('agent.openai.base_url', '');
$apiKey = (string) config('agent.openai.api_key', '');
$this->enabled = trim($baseUrl) !== '' && trim($apiKey) !== '';
}
/**
* @param array<string, mixed> $context
* @param array<string, mixed> $options
*/
public function generate(array $context, array $options = []): string
public function stream(AgentContext $context, array $options = []): \Generator
{
if (empty($this->endpoint)) {
// placeholder to avoid accidental outbound calls when未配置
return (new DummyAgentProvider())->generate($context, $options);
if (! $this->enabled) {
return (new DummyAgentProvider())->stream($context, $options);
}
$payload = [
'context' => $context,
'options' => $options,
];
$attempts = $this->retryTimes + 1;
$lastException = null;
$lastResponseBody = null;
$lastStatus = null;
for ($attempt = 1; $attempt <= $attempts; $attempt++) {
try {
$response = Http::connectTimeout($this->connectTimeoutSeconds)
->timeout($this->timeoutSeconds)
->post($this->endpoint, $payload);
$lastStatus = $response->status();
$lastResponseBody = $response->body();
if ($response->successful()) {
$data = $response->json();
return is_string($data) ? $data : ($data['content'] ?? '');
}
$retryable = $lastStatus === 429 || $lastStatus >= 500;
if ($retryable && $attempt < $attempts) {
usleep($this->retryBackoffMs * 1000);
continue;
}
throw new ProviderException(
'HTTP_ERROR',
'Agent provider failed',
$retryable,
$lastStatus,
$lastResponseBody
);
} catch (ConnectionException $exception) {
$lastException = $exception;
if ($attempt < $attempts) {
usleep($this->retryBackoffMs * 1000);
continue;
}
} catch (\Throwable $exception) {
$lastException = $exception;
break;
}
}
$rawMessage = $lastException ? $lastException->getMessage() : $lastResponseBody;
if ($lastException instanceof ConnectionException) {
throw new ProviderException(
'CONNECTION_FAILED',
'Agent provider connection failed',
true,
$lastStatus,
$rawMessage
);
}
throw new ProviderException(
'UNKNOWN_ERROR',
'Agent provider error',
false,
$lastStatus,
$rawMessage
);
return $this->adapter->stream($context, $options);
}
}

View File

@@ -0,0 +1,109 @@
<?php
namespace App\Services\Agent\OpenAi;
use App\Models\Message;
use App\Services\Agent\AgentContext;
class ChatCompletionsRequestBuilder
{
public function __construct(
private ?string $model = null,
private ?float $temperature = null,
private ?float $topP = null,
private ?bool $includeUsage = null,
) {
$this->model = $this->model ?? (string) config('agent.openai.model', 'gpt-4o-mini');
$this->temperature = $this->temperature ?? (float) config('agent.openai.temperature', 0.7);
$this->topP = $this->topP ?? (float) config('agent.openai.top_p', 1.0);
$this->includeUsage = $this->includeUsage ?? (bool) config('agent.openai.include_usage', false);
}
/**
* Builds an OpenAI-compatible Chat Completions payload from AgentContext.
*
* @param array<string, mixed> $options
* @return array<string, mixed>
*/
public function build(AgentContext $context, array $options = []): array
{
$payload = [
'model' => (string) ($options['model'] ?? $this->model),
'messages' => $this->buildMessages($context),
'stream' => true,
];
if (array_key_exists('temperature', $options)) {
$payload['temperature'] = (float) $options['temperature'];
} else {
$payload['temperature'] = (float) $this->temperature;
}
if (array_key_exists('top_p', $options)) {
$payload['top_p'] = (float) $options['top_p'];
} else {
$payload['top_p'] = (float) $this->topP;
}
if (array_key_exists('max_tokens', $options)) {
$payload['max_tokens'] = (int) $options['max_tokens'];
}
if (array_key_exists('stop', $options)) {
$payload['stop'] = $options['stop'];
}
if (array_key_exists('stream_options', $options)) {
$payload['stream_options'] = $options['stream_options'];
} elseif ($this->includeUsage) {
$payload['stream_options'] = ['include_usage' => true];
}
if (array_key_exists('response_format', $options)) {
$payload['response_format'] = $options['response_format'];
}
return $payload;
}
/**
* @return array<int, array{role: string, content: string}>
*/
private function buildMessages(AgentContext $context): array
{
$messages = [];
if ($context->systemPrompt !== '') {
$messages[] = [
'role' => 'system',
'content' => $context->systemPrompt,
];
}
foreach ($context->messages as $message) {
$role = $this->mapRole((string) ($message['role'] ?? ''));
$content = $message['content'] ?? null;
if (! $role || ! is_string($content) || $content === '') {
continue;
}
$messages[] = [
'role' => $role,
'content' => $content,
];
}
return $messages;
}
private function mapRole(string $role): ?string
{
return match ($role) {
Message::ROLE_USER => 'user',
Message::ROLE_AGENT => 'assistant',
Message::ROLE_SYSTEM => 'system',
default => null,
};
}
}

View File

@@ -0,0 +1,87 @@
<?php
namespace App\Services\Agent\OpenAi;
use App\Services\Agent\ProviderException;
use Illuminate\Http\Client\ConnectionException;
use Illuminate\Support\Facades\Http;
use Psr\Http\Message\ResponseInterface;
class OpenAiApiClient
{
public function __construct(
private ?string $baseUrl = null,
private ?string $apiKey = null,
private ?string $organization = null,
private ?string $project = null,
private ?int $timeoutSeconds = null,
private ?int $connectTimeoutSeconds = null,
) {
$this->baseUrl = $this->baseUrl ?? (string) config('agent.openai.base_url', '');
$this->apiKey = $this->apiKey ?? (string) config('agent.openai.api_key', '');
$this->organization = $this->organization ?? (string) config('agent.openai.organization', '');
$this->project = $this->project ?? (string) config('agent.openai.project', '');
$this->timeoutSeconds = $this->timeoutSeconds ?? (int) config('agent.provider.timeout_seconds', 30);
$this->connectTimeoutSeconds = $this->connectTimeoutSeconds ?? (int) config('agent.provider.connect_timeout_seconds', 5);
}
/**
* Opens a streaming response for the Chat Completions endpoint.
*
* @param array<string, mixed> $payload
*/
public function openStream(array $payload): ResponseInterface
{
$baseUrl = trim((string) $this->baseUrl);
$apiKey = trim((string) $this->apiKey);
if ($baseUrl === '' || $apiKey === '') {
throw new ProviderException('CONFIG_MISSING', 'Agent provider configuration missing', false);
}
$endpoint = rtrim($baseUrl, '/').'/chat/completions';
$headers = [
'Authorization' => 'Bearer '.$apiKey,
'Accept' => 'text/event-stream',
];
if (trim((string) $this->organization) !== '') {
$headers['OpenAI-Organization'] = (string) $this->organization;
}
if (trim((string) $this->project) !== '') {
$headers['OpenAI-Project'] = (string) $this->project;
}
try {
$response = Http::withHeaders($headers)
->connectTimeout($this->connectTimeoutSeconds)
->timeout($this->timeoutSeconds)
->withOptions(['stream' => true])
->post($endpoint, $payload);
} catch (ConnectionException $exception) {
throw new ProviderException(
'CONNECTION_FAILED',
'Agent provider connection failed',
true,
null,
$exception->getMessage()
);
}
$status = $response->status();
if ($status < 200 || $status >= 300) {
$retryable = $status === 429 || $status >= 500;
throw new ProviderException(
'HTTP_ERROR',
'Agent provider failed',
$retryable,
$status,
$response->body()
);
}
return $response->toPsrResponse();
}
}

View File

@@ -0,0 +1,102 @@
<?php
namespace App\Services\Agent\OpenAi;
use App\Services\Agent\AgentContext;
use App\Services\Agent\AgentPlatformAdapterInterface;
use App\Services\Agent\ProviderEvent;
use App\Services\Agent\ProviderEventType;
use App\Services\Agent\ProviderException;
class OpenAiChatCompletionsAdapter implements AgentPlatformAdapterInterface
{
public function __construct(
private readonly ChatCompletionsRequestBuilder $requestBuilder,
private readonly OpenAiApiClient $apiClient,
private readonly OpenAiStreamParser $streamParser,
private readonly OpenAiEventNormalizer $eventNormalizer,
private ?int $retryTimes = null,
private ?int $retryBackoffMs = null,
) {
$this->retryTimes = $this->retryTimes ?? (int) config('agent.provider.retry_times', 1);
$this->retryBackoffMs = $this->retryBackoffMs ?? (int) config('agent.provider.retry_backoff_ms', 500);
}
/**
* Streams OpenAI-compatible chat completions and yields normalized events.
*
* @param array<string, mixed> $options
* @return \Generator<int, ProviderEvent>
*/
public function stream(AgentContext $context, array $options = []): \Generator
{
$payload = $this->requestBuilder->build($context, $options);
$attempts = $this->retryTimes + 1;
$attempt = 1;
$backoffMs = $this->retryBackoffMs;
$hasYielded = false;
$shouldStop = $options['should_stop'] ?? null;
while (true) {
try {
$response = $this->apiClient->openStream($payload);
$stream = $response->getBody();
try {
foreach ($this->streamParser->parse($stream, is_callable($shouldStop) ? $shouldStop : null) as $chunk) {
$events = $this->eventNormalizer->normalize($chunk);
foreach ($events as $event) {
$hasYielded = true;
yield $event;
if ($event->type === ProviderEventType::Done || $event->type === ProviderEventType::Error) {
return;
}
}
}
} finally {
$stream->close();
}
if (! $hasYielded) {
if (is_callable($shouldStop) && $shouldStop()) {
return;
}
yield ProviderEvent::error('EMPTY_STREAM', 'Agent provider returned empty stream');
}
return;
} catch (ProviderException $exception) {
if (! $hasYielded && is_callable($shouldStop) && $shouldStop()) {
return;
}
if (! $hasYielded && $exception->retryable && $attempt < $attempts) {
usleep($backoffMs * 1000);
$attempt++;
$backoffMs *= 2;
continue;
}
yield ProviderEvent::error($exception->errorCode, $exception->getMessage(), [
'retryable' => $exception->retryable,
'http_status' => $exception->httpStatus,
'raw_message' => $exception->rawMessage,
]);
return;
} catch (\Throwable $exception) {
if (! $hasYielded && is_callable($shouldStop) && $shouldStop()) {
return;
}
yield ProviderEvent::error('UNKNOWN_ERROR', $exception->getMessage());
return;
}
}
}
public function name(): string
{
return 'openai.chat.completions';
}
}

View File

@@ -0,0 +1,50 @@
<?php
namespace App\Services\Agent\OpenAi;
use App\Services\Agent\ProviderEvent;
class OpenAiEventNormalizer
{
/**
* Normalizes a single SSE payload into ProviderEvent values.
*
* @return array<int, ProviderEvent>
*/
public function normalize(string $payload): array
{
if (trim($payload) === '[DONE]') {
return [ProviderEvent::done('done')];
}
$decoded = json_decode($payload, true);
if (! is_array($decoded)) {
return [ProviderEvent::error('INVALID_JSON', 'Agent provider returned invalid JSON', [
'raw' => $payload,
])];
}
$events = [];
$choices = $decoded['choices'] ?? [];
$firstChoice = is_array($choices) ? ($choices[0] ?? null) : null;
$delta = is_array($firstChoice) ? ($firstChoice['delta'] ?? null) : null;
if (is_array($delta)) {
$content = $delta['content'] ?? null;
if (is_string($content) && $content !== '') {
$events[] = ProviderEvent::messageDelta($content);
}
}
if (is_array($firstChoice) && array_key_exists('finish_reason', $firstChoice) && $firstChoice['finish_reason'] !== null) {
$events[] = ProviderEvent::done((string) $firstChoice['finish_reason']);
}
if (isset($decoded['usage']) && is_array($decoded['usage'])) {
$events[] = ProviderEvent::usage($decoded['usage']);
}
return $events;
}
}

View File

@@ -0,0 +1,75 @@
<?php
namespace App\Services\Agent\OpenAi;
use Psr\Http\Message\StreamInterface;
class OpenAiStreamParser
{
public function __construct(private readonly int $chunkSize = 1024)
{
}
/**
* Parses SSE data lines into payload strings.
*
* @return \Generator<int, string>
*/
public function parse(StreamInterface $stream, ?callable $shouldStop = null): \Generator
{
$buffer = '';
$eventData = '';
while (! $stream->eof()) {
if ($shouldStop && $shouldStop()) {
break;
}
$chunk = $stream->read($this->chunkSize);
if ($chunk === '') {
usleep(10000);
continue;
}
$buffer .= $chunk;
while (($pos = strpos($buffer, "\n")) !== false) {
$line = substr($buffer, 0, $pos);
$buffer = substr($buffer, $pos + 1);
$line = rtrim($line, "\r");
if ($line === '') {
if ($eventData !== '') {
yield $eventData;
$eventData = '';
}
continue;
}
if (str_starts_with($line, 'data:')) {
$data = ltrim(substr($line, 5));
if ($eventData !== '') {
$eventData .= "\n";
}
$eventData .= $data;
}
}
}
if ($buffer !== '') {
$line = rtrim($buffer, "\r");
if (str_starts_with($line, 'data:')) {
$data = ltrim(substr($line, 5));
if ($eventData !== '') {
$eventData .= "\n";
}
$eventData .= $data;
}
}
if ($eventData !== '') {
yield $eventData;
}
}
}

View File

@@ -0,0 +1,60 @@
<?php
namespace App\Services\Agent;
final class ProviderEvent
{
/**
* @param array<string, mixed> $payload
*/
public function __construct(
public ProviderEventType $type,
public array $payload = [],
) {
}
public static function messageDelta(string $text): self
{
return new self(ProviderEventType::MessageDelta, ['text' => $text]);
}
/**
* @param array<string, mixed> $payload
*/
public static function toolCall(array $payload): self
{
return new self(ProviderEventType::ToolCall, $payload);
}
/**
* @param array<string, mixed> $payload
*/
public static function toolDelta(array $payload): self
{
return new self(ProviderEventType::ToolDelta, $payload);
}
/**
* @param array<string, mixed> $usage
*/
public static function usage(array $usage): self
{
return new self(ProviderEventType::Usage, $usage);
}
public static function done(?string $reason = null): self
{
return new self(ProviderEventType::Done, ['reason' => $reason]);
}
/**
* @param array<string, mixed> $meta
*/
public static function error(string $code, string $message, array $meta = []): self
{
return new self(ProviderEventType::Error, array_merge([
'code' => $code,
'message' => $message,
], $meta));
}
}

View File

@@ -0,0 +1,13 @@
<?php
namespace App\Services\Agent;
enum ProviderEventType: string
{
case MessageDelta = 'message.delta';
case ToolCall = 'tool.call';
case ToolDelta = 'tool.delta';
case Usage = 'usage';
case Done = 'done';
case Error = 'error';
}

View File

@@ -3,6 +3,7 @@
namespace App\Services;
use App\Models\Message;
use App\Services\Agent\AgentContext;
use Illuminate\Support\Collection;
class ContextBuilder
@@ -11,18 +12,15 @@ class ContextBuilder
{
}
/**
* @return array<string, mixed>
*/
public function build(string $sessionId, string $runId): array
public function build(string $sessionId, string $runId): AgentContext
{
$messages = $this->loadRecentMessages($sessionId);
return [
'run_id' => $runId,
'session_id' => $sessionId,
'system_prompt' => 'You are an agent inside ARS. Respond concisely in plain text.',
'messages' => $messages->map(function (Message $message) {
return new AgentContext(
$runId,
$sessionId,
'You are an agent inside ARS. Respond concisely in plain text.',
$messages->map(function (Message $message) {
return [
'message_id' => $message->message_id,
'role' => $message->role,
@@ -30,8 +28,8 @@ class ContextBuilder
'content' => $message->content,
'seq' => $message->seq,
];
})->values()->all(),
];
})->values()->all()
);
}
private function loadRecentMessages(string $sessionId): Collection

View File

@@ -27,6 +27,26 @@ class OutputSink
]);
}
/**
* @param array<string, mixed> $meta
*/
public function appendAgentDelta(string $sessionId, string $runId, string $content, int $deltaIndex, array $meta = []): Message
{
$dedupeKey = "run:{$runId}:agent:delta:{$deltaIndex}";
return $this->chatService->appendMessage([
'session_id' => $sessionId,
'role' => Message::ROLE_AGENT,
'type' => 'message.delta',
'content' => $content,
'payload' => array_merge($meta, [
'run_id' => $runId,
'delta_index' => $deltaIndex,
]),
'dedupe_key' => $dedupeKey,
]);
}
/**
* @param array<string, mixed> $meta
*/

View File

@@ -3,10 +3,17 @@
namespace App\Services;
use App\Services\Agent\AgentProviderInterface;
use App\Services\Agent\AgentContext;
use App\Services\Agent\DummyAgentProvider;
use App\Services\Agent\ProviderEventType;
use App\Services\Agent\ProviderException;
use App\Models\Message;
/**
* Agent Run 主循环:
* - 构建上下文,消费 Provider 事件流Streaming
* - 处理取消、错误、增量输出、终态写回
*/
class RunLoop
{
private const TERMINAL_STATUSES = ['DONE', 'FAILED', 'CANCELED'];
@@ -19,20 +26,23 @@ class RunLoop
) {
}
/**
* 运行单次 Agent Run run_id 幂等负责取消检查、Provider 调用和结果落库。
*/
public function run(string $sessionId, string $runId): void
{
if ($this->isRunTerminal($sessionId, $runId)) {
return;
}
if ($this->cancelChecker->isCanceled($sessionId, $runId)) {
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
$context = $this->contextBuilder->build($sessionId, $runId);
if ($this->cancelChecker->isCanceled($sessionId, $runId)) {
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
@@ -46,27 +56,13 @@ class RunLoop
'provider' => $providerName,
]);
try {
$reply = $this->provider->generate($context);
} catch (ProviderException $exception) {
$latencyMs = (int) ((microtime(true) - $startedAt) * 1000);
$streamState = $this->consumeProviderStream($sessionId, $runId, $context, $providerName, $startedAt);
$this->outputSink->appendError($sessionId, $runId, $exception->errorCode, $exception->getMessage(), [
'retryable' => $exception->retryable,
'http_status' => $exception->httpStatus,
'provider' => $providerName,
'latency_ms' => $latencyMs,
'raw_message' => $exception->rawMessage,
], "run:{$runId}:error:provider");
$this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', [
'error' => $exception->getMessage(),
'dedupe_key' => "run:{$runId}:status:FAILED",
]);
throw $exception;
if ($streamState['canceled'] || $streamState['failed']) {
return;
}
$latencyMs = (int) ((microtime(true) - $startedAt) * 1000);
$latencyMs = $this->latencyMs($startedAt);
logger('agent provider response', [
'sessionId' => $sessionId,
@@ -75,16 +71,45 @@ class RunLoop
'latency_ms' => $latencyMs,
]);
if ($this->cancelChecker->isCanceled($sessionId, $runId)) {
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
$this->outputSink->appendAgentMessage($sessionId, $runId, $reply, [
if (! $streamState['received_event']) {
$this->appendProviderFailure(
$sessionId,
$runId,
'EMPTY_STREAM',
'Agent provider returned no events',
$providerName,
$latencyMs,
[],
'EMPTY_STREAM'
);
return;
}
if ($streamState['done_reason'] === null) {
$this->appendProviderFailure(
$sessionId,
$runId,
'STREAM_INCOMPLETE',
'Agent provider stream ended unexpectedly',
$providerName,
$latencyMs,
[],
'STREAM_INCOMPLETE'
);
return;
}
$this->outputSink->appendAgentMessage($sessionId, $runId, $streamState['reply'], [
'provider' => $providerName,
'done_reason' => $streamState['done_reason'],
], "run:{$runId}:agent:message");
if ($this->cancelChecker->isCanceled($sessionId, $runId)) {
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
@@ -94,6 +119,9 @@ class RunLoop
]);
}
/**
* 判断指定 run 是否已到终态,避免重复执行。
*/
private function isRunTerminal(string $sessionId, string $runId): bool
{
$latestStatus = Message::query()
@@ -108,6 +136,9 @@ class RunLoop
return in_array($status, self::TERMINAL_STATUSES, true);
}
/**
* 取消时写入终态 CANCELED幂等
*/
private function appendCanceled(string $sessionId, string $runId): void
{
$this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [
@@ -115,6 +146,168 @@ class RunLoop
]);
}
/**
* 消费 Provider Streaming 事件流:
* - message.delta落增量并累计最终回复
* - done记录结束理由
* - error/异常:写入 error + FAILED
* - cancel即时中断并写 CANCELED
* @return array{reply: string, done_reason: ?string, received_event: bool, failed: bool, canceled: bool}
*/
private function consumeProviderStream(
string $sessionId,
string $runId,
AgentContext $context,
string $providerName,
float $startedAt
): array {
$reply = '';
$deltaIndex = 0;
$doneReason = null;
$receivedEvent = false;
try {
foreach ($this->provider->stream($context, [
'should_stop' => fn () => $this->isCanceled($sessionId, $runId),
]) as $event) {
$receivedEvent = true;
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return $this->streamState($reply, $doneReason, $receivedEvent, false, true);
}
// 文本增量:持续写 message.delta 并拼接最终回复
if ($event->type === ProviderEventType::MessageDelta) {
$text = (string) ($event->payload['text'] ?? '');
if ($text !== '') {
$reply .= $text;
$deltaIndex++;
$this->outputSink->appendAgentDelta($sessionId, $runId, $text, $deltaIndex, [
'provider' => $providerName,
]);
}
continue;
}
// 流结束
if ($event->type === ProviderEventType::Done) {
$doneReason = $event->payload['reason'] ?? null;
break;
}
// Provider 内部错误事件
if ($event->type === ProviderEventType::Error) {
$latencyMs = $this->latencyMs($startedAt);
$code = (string) ($event->payload['code'] ?? 'PROVIDER_ERROR');
$message = (string) ($event->payload['message'] ?? 'Agent provider error');
$this->appendProviderFailure(
$sessionId,
$runId,
$code,
$message,
$providerName,
$latencyMs,
[
'retryable' => $event->payload['retryable'] ?? null,
'http_status' => $event->payload['http_status'] ?? null,
'raw_message' => $event->payload['raw_message'] ?? null,
]
);
return $this->streamState($reply, $doneReason, $receivedEvent, true, false);
}
}
} catch (ProviderException $exception) {
$latencyMs = $this->latencyMs($startedAt);
$this->appendProviderFailure(
$sessionId,
$runId,
$exception->errorCode,
$exception->getMessage(),
$providerName,
$latencyMs,
[
'retryable' => $exception->retryable,
'http_status' => $exception->httpStatus,
'raw_message' => $exception->rawMessage,
]
);
return $this->streamState($reply, $doneReason, $receivedEvent, true, false);
}
return $this->streamState($reply, $doneReason, $receivedEvent, false, false);
}
/**
* 统一落库 Provider 错误与 FAILED 终态。
*
* @param array<string, mixed> $meta
*/
private function appendProviderFailure(
string $sessionId,
string $runId,
string $code,
string $message,
string $providerName,
int $latencyMs,
array $meta = [],
?string $statusError = null
): void {
$this->outputSink->appendError($sessionId, $runId, $code, $message, array_merge($meta, [
'provider' => $providerName,
'latency_ms' => $latencyMs,
]), "run:{$runId}:error:provider");
$this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', [
'error' => $statusError ?? $message,
'dedupe_key' => "run:{$runId}:status:FAILED",
]);
}
/**
* 封装流式状态返回,便于上层判断。
*
* @return array{reply: string, done_reason: ?string, received_event: bool, failed: bool, canceled: bool}
*/
private function streamState(
string $reply,
?string $doneReason,
bool $receivedEvent,
bool $failed,
bool $canceled
): array {
return [
'reply' => $reply,
'done_reason' => $doneReason,
'received_event' => $receivedEvent,
'failed' => $failed,
'canceled' => $canceled,
];
}
/**
* 计算耗时(毫秒)。
*/
private function latencyMs(float $startedAt): int
{
return (int) ((microtime(true) - $startedAt) * 1000);
}
/**
* 统一取消判断,便于 mock。
*/
private function isCanceled(string $sessionId, string $runId): bool
{
return $this->cancelChecker->isCanceled($sessionId, $runId);
}
/**
* 返回 Provider 名称Dummy 使用短名)。
*/
private function resolveProviderName(): string
{
if ($this->provider instanceof DummyAgentProvider) {