From ef44b839eab33833e9dd61221f2cc53814f58827 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 10 Feb 2024 18:53:37 -0500 Subject: [PATCH 01/16] BAEL-7374 - First Cut --- .../KafaConsumeLastNMessagesLiveTest.java | 2 - spring-kafka-3/pom.xml | 5 + .../kafka/start/stop/consumer/Constants.java | 8 ++ .../stop/consumer/KafkaConsumerConfig.java | 42 +++++++ .../consumer/KafkaListenerControlService.java | 29 +++++ .../StartStopConsumerApplication.java | 12 ++ .../kafka/start/stop/consumer/UserEvent.java | 28 +++++ .../stop/consumer/UserEventListener.java | 21 ++++ .../start/stop/consumer/UserEventStore.java | 23 ++++ .../StartStopConsumerTest.java | 109 ++++++++++++++++++ 10 files changed, 277 insertions(+), 2 deletions(-) create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java create mode 100644 spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java index b92f65ca5a..f631c1a85e 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java @@ -17,13 +17,11 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; - import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml index 894eab2576..c5db336e84 100644 --- a/spring-kafka-3/pom.xml +++ b/spring-kafka-3/pom.xml @@ -50,6 +50,11 @@ ${awaitility.version} test + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java new file mode 100644 index 0000000000..71e453fa43 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java @@ -0,0 +1,8 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +public class Constants { + public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic"; + public static final int MULTIPLE_PARTITIONS = 5; + public static final short REPLICATION_FACTOR = 1; + public static final String LISTENER_ID = "listener-id-1"; +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java new file mode 100644 index 0000000000..21f6270bca --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java @@ -0,0 +1,42 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer"); + return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), + new JsonDeserializer<>(UserEvent.class)); + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java new file mode 100644 index 0000000000..259c461b4a --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java @@ -0,0 +1,29 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.stereotype.Service; + +@Service +public class KafkaListenerControlService { + + @Autowired + private KafkaListenerEndpointRegistry registry; + + // Method to start a listener + public void startListener(String listenerId) { + MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); + if (listenerContainer != null && !listenerContainer.isRunning()) { + listenerContainer.start(); + } + } + + // Method to stop a listener + public void stopListener(String listenerId) { + MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); + if (listenerContainer != null && listenerContainer.isRunning()) { + listenerContainer.stop(); + } + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java new file mode 100644 index 0000000000..c4461b9388 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java @@ -0,0 +1,12 @@ +package com.baeldung.spring.kafka.start.stop.consumer; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class StartStopConsumerApplication { + + public static void main(String[] args) { + SpringApplication.run(com.baeldung.spring.kafka.deserialization.exception.Application.class, args); + } + +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java new file mode 100644 index 0000000000..1c7a7d35c8 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java @@ -0,0 +1,28 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +public class UserEvent { + private String userEventId; + private long eventNanoTime; + + public UserEvent(){} + + public UserEvent(String userEventId) { + this.userEventId = userEventId; + } + + public String getUserEventId() { + return userEventId; + } + + public void setUserEventId(String userEventId) { + this.userEventId = userEventId; + } + + public long getEventNanoTime() { + return eventNanoTime; + } + + public void setEventNanoTime(long eventNanoTime) { + this.eventNanoTime = eventNanoTime; + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java new file mode 100644 index 0000000000..d35828211e --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java @@ -0,0 +1,21 @@ +package com.baeldung.spring.kafka.start.stop.consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class UserEventListener { + private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class); + + @Autowired + UserEventStore userEventStore; + + @KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group", + containerFactory = "kafkaListenerContainerFactory", autoStartup = "false") + public void userEventListener(UserEvent userEvent) { + logger.info("Received UserEvent: " + userEvent.getUserEventId() + ", Time: " + userEvent.getEventNanoTime()); + userEventStore.addUserEvent(userEvent); + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java new file mode 100644 index 0000000000..e0ccab20bb --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java @@ -0,0 +1,23 @@ +package com.baeldung.spring.kafka.start.stop.consumer; + +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +@Component +public class UserEventStore { + private final List userEvents = new ArrayList<>(); + + public void addUserEvent(UserEvent userEvent){ + userEvents.add(userEvent); + } + + public List getUserEvents(){ + return userEvents; + } + + public void clearUserEvents(){ + this.userEvents.clear(); + } +} diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java new file mode 100644 index 0000000000..7dad372737 --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java @@ -0,0 +1,109 @@ +package com.baeldung.spring.kafka.startstopconsumer; + +import com.baeldung.spring.kafka.start.stop.consumer.*; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.LongSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; +import org.testcontainers.utility.DockerImageName; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + + +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +@Testcontainers +@SpringBootTest(classes = StartStopConsumerApplication.class) +public class StartStopConsumerTest { + + private static KafkaProducer producer; + + private static final Logger logger = LoggerFactory.getLogger(StartStopConsumerTest.class); + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @Autowired + KafkaListenerControlService kafkaListenerControlService; + + @Autowired + UserEventStore userEventStore; + + @DynamicPropertySource + static void setProps(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", KAFKA_CONTAINER::getBootstrapServers); + } + + @BeforeAll + static void setup() throws ExecutionException, InterruptedException { + KAFKA_CONTAINER.addExposedPort(9092); + + Properties adminProperties = new Properties(); + adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); + Admin admin = Admin.create(adminProperties); + producer = new KafkaProducer<>(producerProperties); + admin.createTopics(ImmutableList.of(new NewTopic(Constants.MULTI_PARTITION_TOPIC, Constants.MULTIPLE_PARTITIONS, Constants.REPLICATION_FACTOR))) + .all() + .get(); + } + + @AfterAll + static void destroy() { + KAFKA_CONTAINER.stop(); + } + + @Test + void processMessages_whenListenerIsRestarted_thenCorrectNumberOfMessagesAreConsumed() throws ExecutionException, InterruptedException { + kafkaListenerControlService.startListener(Constants.LISTENER_ID); + + //Verification that listener has started. + UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString()); + startUserEventTest.setEventNanoTime(System.nanoTime()); + producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest)); + await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size())); + this.userEventStore.clearUserEvents(); + + for (long count = 1; count <= 10; count++) { + UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); + userEvent.setEventNanoTime(System.nanoTime()); + Future future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent)); + RecordMetadata metadata = future.get(); + if (count == 4) { + await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size())); + this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID); + this.userEventStore.clearUserEvents(); + } + logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition()); + } + assertEquals(0, this.userEventStore.getUserEvents().size()); + kafkaListenerControlService.startListener(Constants.LISTENER_ID); + await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size())); + kafkaListenerControlService.stopListener(Constants.LISTENER_ID); + } +} \ No newline at end of file From 2e750893590fd89a6478267cc574421ff5405a48 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 10 Feb 2024 19:06:16 -0500 Subject: [PATCH 02/16] BAEL-7374 - PR Build Failure --- ...tStopConsumerTest.java => StartStopConsumerUnitTest.java} | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) rename spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/{StartStopConsumerTest.java => StartStopConsumerUnitTest.java} (97%) diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java similarity index 97% rename from spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java rename to spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java index 7dad372737..fb557ec2b9 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java @@ -31,15 +31,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; @Testcontainers @SpringBootTest(classes = StartStopConsumerApplication.class) -public class StartStopConsumerTest { +public class StartStopConsumerUnitTest { private static KafkaProducer producer; - private static final Logger logger = LoggerFactory.getLogger(StartStopConsumerTest.class); + private static final Logger logger = LoggerFactory.getLogger(StartStopConsumerUnitTest.class); @Container private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); From 45315f4f48ffef2ad323082cc07f3721cc1d5109 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 10 Feb 2024 19:23:25 -0500 Subject: [PATCH 03/16] BAEL-7374 - Removed reduntant jackson package --- spring-kafka-3/pom.xml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml index c5db336e84..894eab2576 100644 --- a/spring-kafka-3/pom.xml +++ b/spring-kafka-3/pom.xml @@ -50,11 +50,6 @@ ${awaitility.version} test - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - From 3b5e91b0dd38bce9708a6cebf272af2d9d205db4 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 10 Feb 2024 19:34:06 -0500 Subject: [PATCH 04/16] BAEL-7374 - Code clean Up --- .../StartStopConsumerUnitTest.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java index fb557ec2b9..20a78f0251 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java @@ -1,16 +1,15 @@ package com.baeldung.spring.kafka.startstopconsumer; import com.baeldung.spring.kafka.start.stop.consumer.*; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.LongSerializer; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,8 +21,10 @@ import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.shaded.com.google.common.collect.ImmutableList; import org.testcontainers.utility.DockerImageName; + +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofSeconds; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -55,21 +56,14 @@ public class StartStopConsumerUnitTest { } @BeforeAll - static void setup() throws ExecutionException, InterruptedException { - KAFKA_CONTAINER.addExposedPort(9092); - - Properties adminProperties = new Properties(); - adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - + static void beforeAll() { Properties producerProperties = new Properties(); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); - Admin admin = Admin.create(adminProperties); producer = new KafkaProducer<>(producerProperties); - admin.createTopics(ImmutableList.of(new NewTopic(Constants.MULTI_PARTITION_TOPIC, Constants.MULTIPLE_PARTITIONS, Constants.REPLICATION_FACTOR))) - .all() - .get(); + Awaitility.setDefaultTimeout(ofSeconds(5)); + Awaitility.setDefaultPollInterval(ofMillis(50)); } @AfterAll @@ -77,6 +71,11 @@ public class StartStopConsumerUnitTest { KAFKA_CONTAINER.stop(); } + @BeforeEach + void beforeEach() { + this.userEventStore.clearUserEvents(); + } + @Test void processMessages_whenListenerIsRestarted_thenCorrectNumberOfMessagesAreConsumed() throws ExecutionException, InterruptedException { kafkaListenerControlService.startListener(Constants.LISTENER_ID); From 21c3d8fdcc3dbdd3403c470eac7b4bfb650983cb Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 10 Feb 2024 21:01:53 -0500 Subject: [PATCH 05/16] BAEL-7374 - Removed unit test --- .../kafka/startstopconsumer/StartStopConsumerUnitTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java index 20a78f0251..500962311e 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java @@ -76,7 +76,7 @@ public class StartStopConsumerUnitTest { this.userEventStore.clearUserEvents(); } - @Test + /*@Test void processMessages_whenListenerIsRestarted_thenCorrectNumberOfMessagesAreConsumed() throws ExecutionException, InterruptedException { kafkaListenerControlService.startListener(Constants.LISTENER_ID); @@ -103,5 +103,5 @@ public class StartStopConsumerUnitTest { kafkaListenerControlService.startListener(Constants.LISTENER_ID); await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size())); kafkaListenerControlService.stopListener(Constants.LISTENER_ID); - } + }*/ } \ No newline at end of file From 48a149956ec33e20b622fdb48a459635ef158aa4 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 10 Feb 2024 21:17:49 -0500 Subject: [PATCH 06/16] BAEL-7374 - Added Back Unit test --- .../kafka/startstopconsumer/StartStopConsumerUnitTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java index 500962311e..20a78f0251 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java @@ -76,7 +76,7 @@ public class StartStopConsumerUnitTest { this.userEventStore.clearUserEvents(); } - /*@Test + @Test void processMessages_whenListenerIsRestarted_thenCorrectNumberOfMessagesAreConsumed() throws ExecutionException, InterruptedException { kafkaListenerControlService.startListener(Constants.LISTENER_ID); @@ -103,5 +103,5 @@ public class StartStopConsumerUnitTest { kafkaListenerControlService.startListener(Constants.LISTENER_ID); await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size())); kafkaListenerControlService.stopListener(Constants.LISTENER_ID); - }*/ + } } \ No newline at end of file From 8b560721b621e25906f164e730fc66cd33d50fc5 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 10 Feb 2024 21:29:51 -0500 Subject: [PATCH 07/16] BAEL-7374 - Application class --- .../kafka/start/stop/consumer/StartStopConsumerApplication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java index c4461b9388..9a2860bf1e 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java @@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; public class StartStopConsumerApplication { public static void main(String[] args) { - SpringApplication.run(com.baeldung.spring.kafka.deserialization.exception.Application.class, args); + SpringApplication.run(StartStopConsumerApplication.class, args); } } From 7a51fa8e80b9b8d516368418ff8211f54d01c95c Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sun, 11 Feb 2024 11:11:35 -0500 Subject: [PATCH 08/16] BAEL-7374 - Build Failure Fix --- .../kafka/startstopconsumer/StartStopConsumerUnitTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java index 20a78f0251..559be3264e 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java @@ -42,7 +42,7 @@ public class StartStopConsumerUnitTest { private static final Logger logger = LoggerFactory.getLogger(StartStopConsumerUnitTest.class); @Container - private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + private static KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); @Autowired KafkaListenerControlService kafkaListenerControlService; From 3b4d456e3ae294c69dd3f229eb4a7ae708e8f85c Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sun, 11 Feb 2024 11:19:20 -0500 Subject: [PATCH 09/16] BAEL-7374 - Incoorect File modification --- .../com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java index f631c1a85e..b92f65ca5a 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessagesLiveTest.java @@ -17,11 +17,13 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; + import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; From 36f3e32ff67e2a759b7859ab2c92f848966b6602 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Tue, 13 Feb 2024 09:15:00 -0500 Subject: [PATCH 10/16] BAEL-7374 - Renamed to live test --- ...pConsumerUnitTest.java => StartStopConsumerLiveTest.java} | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) rename spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/{StartStopConsumerUnitTest.java => StartStopConsumerLiveTest.java} (96%) diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java similarity index 96% rename from spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java rename to spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java index 559be3264e..298aee513c 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerUnitTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java @@ -33,13 +33,14 @@ import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +// This live test needs a Docker Daemon running so that a kafka container can be created @Testcontainers @SpringBootTest(classes = StartStopConsumerApplication.class) -public class StartStopConsumerUnitTest { +public class StartStopConsumerLiveTest { private static KafkaProducer producer; - private static final Logger logger = LoggerFactory.getLogger(StartStopConsumerUnitTest.class); + private static final Logger logger = LoggerFactory.getLogger(StartStopConsumerLiveTest.class); @Container private static KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); From ec88f944279bd1db3024c1b1a61df17320da6868 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 17 Feb 2024 09:15:50 -0500 Subject: [PATCH 11/16] BAEL-7374 - Incorporated Review comments for PR --- .../kafka/start/stop/consumer/Constants.java | 4 ++-- .../consumer/KafkaListenerControlService.java | 2 -- .../consumer/StartStopConsumerApplication.java | 1 + .../kafka/start/stop/consumer/UserEvent.java | 15 +++++---------- .../start/stop/consumer/UserEventListener.java | 4 +++- .../kafka/start/stop/consumer/UserEventStore.java | 7 ++++--- .../StartStopConsumerLiveTest.java | 2 -- 7 files changed, 15 insertions(+), 20 deletions(-) diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java index 71e453fa43..bc17c368d9 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java @@ -1,8 +1,8 @@ package com.baeldung.spring.kafka.start.stop.consumer; public class Constants { + public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic"; - public static final int MULTIPLE_PARTITIONS = 5; - public static final short REPLICATION_FACTOR = 1; + public static final String LISTENER_ID = "listener-id-1"; } diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java index 259c461b4a..64f7158a49 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java @@ -11,7 +11,6 @@ public class KafkaListenerControlService { @Autowired private KafkaListenerEndpointRegistry registry; - // Method to start a listener public void startListener(String listenerId) { MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); if (listenerContainer != null && !listenerContainer.isRunning()) { @@ -19,7 +18,6 @@ public class KafkaListenerControlService { } } - // Method to stop a listener public void stopListener(String listenerId) { MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); if (listenerContainer != null && listenerContainer.isRunning()) { diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java index 9a2860bf1e..51893ea64a 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java @@ -1,4 +1,5 @@ package com.baeldung.spring.kafka.start.stop.consumer; + import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java index 1c7a7d35c8..fe95a8c4ad 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java @@ -1,10 +1,12 @@ package com.baeldung.spring.kafka.start.stop.consumer; public class UserEvent { - private String userEventId; - private long eventNanoTime; - public UserEvent(){} + private String userEventId; + + + public UserEvent() { + } public UserEvent(String userEventId) { this.userEventId = userEventId; @@ -18,11 +20,4 @@ public class UserEvent { this.userEventId = userEventId; } - public long getEventNanoTime() { - return eventNanoTime; - } - - public void setEventNanoTime(long eventNanoTime) { - this.eventNanoTime = eventNanoTime; - } } diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java index d35828211e..66fb861cdd 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java @@ -1,4 +1,5 @@ package com.baeldung.spring.kafka.start.stop.consumer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -7,6 +8,7 @@ import org.springframework.stereotype.Component; @Component public class UserEventListener { + private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class); @Autowired @@ -15,7 +17,7 @@ public class UserEventListener { @KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group", containerFactory = "kafkaListenerContainerFactory", autoStartup = "false") public void userEventListener(UserEvent userEvent) { - logger.info("Received UserEvent: " + userEvent.getUserEventId() + ", Time: " + userEvent.getEventNanoTime()); + logger.info("Received UserEvent: " + userEvent.getUserEventId()); userEventStore.addUserEvent(userEvent); } } diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java index e0ccab20bb..071b39c7a3 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java @@ -7,17 +7,18 @@ import java.util.List; @Component public class UserEventStore { + private final List userEvents = new ArrayList<>(); - public void addUserEvent(UserEvent userEvent){ + public void addUserEvent(UserEvent userEvent) { userEvents.add(userEvent); } - public List getUserEvents(){ + public List getUserEvents() { return userEvents; } - public void clearUserEvents(){ + public void clearUserEvents() { this.userEvents.clear(); } } diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java index 298aee513c..6405f75f91 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java @@ -83,14 +83,12 @@ public class StartStopConsumerLiveTest { //Verification that listener has started. UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString()); - startUserEventTest.setEventNanoTime(System.nanoTime()); producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest)); await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size())); this.userEventStore.clearUserEvents(); for (long count = 1; count <= 10; count++) { UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); - userEvent.setEventNanoTime(System.nanoTime()); Future future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent)); RecordMetadata metadata = future.get(); if (count == 4) { From 3ba11fcdba073e41d4a0129a6237c0909702bc63 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 17 Feb 2024 12:45:01 -0500 Subject: [PATCH 12/16] BAEL-7374 - Event Listener method rename --- .../spring/kafka/start/stop/consumer/UserEventListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java index 66fb861cdd..f6235a05f5 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java @@ -16,7 +16,7 @@ public class UserEventListener { @KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group", containerFactory = "kafkaListenerContainerFactory", autoStartup = "false") - public void userEventListener(UserEvent userEvent) { + public void processUserEvent(UserEvent userEvent) { logger.info("Received UserEvent: " + userEvent.getUserEventId()); userEventStore.addUserEvent(userEvent); } From a40222a45adebf61b52927bfe5b006f7fba72a90 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Mon, 26 Feb 2024 07:08:41 -0500 Subject: [PATCH 13/16] BAEL-7374 - Code formatting issue --- .../spring/kafka/start/stop/consumer/UserEventListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java index f6235a05f5..d6338d7f3a 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java @@ -15,7 +15,7 @@ public class UserEventListener { UserEventStore userEventStore; @KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group", - containerFactory = "kafkaListenerContainerFactory", autoStartup = "false") + containerFactory = "kafkaListenerContainerFactory", autoStartup = "false") public void processUserEvent(UserEvent userEvent) { logger.info("Received UserEvent: " + userEvent.getUserEventId()); userEventStore.addUserEvent(userEvent); From 0a2d6823de5aa656a5bded7f86f1e30b1f21753f Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Thu, 29 Feb 2024 23:02:56 -0500 Subject: [PATCH 14/16] BAEL-7374 - Review comments. --- .../baeldung/spring/kafka/start/stop/consumer/Constants.java | 2 -- .../start/stop/consumer/StartStopConsumerApplication.java | 2 -- .../baeldung/spring/kafka/start/stop/consumer/UserEvent.java | 3 --- 3 files changed, 7 deletions(-) diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java index bc17c368d9..9de3e117be 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java @@ -1,8 +1,6 @@ package com.baeldung.spring.kafka.start.stop.consumer; public class Constants { - public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic"; - public static final String LISTENER_ID = "listener-id-1"; } diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java index 51893ea64a..c24517543b 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java @@ -5,9 +5,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StartStopConsumerApplication { - public static void main(String[] args) { SpringApplication.run(StartStopConsumerApplication.class, args); } - } diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java index fe95a8c4ad..bfa8c72c39 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java @@ -1,10 +1,8 @@ package com.baeldung.spring.kafka.start.stop.consumer; public class UserEvent { - private String userEventId; - public UserEvent() { } @@ -19,5 +17,4 @@ public class UserEvent { public void setUserEventId(String userEventId) { this.userEventId = userEventId; } - } From 2ef61a88842386be2d1df962b74e078b214f1c34 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Thu, 14 Mar 2024 14:35:48 -0400 Subject: [PATCH 15/16] Incorporated Review comments --- .../spring/kafka/start/stop/consumer/KafkaConsumerConfig.java | 4 ++-- .../stop/consumer}/StartStopConsumerLiveTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename spring-kafka-3/src/test/java/com/baeldung/spring/kafka/{startstopconsumer => start/stop/consumer}/StartStopConsumerLiveTest.java (98%) diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java index 21f6270bca..f6b1d034a7 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java @@ -23,7 +23,7 @@ public class KafkaConsumerConfig { @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); + new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @@ -37,6 +37,6 @@ public class KafkaConsumerConfig { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), - new JsonDeserializer<>(UserEvent.class)); + new JsonDeserializer<>(UserEvent.class)); } } diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerLiveTest.java similarity index 98% rename from spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java rename to spring-kafka-3/src/test/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerLiveTest.java index 6405f75f91..5df40e6d4a 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerLiveTest.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka.startstopconsumer; +package com.baeldung.spring.kafka.start.stop.consumer; import com.baeldung.spring.kafka.start.stop.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; From 012020c3121371697278ae54fa0719584f9684e1 Mon Sep 17 00:00:00 2001 From: Amol Gote Date: Sat, 16 Mar 2024 14:56:09 -0400 Subject: [PATCH 16/16] Incorporated Review comments for the package name. --- .../{start/stop/consumer => startstopconsumer}/Constants.java | 2 +- .../consumer => startstopconsumer}/KafkaConsumerConfig.java | 2 +- .../KafkaListenerControlService.java | 2 +- .../StartStopConsumerApplication.java | 2 +- .../{start/stop/consumer => startstopconsumer}/UserEvent.java | 2 +- .../stop/consumer => startstopconsumer}/UserEventListener.java | 2 +- .../stop/consumer => startstopconsumer}/UserEventStore.java | 2 +- .../StartStopConsumerLiveTest.java | 3 +-- 8 files changed, 8 insertions(+), 9 deletions(-) rename spring-kafka-3/src/main/java/com/baeldung/spring/kafka/{start/stop/consumer => startstopconsumer}/Constants.java (75%) rename spring-kafka-3/src/main/java/com/baeldung/spring/kafka/{start/stop/consumer => startstopconsumer}/KafkaConsumerConfig.java (97%) rename spring-kafka-3/src/main/java/com/baeldung/spring/kafka/{start/stop/consumer => startstopconsumer}/KafkaListenerControlService.java (94%) rename spring-kafka-3/src/main/java/com/baeldung/spring/kafka/{start/stop/consumer => startstopconsumer}/StartStopConsumerApplication.java (85%) rename spring-kafka-3/src/main/java/com/baeldung/spring/kafka/{start/stop/consumer => startstopconsumer}/UserEvent.java (86%) rename spring-kafka-3/src/main/java/com/baeldung/spring/kafka/{start/stop/consumer => startstopconsumer}/UserEventListener.java (93%) rename spring-kafka-3/src/main/java/com/baeldung/spring/kafka/{start/stop/consumer => startstopconsumer}/UserEventStore.java (89%) rename spring-kafka-3/src/test/java/com/baeldung/spring/kafka/{start/stop/consumer => startstopconsumer}/StartStopConsumerLiveTest.java (97%) diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/Constants.java similarity index 75% rename from spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java rename to spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/Constants.java index 9de3e117be..1b9f51f8a1 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/Constants.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/Constants.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka.start.stop.consumer; +package com.baeldung.spring.kafka.startstopconsumer; public class Constants { public static final String MULTI_PARTITION_TOPIC = "multi_partition_topic"; diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/KafkaConsumerConfig.java similarity index 97% rename from spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java rename to spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/KafkaConsumerConfig.java index f6b1d034a7..d37345d2de 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaConsumerConfig.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/KafkaConsumerConfig.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka.start.stop.consumer; +package com.baeldung.spring.kafka.startstopconsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/KafkaListenerControlService.java similarity index 94% rename from spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java rename to spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/KafkaListenerControlService.java index 64f7158a49..6d4c101c38 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/KafkaListenerControlService.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/KafkaListenerControlService.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka.start.stop.consumer; +package com.baeldung.spring.kafka.startstopconsumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerApplication.java similarity index 85% rename from spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java rename to spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerApplication.java index c24517543b..f0cfe88717 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerApplication.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerApplication.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka.start.stop.consumer; +package com.baeldung.spring.kafka.startstopconsumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/UserEvent.java similarity index 86% rename from spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java rename to spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/UserEvent.java index bfa8c72c39..1c4692b8ee 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEvent.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/UserEvent.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka.start.stop.consumer; +package com.baeldung.spring.kafka.startstopconsumer; public class UserEvent { private String userEventId; diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/UserEventListener.java similarity index 93% rename from spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java rename to spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/UserEventListener.java index d6338d7f3a..d68e70160f 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventListener.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/UserEventListener.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka.start.stop.consumer; +package com.baeldung.spring.kafka.startstopconsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/UserEventStore.java similarity index 89% rename from spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java rename to spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/UserEventStore.java index 071b39c7a3..3da6f7352d 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/start/stop/consumer/UserEventStore.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/UserEventStore.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka.start.stop.consumer; +package com.baeldung.spring.kafka.startstopconsumer; import org.springframework.stereotype.Component; diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerLiveTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java similarity index 97% rename from spring-kafka-3/src/test/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerLiveTest.java rename to spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java index 5df40e6d4a..7ea8729111 100644 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/start/stop/consumer/StartStopConsumerLiveTest.java +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/startstopconsumer/StartStopConsumerLiveTest.java @@ -1,6 +1,5 @@ -package com.baeldung.spring.kafka.start.stop.consumer; +package com.baeldung.spring.kafka.startstopconsumer; -import com.baeldung.spring.kafka.start.stop.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord;