Merge pull request #12599 from thibaultfaure/improvement/BAEL-5582-kafka-message-filtering
BAEL-5582 add code for the kafka message filtering improvement
This commit is contained in:
commit
7253aab7f8
|
@ -0,0 +1,37 @@
|
||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
public class Farewell {
|
||||||
|
|
||||||
|
private String message;
|
||||||
|
private Integer remainingMinutes;
|
||||||
|
|
||||||
|
public Farewell() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public Farewell(String message, Integer remainingMinutes) {
|
||||||
|
this.message = message;
|
||||||
|
this.remainingMinutes = remainingMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMessage() {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessage(String message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getRemainingMinutes() {
|
||||||
|
return remainingMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRemainingMinutes(Integer remainingMinutes) {
|
||||||
|
this.remainingMinutes = remainingMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return message + ". In " + remainingMinutes + "!";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,46 @@
|
||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.context.ConfigurableApplicationContext;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
|
||||||
|
@SpringBootApplication
|
||||||
|
public class KafkaApplicationMultiListener {
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
ConfigurableApplicationContext context = SpringApplication.run(KafkaApplicationMultiListener.class, args);
|
||||||
|
|
||||||
|
MessageProducer producer = context.getBean(MessageProducer.class);
|
||||||
|
producer.sendMessages();
|
||||||
|
|
||||||
|
Thread.sleep(5000);
|
||||||
|
context.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public MessageProducer MessageProducer() {
|
||||||
|
return new MessageProducer();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MessageProducer {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private KafkaTemplate<String, Object> multiTypeKafkaTemplate;
|
||||||
|
|
||||||
|
@Value(value = "${multi.type.topic.name}")
|
||||||
|
private String multiTypeTopicName;
|
||||||
|
|
||||||
|
public void sendMessages() {
|
||||||
|
multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
|
||||||
|
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
|
||||||
|
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -12,6 +12,10 @@ import org.springframework.kafka.annotation.EnableKafka;
|
||||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
import org.springframework.kafka.core.ConsumerFactory;
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
||||||
|
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
|
||||||
|
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
|
||||||
|
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
|
||||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||||
|
|
||||||
@EnableKafka
|
@EnableKafka
|
||||||
|
@ -85,4 +89,35 @@ public class KafkaConsumerConfig {
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RecordMessageConverter multiTypeConverter() {
|
||||||
|
StringJsonMessageConverter converter = new StringJsonMessageConverter();
|
||||||
|
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
|
||||||
|
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
|
||||||
|
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
|
||||||
|
Map<String, Class<?>> mappings = new HashMap<>();
|
||||||
|
mappings.put("greeting", Greeting.class);
|
||||||
|
mappings.put("farewell", Farewell.class);
|
||||||
|
typeMapper.setIdClassMapping(mappings);
|
||||||
|
converter.setTypeMapper(typeMapper);
|
||||||
|
return converter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
|
||||||
|
HashMap<String, Object> props = new HashMap<>();
|
||||||
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
||||||
|
return new DefaultKafkaConsumerFactory<>(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
factory.setConsumerFactory(multiTypeConsumerFactory());
|
||||||
|
factory.setMessageConverter(multiTypeConverter());
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
package com.baeldung.spring.kafka;
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
import org.apache.kafka.common.serialization.StringSerializer;
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
@ -10,9 +13,6 @@ import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.core.ProducerFactory;
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
public class KafkaProducerConfig {
|
public class KafkaProducerConfig {
|
||||||
|
|
||||||
|
@ -49,4 +49,19 @@ public class KafkaProducerConfig {
|
||||||
return new KafkaTemplate<>(greetingProducerFactory());
|
return new KafkaTemplate<>(greetingProducerFactory());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<String, Object> multiTypeProducerFactory() {
|
||||||
|
Map<String, Object> configProps = new HashMap<>();
|
||||||
|
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||||
|
configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
|
||||||
|
return new DefaultKafkaProducerFactory<>(configProps);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(multiTypeProducerFactory());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,9 @@ public class KafkaTopicConfig {
|
||||||
@Value(value = "${greeting.topic.name}")
|
@Value(value = "${greeting.topic.name}")
|
||||||
private String greetingTopicName;
|
private String greetingTopicName;
|
||||||
|
|
||||||
|
@Value(value = "${multi.type.topic.name}")
|
||||||
|
private String multiTypeTopicName;
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public KafkaAdmin kafkaAdmin() {
|
public KafkaAdmin kafkaAdmin() {
|
||||||
Map<String, Object> configs = new HashMap<>();
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
@ -66,4 +69,9 @@ public class KafkaTopicConfig {
|
||||||
newTopic.configs(configs);
|
newTopic.configs(configs);
|
||||||
return newTopic;
|
return newTopic;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic multiTypeTopic() {
|
||||||
|
return new NewTopic(multiTypeTopicName, 1, (short) 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
import org.springframework.kafka.annotation.KafkaHandler;
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@KafkaListener(id = "multiGroup", topics = "multitype")
|
||||||
|
public class MultiTypeKafkaListener {
|
||||||
|
|
||||||
|
@KafkaHandler
|
||||||
|
public void handleGreeting(Greeting greeting) {
|
||||||
|
System.out.println("Greeting received: " + greeting);
|
||||||
|
}
|
||||||
|
|
||||||
|
@KafkaHandler
|
||||||
|
public void handleF(Farewell farewell) {
|
||||||
|
System.out.println("Farewell received: " + farewell);
|
||||||
|
}
|
||||||
|
|
||||||
|
@KafkaHandler(isDefault = true)
|
||||||
|
public void unknown(Object object) {
|
||||||
|
System.out.println("Unkown type received: " + object);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -4,14 +4,14 @@ long.message.topic.name=longMessage
|
||||||
greeting.topic.name=greeting
|
greeting.topic.name=greeting
|
||||||
filtered.topic.name=filtered
|
filtered.topic.name=filtered
|
||||||
partitioned.topic.name=partitioned
|
partitioned.topic.name=partitioned
|
||||||
|
multi.type.topic.name=multitype
|
||||||
# monitoring - lag analysis
|
# monitoring - lag analysis
|
||||||
monitor.kafka.bootstrap.config=localhost:9092
|
monitor.kafka.bootstrap.config=localhost:9092
|
||||||
monitor.kafka.consumer.groupid=baeldungGrp
|
monitor.kafka.consumer.groupid=baeldungGrp
|
||||||
monitor.topic.name=baeldung
|
monitor.topic.name=baeldung
|
||||||
|
|
||||||
# monitoring - simulation
|
# monitoring - simulation
|
||||||
monitor.producer.simulate=true
|
monitor.producer.simulate=true
|
||||||
monitor.consumer.simulate=true
|
monitor.consumer.simulate=true
|
||||||
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
|
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
|
||||||
test.topic=testtopic1
|
test.topic=testtopic1
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue