Refactoring of multi-topic kafka ingestion docs (#14828)

In this PR, I have gotten rid of the `multiTopic` parameter and instead added a `topicPattern` parameter. The Kafka supervisor passes either `topicPattern` or `topic` as the stream name to the core ingestion engine, and validation ensures that only one of the two is set. This new setting is easier to understand than overloading the `topic` field, which previously could be interpreted differently depending on the value of another field.
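For example, an IO config that previously enabled multi-topic ingestion like this (values are illustrative):

```json
{
  "topic": "clicks|impressions",
  "multiTopic": true
}
```

is now written as:

```json
{
  "topicPattern": "clicks|impressions"
}
```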
Abhishek Agarwal 2023-08-16 18:00:11 +05:30 committed by GitHub
parent d9221e46e4
commit 7911a04064
11 changed files with 317 additions and 78 deletions


@ -37,7 +37,8 @@ This topic contains configuration reference information for the Apache Kafka sup
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. Must be a specific topic. Topic patterns are not supported.|yes|
|`topic`|String|The Kafka topic to read from. Use this setting to ingest from a single Kafka topic.|yes, unless `topicPattern` is set|
|`topicPattern`|String|A regex pattern that can be used to select multiple Kafka topics to ingest data from. Either this or `topic` can be used in a spec, but not both. See [Ingesting from multiple topics](#ingesting-from-multiple-topics) for more details.|yes, unless `topic` is set|
|`inputFormat`|Object|`inputFormat` to define input data parsing. See [Specifying data format](#specifying-data-format) for details about specifying the input format.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to pass to the Kafka consumer. See [More on consumer properties](#more-on-consumerproperties).|yes|
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|no (default == 100)|
@ -53,7 +54,6 @@ This topic contains configuration reference information for the Apache Kafka sup
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
|`idleConfig`|Object|Defines how and when Kafka Supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)|
|`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple Kafka topics using a single supervisor. See [Ingesting from multiple topics](#ingesting-from-multiple-topics) for more details.|no (default == false)|
## Task Autoscaler Properties
@ -138,11 +138,20 @@ The following example demonstrates supervisor spec with `lagBased` autoScaler an
}
```
## Ingesting from multiple topics
To ingest from multiple topics, you have to set `multiTopic` in the supervisor IO config to `true`. Multiple topics
can be passed as a regex pattern as the value for `topic` in the IO config. For example, to ingest data from clicks and
impressions, you will set `topic` to `clicks|impressions` in the IO config. If new topics are added to the cluster that
match the regex, druid will automatically start ingesting from those new topics. If you enable multi-topic
ingestion for a datasource, downgrading will cause the ingestion to fail for that datasource.
To ingest data from multiple topics, set `topicPattern` in the supervisor IO config and leave `topic` unset. The value of `topicPattern` is a regex pattern that matches the topics to ingest. For example, to ingest data from both clicks and impressions, set `topicPattern` to `clicks|impressions` in the IO config. Similarly, use `metrics-.*` as the value of `topicPattern` to ingest from all topics whose names start with `metrics-`. If new topics that match the regex are added to the cluster, Druid automatically starts ingesting from them. If you enable multi-topic ingestion for a datasource, downgrading to a version older than 28.0.0 will cause ingestion for that datasource to fail.
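For example, a minimal `ioConfig` using `topicPattern` might look like the following (the broker address and input format are placeholders for your own values):

```json
{
  "type": "kafka",
  "topicPattern": "metrics-.*",
  "inputFormat": {
    "type": "json"
  },
  "consumerProperties": {
    "bootstrap.servers": "localhost:9092"
  },
  "taskCount": 1
}
```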
When ingesting data from multiple topics, partitions are assigned to tasks based on the hashcode of the topic name and the id of the partition within that topic, so the partition assignment may not be uniform across all tasks. Partitions across individual topics are also assumed to carry similar load. If you want to ingest from both high-load and low-load topics in the same supervisor, it is therefore recommended to use a higher number of partitions for the high-load topic and a lower number for the low-load topic.
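The exact assignment happens in the core ingestion engine; the following is only a rough sketch of the hash-based scheme described above (the method name and hash mixing are assumptions for illustration, not Druid's actual code):

```java
// Illustrative only: one way to map a (topic, partition id) pair onto one of
// taskCount task groups by hashing, per the description above. Druid's real
// assignment logic lives in the core ingestion engine and may differ.
static int taskGroupFor(String topic, int partitionId, int taskCount)
{
  int hash = 31 * topic.hashCode() + partitionId; // combine topic hashcode and partition id
  return Math.abs(hash % taskCount);              // bucket into one of taskCount groups
}
```

Under such a scheme, a high-load topic with many partitions spreads its load across more task groups, which is why the recommendation above ties partition counts to topic load.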
## More on consumerProperties


@ -231,7 +231,7 @@ public class KafkaRecordSupplier implements RecordSupplier<KafkaTopicPartition,
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build("Topic [%s] is not found."
+ "Check that the topic exists in Kafka cluster", stream);
+ " Check that the topic exists in Kafka cluster", stream);
}
}
return allPartitions.stream()


@ -169,7 +169,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
return new KafkaSupervisorReportPayload(
spec.getDataSchema().getDataSource(),
ioConfig.getTopic(),
ioConfig.getStream(),
numPartitions,
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
@ -204,8 +204,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
baseSequenceName,
null,
null,
new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getTopic(), startPartitions, Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getTopic(), endPartitions),
new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getStream(), startPartitions, Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getStream(), endPartitions),
kafkaIoConfig.getConsumerProperties(),
kafkaIoConfig.getPollTimeout(),
true,


@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
@ -48,11 +49,13 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
private final Map<String, Object> consumerProperties;
private final long pollTimeout;
private final KafkaConfigOverrides configOverrides;
private final boolean multiTopic;
private final String topic;
private final String topicPattern;
@JsonCreator
public KafkaSupervisorIOConfig(
@JsonProperty("topic") String topic,
@JsonProperty("topicPattern") String topicPattern,
@JsonProperty("inputFormat") InputFormat inputFormat,
@JsonProperty("replicas") Integer replicas,
@JsonProperty("taskCount") Integer taskCount,
@ -69,12 +72,11 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
@JsonProperty("idleConfig") IdleConfig idleConfig,
@JsonProperty("multiTopic") Boolean multiTopic,
@JsonProperty("stopTaskCount") Integer stopTaskCount
)
{
super(
Preconditions.checkNotNull(topic, "topic"),
checkTopicArguments(topic, topicPattern),
inputFormat,
replicas,
taskCount,
@ -98,13 +100,26 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
);
this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
this.configOverrides = configOverrides;
this.multiTopic = multiTopic != null ? multiTopic : DEFAULT_IS_MULTI_TOPIC;
this.topic = topic;
this.topicPattern = topicPattern;
}
/**
* Only used in testing or serialization/deserialization
*/
@JsonProperty
public String getTopic()
{
return getStream();
return topic;
}
/**
* Only used in testing or serialization/deserialization
*/
@JsonProperty
public String getTopicPattern()
{
return topicPattern;
}
@JsonProperty
@ -131,10 +146,9 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
return configOverrides;
}
@JsonProperty
public boolean isMultiTopic()
{
return multiTopic;
return topicPattern != null;
}
@Override
@ -142,6 +156,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
{
return "KafkaSupervisorIOConfig{" +
"topic='" + getTopic() + '\'' +
"topicPattern='" + getTopicPattern() + '\'' +
", replicas=" + getReplicas() +
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
@ -157,7 +172,23 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
", configOverrides=" + getConfigOverrides() +
", idleConfig=" + getIdleConfig() +
", stopTaskCount=" + getStopTaskCount() +
'}';
}
private static String checkTopicArguments(String topic, String topicPattern)
{
if (topic == null && topicPattern == null) {
throw InvalidInput.exception("Either topic or topicPattern must be specified");
}
if (topic != null && topicPattern != null) {
throw InvalidInput.exception(
"Only one of topic [%s] or topicPattern [%s] must be specified",
topic,
topicPattern
);
}
return topic != null ? topic : topicPattern;
}
}


@ -114,7 +114,7 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
@Override
public String getSource()
{
return getIoConfig() != null ? getIoConfig().getTopic() : null;
return getIoConfig() != null ? getIoConfig().getStream() : null;
}
@Override


@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
@ -57,6 +58,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class KafkaRecordSupplierTest
@ -67,6 +69,9 @@ public class KafkaRecordSupplierTest
private static final int POLL_RETRY = 5;
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
private static KafkaTopicPartition PARTITION_0 = new KafkaTopicPartition(false, null, 0);
private static KafkaTopicPartition PARTITION_1 = new KafkaTopicPartition(false, null, 1);
private static String TOPIC = "topic";
private static int TOPIC_POS_FIX = 0;
private static TestingCluster ZK_SERVER;
@ -78,21 +83,21 @@ public class KafkaRecordSupplierTest
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
{
return ImmutableList.of(
new ProducerRecord<>(TOPIC, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(TOPIC, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(TOPIC, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(TOPIC, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(TOPIC, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(TOPIC, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
new ProducerRecord<>(TOPIC, 0, null, StringUtils.toUtf8("unparseable")),
new ProducerRecord<>(TOPIC, 0, null, StringUtils.toUtf8("unparseable2")),
new ProducerRecord<>(TOPIC, 0, null, null),
new ProducerRecord<>(TOPIC, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(TOPIC, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
new ProducerRecord<>(TOPIC, 1, null, jb("2049", "f", "y", "10", "notanumber", "1.0")),
new ProducerRecord<>(TOPIC, 1, null, jb("2049", "f", "y", "10", "20.0", "notanumber")),
new ProducerRecord<>(TOPIC, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(TOPIC, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0"))
new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
new ProducerRecord<>(topic, 0, null, null),
new ProducerRecord<>(topic, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "notanumber", "1.0")),
new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "20.0", "notanumber")),
new ProducerRecord<>(topic, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0"))
);
}
@ -229,8 +234,8 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)),
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1))
StreamPartition.of(TOPIC, PARTITION_0),
StreamPartition.of(TOPIC, PARTITION_1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
@ -241,12 +246,40 @@ public class KafkaRecordSupplierTest
recordSupplier.assign(partitions);
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(ImmutableSet.of(new KafkaTopicPartition(false, TOPIC, 0), new KafkaTopicPartition(false, TOPIC, 1)),
Assert.assertEquals(ImmutableSet.of(PARTITION_0, PARTITION_1),
recordSupplier.getPartitionIds(TOPIC));
recordSupplier.close();
}
@Test
public void testMultiTopicSupplierSetup() throws ExecutionException, InterruptedException
{
// Insert data into TOPIC
insertData();
// Insert data into other topic
String otherTopic = nextTopicName();
records = generateRecords(otherTopic);
insertData();
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true);
String stream = Pattern.quote(TOPIC) + "|" + Pattern.quote(otherTopic);
Set<KafkaTopicPartition> partitions = recordSupplier.getPartitionIds(stream);
Set<KafkaTopicPartition> diff = Sets.difference(
ImmutableSet.of(
new KafkaTopicPartition(true, TOPIC, 0),
new KafkaTopicPartition(true, TOPIC, 1),
new KafkaTopicPartition(true, otherTopic, 0),
new KafkaTopicPartition(true, otherTopic, 1)
),
partitions
);
Assert.assertEquals(diff.toString(), 0, diff.size());
}
@Test
public void testSupplierSetupCustomDeserializer() throws ExecutionException, InterruptedException
{
@ -255,8 +288,8 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)),
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1))
StreamPartition.of(TOPIC, PARTITION_0),
StreamPartition.of(TOPIC, PARTITION_1)
);
Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
@ -275,7 +308,7 @@ public class KafkaRecordSupplierTest
recordSupplier.assign(partitions);
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(ImmutableSet.of(new KafkaTopicPartition(false, TOPIC, 0), new KafkaTopicPartition(false, TOPIC, 1)),
Assert.assertEquals(ImmutableSet.of(PARTITION_0, PARTITION_1),
recordSupplier.getPartitionIds(TOPIC));
recordSupplier.close();
@ -329,8 +362,8 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)),
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1))
StreamPartition.of(TOPIC, PARTITION_0),
StreamPartition.of(TOPIC, PARTITION_1)
);
Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
@ -371,8 +404,8 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)),
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1))
StreamPartition.of(TOPIC, PARTITION_0),
StreamPartition.of(TOPIC, PARTITION_1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
@ -424,8 +457,8 @@ public class KafkaRecordSupplierTest
}
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)),
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1))
StreamPartition.of(TOPIC, PARTITION_0),
StreamPartition.of(TOPIC, PARTITION_1)
);
@ -491,12 +524,12 @@ public class KafkaRecordSupplierTest
// Insert data
insertData();
StreamPartition<KafkaTopicPartition> partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0));
StreamPartition<KafkaTopicPartition> partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1));
StreamPartition<KafkaTopicPartition> partition0 = StreamPartition.of(TOPIC, PARTITION_0);
StreamPartition<KafkaTopicPartition> partition1 = StreamPartition.of(TOPIC, PARTITION_1);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)),
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1))
StreamPartition.of(TOPIC, PARTITION_0),
StreamPartition.of(TOPIC, PARTITION_1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
@ -534,12 +567,12 @@ public class KafkaRecordSupplierTest
// Insert data
insertData();
StreamPartition<KafkaTopicPartition> partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0));
StreamPartition<KafkaTopicPartition> partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1));
StreamPartition<KafkaTopicPartition> partition0 = StreamPartition.of(TOPIC, PARTITION_0);
StreamPartition<KafkaTopicPartition> partition1 = StreamPartition.of(TOPIC, PARTITION_1);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)),
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1))
StreamPartition.of(TOPIC, PARTITION_0),
StreamPartition.of(TOPIC, PARTITION_1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
@ -568,11 +601,11 @@ public class KafkaRecordSupplierTest
}
}
StreamPartition<KafkaTopicPartition> partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0));
StreamPartition<KafkaTopicPartition> partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1));
StreamPartition<KafkaTopicPartition> partition0 = StreamPartition.of(TOPIC, PARTITION_0);
StreamPartition<KafkaTopicPartition> partition1 = StreamPartition.of(TOPIC, PARTITION_1);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0))
StreamPartition.of(TOPIC, PARTITION_0)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
@ -593,12 +626,12 @@ public class KafkaRecordSupplierTest
// Insert data
insertData();
StreamPartition<KafkaTopicPartition> partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0));
StreamPartition<KafkaTopicPartition> partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1));
StreamPartition<KafkaTopicPartition> partition0 = StreamPartition.of(TOPIC, PARTITION_0);
StreamPartition<KafkaTopicPartition> partition1 = StreamPartition.of(TOPIC, PARTITION_1);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)),
StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1))
StreamPartition.of(TOPIC, PARTITION_0),
StreamPartition.of(TOPIC, PARTITION_1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
@ -638,7 +671,7 @@ public class KafkaRecordSupplierTest
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0));
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@ -650,7 +683,7 @@ public class KafkaRecordSupplierTest
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0));
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@ -662,7 +695,7 @@ public class KafkaRecordSupplierTest
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0));
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToLatest(partitions);
@ -674,7 +707,7 @@ public class KafkaRecordSupplierTest
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0));
StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToLatest(partitions);


@ -72,6 +72,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
{
@ -163,6 +164,60 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
new KafkaSupervisorIOConfig(
TOPIC,
null,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null,
kafkaServer.consumerProperties(),
null,
null,
null,
null,
true,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, 5_000, null, null),
new InputSourceSampler(OBJECT_MAPPER),
OBJECT_MAPPER
);
runSamplerAndCompareResponse(samplerSpec, true);
}
@Test
public void testSampleWithTopicPattern()
{
insertData(generateRecords(TOPIC));
KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
null,
DATA_SCHEMA,
null,
new KafkaSupervisorIOConfig(
null,
Pattern.quote(TOPIC),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
@ -179,7 +234,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
false,
null
),
null,
@ -216,6 +270,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
new KafkaSupervisorIOConfig(
TOPIC,
null,
new KafkaInputFormat(
null,
null,
@ -240,7 +295,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
false,
null
),
null,
@ -331,6 +385,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
null,
kafkaServer.consumerProperties(),
null,
null,
@ -343,7 +398,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
false,
null
),
null,
@ -508,6 +562,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
new KafkaSupervisorIOConfig(
TOPIC,
null,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
@ -527,7 +582,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
false,
null
),
null,
@ -564,6 +618,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
new KafkaSupervisorIOConfig(
TOPIC,
null,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
@ -583,7 +638,6 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
false,
null
),
null,


@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
@ -75,6 +76,7 @@ public class KafkaSupervisorIOConfigTest
);
Assert.assertEquals("my-topic", config.getTopic());
Assert.assertNull(config.getTopicPattern());
Assert.assertEquals(1, (int) config.getReplicas());
Assert.assertEquals(1, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
@ -89,6 +91,28 @@ public class KafkaSupervisorIOConfigTest
Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent());
}
@Test
public void testSerdeWithTopicPattern() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"topicPattern\": \"my-topic.*\",\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n"
+ "}";
KafkaSupervisorIOConfig config = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
KafkaSupervisorIOConfig.class
)
), KafkaSupervisorIOConfig.class
);
Assert.assertEquals("my-topic.*", config.getTopicPattern());
Assert.assertNull(config.getTopic());
}
@Test
public void testSerdeWithNonDefaultsWithLateMessagePeriod() throws Exception
{
@ -118,6 +142,7 @@ public class KafkaSupervisorIOConfigTest
);
Assert.assertEquals("my-topic", config.getTopic());
Assert.assertNull(config.getTopicPattern());
Assert.assertEquals(3, (int) config.getReplicas());
Assert.assertEquals(9, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration());
@ -160,6 +185,7 @@ public class KafkaSupervisorIOConfigTest
);
Assert.assertEquals("my-topic", config.getTopic());
Assert.assertNull(config.getTopicPattern());
Assert.assertEquals(3, (int) config.getReplicas());
Assert.assertEquals(9, (int) config.getTaskCount());
Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration());
@ -189,6 +215,7 @@ public class KafkaSupervisorIOConfigTest
KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, mapper, config.getConsumerProperties());
Assert.assertEquals("my-topic", config.getTopic());
Assert.assertNull(config.getTopicPattern());
Assert.assertEquals("localhost:9092", props.getProperty("bootstrap.servers"));
Assert.assertEquals("mytruststorepassword", props.getProperty("ssl.truststore.password"));
Assert.assertEquals("mykeystorepassword", props.getProperty("ssl.keystore.password"));
@ -204,8 +231,8 @@ public class KafkaSupervisorIOConfigTest
+ "}";
exception.expect(JsonMappingException.class);
exception.expectCause(CoreMatchers.isA(NullPointerException.class));
exception.expectMessage(CoreMatchers.containsString("topic"));
exception.expectCause(CoreMatchers.isA(DruidException.class));
exception.expectMessage(CoreMatchers.containsString("Either topic or topicPattern must be specified"));
mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class);
}
@ -293,6 +320,7 @@ public class KafkaSupervisorIOConfigTest
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
"test",
null,
null,
1,
1,
new Period("PT1H"),
@ -308,7 +336,6 @@ public class KafkaSupervisorIOConfigTest
null,
null,
null,
false,
null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
@ -336,6 +363,7 @@ public class KafkaSupervisorIOConfigTest
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
"test",
null,
null,
1,
1,
new Period("PT1H"),
@ -351,7 +379,6 @@ public class KafkaSupervisorIOConfigTest
null,
null,
mapper.convertValue(idleConfig, IdleConfig.class),
false,
null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);


@ -129,6 +129,7 @@ public class KafkaSupervisorSpecTest
Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
Assert.assertNotNull(spec.getIoConfig());
Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
Assert.assertNull(spec.getIoConfig().getTopicPattern());
Assert.assertNotNull(spec.getTuningConfig());
Assert.assertNull(spec.getContext());
Assert.assertFalse(spec.isSuspended());
@ -147,6 +148,85 @@ public class KafkaSupervisorSpecTest
Assert.assertEquals(serialized, stable);
}
@Test
public void testSerdeWithTopicPattern() throws IOException
{
String json = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"metrics-kafka\",\n"
+ " \"parser\": {\n"
+ " \"type\": \"string\",\n"
+ " \"parseSpec\": {\n"
+ " \"format\": \"json\",\n"
+ " \"timestampSpec\": {\n"
+ " \"column\": \"timestamp\",\n"
+ " \"format\": \"auto\"\n"
+ " },\n"
+ " \"dimensionsSpec\": {\n"
+ " \"dimensions\": [],\n"
+ " \"dimensionExclusions\": [\n"
+ " \"timestamp\",\n"
+ " \"value\"\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"metricsSpec\": [\n"
+ " {\n"
+ " \"name\": \"count\",\n"
+ " \"type\": \"count\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_sum\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleSum\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_min\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMin\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"value_max\",\n"
+ " \"fieldName\": \"value\",\n"
+ " \"type\": \"doubleMax\"\n"
+ " }\n"
+ " ],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"HOUR\",\n"
+ " \"queryGranularity\": \"NONE\"\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"topicPattern\": \"metrics.*\",\n"
+ " \"consumerProperties\": {\n"
+ " \"bootstrap.servers\": \"localhost:9092\"\n"
+ " },\n"
+ " \"taskCount\": 1\n"
+ " }\n"
+ "}";
KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class);
Assert.assertNotNull(spec);
Assert.assertNotNull(spec.getDataSchema());
Assert.assertEquals("metrics.*", spec.getIoConfig().getTopicPattern());
Assert.assertNull(spec.getIoConfig().getTopic());
Assert.assertNotNull(spec.getTuningConfig());
Assert.assertNull(spec.getContext());
String serialized = mapper.writeValueAsString(spec);
// expect default values populated in reserialized string
Assert.assertTrue(serialized.contains("\"topicPattern\":\"metrics.*\""));
Assert.assertTrue(serialized, serialized.contains("\"topic\":null"));
KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class);
String stable = mapper.writeValueAsString(spec2);
Assert.assertEquals(serialized, stable);
}
@Test
public void testSerdeWithInputFormat() throws IOException
{
@ -215,6 +295,7 @@ public class KafkaSupervisorSpecTest
Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
Assert.assertNotNull(spec.getIoConfig());
Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
Assert.assertNull(spec.getIoConfig().getTopicPattern());
Assert.assertNotNull(spec.getTuningConfig());
Assert.assertNull(spec.getContext());
Assert.assertFalse(spec.isSuspended());
@ -301,6 +382,7 @@ public class KafkaSupervisorSpecTest
Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
Assert.assertNotNull(spec.getIoConfig());
Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
Assert.assertNull(spec.getIoConfig().getTopicPattern());
Assert.assertNotNull(spec.getTuningConfig());
Assert.assertNull(spec.getContext());
Assert.assertFalse(spec.isSuspended());
@ -389,6 +471,7 @@ public class KafkaSupervisorSpecTest
Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
Assert.assertNotNull(spec.getIoConfig());
Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
Assert.assertNull(spec.getIoConfig().getTopicPattern());
Assert.assertNotNull(spec.getTuningConfig());
Assert.assertNull(spec.getContext());
Assert.assertFalse(spec.isSuspended());
@ -473,6 +556,7 @@ public class KafkaSupervisorSpecTest
Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
Assert.assertNotNull(spec.getIoConfig());
Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
Assert.assertNull(spec.getIoConfig().getTopicPattern());
Assert.assertNotNull(spec.getTuningConfig());
Assert.assertNull(spec.getContext());
Assert.assertFalse(spec.isSuspended());


@ -293,6 +293,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
topic,
null,
INPUT_FORMAT,
replicas,
1,
@ -309,7 +310,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
new IdleConfig(true, 1000L),
false,
1
);
@ -4501,6 +4501,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
consumerProperties.put("bootstrap.servers", kafkaHost);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
topic,
null,
INPUT_FORMAT,
replicas,
taskCount,
@ -4517,7 +4518,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
idleConfig,
false,
null
);
@ -4614,6 +4614,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
consumerProperties.put("isolation.level", "read_committed");
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
topic,
null,
INPUT_FORMAT,
replicas,
taskCount,
@ -4630,7 +4631,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
false,
null
);
@ -4731,6 +4731,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
consumerProperties.put("isolation.level", "read_committed");
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
topic,
null,
INPUT_FORMAT,
replicas,
taskCount,
@ -4747,7 +4748,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
false,
null
);


@ -328,6 +328,7 @@ gzip
gzipped
hadoop
hasher
hashcode
hashtable
high-QPS
historicals