diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/BookConsumer.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/BookConsumer.java new file mode 100644 index 0000000000..77c7dc2e91 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/BookConsumer.java @@ -0,0 +1,26 @@ +package com.baeldung.spring.kafka.multiplelisteners; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Service +public class BookConsumer { + private static final Logger logger = LoggerFactory.getLogger(BookConsumer.class); + + @KafkaListener(topics = "books", groupId = "books-content-search") + public void bookContentSearchConsumer(BookEvent event) { + logger.info("Books event received for full-text search indexing => {}", event); + } + + @KafkaListener(topics = "books", groupId = "books-price-index") + public void bookPriceIndexerConsumer(BookEvent event) { + logger.info("Books event received for price indexing => {}", event); + } + + @KafkaListener(topics = "books", groupId = "book-notification-consumer", concurrency = "2") + public void bookNotificationConsumer(BookEvent event) { + logger.info("Books event received for notification => {}", event); + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/BookEvent.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/BookEvent.java new file mode 100644 index 0000000000..11b13f120d --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/BookEvent.java @@ -0,0 +1,15 @@ +package com.baeldung.spring.kafka.multiplelisteners; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class BookEvent { + + private String title; + private String description; + private Double price; +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/KafkaConsumerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/KafkaConsumerConfig.java new file mode 100644 index 0000000000..a6e0a91425 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/KafkaConsumerConfig.java @@ -0,0 +1,64 @@ +package com.baeldung.spring.kafka.multiplelisteners; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; + +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class); + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + @Value(value = "${kafka.backoff.interval}") + private Long interval; + + @Value(value = "${kafka.backoff.max_failure}") + private Long maxAttempts; + + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520"); + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520"); + props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); + props.put(JsonDeserializer.TYPE_MAPPINGS, "bookEvent:com.baeldung.spring.kafka.multiplelisteners.BookEvent"); + + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setCommonErrorHandler(errorHandler()); + return factory; + } + + @Bean + public DefaultErrorHandler errorHandler() { + BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts); + return new DefaultErrorHandler((consumerRecord, e) -> LOGGER.error(String.format("consumed record %s because this exception was thrown", consumerRecord.toString())), fixedBackOff); + } + +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/KafkaProducerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/KafkaProducerConfig.java new file mode 100644 index 0000000000..35204428a0 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/KafkaProducerConfig.java @@ -0,0 +1,54 @@ +package com.baeldung.spring.kafka.multiplelisteners; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +public class KafkaProducerConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520"); + + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory bookProducerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + configProps.put(JsonSerializer.TYPE_MAPPINGS, "bookEvent:com.baeldung.spring.kafka.multiplelisteners.BookEvent"); + + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate bookKafkaTemplate() { + return new KafkaTemplate<>(bookProducerFactory()); + } + +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/KafkaTopicConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/KafkaTopicConfig.java new file mode 100644 index 0000000000..5bc0a966b5 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/KafkaTopicConfig.java @@ -0,0 +1,33 @@ +package com.baeldung.spring.kafka.multiplelisteners; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaAdmin; + +@Configuration +public class KafkaTopicConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + @Value(value = "${multiple-listeners.books.topic.name}") + private String booksTopicName; + + @Bean + public KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + return new KafkaAdmin(configs); + } + + @Bean + public NewTopic booksTopic() { + return new NewTopic(booksTopicName, 1, (short) 1); + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/MultipleListenersApplicationKafkaApp.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/MultipleListenersApplicationKafkaApp.java new file mode 100644 index 0000000000..c8a2f6e689 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/multiplelisteners/MultipleListenersApplicationKafkaApp.java @@ -0,0 +1,14 @@ +package com.baeldung.spring.kafka.multiplelisteners; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Import; + +@SpringBootApplication +@Import(value = { KafkaTopicConfig.class, KafkaConsumerConfig.class, KafkaProducerConfig.class }) +public class MultipleListenersApplicationKafkaApp { + + public static void main(String[] args) { + SpringApplication.run(MultipleListenersApplicationKafkaApp.class, args); + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/Farewell.java similarity index 94% rename from spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java rename to spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/Farewell.java index bbff315ad2..519d847aab 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/Farewell.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka; +package com.baeldung.spring.kafka.retryable; public class Farewell { diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/Greeting.java similarity index 92% rename from spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java rename to spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/Greeting.java index b4633e802a..79abeda34e 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/Greeting.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka; +package com.baeldung.spring.kafka.retryable; public class Greeting { diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/KafkaConsumerConfig.java similarity index 98% rename from spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java rename to spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/KafkaConsumerConfig.java index 463d3209ea..cf4b29137e 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/KafkaConsumerConfig.java @@ -1,6 +1,5 @@ -package com.baeldung.spring.kafka; +package com.baeldung.spring.kafka.retryable; -import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/KafkaProducerConfig.java similarity index 94% rename from spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java rename to spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/KafkaProducerConfig.java index da8b2bd1a6..90dcb2ccf9 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/KafkaProducerConfig.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka; +package com.baeldung.spring.kafka.retryable; import java.util.HashMap; import java.util.Map; @@ -55,7 +55,7 @@ public class KafkaProducerConfig { configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell"); + configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.retrayable.Greeting, farewell:com.baeldung.spring.kafka.retrayable.Farewell"); return new DefaultKafkaProducerFactory<>(configProps); } diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/KafkaTopicConfig.java similarity index 97% rename from spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java rename to spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/KafkaTopicConfig.java index 6a20915699..3b4c2d9928 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/KafkaTopicConfig.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka; +package com.baeldung.spring.kafka.retryable; import java.util.HashMap; import java.util.Map; diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/MultiTypeKafkaListener.java similarity index 95% rename from spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java rename to spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/MultiTypeKafkaListener.java index 6c4d78171b..441e564176 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/MultiTypeKafkaListener.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka; +package com.baeldung.spring.kafka.retryable; import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/RetryableApplicationKafkaApp.java similarity index 91% rename from spring-kafka-2/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java rename to spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/RetryableApplicationKafkaApp.java index e43207829a..458ebac124 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/retryable/RetryableApplicationKafkaApp.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka; +package com.baeldung.spring.kafka.retryable; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/spring-kafka-2/src/main/resources/application.properties b/spring-kafka-2/src/main/resources/application.properties index 691b6f55b7..4725ace2d9 100644 --- a/spring-kafka-2/src/main/resources/application.properties +++ b/spring-kafka-2/src/main/resources/application.properties @@ -16,5 +16,7 @@ monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate test.topic=testtopic1 kafka.backoff.interval=9000 kafka.backoff.max_failure=5 +# multiple listeners properties +multiple-listeners.books.topic.name=books diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/multiplelisteners/KafkaMultipleListenersIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/multiplelisteners/KafkaMultipleListenersIntegrationTest.java new file mode 100644 index 0000000000..b6634ec7ed --- /dev/null +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/multiplelisteners/KafkaMultipleListenersIntegrationTest.java @@ -0,0 +1,75 @@ +package com.baeldung.spring.kafka.multiplelisteners; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; + +@SpringBootTest(classes = MultipleListenersApplicationKafkaApp.class) +@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) +class KafkaMultipleListenersIntegrationTest { + + @Autowired + private KafkaListenerEndpointRegistry registry; + @Autowired + private KafkaTemplate bookEventKafkaTemplate; + + private static final String TOPIC = "books"; + + @Test + void givenEmbeddedKafkaBroker_whenSendingAMessage_thenMessageIsConsumedByAll3Listeners() throws Exception { + BookEvent bookEvent = new BookEvent("test-book-title-1", "test-book-desc-1", 2.0); + CountDownLatch latch = new CountDownLatch(3); + + List> bookListeners = registry.getAllListenerContainers() + .stream() + .map(c -> (ConcurrentMessageListenerContainer) c) + .collect(Collectors.toList()); + + bookListeners.forEach(listener -> { + listener.stop(); + listener.getContainerProperties() + .setMessageListener((AcknowledgingConsumerAwareMessageListener) (data, acknowledgment, consumer) -> { + assertThat(data.value()).isEqualTo(bookEvent); + latch.countDown(); + }); + listener.start(); + }); + + bookEventKafkaTemplate.send(TOPIC, UUID.randomUUID() + .toString(), bookEvent); + + assertThat(bookListeners.size()).isEqualTo(3); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void givenEmbeddedKafkaBroker_whenSendingThreeMessage_thenListenerPrintLogs() throws Exception { + CountDownLatch latch = new CountDownLatch(3); + Arrays.stream(new int[] { 1, 2, 3 }) + .mapToObj(i -> new BookEvent(String.format("book %s", i), String.format("description %s", i), (double) i)) + .forEach(bookEvent -> { + bookEventKafkaTemplate.send(TOPIC, UUID.randomUUID() + .toString(), bookEvent); + latch.countDown(); + }); + + // wait for messages to be printed + Thread.sleep(1000); + + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } +} diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/retryable/KafkaRetryableIntegrationTest.java similarity index 95% rename from spring-kafka-2/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java rename to spring-kafka-2/src/test/java/com/baeldung/spring/kafka/retryable/KafkaRetryableIntegrationTest.java index 5417fee1ac..52cda85f90 100644 --- a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/retryable/KafkaRetryableIntegrationTest.java @@ -1,4 +1,4 @@ -package com.baeldung.spring.kafka; +package com.baeldung.spring.kafka.retryable; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -17,6 +17,8 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import com.baeldung.spring.kafka.retryable.Greeting; +import com.baeldung.spring.kafka.retryable.RetryableApplicationKafkaApp; import com.fasterxml.jackson.databind.ObjectMapper; @SpringBootTest(classes = RetryableApplicationKafkaApp.class)