main: 增强 Agent Run 调度可靠性与幂等性

- 默认切换 AgentProvider 为 HttpAgentProvider,增强网络请求的容错和重试机制
- 优化 Run 逻辑,支持多场景去重与并发保护
- 添加 Redis 发布失败的日志记录以提升问题排查效率
- 扩展 OpenAPI 规范,新增 Error 和 Run 状态相关模型
- 增强测试覆盖,验证调度策略和重复请求的幂等性
- 增加数据库索引以优化查询性能
- 更新所有相关文档和配置文件
This commit is contained in:
2025-12-18 17:41:42 +08:00
parent 2ad101c297
commit 6d934f4e34
16 changed files with 634 additions and 118 deletions

View File

@@ -17,6 +17,7 @@ class ChatSessionSseController extends Controller
public function stream(Request $request, string $sessionId): Response|StreamedResponse
{
set_time_limit(360);
$this->service->getSession($sessionId); // ensure exists
$lastEventId = $request->header('Last-Event-ID');
@@ -44,74 +45,50 @@ class ChatSessionSseController extends Controller
$response = new StreamedResponse(function () use ($sessionId, $afterSeq, $limit) {
$lastSentSeq = $afterSeq;
$lastHeartbeat = microtime(true);
$this->sendBacklog($sessionId, $lastSentSeq, $limit);
$this->sendHeartbeat($lastHeartbeat);
try {
$redis = Redis::connection()->client();
if (method_exists($redis, 'setOption')) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, 5);
$redis->setOption(\Redis::OPT_READ_TIMEOUT, 360);
}
$channel = "session:{$sessionId}:messages";
$lastPing = time();
logger()->info('sse open');
if (method_exists($redis, 'pubSubLoop')) {
$pubSub = $redis->pubSubLoop();
$pubSub->subscribe($channel);
foreach ($pubSub as $message) {
if ($message->kind === 'subscribe') {
continue;
}
if (connection_aborted()) {
$pubSub->unsubscribe();
break;
}
$payloadId = $message->payload ?? null;
if ($payloadId) {
$msg = $this->service->getMessage($sessionId, $payloadId);
if ($msg && $msg->seq > $lastSentSeq) {
$this->emitMessage($msg);
$lastSentSeq = $msg->seq;
}
}
if (time() - $lastPing >= 180) {
logger()->info('ping: sent'.$sessionId);
echo ": ping\n\n";
@ob_flush();
@flush();
$lastPing = time();
}
logger('sse open');
// Fallback for Redis drivers without pubSubLoop (older phpredis)
$redis->subscribe([$channel], function ($redisInstance, $chan, $payload) use (&$lastSentSeq, $sessionId, $limit, &$lastHeartbeat) {
if (connection_aborted()) {
logger('sse aborted');
$redisInstance->unsubscribe([$chan]);
return;
}
logger()->info('close: sent'.$sessionId);
unset($pubSub);
} else {
// Fallback for Redis drivers without pubSubLoop (older phpredis)
$redis->subscribe([$channel], function ($redisInstance, $chan, $payload) use (&$lastSentSeq, $sessionId) {
if (connection_aborted()) {
$redisInstance->unsubscribe([$chan]);
return;
$this->sendHeartbeat($lastHeartbeat);
if (! $payload) {
return;
}
$msg = $this->service->getMessage($sessionId, $payload);
if ($msg && $msg->seq > $lastSentSeq) {
if ($msg->seq > $lastSentSeq + 1) {
$this->sendBacklog($sessionId, $lastSentSeq, $limit);
}
if (! $payload) {
return;
}
$msg = $this->service->getMessage($sessionId, $payload);
if ($msg && $msg->seq > $lastSentSeq) {
if ($msg->seq > $lastSentSeq) {
$this->emitMessage($msg);
$lastSentSeq = $msg->seq;
}
});
}
}
});
} catch (\RedisException $exception) {
logger()->warning('SSE redis subscription failed', [
'session_id' => $sessionId,
'message' => $exception->getMessage(),
'code' => $exception->getCode(),
]);
echo ": redis-error\n\n";
@ob_flush();
@@ -146,4 +123,16 @@ class ChatSessionSseController extends Controller
@flush();
}
}
private function sendHeartbeat(float &$lastHeartbeat, int $intervalSeconds = 15): void
{
if ((microtime(true) - $lastHeartbeat) < $intervalSeconds) {
return;
}
echo ": ping\n\n";
@ob_flush();
@flush();
$lastHeartbeat = microtime(true);
}
}

View File

@@ -2,6 +2,9 @@
namespace App\Jobs;
use App\Models\Message;
use App\Services\Agent\ProviderException;
use App\Services\CancelChecker;
use App\Services\OutputSink;
use App\Services\RunLoop;
use Illuminate\Bus\Queueable;
@@ -14,22 +17,61 @@ class AgentRunJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $tries;
public int $timeout;
public int $backoff;
public function __construct(public string $sessionId, public string $runId)
{
$this->tries = (int) config('agent.job.tries', 1);
$this->timeout = (int) config('agent.job.timeout_seconds', 120);
$this->backoff = (int) config('agent.job.backoff_seconds', 5);
}
public function handle(RunLoop $loop, OutputSink $sink): void
public function handle(RunLoop $loop, OutputSink $sink, CancelChecker $cancelChecker): void
{
try {
logger("Running run {$this->runId} for session {$this->sessionId}");
$loop->run($this->sessionId, $this->runId);
} catch (\Throwable $e) {
$sink->appendError($this->sessionId, $this->runId, 'run.failed', $e->getMessage());
logger("Running error {$this->runId} for session {$this->sessionId}");
logger("error message:",[$e->getMessage(),$e->getTraceAsString()]);
$errorCode = $e instanceof ProviderException ? $e->errorCode : 'run.failed';
$dedupeKey = $e instanceof ProviderException
? "run:{$this->runId}:error:provider"
: "run:{$this->runId}:error:job";
$sink->appendError($this->sessionId, $this->runId, $errorCode, $e->getMessage(), [], $dedupeKey);
$sink->appendRunStatus($this->sessionId, $this->runId, 'FAILED', [
'error' => $e->getMessage(),
'dedupe_key' => "run:{$this->runId}:status:FAILED",
]);
throw $e;
} finally {
$latestStatus = Message::query()
->where('session_id', $this->sessionId)
->where('type', 'run.status')
->whereRaw("payload->>'run_id' = ?", [$this->runId])
->orderByDesc('seq')
->first();
$status = $latestStatus ? ($latestStatus->payload['status'] ?? null) : null;
if ($status === 'RUNNING' || ! $status) {
if ($cancelChecker->isCanceled($this->sessionId, $this->runId)) {
$sink->appendRunStatus($this->sessionId, $this->runId, 'CANCELED', [
'dedupe_key' => "run:{$this->runId}:status:CANCELED",
]);
} else {
$sink->appendRunStatus($this->sessionId, $this->runId, 'FAILED', [
'error' => 'JOB_ENDED_UNEXPECTEDLY',
'dedupe_key' => "run:{$this->runId}:status:FAILED",
]);
}
}
}
}
}

View File

@@ -12,7 +12,7 @@ class AppServiceProvider extends ServiceProvider
public function register(): void
{
$this->app->bind(\App\Services\Agent\AgentProviderInterface::class, function () {
return new \App\Services\Agent\DummyAgentProvider();
return new \App\Services\Agent\HttpAgentProvider();
});
}

View File

@@ -2,15 +2,24 @@
namespace App\Services\Agent;
use Illuminate\Http\Client\ConnectionException;
use Illuminate\Support\Facades\Http;
class HttpAgentProvider implements AgentProviderInterface
{
protected string $endpoint;
protected int $timeoutSeconds;
protected int $connectTimeoutSeconds;
protected int $retryTimes;
protected int $retryBackoffMs;
public function __construct(?string $endpoint = null)
{
$this->endpoint = $endpoint ?? config('services.agent_provider.endpoint', '');
$this->endpoint = $endpoint ?? config('agent.provider.endpoint', '');
$this->timeoutSeconds = (int) config('agent.provider.timeout_seconds', 30);
$this->connectTimeoutSeconds = (int) config('agent.provider.connect_timeout_seconds', 5);
$this->retryTimes = (int) config('agent.provider.retry_times', 1);
$this->retryBackoffMs = (int) config('agent.provider.retry_backoff_ms', 500);
}
/**
@@ -29,14 +38,69 @@ class HttpAgentProvider implements AgentProviderInterface
'options' => $options,
];
$response = Http::post($this->endpoint, $payload);
$attempts = $this->retryTimes + 1;
$lastException = null;
$lastResponseBody = null;
$lastStatus = null;
if (! $response->successful()) {
throw new \RuntimeException('Agent provider failed: '.$response->body());
for ($attempt = 1; $attempt <= $attempts; $attempt++) {
try {
$response = Http::connectTimeout($this->connectTimeoutSeconds)
->timeout($this->timeoutSeconds)
->post($this->endpoint, $payload);
$lastStatus = $response->status();
$lastResponseBody = $response->body();
if ($response->successful()) {
$data = $response->json();
return is_string($data) ? $data : ($data['content'] ?? '');
}
$retryable = $lastStatus === 429 || $lastStatus >= 500;
if ($retryable && $attempt < $attempts) {
usleep($this->retryBackoffMs * 1000);
continue;
}
throw new ProviderException(
'HTTP_ERROR',
'Agent provider failed',
$retryable,
$lastStatus,
$lastResponseBody
);
} catch (ConnectionException $exception) {
$lastException = $exception;
if ($attempt < $attempts) {
usleep($this->retryBackoffMs * 1000);
continue;
}
} catch (\Throwable $exception) {
$lastException = $exception;
break;
}
}
$data = $response->json();
$rawMessage = $lastException ? $lastException->getMessage() : $lastResponseBody;
return is_string($data) ? $data : ($data['content'] ?? '');
if ($lastException instanceof ConnectionException) {
throw new ProviderException(
'CONNECTION_FAILED',
'Agent provider connection failed',
true,
$lastStatus,
$rawMessage
);
}
throw new ProviderException(
'UNKNOWN_ERROR',
'Agent provider error',
false,
$lastStatus,
$rawMessage
);
}
}

View File

@@ -0,0 +1,16 @@
<?php
namespace App\Services\Agent;
class ProviderException extends \RuntimeException
{
public function __construct(
public readonly string $errorCode,
string $message,
public readonly bool $retryable = false,
public readonly ?int $httpStatus = null,
public readonly ?string $rawMessage = null,
) {
parent::__construct($message);
}
}

View File

@@ -12,7 +12,7 @@ class CancelChecker
->where('session_id', $sessionId)
->where('type', 'run.cancel.request')
->whereIn('role', [Message::ROLE_USER, Message::ROLE_SYSTEM])
->where('payload->run_id', $runId)
->whereRaw("payload->>'run_id' = ?", [$runId])
->exists();
}
}

View File

@@ -56,14 +56,16 @@ class ChatService
* - payload: 附加信息(可选)
* - reply_to: 被回复的消息 ID可选
* - dedupe_key: 消息去重键(可选)
* @param bool|null $wasDeduped 是否发生了去重(可选,按引用返回)
* @return Message 返回成功追加的消息实例。如果存在去重键并已存在重复消息,则返回现有的消息。
*/
public function appendMessage(array $dto): Message
public function appendMessage(array $dto, ?bool &$wasDeduped = null): Message
{
$messageRef = null;
$isNew = false;
$wasDeduped = false;
DB::transaction(function () use ($dto, &$messageRef, &$isNew) {
DB::transaction(function () use ($dto, &$messageRef, &$isNew, &$wasDeduped) {
/** @var ChatSession $session */
$session = ChatSession::query()
->whereKey($dto['session_id'])
@@ -81,6 +83,7 @@ class ChatService
if ($existing) {
$messageRef = $existing;
$wasDeduped = true;
return;
}
}
@@ -112,6 +115,7 @@ class ChatService
if ($existing) {
$messageRef = $existing;
$wasDeduped = true;
return;
}
}
@@ -288,9 +292,11 @@ class ChatService
try {
Redis::publish($channel, $message->message_id);
} catch (\Throwable $e) {
if (! app()->runningUnitTests()) {
throw $e;
}
logger()->warning('Redis publish failed', [
'session_id' => $message->session_id,
'message_id' => $message->message_id,
'error' => $e->getMessage(),
]);
}
}

View File

@@ -13,21 +13,24 @@ class OutputSink
/**
* @param array<string, mixed> $meta
*/
public function appendAgentMessage(string $sessionId, string $runId, string $content, array $meta = []): Message
public function appendAgentMessage(string $sessionId, string $runId, string $content, array $meta = [], ?string $dedupeKey = null): Message
{
$dedupeKey = $dedupeKey ?? "run:{$runId}:agent:message";
return $this->chatService->appendMessage([
'session_id' => $sessionId,
'role' => Message::ROLE_AGENT,
'type' => 'agent.message',
'content' => $content,
'payload' => array_merge($meta, ['run_id' => $runId]),
'dedupe_key' => $dedupeKey,
]);
}
/**
* @param array<string, mixed> $meta
*/
public function appendRunStatus(string $sessionId, string $runId, string $status, array $meta = []): Message
public function appendRunStatus(string $sessionId, string $runId, string $status, array $meta = [], ?bool &$wasDeduped = null): Message
{
$dedupeKey = $meta['dedupe_key'] ?? null;
unset($meta['dedupe_key']);
@@ -36,18 +39,19 @@ class OutputSink
'session_id' => $sessionId,
'role' => Message::ROLE_SYSTEM,
'type' => 'run.status',
'content' => null,
'payload' => array_merge($meta, [
'run_id' => $runId,
'status' => $status,
]),
'dedupe_key' => $dedupeKey,
]);
], $wasDeduped);
}
/**
* @param array<string, mixed> $meta
*/
public function appendError(string $sessionId, string $runId, string $code, string $message, array $meta = []): Message
public function appendError(string $sessionId, string $runId, string $code, string $message, array $meta = [], ?string $dedupeKey = null): Message
{
return $this->chatService->appendMessage([
'session_id' => $sessionId,
@@ -58,6 +62,7 @@ class OutputSink
'run_id' => $runId,
'message' => $message,
]),
'dedupe_key' => $dedupeKey,
]);
}
}

