Files
ars-backend/app/Services/RunDispatcher.php
ROOG c55534ad20 main: 扩展 Agent Run 调度与队列功能
- 增加 Agent Run MVP-0,包括 RunDispatcher 和 AgentRunJob
- 优化队列配置,支持 Redis 队列驱动,添加 Horizon 容器
- 更新 Docker 配置,细化角色分工,新增 Horizon 配置
- 增加测试任务 `TestJob`,扩展队列使用示例
- 更新 OpenAPI 规范,添加 Agent Run 相关接口及示例
- 编写文档,详细描述 Agent Run 流程与 MVP-0 功能
- 优化相关服务与文档,支持队列与异步运行
2025-12-17 02:39:31 +08:00

61 lines
1.8 KiB
PHP

<?php
namespace App\Services;
use App\Jobs\AgentRunJob;
use App\Models\Message;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Support\Str;
/**
 * Decides whether a prompt message needs a new agent run and, if so, records
 * the run and queues the job that executes it.
 */
class RunDispatcher
{
    public function __construct(
        private readonly ChatService $chatService,
        private readonly OutputSink $outputSink,
    ) {
    }

    /**
     * Returns the run id that handles the given trigger message, creating and
     * queueing a new run only when no existing one can be reused.
     *
     * Resolution order:
     *  1. The newest `run.status` message of the session whose payload names
     *     this trigger message: its run id is returned as-is.
     *  2. The newest `run.status` message of the session overall: if its status
     *     is `RUNNING`, that run's id is returned instead of starting another.
     *  3. Otherwise a fresh UUID is generated, a `RUNNING` status is appended
     *     through the output sink, and an AgentRunJob is dispatched.
     *
     * NOTE(review): the lookups and the append are not wrapped in a lock or
     * transaction in this method; whether concurrent calls are deduplicated
     * elsewhere (e.g. via `dedupe_key` inside OutputSink) is not visible here.
     *
     * @param string $sessionId        Session the trigger message belongs to.
     * @param string $triggerMessageId Message that requested the run.
     *
     * @return string Id of the reused or newly created run.
     *
     * @throws ModelNotFoundException When the chat service finds no such message in the session.
     */
    public function dispatchForPrompt(string $sessionId, string $triggerMessageId): string
    {
        if (! $this->chatService->getMessage($sessionId, $triggerMessageId)) {
            throw (new ModelNotFoundException())->setModel(Message::class, [$triggerMessageId]);
        }

        // A status already recorded for this exact trigger wins.
        $forTrigger = $this->latestRunStatus($sessionId, $triggerMessageId);
        $knownRunId = $forTrigger?->payload['run_id'] ?? null;
        if ($knownRunId) {
            return $knownRunId;
        }

        // Otherwise fall back to whatever run the session's newest status marks as RUNNING.
        $latest = $this->latestRunStatus($sessionId);
        $isRunning = ($latest?->payload['status'] ?? null) === 'RUNNING';
        $activeRunId = $isRunning ? ($latest->payload['run_id'] ?? null) : null;
        if ($activeRunId) {
            return $activeRunId;
        }

        // Nothing to reuse: record the new run first, then queue its job.
        $newRunId = (string) Str::uuid();
        $statusMeta = [
            'trigger_message_id' => $triggerMessageId,
            'dedupe_key' => 'run:trigger:'.$triggerMessageId,
        ];
        $this->outputSink->appendRunStatus($sessionId, $newRunId, 'RUNNING', $statusMeta);

        dispatch(new AgentRunJob($sessionId, $newRunId));

        return $newRunId;
    }

    /**
     * Fetches the `run.status` message with the highest `seq` in a session,
     * optionally restricted to those whose payload references a trigger message.
     */
    private function latestRunStatus(string $sessionId, ?string $triggerMessageId = null): ?Message
    {
        $query = Message::query()
            ->where('session_id', $sessionId)
            ->where('type', 'run.status');

        if ($triggerMessageId !== null) {
            $query->where('payload->trigger_message_id', $triggerMessageId);
        }

        return $query->orderByDesc('seq')->first();
    }
}