BAEL-7258: renaming
This commit is contained in:
parent
5d13422a53
commit
40ef28a449
@ -12,7 +12,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
|
||||||
public class CustomKafkaListener implements Runnable, Closeable {
|
public class CustomKafkaListener implements Runnable, AutoCloseable {
|
||||||
|
|
||||||
private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName());
|
private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName());
|
||||||
|
|
||||||
@ -43,7 +43,7 @@ public class CustomKafkaListener implements Runnable, Closeable {
|
|||||||
return new KafkaConsumer<>(props);
|
return new KafkaConsumer<>(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
public CustomKafkaListener doForEach(Consumer<String> newConsumer) {
|
public CustomKafkaListener onEach(Consumer<String> newConsumer) {
|
||||||
recordConsumer = recordConsumer.andThen(newConsumer);
|
recordConsumer = recordConsumer.andThen(newConsumer);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@ -52,7 +52,6 @@ public class CustomKafkaListener implements Runnable, Closeable {
|
|||||||
public void run() {
|
public void run() {
|
||||||
running.set(true);
|
running.set(true);
|
||||||
consumer.subscribe(Arrays.asList(topic));
|
consumer.subscribe(Arrays.asList(topic));
|
||||||
|
|
||||||
while (running.get()) {
|
while (running.get()) {
|
||||||
consumer.poll(Duration.ofMillis(100))
|
consumer.poll(Duration.ofMillis(100))
|
||||||
.forEach(record -> recordConsumer.accept(record.value()));
|
.forEach(record -> recordConsumer.accept(record.value()));
|
||||||
|
@ -7,6 +7,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
|||||||
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
|
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
@ -23,7 +24,7 @@ import org.testcontainers.shaded.org.awaitility.Awaitility;
|
|||||||
import org.testcontainers.utility.DockerImageName;
|
import org.testcontainers.utility.DockerImageName;
|
||||||
|
|
||||||
@Testcontainers
|
@Testcontainers
|
||||||
class KafkaListenerWithoutSpringLiveTest {
|
class CustomKafkaListenerLiveTest {
|
||||||
|
|
||||||
@Container
|
@Container
|
||||||
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
|
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
|
||||||
@ -34,30 +35,28 @@ class KafkaListenerWithoutSpringLiveTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void test() {
|
void givenANewCustomKafkaListener_thenConsumesAllMessages() {
|
||||||
// given
|
// given
|
||||||
String topic = "baeldung.articles.published";
|
String topic = "baeldung.articles.published";
|
||||||
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
|
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
|
||||||
List<String> consumedMessages = new ArrayList<>();
|
List<String> consumedMessages = new ArrayList<>();
|
||||||
|
|
||||||
// when
|
// when
|
||||||
try (CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers)) {
|
try (CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add)) {
|
||||||
CompletableFuture.runAsync(() ->
|
CompletableFuture.runAsync(listener);
|
||||||
listener.doForEach(consumedMessages::add).run()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
// and
|
// and
|
||||||
publishArticles(topic, asList(
|
publishArticles(topic,
|
||||||
"Introduction to Kafka",
|
"Introduction to Kafka",
|
||||||
"Kotlin for Java Developers",
|
"Kotlin for Java Developers",
|
||||||
"Reactive Spring Boot",
|
"Reactive Spring Boot",
|
||||||
"Deploying Spring Boot Applications",
|
"Deploying Spring Boot Applications",
|
||||||
"Spring Security"
|
"Spring Security"
|
||||||
));
|
);
|
||||||
|
|
||||||
// then
|
// then
|
||||||
await().untilAsserted(() -> assertThat(consumedMessages)
|
await().untilAsserted(() ->
|
||||||
.containsExactlyInAnyOrder(
|
assertThat(consumedMessages).containsExactlyInAnyOrder(
|
||||||
"Introduction to Kafka",
|
"Introduction to Kafka",
|
||||||
"Kotlin for Java Developers",
|
"Kotlin for Java Developers",
|
||||||
"Reactive Spring Boot",
|
"Reactive Spring Boot",
|
||||||
@ -66,9 +65,9 @@ class KafkaListenerWithoutSpringLiveTest {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void publishArticles(String topic, List<String> articles) {
|
private void publishArticles(String topic, String... articles) {
|
||||||
try (KafkaProducer<String, String> producer = testKafkaProducer()) {
|
try (KafkaProducer<String, String> producer = testKafkaProducer()) {
|
||||||
articles.stream()
|
Arrays.stream(articles)
|
||||||
.map(article -> new ProducerRecord<String,String>(topic, article))
|
.map(article -> new ProducerRecord<String,String>(topic, article))
|
||||||
.forEach(producer::send);
|
.forEach(producer::send);
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user