service->getSession($sessionId); // ensure exists $lastEventId = $request->header('Last-Event-ID'); $afterSeq = is_numeric($lastEventId) ? (int) $lastEventId : (int) $request->query('after_seq', 0); $limit = (int) $request->query('limit', 200); $limit = $limit > 0 && $limit <= 500 ? $limit : 200; $useBacklogOnly = app()->runningUnitTests() || app()->environment('testing') || defined('PHPUNIT_COMPOSER_INSTALL') || ! class_exists(\Redis::class); if ($useBacklogOnly) { $lastSentSeq = $afterSeq; ob_start(); $this->sendBacklog($sessionId, $lastSentSeq, $limit, false); $content = ob_get_clean() ?: ''; return response($content, 200, [ 'Content-Type' => 'text/event-stream', 'Cache-Control' => 'no-cache', 'X-Accel-Buffering' => 'no', ]); } $response = new StreamedResponse(function () use ($sessionId, $afterSeq, $limit) { $lastSentSeq = $afterSeq; $this->sendBacklog($sessionId, $lastSentSeq, $limit); try { $redis = Redis::connection()->client(); if (method_exists($redis, 'setOption')) { $redis->setOption(\Redis::OPT_READ_TIMEOUT, 5); } $channel = "session:{$sessionId}:messages"; $lastPing = time(); logger()->info('sse open'); if (method_exists($redis, 'pubSubLoop')) { $pubSub = $redis->pubSubLoop(); $pubSub->subscribe($channel); foreach ($pubSub as $message) { if ($message->kind === 'subscribe') { continue; } if (connection_aborted()) { $pubSub->unsubscribe(); break; } $payloadId = $message->payload ?? 
null;
                        if ($payloadId) {
                            $msg = $this->service->getMessage($sessionId, $payloadId);
                            if ($msg && $msg->seq > $lastSentSeq) {
                                $this->emitMessage($msg);
                                $lastSentSeq = $msg->seq;
                            }
                        }
                        if (time() - $lastPing >= 180) {
                            logger()->info('ping: sent'.$sessionId);
                            echo ": ping\n\n";
                            @ob_flush();
                            @flush();
                            $lastPing = time();
                        }
                    }
                    logger()->info('close: sent'.$sessionId);
                    unset($pubSub);
                } else {
                    // Fallback for Redis drivers without pubSubLoop (older phpredis)
                    $redis->subscribe([$channel], function ($redisInstance, $chan, $payload) use (&$lastSentSeq, $sessionId) {
                        if (connection_aborted()) {
                            $redisInstance->unsubscribe([$chan]);

                            return;
                        }
                        if (! $payload) {
                            return;
                        }
                        $msg = $this->service->getMessage($sessionId, $payload);
                        if ($msg && $msg->seq > $lastSentSeq) {
                            $this->emitMessage($msg);
                            $lastSentSeq = $msg->seq;
                        }
                    });
                }
            } catch (\RedisException $exception) {
                logger()->warning('SSE redis subscription failed', [
                    'session_id' => $sessionId,
                    'message' => $exception->getMessage(),
                ]);
                echo ": redis-error\n\n";
                @ob_flush();
                @flush();
            }
        });
        $response->headers->set('Content-Type', 'text/event-stream');
        $response->headers->set('Cache-Control', 'no-cache');
        $response->headers->set('X-Accel-Buffering', 'no');

        return $response;
    }

    /**
     * Emits the stored messages returned by the service for this session.
     *
     * Asks the service for up to $limit messages relative to $lastSentSeq, writes each
     * one as an SSE frame, and moves $lastSentSeq to the seq of every message it visits.
     * Only a single page is requested; nothing here loops for further pages.
     *
     * NOTE(review): presumably listMessagesBySeq() returns messages with seq greater than
     * the given value in ascending order - confirm against the service.
     *
     * @param string $sessionId   Session whose messages are replayed.
     * @param int    $lastSentSeq Cursor passed to the service; updated by reference.
     * @param int    $limit       Maximum number of messages requested from the service.
     * @param bool   $flush       Whether each frame is flushed to the client right away.
     */
    private function sendBacklog(string $sessionId, int &$lastSentSeq, int $limit, bool $flush = true): void
    {
        $backlog = $this->service->listMessagesBySeq($sessionId, $lastSentSeq, $limit);

        foreach ($backlog as $message) {
            $this->emitMessage($message, $flush);
            $lastSentSeq = $message->seq;
        }
    }

    /**
     * Writes one message to the output as a Server-Sent Event frame.
     *
     * The frame uses the message's seq as the SSE `id`, the fixed event name `message`,
     * and the JSON-encoded MessageResource representation as `data`.
     *
     * If the payload cannot be JSON-encoded, no frame is written and a warning is logged.
     * json_encode() returns false in that case, which would otherwise be concatenated as
     * an empty string and reach the client as an event with an empty data field.
     *
     * @param mixed $message Message object exposing a `seq` property and accepted by MessageResource.
     * @param bool  $flush   Whether to flush output buffers after writing the frame.
     */
    private function emitMessage($message, bool $flush = true): void
    {
        $payload = (new MessageResource($message))->resolve();
        $json = json_encode($payload, JSON_UNESCAPED_UNICODE);

        if ($json === false) {
            logger()->warning('SSE message could not be JSON-encoded', [
                'seq' => $message->seq,
                'error' => json_last_error_msg(),
            ]);

            return;
        }

        echo 'id: '.$message->seq."\n";
        echo "event: message\n";
        echo 'data: '.$json."\n\n";

        if ($flush) {
            // Best-effort flush: ob_flush() raises a notice when no output buffer is
            // active, so the calls stay suppressed exactly as elsewhere in this class.
            @ob_flush();
            @flush();
        }
    }
}