From 6d934f4e34799e816c1764afce87b111d57c5ec2 Mon Sep 17 00:00:00 2001 From: Roog Date: Thu, 18 Dec 2025 17:41:42 +0800 Subject: [PATCH] =?UTF-8?q?main:=20=E5=A2=9E=E5=BC=BA=20Agent=20Run=20?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E5=8F=AF=E9=9D=A0=E6=80=A7=E4=B8=8E=E5=B9=82?= =?UTF-8?q?=E7=AD=89=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 默认切换 AgentProvider 为 HttpAgentProvider,增强网络请求的容错和重试机制 - 优化 Run 逻辑,支持多场景去重与并发保护 - 添加 Redis 发布失败的日志记录以提升问题排查效率 - 扩展 OpenAPI 规范,新增 Error 和 Run 状态相关模型 - 增强测试覆盖,验证调度策略和重复请求的幂等性 - 增加数据库索引以优化查询性能 - 更新所有相关文档和配置文件 --- .../Controllers/ChatSessionSseController.php | 87 +++++------ app/Jobs/AgentRunJob.php | 46 +++++- app/Providers/AppServiceProvider.php | 2 +- app/Services/Agent/HttpAgentProvider.php | 76 +++++++++- app/Services/Agent/ProviderException.php | 16 ++ app/Services/CancelChecker.php | 2 +- app/Services/ChatService.php | 16 +- app/Services/OutputSink.php | 13 +- app/Services/RunDispatcher.php | 70 +++++---- app/Services/RunLoop.php | 98 ++++++++++-- config/agent.php | 16 ++ ...add_message_payload_expression_indexes.php | 25 ++++ docs/ChatSession/chat-session-api.md | 24 ++- docs/ChatSession/chat-session-openapi.yaml | 69 ++++++++- docs/agent-orchestrator-review.md | 53 +++++++ tests/Feature/AgentRunTest.php | 139 +++++++++++++++++- 16 files changed, 634 insertions(+), 118 deletions(-) create mode 100644 app/Services/Agent/ProviderException.php create mode 100644 config/agent.php create mode 100644 database/migrations/2025_12_18_084344_add_message_payload_expression_indexes.php create mode 100644 docs/agent-orchestrator-review.md diff --git a/app/Http/Controllers/ChatSessionSseController.php b/app/Http/Controllers/ChatSessionSseController.php index 0cdf6cb..d35bac1 100644 --- a/app/Http/Controllers/ChatSessionSseController.php +++ b/app/Http/Controllers/ChatSessionSseController.php @@ -17,6 +17,7 @@ class ChatSessionSseController extends Controller public function stream(Request 
$request, string $sessionId): Response|StreamedResponse { + set_time_limit(360); $this->service->getSession($sessionId); // ensure exists $lastEventId = $request->header('Last-Event-ID'); @@ -44,74 +45,50 @@ class ChatSessionSseController extends Controller $response = new StreamedResponse(function () use ($sessionId, $afterSeq, $limit) { $lastSentSeq = $afterSeq; + $lastHeartbeat = microtime(true); $this->sendBacklog($sessionId, $lastSentSeq, $limit); + $this->sendHeartbeat($lastHeartbeat); try { $redis = Redis::connection()->client(); if (method_exists($redis, 'setOption')) { - $redis->setOption(\Redis::OPT_READ_TIMEOUT, 5); + $redis->setOption(\Redis::OPT_READ_TIMEOUT, 360); } $channel = "session:{$sessionId}:messages"; - $lastPing = time(); - logger()->info('sse open'); - if (method_exists($redis, 'pubSubLoop')) { - $pubSub = $redis->pubSubLoop(); - $pubSub->subscribe($channel); - - foreach ($pubSub as $message) { - if ($message->kind === 'subscribe') { - continue; - } - - if (connection_aborted()) { - $pubSub->unsubscribe(); - break; - } - - $payloadId = $message->payload ?? 
null; - if ($payloadId) { - $msg = $this->service->getMessage($sessionId, $payloadId); - if ($msg && $msg->seq > $lastSentSeq) { - $this->emitMessage($msg); - $lastSentSeq = $msg->seq; - } - } - - if (time() - $lastPing >= 180) { - logger()->info('ping: sent'.$sessionId); - echo ": ping\n\n"; - @ob_flush(); - @flush(); - $lastPing = time(); - } + logger('sse open'); + // Fallback for Redis drivers without pubSubLoop (older phpredis) + $redis->subscribe([$channel], function ($redisInstance, $chan, $payload) use (&$lastSentSeq, $sessionId, $limit, &$lastHeartbeat) { + if (connection_aborted()) { + logger('sse aborted'); + $redisInstance->unsubscribe([$chan]); + return; } - logger()->info('close: sent'.$sessionId); - unset($pubSub); - } else { - // Fallback for Redis drivers without pubSubLoop (older phpredis) - $redis->subscribe([$channel], function ($redisInstance, $chan, $payload) use (&$lastSentSeq, $sessionId) { - if (connection_aborted()) { - $redisInstance->unsubscribe([$chan]); - return; + + $this->sendHeartbeat($lastHeartbeat); + + if (! $payload) { + return; + } + + $msg = $this->service->getMessage($sessionId, $payload); + if ($msg && $msg->seq > $lastSentSeq) { + if ($msg->seq > $lastSentSeq + 1) { + $this->sendBacklog($sessionId, $lastSentSeq, $limit); } - if (! 
$payload) { - return; - } - - $msg = $this->service->getMessage($sessionId, $payload); - if ($msg && $msg->seq > $lastSentSeq) { + if ($msg->seq > $lastSentSeq) { $this->emitMessage($msg); $lastSentSeq = $msg->seq; } - }); - } + } + }); } catch (\RedisException $exception) { logger()->warning('SSE redis subscription failed', [ 'session_id' => $sessionId, 'message' => $exception->getMessage(), + 'code' => $exception->getCode(), ]); echo ": redis-error\n\n"; @ob_flush(); @@ -146,4 +123,16 @@ class ChatSessionSseController extends Controller @flush(); } } + + private function sendHeartbeat(float &$lastHeartbeat, int $intervalSeconds = 15): void + { + if ((microtime(true) - $lastHeartbeat) < $intervalSeconds) { + return; + } + + echo ": ping\n\n"; + @ob_flush(); + @flush(); + $lastHeartbeat = microtime(true); + } } diff --git a/app/Jobs/AgentRunJob.php b/app/Jobs/AgentRunJob.php index 520e272..9cb602c 100644 --- a/app/Jobs/AgentRunJob.php +++ b/app/Jobs/AgentRunJob.php @@ -2,6 +2,9 @@ namespace App\Jobs; +use App\Models\Message; +use App\Services\Agent\ProviderException; +use App\Services\CancelChecker; use App\Services\OutputSink; use App\Services\RunLoop; use Illuminate\Bus\Queueable; @@ -14,22 +17,61 @@ class AgentRunJob implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; + public int $tries; + + public int $timeout; + + public int $backoff; + public function __construct(public string $sessionId, public string $runId) { + $this->tries = (int) config('agent.job.tries', 1); + $this->timeout = (int) config('agent.job.timeout_seconds', 120); + $this->backoff = (int) config('agent.job.backoff_seconds', 5); } - public function handle(RunLoop $loop, OutputSink $sink): void + public function handle(RunLoop $loop, OutputSink $sink, CancelChecker $cancelChecker): void { try { + logger("Running run {$this->runId} for session {$this->sessionId}"); $loop->run($this->sessionId, $this->runId); } catch (\Throwable $e) { - 
$sink->appendError($this->sessionId, $this->runId, 'run.failed', $e->getMessage()); + logger("Running error {$this->runId} for session {$this->sessionId}"); + logger("error message:",[$e->getMessage(),$e->getTraceAsString()]); + $errorCode = $e instanceof ProviderException ? $e->errorCode : 'run.failed'; + $dedupeKey = $e instanceof ProviderException + ? "run:{$this->runId}:error:provider" + : "run:{$this->runId}:error:job"; + + $sink->appendError($this->sessionId, $this->runId, $errorCode, $e->getMessage(), [], $dedupeKey); $sink->appendRunStatus($this->sessionId, $this->runId, 'FAILED', [ 'error' => $e->getMessage(), 'dedupe_key' => "run:{$this->runId}:status:FAILED", ]); throw $e; + } finally { + $latestStatus = Message::query() + ->where('session_id', $this->sessionId) + ->where('type', 'run.status') + ->whereRaw("payload->>'run_id' = ?", [$this->runId]) + ->orderByDesc('seq') + ->first(); + + $status = $latestStatus ? ($latestStatus->payload['status'] ?? null) : null; + + if ($status === 'RUNNING' || ! 
$status) { + if ($cancelChecker->isCanceled($this->sessionId, $this->runId)) { + $sink->appendRunStatus($this->sessionId, $this->runId, 'CANCELED', [ + 'dedupe_key' => "run:{$this->runId}:status:CANCELED", + ]); + } else { + $sink->appendRunStatus($this->sessionId, $this->runId, 'FAILED', [ + 'error' => 'JOB_ENDED_UNEXPECTEDLY', + 'dedupe_key' => "run:{$this->runId}:status:FAILED", + ]); + } + } } } } diff --git a/app/Providers/AppServiceProvider.php b/app/Providers/AppServiceProvider.php index 78aa2de..4c3fa5a 100644 --- a/app/Providers/AppServiceProvider.php +++ b/app/Providers/AppServiceProvider.php @@ -12,7 +12,7 @@ class AppServiceProvider extends ServiceProvider public function register(): void { $this->app->bind(\App\Services\Agent\AgentProviderInterface::class, function () { - return new \App\Services\Agent\DummyAgentProvider(); + return new \App\Services\Agent\HttpAgentProvider(); }); } diff --git a/app/Services/Agent/HttpAgentProvider.php b/app/Services/Agent/HttpAgentProvider.php index 35d21ee..7d32609 100644 --- a/app/Services/Agent/HttpAgentProvider.php +++ b/app/Services/Agent/HttpAgentProvider.php @@ -2,15 +2,24 @@ namespace App\Services\Agent; +use Illuminate\Http\Client\ConnectionException; use Illuminate\Support\Facades\Http; class HttpAgentProvider implements AgentProviderInterface { protected string $endpoint; + protected int $timeoutSeconds; + protected int $connectTimeoutSeconds; + protected int $retryTimes; + protected int $retryBackoffMs; public function __construct(?string $endpoint = null) { - $this->endpoint = $endpoint ?? config('services.agent_provider.endpoint', ''); + $this->endpoint = $endpoint ?? 
config('agent.provider.endpoint', ''); + $this->timeoutSeconds = (int) config('agent.provider.timeout_seconds', 30); + $this->connectTimeoutSeconds = (int) config('agent.provider.connect_timeout_seconds', 5); + $this->retryTimes = (int) config('agent.provider.retry_times', 1); + $this->retryBackoffMs = (int) config('agent.provider.retry_backoff_ms', 500); } /** @@ -29,14 +38,69 @@ class HttpAgentProvider implements AgentProviderInterface 'options' => $options, ]; - $response = Http::post($this->endpoint, $payload); + $attempts = $this->retryTimes + 1; + $lastException = null; + $lastResponseBody = null; + $lastStatus = null; - if (! $response->successful()) { - throw new \RuntimeException('Agent provider failed: '.$response->body()); + for ($attempt = 1; $attempt <= $attempts; $attempt++) { + try { + $response = Http::connectTimeout($this->connectTimeoutSeconds) + ->timeout($this->timeoutSeconds) + ->post($this->endpoint, $payload); + + $lastStatus = $response->status(); + $lastResponseBody = $response->body(); + + if ($response->successful()) { + $data = $response->json(); + + return is_string($data) ? $data : ($data['content'] ?? ''); + } + + $retryable = $lastStatus === 429 || $lastStatus >= 500; + if ($retryable && $attempt < $attempts) { + usleep($this->retryBackoffMs * 1000); + continue; + } + + throw new ProviderException( + 'HTTP_ERROR', + 'Agent provider failed', + $retryable, + $lastStatus, + $lastResponseBody + ); + } catch (ConnectionException $exception) { + $lastException = $exception; + if ($attempt < $attempts) { + usleep($this->retryBackoffMs * 1000); + continue; + } + } catch (\Throwable $exception) { + $lastException = $exception; + break; + } } - $data = $response->json(); + $rawMessage = $lastException ? $lastException->getMessage() : $lastResponseBody; - return is_string($data) ? $data : ($data['content'] ?? 
''); + if ($lastException instanceof ConnectionException) { + throw new ProviderException( + 'CONNECTION_FAILED', + 'Agent provider connection failed', + true, + $lastStatus, + $rawMessage + ); + } + + throw new ProviderException( + 'UNKNOWN_ERROR', + 'Agent provider error', + false, + $lastStatus, + $rawMessage + ); } } diff --git a/app/Services/Agent/ProviderException.php b/app/Services/Agent/ProviderException.php new file mode 100644 index 0000000..a0caab9 --- /dev/null +++ b/app/Services/Agent/ProviderException.php @@ -0,0 +1,16 @@ +where('session_id', $sessionId) ->where('type', 'run.cancel.request') ->whereIn('role', [Message::ROLE_USER, Message::ROLE_SYSTEM]) - ->where('payload->run_id', $runId) + ->whereRaw("payload->>'run_id' = ?", [$runId]) ->exists(); } } diff --git a/app/Services/ChatService.php b/app/Services/ChatService.php index 610bc38..7f48501 100644 --- a/app/Services/ChatService.php +++ b/app/Services/ChatService.php @@ -56,14 +56,16 @@ class ChatService * - payload: 附加信息(可选) * - reply_to: 被回复的消息 ID(可选) * - dedupe_key: 消息去重键(可选) + * @param bool|null $wasDeduped 是否发生了去重(可选,按引用返回) * @return Message 返回成功追加的消息实例。如果存在去重键并已存在重复消息,则返回现有的消息。 */ - public function appendMessage(array $dto): Message + public function appendMessage(array $dto, ?bool &$wasDeduped = null): Message { $messageRef = null; $isNew = false; + $wasDeduped = false; - DB::transaction(function () use ($dto, &$messageRef, &$isNew) { + DB::transaction(function () use ($dto, &$messageRef, &$isNew, &$wasDeduped) { /** @var ChatSession $session */ $session = ChatSession::query() ->whereKey($dto['session_id']) @@ -81,6 +83,7 @@ class ChatService if ($existing) { $messageRef = $existing; + $wasDeduped = true; return; } } @@ -112,6 +115,7 @@ class ChatService if ($existing) { $messageRef = $existing; + $wasDeduped = true; return; } } @@ -288,9 +292,11 @@ class ChatService try { Redis::publish($channel, $message->message_id); } catch (\Throwable $e) { - if (! 
app()->runningUnitTests()) { - throw $e; - } + logger()->warning('Redis publish failed', [ + 'session_id' => $message->session_id, + 'message_id' => $message->message_id, + 'error' => $e->getMessage(), + ]); } } diff --git a/app/Services/OutputSink.php b/app/Services/OutputSink.php index 8302045..aff0714 100644 --- a/app/Services/OutputSink.php +++ b/app/Services/OutputSink.php @@ -13,21 +13,24 @@ class OutputSink /** * @param array $meta */ - public function appendAgentMessage(string $sessionId, string $runId, string $content, array $meta = []): Message + public function appendAgentMessage(string $sessionId, string $runId, string $content, array $meta = [], ?string $dedupeKey = null): Message { + $dedupeKey = $dedupeKey ?? "run:{$runId}:agent:message"; + return $this->chatService->appendMessage([ 'session_id' => $sessionId, 'role' => Message::ROLE_AGENT, 'type' => 'agent.message', 'content' => $content, 'payload' => array_merge($meta, ['run_id' => $runId]), + 'dedupe_key' => $dedupeKey, ]); } /** * @param array $meta */ - public function appendRunStatus(string $sessionId, string $runId, string $status, array $meta = []): Message + public function appendRunStatus(string $sessionId, string $runId, string $status, array $meta = [], ?bool &$wasDeduped = null): Message { $dedupeKey = $meta['dedupe_key'] ?? 
null; unset($meta['dedupe_key']); @@ -36,18 +39,19 @@ class OutputSink 'session_id' => $sessionId, 'role' => Message::ROLE_SYSTEM, 'type' => 'run.status', + 'content' => null, 'payload' => array_merge($meta, [ 'run_id' => $runId, 'status' => $status, ]), 'dedupe_key' => $dedupeKey, - ]); + ], $wasDeduped); } /** * @param array $meta */ - public function appendError(string $sessionId, string $runId, string $code, string $message, array $meta = []): Message + public function appendError(string $sessionId, string $runId, string $code, string $message, array $meta = [], ?string $dedupeKey = null): Message { return $this->chatService->appendMessage([ 'session_id' => $sessionId, @@ -58,6 +62,7 @@ class OutputSink 'run_id' => $runId, 'message' => $message, ]), + 'dedupe_key' => $dedupeKey, ]); } } diff --git a/app/Services/RunDispatcher.php b/app/Services/RunDispatcher.php index 0016389..2a29c49 100644 --- a/app/Services/RunDispatcher.php +++ b/app/Services/RunDispatcher.php @@ -3,8 +3,10 @@ namespace App\Services; use App\Jobs\AgentRunJob; +use App\Models\ChatSession; use App\Models\Message; use Illuminate\Database\Eloquent\ModelNotFoundException; +use Illuminate\Support\Facades\DB; use Illuminate\Support\Str; class RunDispatcher @@ -25,36 +27,50 @@ class RunDispatcher throw (new ModelNotFoundException())->setModel(Message::class, [$triggerMessageId]); } - $existingForTrigger = Message::query() - ->where('session_id', $sessionId) - ->where('type', 'run.status') - ->where('payload->trigger_message_id', $triggerMessageId) - ->orderByDesc('seq') - ->first(); + $shouldDispatch = false; - if ($existingForTrigger && ($existingForTrigger->payload['run_id'] ?? 
null)) { - return $existingForTrigger->payload['run_id']; + $runId = DB::transaction(function () use ($sessionId, $triggerMessageId, &$shouldDispatch) { + ChatSession::query() + ->whereKey($sessionId) + ->lockForUpdate() + ->firstOrFail(); + + $latestStatus = Message::query() + ->where('session_id', $sessionId) + ->where('type', 'run.status') + ->orderByDesc('seq') + ->first(); + + if ($latestStatus && ($latestStatus->payload['status'] ?? null) === 'RUNNING' && ($latestStatus->payload['run_id'] ?? null)) { + logger('existing run found', ['sessionId' => $sessionId, 'runId' => $latestStatus->payload['run_id']]); + return $latestStatus->payload['run_id']; + } + + $candidateRunId = (string) Str::uuid(); + $wasDeduped = null; + + $statusMessage = $this->outputSink->appendRunStatus($sessionId, $candidateRunId, 'RUNNING', [ + 'trigger_message_id' => $triggerMessageId, + 'dedupe_key' => 'run:trigger:'.$triggerMessageId, + ], $wasDeduped); + + $finalRunId = $statusMessage->payload['run_id'] ?? $candidateRunId; + + if ($wasDeduped) { + logger('existing run found', ['sessionId' => $sessionId, 'runId' => $finalRunId]); + return $finalRunId; + } + + $shouldDispatch = true; + + return $finalRunId; + }); + + if ($shouldDispatch) { + logger('dispatching run', ['sessionId' => $sessionId, 'runId' => $runId]); + dispatch(new AgentRunJob($sessionId, $runId)); } - $latestStatus = Message::query() - ->where('session_id', $sessionId) - ->where('type', 'run.status') - ->orderByDesc('seq') - ->first(); - - if ($latestStatus && ($latestStatus->payload['status'] ?? null) === 'RUNNING' && ($latestStatus->payload['run_id'] ?? 
null)) { - return $latestStatus->payload['run_id']; - } - - $runId = (string) Str::uuid(); - - $this->outputSink->appendRunStatus($sessionId, $runId, 'RUNNING', [ - 'trigger_message_id' => $triggerMessageId, - 'dedupe_key' => 'run:trigger:'.$triggerMessageId, - ]); - - dispatch(new AgentRunJob($sessionId, $runId)); - return $runId; } } diff --git a/app/Services/RunLoop.php b/app/Services/RunLoop.php index e8d2d26..9bac9db 100644 --- a/app/Services/RunLoop.php +++ b/app/Services/RunLoop.php @@ -4,9 +4,13 @@ namespace App\Services; use App\Services\Agent\AgentProviderInterface; use App\Services\Agent\DummyAgentProvider; +use App\Services\Agent\ProviderException; +use App\Models\Message; class RunLoop { + private const TERMINAL_STATUSES = ['DONE', 'FAILED', 'CANCELED']; + public function __construct( private readonly ContextBuilder $contextBuilder, private readonly AgentProviderInterface $provider, @@ -17,36 +21,106 @@ class RunLoop public function run(string $sessionId, string $runId): void { + if ($this->isRunTerminal($sessionId, $runId)) { + return; + } + if ($this->cancelChecker->isCanceled($sessionId, $runId)) { - $this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [ - 'dedupe_key' => "run:{$runId}:status:CANCELED", - ]); + $this->appendCanceled($sessionId, $runId); return; } $context = $this->contextBuilder->build($sessionId, $runId); if ($this->cancelChecker->isCanceled($sessionId, $runId)) { - $this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [ - 'dedupe_key' => "run:{$runId}:status:CANCELED", - ]); + $this->appendCanceled($sessionId, $runId); return; } - $reply = $this->provider->generate($context); + $providerName = $this->resolveProviderName(); + $startedAt = microtime(true); + + logger('agent provider request', [ + 'sessionId' => $sessionId, + 'runId' => $runId, + 'provider' => $providerName, + ]); + + try { + $reply = $this->provider->generate($context); + } catch (ProviderException $exception) { + $latencyMs = (int) 
((microtime(true) - $startedAt) * 1000); + + $this->outputSink->appendError($sessionId, $runId, $exception->errorCode, $exception->getMessage(), [ + 'retryable' => $exception->retryable, + 'http_status' => $exception->httpStatus, + 'provider' => $providerName, + 'latency_ms' => $latencyMs, + 'raw_message' => $exception->rawMessage, + ], "run:{$runId}:error:provider"); + + $this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', [ + 'error' => $exception->getMessage(), + 'dedupe_key' => "run:{$runId}:status:FAILED", + ]); + + throw $exception; + } + $latencyMs = (int) ((microtime(true) - $startedAt) * 1000); + + logger('agent provider response', [ + 'sessionId' => $sessionId, + 'runId' => $runId, + 'provider' => $providerName, + 'latency_ms' => $latencyMs, + ]); if ($this->cancelChecker->isCanceled($sessionId, $runId)) { - $this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [ - 'dedupe_key' => "run:{$runId}:status:CANCELED", - ]); + $this->appendCanceled($sessionId, $runId); return; } $this->outputSink->appendAgentMessage($sessionId, $runId, $reply, [ - 'provider' => $this->provider instanceof DummyAgentProvider ? 'dummy' : get_class($this->provider), - ]); + 'provider' => $providerName, + ], "run:{$runId}:agent:message"); + + if ($this->cancelChecker->isCanceled($sessionId, $runId)) { + $this->appendCanceled($sessionId, $runId); + return; + } + $this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', [ 'dedupe_key' => "run:{$runId}:status:DONE", ]); } + + private function isRunTerminal(string $sessionId, string $runId): bool + { + $latestStatus = Message::query() + ->where('session_id', $sessionId) + ->where('type', 'run.status') + ->whereRaw("payload->>'run_id' = ?", [$runId]) + ->orderByDesc('seq') + ->first(); + + $status = $latestStatus ? ($latestStatus->payload['status'] ?? 
null) : null; + + return in_array($status, self::TERMINAL_STATUSES, true); + } + + private function appendCanceled(string $sessionId, string $runId): void + { + $this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [ + 'dedupe_key' => "run:{$runId}:status:CANCELED", + ]); + } + + private function resolveProviderName(): string + { + if ($this->provider instanceof DummyAgentProvider) { + return 'dummy'; + } + + return str_replace("\0", '', get_class($this->provider)); + } } diff --git a/config/agent.php b/config/agent.php new file mode 100644 index 0000000..69a34b7 --- /dev/null +++ b/config/agent.php @@ -0,0 +1,16 @@ + [ + 'endpoint' => env('AGENT_PROVIDER_ENDPOINT', ''), + 'timeout_seconds' => env('AGENT_PROVIDER_TIMEOUT', 30), + 'connect_timeout_seconds' => env('AGENT_PROVIDER_CONNECT_TIMEOUT', 5), + 'retry_times' => env('AGENT_PROVIDER_RETRY_TIMES', 1), + 'retry_backoff_ms' => env('AGENT_PROVIDER_RETRY_BACKOFF_MS', 500), + ], + 'job' => [ + 'tries' => env('AGENT_RUN_JOB_TRIES', 1), + 'backoff_seconds' => env('AGENT_RUN_JOB_BACKOFF', 5), + 'timeout_seconds' => env('AGENT_RUN_JOB_TIMEOUT', 120), + ], +]; diff --git a/database/migrations/2025_12_18_084344_add_message_payload_expression_indexes.php b/database/migrations/2025_12_18_084344_add_message_payload_expression_indexes.php new file mode 100644 index 0000000..3afedbd --- /dev/null +++ b/database/migrations/2025_12_18_084344_add_message_payload_expression_indexes.php @@ -0,0 +1,25 @@ +>'run_id')) WHERE type IN ('run.status','run.cancel.request','error','agent.message')"); + DB::statement("CREATE INDEX IF NOT EXISTS messages_payload_trigger_message_id_idx ON messages ((payload->>'trigger_message_id')) WHERE type = 'run.status'"); + } + + /** + * Reverse the migrations. 
+ */ + public function down(): void + { + DB::statement('DROP INDEX IF EXISTS messages_payload_run_id_idx'); + DB::statement('DROP INDEX IF EXISTS messages_payload_trigger_message_id_idx'); + } +}; diff --git a/docs/ChatSession/chat-session-api.md b/docs/ChatSession/chat-session-api.md index 74a0ece..3f24862 100644 --- a/docs/ChatSession/chat-session-api.md +++ b/docs/ChatSession/chat-session-api.md @@ -8,6 +8,14 @@ - 2025-02-14:新增 ChatSession 创建、消息追加、增量查询接口;支持状态门禁与 dedupe 幂等。 - 2025-02-14:MVP-1.1 增加会话列表、会话更新(重命名/状态变更),列表附带最后一条消息摘要。 - 2025-02-15:Agent Run MVP-0 —— RunDispatcher + AgentRunJob + DummyProvider;自动在 user.prompt 后触发一次 Run,落地 run.status / agent.message。 +- 2025-12-18:Agent Run 可靠性增强 —— 并发幂等、终态去重、取消语义加强、Provider 超时/重试/错误归一,SSE gap 回补与心跳。 + +## 本次变更摘要(2025-12-18) +- RunDispatcher 并发幂等:同 trigger_message_id 只产生一个 RUNNING,且仅新建时 dispatch。 +- RunLoop/OutputSink 幂等:agent.message 与 run.status 采用 dedupe_key;重复执行不重复写。 +- Cancel 强化:多检查点取消,确保不落 agent.message 且落 CANCELED 终态。 +- Provider 可靠性:超时/重试/429/5xx,错误落库包含 retryable/http_status/provider/latency_ms。 +- SSE 可靠性:gap 触发回补,心跳保活,publish 异常不影响主流程。 ## 领域模型 - `ChatSession`:`session_id`(UUID)、`session_name`、`status`(`OPEN`/`LOCKED`/`CLOSED`)、`last_seq` @@ -141,7 +149,8 @@ - `id` 为消息 `seq`,便于续传;`data` 为消息 JSON(同追加消息响应字段)。 - Backlog:建立连接后先补发 `seq > after_seq` 的消息(order asc,最多 `limit` 条),再进入实时订阅。 - 实时:Redis channel `session:{session_id}:messages` 发布消息 ID,SSE 侧读取后按 seq 去重、推送。 -- 心跳:周期输出 `: ping` 保活(生产环境)。 +- Gap 回补:若订阅推送的 seq 与 last_sent_seq 存在缺口,会主动回补 backlog。 +- 心跳:周期输出 `: ping` 保活。 - 错误:401 未授权;404 session 不存在。 ## Agent Run MVP-0(RunDispatcher + AgentRunJob) @@ -149,13 +158,14 @@ 1. 用户追加 `role=USER && type=user.prompt` 后,Controller 自动调用 `RunDispatcher->dispatchForPrompt`。 2. 并发保护:同会话只允许一个 RUNNING;同一个 `trigger_message_id` 幂等复用已有 `run_id`。 3. 立即写入 `run.status`(SYSTEM/run.status,payload `{run_id,status:'RUNNING',trigger_message_id}`,dedupe_key=`run:trigger:{message_id}`)。 -4. 
推送 `AgentRunJob(session_id, run_id)` 到队列(测试环境 QUEUE=sync 会同步执行)。 -5. RunLoop(使用 DummyAgentProvider): +4. 仅在新建 RUNNING 时推送 `AgentRunJob(session_id, run_id)` 到队列(测试环境 QUEUE=sync 会同步执行)。 +5. RunLoop(默认绑定 HttpAgentProvider,endpoint 取自 `AGENT_PROVIDER_ENDPOINT`;注意:本次变更未实现“未配置 endpoint 时回退 DummyAgentProvider”): + - 终态检测:若已 DONE/FAILED/CANCELED 则直接返回。 - Cancel 检查:存在 `run.cancel.request`(payload.run_id) 则写入 `run.status=CANCELED`,不产出 agent.message。 - ContextBuilder:提取最近 20 条 USER/AGENT 消息(type in user.prompt/agent.message),seq 升序提供给 Provider。 - - Provider 返回一次性文本回复。 - - OutputSink 依次写入:`agent.message`(payload 含 run_id, provider)、`run.status=DONE`(dedupe_key=`run:{run_id}:status:DONE`)。 -6. 异常:AgentRunJob 捕获异常后写入 `error` + `run.status=FAILED`(dedupe)。 + - Provider 返回一次性文本回复(内置超时/重试/退避)。 + - OutputSink 依次写入:`agent.message`(payload 含 run_id, provider,dedupe_key=`run:{run_id}:agent:message`)、`run.status=DONE`(dedupe_key=`run:{run_id}:status:DONE`)。 +6. 异常:ProviderException 写入 `error` + `run.status=FAILED`(dedupe),error payload 包含 retryable/http_status/provider/latency_ms。 ### Run 相关消息类型(落库即真相源) | type | role | payload 关键字段 | 说明 | @@ -163,7 +173,7 @@ | run.status | SYSTEM | run_id, status(RUNNING/DONE/CANCELED/FAILED), trigger_message_id?, error? | Run 生命周期事件,CLOSED 状态下允许写入 | | agent.message | AGENT | run_id, provider | Provider 的一次性回复 | | run.cancel.request | USER/SYSTEM | run_id | CancelChecker 依据该事件判断是否中止 | -| error | SYSTEM | run_id, message | 任务异常时落库 | +| error | SYSTEM | run_id, message, retryable?, http_status?, provider?, latency_ms?, raw_message? 
| 任务异常时落库 | ### 触发 Run(调试入口) - `POST /sessions/{session_id}/runs` diff --git a/docs/ChatSession/chat-session-openapi.yaml b/docs/ChatSession/chat-session-openapi.yaml index 6411846..be44c98 100644 --- a/docs/ChatSession/chat-session-openapi.yaml +++ b/docs/ChatSession/chat-session-openapi.yaml @@ -1,7 +1,7 @@ openapi: 3.0.3 info: title: ChatSession & Message API - version: 1.1.0 + version: 1.1.1 description: | ChatSession & Message API(含 Archive/GetMessage/SSE 与 Run 调度)。自然语言:中文。 servers: @@ -15,6 +15,13 @@ tags: - name: Run description: Agent Run 调度 paths: + /test: + summary: 测试接口 + get: + tags: [Test] + responses: + "200": + description: 成功 /sessions: post: tags: [ChatSession] @@ -422,8 +429,13 @@ components: type: string nullable: true payload: - type: object nullable: true + oneOf: + - $ref: '#/components/schemas/RunStatusPayload' + - $ref: '#/components/schemas/AgentMessagePayload' + - $ref: '#/components/schemas/RunCancelPayload' + - $ref: '#/components/schemas/RunErrorPayload' + - type: object reply_to: type: string format: uuid @@ -470,6 +482,59 @@ components: message: type: string example: Session is closed + RunStatusPayload: + type: object + properties: + run_id: + type: string + format: uuid + status: + type: string + enum: [RUNNING, DONE, FAILED, CANCELED] + trigger_message_id: + type: string + format: uuid + nullable: true + error: + type: string + nullable: true + AgentMessagePayload: + type: object + properties: + run_id: + type: string + format: uuid + provider: + type: string + RunCancelPayload: + type: object + properties: + run_id: + type: string + format: uuid + RunErrorPayload: + type: object + properties: + run_id: + type: string + format: uuid + message: + type: string + retryable: + type: boolean + nullable: true + http_status: + type: integer + nullable: true + provider: + type: string + nullable: true + latency_ms: + type: integer + nullable: true + raw_message: + type: string + nullable: true PaginationLinks: type: object properties: diff 
--git a/docs/agent-orchestrator-review.md b/docs/agent-orchestrator-review.md new file mode 100644 index 0000000..e7efd3d --- /dev/null +++ b/docs/agent-orchestrator-review.md @@ -0,0 +1,53 @@ +# Agent Orchestrator 工程级 Review + +A. 现状总结 + +整体链路(RunDispatcher → Job → RunLoop → Provider → OutputSink)已能跑通,职责划分基本清晰,append-only + seq 方案也成立;但在并发幂等、run 生命周期一致性、取消可靠性、provider 超时/重试,以及 SSE 丢事件补偿方面仍有明显缺口,尚未达到对接真实 LLM Provider 的最低可靠性门槛。 + +B. 高风险问题列表 + +- P0 | 影响: 并发触发同一 prompt 时可能生成“幽灵 run”,run_id 不一致且重复出消息/计费 | 复现: 并发调用 `dispatchForPrompt` 同一 `trigger_message_id`,第二个请求 dedupe 命中但仍派发新 job | 修复: 以 `appendRunStatus` 返回的 message 为准,若 run_id 不一致则直接返回已有 run_id 且不 dispatch;或使用确定性 run_id/事务化 get-or-create | 位置: `app/Services/RunDispatcher.php:21`, `app/Services/OutputSink.php:30`, `app/Services/ChatService.php:61` +- P0 | 影响: job 重试/重复派发会生成多个 `agent.message`,导致重复输出与双重计费 | 复现: 让队列重试或重复 dispatch 同一 run_id | 修复: 给 agent.message 增加 dedupe_key(按 run_id + step/chunk),并在 RunLoop 开始前检查 run.status 是否已终态 | 位置: `app/Services/OutputSink.php:16`, `app/Services/RunLoop.php:45`, `app/Jobs/AgentRunJob.php:21` +- P1 | 影响: 同 session 可并发多个 RUNNING(违反“单 run”假设),后续状态/输出互相覆盖 | 复现: 连续/并发发送多条 prompt;`latestStatus` 检查无锁 | 修复: 在 session 行锁内判断并创建 RUNNING,或用 Redis/DB 锁/独立 run 表做硬约束 | 位置: `app/Services/RunDispatcher.php:40`, `app/Http/Controllers/ChatSessionController.php:45` +- P1 | 影响: run 生命周期非原子写入,崩溃后会出现“有回复但仍 RUNNING”或“RUNNING 无后续” | 复现: 在 `appendAgentMessage` 后杀 worker;或 dispatch 失败后 RUNNING 已落库 | 修复: 将 agent.message 与 DONE 写入同一一致性边界;添加 watchdog/finally 标记 FAILED/CANCELED | 位置: `app/Services/RunLoop.php:45`, `app/Services/RunDispatcher.php:53` +- P1 | 影响: cancel 不能严格阻止 agent.message 落库,且取消后可能无 CANCELED 终态 | 复现: cancel 请求在 provider 返回后、写回前到达;或 job 崩溃 | 修复: 在写回前/写回时再校验 cancel;在 finally 中若存在 cancel 请求则写 CANCELED | 位置: `app/Services/RunLoop.php:20`, `app/Services/CancelChecker.php:9` +- P1 | 影响: provider 调用可能无限挂起或频繁失败,run 长时间 RUNNING | 复现: provider 超时/429/5xx | 修复: 设置 Http timeout/retry/backoff,配置 job 
$timeout/$tries;对 429 做退避 | 位置: `app/Services/Agent/HttpAgentProvider.php:20`, `app/Jobs/AgentRunJob.php:13` +- P2 | 影响: SSE 可能漏事件(backlog 与订阅之间存在窗口),Redis publish 异常会让主流程报错 | 复现: 在 backlog 发送后、订阅前插入消息;或 Redis 异常 | 修复: 订阅后检测 seq gap 回补;publish 异常仅告警不抛 | 位置: `app/Http/Controllers/ChatSessionSseController.php:46`, `app/Services/ChatService.php:130` +- P2 | 影响: 上下文固定 20 条且无 token 预算/截断,遇大消息容易爆 token | 复现: 长文本 prompt/agent 输出 | 修复: 通过配置控制窗口/预算并做截断或摘要 | 位置: `app/Services/ContextBuilder.php:10` +- P2 | 影响: JSONB payload 查询无索引,run/cancel 查询随数据量增长会退化 | 复现: 大量消息后 run.status/取消查询变慢 | 修复: 加表达式索引 `payload->>'run_id'` / `payload->>'trigger_message_id'` | 位置: `app/Services/RunDispatcher.php:28`, `app/Services/CancelChecker.php:11` + +C. 对接真实 LLM Provider 前必须补齐的最小门槛清单 + +- 并发幂等:RunDispatcher 必须做到“同 trigger 只产生一个 run”,且不 dispatch 重复 job +- 输出幂等:agent.message 必须可去重,RunLoop 必须在终态时 no-op +- run 生命周期一致性:DONE/FAILED/CANCELED 的落库要有一致性边界或兜底 finalizer +- 取消语义:写回前必须再次校验 cancel,并确保取消后不再写 agent.message +- Provider 调用稳定性:timeout + retry/backoff + 429/5xx 处理 + job timeout/tries +- SSE 补偿:解决 backlog/subscribe 间隙与 publish 异常导致的漏消息 +- Context 预算:可配置窗口/预算并限制大 payload +- 最小测试覆盖:并发幂等、取消、重试/失败路径 + +D. 建议的下一步路线选择 + +- 方案1:先补 Orchestrator —— 补齐并发幂等、输出去重、生命周期终态一致性、取消写回保护、provider 超时/重试、SSE 缺口补偿、context 预算配置,再接真实 Provider +- 方案2:直接接真实 Provider —— 目前仅适合“实验/灰度验证链路通不通”;已有 append-only + seq + SSE 基础与 ContextBuilder 过滤,但不满足可靠性门槛 + +E. 
最小变更的 patch 建议(不要求实现) + +- `app/Services/RunDispatcher.php`: `appendRunStatus` 后读取返回 message 的 `payload.run_id`;若与新 runId 不一致则直接返回已有 run_id 且跳过 dispatch;必要时把 “单 session 运行中检查” 放入带锁事务 +- `app/Services/OutputSink.php`: 允许 `appendAgentMessage` 接收并透传 dedupe_key(默认 `run:{runId}:agent:message`),避免重复输出 +- `app/Services/RunLoop.php`: 开始时/写回前检查 run 是否已终态;写回 DONE 前再检查 cancel,必要时写 CANCELED 并停止 +- `app/Jobs/AgentRunJob.php`: 设置 `$tries/$backoff/$timeout`;在 finally 中若仍 RUNNING 则落 FAILED(需查询最新 run.status) +- `app/Services/Agent/HttpAgentProvider.php` + `config/services.php`: 增加 timeout、retry/backoff 配置项与 429 退避策略 +- `app/Services/ChatService.php`: `publishMessageAppended` 改为捕获异常仅记录日志,避免 afterCommit 抛错中断主流程 +- `app/Http/Controllers/ChatSessionSseController.php`: 收到 pubsub 时若 seq gap>1 则 `sendBacklog` 回补;可加心跳定期拉取 +- 新 migration:为 `messages` 增加表达式索引 `payload->>'run_id'`、`payload->>'trigger_message_id'`(并结合 `type`)以保证查询性能 + +## 已实施增强(最小可行版本) + +- RunDispatcher 并发幂等:RUNNING 消息以 dedupe_key 作为唯一真相,只有新建 RUNNING 才 dispatch +- RunLoop/OutputSink 幂等:agent.message/run.status 使用 dedupe_key,终态重复执行可安全 no-op +- Cancel/终态收敛:多检查点取消 + Job finally 兜底写入 CANCELED/FAILED +- Provider 可靠性:超时/重试/错误归一化(ProviderException)并写入错误元数据 +- SSE 补偿:seq gap 回补 + 心跳 + publish 异常仅记录日志(不中断主流程) +- Postgres 索引:payload 表达式索引加速 run/cancel 查询 diff --git a/tests/Feature/AgentRunTest.php b/tests/Feature/AgentRunTest.php index b2b5950..1360e56 100644 --- a/tests/Feature/AgentRunTest.php +++ b/tests/Feature/AgentRunTest.php @@ -4,6 +4,9 @@ namespace Tests\Feature; use App\Jobs\AgentRunJob; use App\Models\Message; +use App\Services\Agent\AgentProviderInterface; +use App\Services\Agent\ProviderException; +use App\Services\CancelChecker; use App\Services\ChatService; use App\Services\RunDispatcher; use App\Services\RunLoop; @@ -39,7 +42,8 @@ class AgentRunTest extends TestCase // simulate worker execution (new AgentRunJob($session->session_id, $runId))->handle( app(RunLoop::class), - app(OutputSink::class) + app(OutputSink::class), + 
app(CancelChecker::class) ); $messages = Message::query() @@ -52,6 +56,36 @@ class AgentRunTest extends TestCase $this->assertTrue($messages->contains(fn ($m) => $m->type === 'run.status' && ($m->payload['status'] ?? null) === 'DONE')); } + public function test_dispatch_is_idempotent_for_same_trigger(): void + { + Queue::fake(); + $service = app(ChatService::class); + $dispatcher = app(RunDispatcher::class); + + $session = $service->createSession('Idempotent Run'); + $prompt = $service->appendMessage([ + 'session_id' => $session->session_id, + 'role' => Message::ROLE_USER, + 'type' => 'user.prompt', + 'content' => 'please run once', + ]); + + $firstRunId = $dispatcher->dispatchForPrompt($session->session_id, $prompt->message_id); + $secondRunId = $dispatcher->dispatchForPrompt($session->session_id, $prompt->message_id); + + $this->assertSame($firstRunId, $secondRunId); + + Queue::assertPushed(AgentRunJob::class, 1); + + $statusMessages = Message::query() + ->where('session_id', $session->session_id) + ->where('type', 'run.status') + ->whereRaw("payload->>'trigger_message_id' = ?", [$prompt->message_id]) + ->get(); + + $this->assertCount(1, $statusMessages); + } + public function test_second_prompt_dispatches_new_run_after_first_completes(): void { Queue::fake(); @@ -70,7 +104,8 @@ class AgentRunTest extends TestCase (new AgentRunJob($session->session_id, $firstRunId))->handle( app(RunLoop::class), - app(OutputSink::class) + app(OutputSink::class), + app(CancelChecker::class) ); $secondPrompt = $service->appendMessage([ @@ -90,6 +125,51 @@ class AgentRunTest extends TestCase }); } + public function test_repeated_job_does_not_duplicate_agent_message(): void + { + Queue::fake(); + $service = app(ChatService::class); + $dispatcher = app(RunDispatcher::class); + + $session = $service->createSession('Retry Session'); + $prompt = $service->appendMessage([ + 'session_id' => $session->session_id, + 'role' => Message::ROLE_USER, + 'type' => 'user.prompt', + 'content' => 
'retry run', + ]); + + $runId = $dispatcher->dispatchForPrompt($session->session_id, $prompt->message_id); + + (new AgentRunJob($session->session_id, $runId))->handle( + app(RunLoop::class), + app(OutputSink::class), + app(CancelChecker::class) + ); + + (new AgentRunJob($session->session_id, $runId))->handle( + app(RunLoop::class), + app(OutputSink::class), + app(CancelChecker::class) + ); + + $agentMessages = Message::query() + ->where('session_id', $session->session_id) + ->where('type', 'agent.message') + ->whereRaw("payload->>'run_id' = ?", [$runId]) + ->get(); + + $doneStatuses = Message::query() + ->where('session_id', $session->session_id) + ->where('type', 'run.status') + ->whereRaw("payload->>'run_id' = ?", [$runId]) + ->whereRaw("payload->>'status' = ?", ['DONE']) + ->get(); + + $this->assertCount(1, $agentMessages); + $this->assertCount(1, $doneStatuses); + } + public function test_cancel_prevents_agent_reply_and_marks_canceled(): void { Queue::fake(); @@ -123,4 +203,59 @@ class AgentRunTest extends TestCase $this->assertFalse($messages->contains(fn ($m) => $m->type === 'agent.message' && ($m->payload['run_id'] ?? null) === $runId)); $this->assertTrue($messages->contains(fn ($m) => $m->type === 'run.status' && ($m->payload['status'] ?? 
null) === 'CANCELED')); } + + public function test_provider_exception_writes_error_and_failed_status(): void + { + Queue::fake(); + $this->app->bind(AgentProviderInterface::class, function () { + return new class implements AgentProviderInterface { + public function generate(array $context, array $options = []): string + { + throw new ProviderException('HTTP_ERROR', 'provider failed', true, 500, 'boom'); + } + }; + }); + + $service = app(ChatService::class); + $dispatcher = app(RunDispatcher::class); + + $session = $service->createSession('Provider Failure'); + $prompt = $service->appendMessage([ + 'session_id' => $session->session_id, + 'role' => Message::ROLE_USER, + 'type' => 'user.prompt', + 'content' => 'trigger failure', + ]); + + $runId = $dispatcher->dispatchForPrompt($session->session_id, $prompt->message_id); + + try { + (new AgentRunJob($session->session_id, $runId))->handle( + app(RunLoop::class), + app(OutputSink::class), + app(CancelChecker::class) + ); + $this->fail('Expected provider exception'); + } catch (ProviderException $exception) { + $this->assertSame('HTTP_ERROR', $exception->errorCode); + } + + $messages = Message::query() + ->where('session_id', $session->session_id) + ->get(); + + $this->assertTrue($messages->contains(function ($m) use ($runId) { + return $m->type === 'run.status' + && ($m->payload['status'] ?? null) === 'FAILED' + && ($m->payload['run_id'] ?? null) === $runId; + })); + + $this->assertTrue($messages->contains(function ($m) use ($runId) { + return $m->type === 'error' + && $m->content === 'HTTP_ERROR' + && ($m->payload['run_id'] ?? null) === $runId + && ($m->payload['retryable'] ?? null) === true + && ($m->payload['http_status'] ?? null) === 500; + })); + } }