From 90ccda2db3635f76313d7475f97482b7952ff878 Mon Sep 17 00:00:00 2001 From: "thibault.faure" Date: Mon, 15 Aug 2022 00:39:21 +0200 Subject: [PATCH] BAEL-5582 add code for the kafka message filtering improvement --- .../com/baeldung/spring/kafka/Farewell.java | 37 +++++++++++++++ .../kafka/KafkaApplicationMultiListener.java | 46 +++++++++++++++++++ .../spring/kafka/KafkaConsumerConfig.java | 35 ++++++++++++++ .../spring/kafka/KafkaProducerConfig.java | 21 +++++++-- .../spring/kafka/KafkaTopicConfig.java | 8 ++++ .../spring/kafka/MultiTypeKafkaListener.java | 26 +++++++++++ .../src/main/resources/application.properties | 4 +- 7 files changed, 172 insertions(+), 5 deletions(-) create mode 100644 spring-kafka/src/main/java/com/baeldung/spring/kafka/Farewell.java create mode 100644 spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationMultiListener.java create mode 100644 spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/Farewell.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/Farewell.java new file mode 100644 index 0000000000..77b21e9ef8 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/Farewell.java @@ -0,0 +1,37 @@ +package com.baeldung.spring.kafka; + +public class Farewell { + + private String message; + private Integer remainingMinutes; + + public Farewell() { + } + + public Farewell(String message, Integer remainingMinutes) { + this.message = message; + this.remainingMinutes = remainingMinutes; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Integer getRemainingMinutes() { + return remainingMinutes; + } + + public void setRemainingMinutes(Integer remainingMinutes) { + this.remainingMinutes = remainingMinutes; + } + + @Override + public String toString() { + return message + ". In " + remainingMinutes + "!"; + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationMultiListener.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationMultiListener.java new file mode 100644 index 0000000000..fa81488085 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationMultiListener.java @@ -0,0 +1,46 @@ +package com.baeldung.spring.kafka; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; + +@SpringBootApplication +public class KafkaApplicationMultiListener { + + public static void main(String[] args) throws Exception { + + ConfigurableApplicationContext context = SpringApplication.run(KafkaApplicationMultiListener.class, args); + + MessageProducer producer = context.getBean(MessageProducer.class); + producer.sendMessages(); + + Thread.sleep(5000); + context.close(); + } + + @Bean + public MessageProducer MessageProducer() { + return new MessageProducer(); + } + + public static class MessageProducer { + + @Autowired + private KafkaTemplate multiTypeKafkaTemplate; + + @Value(value = "${multi.type.topic.name}") + private String multiTypeTopicName; + + public void sendMessages() { + multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!")); + multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25)); + multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message"); + } + + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java index 9495fcf508..5873bf86ce 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -12,6 +12,10 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.converter.StringJsonMessageConverter; +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; +import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; import org.springframework.kafka.support.serializer.JsonDeserializer; @EnableKafka @@ -85,4 +89,35 @@ public class KafkaConsumerConfig { return factory; } + @Bean + public RecordMessageConverter multiTypeConverter() { + StringJsonMessageConverter converter = new StringJsonMessageConverter(); + DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); + typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID); + typeMapper.addTrustedPackages("com.baeldung.spring.kafka"); + Map> mappings = new HashMap<>(); + mappings.put("greeting", Greeting.class); + mappings.put("farewell", Farewell.class); + typeMapper.setIdClassMapping(mappings); + converter.setTypeMapper(typeMapper); + return converter; + } + + @Bean + public ConsumerFactory multiTypeConsumerFactory() { + HashMap props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory multiTypeKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(multiTypeConsumerFactory()); + factory.setMessageConverter(multiTypeConverter()); + return factory; + } + } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java index 9dff81a09d..0612ade45d 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java @@ -1,5 +1,8 @@ package com.baeldung.spring.kafka; +import java.util.HashMap; +import java.util.Map; + import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; @@ -10,9 +13,6 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; -import java.util.HashMap; -import java.util.Map; - @Configuration public class KafkaProducerConfig { @@ -49,4 +49,19 @@ public class KafkaProducerConfig { return new KafkaTemplate<>(greetingProducerFactory()); } + @Bean + public ProducerFactory multiTypeProducerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell"); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate multiTypeKafkaTemplate() { + return new KafkaTemplate<>(multiTypeProducerFactory()); + } + } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java index 8a006a72bc..9f14b5972d 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java @@ -31,6 +31,9 @@ public class KafkaTopicConfig { @Value(value = "${greeting.topic.name}") private String greetingTopicName; + @Value(value = "${multi.type.topic.name}") + private String multiTypeTopicName; + @Bean public KafkaAdmin kafkaAdmin() { Map configs = new HashMap<>(); @@ -66,4 +69,9 @@ public class KafkaTopicConfig { newTopic.configs(configs); return newTopic; } + + @Bean + public NewTopic multiTypeTopic() { + return new NewTopic(multiTypeTopicName, 1, (short) 1); + } } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java new file mode 100644 index 0000000000..9afb5ff0b6 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java @@ -0,0 +1,26 @@ +package com.baeldung.spring.kafka; + +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@KafkaListener(id = "multiGroup", topics = "multitype") +public class MultiTypeKafkaListener { + + @KafkaHandler + public void handleGreeting(Greeting greeting) { + System.out.println("Greeting received: " + greeting); + } + + @KafkaHandler + public void handleF(Farewell farewell) { + System.out.println("Farewell received: " + farewell); + } + + @KafkaHandler(isDefault = true) + public void unknown(Object object) { + System.out.println("Unkown type received: " + object); + } + +} diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties index 9a614cbafd..b1dc1ceaf5 100644 --- a/spring-kafka/src/main/resources/application.properties +++ b/spring-kafka/src/main/resources/application.properties @@ -4,14 +4,14 @@ long.message.topic.name=longMessage greeting.topic.name=greeting filtered.topic.name=filtered partitioned.topic.name=partitioned - +multi.type.topic.name=multitype # monitoring - lag analysis monitor.kafka.bootstrap.config=localhost:9092 monitor.kafka.consumer.groupid=baeldungGrp monitor.topic.name=baeldung - # monitoring - simulation monitor.producer.simulate=true monitor.consumer.simulate=true monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate test.topic=testtopic1 +