Merge pull request #15568 from constantinurs/spring-kafka-dlt

BAEL-7199: Fix CI pipeline
This commit is contained in:
Loredana Crusoveanu 2024-01-11 13:59:26 +02:00 committed by GitHub
commit f3f51df137
8 changed files with 185 additions and 266 deletions

View File

@ -17,7 +17,7 @@ public class PaymentListenerDltFailOnError {
private final Logger log = LoggerFactory.getLogger(PaymentListenerDltFailOnError.class);
@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = { "payments-fail-on-error-dlt" }, groupId = "payments")
@KafkaListener(topics = { "payments-fail-on-error-dlt" }, groupId = "payments", containerFactory="containerFactory")
public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}

View File

@ -17,7 +17,7 @@ public class PaymentListenerDltRetryOnError {
private final Logger log = LoggerFactory.getLogger(PaymentListenerDltRetryOnError.class);
@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR)
@KafkaListener(topics = { "payments-retry-on-error-dlt" }, groupId = "payments")
@KafkaListener(topics = { "payments-retry-on-error-dlt" }, groupId = "payments", containerFactory="containerFactory")
public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}

View File

@ -17,7 +17,7 @@ public class PaymentListenerNoDlt {
private final Logger log = LoggerFactory.getLogger(PaymentListenerNoDlt.class);
@RetryableTopic(attempts = "1", kafkaTemplate = "retryableTopicKafkaTemplate", dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "payments-no-dlt" }, groupId = "payments")
@KafkaListener(topics = { "payments-no-dlt" }, groupId = "payments", containerFactory="containerFactory")
public void handlePayment(Payment payment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on main topic={}, payload={}", topic, payment);
}

View File

@ -1,4 +1,4 @@
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094,localhost:9095
message.topic.name=baeldung
long.message.topic.name=longMessage
greeting.topic.name=greeting

View File

@ -1,87 +0,0 @@
package com.baeldung.spring.kafka.dlt;
import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltFailOnError;
@SpringBootTest(classes = KafkaDltApplication.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" })
public class KafkaDltFailOnErrorIntegrationTest {
private static final String TOPIC = "payments-fail-on-error-dlt";
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaTemplate<String, Payment> kafkaProducer;
@SpyBean
private PaymentListenerDltFailOnError paymentsConsumer;
@BeforeEach
void setUp() {
// wait for embedded Kafka
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);
}
}
@Test
public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-fail-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(paymentsConsumer, never()).handleDltPayment(any(), any());
}
@Test
public void whenDltConsumerFails_thenDltProcessingStops() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(2);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(paymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
throw new Exception("Simulating error in dlt consumer");
}).when(paymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-fail"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1);
}
}

View File

