diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/KafkaConsumerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/KafkaConsumerConfig.java new file mode 100644 index 0000000000..557fa11ed3 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/KafkaConsumerConfig.java @@ -0,0 +1,36 @@ +package com.baeldung.spring.kafka.dlt; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +@Configuration +public class KafkaConsumerConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ConsumerFactory consumerFactory() { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Payment.class)); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory containerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/KafkaDltApplication.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/KafkaDltApplication.java new file mode 100644 index 0000000000..7aa2e3cdb8 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/KafkaDltApplication.java @@ -0,0 +1,13 @@ +package com.baeldung.spring.kafka.dlt; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.annotation.EnableKafka; + +@EnableKafka +@SpringBootApplication +public class KafkaDltApplication { + public static void main(String[] args) { + SpringApplication.run(KafkaDltApplication.class, args); + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/KafkaRetryConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/KafkaRetryConfig.java new file mode 100644 index 0000000000..35a2b31168 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/KafkaRetryConfig.java @@ -0,0 +1,34 @@ +package com.baeldung.spring.kafka.dlt; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +public class KafkaRetryConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ProducerFactory producerFactory() { + Map config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + return new DefaultKafkaProducerFactory<>(config, new StringSerializer(), new JsonSerializer<>()); + } + + @Bean + public KafkaTemplate retryableTopicKafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/Payment.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/Payment.java new file mode 100644 index 0000000000..1554c1671b --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/Payment.java @@ -0,0 +1,43 @@ +package com.baeldung.spring.kafka.dlt; + +import java.math.BigDecimal; +import java.util.Currency; +import java.util.StringJoiner; + +public class Payment { + private String reference; + private BigDecimal amount; + private Currency currency; + + public String getReference() { + return reference; + } + + public void setReference(String reference) { + this.reference = reference; + } + + public BigDecimal getAmount() { + return amount; + } + + public void setAmount(BigDecimal amount) { + this.amount = amount; + } + + public Currency getCurrency() { + return currency; + } + + public void setCurrency(Currency currency) { + this.currency = currency; + } + + @Override + public String toString() { + return new StringJoiner(", ", Payment.class.getSimpleName() + "[", "]").add("reference='" + reference + "'") + .add("amount=" + amount) + .add("currency=" + currency) + .toString(); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltFailOnError.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltFailOnError.java new file mode 100644 index 0000000000..c407310074 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltFailOnError.java @@ -0,0 +1,29 @@ +package com.baeldung.spring.kafka.dlt.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.DltHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.retrytopic.DltStrategy; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +import com.baeldung.spring.kafka.dlt.Payment; + +@Service +public class PaymentListenerDltFailOnError { + private final Logger log = LoggerFactory.getLogger(PaymentListenerDltFailOnError.class); + + @RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.FAIL_ON_ERROR) + @KafkaListener(topics = { "payments-fail-on-error-dlt" }, groupId = "payments") + public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + log.info("Event on main topic={}, payload={}", topic, payment); + } + + @DltHandler + public void handleDltPayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + log.info("Event on dlt topic={}, payload={}", topic, payment); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltRetryOnError.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltRetryOnError.java new file mode 100644 index 0000000000..9c6666c938 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltRetryOnError.java @@ -0,0 +1,29 @@ +package com.baeldung.spring.kafka.dlt.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.DltHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.retrytopic.DltStrategy; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +import com.baeldung.spring.kafka.dlt.Payment; + +@Service +public class PaymentListenerDltRetryOnError { + private final Logger log = LoggerFactory.getLogger(PaymentListenerDltRetryOnError.class); + + @RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR) + @KafkaListener(topics = { "payments-retry-on-error-dlt" }, groupId = "payments") + public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + log.info("Event on main topic={}, payload={}", topic, payment); + } + + @DltHandler + public void handleDltPayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + log.info("Event on dlt topic={}, payload={}", topic, payment); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerNoDlt.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerNoDlt.java new file mode 100644 index 0000000000..a12d423b30 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerNoDlt.java @@ -0,0 +1,29 @@ +package com.baeldung.spring.kafka.dlt.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.DltHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.retrytopic.DltStrategy; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +import com.baeldung.spring.kafka.dlt.Payment; + +@Service +public class PaymentListenerNoDlt { + private final Logger log = LoggerFactory.getLogger(PaymentListenerNoDlt.class); + + @RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.NO_DLT) + @KafkaListener(topics = { "payments-no-dlt" }, groupId = "payments") + public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + log.info("Event on main topic={}, payload={}", topic, payment); + } + + @DltHandler + public void handleDltPayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + log.info("Event on dlt topic={}, payload={}", topic, payment); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltFailOnErrorIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltFailOnErrorIntegrationTest.java new file mode 100644 index 0000000000..b20ee2ea12 --- /dev/null +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltFailOnErrorIntegrationTest.java @@ -0,0 +1,87 @@ +package com.baeldung.spring.kafka.dlt; + +import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.ContainerTestUtils; + +import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltFailOnError; + +@SpringBootTest(classes = KafkaDltApplication.class) +@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" }) +public class KafkaDltFailOnErrorIntegrationTest { + private static final String TOPIC = "payments-fail-on-error-dlt"; + + @Autowired + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Autowired + private KafkaTemplate kafkaProducer; + + @SpyBean + private PaymentListenerDltFailOnError paymentsConsumer; + + @BeforeEach + void setUp() { + // wait for embedded Kafka + for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { + ContainerTestUtils.waitForAssignment(messageListenerContainer, 1); + } + } + + @Test + public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + return null; + }).when(paymentsConsumer) + .handlePayment(any(), any()); + + kafkaProducer.send(TOPIC, createPayment("dlt-fail-main")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + verify(paymentsConsumer, never()).handleDltPayment(any(), any()); + } + + @Test + public void whenDltConsumerFails_thenDltProcessingStops() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(2); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in main consumer"); + }).when(paymentsConsumer) + .handlePayment(any(), any()); + + doAnswer(invocation -> { + dlTTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in dlt consumer"); + }).when(paymentsConsumer) + .handleDltPayment(any(), any()); + + kafkaProducer.send(TOPIC, createPayment("dlt-fail")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse(); + assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltRetryOnErrorIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltRetryOnErrorIntegrationTest.java new file mode 100644 index 0000000000..393eb2ae46 --- /dev/null +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltRetryOnErrorIntegrationTest.java @@ -0,0 +1,88 @@ +package com.baeldung.spring.kafka.dlt; + +import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.ContainerTestUtils; + +import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltRetryOnError; + +@SpringBootTest(classes = KafkaDltApplication.class) +@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" }) +public class KafkaDltRetryOnErrorIntegrationTest { + private static final String TOPIC = "payments-retry-on-error-dlt"; + + @Autowired + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Autowired + private KafkaTemplate kafkaProducer; + + @SpyBean + private PaymentListenerDltRetryOnError paymentsConsumer; + + @BeforeEach + void setUp() { + // wait for embedded Kafka + for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { + ContainerTestUtils.waitForAssignment(messageListenerContainer, 1); + } + } + + @Test + public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + return null; + }).when(paymentsConsumer) + .handlePayment(any(), any()); + + kafkaProducer.send(TOPIC, createPayment("dlt-retry-main")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + verify(paymentsConsumer, never()).handleDltPayment(any(), any()); + } + + @Test + public void whenDltConsumerFails_thenDltConsumerRetriesMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(3); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in main consumer"); + }).when(paymentsConsumer) + .handlePayment(any(), any()); + + doAnswer(invocation -> { + dlTTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in dlt consumer"); + }).when(paymentsConsumer) + .handleDltPayment(any(), any()); + + kafkaProducer.send(TOPIC, createPayment("dlt-retry")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(0); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaNoDltIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaNoDltIntegrationTest.java new file mode 100644 index 0000000000..81cca9fec3 --- /dev/null +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaNoDltIntegrationTest.java @@ -0,0 +1,87 @@ +package com.baeldung.spring.kafka.dlt; + +import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.ContainerTestUtils; + +import com.baeldung.spring.kafka.dlt.listener.PaymentListenerNoDlt; + +@SpringBootTest(classes = KafkaDltApplication.class) +@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" }) +public class KafkaNoDltIntegrationTest { + private static final String TOPIC = "payments-no-dlt"; + + @Autowired + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Autowired + private KafkaTemplate kafkaProducer; + + @SpyBean + private PaymentListenerNoDlt paymentsConsumer; + + @BeforeEach + void setUp() { + // wait for embedded Kafka + for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { + ContainerTestUtils.waitForAssignment(messageListenerContainer, 1); + } + } + + @Test + public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + return null; + }).when(paymentsConsumer) + .handlePayment(any(), any()); + + kafkaProducer.send(TOPIC, createPayment("no-dlt-main")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + verify(paymentsConsumer, never()).handleDltPayment(any(), any()); + } + + @Test + public void whenMainConsumerFails_thenDltConsumerDoesNotReceiveMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in main consumer"); + }).when(paymentsConsumer) + .handlePayment(any(), any()); + + doAnswer(invocation -> { + dlTTopicCountDownLatch.countDown(); + return null; + }).when(paymentsConsumer) + .handleDltPayment(any(), any()); + + kafkaProducer.send(TOPIC, createPayment("no-dlt")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse(); + assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/PaymentTestUtils.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/PaymentTestUtils.java new file mode 100644 index 0000000000..7b8a574779 --- /dev/null +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/PaymentTestUtils.java @@ -0,0 +1,15 @@ +package com.baeldung.spring.kafka.dlt; + +import java.math.BigDecimal; +import java.util.Currency; + +class PaymentTestUtils { + + static Payment createPayment(String reference) { + Payment payment = new Payment(); + payment.setAmount(BigDecimal.valueOf(71)); + payment.setCurrency(Currency.getInstance("GBP")); + payment.setReference(reference); + return payment; + } +} \ No newline at end of file