diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltFailOnError.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltFailOnError.java index c407310074..e15e3d8261 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltFailOnError.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltFailOnError.java @@ -17,7 +17,7 @@ public class PaymentListenerDltFailOnError { private final Logger log = LoggerFactory.getLogger(PaymentListenerDltFailOnError.class); @RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.FAIL_ON_ERROR) - @KafkaListener(topics = { "payments-fail-on-error-dlt" }, groupId = "payments") + @KafkaListener(topics = { "payments-fail-on-error-dlt" }, groupId = "payments", containerFactory="containerFactory") public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { log.info("Event on main topic={}, payload={}", topic, payment); } diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltRetryOnError.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltRetryOnError.java index 9c6666c938..c7317b016a 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltRetryOnError.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerDltRetryOnError.java @@ -17,7 +17,7 @@ public class PaymentListenerDltRetryOnError { private final Logger log = LoggerFactory.getLogger(PaymentListenerDltRetryOnError.class); @RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR) - @KafkaListener(topics = { "payments-retry-on-error-dlt" }, groupId = "payments") + @KafkaListener(topics = { "payments-retry-on-error-dlt" }, groupId = "payments", containerFactory="containerFactory") public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { log.info("Event on main topic={}, payload={}", topic, payment); } diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerNoDlt.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerNoDlt.java index a12d423b30..b85d9e5984 100644 --- a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerNoDlt.java +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/dlt/listener/PaymentListenerNoDlt.java @@ -17,7 +17,7 @@ public class PaymentListenerNoDlt { private final Logger log = LoggerFactory.getLogger(PaymentListenerNoDlt.class); @RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.NO_DLT) - @KafkaListener(topics = { "payments-no-dlt" }, groupId = "payments") + @KafkaListener(topics = { "payments-no-dlt" }, groupId = "payments", containerFactory="containerFactory") public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { log.info("Event on main topic={}, payload={}", topic, payment); } diff --git a/spring-kafka-2/src/main/resources/application.properties b/spring-kafka-2/src/main/resources/application.properties index 9111491b58..8a74074cf5 100644 --- a/spring-kafka-2/src/main/resources/application.properties +++ b/spring-kafka-2/src/main/resources/application.properties @@ -1,4 +1,4 @@ -spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094 +spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094,localhost:9095 message.topic.name=baeldung long.message.topic.name=longMessage greeting.topic.name=greeting diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltFailOnErrorIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltFailOnErrorIntegrationTest.java deleted file mode 100644 index b20ee2ea12..0000000000 --- a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltFailOnErrorIntegrationTest.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.baeldung.spring.kafka.dlt; - -import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.SpyBean; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.kafka.test.utils.ContainerTestUtils; - -import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltFailOnError; - -@SpringBootTest(classes = KafkaDltApplication.class) -@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" }) -public class KafkaDltFailOnErrorIntegrationTest { - private static final String TOPIC = "payments-fail-on-error-dlt"; - - @Autowired - private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; - - @Autowired - private KafkaTemplate kafkaProducer; - - @SpyBean - private PaymentListenerDltFailOnError paymentsConsumer; - - @BeforeEach - void setUp() { - // wait for embedded Kafka - for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { - ContainerTestUtils.waitForAssignment(messageListenerContainer, 1); - } - } - - @Test - public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception { - CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); - - doAnswer(invocation -> { - mainTopicCountDownLatch.countDown(); - return null; - }).when(paymentsConsumer) - .handlePayment(any(), any()); - - kafkaProducer.send(TOPIC, createPayment("dlt-fail-main")); - - assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); - verify(paymentsConsumer, never()).handleDltPayment(any(), any()); - } - - @Test - public void whenDltConsumerFails_thenDltProcessingStops() throws Exception { - CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); - CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(2); - - doAnswer(invocation -> { - mainTopicCountDownLatch.countDown(); - throw new Exception("Simulating error in main consumer"); - }).when(paymentsConsumer) - .handlePayment(any(), any()); - - doAnswer(invocation -> { - dlTTopicCountDownLatch.countDown(); - throw new Exception("Simulating error in dlt consumer"); - }).when(paymentsConsumer) - .handleDltPayment(any(), any()); - - kafkaProducer.send(TOPIC, createPayment("dlt-fail")); - - assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse(); - assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1); - } -} \ No newline at end of file diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltIntegrationTest.java new file mode 100644 index 0000000000..180b4f639e --- /dev/null +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltIntegrationTest.java @@ -0,0 +1,181 @@ +package com.baeldung.spring.kafka.dlt; + +import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.ContainerTestUtils; + +import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltFailOnError; +import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltRetryOnError; +import com.baeldung.spring.kafka.dlt.listener.PaymentListenerNoDlt; + +@SpringBootTest(classes = KafkaDltApplication.class) +@EmbeddedKafka( + partitions = 1, + brokerProperties = { "listeners=PLAINTEXT://localhost:9095", "port=9095" }, + topics = {"payments-fail-on-error-dlt", "payments-retry-on-error-dlt", "payments-no-dlt"} +) +public class KafkaDltIntegrationTest { + private static final String FAIL_ON_ERROR_TOPIC = "payments-fail-on-error-dlt"; + private static final String RETRY_ON_ERROR_TOPIC = "payments-retry-on-error-dlt"; + private static final String NO_DLT_TOPIC = "payments-no-dlt"; + + @Autowired + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Autowired + private KafkaTemplate kafkaProducer; + + @SpyBean + private PaymentListenerDltFailOnError failOnErrorPaymentsConsumer; + + @SpyBean + private PaymentListenerDltRetryOnError retryOnErrorPaymentsConsumer; + + @SpyBean + private PaymentListenerNoDlt noDltPaymentsConsumer; + + @BeforeEach + void setUp() { + // wait for embedded Kafka + for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { + ContainerTestUtils.waitForAssignment(messageListenerContainer, 1); + } + } + + @Test + public void whenMainDltFailOnErrorConsumerSucceeds_thenNoDltMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + return null; + }).when(failOnErrorPaymentsConsumer) + .handlePayment(any(), any()); + + kafkaProducer.send(FAIL_ON_ERROR_TOPIC, createPayment("dlt-fail-main")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + verify(failOnErrorPaymentsConsumer, never()).handleDltPayment(any(), any()); + } + + @Test + public void whenDltConsumerFails_thenDltProcessingStops() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(2); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in main consumer"); + }).when(failOnErrorPaymentsConsumer) + .handlePayment(any(), any()); + + doAnswer(invocation -> { + dlTTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in dlt consumer"); + }).when(failOnErrorPaymentsConsumer) + .handleDltPayment(any(), any()); + + kafkaProducer.send(FAIL_ON_ERROR_TOPIC, createPayment("dlt-fail")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse(); + assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1); + } + + @Test + public void whenMainRetryOnErrorConsumerSucceeds_thenNoDltMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + return null; + }).when(retryOnErrorPaymentsConsumer) + .handlePayment(any(), any()); + + kafkaProducer.send(RETRY_ON_ERROR_TOPIC, createPayment("dlt-retry-main")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + verify(retryOnErrorPaymentsConsumer, never()).handleDltPayment(any(), any()); + } + + @Test + public void whenDltConsumerFails_thenDltConsumerRetriesMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(3); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in main consumer"); + }).when(retryOnErrorPaymentsConsumer) + .handlePayment(any(), any()); + + doAnswer(invocation -> { + dlTTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in dlt consumer"); + }).when(retryOnErrorPaymentsConsumer) + .handleDltPayment(any(), any()); + + kafkaProducer.send(RETRY_ON_ERROR_TOPIC, createPayment("dlt-retry")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(0); + } + + @Test + public void whenMainNoDltConsumerSucceeds_thenNoDltMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + return null; + }).when(noDltPaymentsConsumer) + .handlePayment(any(), any()); + + kafkaProducer.send(NO_DLT_TOPIC, createPayment("no-dlt-main")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + verify(noDltPaymentsConsumer, never()).handleDltPayment(any(), any()); + } + + @Test + public void whenMainConsumerFails_thenDltConsumerDoesNotReceiveMessage() throws Exception { + CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); + CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + mainTopicCountDownLatch.countDown(); + throw new Exception("Simulating error in main consumer"); + }).when(noDltPaymentsConsumer) + .handlePayment(any(), any()); + + doAnswer(invocation -> { + dlTTopicCountDownLatch.countDown(); + return null; + }).when(noDltPaymentsConsumer) + .handleDltPayment(any(), any()); + + kafkaProducer.send(NO_DLT_TOPIC, createPayment("no-dlt")); + + assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse(); + assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltRetryOnErrorIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltRetryOnErrorIntegrationTest.java deleted file mode 100644 index 393eb2ae46..0000000000 --- a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaDltRetryOnErrorIntegrationTest.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.baeldung.spring.kafka.dlt; - -import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.SpyBean; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.kafka.test.utils.ContainerTestUtils; - -import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltRetryOnError; - -@SpringBootTest(classes = KafkaDltApplication.class) -@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" }) -public class KafkaDltRetryOnErrorIntegrationTest { - private static final String TOPIC = "payments-retry-on-error-dlt"; - - @Autowired - private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; - - @Autowired - private KafkaTemplate kafkaProducer; - - @SpyBean - private PaymentListenerDltRetryOnError paymentsConsumer; - - @BeforeEach - void setUp() { - // wait for embedded Kafka - for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { - ContainerTestUtils.waitForAssignment(messageListenerContainer, 1); - } - } - - @Test - public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception { - CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); - - doAnswer(invocation -> { - mainTopicCountDownLatch.countDown(); - return null; - }).when(paymentsConsumer) - .handlePayment(any(), any()); - - kafkaProducer.send(TOPIC, createPayment("dlt-retry-main")); - - assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); - verify(paymentsConsumer, never()).handleDltPayment(any(), any()); - } - - @Test - public void whenDltConsumerFails_thenDltConsumerRetriesMessage() throws Exception { - CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); - CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(3); - - doAnswer(invocation -> { - mainTopicCountDownLatch.countDown(); - throw new Exception("Simulating error in main consumer"); - }).when(paymentsConsumer) - .handlePayment(any(), any()); - - doAnswer(invocation -> { - dlTTopicCountDownLatch.countDown(); - throw new Exception("Simulating error in dlt consumer"); - }).when(paymentsConsumer) - .handleDltPayment(any(), any()); - - kafkaProducer.send(TOPIC, createPayment("dlt-retry")); - - assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(0); - } -} \ No newline at end of file diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaNoDltIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaNoDltIntegrationTest.java deleted file mode 100644 index 81cca9fec3..0000000000 --- a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/dlt/KafkaNoDltIntegrationTest.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.baeldung.spring.kafka.dlt; - -import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.SpyBean; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.kafka.test.utils.ContainerTestUtils; - -import com.baeldung.spring.kafka.dlt.listener.PaymentListenerNoDlt; - -@SpringBootTest(classes = KafkaDltApplication.class) -@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" }) -public class KafkaNoDltIntegrationTest { - private static final String TOPIC = "payments-no-dlt"; - - @Autowired - private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; - - @Autowired - private KafkaTemplate kafkaProducer; - - @SpyBean - private PaymentListenerNoDlt paymentsConsumer; - - @BeforeEach - void setUp() { - // wait for embedded Kafka - for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { - ContainerTestUtils.waitForAssignment(messageListenerContainer, 1); - } - } - - @Test - public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception { - CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); - - doAnswer(invocation -> { - mainTopicCountDownLatch.countDown(); - return null; - }).when(paymentsConsumer) - .handlePayment(any(), any()); - - kafkaProducer.send(TOPIC, createPayment("no-dlt-main")); - - assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); - verify(paymentsConsumer, never()).handleDltPayment(any(), any()); - } - - @Test - public void whenMainConsumerFails_thenDltConsumerDoesNotReceiveMessage() throws Exception { - CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1); - CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(1); - - doAnswer(invocation -> { - mainTopicCountDownLatch.countDown(); - throw new Exception("Simulating error in main consumer"); - }).when(paymentsConsumer) - .handlePayment(any(), any()); - - doAnswer(invocation -> { - dlTTopicCountDownLatch.countDown(); - return null; - }).when(paymentsConsumer) - .handleDltPayment(any(), any()); - - kafkaProducer.send(TOPIC, createPayment("no-dlt")); - - assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse(); - assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1); - } -} \ No newline at end of file