- 添加 `disable_tools` 选项,支持达到调用上限后禁用工具 - 增加工具调用与结果的日志记录,提升调试信息 - 优化上下文构建,记录已加载的消息信息 - 完善流式消息推送逻辑,支持 `message.delta` 类型
148 lines
5.1 KiB
PHP
148 lines
5.1 KiB
PHP
<?php
|
||
|
||
namespace App\Http\Controllers;
|
||
|
||
use App\Http\Resources\MessageResource;
|
||
use App\Models\Message;
|
||
use App\Services\ChatService;
|
||
use Illuminate\Http\Request;
|
||
use Illuminate\Http\Response;
|
||
use Illuminate\Support\Facades\Redis;
|
||
use Symfony\Component\HttpFoundation\StreamedResponse;
|
||
|
||
/**
 * Streams the messages of a chat session to the client as Server-Sent Events.
 *
 * A request first replays persisted messages whose seq is greater than the
 * client's cursor (the `Last-Event-ID` header, or the `after_seq` query
 * parameter), then subscribes to the session's Redis channel and forwards
 * messages as they are published.
 */
class ChatSessionSseController extends Controller
{
    /** Seconds a stream may stay open; used for the PHP time limit and the Redis read timeout. */
    private const STREAM_TIMEOUT_SECONDS = 360;

    /** Backlog page size used when the `limit` query parameter is missing or out of range. */
    private const DEFAULT_BACKLOG_LIMIT = 200;

    /** Largest backlog page size a client may request. */
    private const MAX_BACKLOG_LIMIT = 500;

    /** Response headers shared by the buffered and the streamed variants. */
    private const SSE_HEADERS = [
        'Content-Type' => 'text/event-stream',
        'Cache-Control' => 'no-cache',
        'X-Accel-Buffering' => 'no',
    ];

    public function __construct(private readonly ChatService $service)
    {
    }

    /**
     * Open the SSE stream for a session.
     *
     * Query parameters: `after_seq` (cursor, used when no numeric
     * `Last-Event-ID` header is present) and `limit` (backlog page size,
     * 1..500, otherwise 200).
     *
     * Under tests, or when the phpredis extension is not loaded, only the
     * backlog is rendered and returned as a regular buffered response.
     *
     * @param  Request  $request    Incoming HTTP request.
     * @param  string   $sessionId  Identifier of the chat session to stream.
     */
    public function stream(Request $request, string $sessionId): Response|StreamedResponse
    {
        set_time_limit(self::STREAM_TIMEOUT_SECONDS);
        $this->service->getSession($sessionId); // ensure exists

        $lastEventId = $request->header('Last-Event-ID');
        $afterSeq = is_numeric($lastEventId)
            ? (int) $lastEventId
            : (int) $request->query('after_seq', 0);

        $limit = (int) $request->query('limit', self::DEFAULT_BACKLOG_LIMIT);
        if ($limit <= 0 || $limit > self::MAX_BACKLOG_LIMIT) {
            $limit = self::DEFAULT_BACKLOG_LIMIT;
        }

        $useBacklogOnly = app()->runningUnitTests()
            || app()->environment('testing')
            || defined('PHPUNIT_COMPOSER_INSTALL')
            || ! class_exists(\Redis::class);

        if ($useBacklogOnly) {
            $lastSentSeq = $afterSeq;

            ob_start();
            try {
                $this->sendBacklog($sessionId, $lastSentSeq, $limit, false);
                $content = (string) ob_get_contents();
            } finally {
                // Always balance ob_start(), even when the backlog query throws.
                ob_end_clean();
            }

            return response($content, 200, self::SSE_HEADERS);
        }

        $response = new StreamedResponse(function () use ($sessionId, $afterSeq, $limit): void {
            $this->streamLive($sessionId, $afterSeq, $limit);
        });

        foreach (self::SSE_HEADERS as $name => $value) {
            $response->headers->set($name, $value);
        }

        return $response;
    }

    /**
     * Body of the streamed response: replay the backlog, then forward live messages.
     *
     * NOTE(review): the heartbeat and the client-abort check run only inside
     * the subscribe callback, i.e. when a message is published on the channel;
     * an idle channel produces neither.
     */
    private function streamLive(string $sessionId, int $afterSeq, int $limit): void
    {
        $lastSentSeq = $afterSeq;
        $lastHeartbeat = microtime(true);

        $this->sendBacklog($sessionId, $lastSentSeq, $limit);
        $this->sendHeartbeat($lastHeartbeat);

        try {
            $redis = Redis::connection()->client();
            if (method_exists($redis, 'setOption')) {
                $redis->setOption(\Redis::OPT_READ_TIMEOUT, self::STREAM_TIMEOUT_SECONDS);
            }

            $channel = "session:{$sessionId}:messages";
            logger('sse open');

            // Fallback for Redis drivers without pubSubLoop (older phpredis)
            $redis->subscribe(
                [$channel],
                function ($redisInstance, $chan, $payload) use (&$lastSentSeq, &$lastHeartbeat, $sessionId, $limit): void {
                    if (connection_aborted()) {
                        logger('sse aborted');
                        $redisInstance->unsubscribe([$chan]);

                        return;
                    }

                    $this->sendHeartbeat($lastHeartbeat);

                    if (! $payload) {
                        return;
                    }

                    $this->handlePublished($payload, $sessionId, $lastSentSeq, $limit);
                },
            );
        } catch (\RedisException $exception) {
            logger()->warning('SSE redis subscription failed', [
                'session_id' => $sessionId,
                'message' => $exception->getMessage(),
                'code' => $exception->getCode(),
            ]);

            echo ": redis-error\n\n";
            $this->flushOutput();
        }
    }

