*/
    public function getChanges(int $limit): iterable
    {
        // Connection settings are built first, then the requested limit is
        // passed through ApiUtil::getLimit() before being handed on.
        $connection = $this->getKafkaConfig();
        $maxItems = ApiUtil::getLimit($limit);

        // Delegates to the changes manager; this method remains a generator.
        yield from $this->changesManager->fetchChanges($connection, $maxItems);
    }

    /**
     * Sets the consumer offset for the current admin's consumer group.
     *
     * @param int $id Value forwarded to setConsumerOffset() as the offset.
     *
     * @return MutateResponse Wraps whatever setConsumerOffset() returned.
     */
    public function confirmChanges(int $id): MutateResponse
    {
        $connection = $this->getKafkaConfig();

        // NOTE(review): the meaning of the trailing `true` flag is defined by
        // setConsumerOffset(), which is not visible here - confirm.
        $outcome = $this->kafkaConsumer->setConsumerOffset($connection, $id, true);

        return new MutateResponse($outcome);
    }

    /**
     * Moves the current admin's consumer offset to the one matching a point in time.
     *
     * @param \DateTimeInterface $date Its Unix timestamp is forwarded to
     *                                 setConsumerOffsetByTimestamp().
     *
     * @return ChangesResetMutateResponse Carries the raw result together with a
     *                                    message chosen from its truthiness.
     */
    public function resetChanges(\DateTimeInterface $date): ChangesResetMutateResponse
    {
        $connection = $this->getKafkaConfig();
        $timestamp = $date->getTimestamp();

        $outcome = $this->kafkaConsumer->setConsumerOffsetByTimestamp($connection, $timestamp);

        // A truthy result is reported as a successful reset; anything falsy is
        // reported as "nothing to reset to".
        if ($outcome) {
            $message = 'Changes were set to specified date.';
        } else {
            $message = 'No changes since that date.';
        }

        return new ChangesResetMutateResponse($outcome, $message);
    }

    /**
     * Builds the Kafka connection settings used by every call in this class:
     * a per-admin consumer group on the 'graphql' topic.
     */
    private function getKafkaConfig(): KafkaConnectionConfig
    {
        $groupId = $this->getConsumerId();

        $clientOptions = [
            // Offsets must not be committed automatically.
            'enable.auto.commit' => 'false',
        ];

        return new KafkaConnectionConfig(
            groupId: $groupId,
            topic: 'graphql',
            config: $clientOptions
        );
    }

    /**
     * Returns the consumer group id: the literal prefix "admin-" followed by
     * the admin ID taken from the admin credentials.
     */
    private function getConsumerId(): string
    {
        return 'admin-' . $this->adminCredentials->getAdminID();
    }
}