View File

@@ -3,8 +3,10 @@
namespace App\Services;
use App\Jobs\AgentRunJob;
use App\Models\ChatSession;
use App\Models\Message;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Str;
class RunDispatcher
@@ -25,36 +27,50 @@ class RunDispatcher
throw (new ModelNotFoundException())->setModel(Message::class, [$triggerMessageId]);
}
$existingForTrigger = Message::query()
->where('session_id', $sessionId)
->where('type', 'run.status')
->where('payload->trigger_message_id', $triggerMessageId)
->orderByDesc('seq')
->first();
$shouldDispatch = false;
if ($existingForTrigger && ($existingForTrigger->payload['run_id'] ?? null)) {
return $existingForTrigger->payload['run_id'];
$runId = DB::transaction(function () use ($sessionId, $triggerMessageId, &$shouldDispatch) {
ChatSession::query()
->whereKey($sessionId)
->lockForUpdate()
->firstOrFail();
$latestStatus = Message::query()
->where('session_id', $sessionId)
->where('type', 'run.status')
->orderByDesc('seq')
->first();
if ($latestStatus && ($latestStatus->payload['status'] ?? null) === 'RUNNING' && ($latestStatus->payload['run_id'] ?? null)) {
logger('existing run found', ['sessionId' => $sessionId, 'runId' => $latestStatus->payload['run_id']]);
return $latestStatus->payload['run_id'];
}
$candidateRunId = (string) Str::uuid();
$wasDeduped = null;
$statusMessage = $this->outputSink->appendRunStatus($sessionId, $candidateRunId, 'RUNNING', [
'trigger_message_id' => $triggerMessageId,
'dedupe_key' => 'run:trigger:'.$triggerMessageId,
], $wasDeduped);
$finalRunId = $statusMessage->payload['run_id'] ?? $candidateRunId;
if ($wasDeduped) {
logger('existing run found', ['sessionId' => $sessionId, 'runId' => $finalRunId]);
return $finalRunId;
}
$shouldDispatch = true;
return $finalRunId;
});
if ($shouldDispatch) {
logger('dispatching run', ['sessionId' => $sessionId, 'runId' => $runId]);
dispatch(new AgentRunJob($sessionId, $runId));
}
$latestStatus = Message::query()
->where('session_id', $sessionId)
->where('type', 'run.status')
->orderByDesc('seq')
->first();
if ($latestStatus && ($latestStatus->payload['status'] ?? null) === 'RUNNING' && ($latestStatus->payload['run_id'] ?? null)) {
return $latestStatus->payload['run_id'];
}
$runId = (string) Str::uuid();
$this->outputSink->appendRunStatus($sessionId, $runId, 'RUNNING', [
'trigger_message_id' => $triggerMessageId,
'dedupe_key' => 'run:trigger:'.$triggerMessageId,
]);
dispatch(new AgentRunJob($sessionId, $runId));
return $runId;
}
}

