diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/ConsumeFromBeginning.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/ConsumeFromBeginning.java
new file mode 100644
index 0000000000..569c5aa9e9
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/ConsumeFromBeginning.java
@@ -0,0 +1,94 @@
+package com.baeldung.kafka.consumer;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Demonstrates re-reading a topic from offset zero by calling
+ * {@code seekToBeginning} on the live assignment after the first poll.
+ */
+public class ConsumeFromBeginning {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConsumeFromBeginning.class);
+
+    private static final String TOPIC = "baeldung";
+    private static final int MESSAGES_IN_TOPIC = 10;
+
+    private static KafkaProducer<String, String> producer;
+    private static KafkaConsumer<String, String> consumer;
+
+    public static void main(String[] args) {
+        setup();
+
+        publishMessages();
+
+        consumeFromBeginning();
+
+        // Release the clients' network resources once the demo is done.
+        producer.close();
+        consumer.close();
+    }
+
+    // Polls once, rewinds every assigned partition to offset 0, then polls again
+    // so the same records are delivered a second time.
+    private static void consumeFromBeginning() {
+        consumer.subscribe(Arrays.asList(TOPIC));
+
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
+
+        for (ConsumerRecord<String, String> record : records) {
+            logger.info(record.value());
+        }
+
+        // seekToBeginning is lazy: the rewind takes effect on the next poll.
+        consumer.seekToBeginning(consumer.assignment());
+
+        records = consumer.poll(Duration.ofSeconds(10));
+
+        for (ConsumerRecord<String, String> record : records) {
+            logger.info(record.value());
+        }
+    }
+
+    // Publishes the values "1".."10" to the demo topic (fire-and-forget sends).
+    private static void publishMessages() {
+        for (int i = 1; i <= MESSAGES_IN_TOPIC; i++) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, String.valueOf(i));
+            producer.send(record);
+        }
+    }
+
+    // Builds a producer and a consumer against a local broker; the random group id
+    // guarantees a fresh group with no committed offsets, so "earliest" applies.
+    private static void setup() {
+        Properties producerProperties = new Properties();
+        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        Properties consumerProperties = new Properties();
+        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+        producer = new KafkaProducer<>(producerProperties);
+        consumer = new KafkaConsumer<>(consumerProperties);
+    }
+
+}
diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/ConsumeFromBeginningLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/ConsumeFromBeginningLiveTest.java
new file mode 100644
index 0000000000..6bfba1eca9
--- /dev/null
+++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/ConsumeFromBeginningLiveTest.java
@@ -0,0 +1,113 @@
+package com.baeldung.kafka.consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+// This live test needs a Docker Daemon running so that a kafka container can be created
+
+@Testcontainers
+public class ConsumeFromBeginningLiveTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConsumeFromBeginningLiveTest.class);
+
+    private static final String TOPIC = "baeldung";
+    private static final int MESSAGES_IN_TOPIC = 10;
+
+    private static KafkaProducer<String, String> producer;
+    private static KafkaConsumer<String, String> consumer;
+
+    @Container
+    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
+
+    // Wires the producer/consumer to the container's mapped bootstrap address.
+    // The container is already started by the Testcontainers extension at this
+    // point, so getBootstrapServers() returns the host-mapped port; the random
+    // group id gives a clean consumer group so auto.offset.reset=earliest applies.
+    @BeforeAll
+    static void setup() {
+        Properties producerProperties = new Properties();
+        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        Properties consumerProperties = new Properties();
+        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+        producer = new KafkaProducer<>(producerProperties);
+        consumer = new KafkaConsumer<>(consumerProperties);
+    }
+
+    // Synchronously publishes "1".."10" so all records are in the log before polling.
+    private static void publishMessages() throws ExecutionException, InterruptedException {
+        for (int i = 1; i <= MESSAGES_IN_TOPIC; i++) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, String.valueOf(i));
+            producer.send(record).get();
+        }
+    }
+
+    @AfterAll
+    static void destroy() {
+        // Close the clients before the broker so shutdown does not block.
+        producer.close();
+        consumer.close();
+        KAFKA_CONTAINER.stop();
+    }
+
+    @Test
+    void givenMessages_whenConsumedFromBeginning_thenCheckIfConsumedFromBeginning() throws ExecutionException, InterruptedException {
+
+        publishMessages();
+
+        consumer.subscribe(Arrays.asList(TOPIC));
+
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
+
+        int messageCount = 0;
+        for (ConsumerRecord<String, String> record : records) {
+            logger.info(record.value());
+            messageCount++;
+        }
+
+        assertEquals(MESSAGES_IN_TOPIC, messageCount);
+
+        // Rewind the live assignment; the next poll re-reads from offset zero.
+        consumer.seekToBeginning(consumer.assignment());
+
+        records = consumer.poll(Duration.ofSeconds(10));
+
+        messageCount = 0;
+        for (ConsumerRecord<String, String> record : records) {
+            logger.info(record.value());
+            messageCount++;
+        }
+
+        assertEquals(MESSAGES_IN_TOPIC, messageCount);
+    }
+}