Files
kupshop/bundles/KupShop/SynchronizationBundle/Rabbit/RabbitConsumerConfiguration.php
2025-08-02 16:30:27 +02:00

101 lines
3.3 KiB
PHP

<?php
declare(strict_types=1);
namespace KupShop\SynchronizationBundle\Rabbit;
use KupShop\SynchronizationBundle\Rabbit\RetryStrategy\RetryStrategyInterface;
/**
 * Immutable description of a RabbitMQ consumer: which queue to consume, the connection DSN
 * and optional retry / priority-queue settings.
 *
 * When `priorityOptions` contains non-empty `count` and `exchange` entries, the single base
 * queue is replaced by `count` queues named `<queue>.priority.<n>` (n = count-1 ... 0).
 */
class RabbitConsumerConfiguration
{
    // NOTE(review): both DSN constants embed plaintext credentials in source control.
    // The values are kept unchanged here because callers may reference the constants;
    // consider sourcing the DSN from environment/secret configuration and rotating the password.

    /** @deprecated Use `DSN_RABBIT_CLUSTER` */
    public const DSN_RABBIT_OLD = 'amqp://wpj:SbyAMozJ03tPo5EA8rVU@rabbit.wpj.cz/kupshop/';
    public const DSN_RABBIT_CLUSTER = 'amqp://wpj:SbyAMozJ03tPo5EA8rVU@rabbitmq-cluster.rabbitmq/wpjshop/';

    /**
     * @param string                      $queue           base queue name
     * @param int                         $timeout         consumer timeout (unit not visible here - presumably seconds, TODO confirm)
     * @param int                         $prefetchCount   prefetch count handed to the consumer
     * @param string                      $dsn             AMQP connection string (contains credentials)
     * @param RetryStrategyInterface|null $retryStrategy   optional retry strategy, null when retries are not configured
     * @param array                       $queueOptions    used as the `arguments` of every queue in getQueuesConfiguration()
     * @param array                       $priorityOptions priority setup; the `count` and `exchange` keys are read by this class
     */
    public function __construct(
        public readonly string $queue,
        public readonly int $timeout,
        public readonly int $prefetchCount,
        #[\SensitiveParameter] public readonly string $dsn,
        public readonly ?RetryStrategyInterface $retryStrategy = null,
        public readonly array $queueOptions = [],
        public readonly array $priorityOptions = [],
    ) {
    }

    /**
     * Creates a configuration, by default against the deprecated `DSN_RABBIT_OLD` server.
     *
     * All arguments, including `$priorityOptions`, are forwarded to the constructor.
     */
    public static function create(
        string $queue,
        int $timeout = 60,
        int $prefetchCount = 200,
        ?RetryStrategyInterface $retryStrategy = null,
        array $queueOptions = [],
        array $priorityOptions = [],
        string $dsn = self::DSN_RABBIT_OLD,
    ): static {
        // $priorityOptions was previously accepted but dropped, which silently disabled priority queues.
        return new static($queue, $timeout, $prefetchCount, $dsn, $retryStrategy, $queueOptions, $priorityOptions);
    }

    /**
     * Creates a configuration bound to `DSN_RABBIT_CLUSTER`.
     */
    public static function createDefault(
        string $queue,
        int $timeout = 60,
        int $prefetchCount = 200,
        ?RetryStrategyInterface $retryStrategy = null,
        array $queueOptions = [],
        array $priorityOptions = [],
    ): static {
        return new static(
            $queue,
            $timeout,
            $prefetchCount,
            self::DSN_RABBIT_CLUSTER,
            $retryStrategy,
            $queueOptions,
            $priorityOptions,
        );
    }

    /**
     * Returns name of default queue - queue with the lowest priority when priorities are configured.
     *
     * It is used, for example, for delay queues. When message processing fails and retryStrategy is enabled, the message is routed
     * to a delay queue, from where it is sent to this default queue.
     */
    public function getDefaultQueue(): string
    {
        // Key 0 is either the single base queue or the `priority.0` queue.
        return $this->getQueues()[0]['queueName'];
    }

    /**
     * Queues configuration for `Connection` class.
     *
     * @return array<string, array{arguments: array}> map of queue name to its options
     */
    public function getQueuesConfiguration(): array
    {
        $queues = [];
        foreach ($this->getQueues() as $options) {
            $queues[$options['queueName']] = ['arguments' => $this->queueOptions];
        }

        return $queues;
    }

    /**
     * Returns all queues that are configured by this configuration.
     *
     * Without priorities the result is a single entry at key 0 with a null `priorityName`.
     * With priorities the result is keyed by priority number, ordered from the highest number down to 0.
     *
     * @return array<int, array{queueName: string, priorityName: string|null}>
     */
    public function getQueues(): array
    {
        $priorities = $this->getPriorities();
        if ($priorities === null) {
            return [0 => ['queueName' => $this->queue, 'priorityName' => null]];
        }

        $queues = [];
        foreach ($priorities as $priority => $priorityName) {
            $queues[$priority] = ['queueName' => "{$this->queue}.{$priorityName}", 'priorityName' => $priorityName];
        }

        return $queues;
    }

    /**
     * Tells whether both a non-empty `count` and a non-empty `exchange` are present in `priorityOptions`.
     */
    public function hasPriorityQueuesWithExchange(): bool
    {
        return !empty($this->priorityOptions['count']) && !empty($this->priorityOptions['exchange']);
    }

    /**
     * Builds the priority-number => priority-name map, or null when priority queues are not configured.
     *
     * @return array<int, string>|null names in the form `priority.<n>`, from count-1 down to 0
     */
    private function getPriorities(): ?array
    {
        if (!$this->hasPriorityQueuesWithExchange()) {
            return null;
        }

        $count = $this->priorityOptions['count'];
        $priorityNames = [];
        foreach (range($count - 1, 0) as $priority) {
            $priorityNames[$priority] = "priority.{$priority}";
        }

        return $priorityNames;
    }
}