213 lines
7.6 KiB
PHP
213 lines
7.6 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace KupShop\SynchronizationBundle\Util\Rabbit;
|
|
|
|
use KupShop\KupShopBundle\Util\Logging\SentryLogger;
|
|
use KupShop\SynchronizationBundle\Exception\RabbitRetryMessageException;
|
|
use KupShop\SynchronizationBundle\Rabbit\RabbitConsumerConfiguration;
|
|
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
|
|
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
|
|
|
|
/**
 * Batch consumer for RabbitMQ queues built on top of the Symfony Messenger AMQP connection.
 *
 * A single consume() call opens a fresh connection, hands the messages that are waiting
 * in the configured queue(s) to a callback, acknowledges / requeues / retries each of them
 * according to the callback outcome, and closes the connection again.
 */
class RabbitConsumer
{
    public function __construct(private readonly SentryLogger $sentryLogger)
    {
    }

    /**
     * Consumes the messages waiting in the configured queue(s) and passes each one to $fn.
     *
     * Per message:
     *  - a message carrying the `x-ack-on-start` header is acknowledged before $fn runs,
     *  - $fn returning true acknowledges the message (except in local development),
     *  - $fn returning false, or any run in local development, requeues the message via nack,
     *  - RabbitRetryMessageException thrown by $fn hands the message to the retry handling,
     *  - any other throwable is re-thrown in local development, otherwise it is reported
     *    to Sentry and treated as a failed result.
     *
     * Processing stops once the configured timeout has elapsed, the connection is no longer
     * alive, or the number of messages counted at the start has been handled.
     *
     * The connection is always closed before this method returns or throws.
     *
     * @param callable(\AMQPEnvelope): bool $fn message handler; true means "processed successfully"
     *
     * @return bool false when the queues were empty at the start; otherwise whether, based on the
     *              count taken at the start, unprocessed messages were still left after the last
     *              handled message (stays true when no message was handled at all)
     */
    public function consume(RabbitConsumerConfiguration $configuration, callable $fn): bool
    {
        $connection = $this->createConnection($configuration);

        // try/finally guarantees the channel and connection are released even when an exception
        // escapes the processing loop (e.g. the deliberate re-throw in local development).
        try {
            $messagesCount = $connection->countMessagesInQueues();
            if (!$messagesCount) {
                return false;
            }

            $moreMessages = true;
            $processedCount = 0;
            $startTime = microtime(true);

            // start processing the waiting messages
            $this->processMessages($configuration, $connection, function (\AMQPEnvelope $envelope) use ($fn, $connection, $configuration, $startTime, $messagesCount, &$processedCount, &$moreMessages): bool {
                // the processing time limit has elapsed, so stop consuming
                if ((microtime(true) - $startTime) >= $configuration->timeout) {
                    return false;
                }

                // a message carrying the `x-ack-on-start` header is acknowledged right away
                $wasAckedEarly = false;
                if ($envelope->hasHeader('x-ack-on-start')) {
                    $connection->ack($envelope, $configuration->queue);
                    $wasAckedEarly = true;
                }

                // compare the processed count with the initial total to detect whether
                // there are still messages in the queue that have not been handled yet
                $moreMessages = ++$processedCount < $messagesCount;

                try {
                    $result = $fn($envelope);
                } catch (RabbitRetryMessageException) {
                    // pass the early-ack flag along so the same delivery is never acknowledged twice
                    $this->handleRetryMessage($envelope, $configuration, $connection, $wasAckedEarly);

                    return $moreMessages;
                } catch (\Throwable $e) {
                    if (isLocalDevelopment()) {
                        throw $e;
                    }

                    $this->sentryLogger->captureException($e, [
                        'extra' => [
                            'body' => $envelope->getBody(),
                        ],
                    ]);

                    $result = false;
                }

                // the connection died, so stop consuming - ack/nack cannot work once disconnected
                if (!$this->isConnectionAlive($connection)) {
                    return false;
                }

                // the message was already acknowledged up front, nothing below applies to it
                if ($wasAckedEarly) {
                    return $moreMessages;
                }

                // on failure, or always in local development, put the message back into the queue
                if (!$result || isLocalDevelopment()) {
                    $connection->nack($envelope, $configuration->queue, AMQP_REQUEUE);

                    return $moreMessages;
                }

                // everything went fine, so confirm the message
                $connection->ack($envelope, $configuration->queue);

                return $moreMessages;
            });

            return $moreMessages;
        } finally {
            $this->closeConnection($connection);
        }
    }

