Consolidate kafka consumer configs (#7249)

* Consolidate kafka consumer configs

* change the order of adding properties

* Add consumer properties to fix test

It seems the Kafka consumer does not receive any messages without these configs.

* Use KafkaConsumerConfigs in integration test

* Update zookeeper and kafka versions in the setup.sh for the base druid image

* Use version 0.2 of the base druid image

* Try to fix tests in KafkaRecordSupplierTest

* unused import

* Fix tests in KafkaSupervisorTest

Authored by Surekha on 2019-03-21 11:19:49 -07:00; committed by Jihoon Son
parent 494c1a2ef8
commit e170203876
11 changed files with 78 additions and 37 deletions

KafkaConsumerConfigs.java (new file)

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka;

import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Common place to keep all kafka consumer configs
 */
public class KafkaConsumerConfigs
{
  public static Map<String, Object> getConsumerProperties()
  {
    final Map<String, Object> props = new HashMap<>();
    props.put("metadata.max.age.ms", "10000");
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
    props.put("auto.offset.reset", "none");
    props.put("enable.auto.commit", "false");
    props.put("isolation.level", "read_committed");
    return props;
  }
}

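For orientation, a minimal usage sketch (not part of the commit) showing how a caller builds a consumer from these shared defaults, in the way TestBroker and the integration test below do; the bootstrap address is a hypothetical placeholder:

import java.util.Map;

import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch: start from the shared defaults and add only environment-specific settings.
final Map<String, Object> props = KafkaConsumerConfigs.getConsumerProperties();
props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);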
KafkaIndexTask.java

@@ -96,19 +96,14 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties props = new Properties();
KafkaRecordSupplier.addConsumerPropertiesFromConfig(
props,
configMapper,
ioConfig.getConsumerProperties()
);
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "none");
props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
props.setProperty("isolation.level", "read_committed");
props.putAll(consumerConfigs);
return new KafkaConsumer<>(props);
}

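The ordering here matters: the user-supplied consumer properties from the ioConfig are added first and the shared configs last, so fixed settings such as enable.auto.commit=false and isolation.level=read_committed win on any conflict; this appears to be what the "change the order of adding properties" bullet refers to. A small sketch of that last-write-wins behavior (the conflicting value is hypothetical):

import java.util.Properties;

import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;

// Sketch: a later putAll overrides earlier values for the same key.
final Properties props = new Properties();
props.setProperty("enable.auto.commit", "true");            // hypothetical user-supplied value
props.putAll(KafkaConsumerConfigs.getConsumerProperties()); // shared defaults applied last
// props.get("enable.auto.commit") is now "false".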
KafkaRecordSupplier.java

@@ -25,9 +25,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -192,14 +190,10 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties props = new Properties();
props.setProperty("metadata.max.age.ms", "10000");
props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
props.setProperty("enable.auto.commit", "false");
props.putAll(consumerConfigs);
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {

KafkaRecordSupplierTest.java

@@ -28,7 +28,6 @@ import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.TestHelper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -49,7 +48,7 @@ import java.util.stream.Collectors;
public class KafkaRecordSupplierTest
{
private static final Logger log = new Logger(KafkaRecordSupplierTest.class);
private static String topic = "topic";
private static long poll_timeout_millis = 1000;
private static int pollRetry = 5;
@@ -313,6 +312,7 @@ public class KafkaRecordSupplierTest
kafkaServer.consumerProperties(), objectMapper);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition0));
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
@@ -355,6 +355,7 @@ public class KafkaRecordSupplierTest
kafkaServer.consumerProperties(), objectMapper);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition0));
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
@@ -413,6 +414,7 @@ public class KafkaRecordSupplierTest
kafkaServer.consumerProperties(), objectMapper);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition0));
Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition1));

KafkaSupervisorTest.java

@@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClient;
@@ -104,7 +105,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -2780,10 +2780,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
String kafkaHost
)
{
Map<String, Object> consumerProperties = new HashMap<>();
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
consumerProperties.put("isolation.level", "read_committed");
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
topic,
replicas,

TestBroker.java

@@ -24,10 +24,10 @@ import com.google.common.io.Files;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.io.FileUtils;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import scala.Some;
@@ -123,13 +123,8 @@ public class TestBroker implements Closeable
public Map<String, Object> consumerProperties()
{
final Map<String, Object> props = new HashMap<>();
final Map<String, Object> props = KafkaConsumerConfigs.getConsumerProperties();
props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort()));
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
props.put("group.id", String.valueOf(RANDOM.nextInt()));
props.put("auto.offset.reset", "earliest");
props.put("isolation.level", "read_committed");
return props;
}

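Because the shared defaults set auto.offset.reset to none (replacing the test-only auto.offset.reset=earliest removed above), a consumer built from these properties has no automatic starting position and must be seeked explicitly before polling; that is presumably why the seekToEarliest calls were added to the tests. A hedged sketch of the pattern with a plain KafkaConsumer, assuming a 2.x client and a TestBroker instance named broker (topic name and timeout are illustrative):

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: with auto.offset.reset=none, position the consumer explicitly before polling.
final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(broker.consumerProperties());
final TopicPartition partition = new TopicPartition("topic", 0);
consumer.assign(Collections.singletonList(partition));
consumer.seekToBeginning(Collections.singletonList(partition)); // no auto reset to fall back on
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));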
RecordSupplier.java

@@ -40,8 +40,6 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType> extends Closeable
{
/**
* assigns the given partitions to this RecordSupplier
* and seek to the earliest sequence number. Previously
* assigned partitions will be replaced.
*
* @param partitions parititions to assign
*/

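The doc change reflects a behavior change: assign() no longer seeks implicitly, so call sites must position partitions themselves, which is exactly what the supervisor and test changes in this commit add. A minimal sketch of the new caller contract, assuming a RecordSupplier<Integer, Long> named recordSupplier and an illustrative stream name and partition id:

import java.util.Collections;
import java.util.Set;

import org.apache.druid.indexing.seekablestream.common.StreamPartition;

// Sketch: assign() alone no longer establishes a read position.
final StreamPartition<Integer> partition = new StreamPartition<>("topic", 0);
final Set<StreamPartition<Integer>> partitions = Collections.singleton(partition);
recordSupplier.assign(partitions);
try {
  recordSupplier.seekToEarliest(partitions); // explicit seek, previously implied by assign()
}
catch (InterruptedException e) {
  throw new RuntimeException(e);
}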
SeekableStreamSupervisor.java

@@ -2441,7 +2441,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
synchronized (recordSupplierLock) {
StreamPartition<PartitionIdType> topicPartition = new StreamPartition<>(ioConfig.getStream(), partition);
if (!recordSupplier.getAssignment().contains(topicPartition)) {
recordSupplier.assign(Collections.singleton(topicPartition));
final Set partitions = Collections.singleton(topicPartition);
recordSupplier.assign(partitions);
try {
recordSupplier.seekToEarliest(partitions);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return useEarliestOffset

setup.sh (base Druid image)

@@ -34,14 +34,14 @@ apt-get install -y mysql-server
apt-get install -y supervisor
# Zookeeper
wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz | tar -xzf - -C /usr/local \
&& cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg \
&& ln -s /usr/local/zookeeper-3.4.11 /usr/local/zookeeper
wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz | tar -xzf - -C /usr/local \
&& cp /usr/local/zookeeper-3.4.13/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.13/conf/zoo.cfg \
&& ln -s /usr/local/zookeeper-3.4.13 /usr/local/zookeeper
# Kafka
# Match the version to the Kafka client used by KafkaSupervisor
wget -q -O - http://www.us.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz | tar -xzf - -C /usr/local \
&& ln -s /usr/local/kafka_2.12-0.10.2.2 /usr/local/kafka
wget -q -O - http://www.us.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz | tar -xzf - -C /usr/local \
&& ln -s /usr/local/kafka_2.12-2.1.0 /usr/local/kafka
# Druid system user
adduser --system --group --no-create-home druid \

Dockerfile (integration tests)

@@ -14,7 +14,7 @@
# limitations under the License.
# Base image is built from integration-tests/docker-base in the Druid repo
FROM imply/druiditbase
FROM imply/druiditbase:0.2
RUN echo "[mysqld]\ncharacter-set-server=utf8\ncollation-server=utf8_bin\n" >> /etc/mysql/my.cnf

ITKafkaIndexingServiceTest.java

@@ -27,6 +27,7 @@ import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -147,7 +148,9 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
String spec;
try {
LOG.info("supervisorSpec name: [%s]", INDEXER_FILE);
Properties consumerProperties = new Properties();
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties consumerProperties = new Properties();
consumerProperties.putAll(consumerConfigs);
consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
addFilteredProperties(consumerProperties);