257 lines
7.4 KiB
PHP
257 lines
7.4 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace External\HannahBundle\SAP\Util;
|
|
|
|
use Doctrine\DBAL\Exception\DeadlockException;
|
|
use Doctrine\DBAL\Exception\LockWaitTimeoutException;
|
|
use External\HannahBundle\SAP\Exception\SAPException;
|
|
use External\HannahBundle\Util\Configuration;
|
|
use External\HannahBundle\Util\FTP\SFTPClient;
|
|
use KupShop\KupShopBundle\Util\Logging\SentryLogger;
|
|
use PhpAmqpLib\Channel\AMQPChannel;
|
|
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
use Psr\Log\LoggerInterface;
|
|
|
|
/**
 * RabbitMQ consumer for messages coming from SAP.
 *
 * The connection settings and the queue name are read from the `sap.rabbit`
 * configuration section. Every received message is JSON-decoded, its payload is
 * located and loaded through {@see SAPUtil::loadSAPData()}, and the result is handed to
 * the synchronizer that {@see SAPLocator} resolves for the message's `method`.
 */
class SAPConsumer
{
    /** Lazily opened channel; null until first use and again after rabbitClose(). */
    private ?AMQPChannel $rabbitChannel = null;

    /** Lazily opened connection; null until first use and again after rabbitClose(). */
    private ?AMQPStreamConnection $rabbitConnection = null;

    public function __construct(
        private Configuration $configuration,
        private SAPLocator $sapLocator,
        private SAPUtil $sapUtil,
        private SFTPClient $sftpClient,
        private SentryLogger $sentryLogger,
        private LoggerInterface $logger,
    ) {
    }

    /**
     * Runs the consume loop for as long as the channel has registered consumer callbacks.
     *
     * Each iteration waits on the channel with a timeout argument of 3. Any exception
     * raised while waiting or while handling a message closes the channel and the
     * connection, is written to stderr with severity ERROR and is then rethrown.
     *
     * @throws \Exception whatever the channel or the message handler raised
     */
    public function consume(): void
    {
        try {
            while (count($this->getRabbitChannel()->callbacks) > 0) {
                $this->rabbitChannel->wait(null, false, 3);
            }
        } catch (\Exception $e) {
            $this->rabbitClose();

            $this->outputLog($e->getMessage(), [], 'ERROR');

            throw $e;
        }
    }

    /**
     * Consumer callback: processes a single message and acknowledges or requeues it.
     *
     * On failure the message is nack-ed with requeue, the failure is reported to Sentry
     * and (except for DB lock-wait timeouts and deadlocks) written to the activity log.
     * In development mode the failure is rethrown instead.
     *
     * @return bool true when the message was handled, false when it failed and was requeued
     *
     * @throws \Throwable in development mode, when processing fails
     */
    public function execute(AMQPMessage $msg): bool
    {
        $message = json_decode($msg->getBody(), true) ?? [];

        try {
            $this->outputLog('Processing received message...');
            $this->handleMessageData(
                $message,
                // Passed to the synchronizer as its per-item callback so the connection
                // heartbeat is checked during processing.
                static function () use ($msg): void {
                    $msg->getChannel()->getConnection()->checkHeartBeat();
                }
            );
            $this->outputLog('Message was processed successfully');
        } catch (\Throwable $e) {
            if (isDevelopment()) {
                throw $e;
            }

            $this->outputLog('Message was not processed due some error, going for requeue', [
                'message' => $e->getMessage(),
            ]);

            $msg->getChannel()->basic_nack($msg->getDeliveryTag(), false, true);
            // NOTE(review): basic_cancel() expects a consumer tag, but the delivery tag is
            // passed here, so this call presumably does not cancel our consumer. Left as is
            // because switching to the consumer tag would stop the worker after the first
            // failure - confirm the intended behavior before changing it.
            $msg->getChannel()->basic_cancel($msg->getDeliveryTag());

            // Lock-wait timeouts and deadlocks are kept out of the activity log.
            if (!$e instanceof LockWaitTimeoutException && !$e instanceof DeadlockException) {
                $this->sapUtil->addActivityLog(
                    'Data, která příšly ze SAPu se nepodařilo zpracovat kvůli chybě',
                    [
                        'message' => $e->getMessage(),
                    ]
                );
            }

            $this->sentryLogger->captureException($e);

            return false;
        }

        $this->logCatalogInfo(
            strtolower($message['method'] ?? ''),
            'Processing "catalog" message: ack rabbit message',
            ['message' => $message]
        );

        // In development the message is acknowledged only for the "WPJ" process.
        if (!isDevelopment() || $this->configuration->getProcess() === 'WPJ') {
            $msg->getChannel()->basic_ack($msg->getDeliveryTag());
        }

        return true;
    }

    /**
     * Returns the queue name from the `sap.rabbit` configuration.
     */
    public function getRabbitQueue(): string
    {
        return $this->getConfig()['queue'];
    }

    /**
     * Returns the channel, opening it and registering execute() as the queue consumer on first use.
     *
     * @throws SAPException when no queue is configured
     */
    protected function getRabbitChannel(): AMQPChannel
    {
        if (empty($this->getConfig()['queue'])) {
            throw new SAPException('Missing queue configuration!');
        }

        if (!$this->rabbitChannel) {
            $this->rabbitChannel = $this->getRabbitConnection()->channel();
            // Prefetch count of 1.
            $this->rabbitChannel->basic_qos(null, 1, null);
            $this->rabbitChannel->basic_consume($this->getRabbitQueue(), '', false, false, true, false, [$this, 'execute']);
        }

        return $this->rabbitChannel;
    }

