Files
kupshop/bundles/KupShop/KafkaBundle/Util/KafkaConsumer.php
2025-08-02 16:30:27 +02:00

165 lines
5.3 KiB
PHP

<?php
declare(strict_types=1);
namespace KupShop\KafkaBundle\Util;
use KupShop\KafkaBundle\Connection\KafkaConnection;
use KupShop\KafkaBundle\Connection\KafkaConnectionConfig;
use KupShop\KafkaBundle\Exception\KafkaException;
use RdKafka\TopicPartition;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
class KafkaConsumer
{
use KafkaUtilTrait;
private KafkaConnection $connection;
public \RdKafka\KafkaConsumer $rdConsumer;
public function __construct(
#[Autowire('%kupshop.kafka.brokers%')] private readonly array $brokers,
) {
}
public function createConnection(KafkaConnectionConfig $config): void
{
$config = array_merge([
'transport_name' => $config->getGroupId().': ['.implode(',', $config->getTopics()).']',
'metadata.broker.list' => implode(',', $this->brokers),
'consumer_topics' => $config->getTopics(),
'enable.partition.eof' => 'true',
'auto.offset.reset' => 'earliest',
'group.id' => $this->getShopGroupId($config->getGroupId()),
], $config->getConfig());
$this->connection = \KupShop\KafkaBundle\Connection\KafkaConnection::builder($config);
$this->rdConsumer = $this->connection->createConsumer();
}
public function subscribe()
{
$this->rdConsumer->subscribe($this->connection->getTopics());
}
/**
* @return iterable<\RdKafka\Message>
*/
public function consumeAll(?int $limit = null): iterable
{
$count = 0;
while ($message = $this->consumeOne()) {
yield $message;
$count++;
if (is_numeric($limit) && $count >= $limit) {
break;
}
}
}
public function consumeOne(): ?\RdKafka\Message
{
try {
$kafkaMessage = $this->rdConsumer->consume($this->connection->getConsumerConsumeTimeout());
} catch (\RdKafka\Exception $exception) {
throw new KafkaException($exception->getMessage(), 0, $exception);
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $kafkaMessage->err) {
switch ($kafkaMessage->err) {
case RD_KAFKA_RESP_ERR__PARTITION_EOF: // No more messages
case RD_KAFKA_RESP_ERR__TIMED_OUT: // Attempt to connect again
return null;
default:
throw new KafkaException($kafkaMessage->errstr(), $kafkaMessage->err);
}
}
return $kafkaMessage;
}
public function processDbChangesMessages(KafkaConnectionConfig $config, callable $callback, ?int $limit = null): void
{
foreach ($this->getDbChangesMessages($config, $limit) as $message) {
$callback($message);
}
}
public function getDbChangesMessages(KafkaConnectionConfig $config, ?int $limit = null): iterable
{
return $this->withConsumer($config, function () use ($limit): iterable {
return $this->consumeAll($limit);
});
}
/**
* @template T
*
* @param callable(KafkaConsumer): T $fn
*
* @return T
*/
public function withConsumer(KafkaConnectionConfig $config, callable $fn): mixed
{
return $this->withConnection($config, function () use ($fn) {
$this->subscribe();
return $fn($this);
});
}
/**
* @template T
*
* @param callable(KafkaConsumer): T $fn
*
* @return T
*/
public function withConnection(KafkaConnectionConfig $config, callable $fn): mixed
{
$this->createConnection($config);
return $fn($this);
}
public function setConsumerOffset(KafkaConnectionConfig $config, int $offset, bool $allowOnlyHigher = false): bool
{
return $this->withConnection($config, function () use ($config, $offset, $allowOnlyHigher) {
$nextOffset = $offset + 1;
if ($allowOnlyHigher && ($commited = $this->rdConsumer->getCommittedOffsets([new \RdKafka\TopicPartition($config->getTopic(), 0)], 1000))) {
// commit is not allowed when last consumer commited offset is higher than next
if (($commited[0]->getOffset() ?? 0) > $nextOffset) {
return false;
}
}
$this->rdConsumer->commit([new \RdKafka\TopicPartition($config->getTopic(), 0, $nextOffset)]);
return true;
});
}
public function setConsumerOffsetByTimestamp(KafkaConnectionConfig $config, int $timestamp): bool
{
return $this->withConnection($config, function () use ($config, $timestamp) {
// Convert timestamp to milliseconds (if necessary)
$timestampMs = $timestamp * 1000;
$topicPartition = [new \RdKafka\TopicPartition($config->getTopic(), 0, $timestampMs)];
// Fetch offsets for the specified timestamp
$offsets = $this->rdConsumer->offsetsForTimes($topicPartition, 1000);
// Assign and seek the consumer to the calculated offsets
$validOffsets = array_filter($offsets, fn (TopicPartition $offset) => $offset->getOffset() !== -1);
if (empty($validOffsets)) {
return false;
}
$this->rdConsumer->commit($validOffsets);
return true;
});
}
}