From ad9942e8112dea31fe58a478aefda5ad829055ec Mon Sep 17 00:00:00 2001 From: "emanuel.trandafir" Date: Wed, 17 Jan 2024 12:24:03 +0100 Subject: [PATCH] BAEL-7202: handling kafka deser errors --- spring-kafka-3/pom.xml | 25 +++++ .../exception/Application.java | 13 +++ .../exception/ArticlePublishedEvent.java | 4 + .../exception/ArticlesPublishedListener.java | 22 +++++ .../exception/EmailService.java | 22 +++++ .../exception/KafkaConfig.java | 55 +++++++++++ .../exception/KafkaErrorHandler.java | 34 +++++++ .../kafka/trusted/packages/SomeData.java | 37 +++++++ .../DeserializationExceptionLiveTest.java | 96 +++++++++++++++++++ .../packages/ListenerConfiguration.java | 42 ++++++++ .../packages/ProducerConfiguration.java | 39 ++++++++ .../packages/TrustedPackagesLiveTest.java | 59 ++++++++++++ spring-kafka-3_/README.md | 2 + spring-kafka-3_/pom.xml | 36 +++++++ .../com/baeldung/spring/kafka/SomeData.java | 0 .../spring/kafka/ListenerConfiguration.java | 0 .../spring/kafka/ProducerConfiguration.java | 0 .../spring/kafka/TrustedPackagesLiveTest.java | 0 18 files changed, 486 insertions(+) create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/Application.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/ArticlePublishedEvent.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/ArticlesPublishedListener.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/EmailService.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaConfig.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaErrorHandler.java create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/trusted/packages/SomeData.java create mode 100644 spring-kafka-3/src/test/java/com/baeldung/spring/kafka/deserialization/exception/DeserializationExceptionLiveTest.java create mode 100644 spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/ListenerConfiguration.java create mode 100644 spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/ProducerConfiguration.java create mode 100644 spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/TrustedPackagesLiveTest.java create mode 100644 spring-kafka-3_/README.md create mode 100644 spring-kafka-3_/pom.xml rename {spring-kafka-3 => spring-kafka-3_}/src/main/java/com/baeldung/spring/kafka/SomeData.java (100%) rename {spring-kafka-3 => spring-kafka-3_}/src/test/java/com/baeldung/spring/kafka/ListenerConfiguration.java (100%) rename {spring-kafka-3 => spring-kafka-3_}/src/test/java/com/baeldung/spring/kafka/ProducerConfiguration.java (100%) rename {spring-kafka-3 => spring-kafka-3_}/src/test/java/com/baeldung/spring/kafka/TrustedPackagesLiveTest.java (100%) diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml index 972412d18e..894eab2576 100644 --- a/spring-kafka-3/pom.xml +++ b/spring-kafka-3/pom.xml @@ -15,6 +15,10 @@ spring-kafka-3 + + org.springframework.boot + spring-boot-starter + org.springframework.kafka spring-kafka @@ -28,9 +32,30 @@ spring-kafka-test test + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + org.testcontainers + kafka + ${testcontainers.version} + test + + + org.awaitility + awaitility + ${awaitility.version} + test + + 17 3.0.12 + 1.19.3 + 4.2.0 diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/Application.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/Application.java new file mode 100644 index 0000000000..d3857a0ca0 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/Application.java @@ -0,0 +1,13 @@ +package com.baeldung.spring.kafka.deserialization.exception; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/ArticlePublishedEvent.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/ArticlePublishedEvent.java new file mode 100644 index 0000000000..d43f6a6692 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/ArticlePublishedEvent.java @@ -0,0 +1,4 @@ +package com.baeldung.spring.kafka.deserialization.exception; + +public record ArticlePublishedEvent(String article) { +} \ No newline at end of file diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/ArticlesPublishedListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/ArticlesPublishedListener.java new file mode 100644 index 0000000000..c5d977aa8e --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/ArticlesPublishedListener.java @@ -0,0 +1,22 @@ +package com.baeldung.spring.kafka.deserialization.exception; + +import java.util.logging.Logger; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class ArticlesPublishedListener { + private static final Logger log = Logger.getLogger(ArticlesPublishedListener.class.getName()); + private final EmailService emailService; + + public ArticlesPublishedListener(EmailService emailService) { + this.emailService = emailService; + } + + @KafkaListener(topics = "baeldung.articles.published") + public void onArticlePublished(ArticlePublishedEvent event) { + log.info("Received event published event: " + event); + emailService.sendNewsletter(event.article()); + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/EmailService.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/EmailService.java new file mode 100644 index 0000000000..1e48bd87b3 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/EmailService.java @@ -0,0 +1,22 @@ +package com.baeldung.spring.kafka.deserialization.exception; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; + +import org.springframework.stereotype.Service; + +@Service +public class EmailService { + private final static Logger log = Logger.getLogger(EmailService.class.getName()); + private final List articles = new ArrayList<>(); + + public void sendNewsletter(String article) { + log.info("Sending newsletter for article: " + article); + articles.add(article); + } + + public List getArticles() { + return articles; + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaConfig.java new file mode 100644 index 0000000000..5151e98b87 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaConfig.java @@ -0,0 +1,55 @@ +package com.baeldung.spring.kafka.deserialization.exception; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +@EnableKafka +@Configuration +class KafkaConfig { + + @Bean + ConsumerFactory consumerFactory( + @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers + ) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung-app-1"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + return new DefaultKafkaConsumerFactory<>( + props, + new StringDeserializer(), + new JsonDeserializer<>(ArticlePublishedEvent.class) + ); + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory, + CommonErrorHandler commonErrorHandler + ) { + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory); + factory.setCommonErrorHandler(commonErrorHandler); + return factory; + } + + @Bean + CommonErrorHandler kafkaErrorHandler() { + return new KafkaErrorHandler(); + } + +} \ No newline at end of file diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaErrorHandler.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaErrorHandler.java new file mode 100644 index 0000000000..ea4211ab53 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaErrorHandler.java @@ -0,0 +1,34 @@ +package com.baeldung.spring.kafka.deserialization.exception; + + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.MessageListenerContainer; + +class KafkaErrorHandler implements CommonErrorHandler { + private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandler.class); + + @Override + public void handleRecord(Exception exception, ConsumerRecord record, Consumer consumer, MessageListenerContainer container) { + handle(exception, consumer); + } + + @Override + public void handleOtherException(Exception exception, Consumer consumer, MessageListenerContainer container, boolean batchListener) { + handle(exception, consumer); + } + + private void handle(Exception exception, Consumer consumer) { + log.error("Exception thrown", exception); + if (exception instanceof RecordDeserializationException ex) { + consumer.seek(ex.topicPartition(), ex.offset() + 1L); + consumer.commitSync(); + } else { + log.error("Exception not handled", exception); + } + } +} \ No newline at end of file diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/trusted/packages/SomeData.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/trusted/packages/SomeData.java new file mode 100644 index 0000000000..1da3ad8635 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/trusted/packages/SomeData.java @@ -0,0 +1,37 @@ +package com.baeldung.spring.kafka.trusted.packages; + +import java.time.Instant; + +public class SomeData { + + private String id; + private String type; + private String status; + private Instant timestamp; + + public SomeData() { + } + + public SomeData(String id, String type, String status, Instant timestamp) { + this.id = id; + this.type = type; + this.status = status; + this.timestamp = timestamp; + } + + public String getId() { + return id; + } + + public String getType() { + return type; + } + + public String getStatus() { + return status; + } + + public Instant getTimestamp() { + return timestamp; + } +} diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/deserialization/exception/DeserializationExceptionLiveTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/deserialization/exception/DeserializationExceptionLiveTest.java new file mode 100644 index 0000000000..19950ad54b --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/deserialization/exception/DeserializationExceptionLiveTest.java @@ -0,0 +1,96 @@ +package com.baeldung.spring.kafka.deserialization.exception; + +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofSeconds; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +@SpringBootTest(classes = Application.class) +class DeserializationExceptionLiveTest { + + @Container + private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + private static KafkaProducer testKafkaProducer; + + @Autowired + private EmailService emailService; + + @DynamicPropertySource + static void setProps(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); + } + + @BeforeAll + static void beforeAll() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + testKafkaProducer = new KafkaProducer<>(props); + + Awaitility.setDefaultTimeout(ofSeconds(5)); + Awaitility.setDefaultPollInterval(ofMillis(50)); + } + + @BeforeEach + void beforeEach() { + emailService.getArticles().clear(); + } + + @Test + void whenPublishingInvalidArticleEvent_thenHandleExceptionAndContinueProcessing() { + publishArticle("{ \"article\": \"Introduction to Kafka\" }"); + publishArticle(" !! Invalid JSON !! "); + publishArticle("{ \"article\": \"Kafka Streams Tutorial\" }"); + + await().untilAsserted(() -> assertThat(emailService.getArticles()) + .containsExactlyInAnyOrder( + "Introduction to Kafka", + "Kafka Streams Tutorial" + )); + } + + @Test + void whenPublishingValidArticleEvent_thenProcessWithoutErrors() { + publishArticle("{ \"article\": \"Kotlin for Java Developers\" }"); + publishArticle("{ \"article\": \"The S.O.L.I.D. Principles\" }"); + + await().untilAsserted(() -> assertThat(emailService.getArticles()) + .containsExactlyInAnyOrder( + "Kotlin for Java Developers", + "The S.O.L.I.D. Principles" + )); + } + + + static void publishArticle(String jsonBody) { + ProducerRecord record = new ProducerRecord<>("baeldung.articles.published", jsonBody); + try { + testKafkaProducer.send(record).get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/ListenerConfiguration.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/ListenerConfiguration.java new file mode 100644 index 0000000000..6bcb047c19 --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/ListenerConfiguration.java @@ -0,0 +1,42 @@ +package com.baeldung.spring.kafka.trusted.packages; + +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +@Configuration +public class ListenerConfiguration { + + @Bean("messageListenerContainer") + public ConcurrentKafkaListenerContainerFactory messageListenerContainer() { + ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory<>(); + container.setConsumerFactory(someDataConsumerFactory()); + return container; + } + + @Bean + public ConsumerFactory someDataConsumerFactory() { + JsonDeserializer payloadJsonDeserializer = new JsonDeserializer<>(); + payloadJsonDeserializer.trustedPackages("com.baeldung.spring.kafka"); + return new DefaultKafkaConsumerFactory<>( + consumerConfigs(), + new StringDeserializer(), + payloadJsonDeserializer + ); + } + + @Bean + public Map consumerConfigs() { + return Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092", + ConsumerConfig.GROUP_ID_CONFIG, "some-group-id" + ); + } +} \ No newline at end of file diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/ProducerConfiguration.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/ProducerConfiguration.java new file mode 100644 index 0000000000..f7de2f5023 --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/ProducerConfiguration.java @@ -0,0 +1,39 @@ +package com.baeldung.spring.kafka.trusted.packages; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.support.serializer.StringOrBytesSerializer; + +import java.util.Map; + +@Configuration +public class ProducerConfiguration { + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.setAddTypeInfo(true); + return new DefaultKafkaProducerFactory<>( + producerFactoryConfig(), + new StringOrBytesSerializer(), + jsonSerializer + ); + } + + @Bean + public Map producerFactoryConfig() { + return Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092" + ); + } +} \ No newline at end of file diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/TrustedPackagesLiveTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/TrustedPackagesLiveTest.java new file mode 100644 index 0000000000..532f862e13 --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/trusted/packages/TrustedPackagesLiveTest.java @@ -0,0 +1,59 @@ +package com.baeldung.spring.kafka.trusted.packages; + +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +/** + * This test requires a running instance of kafka to be present + */ +@SpringBootTest +@Disabled("This test requires a running instance of kafka to be present - manually run it") +public class TrustedPackagesLiveTest { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @SpyBean + TestConsumer testConsumer; + + @Test + void givenMessageInTheTopic_whenTypeInfoPackageIsTrusted_thenMessageIsSuccessfullyConsumed() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + Mockito.doAnswer(invocationOnMock -> { + try { + latch.countDown(); + return invocationOnMock.callRealMethod(); + } catch (Exception e) { + return null; + } + }).when(testConsumer).onMessage(Mockito.any()); + + SomeData someData = new SomeData("1", "active", "sent", Instant.now()); + kafkaTemplate.send(new ProducerRecord<>("sourceTopic", null, someData)); + + Assertions.assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + + @Component + static class TestConsumer { + + @KafkaListener(topics = "sourceTopic", containerFactory = "messageListenerContainer") + public void onMessage(SomeData someData) { + + } + } +} diff --git a/spring-kafka-3_/README.md b/spring-kafka-3_/README.md new file mode 100644 index 0000000000..f9c0036ce3 --- /dev/null +++ b/spring-kafka-3_/README.md @@ -0,0 +1,2 @@ +## Relevant Articles +- [Spring Kafka Trusted Packages Feature](https://www.baeldung.com/spring-kafka-trusted-packages-feature) diff --git a/spring-kafka-3_/pom.xml b/spring-kafka-3_/pom.xml new file mode 100644 index 0000000000..972412d18e --- /dev/null +++ b/spring-kafka-3_/pom.xml @@ -0,0 +1,36 @@ + + + com.baeldung + parent-boot-2 + 0.0.1-SNAPSHOT + ../parent-boot-2 + + + 4.0.0 + + spring-kafka-3 + jar + + spring-kafka-3 + + + + org.springframework.kafka + spring-kafka + + + com.fasterxml.jackson.core + jackson-databind + + + org.springframework.kafka + spring-kafka-test + test + + + + + 3.0.12 + + diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/SomeData.java b/spring-kafka-3_/src/main/java/com/baeldung/spring/kafka/SomeData.java similarity index 100% rename from spring-kafka-3/src/main/java/com/baeldung/spring/kafka/SomeData.java rename to spring-kafka-3_/src/main/java/com/baeldung/spring/kafka/SomeData.java diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ListenerConfiguration.java b/spring-kafka-3_/src/test/java/com/baeldung/spring/kafka/ListenerConfiguration.java similarity index 100% rename from spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ListenerConfiguration.java rename to spring-kafka-3_/src/test/java/com/baeldung/spring/kafka/ListenerConfiguration.java diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ProducerConfiguration.java b/spring-kafka-3_/src/test/java/com/baeldung/spring/kafka/ProducerConfiguration.java similarity index 100% rename from spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ProducerConfiguration.java rename to spring-kafka-3_/src/test/java/com/baeldung/spring/kafka/ProducerConfiguration.java diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/TrustedPackagesLiveTest.java b/spring-kafka-3_/src/test/java/com/baeldung/spring/kafka/TrustedPackagesLiveTest.java similarity index 100% rename from spring-kafka-3/src/test/java/com/baeldung/spring/kafka/TrustedPackagesLiveTest.java rename to spring-kafka-3_/src/test/java/com/baeldung/spring/kafka/TrustedPackagesLiveTest.java