maxToolCalls = (int) config('agent.tools.max_calls_per_run', 10); $this->toolWaitTimeoutMs = (int) config('agent.tools.wait_timeout_ms', 15000); $this->toolPollIntervalMs = (int) config('agent.tools.wait_poll_interval_ms', 200); } /** * 运行单次 Agent Run(按 run_id 幂等)。 * * 主流程: * 1. 检查 run 是否已终止 * 2. 进入主循环,持续调用 Provider 直到完成或失败 * 3. 每轮迭代可能触发工具调用,工具完成后继续下一轮 * 4. 没有工具调用时,写入最终回复并标记 DONE */ public function run(string $sessionId, string $runId): void { // 1. 幂等性检查:避免重复执行已完成的 run if ($this->isRunTerminal($sessionId, $runId)) { return; } $providerName = $this->resolveProviderName(); $toolCallCount = 0; // 2. 主循环:持续调用 Provider 直到完成或失败 while (true) { // 2.1 检查用户是否取消 if ($this->checkAndHandleCancel($sessionId, $runId)) { return; } // 2.2 执行一轮 Provider 调用 $iterationResult = $this->executeProviderIteration( $sessionId, $runId, $providerName, $toolCallCount ); // 2.3 处理失败或取消 if ($iterationResult['should_exit']) { return; } // 2.4 如果有工具调用,处理工具执行流程 if ($iterationResult['has_tool_calls']) { $shouldExit = $this->handleToolCalls( $sessionId, $runId, $providerName, $iterationResult, $toolCallCount ); if ($shouldExit) { return; } // 更新工具调用计数,继续下一轮 Provider 调用 $toolCallCount = $iterationResult['updated_tool_count']; continue; } // 2.5 没有工具调用,完成 run $this->completeRun( $sessionId, $runId, $providerName, $iterationResult['stream_state'], $iterationResult['latency_ms'] ); return; } } /** * 检查并处理取消状态。 * * @return bool 是否已处理取消(true 表示已取消并写入状态) */ private function checkAndHandleCancel(string $sessionId, string $runId): bool { if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); return true; } return false; } /** * 执行一轮 Provider 调用迭代。 * * 包括: * - 构建上下文 * - 准备 Provider 选项(工具调用限制、取消回调等) * - 调用 Provider 流式接口 * - 记录日志 * * @return array{ * stream_state: array, * has_tool_calls: bool, * updated_tool_count: int, * should_exit: bool, * latency_ms: int * } */ private function executeProviderIteration( string $sessionId, string $runId, string $providerName, int $currentToolCallCount ): array { // 1. 构建上下文和 Provider 选项 $context = $this->contextBuilder->build($sessionId, $runId); $providerOptions = $this->buildProviderOptions($sessionId, $runId, $currentToolCallCount); // 2. 记录调用日志 $this->logProviderRequest($sessionId, $runId, $providerName, $context, $providerOptions, $currentToolCallCount); // 3. 调用 Provider 并消费事件流 $startedAt = microtime(true); $streamState = $this->consumeProviderStream( $sessionId, $runId, $context, $providerName, $startedAt, $providerOptions ); $latencyMs = $this->latencyMs($startedAt); // 4. 检查流式调用是否失败或取消 if ($streamState['canceled'] || $streamState['failed']) { return [ 'stream_state' => $streamState, 'has_tool_calls' => false, 'updated_tool_count' => $currentToolCallCount, 'should_exit' => true, 'latency_ms' => $latencyMs, ]; } // 5. 检查是否有工具调用 $hasToolCalls = !empty($streamState['tool_calls']); $updatedToolCount = $currentToolCallCount + count($streamState['tool_calls']); return [ 'stream_state' => $streamState, 'has_tool_calls' => $hasToolCalls, 'updated_tool_count' => $updatedToolCount, 'should_exit' => false, 'latency_ms' => $latencyMs, ]; } /** * 构建 Provider 调用选项。 * * 包括: * - 取消检查回调 * - 工具调用限制控制 */ private function buildProviderOptions(string $sessionId, string $runId, int $toolCallCount): array { $options = [ 'should_stop' => fn () => $this->isCanceled($sessionId, $runId), ]; // 达到工具调用上限后,禁用工具列表,避免再次触发 TOOL_CALL_LIMIT 错误 if ($toolCallCount >= $this->maxToolCalls) { $options['disable_tools'] = true; } return $options; } /** * 记录 Provider 请求日志。 */ private function logProviderRequest( string $sessionId, string $runId, string $providerName, AgentContext $context, array $providerOptions, int $iteration ): void { // 日志选项(移除不可序列化的回调) $logOptions = $providerOptions; unset($logOptions['should_stop']); logger('agent provider context', [ 'sessionId' => $sessionId, 'runId' => $runId, 'provider' => $providerName, 'context' => $context, 'provider_options' => $logOptions, ]); logger('agent provider request', [ 'sessionId' => $sessionId, 'runId' => $runId, 'provider' => $providerName, 'iteration' => $iteration, ]); } /** * 处理工具调用流程。 * * 流程: * 1. 检查工具调用数量是否超限 * 2. 分发工具子 Run * 3. 等待工具执行结果 * * @return bool 是否应该退出主循环(超限、失败或取消时返回 true) */ private function handleToolCalls( string $sessionId, string $runId, string $providerName, array $iterationResult, int $originalToolCallCount ): bool { $streamState = $iterationResult['stream_state']; $latencyMs = $iterationResult['latency_ms']; $updatedToolCount = $iterationResult['updated_tool_count']; // 1. 检查工具调用数量是否超限 if ($updatedToolCount > $this->maxToolCalls) { $this->appendProviderFailure( $sessionId, $runId, 'TOOL_CALL_LIMIT', 'Tool call limit reached for this run', $providerName, $latencyMs, [], 'TOOL_CALL_LIMIT' ); return true; // 退出主循环 } // 2. 分发工具子 Run $toolCalls = $this->dispatchToolRuns($sessionId, $runId, $streamState['tool_calls']); // 3. 等待所有工具执行完成 $waitState = $this->awaitToolResults($sessionId, $runId, $toolCalls, $providerName); // 4. 检查等待过程中是否失败或取消 if ($waitState['failed'] || $waitState['canceled']) { return true; // 退出主循环 } // 工具结果已写回上下文,继续下一轮 Agent 调用 return false; } /** * 完成 Run 并写入最终状态。 * * 流程: * 1. 验证流式响应的有效性 * 2. 写入最终 agent.message * 3. 再次检查取消状态 * 4. 写入 run.status = DONE */ private function completeRun( string $sessionId, string $runId, string $providerName, array $streamState, int $latencyMs ): void { // 1. 记录响应日志 logger('agent provider response', [ 'sessionId' => $sessionId, 'runId' => $runId, 'provider' => $providerName, 'latency_ms' => $latencyMs, ]); // 2. 再次检查取消状态(在写入最终消息前) if ($this->checkAndHandleCancel($sessionId, $runId)) { return; } // 3. 验证流式响应的有效性 if (!$this->validateStreamResponse($sessionId, $runId, $providerName, $streamState, $latencyMs)) { return; } // 4. 写入最终 agent.message $this->outputSink->appendAgentMessage($sessionId, $runId, $streamState['reply'], [ 'provider' => $providerName, 'done_reason' => $streamState['done_reason'], ], "run:{$runId}:agent:message"); // 5. 最后一次检查取消状态(在写入 DONE 前) if ($this->checkAndHandleCancel($sessionId, $runId)) { return; } // 6. 写入 run.status = DONE $this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', [ 'dedupe_key' => "run:{$runId}:status:DONE", ]); } /** * 验证流式响应的有效性。 * * 检查: * - 是否收到任何事件(避免空流) * - 流是否正常结束(有 done_reason) * * @return bool 是否有效(true 表示有效,false 表示无效并已写入错误) */ private function validateStreamResponse( string $sessionId, string $runId, string $providerName, array $streamState, int $latencyMs ): bool { // 1. 检查是否收到任何事件 if (!$streamState['received_event']) { $this->appendProviderFailure( $sessionId, $runId, 'EMPTY_STREAM', 'Agent provider returned no events', $providerName, $latencyMs, [], 'EMPTY_STREAM' ); return false; } // 2. 检查流是否正常结束 if ($streamState['done_reason'] === null) { $this->appendProviderFailure( $sessionId, $runId, 'STREAM_INCOMPLETE', 'Agent provider stream ended unexpectedly', $providerName, $latencyMs, [], 'STREAM_INCOMPLETE' ); return false; } return true; } /** * 判断指定 run 是否已到终态,避免重复执行。 */ private function isRunTerminal(string $sessionId, string $runId): bool { $latestStatus = Message::query() ->where('session_id', $sessionId) ->where('type', 'run.status') ->whereRaw("payload->>'run_id' = ?", [$runId]) ->orderByDesc('seq') ->first(); $status = $latestStatus ? ($latestStatus->payload['status'] ?? null) : null; return in_array($status, self::TERMINAL_STATUSES, true); } /** * 取消时写入终态 CANCELED(幂等)。 */ private function appendCanceled(string $sessionId, string $runId): void { $this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [ 'dedupe_key' => "run:{$runId}:status:CANCELED", ]); } /** * 消费 Provider Streaming 事件流: * - message.delta:落增量并累计最终回复 * - done:记录结束理由 * - error/异常:写入 error + FAILED * - cancel:即时中断并写 CANCELED * - tool.delta/tool.call:收集工具调用信息,后续驱动子 Run * * @return array{ * reply: string, * done_reason: ?string, * received_event: bool, * failed: bool, * canceled: bool, * tool_calls: array> * } */ private function consumeProviderStream( string $sessionId, string $runId, AgentContext $context, string $providerName, float $startedAt, array $providerOptions = [] ): array { $reply = ''; $deltaIndex = 0; $doneReason = null; $receivedEvent = false; $toolCallBuffer = []; $toolCallOrder = []; try { $providerOptions = array_merge([ 'should_stop' => fn () => $this->isCanceled($sessionId, $runId), ], $providerOptions); foreach ($this->provider->stream($context, $providerOptions) as $event) { $receivedEvent = true; if ($this->isCanceled($sessionId, $runId)) { $this->appendCanceled($sessionId, $runId); $toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason); return $this->streamState($reply, $doneReason, $receivedEvent, false, true, $toolCalls); } // 文本增量:持续写 message.delta 并拼接最终回复 if ($event->type === ProviderEventType::MessageDelta) { $text = (string) ($event->payload['text'] ?? ''); if ($text !== '') { $reply .= $text; $deltaIndex++; $this->outputSink->appendAgentDelta($sessionId, $runId, $text, $deltaIndex, [ 'provider' => $providerName, ]); } continue; } if ($event->type === ProviderEventType::ToolDelta || $event->type === ProviderEventType::ToolCall) { $toolCalls = $event->payload['tool_calls'] ?? []; if (is_array($toolCalls)) { $this->accumulateToolCalls($toolCallBuffer, $toolCallOrder, $toolCalls); } continue; } // 流结束 if ($event->type === ProviderEventType::Done) { $doneReason = $event->payload['reason'] ?? null; break; } // Provider 内部错误事件 if ($event->type === ProviderEventType::Error) { $latencyMs = $this->latencyMs($startedAt); $code = (string) ($event->payload['code'] ?? 'PROVIDER_ERROR'); $message = (string) ($event->payload['message'] ?? 'Agent provider error'); $this->appendProviderFailure( $sessionId, $runId, $code, $message, $providerName, $latencyMs, [ 'retryable' => $event->payload['retryable'] ?? null, 'http_status' => $event->payload['http_status'] ?? null, 'raw_message' => $event->payload['raw_message'] ?? null, ] ); $toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason); return $this->streamState($reply, $doneReason, $receivedEvent, true, false, $toolCalls); } } } catch (ProviderException $exception) { $latencyMs = $this->latencyMs($startedAt); $this->appendProviderFailure( $sessionId, $runId, $exception->errorCode, $exception->getMessage(), $providerName, $latencyMs, [ 'retryable' => $exception->retryable, 'http_status' => $exception->httpStatus, 'raw_message' => $exception->rawMessage, ] ); $toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason); return $this->streamState($reply, $doneReason, $receivedEvent, true, false, $toolCalls); } $toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason); return $this->streamState($reply, $doneReason, $receivedEvent, false, false, $toolCalls); } /** * 统一落库 Provider 错误与 FAILED 终态。 * * @param array $meta */ private function appendProviderFailure( string $sessionId, string $runId, string $code, string $message, string $providerName, int $latencyMs, array $meta = [], ?string $statusError = null ): void { $this->outputSink->appendError($sessionId, $runId, $code, $message, array_merge($meta, [ 'provider' => $providerName, 'latency_ms' => $latencyMs, ]), "run:{$runId}:error:provider"); $this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', [ 'error' => $statusError ?? $message, 'dedupe_key' => "run:{$runId}:status:FAILED", ]); } /** * 封装流式状态返回,便于上层判断。 * * @return array{ * reply: string, * done_reason: ?string, * received_event: bool, * failed: bool, * canceled: bool, * tool_calls: array> * } */ private function streamState( string $reply, ?string $doneReason, bool $receivedEvent, bool $failed, bool $canceled, array $toolCalls ): array { return [ 'reply' => $reply, 'done_reason' => $doneReason, 'received_event' => $receivedEvent, 'failed' => $failed, 'canceled' => $canceled, 'tool_calls' => $toolCalls, ]; } /** * 工具增量收集:同一个 tool_call_id 可能多次分片返回,此处拼接参数与名称。 * * @param array> $buffer * @param array $order * @param array> $toolCalls */ private function accumulateToolCalls(array &$buffer, array &$order, array $toolCalls): void { foreach ($toolCalls as $call) { $id = is_string($call['id'] ?? null) && $call['id'] !== '' ? $call['id'] : md5(json_encode($call)); $index = is_int($call['index'] ?? null) ? (int) $call['index'] : count($order); if (! isset($buffer[$id])) { $buffer[$id] = [ 'id' => $id, 'name' => $call['name'] ?? null, 'arguments' => '', 'index' => $index, ]; $order[$id] = $index; } if (isset($call['name']) && is_string($call['name']) && $call['name'] !== '') { $buffer[$id]['name'] = $call['name']; } $arguments = $call['arguments'] ?? ''; if (is_string($arguments) && $arguments !== '') { $buffer[$id]['arguments'] .= $arguments; } } } /** * 将缓存的 tool.call 增量整理为最终列表(保持 provider 给出的顺序)。 * * @param array> $buffer * @param array $order * @return array> */ private function finalizeToolCalls(array $buffer, array $order, ?string $doneReason): array { if (empty($buffer)) { return []; } uasort($buffer, function ($a, $b) use ($order) { $orderA = $order[$a['id']] ?? ($a['index'] ?? 0); $orderB = $order[$b['id']] ?? ($b['index'] ?? 0); return $orderA <=> $orderB; }); return array_values(array_map(function (array $call) use ($doneReason) { return [ 'id' => (string) ($call['id'] ?? ''), 'name' => (string) ($call['name'] ?? ''), 'arguments' => (string) ($call['arguments'] ?? ''), 'finish_reason' => $doneReason, ]; }, $buffer)); } /** * 将 Tool 调用落库并触发子 Run。 * * @param array> $toolCalls * @return array */ private function dispatchToolRuns(string $sessionId, string $parentRunId, array $toolCalls): array { $dispatched = []; foreach ($toolCalls as $call) { $toolCallId = (string) ($call['id'] ?? ''); $name = (string) ($call['name'] ?? ''); $rawArguments = (string) ($call['arguments'] ?? ''); if ($toolCallId === '' || $name === '') { continue; } $arguments = $this->decodeToolArguments($rawArguments); $toolRunId = $this->generateToolRunId($parentRunId, $toolCallId); $toolCall = new ToolCall($toolRunId, $parentRunId, $toolCallId, $name, $arguments, $rawArguments); $this->outputSink->appendToolCall($sessionId, $toolCall); $this->toolRunDispatcher->dispatch($sessionId, $toolCall); $dispatched[] = $toolCall; } return $dispatched; } /** * 等待工具子 Run 写入 tool.result,超时/失败会直接结束父 Run。 * * @param array $toolCalls * @return array{failed: bool, canceled: bool} */ private function awaitToolResults(string $sessionId, string $parentRunId, array $toolCalls, string $providerName): array { $start = microtime(true); $expectedIds = array_map(fn (ToolCall $call) => $call->toolCallId, $toolCalls); $expectedRuns = array_map(fn (ToolCall $call) => $call->runId, $toolCalls); while (true) { if ($this->isCanceled($sessionId, $parentRunId)) { $this->appendCanceled($sessionId, $parentRunId); return ['failed' => false, 'canceled' => true]; } $results = $this->findToolResults($sessionId, $parentRunId); $statuses = $this->findToolRunStatuses($sessionId, $parentRunId); foreach ($expectedRuns as $runId) { $status = $statuses[$runId] ?? null; if ($status === 'FAILED') { $this->appendProviderFailure( $sessionId, $parentRunId, 'TOOL_RUN_FAILED', "Tool run {$runId} failed", $providerName, $this->latencyMs($start), [], 'TOOL_RUN_FAILED' ); return ['failed' => true, 'canceled' => false]; } if ($status === 'CANCELED') { $this->appendCanceled($sessionId, $parentRunId); return ['failed' => false, 'canceled' => true]; } } $readyIds = array_intersect($expectedIds, array_keys($results)); if (count($readyIds) === count($expectedIds)) { return ['failed' => false, 'canceled' => false]; } if ($this->latencyMs($start) >= $this->toolWaitTimeoutMs) { $this->appendProviderFailure( $sessionId, $parentRunId, 'TOOL_RESULT_TIMEOUT', 'Tool result wait timeout', $providerName, $this->latencyMs($start), [], 'TOOL_RESULT_TIMEOUT' ); return ['failed' => true, 'canceled' => false]; } usleep($this->toolPollIntervalMs * 1000); } } /** * @return array */ private function findToolResults(string $sessionId, string $parentRunId): array { $messages = Message::query() ->where('session_id', $sessionId) ->where('type', 'tool.result') ->whereRaw("payload->>'parent_run_id' = ?", [$parentRunId]) ->orderBy('seq') ->get(); $byToolCall = []; foreach ($messages as $message) { $toolCallId = $message->payload['tool_call_id'] ?? null; if (is_string($toolCallId) && $toolCallId !== '') { $byToolCall[$toolCallId] = $message; } } return $byToolCall; } /** * @return array */ private function findToolRunStatuses(string $sessionId, string $parentRunId): array { $messages = Message::query() ->where('session_id', $sessionId) ->where('type', 'run.status') ->whereRaw("payload->>'parent_run_id' = ?", [$parentRunId]) ->orderBy('seq') ->get(); $statuses = []; foreach ($messages as $message) { $runId = $message->payload['run_id'] ?? null; $status = $message->payload['status'] ?? null; if (is_string($runId) && is_string($status)) { $statuses[$runId] = $status; } } return $statuses; } private function generateToolRunId(string $parentRunId, string $toolCallId): string { return substr(hash('sha256', $parentRunId.'|'.$toolCallId), 0, 32); } /** * @return array */ private function decodeToolArguments(string $rawArguments): array { $decoded = json_decode($rawArguments, true); return is_array($decoded) ? $decoded : []; } /** * 计算耗时(毫秒)。 */ private function latencyMs(float $startedAt): int { return (int) ((microtime(true) - $startedAt) * 1000); } /** * 统一取消判断,便于 mock。 */ private function isCanceled(string $sessionId, string $runId): bool { return $this->cancelChecker->isCanceled($sessionId, $runId); } /** * 返回 Provider 名称(Dummy 使用短名)。 */ private function resolveProviderName(): string { if ($this->provider instanceof DummyAgentProvider) { return 'dummy'; } return str_replace("\0", '', get_class($this->provider)); } }