diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml index 2b4a0914e6..3673fe8fa5 100644 --- a/spring-kafka/pom.xml +++ b/spring-kafka/pom.xml @@ -1,9 +1,9 @@ - + 4.0.0 spring-kafka - 0.0.1-SNAPSHOT spring-kafka Intro to Kafka with Spring @@ -15,21 +15,29 @@ - org.springframework.boot spring-boot-starter - org.springframework.kafka spring-kafka - com.fasterxml.jackson.core jackson-databind + + org.springframework.kafka + spring-kafka-test + test + + + org.testcontainers + kafka + test + 1.15.0-rc2 + diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaConsumer.java b/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaConsumer.java new file mode 100644 index 0000000000..48a194b4e3 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaConsumer.java @@ -0,0 +1,39 @@ +package com.baeldung.kafka.embedded; + +import java.util.concurrent.CountDownLatch; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Component +public class KafkaConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); + + private CountDownLatch latch = new CountDownLatch(1); + private String payload = null; + + @KafkaListener(topics = "${test.topic}") + public void receive(ConsumerRecord consumerRecord) { + LOGGER.info("received payload='{}'", consumerRecord.toString()); + setPayload(consumerRecord.toString()); + latch.countDown(); + } + + public CountDownLatch getLatch() { + return latch; + } + + public String getPayload() { + return payload; + } + + private void setPayload(String payload) { + this.payload = payload; + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaProducer.java new file mode 100644 index 0000000000..d7cbd35011 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaProducer.java @@ -0,0 +1,22 @@ +package com.baeldung.kafka.embedded; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +public class KafkaProducer { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class); + + @Autowired + private KafkaTemplate kafkaTemplate; + + public void send(String topic, String payload) { + LOGGER.info("sending payload='{}' to topic='{}'", payload, topic); + kafkaTemplate.send(topic, payload); + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaProducerConsumerApplication.java b/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaProducerConsumerApplication.java new file mode 100644 index 0000000000..bf14251d75 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaProducerConsumerApplication.java @@ -0,0 +1,15 @@ +package com.baeldung.kafka.embedded; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +@EnableAutoConfiguration +public class KafkaProducerConsumerApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaProducerConsumerApplication.class, args); + } + +} diff --git a/spring-kafka/src/test/java/com/baeldung/SpringContextLiveTest.java b/spring-kafka/src/test/java/com/baeldung/SpringContextLiveTest.java deleted file mode 100644 index 60262df9d4..0000000000 --- a/spring-kafka/src/test/java/com/baeldung/SpringContextLiveTest.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.baeldung; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -import com.baeldung.spring.kafka.KafkaApplication; - -@RunWith(SpringRunner.class) -@SpringBootTest(classes = KafkaApplication.class) -public class SpringContextLiveTest { - - @Test - public void whenSpringContextIsBootstrapped_thenNoExceptions() { - } -} diff --git a/spring-kafka/src/test/java/com/baeldung/SpringContextManualTest.java b/spring-kafka/src/test/java/com/baeldung/SpringContextManualTest.java deleted file mode 100644 index 0d2c19136f..0000000000 --- a/spring-kafka/src/test/java/com/baeldung/SpringContextManualTest.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.baeldung; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -import com.baeldung.spring.kafka.KafkaApplication; - -@RunWith(SpringRunner.class) -@SpringBootTest(classes = KafkaApplication.class) -public class SpringContextManualTest { - - @Test - public void whenSpringContextIsBootstrapped_thenNoExceptions() { - } -} diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java new file mode 100644 index 0000000000..49ea080369 --- /dev/null +++ b/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java @@ -0,0 +1,55 @@ +package com.baeldung.kafka.embedded; + +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.concurrent.TimeUnit; + +@RunWith(SpringRunner.class) +@SpringBootTest +@DirtiesContext +@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) +class EmbeddedKafkaIntegrationTest { + + @Autowired + public KafkaTemplate template; + + @Autowired + private KafkaConsumer consumer; + + @Autowired + private KafkaProducer producer; + + @Value("${test.topic}") + private String topic; + + @Test + public void givenEmbeddedKafkaBroker_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { + template.send(topic, "Sending with default template"); + consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); + assertThat(consumer.getLatch().getCount(), equalTo(0L)); + + assertThat(consumer.getPayload(), containsString("embedded-test-topic")); + } + + @Test + public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { + producer.send(topic, "Sending with own controller"); + consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); + + assertThat(consumer.getLatch().getCount(), equalTo(0L)); + assertThat(consumer.getPayload(), containsString("embedded-test-topic")); + } + +} diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/testcontainers/KafkaTestContainersIntegrationTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/testcontainers/KafkaTestContainersIntegrationTest.java new file mode 100644 index 0000000000..cf941b770f --- /dev/null +++ b/spring-kafka/src/test/java/com/baeldung/kafka/testcontainers/KafkaTestContainersIntegrationTest.java @@ -0,0 +1,132 @@ +package com.baeldung.kafka.testcontainers; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import com.baeldung.kafka.embedded.KafkaConsumer; +import com.baeldung.kafka.embedded.KafkaProducer; +import com.baeldung.kafka.embedded.KafkaProducerConsumerApplication; + +@RunWith(SpringRunner.class) +@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersIntegrationTest.KafkaTestContainersConfiguration.class) +@SpringBootTest(classes = KafkaProducerConsumerApplication.class) +@DirtiesContext +public class KafkaTestContainersIntegrationTest { + + @ClassRule + public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); + + @Autowired + public KafkaTemplate template; + + @Autowired + private KafkaConsumer consumer; + + @Autowired + private KafkaProducer producer; + + @Value("${test.topic}") + private String topic; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + kafka.start(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + kafka.stop(); + } + + @Test + public void givenKafkaDockerContainer_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { + template.send(topic, "Sending with default template"); + consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); + + assertThat(consumer.getLatch().getCount(), equalTo(0L)); + assertThat(consumer.getPayload(), containsString("embedded-test-topic")); + } + + @Test + public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { + producer.send(topic, "Sending with own controller"); + consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); + + assertThat(consumer.getLatch().getCount(), equalTo(0L)); + assertThat(consumer.getPayload(), containsString("embedded-test-topic")); + } + + @TestConfiguration + static class KafkaTestContainersConfiguration { + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return props; + } + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + } + +} diff --git a/spring-kafka/src/test/resources/application.yml b/spring-kafka/src/test/resources/application.yml new file mode 100644 index 0000000000..7d7997c6fd --- /dev/null +++ b/spring-kafka/src/test/resources/application.yml @@ -0,0 +1,7 @@ +spring: + kafka: + consumer: + auto-offset-reset: earliest + group-id: baeldung +test: + topic: embedded-test-topic \ No newline at end of file diff --git a/spring-kafka/src/test/resources/logback.xml b/spring-kafka/src/test/resources/logback.xml new file mode 100644 index 0000000000..7d900d8ea8 --- /dev/null +++ b/spring-kafka/src/test/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + \ No newline at end of file