getQueue(); $queue[$jobId] = $jobId; $this->setQueue($queue); } public function processOne(?OutputInterface $output = null): bool { $jobId = $this->getJob(); if (!$jobId) { return false; } $job = $this->jobProvider->getJob($jobId); if (!$job) { return false; } try { $this->setJobStatus($jobId, self::STATUS_RUNNING); ($job->fn)($output ?: new NullOutput()); } catch (\Exception $e) { $this->setJobStatus($jobId, self::STATUS_FAILED, $e->getMessage()); throw $e; } $this->setJobStatus($jobId, self::STATUS_SUCCESS); return true; } public function processAll(): void { while (!empty($this->getQueue())) { $this->processOne(); } } public function isQueued(string $jobId): bool { return in_array($jobId, $this->getQueue()); } protected function getQueue(): array { return getCache(self::CACHE_KEY) ?: []; } protected function setQueue(array $queue): void { setCache(self::CACHE_KEY, $queue); } protected function getJob(): ?string { $queue = $this->getQueue(); $job = array_shift($queue); $this->setQueue($queue); return $job; } public function setJobStatus(string $jobId, string $status, string $msg = ''): void { $statuses = $this->getJobStatuses(); $currentStatus = $statuses[$jobId] ?? []; $currentStatus['status'] = $status; $currentStatus['msg'] = $msg; if ($status === self::STATUS_RUNNING) { $currentStatus['time'] = time(); } $statuses[$jobId] = $currentStatus; $this->setJobStatuses($statuses); } protected function setJobStatuses(array $statuses): void { setCache(self::CACHE_KEY_JOB_STATUSES, $statuses, self::CACHE_TTL); } public function getJobStatuses(): array { return getCache(self::CACHE_KEY_JOB_STATUSES) ?: []; } public function getJobStatus(string $jobId): ?array { return $this->getJobStatuses()[$jobId] ?? null; } #[Required] public function setJobProvider(SynchronizationJobProvider $jobProvider): void { $this->jobProvider = $jobProvider; } }