diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index e24072e5a99..700d2d12d91 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -94,12 +94,12 @@ public class KafkaIndexTask extends SeekableStreamIndexTask props = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties()); + KafkaIndexTaskIOConfig kafkaIndexTaskIOConfig = (KafkaIndexTaskIOConfig) super.ioConfig; + final Map props = new HashMap<>(kafkaIndexTaskIOConfig.getConsumerProperties()); props.put("auto.offset.reset", "none"); - return new KafkaRecordSupplier(props, configMapper); + return new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides()); } finally { Thread.currentThread().setContextClassLoader(currCtxCl); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index ece3fa5ba82..1f4808ad683 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -36,6 +37,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig consumerProperties; private final long pollTimeout; + private final KafkaConfigOverrides configOverrides; @JsonCreator public KafkaIndexTaskIOConfig( @@ -56,7 +58,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig myEndSequenceNumbers = getEndSequenceNumbers(); for (int partition : myEndSequenceNumbers.getPartitionSequenceNumberMap().keySet()) { Preconditions.checkArgument( myEndSequenceNumbers.getPartitionSequenceNumberMap() - .get(partition) - .compareTo(getStartSequenceNumbers().getPartitionSequenceNumberMap().get(partition)) >= 0, + .get(partition) + .compareTo(getStartSequenceNumbers().getPartitionSequenceNumberMap().get(partition)) >= 0, "end offset must be >= start offset for partition[%s]", partition ); @@ -97,7 +101,8 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig setupRecordSupplier() { - return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper); + return new KafkaRecordSupplier( + spec.getIoConfig().getConsumerProperties(), + sortingMapper, + spec.getIoConfig().getConfigOverrides() + ); } @Override @@ -197,7 +201,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor consumerProperties; private final long pollTimeout; - + private final KafkaConfigOverrides configOverrides; @JsonCreator public KafkaSupervisorIOConfig( @@ -61,7 +62,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig @JsonProperty("completionTimeout") Period completionTimeout, @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod, @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, - @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime + @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime, + @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides ) { super( @@ -86,6 +88,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY) ); this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS; + this.configOverrides = configOverrides; } @JsonProperty @@ -112,6 +115,12 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig return isUseEarliestSequenceNumber(); } + @JsonProperty + public KafkaConfigOverrides getConfigOverrides() + { + return configOverrides; + } + @Override public String toString() { @@ -130,6 +139,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig ", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() + ", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() + ", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() + + ", configOverrides=" + getConfigOverrides() + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java index 4de3c06e78a..60d071688e0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java @@ -317,6 +317,7 @@ public class KafkaIOConfigTest true, DateTimes.nowUtc(), DateTimes.nowUtc(), + null, null ); final byte[] json = mapper.writeValueAsBytes(currentConfig); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 6e7bee5b240..b1dc4545aaf 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -404,7 +404,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); Assert.assertTrue(task.supportsQueries()); @@ -463,7 +464,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final ListenableFuture future = runTask(task); @@ -509,7 +511,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false); @@ -543,6 +546,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, + null, null ) ); @@ -587,7 +591,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -641,7 +646,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final ListenableFuture future = runTask(task); @@ -719,7 +725,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final ListenableFuture future = runTask(task); @@ -823,7 +830,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final ListenableFuture future = runTask(task); @@ -950,7 +958,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final ListenableFuture future = runTask(task); @@ -1036,7 +1045,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final KafkaIndexTask staleReplica = createTask( @@ -1051,7 +1061,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1119,7 +1130,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, DateTimes.of("2010"), null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1170,7 +1182,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, DateTimes.of("2010"), - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1230,7 +1243,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1305,7 +1319,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - new TestKafkaInputFormat(INPUT_FORMAT) + new TestKafkaInputFormat(INPUT_FORMAT), + null ) ); Assert.assertTrue(task.supportsQueries()); @@ -1381,7 +1396,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - KAFKA_INPUT_FORMAT + KAFKA_INPUT_FORMAT, + null ) ); Assert.assertTrue(task.supportsQueries()); @@ -1436,7 +1452,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1474,7 +1491,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1523,7 +1541,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1577,7 +1596,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1618,7 +1638,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1712,7 +1733,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1786,7 +1808,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final KafkaIndexTask task2 = createTask( @@ -1801,7 +1824,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1852,7 +1876,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final KafkaIndexTask task2 = createTask( @@ -1867,7 +1892,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1920,7 +1946,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase false, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final KafkaIndexTask task2 = createTask( @@ -1935,7 +1962,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase false, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -1986,7 +2014,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2034,7 +2063,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final KafkaIndexTask task2 = createTask( @@ -2049,7 +2079,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2103,7 +2134,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2144,7 +2176,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2206,7 +2239,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2255,7 +2289,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2317,7 +2352,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2407,7 +2443,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2443,7 +2480,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2489,7 +2527,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ), context ); @@ -2536,7 +2575,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2574,7 +2614,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2685,7 +2726,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2744,7 +2786,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); final ListenableFuture future = runTask(task); @@ -2766,7 +2809,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -2815,7 +2859,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); @@ -3207,7 +3252,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase true, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ) ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index 897a5869855..b480ec146fd 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -34,8 +34,11 @@ import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.apache.druid.segment.TestHelper; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.serialization.Deserializer; import org.junit.AfterClass; import org.junit.Assert; @@ -227,7 +230,7 @@ public class KafkaRecordSupplierTest ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -257,7 +260,8 @@ public class KafkaRecordSupplierTest KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( properties, - OBJECT_MAPPER + OBJECT_MAPPER, + null ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -282,7 +286,8 @@ public class KafkaRecordSupplierTest KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( properties, - OBJECT_MAPPER + OBJECT_MAPPER, + null ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated @@ -299,7 +304,8 @@ public class KafkaRecordSupplierTest KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( properties, - OBJECT_MAPPER + OBJECT_MAPPER, + null ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated @@ -324,7 +330,8 @@ public class KafkaRecordSupplierTest KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( properties, - OBJECT_MAPPER + OBJECT_MAPPER, + null ); recordSupplier.assign(partitions); @@ -358,7 +365,10 @@ public class KafkaRecordSupplierTest ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), + OBJECT_MAPPER, + null + ); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -399,7 +409,7 @@ public class KafkaRecordSupplierTest KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -470,7 +480,7 @@ public class KafkaRecordSupplierTest ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -513,7 +523,7 @@ public class KafkaRecordSupplierTest ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -546,7 +556,7 @@ public class KafkaRecordSupplierTest ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); recordSupplier.assign(partitions); @@ -572,7 +582,7 @@ public class KafkaRecordSupplierTest ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -607,7 +617,7 @@ public class KafkaRecordSupplierTest public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); StreamPartition streamPartition = StreamPartition.of(topic, 0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -619,7 +629,7 @@ public class KafkaRecordSupplierTest public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); StreamPartition streamPartition = StreamPartition.of(topic, 0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -631,7 +641,7 @@ public class KafkaRecordSupplierTest public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); StreamPartition streamPartition = StreamPartition.of(topic, 0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -643,7 +653,7 @@ public class KafkaRecordSupplierTest public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - kafkaServer.consumerProperties(), OBJECT_MAPPER); + kafkaServer.consumerProperties(), OBJECT_MAPPER, null); StreamPartition streamPartition = StreamPartition.of(topic, 0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -678,6 +688,27 @@ public class KafkaRecordSupplierTest Assert.assertEquals("pwd2", properties.getProperty(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)); } + @Test + public void testUseKafkaConsumerOverrides() + { + KafkaConsumer kafkaConsumer = KafkaRecordSupplier.getKafkaConsumer( + OBJECT_MAPPER, + kafkaServer.consumerProperties(), + originalConsumerProperties -> { + final Map newMap = new HashMap<>(originalConsumerProperties); + newMap.put("client.id", "overrideConfigTest"); + return newMap; + } + ); + + // We set a client ID via config override, it should appear in the metric name tags + Map metrics = (Map) kafkaConsumer.metrics(); + for (MetricName metricName : metrics.keySet()) { + Assert.assertEquals("overrideConfigTest", metricName.tags().get("client-id")); + break; + } + } + private void insertData() throws ExecutionException, InterruptedException { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index bf5831ccaef..24aa2a7dd43 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -143,6 +143,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, null, null, + null, null ), null, @@ -318,6 +319,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, null, null, + null, null ), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index e503d4fd7c7..5b231deb314 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -304,6 +304,7 @@ public class KafkaSupervisorIOConfigTest new Period("PT30M"), null, null, + null, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index e6a3a919b42..0e01702240b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -297,6 +297,7 @@ public class KafkaSupervisorTest extends EasyMockSupport new Period("PT30M"), null, null, + null, null ); @@ -447,7 +448,8 @@ public class KafkaSupervisorTest extends EasyMockSupport null, null, null, - INPUT_FORMAT + INPUT_FORMAT, + null ), new KafkaIndexTaskTuningConfig( null, @@ -3649,6 +3651,7 @@ public class KafkaSupervisorTest extends EasyMockSupport new Period("PT30M"), lateMessageRejectionPeriod, earlyMessageRejectionPeriod, + null, null ); @@ -3761,6 +3764,7 @@ public class KafkaSupervisorTest extends EasyMockSupport new Period("PT30M"), lateMessageRejectionPeriod, earlyMessageRejectionPeriod, + null, null ); @@ -3877,6 +3881,7 @@ public class KafkaSupervisorTest extends EasyMockSupport new Period("PT30M"), lateMessageRejectionPeriod, earlyMessageRejectionPeriod, + null, null ); @@ -4020,7 +4025,8 @@ public class KafkaSupervisorTest extends EasyMockSupport true, minimumMessageTime, maximumMessageTime, - INPUT_FORMAT + INPUT_FORMAT, + null ), Collections.emptyMap(), OBJECT_MAPPER diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java index 854810f9228..e009394226d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java @@ -53,6 +53,8 @@ public class Tasks public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock"; public static final String USE_SHARED_LOCK = "useSharedLock"; public static final String STORE_EMPTY_COLUMNS_KEY = "storeEmptyColumns"; + public static final String DYNAMIC_CONFIG_PROVIDER_KEY = "dynamicConfigProviderKey"; + /** * Context flag denoting if maximum possible values should be used to estimate diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/extension/KafkaConfigOverrides.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/extension/KafkaConfigOverrides.java new file mode 100644 index 00000000000..58a22751332 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/extension/KafkaConfigOverrides.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.extension; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.guice.annotations.ExtensionPoint; + +import java.util.Map; + +/** + * This is used to allow extensions to make adjustments to the Kafka consumer properties. + * + * This interface is only used by the druid-kafka-indexing-service extension, but the interface definition must + * be placed in a non-extension module in order for other extensions to be able to provide subtypes visible within + * the druid-kafka-indexing-service extension. + */ +@ExtensionPoint +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +public interface KafkaConfigOverrides +{ + /** + * Given a map of Kafka consumer properties, return a new potentially adjusted map of properties. + * + * @param originalConsumerProperties Kafka consumer properties + * @return Adjusted copy of Kafka consumer properties + */ + Map overrideConfigs(Map originalConsumerProperties); +}