[JAVA-12591] Fix Kafka integration test

This commit is contained in:
Haroon Khan 2022-06-09 23:16:24 +01:00
parent edc22b309a
commit 19a08f66d9
2 changed files with 27 additions and 16 deletions

View File

@ -15,12 +15,14 @@ public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1); private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
private String payload;
@KafkaListener(topics = "${test.topic}") @KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) { public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString()); LOGGER.info("received payload='{}'", consumerRecord.toString());
setPayload(consumerRecord.toString());
payload = consumerRecord.toString();
latch.countDown(); latch.countDown();
} }
@ -28,12 +30,12 @@ public class KafkaConsumer {
return latch; return latch;
} }
public void resetLatch() {
latch = new CountDownLatch(1);
}
public String getPayload() { public String getPayload() {
return payload; return payload;
} }
private void setPayload(String payload) {
this.payload = payload;
}
} }

View File

@ -3,9 +3,11 @@ package com.baeldung.kafka.embedded;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -31,22 +33,29 @@ class EmbeddedKafkaIntegrationTest {
@Value("${test.topic}") @Value("${test.topic}")
private String topic; private String topic;
@BeforeEach
void setup() {
consumer.resetLatch();
}
@Test @Test
public void givenEmbeddedKafkaBroker_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { public void givenEmbeddedKafkaBroker_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception {
template.send(topic, "Sending with default template"); String data = "Sending with default template";
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); template.send(topic, data);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertThat(consumer.getPayload(), containsString("embedded-test-topic")); assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
} }
@Test @Test
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception {
producer.send(topic, "Sending with our own simple KafkaProducer"); String data = "Sending with our own simple KafkaProducer";
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); producer.send(topic, data);
assertThat(consumer.getLatch().getCount(), equalTo(0L)); boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
assertThat(consumer.getPayload(), containsString("embedded-test-topic")); assertTrue(messageConsumed);
assertThat(consumer.getPayload(), containsString(data));
} }
} }