From e17020387601cdebb7db6a0a8a3c3052aaf66064 Mon Sep 17 00:00:00 2001 From: Surekha Date: Thu, 21 Mar 2019 11:19:49 -0700 Subject: [PATCH] Consolidate kafka consumer configs (#7249) * Consolidate kafka consumer configs * change the order of adding properties * Add consumer properties to fix test it seems kafka consumer does not receive any message without these configs * Use KafkaConsumerConfigs in integration test * Update zookeeper and kafka versions in the setup.sh for the base druid image * use version 0.2 of base druid image * Try to fix tests in KafkaRecordSupplierTest * unused import * Fix tests in KafkaSupervisorTest --- .../indexing/kafka/KafkaConsumerConfigs.java | 48 +++++++++++++++++++ .../druid/indexing/kafka/KafkaIndexTask.java | 9 +--- .../indexing/kafka/KafkaRecordSupplier.java | 10 +--- .../kafka/KafkaRecordSupplierTest.java | 6 ++- .../kafka/supervisor/KafkaSupervisorTest.java | 5 +- .../druid/indexing/kafka/test/TestBroker.java | 9 +--- .../seekablestream/common/RecordSupplier.java | 2 - .../supervisor/SeekableStreamSupervisor.java | 9 +++- integration-tests/docker-base/setup.sh | 10 ++-- integration-tests/docker/Dockerfile | 2 +- .../indexer/ITKafkaIndexingServiceTest.java | 5 +- 11 files changed, 78 insertions(+), 37 deletions(-) create mode 100644 extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java new file mode 100644 index 00000000000..b5f78694c51 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import java.util.HashMap; +import java.util.Map; + +/** + * Common place to keep all kafka consumer configs + */ +public class KafkaConsumerConfigs +{ + + public static Map getConsumerProperties() + { + final Map props = new HashMap<>(); + props.put("metadata.max.age.ms", "10000"); + props.put("key.deserializer", ByteArrayDeserializer.class.getName()); + props.put("value.deserializer", ByteArrayDeserializer.class.getName()); + props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId())); + props.put("auto.offset.reset", "none"); + props.put("enable.auto.commit", "false"); + props.put("isolation.level", "read_committed"); + return props; + } + +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index 4fcbbe10f0c..df56cced6ec 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -96,19 +96,14 @@ public class KafkaIndexTask extends SeekableStreamIndexTask try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties props = new Properties(); - KafkaRecordSupplier.addConsumerPropertiesFromConfig( props, configMapper, ioConfig.getConsumerProperties() ); - - props.setProperty("enable.auto.commit", "false"); - props.setProperty("auto.offset.reset", "none"); - props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName()); - props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName()); - props.setProperty("isolation.level", "read_committed"); + props.putAll(consumerConfigs); return new KafkaConsumer<>(props); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index be25c49eba6..fcd16737401 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -25,9 +25,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; -import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; 
import org.apache.druid.metadata.PasswordProvider; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -192,14 +190,10 @@ public class KafkaRecordSupplier implements RecordSupplier private KafkaConsumer getKafkaConsumer() { + final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); final Properties props = new Properties(); - - props.setProperty("metadata.max.age.ms", "10000"); - props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId())); - addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); - - props.setProperty("enable.auto.commit", "false"); + props.putAll(consumerConfigs); ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); try { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index 54b494d9e21..b5058840616 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -28,7 +28,6 @@ import org.apache.druid.indexing.kafka.test.TestBroker; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.TestHelper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -49,7 +48,7 @@ import java.util.stream.Collectors; public class KafkaRecordSupplierTest { - private static final Logger log = new Logger(KafkaRecordSupplierTest.class); 
+ private static String topic = "topic"; private static long poll_timeout_millis = 1000; private static int pollRetry = 5; @@ -313,6 +312,7 @@ public class KafkaRecordSupplierTest kafkaServer.consumerProperties(), objectMapper); recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition0)); Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1)); @@ -355,6 +355,7 @@ public class KafkaRecordSupplierTest kafkaServer.consumerProperties(), objectMapper); recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition0)); Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1)); @@ -413,6 +414,7 @@ public class KafkaRecordSupplierTest kafkaServer.consumerProperties(), objectMapper); recordSupplier.assign(partitions); + recordSupplier.seekToEarliest(partitions); Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition0)); Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition1)); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index f2db280c51e..bde9052359b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.RealtimeIndexTask; import org.apache.druid.indexing.common.task.Task; +import 
org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata; import org.apache.druid.indexing.kafka.KafkaIndexTask; import org.apache.druid.indexing.kafka.KafkaIndexTaskClient; @@ -104,7 +105,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -2780,10 +2780,9 @@ public class KafkaSupervisorTest extends EasyMockSupport String kafkaHost ) { - Map consumerProperties = new HashMap<>(); + final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); consumerProperties.put("myCustomKey", "myCustomValue"); consumerProperties.put("bootstrap.servers", kafkaHost); - consumerProperties.put("isolation.level", "read_committed"); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, replicas, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java index 10c9b2ef409..359048c2d9c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java @@ -24,10 +24,10 @@ import com.google.common.io.Files; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import org.apache.commons.io.FileUtils; +import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.java.util.common.StringUtils; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import 
org.apache.kafka.common.utils.Time; import scala.Some; @@ -123,13 +123,8 @@ public class TestBroker implements Closeable public Map consumerProperties() { - final Map props = new HashMap<>(); + final Map props = KafkaConsumerConfigs.getConsumerProperties(); props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort())); - props.put("key.deserializer", ByteArrayDeserializer.class.getName()); - props.put("value.deserializer", ByteArrayDeserializer.class.getName()); - props.put("group.id", String.valueOf(RANDOM.nextInt())); - props.put("auto.offset.reset", "earliest"); - props.put("isolation.level", "read_committed"); return props; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java index d9e599da0c8..31c343b9ac6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java @@ -40,8 +40,6 @@ public interface RecordSupplier extends Clo { /** * assigns the given partitions to this RecordSupplier - * and seek to the earliest sequence number. Previously - * assigned partitions will be replaced. 
* * @param partitions parititions to assign */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index d76cc87058e..fc94ca26171 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2441,7 +2441,14 @@ public abstract class SeekableStreamSupervisor topicPartition = new StreamPartition<>(ioConfig.getStream(), partition); if (!recordSupplier.getAssignment().contains(topicPartition)) { - recordSupplier.assign(Collections.singleton(topicPartition)); + final Set partitions = Collections.singleton(topicPartition); + recordSupplier.assign(partitions); + try { + recordSupplier.seekToEarliest(partitions); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } } return useEarliestOffset diff --git a/integration-tests/docker-base/setup.sh b/integration-tests/docker-base/setup.sh index 9b69ca048ac..13ad92fdc9b 100644 --- a/integration-tests/docker-base/setup.sh +++ b/integration-tests/docker-base/setup.sh @@ -34,14 +34,14 @@ apt-get install -y mysql-server apt-get install -y supervisor # Zookeeper -wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz | tar -xzf - -C /usr/local \ - && cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg \ - && ln -s /usr/local/zookeeper-3.4.11 /usr/local/zookeeper +wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz | tar -xzf - -C /usr/local \ + && cp /usr/local/zookeeper-3.4.13/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.13/conf/zoo.cfg \ + && ln -s /usr/local/zookeeper-3.4.13 /usr/local/zookeeper # Kafka # Match the 
version to the Kafka client used by KafkaSupervisor -wget -q -O - http://www.us.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz | tar -xzf - -C /usr/local \ - && ln -s /usr/local/kafka_2.12-0.10.2.2 /usr/local/kafka +wget -q -O - http://www.us.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz | tar -xzf - -C /usr/local \ + && ln -s /usr/local/kafka_2.12-2.1.0 /usr/local/kafka # Druid system user adduser --system --group --no-create-home druid \ diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile index e3f3155e5f1..6c5094cf7d1 100644 --- a/integration-tests/docker/Dockerfile +++ b/integration-tests/docker/Dockerfile @@ -14,7 +14,7 @@ # limitations under the License. # Base image is built from integration-tests/docker-base in the Druid repo -FROM imply/druiditbase +FROM imply/druiditbase:0.2 RUN echo "[mysqld]\ncharacter-set-server=utf8\ncollation-server=utf8_bin\n" >> /etc/mysql/my.cnf diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java index 7447b2fdd08..d297fda766e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java @@ -27,6 +27,7 @@ import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.io.IOUtils; +import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -147,7 +148,9 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest String spec; try { LOG.info("supervisorSpec name: [%s]", INDEXER_FILE); - Properties consumerProperties = new Properties(); + final Map consumerConfigs 
= KafkaConsumerConfigs.getConsumerProperties(); + final Properties consumerProperties = new Properties(); + consumerProperties.putAll(consumerConfigs); consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost()); addFilteredProperties(consumerProperties);