    /**
     * Message processing handler.
     *
     * Handler is processing all queues that are configured by configuration (priority queues).
     * Queues are visited in the order returned by the configuration; once $fn returns false,
     * the current queue stops being consumed and the remaining queues are skipped.
     *
     * @param callable(\AMQPEnvelope): bool $fn returns false to stop all further processing
     */
    private function processMessages(RabbitConsumerConfiguration $configuration, Connection $connection, callable $fn): void
    {
        $processNext = true;

        // multiple queues can be configured when using priorities, so we need to iterate all of them
        foreach ($configuration->getQueues() as $options) {
            // process should be stopped because $fn told us
            if (!$processNext) {
                break;
            }

            $queue = $connection->queue($options['queueName']);

            // declareQueue() result is used as the number of waiting messages;
            // no messages in queue, so we can skip it
            $messagesCount = $queue->declareQueue();
            if (!$messagesCount) {
                continue;
            }

            $processedCount = 0;

            // start queue consuming
            $queue->consume(function (\AMQPEnvelope $envelope) use ($fn, $messagesCount, &$processedCount, &$processNext): bool {
                if (!$fn($envelope)) {
                    $processNext = false;

                    return false;
                }

                // no more messages in queue, so return false to stop consume
                return ++$processedCount < $messagesCount;
            });
        }
    }

    /**
     * Closes the channel and disconnects the underlying AMQP connection.
     *
     * Both steps are best-effort: an exception raised while closing is intentionally ignored
     * so that the second step still runs and cleanup never fails the caller.
     */
    private function closeConnection(Connection $connection): void
    {
        $channel = $connection->channel();
        // fetch the connection before closing the channel it is reached through
        $amqpConnection = $channel->getConnection();

        try {
            $channel->close();
        } catch (\Exception) {
            // intentionally ignored - best-effort cleanup
        }

        try {
            $amqpConnection->disconnect();
        } catch (\Exception) {
            // intentionally ignored - best-effort cleanup
        }
    }

    /**
     * Handles a message whose handler asked for a retry.
     *
     * When a retry strategy is configured and still allows a retry, the message body is
     * re-published with an incremented `x-retry-count` header and the waiting time given by
     * the strategy. Afterwards the original message is acknowledged so it leaves the queue,
     * unless it has already been acknowledged earlier.
     *
     * @param bool $alreadyAcked true when the message was acknowledged before processing
     *                           (`x-ack-on-start`); the final ack is skipped in that case
     */
    private function handleRetryMessage(
        \AMQPEnvelope $envelope,
        RabbitConsumerConfiguration $configuration,
        Connection $connection,
        bool $alreadyAcked = false,
    ): void {
        // apply the retry strategy when one is configured
        if ($configuration->retryStrategy) {
            // when the message should still be retried, send it to the delay queue again; from there
            // it returns to the regular queue and processing is attempted once more
            if ($configuration->retryStrategy->isRetryable($envelope)) {
                $connection->publish(
                    $envelope->getBody(),
                    ['x-retry-count' => ($envelope->getHeaders()['x-retry-count'] ?? 0) + 1],
                    $configuration->retryStrategy->getWaitingTime($envelope),
                    // retry message should be routed to default queue when returning from delay queue (retry message becomes unprioritized)
                    AmqpStamp::createFromAmqpEnvelope($envelope, retryRoutingKey: $configuration->getDefaultQueue())
                );
            }
        }

        // acknowledging the same delivery a second time must be avoided
        if ($alreadyAcked) {
            return;
        }

        // confirm the message so it disappears from the queue
        $connection->ack($envelope, $configuration->queue);
    }

    /**
     * Builds the AMQP connection for the given consumer configuration.
     *
     * Automatic setup is disabled. When priority queues with an exchange are configured, the
     * direct exchange is declared and every configured queue is declared and bound to it under
     * its priority name. Finally the configured prefetch count is applied to the channel.
     */
    private function createConnection(RabbitConsumerConfiguration $configuration): Connection
    {
        $usesPriorityExchange = $configuration->hasPriorityQueuesWithExchange();

        $options = [
            'auto_setup' => false,
            'queues' => $configuration->getQueuesConfiguration(),
        ];

        if ($usesPriorityExchange) {
            $options['exchange'] = [
                'name' => $configuration->priorityOptions['exchange'],
                'type' => 'direct',
            ];
        }

        $connection = Connection::fromDsn(
            rtrim($configuration->dsn, '/').'/'.$configuration->queue,
            $options
        );

        // priority options are configured, so setup priority queues with exchange
        if ($usesPriorityExchange) {
            $connection->exchange()->declareExchange();

            foreach ($configuration->getQueues() as $config) {
                $queue = $connection->queue($config['queueName']);
                $queue->declareQueue();
                $queue->bind($configuration->priorityOptions['exchange'], $config['priorityName']);
            }
        }

        $connection->channel()->setPrefetchCount($configuration->prefetchCount);

        return $connection;
    }

    /**
     * Tells whether the channel of the given connection still reports itself as connected.
     */
    private function isConnectionAlive(Connection $connection): bool
    {
        return $connection->channel()->isConnected();
    }
}
|