@ -0,0 +1,181 @@
package com.baeldung.spring.kafka.dlt;
import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltFailOnError;
import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltRetryOnError;
import com.baeldung.spring.kafka.dlt.listener.PaymentListenerNoDlt;
@SpringBootTest(classes = KafkaDltApplication.class)
@EmbeddedKafka(
partitions = 1,
brokerProperties = { "listeners=PLAINTEXT://localhost:9095", "port=9095" },
topics = {"payments-fail-on-error-dlt", "payments-retry-on-error-dlt", "payments-no-dlt"}
)
public class KafkaDltIntegrationTest {
private static final String FAIL_ON_ERROR_TOPIC = "payments-fail-on-error-dlt";
private static final String RETRY_ON_ERROR_TOPIC = "payments-retry-on-error-dlt";
private static final String NO_DLT_TOPIC = "payments-no-dlt";
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaTemplate<String, Payment> kafkaProducer;
@SpyBean
private PaymentListenerDltFailOnError failOnErrorPaymentsConsumer;
@SpyBean
private PaymentListenerDltRetryOnError retryOnErrorPaymentsConsumer;
@SpyBean
private PaymentListenerNoDlt noDltPaymentsConsumer;
@BeforeEach
void setUp() {
// wait for embedded Kafka
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);
}
}
@Test
public void whenMainDltFailOnErrorConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(failOnErrorPaymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(FAIL_ON_ERROR_TOPIC, createPayment("dlt-fail-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(failOnErrorPaymentsConsumer, never()).handleDltPayment(any(), any());
}
@Test
public void whenDltConsumerFails_thenDltProcessingStops() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(2);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(failOnErrorPaymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
throw new Exception("Simulating error in dlt consumer");
}).when(failOnErrorPaymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(FAIL_ON_ERROR_TOPIC, createPayment("dlt-fail"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1);
}
@Test
public void whenMainRetryOnErrorConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(retryOnErrorPaymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(RETRY_ON_ERROR_TOPIC, createPayment("dlt-retry-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(retryOnErrorPaymentsConsumer, never()).handleDltPayment(any(), any());
}
@Test
public void whenDltConsumerFails_thenDltConsumerRetriesMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(3);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(retryOnErrorPaymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
throw new Exception("Simulating error in dlt consumer");
}).when(retryOnErrorPaymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(RETRY_ON_ERROR_TOPIC, createPayment("dlt-retry"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(0);
}
@Test
public void whenMainNoDltConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(noDltPaymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(NO_DLT_TOPIC, createPayment("no-dlt-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(noDltPaymentsConsumer, never()).handleDltPayment(any(), any());
}
@Test
public void whenMainConsumerFails_thenDltConsumerDoesNotReceiveMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(noDltPaymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
return null;
}).when(noDltPaymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(NO_DLT_TOPIC, createPayment("no-dlt"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1);
}
}

View File

@ -1,88 +0,0 @@
package com.baeldung.spring.kafka.dlt;
import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import com.baeldung.spring.kafka.dlt.listener.PaymentListenerDltRetryOnError;
@SpringBootTest(classes = KafkaDltApplication.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" })
public class KafkaDltRetryOnErrorIntegrationTest {
private static final String TOPIC = "payments-retry-on-error-dlt";
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaTemplate<String, Payment> kafkaProducer;
@SpyBean
private PaymentListenerDltRetryOnError paymentsConsumer;
@BeforeEach
void setUp() {
// wait for embedded Kafka
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);
}
}
@Test
public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-retry-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(paymentsConsumer, never()).handleDltPayment(any(), any());
}
@Test
public void whenDltConsumerFails_thenDltConsumerRetriesMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(3);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(paymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
throw new Exception("Simulating error in dlt consumer");
}).when(paymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("dlt-retry"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(0);
}
}

View File

@ -1,87 +0,0 @@
package com.baeldung.spring.kafka.dlt;
import static com.baeldung.spring.kafka.dlt.PaymentTestUtils.createPayment;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import com.baeldung.spring.kafka.dlt.listener.PaymentListenerNoDlt;
@SpringBootTest(classes = KafkaDltApplication.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" })
public class KafkaNoDltIntegrationTest {
private static final String TOPIC = "payments-no-dlt";
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private KafkaTemplate<String, Payment> kafkaProducer;
@SpyBean
private PaymentListenerNoDlt paymentsConsumer;
@BeforeEach
void setUp() {
// wait for embedded Kafka
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);
}
}
@Test
public void whenMainConsumerSucceeds_thenNoDltMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("no-dlt-main"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(paymentsConsumer, never()).handleDltPayment(any(), any());
}
@Test
public void whenMainConsumerFails_thenDltConsumerDoesNotReceiveMessage() throws Exception {
CountDownLatch mainTopicCountDownLatch = new CountDownLatch(1);
CountDownLatch dlTTopicCountDownLatch = new CountDownLatch(1);
doAnswer(invocation -> {
mainTopicCountDownLatch.countDown();
throw new Exception("Simulating error in main consumer");
}).when(paymentsConsumer)
.handlePayment(any(), any());
doAnswer(invocation -> {
dlTTopicCountDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handleDltPayment(any(), any());
kafkaProducer.send(TOPIC, createPayment("no-dlt"));
assertThat(mainTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(dlTTopicCountDownLatch.await(5, TimeUnit.SECONDS)).isFalse();
assertThat(dlTTopicCountDownLatch.getCount()).isEqualTo(1);
}
}