Files
kupshop/bundles/External/ZNZBundle/Util/ZNZWorker.php
2025-08-02 16:30:27 +02:00

134 lines
4.0 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
declare(strict_types=1);
namespace External\ZNZBundle\Util;
use External\ZNZBundle\Exception\ZNZException;
use KupShop\KupShopBundle\Context\LanguageContext;
use KupShop\KupShopBundle\Util\Contexts;
use KupShop\KupShopBundle\Util\Functional\Mapping;
use KupShop\SynchronizationBundle\Exception\RabbitRetryMessageException;
use KupShop\SynchronizationBundle\Rabbit\RabbitConsumerConfiguration;
use KupShop\SynchronizationBundle\Util\Rabbit\RabbitConsumer;
/**
 * Consumes ZNZ change messages from RabbitMQ and hands each one over to the
 * synchronizer registered for the change table named in the message.
 */
class ZNZWorker
{
    /**
     * Maximum number of bytes of the raw "json" payload that is copied into the activity log.
     */
    private const MAX_LOGGED_JSON_LENGTH = 10000;

    public function __construct(
        private ZNZConfiguration $configuration,
        private ZNZSynchronizerLocator $synchronizerLocator,
        private RabbitConsumer $rabbitConsumer,
        private ZNZLogger $logger,
    ) {
    }

    /**
     * Raises the memory limit and starts consuming the configured queue; every
     * envelope body is JSON-decoded into an array and passed to processMessage().
     *
     * @return bool whatever RabbitConsumer::consume() returns
     */
    public function consumeChanges(): bool
    {
        $this->increaseMemoryLimit();

        return $this->rabbitConsumer->consume(
            $this->getRabbitConsumerConfiguration(),
            // NOTE(review): json_decode() yields null for a malformed body (or a scalar for
            // scalar JSON), which makes processMessage(array) raise a TypeError. How the
            // consumer treats that is not visible here - confirm before changing it.
            fn (\AMQPEnvelope $envelope) => $this->processMessage(json_decode($envelope->getBody(), true)),
        );
    }

    /**
     * Processes one decoded change message.
     *
     * @param array $data decoded message; the keys read here are "Tabulka", "IDWebsite",
     *                    "IDLanguage" and (for logging only) "json"
     *
     * @return bool true when the message was processed, skipped (unsupported website or
     *              language, unknown table) or failed with a soft ZNZException;
     *              false when processing failed with any other error
     *
     * @throws RabbitRetryMessageException rethrown untouched so the caller can handle the retry
     * @throws ZNZException                for an unknown "Tabulka" value when isLocalDevelopment() is true
     */
    public function processMessage(array $data): bool
    {
        if (!$this->isMessageWebsiteSupported($data)) {
            return true;
        }

        if (!$this->isMessageLanguageSupported($data)) {
            return true;
        }

        // Find the synchronizer service responsible for the given change table.
        if (!($synchronizer = $this->synchronizerLocator->getByHandledTable($data['Tabulka']))) {
            if (isLocalDevelopment()) {
                throw new ZNZException(sprintf('Unknown value in "Tabulka" column: %s', $data['Tabulka']));
            }

            // Do not write this to the activity log: unknown or unexpected values under
            // the "Tabulka" key are now common and should simply be ignored.
            return true;
        }

        try {
            $synchronizer->process($data);
        } catch (RabbitRetryMessageException $e) {
            // Must not be logged or converted into a boolean result; let it propagate.
            throw $e;
        } catch (\Throwable $e) {
            $this->logger->log($e, sprintf('Process of "%s" change failed!', $data['Tabulka']), [
                'data' => $this->prepareDataForActivityLog($data),
            ]);

            // Soft ZNZ errors are logged but still reported as handled.
            return ($e instanceof ZNZException) && $e->type === ZNZException::TYPE_SOFT;
        }

        return true;
    }

    /**
     * Checks whether the given message is intended for the current website.
     *
     * A message with no (or an empty) "IDWebsite" is accepted; otherwise the ID
     * must be a key of the configured supported websites.
     */
    private function isMessageWebsiteSupported(array $data): bool
    {
        return empty($data['IDWebsite'])
            || isset($this->configuration->getSupportedWebsites()[$data['IDWebsite']]);
    }

    /**
     * Checks whether the message language is supported. If it is not, the message can be discarded.
     *
     * A message with no (or a null) "IDLanguage" is accepted; otherwise the value
     * must match the locale of one of the languages returned by the LanguageContext.
     */
    private function isMessageLanguageSupported(array $data): bool
    {
        // Locale => true lookup, built once and reused for subsequent calls.
        static $supportedLanguages;
        $supportedLanguages ??= Mapping::mapKeys(
            Contexts::get(LanguageContext::class)->getAll(),
            fn ($k, $v) => [$v->getLocale(), true],
        );

        // "?? null" keeps a message without the key accepted, but without the
        // "Undefined array key" warning that a direct read raises.
        $language = $data['IDLanguage'] ?? null;

        return $language === null || isset($supportedLanguages[$language]);
    }

    /**
     * Returns a copy of the message that is safe to store in the activity log.
     *
     * An overly long string under "json" is cut to MAX_LOGGED_JSON_LENGTH bytes.
     * Non-string values are left untouched: this runs inside the error handler,
     * so it must not raise a TypeError of its own and hide the original failure.
     */
    private function prepareDataForActivityLog(array $data): array
    {
        $json = $data['json'] ?? null;

        // If the payload is something insanely long, trim it.
        if (is_string($json) && strlen($json) > self::MAX_LOGGED_JSON_LENGTH) {
            $data['json'] = substr($json, 0, self::MAX_LOGGED_JSON_LENGTH);
        }

        return $data;
    }

    /**
     * Builds the consumer configuration from the configured queue and retry strategy.
     */
    private function getRabbitConsumerConfiguration(): RabbitConsumerConfiguration
    {
        return RabbitConsumerConfiguration::createDefault(
            queue: $this->configuration->getRabbitQueue(),
            timeout: 45,
            retryStrategy: $this->configuration->getRabbitRetryStrategy(),
            priorityOptions: ['count' => 3, 'exchange' => 'znz'],
        );
    }

    /**
     * Raises the memory limit at most once per process.
     */
    private function increaseMemoryLimit(): void
    {
        static $memoryIncreased = false;

        if ($memoryIncreased) {
            return;
        }

        // Calls the global helper function, not this method.
        // NOTE(review): 4096 is presumably megabytes - confirm against the helper.
        increaseMemoryLimit(4096);

        // Remember the raise; without this the guard above would never take effect.
        $memoryIncreased = true;
    }
}