createConnection($configuration);

        // nothing is waiting in any of the queues, so there is nothing to consume
        if (!($messagesCount = $connection->countMessagesInQueues())) {
            $this->closeConnection($connection);

            return false;
        }

        $moreMessages = true;
        $processedCount = 0;
        $startTime = microtime(true);

        // start processing the messages
        $this->processMessages(
            $configuration,
            $connection,
            function (\AMQPEnvelope $envelope) use (
                $fn,
                $connection,
                $configuration,
                $startTime,
                $messagesCount,
                &$processedCount,
                &$moreMessages
            ) {
                // the time limit for processing has expired, so stop consuming
                if ((microtime(true) - $startTime) >= $configuration->timeout) {
                    return false;
                }

                $wasAckedEarly = false;

                // a message carrying the `x-ack-on-start` header is acknowledged right away, before it is handled
                if ($envelope->hasHeader('x-ack-on-start')) {
                    $connection->ack($envelope, $configuration->queue);
                    $wasAckedEarly = true;
                }

                // compare the processed count with the total count to detect whether unprocessed messages remain
                $moreMessages = ++$processedCount < $messagesCount;

                try {
                    $result = $fn($envelope);
                } catch (RabbitRetryMessageException) {
                    // pass the early-ack state along so that the same delivery is never acknowledged twice
                    $this->handleRetryMessage($envelope, $configuration, $connection, $wasAckedEarly);

                    return $moreMessages;
                } catch (\Throwable $e) {
                    // on local development the failure is surfaced directly instead of being logged
                    if (isLocalDevelopment()) {
                        throw $e;
                    }

                    $this->sentryLogger->captureException($e, [
                        'extra' => [
                            'body' => $envelope->getBody(),
                        ],
                    ]);

                    $result = false;
                }

                // the connection died, so stop consuming; ack/nack cannot work once disconnected
                if (!$this->isConnectionAlive($connection)) {
                    return false;
                }

                // the message was already acknowledged at the start, so the ack/nack logic below is not needed
                if ($wasAckedEarly) {
                    return $moreMessages;
                }

                // handling failed, or this is local development: put the message back into the queue
                if (!$result || isLocalDevelopment()) {
                    $connection->nack($envelope, $configuration->queue, AMQP_REQUEUE);

                    return $moreMessages;
                }

                // everything went fine, so acknowledge the message
                $connection->ack($envelope, $configuration->queue);

                return $moreMessages;
            }
        );

        $this->closeConnection($connection);

        return $moreMessages;
    }

    /**
     * Message processing handler.
     *
     * Handler is processing all queues that are configured by configuration (priority queues).
     * Queues are visited in the order returned by the configuration; a queue that reports no
     * messages is skipped.
     *
     * @param callable $fn invoked with every received \AMQPEnvelope; a falsy return value stops
     *                     the consumption of the current queue and of all remaining queues
     */
    private function processMessages(RabbitConsumerConfiguration $configuration, Connection $connection, callable $fn): void
    {
        $processNext = true;

        // multiple queues can be configured when using priorities, so we need to iterate all of them
        foreach ($configuration->getQueues() as $options) {
            // process should be stopped because $fn told us
            if (!$processNext) {
                break;
            }

            $queue = $connection->queue($options['queueName']);

            // declareQueue() reports the number of messages; no messages in queue, so we can skip it
            if (!($messagesCount = $queue->declareQueue())) {
                continue;
            }

            $processedCount = 0;

            // start queue consuming
            $queue->consume(function (\AMQPEnvelope $envelope) use ($fn, $messagesCount, &$processedCount, &$processNext) {
                if (!$fn($envelope)) {
                    $processNext = false;

                    return false;
                }

                // no more messages in queue, so return false to stop consume
                return ++$processedCount < $messagesCount;
            });
        }
    }

    /**
     * Closes the channel and disconnects the underlying AMQP connection.
     *
     * Both steps are best-effort: an exception raised by either of them is ignored, and the
     * disconnect is attempted even when closing the channel failed.
     */
    private function closeConnection(Connection $connection): void
    {
        // grab the connection handle first, while the channel is still available
        $amqpConnection = $connection->channel()->getConnection();

        try {
            $connection->channel()->close();
        } catch (\Exception) {
            // intentionally ignored: a failed channel close must not prevent the disconnect below
        }

        try {
            $amqpConnection->disconnect();
        } catch (\Exception) {
            // intentionally ignored: there is nothing left to clean up at this point
        }
    }

    /**
     * Handles a message whose handler requested a retry.
     *
     * When a retry strategy is configured and it still considers the message retryable, the body
     * is published again with an incremented `x-retry-count` header and the waiting time returned
     * by the strategy. Whether or not it was republished, the original delivery is acknowledged
     * afterwards so that it leaves the queue.
     *
     * @param bool $alreadyAcked true when the delivery has been acknowledged before this call
     *                           (e.g. because of the `x-ack-on-start` header); the final ack is
     *                           then skipped, because acknowledging the same delivery tag twice
     *                           is a protocol error (RabbitMQ answers with PRECONDITION_FAILED
     *                           and closes the channel)
     */
    private function handleRetryMessage(
        \AMQPEnvelope $envelope,
        RabbitConsumerConfiguration $configuration,
        Connection $connection,
        bool $alreadyAcked = false
    ): void {
        // a retry strategy is configured, so apply it
        if ($configuration->retryStrategy) {
            // the message should still be retried: send it to the delay queue again, from where it
            // returns to the normal queue and is processed once more
            if ($configuration->retryStrategy->isRetryable($envelope)) {
                $connection->publish(
                    $envelope->getBody(),
                    ['x-retry-count' => ($envelope->getHeaders()['x-retry-count'] ?? 0) + 1],
                    $configuration->retryStrategy->getWaitingTime($envelope),
                    // retry message should be routed to default queue when returning from delay queue (retry message becomes unprioritized)
                    AmqpStamp::createFromAmqpEnvelope($envelope, retryRoutingKey: $configuration->getDefaultQueue())
                );
            }
        }

        // the delivery is already gone from the queue, a second ack would break the channel
        if ($alreadyAcked) {
            return;
        }

        // acknowledge the message so that it disappears from the queue
        $connection->ack($envelope, $configuration->queue);
    }

    /**
     * Builds the AMQP connection for the given consumer configuration.
     *
     * The connection is created with `auto_setup` disabled. When priority queues with an exchange
     * are configured, the exchange is declared here and every configured queue is declared and
     * bound to it under its priority name. Finally the channel prefetch count is applied.
     */
    private function createConnection(RabbitConsumerConfiguration $configuration): Connection
    {
        $options = [
            'auto_setup' => false,
            'queues' => $configuration->getQueuesConfiguration(),
        ];

        if ($configuration->hasPriorityQueuesWithExchange()) {
            $options['exchange'] = [
                'name' => $configuration->priorityOptions['exchange'],
                'type' => 'direct',
            ];
        }

        // the queue name is appended to the DSN as its last path segment
        $connection = Connection::fromDsn(
            rtrim($configuration->dsn, '/').'/'.$configuration->queue,
            $options
        );

        // priority options are configured, so setup priority queues with exchange
        if ($configuration->hasPriorityQueuesWithExchange()) {
            $connection->exchange()->declareExchange();

            foreach ($configuration->getQueues() as $config) {
                $queue = $connection->queue($config['queueName']);
                $queue->declareQueue();
                $queue->bind($configuration->priorityOptions['exchange'], $config['priorityName']);
            }
        }

        $connection->channel()->setPrefetchCount($configuration->prefetchCount);

        return $connection;
    }

    /**
     * Tells whether the channel of the given connection still reports itself as connected.
     */
    private function isConnectionAlive(Connection $connection): bool
    {
        return $connection->channel()->isConnected();
    }
}