Add KafkaConfigOverrides extension point (#13122)

* Add KafkaConfigOverrides extension point

* X
Jonathan Wei 2022-09-21 01:17:19 -05:00 committed by GitHub
parent 90d14f629a
commit 331e6d707b
14 changed files with 257 additions and 80 deletions


@@ -94,12 +94,12 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-      final Map<String, Object> props = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties());
+      KafkaIndexTaskIOConfig kafkaIndexTaskIOConfig = (KafkaIndexTaskIOConfig) super.ioConfig;
+      final Map<String, Object> props = new HashMap<>(kafkaIndexTaskIOConfig.getConsumerProperties());
       props.put("auto.offset.reset", "none");
-      return new KafkaRecordSupplier(props, configMapper);
+      return new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());
     }
     finally {
       Thread.currentThread().setContextClassLoader(currCtxCl);


@@ -27,6 +27,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.joda.time.DateTime;

 import javax.annotation.Nullable;
@@ -36,6 +37,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
 {
   private final Map<String, Object> consumerProperties;
   private final long pollTimeout;
+  private final KafkaConfigOverrides configOverrides;

   @JsonCreator
   public KafkaIndexTaskIOConfig(
@@ -56,7 +58,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
       @JsonProperty("useTransaction") Boolean useTransaction,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
-      @JsonProperty("inputFormat") @Nullable InputFormat inputFormat
+      @JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
+      @JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides
   )
   {
     super(
@@ -74,13 +77,14 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
     this.pollTimeout = pollTimeout != null ? pollTimeout : KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
+    this.configOverrides = configOverrides;

     final SeekableStreamEndSequenceNumbers<Integer, Long> myEndSequenceNumbers = getEndSequenceNumbers();
     for (int partition : myEndSequenceNumbers.getPartitionSequenceNumberMap().keySet()) {
       Preconditions.checkArgument(
           myEndSequenceNumbers.getPartitionSequenceNumberMap()
                               .get(partition)
                               .compareTo(getStartSequenceNumbers().getPartitionSequenceNumberMap().get(partition)) >= 0,
           "end offset must be >= start offset for partition[%s]",
           partition
       );
@@ -97,7 +101,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
       Boolean useTransaction,
       DateTime minimumMessageTime,
       DateTime maximumMessageTime,
-      InputFormat inputFormat
+      InputFormat inputFormat,
+      KafkaConfigOverrides configOverrides
   )
   {
     this(
@@ -112,7 +117,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
         useTransaction,
         minimumMessageTime,
         maximumMessageTime,
-        inputFormat
+        inputFormat,
+        configOverrides
     );
   }
@@ -156,6 +162,12 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
     return pollTimeout;
   }

+  @JsonProperty
+  public KafkaConfigOverrides getConfigOverrides()
+  {
+    return configOverrides;
+  }
+
   @Override
   public String toString()
   {
@@ -169,6 +181,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Integer, Long>
            ", useTransaction=" + isUseTransaction() +
            ", minimumMessageTime=" + getMinimumMessageTime() +
            ", maximumMessageTime=" + getMaximumMessageTime() +
+           ", configOverrides=" + getConfigOverrides() +
            '}';
   }
 }


@@ -30,6 +30,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.DynamicConfigProvider;
@@ -63,10 +64,11 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
   public KafkaRecordSupplier(
       Map<String, Object> consumerProperties,
-      ObjectMapper sortingMapper
+      ObjectMapper sortingMapper,
+      KafkaConfigOverrides configOverrides
   )
   {
-    this(getKafkaConsumer(sortingMapper, consumerProperties));
+    this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides));
   }

   @VisibleForTesting
@@ -228,7 +230,6 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
     if (dynamicConfigProviderJson != null) {
       DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
       Map<String, String> dynamicConfig = dynamicConfigProvider.getConfig();
-
       for (Map.Entry<String, String> e : dynamicConfig.entrySet()) {
         properties.setProperty(e.getKey(), e.getValue());
       }
@@ -268,11 +269,25 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
     return deserializerObject;
   }

-  private static KafkaConsumer<byte[], byte[]> getKafkaConsumer(ObjectMapper sortingMapper, Map<String, Object> consumerProperties)
+  public static KafkaConsumer<byte[], byte[]> getKafkaConsumer(
+      ObjectMapper sortingMapper,
+      Map<String, Object> consumerProperties,
+      KafkaConfigOverrides configOverrides
+  )
   {
     final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
     final Properties props = new Properties();
-    addConsumerPropertiesFromConfig(props, sortingMapper, consumerProperties);
+    Map<String, Object> effectiveConsumerProperties;
+    if (configOverrides != null) {
+      effectiveConsumerProperties = configOverrides.overrideConfigs(consumerProperties);
+    } else {
+      effectiveConsumerProperties = consumerProperties;
+    }
+    addConsumerPropertiesFromConfig(
+        props,
+        sortingMapper,
+        effectiveConsumerProperties
+    );
     props.putIfAbsent("isolation.level", "read_committed");
     props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
     props.putAll(consumerConfigs);
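Note: getKafkaConsumer is now public, so the override hook can be exercised directly; the new test in KafkaRecordSupplierTest below does exactly this. A minimal caller sketch under the assumption that KafkaRecordSupplier lives in org.apache.druid.indexing.kafka and that a placeholder broker address is acceptable; this is an illustration, not a prescribed usage pattern:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.HashMap;
import java.util.Map;

public class OverrideUsageSketch
{
  public static void main(String[] args)
  {
    final Map<String, Object> consumerProperties = new HashMap<>();
    consumerProperties.put("bootstrap.servers", "localhost:9092"); // assumption: placeholder broker address

    // KafkaConfigOverrides has a single abstract method, so a lambda works as an inline override.
    KafkaConsumer<byte[], byte[]> consumer = KafkaRecordSupplier.getKafkaConsumer(
        new ObjectMapper(),
        consumerProperties,
        original -> {
          final Map<String, Object> adjusted = new HashMap<>(original);
          adjusted.put("client.id", "override-sketch"); // rewrite one property, keep the rest intact
          return adjusted;
        }
    );
    consumer.close();
  }
}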


@@ -63,7 +63,7 @@ public class KafkaSamplerSpec extends SeekableStreamSamplerSpec
       props.put("auto.offset.reset", "none");
       props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs()));

-      return new KafkaRecordSupplier(props, objectMapper);
+      return new KafkaRecordSupplier(props, objectMapper, ((KafkaSupervisorIOConfig) ioConfig).getConfigOverrides());
     }
     finally {
       Thread.currentThread().setContextClassLoader(currCtxCl);


@@ -126,7 +126,11 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, KafkaRecordEntity>
   @Override
   protected RecordSupplier<Integer, Long, KafkaRecordEntity> setupRecordSupplier()
   {
-    return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper);
+    return new KafkaRecordSupplier(
+        spec.getIoConfig().getConsumerProperties(),
+        sortingMapper,
+        spec.getIoConfig().getConfigOverrides()
+    );
   }

   @Override
@@ -197,7 +201,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, KafkaRecordEntity>
         true,
         minimumMessageTime,
         maximumMessageTime,
-        ioConfig.getInputFormat()
+        ioConfig.getInputFormat(),
+        kafkaIoConfig.getConfigOverrides()
     );
   }


@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.StringUtils;
@@ -43,7 +44,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
   private final Map<String, Object> consumerProperties;
   private final long pollTimeout;
+  private final KafkaConfigOverrides configOverrides;

   @JsonCreator
   public KafkaSupervisorIOConfig(
@@ -61,7 +62,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
       @JsonProperty("completionTimeout") Period completionTimeout,
       @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
       @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
-      @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime
+      @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
+      @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides
   )
   {
     super(
@@ -86,6 +88,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
         StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
     );
     this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
+    this.configOverrides = configOverrides;
   }

   @JsonProperty
@@ -112,6 +115,12 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
     return isUseEarliestSequenceNumber();
   }

+  @JsonProperty
+  public KafkaConfigOverrides getConfigOverrides()
+  {
+    return configOverrides;
+  }
+
   @Override
   public String toString()
   {
@@ -130,6 +139,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
            ", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() +
            ", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
            ", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
+           ", configOverrides=" + getConfigOverrides() +
            '}';
   }
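Note: with the new @JsonProperty, a supervisor spec (and, equivalently, a task ioConfig) can carry a configOverrides block. The "type" discriminator comes from the @JsonTypeInfo annotation on KafkaConfigOverrides; the type name below is hypothetical and would have to be registered by an extension. A sketch of the relevant ioConfig fragment:

{
  "ioConfig": {
    "topic": "metrics",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "configOverrides": {
      "type": "exampleOverrides"
    }
  }
}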


@@ -317,6 +317,7 @@ public class KafkaIOConfigTest
         true,
         DateTimes.nowUtc(),
         DateTimes.nowUtc(),
+        null,
         null
     );
     final byte[] json = mapper.writeValueAsBytes(currentConfig);


@@ -404,7 +404,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     Assert.assertTrue(task.supportsQueries());
@@ -463,7 +464,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -509,7 +511,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
@@ -543,6 +546,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
+            null,
             null
         )
     );
@@ -587,7 +591,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -641,7 +646,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -719,7 +725,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -823,7 +830,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -950,7 +958,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -1036,7 +1045,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask staleReplica = createTask(
@@ -1051,7 +1061,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1119,7 +1130,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             DateTimes.of("2010"),
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1170,7 +1182,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             DateTimes.of("2010"),
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1230,7 +1243,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1305,7 +1319,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            new TestKafkaInputFormat(INPUT_FORMAT)
+            new TestKafkaInputFormat(INPUT_FORMAT),
+            null
         )
     );
     Assert.assertTrue(task.supportsQueries());
@@ -1381,7 +1396,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            KAFKA_INPUT_FORMAT
+            KAFKA_INPUT_FORMAT,
+            null
         )
     );
     Assert.assertTrue(task.supportsQueries());
@@ -1436,7 +1452,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1474,7 +1491,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1523,7 +1541,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1577,7 +1596,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1618,7 +1638,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1712,7 +1733,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1786,7 +1808,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1801,7 +1824,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1852,7 +1876,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1867,7 +1892,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1920,7 +1946,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             false,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -1935,7 +1962,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             false,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -1986,7 +2014,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2034,7 +2063,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final KafkaIndexTask task2 = createTask(
@@ -2049,7 +2079,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2103,7 +2134,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2144,7 +2176,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2206,7 +2239,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2255,7 +2289,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2317,7 +2352,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2407,7 +2443,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2443,7 +2480,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2489,7 +2527,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         ),
         context
     );
@@ -2536,7 +2575,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2574,7 +2614,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2685,7 +2726,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2744,7 +2786,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
     final ListenableFuture<TaskStatus> future = runTask(task);
@@ -2766,7 +2809,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -2815,7 +2859,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );
@@ -3207,7 +3252,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             true,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         )
     );


@@ -34,8 +34,11 @@ import org.apache.druid.metadata.DynamicConfigProvider;
 import org.apache.druid.metadata.MapStringDynamicConfigProvider;
 import org.apache.druid.segment.TestHelper;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -227,7 +230,7 @@ public class KafkaRecordSupplierTest
     );

     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);

     Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@@ -257,7 +260,8 @@ public class KafkaRecordSupplierTest
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
         properties,
-        OBJECT_MAPPER
+        OBJECT_MAPPER,
+        null
     );

     Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@@ -282,7 +286,8 @@ public class KafkaRecordSupplierTest
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
         properties,
-        OBJECT_MAPPER
+        OBJECT_MAPPER,
+        null
     );

     Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
@@ -299,7 +304,8 @@ public class KafkaRecordSupplierTest
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
         properties,
-        OBJECT_MAPPER
+        OBJECT_MAPPER,
+        null
     );

     Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated
@@ -324,7 +330,8 @@ public class KafkaRecordSupplierTest
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
         properties,
-        OBJECT_MAPPER
+        OBJECT_MAPPER,
+        null
     );

     recordSupplier.assign(partitions);
@@ -358,7 +365,10 @@ public class KafkaRecordSupplierTest
     );

     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(),
+        OBJECT_MAPPER,
+        null
+    );

     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -399,7 +409,7 @@ public class KafkaRecordSupplierTest
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);

     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -470,7 +480,7 @@ public class KafkaRecordSupplierTest
     );

     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);

     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -513,7 +523,7 @@ public class KafkaRecordSupplierTest
     );

     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);

     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -546,7 +556,7 @@ public class KafkaRecordSupplierTest
     );

     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);

     recordSupplier.assign(partitions);
@@ -572,7 +582,7 @@ public class KafkaRecordSupplierTest
     );

     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);

     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -607,7 +617,7 @@ public class KafkaRecordSupplierTest
   public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
     StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -619,7 +629,7 @@ public class KafkaRecordSupplierTest
   public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
     StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -631,7 +641,7 @@ public class KafkaRecordSupplierTest
   public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
     StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -643,7 +653,7 @@ public class KafkaRecordSupplierTest
   public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        kafkaServer.consumerProperties(), OBJECT_MAPPER);
+        kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
     StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -678,6 +688,27 @@ public class KafkaRecordSupplierTest
     Assert.assertEquals("pwd2", properties.getProperty(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY));
   }

+  @Test
+  public void testUseKafkaConsumerOverrides()
+  {
+    KafkaConsumer<byte[], byte[]> kafkaConsumer = KafkaRecordSupplier.getKafkaConsumer(
+        OBJECT_MAPPER,
+        kafkaServer.consumerProperties(),
+        originalConsumerProperties -> {
+          final Map<String, Object> newMap = new HashMap<>(originalConsumerProperties);
+          newMap.put("client.id", "overrideConfigTest");
+          return newMap;
+        }
+    );
+
+    // We set a client ID via config override, it should appear in the metric name tags
+    Map<MetricName, KafkaMetric> metrics = (Map<MetricName, KafkaMetric>) kafkaConsumer.metrics();
+    for (MetricName metricName : metrics.keySet()) {
+      Assert.assertEquals("overrideConfigTest", metricName.tags().get("client-id"));
+      break;
+    }
+  }
+
   private void insertData() throws ExecutionException, InterruptedException
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {


@@ -143,6 +143,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             null
         ),
         null,
@@ -318,6 +319,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             null
         ),
         null,


@@ -304,6 +304,7 @@ public class KafkaSupervisorIOConfigTest
         new Period("PT30M"),
         null,
         null,
+        null,
         null
     );
     String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);


@@ -297,6 +297,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period("PT30M"),
         null,
         null,
+        null,
         null
     );
@@ -447,7 +448,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null,
             null,
             null,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         ),
         new KafkaIndexTaskTuningConfig(
             null,
@@ -3649,6 +3651,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period("PT30M"),
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
+        null,
         null
     );
@@ -3761,6 +3764,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period("PT30M"),
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
+        null,
        null
     );
@@ -3877,6 +3881,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period("PT30M"),
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
+        null,
         null
     );
@@ -4020,7 +4025,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             true,
             minimumMessageTime,
             maximumMessageTime,
-            INPUT_FORMAT
+            INPUT_FORMAT,
+            null
         ),
         Collections.emptyMap(),
         OBJECT_MAPPER


@@ -53,6 +53,8 @@ public class Tasks
   public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
   public static final String USE_SHARED_LOCK = "useSharedLock";
   public static final String STORE_EMPTY_COLUMNS_KEY = "storeEmptyColumns";
+
+  public static final String DYNAMIC_CONFIG_PROVIDER_KEY = "dynamicConfigProviderKey";

   /**
    * Context flag denoting if maximum possible values should be used to estimate


@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.extension;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.annotations.ExtensionPoint;
+
+import java.util.Map;
+
+/**
+ * This is used to allow extensions to make adjustments to the Kafka consumer properties.
+ *
+ * This interface is only used by the druid-kafka-indexing-service extension, but the interface definition must
+ * be placed in a non-extension module in order for other extensions to be able to provide subtypes visible within
+ * the druid-kafka-indexing-service extension.
+ */
+@ExtensionPoint
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+public interface KafkaConfigOverrides
+{
+  /**
+   * Given a map of Kafka consumer properties, return a new potentially adjusted map of properties.
+   *
+   * @param originalConsumerProperties Kafka consumer properties
+   * @return Adjusted copy of Kafka consumer properties
+   */
+  Map<String, Object> overrideConfigs(Map<String, Object> originalConsumerProperties);
+}
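
Note: a subtype becomes visible to the druid-kafka-indexing-service extension by registering it as a Jackson named subtype from another extension's DruidModule. A sketch under that assumption; the package, class names, the "exampleOverrides" type name, and the forced client.id value are all hypothetical:

package org.example.druid.extension; // hypothetical extension package

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.initialization.DruidModule;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExampleKafkaConfigOverrides implements KafkaConfigOverrides
{
  @Override
  public Map<String, Object> overrideConfigs(Map<String, Object> originalConsumerProperties)
  {
    // Return an adjusted copy; the interface contract asks for a new map, not in-place mutation.
    final Map<String, Object> adjusted = new HashMap<>(originalConsumerProperties);
    adjusted.put("client.id", "example-tenant"); // hypothetical override
    return adjusted;
  }

  // Registers the subtype name referenced by "configOverrides": {"type": "exampleOverrides"}.
  public static class ExampleOverridesModule implements DruidModule
  {
    @Override
    public List<? extends Module> getJacksonModules()
    {
      return Collections.singletonList(
          new SimpleModule("ExampleKafkaConfigOverrides")
              .registerSubtypes(new NamedType(ExampleKafkaConfigOverrides.class, "exampleOverrides"))
      );
    }

    @Override
    public void configure(Binder binder)
    {
      // No Guice bindings required; the Jackson registration above is sufficient.
    }
  }
}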