Adds backwards-compatible serde for SeekableStreamStartSequenceNumbers. (#7512)

This allows them to be deserialized by older Druid versions as
KafkaPartitions objects.

Fixes #7470.
This commit is contained in:
Gian Merlino 2019-04-19 13:19:45 -07:00 committed by Fangjin Yang
parent 5463ecb979
commit 1fb5ec3989
3 changed files with 118 additions and 7 deletions

View File

@ -105,7 +105,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
/**
* Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized
* SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
* SeekableStreamEndSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public String getTopic()
@ -182,7 +182,7 @@ public class SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetTyp
/**
* Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for backwards compatibility, so a serialized
* SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
* SeekableStreamEndSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()

View File

@ -50,16 +50,21 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
@JsonCreator
public SeekableStreamStartSequenceNumbers(
@JsonProperty("stream") final String stream,
// kept for backward compatibility
@JsonProperty("topic") final String topic,
@JsonProperty("partitionSequenceNumberMap")
final Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap,
// kept for backward compatibility
@JsonProperty("partitionOffsetMap") final Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap,
@JsonProperty("exclusivePartitions") @Nullable final Set<PartitionIdType> exclusivePartitions
)
{
this.stream = Preconditions.checkNotNull(stream, "stream");
this.partitionSequenceNumberMap = Preconditions.checkNotNull(
partitionSequenceNumberMap,
"partitionIdToSequenceNumberMap"
);
this.stream = stream == null ? topic : stream;
this.partitionSequenceNumberMap = partitionOffsetMap == null ? partitionSequenceNumberMap : partitionOffsetMap;
Preconditions.checkNotNull(this.stream, "stream");
Preconditions.checkNotNull(this.partitionSequenceNumberMap, "partitionIdToSequenceNumberMap");
// exclusiveOffset can be null if this class is deserialized from metadata store. Note that only end offsets are
// stored in metadata store.
// The default is true because there was only Kafka indexing service before in which the end offset is always
@ -67,6 +72,15 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
this.exclusivePartitions = exclusivePartitions == null ? Collections.emptySet() : exclusivePartitions;
}
public SeekableStreamStartSequenceNumbers(
String stream,
Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap,
Set<PartitionIdType> exclusivePartitions
)
{
this(stream, null, partitionSequenceNumberMap, null, exclusivePartitions);
}
@Override
@JsonProperty
public String getStream()
@ -74,6 +88,16 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
return stream;
}
/**
* Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized
* SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public String getTopic()
{
return stream;
}
@Override
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getPartitionSequenceNumberMap()
@ -81,6 +105,16 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
return partitionSequenceNumberMap;
}
/**
* Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for backwards compatibility, so a serialized
* SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
{
return partitionSequenceNumberMap;
}
@Override
public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> plus(
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class SeekableStreamStartSequenceNumbersTest
{
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
@Test
public void testSerde() throws Exception
{
final String stream = "theStream";
final Map<Integer, Long> offsetMap = ImmutableMap.of(1, 2L, 3, 4L);
final SeekableStreamStartSequenceNumbers<Integer, Long> partitions = new SeekableStreamStartSequenceNumbers<>(
stream,
offsetMap,
ImmutableSet.of(6)
);
final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);
// Check round-trip.
final SeekableStreamStartSequenceNumbers<Integer, Long> partitions2 = OBJECT_MAPPER.readValue(
serializedString,
new TypeReference<SeekableStreamStartSequenceNumbers<Integer, Long>>() {}
);
Assert.assertEquals("Round trip", partitions, partitions2);
// Check backwards compatibility.
final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
serializedString,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(stream, asMap.get("stream"));
Assert.assertEquals(stream, asMap.get("topic"));
// Jackson will deserialize the maps as string -> int maps, not int -> long.
Assert.assertEquals(
offsetMap,
OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<Integer, Long>>() {})
);
Assert.assertEquals(
offsetMap,
OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<Integer, Long>>() {})
);
}
}