update kafka client version to 2.5.0 (#9902)

- remove dependency on deprecated internal Kafka classes
- keep LZ4 version in line with the version shipped with Kafka
Xavier Léauté 2020-05-27 13:20:32 -07:00 committed by GitHub
parent b5dfa5322b
commit 65280a6953
9 changed files with 82 additions and 62 deletions
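
The substance of the change: topic administration in the Kafka indexing-service tests moves off the deprecated internal helpers (kafka.admin.AdminUtils, kafka.utils.ZkUtils) onto the public org.apache.kafka.clients.admin.Admin interface, which talks to the brokers directly instead of going through ZooKeeper. A minimal sketch of the new pattern, with the broker address and class name assumed purely for illustration:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class AdminMigrationSketch
    {
      public static void main(String[] args) throws Exception
      {
        final Properties props = new Properties();
        // Hypothetical broker address; the tests below get theirs from TestBroker.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Unlike ZkUtils, the Admin client needs no ZooKeeper connect string and
        // no lifecycle bookkeeping beyond try-with-resources.
        try (Admin admin = Admin.create(props)) {
          admin.createTopics(
              Collections.singletonList(new NewTopic("example-topic", 2, (short) 1))
          ).all().get();
        }
      }
    }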


@@ -38,7 +38,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
<version>${apache.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>


@@ -97,11 +97,6 @@
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -135,6 +130,13 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<!-- Kafka brokers require ZooKeeper 3.5.x clients for testing -->
<version>3.5.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
@@ -176,7 +178,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.7</version>
<version>2.12.10</version>
<scope>test</scope>
</dependency>
</dependencies>


@@ -141,6 +141,13 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<!-- Kafka brokers require ZooKeeper 3.5.x clients for testing -->
<version>3.5.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
@@ -188,7 +195,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.7</version>
<version>2.12.10</version>
<scope>test</scope>
</dependency>
<dependency>


@@ -27,10 +27,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import kafka.admin.AdminUtils;
import kafka.admin.BrokerMetadata;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
@@ -89,10 +85,12 @@ import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppendera
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.easymock.Capture;
@@ -111,8 +109,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Option;
import scala.collection.Seq;
import java.io.File;
import java.io.IOException;
@@ -148,7 +144,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static String kafkaHost;
private static DataSchema dataSchema;
private static int topicPostfix;
private static ZkUtils zkUtils;
private final int numThreads;
@@ -202,8 +197,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort());
dataSchema = getDataSchema(DATASOURCE);
zkUtils = ZkUtils.apply(zkServer.getConnectString(), 30000, 30000, JaasUtils.isZkSecurityEnabled());
}
@Before
@@ -237,9 +230,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
zkServer.stop();
zkServer = null;
zkUtils.close();
zkUtils = null;
}
@Test
@@ -3242,8 +3232,12 @@
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
//create topic manually
AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
// create topic manually
try (Admin admin = kafkaServer.newAdminClient()) {
admin.createTopics(
Collections.singletonList(new NewTopic(topic, NUM_PARTITIONS, (short) 1))
).all().get();
}
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
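
A behavioral note on the new path: createTopics() surfaces broker-side failures as an ExecutionException wrapping the real cause, so a caller that might race with an existing topic has to unwrap it. A hedged sketch of an idempotent variant (class and method names hypothetical, not part of this patch):

    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.errors.TopicExistsException;

    class TopicHelper
    {
      // Create a topic with replication factor 1, tolerating prior creation.
      static void createTopicIfAbsent(Admin admin, String topic, int partitions) throws Exception
      {
        try {
          admin.createTopics(
              Collections.singletonList(new NewTopic(topic, partitions, (short) 1))
          ).all().get();
        }
        catch (ExecutionException e) {
          if (!(e.getCause() instanceof TopicExistsException)) {
            throw e;
          }
          // Topic already exists, e.g. from an earlier run; treat as success.
        }
      }
    }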
@@ -3266,23 +3260,9 @@
private void addMoreEvents(int numEventsPerPartition, int num_partitions) throws Exception
{
Seq<BrokerMetadata> brokerList = AdminUtils.getBrokerMetadatas(
zkUtils,
RackAwareMode.Enforced$.MODULE$,
Option.apply(zkUtils.getSortedBrokerList())
);
scala.collection.Map<Object, Seq<Object>> replicaAssignment = AdminUtils.assignReplicasToBrokers(
brokerList,
num_partitions,
1, 0, 0
);
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(
zkUtils,
topic,
replicaAssignment,
new Properties(),
true
);
try (Admin admin = kafkaServer.newAdminClient()) {
admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(num_partitions))).all().get();
}
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();

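addMoreEvents() now grows the topic with NewPartitions.increaseTo() instead of hand-computing a replica assignment through AdminUtils. If a test wanted to confirm the expansion is visible broker-side, the same Admin handle can describe the topic; a sketch under the same assumptions (class and method names hypothetical):

    import java.util.Collections;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.TopicDescription;

    class PartitionCheck
    {
      // Current partition count for a topic, as reported by the brokers.
      static int partitionCount(Admin admin, String topic) throws Exception
      {
        final TopicDescription description =
            admin.describeTopics(Collections.singleton(topic)).all().get().get(topic);
        return description.partitions().size();
      }
    }
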

@@ -25,6 +25,7 @@ import kafka.server.KafkaServer;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -100,18 +101,30 @@ public class TestBroker implements Closeable
public KafkaProducer<byte[], byte[]> newProducer()
{
return new KafkaProducer(producerProperties());
return new KafkaProducer<>(producerProperties());
}
public Admin newAdminClient()
{
return Admin.create(adminClientProperties());
}
Map<String, Object> adminClientProperties()
{
final Map<String, Object> props = new HashMap<>();
commonClientProperties(props);
return props;
}
public KafkaConsumer<byte[], byte[]> newConsumer()
{
return new KafkaConsumer(consumerProperties());
return new KafkaConsumer<>(consumerProperties());
}
public Map<String, String> producerProperties()
public Map<String, Object> producerProperties()
{
final Map<String, String> props = new HashMap<>();
props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
final Map<String, Object> props = new HashMap<>();
commonClientProperties(props);
props.put("key.serializer", ByteArraySerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
props.put("acks", "all");
@@ -120,6 +133,11 @@
return props;
}
void commonClientProperties(Map<String, Object> props)
{
props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
}
public Map<String, Object> consumerProperties()
{
final Map<String, Object> props = KafkaConsumerConfigs.getConsumerProperties();

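With commonClientProperties() factoring out the bootstrap address, the admin clients, producers, and consumers built by TestBroker all point at the same embedded broker. A hypothetical usage sketch built on the newAdminClient() method added above (only TestBroker is project-specific; the rest is standard client API):

    import java.util.Set;
    import org.apache.kafka.clients.admin.Admin;

    class TestBrokerUsage
    {
      // Ask the embedded broker whether it knows about a topic.
      static boolean brokerHasTopic(TestBroker kafkaServer, String topic) throws Exception
      {
        try (Admin admin = kafkaServer.newAdminClient()) {
          final Set<String> topics = admin.listTopics().names().get();
          return topics.contains(topic);
        }
      }
    }
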

@@ -31,18 +31,26 @@ apt-get install -y mysql-server
apt-get install -y supervisor
# Zookeeper
wget -q -O /tmp/zookeeper-3.4.14.tar.gz "https://apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz"
tar -xzf /tmp/zookeeper-3.4.14.tar.gz -C /usr/local
cp /usr/local/zookeeper-3.4.14/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.14/conf/zoo.cfg
ln -s /usr/local/zookeeper-3.4.14 /usr/local/zookeeper
rm /tmp/zookeeper-3.4.14.tar.gz
#ZK_VERSION=3.5.8
#ZK_TAR=apache-zookeeper-$ZK_VERSION-bin
ZK_VERSION=3.4.14
ZK_TAR=zookeeper-$ZK_VERSION
wget -q -O /tmp/$ZK_TAR.tar.gz "https://apache.org/dist/zookeeper/zookeeper-$ZK_VERSION/$ZK_TAR.tar.gz"
tar -xzf /tmp/$ZK_TAR.tar.gz -C /usr/local
cp /usr/local/$ZK_TAR/conf/zoo_sample.cfg /usr/local/$ZK_TAR/conf/zoo.cfg
ln -s /usr/local/$ZK_TAR /usr/local/zookeeper
rm /tmp/$ZK_TAR.tar.gz
# Kafka
# Match the version to the Kafka client used by KafkaSupervisor
wget -q -O /tmp/kafka_2.12-2.1.1.tgz "https://apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz"
tar -xzf /tmp/kafka_2.12-2.1.1.tgz -C /usr/local
ln -s /usr/local/kafka_2.12-2.1.1 /usr/local/kafka
rm /tmp/kafka_2.12-2.1.1.tgz
KAFKA_VERSION=2.5.0
wget -q -O /tmp/kafka_2.12-$KAFKA_VERSION.tgz "https://apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.12-$KAFKA_VERSION.tgz"
tar -xzf /tmp/kafka_2.12-$KAFKA_VERSION.tgz -C /usr/local
ln -s /usr/local/kafka_2.12-$KAFKA_VERSION /usr/local/kafka
rm /tmp/kafka_2.12-$KAFKA_VERSION.tgz
# Druid system user
adduser --system --group --no-create-home druid \


@@ -1975,7 +1975,7 @@ name: LZ4 Java
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.6.0
version: 1.7.1
libraries:
- org.lz4: lz4-java
@@ -3251,7 +3251,7 @@ libraries:
---
name: Apache Kafka
version: 2.2.2
version: 2.5.0
license_category: binary
module: extensions/druid-kafka-indexing-service
license_name: Apache License version 2.0
@@ -4159,7 +4159,7 @@ name: Apache Kafka
license_category: binary
module: extensions/kafka-extraction-namespace
license_name: Apache License version 2.0
version: 2.2.2
version: 2.5.0
libraries:
- org.apache.kafka: kafka_2.12
- org.apache.kafka: kafka-clients


@@ -78,7 +78,7 @@
<aether.version>0.9.0.M2</aether.version>
<apache.curator.version>4.3.0</apache.curator.version>
<apache.curator.test.version>2.12.0</apache.curator.test.version>
<apache.kafka.version>2.2.2</apache.kafka.version>
<apache.kafka.version>2.5.0</apache.kafka.version>
<apache.ranger.version>2.0.0</apache.ranger.version>
<apache.ranger.gson.version>2.2.4</apache.ranger.gson.version>
<avatica.version>1.15.0</avatica.version>
@@ -111,6 +111,9 @@
<powermock.version>2.0.2</powermock.version>
<aws.sdk.version>1.11.199</aws.sdk.version>
<caffeine.version>2.8.0</caffeine.version>
<!-- Curator requires 3.4.x ZooKeeper clients to maintain compatibility with 3.4.x ZooKeeper servers.
If we upgraded to 3.5.x clients, Curator would require 3.5.x servers, which would break backwards compatibility;
see http://curator.apache.org/zk-compatibility.html -->
<!-- When upgrading ZK, edit docs and integration tests as well (integration-tests/docker-base/setup.sh) -->
<zookeeper.version>3.4.14</zookeeper.version>
<checkerframework.version>2.5.7</checkerframework.version>
@@ -771,7 +774,7 @@
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.6.0</version>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>

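On the LZ4 side, pinning lz4-java to 1.7.1 keeps Druid's copy converged with the version kafka-clients 2.5.0 ships with, per the commit message. For reference, a small round-trip sketch of the lz4-java API this dependency provides (sample data assumed):

    import java.nio.charset.StandardCharsets;
    import net.jpountz.lz4.LZ4Compressor;
    import net.jpountz.lz4.LZ4Factory;
    import net.jpountz.lz4.LZ4FastDecompressor;

    public class Lz4RoundTrip
    {
      public static void main(String[] args)
      {
        final LZ4Factory factory = LZ4Factory.fastestInstance();
        final byte[] data = "lz4 smoke test".getBytes(StandardCharsets.UTF_8);

        final LZ4Compressor compressor = factory.fastCompressor();
        final byte[] compressed = compressor.compress(data);

        final LZ4FastDecompressor decompressor = factory.fastDecompressor();
        final byte[] restored = decompressor.decompress(compressed, data.length);
        // restored is byte-for-byte equal to data.
      }
    }
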

@@ -58,13 +58,15 @@ function _download_zookeeper() {
local dest="$1"
local zk_version
zk_version="$(_get_zookeeper_version)"
#zk_tar=apache-zookeeper-${zk_version}-bin # for zk 3.5.x
zk_tar=zookeeper-${zk_version} # for zk 3.4.x
_log "Downloading zookeeper"
curl -s "https://archive.apache.org/dist/zookeeper/zookeeper-${zk_version}/zookeeper-${zk_version}.tar.gz" \
curl -s "https://archive.apache.org/dist/zookeeper/zookeeper-${zk_version}/$zk_tar.tar.gz" \
| tar xz \
&& rm -rf "$dest" \
&& mv "zookeeper-${zk_version}" "$dest" \
&& rm -f "zookeeper-${zk_version}"
&& mv "$zk_tar" "$dest" \
&& rm -f "$zk_tar"
}
function _build_distribution() {