Files
ars-backend/app/Services/OutputSink.php
ROOG e956df9daa main: 增强工具功能与消息处理
- 添加 `FileReadTool`,支持文件内容读取与安全验证
- 引入 `hasToolMessages` 逻辑,优化工具历史上下文处理
- 修改工具选项逻辑,支持禁用工具时的动态调整
- 增加消息序列化逻辑,优化 Redis 序列管理与数据同步
- 扩展测试覆盖,验证序列化与工具调用场景
- 增强 Docker Compose 脚本,支持应用重置与日志清理
- 调整工具调用超时设置,提升运行时用户体验
2025-12-24 00:55:54 +08:00

177 lines
6.1 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace App\Services;
use App\Models\Message;
use App\Services\MessageSequence;
use App\Services\Tool\ToolCall;
use App\Services\Tool\ToolResult;
/**
 * Writes agent/system/tool output for a chat session.
 *
 * Most methods build a message attribute array and delegate to
 * ChatService::appendMessage(). The exception is appendAgentDelta(), which
 * builds an unsaved Message and only publishes it on a Redis channel.
 */
class OutputSink
{
    public function __construct(
        private readonly ChatService $chatService,
        private readonly MessageSequence $messageSequence,
    ) {
    }

    /**
     * Append the agent's message for a run via ChatService::appendMessage().
     *
     * The payload is $meta with 'run_id' merged in last, so a 'run_id' key
     * inside $meta is overwritten by $runId.
     *
     * @param array<string, mixed> $meta
     * @param string|null $dedupeKey Defaults to "run:{$runId}:agent:message" when null.
     */
    public function appendAgentMessage(string $sessionId, string $runId, string $content, array $meta = [], ?string $dedupeKey = null): Message
    {
        $dedupeKey = $dedupeKey ?? "run:{$runId}:agent:message";

        return $this->chatService->appendMessage([
            'session_id' => $sessionId,
            'role' => Message::ROLE_AGENT,
            'type' => 'agent.message',
            'content' => $content,
            'payload' => array_merge($meta, ['run_id' => $runId]),
            'dedupe_key' => $dedupeKey,
        ]);
    }

    /**
     * Append an agent streaming text delta (SSE push only, not stored).
     *
     * message.delta messages are only used for real-time streaming and are
     * not persisted by this method; the final complete reply is stored via
     * appendAgentMessage().
     *
     * Note that a sequence number is still requested from MessageSequence
     * for every delta, even though the message itself is not saved here.
     *
     * @param array<string, mixed> $meta
     */
    public function appendAgentDelta(string $sessionId, string $runId, string $content, int $deltaIndex, array $meta = []): void
    {
        $session = $this->chatService->getSession($sessionId);

        // 1. Build a transient Message object (save() is never called on it).
        $message = new Message([
            'message_id' => (string) \Illuminate\Support\Str::uuid(),
            'session_id' => $session->session_id,
            'role' => Message::ROLE_AGENT,
            'type' => 'message.delta',
            'content' => $content,
            'payload' => array_merge($meta, [
                'run_id' => $runId,
                'delta_index' => $deltaIndex,
            ]),
            'dedupe_key' => "run:{$runId}:agent:delta:{$deltaIndex}",
            'seq' => $this->messageSequence->nextForSession($session),
            'created_at' => now(),
        ]);

        // 2. Only publish the Redis event so SSE clients get it in real time.
        $this->publishDeltaMessage($message);
    }

    /**
     * Publish a delta message to Redis (SSE push only).
     *
     * This method does not save the message; it JSON-encodes
     * $message->toArray() and publishes it on "session:{session_id}:messages".
     * Any failure (encoding or publishing) is logged as a warning and
     * swallowed, so streaming is best-effort.
     */
    private function publishDeltaMessage(Message $message): void
    {
        $facadeRoot = \Illuminate\Support\Facades\Redis::getFacadeRoot();
        $isMocked = $facadeRoot instanceof \Mockery\MockInterface;

        // Skip publishing when the \Redis class does not exist (e.g. in a
        // test environment), unless the facade has been replaced by a mock.
        if (!class_exists(\Redis::class) && !$isMocked) {
            return;
        }

        $channel = "session:{$message->session_id}:messages";

        try {
            // JSON_THROW_ON_ERROR: without it an encode failure returns false,
            // which would be handed to publish() instead of being reported.
            // Encoding inside the try routes that failure to the warning below.
            $encoded = json_encode(
                $message->toArray(),
                JSON_UNESCAPED_UNICODE | JSON_INVALID_UTF8_IGNORE | JSON_THROW_ON_ERROR
            );

            \Illuminate\Support\Facades\Redis::publish($channel, $encoded);
        } catch (\Throwable $e) {
            logger()->warning('Redis publish failed for delta message', [
                'session_id' => $message->session_id,
                'run_id' => $message->payload['run_id'] ?? null,
                'delta_index' => $message->payload['delta_index'] ?? null,
                'error' => $e->getMessage(),
            ]);
        }
    }

