BAEL-6057 - Implementing Retry In Kafka Consumer (#13229)

* Added test class for a simple shallow copy and deep copy

* Added test class for a simple shallow copy and deep copy

* refactor naming of test method

* formatted

* refactor test whenIsAShallowCopyDoneByCopyConstructor_thenImmutableObjectWillNotChange

* Renamed package and added md file

* refactor README.md

* first push

* refactor

* Revert "refactor README.md"

This reverts commit eae77c453ba0bf2af62bad52dc1ed61d07931e34.

* Revert "Renamed package and added md file"

This reverts commit 42c6f97cbde39cc0a5e0bacf34f86a32ded4f4aa.

* Revert "refactor test whenIsAShallowCopyDoneByCopyConstructor_thenImmutableObjectWillNotChange"

This reverts commit 44fb57fe2b51857f960dc216d33508e718e5414f.

* Revert "formatted"

This reverts commit 44be87ef25e566b8e9175cb0fdeed7f0ef485dd3.

* Revert "refactor naming of test method"

This reverts commit 6133c31057e39b19c4978f960cda1c0ba5559aae.

* Revert "Added test class for a simple shallow copy and deep copy"

This reverts commit 2cae083578883ae693d1c5e76fd4948e213e9ea0.

* Revert "Added test class for a simple shallow copy and deep copy"

This reverts commit f43312e2c1979410409f46020a3f7d555e11e966.

* Merge prohect java-supplier-callable to project core-java-lambdas

* adjusted package name

* removed AbstractAgeCalculator.java

* added test for supplier-callable

* first push for article "Implementing Retry In Kafka Consumer"

Co-authored-by: Cesare <cesare.valenti@hotmail.com>
This commit is contained in:
cesarevalenti90 2023-01-02 21:08:01 +01:00 committed by GitHub
parent 306949719b
commit d6bc6bb5d2
6 changed files with 147 additions and 6 deletions

View File

@ -1,5 +1,6 @@
package com.baeldung.spring.kafka;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
@ -8,15 +9,20 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
@EnableKafka
@Configuration
@ -25,6 +31,12 @@ public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${kafka.backoff.interval}")
private Long interval;
@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
@ -71,7 +83,7 @@ public class KafkaConsumerConfig {
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
factory.setRecordFilterStrategy(record -> record.value()
.contains("World"));
.contains("World"));
return factory;
}
@ -83,7 +95,7 @@ public class KafkaConsumerConfig {
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
public ConcurrentKafkaListenerContainerFactory<String, Greeting> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
@ -109,15 +121,32 @@ public class KafkaConsumerConfig {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_test");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
@Primary
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setMessageConverter(multiTypeConverter());
factory.setCommonErrorHandler(errorHandler());
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
System.out.println(String.format("consumed record %s because this exception was thrown",consumerRecord.toString(),e.getClass().getName()));
}, fixedBackOff);
//Commented because of the test
//errorHandler.addRetryableExceptions(SocketTimeoutException.class,RuntimeException.class);
errorHandler.addNotRetryableExceptions(NullPointerException.class);
return errorHandler;
}
}

View File

@ -2,6 +2,7 @@ package com.baeldung.spring.kafka;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
@Component
@ -9,7 +10,12 @@ import org.springframework.stereotype.Component;
public class MultiTypeKafkaListener {
@KafkaHandler
//@RetryableTopic(backoff = @Backoff(value = 3000L), attempts = "5", autoCreateTopics = "false",include = SocketTimeoutException.class, exclude = NullPointerException.class)
public void handleGreeting(Greeting greeting) {
if (greeting.getName()
.equalsIgnoreCase("test")) {
throw new MessagingException("test not allowed");
}
System.out.println("Greeting received: " + greeting);
}

View File

@ -0,0 +1,14 @@
package com.baeldung.spring.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
@SpringBootApplication
@Import(value = { KafkaTopicConfig.class, KafkaConsumerConfig.class, KafkaProducerConfig.class })
public class RetryableApplicationKafkaApp {
public static void main(String[] args) {
SpringApplication.run(RetryableApplicationKafkaApp.class, args);
}
}

View File

@ -14,4 +14,7 @@ monitor.producer.simulate=true
monitor.consumer.simulate=true
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
test.topic=testtopic1
kafka.backoff.interval=9000
kafka.backoff.max_failure=5

View File

@ -1,7 +1,6 @@
package com.baeldung.kafka.embedded;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -16,6 +15,8 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import com.fasterxml.jackson.databind.ObjectMapper;
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@ -33,6 +34,8 @@ class EmbeddedKafkaIntegrationTest {
@Value("${test.topic}")
private String topic;
private ObjectMapper objectMapper = new ObjectMapper();
@BeforeEach
void setup() {
consumer.resetLatch();
@ -44,7 +47,8 @@ class EmbeddedKafkaIntegrationTest {
template.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
boolean messageConsumed = consumer.getLatch()
.await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}
@ -55,7 +59,8 @@ class EmbeddedKafkaIntegrationTest {
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
boolean messageConsumed = consumer.getLatch()
.await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}

View File

@ -0,0 +1,84 @@
package com.baeldung.spring.kafka;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import com.fasterxml.jackson.databind.ObjectMapper;
@SpringBootTest(classes = RetryableApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class KafkaRetryableIntegrationTest {
@ClassRule
public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype");
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private KafkaTemplate<String, String> template;
private ObjectMapper objectMapper = new ObjectMapper();
private static final String CONTAINER_GROUP = "multiGroup";
private static final String TOPIC = "topic";
@Before
public void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
}
@Test
public void givenEmbeddedKafkaBroker_whenSendingAWellFormedMessage_thenMessageIsConsumed() throws Exception {
ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(CONTAINER_GROUP);
container.stop();
@SuppressWarnings("unchecked") AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container.getContainerProperties()
.getMessageListener();
CountDownLatch latch = new CountDownLatch(1);
container.getContainerProperties()
.setMessageListener((AcknowledgingConsumerAwareMessageListener<String, String>) (data, acknowledgment, consumer) -> {
messageListener.onMessage(data, acknowledgment, consumer);
latch.countDown();
});
Greeting greeting = new Greeting("test1", "test2");
container.start();
template.send(TOPIC, objectMapper.writeValueAsString(greeting));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
@Test
public void givenEmbeddedKafkaBroker_whenSendingAMalFormedMessage_thenMessageIsConsumedAfterRetry() throws Exception {
ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(CONTAINER_GROUP);
container.stop();
@SuppressWarnings("unchecked") AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container.getContainerProperties()
.getMessageListener();
CountDownLatch latch = new CountDownLatch(1);
container.getContainerProperties()
.setMessageListener((AcknowledgingConsumerAwareMessageListener<String, String>) (data, acknowledgment, consumer) -> {
messageListener.onMessage(data, acknowledgment, consumer);
latch.countDown();
});
container.start();
Greeting greeting = new Greeting("test", "test");
template.send(TOPIC, objectMapper.writeValueAsString(greeting));
//this message will go on error
Greeting greeting2 = new Greeting("test2", "test2");
template.send(TOPIC, objectMapper.writeValueAsString(greeting2));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
}