From ef44b839eab33833e9dd61221f2cc53814f58827 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 10 Feb 2024 18:53:37 -0500 Subject: [PATCH] BAEL-7374 - First Cut --- .../KafaConsumeLastNMessagesLiveTest.java | 2 - spring-kafka-3/pom.xml | 5 + .../kafka/start/stop/consumer/Constants.java | 8 ++ .../stop/consumer/KafkaConsumerConfig.java | 42 +++++++ .../consumer/KafkaListenerControlService.java | 29 +++++ .../StartStopConsumerApplication.java | 12 ++ .../kafka/start/stop/consumer/UserEvent.java | 28 +++++ .../stop/consumer/UserEventListener.java | 21 ++++ .../start/stop/consumer/UserEventStore.java | 23 ++++ .../StartStopConsumerTest.java | 109 ++++++++++++++++++ 10 files changed, 277 insertions(+), 2 deletions(-) create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java create mode 100644 spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java index b92f65ca5a..f631c1a85e 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java @@ -17,13 +17,11 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; - import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml index 894eab2576..c5db336e84 100644 --- a/spring-kafka-3/pom.xml +++ b/spring-kafka-3/pom.xml @@ -50,6 +50,11 @@ ${awaitility.version} test + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java new file mode 100644 index 0000000000..71e453fa43 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java @@ -0,0 +1,8 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +public class Constants { + public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic"; + public static final int MULTIPLE_PARTITIONS = 5; + public static final short REPLICATION_FACTOR = 1; + public static final String LISTENER_ID = "listener-id-1"; +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java new file mode 100644 index 0000000000..21f6270bca --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java @@ -0,0 +1,42 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer"); + return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), + new JsonDeserializer<>(UserEvent.class)); + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java new file mode 100644 index 0000000000..259c461b4a --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java @@ -0,0 +1,29 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.stereotype.Service; + +@Service +public class KafkaListenerControlService { + + @Autowired + private KafkaListenerEndpointRegistry registry; + + // Method to start a listener + public void startListener(String listenerId) { + MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); + if (listenerContainer != null && !listenerContainer.isRunning()) { + listenerContainer.start(); + } + } + + // Method to stop a listener + public void stopListener(String listenerId) { + MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); + if (listenerContainer != null && listenerContainer.isRunning()) { + listenerContainer.stop(); + } + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java new file mode 100644 index 0000000000..c4461b9388 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java @@ -0,0 +1,12 @@ +package com.baeldung.spring.kafka.start.stop.consumer; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class StartStopConsumerApplication { + + public static void main(String[] args) { + SpringApplication.run(com.baeldung.spring.kafka.deserialization.exception.Application.class, args); + } + +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java new file mode 100644 index 0000000000..1c7a7d35c8 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java @@ -0,0 +1,28 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +public class UserEvent { + private String userEventId; + private long eventNanoTime; + + public UserEvent(){} + + public UserEvent(String userEventId) { + this.userEventId = userEventId; + } + + public String getUserEventId() { + return userEventId; + } + + public void setUserEventId(String userEventId) { + this.userEventId = userEventId; + } + + public long getEventNanoTime() { + return eventNanoTime; + } + + public void setEventNanoTime(long eventNanoTime) { + this.eventNanoTime = eventNanoTime; + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java new file mode 100644 index 0000000000..d35828211e --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java @@ -0,0 +1,21 @@ +package com.baeldung.spring.kafka.start.stop.consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class UserEventListener { + private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class); + + @Autowired + UserEventStore userEventStore; + + @KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group", + containerFactory = "kafkaListenerContainerFactory", autoStartup = "false") + public void userEventListener(UserEvent userEvent) { + logger.info("Received UserEvent: " + userEvent.getUserEventId() + ", Time: " + userEvent.getEventNanoTime()); + userEventStore.addUserEvent(userEvent); + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java new file mode 100644 index 0000000000..e0ccab20bb --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java @@ -0,0 +1,23 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +@Component +public class UserEventStore { + private final List userEvents = new ArrayList<>(); + + public void addUserEvent(UserEvent userEvent){ + userEvents.add(userEvent); + } + + public List getUserEvents(){ + return userEvents; + } + + public void clearUserEvents(){ + this.userEvents.clear(); + } +} diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java new file mode 100644 index 0000000000..7dad372737 --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java @@ -0,0 +1,109 @@ +package com.baeldung.spring.kafka.startstopconsumer; + +import com.baeldung.spring.kafka.start.stop.consumer.*; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.LongSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; +import org.testcontainers.utility.DockerImageName; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + + +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +@Testcontainers +@SpringBootTest(classes = StartStopConsumerApplication.class) +public class StartStopConsumerTest { + + private static KafkaProducer producer; + + private static final Logger logger = LoggerFactory.getLogger(StartStopConsumerTest.class); + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @Autowired + KafkaListenerControlService kafkaListenerControlService; + + @Autowired + UserEventStore userEventStore; + + @DynamicPropertySource + static void setProps(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", KAFKA_CONTAINER::getBootstrapServers); + } + + @BeforeAll + static void setup() throws ExecutionException, InterruptedException { + KAFKA_CONTAINER.addExposedPort(9092); + + Properties adminProperties = new Properties(); + adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); + Admin admin = Admin.create(adminProperties); + producer = new KafkaProducer<>(producerProperties); + admin.createTopics(ImmutableList.of(new NewTopic(Constants.MULTI_PARTITION_TOPIC, Constants.MULTIPLE_PARTITIONS, Constants.REPLICATION_FACTOR))) + .all() + .get(); + } + + @AfterAll + static void destroy() { + KAFKA_CONTAINER.stop(); + } + + @Test + void processMessages_whenListenerIsRestarted_thenCorrectNumberOfMessagesAreConsumed() throws ExecutionException, InterruptedException { + kafkaListenerControlService.startListener(Constants.LISTENER_ID); + + //Verification that listener has started. + UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString()); + startUserEventTest.setEventNanoTime(System.nanoTime()); + producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest)); + await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size())); + this.userEventStore.clearUserEvents(); + + for (long count = 1; count <= 10; count++) { + UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); + userEvent.setEventNanoTime(System.nanoTime()); + Future future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent)); + RecordMetadata metadata = future.get(); + if (count == 4) { + await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size())); + this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID); + this.userEventStore.clearUserEvents(); + } + logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition()); + } + assertEquals(0, this.userEventStore.getUserEvents().size()); + kafkaListenerControlService.startListener(Constants.LISTENER_ID); + await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size())); + kafkaListenerControlService.stopListener(Constants.LISTENER_ID); + } +} \ No newline at end of file