From 5d13422a53d95116560abe2d48da18e8107d56b3 Mon Sep 17 00:00:00 2001
From: emanueltrandafir1993
Date: Sun, 3 Dec 2023 22:17:16 +0100
Subject: [PATCH] BAEL-7258: kafka listener without spring

---
 .../kafka/consumer/CustomKafkaListener.java   | 68 +++++++++++++++
 .../KafkaListenerWithoutSpringLiveTest.java   | 85 +++++++++++++++++++
 2 files changed, 153 insertions(+)
 create mode 100644 apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java
 create mode 100644 apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/KafkaListenerWithoutSpringLiveTest.java

diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java
new file mode 100644
index 0000000000..58987d473b
--- /dev/null
+++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java
@@ -0,0 +1,68 @@
+package com.baeldung.kafka.consumer;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+public class CustomKafkaListener implements Runnable, Closeable {
+
+    private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName());
+
+    private final String topic;
+    private final KafkaConsumer<String, String> consumer;
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private Consumer<String> recordConsumer;
+
+
+    public CustomKafkaListener(String topic, KafkaConsumer<String, String> consumer) {
+        this.topic = topic;
+        this.consumer = consumer;
+        this.recordConsumer = record -> log.info("received: " + record);
+    }
+
+    public CustomKafkaListener(String topic, String bootstrapServers) {
+        this(topic, defaultKafkaConsumer(bootstrapServers));
+    }
+
+    private static KafkaConsumer<String, String> defaultKafkaConsumer(String bootstrapServers) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group_id");
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        return new KafkaConsumer<>(props);
+    }
+
+    public CustomKafkaListener doForEach(Consumer<String> newConsumer) {
+        recordConsumer = recordConsumer.andThen(newConsumer);
+        return this;
+    }
+
+    @Override
+    public void run() {
+        running.set(true);
+        consumer.subscribe(Arrays.asList(topic));
+
+        while (running.get()) {
+            consumer.poll(Duration.ofMillis(100))
+              .forEach(record -> recordConsumer.accept(record.value()));
+        }
+    }
+
+    @Override
+    public void close() {
+        running.set(false);
+        consumer.close();
+    }
+
+}
diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/KafkaListenerWithoutSpringLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/KafkaListenerWithoutSpringLiveTest.java
new file mode 100644
index 0000000000..213344d0a8
--- /dev/null
+++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/KafkaListenerWithoutSpringLiveTest.java
@@ -0,0 +1,85 @@
+package com.baeldung.kafka.consumer;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Duration.ofSeconds;
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+@Testcontainers
+class KafkaListenerWithoutSpringLiveTest {
+
+    @Container
+    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
+
+    static {
+        Awaitility.setDefaultTimeout(ofSeconds(1L));
+        Awaitility.setDefaultPollInterval(ofMillis(50L));
+    }
+
+    @Test
+    void test() {
+        // given
+        String topic = "baeldung.articles.published";
+        String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
+        List<String> consumedMessages = new ArrayList<>();
+
+        // when
+        try (CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers)) {
+            CompletableFuture.runAsync(() ->
+                listener.doForEach(consumedMessages::add).run()
+            );
+            // and
+            publishArticles(topic, asList(
+                "Introduction to Kafka",
+                "Kotlin for Java Developers",
+                "Reactive Spring Boot",
+                "Deploying Spring Boot Applications",
+                "Spring Security"
+            ));
+
+            // then
+            await().untilAsserted(() -> assertThat(consumedMessages)
+                .containsExactlyInAnyOrder(
+                    "Introduction to Kafka",
+                    "Kotlin for Java Developers",
+                    "Reactive Spring Boot",
+                    "Deploying Spring Boot Applications",
+                    "Spring Security"
+                ));
+        }
+    }
+
+    private void publishArticles(String topic, List<String> articles) {
+        try (KafkaProducer<String, String> producer = testKafkaProducer()) {
+            articles.stream()
+                .map(article -> new ProducerRecord<String, String>(topic, article))
+                .forEach(producer::send);
+        }
+    }
+
+    private static KafkaProducer<String, String> testKafkaProducer() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+
+}
\ No newline at end of file