From 2351f038eb79e2090fc56bb1773b54cbd7a9f26f Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 17 Apr 2024 10:00:17 -0400 Subject: [PATCH] Kafka with topicPattern can ignore old offsets spuriously (#16190) * * fix * * simplify * * simplify tests * * update matches function definition for Kafka Datasource Metadata * * add matchesOld * * override matches and plus for kafka based metadata / sequence numbers * * implement minus * add tests * * fix failing tests * * remove TODO comments * * simplfy and add comments * * remove unused variable in tests * * remove unneeded function * * add serde tests * * more stuff * * address review comments * * remove unneeded code. --- .../kafka/KafkaDataSourceMetadata.java | 91 ++- ...KafkaSeekableStreamEndSequenceNumbers.java | 179 +++++ ...fkaSeekableStreamStartSequenceNumbers.java | 214 ++++++ .../kafka/supervisor/KafkaSupervisor.java | 80 ++ .../kafka/KafkaDataSourceMetadataTest.java | 718 +++++++++++++++++- ...aSeekableStreamEndSequenceNumbersTest.java | 129 ++++ ...eekableStreamStartSequenceNumbersTest.java | 141 ++++ .../kafka/supervisor/KafkaSupervisorTest.java | 230 +++++- .../SeekableStreamEndSequenceNumbers.java | 27 +- .../SeekableStreamSequenceNumbers.java | 28 +- .../SeekableStreamStartSequenceNumbers.java | 27 +- .../supervisor/SeekableStreamSupervisor.java | 10 +- 12 files changed, 1802 insertions(+), 72 deletions(-) create mode 100644 extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java create mode 100644 extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java create mode 100644 extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java create mode 100644 extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java index a7b73f445ac..cfa30809948 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java @@ -26,19 +26,41 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CollectionUtils; +import org.apache.kafka.common.TopicPartition; import java.util.Comparator; +import java.util.Map; public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata implements Comparable { + private static final Logger LOGGER = new Logger(KafkaDataSourceMetadata.class); @JsonCreator public KafkaDataSourceMetadata( @JsonProperty("partitions") SeekableStreamSequenceNumbers kafkaPartitions ) { - super(kafkaPartitions); + super(kafkaPartitions == null + ? 
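+ // The incoming sequence numbers are re-wrapped below into Kafka-specific subclasses so that
+ // plus(), minus(), and matches() can account for multi-topic (topicPattern) partition keys;
+ // the wrappers serialize like the base classes, so stored metadata keeps its existing shape.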
null + : kafkaPartitions instanceof SeekableStreamStartSequenceNumbers + ? + new KafkaSeekableStreamStartSequenceNumbers( + kafkaPartitions.getStream(), + ((SeekableStreamStartSequenceNumbers) kafkaPartitions).getTopic(), + kafkaPartitions.getPartitionSequenceNumberMap(), + ((SeekableStreamStartSequenceNumbers) kafkaPartitions).getPartitionOffsetMap(), + ((SeekableStreamStartSequenceNumbers) kafkaPartitions).getExclusivePartitions() + ) + : new KafkaSeekableStreamEndSequenceNumbers( + kafkaPartitions.getStream(), + ((SeekableStreamEndSequenceNumbers) kafkaPartitions).getTopic(), + kafkaPartitions.getPartitionSequenceNumberMap(), + ((SeekableStreamEndSequenceNumbers) kafkaPartitions).getPartitionOffsetMap() + )); } @Override @@ -76,4 +98,71 @@ public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata otherSequenceNumbers = otherMetadata.getSeekableStreamSequenceNumbers(); + if (!getSeekableStreamSequenceNumbers().isMultiTopicPartition() && !otherSequenceNumbers.isMultiTopicPartition()) { + return false; + } + final SeekableStreamSequenceNumbers mergedSequenceNumbers = thisPlusOther.getSeekableStreamSequenceNumbers(); + + final Map topicAndPartitionToSequenceNumber = CollectionUtils.mapKeys( + mergedSequenceNumbers.getPartitionSequenceNumberMap(), + k -> k.asTopicPartition(mergedSequenceNumbers.getStream()) + ); + + boolean allOtherFoundAndConsistent = otherSequenceNumbers.getPartitionSequenceNumberMap().entrySet().stream().noneMatch( + e -> { + KafkaTopicPartition kafkaTopicPartition = e.getKey(); + TopicPartition topicPartition = kafkaTopicPartition.asTopicPartition(otherSequenceNumbers.getStream()); + Long sequenceOffset = topicAndPartitionToSequenceNumber.get(topicPartition); + long oldSequenceOffset = e.getValue(); + if (sequenceOffset == null || !sequenceOffset.equals(oldSequenceOffset)) { + LOGGER.info( + "sequenceOffset found for currently computed and stored metadata does not match for " + + "topicPartition: [%s]. currentSequenceOffset: [%s], oldSequenceOffset: [%s]", + topicPartition, + sequenceOffset, + oldSequenceOffset + ); + return true; + } + return false; + } + ); + + boolean allThisFoundAndConsistent = this.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().entrySet().stream().noneMatch( + e -> { + KafkaTopicPartition kafkaTopicPartition = e.getKey(); + TopicPartition topicPartition = kafkaTopicPartition.asTopicPartition(this.getSeekableStreamSequenceNumbers().getStream()); + Long oldSequenceOffset = topicAndPartitionToSequenceNumber.get(topicPartition); + long sequenceOffset = e.getValue(); + if (oldSequenceOffset == null || !oldSequenceOffset.equals(sequenceOffset)) { + LOGGER.info( + "sequenceOffset found for currently computed and stored metadata does not match for " + + "topicPartition: [%s]. 
currentSequenceOffset: [%s], oldSequenceOffset: [%s]", + topicPartition, + sequenceOffset, + oldSequenceOffset + ); + return true; + } + return false; + } + ); + + return allOtherFoundAndConsistent && allThisFoundAndConsistent; + } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java new file mode 100644 index 00000000000..5476f88c6ce --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbers.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; +import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; +import org.apache.druid.utils.CollectionUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Represents the kafka based end sequenceNumber per partition of a sequence. This class is needed because + * of special handling that must be done for multi-topic partitions to ensure that offsets are preserved. + *
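+ * For example (hypothetical topics), offsets stored under the single topic "foo" and offsets
+ * computed under the pattern "foo.*" must resolve to the same underlying partitions when merged.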

+ * Do not register this class as a subtype of base class in Jackson. We want this class to be serialized + * when written to DB as a {@link SeekableStreamEndSequenceNumbers}. Do not create instances of this class + * directly from jackson mapper. + */ +@JsonTypeName(SeekableStreamEndSequenceNumbers.TYPE) +public class KafkaSeekableStreamEndSequenceNumbers extends SeekableStreamEndSequenceNumbers +{ + private final boolean isMultiTopicPartition; + + public KafkaSeekableStreamEndSequenceNumbers( + final String stream, + // kept for backward compatibility + final String topic, + final Map partitionSequenceNumberMap, + // kept for backward compatibility + final Map partitionOffsetMap + ) + { + super(stream, topic, partitionSequenceNumberMap, partitionOffsetMap); + // how to know it topicPattern if the partitionSequenceNumberMap is empty? + isMultiTopicPartition = !partitionSequenceNumberMap.isEmpty() && partitionSequenceNumberMap.keySet() + .stream() + .findFirst() + .get() + .isMultiTopicPartition(); + } + + @Override + public boolean isMultiTopicPartition() + { + return isMultiTopicPartition; + } + + @Override + public SeekableStreamSequenceNumbers plus( + SeekableStreamSequenceNumbers other + ) + { + validateSequenceNumbersBaseType(other); + + KafkaSeekableStreamEndSequenceNumbers that = (KafkaSeekableStreamEndSequenceNumbers) other; + + if (!this.isMultiTopicPartition() && !that.isMultiTopicPartition()) { + return super.plus(other); + } + + String thisTopic = getStream(); + String thatTopic = that.getStream(); + final Map newMap; + if (!isMultiTopicPartition()) { + // going from topicPattern to single topic + + // start with existing sequence numbers which in this case will be all single topic. + newMap = new HashMap<>(getPartitionSequenceNumberMap()); + + // add all sequence numbers from other where the topic name matches this topic. Transform to single topic + // as in this case we will be returning a single topic based sequence map. + newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream() + .filter(e -> { + if (e.getKey().topic().isPresent()) { + return e.getKey().topic().get().equals(thisTopic); + } else { + // this branch shouldn't really be hit since other should be multi-topic here, but adding this + // just in case. + return thatTopic.equals(thisTopic); + } + }) + .collect(Collectors.toMap( + e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()), + Map.Entry::getValue + ))); + } else { + // going from single topic or topicPattern to topicPattern + + // start with existing sequence numbers and transform them to multit-topic keys, as the returned + // sequence numbers will be multi-topic based. + newMap = CollectionUtils.mapKeys( + getPartitionSequenceNumberMap(), + k -> new KafkaTopicPartition( + true, + k.asTopicPartition(thisTopic).topic(), + k.partition() + ) + ); + + // add all sequence numbers from other where the topic name matches the pattern of this topic regex. Transform to + // multi-topic as in this case we will be returning a multi-topic based sequence map. 
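+ // For illustration (hypothetical values): if this stream is the pattern "foo.*" and "that" holds a
+ // single-topic entry {partition 0 -> 10L} for topic "foo2", the filter below accepts it (since
+ // "foo2" matches "foo.*") and re-keys it as KafkaTopicPartition(true, "foo2", 0) in the merged map.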
+ Pattern pattern = Pattern.compile(thisTopic); + newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream() + .filter(e -> { + if (e.getKey().topic().isPresent()) { + return pattern.matcher(e.getKey().topic().get()).matches(); + } else { + return pattern.matcher(thatTopic).matches(); + } + }) + .collect(Collectors.toMap( + e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()), + Map.Entry::getValue + ))); + } + + return new SeekableStreamEndSequenceNumbers<>(getStream(), newMap); + } + + @Override + public SeekableStreamSequenceNumbers minus( + SeekableStreamSequenceNumbers other + ) + { + validateSequenceNumbersBaseType(other); + + final KafkaSeekableStreamEndSequenceNumbers otherEnd = + (KafkaSeekableStreamEndSequenceNumbers) other; + + if (!this.isMultiTopicPartition() && !otherEnd.isMultiTopicPartition()) { + return super.minus(other); + } + + final Map newMap = new HashMap<>(); + //String thisTopic = getStream(); + String thatTopic = otherEnd.getStream(); + + + // remove partitions present in "that" from "this", check for exact match, multi-topic match, or single-topic match + for (Map.Entry entry : getPartitionSequenceNumberMap().entrySet()) { + String thisTopic = entry.getKey().asTopicPartition(getStream()).topic(); + boolean otherContainsThis = otherEnd.getPartitionSequenceNumberMap().containsKey(entry.getKey()); + boolean otherContainsThisMultiTopic = otherEnd.getPartitionSequenceNumberMap() + .containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition())); + boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherEnd.getPartitionSequenceNumberMap() + .containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition()))); + if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) { + newMap.put(entry.getKey(), entry.getValue()); + } + } + + return new SeekableStreamEndSequenceNumbers<>( + getStream(), + newMap + ); + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java new file mode 100644 index 00000000000..24a3e08b5d5 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbers.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.utils.CollectionUtils; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Represents the kafka based start sequenceNumber per partition of a sequence. This class is needed because + * of special handling that must be done for multi-topic partitions to ensure that offsets are preserved. + *

+ * Do not register this class as a subtype of base class in Jackson. We want this class to be serialized + * when written to DB as a {@link SeekableStreamStartSequenceNumbers}. Do not create instances of this class + * directly from jackson mapper. + */ +@JsonTypeName(SeekableStreamStartSequenceNumbers.TYPE) +public class KafkaSeekableStreamStartSequenceNumbers extends SeekableStreamStartSequenceNumbers +{ + private final boolean isMultiTopicPartition; + + public KafkaSeekableStreamStartSequenceNumbers( + String stream, + String topic, + Map partitionSequenceNumberMap, + Map partitionOffsetMap, + @Nullable Set exclusivePartitions + ) + { + super(stream, topic, partitionSequenceNumberMap, partitionOffsetMap, exclusivePartitions); + // how to know it topicPattern if the partitionSequenceNumberMap is empty? + isMultiTopicPartition = !partitionSequenceNumberMap.isEmpty() && partitionSequenceNumberMap.keySet() + .stream() + .findFirst() + .get() + .isMultiTopicPartition(); + } + + @Override + public boolean isMultiTopicPartition() + { + return isMultiTopicPartition; + } + + @Override + public SeekableStreamSequenceNumbers plus( + SeekableStreamSequenceNumbers other + ) + { + validateSequenceNumbersBaseType(other); + + KafkaSeekableStreamStartSequenceNumbers that = (KafkaSeekableStreamStartSequenceNumbers) other; + + if (!this.isMultiTopicPartition() && !that.isMultiTopicPartition()) { + return super.plus(other); + } + + + String thisTopic = getStream(); + String thatTopic = that.getStream(); + final Map newMap; + final Set newExclusivePartitions = new HashSet<>(); + if (!isMultiTopicPartition()) { + // going from topicPattern to single topic + + // start with existing sequence numbers which in this case will be all single topic. + newMap = new HashMap<>(getPartitionSequenceNumberMap()); + + // add all sequence numbers from other where the topic name matches this topic. Transform to single topic + // as in this case we will be returning a single topic based sequence map. + newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream() + .filter(e -> { + if (e.getKey().topic().isPresent()) { + return e.getKey().topic().get().equals(thisTopic); + } else { + // this branch shouldn't really be hit since other should be multi-topic here, but adding this + // just in case. + return thatTopic.equals(thisTopic); + } + }) + .collect(Collectors.toMap( + e -> new KafkaTopicPartition(false, thisTopic, e.getKey().partition()), + Map.Entry::getValue + ))); + + // A partition is exclusive if it's + // 1) exclusive in "this" and it's not in "other"'s partitionSequenceNumberMap or + // 2) exclusive in "other" + getPartitionSequenceNumberMap().forEach( + (partitionId, sequenceOffset) -> { + KafkaTopicPartition multiTopicPartitonIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition()); + if (getExclusivePartitions().contains(partitionId) && !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitonIdToSearch)) { + newExclusivePartitions.add(new KafkaTopicPartition(false, this.getStream(), partitionId.partition())); + } + } + ); + newExclusivePartitions.addAll(that.getExclusivePartitions()); + } else { + // going from single topic or topicPattern to topicPattern + + // start with existing sequence numbers and transform them to multit-topic keys, as the returned + // sequence numbers will be multi-topic based. 
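+ // For illustration (hypothetical values): an existing entry for partition 0 of topic "foo" is
+ // normalized below to the key KafkaTopicPartition(true, "foo", 0), so entries from "this" and
+ // "other" share one key shape before the maps are merged.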
+ newMap = CollectionUtils.mapKeys( + getPartitionSequenceNumberMap(), + k -> new KafkaTopicPartition( + true, + k.asTopicPartition(thisTopic).topic(), + k.partition() + ) + ); + + // add all sequence numbers from other where the topic name matches the pattern of this topic regex. Transform to + // multi-topic as in this case we will be returning a multi-topic based sequence map. + Pattern pattern = Pattern.compile(thisTopic); + newMap.putAll(that.getPartitionSequenceNumberMap().entrySet().stream() + .filter(e -> { + if (e.getKey().topic().isPresent()) { + return pattern.matcher(e.getKey().topic().get()).matches(); + } else { + return pattern.matcher(thatTopic).matches(); + } + }) + .collect(Collectors.toMap( + e -> new KafkaTopicPartition(true, e.getKey().asTopicPartition(thatTopic).topic(), e.getKey().partition()), + Map.Entry::getValue + ))); + + // A partition is exclusive if it's + // 1) exclusive in "this" and it's not in "other"'s partitionSequenceNumberMap or + // 2) exclusive in "other" + getPartitionSequenceNumberMap().forEach( + (partitionId, sequenceOffset) -> { + KafkaTopicPartition multiTopicPartitonIdToSearch = new KafkaTopicPartition(true, thisTopic, partitionId.partition()); + boolean thatTopicMatchesThisTopicPattern = partitionId.topic().isPresent() ? pattern.matcher(partitionId.topic().get()).matches() : pattern.matcher(thatTopic).matches(); + if (getExclusivePartitions().contains(partitionId) && (!thatTopicMatchesThisTopicPattern || !that.getPartitionSequenceNumberMap().containsKey(multiTopicPartitonIdToSearch))) { + newExclusivePartitions.add(new KafkaTopicPartition(true, this.getStream(), partitionId.partition())); + } + } + ); + newExclusivePartitions.addAll(that.getExclusivePartitions()); + } + + return new SeekableStreamStartSequenceNumbers<>(getStream(), newMap, newExclusivePartitions); + } + + @Override + public SeekableStreamSequenceNumbers minus( + SeekableStreamSequenceNumbers other + ) + { + validateSequenceNumbersBaseType(other); + + final KafkaSeekableStreamStartSequenceNumbers otherStart = + (KafkaSeekableStreamStartSequenceNumbers) other; + + if (!this.isMultiTopicPartition() && !otherStart.isMultiTopicPartition()) { + return super.minus(other); + } + + final Map newMap = new HashMap<>(); + final Set newExclusivePartitions = new HashSet<>(); + String thatTopic = otherStart.getStream(); + + // remove partitions present in "that" from "this", check for exact match, multi-topic match, or single-topic match + for (Map.Entry entry : getPartitionSequenceNumberMap().entrySet()) { + String thisTopic = entry.getKey().asTopicPartition(getStream()).topic(); + boolean otherContainsThis = otherStart.getPartitionSequenceNumberMap().containsKey(entry.getKey()); + boolean otherContainsThisMultiTopic = otherStart.getPartitionSequenceNumberMap() + .containsKey(new KafkaTopicPartition(true, thisTopic, entry.getKey().partition())); + boolean otherContainsThisSingleTopic = (thatTopic.equals(thisTopic) && otherStart.getPartitionSequenceNumberMap() + .containsKey(new KafkaTopicPartition(false, null, entry.getKey().partition()))); + if (!otherContainsThis && !otherContainsThisMultiTopic && !otherContainsThisSingleTopic) { + newMap.put(entry.getKey(), entry.getValue()); + // A partition is exclusive if it's exclusive in "this" and not in "other"'s partitionSequenceNumberMap + if (getExclusivePartitions().contains(entry.getKey())) { + newExclusivePartitions.add(entry.getKey()); + } + } + } + + return new SeekableStreamStartSequenceNumbers<>( + getStream(), + newMap, + 
newExclusivePartitions + ); + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e9c6b835fda..aebacecff66 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -44,6 +44,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; @@ -63,12 +64,14 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -92,6 +95,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor latestSequenceFromStream; @@ -122,6 +126,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor getOffsetsFromMetadataStorage() + { + final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); + if (checkSourceMetadataMatch(dataSourceMetadata)) { + @SuppressWarnings("unchecked") + SeekableStreamSequenceNumbers partitions = ((KafkaDataSourceMetadata) dataSourceMetadata) + .getSeekableStreamSequenceNumbers(); + if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) { + Map partitionOffsets = new HashMap<>(); + Set topicMisMatchLogged = new HashSet<>(); + partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> { + final String matchValue; + // previous offsets are from multi-topic config + if (kafkaTopicPartition.topic().isPresent()) { + matchValue = kafkaTopicPartition.topic().get(); + } else { + // previous offsets are from single topic config + matchValue = partitions.getStream(); + } + + KafkaTopicPartition matchingTopicPartition = getMatchingKafkaTopicPartition(kafkaTopicPartition, matchValue); + + if (matchingTopicPartition == null && !topicMisMatchLogged.contains(matchValue)) { + log.warn( + "Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences", + matchValue, + getIoConfig().getStream() + ); + topicMisMatchLogged.add(matchValue); + } + if (matchingTopicPartition != null) { + partitionOffsets.put(matchingTopicPartition, value); + } + }); + return partitionOffsets; + } + } + + return Collections.emptyMap(); + } + + @Nullable + private KafkaTopicPartition getMatchingKafkaTopicPartition( + final KafkaTopicPartition kafkaTopicPartition, + final String streamMatchValue + ) + { + final boolean match; + + match = pattern != null + ? 
pattern.matcher(streamMatchValue).matches() + : getIoConfig().getStream().equals(streamMatchValue); + + return match ? new KafkaTopicPartition(isMultiTopic(), streamMatchValue, kafkaTopicPartition.partition()) : null; + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java index 800a1fedf98..0b013b69a71 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java @@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; @@ -36,17 +37,34 @@ import org.apache.druid.utils.CollectionUtils; import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.regex.Pattern; public class KafkaDataSourceMetadataTest { - private static final KafkaDataSourceMetadata START0 = startMetadata(ImmutableMap.of()); - private static final KafkaDataSourceMetadata START1 = startMetadata(ImmutableMap.of(0, 2L, 1, 3L)); - private static final KafkaDataSourceMetadata START2 = startMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)); - private static final KafkaDataSourceMetadata START3 = startMetadata(ImmutableMap.of(0, 2L, 2, 5L)); - private static final KafkaDataSourceMetadata END0 = endMetadata(ImmutableMap.of()); - private static final KafkaDataSourceMetadata END1 = endMetadata(ImmutableMap.of(0, 2L, 2, 5L)); - private static final KafkaDataSourceMetadata END2 = endMetadata(ImmutableMap.of(0, 2L, 1, 4L)); + private static final KafkaDataSourceMetadata START0 = startMetadata("foo", ImmutableMap.of()); + private static final KafkaDataSourceMetadata START1 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L)); + private static final KafkaDataSourceMetadata START2 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)); + private static final KafkaDataSourceMetadata START3 = startMetadata("foo", ImmutableMap.of(0, 2L, 2, 5L)); + private static final KafkaDataSourceMetadata START4 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()); + private static final KafkaDataSourceMetadata START5 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 3L)); + private static final KafkaDataSourceMetadata START6 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 3L)); + private static final KafkaDataSourceMetadata START7 = startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)); + private static final KafkaDataSourceMetadata START8 = startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)); + private static final KafkaDataSourceMetadata START9 = startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)); + private static final KafkaDataSourceMetadata END0 = endMetadata("foo", ImmutableMap.of()); + private static final KafkaDataSourceMetadata END1 = endMetadata("foo", 
ImmutableMap.of(0, 2L, 2, 5L)); + private static final KafkaDataSourceMetadata END2 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L)); + private static final KafkaDataSourceMetadata END3 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L)); + private static final KafkaDataSourceMetadata END4 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()); + private static final KafkaDataSourceMetadata END5 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)); + private static final KafkaDataSourceMetadata END6 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 4L)); + private static final KafkaDataSourceMetadata END7 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)); + private static final KafkaDataSourceMetadata END8 = endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 4L)); + private static final KafkaDataSourceMetadata END9 = endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)); + private static final KafkaDataSourceMetadata END10 = endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)); @Test public void testMatches() @@ -55,33 +73,332 @@ public class KafkaDataSourceMetadataTest Assert.assertTrue(START0.matches(START1)); Assert.assertTrue(START0.matches(START2)); Assert.assertTrue(START0.matches(START3)); + // when sequence map is empty, we don't know whether its multi-topic or not, so assume single topic to preserve existing behavior + Assert.assertFalse(START0.matches(START4)); + Assert.assertTrue(START0.matches(START5)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(START0.matches(START6)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(START0.matches(START7)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(START0.matches(START8)); + // when merging, we lose the sequence numbers for topics foo2, and foo22 here when merging, so false + Assert.assertFalse(START0.matches(START9)); Assert.assertTrue(START1.matches(START0)); Assert.assertTrue(START1.matches(START1)); + // sequence numbers for topic foo partition 1 are inconsistent, so false Assert.assertFalse(START1.matches(START2)); Assert.assertTrue(START1.matches(START3)); + // when sequence map is empty, we don't know whether its multi-topic or not, so assume single topic to preserve existing behavior Assert.assertFalse(START1.matches(START4)); + Assert.assertTrue(START1.matches(START5)); + // when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false + Assert.assertFalse(START1.matches(START6)); + // when merging, we lose the sequence numbers for topic foo2, so false + Assert.assertFalse(START1.matches(START7)); + // when merging, we lose the sequence numbers for topic foo2, so false + Assert.assertFalse(START1.matches(START8)); + // when merging, we lose the sequence numbers for topics foo2 and foo22, so false + Assert.assertFalse(START1.matches(START9)); Assert.assertTrue(START2.matches(START0)); Assert.assertFalse(START2.matches(START1)); Assert.assertTrue(START2.matches(START2)); Assert.assertTrue(START2.matches(START3)); + // when sequence map is empty, we don't know whether its multi-topic or not, so assume single topic to 
preserve existing behavior
+ Assert.assertFalse(START2.matches(START4));
+ // when merging, sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(START2.matches(START5));
+ // when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(START2.matches(START6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START2.matches(START7));

Assert.assertTrue(START3.matches(START0));
Assert.assertTrue(START3.matches(START1));
Assert.assertTrue(START3.matches(START2));
Assert.assertTrue(START3.matches(START3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(START3.matches(START4));
+ Assert.assertTrue(START3.matches(START5));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START3.matches(START6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START3.matches(START7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(START3.matches(START8));
+ // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
+ Assert.assertFalse(START3.matches(START9));
+
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(START4.matches(START0));
+ Assert.assertFalse(START4.matches(START1));
+ Assert.assertFalse(START4.matches(START2));
+ Assert.assertFalse(START4.matches(START3));
+ Assert.assertTrue(START4.matches(START4));
+ Assert.assertFalse(START4.matches(START5));
+ Assert.assertFalse(START4.matches(START6));
+ Assert.assertFalse(START4.matches(START7));
+ Assert.assertFalse(START4.matches(START8));
+ Assert.assertFalse(START4.matches(START9));
+
Assert.assertTrue(START5.matches(START0)); + Assert.assertTrue(START5.matches(START1)); + // when merging, the sequence numbers for topic foo-1 are inconsistent, so false + Assert.assertFalse(START5.matches(START2)); + Assert.assertTrue(START5.matches(START3)); + Assert.assertTrue(START5.matches(START4)); + Assert.assertTrue(START5.matches(START5)); + Assert.assertTrue(START5.matches(START6)); + Assert.assertTrue(START5.matches(START7)); + Assert.assertTrue(START5.matches(START8)); + Assert.assertTrue(START5.matches(START9)); + + Assert.assertTrue(START6.matches(START0)); + Assert.assertTrue(START6.matches(START1)); + // when merging, the sequence numbers for topic foo-1 are inconsistent, so false + Assert.assertFalse(START6.matches(START2)); + Assert.assertTrue(START6.matches(START3)); + Assert.assertTrue(START6.matches(START4)); + Assert.assertTrue(START6.matches(START5)); + Assert.assertTrue(START6.matches(START6)); + Assert.assertTrue(START6.matches(START7)); + Assert.assertTrue(START6.matches(START8)); + Assert.assertTrue(START6.matches(START9)); + + Assert.assertTrue(START7.matches(START0)); + Assert.assertTrue(START7.matches(START1)); + Assert.assertTrue(START7.matches(START2)); + Assert.assertTrue(START7.matches(START3)); + Assert.assertTrue(START7.matches(START4)); + Assert.assertTrue(START7.matches(START5)); + Assert.assertTrue(START7.matches(START6)); + Assert.assertTrue(START7.matches(START7)); + Assert.assertTrue(START7.matches(START8)); + Assert.assertTrue(START7.matches(START9)); + + Assert.assertTrue(START8.matches(START0)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START8.matches(START1)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START8.matches(START2)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START8.matches(START3)); + Assert.assertTrue(START8.matches(START4)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START8.matches(START5)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START8.matches(START6)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START8.matches(START7)); + Assert.assertTrue(START8.matches(START8)); + Assert.assertTrue(START8.matches(START9)); + + Assert.assertTrue(START9.matches(START0)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START9.matches(START1)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START9.matches(START2)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START9.matches(START3)); + Assert.assertTrue(START9.matches(START4)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START9.matches(START5)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START9.matches(START6)); + // when merging, we lose the sequence numbers for topic foo, so false + Assert.assertFalse(START9.matches(START7)); + Assert.assertTrue(START9.matches(START8)); + Assert.assertTrue(START9.matches(START9)); Assert.assertTrue(END0.matches(END0)); Assert.assertTrue(END0.matches(END1)); Assert.assertTrue(END0.matches(END2)); + Assert.assertTrue(END0.matches(END3)); + // when sequence map is empty, we don't know whether its multi-topic or not, so assume single 
topic to preserve existing behavior
+ Assert.assertFalse(END0.matches(END4));
+ Assert.assertTrue(END0.matches(END5));
+ Assert.assertTrue(END0.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END0.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END0.matches(END8));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END0.matches(END9));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END0.matches(END10));

Assert.assertTrue(END1.matches(END0));
Assert.assertTrue(END1.matches(END1));
Assert.assertTrue(END1.matches(END2));
+ Assert.assertTrue(END1.matches(END3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(END1.matches(END4));
+ Assert.assertTrue(END1.matches(END5));
+ Assert.assertTrue(END1.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END1.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END1.matches(END8));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END1.matches(END9));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END1.matches(END10));

Assert.assertTrue(END2.matches(END0));
Assert.assertTrue(END2.matches(END1));
Assert.assertTrue(END2.matches(END2));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(END2.matches(END3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(END2.matches(END4));
+ Assert.assertTrue(END2.matches(END5));
+ Assert.assertTrue(END2.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END2.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END2.matches(END8));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END2.matches(END9));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END2.matches(END10));
+
+ Assert.assertTrue(END3.matches(END0));
+ Assert.assertTrue(END3.matches(END1));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(END3.matches(END2));
+ Assert.assertTrue(END3.matches(END3));
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(END3.matches(END4));
+ Assert.assertTrue(END3.matches(END5));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(END3.matches(END6));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END3.matches(END7));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END3.matches(END8));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END3.matches(END9));
+ // when merging, we lose the sequence numbers for topic foo2, so false
+ Assert.assertFalse(END3.matches(END10));
+
+ // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
+ Assert.assertFalse(END4.matches(END0));
+ Assert.assertFalse(END4.matches(END1));
+ Assert.assertFalse(END4.matches(END2));
+ Assert.assertFalse(END4.matches(END3));
+ Assert.assertTrue(END4.matches(END4));
+ Assert.assertFalse(END4.matches(END5));
+ Assert.assertFalse(END4.matches(END6));
+ Assert.assertFalse(END4.matches(END7));
+ Assert.assertFalse(END4.matches(END8));
+ Assert.assertFalse(END4.matches(END9));
+ Assert.assertFalse(END4.matches(END10));
+
+ Assert.assertTrue(END5.matches(END0));
+ Assert.assertTrue(END5.matches(END1));
+ Assert.assertTrue(END5.matches(END2));
+ Assert.assertTrue(END5.matches(END3));
+ Assert.assertTrue(END5.matches(END4));
+ Assert.assertTrue(END5.matches(END5));
+ Assert.assertTrue(END5.matches(END6));
+ Assert.assertTrue(END5.matches(END7));
+ Assert.assertTrue(END5.matches(END8));
+ Assert.assertTrue(END5.matches(END9));
+ Assert.assertTrue(END5.matches(END10));
+
+ Assert.assertTrue(END6.matches(END0));
+ Assert.assertTrue(END6.matches(END1));
+ Assert.assertTrue(END6.matches(END2));
+ // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
+ Assert.assertFalse(END6.matches(END3));
+ Assert.assertTrue(END6.matches(END4));
+ Assert.assertTrue(END6.matches(END5));
+ Assert.assertTrue(END6.matches(END6));
+ Assert.assertTrue(END6.matches(END7));
+ Assert.assertTrue(END6.matches(END8));
+ Assert.assertTrue(END6.matches(END9));
+
Assert.assertTrue(END6.matches(END10)); + + Assert.assertTrue(END7.matches(END0)); + Assert.assertTrue(END7.matches(END1)); + Assert.assertTrue(END7.matches(END2)); + Assert.assertTrue(END7.matches(END3)); + Assert.assertTrue(END7.matches(END4)); + Assert.assertTrue(END7.matches(END5)); + Assert.assertTrue(END7.matches(END6)); + Assert.assertTrue(END7.matches(END7)); + Assert.assertTrue(END7.matches(END8)); + Assert.assertTrue(END7.matches(END9)); + Assert.assertTrue(END7.matches(END10)); + + Assert.assertTrue(END8.matches(END0)); + Assert.assertTrue(END8.matches(END1)); + Assert.assertTrue(END8.matches(END2)); + // when merging, the sequence numbers for topic foo-1, and foo-2 are inconsistent, so false + Assert.assertFalse(END8.matches(END3)); + Assert.assertTrue(END8.matches(END4)); + Assert.assertTrue(END8.matches(END5)); + Assert.assertTrue(END8.matches(END6)); + Assert.assertTrue(END8.matches(END7)); + Assert.assertTrue(END8.matches(END8)); + Assert.assertTrue(END8.matches(END9)); + Assert.assertTrue(END8.matches(END10)); + + Assert.assertTrue(END9.matches(END0)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END9.matches(END1)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END9.matches(END2)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END9.matches(END3)); + Assert.assertTrue(END9.matches(END4)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END9.matches(END5)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END9.matches(END6)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END9.matches(END7)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END9.matches(END8)); + Assert.assertTrue(END9.matches(END9)); + Assert.assertTrue(END9.matches(END10)); + + Assert.assertTrue(END10.matches(END0)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END10.matches(END1)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END10.matches(END2)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END10.matches(END3)); + Assert.assertTrue(END10.matches(END4)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END10.matches(END5)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END10.matches(END6)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END10.matches(END7)); + // when merging, we lose the sequence numbers for topic foo2 here when merging, so false + Assert.assertFalse(END10.matches(END8)); + Assert.assertTrue(END10.matches(END9)); + Assert.assertTrue(END10.matches(END10)); } @Test @@ -91,6 +408,12 @@ public class KafkaDataSourceMetadataTest Assert.assertTrue(START1.isValidStart()); Assert.assertTrue(START2.isValidStart()); Assert.assertTrue(START3.isValidStart()); + Assert.assertTrue(START4.isValidStart()); + Assert.assertTrue(START5.isValidStart()); + 
Assert.assertTrue(START6.isValidStart()); + Assert.assertTrue(START7.isValidStart()); + Assert.assertTrue(START8.isValidStart()); + Assert.assertTrue(START9.isValidStart()); } @Test @@ -121,6 +444,85 @@ public class KafkaDataSourceMetadataTest START2.plus(START2) ); + // add comment on this + Assert.assertEquals( + START4, + START1.plus(START4) + ); + + Assert.assertEquals( + startMetadata(ImmutableMap.of(0, 2L, 1, 3L)), + START1.plus(START5) + ); + + Assert.assertEquals( + startMetadata(ImmutableMap.of(0, 2L, 1, 3L)), + START1.plus(START6) + ); + + Assert.assertEquals( + startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)), + START1.plus(START7) + ); + + Assert.assertEquals( + startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)), + START2.plus(START6) + ); + + // add comment on this + Assert.assertEquals( + START0, + START4.plus(START0) + ); + + // add comment on this + Assert.assertEquals( + START1, + START4.plus(START1) + ); + + // when sequence map is empty, we don't know whether its multi-topic or not, so assume single topic to preserve existing behavior Assert.assertFalse(START1.matches(START4)); + Assert.assertEquals( + START4, + START4.plus(START5) + ); + + Assert.assertEquals( + START5, + START5.plus(START4) + ); + + Assert.assertEquals( + startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 3L)), + START5.plus(START6) + ); + + Assert.assertEquals( + startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)), + START7.plus(START8) + ); + + Assert.assertEquals( + startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), + START7.plus(START9) + ); + + Assert.assertEquals( + startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)), + START8.plus(START7) + ); + + Assert.assertEquals( + startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), + START8.plus(START9) + ); + + Assert.assertEquals( + startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), + START9.plus(START8) + ); + Assert.assertEquals( endMetadata(ImmutableMap.of(0, 2L, 2, 5L)), END0.plus(END1) @@ -130,26 +532,136 @@ public class KafkaDataSourceMetadataTest endMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), END1.plus(END2) ); + + // add comment on this + Assert.assertEquals( + END4, + END0.plus(END4) + ); + + // add comment on this + Assert.assertEquals( + END4, + END1.plus(END4) + ); + + // add comment on this + Assert.assertEquals( + END0, + END4.plus(END0) + ); + + // add comment on this + Assert.assertEquals( + END1, + END4.plus(END1) + ); + + Assert.assertEquals( + END3, + END2.plus(END3) + ); + + Assert.assertEquals( + END2, + END3.plus(END2) + ); + + // when sequence map is empty, we don't know whether its multi-topic or not, so assume single topic to preserve existing behavior Assert.assertFalse(START1.matches(START4)); + Assert.assertEquals( + END4, + END4.plus(END5) + ); + + Assert.assertEquals( + END5, + END5.plus(END4) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)), + END5.plus(END9) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), + END5.plus(END10) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), + END5.plus(END6) + ); + + 
Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)), + END5.plus(END7) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)), + END7.plus(END5) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)), + END9.plus(END5) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), + END9.plus(END10) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), + END10.plus(END9) + ); } @Test public void testMinus() { - Assert.assertEquals( - startMetadata(ImmutableMap.of(1, 3L)), - START1.minus(START3) - ); - Assert.assertEquals( startMetadata(ImmutableMap.of()), START0.minus(START2) ); + Assert.assertEquals( + START0, + START0.minus(START4) + ); + Assert.assertEquals( startMetadata(ImmutableMap.of()), START1.minus(START2) ); + Assert.assertEquals( + startMetadata(ImmutableMap.of(1, 3L)), + START1.minus(START3) + ); + + Assert.assertEquals( + START1, + START1.minus(START4) + ); + + Assert.assertEquals( + startMetadata("foo", ImmutableMap.of()), + START1.minus(START5) + ); + + Assert.assertEquals( + startMetadata("foo", ImmutableMap.of()), + START1.minus(START6) + ); + + Assert.assertEquals( + startMetadata("foo", ImmutableMap.of(1, 3L)), + START1.minus(START7) + ); + Assert.assertEquals( startMetadata(ImmutableMap.of(2, 5L)), START2.minus(START1) @@ -160,6 +672,21 @@ public class KafkaDataSourceMetadataTest START2.minus(START2) ); + Assert.assertEquals( + START4, + START4.minus(START0) + ); + + Assert.assertEquals( + START4, + START4.minus(START1) + ); + + Assert.assertEquals( + startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()), + START5.minus(START1) + ); + Assert.assertEquals( endMetadata(ImmutableMap.of(1, 4L)), END2.minus(END1) @@ -169,6 +696,101 @@ public class KafkaDataSourceMetadataTest endMetadata(ImmutableMap.of(2, 5L)), END1.minus(END2) ); + + Assert.assertEquals( + END0, + END0.minus(END4) + ); + + Assert.assertEquals( + END4, + END4.minus(END0) + ); + + Assert.assertEquals( + END1, + END1.minus(END4) + ); + + Assert.assertEquals( + END4, + END4.minus(END1) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()), + END5.minus(END1) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)), + END5.minus(END4) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(2, 5L)), + END5.minus(END6) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)), + END6.minus(END5) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)), + END6.minus(END7) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)), + END7.minus(END5) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(2, 5L)), + END7.minus(END8) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)), + END7.minus(END9) + ); + + Assert.assertEquals( + endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), 
ImmutableMap.of(0, 2L, 2, 5L)), + END7.minus(END10) + ); +
+ Assert.assertEquals( + END9, + END9.minus(END6) + ); +
+ Assert.assertEquals( + endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()), + END9.minus(END7) + ); +
+ Assert.assertEquals( + endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(2, 5L)), + END9.minus(END8) + ); +
+ Assert.assertEquals( + endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()), + END9.minus(END9) + ); +
+ Assert.assertEquals( + endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()), + END9.minus(END10) + ); +
+ Assert.assertEquals( + endMetadataMultiTopic("foo2.*", ImmutableList.of("foo22"), ImmutableMap.of(0, 2L, 2, 5L)), + END10.minus(END9) + ); } @Test @@ -203,19 +825,59 @@ public class KafkaDataSourceMetadataTest } private static KafkaDataSourceMetadata startMetadata(Map<Integer, Long> offsets)
+ { + return startMetadata("foo", offsets); + } +
+ private static KafkaDataSourceMetadata startMetadata( + String topic, + Map<Integer, Long> offsets + ) { Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys( offsets, k -> new KafkaTopicPartition( false, - "foo", + topic, k ) ); - return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>("foo", newOffsets, ImmutableSet.of())); + return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, newOffsets, ImmutableSet.of())); }
+ private static KafkaDataSourceMetadata startMetadataMultiTopic( + String topicPattern, + List<String> topics, + Map<Integer, Long> offsets + ) + { + Assert.assertFalse(topics.isEmpty()); + Pattern pattern = Pattern.compile(topicPattern); + Assert.assertTrue(topics.stream().allMatch(t -> pattern.matcher(t).matches())); + Map<KafkaTopicPartition, Long> newOffsets = new HashMap<>(); + for (Map.Entry<Integer, Long> e : offsets.entrySet()) { + for (String topic : topics) { + newOffsets.put( + new KafkaTopicPartition( + true, + topic, + e.getKey() + ), + e.getValue() + ); + } + } + return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topicPattern, newOffsets, ImmutableSet.of())); + } +
+ private static KafkaDataSourceMetadata endMetadata(Map<Integer, Long> offsets) + { + return endMetadata("foo", offsets); + } +
+ private static KafkaDataSourceMetadata endMetadata(String topic, Map<Integer, Long> offsets) { Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys( offsets, @@ -225,7 +887,33 @@ public class KafkaDataSourceMetadataTest k ) ); - return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", newOffsets)); + return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, newOffsets)); + } +
+ private static KafkaDataSourceMetadata endMetadataMultiTopic( + String topicPattern, + List<String> topics, + Map<Integer, Long> offsets + ) + { + Assert.assertFalse(topics.isEmpty()); + Pattern pattern = Pattern.compile(topicPattern); + Assert.assertTrue(topics.stream().allMatch(t -> pattern.matcher(t).matches())); + Map<KafkaTopicPartition, Long> newOffsets = new HashMap<>(); + for (Map.Entry<Integer, Long> e : offsets.entrySet()) { + for (String topic : topics) { + newOffsets.put( + new KafkaTopicPartition( + true, + topic, + e.getKey() + ), + e.getValue() + ); + } + } + return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topicPattern, newOffsets)); } private static ObjectMapper createObjectMapper() diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java new file mode 100644 
index 00000000000..e3b3cef6cb2 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamEndSequenceNumbersTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +
+package org.apache.druid.indexing.kafka; +
+import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Injector; +import com.google.inject.name.Names; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; +import org.apache.druid.initialization.CoreInjectorBuilder; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.junit.Assert; +import org.junit.Test; +
+import java.util.Map; +
+public class KafkaSeekableStreamEndSequenceNumbersTest +{ +
+ private static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); +
+ @Test + public void testSerde() throws Exception + { + final String stream = "theStream"; + final Map<KafkaTopicPartition, Long> offsetMap = ImmutableMap.of( + new KafkaTopicPartition(false, null, 1), 2L, + new KafkaTopicPartition(false, null, 3), 4L + ); +
+ final KafkaSeekableStreamEndSequenceNumbers partitions = new KafkaSeekableStreamEndSequenceNumbers( + stream, + null, + offsetMap, + null + ); + final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions); +
+ // Check round-trip. + final SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> partitions2 = OBJECT_MAPPER.readValue( + serializedString, + new TypeReference<SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>>() {} + ); +
+ Assert.assertEquals( + "Round trip", + partitions, + new KafkaSeekableStreamEndSequenceNumbers(partitions2.getStream(), + partitions2.getTopic(), + partitions2.getPartitionSequenceNumberMap(), + partitions2.getPartitionOffsetMap() + ) + ); +
+ // Check backwards compatibility. + final Map<String, Object> asMap = OBJECT_MAPPER.readValue( + serializedString, + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); +
+ Assert.assertEquals(stream, asMap.get("stream")); + Assert.assertEquals(stream, asMap.get("topic")); +
+ // Jackson will deserialize the maps as string -> int maps, not int -> long.
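+ // Converting back through explicit type references restores the typed keys and values for comparison.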
+ Assert.assertEquals( + offsetMap, + OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {}) + ); + Assert.assertEquals( + offsetMap, + OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {}) + ); +
+ // check that KafkaSeekableStreamEndSequenceNumbers is not registered with the mapper, so there is no possible collision
+ // when deserializing it from String / bytes
+ boolean expectedExceptionThrown = false; + try { + OBJECT_MAPPER.readValue( + serializedString, + KafkaSeekableStreamEndSequenceNumbers.class + ); + } + catch (InvalidTypeIdException e) { + expectedExceptionThrown = true; + } +
+ Assert.assertTrue("KafkaSeekableStreamEndSequenceNumbers should not be a registered type", expectedExceptionThrown); + } +
+ private static ObjectMapper createObjectMapper() + { + DruidModule module = new KafkaIndexTaskModule(); + final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build()) + .addModule( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000); + } + ) + .build(); +
+ ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class); + module.getJacksonModules().forEach(objectMapper::registerModule); + return objectMapper; + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java new file mode 100644 index 00000000000..07eaced7555 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSeekableStreamStartSequenceNumbersTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +
+package org.apache.druid.indexing.kafka; +
+import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Injector; +import com.google.inject.name.Names; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.initialization.CoreInjectorBuilder; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.junit.Assert; +import org.junit.Test; +
+import java.util.Map; +import java.util.Set; +
+public class KafkaSeekableStreamStartSequenceNumbersTest +{ + private static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); +
+ @Test + public void testSerde() throws Exception + { + final String stream = "theStream"; + final Map<KafkaTopicPartition, Long> offsetMap = ImmutableMap.of( + new KafkaTopicPartition(false, null, 1), 2L, + new KafkaTopicPartition(false, null, 3), 4L + ); +
+ Set<KafkaTopicPartition> exclusivePartitions = ImmutableSet.of(new KafkaTopicPartition(false, null, 1)); +
+ final KafkaSeekableStreamStartSequenceNumbers partitions = new KafkaSeekableStreamStartSequenceNumbers( + stream, + null, + offsetMap, + null, + exclusivePartitions + ); + final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions); +
+ // Check round-trip. + final SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> partitions2 = OBJECT_MAPPER.readValue( + serializedString, + new TypeReference<SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long>>() {} + ); +
+ Assert.assertEquals( + "Round trip", + partitions, + new KafkaSeekableStreamStartSequenceNumbers( + partitions2.getStream(), + partitions2.getTopic(), + partitions2.getPartitionSequenceNumberMap(), + partitions2.getPartitionOffsetMap(), + partitions2.getExclusivePartitions() + ) + ); +
+ // Check backwards compatibility. + final Map<String, Object> asMap = OBJECT_MAPPER.readValue( + serializedString, + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); +
+ Assert.assertEquals(stream, asMap.get("stream")); + Assert.assertEquals(stream, asMap.get("topic")); +
+ // Jackson will deserialize the maps as string -> int maps, not int -> long.
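+ // As above, convert back through explicit type references; the exclusivePartitions set round-trips the same way.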
+ Assert.assertEquals( + offsetMap, + OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {}) + ); + Assert.assertEquals( + offsetMap, + OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<KafkaTopicPartition, Long>>() {}) + ); +
+ Assert.assertEquals( + exclusivePartitions, + OBJECT_MAPPER.convertValue(asMap.get("exclusivePartitions"), new TypeReference<Set<KafkaTopicPartition>>() {}) + ); +
+ // check that KafkaSeekableStreamStartSequenceNumbers is not registered with the mapper, so there is no possible collision
+ // when deserializing it from String / bytes
+ boolean expectedExceptionThrown = false; + try { + OBJECT_MAPPER.readValue( + serializedString, + KafkaSeekableStreamStartSequenceNumbers.class + ); + } + catch (InvalidTypeIdException e) { + expectedExceptionThrown = true; + } +
+ Assert.assertTrue("KafkaSeekableStreamStartSequenceNumbers should not be a registered type", expectedExceptionThrown); + } +
+ private static ObjectMapper createObjectMapper() + { + DruidModule module = new KafkaIndexTaskModule(); + final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build()) + .addModule( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000); + } + ) + .build(); +
+ ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class); + module.getJacksonModules().forEach(objectMapper::registerModule); + return objectMapper; + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index e4b4b2f9c94..a7d47788325 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -166,6 +168,8 @@ public class KafkaSupervisorTest extends EasyMockSupport private SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> taskClient; private TaskQueue taskQueue; private String topic; + private String topicPattern; + private boolean multiTopic; private RowIngestionMetersFactory rowIngestionMetersFactory; private StubServiceEmitter serviceEmitter; private SupervisorStateManagerConfig supervisorConfig; @@ -174,7 +176,13 @@ public class KafkaSupervisorTest extends EasyMockSupport private static String getTopic() { //noinspection StringConcatenationMissingWhitespace - return TOPIC_PREFIX + topicPostfix++; + return TOPIC_PREFIX + topicPostfix; + } +
+ private static String getTopicPattern() + { + //noinspection StringConcatenationMissingWhitespace + return TOPIC_PREFIX + topicPostfix + ".*"; } @Parameterized.Parameters(name = "numThreads = {0}") @@ -222,6 +230,9 @@ public class KafkaSupervisorTest extends EasyMockSupport taskQueue = createMock(TaskQueue.class); topic = getTopic(); + topicPattern = getTopicPattern(); + topicPostfix++; + multiTopic = false; // set to true in a test to exercise multi-topic mode rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); serviceEmitter = new StubServiceEmitter("KafkaSupervisorTest", "localhost"); EmittingLogger.registerEmitter(serviceEmitter); @@ -906,11 +917,12 @@ public class KafkaSupervisorTest extends EasyMockSupport } /** - * Test generating the 
starting offsets from the partition data stored in druid_dataSource which contains the - * offsets of the last built segments. + * Test generating the starting offsets for single-topic config from the partition data stored in druid_dataSource which + * contains the offsets from the previous single-topic config of the last built segments, where the previous topic does + * match the new configuration. */ @Test - public void testDatasourceMetadata() throws Exception + public void testDatasourceMetadataSingleTopicToSingleTopicMatch() throws Exception { supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(100); @@ -948,6 +960,203 @@ public class KafkaSupervisorTest extends EasyMockSupport ); } +
+ /** + * Test generating the starting offsets for multi-topic config from the partition data stored in druid_dataSource which + * contains the offsets from the previous single-topic config of the last built segments, where the previous topic does + * match the new configuration. + */ + @Test + public void testDatasourceMetadataSingleTopicToMultiTopicMatch() throws Exception + { + multiTopic = true; + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); + addSomeEvents(100); +
+ Capture<KafkaIndexTask> captured = Capture.newInstance(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder(); + // these should match + partitionSequenceNumberMap.putAll(singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L)); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>(topic, partitionSequenceNumberMap.build(), ImmutableSet.of()) + ) + ).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); + replayAll(); +
+ supervisor.start(); + supervisor.runInternal(); + verifyAll(); +
+ KafkaIndexTask task = captured.getValue(); + KafkaIndexTaskIOConfig taskConfig = task.getIOConfig(); + Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); + Assert.assertEquals( + 10L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue() + ); + Assert.assertEquals( + 20L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue() + ); + Assert.assertEquals( + 30L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue() + ); + Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); + } +
+ /** + * Test generating the starting offsets for multi-topic config from the partition data stored in druid_dataSource which + * contains the offsets from the previous single-topic config of the last built segments, where the previous topic does + * not match the new configuration. 
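+ * In this case the stored offsets are expected to be ignored and each matched partition should start from offset 0.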
+ */ + @Test + public void testDatasourceMetadataSingleTopicToMultiTopicNotMatch() throws Exception + { + multiTopic = true; + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); + addSomeEvents(100); +
+ Capture<KafkaIndexTask> captured = Capture.newInstance(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder(); + // these should not match + partitionSequenceNumberMap.putAll(singlePartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L)); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>("notMatch", partitionSequenceNumberMap.build(), ImmutableSet.of()) + ) + ).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); + replayAll(); +
+ supervisor.start(); + supervisor.runInternal(); + verifyAll(); +
+ KafkaIndexTask task = captured.getValue(); + KafkaIndexTaskIOConfig taskConfig = task.getIOConfig(); + Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); + Assert.assertEquals( + 0L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue() + ); + Assert.assertEquals( + 0L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue() + ); + Assert.assertEquals( + 0L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue() + ); + Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); + } +
+ /** + * Test generating the starting offsets for single-topic config from the partition data stored in druid_dataSource which + * contains the offsets from the previous multi-topic config of the last built segments. 
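+ * Only the stored offsets whose topic matches the newly configured topic are expected to be used.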
+ */ + @Test + public void testDatasourceMetadataMultiTopicToSingleTopic() throws Exception + { + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); + addSomeEvents(100); +
+ Capture<KafkaIndexTask> captured = Capture.newInstance(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder(); + // these should match + partitionSequenceNumberMap.putAll(multiTopicPartitionMap(topic, 0, 10L, 1, 20L, 2, 30L)); + // these should not match + partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L)); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>(topicPattern, partitionSequenceNumberMap.build(), ImmutableSet.of()) + ) + ).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); + replayAll(); +
+ supervisor.start(); + supervisor.runInternal(); + verifyAll(); +
+ KafkaIndexTask task = captured.getValue(); + KafkaIndexTaskIOConfig taskConfig = task.getIOConfig(); + Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); + Assert.assertEquals( + 10L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 0)).longValue() + ); + Assert.assertEquals( + 20L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 1)).longValue() + ); + Assert.assertEquals( + 30L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(false, topic, 2)).longValue() + ); + Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); + } +
+ /** + * Test generating the starting offsets for multi-topic config from the partition data stored in druid_dataSource which + * contains the offsets from the previous multi-topic config of the last built segments. 
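+ * Offsets stored for topics that still match the pattern are expected to carry over; the rest are ignored.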
+ */ + @Test + public void testDatasourceMetadataMultiTopicToMultiTopic() throws Exception + { + multiTopic = true; + supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null); + addSomeEvents(100); +
+ Capture<KafkaIndexTask> captured = Capture.newInstance(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + ImmutableMap.Builder<KafkaTopicPartition, Long> partitionSequenceNumberMap = ImmutableMap.builder(); + // these should match + partitionSequenceNumberMap.putAll(multiTopicPartitionMap(topic, 0, 10L, 1, 20L, 2, 30L)); + // these should not match + partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 10L, 1, 20L, 2, 30L)); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>(topic, partitionSequenceNumberMap.build(), ImmutableSet.of()) + ) + ).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); + replayAll(); +
+ supervisor.start(); + supervisor.runInternal(); + verifyAll(); +
+ KafkaIndexTask task = captured.getValue(); + KafkaIndexTaskIOConfig taskConfig = task.getIOConfig(); + Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); + Assert.assertEquals( + 10L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 0)).longValue() + ); + Assert.assertEquals( + 20L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 1)).longValue() + ); + Assert.assertEquals( + 30L, + taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new KafkaTopicPartition(true, topic, 2)).longValue() + ); + Assert.assertEquals(3, taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); + } + @Test public void testBadMetadataOffsets() throws Exception { @@ -4621,8 +4830,8 @@ public class KafkaSupervisorTest extends EasyMockSupport consumerProperties.put("myCustomKey", "myCustomValue"); consumerProperties.put("bootstrap.servers", kafkaHost); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( - topic, - null, + multiTopic ? null : topic, + multiTopic ? 
topicPattern : null, INPUT_FORMAT, replicas, taskCount, @@ -5038,6 +5247,13 @@ public class KafkaSupervisorTest extends EasyMockSupport offset2, new KafkaTopicPartition(false, topic, partition3), offset3); } + private static ImmutableMap<KafkaTopicPartition, Long> multiTopicPartitionMap(String topic, int partition1, long offset1, + int partition2, long offset2, int partition3, long offset3) + { + return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1), offset1, new KafkaTopicPartition(true, topic, partition2), + offset2, new KafkaTopicPartition(true, topic, partition3), offset3); + } + private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem { private final String taskType; @@ -5109,7 +5325,7 @@ public class KafkaSupervisorTest extends EasyMockSupport Deserializer valueDeserializerObject = new ByteArrayDeserializer(); return new KafkaRecordSupplier( new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject), - false + getIoConfig().isMultiTopic() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java index a95f4cccab6..de61d4624c8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java @@ -22,7 +22,6 @@ package org.apache.druid.indexing.seekablestream; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.java.util.common.IAE; import java.util.Collections; import java.util.Comparator; @@ -47,6 +46,8 @@ import java.util.Objects; public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> implements SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> { + public static final String TYPE = "end"; + // stream/topic private final String stream; // partitionId -> sequence number @@ -126,13 +127,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> plus( SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other ) { - if (this.getClass() != other.getClass()) { - throw new IAE( - "Expected instance of %s, got %s", - this.getClass().getName(), - other.getClass().getName() - ); - } + validateSequenceNumbersBaseType(other); final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd = (SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other; @@ -151,13 +146,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator) { - if (this.getClass() != other.getClass()) { - throw new IAE( - "Expected instance of %s, got %s", - this.getClass().getName(), - other.getClass().getName() - ); - } + validateSequenceNumbersBaseType(other); final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart = (SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>) other; @@ -185,13 +174,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> minus( SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other ) { - if (this.getClass() != other.getClass()) { - throw new IAE( - "Expected instance of %s, got %s", - this.getClass().getName(), - other.getClass().getName() - ); - } + validateSequenceNumbersBaseType(other); final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> otherEnd = diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java index 44f1343e25d..ad1801e346d 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSequenceNumbers.java @@ -24,14 +24,15 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.java.util.common.IAE; import java.util.Comparator; import java.util.Map; @JsonTypeInfo(use = Id.NAME, property = "type", defaultImpl = SeekableStreamEndSequenceNumbers.class) @JsonSubTypes({ - @Type(name = "start", value = SeekableStreamStartSequenceNumbers.class), - @Type(name = "end", value = SeekableStreamEndSequenceNumbers.class) + @Type(name = SeekableStreamStartSequenceNumbers.TYPE, value = SeekableStreamStartSequenceNumbers.class), + @Type(name = SeekableStreamEndSequenceNumbers.TYPE, value = SeekableStreamEndSequenceNumbers.class) }) public interface SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> { @@ -40,6 +41,29 @@ public interface SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> + /** + * Throws an IAE if the given sequence numbers object is not of the same base type as this one. + */ + default void validateSequenceNumbersBaseType(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other) + { + if (this.getClass() != other.getClass()) { + throw new IAE( + "Expected instance of %s, got %s", + this.getClass().getName(), + other.getClass().getName() + ); + } + } + /** * Returns a map of partitionId -> sequenceNumber. */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java index b1a72ff3390..7d3eab99428 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java @@ -22,7 +22,6 @@ package org.apache.druid.indexing.seekablestream; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.java.util.common.IAE; import javax.annotation.Nullable; import java.util.Collections; @@ -42,6 +41,8 @@ import java.util.Set; public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> implements SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> { + public static final String TYPE = "start"; + // stream/topic private final String stream; // partitionId -> sequence number @@ -121,13 +122,7 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> plus( SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other ) { - if (this.getClass() != other.getClass()) { - throw new IAE( - "Expected instance of %s, got %s", - this.getClass().getName(), - other.getClass().getName() - ); - } + validateSequenceNumbersBaseType(other); final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart = (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other; @@ -165,13 +160,7 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> public int compareTo(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other, Comparator<SequenceOffsetType> comparator) { - if (this.getClass() != other.getClass()) { - throw new IAE( - "Expected instance of %s, got %s", - this.getClass().getName(), - other.getClass().getName() - ); - } + validateSequenceNumbersBaseType(other); final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart = (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other; @@ -199,13 +188,7 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> minus( SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other ) { - if (this.getClass() != other.getClass()) { - throw new IAE( - "Expected instance of %s, got %s", - this.getClass().getName(), - other.getClass().getName() - ); - } + 
validateSequenceNumbersBaseType(other); final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> otherStart = (SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType>) other; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 59cd05bf349..f15a975694f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3863,10 +3863,9 @@ public abstract class SeekableStreamSupervisor - private Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage() + protected Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage() { - final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata( - dataSource); + final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata && checkSourceMetadataMatch(dataSourceMetadata)) { @SuppressWarnings("unchecked") @@ -3889,6 +3888,11 @@ public abstract class SeekableStreamSupervisor
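For orientation, the two hooks above (getOffsetsFromMetadataStorage() widened from private to protected, and the metadata read routed through retrieveDataSourceMetadata()) are what let a stream-specific supervisor intercept stored offsets. Below is a minimal sketch of how a Kafka-side override could translate stored offsets when a topicPattern is configured; it is an illustration under stated assumptions, not the code in this patch. Using getIoConfig().getStream() as the pattern source and the re-keying loop are assumptions; KafkaTopicPartition.asTopicPartition() and the KafkaTopicPartition constructor are taken from the diffs above.

  // Illustrative sketch only (assumed to live in KafkaSupervisor, where PartitionIdType =
  // KafkaTopicPartition and SequenceOffsetType = Long). Assumed imports: java.util.HashMap,
  // java.util.Map, java.util.regex.Pattern, org.apache.kafka.common.TopicPartition,
  // org.apache.druid.data.input.kafka.KafkaTopicPartition.
  @Override
  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
  {
    // Read whatever the base class recovers from the druid_dataSource table.
    final Map<KafkaTopicPartition, Long> stored = super.getOffsetsFromMetadataStorage();
    // Assumption: in multi-topic mode the configured stream is the topic pattern.
    final Pattern pattern = Pattern.compile(getIoConfig().getStream());
    final Map<KafkaTopicPartition, Long> translated = new HashMap<>();
    for (Map.Entry<KafkaTopicPartition, Long> entry : stored.entrySet()) {
      // asTopicPartition() resolves the concrete topic for both single- and multi-topic keys.
      final TopicPartition tp = entry.getKey().asTopicPartition(getIoConfig().getStream());
      if (pattern.matcher(tp.topic()).matches()) {
        // Re-key as a multi-topic partition so stored offsets line up with newly computed ones.
        translated.put(new KafkaTopicPartition(true, tp.topic(), tp.partition()), entry.getValue());
      }
    }
    return translated;
  }

With an override along these lines, stored single-topic offsets that match a newly configured topicPattern survive the reconfiguration instead of being spuriously ignored, which is the behavior the supervisor tests above verify.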