Kafka with topicPattern can ignore old offsets spuriously (#16190)

* fix

* simplify

* simplify tests

* update matches function definition for Kafka Datasource Metadata

* add matchesOld

* override matches and plus for kafka based metadata / sequence numbers

* implement minus
* add tests

* fix failing tests

* remove TODO comments

* simplify and add comments

* remove unused variable in tests

* remove unneeded function

* add serde tests

* more stuff

* address review comments

* remove unneeded code.
zachjsh 2024-04-17 10:00:17 -04:00 committed by GitHub
parent 0bf5e7745d
commit 2351f038eb
12 changed files with 1802 additions and 72 deletions
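
The failure mode being fixed, sketched below with illustrative topic names and offsets (the wrapper class and values are assumptions for the example, not taken from the patch): a supervisor that used to read the single topic "foo" is reconfigured with the topicPattern "foo.*". The offsets committed under the old single-topic stream should carry over to the new multi-topic stream instead of being discarded just because the stream names differ.

// Minimal sketch of the scenario; values are illustrative only.
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;

class TopicPatternOffsetScenario
{
  static void sketch()
  {
    // Offsets checkpointed while the supervisor consumed the single topic "foo".
    KafkaDataSourceMetadata storedSingleTopic = new KafkaDataSourceMetadata(
        new SeekableStreamStartSequenceNumbers<>(
            "foo",
            ImmutableMap.of(new KafkaTopicPartition(false, "foo", 0), 2L),
            ImmutableSet.of()
        )
    );
    // Metadata for the same partition after switching the supervisor to topicPattern "foo.*".
    KafkaDataSourceMetadata currentMultiTopic = new KafkaDataSourceMetadata(
        new SeekableStreamStartSequenceNumbers<>(
            "foo.*",
            ImmutableMap.of(new KafkaTopicPartition(true, "foo", 0), 2L),
            ImmutableSet.of()
        )
    );
    // Before this change the differing stream names ("foo" vs "foo.*") made the stored offsets
    // look unrelated to the current config; the Kafka-aware overrides below reconcile them instead.
    boolean stillCompatible = storedSingleTopic.matches(currentMultiTopic);
  }
}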

View File

@ -26,19 +26,41 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.common.TopicPartition;
import java.util.Comparator;
import java.util.Map;
public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata<KafkaTopicPartition, Long> implements Comparable<KafkaDataSourceMetadata>
{
private static final Logger LOGGER = new Logger(KafkaDataSourceMetadata.class);
@JsonCreator
public KafkaDataSourceMetadata(
@JsonProperty("partitions") SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> kafkaPartitions
)
{
super(kafkaPartitions == null
? null
: kafkaPartitions instanceof SeekableStreamStartSequenceNumbers
?
new KafkaSeekableStreamStartSequenceNumbers(
kafkaPartitions.getStream(),
((SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getTopic(),
kafkaPartitions.getPartitionSequenceNumberMap(),
((SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getPartitionOffsetMap(),
((SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getExclusivePartitions()
)
: new KafkaSeekableStreamEndSequenceNumbers(
kafkaPartitions.getStream(),
((SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getTopic(),
kafkaPartitions.getPartitionSequenceNumberMap(),
((SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getPartitionOffsetMap()
));
}
@Override
@ -76,4 +98,71 @@ public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata<Ka
}
return getSeekableStreamSequenceNumbers().compareTo(other.getSeekableStreamSequenceNumbers(), Comparator.naturalOrder());
}
@Override
public boolean matches(DataSourceMetadata other)
{
if (!getClass().equals(other.getClass())) {
return false;
}
KafkaDataSourceMetadata thisPlusOther = (KafkaDataSourceMetadata) plus(other);
if (thisPlusOther.equals(other.plus(this))) {
return true;
}
// check that thisPlusOther contains all metadata from other, and that there is no inconsistency or loss
KafkaDataSourceMetadata otherMetadata = (KafkaDataSourceMetadata) other;
final SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> otherSequenceNumbers = otherMetadata.getSeekableStreamSequenceNumbers();
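// when neither side is multi-topic there is no cross-topic merging to reconcile, so the unequal merge results above mean the metadata does not match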
if (!getSeekableStreamSequenceNumbers().isMultiTopicPartition() && !otherSequenceNumbers.isMultiTopicPartition()) {
return false;
}
final SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> mergedSequenceNumbers = thisPlusOther.getSeekableStreamSequenceNumbers();
final Map<TopicPartition, Long> topicAndPartitionToSequenceNumber = CollectionUtils.mapKeys(
mergedSequenceNumbers.getPartitionSequenceNumberMap(),
k -> k.asTopicPartition(mergedSequenceNumbers.getStream())
);
boolean allOtherFoundAndConsistent = otherSequenceNumbers.getPartitionSequenceNumberMap().entrySet().stream().noneMatch(
e -> {
KafkaTopicPartition kafkaTopicPartition = e.getKey();
TopicPartition topicPartition = kafkaTopicPartition.asTopicPartition(otherSequenceNumbers.getStream());
Long sequenceOffset = topicAndPartitionToSequenceNumber.get(topicPartition);
long oldSequenceOffset = e.getValue();
if (sequenceOffset == null || !sequenceOffset.equals(oldSequenceOffset)) {
LOGGER.info(
"sequenceOffset found for currently computed and stored metadata does not match for "
+ "topicPartition: [%s]. currentSequenceOffset: [%s], oldSequenceOffset: [%s]",
topicPartition,
sequenceOffset,
oldSequenceOffset
);
return true;
}
return false;
}
);
boolean allThisFoundAndConsistent = this.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().entrySet().stream().noneMatch(
e -> {
KafkaTopicPartition kafkaTopicPartition = e.getKey();
TopicPartition topicPartition = kafkaTopicPartition.asTopicPartition(this.getSeekableStreamSequenceNumbers().getStream());
Long oldSequenceOffset = topicAndPartitionToSequenceNumber.get(topicPartition);
long sequenceOffset = e.getValue();
if (oldSequenceOffset == null || !oldSequenceOffset.equals(sequenceOffset)) {
LOGGER.info(
"sequenceOffset found for currently computed and stored metadata does not match for "
+ "topicPartition: [%s]. currentSequenceOffset: [%s], oldSequenceOffset: [%s]",
topicPartition,
sequenceOffset,
oldSequenceOffset
);
return true;
}
return false;
}
);
return allOtherFoundAndConsistent && allThisFoundAndConsistent;
}
}
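
A minimal sketch of the matches() semantics above, modeled on the fixtures in KafkaDataSourceMetadataTest further down in this diff (the wrapper class and values here are illustrative assumptions): two metadata objects match when merging them in either direction loses or contradicts no partition offset, while pattern-based metadata carrying offsets for a topic the single-topic side would drop does not match.

// Sketch only; mirrors the style of the test fixtures below.
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;

class MatchesSketch
{
  static void sketch()
  {
    KafkaDataSourceMetadata singleFoo = new KafkaDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            "foo", ImmutableMap.of(new KafkaTopicPartition(false, "foo", 0), 2L)));
    KafkaDataSourceMetadata patternFooOnly = new KafkaDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            "foo.*", ImmutableMap.of(new KafkaTopicPartition(true, "foo", 0), 2L)));
    KafkaDataSourceMetadata patternFooAndFoo2 = new KafkaDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            "foo.*",
            ImmutableMap.of(
                new KafkaTopicPartition(true, "foo", 0), 2L,
                new KafkaTopicPartition(true, "foo2", 0), 7L)));

    // Same offset for the overlapping partition, nothing lost in either merge direction.
    boolean compatible = singleFoo.matches(patternFooOnly);        // expected: true
    // Folding the pattern metadata into the single topic would drop foo2's offsets.
    boolean incompatible = singleFoo.matches(patternFooAndFoo2);   // expected: false
  }
}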

View File

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.utils.CollectionUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Represents the kafka based end sequenceNumber per partition of a sequence. This class is needed because
* of special handling that must be done for multi-topic partitions to ensure that offsets are preserved.
* <p>
* Do not register this class as a subtype of base class in Jackson. We want this class to be serialized
* when written to DB as a {@link SeekableStreamEndSequenceNumbers}. Do not create instances of this class
* directly from jackson mapper.
*/
@JsonTypeName(SeekableStreamEndSequenceNumbers.TYPE)
public class KafkaSeekableStreamEndSequenceNumbers extends SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>
{
private final boolean isMultiTopicPartition;
public KafkaSeekableStreamEndSequenceNumbers(
final String stream,
// kept for backward compatibility
final String topic,
final Map<KafkaTopicPartition, Long> partitionSequenceNumberMap,
// kept for backward compatibility
final Map<KafkaTopicPartition, Long> partitionOffsetMap
)
{
super(stream, topic, partitionSequenceNumberMap, partitionOffsetMap);
// Note: when the partitionSequenceNumberMap is empty we cannot tell whether this is a topicPattern (multi-topic) config, so treat it as single-topic.
isMultiTopicPartition = !partitionSequenceNumberMap.isEmpty() && partitionSequenceNumberMap.keySet()
.stream()
.findFirst()
.get()
.isMultiTopicPartition();
}
@Override
public boolean isMultiTopicPartition()
{
return isMultiTopicPartition;
}
@Override
public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> plus(
SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
)
{
validateSequenceNumbersBaseType(other);
KafkaSeekableStreamEndSequenceNumbers that = (KafkaSeekableStreamEndSequenceNumbers) other;
if (!this.isMultiTopicPartition() && !that.isMultiTopicPartition()) {
return super.plus(other);
}
String thisTopic = getStream();
String thatTopic = that.getStream();
final Map<KafkaTopicPartition, Long> newMap;
if (!isMultiTopicPartition()) {
// going from topicPattern to single topic
// start with existing sequence numbers which in this case will be all single topic.
newMap = new HashMap<>(getPartitionSequenceNumberMap());
// add all sequence numbers from other where the topic name matches this topic. Transform to single topic
// as in this case we will be returning a single topic based sequence map.
newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
.filter(e -> {
if (e.getKey().topic().isPresent()) {
return e.getKey().topic().get().equals(thisTopic);
} else {
// this branch shouldn't really be hit since other should be multi-topic here, but adding this
// just in case.
return thatTopic.equals(thisTopic);
}
})
.collect(Collectors.toMap(
e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()),
Map.Entry::getValue
)));
} else {
// going from single topic or topicPattern to topicPattern
// start with existing sequence numbers and transform them to multi-topic keys, as the returned
// sequence numbers will be multi-topic based.
newMap = CollectionUtils.mapKeys(
getPartitionSequenceNumberMap(),
k -> new KafkaTopicPartition(
true,
k.asTopicPartition(thisTopic).topic(),
k.partition()
)
);
// add all sequence numbers from other where the topic name matches the pattern of this topic regex. Transform to
// multi-topic as in this case we will be returning a multi-topic based sequence map.
Pattern pattern = Pattern.compile(thisTopic);
newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
.filter(e -> {
if (e.getKey().topic().isPresent()) {
return pattern.matcher(e.getKey().topic().get()).matches();
} else {
return pattern.matcher(thatTopic).matches();
}
})
.collect(Collectors.toMap(
e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()),
Map.Entry::getValue
)));
}
return new SeekableStreamEndSequenceNumbers<>(getStream(), newMap);
}
@Override
public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> minus(
SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
)
{
validateSequenceNumbersBaseType(other);
final KafkaSeekableStreamEndSequenceNumbers otherEnd =
(KafkaSeekableStreamEndSequenceNumbers) other;
if (!this.isMultiTopicPartition() && !otherEnd.isMultiTopicPartition()) {
return super.minus(other);
}
final Map<KafkaTopicPartition, Long> newMap = new HashMap<>();
String thatTopic = otherEnd.getStream();
// remove partitions present in "that" from "this", check for exact match, multi-topic match, or single-topic match
for (Map.Entry<KafkaTopicPartition, Long> entry : getPartitionSequenceNumberMap().entrySet()) {
String thisTopic = entry.getKey().asTopicPartition(getStream()).topic();
boolean otherContainsThis = otherEnd.getPartitionSequenceNumberMap().containsKey(entry.getKey());
boolean otherContainsThisMultiTopic = otherEnd.getPartitionSequenceNumberMap()
.containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition()));
boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherEnd.getPartitionSequenceNumberMap()
.containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition())));
if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) {
newMap.put(entry.getKey(), entry.getValue());
}
}
return new SeekableStreamEndSequenceNumbers<>(
getStream(),
newMap
);
}
}
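
The merge above is direction dependent: the result keeps the receiver's stream and key form (single vs multi topic), and only offsets whose topic matches the receiver's topic or topic regex are carried over. A hedged sketch with assumed offsets:

// Illustrative only; the constructor arguments follow the signature declared above.
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.kafka.KafkaSeekableStreamEndSequenceNumbers;

class EndSequencePlusSketch
{
  static void sketch()
  {
    Map<KafkaTopicPartition, Long> singleOffsets =
        ImmutableMap.of(new KafkaTopicPartition(false, "foo", 0), 10L);
    Map<KafkaTopicPartition, Long> patternOffsets = ImmutableMap.of(
        new KafkaTopicPartition(true, "foo", 1), 20L,
        new KafkaTopicPartition(true, "foo2", 0), 30L);

    KafkaSeekableStreamEndSequenceNumbers single =
        new KafkaSeekableStreamEndSequenceNumbers("foo", null, singleOffsets, null);
    KafkaSeekableStreamEndSequenceNumbers pattern =
        new KafkaSeekableStreamEndSequenceNumbers("foo.*", null, patternOffsets, null);

    // single.plus(pattern): the result stays single-topic on stream "foo";
    // foo2-0 is dropped because it does not belong to the topic "foo".
    single.plus(pattern);   // roughly {foo-0=10, foo-1=20}
    // pattern.plus(single): the result becomes multi-topic on stream "foo.*";
    // foo-0 is folded in because "foo" matches the regex "foo.*".
    pattern.plus(single);   // roughly {foo-0=10, foo-1=20, foo2-0=30}
  }
}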

View File

@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Represents the kafka based start sequenceNumber per partition of a sequence. This class is needed because
* of special handling that must be done for multi-topic partitions to ensure that offsets are preserved.
* <p>
* Do not register this class as a subtype of base class in Jackson. We want this class to be serialized
* when written to DB as a {@link SeekableStreamStartSequenceNumbers}. Do not create instances of this class
* directly from jackson mapper.
*/
@JsonTypeName(SeekableStreamStartSequenceNumbers.TYPE)
public class KafkaSeekableStreamStartSequenceNumbers extends SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>
{
private final boolean isMultiTopicPartition;
public KafkaSeekableStreamStartSequenceNumbers(
String stream,
String topic,
Map<KafkaTopicPartition, Long> partitionSequenceNumberMap,
Map<KafkaTopicPartition, Long> partitionOffsetMap,
@Nullable Set<KafkaTopicPartition> exclusivePartitions
)
{
super(stream, topic, partitionSequenceNumberMap, partitionOffsetMap, exclusivePartitions);
// Note: when the partitionSequenceNumberMap is empty we cannot tell whether this is a topicPattern (multi-topic) config, so treat it as single-topic.
isMultiTopicPartition = !partitionSequenceNumberMap.isEmpty() && partitionSequenceNumberMap.keySet()
.stream()
.findFirst()
.get()
.isMultiTopicPartition();
}
@Override
public boolean isMultiTopicPartition()
{
return isMultiTopicPartition;
}
@Override
public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> plus(
SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
)
{
validateSequenceNumbersBaseType(other);
KafkaSeekableStreamStartSequenceNumbers that = (KafkaSeekableStreamStartSequenceNumbers) other;
if (!this.isMultiTopicPartition() && !that.isMultiTopicPartition()) {
return super.plus(other);
}
String thisTopic = getStream();
String thatTopic = that.getStream();
final Map<KafkaTopicPartition, Long> newMap;
final Set<KafkaTopicPartition> newExclusivePartitions = new HashSet<>();
if (!isMultiTopicPartition()) {
// going from topicPattern to single topic
// start with existing sequence numbers which in this case will be all single topic.
newMap = new HashMap<>(getPartitionSequenceNumberMap());
// add all sequence numbers from other where the topic name matches this topic. Transform to single topic
// as in this case we will be returning a single topic based sequence map.
newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
.filter(e -> {
if (e.getKey().topic().isPresent()) {
return e.getKey().topic().get().equals(thisTopic);
} else {
// this branch shouldn't really be hit since other should be multi-topic here, but adding this
// just in case.
return thatTopic.equals(thisTopic);
}
})
.collect(Collectors.toMap(
e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()),
Map.Entry::getValue
)));
// A partition is exclusive if it's
// 1) exclusive in "this" and it's not in "other"'s partitionSequenceNumberMap or
// 2) exclusive in "other"
getPartitionSequenceNumberMap().forEach(
(partitionId, sequenceOffset) -> {
KafkaTopicPartition multiTopicPartitionIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition());
if (getExclusivePartitions().contains(partitionId) && !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitionIdToSearch)) {
newExclusivePartitions.add(new KafkaTopicPartition(false, this.getStream(), partitionId.partition()));
}
}
);
newExclusivePartitions.addAll(that.getExclusivePartitions());
} else {
// going from single topic or topicPattern to topicPattern
// start with existing sequence numbers and transform them to multi-topic keys, as the returned
// sequence numbers will be multi-topic based.
newMap = CollectionUtils.mapKeys(
getPartitionSequenceNumberMap(),
k -> new KafkaTopicPartition(
true,
k.asTopicPartition(thisTopic).topic(),
k.partition()
)
);
// add all sequence numbers from other where the topic name matches the pattern of this topic regex. Transform to
// multi-topic as in this case we will be returning a multi-topic based sequence map.
Pattern pattern = Pattern.compile(thisTopic);
newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream()
.filter(e -> {
if (e.getKey().topic().isPresent()) {
return pattern.matcher(e.getKey().topic().get()).matches();
} else {
return pattern.matcher(thatTopic).matches();
}
})
.collect(Collectors.toMap(
e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()),
Map.Entry::getValue
)));
// A partition is exclusive if it's
// 1) exclusive in "this" and it's not in "other"'s partitionSequenceNumberMap or
// 2) exclusive in "other"
getPartitionSequenceNumberMap().forEach(
(partitionId, sequenceOffset) -> {
KafkaTopicPartition multiTopicPartitionIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition());
boolean thatTopicMatchesThisTopicPattern = partitionId.topic().isPresent() ? pattern.matcher(partitionId.topic().get()).matches() : pattern.matcher(thatTopic).matches();
if (getExclusivePartitions().contains(partitionId) && (!thatTopicMatchesThisTopicPattern || !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitionIdToSearch))) {
newExclusivePartitions.add(new KafkaTopicPartition(true, this.getStream(), partitionId.partition()));
}
}
);
newExclusivePartitions.addAll(that.getExclusivePartitions());
}
return new SeekableStreamStartSequenceNumbers<>(getStream(), newMap, newExclusivePartitions);
}
@Override
public SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> minus(
SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> other
)
{
validateSequenceNumbersBaseType(other);
final KafkaSeekableStreamStartSequenceNumbers otherStart =
(KafkaSeekableStreamStartSequenceNumbers) other;
if (!this.isMultiTopicPartition() && !otherStart.isMultiTopicPartition()) {
return super.minus(other);
}
final Map<KafkaTopicPartition, Long> newMap = new HashMap<>();
final Set<KafkaTopicPartition> newExclusivePartitions = new HashSet<>();
String thatTopic = otherStart.getStream();
// remove partitions present in "that" from "this", check for exact match, multi-topic match, or single-topic match
for (Map.Entry<KafkaTopicPartition, Long> entry : getPartitionSequenceNumberMap().entrySet()) {
String thisTopic = entry.getKey().asTopicPartition(getStream()).topic();
boolean otherContainsThis = otherStart.getPartitionSequenceNumberMap().containsKey(entry.getKey());
boolean otherContainsThisMultiTopic = otherStart.getPartitionSequenceNumberMap()
.containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition()));
boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherStart.getPartitionSequenceNumberMap()
.containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition())));
if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) {
newMap.put(entry.getKey(), entry.getValue());
// A partition is exclusive if it's exclusive in "this" and not in "other"'s partitionSequenceNumberMap
if (getExclusivePartitions().contains(entry.getKey())) {
newExclusivePartitions.add(entry.getKey());
}
}
}
return new SeekableStreamStartSequenceNumbers<>(
getStream(),
newMap,
newExclusivePartitions
);
}
}
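
minus() above treats a partition as subtracted when the other side holds it under any equivalent key form: the identical key, a multi-topic key naming the same topic, or a bare single-topic key when the other stream is that topic. A hedged sketch with assumed offsets (exclusive partitions left empty for brevity):

// Illustrative only; follows the constructor declared above.
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Map;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.kafka.KafkaSeekableStreamStartSequenceNumbers;

class StartSequenceMinusSketch
{
  static void sketch()
  {
    Map<KafkaTopicPartition, Long> patternOffsets = ImmutableMap.of(
        new KafkaTopicPartition(true, "foo", 0), 5L,
        new KafkaTopicPartition(true, "foo2", 0), 7L);
    Map<KafkaTopicPartition, Long> singleOffsets =
        ImmutableMap.of(new KafkaTopicPartition(false, "foo", 0), 5L);

    KafkaSeekableStreamStartSequenceNumbers pattern = new KafkaSeekableStreamStartSequenceNumbers(
        "foo.*", null, patternOffsets, null, ImmutableSet.of());
    KafkaSeekableStreamStartSequenceNumbers single = new KafkaSeekableStreamStartSequenceNumbers(
        "foo", null, singleOffsets, null, ImmutableSet.of());

    // The single-topic key foo-0 counts as covering the multi-topic key (foo, 0),
    // so only foo2-0 survives the subtraction.
    pattern.minus(single);   // roughly stream "foo.*", {foo2-0=7}
  }
}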

View File

@ -44,6 +44,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@ -63,12 +64,14 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@ -92,6 +95,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
private final ServiceEmitter emitter;
private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
private final Pattern pattern;
private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
@ -122,6 +126,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
this.spec = spec;
this.emitter = spec.getEmitter();
this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
this.pattern = getIoConfig().isMultiTopic() ? Pattern.compile(getIoConfig().getStream()) : null;
}
@ -444,4 +449,79 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
{
return spec.getTuningConfig();
}
protected boolean isMultiTopic()
{
return getIoConfig().isMultiTopic() && pattern != null;
}
/**
* Gets the offsets as stored in the metadata store. The map returned will only contain
* offsets from topic partitions that match the current supervisor config stream. This
* override is needed because, in the multi-topic case, a user could have updated the supervisor
* config from a single topic to a topicPattern whose regex matches the old single topic.
* Without this override, the previously stored metadata for the single topic would be deemed
* different from the currently configured stream and would not be included in the offset map
* returned. This implementation handles these cases appropriately.
*
* @return the previously stored offsets from metadata storage, possibly updated with offsets removed
* for topics that do not match the currently configured supervisor topic. Topic partition keys may also be
* updated to single topic or multi-topic depending on the supervisor config, as needed.
*/
@Override
protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
@SuppressWarnings("unchecked")
SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata)
.getSeekableStreamSequenceNumbers();
if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) {
Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>();
Set<String> topicMisMatchLogged = new HashSet<>();
partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> {
final String matchValue;
// previous offsets are from multi-topic config
if (kafkaTopicPartition.topic().isPresent()) {
matchValue = kafkaTopicPartition.topic().get();
} else {
// previous offsets are from single topic config
matchValue = partitions.getStream();
}
KafkaTopicPartition matchingTopicPartition = getMatchingKafkaTopicPartition(kafkaTopicPartition, matchValue);
if (matchingTopicPartition == null && !topicMisMatchLogged.contains(matchValue)) {
log.warn(
"Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences",
matchValue,
getIoConfig().getStream()
);
topicMisMatchLogged.add(matchValue);
}
if (matchingTopicPartition != null) {
partitionOffsets.put(matchingTopicPartition, value);
}
});
return partitionOffsets;
}
}
return Collections.emptyMap();
}
@Nullable
private KafkaTopicPartition getMatchingKafkaTopicPartition(
final KafkaTopicPartition kafkaTopicPartition,
final String streamMatchValue
)
{
final boolean match;
match = pattern != null
? pattern.matcher(streamMatchValue).matches()
: getIoConfig().getStream().equals(streamMatchValue);
return match ? new KafkaTopicPartition(isMultiTopic(), streamMatchValue, kafkaTopicPartition.partition()) : null;
}
}
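
A condensed, standalone restatement of the key-matching rule used by getOffsetsFromMetadataStorage above (the helper name and wrapper class are hypothetical; the logic mirrors getMatchingKafkaTopicPartition): a stored key survives only if its topic, or the stored stream for single-topic keys, still matches the currently configured topic or topicPattern, and it is re-keyed to the current config's single or multi-topic form.

// Hypothetical standalone helper, mirroring getMatchingKafkaTopicPartition above.
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;

class StoredOffsetRemapSketch
{
  @Nullable
  static KafkaTopicPartition remap(
      @Nullable Pattern topicPattern,   // compiled from the io config stream when multi-topic
      String configuredStream,          // the io config stream (topic name or regex)
      String storedStream,              // the stream recorded in metadata storage
      KafkaTopicPartition storedKey
  )
  {
    // Multi-topic keys carry their own topic; single-topic keys fall back to the stored stream.
    String matchValue = storedKey.topic().orElse(storedStream);
    boolean matches = topicPattern != null
        ? topicPattern.matcher(matchValue).matches()
        : configuredStream.equals(matchValue);
    // Keep and re-key matching offsets; anything else belongs to a different topic and is dropped.
    return matches ? new KafkaTopicPartition(topicPattern != null, matchValue, storedKey.partition()) : null;
  }
}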

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
@ -36,17 +37,34 @@ import org.apache.druid.utils.CollectionUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public class KafkaDataSourceMetadataTest
{
private static final KafkaDataSourceMetadata START0 = startMetadata("foo", ImmutableMap.of());
private static final KafkaDataSourceMetadata START1 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L));
private static final KafkaDataSourceMetadata START2 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
private static final KafkaDataSourceMetadata START3 = startMetadata("foo", ImmutableMap.of(0, 2L, 2, 5L));
private static final KafkaDataSourceMetadata START4 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of());
private static final KafkaDataSourceMetadata START5 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 3L));
private static final KafkaDataSourceMetadata START6 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 3L));
private static final KafkaDataSourceMetadata START7 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L));
private static final KafkaDataSourceMetadata START8 = startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L));
private static final KafkaDataSourceMetadata START9 = startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L));
private static final KafkaDataSourceMetadata END0 = endMetadata("foo", ImmutableMap.of());
private static final KafkaDataSourceMetadata END1 = endMetadata("foo", ImmutableMap.of(0, 2L, 2, 5L));
private static final KafkaDataSourceMetadata END2 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L));
private static final KafkaDataSourceMetadata END3 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L));
private static final KafkaDataSourceMetadata END4 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of());
private static final KafkaDataSourceMetadata END5 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L));
private static final KafkaDataSourceMetadata END6 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 4L));
private static final KafkaDataSourceMetadata END7 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L));
private static final KafkaDataSourceMetadata END8 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 4L));
private static final KafkaDataSourceMetadata END9 = endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L));
private static final KafkaDataSourceMetadata END10 = endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L));
@Test
public void testMatches()
@ -55,33 +73,332 @@ public class KafkaDataSourceMetadataTest
Assert.assertTrue(START0.matches(START1));
Assert.assertTrue(START0.matches(START2));
Assert.assertTrue(START0.matches(START3));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(START0.matches(START4));
Assert.assertTrue(START0.matches(START5));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(START0.matches(START6));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(START0.matches(START7));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(START0.matches(START8));
// when merging, we lose the sequence numbers for topics foo2, and foo22 here when merging, so false
Assert.assertFalse(START0.matches(START9));
Assert.assertTrue(START1.matches(START0));
Assert.assertTrue(START1.matches(START1));
// sequence numbers for topic foo partition 1 are inconsistent, so false
Assert.assertFalse(START1.matches(START2));
Assert.assertTrue(START1.matches(START3));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(START1.matches(START4));
Assert.assertTrue(START1.matches(START5));
// when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false
Assert.assertFalse(START1.matches(START6));
// when merging, we lose the sequence numbers for topic foo2, so false
Assert.assertFalse(START1.matches(START7));
// when merging, we lose the sequence numbers for topic foo2, so false
Assert.assertFalse(START1.matches(START8));
// when merging, we lose the sequence numbers for topics foo2 and foo22, so false
Assert.assertFalse(START1.matches(START9));
Assert.assertTrue(START2.matches(START0));
Assert.assertFalse(START2.matches(START1));
Assert.assertTrue(START2.matches(START2));
Assert.assertTrue(START2.matches(START3));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(START2.matches(START4));
// when merging, sequence numbers for topic foo-1 are inconsistent, so false
Assert.assertFalse(START2.matches(START5));
// when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false
Assert.assertFalse(START2.matches(START6));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(START2.matches(START7));
Assert.assertTrue(START3.matches(START0));
Assert.assertTrue(START3.matches(START1));
Assert.assertTrue(START3.matches(START2));
Assert.assertTrue(START3.matches(START3));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(START3.matches(START4));
Assert.assertTrue(START3.matches(START5));
// when merging, we lose the sequence numbers for topic foo2, so false
Assert.assertFalse(START3.matches(START6));
// when merging, we lose the sequence numbers for topic foo2, so false
Assert.assertFalse(START3.matches(START7));
// when merging, we lose the sequence numbers for topic foo2, so false
Assert.assertFalse(START3.matches(START8));
// when merging, we lose the sequence numbers for topics foo2 and foo22, so false
Assert.assertFalse(START3.matches(START9));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(START4.matches(START0));
Assert.assertFalse(START4.matches(START1));
Assert.assertFalse(START4.matches(START2));
Assert.assertFalse(START4.matches(START3));
Assert.assertTrue(START4.matches(START4));
Assert.assertFalse(START4.matches(START5));
Assert.assertFalse(START4.matches(START6));
Assert.assertFalse(START4.matches(START7));
Assert.assertFalse(START4.matches(START8));
Assert.assertFalse(START4.matches(START9));
Assert.assertTrue(START5.matches(START0));
Assert.assertTrue(START5.matches(START1));
// when merging, the sequence numbers for topic foo-1 are inconsistent, so false
Assert.assertFalse(START5.matches(START2));
Assert.assertTrue(START5.matches(START3));
Assert.assertTrue(START5.matches(START4));
Assert.assertTrue(START5.matches(START5));
Assert.assertTrue(START5.matches(START6));
Assert.assertTrue(START5.matches(START7));
Assert.assertTrue(START5.matches(START8));
Assert.assertTrue(START5.matches(START9));
Assert.assertTrue(START6.matches(START0));
Assert.assertTrue(START6.matches(START1));
// when merging, the sequence numbers for topic foo-1 are inconsistent, so false
Assert.assertFalse(START6.matches(START2));
Assert.assertTrue(START6.matches(START3));
Assert.assertTrue(START6.matches(START4));
Assert.assertTrue(START6.matches(START5));
Assert.assertTrue(START6.matches(START6));
Assert.assertTrue(START6.matches(START7));
Assert.assertTrue(START6.matches(START8));
Assert.assertTrue(START6.matches(START9));
Assert.assertTrue(START7.matches(START0));
Assert.assertTrue(START7.matches(START1));
Assert.assertTrue(START7.matches(START2));
Assert.assertTrue(START7.matches(START3));
Assert.assertTrue(START7.matches(START4));
Assert.assertTrue(START7.matches(START5));
Assert.assertTrue(START7.matches(START6));
Assert.assertTrue(START7.matches(START7));
Assert.assertTrue(START7.matches(START8));
Assert.assertTrue(START7.matches(START9));
Assert.assertTrue(START8.matches(START0));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START8.matches(START1));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START8.matches(START2));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START8.matches(START3));
Assert.assertTrue(START8.matches(START4));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START8.matches(START5));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START8.matches(START6));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START8.matches(START7));
Assert.assertTrue(START8.matches(START8));
Assert.assertTrue(START8.matches(START9));
Assert.assertTrue(START9.matches(START0));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START9.matches(START1));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START9.matches(START2));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START9.matches(START3));
Assert.assertTrue(START9.matches(START4));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START9.matches(START5));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START9.matches(START6));
// when merging, we lose the sequence numbers for topic foo, so false
Assert.assertFalse(START9.matches(START7));
Assert.assertTrue(START9.matches(START8));
Assert.assertTrue(START9.matches(START9));
Assert.assertTrue(END0.matches(END0));
Assert.assertTrue(END0.matches(END1));
Assert.assertTrue(END0.matches(END2));
Assert.assertTrue(END0.matches(END3));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(END0.matches(END4));
Assert.assertTrue(END0.matches(END5));
Assert.assertTrue(END0.matches(END6));
// when merging, we lose the sequence numbers for topic foo2, so false
Assert.assertFalse(END0.matches(END7));
// when merging, we lose the sequence numbers for topic foo2, so false
Assert.assertFalse(END0.matches(END8));
// when merging, we lose the sequence numbers for topic foo2, so false
Assert.assertFalse(END0.matches(END9));
// when merging, we lose the sequence numbers for topic foo2, so false
Assert.assertFalse(END0.matches(END10));
Assert.assertTrue(END1.matches(END0));
Assert.assertTrue(END1.matches(END1));
Assert.assertTrue(END1.matches(END2));
Assert.assertTrue(END1.matches(END3));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(END1.matches(END4));
Assert.assertTrue(END1.matches(END5));
Assert.assertTrue(END1.matches(END6));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END1.matches(END7));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END1.matches(END8));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END1.matches(END9));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END1.matches(END10));
Assert.assertTrue(END2.matches(END0));
Assert.assertTrue(END2.matches(END1));
Assert.assertTrue(END2.matches(END2));
// when merging, the sequence numbers for topic foo-1 are inconsistent, so false
Assert.assertFalse(END2.matches(END3));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(END2.matches(END4));
Assert.assertTrue(END2.matches(END5));
Assert.assertTrue(END2.matches(END6));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END2.matches(END7));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END2.matches(END8));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END2.matches(END9));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END2.matches(END10));
Assert.assertTrue(END3.matches(END0));
Assert.assertTrue(END3.matches(END1));
// when merging, the sequence numbers for topic foo-1 are inconsistent, so false
Assert.assertFalse(END3.matches(END2));
Assert.assertTrue(END3.matches(END3));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(END3.matches(END4));
Assert.assertTrue(END3.matches(END5));
// when merging, the sequence numbers for topic foo-1 are inconsistent, so false
Assert.assertFalse(END3.matches(END6));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END3.matches(END7));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END3.matches(END8));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END3.matches(END9));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END3.matches(END10));
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertFalse(END4.matches(END0));
Assert.assertFalse(END4.matches(END1));
Assert.assertFalse(END4.matches(END2));
Assert.assertFalse(END4.matches(END3));
Assert.assertTrue(END4.matches(END4));
Assert.assertFalse(END4.matches(END5));
Assert.assertFalse(END4.matches(END6));
Assert.assertFalse(END4.matches(END7));
Assert.assertFalse(END4.matches(END8));
Assert.assertFalse(END4.matches(END9));
Assert.assertFalse(END4.matches(END10));
Assert.assertTrue(END5.matches(END0));
Assert.assertTrue(END5.matches(END1));
Assert.assertTrue(END5.matches(END2));
Assert.assertTrue(END5.matches(END3));
Assert.assertTrue(END5.matches(END4));
Assert.assertTrue(END5.matches(END5));
Assert.assertTrue(END5.matches(END6));
Assert.assertTrue(END5.matches(END7));
Assert.assertTrue(END5.matches(END8));
Assert.assertTrue(END5.matches(END9));
Assert.assertTrue(END5.matches(END10));
Assert.assertTrue(END6.matches(END0));
Assert.assertTrue(END6.matches(END1));
Assert.assertTrue(END6.matches(END2));
// when merging, the sequence numbers for topic foo-1 are inconsistent, so false
Assert.assertFalse(END6.matches(END3));
Assert.assertTrue(END6.matches(END4));
Assert.assertTrue(END6.matches(END5));
Assert.assertTrue(END6.matches(END6));
Assert.assertTrue(END6.matches(END7));
Assert.assertTrue(END6.matches(END8));
Assert.assertTrue(END6.matches(END9));
Assert.assertTrue(END6.matches(END10));
Assert.assertTrue(END7.matches(END0));
Assert.assertTrue(END7.matches(END1));
Assert.assertTrue(END7.matches(END2));
Assert.assertTrue(END7.matches(END3));
Assert.assertTrue(END7.matches(END4));
Assert.assertTrue(END7.matches(END5));
Assert.assertTrue(END7.matches(END6));
Assert.assertTrue(END7.matches(END7));
Assert.assertTrue(END7.matches(END8));
Assert.assertTrue(END7.matches(END9));
Assert.assertTrue(END7.matches(END10));
Assert.assertTrue(END8.matches(END0));
Assert.assertTrue(END8.matches(END1));
Assert.assertTrue(END8.matches(END2));
// when merging, the sequence numbers for topic foo-1, and foo-2 are inconsistent, so false
Assert.assertFalse(END8.matches(END3));
Assert.assertTrue(END8.matches(END4));
Assert.assertTrue(END8.matches(END5));
Assert.assertTrue(END8.matches(END6));
Assert.assertTrue(END8.matches(END7));
Assert.assertTrue(END8.matches(END8));
Assert.assertTrue(END8.matches(END9));
Assert.assertTrue(END8.matches(END10));
Assert.assertTrue(END9.matches(END0));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END9.matches(END1));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END9.matches(END2));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END9.matches(END3));
Assert.assertTrue(END9.matches(END4));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END9.matches(END5));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END9.matches(END6));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END9.matches(END7));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END9.matches(END8));
Assert.assertTrue(END9.matches(END9));
Assert.assertTrue(END9.matches(END10));
Assert.assertTrue(END10.matches(END0));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END10.matches(END1));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END10.matches(END2));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END10.matches(END3));
Assert.assertTrue(END10.matches(END4));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END10.matches(END5));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END10.matches(END6));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END10.matches(END7));
// when merging, we lose the sequence numbers for topic foo2 here when merging, so false
Assert.assertFalse(END10.matches(END8));
Assert.assertTrue(END10.matches(END9));
Assert.assertTrue(END10.matches(END10));
}
@Test
@ -91,6 +408,12 @@ public class KafkaDataSourceMetadataTest
Assert.assertTrue(START1.isValidStart());
Assert.assertTrue(START2.isValidStart());
Assert.assertTrue(START3.isValidStart());
Assert.assertTrue(START4.isValidStart());
Assert.assertTrue(START5.isValidStart());
Assert.assertTrue(START6.isValidStart());
Assert.assertTrue(START7.isValidStart());
Assert.assertTrue(START8.isValidStart());
Assert.assertTrue(START9.isValidStart());
}
@Test
@ -121,6 +444,85 @@ public class KafkaDataSourceMetadataTest
START2.plus(START2)
);
// START4's empty sequence map is treated as single-topic and its stream ("foo.*") differs from START1's ("foo"), so plus falls back to returning the other metadata
Assert.assertEquals(
START4,
START1.plus(START4)
);
Assert.assertEquals(
startMetadata(ImmutableMap.of(0, 2L, 1, 3L)),
START1.plus(START5)
);
Assert.assertEquals(
startMetadata(ImmutableMap.of(0, 2L, 1, 3L)),
START1.plus(START6)
);
Assert.assertEquals(
startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
START1.plus(START7)
);
Assert.assertEquals(
startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
START2.plus(START6)
);
// START4's empty sequence map is treated as single-topic and the streams differ, so plus returns the other metadata
Assert.assertEquals(
START0,
START4.plus(START0)
);
// same as above: empty map means single-topic, differing streams, so plus returns the other metadata
Assert.assertEquals(
START1,
START4.plus(START1)
);
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertEquals(
START4,
START4.plus(START5)
);
Assert.assertEquals(
START5,
START5.plus(START4)
);
Assert.assertEquals(
startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 3L)),
START5.plus(START6)
);
Assert.assertEquals(
startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
START7.plus(START8)
);
Assert.assertEquals(
startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
START7.plus(START9)
);
Assert.assertEquals(
startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
START8.plus(START7)
);
Assert.assertEquals(
startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
START8.plus(START9)
);
Assert.assertEquals(
startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
START9.plus(START8)
);
Assert.assertEquals(
endMetadata(ImmutableMap.of(0, 2L, 2, 5L)),
END0.plus(END1)
@ -130,26 +532,136 @@ public class KafkaDataSourceMetadataTest
endMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
END1.plus(END2)
);
// END4's empty sequence map is treated as single-topic and its stream differs from END0's, so plus returns the other metadata
Assert.assertEquals(
END4,
END0.plus(END4)
);
// END4's empty sequence map is treated as single-topic and the streams differ, so plus returns the other metadata
Assert.assertEquals(
END4,
END1.plus(END4)
);
// same as above: plus returns the other metadata when the streams differ and neither side is detected as multi-topic
Assert.assertEquals(
END0,
END4.plus(END0)
);
// same as above
Assert.assertEquals(
END1,
END4.plus(END1)
);
Assert.assertEquals(
END3,
END2.plus(END3)
);
Assert.assertEquals(
END2,
END3.plus(END2)
);
// when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
Assert.assertEquals(
END4,
END4.plus(END5)
);
Assert.assertEquals(
END5,
END5.plus(END4)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
END5.plus(END9)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
END5.plus(END10)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
END5.plus(END6)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
END5.plus(END7)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
END7.plus(END5)
);
Assert.assertEquals(
endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
END9.plus(END5)
);
Assert.assertEquals(
endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
END9.plus(END10)
);
Assert.assertEquals(
endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
END10.plus(END9)
);
}
@Test
public void testMinus()
{
Assert.assertEquals(
startMetadata(ImmutableMap.of(1, 3L)),
START1.minus(START3)
);
Assert.assertEquals(
startMetadata(ImmutableMap.of()),
START0.minus(START2)
);
Assert.assertEquals(
START0,
START0.minus(START4)
);
Assert.assertEquals(
startMetadata(ImmutableMap.of()),
START1.minus(START2)
);
Assert.assertEquals(
START1,
START1.minus(START4)
);
Assert.assertEquals(
startMetadata("foo", ImmutableMap.of()),
START1.minus(START5)
);
Assert.assertEquals(
startMetadata("foo", ImmutableMap.of()),
START1.minus(START6)
);
Assert.assertEquals(
startMetadata("foo", ImmutableMap.of(1, 3L)),
START1.minus(START7)
);
Assert.assertEquals(
startMetadata(ImmutableMap.of(2, 5L)),
START2.minus(START1)
@ -160,6 +672,21 @@ public class KafkaDataSourceMetadataTest
START2.minus(START2)
);
Assert.assertEquals(
START4,
START4.minus(START0)
);
Assert.assertEquals(
START4,
START4.minus(START1)
);
Assert.assertEquals(
startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()),
START5.minus(START1)
);
Assert.assertEquals(
endMetadata(ImmutableMap.of(1, 4L)),
END2.minus(END1)
@ -169,6 +696,101 @@ public class KafkaDataSourceMetadataTest
endMetadata(ImmutableMap.of(2, 5L)),
END1.minus(END2)
);
Assert.assertEquals(
END0,
END0.minus(END4)
);
Assert.assertEquals(
END4,
END4.minus(END0)
);
Assert.assertEquals(
END1,
END1.minus(END4)
);
Assert.assertEquals(
END4,
END4.minus(END1)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()),
END5.minus(END1)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)),
END5.minus(END4)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(2, 5L)),
END5.minus(END6)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)),
END6.minus(END5)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)),
END6.minus(END7)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)),
END7.minus(END5)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(2, 5L)),
END7.minus(END8)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)),
END7.minus(END9)
);
Assert.assertEquals(
endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)),
END7.minus(END10)
);
Assert.assertEquals(
END9,
END9.minus(END6)
);
Assert.assertEquals(
endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()),
END9.minus(END7)
);
Assert.assertEquals(
endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(2, 5L)),
END9.minus(END8)
);
Assert.assertEquals(
endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()),
END9.minus(END9)
);
Assert.assertEquals(
endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()),
END9.minus(END10)
);
Assert.assertEquals(
endMetadataMultiTopic("foo2.*", ImmutableList.of("foo22"), ImmutableMap.of(0, 2L, 2, 5L)),
END10.minus(END9)
);
}
@Test
@ -203,19 +825,59 @@ public class KafkaDataSourceMetadataTest
}
private static KafkaDataSourceMetadata startMetadata(Map<Integer, Long> offsets)
{
return startMetadata("foo", offsets);
}
private static KafkaDataSourceMetadata startMetadata(
String topic,
Map<Integer, Long> offsets
)
{
Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
offsets,
k -> new KafkaTopicPartition(
false,
topic,
k
)
);
return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>("foo", newOffsets, ImmutableSet.of()));
return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, newOffsets, ImmutableSet.of()));
}
private static KafkaDataSourceMetadata startMetadataMultiTopic(
String topicPattern,
List<String> topics,
Map<Integer, Long> offsets
)
{
Assert.assertFalse(topics.isEmpty());
Pattern pattern = Pattern.compile(topicPattern);
Assert.assertTrue(topics.stream().allMatch(t -> pattern.matcher(t).matches()));
Map<KafkaTopicPartition, Long> newOffsets = new HashMap<>();
for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
for (String topic : topics) {
newOffsets.put(
new KafkaTopicPartition(
true,
topic,
e.getKey()
),
e.getValue()
);
}
}
return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topicPattern, newOffsets, ImmutableSet.of()));
}
private static KafkaDataSourceMetadata endMetadata(Map<Integer, Long> offsets)
{
return endMetadata("foo", offsets);
}
private static KafkaDataSourceMetadata endMetadata(String topic, Map<Integer, Long> offsets)
{
Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
offsets,
@ -225,7 +887,33 @@ public class KafkaDataSourceMetadataTest
k
)
);
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", newOffsets));
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, newOffsets));
}
private static KafkaDataSourceMetadata endMetadataMultiTopic(
String topicPattern,
List<String> topics,
Map<Integer, Long> offsets
)
{
Assert.assertFalse(topics.isEmpty());
Pattern pattern = Pattern.compile(topicPattern);
Assert.assertTrue(topics.stream().allMatch(t -> pattern.matcher(t).matches()));
Map<KafkaTopicPartition, Long> newOffsets = new HashMap<>();
for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
for (String topic : topics) {
newOffsets.put(
new KafkaTopicPartition(
true,
topic,
e.getKey()
),
e.getValue()
);
}
}
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topicPattern, newOffsets));
}
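For orientation, a quick illustration of what the multi-topic helpers above construct (the call and offset values are hypothetical): each offset entry is fanned out across every topic that matches the pattern, keyed by a multi-topic KafkaTopicPartition.

// Hypothetical invocation of the helper above: one partition offset, two matching topics.
KafkaDataSourceMetadata metadata = endMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo", "foo2"),
    ImmutableMap.of(0, 5L)
);
// The resulting sequence numbers use the pattern as the stream name and contain one entry
// per (topic, partition) pair:
//   KafkaTopicPartition(true, "foo",  0) -> 5L
//   KafkaTopicPartition(true, "foo2", 0) -> 5L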
private static ObjectMapper createObjectMapper()

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.name.Names;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class KafkaSeekableStreamEndSequenceNumbersTest
{
private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
@Test
public void testSerde() throws Exception
{
final String stream = "theStream";
final Map<KafkaTopicPartition, Long> offsetMap = ImmutableMap.of(
new KafkaTopicPartition(false, null, 1), 2L,
new KafkaTopicPartition(false, null, 3), 4L
);
final KafkaSeekableStreamEndSequenceNumbers partitions = new KafkaSeekableStreamEndSequenceNumbers(
stream,
null,
offsetMap,
null
);
final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
// Check round-trip.
final SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> partitions2 = OBJECT_MAPPER.readValue(
serializedString,
new TypeReference<SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>>() {}
);
Assert.assertEquals(
"Round trip",
partitions,
new KafkaSeekableStreamEndSequenceNumbers(partitions2.getStream(),
partitions2.getTopic(),
partitions2.getPartitionSequenceNumberMap(),
partitions2.getPartitionOffsetMap()
)
);
// Check backwards compatibility.
final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
serializedString,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(stream, asMap.get("stream"));
Assert.assertEquals(stream, asMap.get("topic"));
// Jackson reads the nested maps as String -> Integer, so convert them back to KafkaTopicPartition -> Long before comparing.
Assert.assertEquals(
offsetMap,
OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
);
Assert.assertEquals(
offsetMap,
OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
);
// Check that KafkaSeekableStreamEndSequenceNumbers is not registered with the mapper, so there is no possible
// collision when deserializing it from String / bytes.
boolean expectedExceptionThrown = false;
try {
OBJECT_MAPPER.readValue(
serializedString,
KafkaSeekableStreamEndSequenceNumbers.class
);
}
catch (InvalidTypeIdException e) {
expectedExceptionThrown = true;
}
Assert.assertTrue("KafkaSeekableStreamEndSequenceNumbers should not be registered type", expectedExceptionThrown);
}
private static ObjectMapper createObjectMapper()
{
DruidModule module = new KafkaIndexTaskModule();
final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
.addModule(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
}
)
.build();
ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
module.getJacksonModules().forEach(objectMapper::registerModule);
return objectMapper;
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.name.Names;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
import java.util.Set;
public class KafkaSeekableStreamStartSequenceNumbersTest
{
private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
@Test
public void testSerde() throws Exception
{
final String stream = "theStream";
final Map<KafkaTopicPartition, Long> offsetMap = ImmutableMap.of(
new KafkaTopicPartition(false, null, 1), 2L,
new KafkaTopicPartition(false, null, 3), 4L
);
Set<KafkaTopicPartition> exclusivePartitions = ImmutableSet.of(new KafkaTopicPartition(false, null, 1));
final KafkaSeekableStreamStartSequenceNumbers partitions = new KafkaSeekableStreamStartSequenceNumbers(
stream,
null,
offsetMap,
null,
exclusivePartitions
);
final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
// Check round-trip.
final SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> partitions2 = OBJECT_MAPPER.readValue(
serializedString,
new TypeReference<SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>>() {}
);
Assert.assertEquals(
"Round trip",
partitions,
new KafkaSeekableStreamStartSequenceNumbers(
partitions2.getStream(),
partitions2.getTopic(),
partitions2.getPartitionSequenceNumberMap(),
partitions2.getPartitionOffsetMap(),
partitions2.getExclusivePartitions()
)
);
// Check backwards compatibility.
final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
serializedString,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(stream, asMap.get("stream"));
Assert.assertEquals(stream, asMap.get("topic"));
// Jackson reads the nested maps as String -> Integer, so convert them back to KafkaTopicPartition -> Long before comparing.
Assert.assertEquals(
offsetMap,
OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
);
Assert.assertEquals(
offsetMap,
OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {})
);
Assert.assertEquals(
exclusivePartitions,
OBJECT_MAPPER.convertValue(asMap.get("exclusivePartitions"), new TypeReference<Set<KafkaTopicPartition>>() {})
);
// Check that KafkaSeekableStreamStartSequenceNumbers is not registered with the mapper, so there is no possible
// collision when deserializing it from String / bytes.
boolean expectedExceptionThrown = false;
try {
OBJECT_MAPPER.readValue(
serializedString,
KafkaSeekableStreamStartSequenceNumbers.class
);
}
catch (InvalidTypeIdException e) {
expectedExceptionThrown = true;
}
Assert.assertTrue("KafkaSeekableStreamStartSequenceNumbers should not be registered type", expectedExceptionThrown);
}
private static ObjectMapper createObjectMapper()
{
DruidModule module = new KafkaIndexTaskModule();
final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
.addModule(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
}
)
.build();
ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
module.getJacksonModules().forEach(objectMapper::registerModule);
return objectMapper;
}
}

View File

@ -166,6 +166,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
private SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> taskClient;
private TaskQueue taskQueue;
private String topic;
private String topicPattern;
private boolean multiTopic;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private StubServiceEmitter serviceEmitter;
private SupervisorStateManagerConfig supervisorConfig;
@ -174,7 +176,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static String getTopic()
{
//noinspection StringConcatenationMissingWhitespace
return TOPIC_PREFIX + topicPostfix++;
return TOPIC_PREFIX + topicPostfix;
}
private static String getTopicPattern()
{
//noinspection StringConcatenationMissingWhitespace
return TOPIC_PREFIX + topicPostfix + ".*";
}
@Parameterized.Parameters(name = "numThreads = {0}")
@ -222,6 +230,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskQueue = createMock(TaskQueue.class);
topic = getTopic();
topicPattern = getTopicPattern();
topicPostfix++;
multiTopic = false; // set to true in a test to exercise multi-topic (topicPattern) behavior
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
serviceEmitter = new StubServiceEmitter("KafkaSupervisorTest", "localhost");
EmittingLogger.registerEmitter(serviceEmitter);
@ -906,11 +917,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
/**
* Test generating the starting offsets from the partition data stored in druid_dataSource which contains the
* offsets of the last built segments.
* Test generating the starting offsets for a single-topic config from the partition data stored in druid_dataSource,
* which contains the offsets of the last built segments from a previous single-topic config whose topic matches
* the new configuration.
*/
@Test
public void testDatasourceMetadata() throws Exception
public void testDatasourceMetadataSingleTopicToSingleTopicMatch() throws Exception
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
addSomeEvents(100);
@ -948,6 +960,203 @@ public class KafkaSupervisorTest extends EasyMockSupport
);
}
/**
* Test generating the starting offsets for a multi-topic config from the partition data stored in druid_dataSource,
* which contains the offsets of the last built segments from a previous single-topic config whose topic matches
* the new configuration.
*/
@Test
public void testDatasourceMetadataSingleTopicToMultiTopicMatch() throws Exception
{
multiTopic = true;
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
addSomeEvents(100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
// these should match
partitionSequenceNumberMap.putAll(singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, partitionSequenceNumberMap.build(), ImmutableSet.of())
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
KafkaIndexTask task = captured.getValue();
KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertEquals(
10L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue()
);
Assert.assertEquals(
20L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue()
);
Assert.assertEquals(
30L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue()
);
Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
}
/**
* Test generating the starting offsets for a multi-topic config from the partition data stored in druid_dataSource,
* which contains the offsets of the last built segments from a previous single-topic config whose topic does not
* match the new configuration.
*/
@Test
public void testDatasourceMetadataSingleTopicToMultiTopicNotMatch() throws Exception
{
multiTopic = true;
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
addSomeEvents(100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
// these should not match
partitionSequenceNumberMap.putAll(singlePartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>("notMatch", partitionSequenceNumberMap.build(), ImmutableSet.of())
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
KafkaIndexTask task = captured.getValue();
KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertEquals(
0L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue()
);
Assert.assertEquals(
0L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue()
);
Assert.assertEquals(
0L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue()
);
Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
}
/**
* Test generating the starting offsets for a single-topic config from the partition data stored in druid_dataSource,
* which contains the offsets of the last built segments from a previous multi-topic config.
*/
@Test
public void testDatasourceMetadataMultiTopicToSingleTopic() throws Exception
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
addSomeEvents(100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
// these should match
partitionSequenceNumberMap.putAll(multiTopicPartitionMap(topic, 0, 10L, 1, 20L, 2, 30L));
// these should not match
partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topicPattern, partitionSequenceNumberMap.build(), ImmutableSet.of())
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
KafkaIndexTask task = captured.getValue();
KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertEquals(
10L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue()
);
Assert.assertEquals(
20L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue()
);
Assert.assertEquals(
30L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue()
);
Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
}
/**
* Test generating the starting offsets for a multi-topic config from the partition data stored in druid_dataSource,
* which contains the offsets of the last built segments from a previous multi-topic config.
*/
@Test
public void testDatasourceMetadataMultiTopicToMultiTopic() throws Exception
{
multiTopic = true;
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
addSomeEvents(100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder();
// these should match
partitionSequenceNumberMap.putAll(multiTopicPartitionMap(topic, 0, 10L, 1, 20L, 2, 30L));
// these should not match
partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, partitionSequenceNumberMap.build(), ImmutableSet.of())
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
KafkaIndexTask task = captured.getValue();
KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertEquals(
10L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue()
);
Assert.assertEquals(
20L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue()
);
Assert.assertEquals(
30L,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue()
);
Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
}
@Test
public void testBadMetadataOffsets() throws Exception
{
@ -4621,8 +4830,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
topic,
null,
multiTopic ? null : topic,
multiTopic ? topicPattern : null,
INPUT_FORMAT,
replicas,
taskCount,
@ -5038,6 +5247,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
offset2, new KafkaTopicPartition(false, topic, partition3), offset3);
}
private static ImmutableMap<KafkaTopicPartition, Long> multiTopicPartitionMap(String topic, int partition1, long offset1,
int partition2, long offset2, int partition3, long offset3)
{
return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1), offset1, new KafkaTopicPartition(true, topic, partition2),
offset2, new KafkaTopicPartition(true, topic, partition3), offset3);
}
private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final String taskType;
@ -5109,7 +5325,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Deserializer valueDeserializerObject = new ByteArrayDeserializer();
return new KafkaRecordSupplier(
new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject),
false
getIoConfig().isMultiTopic()
);
}

View File

@ -22,7 +22,6 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import java.util.Collections;
import java.util.Comparator;
@ -47,6 +46,8 @@ import java.util.Objects;
public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> implements
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
{
public static final String TYPE = "end";
// stream/topic
private final String stream;
// partitionId -> sequence number
@ -126,13 +127,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
)
{
if (this.getClass() != other.getClass()) {
throw new IAE(
"Expected instance of %s, got %s",
this.getClass().getName(),
other.getClass().getName()
);
}
validateSequenceNumbersBaseType(other);
final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd =
(SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@ -151,13 +146,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
@Override
public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
{
if (this.getClass() != other.getClass()) {
throw new IAE(
"Expected instance of %s, got %s",
this.getClass().getName(),
other.getClass().getName()
);
}
validateSequenceNumbersBaseType(other);
final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
(SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@ -185,13 +174,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
)
{
if (this.getClass() != other.getClass()) {
throw new IAE(
"Expected instance of %s, got %s",
this.getClass().getName(),
other.getClass().getName()
);
}
validateSequenceNumbersBaseType(other);
final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd =
(SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;

View File

@ -24,14 +24,15 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.java.util.common.IAE;
import java.util.Comparator;
import java.util.Map;
@JsonTypeInfo(use = Id.NAME, property = "type", defaultImpl = SeekableStreamEndSequenceNumbers.class)
@JsonSubTypes({
@Type(name = "start", value = SeekableStreamStartSequenceNumbers.class),
@Type(name = "end", value = SeekableStreamEndSequenceNumbers.class)
@Type(name = SeekableStreamStartSequenceNumbers.TYPE, value = SeekableStreamStartSequenceNumbers.class),
@Type(name = SeekableStreamEndSequenceNumbers.TYPE, value = SeekableStreamEndSequenceNumbers.class)
})
public interface SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
{
@ -40,6 +41,29 @@ public interface SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetTy
*/
String getStream();
/**
* Returns whether the sequence numbers may span multiple streams / topics, e.g. when a topic pattern is configured.
*/
default boolean isMultiTopicPartition()
{
return false;
}
/**
* Throws an exception if this instance and {@code other} are not of the same class.
* @param other the other instance to compare.
*/
default void validateSequenceNumbersBaseType(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other)
{
if (this.getClass() != other.getClass()) {
throw new IAE(
"Expected instance of %s, got %s",
this.getClass().getName(),
other.getClass().getName()
);
}
}
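The shared guard above replaces the per-class copies removed from the start/end sequence-number classes in the surrounding hunks; as a rough illustration (values are made up, this is not a test from the patch), mixing the two concrete types now fails fast through this single check:

// Illustrative only: combining or comparing a "start" instance with an "end" instance trips
// validateSequenceNumbersBaseType() and throws IAE.
SeekableStreamSequenceNumbers<Integer, Long> start =
    new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 1L), ImmutableSet.of());
SeekableStreamSequenceNumbers<Integer, Long> end =
    new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 2L));
start.compareTo(end, Comparator.naturalOrder()); // throws IAE: expected SeekableStreamStartSequenceNumbers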
/**
* Returns a map of partitionId -> sequenceNumber.
*/

View File

@ -22,7 +22,6 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import javax.annotation.Nullable;
import java.util.Collections;
@ -42,6 +41,8 @@ import java.util.Set;
public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> implements
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType>
{
public static final String TYPE = "start";
// stream/topic
private final String stream;
// partitionId -> sequence number
@ -121,13 +122,7 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
)
{
if (this.getClass() != other.getClass()) {
throw new IAE(
"Expected instance of %s, got %s",
this.getClass().getName(),
other.getClass().getName()
);
}
validateSequenceNumbersBaseType(other);
final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
(SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@ -165,13 +160,7 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
@Override
public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator)
{
if (this.getClass() != other.getClass()) {
throw new IAE(
"Expected instance of %s, got %s",
this.getClass().getName(),
other.getClass().getName()
);
}
validateSequenceNumbersBaseType(other);
final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
(SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;
@ -199,13 +188,7 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
)
{
if (this.getClass() != other.getClass()) {
throw new IAE(
"Expected instance of %s, got %s",
this.getClass().getName(),
other.getClass().getName()
);
}
validateSequenceNumbersBaseType(other);
final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart =
(SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other;

View File

@ -3863,10 +3863,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
private Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage()
protected Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
dataSource);
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata
&& checkSourceMetadataMatch(dataSourceMetadata)) {
@SuppressWarnings("unchecked")
@ -3889,6 +3888,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return Collections.emptyMap();
}
protected DataSourceMetadata retrieveDataSourceMetadata()
{
return indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
}
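Making the metadata lookup overridable is presumably what lets a stream-specific supervisor adjust the stored metadata before the generic offset extraction above interprets it; a hypothetical override (names are illustrative, this is not the Kafka implementation from this patch) could look like:

// Hypothetical subclass hook: adapt previously stored metadata (e.g. single-topic offsets)
// to the currently configured stream/topicPattern before getOffsetsFromMetadataStorage() runs.
@Override
protected DataSourceMetadata retrieveDataSourceMetadata()
{
  final DataSourceMetadata stored = super.retrieveDataSourceMetadata();
  return adaptToCurrentTopicConfig(stored); // illustrative helper, not part of this patch
}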
/**
* Fetches the earliest or latest offset from the stream via the {@link RecordSupplier}
*/