Merge pull request #9271 from ferarubogdan94/master
[BAEL-3890] Using Kafka MockConsumer
This commit is contained in:
commit
ba8b6d0660
|
@ -121,6 +121,11 @@
|
||||||
<artifactId>univocity-parsers</artifactId>
|
<artifactId>univocity-parsers</artifactId>
|
||||||
<version>${univocity.version}</version>
|
<version>${univocity.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.kafka</groupId>
|
||||||
|
<artifactId>kafka-clients</artifactId>
|
||||||
|
<version>${kafka.version}</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.awaitility</groupId>
|
<groupId>org.awaitility</groupId>
|
||||||
<artifactId>awaitility</artifactId>
|
<artifactId>awaitility</artifactId>
|
||||||
|
@ -184,6 +189,7 @@
|
||||||
<renjin.version>RELEASE</renjin.version>
|
<renjin.version>RELEASE</renjin.version>
|
||||||
<rcaller.version>3.0</rcaller.version>
|
<rcaller.version>3.0</rcaller.version>
|
||||||
<rserve.version>1.8.1</rserve.version>
|
<rserve.version>1.8.1</rserve.version>
|
||||||
|
<kafka.version>2.5.0</kafka.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
package com.baeldung.kafka.consumer;
|
||||||
|
|
||||||
|
class CountryPopulation {
|
||||||
|
|
||||||
|
private String country;
|
||||||
|
private Integer population;
|
||||||
|
|
||||||
|
public CountryPopulation(String country, Integer population) {
|
||||||
|
this.country = country;
|
||||||
|
this.population = population;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCountry() {
|
||||||
|
return country;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCountry(String country) {
|
||||||
|
this.country = country;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getPopulation() {
|
||||||
|
return population;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPopulation(Integer population) {
|
||||||
|
this.population = population;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
package com.baeldung.kafka.consumer;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.Consumer;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.apache.kafka.common.errors.WakeupException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class CountryPopulationConsumer {
|
||||||
|
|
||||||
|
private static Logger logger = LoggerFactory.getLogger(CountryPopulationConsumer.class);
|
||||||
|
|
||||||
|
private Consumer<String, Integer> consumer;
|
||||||
|
private java.util.function.Consumer<Throwable> exceptionConsumer;
|
||||||
|
private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;
|
||||||
|
|
||||||
|
public CountryPopulationConsumer(
|
||||||
|
Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer,
|
||||||
|
java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
|
||||||
|
this.consumer = consumer;
|
||||||
|
this.exceptionConsumer = exceptionConsumer;
|
||||||
|
this.countryPopulationConsumer = countryPopulationConsumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
void startBySubscribing(String topic) {
|
||||||
|
consume(() -> consumer.subscribe(Collections.singleton(topic)));
|
||||||
|
}
|
||||||
|
|
||||||
|
void startByAssigning(String topic, int partition) {
|
||||||
|
consume(() -> consumer.assign(Collections.singleton(new TopicPartition(topic, partition))));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void consume(Runnable beforePollingTask) {
|
||||||
|
try {
|
||||||
|
beforePollingTask.run();
|
||||||
|
while (true) {
|
||||||
|
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
|
||||||
|
StreamSupport.stream(records.spliterator(), false)
|
||||||
|
.map(record -> new CountryPopulation(record.key(), record.value()))
|
||||||
|
.forEach(countryPopulationConsumer);
|
||||||
|
consumer.commitSync();
|
||||||
|
}
|
||||||
|
} catch (WakeupException e) {
|
||||||
|
logger.info("Shutting down...");
|
||||||
|
} catch (RuntimeException ex) {
|
||||||
|
exceptionConsumer.accept(ex);
|
||||||
|
} finally {
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
consumer.wakeup();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,100 @@
|
||||||
|
package com.baeldung.kafka.consumer;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
import org.apache.kafka.clients.consumer.MockConsumer;
|
||||||
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
|
||||||
|
import org.apache.kafka.common.KafkaException;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
class CountryPopulationConsumerUnitTest {
|
||||||
|
|
||||||
|
private static final String TOPIC = "topic";
|
||||||
|
private static final int PARTITION = 0;
|
||||||
|
|
||||||
|
private CountryPopulationConsumer countryPopulationConsumer;
|
||||||
|
|
||||||
|
private List<CountryPopulation> updates;
|
||||||
|
private Throwable pollException;
|
||||||
|
|
||||||
|
private MockConsumer<String, Integer> consumer;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setUp() {
|
||||||
|
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
|
||||||
|
updates = new ArrayList<>();
|
||||||
|
countryPopulationConsumer = new CountryPopulationConsumer(consumer, ex -> this.pollException = ex, updates::add);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly() {
|
||||||
|
// GIVEN
|
||||||
|
consumer.schedulePollTask(() -> consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000)));
|
||||||
|
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
|
||||||
|
|
||||||
|
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
|
||||||
|
TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
|
||||||
|
startOffsets.put(tp, 0L);
|
||||||
|
consumer.updateBeginningOffsets(startOffsets);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
countryPopulationConsumer.startByAssigning(TOPIC, PARTITION);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertThat(updates).hasSize(1);
|
||||||
|
assertThat(consumer.closed()).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenStartingBySubscribingToTopic_thenExpectUpdatesAreConsumedCorrectly() {
|
||||||
|
// GIVEN
|
||||||
|
consumer.schedulePollTask(() -> {
|
||||||
|
consumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, 0)));
|
||||||
|
consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000));
|
||||||
|
});
|
||||||
|
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
|
||||||
|
|
||||||
|
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
|
||||||
|
TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
|
||||||
|
startOffsets.put(tp, 0L);
|
||||||
|
consumer.updateBeginningOffsets(startOffsets);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
countryPopulationConsumer.startBySubscribing(TOPIC);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertThat(updates).hasSize(1);
|
||||||
|
assertThat(consumer.closed()).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
|
||||||
|
// GIVEN
|
||||||
|
consumer.schedulePollTask(() -> consumer.setPollException(new KafkaException("poll exception")));
|
||||||
|
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
|
||||||
|
|
||||||
|
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
|
||||||
|
TopicPartition tp = new TopicPartition(TOPIC, 0);
|
||||||
|
startOffsets.put(tp, 0L);
|
||||||
|
consumer.updateBeginningOffsets(startOffsets);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
countryPopulationConsumer.startBySubscribing(TOPIC);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertThat(pollException).isInstanceOf(KafkaException.class).hasMessage("poll exception");
|
||||||
|
assertThat(consumer.closed()).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ConsumerRecord<String, Integer> record(String topic, int partition, String country, int population) {
|
||||||
|
return new ConsumerRecord<>(topic, partition, 0, country, population);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue