diff --git a/apache-kafka-2/README.md b/apache-kafka-2/README.md new file mode 100644 index 0000000000..f43d51c20c --- /dev/null +++ b/apache-kafka-2/README.md @@ -0,0 +1,6 @@ +## Apache Kafka + +This module contains articles about Apache Kafka. + +##### Building the project +You can build the project from the command line using: *mvn clean install*, or in an IDE. diff --git a/apache-kafka-2/log4j.properties b/apache-kafka-2/log4j.properties new file mode 100644 index 0000000000..2173c5d96f --- /dev/null +++ b/apache-kafka-2/log4j.properties @@ -0,0 +1 @@ +log4j.rootLogger=INFO, stdout diff --git a/apache-kafka-2/pom.xml b/apache-kafka-2/pom.xml new file mode 100644 index 0000000000..2d81e9110a --- /dev/null +++ b/apache-kafka-2/pom.xml @@ -0,0 +1,69 @@ + + + 4.0.0 + apache-kafka-2 + apache-kafka-2 + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-api + ${org.slf4j.version} + + + org.slf4j + slf4j-log4j12 + ${org.slf4j.version} + + + com.google.guava + guava + ${guava.version} + + + org.testcontainers + kafka + ${testcontainers-kafka.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers-jupiter.version} + test + + + net.java.dev.jna + jna + 5.7.0 + test + + + org.projectlombok + lombok + ${lombok.version} + provided + + + + + 2.8.0 + 1.15.3 + 1.15.3 + 1.18.20 + + + \ No newline at end of file diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/KafkaAdminClient.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/KafkaAdminClient.java new file mode 100644 index 0000000000..e86dd74bda --- /dev/null +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/KafkaAdminClient.java @@ -0,0 +1,35 @@ +package com.baeldung.kafka; + +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.common.Node; + +public class KafkaAdminClient { + private final AdminClient client; + + public KafkaAdminClient(String bootstrap) { + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrap); + props.put("request.timeout.ms", 3000); + props.put("connections.max.idle.ms", 5000); + + this.client = AdminClient.create(props); + } + + public boolean verifyConnection() throws ExecutionException, InterruptedException { + Collection nodes = this.client.describeCluster() + .nodes() + .get(); + return nodes != null && nodes.size() > 0; + } + + public static void main(String[] args) throws ExecutionException, InterruptedException { + String defaultBootStrapServer = "localhost:9092"; + KafkaAdminClient kafkaAdminClient = new KafkaAdminClient(defaultBootStrapServer); + System.out.println(kafkaAdminClient.verifyConnection()); + } + +} diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/KafkaConnectionLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafkaConnectionLiveTest.java new file mode 100644 index 0000000000..8ce604c37f --- /dev/null +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafkaConnectionLiveTest.java @@ -0,0 +1,39 @@ +package com.baeldung.kafka; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import static org.assertj.core.api.Assertions.assertThat; + +// This live test needs a running Docker instance so that a kafka container can be created + +@Testcontainers +class KafkaConnectionLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); + private KafkaAdminClient kafkaAdminClient; + + @BeforeEach + void setup() { + KAFKA_CONTAINER.addExposedPort(9092); + this.kafkaAdminClient = new KafkaAdminClient(KAFKA_CONTAINER.getBootstrapServers()); + } + + @AfterEach + void destroy() { + KAFKA_CONTAINER.stop(); + } + + @Test + void givenKafkaIsRunning_whenCheckedForConnection_thenConnectionIsVerified() throws Exception { + boolean alive = kafkaAdminClient.verifyConnection(); + assertThat(alive).isTrue(); + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 2264caa99f..3918250d85 100644 --- a/pom.xml +++ b/pom.xml @@ -338,6 +338,7 @@ apache-cxf-modules apache-kafka + apache-kafka-2 apache-libraries apache-olingo apache-poi @@ -769,6 +770,7 @@ apache-cxf-modules apache-kafka + apache-kafka-2 apache-libraries apache-olingo apache-poi