134 lines
4.0 KiB
PHP
134 lines
4.0 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace External\ZNZBundle\Util;
|
||
|
||
use External\ZNZBundle\Exception\ZNZException;
|
||
use KupShop\KupShopBundle\Context\LanguageContext;
|
||
use KupShop\KupShopBundle\Util\Contexts;
|
||
use KupShop\KupShopBundle\Util\Functional\Mapping;
|
||
use KupShop\SynchronizationBundle\Exception\RabbitRetryMessageException;
|
||
use KupShop\SynchronizationBundle\Rabbit\RabbitConsumerConfiguration;
|
||
use KupShop\SynchronizationBundle\Util\Rabbit\RabbitConsumer;
|
||
|
||
class ZNZWorker
|
||
{
|
||
public function __construct(
|
||
private ZNZConfiguration $configuration,
|
||
private ZNZSynchronizerLocator $synchronizerLocator,
|
||
private RabbitConsumer $rabbitConsumer,
|
||
private ZNZLogger $logger,
|
||
) {
|
||
}
|
||
|
||
public function consumeChanges(): bool
|
||
{
|
||
$this->increaseMemoryLimit();
|
||
|
||
return $this->rabbitConsumer->consume(
|
||
$this->getRabbitConsumerConfiguration(),
|
||
fn (\AMQPEnvelope $envelope) => $this->processMessage(json_decode($envelope->getBody(), true)),
|
||
);
|
||
}
|
||
|
||
public function processMessage(array $data): bool
|
||
{
|
||
if (!$this->isMessageWebsiteSupported($data)) {
|
||
return true;
|
||
}
|
||
|
||
if (!$this->isMessageLanguageSupported($data)) {
|
||
return true;
|
||
}
|
||
|
||
// najdu servisu pro synchronizaci dane zmenove tabulky
|
||
if (!($synchronizer = $this->synchronizerLocator->getByHandledTable($data['Tabulka']))) {
|
||
if (isLocalDevelopment()) {
|
||
throw new ZNZException(sprintf('Unknown value in "Tabulka" column: %s', $data['Tabulka']));
|
||
}
|
||
|
||
// do not log to the activity log – unknown or unexpected values under the "Tabulka" key are now common and should be ignored.
|
||
|
||
return true;
|
||
}
|
||
|
||
try {
|
||
$synchronizer->process($data);
|
||
} catch (RabbitRetryMessageException $e) {
|
||
throw $e;
|
||
} catch (\Throwable $e) {
|
||
$this->logger->log($e, sprintf('Process of "%s" change failed!', $data['Tabulka']), [
|
||
'data' => $this->prepareDataForActivityLog($data),
|
||
]);
|
||
|
||
if (($e instanceof ZNZException) && $e->type === ZNZException::TYPE_SOFT) {
|
||
return true;
|
||
}
|
||
|
||
return false;
|
||
}
|
||
|
||
return true;
|
||
}
|
||
|
||
/**
|
||
* Kontroluje, zda je dana message pro aktualni web.
|
||
*/
|
||
private function isMessageWebsiteSupported(array $data): bool
|
||
{
|
||
if (empty($data['IDWebsite']) || isset($this->configuration->getSupportedWebsites()[$data['IDWebsite']])) {
|
||
return true;
|
||
}
|
||
|
||
return false;
|
||
}
|
||
|
||
/**
|
||
* Kontroluje, zda je jazyk pro dany web podporovany. Pokud neni, tak tu message muzeme zahodit.
|
||
*/
|
||
private function isMessageLanguageSupported(array $data): bool
|
||
{
|
||
static $supportedLanguages;
|
||
|
||
if ($supportedLanguages === null) {
|
||
$supportedLanguages = Mapping::mapKeys(Contexts::get(LanguageContext::class)->getAll(), fn ($k, $v) => [$v->getLocale(), true]);
|
||
}
|
||
|
||
if ($data['IDLanguage'] === null || isset($supportedLanguages[$data['IDLanguage']])) {
|
||
return true;
|
||
}
|
||
|
||
return false;
|
||
}
|
||
|
||
private function prepareDataForActivityLog(array $data): array
|
||
{
|
||
// pokud je to neco silene dlouheho, tak to oriznu
|
||
if (!empty($data['json']) && strlen($data['json']) > 10000) {
|
||
$data['json'] = substr($data['json'], 0, 10000);
|
||
}
|
||
|
||
return $data;
|
||
}
|
||
|
||
private function getRabbitConsumerConfiguration(): RabbitConsumerConfiguration
|
||
{
|
||
return RabbitConsumerConfiguration::createDefault(
|
||
queue: $this->configuration->getRabbitQueue(),
|
||
timeout: 45,
|
||
retryStrategy: $this->configuration->getRabbitRetryStrategy(),
|
||
priorityOptions: ['count' => 3, 'exchange' => 'znz']
|
||
);
|
||
}
|
||
|
||
private function increaseMemoryLimit(): void
|
||
{
|
||
static $memoryIncreased;
|
||
|
||
if (!$memoryIncreased) {
|
||
increaseMemoryLimit(4096);
|
||
}
|
||
}
|
||
}
|