BAEL-4905: Send large messages with Kafka (#10992)

This commit is contained in:
kaushal 2021-07-03 18:41:51 +05:30 committed by GitHub
parent c31d5fffe1
commit 9930b4c945
5 changed files with 97 additions and 0 deletions

View File

@ -0,0 +1,75 @@
package com.baeldung.spring.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
@SpringBootApplication
public class KafkaApplicationLongMessage {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(KafkaApplicationLongMessage.class, args);
LongMessageProducer producer = context.getBean(LongMessageProducer.class);
String fileData = readLongMessage();
producer.sendMessage(fileData);
//Deliberate delay to let listener consume produced message before main thread stops
Thread.sleep(5000);
context.close();
}
private static String readLongMessage() throws IOException {
String data = "";
//update complete location of large message here
data = new String(Files.readAllBytes(Paths.get("RandomTextFile.txt")));
return data;
}
@Bean
public LongMessageProducer longMessageProducer() {
return new LongMessageProducer();
}
@Bean
public LongMessageListener longMessageListener() {
return new LongMessageListener();
}
public static class LongMessageProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value(value = "${long.message.topic.name}")
private String topicName;
public void sendMessage(String message) {
kafkaTemplate.send(topicName, message);
System.out.println("Long message Sent");
}
}
public static class LongMessageListener {
@KafkaListener(topics = "${long.message.topic.name}", groupId = "longMessage", containerFactory = "longMessageKafkaListenerContainerFactory")
public void listenGroupLongMessage(String message) {
System.out.println("Received Message in group 'longMessage'");
}
}
}

View File

@ -27,6 +27,8 @@ public class KafkaConsumerConfig {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
return new DefaultKafkaConsumerFactory<>(props);
}
@ -56,6 +58,11 @@ public class KafkaConsumerConfig {
return kafkaListenerContainerFactory("partitions");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> longMessageKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("longMessage");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");

View File

@ -25,6 +25,8 @@ public class KafkaProducerConfig {
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
return new DefaultKafkaProducerFactory<>(configProps);
}

View File

@ -19,6 +19,9 @@ public class KafkaTopicConfig {
@Value(value = "${message.topic.name}")
private String topicName;
@Value(value = "${long.message.topic.name}")
private String longMsgTopicName;
@Value(value = "${partitioned.topic.name}")
private String partitionedTopicName;
@ -54,4 +57,13 @@ public class KafkaTopicConfig {
public NewTopic topic4() {
return new NewTopic(greetingTopicName, 1, (short) 1);
}
@Bean
public NewTopic topic5() {
NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
Map<String, String> configs = new HashMap<>();
configs.put("max.message.bytes", "20971520");
newTopic.configs(configs);
return newTopic;
}
}

View File

@ -1,5 +1,6 @@
kafka.bootstrapAddress=localhost:9092
message.topic.name=baeldung
long.message.topic.name=longMessage
greeting.topic.name=greeting
filtered.topic.name=filtered
partitioned.topic.name=partitioned