70 lines
1.8 KiB
PHP
70 lines
1.8 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace External\HannahBundle\SAP\Command;
|
|
|
|
use External\HannahBundle\SAP\Util\SAPConsumer;
|
|
use KupShop\KupShopBundle\Util\Logging\SentryLogger;
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException;
|
|
use Symfony\Component\Console\Command\Command;
|
|
use Symfony\Component\Console\Input\InputInterface;
|
|
use Symfony\Component\Console\Output\OutputInterface;
|
|
|
|
/**
 * Console command `sap:consume`: runs the SAP consumer in an endless loop.
 *
 * Every pass first asks the SQL connection to connect and then hands control
 * to SAPConsumer::consume(). An AMQP timeout is treated as "nothing to
 * receive" and followed by a long sleep; any other throwable is reported to
 * Sentry and the consumer is restarted after a short pause. In both cases the
 * SQL connection is closed before sleeping so the next pass reconnects.
 */
class SAPConsumeCommand extends Command
{
    protected static $defaultName = 'sap:consume';

    /** Seconds to sleep after an AMQP timeout before polling the queue again. */
    private const IDLE_SLEEP_SECONDS = 300;

    /** Seconds to sleep after an unexpected error before restarting the consumer. */
    private const ERROR_SLEEP_SECONDS = 5;

    /** @var SAPConsumer */
    private $consumer;

    /** @var SentryLogger */
    private $sentryLogger;

    /**
     * Setter injection of the consumer that execute() drives.
     *
     * @required
     */
    public function setSAPConsumer(SAPConsumer $consumer): void
    {
        $this->consumer = $consumer;
    }

    /**
     * Setter injection of the logger used to report unexpected consumer errors.
     *
     * @required
     */
    public function setSentryLogger(SentryLogger $sentryLogger): void
    {
        $this->sentryLogger = $sentryLogger;
    }

    /**
     * Runs the consume loop until an exception escapes it.
     *
     * Exceptions thrown by connect() itself, or from inside one of the catch
     * handlers, are not caught here and terminate the command.
     *
     * @return int declared exit code; in practice the loop never finishes normally
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $output->writeln(sprintf('Starting consumer: %s...', $this->consumer->getRabbitQueue()));

        while (true) {
            sqlGetConnection()->connect();

            try {
                $this->consumer->consume();
            } catch (AMQPTimeoutException $e) {
                // Must stay ahead of the \Throwable clause: a timeout is an
                // expected "idle" signal and is deliberately not sent to Sentry.
                $output->writeln(sprintf(
                    '[INFO] Connection timed out, probably no data to receive. Sleep and try in next %d seconds...',
                    self::IDLE_SLEEP_SECONDS
                ));

                $this->closeConnectionAndSleep(self::IDLE_SLEEP_SECONDS);
            } catch (\Throwable $e) {
                $this->sentryLogger->captureException($e);

                $output->writeln('[INFO] Some error happened during consuming. Restarting consumer...');

                // Close here as well, so that a failure caused by a broken SQL
                // connection is not repeated on the same handle forever.
                // NOTE(review): assumes connect() is a no-op while a handle is
                // still held (as in Doctrine DBAL) - confirm for sqlGetConnection().
                $this->closeConnectionAndSleep(self::ERROR_SLEEP_SECONDS);
            }
        }

        // Unreachable: the loop above has no exit; kept to match the declared int return type.
        return 0;
    }

    /**
     * Closes the SQL connection and pauses, so the next loop pass reconnects.
     *
     * @param int $seconds how long to sleep before the consumer is retried
     */
    private function closeConnectionAndSleep(int $seconds): void
    {
        sqlGetConnection()->close();

        sleep($seconds);
    }
}
|