Write null byte when indexing numeric dimensions with Hadoop (#7020)

* write null byte in hadoop indexing for numeric dimensions

* Add test case to check output serializing null numeric dimensions

* Remove extra line

* Add @Nullable annotations
This commit is contained in:
Ferris Tseng 2019-03-11 21:02:03 -04:00 committed by Gian Merlino
parent 9178793ab5
commit c503ba9779
2 changed files with 93 additions and 21 deletions

View File

@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Consumer;
/** /**
*/ */
@ -63,6 +64,34 @@ public class InputRowSerde
private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper(); private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper();
private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper(); private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper();
private static <T extends Number> void writeNullableNumeric(
T ret,
final ByteArrayDataOutput out,
final Supplier<T> getDefault,
final Consumer<T> write)
{
if (ret == null) {
ret = getDefault.get();
}
// Write the null byte only if the default numeric value is still null.
if (ret == null) {
out.writeByte(NullHandling.IS_NULL_BYTE);
return;
}
if (NullHandling.sqlCompatible()) {
out.writeByte(NullHandling.IS_NOT_NULL_BYTE);
}
write.accept(ret);
}
private static boolean isNullByteSet(final ByteArrayDataInput in)
{
return NullHandling.sqlCompatible() && in.readByte() == NullHandling.IS_NULL_BYTE;
}
public interface IndexSerdeTypeHelper<T> public interface IndexSerdeTypeHelper<T>
{ {
ValueType getType(); ValueType getType();
@ -175,12 +204,7 @@ public class InputRowSerde
exceptionToThrow = pe; exceptionToThrow = pe;
} }
if (ret == null) { writeNullableNumeric(ret, out, NullHandling::defaultLongValue, out::writeLong);
// remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
// we'll also need to change the serialized encoding so that it can represent numeric nulls
ret = DimensionHandlerUtils.ZERO_LONG;
}
out.writeLong(ret);
if (exceptionToThrow != null) { if (exceptionToThrow != null) {
throw exceptionToThrow; throw exceptionToThrow;
@ -188,9 +212,10 @@ public class InputRowSerde
} }
@Override @Override
@Nullable
public Long deserialize(ByteArrayDataInput in) public Long deserialize(ByteArrayDataInput in)
{ {
return in.readLong(); return isNullByteSet(in) ? null : in.readLong();
} }
} }
@ -214,12 +239,7 @@ public class InputRowSerde
exceptionToThrow = pe; exceptionToThrow = pe;
} }
if (ret == null) { writeNullableNumeric(ret, out, NullHandling::defaultFloatValue, out::writeFloat);
// remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
// we'll also need to change the serialized encoding so that it can represent numeric nulls
ret = DimensionHandlerUtils.ZERO_FLOAT;
}
out.writeFloat(ret);
if (exceptionToThrow != null) { if (exceptionToThrow != null) {
throw exceptionToThrow; throw exceptionToThrow;
@ -227,9 +247,10 @@ public class InputRowSerde
} }
@Override @Override
@Nullable
public Float deserialize(ByteArrayDataInput in) public Float deserialize(ByteArrayDataInput in)
{ {
return in.readFloat(); return isNullByteSet(in) ? null : in.readFloat();
} }
} }
@ -253,12 +274,7 @@ public class InputRowSerde
exceptionToThrow = pe; exceptionToThrow = pe;
} }
if (ret == null) { writeNullableNumeric(ret, out, NullHandling::defaultDoubleValue, out::writeDouble);
// remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
// we'll also need to change the serialized encoding so that it can represent numeric nulls
ret = DimensionHandlerUtils.ZERO_DOUBLE;
}
out.writeDouble(ret);
if (exceptionToThrow != null) { if (exceptionToThrow != null) {
throw exceptionToThrow; throw exceptionToThrow;
@ -266,9 +282,10 @@ public class InputRowSerde
} }
@Override @Override
@Nullable
public Double deserialize(ByteArrayDataInput in) public Double deserialize(ByteArrayDataInput in)
{ {
return in.readDouble(); return isNullByteSet(in) ? null : in.readDouble();
} }
} }

View File

@ -254,4 +254,59 @@ public class InputRowSerdeTest
result.getParseExceptionMessages() result.getParseExceptionMessages()
); );
} }
@Test
public void testDimensionNullOrDefaultForNumerics()
{
HashMap<String, Object> eventWithNulls = new HashMap<>();
eventWithNulls.put("d1", null);
eventWithNulls.put("d2", Arrays.asList("d2v1", "d2v2"));
eventWithNulls.put("d3", null);
eventWithNulls.put("d4", null);
eventWithNulls.put("d5", null);
InputRow in = new MapBasedInputRow(
timestamp,
dims,
eventWithNulls
);
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("d1"),
new StringDimensionSchema("d2"),
new LongDimensionSchema("d3"),
new FloatDimensionSchema("d4"),
new DoubleDimensionSchema("d5")
),
null,
null
);
byte[] result = InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, new AggregatorFactory[0]).getSerializedRow();
if (NullHandling.replaceWithDefault()) {
long expected = 0;
expected += 9; // timestamp bytes + dims length
expected += 18; // dim_non_existing writes: 1 16 1 bytes
expected += 4; // d1: writes 1 2 1 bytes
expected += 14; // d2: writes 1 2 1 1 4 1 4 bytes
expected += 11; // d3: writes 1 2 8 bytes
expected += 7; // d4: writes 1 2 4 bytes
expected += 11; // d5: writes 1 2 8 bytes
expected += 1; // writes aggregator length
Assert.assertEquals(expected, result.length);
Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 48, 56));
Assert.assertArrayEquals(new byte[] {0, 0, 0, 0}, Arrays.copyOfRange(result, 59, 63));
Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 66, 74));
} else {
long expected = 9 + 18 + 4 + 14 + 4 + 4 + 4 + 1;
Assert.assertEquals(expected, result.length);
Assert.assertEquals(result[48], NullHandling.IS_NULL_BYTE);
Assert.assertEquals(result[52], NullHandling.IS_NULL_BYTE);
Assert.assertEquals(result[56], NullHandling.IS_NULL_BYTE);
}
}
} }