BAEL-7258: increase pom versions

This commit is contained in:
emanuel.trandafir 2023-12-15 12:44:12 +01:00
parent 40ef28a449
commit 0203ff1f0a
3 changed files with 38 additions and 34 deletions

View File

@ -61,9 +61,9 @@
<properties> <properties>
<jna.version>5.7.0</jna.version> <jna.version>5.7.0</jna.version>
<kafka.version>2.8.0</kafka.version> <kafka.version>3.6.1</kafka.version>
<testcontainers-kafka.version>1.19.3</testcontainers-kafka.version> <testcontainers-kafka.version>1.19.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version> <testcontainers-jupiter.version>1.19.3</testcontainers-jupiter.version>
<jackson.databind.version>2.15.2</jackson.databind.version> <jackson.databind.version>2.15.2</jackson.databind.version>
</properties> </properties>

View File

@ -12,14 +12,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
public class CustomKafkaListener implements Runnable, AutoCloseable { public class CustomKafkaListener implements Runnable {
private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName()); private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName());
private final String topic; private final String topic;
private final KafkaConsumer<String, String> consumer; private final KafkaConsumer<String, String> consumer;
private final AtomicBoolean running = new AtomicBoolean(false);
private Consumer<String> recordConsumer; private Consumer<String> recordConsumer;
@ -50,18 +46,11 @@ public class CustomKafkaListener implements Runnable, AutoCloseable {
@Override @Override
public void run() { public void run() {
running.set(true);
consumer.subscribe(Arrays.asList(topic)); consumer.subscribe(Arrays.asList(topic));
while (running.get()) { while (true) {
consumer.poll(Duration.ofMillis(100)) consumer.poll(Duration.ofMillis(100))
.forEach(record -> recordConsumer.accept(record.value())); .forEach(record -> recordConsumer.accept(record.value()));
} }
} }
@Override
public void close() {
running.set(false);
consumer.close();
}
} }

View File

@ -4,18 +4,32 @@ import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds; import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await; import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
import java.util.ArrayList; import java.time.Duration;
import java.util.Arrays; import java.util.*;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Container;
@ -30,7 +44,7 @@ class CustomKafkaListenerLiveTest {
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
static { static {
Awaitility.setDefaultTimeout(ofSeconds(1L)); Awaitility.setDefaultTimeout(ofSeconds(5L));
Awaitility.setDefaultPollInterval(ofMillis(50L)); Awaitility.setDefaultPollInterval(ofMillis(50L));
} }
@ -42,33 +56,34 @@ class CustomKafkaListenerLiveTest {
List<String> consumedMessages = new ArrayList<>(); List<String> consumedMessages = new ArrayList<>();
// when // when
try (CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add)) { CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
CompletableFuture.runAsync(listener); CompletableFuture.runAsync(listener);
}
// and // and
publishArticles(topic, publishArticles(topic,
"Introduction to Kafka",
"Kotlin for Java Developers",
"Reactive Spring Boot",
"Deploying Spring Boot Applications",
"Spring Security"
);
// then
await().untilAsserted(() ->
assertThat(consumedMessages).containsExactlyInAnyOrder(
"Introduction to Kafka", "Introduction to Kafka",
"Kotlin for Java Developers", "Kotlin for Java Developers",
"Reactive Spring Boot", "Reactive Spring Boot",
"Deploying Spring Boot Applications", "Deploying Spring Boot Applications",
"Spring Security" "Spring Security"
); ));
// then
await().untilAsserted(() ->
assertThat(consumedMessages).containsExactlyInAnyOrder(
"Introduction to Kafka",
"Kotlin for Java Developers",
"Reactive Spring Boot",
"Deploying Spring Boot Applications",
"Spring Security"
));
} }
private void publishArticles(String topic, String... articles) { private void publishArticles(String topic, String... articles) {
try (KafkaProducer<String, String> producer = testKafkaProducer()) { try (KafkaProducer<String, String> producer = testKafkaProducer()) {
Arrays.stream(articles) Arrays.stream(articles)
.map(article -> new ProducerRecord<String,String>(topic, article)) .map(article -> new ProducerRecord<>(topic, "key-1", article))
.forEach(producer::send); .forEach(producer::send);
} }
} }