    /**
     * Returns the connection, creating it from the `sap.rabbit` configuration on first use.
     */
    protected function getRabbitConnection(): AMQPStreamConnection
    {
        if (!$this->rabbitConnection) {
            $config = $this->getConfig();

            $this->rabbitConnection = new AMQPStreamConnection(
                $config['host'],
                $config['port'],
                $config['user'],
                $config['pass'],
                $config['vhost']
            );
        }

        return $this->rabbitConnection;
    }

    /**
     * Closes the channel and the connection (if open) and forgets both.
     *
     * Closing is best-effort: exceptions raised by either close() call are ignored so the
     * references are always reset.
     */
    protected function rabbitClose(): void
    {
        try {
            if ($this->rabbitChannel) {
                $this->rabbitChannel->close();
            }
        } catch (\Exception) {
            // Ignored on purpose: the channel is discarded below either way.
        }

        try {
            if ($this->rabbitConnection) {
                $this->rabbitConnection->close();
            }
        } catch (\Exception) {
            // Ignored on purpose: the connection is discarded below either way.
        }

        $this->rabbitConnection = null;
        $this->rabbitChannel = null;
    }

    /**
     * Processes one decoded message with the synchronizer matching its `method`.
     *
     * Messages whose `process` is set but not among the supported SAP processes are
     * skipped without error. The payload is looked up under `data_key` (falling back to
     * the lower-cased `method`) anywhere in the message.
     *
     * @param array         $message             decoded message body
     * @param callable|null $processItemCallback forwarded to the synchronizer via setProcessItemCallback()
     *
     * @throws SAPException when `method` is missing/empty or no payload can be found
     */
    public function handleMessageData(array $message, ?callable $processItemCallback = null): void
    {
        if (!($type = strtolower($message['method'] ?? ''))) {
            throw new SAPException('Invalid message! Missing type of message');
        }

        $process = $message['process'] ?? null;
        // Check whether the message is meant for the current website.
        if ($process && !in_array($process, $this->configuration->getSAPSupportedProcesses())) {
            $this->outputLog('Message with process "'.$process.'" is not for us, skipping...');

            return;
        }

        $dataKey = $message['data_key'] ?? $type;

        // Locate the payload; an empty payload is treated the same as a missing one.
        if (!($data = $this->getDataFromMessage($dataKey, $message))) {
            throw new SAPException('Invalid message! Data cannot be found');
        }

        $this->logCatalogInfo($type, 'Processing "catalog" message: start', [
            'message' => $message,
            'data' => $data,
        ]);

        try {
            $data = $this->sapUtil->loadSAPData($data);
        } catch (SAPException) {
            // A failed load is not fatal: the synchronizer still runs, with no data.
            $data = [];
        }

        $this->logCatalogInfo($type, 'Processing "catalog" message: JSON file loaded', [
            'message' => $message,
        ]);

        $synchronizer = $this->sapLocator->get($type);
        $synchronizer->setProcessItemCallback($processItemCallback);
        $synchronizer->process($data);

        $this->logCatalogInfo($type, 'Processing "catalog" message: JSON file processed', [
            'message' => $message,
            'context' => $synchronizer->getContext(),
        ]);
    }

    /**
     * Searches the message depth-first for a key equal to $dataKey (case-insensitive).
     *
     * Keys are visited in order; a nested array is searched before its following
     * siblings, and the scan continues with those siblings when the nested search finds
     * nothing.
     *
     * @return array|null the matching value cast to array, or null when no key matches
     */
    private function getDataFromMessage(string $dataKey, array $message): ?array
    {
        $needle = strtolower($dataKey);

        foreach ($message as $key => $data) {
            if (strtolower((string) $key) === $needle) {
                return (array) $data;
            }

            if (is_array($data)) {
                $nested = $this->getDataFromMessage($dataKey, $data);
                // Only stop on a hit; otherwise later siblings would never be examined.
                if ($nested !== null) {
                    return $nested;
                }
            }
        }

        return null;
    }

    /**
     * Writes a "[SEVERITY] message[ - json]" line to stderr.
     *
     * The JSON-encoded data is passed to sprintf() as an argument rather than being made
     * part of the format string, so "%" characters in the data are printed literally.
     */
    private function outputLog(string $message, array $data = [], string $severity = 'INFO'): void
    {
        $suffix = $data !== [] ? ' - '.json_encode($data) : '';

        file_put_contents(
            'php://stderr',
            sprintf('[%s] %s%s', $severity, $message, $suffix).PHP_EOL
        );
    }

    /**
     * Logs a notice through the injected logger, but only for messages of type "catalog".
     */
    private function logCatalogInfo(string $type, string $message, array $data = []): void
    {
        if ($type !== 'catalog') {
            return;
        }

        $this->logger->notice("[SAP] {$message}", $data);
    }

    /**
     * Returns the `sap.rabbit` configuration section.
     */
    public function getConfig(): ?array
    {
        return $this->configuration->get('sap.rabbit');
    }
}
|