upgrade Apache Kafka to 2.8.0 (#11139)

* upgrade to Apache Kafka 2.8.0 (release notes:
  https://downloads.apache.org/kafka/2.8.0/RELEASE_NOTES.html)
* pass Kafka version as a Docker argument in integration tests
  to keep in sync with maven version
* fix use of internal Kafka APIs in integration tests
This commit is contained in:
Xavier Léauté 2021-04-24 08:27:07 -07:00 committed by GitHub
parent 31042cddf5
commit 0296f20551
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 19 additions and 17 deletions

View File

@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionModule; import org.apache.druid.server.lookup.namespace.NamespaceExtractionModule;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
@ -47,7 +48,6 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import scala.Some; import scala.Some;
import scala.collection.immutable.List$;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.io.IOException; import java.io.IOException;
@ -96,7 +96,7 @@ public class TestKafkaExtractionCluster
getBrokerProperties(), getBrokerProperties(),
Time.SYSTEM, Time.SYSTEM,
Some.apply(StringUtils.format("TestingBroker[%d]-", 1)), Some.apply(StringUtils.format("TestingBroker[%d]-", 1)),
List$.MODULE$.empty()); false);
kafkaServer.startup(); kafkaServer.startup();
log.info("---------------------------Started Kafka Broker ---------------------------"); log.info("---------------------------Started Kafka Broker ---------------------------");
@ -203,15 +203,18 @@ public class TestKafkaExtractionCluster
return kafkaProducerProperties; return kafkaProducerProperties;
} }
private void checkServer() @SuppressWarnings({"unchecked", "rawtypes"})
private void checkServer() throws Exception
{ {
if (!kafkaServer.dataPlaneRequestProcessor().controller().isActive()) { try (Admin adminClient = Admin.create((Map) getConsumerProperties())) {
throw new ISE("server is not active!"); if (adminClient.describeCluster().controller().get() == null) {
throw new ISE("server is not active!");
}
} }
} }
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testSimpleLookup() throws InterruptedException public void testSimpleLookup() throws Exception
{ {
try (final Producer<byte[], byte[]> producer = new KafkaProducer(makeProducerProperties())) { try (final Producer<byte[], byte[]> producer = new KafkaProducer(makeProducerProperties())) {
checkServer(); checkServer();

View File

@ -31,7 +31,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import scala.Some; import scala.Some;
import scala.collection.immutable.List$;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
@ -89,7 +88,7 @@ public class TestBroker implements Closeable
config, config,
Time.SYSTEM, Time.SYSTEM,
Some.apply(StringUtils.format("TestingBroker[%d]-", id)), Some.apply(StringUtils.format("TestingBroker[%d]-", id)),
List$.MODULE$.empty() false
); );
server.startup(); server.startup();
} }

View File

@ -20,6 +20,7 @@ FROM openjdk:$JDK_VERSION as druidbase
# Otherwise docker's layered images mean that things are not actually deleted. # Otherwise docker's layered images mean that things are not actually deleted.
COPY base-setup.sh /root/base-setup.sh COPY base-setup.sh /root/base-setup.sh
ARG KAFKA_VERSION
ARG APACHE_ARCHIVE_MIRROR_HOST=https://archive.apache.org ARG APACHE_ARCHIVE_MIRROR_HOST=https://archive.apache.org
RUN APACHE_ARCHIVE_MIRROR_HOST=${APACHE_ARCHIVE_MIRROR_HOST} /root/base-setup.sh && rm -f /root/base-setup.sh RUN APACHE_ARCHIVE_MIRROR_HOST=${APACHE_ARCHIVE_MIRROR_HOST} /root/base-setup.sh && rm -f /root/base-setup.sh

View File