    /**
     * Forward one payload received from the session channel.
     *
     * `message.delta` frames are pushed as-is. Other messages are emitted only
     * when their seq is ahead of the cursor; if one or more seqs were skipped,
     * the backlog is replayed first.
     *
     * NOTE(review): the replay is capped at $limit messages, so a gap larger
     * than $limit is only partially filled before the live message is sent.
     *
     * @param  int  $lastSentSeq  Cursor of the last emitted seq; advanced in place.
     */
    private function handlePublished(string $payload, string $sessionId, int &$lastSentSeq, int $limit): void
    {
        $message = $this->decodePayload($payload, $sessionId);
        if ($message === null) {
            return;
        }

        // message.delta is not persisted (seq=0); push it straight through.
        if ($message->type === 'message.delta') {
            $this->emitMessage($message, true);

            return;
        }

        if (! ($message->seq > $lastSentSeq)) {
            return;
        }

        if ($message->seq > $lastSentSeq + 1) {
            $this->sendBacklog($sessionId, $lastSentSeq, $limit);
        }

        // The replay may already have delivered this message; re-check the cursor.
        if ($message->seq > $lastSentSeq) {
            $this->emitMessage($message);
            $lastSentSeq = (int) $message->seq;
        }
    }

    /**
     * Decode a published JSON payload into an unsaved Message instance.
     *
     * A payload that is not valid JSON, or does not decode to an array, is
     * logged and skipped so that a single bad publish cannot end the stream.
     */
    private function decodePayload(string $payload, string $sessionId): ?Message
    {
        try {
            $attributes = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
        } catch (\JsonException $exception) {
            logger()->warning('SSE dropped malformed payload', [
                'session_id' => $sessionId,
                'message' => $exception->getMessage(),
            ]);

            return null;
        }

        if (! is_array($attributes)) {
            logger()->warning('SSE dropped non-object payload', [
                'session_id' => $sessionId,
            ]);

            return null;
        }

        return new Message($attributes);
    }

    /**
     * Emit every persisted message after the cursor, up to $limit.
     *
     * @param  int   $lastSentSeq  Cursor; set to the seq of each message as it is emitted.
     * @param  bool  $flush        Whether to flush output after each message.
     */
    private function sendBacklog(string $sessionId, int &$lastSentSeq, int $limit, bool $flush = true): void
    {
        $backlog = $this->service->listMessagesBySeq($sessionId, $lastSentSeq, $limit);

        foreach ($backlog as $message) {
            $this->emitMessage($message, $flush);
            $lastSentSeq = $message->seq;
        }
    }

    /**
     * Write one message as an SSE `message` event.
     *
     * The `id:` field is written only for a positive seq. An SSE client stores
     * the most recent id and sends it back as `Last-Event-ID` when it
     * reconnects, so emitting the seq=0 of a `message.delta` frame would reset
     * that cursor and cause the whole backlog to be replayed.
     *
     * @param  mixed  $message  Message model (or compatible object) exposing `seq`.
     * @param  bool   $flush    Whether to flush output after writing.
     */
    private function emitMessage($message, bool $flush = true): void
    {
        $payload = (new MessageResource($message))->resolve();

        if ((int) $message->seq > 0) {
            echo 'id: '.$message->seq."\n";
        }
        echo "event: message\n";
        echo 'data: '.json_encode($payload, JSON_UNESCAPED_UNICODE)."\n\n";

        if ($flush) {
            $this->flushOutput();
        }
    }

    /**
     * Write an SSE comment line as keep-alive when the interval has elapsed.
     *
     * @param  float  $lastHeartbeat    Timestamp of the previous heartbeat; updated in place.
     * @param  int    $intervalSeconds  Minimum number of seconds between heartbeats.
     */
    private function sendHeartbeat(float &$lastHeartbeat, int $intervalSeconds = 15): void
    {
        if ((microtime(true) - $lastHeartbeat) < $intervalSeconds) {
            return;
        }

        echo ": ping\n\n";
        $this->flushOutput();
        $lastHeartbeat = microtime(true);
    }

    /**
     * Push pending output to the client.
     *
     * Flushing is best-effort: ob_flush() raises a notice when no output
     * buffer is active, which is deliberately suppressed here.
     */
    private function flushOutput(): void
    {
        @ob_flush();
        @flush();
    }
}
|