diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationLongMessage.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationLongMessage.java new file mode 100644 index 0000000000..0af0a4b091 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplicationLongMessage.java @@ -0,0 +1,75 @@ +package com.baeldung.spring.kafka; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +@SpringBootApplication +public class KafkaApplicationLongMessage { + + public static void main(String[] args) throws Exception { + + ConfigurableApplicationContext context = SpringApplication.run(KafkaApplicationLongMessage.class, args); + + LongMessageProducer producer = context.getBean(LongMessageProducer.class); + + String fileData = readLongMessage(); + producer.sendMessage(fileData); + + //Deliberate delay to let listener consume produced message before main thread stops + Thread.sleep(5000); + context.close(); + } + + private static String readLongMessage() throws IOException { + String data = ""; + + //update complete location of large message here + data = new String(Files.readAllBytes(Paths.get("RandomTextFile.txt"))); + return data; + } + + @Bean + public LongMessageProducer longMessageProducer() { + return new LongMessageProducer(); + } + + @Bean + public LongMessageListener longMessageListener() { + return new LongMessageListener(); + } + + public static class LongMessageProducer { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Value(value = "${long.message.topic.name}") + private String topicName; + + public void sendMessage(String message) { + kafkaTemplate.send(topicName, message); + System.out.println("Long message Sent"); + } + + } + + public static class LongMessageListener { + + @KafkaListener(topics = "${long.message.topic.name}", groupId = "longMessage", containerFactory = "longMessageKafkaListenerContainerFactory") + public void listenGroupLongMessage(String message) { + System.out.println("Received Message in group 'longMessage'"); + } + + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java index abaa431eec..9495fcf508 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -27,6 +27,8 @@ public class KafkaConsumerConfig { props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520"); + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520"); return new DefaultKafkaConsumerFactory<>(props); } @@ -56,6 +58,11 @@ public class KafkaConsumerConfig { return kafkaListenerContainerFactory("partitions"); } + @Bean + public ConcurrentKafkaListenerContainerFactory longMessageKafkaListenerContainerFactory() { + return kafkaListenerContainerFactory("longMessage"); + } + @Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = kafkaListenerContainerFactory("filter"); diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java index 0223bab0fe..9dff81a09d 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java @@ -25,6 +25,8 @@ public class KafkaProducerConfig { configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520"); + return new DefaultKafkaProducerFactory<>(configProps); } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java index 00e4147cd0..8a006a72bc 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java @@ -19,6 +19,9 @@ public class KafkaTopicConfig { @Value(value = "${message.topic.name}") private String topicName; + @Value(value = "${long.message.topic.name}") + private String longMsgTopicName; + @Value(value = "${partitioned.topic.name}") private String partitionedTopicName; @@ -54,4 +57,13 @@ public class KafkaTopicConfig { public NewTopic topic4() { return new NewTopic(greetingTopicName, 1, (short) 1); } + + @Bean + public NewTopic topic5() { + NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1); + Map configs = new HashMap<>(); + configs.put("max.message.bytes", "20971520"); + newTopic.configs(configs); + return newTopic; + } } diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties index e6a4668da3..e1a983339b 100644 --- a/spring-kafka/src/main/resources/application.properties +++ b/spring-kafka/src/main/resources/application.properties @@ -1,5 +1,6 @@ kafka.bootstrapAddress=localhost:9092 message.topic.name=baeldung +long.message.topic.name=longMessage greeting.topic.name=greeting filtered.topic.name=filtered partitioned.topic.name=partitioned