72 lines
2.0 KiB
PHP
72 lines
2.0 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace KupShop\KafkaBundle\Util;
|
|
|
|
use KupShop\KafkaBundle\Connection\KafkaConnection;
|
|
use Symfony\Component\DependencyInjection\Attribute\Autowire;
|
|
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
|
|
|
class KafkaAdminUtil
|
|
{
|
|
use KafkaUtilTrait;
|
|
private KafkaConnection $connection;
|
|
public \RdKafka\KafkaConsumer $rdConsumer;
|
|
|
|
public function __construct(
|
|
#[Autowire('%kupshop.kafka.brokers%')] private readonly array $brokers,
|
|
protected HttpClientInterface $httpClient,
|
|
) {
|
|
}
|
|
|
|
public function createConnection($config = [])
|
|
{
|
|
$config = array_merge([
|
|
'transport_name' => 'admin',
|
|
'metadata.broker.list' => implode(',', $this->brokers),
|
|
'group.id' => $this->getShopGroupId('admin'),
|
|
], $config);
|
|
|
|
$this->connection = \KupShop\KafkaBundle\Connection\KafkaConnection::builder($config);
|
|
$this->rdConsumer = $this->connection->createConsumer();
|
|
}
|
|
|
|
public function getExistingTopics()
|
|
{
|
|
$this->createConnection();
|
|
|
|
$metadata = $this->rdConsumer->getMetadata(true, null, 1000);
|
|
|
|
$topics = [];
|
|
foreach ($metadata->getTopics() as $topic) {
|
|
$topics[] = $topic->getTopic();
|
|
}
|
|
|
|
return $topics;
|
|
}
|
|
|
|
public function dbChangesTopicExists()
|
|
{
|
|
return in_array($this->getShopTopicName(), $this->getExistingTopics());
|
|
}
|
|
|
|
public function createDbChangesTopic()
|
|
{
|
|
if (!isRunningOnCluster()) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
$response = $this->httpClient->request('GET', 'http://rest-proxy.kafka/v3/clusters');
|
|
$content = $response->toArray();
|
|
|
|
$clusterId = $content['data'][0]['cluster_id'];
|
|
$this->httpClient->request('POST', "http://rest-proxy.kafka/v3/clusters/{$clusterId}/topics", [
|
|
'json' => ['topic_name' => $this->getShopTopicName(), 'partitions_count' => 1, 'replication_factor' => 1],
|
|
]);
|
|
} catch (\Throwable $exception) {
|
|
}
|
|
}
|
|
}
|