diff --git a/libraries-data-2/pom.xml b/libraries-data-2/pom.xml index cbb24edd3f..bdfb2c5ed6 100644 --- a/libraries-data-2/pom.xml +++ b/libraries-data-2/pom.xml @@ -121,6 +121,11 @@ univocity-parsers ${univocity.version} + + org.apache.kafka + kafka-clients + ${kafka.version} + org.awaitility awaitility @@ -184,6 +189,7 @@ RELEASE 3.0 1.8.1 + 2.5.0 diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulation.java b/libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulation.java new file mode 100644 index 0000000000..8c1351642f --- /dev/null +++ b/libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulation.java @@ -0,0 +1,28 @@ +package com.baeldung.kafka.consumer; + +class CountryPopulation { + + private String country; + private Integer population; + + public CountryPopulation(String country, Integer population) { + this.country = country; + this.population = population; + } + + public String getCountry() { + return country; + } + + public void setCountry(String country) { + this.country = country; + } + + public Integer getPopulation() { + return population; + } + + public void setPopulation(Integer population) { + this.population = population; + } +} \ No newline at end of file diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulationConsumer.java b/libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulationConsumer.java new file mode 100644 index 0000000000..ba4dfe6f3b --- /dev/null +++ b/libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulationConsumer.java @@ -0,0 +1,60 @@ +package com.baeldung.kafka.consumer; + +import java.time.Duration; +import java.util.Collections; +import java.util.stream.StreamSupport; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CountryPopulationConsumer { + + private static Logger logger = LoggerFactory.getLogger(CountryPopulationConsumer.class); + + private Consumer consumer; + private java.util.function.Consumer exceptionConsumer; + private java.util.function.Consumer countryPopulationConsumer; + + public CountryPopulationConsumer( + Consumer consumer, java.util.function.Consumer exceptionConsumer, + java.util.function.Consumer countryPopulationConsumer) { + this.consumer = consumer; + this.exceptionConsumer = exceptionConsumer; + this.countryPopulationConsumer = countryPopulationConsumer; + } + + void startBySubscribing(String topic) { + consume(() -> consumer.subscribe(Collections.singleton(topic))); + } + + void startByAssigning(String topic, int partition) { + consume(() -> consumer.assign(Collections.singleton(new TopicPartition(topic, partition)))); + } + + private void consume(Runnable beforePollingTask) { + try { + beforePollingTask.run(); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + StreamSupport.stream(records.spliterator(), false) + .map(record -> new CountryPopulation(record.key(), record.value())) + .forEach(countryPopulationConsumer); + consumer.commitSync(); + } + } catch (WakeupException e) { + logger.info("Shutting down..."); + } catch (RuntimeException ex) { + exceptionConsumer.accept(ex); + } finally { + consumer.close(); + } + } + + public void stop() { + consumer.wakeup(); + } +} \ No newline at end of file diff --git a/libraries-data-2/src/test/java/com/baeldung/kafka/consumer/CountryPopulationConsumerUnitTest.java b/libraries-data-2/src/test/java/com/baeldung/kafka/consumer/CountryPopulationConsumerUnitTest.java new file mode 100644 index 0000000000..1b49c71716 --- /dev/null +++ b/libraries-data-2/src/test/java/com/baeldung/kafka/consumer/CountryPopulationConsumerUnitTest.java @@ -0,0 +1,100 @@ +package com.baeldung.kafka.consumer; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class CountryPopulationConsumerUnitTest { + + private static final String TOPIC = "topic"; + private static final int PARTITION = 0; + + private CountryPopulationConsumer countryPopulationConsumer; + + private List updates; + private Throwable pollException; + + private MockConsumer consumer; + + @BeforeEach + void setUp() { + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + updates = new ArrayList<>(); + countryPopulationConsumer = new CountryPopulationConsumer(consumer, ex -> this.pollException = ex, updates::add); + } + + @Test + void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly() { + // GIVEN + consumer.schedulePollTask(() -> consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000))); + consumer.schedulePollTask(() -> countryPopulationConsumer.stop()); + + HashMap startOffsets = new HashMap<>(); + TopicPartition tp = new TopicPartition(TOPIC, PARTITION); + startOffsets.put(tp, 0L); + consumer.updateBeginningOffsets(startOffsets); + + // WHEN + countryPopulationConsumer.startByAssigning(TOPIC, PARTITION); + + // THEN + assertThat(updates).hasSize(1); + assertThat(consumer.closed()).isTrue(); + } + + @Test + void whenStartingBySubscribingToTopic_thenExpectUpdatesAreConsumedCorrectly() { + // GIVEN + consumer.schedulePollTask(() -> { + consumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, 0))); + consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000)); + }); + consumer.schedulePollTask(() -> countryPopulationConsumer.stop()); + + HashMap startOffsets = new HashMap<>(); + TopicPartition tp = new TopicPartition(TOPIC, PARTITION); + startOffsets.put(tp, 0L); + consumer.updateBeginningOffsets(startOffsets); + + // WHEN + countryPopulationConsumer.startBySubscribing(TOPIC); + + // THEN + assertThat(updates).hasSize(1); + assertThat(consumer.closed()).isTrue(); + } + + @Test + void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() { + // GIVEN + consumer.schedulePollTask(() -> consumer.setPollException(new KafkaException("poll exception"))); + consumer.schedulePollTask(() -> countryPopulationConsumer.stop()); + + HashMap startOffsets = new HashMap<>(); + TopicPartition tp = new TopicPartition(TOPIC, 0); + startOffsets.put(tp, 0L); + consumer.updateBeginningOffsets(startOffsets); + + // WHEN + countryPopulationConsumer.startBySubscribing(TOPIC); + + // THEN + assertThat(pollException).isInstanceOf(KafkaException.class).hasMessage("poll exception"); + assertThat(consumer.closed()).isTrue(); + } + + private ConsumerRecord record(String topic, int partition, String country, int population) { + return new ConsumerRecord<>(topic, partition, 0, country, population); + } +} \ No newline at end of file