    /**
     * Append a 'run.status' system message via ChatService::appendMessage().
     *
     * An optional 'dedupe_key' entry in $meta is extracted and used as the
     * message's dedupe key; it is removed from $meta before the payload is
     * built. 'run_id' and 'status' are merged in last and override same-named
     * keys in $meta.
     *
     * @param array<string, mixed> $meta
     * @param bool|null $wasDeduped Passed by reference straight through to
     *                              ChatService::appendMessage(); presumably set
     *                              there when the message was deduplicated
     *                              (confirm against ChatService).
     */
    public function appendRunStatus(string $sessionId, string $runId, string $status, array $meta = [], ?bool &$wasDeduped = null): Message
    {
        $dedupeKey = $meta['dedupe_key'] ?? null;
        unset($meta['dedupe_key']);

        return $this->chatService->appendMessage([
            'session_id' => $sessionId,
            'role' => Message::ROLE_SYSTEM,
            'type' => 'run.status',
            'content' => null,
            'payload' => array_merge($meta, [
                'run_id' => $runId,
                'status' => $status,
            ]),
            'dedupe_key' => $dedupeKey,
        ], $wasDeduped);
    }

    /**
     * Append an 'error' system message via ChatService::appendMessage().
     *
     * The error $code is stored as the message content; the error $message
     * text goes into the payload under 'message', together with 'run_id'.
     * Both override same-named keys in $meta.
     *
     * @param array<string, mixed> $meta
     * @param string|null $dedupeKey Passed through unchanged (null by default).
     */
    public function appendError(string $sessionId, string $runId, string $code, string $message, array $meta = [], ?string $dedupeKey = null): Message
    {
        return $this->chatService->appendMessage([
            'session_id' => $sessionId,
            'role' => Message::ROLE_SYSTEM,
            'type' => 'error',
            'content' => $code,
            'payload' => array_merge($meta, [
                'run_id' => $runId,
                'message' => $message,
            ]),
            'dedupe_key' => $dedupeKey,
        ]);
    }

    /**
     * Append a 'tool.call' agent message describing a tool invocation.
     *
     * The content is the call's raw argument string; when that is falsy
     * (`?:` tests truthiness, so null, '' and '0' all qualify) the parsed
     * arguments are JSON-encoded instead. The dedupe key combines the parent
     * run id and the tool call id.
     */
    public function appendToolCall(string $sessionId, ToolCall $toolCall): Message
    {
        return $this->chatService->appendMessage([
            'session_id' => $sessionId,
            'role' => Message::ROLE_AGENT,
            'type' => 'tool.call',
            'content' => $toolCall->rawArguments ?: json_encode($toolCall->arguments, JSON_UNESCAPED_UNICODE),
            'payload' => [
                'run_id' => $toolCall->parentRunId,
                'tool_run_id' => $toolCall->runId,
                'tool_call_id' => $toolCall->toolCallId,
                'name' => $toolCall->name,
                'arguments' => $toolCall->arguments,
            ],
            'dedupe_key' => "run:{$toolCall->parentRunId}:tool_call:{$toolCall->toolCallId}",
        ]);
    }

    /**
     * Append a 'tool.result' message carrying a tool's output.
     *
     * The content is the result's output; status, error and truncation flag
     * are copied into the payload. The dedupe key is derived from the
     * result's own run id.
     */
    public function appendToolResult(string $sessionId, ToolResult $result): Message
    {
        return $this->chatService->appendMessage([
            'session_id' => $sessionId,
            'role' => Message::ROLE_TOOL,
            'type' => 'tool.result',
            'content' => $result->output,
            'payload' => [
                'run_id' => $result->runId,
                'parent_run_id' => $result->parentRunId,
                'tool_call_id' => $result->toolCallId,
                'name' => $result->name,
                'status' => $result->status,
                'error' => $result->error,
                'truncated' => $result->truncated,
            ],
            'dedupe_key' => "run:{$result->runId}:tool_result",
        ]);
    }
}