diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java index 724064af3bf..d72a7ac9086 100644 --- a/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java +++ b/extensions-core/kafka-extraction-namespace/src/test/java/org/apache/druid/query/lookup/TestKafkaExtractionCluster.java @@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.lookup.namespace.NamespaceExtractionModule; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -47,7 +48,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import scala.Some; -import scala.collection.immutable.List$; import javax.annotation.Nonnull; import java.io.IOException; @@ -96,7 +96,7 @@ public class TestKafkaExtractionCluster getBrokerProperties(), Time.SYSTEM, Some.apply(StringUtils.format("TestingBroker[%d]-", 1)), - List$.MODULE$.empty()); + false); kafkaServer.startup(); log.info("---------------------------Started Kafka Broker ---------------------------"); @@ -203,15 +203,18 @@ public class TestKafkaExtractionCluster return kafkaProducerProperties; } - private void checkServer() + @SuppressWarnings({"unchecked", "rawtypes"}) + private void checkServer() throws Exception { - if (!kafkaServer.dataPlaneRequestProcessor().controller().isActive()) { - throw new ISE("server is not active!"); + try (Admin adminClient = Admin.create((Map) getConsumerProperties())) { + if (adminClient.describeCluster().controller().get() == null) { + throw new ISE("server is not active!"); + } } } @Test(timeout = 60_000L) - public void testSimpleLookup() throws InterruptedException + public void testSimpleLookup() throws Exception { try (final Producer producer = new KafkaProducer(makeProducerProperties())) { checkServer(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java index 9d3f7c500c4..44020d744ee 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java @@ -31,7 +31,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.Time; import scala.Some; -import scala.collection.immutable.List$; import javax.annotation.Nullable; import java.io.Closeable; @@ -89,7 +88,7 @@ public class TestBroker implements Closeable config, Time.SYSTEM, Some.apply(StringUtils.format("TestingBroker[%d]-", id)), - List$.MODULE$.empty() + false ); server.startup(); } diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile index 421d3474055..f56ff766368 100644 --- a/integration-tests/docker/Dockerfile +++ b/integration-tests/docker/Dockerfile @@ -20,6 +20,7 @@ FROM openjdk:$JDK_VERSION as druidbase # Otherwise docker's layered images mean that things are not actually deleted. COPY base-setup.sh /root/base-setup.sh +ARG KAFKA_VERSION ARG APACHE_ARCHIVE_MIRROR_HOST=https://archive.apache.org RUN APACHE_ARCHIVE_MIRROR_HOST=${APACHE_ARCHIVE_MIRROR_HOST} /root/base-setup.sh && rm -f /root/base-setup.sh diff --git a/integration-tests/docker/base-setup.sh b/integration-tests/docker/base-setup.sh index e68f6bcccdc..b2aa33e1284 100755 --- a/integration-tests/docker/base-setup.sh +++ b/integration-tests/docker/base-setup.sh @@ -51,8 +51,7 @@ install_zk ln -s /usr/local/$ZK_TAR /usr/local/zookeeper-3.5 # Kafka -# Match the version to the Kafka client used by KafkaSupervisor -KAFKA_VERSION=2.7.0 +# KAFKA_VERSION is defined by docker build arguments wget -q -O /tmp/kafka_2.13-$KAFKA_VERSION.tgz "$APACHE_ARCHIVE_MIRROR_HOST/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz" tar -xzf /tmp/kafka_2.13-$KAFKA_VERSION.tgz -C /usr/local ln -s /usr/local/kafka_2.13-$KAFKA_VERSION /usr/local/kafka diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 2fb9adc239e..07f30b54b79 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -466,6 +466,7 @@ ${docker.run.skip} ${it.indexer} ${mysql.version} + ${apache.kafka.version} ${zk.version} ${project.basedir}/build_run_cluster.sh diff --git a/integration-tests/script/docker_build_containers.sh b/integration-tests/script/docker_build_containers.sh index ef3df477dec..6a3867af135 100755 --- a/integration-tests/script/docker_build_containers.sh +++ b/integration-tests/script/docker_build_containers.sh @@ -22,17 +22,17 @@ set -e if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ] then echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version" - docker build -t druid/cluster --build-arg MYSQL_VERSION $SHARED_DIR/docker + docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker else echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in 8) echo "Build druid-cluster with Java 8" - docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker + docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker ;; 11) echo "Build druid-cluster with Java 11" - docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker + docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker ;; *) echo "Invalid JVM Runtime given. Stopping" diff --git a/licenses.yaml b/licenses.yaml index 034b463ffa2..77edbdfbb50 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3672,7 +3672,7 @@ libraries: --- name: Apache Kafka -version: 2.7.0 +version: 2.8.0 license_category: binary module: extensions/druid-kafka-indexing-service license_name: Apache License version 2.0 @@ -4585,9 +4585,8 @@ name: Apache Kafka license_category: binary module: extensions/kafka-extraction-namespace license_name: Apache License version 2.0 -version: 2.7.0 +version: 2.8.0 libraries: - - org.apache.kafka: kafka_2.13 - org.apache.kafka: kafka-clients notices: - kafka-clients: diff --git a/pom.xml b/pom.xml index c47e04ab69a..d817a658824 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ 0.9.0.M2 4.3.0 2.12.0 - 2.7.0 + 2.8.0 2.0.0 2.2.4 1.17.0