@ -51,8 +51,7 @@ install_zk
ln -s /usr/local/$ZK_TAR /usr/local/zookeeper-3.5 ln -s /usr/local/$ZK_TAR /usr/local/zookeeper-3.5
# Kafka # Kafka
# Match the version to the Kafka client used by KafkaSupervisor # KAFKA_VERSION is defined by docker build arguments
KAFKA_VERSION=2.7.0
wget -q -O /tmp/kafka_2.13-$KAFKA_VERSION.tgz "$APACHE_ARCHIVE_MIRROR_HOST/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz" wget -q -O /tmp/kafka_2.13-$KAFKA_VERSION.tgz "$APACHE_ARCHIVE_MIRROR_HOST/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
tar -xzf /tmp/kafka_2.13-$KAFKA_VERSION.tgz -C /usr/local tar -xzf /tmp/kafka_2.13-$KAFKA_VERSION.tgz -C /usr/local
ln -s /usr/local/kafka_2.13-$KAFKA_VERSION /usr/local/kafka ln -s /usr/local/kafka_2.13-$KAFKA_VERSION /usr/local/kafka

View File

@ -466,6 +466,7 @@
<DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER> <DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>
<DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER> <DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER>
<MYSQL_VERSION>${mysql.version}</MYSQL_VERSION> <MYSQL_VERSION>${mysql.version}</MYSQL_VERSION>
<KAFKA_VERSION>${apache.kafka.version}</KAFKA_VERSION>
<ZK_VERSION>${zk.version}</ZK_VERSION> <ZK_VERSION>${zk.version}</ZK_VERSION>
</environmentVariables> </environmentVariables>
<executable>${project.basedir}/build_run_cluster.sh</executable> <executable>${project.basedir}/build_run_cluster.sh</executable>

View File

@ -22,17 +22,17 @@ set -e
if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ] if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ]
then then
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version" echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version"
docker build -t druid/cluster --build-arg MYSQL_VERSION $SHARED_DIR/docker docker build -t druid/cluster --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker
else else
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}"
case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in
8) 8)
echo "Build druid-cluster with Java 8" echo "Build druid-cluster with Java 8"
docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;; ;;
11) 11)
echo "Build druid-cluster with Java 11" echo "Build druid-cluster with Java 11"
docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg KAFKA_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;; ;;
*) *)
echo "Invalid JVM Runtime given. Stopping" echo "Invalid JVM Runtime given. Stopping"

View File

@ -3672,7 +3672,7 @@ libraries:
--- ---
name: Apache Kafka name: Apache Kafka
version: 2.7.0 version: 2.8.0
license_category: binary license_category: binary
module: extensions/druid-kafka-indexing-service module: extensions/druid-kafka-indexing-service
license_name: Apache License version 2.0 license_name: Apache License version 2.0
@ -4585,9 +4585,8 @@ name: Apache Kafka
license_category: binary license_category: binary
module: extensions/kafka-extraction-namespace module: extensions/kafka-extraction-namespace
license_name: Apache License version 2.0 license_name: Apache License version 2.0
version: 2.7.0 version: 2.8.0
libraries: libraries:
- org.apache.kafka: kafka_2.13
- org.apache.kafka: kafka-clients - org.apache.kafka: kafka-clients
notices: notices:
- kafka-clients: - kafka-clients:

View File

@ -77,7 +77,7 @@
<aether.version>0.9.0.M2</aether.version> <aether.version>0.9.0.M2</aether.version>
<apache.curator.version>4.3.0</apache.curator.version> <apache.curator.version>4.3.0</apache.curator.version>
<apache.curator.test.version>2.12.0</apache.curator.test.version> <apache.curator.test.version>2.12.0</apache.curator.test.version>
<apache.kafka.version>2.7.0</apache.kafka.version> <apache.kafka.version>2.8.0</apache.kafka.version>
<apache.ranger.version>2.0.0</apache.ranger.version> <apache.ranger.version>2.0.0</apache.ranger.version>
<apache.ranger.gson.version>2.2.4</apache.ranger.gson.version> <apache.ranger.gson.version>2.2.4</apache.ranger.gson.version>
<avatica.version>1.17.0</avatica.version> <avatica.version>1.17.0</avatica.version>