From 8647040f4d0b2a80ef1807d7af18230a487bac60 Mon Sep 17 00:00:00 2001 From: Yuanli Han <44718283+yuanlihan@users.noreply.github.com> Date: Sun, 9 May 2021 11:56:19 +0800 Subject: [PATCH] Allow user to set group.id for Kafka ingestion task (#11147) * allow user to set group.id for Kafka ingestion task * fix test coverage by removing deprecated code and add doc * fix typo * Update docs/development/extensions-core/kafka-ingestion.md Co-authored-by: frank chen Co-authored-by: frank chen --- .../extensions-core/kafka-ingestion.md | 2 + .../indexing/kafka/KafkaConsumerConfigs.java | 4 -- .../druid/indexing/kafka/KafkaIndexTask.java | 44 ------------------- .../indexing/kafka/KafkaRecordSupplier.java | 3 ++ website/.spelling | 1 + 5 files changed, 6 insertions(+), 48 deletions(-) diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 0dd31677af9..e272bc1a1d5 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -40,6 +40,8 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid > In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. > Make sure offsets are sequential, since there is no offset gap check in Druid anymore. +> If your Kafka cluster enables consumer-group based ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group id. + ## Tutorial This page contains reference documentation for Apache Kafka-based ingestion. diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index 365be19cc03..9ba3123582d 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -19,9 +19,6 @@ package org.apache.druid.indexing.kafka; -import org.apache.druid.common.utils.IdUtils; -import org.apache.druid.java.util.common.StringUtils; - import java.util.HashMap; import java.util.Map; @@ -35,7 +32,6 @@ public class KafkaConsumerConfigs { final Map props = new HashMap<>(); props.put("metadata.max.age.ms", "10000"); - props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); return props; diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index bd98510027d..623379aae55 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -30,15 +30,9 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; public class KafkaIndexTask extends SeekableStreamIndexTask { @@ -84,44 +78,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask newConsumer() - { - ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - - final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); - final Properties props = new Properties(); - KafkaRecordSupplier.addConsumerPropertiesFromConfig( - props, - configMapper, - ioConfig.getConsumerProperties() - ); - props.putIfAbsent("isolation.level", "read_committed"); - props.putAll(consumerConfigs); - - return new KafkaConsumer<>(props); - } - finally { - Thread.currentThread().setContextClassLoader(currCtxCl); - } - } - - @Deprecated - static void assignPartitions( - final KafkaConsumer consumer, - final String topic, - final Set partitions - ) - { - consumer.assign( - new ArrayList<>( - partitions.stream().map(n -> new TopicPartition(topic, n)).collect(Collectors.toList()) - ) - ); - } - @Override protected SeekableStreamIndexTaskRunner createTaskRunner() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 1397cddf22d..fe32ffe5ffb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; @@ -29,6 +30,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.PasswordProvider; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -255,6 +257,7 @@ public class KafkaRecordSupplier implements RecordSupplier