main: 增强会话功能,支持归档与消息检索

- 添加会话归档接口及相关服务逻辑,并确保幂等性
- 实现单条消息获取接口,校验消息所属会话
- 增加 SSE 增量推送与实时消息订阅功能
- 提供相关的测试用例覆盖新功能
- 更新接口文档,完善 OpenAPI 规范,新增多项示例
This commit is contained in:
2025-12-14 21:58:05 +08:00
parent 6356baacc0
commit 318571a6d9
7 changed files with 531 additions and 47 deletions

View File

@@ -0,0 +1,110 @@
<?php
namespace App\Http\Controllers;
use App\Http\Resources\MessageResource;
use App\Services\ChatService;
use Illuminate\Http\Request;
use Illuminate\Http\Response;
use Illuminate\Support\Facades\Redis;
use Symfony\Component\HttpFoundation\StreamedResponse;
class ChatSessionSseController extends Controller
{
public function __construct(private readonly ChatService $service)
{
}
public function stream(Request $request, string $sessionId): Response|StreamedResponse
{
$this->service->getSession($sessionId); // ensure exists
$lastEventId = $request->header('Last-Event-ID');
$afterSeq = is_numeric($lastEventId) ? (int) $lastEventId : (int) $request->query('after_seq', 0);
$limit = (int) $request->query('limit', 200);
$limit = $limit > 0 && $limit <= 500 ? $limit : 200;
if (app()->runningUnitTests() || app()->environment('testing') || ! class_exists(\Redis::class)) {
$lastSentSeq = $afterSeq;
ob_start();
$this->sendBacklog($sessionId, $lastSentSeq, $limit, false);
$content = ob_get_clean() ?: '';
return response($content, 200, [
'Content-Type' => 'text/event-stream',
'Cache-Control' => 'no-cache',
'X-Accel-Buffering' => 'no',
]);
}
$response = new StreamedResponse(function () use ($sessionId, $afterSeq, $limit) {
$lastSentSeq = $afterSeq;
$this->sendBacklog($sessionId, $lastSentSeq, $limit);
$redis = Redis::connection()->client();
if (method_exists($redis, 'setOption')) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, 5);
}
$channel = "session:{$sessionId}:messages";
$pubSub = $redis->pubSubLoop();
$pubSub->subscribe($channel);
$lastPing = time();
foreach ($pubSub as $message) {
if ($message->kind === 'subscribe') {
continue;
}
if (connection_aborted()) {
$pubSub->unsubscribe();
break;
}
$payloadId = $message->payload ?? null;
if ($payloadId) {
$msg = $this->service->getMessage($sessionId, $payloadId);
if ($msg && $msg->seq > $lastSentSeq) {
$this->emitMessage($msg);
$lastSentSeq = $msg->seq;
}
}
if (time() - $lastPing >= 20) {
echo ": ping\n\n";
@ob_flush();
@flush();
$lastPing = time();
}
}
});
$response->headers->set('Content-Type', 'text/event-stream');
$response->headers->set('Cache-Control', 'no-cache');
$response->headers->set('X-Accel-Buffering', 'no');
return $response;
}
private function sendBacklog(string $sessionId, int &$lastSentSeq, int $limit, bool $flush = true): void
{
$backlog = $this->service->listMessagesBySeq($sessionId, $lastSentSeq, $limit);
foreach ($backlog as $message) {
$this->emitMessage($message, $flush);
$lastSentSeq = $message->seq;
}
}
private function emitMessage($message, bool $flush = true): void
{
$payload = (new MessageResource($message))->resolve();
echo 'id: '.$message->seq."\n";
echo "event: message\n";
echo 'data: '.json_encode($payload, JSON_UNESCAPED_UNICODE)."\n\n";
if ($flush) {
@ob_flush();
@flush();
}
}
}