Configure multiple listeners on same topic (#13572)

* moved Spring retryable article to own package

* implement multiple listeners Kafka app demo project

* fix PR comments
Vlad Fernoaga 2023-03-20 09:26:44 +02:00 committed by GitHub
parent a68763ba79
commit f40d10a9e1
16 changed files with 294 additions and 10 deletions

View File

@@ -0,0 +1,26 @@
package com.baeldung.spring.kafka.multiplelisteners;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class BookConsumer {

    private static final Logger logger = LoggerFactory.getLogger(BookConsumer.class);

    // Each listener declares its own groupId, so every group receives a copy of each record.
    @KafkaListener(topics = "books", groupId = "books-content-search")
    public void bookContentSearchConsumer(BookEvent event) {
        logger.info("Books event received for full-text search indexing => {}", event);
    }

    @KafkaListener(topics = "books", groupId = "books-price-index")
    public void bookPriceIndexerConsumer(BookEvent event) {
        logger.info("Books event received for price indexing => {}", event);
    }

    // concurrency = "2" runs two consumer threads within this single group.
    @KafkaListener(topics = "books", groupId = "book-notification-consumer", concurrency = "2")
    public void bookNotificationConsumer(BookEvent event) {
        logger.info("Books event received for notification => {}", event);
    }
}
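Since each of the three @KafkaListener methods above declares its own groupId, every group receives its own copy of each record published to the books topic; the concurrency = "2" setting additionally runs two consumer threads inside the book-notification-consumer group, which only pays off once the topic has more than one partition. For contrast, here is a minimal sketch of listeners that share a group (the SharedGroupConsumer class and its method names are illustrative, not part of this commit); with the single-partition books topic created further down, one of the two would simply sit idle:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Illustrative sketch: two listeners in the SAME consumer group split the
// topic's partitions between them instead of each receiving every record.
@Service
public class SharedGroupConsumer {

    @KafkaListener(topics = "books", groupId = "books-shared-group")
    public void workerOne(BookEvent event) {
        // processes only records from the partitions assigned to this consumer
    }

    @KafkaListener(topics = "books", groupId = "books-shared-group")
    public void workerTwo(BookEvent event) {
        // processes records from the remaining partitions
    }
}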

View File

@@ -0,0 +1,15 @@
package com.baeldung.spring.kafka.multiplelisteners;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BookEvent {

    private String title;
    private String description;
    private Double price;
}
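Lombok's @Data, @AllArgsConstructor and @NoArgsConstructor generate the constructors, getters/setters, equals, hashCode and toString; the integration test further down relies on the generated equals when it asserts on the consumed value. For readers without Lombok, a rough plain-Java equivalent (a sketch trimmed to the parts the tests exercise):

import java.util.Objects;

public class BookEvent {

    private String title;
    private String description;
    private Double price;

    public BookEvent() {
    }

    public BookEvent(String title, String description, Double price) {
        this.title = title;
        this.description = description;
        this.price = price;
    }

    // getters and setters omitted for brevity

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof BookEvent)) return false;
        BookEvent other = (BookEvent) o;
        return Objects.equals(title, other.title)
          && Objects.equals(description, other.description)
          && Objects.equals(price, other.price);
    }

    @Override
    public int hashCode() {
        return Objects.hash(title, description, price);
    }
}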

View File

@@ -0,0 +1,64 @@
package com.baeldung.spring.kafka.multiplelisteners;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${kafka.backoff.interval}")
    private Long interval;

    @Value(value = "${kafka.backoff.max_failure}")
    private Long maxAttempts;

    public ConsumerFactory<String, BookEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "bookEvent:com.baeldung.spring.kafka.multiplelisteners.BookEvent");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BookEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, BookEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setCommonErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public DefaultErrorHandler errorHandler() {
        BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
        // The recoverer runs once the retries are exhausted: log the failed record and move on.
        return new DefaultErrorHandler((consumerRecord, e) -> LOGGER.error("Skipping record {} after retries were exhausted", consumerRecord, e), fixedBackOff);
    }
}
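With this configuration, a record whose listener throws is redelivered every kafka.backoff.interval milliseconds, up to kafka.backoff.max_failure additional attempts, before the DefaultErrorHandler hands it to the recoverer above, which logs it and lets the container move on. A hedged sketch of a listener that would exercise this retry path (the FailingBookConsumer class is hypothetical, not part of this commit):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class FailingBookConsumer {

    // Throwing from the listener triggers the DefaultErrorHandler configured
    // above: the record is retried with the fixed backoff, then logged and
    // skipped once the attempts are exhausted.
    @KafkaListener(topics = "books", groupId = "books-failing-demo")
    public void consume(BookEvent event) {
        throw new IllegalStateException("simulated processing failure for " + event);
    }
}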

View File

@@ -0,0 +1,54 @@
package com.baeldung.spring.kafka.multiplelisteners;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, BookEvent> bookProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(JsonSerializer.TYPE_MAPPINGS, "bookEvent:com.baeldung.spring.kafka.multiplelisteners.BookEvent");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, BookEvent> bookKafkaTemplate() {
        return new KafkaTemplate<>(bookProducerFactory());
    }
}
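Publishing then goes through the bookKafkaTemplate bean; the JsonSerializer applies the bookEvent type mapping above, which mirrors the mapping registered on the consumer side. A minimal usage sketch (the BookEventProducer class is illustrative, not part of this commit):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class BookEventProducer {

    private final KafkaTemplate<String, BookEvent> bookKafkaTemplate;

    public BookEventProducer(KafkaTemplate<String, BookEvent> bookKafkaTemplate) {
        this.bookKafkaTemplate = bookKafkaTemplate;
    }

    public void publish(BookEvent event) {
        // the key is optional; using the title here spreads records across partitions
        bookKafkaTemplate.send("books", event.getTitle(), event);
    }
}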

View File

@@ -0,0 +1,33 @@
package com.baeldung.spring.kafka.multiplelisteners;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${multiple-listeners.books.topic.name}")
    private String booksTopicName;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic booksTopic() {
        return new NewTopic(booksTopicName, 1, (short) 1);
    }
}
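Spring Kafka's TopicBuilder offers a more readable alternative to the NewTopic constructor; an equivalent sketch of the bean above, assuming the same single partition and replication factor of 1:

import org.springframework.kafka.config.TopicBuilder;

// Equivalent to booksTopic() above, written with the fluent builder;
// booksTopicName refers to the @Value field in KafkaTopicConfig.
@Bean
public NewTopic booksTopicViaBuilder() {
    return TopicBuilder.name(booksTopicName)
      .partitions(1)
      .replicas(1)
      .build();
}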

View File

@@ -0,0 +1,14 @@
package com.baeldung.spring.kafka.multiplelisteners;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(value = { KafkaTopicConfig.class, KafkaConsumerConfig.class, KafkaProducerConfig.class })
public class MultipleListenersApplicationKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(MultipleListenersApplicationKafkaApp.class, args);
    }
}

View File

@@ -1,4 +1,4 @@
-package com.baeldung.spring.kafka;
+package com.baeldung.spring.kafka.retryable;
public class Farewell {

View File

@@ -1,4 +1,4 @@
-package com.baeldung.spring.kafka;
+package com.baeldung.spring.kafka.retryable;
public class Greeting {

View File

@@ -1,6 +1,5 @@
-package com.baeldung.spring.kafka;
+package com.baeldung.spring.kafka.retryable;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;

View File

@@ -1,4 +1,4 @@
-package com.baeldung.spring.kafka;
+package com.baeldung.spring.kafka.retryable;
import java.util.HashMap;
import java.util.Map;
@@ -55,7 +55,7 @@ public class KafkaProducerConfig {
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
-configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
+configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.retryable.Greeting, farewell:com.baeldung.spring.kafka.retryable.Farewell");
return new DefaultKafkaProducerFactory<>(configProps);
}

View File

@@ -1,4 +1,4 @@
-package com.baeldung.spring.kafka;
+package com.baeldung.spring.kafka.retryable;
import java.util.HashMap;
import java.util.Map;

View File

@@ -1,4 +1,4 @@
-package com.baeldung.spring.kafka;
+package com.baeldung.spring.kafka.retryable;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;

View File

@@ -1,4 +1,4 @@
-package com.baeldung.spring.kafka;
+package com.baeldung.spring.kafka.retryable;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

View File

@@ -16,5 +16,7 @@ monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
test.topic=testtopic1
kafka.backoff.interval=9000
kafka.backoff.max_failure=5
+# multiple listeners properties
+multiple-listeners.books.topic.name=books

View File

@@ -0,0 +1,75 @@
package com.baeldung.spring.kafka.multiplelisteners;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest(classes = MultipleListenersApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class KafkaMultipleListenersIntegrationTest {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, BookEvent> bookEventKafkaTemplate;

    private static final String TOPIC = "books";

    @Test
    void givenEmbeddedKafkaBroker_whenSendingAMessage_thenMessageIsConsumedByAll3Listeners() throws Exception {
        BookEvent bookEvent = new BookEvent("test-book-title-1", "test-book-desc-1", 2.0);
        CountDownLatch latch = new CountDownLatch(3);

        List<? extends ConcurrentMessageListenerContainer<?, ?>> bookListeners = registry.getAllListenerContainers()
          .stream()
          .map(c -> (ConcurrentMessageListenerContainer<?, ?>) c)
          .collect(Collectors.toList());

        // Replace each container's listener with one that counts down the latch,
        // so the test can verify that all three consumer groups receive the event.
        bookListeners.forEach(listener -> {
            listener.stop();
            listener.getContainerProperties()
              .setMessageListener((AcknowledgingConsumerAwareMessageListener<String, BookEvent>) (data, acknowledgment, consumer) -> {
                  assertThat(data.value()).isEqualTo(bookEvent);
                  latch.countDown();
              });
            listener.start();
        });

        bookEventKafkaTemplate.send(TOPIC, UUID.randomUUID().toString(), bookEvent);

        assertThat(bookListeners.size()).isEqualTo(3);
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    }

    @Test
    void givenEmbeddedKafkaBroker_whenSendingThreeMessages_thenListenersPrintLogs() throws Exception {
        CountDownLatch latch = new CountDownLatch(3);
        Arrays.stream(new int[] { 1, 2, 3 })
          .mapToObj(i -> new BookEvent(String.format("book %s", i), String.format("description %s", i), (double) i))
          .forEach(bookEvent -> {
              bookEventKafkaTemplate.send(TOPIC, UUID.randomUUID().toString(), bookEvent);
              latch.countDown();
          });

        // give the listeners time to consume and log the events
        Thread.sleep(1000);
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    }
}
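In the second test the latch is counted down as the messages are sent, so the final await returns immediately and the fixed Thread.sleep is what actually leaves the listeners time to log. A sketch that synchronizes on consumption instead, reusing the listener-replacement trick from the first test (illustrative, not part of this commit):

@Test
void givenEmbeddedKafkaBroker_whenSendingThreeMessages_thenEachGroupConsumesAll() throws Exception {
    // 3 events delivered to each of the 3 consumer groups = 9 deliveries in total
    CountDownLatch consumed = new CountDownLatch(9);
    registry.getAllListenerContainers()
      .stream()
      .map(c -> (ConcurrentMessageListenerContainer<?, ?>) c)
      .forEach(container -> {
          container.stop();
          container.getContainerProperties()
            .setMessageListener((AcknowledgingConsumerAwareMessageListener<String, BookEvent>) (data, ack, consumer) -> consumed.countDown());
          container.start();
      });

    Arrays.stream(new int[] { 1, 2, 3 })
      .mapToObj(i -> new BookEvent("book " + i, "description " + i, (double) i))
      .forEach(event -> bookEventKafkaTemplate.send(TOPIC, UUID.randomUUID().toString(), event));

    assertThat(consumed.await(10, TimeUnit.SECONDS)).isTrue();
}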

View File

@@ -1,4 +1,4 @@
-package com.baeldung.spring.kafka;
+package com.baeldung.spring.kafka.retryable;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -17,6 +17,8 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import com.baeldung.spring.kafka.retryable.Greeting;
+import com.baeldung.spring.kafka.retryable.RetryableApplicationKafkaApp;
import com.fasterxml.jackson.databind.ObjectMapper;
@SpringBootTest(classes = RetryableApplicationKafkaApp.class)