diff --git a/gradle-modules/gradle-7/toolchains-feature/gradle/wrapper/gradle-wrapper.jar b/gradle-modules/gradle-7/toolchains-feature/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000000..c1962a79e2 Binary files /dev/null and b/gradle-modules/gradle-7/toolchains-feature/gradle/wrapper/gradle-wrapper.jar differ diff --git a/pom.xml b/pom.xml index 32b8e8c8c8..167509d2b3 100644 --- a/pom.xml +++ b/pom.xml @@ -867,6 +867,7 @@ spring-jersey spring-jinq spring-kafka-2 + spring-kafka-3 spring-kafka spring-katharsis spring-mobile diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml new file mode 100644 index 0000000000..972412d18e --- /dev/null +++ b/spring-kafka-3/pom.xml @@ -0,0 +1,36 @@ + + + com.baeldung + parent-boot-2 + 0.0.1-SNAPSHOT + ../parent-boot-2 + + + 4.0.0 + + spring-kafka-3 + jar + + spring-kafka-3 + + + + org.springframework.kafka + spring-kafka + + + com.fasterxml.jackson.core + jackson-databind + + + org.springframework.kafka + spring-kafka-test + test + + + + + 3.0.12 + + diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/SomeData.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/SomeData.java new file mode 100644 index 0000000000..eb2e57c33d --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/SomeData.java @@ -0,0 +1,37 @@ +package com.baeldung.spring.kafka; + +import java.time.Instant; + +public class SomeData { + + private String id; + private String type; + private String status; + private Instant timestamp; + + public SomeData() { + } + + public SomeData(String id, String type, String status, Instant timestamp) { + this.id = id; + this.type = type; + this.status = status; + this.timestamp = timestamp; + } + + public String getId() { + return id; + } + + public String getType() { + return type; + } + + public String getStatus() { + return status; + } + + public Instant getTimestamp() { + return timestamp; + } +} diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ListenerConfiguration.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ListenerConfiguration.java new file mode 100644 index 0000000000..e35b1ee415 --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ListenerConfiguration.java @@ -0,0 +1,42 @@ +package com.baeldung.spring.kafka; + +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +@Configuration +public class ListenerConfiguration { + + @Bean("messageListenerContainer") + public ConcurrentKafkaListenerContainerFactory messageListenerContainer() { + ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory<>(); + container.setConsumerFactory(someDataConsumerFactory()); + return container; + } + + @Bean + public ConsumerFactory someDataConsumerFactory() { + JsonDeserializer payloadJsonDeserializer = new JsonDeserializer<>(); + payloadJsonDeserializer.trustedPackages("com.baeldung.spring.kafka"); + return new DefaultKafkaConsumerFactory<>( + consumerConfigs(), + new StringDeserializer(), + payloadJsonDeserializer + ); + } + + @Bean + public Map consumerConfigs() { + return Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092", + ConsumerConfig.GROUP_ID_CONFIG, "some-group-id" + ); + } +} \ No newline at end of file diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ProducerConfiguration.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ProducerConfiguration.java new file mode 100644 index 0000000000..7373424841 --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/ProducerConfiguration.java @@ -0,0 +1,40 @@ +package com.baeldung.spring.kafka; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.support.serializer.StringOrBytesSerializer; + +import java.time.Instant; +import java.util.Map; + +@Configuration +public class ProducerConfiguration { + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.setAddTypeInfo(true); + return new DefaultKafkaProducerFactory<>( + producerFactoryConfig(), + new StringOrBytesSerializer(), + jsonSerializer + ); + } + + @Bean + public Map producerFactoryConfig() { + return Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092" + ); + } +} \ No newline at end of file diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/TrustedPackagesLiveTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/TrustedPackagesLiveTest.java new file mode 100644 index 0000000000..fa4b79cd65 --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/TrustedPackagesLiveTest.java @@ -0,0 +1,57 @@ +package com.baeldung.spring.kafka; + +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +/** + * This test requires a running instance of kafka to be present + */ +@SpringBootTest +public class TrustedPackagesLiveTest { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @SpyBean + TestConsumer testConsumer; + + @Test + void givenMessageInTheTopic_whenTypeInfoPackageIsTrusted_thenMessageIsSuccessfullyConsumed() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + Mockito.doAnswer(invocationOnMock -> { + try { + latch.countDown(); + return invocationOnMock.callRealMethod(); + } catch (Exception e) { + return null; + } + }).when(testConsumer).onMessage(Mockito.any()); + + SomeData someData = new SomeData("1", "active", "sent", Instant.now()); + kafkaTemplate.send(new ProducerRecord<>("sourceTopic", null, someData)); + + Assertions.assertTrue(latch.await(20L, TimeUnit.SECONDS)); + } + + @Component + static class TestConsumer { + + @KafkaListener(topics = "sourceTopic", containerFactory = "messageListenerContainer") + public void onMessage(SomeData someData) { + + } + } +}