Files
ars-backend/app/Http/Controllers/ChatSessionSseController.php
Roog 6d934f4e34 main: 增强 Agent Run 调度可靠性与幂等性
- 默认切换 AgentProvider 为 HttpAgentProvider,增强网络请求的容错和重试机制
- 优化 Run 逻辑,支持多场景去重与并发保护
- 添加 Redis 发布失败的日志记录以提升问题排查效率
- 扩展 OpenAPI 规范,新增 Error 和 Run 状态相关模型
- 增强测试覆盖,验证调度策略和重复请求的幂等性
- 增加数据库索引以优化查询性能
- 更新所有相关文档和配置文件
2025-12-18 17:41:42 +08:00

139 lines
4.8 KiB
PHP

<?php
namespace App\Http\Controllers;
use App\Http\Resources\MessageResource;
use App\Services\ChatService;
use Illuminate\Http\Request;
use Illuminate\Http\Response;
use Illuminate\Support\Facades\Redis;
use Symfony\Component\HttpFoundation\StreamedResponse;
class ChatSessionSseController extends Controller
{
public function __construct(private readonly ChatService $service)
{
}
public function stream(Request $request, string $sessionId): Response|StreamedResponse
{
set_time_limit(360);
$this->service->getSession($sessionId); // ensure exists
$lastEventId = $request->header('Last-Event-ID');
$afterSeq = is_numeric($lastEventId) ? (int) $lastEventId : (int) $request->query('after_seq', 0);
$limit = (int) $request->query('limit', 200);
$limit = $limit > 0 && $limit <= 500 ? $limit : 200;
$useBacklogOnly = app()->runningUnitTests()
|| app()->environment('testing')
|| defined('PHPUNIT_COMPOSER_INSTALL')
|| ! class_exists(\Redis::class);
if ($useBacklogOnly) {
$lastSentSeq = $afterSeq;
ob_start();
$this->sendBacklog($sessionId, $lastSentSeq, $limit, false);
$content = ob_get_clean() ?: '';
return response($content, 200, [
'Content-Type' => 'text/event-stream',
'Cache-Control' => 'no-cache',
'X-Accel-Buffering' => 'no',
]);
}
$response = new StreamedResponse(function () use ($sessionId, $afterSeq, $limit) {
$lastSentSeq = $afterSeq;
$lastHeartbeat = microtime(true);
$this->sendBacklog($sessionId, $lastSentSeq, $limit);
$this->sendHeartbeat($lastHeartbeat);
try {
$redis = Redis::connection()->client();
if (method_exists($redis, 'setOption')) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, 360);
}
$channel = "session:{$sessionId}:messages";
logger('sse open');
// Fallback for Redis drivers without pubSubLoop (older phpredis)
$redis->subscribe([$channel], function ($redisInstance, $chan, $payload) use (&$lastSentSeq, $sessionId, $limit, &$lastHeartbeat) {
if (connection_aborted()) {
logger('sse aborted');
$redisInstance->unsubscribe([$chan]);
return;
}
$this->sendHeartbeat($lastHeartbeat);
if (! $payload) {
return;
}
$msg = $this->service->getMessage($sessionId, $payload);
if ($msg && $msg->seq > $lastSentSeq) {
if ($msg->seq > $lastSentSeq + 1) {
$this->sendBacklog($sessionId, $lastSentSeq, $limit);
}
if ($msg->seq > $lastSentSeq) {
$this->emitMessage($msg);
$lastSentSeq = $msg->seq;
}
}
});
} catch (\RedisException $exception) {
logger()->warning('SSE redis subscription failed', [
'session_id' => $sessionId,
'message' => $exception->getMessage(),
'code' => $exception->getCode(),
]);
echo ": redis-error\n\n";
@ob_flush();
@flush();
}
});
$response->headers->set('Content-Type', 'text/event-stream');
$response->headers->set('Cache-Control', 'no-cache');
$response->headers->set('X-Accel-Buffering', 'no');
return $response;
}
private function sendBacklog(string $sessionId, int &$lastSentSeq, int $limit, bool $flush = true): void
{
$backlog = $this->service->listMessagesBySeq($sessionId, $lastSentSeq, $limit);
foreach ($backlog as $message) {
$this->emitMessage($message, $flush);
$lastSentSeq = $message->seq;
}
}
private function emitMessage($message, bool $flush = true): void
{
$payload = (new MessageResource($message))->resolve();
echo 'id: '.$message->seq."\n";
echo "event: message\n";
echo 'data: '.json_encode($payload, JSON_UNESCAPED_UNICODE)."\n\n";
if ($flush) {
@ob_flush();
@flush();
}
}
private function sendHeartbeat(float &$lastHeartbeat, int $intervalSeconds = 15): void
{
if ((microtime(true) - $lastHeartbeat) < $intervalSeconds) {
return;
}
echo ": ping\n\n";
@ob_flush();
@flush();
$lastHeartbeat = microtime(true);
}
}