BAEL-4609 - Testing Kafka and Spring Boot

This commit is contained in:
Jonathan Cook 2020-11-13 16:42:33 +01:00
parent eb3631f36f
commit a51e034186
3 changed files with 14 additions and 26 deletions

View File

@ -22,6 +22,7 @@
<dependency> <dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
@ -30,18 +31,20 @@
<dependency> <dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId> <artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.testcontainers</groupId> <groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId> <artifactId>kafka</artifactId>
<version>${testcontainers-kafka.version}</version>
<scope>test</scope> <scope>test</scope>
<version>1.15.0-rc2</version>
</dependency> </dependency>
</dependencies> </dependencies>
<properties> <properties>
<spring-kafka.version>2.3.7.RELEASE</spring-kafka.version> <spring-kafka.version>2.5.8.RELEASE</spring-kafka.version>
<testcontainers-kafka.version>1.15.0</testcontainers-kafka.version>
</properties> </properties>
</project> </project>

View File

@ -1,22 +1,19 @@
package com.baeldung.kafka.embedded; package com.baeldung.kafka.embedded;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@RunWith(SpringRunner.class) import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
@SpringBootTest @SpringBootTest
@DirtiesContext @DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@ -45,7 +42,7 @@ class EmbeddedKafkaIntegrationTest {
@Test @Test
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception {
producer.send(topic, "Sending with own controller"); producer.send(topic, "Sending with our own simple KafkaProducer");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getLatch().getCount(), equalTo(0L));

View File

@ -12,8 +12,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -59,16 +57,6 @@ public class KafkaTestContainersIntegrationTest {
@Value("${test.topic}") @Value("${test.topic}")
private String topic; private String topic;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
kafka.start();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
kafka.stop();
}
@Test @Test
public void givenKafkaDockerContainer_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { public void givenKafkaDockerContainer_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception {
template.send(topic, "Sending with default template"); template.send(topic, "Sending with default template");