Files
2025-08-02 16:30:27 +02:00

77 lines
2.1 KiB
PHP

<?php
declare(strict_types=1);
namespace KupShop\GraphQLBundle\ApiAdmin\Util;
use KupShop\AdminBundle\Util\LegacyAdminCredentials;
use KupShop\GraphQLBundle\ApiAdmin\Changes\ChangesManager;
use KupShop\GraphQLBundle\ApiAdmin\Types\Changes\ChangeInterface;
use KupShop\GraphQLBundle\ApiAdmin\Types\Changes\Response\ChangesResetMutateResponse;
use KupShop\GraphQLBundle\ApiAdmin\Types\MutateResponse;
use KupShop\GraphQLBundle\ApiShared\ApiUtil;
use KupShop\KafkaBundle\Connection\KafkaConnectionConfig;
use KupShop\KafkaBundle\Util\KafkaConsumer;
readonly class ChangesUtil
{
public function __construct(
private LegacyAdminCredentials $adminCredentials,
private ChangesManager $changesManager,
private ?KafkaConsumer $kafkaConsumer = null,
) {
}
/**
* @return iterable<ChangeInterface>
*/
public function getChanges(int $limit): iterable
{
yield from $this->changesManager->fetchChanges(
$this->getKafkaConfig(),
ApiUtil::getLimit($limit)
);
}
public function confirmChanges(int $id): MutateResponse
{
$result = $this->kafkaConsumer->setConsumerOffset(
$this->getKafkaConfig(),
$id,
true
);
return new MutateResponse($result);
}
public function resetChanges(\DateTimeInterface $date): ChangesResetMutateResponse
{
$result = $this->kafkaConsumer->setConsumerOffsetByTimestamp(
$this->getKafkaConfig(),
$date->getTimestamp()
);
return new ChangesResetMutateResponse(
$result,
$result ? 'Changes were set to specified date.' : 'No changes since that date.',
);
}
private function getKafkaConfig(): KafkaConnectionConfig
{
return new KafkaConnectionConfig(
groupId: $this->getConsumerId(),
topic: 'graphql',
config: [
// disable autocommit of offset
'enable.auto.commit' => 'false',
]
);
}
private function getConsumerId(): string
{
return "admin-{$this->adminCredentials->getAdminID()}";
}
}