77 lines
2.1 KiB
PHP
77 lines
2.1 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace KupShop\GraphQLBundle\ApiAdmin\Util;
|
|
|
|
use KupShop\AdminBundle\Util\LegacyAdminCredentials;
|
|
use KupShop\GraphQLBundle\ApiAdmin\Changes\ChangesManager;
|
|
use KupShop\GraphQLBundle\ApiAdmin\Types\Changes\ChangeInterface;
|
|
use KupShop\GraphQLBundle\ApiAdmin\Types\Changes\Response\ChangesResetMutateResponse;
|
|
use KupShop\GraphQLBundle\ApiAdmin\Types\MutateResponse;
|
|
use KupShop\GraphQLBundle\ApiShared\ApiUtil;
|
|
use KupShop\KafkaBundle\Connection\KafkaConnectionConfig;
|
|
use KupShop\KafkaBundle\Util\KafkaConsumer;
|
|
|
|
readonly class ChangesUtil
|
|
{
|
|
public function __construct(
|
|
private LegacyAdminCredentials $adminCredentials,
|
|
private ChangesManager $changesManager,
|
|
private ?KafkaConsumer $kafkaConsumer = null,
|
|
) {
|
|
}
|
|
|
|
/**
|
|
* @return iterable<ChangeInterface>
|
|
*/
|
|
public function getChanges(int $limit): iterable
|
|
{
|
|
yield from $this->changesManager->fetchChanges(
|
|
$this->getKafkaConfig(),
|
|
ApiUtil::getLimit($limit)
|
|
);
|
|
}
|
|
|
|
public function confirmChanges(int $id): MutateResponse
|
|
{
|
|
$result = $this->kafkaConsumer->setConsumerOffset(
|
|
$this->getKafkaConfig(),
|
|
$id,
|
|
true
|
|
);
|
|
|
|
return new MutateResponse($result);
|
|
}
|
|
|
|
public function resetChanges(\DateTimeInterface $date): ChangesResetMutateResponse
|
|
{
|
|
$result = $this->kafkaConsumer->setConsumerOffsetByTimestamp(
|
|
$this->getKafkaConfig(),
|
|
$date->getTimestamp()
|
|
);
|
|
|
|
return new ChangesResetMutateResponse(
|
|
$result,
|
|
$result ? 'Changes were set to specified date.' : 'No changes since that date.',
|
|
);
|
|
}
|
|
|
|
private function getKafkaConfig(): KafkaConnectionConfig
|
|
{
|
|
return new KafkaConnectionConfig(
|
|
groupId: $this->getConsumerId(),
|
|
topic: 'graphql',
|
|
config: [
|
|
// disable autocommit of offset
|
|
'enable.auto.commit' => 'false',
|
|
]
|
|
);
|
|
}
|
|
|
|
private function getConsumerId(): string
|
|
{
|
|
return "admin-{$this->adminCredentials->getAdminID()}";
|
|
}
|
|
}
|