diff --git a/spring-kafka-2/README.md b/spring-kafka-2/README.md
new file mode 100644
index 0000000000..9a5ce627dd
--- /dev/null
+++ b/spring-kafka-2/README.md
@@ -0,0 +1,7 @@
+## Spring Kafka 2
+
+This module contains articles about Spring with Kafka
+
+### Relevant articles
+
+- [Intro to Apache Kafka with Spring](https://www.baeldung.com/spring-kafka)
diff --git a/spring-kafka-2/pom.xml b/spring-kafka-2/pom.xml
new file mode 100644
index 0000000000..d51c2e300f
--- /dev/null
+++ b/spring-kafka-2/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>spring-kafka</artifactId>
+    <name>spring-kafka</name>
+    <description>Intro to Kafka with Spring</description>
+
+    <parent>
+        <groupId>com.baeldung</groupId>
+        <artifactId>parent-boot-2</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+        <relativePath>../parent-boot-2</relativePath>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers-kafka.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainers-kafka.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <testcontainers-kafka.version>1.16.2</testcontainers-kafka.version>
+    </properties>
+
+</project>
\ No newline at end of file
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java
new file mode 100644
index 0000000000..bbff315ad2
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java
@@ -0,0 +1,37 @@
+package com.baeldung.spring.kafka;
+
+public class Farewell {
+
+ private String message;
+ private Integer remainingMinutes;
+
+ public Farewell() {
+ }
+
+ public Farewell(String message, Integer remainingMinutes) {
+ this.message = message;
+ this.remainingMinutes = remainingMinutes;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public Integer getRemainingMinutes() {
+ return remainingMinutes;
+ }
+
+ public void setRemainingMinutes(Integer remainingMinutes) {
+ this.remainingMinutes = remainingMinutes;
+ }
+
+ @Override
+ public String toString() {
+ return message + ". In " + remainingMinutes + "!";
+ }
+
+}
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java
new file mode 100644
index 0000000000..b4633e802a
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java
@@ -0,0 +1,37 @@
+package com.baeldung.spring.kafka;
+
+public class Greeting {
+
+ private String msg;
+ private String name;
+
+ public Greeting() {
+
+ }
+
+ public Greeting(String msg, String name) {
+ this.msg = msg;
+ this.name = name;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return msg + ", " + name + "!";
+ }
+}
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java
new file mode 100644
index 0000000000..463d3209ea
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java
@@ -0,0 +1,152 @@
+package com.baeldung.spring.kafka;
+
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.converter.RecordMessageConverter;
+import org.springframework.kafka.support.converter.StringJsonMessageConverter;
+import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
+import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
+
+@EnableKafka
+@Configuration
+public class KafkaConsumerConfig {
+
+ @Value(value = "${spring.kafka.bootstrap-servers}")
+ private String bootstrapAddress;
+
+ @Value(value = "${kafka.backoff.interval}")
+ private Long interval;
+
+ @Value(value = "${kafka.backoff.max_failure}")
+ private Long maxAttempts;
+
+    public ConsumerFactory<String, String> consumerFactory(String groupId) {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
+        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
+        return new DefaultKafkaConsumerFactory<>(props);
+    }
+
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory(groupId));
+        return factory;
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
+        return kafkaListenerContainerFactory("foo");
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
+        return kafkaListenerContainerFactory("bar");
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
+        return kafkaListenerContainerFactory("headers");
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
+        return kafkaListenerContainerFactory("partitions");
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> longMessageKafkaListenerContainerFactory() {
+        return kafkaListenerContainerFactory("longMessage");
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
+        factory.setRecordFilterStrategy(record -> record.value()
+            .contains("World"));
+        return factory;
+    }
+
+    public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
+        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, Greeting> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(greetingConsumerFactory());
+        return factory;
+    }
+
+    @Bean
+    public RecordMessageConverter multiTypeConverter() {
+        StringJsonMessageConverter converter = new StringJsonMessageConverter();
+        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
+        typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
+        typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
+        Map<String, Class<?>> mappings = new HashMap<>();
+        mappings.put("greeting", Greeting.class);
+        mappings.put("farewell", Farewell.class);
+        typeMapper.setIdClassMapping(mappings);
+        converter.setTypeMapper(typeMapper);
+        return converter;
+    }
+
+    @Bean
+    public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
+        HashMap<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_test");
+        return new DefaultKafkaConsumerFactory<>(props);
+    }
+
+    @Bean
+    @Primary
+    public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(multiTypeConsumerFactory());
+        factory.setMessageConverter(multiTypeConverter());
+        factory.setCommonErrorHandler(errorHandler());
+        factory.getContainerProperties()
+            .setAckMode(ContainerProperties.AckMode.RECORD);
+        return factory;
+    }
+
+    @Bean
+    public DefaultErrorHandler errorHandler() {
+        BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
+        DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
+            System.out.println(String.format("Consumed record %s because this exception was thrown: %s", consumerRecord.toString(), e.getClass().getName()));
+        }, fixedBackOff);
+        // Commented because of the test
+        // errorHandler.addRetryableExceptions(SocketTimeoutException.class, RuntimeException.class);
+        errorHandler.addNotRetryableExceptions(NullPointerException.class);
+        return errorHandler;
+    }
+
+}
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java
new file mode 100644
index 0000000000..da8b2bd1a6
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java
@@ -0,0 +1,67 @@
+package com.baeldung.spring.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+@Configuration
+public class KafkaProducerConfig {
+
+ @Value(value = "${spring.kafka.bootstrap-servers}")
+ private String bootstrapAddress;
+
+    @Bean
+    public ProducerFactory<String, String> producerFactory() {
+        Map<String, Object> configProps = new HashMap<>();
+        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
+
+        return new DefaultKafkaProducerFactory<>(configProps);
+    }
+
+    @Bean
+    public KafkaTemplate<String, String> kafkaTemplate() {
+        return new KafkaTemplate<>(producerFactory());
+    }
+
+    @Bean
+    public ProducerFactory<String, Greeting> greetingProducerFactory() {
+        Map<String, Object> configProps = new HashMap<>();
+        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        return new DefaultKafkaProducerFactory<>(configProps);
+    }
+
+    @Bean
+    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
+        return new KafkaTemplate<>(greetingProducerFactory());
+    }
+
+    @Bean
+    public ProducerFactory<String, Object> multiTypeProducerFactory() {
+        Map<String, Object> configProps = new HashMap<>();
+        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
+        return new DefaultKafkaProducerFactory<>(configProps);
+    }
+
+    @Bean
+    public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
+        return new KafkaTemplate<>(multiTypeProducerFactory());
+    }
+
+}
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java
new file mode 100644
index 0000000000..6a20915699
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java
@@ -0,0 +1,77 @@
+package com.baeldung.spring.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaAdmin;
+
+@Configuration
+public class KafkaTopicConfig {
+
+ @Value(value = "${spring.kafka.bootstrap-servers}")
+ private String bootstrapAddress;
+
+ @Value(value = "${message.topic.name}")
+ private String topicName;
+
+ @Value(value = "${long.message.topic.name}")
+ private String longMsgTopicName;
+
+ @Value(value = "${partitioned.topic.name}")
+ private String partitionedTopicName;
+
+ @Value(value = "${filtered.topic.name}")
+ private String filteredTopicName;
+
+ @Value(value = "${greeting.topic.name}")
+ private String greetingTopicName;
+
+ @Value(value = "${multi.type.topic.name}")
+ private String multiTypeTopicName;
+
+ @Bean
+ public KafkaAdmin kafkaAdmin() {
+        Map<String, Object> configs = new HashMap<>();
+ configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ return new KafkaAdmin(configs);
+ }
+
+ @Bean
+ public NewTopic topic1() {
+ return new NewTopic(topicName, 1, (short) 1);
+ }
+
+ @Bean
+ public NewTopic topic2() {
+ return new NewTopic(partitionedTopicName, 6, (short) 1);
+ }
+
+ @Bean
+ public NewTopic topic3() {
+ return new NewTopic(filteredTopicName, 1, (short) 1);
+ }
+
+ @Bean
+ public NewTopic topic4() {
+ return new NewTopic(greetingTopicName, 1, (short) 1);
+ }
+
+ @Bean
+ public NewTopic topic5() {
+ NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
+        Map<String, String> configs = new HashMap<>();
+ configs.put("max.message.bytes", "20971520");
+ newTopic.configs(configs);
+ return newTopic;
+ }
+
+ @Bean
+ public NewTopic multiTypeTopic() {
+ return new NewTopic(multiTypeTopicName, 1, (short) 1);
+ }
+}
diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java
new file mode 100644
index 0000000000..6c4d78171b
--- /dev/null
+++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java
@@ -0,0 +1,32 @@
+package com.baeldung.spring.kafka;
+
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.messaging.MessagingException;
+import org.springframework.stereotype.Component;
+
+@Component
+@KafkaListener(id = "multiGroup", topics = "multitype")
+public class MultiTypeKafkaListener {
+
+ @KafkaHandler
+ //@RetryableTopic(backoff = @Backoff(value = 3000L), attempts = "5", autoCreateTopics = "false",include = SocketTimeoutException.class, exclude = NullPointerException.class)
+ public void handleGreeting(Greeting greeting) {
+ if (greeting.getName()
+ .equalsIgnoreCase("test")) {
+ throw new MessagingException("test not allowed");
+ }
+ System.out.println("Greeting received: " + greeting);
+ }
+
+ @KafkaHandler
+ public void handleF(Farewell farewell) {
+ System.out.println("Farewell received: " + farewell);
+ }
+
+ @KafkaHandler(isDefault = true)
+ public void unknown(Object object) {
+ System.out.println("Unkown type received: " + object);
+ }
+
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java
similarity index 100%
rename from spring-kafka/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java
rename to spring-kafka-2/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java
diff --git a/spring-kafka-2/src/main/resources/application.properties b/spring-kafka-2/src/main/resources/application.properties
new file mode 100644
index 0000000000..691b6f55b7
--- /dev/null
+++ b/spring-kafka-2/src/main/resources/application.properties
@@ -0,0 +1,20 @@
+spring.kafka.bootstrap-servers=localhost:9092
+message.topic.name=baeldung
+long.message.topic.name=longMessage
+greeting.topic.name=greeting
+filtered.topic.name=filtered
+partitioned.topic.name=partitioned
+multi.type.topic.name=multitype
+# monitoring - lag analysis
+monitor.kafka.bootstrap.config=localhost:9092
+monitor.kafka.consumer.groupid=baeldungGrp
+monitor.topic.name=baeldung
+# monitoring - simulation
+monitor.producer.simulate=true
+monitor.consumer.simulate=true
+monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
+test.topic=testtopic1
+kafka.backoff.interval=9000
+kafka.backoff.max_failure=5
+
+
diff --git a/spring-kafka-2/src/main/resources/logback.xml b/spring-kafka-2/src/main/resources/logback.xml
new file mode 100644
index 0000000000..7d900d8ea8
--- /dev/null
+++ b/spring-kafka-2/src/main/resources/logback.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT" />
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/spring-kafka/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java
similarity index 100%
rename from spring-kafka/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java
rename to spring-kafka-2/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java
diff --git a/spring-kafka-2/src/test/resources/application.yml b/spring-kafka-2/src/test/resources/application.yml
new file mode 100644
index 0000000000..8b245f08b1
--- /dev/null
+++ b/spring-kafka-2/src/test/resources/application.yml
@@ -0,0 +1,14 @@
+spring:
+ kafka:
+ consumer:
+ auto-offset-reset: earliest
+ group-id: baeldung
+test:
+ topic: embedded-test-topic
+
+monitor:
+ kafka:
+ bootstrap:
+ config: "PLAINTEXT://localhost:9085"
+ consumer:
+ groupid: "baeldungGrp"
diff --git a/spring-kafka-2/src/test/resources/logback-test.xml b/spring-kafka-2/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000..74f126ebc1
--- /dev/null
+++ b/spring-kafka-2/src/test/resources/logback-test.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT" />
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java
index 463d3209ea..e8aa63a88d 100644
--- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java
+++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java
@@ -1,6 +1,5 @@
package com.baeldung.spring.kafka;
-import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
@@ -9,20 +8,15 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.ContainerProperties;
-import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.util.backoff.BackOff;
-import org.springframework.util.backoff.FixedBackOff;
@EnableKafka
@Configuration
@@ -31,12 +25,6 @@ public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
- @Value(value = "${kafka.backoff.interval}")
- private Long interval;
-
- @Value(value = "${kafka.backoff.max_failure}")
- private Long maxAttempts;
-
     public ConsumerFactory<String, String> consumerFactory(String groupId) {
         Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
@@ -83,7 +71,7 @@ public class KafkaConsumerConfig {
     public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
factory.setRecordFilterStrategy(record -> record.value()
- .contains("World"));
+ .contains("World"));
return factory;
}
@@ -95,7 +83,7 @@ public class KafkaConsumerConfig {
}
@Bean
-    public ConcurrentKafkaListenerContainerFactory<String, Greeting> kafkaListenerContainerFactory() {
+    public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
@@ -121,32 +109,15 @@ public class KafkaConsumerConfig {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_test");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
- @Primary
-    public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
+    public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setMessageConverter(multiTypeConverter());
- factory.setCommonErrorHandler(errorHandler());
- factory.getContainerProperties()
- .setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
- @Bean
- public DefaultErrorHandler errorHandler() {
- BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
- DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
- System.out.println(String.format("consumed record %s because this exception was thrown",consumerRecord.toString(),e.getClass().getName()));
- }, fixedBackOff);
- //Commented because of the test
- //errorHandler.addRetryableExceptions(SocketTimeoutException.class,RuntimeException.class);
- errorHandler.addNotRetryableExceptions(NullPointerException.class);
- return errorHandler;
- }
-
}
diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java
index 4b43c84f15..9afb5ff0b6 100644
--- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java
+++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java
@@ -2,7 +2,6 @@ package com.baeldung.spring.kafka;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
@Component
@@ -10,12 +9,7 @@ import org.springframework.stereotype.Component;
public class MultiTypeKafkaListener {
@KafkaHandler
- //@RetryableTopic(backoff = @Backoff(value = 3000L), attempts = "5", autoCreateTopics = "false",include = SocketTimeoutException.class, exclude = NullPointerException.class)
public void handleGreeting(Greeting greeting) {
- if (greeting.getName()
- .equalsIgnoreCase("test")) {
- throw new MessagingException("test not allowed");
- }
System.out.println("Greeting received: " + greeting);
}
diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties
index 691b6f55b7..c57537e2af 100644
--- a/spring-kafka/src/main/resources/application.properties
+++ b/spring-kafka/src/main/resources/application.properties
@@ -14,7 +14,4 @@ monitor.producer.simulate=true
monitor.consumer.simulate=true
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
test.topic=testtopic1
-kafka.backoff.interval=9000
-kafka.backoff.max_failure=5
-