Files
kupshop/bundles/External/HannahBundle/SAP/Util/SAPConsumer.php
2025-08-02 16:30:27 +02:00

257 lines
7.4 KiB
PHP

<?php
declare(strict_types=1);
namespace External\HannahBundle\SAP\Util;
use Doctrine\DBAL\Exception\DeadlockException;
use Doctrine\DBAL\Exception\LockWaitTimeoutException;
use External\HannahBundle\SAP\Exception\SAPException;
use External\HannahBundle\Util\Configuration;
use External\HannahBundle\Util\FTP\SFTPClient;
use KupShop\KupShopBundle\Util\Logging\SentryLogger;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
/**
 * Consumes SAP messages from a RabbitMQ queue and hands each one over to the
 * synchronizer that the SAP locator returns for the message's "method".
 *
 * The RabbitMQ connection and channel are opened lazily on first use and are
 * released again when consume() finishes.
 */
class SAPConsumer
{
    /** Lazily created channel; null until first use and after rabbitClose(). */
    private ?AMQPChannel $rabbitChannel = null;

    /** Lazily created connection; null until first use and after rabbitClose(). */
    private ?AMQPStreamConnection $rabbitConnection = null;

    public function __construct(
        private Configuration $configuration,
        private SAPLocator $sapLocator,
        private SAPUtil $sapUtil,
        private SFTPClient $sftpClient,
        private SentryLogger $sentryLogger,
        private LoggerInterface $logger,
    ) {
    }

    /**
     * Waits for messages for as long as the channel has registered consumer callbacks.
     *
     * Each wait() call is made with a timeout argument of 3; whatever wait() throws
     * (including a timeout) is logged to stderr and rethrown. The channel and the
     * connection are closed in every case, whether the loop ended normally or not.
     *
     * @throws \Throwable anything raised while waiting or while handling a message
     */
    public function consume(): void
    {
        try {
            $channel = $this->getRabbitChannel();
            while (count($channel->callbacks) > 0) {
                $channel->wait(null, false, 3);
            }
        } catch (\Throwable $e) {
            $this->outputLog($e->getMessage(), [], 'ERROR');

            throw $e;
        } finally {
            // Release the socket on every exit path, not only on failure.
            $this->rabbitClose();
        }
    }

    /**
     * Consumer callback: processes one delivered message.
     *
     * On success the message is acknowledged (outside development, or when the
     * configured process is "WPJ"). On failure outside development the message is
     * nacked with requeue, this consumer is cancelled, the error is reported and
     * false is returned. In development the original error is rethrown instead.
     *
     * @return bool true when the message was processed, false when it failed
     *
     * @throws \Throwable in development, the error raised while processing
     */
    public function execute(AMQPMessage $msg): bool
    {
        // A body that is not a JSON object/array is treated as an empty message,
        // which handleMessageData() then rejects as invalid.
        $decoded = json_decode($msg->getBody(), true);
        $message = is_array($decoded) ? $decoded : [];

        try {
            $this->outputLog('Processing received message...');
            $this->handleMessageData(
                $message,
                // Lets the synchronizer check the connection heartbeat between items.
                static function () use ($msg): void {
                    $msg->getChannel()->getConnection()->checkHeartBeat();
                }
            );
            $this->outputLog('Message was processed successfully');
        } catch (\Throwable $e) {
            if (isDevelopment()) {
                throw $e;
            }

            $this->outputLog('Message was not processed due some error, going for requeue', [
                'message' => $e->getMessage(),
            ]);

            $channel = $msg->getChannel();
            $channel->basic_nack($msg->getDeliveryTag(), false, true);

            // basic_cancel() identifies the consumer by its consumer tag; the delivery
            // tag only identifies a single delivery and would cancel nothing.
            $consumerTag = $msg->getConsumerTag();
            if ($consumerTag !== null) {
                $channel->basic_cancel($consumerTag);
            }

            // Lock wait timeouts and deadlocks are kept out of the activity log.
            if (!($e instanceof LockWaitTimeoutException) && !($e instanceof DeadlockException)) {
                $this->sapUtil->addActivityLog(
                    'Data, která příšly ze SAPu se nepodařilo zpracovat kvůli chybě',
                    [
                        'message' => $e->getMessage(),
                    ]
                );
            }

            $this->sentryLogger->captureException($e);

            return false;
        }

        $this->logCatalogInfo(
            strtolower($message['method'] ?? ''),
            'Processing "catalog" message: ack rabbit message',
            ['message' => $message]
        );

        if (!isDevelopment() || $this->configuration->getProcess() === 'WPJ') {
            $msg->getChannel()->basic_ack($msg->getDeliveryTag());
        }

        return true;
    }

    /**
     * Returns the configured queue name.
     *
     * @throws SAPException when no queue is configured
     */
    public function getRabbitQueue(): string
    {
        $queue = $this->getConfig()['queue'] ?? null;
        if (empty($queue)) {
            throw new SAPException('Missing queue configuration!');
        }

        return $queue;
    }

