Merge pull request #15420 from constantinurs/spring-kafka-dlt

BAEL-7199: Dead Letter Queue for Kafka with Spring
This commit is contained in:
Andrea Giulio Cerasoni 2023-12-22 08:47:00 +00:00 committed by GitHub
commit f1ec994d06
11 changed files with 490 additions and 0 deletions

View File

@ -0,0 +1,36 @@
package com.baeldung.spring.kafka.dlt;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, Payment> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Payment.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Payment> containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Payment> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.spring.kafka.dlt;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
@EnableKafka
@SpringBootApplication
public class KafkaDltApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDltApplication.class, args);
}
}

View File

@ -0,0 +1,34 @@
package com.baeldung.spring.kafka.dlt;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaRetryConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, Payment> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new DefaultKafkaProducerFactory<>(config, new StringSerializer(), new JsonSerializer<>());
}
@Bean
public KafkaTemplate<String, Payment> retryableTopicKafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

View File

@ -0,0 +1,43 @@
package com.baeldung.spring.kafka.dlt;
import java.math.BigDecimal;
import java.util.Currency;
import java.util.StringJoiner;
public class Payment {
private String reference;
private BigDecimal amount;
private Currency currency;
public String getReference() {
return reference;
}
public void setReference(String reference) {
this.reference = reference;
}
public BigDecimal getAmount() {
return amount;
}
public void setAmount(BigDecimal amount) {
this.amount = amount;
}
public Currency getCurrency() {
return currency;
}
public void setCurrency(Currency currency) {
this.currency = currency;
}
@Override
public String toString() {
return new StringJoiner(", ", Payment.class.getSimpleName() + "[", "]").add("reference='" + reference + "'")
.add("amount=" + amount)
.add("currency=" + currency)
.toString();
}
}

View File

@ -0,0 +1,29 @@
package com.baeldung.spring.kafka.dlt.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.baeldung.spring.kafka.dlt.Payment;
@Service
public class PaymentListenerDltFailOnError {
private final Logger log = LoggerFactory.getLogger(PaymentListenerDltFailOnError.class);
@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = { "payments-fail-on-error-dlt" }, groupId = "payments")
public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
@DltHandler
public void handleDltPayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on dlt topic={}, payload={}", topic, payment);
}
}

View File

@ -0,0 +1,29 @@
package com.baeldung.spring.kafka.dlt.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.baeldung.spring.kafka.dlt.Payment;
@Service
public class PaymentListenerDltRetryOnError {
private final Logger log = LoggerFactory.getLogger(PaymentListenerDltRetryOnError.class);
@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR)
@KafkaListener(topics = { "payments-retry-on-error-dlt" }, groupId = "payments")
public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
@DltHandler
public void handleDltPayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on dlt topic={}, payload={}", topic, payment);
}
}

View File

@ -0,0 +1,29 @@
package com.baeldung.spring.kafka.dlt.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.baeldung.spring.kafka.dlt.Payment;
@Service
public class PaymentListenerNoDlt {
private final Logger log = LoggerFactory.getLogger(PaymentListenerNoDlt.class);
@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "payments-no-dlt" }, groupId = "payments")
public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}
@DltHandler
public void handleDltPayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on dlt topic={}, payload={}", topic, payment);
}
}

View File

@ -0,0 +1,87 @@
package com.baeldung.spring.kafka.dlt;
import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltFailOnError;
@SpringBootTest(classes = KafkaDltApplication.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" })
public class KafkaDltFailOnErrorIntegrationTest {
private static final String TOPIC = "payments-fail-on-error-dlt";
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaTemplate<String, Payment> kafkaProducer;
@SpyBean
private PaymentListenerDltFailOnError paymentsConsumer;
@BeforeEach
void setUp() {
// wait for embedded Kafka
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);
}
}
@Test
public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-fail-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(paymentsConsumer, never()).handleDltPayment(any(), any());
}
@Test
public void whenDltConsumerFails_thenDltProcessingStops() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(2);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(paymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
throw new Exception("Simulating error in dlt consumer");
}).when(paymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-fail"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1);
}
}

View File

@ -0,0 +1,88 @@
package com.baeldung.spring.kafka.dlt;
import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltRetryOnError;
@SpringBootTest(classes = KafkaDltApplication.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" })
public class KafkaDltRetryOnErrorIntegrationTest {
private static final String TOPIC = "payments-retry-on-error-dlt";
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaTemplate<String, Payment> kafkaProducer;
@SpyBean
private PaymentListenerDltRetryOnError paymentsConsumer;
@BeforeEach
void setUp() {
// wait for embedded Kafka
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);
}
}
@Test
public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-retry-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(paymentsConsumer, never()).handleDltPayment(any(), any());
}
@Test
public void whenDltConsumerFails_thenDltConsumerRetriesMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(3);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(paymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
throw new Exception("Simulating error in dlt consumer");
}).when(paymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-retry"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(0);
}
}

View File

@ -0,0 +1,87 @@
package com.baeldung.spring.kafka.dlt;
import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import com.baeldung.spring.kafka.dlt.listener.PaymentListenerNoDlt;
@SpringBootTest(classes = KafkaDltApplication.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" })
public class KafkaNoDltIntegrationTest {
private static final String TOPIC = "payments-no-dlt";
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaTemplate<String, Payment> kafkaProducer;
@SpyBean
private PaymentListenerNoDlt paymentsConsumer;
@BeforeEach
void setUp() {
// wait for embedded Kafka
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);
}
}
@Test
public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("no-dlt-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(paymentsConsumer, never()).handleDltPayment(any(), any());
}
@Test
public void whenMainConsumerFails_thenDltConsumerDoesNotReceiveMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(paymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("no-dlt"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1);
}
}

View File

@ -0,0 +1,15 @@
package com.baeldung.spring.kafka.dlt;
import java.math.BigDecimal;
import java.util.Currency;
class PaymentTestUtils {
static Payment createPayment(String reference) {
Payment payment = new Payment();
payment.setAmount(BigDecimal.valueOf(71));
payment.setCurrency(Currency.getInstance("GBP"));
payment.setReference(reference);
return payment;
}
}