service->getSession($sessionId); // ensure exists $lastEventId = $request->header('Last-Event-ID'); $afterSeq = is_numeric($lastEventId) ? (int) $lastEventId : (int) $request->query('after_seq', 0); $limit = (int) $request->query('limit', 200); $limit = $limit > 0 && $limit <= 500 ? $limit : 200; $useBacklogOnly = app()->runningUnitTests() || app()->environment('testing') || defined('PHPUNIT_COMPOSER_INSTALL') || ! class_exists(\Redis::class); if ($useBacklogOnly) { $lastSentSeq = $afterSeq; ob_start(); $this->sendBacklog($sessionId, $lastSentSeq, $limit, false); $content = ob_get_clean() ?: ''; return response($content, 200, [ 'Content-Type' => 'text/event-stream', 'Cache-Control' => 'no-cache', 'X-Accel-Buffering' => 'no', ]); } $response = new StreamedResponse(function () use ($sessionId, $afterSeq, $limit) { $lastSentSeq = $afterSeq; $lastHeartbeat = microtime(true); $this->sendBacklog($sessionId, $lastSentSeq, $limit); $this->sendHeartbeat($lastHeartbeat); try { $redis = Redis::connection()->client(); if (method_exists($redis, 'setOption')) { $redis->setOption(\Redis::OPT_READ_TIMEOUT, 360); } $channel = "session:{$sessionId}:messages"; logger('sse open'); // Fallback for Redis drivers without pubSubLoop (older phpredis) $redis->subscribe([$channel], function ($redisInstance, $chan, $payload) use (&$lastSentSeq, $sessionId, $limit, &$lastHeartbeat) { if (connection_aborted()) { logger('sse aborted'); $redisInstance->unsubscribe([$chan]); return; } $this->sendHeartbeat($lastHeartbeat); if (! 
$payload) {
                        return;
                    }

                    $msg = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
                    $msg = new Message($msg);

                    // message.delta events are not persisted and carry seq=0, so push them straight through.
                    if ($msg && $msg->type === 'message.delta') {
                        $this->emitMessage($msg, true);

                        return;
                    }

                    if ($msg && $msg->seq > $lastSentSeq) {
                        // A jump of more than one in seq means something was missed; replay via sendBacklog first.
                        if ($msg->seq > $lastSentSeq + 1) {
                            $this->sendBacklog($sessionId, $lastSentSeq, $limit);
                        }
                        // sendBacklog advances $lastSentSeq by reference, so re-check before emitting this message.
                        if ($msg->seq > $lastSentSeq) {
                            $this->emitMessage($msg);
                            $lastSentSeq = $msg->seq;
                        }
                    }
                });
            } catch (\RedisException $exception) {
                logger()->warning('SSE redis subscription failed', [
                    'session_id' => $sessionId,
                    'message' => $exception->getMessage(),
                    'code' => $exception->getCode(),
                ]);
                echo ": redis-error\n\n";
                @ob_flush();
                @flush();
            }
        });
        $response->headers->set('Content-Type', 'text/event-stream');
        $response->headers->set('Cache-Control', 'no-cache');
        $response->headers->set('X-Accel-Buffering', 'no');

        return $response;
    }

    /**
     * Emit every message the service lists after the given sequence number.
     *
     * @param string $sessionId   Session whose messages are listed.
     * @param int    $lastSentSeq Passed by reference: handed to the service as the starting seq and
     *                            overwritten with the seq of each message as it is emitted.
     * @param int    $limit       Forwarded unchanged to listMessagesBySeq(); only one page is fetched per call.
     * @param bool   $flush       Forwarded to emitMessage(); false leaves the output in the active buffer.
     */
    private function sendBacklog(string $sessionId, int &$lastSentSeq, int $limit, bool $flush = true): void
    {
        $backlog = $this->service->listMessagesBySeq($sessionId, $lastSentSeq, $limit);

        foreach ($backlog as $message) {
            $this->emitMessage($message, $flush);
            $lastSentSeq = $message->seq;
        }
    }

    /**
     * Write one message as a server-sent event frame (id / event / data).
     *
     * The payload is produced by MessageResource and JSON-encoded. Invalid UTF-8 sequences are
     * substituted rather than failing the encode. If encoding still fails, the frame is skipped
     * and a warning is logged, instead of sending an empty `data:` line that the client cannot parse.
     *
     * @param mixed $message Object exposing a `seq` property and accepted by MessageResource.
     * @param bool  $flush   When true, flush PHP's output buffer and the system buffer after writing.
     */
    private function emitMessage($message, bool $flush = true): void
    {
        $payload = (new MessageResource($message))->resolve();

        // json_encode() returns false on failure; never concatenate that into the frame.
        $data = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_INVALID_UTF8_SUBSTITUTE);
        if ($data === false) {
            logger()->warning('SSE message encode failed', [
                'seq' => $message->seq,
                'error' => json_last_error_msg(),
            ]);

            return;
        }

        echo 'id: '.$message->seq."\n";
        echo "event: message\n";
        echo 'data: '.$data."\n\n";

        if ($flush) {
            // ob_flush() raises a notice when no output buffer is active, hence the suppression.
            @ob_flush();
            @flush();
        }
    }

    /**
     * Write an SSE comment line (": ping") if the heartbeat interval has elapsed.
     *
     * @param float $lastHeartbeat   Passed by reference: microtime(true) of the previous heartbeat,
     *                               reset to the current time whenever a ping is written.
     * @param int   $intervalSeconds Minimum number of seconds between pings.
     */
    private function sendHeartbeat(float &$lastHeartbeat, int $intervalSeconds = 15): void
    {
        if ((microtime(true) - $lastHeartbeat) < $intervalSeconds) {
            return;
        }

        echo ": ping\n\n";
        @ob_flush();
        @flush();
        $lastHeartbeat = microtime(true);
    }
}