View File

@@ -4,9 +4,13 @@ namespace App\Services;
use App\Services\Agent\AgentProviderInterface;
use App\Services\Agent\DummyAgentProvider;
use App\Services\Agent\ProviderException;
use App\Models\Message;
class RunLoop
{
private const TERMINAL_STATUSES = ['DONE', 'FAILED', 'CANCELED'];
public function __construct(
private readonly ContextBuilder $contextBuilder,
private readonly AgentProviderInterface $provider,
@@ -17,36 +21,106 @@ class RunLoop
public function run(string $sessionId, string $runId): void
{
if ($this->isRunTerminal($sessionId, $runId)) {
return;
}
if ($this->cancelChecker->isCanceled($sessionId, $runId)) {
$this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [
'dedupe_key' => "run:{$runId}:status:CANCELED",
]);
$this->appendCanceled($sessionId, $runId);
return;
}
$context = $this->contextBuilder->build($sessionId, $runId);
if ($this->cancelChecker->isCanceled($sessionId, $runId)) {
$this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [
'dedupe_key' => "run:{$runId}:status:CANCELED",
]);
$this->appendCanceled($sessionId, $runId);
return;
}
$reply = $this->provider->generate($context);
$providerName = $this->resolveProviderName();
$startedAt = microtime(true);
logger('agent provider request', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
]);
try {
$reply = $this->provider->generate($context);
} catch (ProviderException $exception) {
$latencyMs = (int) ((microtime(true) - $startedAt) * 1000);
$this->outputSink->appendError($sessionId, $runId, $exception->errorCode, $exception->getMessage(), [
'retryable' => $exception->retryable,
'http_status' => $exception->httpStatus,
'provider' => $providerName,
'latency_ms' => $latencyMs,
'raw_message' => $exception->rawMessage,
], "run:{$runId}:error:provider");
$this->outputSink->appendRunStatus($sessionId, $runId, 'FAILED', [
'error' => $exception->getMessage(),
'dedupe_key' => "run:{$runId}:status:FAILED",
]);
throw $exception;
}
$latencyMs = (int) ((microtime(true) - $startedAt) * 1000);
logger('agent provider response', [
'sessionId' => $sessionId,
'runId' => $runId,
'provider' => $providerName,
'latency_ms' => $latencyMs,
]);
if ($this->cancelChecker->isCanceled($sessionId, $runId)) {
$this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [
'dedupe_key' => "run:{$runId}:status:CANCELED",
]);
$this->appendCanceled($sessionId, $runId);
return;
}
$this->outputSink->appendAgentMessage($sessionId, $runId, $reply, [
'provider' => $this->provider instanceof DummyAgentProvider ? 'dummy' : get_class($this->provider),
]);
'provider' => $providerName,
], "run:{$runId}:agent:message");
if ($this->cancelChecker->isCanceled($sessionId, $runId)) {
$this->appendCanceled($sessionId, $runId);
return;
}
$this->outputSink->appendRunStatus($sessionId, $runId, 'DONE', [
'dedupe_key' => "run:{$runId}:status:DONE",
]);
}
private function isRunTerminal(string $sessionId, string $runId): bool
{
$latestStatus = Message::query()
->where('session_id', $sessionId)
->where('type', 'run.status')
->whereRaw("payload->>'run_id' = ?", [$runId])
->orderByDesc('seq')
->first();
$status = $latestStatus ? ($latestStatus->payload['status'] ?? null) : null;
return in_array($status, self::TERMINAL_STATUSES, true);
}
private function appendCanceled(string $sessionId, string $runId): void
{
$this->outputSink->appendRunStatus($sessionId, $runId, 'CANCELED', [
'dedupe_key' => "run:{$runId}:status:CANCELED",
]);
}
private function resolveProviderName(): string
{
if ($this->provider instanceof DummyAgentProvider) {
return 'dummy';
}
return str_replace("\0", '', get_class($this->provider));
}
}