BAEL-4687 Testing Kafka and Spring Boot

Jonathan Cook 2020-11-08 13:23:31 +01:00
parent f72fce6549
commit eb3631f36f
10 changed files with 297 additions and 40 deletions

View File

@@ -1,9 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-kafka</name>
<description>Intro to Kafka with Spring</description>
@@ -15,21 +15,29 @@
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
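<!-- Testcontainers Kafka module: provides the KafkaContainer used by the Docker-based integration test -->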
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
<version>1.15.0-rc2</version>
</dependency>
</dependencies>
<properties>

View File

@@ -0,0 +1,39 @@
package com.baeldung.kafka.embedded;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
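// The latch lets a test block until a message has been consumed; payload captures the last record for assertions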
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
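// Listens on the topic configured via the test.topic property and releases the latch once a record arrives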
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
setPayload(consumerRecord.toString());
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
return payload;
}
private void setPayload(String payload) {
this.payload = payload;
}
}

View File

@@ -0,0 +1,22 @@
package com.baeldung.kafka.embedded;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
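// Publishes the payload to the given topic through the injected KafkaTemplate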
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}

View File

@@ -0,0 +1,15 @@
package com.baeldung.kafka.embedded;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaProducerConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerConsumerApplication.class, args);
}
}

View File

@@ -1,17 +0,0 @@
package com.baeldung;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.baeldung.spring.kafka.KafkaApplication;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = KafkaApplication.class)
public class SpringContextLiveTest {
@Test
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
}
}

View File

@@ -1,17 +0,0 @@
package com.baeldung;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.baeldung.spring.kafka.KafkaApplication;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = KafkaApplication.class)
public class SpringContextManualTest {
@Test
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
}
}

View File

@@ -0,0 +1,55 @@
package com.baeldung.kafka.embedded;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
@SpringBootTest
@DirtiesContext
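// @EmbeddedKafka starts an in-memory broker; brokerProperties binds it to localhost:9092 so the default Spring Kafka client configuration can reach it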
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {
@Autowired
public KafkaTemplate<String, String> template;
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenEmbeddedKafkaBroker_whenSendingToDefaultTemplate_thenMessageReceived() throws Exception {
template.send(topic, "Sending with default template");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
@Test
public void givenEmbeddedKafkaBroker_whenSendingToSimpleProducer_thenMessageReceived() throws Exception {
producer.send(topic, "Sending with own controller");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}

View File

@@ -0,0 +1,132 @@
package com.baeldung.kafka.testcontainers;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import com.baeldung.kafka.embedded.KafkaConsumer;
import com.baeldung.kafka.embedded.KafkaProducer;
import com.baeldung.kafka.embedded.KafkaProducerConsumerApplication;
@RunWith(SpringRunner.class)
@Import(KafkaTestContainersIntegrationTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersIntegrationTest {
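// Kafka broker running in a Docker container, managed by Testcontainers for the duration of the test class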
@ClassRule
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired
public KafkaTemplate<String, String> template;
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
kafka.start();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
kafka.stop();
}
@Test
public void givenKafkaDockerContainer_whenSendingToDefaultTemplate_thenMessageReceived() throws Exception {
template.send(topic, "Sending with default template");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
@Test
public void givenKafkaDockerContainer_whenSendingToSimpleProducer_thenMessageReceived() throws Exception {
producer.send(topic, "Sending with own controller");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
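// Overrides the producer and consumer factories so the clients connect to the containerized broker instead of localhost:9092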
@TestConfiguration
static class KafkaTestContainersConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
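// getBootstrapServers() returns the container's address with its dynamically mapped port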
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
}

View File

@@ -0,0 +1,7 @@
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
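# Custom property: names the topic used by the @KafkaListener and the tests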
test:
  topic: embedded-test-topic

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>