From dcbd0338e6914175ad7d52d29c9c2104f7439aeb Mon Sep 17 00:00:00 2001 From: ROOG Date: Fri, 19 Dec 2025 12:53:53 +0800 Subject: [PATCH] =?UTF-8?q?main:=20=E4=BC=98=E5=8C=96=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=E4=B8=8E=20Redis=20?= =?UTF-8?q?=E5=8F=91=E5=B8=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 调整消息解析流程,支持 JSON 解码与模型实例化 - 增加 `appendMessage` 方法的保存控制参数 - 修复因保存控制导致的重复发布问题 - 优化 Redis 发布逻辑,支持消息内容推送 - 更新注释与待优化标记,提升代码可读性 --- app/Http/Controllers/ChatSessionSseController.php | 4 +++- app/Services/ChatService.php | 15 ++++++++++----- app/Services/OutputSink.php | 2 +- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/app/Http/Controllers/ChatSessionSseController.php b/app/Http/Controllers/ChatSessionSseController.php index d35bac1..106af39 100644 --- a/app/Http/Controllers/ChatSessionSseController.php +++ b/app/Http/Controllers/ChatSessionSseController.php @@ -3,6 +3,7 @@ namespace App\Http\Controllers; use App\Http\Resources\MessageResource; +use App\Models\Message; use App\Services\ChatService; use Illuminate\Http\Request; use Illuminate\Http\Response; @@ -72,7 +73,8 @@ class ChatSessionSseController extends Controller return; } - $msg = $this->service->getMessage($sessionId, $payload); + $msg = json_decode($payload, true, 512, JSON_THROW_ON_ERROR); + $msg = new Message($msg); if ($msg && $msg->seq > $lastSentSeq) { if ($msg->seq > $lastSentSeq + 1) { $this->sendBacklog($sessionId, $lastSentSeq, $limit); diff --git a/app/Services/ChatService.php b/app/Services/ChatService.php index 7f48501..227a3ec 100644 --- a/app/Services/ChatService.php +++ b/app/Services/ChatService.php @@ -59,13 +59,13 @@ class ChatService * @param bool|null $wasDeduped 是否发生了去重(可选,按引用返回) * @return Message 返回成功追加的消息实例。如果存在去重键并已存在重复消息,则返回现有的消息。 */ - public function appendMessage(array $dto, ?bool &$wasDeduped = null): Message + public function appendMessage(array $dto, ?bool &$wasDeduped = null, bool $save = true): Message { $messageRef = null; $isNew = false; $wasDeduped = false; - DB::transaction(function () use ($dto, &$messageRef, &$isNew, &$wasDeduped) { + DB::transaction(function () use ($dto, &$messageRef, &$isNew, &$wasDeduped, $save) { /** @var ChatSession $session */ $session = ChatSession::query() ->whereKey($dto['session_id']) @@ -104,7 +104,9 @@ class ChatService ]); try { - $message->save(); + if ($save) { + $message->save(); + } $isNew = true; } catch (QueryException $e) { if ($this->isUniqueConstraint($e) && $dedupeKey) { @@ -131,8 +133,10 @@ class ChatService $messageRef = $message; - if ($isNew) { + if ($isNew && $save) { DB::afterCommit(fn () => $this->publishMessageAppended($message)); + } else { + $this->publishMessageAppended($message); } }); @@ -290,7 +294,8 @@ class ChatService $channel = "session:{$message->session_id}:messages"; try { - Redis::publish($channel, $message->message_id); + //todo::优化这里。 + Redis::publish($channel, json_encode($message->toArray(), JSON_UNESCAPED_UNICODE|JSON_INVALID_UTF8_IGNORE)); } catch (\Throwable $e) { logger()->warning('Redis publish failed', [ 'session_id' => $message->session_id, diff --git a/app/Services/OutputSink.php b/app/Services/OutputSink.php index 44294f4..4602faa 100644 --- a/app/Services/OutputSink.php +++ b/app/Services/OutputSink.php @@ -44,7 +44,7 @@ class OutputSink 'delta_index' => $deltaIndex, ]), 'dedupe_key' => $dedupeKey, - ]); + ],$wasDupe,false); } /**