queueUtil = $queueUtil;
    }

    /**
     * Stores the provider that {@see execute()} uses to look up a job by its id.
     *
     * NOTE(review): presumably called by the DI container because of #[Required] - confirm.
     */
    #[Required]
    final public function setSynchronizationJobProvider(SynchronizationJobProvider $jobProvider): void
    {
        $this->jobProvider = $jobProvider;
    }

    /**
     * Stores the logger that {@see execute()} uses to report timed-out jobs to Sentry.
     *
     * NOTE(review): presumably called by the DI container because of #[Required] - confirm.
     */
    #[Required]
    final public function setSentryLogger(SentryLogger $sentryLogger): void
    {
        $this->sentryLogger = $sentryLogger;
    }

    /**
     * Checks every running synchronization job against its maximum execution time.
     *
     * Each job that has been running longer than its limit is marked as failed and
     * reported to Sentry. All running jobs are inspected before returning, so several
     * jobs that timed out at once are all handled in a single run instead of only
     * the first one found.
     *
     * @return int 0 when no running job exceeded its limit, 124 (timeout) when at least
     *             one did, so the livenessProbe is going to restart the synchronization pod
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $runningJobs = array_filter(
            $this->queueUtil->getJobStatuses(),
            static fn (array $jobInfo): bool => $jobInfo['status'] === SynchronizationQueueUtil::STATUS_RUNNING,
        );

        // read the clock once so every job is measured against the same instant
        $now = time();

        // 0 = success (ok status); switched to 124 as soon as one timed-out job is found
        $exitCode = 0;

        foreach ($runningJobs as $jobId => $jobInfo) {
            $job = $this->jobProvider->getJob($jobId);

            // the provider does not know this job, nothing to compare against
            if (!$job) {
                continue;
            }

            // skip jobs with no execution time limit
            if ($job->maxExecutionTime === null) {
                continue;
            }

            // maxExecutionTime is in minutes, the job start time is a timestamp in seconds
            $timedOut = ($now - $jobInfo['time']) > ($job->maxExecutionTime * 60);
            if (!$timedOut) {
                continue;
            }

            // update job status
            $this->queueUtil->setJobStatus(
                $jobId,
                SynchronizationQueueUtil::STATUS_FAILED,
                "Job exceeded maximum execution time limit - {$job->maxExecutionTime} minutes"
            );

            // send notification to Sentry so we know about jobs that timed out
            $this->sentryLogger->captureException(
                new SynchronizationException(
                    "Job \"{$job->name}\" exceeded maximum execution time limit!",
                    [
                        'jobName' => $job->name,
                        'jobMaxExecutionTime' => $job->maxExecutionTime,
                    ]
                )
            );

            // keep going instead of returning here: the remaining running jobs may have
            // timed out as well and would otherwise stay flagged as running and unreported
            $exitCode = 124;
        }

        // 124 = timeout (error status) so livenessProbe is going to restart synchronization pod
        return $exitCode;
    }
}