tries = (int) config('agent.job.tries', 1); $this->timeout = (int) config('agent.job.timeout_seconds', 120); $this->backoff = (int) config('agent.job.backoff_seconds', 5); } public function handle(RunLoop $loop, OutputSink $sink, CancelChecker $cancelChecker): void { try { logger("Running run {$this->runId} for session {$this->sessionId}"); $loop->run($this->sessionId, $this->runId); } catch (\Throwable $e) { logger("Running error {$this->runId} for session {$this->sessionId}"); logger("error message:",[$e->getMessage(),$e->getTraceAsString()]); $errorCode = $e instanceof ProviderException ? $e->errorCode : 'run.failed'; $dedupeKey = $e instanceof ProviderException ? "run:{$this->runId}:error:provider" : "run:{$this->runId}:error:job"; $sink->appendError($this->sessionId, $this->runId, $errorCode, $e->getMessage(), [], $dedupeKey); $sink->appendRunStatus($this->sessionId, $this->runId, 'FAILED', [ 'error' => $e->getMessage(), 'dedupe_key' => "run:{$this->runId}:status:FAILED", ]); throw $e; } finally { $latestStatus = Message::query() ->where('session_id', $this->sessionId) ->where('type', 'run.status') ->whereRaw("payload->>'run_id' = ?", [$this->runId]) ->orderByDesc('seq') ->first(); $status = $latestStatus ? ($latestStatus->payload['status'] ?? null) : null; if ($status === 'RUNNING' || ! $status) { if ($cancelChecker->isCanceled($this->sessionId, $this->runId)) { $sink->appendRunStatus($this->sessionId, $this->runId, 'CANCELED', [ 'dedupe_key' => "run:{$this->runId}:status:CANCELED", ]); } else { $sink->appendRunStatus($this->sessionId, $this->runId, 'FAILED', [ 'error' => 'JOB_ENDED_UNEXPECTEDLY', 'dedupe_key' => "run:{$this->runId}:status:FAILED", ]); } } } } }