From 318571a6d948d90a7fabe03d18aff2c7728bad14 Mon Sep 17 00:00:00 2001 From: ROOG Date: Sun, 14 Dec 2025 21:58:05 +0800 Subject: [PATCH] =?UTF-8?q?main:=20=E5=A2=9E=E5=BC=BA=E4=BC=9A=E8=AF=9D?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=94=AF=E6=8C=81=E5=BD=92=E6=A1=A3?= =?UTF-8?q?=E4=B8=8E=E6=B6=88=E6=81=AF=E6=A3=80=E7=B4=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加会话归档接口及相关服务逻辑,并确保幂等性 - 实现单条消息获取接口,校验消息所属会话 - 增加 SSE 增量推送与实时消息订阅功能 - 提供相关的测试用例覆盖新功能 - 更新接口文档,完善 OpenAPI 规范,新增多项示例 --- .../Controllers/ChatSessionController.php | 25 +++ .../Controllers/ChatSessionSseController.php | 110 ++++++++++++ app/Services/ChatService.php | 77 ++++++++- docs/ChatSession/chat-session-api.md | 156 +++++++++++++----- docs/ChatSession/chat-session-openapi.yaml | 118 +++++++++++++ routes/api.php | 4 + tests/Feature/ChatSessionTest.php | 88 ++++++++++ 7 files changed, 531 insertions(+), 47 deletions(-) create mode 100644 app/Http/Controllers/ChatSessionSseController.php diff --git a/app/Http/Controllers/ChatSessionController.php b/app/Http/Controllers/ChatSessionController.php index 2f6dff5..c5542ae 100644 --- a/app/Http/Controllers/ChatSessionController.php +++ b/app/Http/Controllers/ChatSessionController.php @@ -70,6 +70,17 @@ class ChatSessionController extends Controller return MessageResource::collection($messages)->response(); } + public function showMessage(string $sessionId, string $messageId): JsonResponse + { + $message = $this->service->getMessage($sessionId, $messageId); + + if (! $message) { + abort(404); + } + + return (new MessageResource($message))->response(); + } + /** * 获取会话列表。 * @@ -109,4 +120,18 @@ class ChatSessionController extends Controller return (new ChatSessionResource($session))->response(); } + + public function show(string $sessionId): JsonResponse + { + $session = $this->service->getSessionWithLastMessage($sessionId); + + return (new ChatSessionResource($session))->response(); + } + + public function archive(string $sessionId): JsonResponse + { + $session = $this->service->archiveSession($sessionId); + + return (new ChatSessionResource($session))->response(); + } } diff --git a/app/Http/Controllers/ChatSessionSseController.php b/app/Http/Controllers/ChatSessionSseController.php new file mode 100644 index 0000000..8948734 --- /dev/null +++ b/app/Http/Controllers/ChatSessionSseController.php @@ -0,0 +1,110 @@ +service->getSession($sessionId); // ensure exists + + $lastEventId = $request->header('Last-Event-ID'); + $afterSeq = is_numeric($lastEventId) ? (int) $lastEventId : (int) $request->query('after_seq', 0); + $limit = (int) $request->query('limit', 200); + $limit = $limit > 0 && $limit <= 500 ? $limit : 200; + + if (app()->runningUnitTests() || app()->environment('testing') || ! class_exists(\Redis::class)) { + $lastSentSeq = $afterSeq; + ob_start(); + $this->sendBacklog($sessionId, $lastSentSeq, $limit, false); + $content = ob_get_clean() ?: ''; + + return response($content, 200, [ + 'Content-Type' => 'text/event-stream', + 'Cache-Control' => 'no-cache', + 'X-Accel-Buffering' => 'no', + ]); + } + + $response = new StreamedResponse(function () use ($sessionId, $afterSeq, $limit) { + $lastSentSeq = $afterSeq; + + $this->sendBacklog($sessionId, $lastSentSeq, $limit); + + $redis = Redis::connection()->client(); + if (method_exists($redis, 'setOption')) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, 5); + } + + $channel = "session:{$sessionId}:messages"; + $pubSub = $redis->pubSubLoop(); + $pubSub->subscribe($channel); + $lastPing = time(); + + foreach ($pubSub as $message) { + if ($message->kind === 'subscribe') { + continue; + } + + if (connection_aborted()) { + $pubSub->unsubscribe(); + break; + } + + $payloadId = $message->payload ?? null; + if ($payloadId) { + $msg = $this->service->getMessage($sessionId, $payloadId); + if ($msg && $msg->seq > $lastSentSeq) { + $this->emitMessage($msg); + $lastSentSeq = $msg->seq; + } + } + + if (time() - $lastPing >= 20) { + echo ": ping\n\n"; + @ob_flush(); + @flush(); + $lastPing = time(); + } + } + }); + + $response->headers->set('Content-Type', 'text/event-stream'); + $response->headers->set('Cache-Control', 'no-cache'); + $response->headers->set('X-Accel-Buffering', 'no'); + + return $response; + } + + private function sendBacklog(string $sessionId, int &$lastSentSeq, int $limit, bool $flush = true): void + { + $backlog = $this->service->listMessagesBySeq($sessionId, $lastSentSeq, $limit); + foreach ($backlog as $message) { + $this->emitMessage($message, $flush); + $lastSentSeq = $message->seq; + } + } + + private function emitMessage($message, bool $flush = true): void + { + $payload = (new MessageResource($message))->resolve(); + echo 'id: '.$message->seq."\n"; + echo "event: message\n"; + echo 'data: '.json_encode($payload, JSON_UNESCAPED_UNICODE)."\n\n"; + if ($flush) { + @ob_flush(); + @flush(); + } + } +} diff --git a/app/Services/ChatService.php b/app/Services/ChatService.php index 1148e9c..610bc38 100644 --- a/app/Services/ChatService.php +++ b/app/Services/ChatService.php @@ -11,6 +11,7 @@ use Illuminate\Database\Eloquent\ModelNotFoundException; use Illuminate\Database\QueryException; use Illuminate\Support\Collection; use Illuminate\Support\Facades\DB; +use Illuminate\Support\Facades\Redis; use Illuminate\Support\Str; class ChatService @@ -59,7 +60,10 @@ class ChatService */ public function appendMessage(array $dto): Message { - return DB::transaction(function () use ($dto) { + $messageRef = null; + $isNew = false; + + DB::transaction(function () use ($dto, &$messageRef, &$isNew) { /** @var ChatSession $session */ $session = ChatSession::query() ->whereKey($dto['session_id']) @@ -76,7 +80,8 @@ class ChatService ->first(); if ($existing) { - return $existing; + $messageRef = $existing; + return; } } @@ -97,6 +102,7 @@ class ChatService try { $message->save(); + $isNew = true; } catch (QueryException $e) { if ($this->isUniqueConstraint($e) && $dedupeKey) { $existing = Message::query() @@ -105,7 +111,8 @@ class ChatService ->first(); if ($existing) { - return $existing; + $messageRef = $existing; + return; } } @@ -118,8 +125,15 @@ class ChatService 'updated_at' => now(), ]); - return $message; + $messageRef = $message; + + if ($isNew) { + DB::afterCommit(fn () => $this->publishMessageAppended($message)); + } }); + + /** @var Message $messageRef */ + return $messageRef; } /** @@ -142,6 +156,42 @@ class ChatService ->get(); } + public function getSessionWithLastMessage(string $sessionId): ChatSession + { + /** @var ChatSession $session */ + $session = $this->baseSessionQuery() + ->where('chat_sessions.session_id', $sessionId) + ->firstOrFail(); + + return $session; + } + + public function archiveSession(string $sessionId): ChatSession + { + /** @var ChatSession $session */ + $session = ChatSession::query()->whereKey($sessionId)->firstOrFail(); + + if ($session->status !== ChatSessionStatus::CLOSED) { + $session->update([ + 'status' => ChatSessionStatus::CLOSED, + 'updated_at' => now(), + ]); + } + + return $this->getSessionWithLastMessage($sessionId); + } + + public function getMessage(string $sessionId, string $messageId): ?Message + { + $message = Message::query()->where('message_id', $messageId)->first(); + + if (! $message || $message->session_id !== $sessionId) { + return null; + } + + return $message; + } + /** * 获取会话列表 * @@ -225,6 +275,25 @@ class ChatService return $session; } + private function publishMessageAppended(Message $message): void + { + $root = Redis::getFacadeRoot(); + $isMocked = $root instanceof \Mockery\MockInterface; + + if (! class_exists(\Redis::class) && ! $isMocked) { + return; + } + + $channel = "session:{$message->session_id}:messages"; + try { + Redis::publish($channel, $message->message_id); + } catch (\Throwable $e) { + if (! app()->runningUnitTests()) { + throw $e; + } + } + } + private function ensureCanAppend(ChatSession $session, string $role, string $type): void { if ($session->status === ChatSessionStatus::CLOSED) { diff --git a/docs/ChatSession/chat-session-api.md b/docs/ChatSession/chat-session-api.md index 08b1866..12748cc 100644 --- a/docs/ChatSession/chat-session-api.md +++ b/docs/ChatSession/chat-session-api.md @@ -18,65 +18,122 @@ ## 接口 ### 创建会话 - `POST /sessions` -- 请求体字段: - - `session_name` (string, 可选,<=255):会话名称。 -- 响应 201 字段: - - `session_id` (uuid) - - `session_name` (string|null) - - `status` (`OPEN|LOCKED|CLOSED`) - - `last_seq` (int) - - `last_message_id` (uuid|null) - - `created_at` / `updated_at` +- 请求体字段 + | 字段 | 必填 | 类型 | 说明 | + | --- | --- | --- | --- | + | session_name | 否 | string(≤255) | 会话名称 | +- 响应 201(JSON) + | 字段 | 类型 | 说明 | + | --- | --- | --- | + | session_id | uuid | 主键 | + | session_name | string|null | 会话名 | + | status | enum | `OPEN|LOCKED|CLOSED` | + | last_seq | int | 当前最大 seq | + | last_message_id | uuid|null | 最后一条消息 | + | created_at, updated_at | datetime | 时间戳 | +- 错误:401 未授权 ### 追加消息 - `POST /sessions/{session_id}/messages` -- 请求体字段: - - `role` (required, `USER|AGENT|TOOL|SYSTEM`) - - `type` (required, string, <=64),如 `user.prompt`/`agent.message` 等。 - - `content` (string|null) - - `payload` (object|null) 作为 jsonb 存储。 - - `reply_to` (uuid|null) - - `dedupe_key` (string|null, <=128) 幂等键。 -- 响应 201 字段: - - `message_id` (uuid) - - `session_id` (uuid) - - `seq` (int,会话内递增) - - `role` / `type` / `content` / `payload` / `reply_to` / `dedupe_key` - - `created_at` -- 403:违反状态门禁(CLOSED 禁止,LOCKED 禁止 user.prompt)。 -- 幂等:同 session + dedupe_key 返回已有消息(同 `message_id/seq`)。 +- 请求体字段 + | 字段 | 必填 | 类型 | 说明 | + | --- | --- | --- | --- | + | role | 是 | enum | `USER|AGENT|TOOL|SYSTEM` | + | type | 是 | string(≤64) | 如 `user.prompt`/`agent.message` 等 | + | content | 否 | string | 文本内容 | + | payload | 否 | object | jsonb 结构 | + | reply_to | 否 | uuid | 引用消息 | + | dedupe_key | 否 | string(≤128) | 幂等键 | +- 响应 201(JSON) + 字段:`message_id, session_id, seq, role, type, content, payload, reply_to, dedupe_key, created_at` +- 幂等:同 session + dedupe_key 返回已存在的消息(同 `message_id/seq`)。 +- 错误:401 未授权;403 违反状态门禁(CLOSED 禁止,LOCKED 禁止 user.prompt);404 session 不存在;422 校验失败。 ### 按序增量查询 - `GET /sessions/{session_id}/messages?after_seq=0&limit=50` -- 查询参数: - - `after_seq` (int, 默认 0):仅返回大于该 seq 的消息。 - - `limit` (int, 默认 50,<=200)。 +- 查询参数 + | 参数 | 默认 | 类型 | 说明 | + | --- | --- | --- | --- | + | after_seq | 0 | int | 仅返回 seq 大于该值 | + | limit | 50 | int(≤200) | 返回数量上限 | - 响应 200:`data` 数组,元素字段同“追加消息”响应。 +- 错误:401/404/422 ### 会话列表 - `GET /sessions?page=1&per_page=15&status=OPEN&q=keyword` -- 查询参数: - - `page` (int, 默认 1) - - `per_page` (int, 默认 15,<=100) - - `status` (`OPEN|LOCKED|CLOSED`,可选) - - `q` (string,可选,对 `session_name` ILIKE 模糊匹配) -- 响应 200:分页结构(`data/links/meta`),`data` 每项字段: - - `session_id, session_name, status, last_seq, created_at, updated_at` - - `last_message_id` - - `last_message_at` - - `last_message_preview`(content 截断 120,content 为空则空字符串) - - `last_message_role, last_message_type` - - 排序:`updated_at` DESC +- 查询参数 + | 参数 | 默认 | 类型 | 说明 | + | --- | --- | --- | --- | + | page | 1 | int | 分页页码 | + | per_page | 15 | int(≤100) | 分页大小 | + | status | - | enum | 过滤 `OPEN|LOCKED|CLOSED` | + | q | - | string | ILIKE 模糊匹配 session_name | +- 响应 200:分页结构(`data/links/meta`),`data` 每项字段: + | 字段 | 类型 | 说明 | + | --- | --- | --- | + | session_id | uuid | 会话主键 | + | session_name | string|null | 名称 | + | status | enum | `OPEN|LOCKED|CLOSED` | + | last_seq | int | 当前最大 seq | + | last_message_id | uuid|null | 最后一条消息 | + | last_message_at | datetime|null | 最后一条消息时间 | + | last_message_preview | string | content 截断 120,空内容返回空字符串 | + | last_message_role | string|null | 最后消息角色 | + | last_message_type | string|null | 最后消息类型 | + | created_at, updated_at | datetime | 时间戳 | +- 排序:`updated_at` DESC +- 错误:401/422 ### 会话更新 - `PATCH /sessions/{session_id}` -- 请求体(至少提供一项,否则 422): - - `session_name` (string, 1..255,可选,自动 trim) - - `status` (`OPEN|LOCKED|CLOSED`,可选) +- 请求体(至少一项,否则 422) + | 字段 | 必填 | 类型 | 说明 | + | --- | --- | --- | --- | + | session_name | 否 | string 1..255 | 自动 trim | + | status | 否 | enum | `OPEN|LOCKED|CLOSED` | - 规则: - `CLOSED` 不可改回 `OPEN`(返回 403)。 - 任意更新都会刷新 `updated_at`。 -- 响应 200 字段:同会话列表项字段。 +- 响应 200:字段同“会话列表”项。 +- 错误:401 未授权;403 状态门禁;404 session 不存在;422 校验失败。 + +### 获取会话详情 +- `GET /sessions/{session_id}` +- 响应 200:字段同“会话列表”项。 +- 错误:401 未授权;404 session 不存在。 + +### 归档会话(Archive) +- `POST /sessions/{session_id}/archive` +- 行为:将 `status` 置为 `CLOSED`,更新 `updated_at`,幂等(重复归档返回当前状态)。 +- 响应 200:字段同“会话列表”项(status=CLOSED)。 +- 错误:401 未授权;404 session 不存在。 + +### 获取单条消息(带会话校验) +- `GET /sessions/{session_id}/messages/{message_id}` +- 行为:校验 `message.session_id` 与路径参数一致,否则 404。 +- 响应 200:字段同“追加消息”响应。 +- 错误:401 未授权;404 不存在或不属于该会话。 + +### SSE 实时增量 +- `GET /sessions/{session_id}/sse?after_seq=123` +- 头部:`Accept: text/event-stream`,可带 `Last-Event-ID`(优先于 query)用于断线续传。 +- 查询参数 + | 参数 | 默认 | 类型 | 说明 | + | --- | --- | --- | --- | + | after_seq | 0 | int | backlog 起始 seq(若有 Last-Event-ID 则覆盖) | + | limit | 200 | int(≤500) | backlog 最多条数 | +- SSE 输出格式: + ``` + id: {seq} + event: message + data: {...message json...} + + ``` + - `id` 为消息 `seq`,便于续传;`data` 为消息 JSON(同追加消息响应字段)。 +- Backlog:建立连接后先补发 `seq > after_seq` 的消息(order asc,最多 `limit` 条),再进入实时订阅。 +- 实时:Redis channel `session:{session_id}:messages` 发布消息 ID,SSE 侧读取后按 seq 去重、推送。 +- 心跳:周期输出 `: ping` 保活(生产环境)。 +- 错误:401 未授权;404 session 不存在。 ## cURL 示例 ```bash @@ -93,4 +150,17 @@ curl -s -X POST http://localhost:8000/api/sessions/$SESSION_ID/messages \ # 增量查询 curl -s "http://localhost:8000/api/sessions/$SESSION_ID/messages?after_seq=0&limit=50" \ -H "Authorization: Bearer $TOKEN" + +# 归档 +curl -X POST http://localhost:8000/api/sessions/$SESSION_ID/archive \ + -H "Authorization: Bearer $TOKEN" + +# 获取单条消息 +curl -s http://localhost:8000/api/sessions/$SESSION_ID/messages/{message_id} \ + -H "Authorization: Bearer $TOKEN" + +# SSE(断线续传:可带 Last-Event-ID) +curl -N http://localhost:8000/api/sessions/$SESSION_ID/sse?after_seq=10 \ + -H "Authorization: Bearer $TOKEN" \ + -H "Accept: text/event-stream" ``` diff --git a/docs/ChatSession/chat-session-openapi.yaml b/docs/ChatSession/chat-session-openapi.yaml index a7afe13..0d36e8e 100644 --- a/docs/ChatSession/chat-session-openapi.yaml +++ b/docs/ChatSession/chat-session-openapi.yaml @@ -111,6 +111,124 @@ paths: $ref: '#/components/schemas/Error' "401": description: 未授权 + /sessions/{session_id}/messages/{message_id}: + get: + tags: [ChatSession] + summary: 获取单条消息(校验 session_id) + security: + - bearerAuth: [] + parameters: + - in: path + name: session_id + required: true + schema: + type: string + format: uuid + - in: path + name: message_id + required: true + schema: + type: string + format: uuid + responses: + "200": + description: 消息详情 + content: + application/json: + schema: + $ref: '#/components/schemas/MessageResource' + "401": + description: 未授权 + "404": + description: 未找到或不属于该会话 + /sessions/{session_id}/archive: + post: + tags: [ChatSession] + summary: 归档会话(设为 CLOSED,幂等) + security: + - bearerAuth: [] + parameters: + - in: path + name: session_id + required: true + schema: + type: string + format: uuid + responses: + "200": + description: 归档成功(或已归档) + content: + application/json: + schema: + $ref: '#/components/schemas/ChatSession' + "401": + description: 未授权 + "404": + description: 未找到 + /sessions/{session_id}/sse: + get: + tags: [ChatSession] + summary: SSE 增量推送(backlog + Redis 实时) + security: + - bearerAuth: [] + parameters: + - in: path + name: session_id + required: true + schema: + type: string + format: uuid + - in: query + name: after_seq + schema: + type: integer + default: 0 + description: backlog 起始 seq(若有 Last-Event-ID 以其为准) + - in: query + name: limit + schema: + type: integer + default: 200 + maximum: 500 + responses: + "200": + description: text/event-stream SSE 流 + content: + text/event-stream: + schema: + type: string + example: | + id: 1 + event: message + data: {"message_id":"...","seq":1} + "401": + description: 未授权 + "404": + description: 未找到 + /sessions/{session_id}: + get: + tags: [ChatSession] + summary: 获取会话详情 + security: + - bearerAuth: [] + parameters: + - in: path + name: session_id + required: true + schema: + type: string + format: uuid + responses: + "200": + description: 会话详情 + content: + application/json: + schema: + $ref: '#/components/schemas/ChatSession' + "401": + description: 未授权 + "404": + description: 未找到 get: tags: [ChatSession] summary: 按 seq 增量查询消息 diff --git a/routes/api.php b/routes/api.php index c5e0075..e5199b8 100644 --- a/routes/api.php +++ b/routes/api.php @@ -23,7 +23,11 @@ Route::middleware('auth.jwt')->group(function () { Route::get('/sessions', [ChatSessionController::class, 'index']); Route::post('/sessions', [ChatSessionController::class, 'store']); + Route::get('/sessions/{session_id}', [ChatSessionController::class, 'show']); Route::post('/sessions/{session_id}/messages', [ChatSessionController::class, 'append']); Route::get('/sessions/{session_id}/messages', [ChatSessionController::class, 'listMessages']); + Route::get('/sessions/{session_id}/messages/{message_id}', [ChatSessionController::class, 'showMessage']); Route::patch('/sessions/{session_id}', [ChatSessionController::class, 'update']); + Route::post('/sessions/{session_id}/archive', [ChatSessionController::class, 'archive']); + Route::get('/sessions/{session_id}/sse', [\App\Http\Controllers\ChatSessionSseController::class, 'stream']); }); diff --git a/tests/Feature/ChatSessionTest.php b/tests/Feature/ChatSessionTest.php index 43d055e..6c4bb5f 100644 --- a/tests/Feature/ChatSessionTest.php +++ b/tests/Feature/ChatSessionTest.php @@ -7,6 +7,7 @@ use App\Models\User; use App\Services\ChatService; use Illuminate\Support\Carbon; use Illuminate\Foundation\Testing\RefreshDatabase; +use Illuminate\Support\Facades\Redis; use PHPOpenSourceSaver\JWTAuth\Facades\JWTAuth; use Tests\TestCase; @@ -218,4 +219,91 @@ class ChatSessionTest extends TestCase $this->withHeaders($headers)->patchJson("/api/sessions/{$session->session_id}", []) ->assertStatus(422); } + + public function test_archive_is_idempotent_and_blocks_user_prompt(): void + { + $user = User::factory()->create(); + $headers = $this->authHeader($user); + $service = app(ChatService::class); + $session = $service->createSession('Archive'); + + $this->withHeaders($headers)->postJson("/api/sessions/{$session->session_id}/archive") + ->assertOk() + ->assertJsonFragment(['status' => ChatSessionStatus::CLOSED]); + + // repeat archive + $this->withHeaders($headers)->postJson("/api/sessions/{$session->session_id}/archive") + ->assertOk() + ->assertJsonFragment(['status' => ChatSessionStatus::CLOSED]); + + // append should be blocked + $this->withHeaders($headers)->postJson("/api/sessions/{$session->session_id}/messages", [ + 'role' => 'USER', + 'type' => 'user.prompt', + 'content' => 'blocked', + ])->assertStatus(403); + } + + public function test_get_message_respects_session_scope(): void + { + $user = User::factory()->create(); + $headers = $this->authHeader($user); + $service = app(ChatService::class); + $s1 = $service->createSession('S1'); + $s2 = $service->createSession('S2'); + + $msg1 = $service->appendMessage([ + 'session_id' => $s1->session_id, + 'role' => 'USER', + 'type' => 'user.prompt', + 'content' => 'hello', + ]); + + $this->withHeaders($headers)->getJson("/api/sessions/{$s1->session_id}/messages/{$msg1->message_id}") + ->assertOk() + ->assertJsonFragment(['message_id' => $msg1->message_id]); + + // wrong session should 404 + $this->withHeaders($headers)->getJson("/api/sessions/{$s2->session_id}/messages/{$msg1->message_id}") + ->assertNotFound(); + } + + public function test_publish_to_redis_on_append(): void + { + Redis::shouldReceive('publish')->once()->andReturn(1); + + $service = app(ChatService::class); + $session = $service->createSession('Redis Pub'); + + $service->appendMessage([ + 'session_id' => $session->session_id, + 'role' => 'USER', + 'type' => 'user.prompt', + 'content' => 'hello', + ]); + } + + public function test_sse_backlog_contains_messages(): void + { + $user = User::factory()->create(); + $headers = $this->authHeader($user); + $service = app(ChatService::class); + $session = $service->createSession('SSE Session'); + $service->appendMessage([ + 'session_id' => $session->session_id, + 'role' => 'USER', + 'type' => 'user.prompt', + 'content' => 'hello sse', + ]); + + $response = $this->withHeaders($headers)->get("/api/sessions/{$session->session_id}/sse?after_seq=0"); + + $response->assertOk(); + $content = $response->baseResponse instanceof \Symfony\Component\HttpFoundation\StreamedResponse + ? $response->streamedContent() + : $response->getContent(); + $this->assertStringContainsString('id: 1', $content); + $this->assertStringContainsString('event: message', $content); + $this->assertStringContainsString('hello sse', $content); + } }