From 4f5ae9020a16040bf701efa294eee7b9d6059bc4 Mon Sep 17 00:00:00 2001 From: Cesare Date: Wed, 11 Jan 2023 10:23:05 +0100 Subject: [PATCH] BAEL-6057 - Implementing Retry In Kafka Consumer (moved code example to new module) --- spring-kafka-2/README.md | 7 + spring-kafka-2/pom.xml | 70 ++++++++ .../com/baeldung/spring/kafka/Farewell.java | 37 +++++ .../com/baeldung/spring/kafka/Greeting.java | 37 +++++ .../spring/kafka/KafkaConsumerConfig.java | 152 ++++++++++++++++++ .../spring/kafka/KafkaProducerConfig.java | 67 ++++++++ .../spring/kafka/KafkaTopicConfig.java | 77 +++++++++ .../spring/kafka/MultiTypeKafkaListener.java | 32 ++++ .../kafka/RetryableApplicationKafkaApp.java | 0 .../src/main/resources/application.properties | 20 +++ spring-kafka-2/src/main/resources/logback.xml | 13 ++ .../kafka/KafkaRetryableIntegrationTest.java | 0 .../src/test/resources/application.yml | 14 ++ .../src/test/resources/logback-test.xml | 19 +++ .../spring/kafka/KafkaConsumerConfig.java | 35 +--- .../spring/kafka/MultiTypeKafkaListener.java | 6 - .../src/main/resources/application.properties | 3 - 17 files changed, 548 insertions(+), 41 deletions(-) create mode 100644 spring-kafka-2/README.md create mode 100644 spring-kafka-2/pom.xml create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java rename {spring-kafka => spring-kafka-2}/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java (100%) create mode 100644 spring-kafka-2/src/main/resources/application.properties create mode 100644 spring-kafka-2/src/main/resources/logback.xml rename {spring-kafka => spring-kafka-2}/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java (100%) create mode 100644 spring-kafka-2/src/test/resources/application.yml create mode 100644 spring-kafka-2/src/test/resources/logback-test.xml diff --git a/spring-kafka-2/README.md b/spring-kafka-2/README.md new file mode 100644 index 0000000000..9a5ce627dd --- /dev/null +++ b/spring-kafka-2/README.md @@ -0,0 +1,7 @@ +## Spring Kafka 2 + +This module contains articles about Spring with Kafka + +### Relevant articles + +- [Intro to Apache Kafka with Spring](https://www.baeldung.com/spring-kafka) diff --git a/spring-kafka-2/pom.xml b/spring-kafka-2/pom.xml new file mode 100644 index 0000000000..d51c2e300f --- /dev/null +++ b/spring-kafka-2/pom.xml @@ -0,0 +1,70 @@ + + + 4.0.0 + spring-kafka + spring-kafka + Intro to Kafka with Spring + + + com.baeldung + parent-boot-2 + 0.0.1-SNAPSHOT + ../parent-boot-2 + + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.kafka + spring-kafka + + + org.apache.kafka + kafka-streams + + + com.fasterxml.jackson.core + jackson-databind + + + org.projectlombok + lombok + + + org.springframework.kafka + spring-kafka-test + test + + + org.testcontainers + kafka + ${testcontainers-kafka.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers-kafka.version} + test + + + org.awaitility + awaitility + test + + + + + 1.16.2 + + + \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java new file mode 100644 index 0000000000..bbff315ad2 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Farewell.java @@ -0,0 +1,37 @@ +package com.baeldung.spring.kafka; + +public class Farewell { + + private String message; + private Integer remainingMinutes; + + public Farewell() { + } + + public Farewell(String message, Integer remainingMinutes) { + this.message = message; + this.remainingMinutes = remainingMinutes; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Integer getRemainingMinutes() { + return remainingMinutes; + } + + public void setRemainingMinutes(Integer remainingMinutes) { + this.remainingMinutes = remainingMinutes; + } + + @Override + public String toString() { + return message + ". In " + remainingMinutes + "!"; + } + +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java new file mode 100644 index 0000000000..b4633e802a --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/Greeting.java @@ -0,0 +1,37 @@ +package com.baeldung.spring.kafka; + +public class Greeting { + + private String msg; + private String name; + + public Greeting() { + + } + + public Greeting(String msg, String name) { + this.msg = msg; + this.name = name; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return msg + ", " + name + "!"; + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000000..463d3209ea --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -0,0 +1,152 @@ +package com.baeldung.spring.kafka; + +import java.net.SocketTimeoutException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.converter.StringJsonMessageConverter; +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; +import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; + +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + @Value(value = "${kafka.backoff.interval}") + private Long interval; + + @Value(value = "${kafka.backoff.max_failure}") + private Long maxAttempts; + + public ConsumerFactory consumerFactory(String groupId) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520"); + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520"); + return new DefaultKafkaConsumerFactory<>(props); + } + + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(String groupId) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(groupId)); + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory fooKafkaListenerContainerFactory() { + return kafkaListenerContainerFactory("foo"); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory barKafkaListenerContainerFactory() { + return kafkaListenerContainerFactory("bar"); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory headersKafkaListenerContainerFactory() { + return kafkaListenerContainerFactory("headers"); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory partitionsKafkaListenerContainerFactory() { + return kafkaListenerContainerFactory("partitions"); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory longMessageKafkaListenerContainerFactory() { + return kafkaListenerContainerFactory("longMessage"); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = kafkaListenerContainerFactory("filter"); + factory.setRecordFilterStrategy(record -> record.value() + .contains("World")); + return factory; + } + + public ConsumerFactory greetingConsumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting"); + return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class)); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(greetingConsumerFactory()); + return factory; + } + + @Bean + public RecordMessageConverter multiTypeConverter() { + StringJsonMessageConverter converter = new StringJsonMessageConverter(); + DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); + typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID); + typeMapper.addTrustedPackages("com.baeldung.spring.kafka"); + Map> mappings = new HashMap<>(); + mappings.put("greeting", Greeting.class); + mappings.put("farewell", Farewell.class); + typeMapper.setIdClassMapping(mappings); + converter.setTypeMapper(typeMapper); + return converter; + } + + @Bean + public ConsumerFactory multiTypeConsumerFactory() { + HashMap props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_test"); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + @Primary + public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(multiTypeConsumerFactory()); + factory.setMessageConverter(multiTypeConverter()); + factory.setCommonErrorHandler(errorHandler()); + factory.getContainerProperties() + .setAckMode(ContainerProperties.AckMode.RECORD); + return factory; + } + + @Bean + public DefaultErrorHandler errorHandler() { + BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts); + DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> { + System.out.println(String.format("consumed record %s because this exception was thrown",consumerRecord.toString(),e.getClass().getName())); + }, fixedBackOff); + //Commented because of the test + //errorHandler.addRetryableExceptions(SocketTimeoutException.class,RuntimeException.class); + errorHandler.addNotRetryableExceptions(NullPointerException.class); + return errorHandler; + } + +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000000..da8b2bd1a6 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java @@ -0,0 +1,67 @@ +package com.baeldung.spring.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +public class KafkaProducerConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520"); + + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory greetingProducerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate greetingKafkaTemplate() { + return new KafkaTemplate<>(greetingProducerFactory()); + } + + @Bean + public ProducerFactory multiTypeProducerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell"); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate multiTypeKafkaTemplate() { + return new KafkaTemplate<>(multiTypeProducerFactory()); + } + +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java new file mode 100644 index 0000000000..6a20915699 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java @@ -0,0 +1,77 @@ +package com.baeldung.spring.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaAdmin; + +@Configuration +public class KafkaTopicConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + @Value(value = "${message.topic.name}") + private String topicName; + + @Value(value = "${long.message.topic.name}") + private String longMsgTopicName; + + @Value(value = "${partitioned.topic.name}") + private String partitionedTopicName; + + @Value(value = "${filtered.topic.name}") + private String filteredTopicName; + + @Value(value = "${greeting.topic.name}") + private String greetingTopicName; + + @Value(value = "${multi.type.topic.name}") + private String multiTypeTopicName; + + @Bean + public KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + return new KafkaAdmin(configs); + } + + @Bean + public NewTopic topic1() { + return new NewTopic(topicName, 1, (short) 1); + } + + @Bean + public NewTopic topic2() { + return new NewTopic(partitionedTopicName, 6, (short) 1); + } + + @Bean + public NewTopic topic3() { + return new NewTopic(filteredTopicName, 1, (short) 1); + } + + @Bean + public NewTopic topic4() { + return new NewTopic(greetingTopicName, 1, (short) 1); + } + + @Bean + public NewTopic topic5() { + NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1); + Map configs = new HashMap<>(); + configs.put("max.message.bytes", "20971520"); + newTopic.configs(configs); + return newTopic; + } + + @Bean + public NewTopic multiTypeTopic() { + return new NewTopic(multiTypeTopicName, 1, (short) 1); + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java new file mode 100644 index 0000000000..6c4d78171b --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java @@ -0,0 +1,32 @@ +package com.baeldung.spring.kafka; + +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.MessagingException; +import org.springframework.stereotype.Component; + +@Component +@KafkaListener(id = "multiGroup", topics = "multitype") +public class MultiTypeKafkaListener { + + @KafkaHandler + //@RetryableTopic(backoff = @Backoff(value = 3000L), attempts = "5", autoCreateTopics = "false",include = SocketTimeoutException.class, exclude = NullPointerException.class) + public void handleGreeting(Greeting greeting) { + if (greeting.getName() + .equalsIgnoreCase("test")) { + throw new MessagingException("test not allowed"); + } + System.out.println("Greeting received: " + greeting); + } + + @KafkaHandler + public void handleF(Farewell farewell) { + System.out.println("Farewell received: " + farewell); + } + + @KafkaHandler(isDefault = true) + public void unknown(Object object) { + System.out.println("Unkown type received: " + object); + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java similarity index 100% rename from spring-kafka/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java rename to spring-kafka-2/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java diff --git a/spring-kafka-2/src/main/resources/application.properties b/spring-kafka-2/src/main/resources/application.properties new file mode 100644 index 0000000000..691b6f55b7 --- /dev/null +++ b/spring-kafka-2/src/main/resources/application.properties @@ -0,0 +1,20 @@ +spring.kafka.bootstrap-servers=localhost:9092 +message.topic.name=baeldung +long.message.topic.name=longMessage +greeting.topic.name=greeting +filtered.topic.name=filtered +partitioned.topic.name=partitioned +multi.type.topic.name=multitype +# monitoring - lag analysis +monitor.kafka.bootstrap.config=localhost:9092 +monitor.kafka.consumer.groupid=baeldungGrp +monitor.topic.name=baeldung +# monitoring - simulation +monitor.producer.simulate=true +monitor.consumer.simulate=true +monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate +test.topic=testtopic1 +kafka.backoff.interval=9000 +kafka.backoff.max_failure=5 + + diff --git a/spring-kafka-2/src/main/resources/logback.xml b/spring-kafka-2/src/main/resources/logback.xml new file mode 100644 index 0000000000..7d900d8ea8 --- /dev/null +++ b/spring-kafka-2/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + \ No newline at end of file diff --git a/spring-kafka/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java similarity index 100% rename from spring-kafka/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java rename to spring-kafka-2/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java diff --git a/spring-kafka-2/src/test/resources/application.yml b/spring-kafka-2/src/test/resources/application.yml new file mode 100644 index 0000000000..8b245f08b1 --- /dev/null +++ b/spring-kafka-2/src/test/resources/application.yml @@ -0,0 +1,14 @@ +spring: + kafka: + consumer: + auto-offset-reset: earliest + group-id: baeldung +test: + topic: embedded-test-topic + +monitor: + kafka: + bootstrap: + config: "PLAINTEXT://localhost:9085" + consumer: + groupid: "baeldungGrp" diff --git a/spring-kafka-2/src/test/resources/logback-test.xml b/spring-kafka-2/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..74f126ebc1 --- /dev/null +++ b/spring-kafka-2/src/test/resources/logback-test.xml @@ -0,0 +1,19 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + \ No newline at end of file diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java index 463d3209ea..e8aa63a88d 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -1,6 +1,5 @@ package com.baeldung.spring.kafka; -import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; @@ -9,20 +8,15 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.StringJsonMessageConverter; import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.util.backoff.BackOff; -import org.springframework.util.backoff.FixedBackOff; @EnableKafka @Configuration @@ -31,12 +25,6 @@ public class KafkaConsumerConfig { @Value(value = "${spring.kafka.bootstrap-servers}") private String bootstrapAddress; - @Value(value = "${kafka.backoff.interval}") - private Long interval; - - @Value(value = "${kafka.backoff.max_failure}") - private Long maxAttempts; - public ConsumerFactory consumerFactory(String groupId) { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); @@ -83,7 +71,7 @@ public class KafkaConsumerConfig { public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = kafkaListenerContainerFactory("filter"); factory.setRecordFilterStrategy(record -> record.value() - .contains("World")); + .contains("World")); return factory; } @@ -95,7 +83,7 @@ public class KafkaConsumerConfig { } @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; @@ -121,32 +109,15 @@ public class KafkaConsumerConfig { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_test"); return new DefaultKafkaConsumerFactory<>(props); } @Bean - @Primary - public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { + public ConcurrentKafkaListenerContainerFactory multiTypeKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(multiTypeConsumerFactory()); factory.setMessageConverter(multiTypeConverter()); - factory.setCommonErrorHandler(errorHandler()); - factory.getContainerProperties() - .setAckMode(ContainerProperties.AckMode.RECORD); return factory; } - @Bean - public DefaultErrorHandler errorHandler() { - BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts); - DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> { - System.out.println(String.format("consumed record %s because this exception was thrown",consumerRecord.toString(),e.getClass().getName())); - }, fixedBackOff); - //Commented because of the test - //errorHandler.addRetryableExceptions(SocketTimeoutException.class,RuntimeException.class); - errorHandler.addNotRetryableExceptions(NullPointerException.class); - return errorHandler; - } - } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java index 4b43c84f15..9afb5ff0b6 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java @@ -2,7 +2,6 @@ package com.baeldung.spring.kafka; import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component; @Component @@ -10,12 +9,7 @@ import org.springframework.stereotype.Component; public class MultiTypeKafkaListener { @KafkaHandler - //@RetryableTopic(backoff = @Backoff(value = 3000L), attempts = "5", autoCreateTopics = "false",include = SocketTimeoutException.class, exclude = NullPointerException.class) public void handleGreeting(Greeting greeting) { - if (greeting.getName() - .equalsIgnoreCase("test")) { - throw new MessagingException("test not allowed"); - } System.out.println("Greeting received: " + greeting); } diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties index 691b6f55b7..c57537e2af 100644 --- a/spring-kafka/src/main/resources/application.properties +++ b/spring-kafka/src/main/resources/application.properties @@ -14,7 +14,4 @@ monitor.producer.simulate=true monitor.consumer.simulate=true monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate test.topic=testtopic1 -kafka.backoff.interval=9000 -kafka.backoff.max_failure=5 -