Files
ars-backend/app/Services/RunLoop.php
Roog 71226c255b 添加新的工具功能和测试覆盖:
- 注册 `LsTool` 和 `BashTool` 工具,支持目录操作和命令执行
- 增强工具调用逻辑,添加日志记录以提升调试能力
- 增加 `ToolRegistry` 和 `RunLoop` 的增量累积与排序优化
- 完善单元测试覆盖新工具的执行与行为验证
2025-12-23 17:26:27 +08:00

877 lines
28 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace App\Services;
use App\Services\Agent\AgentProviderInterface;
use App\Services\Agent\AgentContext;
use App\Services\Agent\DummyAgentProvider;
use App\Services\Agent\ProviderEventType;
use App\Services\Agent\ProviderException;
use App\Models\Message;
use App\Services\Tool\ToolCall;
use App\Services\Tool\ToolRunDispatcher;
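/**
 * Main loop of an agent run:
 * - builds the context and consumes the provider's event stream (streaming)
 * - handles cancellation, errors, incremental output, and writing back the terminal status
 */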
class RunLoop
{
private const TERMINAL_STATUSES = ['DONE', 'FAILED', 'CANCELED'];
private readonly int $maxToolCalls;
private readonly int $toolWaitTimeoutMs;
private readonly int $toolPollIntervalMs;
/**
 * Receives the loop's collaborators and reads the tool-related limits from configuration.
 *
 * Three integers are read once via config() and kept as readonly properties:
 * the per-run tool call cap, the tool result wait timeout (ms) and the polling interval (ms).
 */
public function __construct(
    private readonly ContextBuilder $contextBuilder,
    private readonly AgentProviderInterface $provider,
    private readonly OutputSink $outputSink,
    private readonly CancelChecker $cancelChecker,
    private readonly ToolRunDispatcher $toolRunDispatcher,
) {
    $configuredMaxCalls = config('agent.tools.max_calls_per_run', 10);
    $configuredWaitTimeout = config('agent.tools.wait_timeout_ms', 15000);
    $configuredPollInterval = config('agent.tools.wait_poll_interval_ms', 200);

    $this->maxToolCalls = (int) $configuredMaxCalls;
    $this->toolWaitTimeoutMs = (int) $configuredWaitTimeout;
    $this->toolPollIntervalMs = (int) $configuredPollInterval;
}
/**
 * Drives one agent run until it ends; a run that already has a terminal status is left untouched.
 *
 * Flow:
 *  1. Return immediately when the latest run.status for this run is terminal.
 *  2. Loop: check for cancellation, then perform one provider round.
 *  3. When the round failed or was canceled, stop (the status was already written).
 *  4. When the round produced tool calls, dispatch them, wait for their results and loop again.
 *  5. When the round produced no tool calls, write the final reply and mark the run DONE.
 */
public function run(string $sessionId, string $runId): void
{
    if ($this->isRunTerminal($sessionId, $runId)) {
        return;
    }

    $providerName = $this->resolveProviderName();
    $toolCallsSoFar = 0;

    for (;;) {
        // Cancellation is re-checked at the top of every round.
        if ($this->checkAndHandleCancel($sessionId, $runId)) {
            return;
        }

        $round = $this->executeProviderIteration($sessionId, $runId, $providerName, $toolCallsSoFar);
        logger('agent provider iteration', [$round]);

        if ($round['should_exit']) {
            return;
        }

        if (! $round['has_tool_calls']) {
            // Plain answer: finish the run with the accumulated reply.
            $this->completeRun(
                $sessionId,
                $runId,
                $providerName,
                $round['stream_state'],
                $round['latency_ms']
            );

            return;
        }

        $mustStop = $this->handleToolCalls($sessionId, $runId, $providerName, $round, $toolCallsSoFar);
        if ($mustStop) {
            return;
        }

        // Carry the new running total into the next provider round.
        $toolCallsSoFar = $round['updated_tool_count'];
    }
}
/**
 * Writes the CANCELED status when the run has been canceled.
 *
 * @return bool true when the run was canceled (and CANCELED was appended), false otherwise
 */
private function checkAndHandleCancel(string $sessionId, string $runId): bool
{
    $wasCanceled = $this->isCanceled($sessionId, $runId);
    if (! $wasCanceled) {
        return false;
    }

    $this->appendCanceled($sessionId, $runId);

    return true;
}
/**
 * Performs one provider round: build context and options, log the request, consume the stream.
 *
 * When the stream reports failure or cancellation the result has should_exit = true,
 * has_tool_calls = false and an unchanged tool count. Otherwise the tool count grows by the
 * number of tool calls collected in this round.
 *
 * @return array{
 *   stream_state: array,
 *   has_tool_calls: bool,
 *   updated_tool_count: int,
 *   should_exit: bool,
 *   latency_ms: int
 * }
 */
private function executeProviderIteration(
    string $sessionId,
    string $runId,
    string $providerName,
    int $currentToolCallCount
): array {
    $context = $this->contextBuilder->build($sessionId, $runId);
    $providerOptions = $this->buildProviderOptions($sessionId, $runId, $currentToolCallCount);

    $this->logProviderRequest($sessionId, $runId, $providerName, $context, $providerOptions, $currentToolCallCount);

    // Latency covers the whole streaming call, measured from just before it starts.
    $startedAt = microtime(true);
    $streamState = $this->consumeProviderStream(
        $sessionId,
        $runId,
        $context,
        $providerName,
        $startedAt,
        $providerOptions
    );
    $latencyMs = $this->latencyMs($startedAt);

    $aborted = $streamState['canceled'] || $streamState['failed'];
    $collectedCalls = $streamState['tool_calls'];

    return [
        'stream_state' => $streamState,
        'has_tool_calls' => ! $aborted && ! empty($collectedCalls),
        'updated_tool_count' => $aborted
            ? $currentToolCallCount
            : $currentToolCallCount + count($collectedCalls),
        'should_exit' => $aborted,
        'latency_ms' => $latencyMs,
    ];
}
/**
 * Assembles the options passed to the provider's stream() call.
 *
 * - should_stop: callback that reports whether the run has been canceled.
 * - disable_tools: set to true only once the running tool call count has reached the
 *   configured maximum, so the provider is asked not to offer tools any more.
 */
private function buildProviderOptions(string $sessionId, string $runId, int $toolCallCount): array
{
    $stopProbe = fn () => $this->isCanceled($sessionId, $runId);
    $budgetSpent = $toolCallCount >= $this->maxToolCalls;

    return $budgetSpent
        ? ['should_stop' => $stopProbe, 'disable_tools' => true]
        : ['should_stop' => $stopProbe];
}
/**
 * Emits two log entries before a provider call: the full context and a short request marker.
 *
 * The should_stop callback is left out of the logged options because it is not serializable.
 * The caller passes the running tool call count as $iteration.
 */
private function logProviderRequest(
    string $sessionId,
    string $runId,
    string $providerName,
    AgentContext $context,
    array $providerOptions,
    int $iteration
): void {
    $loggableOptions = [];
    foreach ($providerOptions as $optionName => $optionValue) {
        if ($optionName !== 'should_stop') {
            $loggableOptions[$optionName] = $optionValue;
        }
    }

    logger('agent provider context', [
        'sessionId' => $sessionId,
        'runId' => $runId,
        'provider' => $providerName,
        'context' => $context,
        'provider_options' => $loggableOptions,
    ]);

    logger('agent provider request', [
        'sessionId' => $sessionId,
        'runId' => $runId,
        'provider' => $providerName,
        'iteration' => $iteration,
    ]);
}
/**
 * Handles a provider round that produced tool calls.
 *
 * Steps:
 *  1. Fail the run with TOOL_CALL_LIMIT when the updated tool count exceeds the configured maximum.
 *  2. Dispatch a child tool run for every dispatchable tool call.
 *  3. Wait until all dispatched tool runs have written their results.
 *
 * $originalToolCallCount is accepted for the caller's convenience but is not read here.
 *
 * @return bool true when the main loop must stop (limit exceeded, tool failure or cancellation)
 */
private function handleToolCalls(
    string $sessionId,
    string $runId,
    string $providerName,
    array $iterationResult,
    int $originalToolCallCount
): bool {
    [
        'stream_state' => $streamState,
        'latency_ms' => $latencyMs,
        'updated_tool_count' => $updatedToolCount,
    ] = $iterationResult;

    logger('agent tool calls', [$streamState, $latencyMs, $updatedToolCount]);

    $overBudget = $updatedToolCount > $this->maxToolCalls;
    if ($overBudget) {
        $this->appendProviderFailure(
            $sessionId,
            $runId,
            'TOOL_CALL_LIMIT',
            'Tool call limit reached for this run',
            $providerName,
            $latencyMs,
            [],
            'TOOL_CALL_LIMIT'
        );

        return true;
    }

    $dispatched = $this->dispatchToolRuns($sessionId, $runId, $streamState['tool_calls']);
    $outcome = $this->awaitToolResults($sessionId, $runId, $dispatched, $providerName);

    // On success the main loop continues with another provider round.
    return $outcome['failed'] || $outcome['canceled'];
}
/**
 * Finishes a run whose last provider round produced no tool calls.
 *
 * Order of operations:
 *  1. Log the provider response.
 *  2. Stop if the run was canceled in the meantime.
 *  3. Validate the stream state; an invalid stream has already been recorded as FAILED.
 *  4. Append the final agent.message with the accumulated reply.
 *  5. Check for cancellation once more.
 *  6. Append run.status = DONE.
 */
private function completeRun(
    string $sessionId,
    string $runId,
    string $providerName,
    array $streamState,
    int $latencyMs
): void {
    logger('agent provider response', [
        'sessionId' => $sessionId,
        'runId' => $runId,
        'provider' => $providerName,
        'latency_ms' => $latencyMs,
    ]);

    if ($this->checkAndHandleCancel($sessionId, $runId)) {
        return;
    }

    $streamIsValid = $this->validateStreamResponse($sessionId, $runId, $providerName, $streamState, $latencyMs);
    if (! $streamIsValid) {
        return;
    }

    $messageMeta = [
        'provider' => $providerName,
        'done_reason' => $streamState['done_reason'],
    ];
    $this->outputSink->appendAgentMessage(
        $sessionId,
        $runId,
        $streamState['reply'],
        $messageMeta,
        "run:{$runId}:agent:message"
    );

    // A cancel that arrived while the message was written still wins over DONE.
    if ($this->checkAndHandleCancel($sessionId, $runId)) {
        return;
    }

    $doneMeta = ['dedupe_key' => "run:{$runId}:status:DONE"];
    $this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', $doneMeta);
}
/**
 * Checks that the consumed stream is usable as a final answer.
 *
 * Two conditions are checked in order:
 *  - no event was received at all           -> failure code EMPTY_STREAM
 *  - the stream ended without a done reason -> failure code STREAM_INCOMPLETE
 *
 * @return bool true when the stream is valid; false when it is not, in which case the
 *              error and the FAILED status have already been appended
 */
private function validateStreamResponse(
    string $sessionId,
    string $runId,
    string $providerName,
    array $streamState,
    int $latencyMs
): bool {
    $failureCode = match (true) {
        ! $streamState['received_event'] => 'EMPTY_STREAM',
        $streamState['done_reason'] === null => 'STREAM_INCOMPLETE',
        default => null,
    };

    if ($failureCode === null) {
        return true;
    }

    $failureMessage = $failureCode === 'EMPTY_STREAM'
        ? 'Agent provider returned no events'
        : 'Agent provider stream ended unexpectedly';

    $this->appendProviderFailure(
        $sessionId,
        $runId,
        $failureCode,
        $failureMessage,
        $providerName,
        $latencyMs,
        [],
        $failureCode
    );

    return false;
}
/**
 * Tells whether the run already reached a terminal status, so it is not executed twice.
 *
 * Looks at the run.status message with the highest seq for this session and run id and
 * compares its payload status against TERMINAL_STATUSES.
 */
private function isRunTerminal(string $sessionId, string $runId): bool
{
    $newestStatusMessage = Message::query()
        ->where('session_id', $sessionId)
        ->where('type', 'run.status')
        ->whereRaw("payload->>'run_id' = ?", [$runId])
        ->orderByDesc('seq')
        ->first();

    // No status message at all, or one without a status field, counts as not terminal.
    $currentStatus = $newestStatusMessage?->payload['status'] ?? null;

    return in_array($currentStatus, self::TERMINAL_STATUSES, true);
}
/**
 * Appends the CANCELED run status, tagged with a per-run dedupe key.
 */
private function appendCanceled(string $sessionId, string $runId): void
{
    $statusMeta = ['dedupe_key' => "run:{$runId}:status:CANCELED"];

    $this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', $statusMeta);
}
/**
 * Consumes the provider's streaming events for one provider round.
 *
 * Per event type:
 * - message delta: appends the text as an agent delta and accumulates it into the final reply
 * - tool delta / tool call: collects tool call fragments for later dispatch
 * - done: records the done reason and stops reading
 * - error event or ProviderException: appends the error and the FAILED status
 * - cancellation (checked on every received event): appends CANCELED and stops
 *
 * Only ProviderException is caught here; any other exception propagates to the caller.
 *
 * @return array{
 *   reply: string,
 *   done_reason: ?string,
 *   received_event: bool,
 *   failed: bool,
 *   canceled: bool,
 *   tool_calls: array<int, array<string, mixed>>
 * }
 */
private function consumeProviderStream(
string $sessionId,
string $runId,
AgentContext $context,
string $providerName,
float $startedAt,
array $providerOptions = []
): array {
$reply = '';
// Delta numbering restarts for every call; it is incremented before use, so the first delta gets index 1.
$deltaIndex = 0;
$doneReason = null;
$receivedEvent = false;
$toolCallBuffer = [];
$toolCallOrder = [];
try {
// Supply a default should_stop callback; a caller-provided one takes precedence.
$providerOptions = array_merge([
'should_stop' => fn () => $this->isCanceled($sessionId, $runId),
], $providerOptions);
foreach ($this->provider->stream($context, $providerOptions) as $event) {
$receivedEvent = true;
// Cancellation is checked before the event itself is processed.
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
$toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason);
return $this->streamState($reply, $doneReason, $receivedEvent, false, true, $toolCalls);
}
// Text delta: write a message delta and append the text to the final reply (empty text is skipped).
if ($event->type === ProviderEventType::MessageDelta) {
$text = (string) ($event->payload['text'] ?? '');
if ($text !== '') {
$reply .= $text;
$deltaIndex++;
$this->outputSink->appendAgentDelta($sessionId, $runId, $text, $deltaIndex, [
'provider' => $providerName,
]);
}
continue;
}
// Tool call fragments: both event types are accumulated the same way.
if ($event->type === ProviderEventType::ToolDelta || $event->type === ProviderEventType::ToolCall) {
$toolCalls = $event->payload['tool_calls'] ?? [];
if (is_array($toolCalls)) {
$this->accumulateToolCalls($toolCallBuffer, $toolCallOrder, $toolCalls);
}
continue;
}
// End of stream: remember the reason and stop reading further events.
if ($event->type === ProviderEventType::Done) {
$doneReason = $event->payload['reason'] ?? null;
break;
}
// Error event reported by the provider: record the failure and return a failed state.
if ($event->type === ProviderEventType::Error) {
$latencyMs = $this->latencyMs($startedAt);
$code = (string) ($event->payload['code'] ?? 'PROVIDER_ERROR');
$message = (string) ($event->payload['message'] ?? 'Agent provider error');
$this->appendProviderFailure(
$sessionId,
$runId,
$code,
$message,
$providerName,
$latencyMs,
[
'retryable' => $event->payload['retryable'] ?? null,
'http_status' => $event->payload['http_status'] ?? null,
'raw_message' => $event->payload['raw_message'] ?? null,
]
);
$toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason);
return $this->streamState($reply, $doneReason, $receivedEvent, true, false, $toolCalls);
}
// Events of any other type are ignored.
}
} catch (ProviderException $exception) {
// Exception thrown while streaming: record it the same way as an error event.
$latencyMs = $this->latencyMs($startedAt);
$this->appendProviderFailure(
$sessionId,
$runId,
$exception->errorCode,
$exception->getMessage(),
$providerName,
$latencyMs,
[
'retryable' => $exception->retryable,
'http_status' => $exception->httpStatus,
'raw_message' => $exception->rawMessage,
]
);
$toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason);
return $this->streamState($reply, $doneReason, $receivedEvent, true, false, $toolCalls);
}
// Normal end: either a done event was seen or the stream simply ran out of events.
$toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason);
return $this->streamState($reply, $doneReason, $receivedEvent, false, false, $toolCalls);
}
/**
 * Records a failure: appends an error entry followed by the FAILED run status.
 *
 * The provider name and latency are merged over $meta for the error entry. The status
 * carries $statusError when given, otherwise the error message.
 *
 * @param array<string, mixed> $meta
 */
private function appendProviderFailure(
    string $sessionId,
    string $runId,
    string $code,
    string $message,
    string $providerName,
    int $latencyMs,
    array $meta = [],
    ?string $statusError = null
): void {
    $errorMeta = array_merge($meta, [
        'provider' => $providerName,
        'latency_ms' => $latencyMs,
    ]);
    $this->outputSink->appendError($sessionId, $runId, $code, $message, $errorMeta, "run:{$runId}:error:provider");

    $statusMeta = [
        'error' => $statusError ?? $message,
        'dedupe_key' => "run:{$runId}:status:FAILED",
    ];
    $this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', $statusMeta);
}
/**
 * Packs the outcome of one stream consumption into the array shape the callers inspect.
 *
 * @return array{
 *   reply: string,
 *   done_reason: ?string,
 *   received_event: bool,
 *   failed: bool,
 *   canceled: bool,
 *   tool_calls: array<int, array<string, mixed>>
 * }
 */
private function streamState(
    string $reply,
    ?string $doneReason,
    bool $receivedEvent,
    bool $failed,
    bool $canceled,
    array $toolCalls
): array {
    $state = [];
    $state['reply'] = $reply;
    $state['done_reason'] = $doneReason;
    $state['received_event'] = $receivedEvent;
    $state['failed'] = $failed;
    $state['canceled'] = $canceled;
    $state['tool_calls'] = $toolCalls;

    return $state;
}
/**
 * Merges streamed tool call fragments into the buffer, one buffer slot per tool call.
 *
 * In the OpenAI-style streaming format the first fragment of a tool call carries id, name and
 * index, and later fragments carry only an arguments increment plus the index. The index is
 * therefore the primary key of the buffer.
 *
 * Fragments without a usable index are resolved as follows instead of all being forced into
 * slot 0 (which merged distinct tool calls into one):
 *  - a fragment with an id joins the slot that already holds that id, or opens a new slot;
 *  - a fragment with neither index nor id continues the most recently opened slot.
 * Fragments that are not arrays are ignored.
 *
 * @param array<int, array<string, mixed>> $buffer    buffer keyed by tool call index (modified in place)
 * @param array<int, int>                  $order     index => position in which the slot was opened (modified in place)
 * @param array<int, array<string, mixed>> $toolCalls fragments from one provider event
 */
private function accumulateToolCalls(array &$buffer, array &$order, array $toolCalls): void
{
    foreach ($toolCalls as $call) {
        if (! is_array($call)) {
            // A malformed fragment must not create a phantom tool call.
            continue;
        }

        $id = $call['id'] ?? null;
        $id = is_string($id) && $id !== '' ? $id : null;

        $index = $this->resolveToolCallIndex($buffer, $order, $call['index'] ?? null, $id);

        if (! isset($buffer[$index])) {
            $buffer[$index] = [
                'id' => $call['id'] ?? null,
                'name' => $call['name'] ?? null,
                'arguments' => '',
                'index' => $index,
            ];
            $order[$index] = count($order);
        }

        // id and name normally arrive only with the first fragment; never overwrite with blanks.
        if ($id !== null) {
            $buffer[$index]['id'] = $id;
        }

        $name = $call['name'] ?? null;
        if (is_string($name) && $name !== '') {
            $buffer[$index]['name'] = $name;
        }

        // Arguments arrive as string pieces that are concatenated in arrival order.
        $arguments = $call['arguments'] ?? '';
        if (is_string($arguments) && $arguments !== '') {
            $buffer[$index]['arguments'] .= $arguments;
        }
    }
}

/**
 * Picks the buffer slot a tool call fragment belongs to.
 *
 * @param array<int, array<string, mixed>> $buffer   current buffer keyed by index
 * @param array<int, int>                  $order    index => opening position
 * @param mixed                            $rawIndex the fragment's index field, if any
 * @param string|null                      $id       the fragment's non-empty id, if any
 */
private function resolveToolCallIndex(array $buffer, array $order, mixed $rawIndex, ?string $id): int
{
    if (is_int($rawIndex)) {
        return $rawIndex;
    }

    // Accept an integer that was serialized as a string, e.g. "1".
    if (is_string($rawIndex) && (string) (int) $rawIndex === $rawIndex) {
        return (int) $rawIndex;
    }

    if ($id !== null) {
        foreach ($buffer as $index => $entry) {
            if (($entry['id'] ?? null) === $id) {
                return (int) $index;
            }
        }

        // Unseen id without an index: open the next free slot.
        return $buffer === [] ? 0 : max(array_keys($buffer)) + 1;
    }

    // Anonymous continuation fragment: append to the most recently opened call.
    return (int) (array_key_last($order) ?? 0);
}
/**
 * Turns the fragment buffer into the final tool call list, ordered by ascending index.
 *
 * Every entry is reduced to string id, name and arguments plus the stream's done reason as
 * finish_reason. $order is accepted but not consulted; ordering comes from the buffer keys.
 *
 * @param array<int, array<string, mixed>> $buffer buffer keyed by tool call index
 * @param array<int, int>                  $order  index => opening position (unused)
 * @return array<int, array<string, mixed>>
 */
private function finalizeToolCalls(array $buffer, array $order, ?string $doneReason): array
{
    if ($buffer === []) {
        return [];
    }

    ksort($buffer);

    $normalize = static fn (array $entry): array => [
        'id' => (string) ($entry['id'] ?? ''),
        'name' => (string) ($entry['name'] ?? ''),
        'arguments' => (string) ($entry['arguments'] ?? ''),
        'finish_reason' => $doneReason,
    ];

    $finalized = [];
    foreach ($buffer as $entry) {
        $finalized[] = $normalize($entry);
    }

    return $finalized;
}
/**
 * Persists each tool call and triggers its child run.
 *
 * Tool calls whose id or name is empty are skipped without dispatching. For every other
 * call a ToolCall is built (with a run id derived from the parent run id and the tool call
 * id), appended to the output sink and then handed to the dispatcher.
 *
 * @param array<int, array<string, mixed>> $toolCalls
 * @return array<int, ToolCall> the tool calls that were actually dispatched
 */
private function dispatchToolRuns(string $sessionId, string $parentRunId, array $toolCalls): array
{
    $dispatched = [];

    foreach ($toolCalls as $rawCall) {
        $callId = (string) ($rawCall['id'] ?? '');
        $toolName = (string) ($rawCall['name'] ?? '');
        $argumentJson = (string) ($rawCall['arguments'] ?? '');

        $dispatchable = $callId !== '' && $toolName !== '';
        if ($dispatchable) {
            $toolCall = new ToolCall(
                $this->generateToolRunId($parentRunId, $callId),
                $parentRunId,
                $callId,
                $toolName,
                $this->decodeToolArguments($argumentJson),
                $argumentJson
            );

            // Record the call first, then start its child run.
            $this->outputSink->appendToolCall($sessionId, $toolCall);
            $this->toolRunDispatcher->dispatch($sessionId, $toolCall);

            $dispatched[] = $toolCall;
        }
    }

    return $dispatched;
}
/**
 * Polls until every dispatched tool run has written its tool.result, or the wait ends early.
 *
 * Each polling round, in this order:
 *  1. If the parent run was canceled, append CANCELED and stop.
 *  2. If an expected tool run reports FAILED, fail the parent with TOOL_RUN_FAILED;
 *     if one reports CANCELED, cancel the parent.
 *  3. If a tool.result exists for every expected tool call id, return success.
 *  4. If the wait has lasted toolWaitTimeoutMs or longer, fail the parent with TOOL_RESULT_TIMEOUT.
 *  5. Otherwise sleep and poll again.
 *
 * The sleep is clamped to at least 1 ms and at most the time left before the timeout. A zero
 * or negative configured poll interval therefore neither busy-loops nor makes usleep() throw,
 * and an interval larger than the timeout cannot delay the timeout.
 *
 * @param array<int, ToolCall> $toolCalls
 * @return array{failed: bool, canceled: bool}
 */
private function awaitToolResults(string $sessionId, string $parentRunId, array $toolCalls, string $providerName): array
{
    $start = microtime(true);
    $expectedIds = array_map(fn (ToolCall $call) => $call->toolCallId, $toolCalls);
    $expectedRuns = array_map(fn (ToolCall $call) => $call->runId, $toolCalls);

    while (true) {
        if ($this->isCanceled($sessionId, $parentRunId)) {
            $this->appendCanceled($sessionId, $parentRunId);

            return ['failed' => false, 'canceled' => true];
        }

        $results = $this->findToolResults($sessionId, $parentRunId);
        $statuses = $this->findToolRunStatuses($sessionId, $parentRunId);

        // A failed or canceled child run ends the parent run immediately.
        foreach ($expectedRuns as $runId) {
            $status = $statuses[$runId] ?? null;

            if ($status === 'FAILED') {
                $this->appendProviderFailure(
                    $sessionId,
                    $parentRunId,
                    'TOOL_RUN_FAILED',
                    "Tool run {$runId} failed",
                    $providerName,
                    $this->latencyMs($start),
                    [],
                    'TOOL_RUN_FAILED'
                );

                return ['failed' => true, 'canceled' => false];
            }

            if ($status === 'CANCELED') {
                $this->appendCanceled($sessionId, $parentRunId);

                return ['failed' => false, 'canceled' => true];
            }
        }

        // Done once every expected tool call id has a result.
        $readyIds = array_intersect($expectedIds, array_keys($results));
        if (count($readyIds) === count($expectedIds)) {
            return ['failed' => false, 'canceled' => false];
        }

        $elapsedMs = $this->latencyMs($start);
        if ($elapsedMs >= $this->toolWaitTimeoutMs) {
            $this->appendProviderFailure(
                $sessionId,
                $parentRunId,
                'TOOL_RESULT_TIMEOUT',
                'Tool result wait timeout',
                $providerName,
                $this->latencyMs($start),
                [],
                'TOOL_RESULT_TIMEOUT'
            );

            return ['failed' => true, 'canceled' => false];
        }

        // The timeout check above guarantees at least 1 ms remains.
        $remainingMs = $this->toolWaitTimeoutMs - $elapsedMs;
        $sleepMs = max(1, min($this->toolPollIntervalMs, $remainingMs));
        usleep($sleepMs * 1000);
    }
}
/**
 * Loads the tool.result messages written for a parent run, keyed by tool call id.
 *
 * Messages are read in ascending seq order, so when several results share a tool call id
 * the one with the highest seq is kept. Messages without a non-empty string id are ignored.
 *
 * @return array<string, \App\Models\Message>
 */
private function findToolResults(string $sessionId, string $parentRunId): array
{
    $resultMessages = Message::query()
        ->where('session_id', $sessionId)
        ->where('type', 'tool.result')
        ->whereRaw("payload->>'parent_run_id' = ?", [$parentRunId])
        ->orderBy('seq')
        ->get();

    $latestByCallId = [];
    foreach ($resultMessages as $resultMessage) {
        $callId = $resultMessage->payload['tool_call_id'] ?? null;
        if (! is_string($callId) || $callId === '') {
            continue;
        }

        $latestByCallId[$callId] = $resultMessage;
    }

    return $latestByCallId;
}
/**
 * Loads the latest known status of every child run that belongs to a parent run.
 *
 * run.status messages are read in ascending seq order, so a later status for the same
 * run id replaces an earlier one. Entries whose run id or status is not a string are ignored.
 *
 * @return array<string, string> run id => status
 */
private function findToolRunStatuses(string $sessionId, string $parentRunId): array
{
    $statusMessages = Message::query()
        ->where('session_id', $sessionId)
        ->where('type', 'run.status')
        ->whereRaw("payload->>'parent_run_id' = ?", [$parentRunId])
        ->orderBy('seq')
        ->get();

    $latestByRunId = [];
    foreach ($statusMessages as $statusMessage) {
        $childRunId = $statusMessage->payload['run_id'] ?? null;
        $childStatus = $statusMessage->payload['status'] ?? null;

        if (! is_string($childRunId) || ! is_string($childStatus)) {
            continue;
        }

        $latestByRunId[$childRunId] = $childStatus;
    }

    return $latestByRunId;
}
/**
 * Derives a deterministic child run id from the parent run id and the tool call id.
 *
 * @return string the first 32 hex characters of the SHA-256 of "parentRunId|toolCallId"
 */
private function generateToolRunId(string $parentRunId, string $toolCallId): string
{
    $digest = hash('sha256', "{$parentRunId}|{$toolCallId}");

    return substr($digest, 0, 32);
}
/**
 * Decodes the raw JSON arguments of a tool call into an array.
 *
 * Anything that does not decode to an array (invalid JSON, empty string, scalars, null)
 * yields an empty array.
 *
 * @return array<string, mixed>
 */
private function decodeToolArguments(string $rawArguments): array
{
    $parsed = json_decode($rawArguments, true);
    if (! is_array($parsed)) {
        return [];
    }

    return $parsed;
}
/**
 * Returns the time elapsed since $startedAt (a microtime(true) value) in whole milliseconds.
 */
private function latencyMs(float $startedAt): int
{
    $elapsedSeconds = microtime(true) - $startedAt;

    return (int) ($elapsedSeconds * 1000);
}
/**
 * Single entry point for the cancellation check; delegates to the injected CancelChecker.
 */
private function isCanceled(string $sessionId, string $runId): bool
{
    $checker = $this->cancelChecker;

    return $checker->isCanceled($sessionId, $runId);
}
/**
 * Returns the name used to label the provider in logs and metadata.
 *
 * The dummy provider is reported as "dummy"; any other provider is reported by its class
 * name with NUL bytes removed.
 */
private function resolveProviderName(): string
{
    $provider = $this->provider;

    return $provider instanceof DummyAgentProvider
        ? 'dummy'
        : str_replace("\0", '', $provider::class);
}
}