Files
ars-backend/app/Services/RunLoop.php
ROOG 59d4831f00 main: 增强工具调用与消息流程
- 支持 tool.call 和 tool.result 消息类型处理
- 引入 Tool 调度与执行逻辑,支持超时与结果截断
- 增加 ToolRegistry 和 ToolExecutor 管理工具定义与执行
- 更新上下文构建与消息映射逻辑,适配工具闭环处理
- 扩展配置与环境变量,支持 Tool 调用相关选项
- 增强单元测试覆盖工具调用与执行情景
- 更新文档和 OpenAPI,新增工具相关说明与模型定义
2025-12-22 12:36:59 +08:00

655 lines
22 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace App\Services;
use App\Services\Agent\AgentProviderInterface;
use App\Services\Agent\AgentContext;
use App\Services\Agent\DummyAgentProvider;
use App\Services\Agent\ProviderEventType;
use App\Services\Agent\ProviderException;
use App\Models\Message;
use App\Services\Tool\ToolCall;
use App\Services\Tool\ToolRunDispatcher;
/**
* Agent Run 主循环:
* - 构建上下文,消费 Provider 事件流Streaming
* - 处理取消、错误、增量输出、终态写回
*/
class RunLoop
{
private const TERMINAL_STATUSES = ['DONE', 'FAILED', 'CANCELED'];
private readonly int $maxToolCalls;
private readonly int $toolWaitTimeoutMs;
private readonly int $toolPollIntervalMs;
public function __construct(
private readonly ContextBuilder $contextBuilder,
private readonly AgentProviderInterface $provider,
private readonly OutputSink $outputSink,
private readonly CancelChecker $cancelChecker,
private readonly ToolRunDispatcher $toolRunDispatcher,
) {
$this->maxToolCalls = (int) config('agent.tools.max_calls_per_run', 10);
$this->toolWaitTimeoutMs = (int) config('agent.tools.wait_timeout_ms', 15000);
$this->toolPollIntervalMs = (int) config('agent.tools.wait_poll_interval_ms', 200);
}
/**
* 运行单次 Agent Run按 run_id 幂等负责取消检查、Provider 调用和结果落库。
*/
public function run(string $sessionId, string $runId): void
{
if ($this->isRunTerminal($sessionId, $runId)) {
return;
}
$providerName = $this->resolveProviderName();
$toolCallCount = 0;
while (true) {
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
$context = $this->contextBuilder->build($sessionId, $runId);
$providerOptions = [
'should_stop' => fn () => $this->isCanceled($sessionId, $runId),
];
// 达到工具调用上限后强制关闭后续工具调用,避免再次触发 TOOL_CALL_LIMIT。
if ($toolCallCount >= $this->maxToolCalls) {
$providerOptions['tool_choice'] = 'none';
}
$logOptions = $providerOptions;
unset($logOptions['should_stop']);
logger('agent provider context', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'context' => $context,
'provider_options' => $logOptions,
]);
$startedAt = microtime(true);
logger('agent provider request', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'iteration' => $toolCallCount,
]);
// 单轮 Agent 调用(可能触发工具调用,后续再进下一轮)
$streamState = $this->consumeProviderStream($sessionId, $runId, $context, $providerName, $startedAt, $providerOptions);
if ($streamState['canceled'] || $streamState['failed']) {
return;
}
if (! empty($streamState['tool_calls'])) {
$toolCallCount += count($streamState['tool_calls']);
if ($toolCallCount > $this->maxToolCalls) {
$this->appendProviderFailure(
$sessionId,
$runId,
'TOOL_CALL_LIMIT',
'Tool call limit reached for this run',
$providerName,
$this->latencyMs($startedAt),
[],
'TOOL_CALL_LIMIT'
);
return;
}
// 工具调用:先调度子 Run再等待 tool.result随后继续下一轮 Provider 调用。
$toolCalls = $this->dispatchToolRuns($sessionId, $runId, $streamState['tool_calls']);
$waitState = $this->awaitToolResults($sessionId, $runId, $toolCalls, $providerName);
if ($waitState['failed'] || $waitState['canceled']) {
return;
}
// 工具结果已写回上下文,继续下一轮 Agent 调用。
continue;
}
$latencyMs = $this->latencyMs($startedAt);
logger('agent provider response', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'latency_ms' => $latencyMs,
]);
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
if (! $streamState['received_event']) {
$this->appendProviderFailure(
$sessionId,
$runId,
'EMPTY_STREAM',
'Agent provider returned no events',
$providerName,
$latencyMs,
[],
'EMPTY_STREAM'
);
return;
}
if ($streamState['done_reason'] === null) {
$this->appendProviderFailure(
$sessionId,
$runId,
'STREAM_INCOMPLETE',
'Agent provider stream ended unexpectedly',
$providerName,
$latencyMs,
[],
'STREAM_INCOMPLETE'
);
return;
}
$this->outputSink->appendAgentMessage($sessionId, $runId, $streamState['reply'], [
'provider' => $providerName,
'done_reason' => $streamState['done_reason'],
], "run:{$runId}:agent:message");
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
$this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', [
'dedupe_key' => "run:{$runId}:status:DONE",
]);
return;
}
}
/**
* 判断指定 run 是否已到终态,避免重复执行。
*/
private function isRunTerminal(string $sessionId, string $runId): bool
{
$latestStatus = Message::query()
->where('session_id', $sessionId)
->where('type', 'run.status')
->whereRaw("payload->>'run_id' = ?", [$runId])
->orderByDesc('seq')
->first();
$status = $latestStatus ? ($latestStatus->payload['status'] ?? null) : null;
return in_array($status, self::TERMINAL_STATUSES, true);
}
/**
* 取消时写入终态 CANCELED幂等
*/
private function appendCanceled(string $sessionId, string $runId): void
{
$this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [
'dedupe_key' => "run:{$runId}:status:CANCELED",
]);
}
/**
* 消费 Provider Streaming 事件流:
* - message.delta落增量并累计最终回复
* - done记录结束理由
* - error/异常:写入 error + FAILED
* - cancel即时中断并写 CANCELED
* - tool.delta/tool.call收集工具调用信息后续驱动子 Run
*
* @return array{
* reply: string,
* done_reason: ?string,
* received_event: bool,
* failed: bool,
* canceled: bool,
* tool_calls: array<int, array<string, mixed>>
* }
*/
private function consumeProviderStream(
string $sessionId,
string $runId,
AgentContext $context,
string $providerName,
float $startedAt,
array $providerOptions = []
): array {
$reply = '';
$deltaIndex = 0;
$doneReason = null;
$receivedEvent = false;
$toolCallBuffer = [];
$toolCallOrder = [];
try {
$providerOptions = array_merge([
'should_stop' => fn () => $this->isCanceled($sessionId, $runId),
], $providerOptions);
foreach ($this->provider->stream($context, $providerOptions) as $event) {
$receivedEvent = true;
if ($this->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
$toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason);
return $this->streamState($reply, $doneReason, $receivedEvent, false, true, $toolCalls);
}
// 文本增量:持续写 message.delta 并拼接最终回复
if ($event->type === ProviderEventType::MessageDelta) {
$text = (string) ($event->payload['text'] ?? '');
if ($text !== '') {
$reply .= $text;
$deltaIndex++;
$this->outputSink->appendAgentDelta($sessionId, $runId, $text, $deltaIndex, [
'provider' => $providerName,
]);
}
continue;
}
if ($event->type === ProviderEventType::ToolDelta || $event->type === ProviderEventType::ToolCall) {
$toolCalls = $event->payload['tool_calls'] ?? [];
if (is_array($toolCalls)) {
$this->accumulateToolCalls($toolCallBuffer, $toolCallOrder, $toolCalls);
}
continue;
}
// 流结束
if ($event->type === ProviderEventType::Done) {
$doneReason = $event->payload['reason'] ?? null;
break;
}
// Provider 内部错误事件
if ($event->type === ProviderEventType::Error) {
$latencyMs = $this->latencyMs($startedAt);
$code = (string) ($event->payload['code'] ?? 'PROVIDER_ERROR');
$message = (string) ($event->payload['message'] ?? 'Agent provider error');
$this->appendProviderFailure(
$sessionId,
$runId,
$code,
$message,
$providerName,
$latencyMs,
[
'retryable' => $event->payload['retryable'] ?? null,
'http_status' => $event->payload['http_status'] ?? null,
'raw_message' => $event->payload['raw_message'] ?? null,
]
);
$toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason);
return $this->streamState($reply, $doneReason, $receivedEvent, true, false, $toolCalls);
}
}
} catch (ProviderException $exception) {
$latencyMs = $this->latencyMs($startedAt);
$this->appendProviderFailure(
$sessionId,
$runId,
$exception->errorCode,
$exception->getMessage(),
$providerName,
$latencyMs,
[
'retryable' => $exception->retryable,
'http_status' => $exception->httpStatus,
'raw_message' => $exception->rawMessage,
]
);
$toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason);
return $this->streamState($reply, $doneReason, $receivedEvent, true, false, $toolCalls);
}
$toolCalls = $this->finalizeToolCalls($toolCallBuffer, $toolCallOrder, $doneReason);
return $this->streamState($reply, $doneReason, $receivedEvent, false, false, $toolCalls);
}
/**
* 统一落库 Provider 错误与 FAILED 终态。
*
* @param array<string, mixed> $meta
*/
private function appendProviderFailure(
string $sessionId,
string $runId,
string $code,
string $message,
string $providerName,
int $latencyMs,
array $meta = [],
?string $statusError = null
): void {
$this->outputSink->appendError($sessionId, $runId, $code, $message, array_merge($meta, [
'provider' => $providerName,
'latency_ms' => $latencyMs,
]), "run:{$runId}:error:provider");
$this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', [
'error' => $statusError ?? $message,
'dedupe_key' => "run:{$runId}:status:FAILED",
]);
}
/**
* 封装流式状态返回,便于上层判断。
*
* @return array{
* reply: string,
* done_reason: ?string,
* received_event: bool,
* failed: bool,
* canceled: bool,
* tool_calls: array<int, array<string, mixed>>
* }
*/
private function streamState(
string $reply,
?string $doneReason,
bool $receivedEvent,
bool $failed,
bool $canceled,
array $toolCalls
): array {
return [
'reply' => $reply,
'done_reason' => $doneReason,
'received_event' => $receivedEvent,
'failed' => $failed,
'canceled' => $canceled,
'tool_calls' => $toolCalls,
];
}
/**
* 工具增量收集:同一个 tool_call_id 可能多次分片返回,此处拼接参数与名称。
*
* @param array<string, array<string, mixed>> $buffer
* @param array<string, int> $order
* @param array<int, array<string, mixed>> $toolCalls
*/
private function accumulateToolCalls(array &$buffer, array &$order, array $toolCalls): void
{
foreach ($toolCalls as $call) {
$id = is_string($call['id'] ?? null) && $call['id'] !== ''
? $call['id']
: md5(json_encode($call));
$index = is_int($call['index'] ?? null) ? (int) $call['index'] : count($order);
if (! isset($buffer[$id])) {
$buffer[$id] = [
'id' => $id,
'name' => $call['name'] ?? null,
'arguments' => '',
'index' => $index,
];
$order[$id] = $index;
}
if (isset($call['name']) && is_string($call['name']) && $call['name'] !== '') {
$buffer[$id]['name'] = $call['name'];
}
$arguments = $call['arguments'] ?? '';
if (is_string($arguments) && $arguments !== '') {
$buffer[$id]['arguments'] .= $arguments;
}
}
}
/**
* 将缓存的 tool.call 增量整理为最终列表(保持 provider 给出的顺序)。
*
* @param array<string, array<string, mixed>> $buffer
* @param array<string, int> $order
* @return array<int, array<string, mixed>>
*/
private function finalizeToolCalls(array $buffer, array $order, ?string $doneReason): array
{
if (empty($buffer)) {
return [];
}
uasort($buffer, function ($a, $b) use ($order) {
$orderA = $order[$a['id']] ?? ($a['index'] ?? 0);
$orderB = $order[$b['id']] ?? ($b['index'] ?? 0);
return $orderA <=> $orderB;
});
return array_values(array_map(function (array $call) use ($doneReason) {
return [
'id' => (string) ($call['id'] ?? ''),
'name' => (string) ($call['name'] ?? ''),
'arguments' => (string) ($call['arguments'] ?? ''),
'finish_reason' => $doneReason,
];
}, $buffer));
}
/**
* 将 Tool 调用落库并触发子 Run。
*
* @param array<int, array<string, mixed>> $toolCalls
* @return array<int, ToolCall>
*/
private function dispatchToolRuns(string $sessionId, string $parentRunId, array $toolCalls): array
{
$dispatched = [];
foreach ($toolCalls as $call) {
$toolCallId = (string) ($call['id'] ?? '');
$name = (string) ($call['name'] ?? '');
$rawArguments = (string) ($call['arguments'] ?? '');
if ($toolCallId === '' || $name === '') {
continue;
}
$arguments = $this->decodeToolArguments($rawArguments);
$toolRunId = $this->generateToolRunId($parentRunId, $toolCallId);
$toolCall = new ToolCall($toolRunId, $parentRunId, $toolCallId, $name, $arguments, $rawArguments);
$this->outputSink->appendToolCall($sessionId, $toolCall);
$this->toolRunDispatcher->dispatch($sessionId, $toolCall);
$dispatched[] = $toolCall;
}
return $dispatched;
}
/**
* 等待工具子 Run 写入 tool.result超时/失败会直接结束父 Run。
*
* @param array<int, ToolCall> $toolCalls
* @return array{failed: bool, canceled: bool}
*/
private function awaitToolResults(string $sessionId, string $parentRunId, array $toolCalls, string $providerName): array
{
$start = microtime(true);
$expectedIds = array_map(fn (ToolCall $call) => $call->toolCallId, $toolCalls);
$expectedRuns = array_map(fn (ToolCall $call) => $call->runId, $toolCalls);
while (true) {
if ($this->isCanceled($sessionId, $parentRunId)) {
$this->appendCanceled($sessionId, $parentRunId);
return ['failed' => false, 'canceled' => true];
}
$results = $this->findToolResults($sessionId, $parentRunId);
$statuses = $this->findToolRunStatuses($sessionId, $parentRunId);
foreach ($expectedRuns as $runId) {
$status = $statuses[$runId] ?? null;
if ($status === 'FAILED') {
$this->appendProviderFailure(
$sessionId,
$parentRunId,
'TOOL_RUN_FAILED',
"Tool run {$runId} failed",
$providerName,
$this->latencyMs($start),
[],
'TOOL_RUN_FAILED'
);
return ['failed' => true, 'canceled' => false];
}
if ($status === 'CANCELED') {
$this->appendCanceled($sessionId, $parentRunId);
return ['failed' => false, 'canceled' => true];
}
}
$readyIds = array_intersect($expectedIds, array_keys($results));
if (count($readyIds) === count($expectedIds)) {
return ['failed' => false, 'canceled' => false];
}
if ($this->latencyMs($start) >= $this->toolWaitTimeoutMs) {
$this->appendProviderFailure(
$sessionId,
$parentRunId,
'TOOL_RESULT_TIMEOUT',
'Tool result wait timeout',
$providerName,
$this->latencyMs($start),
[],
'TOOL_RESULT_TIMEOUT'
);
return ['failed' => true, 'canceled' => false];
}
usleep($this->toolPollIntervalMs * 1000);
}
}
/**
* @return array<string, \App\Models\Message>
*/
private function findToolResults(string $sessionId, string $parentRunId): array
{
$messages = Message::query()
->where('session_id', $sessionId)
->where('type', 'tool.result')
->whereRaw("payload->>'parent_run_id' = ?", [$parentRunId])
->orderBy('seq')
->get();
$byToolCall = [];
foreach ($messages as $message) {
$toolCallId = $message->payload['tool_call_id'] ?? null;
if (is_string($toolCallId) && $toolCallId !== '') {
$byToolCall[$toolCallId] = $message;
}
}
return $byToolCall;
}
/**
* @return array<string, string>
*/
private function findToolRunStatuses(string $sessionId, string $parentRunId): array
{
$messages = Message::query()
->where('session_id', $sessionId)
->where('type', 'run.status')
->whereRaw("payload->>'parent_run_id' = ?", [$parentRunId])
->orderBy('seq')
->get();
$statuses = [];
foreach ($messages as $message) {
$runId = $message->payload['run_id'] ?? null;
$status = $message->payload['status'] ?? null;
if (is_string($runId) && is_string($status)) {
$statuses[$runId] = $status;
}
}
return $statuses;
}
private function generateToolRunId(string $parentRunId, string $toolCallId): string
{
return substr(hash('sha256', $parentRunId.'|'.$toolCallId), 0, 32);
}
/**
* @return array<string, mixed>
*/
private function decodeToolArguments(string $rawArguments): array
{
$decoded = json_decode($rawArguments, true);
return is_array($decoded) ? $decoded : [];
}
/**
* 计算耗时(毫秒)。
*/
private function latencyMs(float $startedAt): int
{
return (int) ((microtime(true) - $startedAt) * 1000);
}
/**
* 统一取消判断,便于 mock。
*/
private function isCanceled(string $sessionId, string $runId): bool
{
return $this->cancelChecker->isCanceled($sessionId, $runId);
}
/**
* 返回 Provider 名称Dummy 使用短名)。
*/
private function resolveProviderName(): string
{
if ($this->provider instanceof DummyAgentProvider) {
return 'dummy';
}
return str_replace("\0", '', get_class($this->provider));
}
}