isRunTerminal($sessionId, $runId)) { return; } if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return; } $context = $this->contextBuilder->build($sessionId, $runId); if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return; } $providerName = $this->resolveProviderName(); $startedAt = microtime(true); logger('agent provider request', [ 'sessionId' => $sessionId, 'runId' => $runId, 'provider' => $providerName, ]); $streamState = $this->consumeProviderStream($sessionId, $runId, $context, $providerName, $startedAt); if ($streamState['canceled'] || $streamState['failed']) { return; } $latencyMs = $this->latencyMs($startedAt); logger('agent provider response', [ 'sessionId' => $sessionId, 'runId' => $runId, 'provider' => $providerName, 'latency_ms' => $latencyMs, ]); if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return; } if (! $streamState['received_event']) { $this->appendProviderFailure( $sessionId, $runId, 'EMPTY_STREAM', 'Agent provider returned no events', $providerName, $latencyMs, [], 'EMPTY_STREAM' ); return; } if ($streamState['done_reason'] === null) { $this->appendProviderFailure( $sessionId, $runId, 'STREAM_INCOMPLETE', 'Agent provider stream ended unexpectedly', $providerName, $latencyMs, [], 'STREAM_INCOMPLETE' ); return; } $this->outputSink->appendAgentMessage($sessionId, $runId, $streamState['reply'], [ 'provider' => $providerName, 'done_reason' => $streamState['done_reason'], ], "run:{$runId}:agent:message"); if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return; } $this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', [ 'dedupe_key' => "run:{$runId}:status:DONE", ]); } /** * 判断指定 run 是否已到终态,避免重复执行。 */ private function isRunTerminal(string $sessionId, string $runId): bool { $latestStatus = Message::query() ->where('session_id', $sessionId) ->where('type', 'run.status') ->whereRaw("payload->>'run_id' = ?", [$runId]) ->orderByDesc('seq') ->first(); $status = $latestStatus ? ($latestStatus->payload['status'] ?? null) : null; return in_array($status, self::TERMINAL_STATUSES, true); } /** * 取消时写入终态 CANCELED(幂等)。 */ private function appendCanceled(string $sessionId, string $runId): void { $this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [ 'dedupe_key' => "run:{$runId}:status:CANCELED", ]); } /** * 消费 Provider Streaming 事件流: * - message.delta:落增量并累计最终回复 * - done:记录结束理由 * - error/异常:写入 error + FAILED * - cancel:即时中断并写 CANCELED * @return array{reply: string, done_reason: ?string, received_event: bool, failed: bool, canceled: bool} */ private function consumeProviderStream( string $sessionId, string $runId, AgentContext $context, string $providerName, float $startedAt ): array { $reply = ''; $deltaIndex = 0; $doneReason = null; $receivedEvent = false; try { foreach ($this->provider->stream($context, [ 'should_stop' => fn () => $this->isCanceled($sessionId, $runId), ]) as $event) { $receivedEvent = true; if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return $this->streamState($reply, $doneReason, $receivedEvent, false, true); } // 文本增量:持续写 message.delta 并拼接最终回复 if ($event->type === ProviderEventType::MessageDelta) { $text = (string) ($event->payload['text'] ?? ''); if ($text !== '') { $reply .= $text; $deltaIndex++; $this->outputSink->appendAgentDelta($sessionId, $runId, $text, $deltaIndex, [ 'provider' => $providerName, ]); } continue; } // 流结束 if ($event->type === ProviderEventType::Done) { $doneReason = $event->payload['reason'] ?? null; break; } // Provider 内部错误事件 if ($event->type === ProviderEventType::Error) { $latencyMs = $this->latencyMs($startedAt); $code = (string) ($event->payload['code'] ?? 'PROVIDER_ERROR'); $message = (string) ($event->payload['message'] ?? 'Agent provider error'); $this->appendProviderFailure( $sessionId, $runId, $code, $message, $providerName, $latencyMs, [ 'retryable' => $event->payload['retryable'] ?? null, 'http_status' => $event->payload['http_status'] ?? null, 'raw_message' => $event->payload['raw_message'] ?? null, ] ); return $this->streamState($reply, $doneReason, $receivedEvent, true, false); } } } catch (ProviderException $exception) { $latencyMs = $this->latencyMs($startedAt); $this->appendProviderFailure( $sessionId, $runId, $exception->errorCode, $exception->getMessage(), $providerName, $latencyMs, [ 'retryable' => $exception->retryable, 'http_status' => $exception->httpStatus, 'raw_message' => $exception->rawMessage, ] ); return $this->streamState($reply, $doneReason, $receivedEvent, true, false); } return $this->streamState($reply, $doneReason, $receivedEvent, false, false); } /** * 统一落库 Provider 错误与 FAILED 终态。 * * @param array $meta */ private function appendProviderFailure( string $sessionId, string $runId, string $code, string $message, string $providerName, int $latencyMs, array $meta = [], ?string $statusError = null ): void { $this->outputSink->appendError($sessionId, $runId, $code, $message, array_merge($meta, [ 'provider' => $providerName, 'latency_ms' => $latencyMs, ]), "run:{$runId}:error:provider"); $this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', [ 'error' => $statusError ?? $message, 'dedupe_key' => "run:{$runId}:status:FAILED", ]); } /** * 封装流式状态返回,便于上层判断。 * * @return array{reply: string, done_reason: ?string, received_event: bool, failed: bool, canceled: bool} */ private function streamState( string $reply, ?string $doneReason, bool $receivedEvent, bool $failed, bool $canceled ): array { return [ 'reply' => $reply, 'done_reason' => $doneReason, 'received_event' => $receivedEvent, 'failed' => $failed, 'canceled' => $canceled, ]; } /** * 计算耗时(毫秒)。 */ private function latencyMs(float $startedAt): int { return (int) ((microtime(true) - $startedAt) * 1000); } /** * 统一取消判断,便于 mock。 */ private function isCanceled(string $sessionId, string $runId): bool { return $this->cancelChecker->isCanceled($sessionId, $runId); } /** * 返回 Provider 名称(Dummy 使用短名)。 */ private function resolveProviderName(): string { if ($this->provider instanceof DummyAgentProvider) { return 'dummy'; } return str_replace("\0", '', get_class($this->provider)); } }