Merge pull request #15380 from etrandafir93/features/BAEL-7258-kafka_listener_without_spring

BAEL-7258: kafka listener without spring
This commit is contained in:
davidmartinezbarua 2023-12-15 13:57:54 -03:00 committed by GitHub
commit 980a886fef
3 changed files with 140 additions and 2 deletions

View File

@ -61,9 +61,9 @@
<properties>
<jna.version>5.7.0</jna.version>
<kafka.version>2.8.0</kafka.version>
<kafka.version>3.6.1</kafka.version>
<testcontainers-kafka.version>1.19.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
<testcontainers-jupiter.version>1.19.3</testcontainers-jupiter.version>
<jackson.databind.version>2.15.2</jackson.databind.version>
</properties>

View File

@ -0,0 +1,56 @@
package com.baeldung.kafka.consumer;
import java.io.Closeable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class CustomKafkaListener implements Runnable {
private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName());
private final String topic;
private final KafkaConsumer<String, String> consumer;
private Consumer<String> recordConsumer;
public CustomKafkaListener(String topic, KafkaConsumer<String, String> consumer) {
this.topic = topic;
this.consumer = consumer;
this.recordConsumer = record -> log.info("received: " + record);
}
public CustomKafkaListener(String topic, String bootstrapServers) {
this(topic, defaultKafkaConsumer(bootstrapServers));
}
private static KafkaConsumer<String, String> defaultKafkaConsumer(String boostrapServers) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group_id");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
public CustomKafkaListener onEach(Consumer<String> newConsumer) {
recordConsumer = recordConsumer.andThen(newConsumer);
return this;
}
@Override
public void run() {
consumer.subscribe(Arrays.asList(topic));
while (true) {
consumer.poll(Duration.ofMillis(100))
.forEach(record -> recordConsumer.accept(record.value()));
}
}
}

View File

@ -0,0 +1,82 @@
package com.baeldung.kafka.consumer;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
@Testcontainers
class CustomKafkaListenerLiveTest {
@Container
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
static {
Awaitility.setDefaultTimeout(ofSeconds(5L));
Awaitility.setDefaultPollInterval(ofMillis(50L));
}
@Test
void givenANewCustomKafkaListener_thenConsumesAllMessages() {
// given
String topic = "baeldung.articles.published";
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
List<String> consumedMessages = new ArrayList<>();
// when
CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
CompletableFuture.runAsync(listener);
// and
publishArticles(topic,
"Introduction to Kafka",
"Kotlin for Java Developers",
"Reactive Spring Boot",
"Deploying Spring Boot Applications",
"Spring Security"
);
// then
await().untilAsserted(() ->
assertThat(consumedMessages).containsExactlyInAnyOrder(
"Introduction to Kafka",
"Kotlin for Java Developers",
"Reactive Spring Boot",
"Deploying Spring Boot Applications",
"Spring Security"
));
}
private void publishArticles(String topic, String... articles) {
try (KafkaProducer<String, String> producer = testKafkaProducer()) {
Arrays.stream(articles)
.map(article -> new ProducerRecord<>(topic, "key-1", article))
.forEach(producer::send);
}
}
private static KafkaProducer<String, String> testKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
}