diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md
index be87a3dff9c..536f1ade95b 100644
--- a/docs/development/extensions-core/kafka-supervisor-reference.md
+++ b/docs/development/extensions-core/kafka-supervisor-reference.md
@@ -37,7 +37,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |Field|Type|Description|Required|
 |-----|----|-----------|--------|
-|`topic`|String|The Kafka topic to read from. Must be a specific topic. Topic patterns are not supported.|yes|
+|`topic`|String|The Kafka topic to read from. Must be a specific topic. Use this setting to ingest from a single Kafka topic.|yes if `topicPattern` isn't set|
+|`topicPattern`|String|A regex pattern that can be used to select multiple Kafka topics to ingest data from. Exactly one of `topic` or `topicPattern` must be set in a spec. See [Ingesting from multiple topics](#ingesting-from-multiple-topics) for more details.|yes if `topic` isn't set|
 |`inputFormat`|Object|`inputFormat` to define input data parsing. See [Specifying data format](#specifying-data-format) for details about specifying the input format.|yes|
 |`consumerProperties`|Map|A map of properties to pass to the Kafka consumer. See [More on consumer properties](#more-on-consumerproperties).|yes|
 |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
@@ -53,7 +54,6 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
 |`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
 |`idleConfig`|Object|Defines how and when Kafka Supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)|
-|`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple Kafka topics using a single supervisor. See [Ingesting from multiple topics](#ingesting-from-multiple-topics) for more details.|no (default == false)|
 
 ## Task Autoscaler Properties
@@ -138,11 +138,20 @@ The following example demonstrates supervisor spec with `lagBased` autoScaler an
 }
 ```
 ## Ingesting from multiple topics
-To ingest from multiple topics, you have to set `multiTopic` in the supervisor IO config to `true`. Multiple topics
-can be passed as a regex pattern as the value for `topic` in the IO config. For example, to ingest data from clicks and
-impressions, you will set `topic` to `clicks|impressions` in the IO config. If new topics are added to the cluster that
-match the regex, druid will automatically start ingesting from those new topics. If you enable multi-topic
-ingestion for a datasource, downgrading will cause the ingestion to fail for that datasource.
+
+To ingest data from multiple topics, set `topicPattern` in the supervisor IO config instead of `topic`.
+Multiple topics are passed as a regex pattern in the value for `topicPattern` in the IO config. For example, to
+ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in the IO config.
+Similarly, you can use `metrics-.*` as the value for `topicPattern` to ingest from all the topics that start
+with `metrics-`. If new topics that match the regex are added to the cluster, Druid automatically starts
+ingesting from those new topics. If you enable multi-topic ingestion for a datasource, downgrading to a version
+earlier than 28.0.0 will cause ingestion for that datasource to fail.
+
+When ingesting data from multiple topics, partitions are assigned based on the hashcode of the topic name and
+the ID of the partition within that topic. The partition assignment might not be uniform across all the tasks.
+It is also assumed that partitions across individual topics carry similar load. If you ingest from both
+high-load and low-load topics in the same supervisor, it is recommended that the high-load topics have a
+higher number of partitions and the low-load topics a lower number.
 
 ## More on consumerProperties
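For concreteness, a minimal `ioConfig` using the new field might look like the following. This is an illustrative sketch based on the documentation change above, not an example taken from this PR; the pattern, broker address, and task settings are placeholders.

```json
{
  "topicPattern": "metrics-.*",
  "inputFormat": {"type": "json"},
  "consumerProperties": {"bootstrap.servers": "localhost:9092"},
  "taskCount": 2,
  "replicas": 1,
  "taskDuration": "PT1H"
}
```

With a config like this, the supervisor would read from `metrics-cpu`, `metrics-memory`, and any other matching topic, including topics created after the supervisor starts.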
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 96d7a849af2..a88300552e2 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -231,7 +231,7 @@ public class KafkaRecordSupplier implements RecordSupplier
 partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
 return new KafkaSupervisorReportPayload(
     spec.getDataSchema().getDataSource(),
-    ioConfig.getTopic(),
+    ioConfig.getStream(),
     numPartitions,
     ioConfig.getReplicas(),
     ioConfig.getTaskDuration().getMillis() / 1000,
@@ -204,8 +204,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor
-        new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getTopic(), startPartitions, Collections.emptySet()),
-        new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getTopic(), endPartitions),
+        new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getStream(), startPartitions, Collections.emptySet()),
+        new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getStream(), endPartitions),
         kafkaIoConfig.getConsumerProperties(),
         kafkaIoConfig.getPollTimeout(),
         true,
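The `getTopic()` to `getStream()` changes above work because the stream identifier may now be either a literal topic name or a regex. As a rough illustration of what multi-topic partition discovery involves — this is an assumption about the mechanism, not the PR's actual `KafkaRecordSupplier` code — the plain Kafka consumer API can enumerate partitions for every topic matching a pattern:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: list all (topic, partition) pairs whose topic
// matches the supervisor's topicPattern.
public class PatternPartitionDiscovery
{
  public static List<TopicPartition> discover(Properties consumerProperties, String topicPattern)
  {
    Pattern pattern = Pattern.compile(topicPattern);
    List<TopicPartition> result = new ArrayList<>();
    // consumerProperties is assumed to carry bootstrap.servers and
    // byte-array key/value deserializers.
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProperties)) {
      // listTopics() returns metadata for every topic visible to the consumer.
      for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
        if (pattern.matcher(entry.getKey()).matches()) {
          for (PartitionInfo info : entry.getValue()) {
            result.add(new TopicPartition(info.topic(), info.partition()));
          }
        }
      }
    }
    return result;
  }
}
```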
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index c244fea95c7..fbf55f4ab5e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
@@ -48,11 +49,13 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
   private final Map<String, Object> consumerProperties;
   private final long pollTimeout;
   private final KafkaConfigOverrides configOverrides;
-  private final boolean multiTopic;
+  private final String topic;
+  private final String topicPattern;
 
   @JsonCreator
   public KafkaSupervisorIOConfig(
       @JsonProperty("topic") String topic,
+      @JsonProperty("topicPattern") String topicPattern,
       @JsonProperty("inputFormat") InputFormat inputFormat,
       @JsonProperty("replicas") Integer replicas,
       @JsonProperty("taskCount") Integer taskCount,
@@ -69,12 +72,11 @@
       @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
       @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
       @JsonProperty("idleConfig") IdleConfig idleConfig,
-      @JsonProperty("multiTopic") Boolean multiTopic,
       @JsonProperty("stopTaskCount") Integer stopTaskCount
   )
   {
     super(
-        Preconditions.checkNotNull(topic, "topic"),
+        checkTopicArguments(topic, topicPattern),
         inputFormat,
         replicas,
         taskCount,
@@ -98,13 +100,26 @@
     );
     this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
     this.configOverrides = configOverrides;
-    this.multiTopic = multiTopic != null ? multiTopic : DEFAULT_IS_MULTI_TOPIC;
+    this.topic = topic;
+    this.topicPattern = topicPattern;
   }
 
+  /**
+   * Only used in testing or serialization/deserialization
+   */
   @JsonProperty
   public String getTopic()
   {
-    return getStream();
+    return topic;
+  }
+
+  /**
+   * Only used in testing or serialization/deserialization
+   */
+  @JsonProperty
+  public String getTopicPattern()
+  {
+    return topicPattern;
+  }
 
   @JsonProperty
@@ -131,10 +146,9 @@
     return configOverrides;
   }
 
-  @JsonProperty
   public boolean isMultiTopic()
   {
-    return multiTopic;
+    return topicPattern != null;
   }
 
   @Override
@@ -142,6 +156,7 @@
   {
     return "KafkaSupervisorIOConfig{" +
            "topic='" + getTopic() + '\'' +
+           ", topicPattern='" + getTopicPattern() + '\'' +
            ", replicas=" + getReplicas() +
            ", taskCount=" + getTaskCount() +
            ", taskDuration=" + getTaskDuration() +
@@ -157,7 +172,23 @@
            ", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
            ", configOverrides=" + getConfigOverrides() +
            ", idleConfig=" + getIdleConfig() +
+           ", stopTaskCount=" + getStopTaskCount() +
            '}';
   }
+
+  private static String checkTopicArguments(String topic, String topicPattern)
+  {
+    if (topic == null && topicPattern == null) {
+      throw InvalidInput.exception("Either topic or topicPattern must be specified");
+    }
+    if (topic != null && topicPattern != null) {
+      throw InvalidInput.exception(
+          "Only one of topic [%s] or topicPattern [%s] must be specified",
+          topic,
+          topicPattern
+      );
+    }
+    return topic != null ? topic : topicPattern;
+  }
+
 }
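The effect of `checkTopicArguments` is easiest to see at the spec level. An ioConfig that sets both fields, such as the hypothetical one below, is now rejected at deserialization time with the `Only one of topic [...] or topicPattern [...] must be specified` error, while omitting both produces `Either topic or topicPattern must be specified`:

```json
{
  "type": "kafka",
  "topic": "clicks",
  "topicPattern": "clicks|impressions",
  "consumerProperties": {"bootstrap.servers": "localhost:9092"}
}
```

As the test changes below show, these validation failures surface to callers as a `DruidException` wrapped in a `JsonMappingException`.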
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index af6f69ab2ec..9337175378f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -114,7 +114,7 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
   @Override
   public String getSource()
   {
-    return getIoConfig() != null ? getIoConfig().getTopic() : null;
+    return getIoConfig() != null ? getIoConfig().getStream() : null;
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 0718f3d08c9..bfd81464ba2 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.curator.test.TestingCluster;
 import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.data.input.kafka.KafkaTopicPartition;
@@ -57,6 +58,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 public class KafkaRecordSupplierTest
@@ -67,6 +69,9 @@ public class KafkaRecordSupplierTest
   private static final int POLL_RETRY = 5;
   private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
 
+  private static KafkaTopicPartition PARTITION_0 = new KafkaTopicPartition(false, null, 0);
+  private static KafkaTopicPartition PARTITION_1 = new KafkaTopicPartition(false, null, 1);
+
   private static String TOPIC = "topic";
   private static int TOPIC_POS_FIX = 0;
   private static TestingCluster ZK_SERVER;
@@ -78,21 +83,21 @@
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<>(TOPIC, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(TOPIC, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(TOPIC, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(TOPIC, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(TOPIC, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(TOPIC, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
-        new ProducerRecord<>(TOPIC, 0, null, StringUtils.toUtf8("unparseable")),
-        new ProducerRecord<>(TOPIC, 0, null, StringUtils.toUtf8("unparseable2")),
-        new ProducerRecord<>(TOPIC, 0, null, null),
-        new ProducerRecord<>(TOPIC, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(TOPIC, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
-        new 
ProducerRecord<>(TOPIC, 1, null, jb("2049", "f", "y", "10", "notanumber", "1.0")), - new ProducerRecord<>(TOPIC, 1, null, jb("2049", "f", "y", "10", "20.0", "notanumber")), - new ProducerRecord<>(TOPIC, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0")) + new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")), + new ProducerRecord<>(topic, 0, null, null), + new ProducerRecord<>(topic, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")), + new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "notanumber", "1.0")), + new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "20.0", "notanumber")), + new ProducerRecord<>(topic, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0")) ); } @@ -229,8 +234,8 @@ public class KafkaRecordSupplierTest insertData(); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -241,12 +246,40 @@ public class KafkaRecordSupplierTest recordSupplier.assign(partitions); Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertEquals(ImmutableSet.of(new KafkaTopicPartition(false, TOPIC, 0), new KafkaTopicPartition(false, TOPIC, 1)), + Assert.assertEquals(ImmutableSet.of(PARTITION_0, PARTITION_1), recordSupplier.getPartitionIds(TOPIC)); recordSupplier.close(); } + @Test + public void testMultiTopicSupplierSetup() throws ExecutionException, InterruptedException + { + // Insert data into TOPIC + insertData(); + + // Insert data into other topic + String otherTopic = nextTopicName(); + records = generateRecords(otherTopic); + insertData(); + + KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true); + + String stream = Pattern.quote(TOPIC) + "|" + Pattern.quote(otherTopic); + Set partitions = recordSupplier.getPartitionIds(stream); + Set diff = Sets.difference( + ImmutableSet.of( + new KafkaTopicPartition(true, TOPIC, 0), + new KafkaTopicPartition(true, TOPIC, 1), + new KafkaTopicPartition(true, otherTopic, 0), + new KafkaTopicPartition(true, otherTopic, 1) + ), + partitions + ); + Assert.assertEquals(diff.toString(), 0, diff.size()); + } + @Test public void testSupplierSetupCustomDeserializer() throws ExecutionException, InterruptedException { @@ -255,8 +288,8 @@ public class KafkaRecordSupplierTest insertData(); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - 
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); Map properties = KAFKA_SERVER.consumerProperties(); @@ -275,7 +308,7 @@ public class KafkaRecordSupplierTest recordSupplier.assign(partitions); Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertEquals(ImmutableSet.of(new KafkaTopicPartition(false, TOPIC, 0), new KafkaTopicPartition(false, TOPIC, 1)), + Assert.assertEquals(ImmutableSet.of(PARTITION_0, PARTITION_1), recordSupplier.getPartitionIds(TOPIC)); recordSupplier.close(); @@ -329,8 +362,8 @@ public class KafkaRecordSupplierTest insertData(); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); Map properties = KAFKA_SERVER.consumerProperties(); @@ -371,8 +404,8 @@ public class KafkaRecordSupplierTest insertData(); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -424,8 +457,8 @@ public class KafkaRecordSupplierTest } Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); @@ -491,12 +524,12 @@ public class KafkaRecordSupplierTest // Insert data insertData(); - StreamPartition partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); - StreamPartition partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)); + StreamPartition partition0 = StreamPartition.of(TOPIC, PARTITION_0); + StreamPartition partition1 = StreamPartition.of(TOPIC, PARTITION_1); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -534,12 +567,12 @@ public class KafkaRecordSupplierTest // Insert data insertData(); - StreamPartition partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); - StreamPartition partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)); + StreamPartition partition0 = StreamPartition.of(TOPIC, PARTITION_0); + StreamPartition partition1 = StreamPartition.of(TOPIC, PARTITION_1); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -568,11 +601,11 @@ public class KafkaRecordSupplierTest } } - StreamPartition partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); - StreamPartition partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)); + StreamPartition partition0 = StreamPartition.of(TOPIC, 
PARTITION_0); + StreamPartition partition1 = StreamPartition.of(TOPIC, PARTITION_1); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)) + StreamPartition.of(TOPIC, PARTITION_0) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -593,12 +626,12 @@ public class KafkaRecordSupplierTest // Insert data insertData(); - StreamPartition partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); - StreamPartition partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)); + StreamPartition partition0 = StreamPartition.of(TOPIC, PARTITION_0); + StreamPartition partition1 = StreamPartition.of(TOPIC, PARTITION_1); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -638,7 +671,7 @@ public class KafkaRecordSupplierTest { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); - StreamPartition streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); + StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -650,7 +683,7 @@ public class KafkaRecordSupplierTest { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); - StreamPartition streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); + StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -662,7 +695,7 @@ public class KafkaRecordSupplierTest { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); - StreamPartition streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); + StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); recordSupplier.seekToLatest(partitions); @@ -674,7 +707,7 @@ public class KafkaRecordSupplierTest { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); - StreamPartition streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); + StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); recordSupplier.seekToLatest(partitions); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index a6731f06a77..9a6fd03726d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -72,6 +72,7 
@@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; public class KafkaSamplerSpecTest extends InitializedNullHandlingTest { @@ -163,6 +164,60 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, new KafkaSupervisorIOConfig( TOPIC, + null, + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), + null, + null, + null, + kafkaServer.consumerProperties(), + null, + null, + null, + null, + true, + null, + null, + null, + null, + null, + null, + null + ), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec( + supervisorSpec, + new SamplerConfig(5, 5_000, null, null), + new InputSourceSampler(OBJECT_MAPPER), + OBJECT_MAPPER + ); + + runSamplerAndCompareResponse(samplerSpec, true); + } + + @Test + public void testSampleWithTopicPattern() + { + insertData(generateRecords(TOPIC)); + + KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec( + null, + DATA_SCHEMA, + null, + new KafkaSupervisorIOConfig( + null, + Pattern.quote(TOPIC), new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, @@ -179,7 +234,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, null, null, - false, null ), null, @@ -216,6 +270,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, new KafkaSupervisorIOConfig( TOPIC, + null, new KafkaInputFormat( null, null, @@ -240,7 +295,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, null, null, - false, null ), null, @@ -331,6 +385,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, null, null, + null, kafkaServer.consumerProperties(), null, null, @@ -343,7 +398,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, null, null, - false, null ), null, @@ -508,6 +562,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, new KafkaSupervisorIOConfig( TOPIC, + null, new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, @@ -527,7 +582,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, null, null, - false, null ), null, @@ -564,6 +618,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, new KafkaSupervisorIOConfig( TOPIC, + null, new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, @@ -583,7 +638,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, null, null, - false, null ), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 51453e4335a..231705418f5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import org.apache.druid.error.DruidException; import 
org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.indexing.kafka.KafkaRecordSupplier; @@ -75,6 +76,7 @@ public class KafkaSupervisorIOConfigTest ); Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertNull(config.getTopicPattern()); Assert.assertEquals(1, (int) config.getReplicas()); Assert.assertEquals(1, (int) config.getTaskCount()); Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration()); @@ -89,6 +91,28 @@ public class KafkaSupervisorIOConfigTest Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent()); } + @Test + public void testSerdeWithTopicPattern() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topicPattern\": \"my-topic.*\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n" + + "}"; + + KafkaSupervisorIOConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + KafkaSupervisorIOConfig.class + ) + ), KafkaSupervisorIOConfig.class + ); + + Assert.assertEquals("my-topic.*", config.getTopicPattern()); + Assert.assertNull(config.getTopic()); + } + @Test public void testSerdeWithNonDefaultsWithLateMessagePeriod() throws Exception { @@ -118,6 +142,7 @@ public class KafkaSupervisorIOConfigTest ); Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertNull(config.getTopicPattern()); Assert.assertEquals(3, (int) config.getReplicas()); Assert.assertEquals(9, (int) config.getTaskCount()); Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration()); @@ -160,6 +185,7 @@ public class KafkaSupervisorIOConfigTest ); Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertNull(config.getTopicPattern()); Assert.assertEquals(3, (int) config.getReplicas()); Assert.assertEquals(9, (int) config.getTaskCount()); Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration()); @@ -189,6 +215,7 @@ public class KafkaSupervisorIOConfigTest KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, mapper, config.getConsumerProperties()); Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertNull(config.getTopicPattern()); Assert.assertEquals("localhost:9092", props.getProperty("bootstrap.servers")); Assert.assertEquals("mytruststorepassword", props.getProperty("ssl.truststore.password")); Assert.assertEquals("mykeystorepassword", props.getProperty("ssl.keystore.password")); @@ -204,8 +231,8 @@ public class KafkaSupervisorIOConfigTest + "}"; exception.expect(JsonMappingException.class); - exception.expectCause(CoreMatchers.isA(NullPointerException.class)); - exception.expectMessage(CoreMatchers.containsString("topic")); + exception.expectCause(CoreMatchers.isA(DruidException.class)); + exception.expectMessage(CoreMatchers.containsString("Either topic or topicPattern must be specified")); mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); } @@ -293,6 +320,7 @@ public class KafkaSupervisorIOConfigTest KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( "test", null, + null, 1, 1, new Period("PT1H"), @@ -308,7 +336,6 @@ public class KafkaSupervisorIOConfigTest null, null, null, - false, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); @@ -336,6 +363,7 @@ public class KafkaSupervisorIOConfigTest KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( "test", null, + null, 1, 1, new 
Period("PT1H"), @@ -351,7 +379,6 @@ public class KafkaSupervisorIOConfigTest null, null, mapper.convertValue(idleConfig, IdleConfig.class), - false, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 7874a11c588..9ff0c0bd784 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -129,6 +129,7 @@ public class KafkaSupervisorSpecTest Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); @@ -147,6 +148,85 @@ public class KafkaSupervisorSpecTest Assert.assertEquals(serialized, stable); } + @Test + public void testSerdeWithTopicPattern() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topicPattern\": \"metrics.*\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals("metrics.*", spec.getIoConfig().getTopicPattern()); + Assert.assertNull(spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + String serialized = mapper.writeValueAsString(spec); + + // expect default values populated in reserialized string + Assert.assertTrue(serialized.contains("\"topicPattern\":\"metrics.*\"")); + Assert.assertTrue(serialized, serialized.contains("\"topic\":null")); + + KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); + + String stable = 
mapper.writeValueAsString(spec2); + + Assert.assertEquals(serialized, stable); + } @Test public void testSerdeWithInputFormat() throws IOException { @@ -215,6 +295,7 @@ public class KafkaSupervisorSpecTest Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); @@ -301,6 +382,7 @@ public class KafkaSupervisorSpecTest Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); @@ -389,6 +471,7 @@ public class KafkaSupervisorSpecTest Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); @@ -473,6 +556,7 @@ public class KafkaSupervisorSpecTest Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 7ee0ed9f1e4..5f1311085ca 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -293,6 +293,7 @@ public class KafkaSupervisorTest extends EasyMockSupport KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, + null, INPUT_FORMAT, replicas, 1, @@ -309,7 +310,6 @@ public class KafkaSupervisorTest extends EasyMockSupport null, null, new IdleConfig(true, 1000L), - false, 1 ); @@ -4501,6 +4501,7 @@ public class KafkaSupervisorTest extends EasyMockSupport consumerProperties.put("bootstrap.servers", kafkaHost); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, + null, INPUT_FORMAT, replicas, taskCount, @@ -4517,7 +4518,6 @@ public class KafkaSupervisorTest extends EasyMockSupport null, null, idleConfig, - false, null ); @@ -4614,6 +4614,7 @@ public class KafkaSupervisorTest extends EasyMockSupport consumerProperties.put("isolation.level", "read_committed"); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, + null, INPUT_FORMAT, replicas, taskCount, @@ -4630,7 +4631,6 @@ public class KafkaSupervisorTest extends EasyMockSupport null, null, null, - false, null ); @@ -4731,6 +4731,7 @@ public class KafkaSupervisorTest extends EasyMockSupport 
consumerProperties.put("isolation.level", "read_committed"); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, + null, INPUT_FORMAT, replicas, taskCount, @@ -4747,7 +4748,6 @@ public class KafkaSupervisorTest extends EasyMockSupport null, null, null, - false, null ); diff --git a/website/.spelling b/website/.spelling index ce03461e369..058d98c0adf 100644 --- a/website/.spelling +++ b/website/.spelling @@ -328,6 +328,7 @@ gzip gzipped hadoop hasher +hashcode hashtable high-QPS historicals
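Finally, to illustrate the hash-based assignment that the documentation change describes: the toy sketch below (an assumption about the scheme, not the PR's exact code) hashes each (topic, partition) pair into a task group, which shows why per-task load is only as even as the hash spread and the per-topic partition counts.

```java
import java.util.Objects;

public class MultiTopicAssignmentSketch
{
  // Hash a (topic, partition id) pair into one of taskCount groups.
  static int taskGroupFor(String topic, int partitionId, int taskCount)
  {
    // Mask the sign bit rather than using Math.abs, which overflows on MIN_VALUE.
    return (Objects.hash(topic, partitionId) & Integer.MAX_VALUE) % taskCount;
  }

  public static void main(String[] args)
  {
    // Two topics with two partitions each, spread over three task groups.
    for (String topic : new String[]{"clicks", "impressions"}) {
      for (int partition = 0; partition < 2; partition++) {
        System.out.printf("%s-%d -> task group %d%n", topic, partition, taskGroupFor(topic, partition, 3));
      }
    }
  }
}
```

If one topic has far more traffic than another, giving it proportionally more partitions spreads its load over more task groups, which is the recommendation made in the docs above.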