diff --git a/apache-kafka-2/pom.xml b/apache-kafka-2/pom.xml index cd21a60e14..53217731c7 100644 --- a/apache-kafka-2/pom.xml +++ b/apache-kafka-2/pom.xml @@ -61,9 +61,9 @@ 5.7.0 - 2.8.0 + 3.6.1 1.19.3 - 1.15.3 + 1.19.3 2.15.2 diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java index b432c2ba55..f4cefb21eb 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/CustomKafkaListener.java @@ -12,14 +12,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; -public class CustomKafkaListener implements Runnable, AutoCloseable { - +public class CustomKafkaListener implements Runnable { private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName()); - private final String topic; private final KafkaConsumer consumer; - - private final AtomicBoolean running = new AtomicBoolean(false); private Consumer recordConsumer; @@ -50,18 +46,11 @@ public class CustomKafkaListener implements Runnable, AutoCloseable { @Override public void run() { - running.set(true); consumer.subscribe(Arrays.asList(topic)); - while (running.get()) { + while (true) { consumer.poll(Duration.ofMillis(100)) .forEach(record -> recordConsumer.accept(record.value())); } } - @Override - public void close() { - running.set(false); - consumer.close(); - } - } diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/CustomKafkaListenerLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/CustomKafkaListenerLiveTest.java index dbe7483dcd..cba0aacce6 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/CustomKafkaListenerLiveTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/CustomKafkaListenerLiveTest.java @@ -4,18 +4,32 @@ import static java.time.Duration.ofMillis; import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; +import java.time.Duration; +import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; @@ -30,7 +44,7 @@ class CustomKafkaListenerLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); static { - Awaitility.setDefaultTimeout(ofSeconds(1L)); + Awaitility.setDefaultTimeout(ofSeconds(5L)); Awaitility.setDefaultPollInterval(ofMillis(50L)); } @@ -42,33 +56,34 @@ class CustomKafkaListenerLiveTest { List consumedMessages = new ArrayList<>(); // when - try (CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add)) { - CompletableFuture.runAsync(listener); - } + CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add); + CompletableFuture.runAsync(listener); + // and publishArticles(topic, + "Introduction to Kafka", + "Kotlin for Java Developers", + "Reactive Spring Boot", + "Deploying Spring Boot Applications", + "Spring Security" + ); + + // then + await().untilAsserted(() -> + assertThat(consumedMessages).containsExactlyInAnyOrder( "Introduction to Kafka", "Kotlin for Java Developers", "Reactive Spring Boot", "Deploying Spring Boot Applications", "Spring Security" - ); + )); - // then - await().untilAsserted(() -> - assertThat(consumedMessages).containsExactlyInAnyOrder( - "Introduction to Kafka", - "Kotlin for Java Developers", - "Reactive Spring Boot", - "Deploying Spring Boot Applications", - "Spring Security" - )); } private void publishArticles(String topic, String... articles) { try (KafkaProducer producer = testKafkaProducer()) { Arrays.stream(articles) - .map(article -> new ProducerRecord(topic, article)) + .map(article -> new ProducerRecord<>(topic, "key-1", article)) .forEach(producer::send); } }