BAEL-7258: kafka listener without spring
This commit is contained in:
parent
dfb560b277
commit
5d13422a53
@ -0,0 +1,68 @@
|
|||||||
|
package com.baeldung.kafka.consumer;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
|
||||||
|
public class CustomKafkaListener implements Runnable, Closeable {
|
||||||
|
|
||||||
|
private static final Logger log = Logger.getLogger(CustomKafkaListener.class.getName());
|
||||||
|
|
||||||
|
private final String topic;
|
||||||
|
private final KafkaConsumer<String, String> consumer;
|
||||||
|
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
private Consumer<String> recordConsumer;
|
||||||
|
|
||||||
|
|
||||||
|
public CustomKafkaListener(String topic, KafkaConsumer<String, String> consumer) {
|
||||||
|
this.topic = topic;
|
||||||
|
this.consumer = consumer;
|
||||||
|
this.recordConsumer = record -> log.info("received: " + record);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CustomKafkaListener(String topic, String bootstrapServers) {
|
||||||
|
this(topic, defaultKafkaConsumer(bootstrapServers));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static KafkaConsumer<String, String> defaultKafkaConsumer(String boostrapServers) {
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
|
||||||
|
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group_id");
|
||||||
|
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
|
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||||
|
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
||||||
|
return new KafkaConsumer<>(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CustomKafkaListener doForEach(Consumer<String> newConsumer) {
|
||||||
|
recordConsumer = recordConsumer.andThen(newConsumer);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
running.set(true);
|
||||||
|
consumer.subscribe(Arrays.asList(topic));
|
||||||
|
|
||||||
|
while (running.get()) {
|
||||||
|
consumer.poll(Duration.ofMillis(100))
|
||||||
|
.forEach(record -> recordConsumer.accept(record.value()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
running.set(false);
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,85 @@
|
|||||||
|
package com.baeldung.kafka.consumer;
|
||||||
|
|
||||||
|
import static java.time.Duration.ofMillis;
|
||||||
|
import static java.time.Duration.ofSeconds;
|
||||||
|
import static java.util.Arrays.asList;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.testcontainers.containers.KafkaContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
import org.testcontainers.shaded.org.awaitility.Awaitility;
|
||||||
|
import org.testcontainers.utility.DockerImageName;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
class KafkaListenerWithoutSpringLiveTest {
|
||||||
|
|
||||||
|
@Container
|
||||||
|
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
|
||||||
|
|
||||||
|
static {
|
||||||
|
Awaitility.setDefaultTimeout(ofSeconds(1L));
|
||||||
|
Awaitility.setDefaultPollInterval(ofMillis(50L));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test() {
|
||||||
|
// given
|
||||||
|
String topic = "baeldung.articles.published";
|
||||||
|
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
|
||||||
|
List<String> consumedMessages = new ArrayList<>();
|
||||||
|
|
||||||
|
// when
|
||||||
|
try (CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers)) {
|
||||||
|
CompletableFuture.runAsync(() ->
|
||||||
|
listener.doForEach(consumedMessages::add).run()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// and
|
||||||
|
publishArticles(topic, asList(
|
||||||
|
"Introduction to Kafka",
|
||||||
|
"Kotlin for Java Developers",
|
||||||
|
"Reactive Spring Boot",
|
||||||
|
"Deploying Spring Boot Applications",
|
||||||
|
"Spring Security"
|
||||||
|
));
|
||||||
|
|
||||||
|
// then
|
||||||
|
await().untilAsserted(() -> assertThat(consumedMessages)
|
||||||
|
.containsExactlyInAnyOrder(
|
||||||
|
"Introduction to Kafka",
|
||||||
|
"Kotlin for Java Developers",
|
||||||
|
"Reactive Spring Boot",
|
||||||
|
"Deploying Spring Boot Applications",
|
||||||
|
"Spring Security"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void publishArticles(String topic, List<String> articles) {
|
||||||
|
try (KafkaProducer<String, String> producer = testKafkaProducer()) {
|
||||||
|
articles.stream()
|
||||||
|
.map(article -> new ProducerRecord<String,String>(topic, article))
|
||||||
|
.forEach(producer::send);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static KafkaProducer<String, String> testKafkaProducer() {
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
|
||||||
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||||
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||||
|
return new KafkaProducer<>(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user