    /**
     * Returns the channel, creating it and registering execute() as the consumer
     * callback on first use. The prefetch count is set to 1.
     *
     * @throws SAPException when no queue is configured
     */
    protected function getRabbitChannel(): AMQPChannel
    {
        // Validates the configuration on every call, also when the channel already exists.
        $queue = $this->getRabbitQueue();

        if ($this->rabbitChannel === null) {
            $this->rabbitChannel = $this->getRabbitConnection()->channel();
            $this->rabbitChannel->basic_qos(null, 1, null);
            $this->rabbitChannel->basic_consume($queue, '', false, false, true, false, [$this, 'execute']);
        }

        return $this->rabbitChannel;
    }

    /**
     * Returns the connection, opening it from the "sap.rabbit" configuration on first use.
     */
    protected function getRabbitConnection(): AMQPStreamConnection
    {
        if ($this->rabbitConnection === null) {
            $config = $this->getConfig();
            $this->rabbitConnection = new AMQPStreamConnection(
                $config['host'],
                $config['port'],
                $config['user'],
                $config['pass'],
                $config['vhost']
            );
        }

        return $this->rabbitConnection;
    }

    /**
     * Closes the channel and the connection and forgets both.
     *
     * Closing is best effort: exceptions raised by close() are ignored on purpose,
     * because this runs during shutdown/error handling and must not mask the
     * error that triggered it.
     */
    protected function rabbitClose(): void
    {
        try {
            $this->rabbitChannel?->close();
        } catch (\Exception) {
            // Intentionally ignored, see method description.
        }

        try {
            $this->rabbitConnection?->close();
        } catch (\Exception) {
            // Intentionally ignored, see method description.
        }

        $this->rabbitConnection = null;
        $this->rabbitChannel = null;
    }

    /**
     * Validates a decoded message, loads its data and runs the matching synchronizer.
     *
     * Messages whose "process" is set but not among the supported processes are
     * skipped without error.
     *
     * @param array         $message             decoded message; "method" is required,
     *                                           "process" and "data_key" are optional
     * @param callable|null $processItemCallback passed on to the synchronizer
     *
     * @throws SAPException when the message has no method or its data cannot be found
     */
    public function handleMessageData(array $message, ?callable $processItemCallback = null): void
    {
        $type = strtolower($message['method'] ?? '');
        if (!$type) {
            throw new SAPException('Invalid message! Missing type of message');
        }

        // check if message is for current website
        $process = $message['process'] ?? null;
        if ($process && !in_array($process, $this->configuration->getSAPSupportedProcesses())) {
            $this->outputLog('Message with process "'.$process.'" is not for us, skipping...');

            return;
        }

        // find data; the lookup key defaults to the message type
        $dataKey = $message['data_key'] ?? $type;
        $data = $this->getDataFromMessage($dataKey, $message);
        if (!$data) {
            throw new SAPException('Invalid message! Data cannot be found');
        }

        $this->logCatalogInfo($type, 'Processing "catalog" message: start', [
            'message' => $message,
            'data' => $data,
        ]);

        try {
            $data = $this->sapUtil->loadSAPData($data);
        } catch (SAPException $e) {
            // Processing continues with an empty data set, but the failure is
            // made visible instead of being dropped silently.
            $this->outputLog('Loading SAP data failed, continuing with empty data', [
                'message' => $e->getMessage(),
            ], 'WARNING');
            $data = [];
        }

        $this->logCatalogInfo($type, 'Processing "catalog" message: JSON file loaded', [
            'message' => $message,
        ]);

        $synchronizer = $this->sapLocator->get($type);
        $synchronizer->setProcessItemCallback($processItemCallback);
        $synchronizer->process($data);

        $this->logCatalogInfo($type, 'Processing "catalog" message: JSON file processed', [
            'message' => $message,
            'context' => $synchronizer->getContext(),
        ]);
    }

    /**
     * Searches the message depth-first for a key equal to $dataKey (case-insensitive)
     * and returns its value cast to an array.
     *
     * @return array|null the value of the first matching key, or null when no key matches
     */
    private function getDataFromMessage(string $dataKey, array $message): ?array
    {
        $needle = strtolower($dataKey);

        foreach ($message as $key => $value) {
            if (strtolower((string) $key) === $needle) {
                return (array) $value;
            }

            if (is_array($value)) {
                // Only stop when the nested search succeeded; otherwise keep
                // looking through the remaining siblings.
                $found = $this->getDataFromMessage($dataKey, $value);
                if ($found !== null) {
                    return $found;
                }
            }
        }

        return null;
    }

    /**
     * Writes one "[SEVERITY] message - {json data}" line to stderr.
     *
     * The JSON part is appended only when $data is not empty.
     */
    private function outputLog(string $message, array $data = [], string $severity = 'INFO'): void
    {
        $line = sprintf('[%s] %s', $severity, $message);

        // Appended after formatting: the encoded data may contain "%" characters,
        // which must not be interpreted as sprintf placeholders.
        if (!empty($data)) {
            $line .= ' - '.json_encode($data);
        }

        file_put_contents('php://stderr', $line.PHP_EOL);
    }

    /**
     * Logs a notice through the injected logger, but only for messages of type "catalog".
     */
    private function logCatalogInfo(string $type, string $message, array $data = []): void
    {
        if ($type !== 'catalog') {
            return;
        }

        $this->logger->notice("[SAP] {$message}", $data);
    }

    /**
     * Returns the "sap.rabbit" configuration section, as provided by the configuration service.
     */
    public function getConfig(): ?array
    {
        return $this->configuration->get('sap.rabbit');
    }
}