'admin', 'metadata.broker.list' => implode(',', $this->brokers), 'group.id' => $this->getShopGroupId('admin'), ], $config); $this->connection = \KupShop\KafkaBundle\Connection\KafkaConnection::builder($config); $this->rdConsumer = $this->connection->createConsumer(); }

    /**
     * Lists the names of the topics reported by the consumer's metadata.
     *
     * Calls createConnection() first, then reads the metadata through the consumer.
     *
     * @return string[] topic names as returned by each metadata entry's getTopic()
     */
    public function getExistingTopics()
    {
        $this->createConnection();

        // NOTE(review): arguments presumably mean "all topics, no single-topic
        // filter, 1000 ms timeout" (php-rdkafka getMetadata signature) — confirm
        // against the consumer type returned by createConsumer().
        $metadata = $this->rdConsumer->getMetadata(true, null, 1000);

        $topics = [];
        foreach ($metadata->getTopics() as $topic) {
            $topics[] = $topic->getTopic();
        }

        return $topics;
    }

    /**
     * Tells whether the topic named by getShopTopicName() is among the existing topics.
     *
     * @return bool true when the name is present in getExistingTopics()
     */
    public function dbChangesTopicExists()
    {
        // Strict comparison: loose comparison treats numeric-looking strings
        // such as "1e3" and "1000" as equal, which could report a false match.
        return in_array($this->getShopTopicName(), $this->getExistingTopics(), true);
    }

    /**
     * Requests creation of the topic named by getShopTopicName() over HTTP.
     *
     * Does nothing when isRunningOnCluster() is false. Otherwise it fetches the
     * cluster list from the REST endpoint, takes the first cluster's id and POSTs
     * a topic definition (1 partition, replication factor 1) for it.
     *
     * The call is best-effort: every failure is swallowed, so callers cannot tell
     * from this method whether the topic was created. Use dbChangesTopicExists()
     * to verify the outcome.
     *
     * @return void
     */
    public function createDbChangesTopic()
    {
        if (!isRunningOnCluster()) {
            return;
        }

        try {
            $response = $this->httpClient->request('GET', 'http://rest-proxy.kafka/v3/clusters');
            $content = $response->toArray();

            // The cluster list may be empty or shaped unexpectedly; without an id
            // the POST below would target ".../clusters//topics".
            $clusterId = $content['data'][0]['cluster_id'] ?? null;
            if ($clusterId === null || $clusterId === '') {
                return;
            }

            $this->httpClient->request('POST', "http://rest-proxy.kafka/v3/clusters/{$clusterId}/topics", [
                'json' => [
                    'topic_name' => $this->getShopTopicName(),
                    'partitions_count' => 1,
                    'replication_factor' => 1,
                ],
            ]);
        } catch (\Throwable $exception) {
            // Deliberately ignored: topic creation is best-effort and must not
            // break the caller.
        }
    }
}