Merge pull request #12337 from hkhan/JAVA-12591-fix-kafka-integration-test

[JAVA-12591] Fix Kafka integration test
This commit is contained in:
kwoyke 2022-06-13 08:56:14 +02:00 committed by GitHub
commit 2d9171f3a6
2 changed files with 30 additions and 17 deletions

View File

@ -39,8 +39,9 @@ class EmbeddedKafkaIntegrationTest {
} }
@Test @Test
public void givenEmbeddedKafkaBroker_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { public void givenEmbeddedKafkaBroker_whenSendingWithDefaultTemplate_thenMessageReceived() throws Exception {
String data = "Sending with default template"; String data = "Sending with default template";
template.send(topic, data); template.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
@ -49,8 +50,9 @@ class EmbeddedKafkaIntegrationTest {
} }
@Test @Test
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
String data = "Sending with our own simple KafkaProducer"; String data = "Sending with our own simple KafkaProducer";
producer.send(topic, data); producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);

View File

@ -3,6 +3,7 @@ package com.baeldung.kafka.testcontainers;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -12,6 +13,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -64,22 +66,31 @@ public class KafkaTestContainersLiveTest {
@Value("${test.topic}") @Value("${test.topic}")
private String topic; private String topic;
@Test @Before
public void givenKafkaDockerContainer_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { public void setup() {
template.send(topic, "Sending with default template"); consumer.resetLatch();
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
} }
@Test @Test
public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { public void givenKafkaDockerContainer_whenSendingWithDefaultTemplate_thenMessageReceived() throws Exception {
producer.send(topic, "Sending with own controller"); String data = "Sending with default template";
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L)); template.send(topic, data);
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
}
@Test
public void givenKafkaDockerContainer_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception {
String data = "Sending with our own simple KafkaProducer";
producer.send(topic, data);
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
} }
@TestConfiguration @TestConfiguration