BAEL-4609 - Testing Kafka and Spring Boot (#10249)
* BAEL-4437 - System Rules * BAEL-4687 Testing Kafka and Spring Boot * BAEL-4609 - Testing Kafka and Spring Boot Co-authored-by: Jonathan Cook <jcook@sciops.esa.int>
This commit is contained in:
parent
9bce552d6d
commit
cfb424f9d1
|
@ -1,9 +1,9 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>spring-kafka</name>
|
||||
<description>Intro to Kafka with Spring</description>
|
||||
|
||||
|
@ -15,25 +15,36 @@
|
|||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>kafka</artifactId>
|
||||
<version>${testcontainers-kafka.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<spring-kafka.version>2.3.7.RELEASE</spring-kafka.version>
|
||||
<spring-kafka.version>2.5.8.RELEASE</spring-kafka.version>
|
||||
<testcontainers-kafka.version>1.15.0</testcontainers-kafka.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,39 @@
|
|||
package com.baeldung.kafka.embedded;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Component
|
||||
public class KafkaConsumer {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
|
||||
|
||||
private CountDownLatch latch = new CountDownLatch(1);
|
||||
private String payload = null;
|
||||
|
||||
@KafkaListener(topics = "${test.topic}")
|
||||
public void receive(ConsumerRecord<?, ?> consumerRecord) {
|
||||
LOGGER.info("received payload='{}'", consumerRecord.toString());
|
||||
setPayload(consumerRecord.toString());
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
public CountDownLatch getLatch() {
|
||||
return latch;
|
||||
}
|
||||
|
||||
public String getPayload() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
private void setPayload(String payload) {
|
||||
this.payload = payload;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package com.baeldung.kafka.embedded;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class KafkaProducer {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
|
||||
|
||||
@Autowired
|
||||
private KafkaTemplate<String, String> kafkaTemplate;
|
||||
|
||||
public void send(String topic, String payload) {
|
||||
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
|
||||
kafkaTemplate.send(topic, payload);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package com.baeldung.kafka.embedded;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableAutoConfiguration
|
||||
public class KafkaProducerConsumerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(KafkaProducerConsumerApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
package com.baeldung;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import com.baeldung.spring.kafka.KafkaApplication;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = KafkaApplication.class)
|
||||
public class SpringContextLiveTest {
|
||||
|
||||
@Test
|
||||
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
|
||||
}
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
package com.baeldung;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import com.baeldung.spring.kafka.KafkaApplication;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = KafkaApplication.class)
|
||||
public class SpringContextManualTest {
|
||||
|
||||
@Test
|
||||
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package com.baeldung.kafka.embedded;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
|
||||
@SpringBootTest
|
||||
@DirtiesContext
|
||||
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
|
||||
class EmbeddedKafkaIntegrationTest {
|
||||
|
||||
@Autowired
|
||||
public KafkaTemplate<String, String> template;
|
||||
|
||||
@Autowired
|
||||
private KafkaConsumer consumer;
|
||||
|
||||
@Autowired
|
||||
private KafkaProducer producer;
|
||||
|
||||
@Value("${test.topic}")
|
||||
private String topic;
|
||||
|
||||
@Test
|
||||
public void givenEmbeddedKafkaBroker_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception {
|
||||
template.send(topic, "Sending with default template");
|
||||
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
|
||||
assertThat(consumer.getLatch().getCount(), equalTo(0L));
|
||||
|
||||
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception {
|
||||
producer.send(topic, "Sending with our own simple KafkaProducer");
|
||||
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertThat(consumer.getLatch().getCount(), equalTo(0L));
|
||||
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
package com.baeldung.kafka.testcontainers;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.TestConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.testcontainers.containers.KafkaContainer;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import com.baeldung.kafka.embedded.KafkaConsumer;
|
||||
import com.baeldung.kafka.embedded.KafkaProducer;
|
||||
import com.baeldung.kafka.embedded.KafkaProducerConsumerApplication;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersIntegrationTest.KafkaTestContainersConfiguration.class)
|
||||
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
|
||||
@DirtiesContext
|
||||
public class KafkaTestContainersIntegrationTest {
|
||||
|
||||
@ClassRule
|
||||
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
|
||||
|
||||
@Autowired
|
||||
public KafkaTemplate<String, String> template;
|
||||
|
||||
@Autowired
|
||||
private KafkaConsumer consumer;
|
||||
|
||||
@Autowired
|
||||
private KafkaProducer producer;
|
||||
|
||||
@Value("${test.topic}")
|
||||
private String topic;
|
||||
|
||||
@Test
|
||||
public void givenKafkaDockerContainer_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception {
|
||||
template.send(topic, "Sending with default template");
|
||||
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertThat(consumer.getLatch().getCount(), equalTo(0L));
|
||||
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception {
|
||||
producer.send(topic, "Sending with own controller");
|
||||
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertThat(consumer.getLatch().getCount(), equalTo(0L));
|
||||
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
|
||||
}
|
||||
|
||||
@TestConfiguration
|
||||
static class KafkaTestContainersConfiguration {
|
||||
|
||||
@Bean
|
||||
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<Integer, String> consumerFactory() {
|
||||
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Map<String, Object> consumerConfigs() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
return props;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, String> producerFactory() {
|
||||
Map<String, Object> configProps = new HashMap<>();
|
||||
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
|
||||
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
return new DefaultKafkaProducerFactory<>(configProps);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
spring:
|
||||
kafka:
|
||||
consumer:
|
||||
auto-offset-reset: earliest
|
||||
group-id: baeldung
|
||||
test:
|
||||
topic: embedded-test-topic
|
|
@ -0,0 +1,13 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
Loading…
Reference in New Issue