Handle null values in Range Partition dimension distribution (#11973)

This PR adds support for handling null dimension values while creating partition boundaries
in range partitioning.

This means that we can now have partition boundaries like [null, "abc"] or ["abc", null, "def"].
This commit is contained in:
Kashif Faraz 2021-11-24 14:30:02 +05:30 committed by GitHub
parent e6570cadc4
commit 48dbe0ea45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 209 additions and 6 deletions

View File

@ -68,8 +68,9 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
{
public static final String TYPE = "partial_dimension_distribution";
// Future work: StringDistribution does not handle inserting NULLs. This is the same behavior as hadoop indexing.
private static final boolean SKIP_NULL = true;
// Do not skip nulls as StringDistribution can handle null values.
// This behavior is different from hadoop indexing.
private static final boolean SKIP_NULL = false;
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
@ -276,9 +277,10 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
}
String[] values = new String[partitionDimensions.size()];
for (int i = 0; i < partitionDimensions.size(); ++i) {
values[i] = Iterables.getOnlyElement(
inputRow.getDimension(partitionDimensions.get(i))
);
List<String> dimensionValues = inputRow.getDimension(partitionDimensions.get(i));
if (dimensionValues != null && !dimensionValues.isEmpty()) {
values[i] = Iterables.getOnlyElement(dimensionValues);
}
}
final StringTuple partitionDimensionValues = StringTuple.create(values);

View File

@ -33,7 +33,7 @@ import org.apache.druid.data.input.StringTuple;
*/
public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
{
private static final ArrayOfStringsSerDe STRINGS_SERDE = new ArrayOfStringsSerDe();
private static final ArrayOfStringsNullSafeSerde STRINGS_SERDE = new ArrayOfStringsNullSafeSerde();
@Override
public byte[] serializeToByteArray(StringTuple[] items)

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel.distribution;
import org.apache.datasketches.ArrayOfItemsSerDe;
import org.apache.datasketches.ArrayOfStringsSerDe;
import org.apache.datasketches.Util;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.IAE;
import java.nio.charset.StandardCharsets;
/**
* Serde for {@link StringTuple}.
* <p>
* The implementation is the same as {@link ArrayOfStringsSerDe}, except this
* class handles null String values as well.
*/
public class ArrayOfStringsNullSafeSerde extends ArrayOfItemsSerDe<String>
{
private static final int NULL_STRING_LENGTH = -1;
@Override
public byte[] serializeToByteArray(final String[] items)
{
// Determine the bytes for each String
int length = 0;
final byte[][] itemsBytes = new byte[items.length][];
for (int i = 0; i < items.length; i++) {
length += Integer.BYTES;
// Do not initialize the byte array for a null String
if (items[i] != null) {
itemsBytes[i] = items[i].getBytes(StandardCharsets.UTF_8);
length += itemsBytes[i].length;
}
}
// Create a single byte array for all the Strings
final byte[] bytes = new byte[length];
final WritableMemory mem = WritableMemory.writableWrap(bytes);
long offsetBytes = 0;
for (int i = 0; i < items.length; i++) {
if (itemsBytes[i] != null) {
// Write the length of the array and the array itself
mem.putInt(offsetBytes, itemsBytes[i].length);
offsetBytes += Integer.BYTES;
mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length);
offsetBytes += itemsBytes[i].length;
} else {
mem.putInt(offsetBytes, NULL_STRING_LENGTH);
offsetBytes += Integer.BYTES;
}
}
return bytes;
}
@Override
public String[] deserializeFromMemory(final Memory mem, final int numItems)
{
final String[] array = new String[numItems];
long offsetBytes = 0;
for (int i = 0; i < numItems; i++) {
// Read the length of the ith String
Util.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
final int strLength = mem.getInt(offsetBytes);
offsetBytes += Integer.BYTES;
if (strLength >= 0) {
// Read the bytes for the String
final byte[] bytes = new byte[strLength];
Util.checkBounds(offsetBytes, strLength, mem.getCapacity());
mem.getByteArray(offsetBytes, bytes, 0, strLength);
offsetBytes += strLength;
array[i] = new String(bytes, StandardCharsets.UTF_8);
} else if (strLength != NULL_STRING_LENGTH) {
throw new IAE(
"Illegal strLength [%s] at offset [%s]. Must be %s, 0 or a positive integer.",
strLength,
offsetBytes,
NULL_STRING_LENGTH
);
}
}
return array;
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel.distribution;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.java.util.common.IAE;
import org.junit.Assert;
import org.junit.Test;
public class ArrayOfStringsNullSafeSerdeTest
{
private final ArrayOfStringsNullSafeSerde serde = new ArrayOfStringsNullSafeSerde();
@Test
public void testStringArray()
{
testSerde("abc", "def", "xyz");
testSerde("abc", "123", "456.0");
}
@Test
public void testSingletonArray()
{
testSerde("abc");
testSerde("xyz");
}
@Test
public void testEmptyArray()
{
testSerde();
}
@Test
public void testArrayWithNullString()
{
testSerde((String) null);
testSerde("abc", null, "def");
testSerde(null, null, null);
}
@Test
public void testArrayWithEmptyString()
{
testSerde("");
testSerde("abc", "def", "");
testSerde("", "", "");
testSerde("", null, "abc");
}
@Test
public void testIllegalStrLength()
{
// bytes for length = -2
final byte[] bytes = {-2, -1, -1, -1};
IAE exception = Assert.assertThrows(
IAE.class,
() -> serde.deserializeFromMemory(Memory.wrap(bytes), 1)
);
Assert.assertEquals(
"Illegal strLength [-2] at offset [4]. Must be -1, 0 or a positive integer.",
exception.getMessage()
);
}
private void testSerde(String... inputArray)
{
byte[] bytes = serde.serializeToByteArray(inputArray);
String[] deserialized = serde.deserializeFromMemory(Memory.wrap(bytes), inputArray.length);
Assert.assertEquals(inputArray, deserialized);
}
}