diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java index 3f87b2799ac..7a4d3fad72d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamEndSequenceNumbers.java @@ -105,7 +105,7 @@ public class SeekableStreamEndSequenceNumbers getPartitionOffsetMap() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java index f737292275b..9a2577158eb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbers.java @@ -50,16 +50,21 @@ public class SeekableStreamStartSequenceNumbers partitionSequenceNumberMap, + // kept for backward compatibility + @JsonProperty("partitionOffsetMap") final Map partitionOffsetMap, @JsonProperty("exclusivePartitions") @Nullable final Set exclusivePartitions ) { - this.stream = Preconditions.checkNotNull(stream, "stream"); - this.partitionSequenceNumberMap = Preconditions.checkNotNull( - partitionSequenceNumberMap, - "partitionIdToSequenceNumberMap" - ); + this.stream = stream == null ? topic : stream; + this.partitionSequenceNumberMap = partitionOffsetMap == null ? partitionSequenceNumberMap : partitionOffsetMap; + + Preconditions.checkNotNull(this.stream, "stream"); + Preconditions.checkNotNull(this.partitionSequenceNumberMap, "partitionIdToSequenceNumberMap"); + // exclusiveOffset can be null if this class is deserialized from metadata store. Note that only end offsets are // stored in metadata store. // The default is true because there was only Kafka indexing service before in which the end offset is always @@ -67,6 +72,15 @@ public class SeekableStreamStartSequenceNumbers partitionSequenceNumberMap, + Set exclusivePartitions + ) + { + this(stream, null, partitionSequenceNumberMap, null, exclusivePartitions); + } + @Override @JsonProperty public String getStream() @@ -74,6 +88,16 @@ public class SeekableStreamStartSequenceNumbers getPartitionSequenceNumberMap() @@ -81,6 +105,16 @@ public class SeekableStreamStartSequenceNumbers getPartitionOffsetMap() + { + return partitionSequenceNumberMap; + } + @Override public SeekableStreamSequenceNumbers plus( SeekableStreamSequenceNumbers other diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java new file mode 100644 index 00000000000..f4342e27acb --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamStartSequenceNumbersTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class SeekableStreamStartSequenceNumbersTest +{ + private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); + + @Test + public void testSerde() throws Exception + { + final String stream = "theStream"; + final Map offsetMap = ImmutableMap.of(1, 2L, 3, 4L); + + final SeekableStreamStartSequenceNumbers partitions = new SeekableStreamStartSequenceNumbers<>( + stream, + offsetMap, + ImmutableSet.of(6) + ); + final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions); + + // Check round-trip. + final SeekableStreamStartSequenceNumbers partitions2 = OBJECT_MAPPER.readValue( + serializedString, + new TypeReference>() {} + ); + + Assert.assertEquals("Round trip", partitions, partitions2); + + // Check backwards compatibility. + final Map asMap = OBJECT_MAPPER.readValue( + serializedString, + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + + Assert.assertEquals(stream, asMap.get("stream")); + Assert.assertEquals(stream, asMap.get("topic")); + + // Jackson will deserialize the maps as string -> int maps, not int -> long. + Assert.assertEquals( + offsetMap, + OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference>() {}) + ); + Assert.assertEquals( + offsetMap, + OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference>() {}) + ); + } +}