Ensure backward compatibility of multi dimension partitioning (#11889)

This PR ensures backward compatibility of multi-dimension partitioning so that if only some
MiddleManagers have been upgraded to a newer version, the cluster still functions normally
for single_dim use cases.
Kashif Faraz 2021-11-10 10:23:34 +05:30 committed by GitHub
parent a8805ab60d
commit d3914c1a78
10 changed files with 379 additions and 73 deletions
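
The compatibility-sensitive artifacts are the JSON forms of shard specs and partition boundaries that travel between services during range-partitioned ingestion. Below is a minimal sketch, not part of the commit, of the behaviour the patch restores; the class name SingleDimCompatSketch is hypothetical, druid-core with the classes changed in this diff is assumed to be on the classpath, and the constructors are used with the argument lists shown in the hunks and tests below. With a single partition dimension, each step of the shard-spec conversion chain now falls back to the legacy single_dim classes, whose JSON older services can still read.

import java.util.Collections;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.timeline.partition.BuildingDimensionRangeShardSpec;
import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;

public class SingleDimCompatSketch
{
  public static void main(String[] args)
  {
    // Bucket spec for a range partitioning spec that names a single dimension
    DimensionRangeBucketShardSpec bucket = new DimensionRangeBucketShardSpec(
        1,                                  // bucketId
        Collections.singletonList("dim"),
        StringTuple.create("abc"),          // start
        StringTuple.create("xyz")           // end
    );

    // bucket -> building spec (partitionId = 5) -> published spec (numCorePartitions = 10)
    BuildingDimensionRangeShardSpec building = bucket.convert(5);
    DimensionRangeShardSpec published = building.convert(10);

    // Both steps fall back to the legacy single_dim flavours that older services understand
    System.out.println(building.getClass().getSimpleName());  // BuildingSingleDimensionShardSpec
    System.out.println(published.getClass().getSimpleName()); // SingleDimensionShardSpec
  }
}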

StringTuple.java

@ -39,6 +39,15 @@ public class StringTuple implements Comparable<StringTuple>
return new StringTuple(values);
}
/**
* Gets the first String from the given StringTuple if the tuple is non-null
* and non-empty, null otherwise.
*/
public static String firstOrNull(StringTuple tuple)
{
return tuple == null || tuple.size() < 1 ? null : tuple.get(0);
}
@JsonCreator
public StringTuple(String[] values)
{
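
The new static firstOrNull() helper is what the shard-spec classes below use to down-convert a tuple to the single String boundary carried by legacy single_dim specs. A small usage sketch, assuming druid-core on the classpath; the class name is hypothetical:

import org.apache.druid.data.input.StringTuple;

public class FirstOrNullSketch
{
  public static void main(String[] args)
  {
    // Non-empty tuple: the first element becomes the single_dim boundary value
    System.out.println(StringTuple.firstOrNull(StringTuple.create("a", "10"))); // a

    // Null (or empty) tuple: an open-ended boundary
    System.out.println(StringTuple.firstOrNull(null)); // null
  }
}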

BuildingDimensionRangeShardSpec.java

@ -29,7 +29,12 @@ import java.util.Objects;
/**
* See {@link BuildingShardSpec} for how this class is used.
* <p>
* Calling {@link #convert(int)} on an instance of this class creates a
* {@link SingleDimensionShardSpec} if there is a single dimension or a
* {@link DimensionRangeShardSpec} if there are multiple dimensions.
*
* @see SingleDimensionShardSpec
* @see DimensionRangeShardSpec
*/
public class BuildingDimensionRangeShardSpec implements BuildingShardSpec<DimensionRangeShardSpec>
@ -68,14 +73,14 @@ public class BuildingDimensionRangeShardSpec implements BuildingShardSpec<Dimens
@Nullable
@JsonProperty("start")
public StringTuple getStart()
public StringTuple getStartTuple()
{
return start;
}
@Nullable
@JsonProperty("end")
public StringTuple getEnd()
public StringTuple getEndTuple()
{
return end;
}
@ -100,8 +105,8 @@ public class BuildingDimensionRangeShardSpec implements BuildingShardSpec<Dimens
return dimensions != null && dimensions.size() == 1
? new SingleDimensionShardSpec(
dimensions.get(0),
firstOrNull(start),
firstOrNull(end),
StringTuple.firstOrNull(start),
StringTuple.firstOrNull(end),
partitionId,
numCorePartitions
) : new DimensionRangeShardSpec(
@ -113,11 +118,6 @@ public class BuildingDimensionRangeShardSpec implements BuildingShardSpec<Dimens
);
}
private String firstOrNull(StringTuple tuple)
{
return tuple == null || tuple.size() < 1 ? null : tuple.get(0);
}
@Override
public <T> PartitionChunk<T> createChunk(T obj)
{
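
The getter rename above (getStart/getEnd to getStartTuple/getEndTuple) keeps the JSON property names "start" and "end" unchanged while freeing the old method names for the String-returning accessors of the single_dim subclass in the next file; a String-valued getStart() could not legally override a StringTuple-valued one. A hedged sketch of the two views, with the same classpath assumptions and a hypothetical class name:

import org.apache.druid.data.input.StringTuple;
import org.apache.druid.timeline.partition.BuildingSingleDimensionShardSpec;

public class GetterViewsSketch
{
  public static void main(String[] args)
  {
    BuildingSingleDimensionShardSpec spec =
        new BuildingSingleDimensionShardSpec(1, "dim", "abc", "xyz", 5);

    String start = spec.getStart();                // legacy single_dim view
    StringTuple startTuple = spec.getStartTuple(); // inherited range view (one-element tuple)

    System.out.println(start + " / " + startTuple.get(0)); // abc / abc
  }
}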

BuildingSingleDimensionShardSpec.java

@ -21,8 +21,13 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.druid.data.input.StringTuple;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
@ -30,17 +35,17 @@ import java.util.Objects;
*
* @see SingleDimensionShardSpec
*/
public class BuildingSingleDimensionShardSpec implements BuildingShardSpec<SingleDimensionShardSpec>
public class BuildingSingleDimensionShardSpec extends BuildingDimensionRangeShardSpec
{
public static final String TYPE = "building_single_dim";
private final int bucketId;
private final String dimension;
@Nullable
private final String start;
@Nullable
private final String end;
private final int partitionId;
@JsonCreator
public BuildingSingleDimensionShardSpec(
@ -51,57 +56,58 @@ public class BuildingSingleDimensionShardSpec implements BuildingShardSpec<Singl
@JsonProperty("partitionNum") int partitionNum
)
{
this.bucketId = bucketId;
super(
bucketId,
dimension == null ? Collections.emptyList() : Collections.singletonList(dimension),
start == null ? null : StringTuple.create(start),
end == null ? null : StringTuple.create(end),
partitionNum
);
this.dimension = dimension;
this.start = start;
this.end = end;
this.partitionId = partitionNum;
}
@JsonProperty("dimension")
@JsonValue
public Map<String, Object> getSerializableObject()
{
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("dimension", dimension);
jsonMap.put("start", start);
jsonMap.put("end", end);
jsonMap.put("bucketId", getBucketId());
jsonMap.put("partitionNum", getPartitionNum());
return jsonMap;
}
public String getDimension()
{
return dimension;
}
@Nullable
@JsonProperty("start")
public String getStart()
{
return start;
}
@Nullable
@JsonProperty("end")
public String getEnd()
{
return end;
}
@Override
@JsonProperty("partitionNum")
public int getPartitionNum()
{
return partitionId;
}
@Override
@JsonProperty("bucketId")
public int getBucketId()
{
return bucketId;
}
@Override
public SingleDimensionShardSpec convert(int numCorePartitions)
{
return new SingleDimensionShardSpec(dimension, start, end, partitionId, numCorePartitions);
return new SingleDimensionShardSpec(dimension, start, end, getPartitionNum(), numCorePartitions);
}
@Override
public <T> PartitionChunk<T> createChunk(T obj)
{
return new NumberedPartitionChunk<>(partitionId, 0, obj);
return new NumberedPartitionChunk<>(getPartitionNum(), 0, obj);
}
@Override
@ -114,8 +120,8 @@ public class BuildingSingleDimensionShardSpec implements BuildingShardSpec<Singl
return false;
}
BuildingSingleDimensionShardSpec that = (BuildingSingleDimensionShardSpec) o;
return bucketId == that.bucketId &&
partitionId == that.partitionId &&
return getBucketId() == that.getBucketId() &&
getPartitionNum() == that.getPartitionNum() &&
Objects.equals(dimension, that.dimension) &&
Objects.equals(start, that.start) &&
Objects.equals(end, that.end);
@ -124,18 +130,18 @@ public class BuildingSingleDimensionShardSpec implements BuildingShardSpec<Singl
@Override
public int hashCode()
{
return Objects.hash(bucketId, dimension, start, end, partitionId);
return Objects.hash(getBucketId(), dimension, start, end, getPartitionNum());
}
@Override
public String toString()
{
return "BuildingSingleDimensionShardSpec{" +
"bucketId=" + bucketId +
"bucketId=" + getBucketId() +
", dimension='" + dimension + '\'' +
", start='" + start + '\'' +
", end='" + end + '\'' +
", partitionNum=" + partitionId +
", partitionNum=" + getPartitionNum() +
'}';
}
}
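
Although BuildingSingleDimensionShardSpec now extends the range spec, its @JsonValue map preserves the flat field set that pre-upgrade services write and read for the building_single_dim type. A hedged sketch, with values mirroring the new unit test and a hypothetical class name:

import java.util.Map;
import org.apache.druid.timeline.partition.BuildingSingleDimensionShardSpec;

public class WireFormatSketch
{
  public static void main(String[] args)
  {
    Map<String, Object> jsonMap =
        new BuildingSingleDimensionShardSpec(1, "dim", "abc", "xyz", 5).getSerializableObject();

    // Same flat fields as before the refactor: dimension, start, end, bucketId, partitionNum
    System.out.println(jsonMap.get("dimension") + " " + jsonMap.get("start") + " "
        + jsonMap.get("end") + " " + jsonMap.get("bucketId") + " " + jsonMap.get("partitionNum"));
    // Prints: dim abc xyz 1 5
  }
}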

DimensionRangeBucketShardSpec.java

@ -32,7 +32,12 @@ import java.util.Objects;
/**
* See {@link BucketNumberedShardSpec} for how this class is used.
* <p>
* Calling {@link #convert(int)} on an instance of this class creates a
* {@link BuildingSingleDimensionShardSpec} if there is a single dimension
* or {@link BuildingDimensionRangeShardSpec} if there are multiple dimensions.
*
* @see BuildingSingleDimensionShardSpec
* @see BuildingDimensionRangeShardSpec
*/
public class DimensionRangeBucketShardSpec implements BucketNumberedShardSpec<BuildingDimensionRangeShardSpec>
@ -100,7 +105,20 @@ public class DimensionRangeBucketShardSpec implements BucketNumberedShardSpec<Bu
@Override
public BuildingDimensionRangeShardSpec convert(int partitionId)
{
return new BuildingDimensionRangeShardSpec(bucketId, dimensions, start, end, partitionId);
return dimensions != null && dimensions.size() == 1
? new BuildingSingleDimensionShardSpec(
bucketId,
dimensions.get(0),
StringTuple.firstOrNull(start),
StringTuple.firstOrNull(end),
partitionId
) : new BuildingDimensionRangeShardSpec(
bucketId,
dimensions,
start,
end,
partitionId
);
}
@Override
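
The single-dimension branch above is the compatibility path; with two or more dimensions, convert() still produces the new range types, which is presumably why multi-dimension partitioning itself should only be enabled once the whole cluster runs the newer version. A short counterpart sketch under the same assumptions, class name hypothetical:

import java.util.Arrays;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.timeline.partition.BuildingDimensionRangeShardSpec;
import org.apache.druid.timeline.partition.BuildingSingleDimensionShardSpec;
import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec;

public class MultiDimConvertSketch
{
  public static void main(String[] args)
  {
    BuildingDimensionRangeShardSpec building = new DimensionRangeBucketShardSpec(
        1,                                  // bucketId
        Arrays.asList("dim1", "dim2"),      // two partition dimensions
        StringTuple.create("a", "10"),      // start
        StringTuple.create("b", "20")       // end
    ).convert(5);

    // Not the legacy single_dim flavour; only upgraded services can deserialize it
    System.out.println(building instanceof BuildingSingleDimensionShardSpec); // false
  }
}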

PartitionBoundaries.java

@ -19,8 +19,11 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.ForwardingList;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.IAE;
import java.util.ArrayList;
import java.util.Arrays;
@ -36,13 +39,6 @@ public class PartitionBoundaries extends ForwardingList<StringTuple> implements
{
private final List<StringTuple> delegate;
// For jackson
@SuppressWarnings("unused")
private PartitionBoundaries()
{
delegate = new ArrayList<>();
}
/**
* @param partitions Elements corresponding to evenly-spaced fractional ranks of the distribution
*/
@ -71,6 +67,57 @@ public class PartitionBoundaries extends ForwardingList<StringTuple> implements
delegate = Collections.unmodifiableList(partitionBoundaries);
}
/**
* This constructor supports an array of Objects and not just an array of
* StringTuples for backward compatibility. Older versions of this class
* are serialized as a String array.
*
* @param partitions array of StringTuples or array of String
*/
@JsonCreator
private PartitionBoundaries(Object[] partitions)
{
delegate = Arrays.stream(partitions)
.map(this::toStringTuple)
.collect(Collectors.toList());
}
@JsonValue
public Object getSerializableObject()
{
boolean isSingleDim = true;
for (StringTuple tuple : delegate) {
if (tuple != null && tuple.size() != 1) {
isSingleDim = false;
break;
}
}
if (isSingleDim) {
return delegate.stream().map(StringTuple::firstOrNull).collect(Collectors.toList());
} else {
return delegate;
}
}
/**
* Converts the given item to a StringTuple.
*/
private StringTuple toStringTuple(Object item)
{
if (item == null || item instanceof StringTuple) {
return (StringTuple) item;
} else if (item instanceof String) {
return StringTuple.create((String) item);
} else if (item instanceof String[]) {
return StringTuple.create((String[]) item);
} else if (item instanceof List) {
return StringTuple.create((String[]) ((List) item).toArray(new String[0]));
} else {
throw new IAE("Item must either be a String or StringTuple");
}
}
@Override
protected List<StringTuple> delegate()
{
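
PartitionBoundaries carries the compatibility in both directions: the Object[] creator accepts the old flat String array as well as the new nested tuples, and getSerializableObject() writes the flat form back whenever every boundary is a single-element tuple. A hedged round-trip sketch with a plain Jackson ObjectMapper (no subtype registration is needed, as in the tests below); the class name is hypothetical:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.timeline.partition.PartitionBoundaries;

public class BoundariesSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // Legacy payload from an older (single_dim) service: a flat String array.
    // The Object[] creator turns each String into a one-element StringTuple.
    PartitionBoundaries fromOldService =
        mapper.readValue("[null,\"b\",null]", PartitionBoundaries.class);

    // Every tuple has size 1, so the flat form is written back unchanged and a
    // not-yet-upgraded service can still read it.
    System.out.println(mapper.writeValueAsString(fromOldService)); // [null,"b",null]

    // Multi-dimension payload: an array of tuples, understood only by upgraded services.
    PartitionBoundaries multiDim =
        mapper.readValue("[null,[\"b\",\"7\"],null]", PartitionBoundaries.class);
    System.out.println(mapper.writeValueAsString(multiDim));       // [null,["b","7"],null]
  }
}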

BuildingDimensionRangeShardSpecTest.java

@ -58,13 +58,7 @@ public class BuildingDimensionRangeShardSpecTest
public void testConvert_withSingleDimension()
{
Assert.assertEquals(
new SingleDimensionShardSpec(
"dim",
"start",
"end",
5,
10
),
new SingleDimensionShardSpec("dim", "start", "end", 5, 10),
new BuildingDimensionRangeShardSpec(
1,
Collections.singletonList("dim"),

BuildingSingleDimensionShardSpecTest.java

@ -19,16 +19,19 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.ISE;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class BuildingSingleDimensionShardSpecTest
{
private static final ObjectMapper OBJECT_MAPPER = setupObjectMapper();
@Test
public void testConvert()
{
@ -48,25 +51,88 @@ public class BuildingSingleDimensionShardSpecTest
}
@Test
public void testSerde() throws JsonProcessingException
public void testSerde()
{
final BuildingSingleDimensionShardSpec original =
new BuildingSingleDimensionShardSpec(1, "dim", "start", "end", 5);
final String json = serialize(original);
final BuildingSingleDimensionShardSpec fromJson =
(BuildingSingleDimensionShardSpec) deserialize(json, ShardSpec.class);
Assert.assertEquals(original, fromJson);
}
@Test
public void testGetSerializableObject()
{
BuildingSingleDimensionShardSpec shardSpec =
new BuildingSingleDimensionShardSpec(1, "dim", "abc", "xyz", 5);
// Verify the fields of the serializable object
Map<String, Object> jsonMap = shardSpec.getSerializableObject();
Assert.assertEquals(5, jsonMap.size());
Assert.assertEquals(1, jsonMap.get("bucketId"));
Assert.assertEquals("dim", jsonMap.get("dimension"));
Assert.assertEquals("abc", jsonMap.get("start"));
Assert.assertEquals("xyz", jsonMap.get("end"));
Assert.assertEquals(5, jsonMap.get("partitionNum"));
}
@Test
public void testDeserializeFromMap()
{
final String json = "{\"type\": \"" + BuildingSingleDimensionShardSpec.TYPE + "\","
+ " \"bucketId\":1,"
+ " \"dimension\": \"dim\","
+ " \"start\": \"abc\","
+ " \"end\": \"xyz\","
+ " \"partitionNum\": 5}";
BuildingSingleDimensionShardSpec shardSpec =
(BuildingSingleDimensionShardSpec) deserialize(json, ShardSpec.class);
Assert.assertEquals(
new BuildingSingleDimensionShardSpec(1, "dim", "abc", "xyz", 5),
shardSpec
);
}
@Test
public void testEquals()
{
Assert.assertEquals(
new BuildingSingleDimensionShardSpec(10, "dim", "start", "end", 4),
new BuildingSingleDimensionShardSpec(10, "dim", "start", "end", 4)
);
}
private static ObjectMapper setupObjectMapper()
{
final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
mapper.registerSubtypes(
new NamedType(BuildingSingleDimensionShardSpec.class, BuildingSingleDimensionShardSpec.TYPE)
);
mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
final BuildingSingleDimensionShardSpec original = new BuildingSingleDimensionShardSpec(1, "dim", "start", "end", 5);
final String json = mapper.writeValueAsString(original);
final BuildingSingleDimensionShardSpec fromJson = (BuildingSingleDimensionShardSpec) mapper.readValue(
json,
ShardSpec.class
);
Assert.assertEquals(original, fromJson);
return mapper;
}
@Test
public void testEquals()
private String serialize(Object object)
{
EqualsVerifier.forClass(BuildingSingleDimensionShardSpec.class).usingGetClass().verify();
try {
return OBJECT_MAPPER.writeValueAsString(object);
}
catch (Exception e) {
throw new ISE(e, "Error while serializing");
}
}
private <T> T deserialize(String json, Class<T> clazz)
{
try {
return OBJECT_MAPPER.readValue(json, clazz);
}
catch (Exception e) {
throw new ISE(e, "Error while deserializing");
}
}
}

DimensionRangeBucketShardSpecTest.java

@ -35,6 +35,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class DimensionRangeBucketShardSpecTest
@ -65,6 +66,20 @@ public class DimensionRangeBucketShardSpecTest
);
}
@Test
public void testConvert_withSingleDimension()
{
Assert.assertEquals(
new BuildingSingleDimensionShardSpec(1, "dim", "start", "end", 5),
new DimensionRangeBucketShardSpec(
1,
Collections.singletonList("dim"),
StringTuple.create("start"),
StringTuple.create("end")
).convert(5)
);
}
@Test
public void testCreateChunk()
{

PartitionBoundariesTest.java

@ -19,10 +19,10 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -34,6 +34,8 @@ import java.util.List;
public class PartitionBoundariesTest
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private PartitionBoundaries target;
private StringTuple[] values;
private List<StringTuple> expected;
@ -79,16 +81,108 @@ public class PartitionBoundariesTest
@Test
public void handlesRepeatedValue()
{
Assert.assertEquals(Arrays.asList(null, null), new PartitionBoundaries(StringTuple.create("a"), StringTuple.create("a"), StringTuple.create("a")));
Assert.assertEquals(
Arrays.asList(null, null),
new PartitionBoundaries(
StringTuple.create("a"),
StringTuple.create("a"),
StringTuple.create("a")
)
);
}
@Test
public void serializesDeserializes() throws JsonProcessingException
public void serializesDeserializes()
{
final ObjectMapper objectMapper = new ObjectMapper();
String serialized = objectMapper.writeValueAsString(target);
Object deserialized = objectMapper.readValue(serialized, target.getClass());
Assert.assertEquals(serialized, objectMapper.writeValueAsString(deserialized));
String serialized = serialize(target);
Object deserialized = deserialize(serialized, target.getClass());
Assert.assertEquals(serialized, serialize(deserialized));
}
@Test
public void testSerdeWithMultiDimensions()
{
PartitionBoundaries original = new PartitionBoundaries(
StringTuple.create("a", "10"),
StringTuple.create("b", "7"),
StringTuple.create("c", "4")
);
String json = serialize(original);
PartitionBoundaries deserialized = deserialize(json, PartitionBoundaries.class);
Assert.assertEquals(original, deserialized);
}
@Test
public void testGetSerializableObject_withMultiDimensions()
{
// Create a PartitionBoundaries for multiple dimensions
PartitionBoundaries multiDimBoundaries = new PartitionBoundaries(
StringTuple.create("a", "10"),
StringTuple.create("b", "7"),
StringTuple.create("c", "4")
);
// Verify that the serializable object is a List<StringTuple>
Object serializableObject = multiDimBoundaries.getSerializableObject();
Assert.assertTrue(serializableObject instanceof List);
assertThatItemsAreNullOr(StringTuple.class, (List<?>) serializableObject);
// Verify the output of getSerializableObject can be serialized/deserialized
String json = serialize(serializableObject);
PartitionBoundaries deserialized = deserialize(json, PartitionBoundaries.class);
Assert.assertEquals(multiDimBoundaries, deserialized);
}
@Test
public void testGetSerializableObject_withSingleDimension()
{
// Create a PartitionBoundaries for a single dimension
PartitionBoundaries singleDimBoundaries = new PartitionBoundaries(
StringTuple.create("a"),
StringTuple.create("b"),
StringTuple.create("c")
);
// Verify that the serializable object is a List<String>
Object serializableObject = singleDimBoundaries.getSerializableObject();
Assert.assertTrue(serializableObject instanceof List);
assertThatItemsAreNullOr(String.class, (List<?>) serializableObject);
// Verify the output of getSerializableObject can be serialized/deserialized
String json = serialize(serializableObject);
PartitionBoundaries deserialized = deserialize(json, PartitionBoundaries.class);
Assert.assertEquals(singleDimBoundaries, deserialized);
}
@Test
public void testDeserializeArrayOfString()
{
String json = "[null, \"a\", null]";
PartitionBoundaries deserialized = deserialize(json, PartitionBoundaries.class);
Assert.assertEquals(
new PartitionBoundaries(
null,
StringTuple.create("a"),
StringTuple.create("b")
),
deserialized
);
}
@Test
public void testDeserializeArrayOfTuples()
{
String json = "[null, [\"a\",\"10\"], null]";
PartitionBoundaries deserialized = deserialize(json, PartitionBoundaries.class);
Assert.assertEquals(
new PartitionBoundaries(
null,
StringTuple.create("a", "10"),
StringTuple.create("a", "20")
),
deserialized
);
}
@Test
@ -102,4 +196,41 @@ public class PartitionBoundariesTest
{
EqualsVerifier.forClass(PartitionBoundaries.class).withNonnullFields("delegate").usingGetClass().verify();
}
/**
* Asserts that all the items in the given list are either null or of the
* specified class.
*/
private <T> void assertThatItemsAreNullOr(Class<T> clazz, List<?> list)
{
if (list == null || list.isEmpty()) {
return;
}
for (Object item : list) {
if (item != null) {
Assert.assertSame(clazz, item.getClass());
}
}
}
private String serialize(Object object)
{
try {
return OBJECT_MAPPER.writeValueAsString(object);
}
catch (Exception e) {
throw new ISE(e, "Error while serializing");
}
}
private <T> T deserialize(String json, Class<T> clazz)
{
try {
return OBJECT_MAPPER.readValue(json, clazz);
}
catch (Exception e) {
throw new ISE(e, "Error while deserializing");
}
}
}

SingleDimensionShardSpecTest.java

@ -19,6 +19,7 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -163,6 +164,25 @@ public class SingleDimensionShardSpecTest
testSerde(new SingleDimensionShardSpec("dim", "abc", "xyz", 10, null));
}
@Test
public void testDeserialize() throws JsonProcessingException
{
final String json = "{\"type\": \"single\","
+ " \"dimension\": \"dim\","
+ " \"start\": \"abc\","
+ "\"end\": \"xyz\","
+ "\"partitionNum\": 5,"
+ "\"numCorePartitions\": 10}";
ShardSpec shardSpec = OBJECT_MAPPER.readValue(json, ShardSpec.class);
Assert.assertTrue(shardSpec instanceof SingleDimensionShardSpec);
SingleDimensionShardSpec singleDimShardSpec = (SingleDimensionShardSpec) shardSpec;
Assert.assertEquals(
new SingleDimensionShardSpec("dim", "abc", "xyz", 5, 10),
singleDimShardSpec
);
}
private void testSerde(SingleDimensionShardSpec shardSpec) throws IOException
{
String json = OBJECT_MAPPER.writeValueAsString(shardSpec);