main: 扩展 Agent Run 调度与队列功能

- 增加 Agent Run MVP-0,包括 RunDispatcher 和 AgentRunJob
- 优化队列配置,支持 Redis 队列驱动,添加 Horizon 容器
- 更新 Docker 配置,细化角色分工,新增 Horizon 配置
- 增加测试任务 `TestJob`,扩展队列使用示例
- 更新 OpenAPI 规范,添加 Agent Run 相关接口及示例
- 编写文档,详细描述 Agent Run 流程与 MVP-0 功能
- 优化相关服务与文档,支持队列与异步运行
This commit is contained in:
2025-12-17 02:39:31 +08:00
parent dafa8f6b06
commit c55534ad20
42 changed files with 2596 additions and 217 deletions

View File

@@ -9,12 +9,16 @@ use App\Http\Requests\UpdateSessionRequest;
use App\Http\Resources\ChatSessionResource;
use App\Http\Resources\MessageResource;
use App\Services\ChatService;
use App\Services\RunDispatcher;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
class ChatSessionController extends Controller
{
public function __construct(private readonly ChatService $service)
public function __construct(
private readonly ChatService $service,
private readonly RunDispatcher $runDispatcher,
)
{
}
@@ -45,6 +49,10 @@ class ChatSessionController extends Controller
'session_id' => $sessionId,
...$request->validated(),
]);
if ($message->role === 'USER' && $message->type === 'user.prompt') {
$this->runDispatcher->dispatchForPrompt($sessionId, $message->message_id);
}
} catch (ChatSessionStatusException $e) {
return response()->json(['message' => $e->getMessage()], 403);
}

View File

@@ -24,7 +24,12 @@ class ChatSessionSseController extends Controller
$limit = (int) $request->query('limit', 200);
$limit = $limit > 0 && $limit <= 500 ? $limit : 200;
if (app()->runningUnitTests() || app()->environment('testing') || ! class_exists(\Redis::class)) {
$useBacklogOnly = app()->runningUnitTests()
|| app()->environment('testing')
|| defined('PHPUNIT_COMPOSER_INSTALL')
|| ! class_exists(\Redis::class);
if ($useBacklogOnly) {
$lastSentSeq = $afterSeq;
ob_start();
$this->sendBacklog($sessionId, $lastSentSeq, $limit, false);
@@ -42,41 +47,75 @@ class ChatSessionSseController extends Controller
$this->sendBacklog($sessionId, $lastSentSeq, $limit);
$redis = Redis::connection()->client();
if (method_exists($redis, 'setOption')) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, 5);
}
$channel = "session:{$sessionId}:messages";
$pubSub = $redis->pubSubLoop();
$pubSub->subscribe($channel);
$lastPing = time();
foreach ($pubSub as $message) {
if ($message->kind === 'subscribe') {
continue;
try {
$redis = Redis::connection()->client();
if (method_exists($redis, 'setOption')) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, 5);
}
if (connection_aborted()) {
$pubSub->unsubscribe();
break;
}
$channel = "session:{$sessionId}:messages";
$lastPing = time();
logger()->info('sse open');
if (method_exists($redis, 'pubSubLoop')) {
$pubSub = $redis->pubSubLoop();
$pubSub->subscribe($channel);
$payloadId = $message->payload ?? null;
if ($payloadId) {
$msg = $this->service->getMessage($sessionId, $payloadId);
if ($msg && $msg->seq > $lastSentSeq) {
$this->emitMessage($msg);
$lastSentSeq = $msg->seq;
foreach ($pubSub as $message) {
if ($message->kind === 'subscribe') {
continue;
}
if (connection_aborted()) {
$pubSub->unsubscribe();
break;
}
$payloadId = $message->payload ?? null;
if ($payloadId) {
$msg = $this->service->getMessage($sessionId, $payloadId);
if ($msg && $msg->seq > $lastSentSeq) {
$this->emitMessage($msg);
$lastSentSeq = $msg->seq;
}
}
if (time() - $lastPing >= 180) {
logger()->info('ping: sent'.$sessionId);
echo ": ping\n\n";
@ob_flush();
@flush();
$lastPing = time();
}
}
}
logger()->info('close: sent'.$sessionId);
unset($pubSub);
} else {
// Fallback for Redis drivers without pubSubLoop (older phpredis)
$redis->subscribe([$channel], function ($redisInstance, $chan, $payload) use (&$lastSentSeq, $sessionId) {
if (connection_aborted()) {
$redisInstance->unsubscribe([$chan]);
return;
}
if (time() - $lastPing >= 20) {
echo ": ping\n\n";
@ob_flush();
@flush();
$lastPing = time();
if (! $payload) {
return;
}
$msg = $this->service->getMessage($sessionId, $payload);
if ($msg && $msg->seq > $lastSentSeq) {
$this->emitMessage($msg);
$lastSentSeq = $msg->seq;
}
});
}
} catch (\RedisException $exception) {
logger()->warning('SSE redis subscription failed', [
'session_id' => $sessionId,
'message' => $exception->getMessage(),
]);
echo ": redis-error\n\n";
@ob_flush();
@flush();
}
});

View File

@@ -0,0 +1,28 @@
<?php
namespace App\Http\Controllers;
use App\Http\Requests\DispatchRunRequest;
use App\Jobs\TestJob;
use App\Services\RunDispatcher;
use Illuminate\Http\JsonResponse;
class RunController extends Controller
{
public function __construct(private readonly RunDispatcher $dispatcher)
{
}
public function store(string $sessionId, DispatchRunRequest $request): JsonResponse
{
$runId = $this->dispatcher->dispatchForPrompt($sessionId, $request->validated()['trigger_message_id']);
return response()->json(['run_id' => $runId], 201);
}
public function test()
{
$job = TestJob::dispatch();
unset($job);
}
}

View File

@@ -0,0 +1,23 @@
<?php
namespace App\Http\Requests;
use Illuminate\Foundation\Http\FormRequest;
class DispatchRunRequest extends FormRequest
{
public function authorize(): bool
{
return true;
}
/**
* @return array<string, mixed>
*/
public function rules(): array
{
return [
'trigger_message_id' => ['required', 'uuid'],
];
}
}