Files
ars-backend/app/Http/Controllers/ChatSessionSseController.php
ROOG 78875ec3eb main: 增强工具调用与上下文日志记录
- 添加 `disable_tools` 选项,支持达到调用上限后禁用工具
- 增加工具调用与结果的日志记录,提升调试信息
- 优化上下文构建,记录已加载的消息信息
- 完善流式消息推送逻辑,支持 `message.delta` 类型
2025-12-22 19:10:44 +08:00

148 lines
5.1 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace App\Http\Controllers;
use App\Http\Resources\MessageResource;
use App\Models\Message;
use App\Services\ChatService;
use Illuminate\Http\Request;
use Illuminate\Http\Response;
use Illuminate\Support\Facades\Redis;
use Symfony\Component\HttpFoundation\StreamedResponse;
class ChatSessionSseController extends Controller
{
public function __construct(private readonly ChatService $service)
{
}
public function stream(Request $request, string $sessionId): Response|StreamedResponse
{
set_time_limit(360);
$this->service->getSession($sessionId); // ensure exists
$lastEventId = $request->header('Last-Event-ID');
$afterSeq = is_numeric($lastEventId) ? (int) $lastEventId : (int) $request->query('after_seq', 0);
$limit = (int) $request->query('limit', 200);
$limit = $limit > 0 && $limit <= 500 ? $limit : 200;
$useBacklogOnly = app()->runningUnitTests()
|| app()->environment('testing')
|| defined('PHPUNIT_COMPOSER_INSTALL')
|| ! class_exists(\Redis::class);
if ($useBacklogOnly) {
$lastSentSeq = $afterSeq;
ob_start();
$this->sendBacklog($sessionId, $lastSentSeq, $limit, false);
$content = ob_get_clean() ?: '';
return response($content, 200, [
'Content-Type' => 'text/event-stream',
'Cache-Control' => 'no-cache',
'X-Accel-Buffering' => 'no',
]);
}
$response = new StreamedResponse(function () use ($sessionId, $afterSeq, $limit) {
$lastSentSeq = $afterSeq;
$lastHeartbeat = microtime(true);
$this->sendBacklog($sessionId, $lastSentSeq, $limit);
$this->sendHeartbeat($lastHeartbeat);
try {
$redis = Redis::connection()->client();
if (method_exists($redis, 'setOption')) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, 360);
}
$channel = "session:{$sessionId}:messages";
logger('sse open');
// Fallback for Redis drivers without pubSubLoop (older phpredis)
$redis->subscribe([$channel], function ($redisInstance, $chan, $payload) use (&$lastSentSeq, $sessionId, $limit, &$lastHeartbeat) {
if (connection_aborted()) {
logger('sse aborted');
$redisInstance->unsubscribe([$chan]);
return;
}
$this->sendHeartbeat($lastHeartbeat);
if (! $payload) {
return;
}
$msg = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
$msg = new Message($msg);
// message.delta 不持久化seq=0直接推送
if ($msg && $msg->type === 'message.delta') {
$this->emitMessage($msg, true);
return;
}
if ($msg && $msg->seq > $lastSentSeq) {
if ($msg->seq > $lastSentSeq + 1) {
$this->sendBacklog($sessionId, $lastSentSeq, $limit);
}
if ($msg->seq > $lastSentSeq) {
$this->emitMessage($msg);
$lastSentSeq = $msg->seq;
}
}
});
} catch (\RedisException $exception) {
logger()->warning('SSE redis subscription failed', [
'session_id' => $sessionId,
'message' => $exception->getMessage(),
'code' => $exception->getCode(),
]);
echo ": redis-error\n\n";
@ob_flush();
@flush();
}
});
$response->headers->set('Content-Type', 'text/event-stream');
$response->headers->set('Cache-Control', 'no-cache');
$response->headers->set('X-Accel-Buffering', 'no');
return $response;
}
private function sendBacklog(string $sessionId, int &$lastSentSeq, int $limit, bool $flush = true): void
{
$backlog = $this->service->listMessagesBySeq($sessionId, $lastSentSeq, $limit);
foreach ($backlog as $message) {
$this->emitMessage($message, $flush);
$lastSentSeq = $message->seq;
}
}
private function emitMessage($message, bool $flush = true): void
{
$payload = (new MessageResource($message))->resolve();
echo 'id: '.$message->seq."\n";
echo "event: message\n";
echo 'data: '.json_encode($payload, JSON_UNESCAPED_UNICODE)."\n\n";
if ($flush) {
@ob_flush();
@flush();
}
}
private function sendHeartbeat(float &$lastHeartbeat, int $intervalSeconds = 15): void
{
if ((microtime(true) - $lastHeartbeat) < $intervalSeconds) {
return;
}
echo ": ping\n\n";
@ob_flush();
@flush();
$lastHeartbeat = microtime(true);
}
}