$config->getGroupId().': ['.implode(',', $config->getTopics()).']',
            'metadata.broker.list' => implode(',', $this->brokers),
            'consumer_topics' => $config->getTopics(),
            'enable.partition.eof' => 'true',
            'auto.offset.reset' => 'earliest',
            'group.id' => $this->getShopGroupId($config->getGroupId()),
        ], $config->getConfig());

        $this->connection = \KupShop\KafkaBundle\Connection\KafkaConnection::builder($config);
        $this->rdConsumer = $this->connection->createConsumer();
    }

    /**
     * Subscribe the underlying rdkafka consumer to the topics configured on the
     * current connection. Requires a prior createConnection()/withConnection() call.
     */
    public function subscribe()
    {
        $this->rdConsumer->subscribe($this->connection->getTopics());
    }

    /**
     * Lazily consume messages until the broker signals "no more data"
     * (partition EOF or poll timeout) or $limit messages have been yielded.
     *
     * @param int|null $limit maximum number of messages to yield; null means unlimited
     *
     * @return iterable<\RdKafka\Message>
     */
    public function consumeAll(?int $limit = null): iterable
    {
        // A non-positive limit means "consume nothing". Previously a limit of 0
        // still yielded one message because the check ran after the yield.
        if (null !== $limit && $limit <= 0) {
            return;
        }

        $count = 0;
        while ($message = $this->consumeOne()) {
            yield $message;

            if (null !== $limit && ++$count >= $limit) {
                break;
            }
        }
    }

    /**
     * Consume a single message from the subscribed topics.
     *
     * @return \RdKafka\Message|null null when partition EOF is reached or the poll timed out
     *
     * @throws KafkaException on any other broker/transport error
     */
    public function consumeOne(): ?\RdKafka\Message
    {
        try {
            $kafkaMessage = $this->rdConsumer->consume($this->connection->getConsumerConsumeTimeout());
        } catch (\RdKafka\Exception $exception) {
            // Normalize library exceptions into the bundle's own exception type.
            throw new KafkaException($exception->getMessage(), 0, $exception);
        }

        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $kafkaMessage->err) {
            switch ($kafkaMessage->err) {
                case RD_KAFKA_RESP_ERR__PARTITION_EOF: // No more messages
                case RD_KAFKA_RESP_ERR__TIMED_OUT: // Attempt to connect again
                    return null;
                default:
                    throw new KafkaException($kafkaMessage->errstr(), $kafkaMessage->err);
            }
        }

        return $kafkaMessage;
    }

    /**
     * Consume DB-change messages and pass each one to $callback.
     *
     * @param callable(\RdKafka\Message): mixed $callback invoked once per consumed message
     * @param int|null                          $limit    maximum number of messages to process; null means unlimited
     */
    public function processDbChangesMessages(KafkaConnectionConfig $config, callable $callback, ?int $limit = null): void
    {
        foreach ($this->getDbChangesMessages($config, $limit) as $message) {
            $callback($message);
        }
    }

    /**
     * Connect, subscribe and return a lazy stream of DB-change messages.
     *
     * NOTE(review): the connection and subscription happen eagerly here, but
     * consumption only starts when the returned generator is iterated.
     *
     * @return iterable<\RdKafka\Message>
     */
    public function getDbChangesMessages(KafkaConnectionConfig $config, ?int $limit = null): iterable
    {
        return $this->withConsumer($config, function () use ($limit): iterable {
            return $this->consumeAll($limit);
        });
    }

    /**
     * Run $fn with a connected AND subscribed consumer.
     *
     * @template T
     *
     * @param callable(KafkaConsumer): T $fn
     *
     * @return T
     */
    public function withConsumer(KafkaConnectionConfig $config, callable $fn): mixed
    {
        return $this->withConnection($config, function () use ($fn) {
            $this->subscribe();

            return $fn($this);
        });
    }

    /**
     * Run $fn with a freshly created connection (no topic subscription).
     *
     * @template T
     *
     * @param callable(KafkaConsumer): T $fn
     *
     * @return T
     */
    public function withConnection(KafkaConnectionConfig $config, callable $fn): mixed
    {
        $this->createConnection($config);

        return $fn($this);
    }

    /**
     * Commit consumer offset $offset + 1 on partition 0 of the configured topic,
     * so that consumption resumes at the message AFTER $offset.
     *
     * @param bool $allowOnlyHigher when true, refuse to move the committed offset backwards
     *
     * @return bool false when $allowOnlyHigher blocked the commit, true otherwise
     */
    public function setConsumerOffset(KafkaConnectionConfig $config, int $offset, bool $allowOnlyHigher = false): bool
    {
        return $this->withConnection($config, function () use ($config, $offset, $allowOnlyHigher) {
            $nextOffset = $offset + 1;

            if ($allowOnlyHigher && ($committed = $this->rdConsumer->getCommittedOffsets([new \RdKafka\TopicPartition($config->getTopic(), 0)], 1000))) {
                // The commit is not allowed when the last committed consumer offset
                // is already higher than the requested next offset.
                if (($committed[0]->getOffset() ?? 0) > $nextOffset) {
                    return false;
                }
            }

            $this->rdConsumer->commit([new \RdKafka\TopicPartition($config->getTopic(), 0, $nextOffset)]);

            return true;
        });
    }

    /**
     * Reposition the committed consumer offset to the first message at or after
     * the given UNIX timestamp (seconds), on partition 0 of the configured topic.
     *
     * @return bool false when the broker has no offset for that timestamp, true otherwise
     */
    public function setConsumerOffsetByTimestamp(KafkaConnectionConfig $config, int $timestamp): bool
    {
        return $this->withConnection($config, function () use ($config, $timestamp) {
            // offsetsForTimes() expects milliseconds; the public API takes seconds.
            $timestampMs = $timestamp * 1000;

            $topicPartition = [new \RdKafka\TopicPartition($config->getTopic(), 0, $timestampMs)];

            // Fetch offsets for the specified timestamp (1s broker timeout).
            $offsets = $this->rdConsumer->offsetsForTimes($topicPartition, 1000);

            // -1 (RD_KAFKA_OFFSET_INVALID) means no message at/after the timestamp.
            // FQCN here for consistency with every other \RdKafka\TopicPartition
            // reference in this class (the unqualified name is fatal without a use-import);
            // array_values() re-indexes after array_filter() so commit() gets a list.
            $validOffsets = array_values(array_filter($offsets, fn (\RdKafka\TopicPartition $offset) => $offset->getOffset() !== -1));
            if (empty($validOffsets)) {
                return false;
            }

            $this->rdConsumer->commit($validOffsets);

            return true;
        });
    }
}