Allow user to set group.id for Kafka ingestion task (#11147)

* allow user to set group.id for Kafka ingestion task

* fix test coverage by removing deprecated code and add doc

* fix typo

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: frank chen <frankchen@apache.org>

Co-authored-by: frank chen <frankchen@apache.org>
This commit is contained in:
Yuanli Han 2021-05-09 11:56:19 +08:00 committed by GitHub
parent 2df42143ae
commit 8647040f4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 6 additions and 48 deletions

View File

@ -40,6 +40,8 @@ This service is provided in the `druid-kafka-indexing-service` core Apache Druid
> In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka. > In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka.
> Make sure offsets are sequential, since there is no offset gap check in Druid anymore. > Make sure offsets are sequential, since there is no offset gap check in Druid anymore.
> If your Kafka cluster enables consumer-group based ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group id.
## Tutorial ## Tutorial
This page contains reference documentation for Apache Kafka-based ingestion. This page contains reference documentation for Apache Kafka-based ingestion.

View File

@ -19,9 +19,6 @@
package org.apache.druid.indexing.kafka; package org.apache.druid.indexing.kafka;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.StringUtils;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -35,7 +32,6 @@ public class KafkaConsumerConfigs
{ {
final Map<String, Object> props = new HashMap<>(); final Map<String, Object> props = new HashMap<>();
props.put("metadata.max.age.ms", "10000"); props.put("metadata.max.age.ms", "10000");
props.put("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
props.put("auto.offset.reset", "none"); props.put("auto.offset.reset", "none");
props.put("enable.auto.commit", "false"); props.put("enable.auto.commit", "false");
return props; return props;

View File

@ -30,15 +30,9 @@ import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.DataSchema;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity> public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
{ {
@ -84,44 +78,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
return pollRetryMs; return pollRetryMs;
} }
@Deprecated
KafkaConsumer<byte[], byte[]> newConsumer()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties props = new Properties();
KafkaRecordSupplier.addConsumerPropertiesFromConfig(
props,
configMapper,
ioConfig.getConsumerProperties()
);
props.putIfAbsent("isolation.level", "read_committed");
props.putAll(consumerConfigs);
return new KafkaConsumer<>(props);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}
}
@Deprecated
static void assignPartitions(
final KafkaConsumer consumer,
final String topic,
final Set<Integer> partitions
)
{
consumer.assign(
new ArrayList<>(
partitions.stream().map(n -> new TopicPartition(topic, n)).collect(Collectors.toList())
)
);
}
@Override @Override
protected SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> createTaskRunner() protected SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> createTaskRunner()
{ {

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@ -29,6 +30,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider; import org.apache.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -255,6 +257,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
final Properties props = new Properties(); final Properties props = new Properties();
addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties); addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
props.putIfAbsent("isolation.level", "read_committed"); props.putIfAbsent("isolation.level", "read_committed");
props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
props.putAll(consumerConfigs); props.putAll(consumerConfigs);
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();

View File

@ -22,6 +22,7 @@
500MiB 500MiB
64-bit 64-bit
ACL ACL
ACLs
APIs APIs
AvroStorage AvroStorage
AWS AWS