diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
index b9c48f45170..c0a42f2eb7f 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/InputRowSerde.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  */
@@ -63,6 +64,34 @@ public class InputRowSerde
   private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper();
   private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper();
 
+  private static <T> void writeNullableNumeric(
+      T ret,
+      final ByteArrayDataOutput out,
+      final Supplier<T> getDefault,
+      final Consumer<T> write)
+  {
+    if (ret == null) {
+      ret = getDefault.get();
+    }
+
+    // Write the null byte only if the default numeric value is still null.
+    if (ret == null) {
+      out.writeByte(NullHandling.IS_NULL_BYTE);
+      return;
+    }
+
+    if (NullHandling.sqlCompatible()) {
+      out.writeByte(NullHandling.IS_NOT_NULL_BYTE);
+    }
+
+    write.accept(ret);
+  }
+
+  private static boolean isNullByteSet(final ByteArrayDataInput in)
+  {
+    return NullHandling.sqlCompatible() && in.readByte() == NullHandling.IS_NULL_BYTE;
+  }
+
   public interface IndexSerdeTypeHelper
   {
     ValueType getType();
@@ -175,12 +204,7 @@ public class InputRowSerde
         exceptionToThrow = pe;
       }
 
-      if (ret == null) {
-        // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
-        // we'll also need to change the serialized encoding so that it can represent numeric nulls
-        ret = DimensionHandlerUtils.ZERO_LONG;
-      }
-      out.writeLong(ret);
+      writeNullableNumeric(ret, out, NullHandling::defaultLongValue, out::writeLong);
 
       if (exceptionToThrow != null) {
         throw exceptionToThrow;
@@ -188,9 +212,10 @@
     }
 
     @Override
+    @Nullable
     public Long deserialize(ByteArrayDataInput in)
     {
-      return in.readLong();
+      return isNullByteSet(in) ? null : in.readLong();
     }
   }
 
@@ -214,12 +239,7 @@ public class InputRowSerde
         exceptionToThrow = pe;
       }
 
-      if (ret == null) {
-        // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
-        // we'll also need to change the serialized encoding so that it can represent numeric nulls
-        ret = DimensionHandlerUtils.ZERO_FLOAT;
-      }
-      out.writeFloat(ret);
+      writeNullableNumeric(ret, out, NullHandling::defaultFloatValue, out::writeFloat);
 
       if (exceptionToThrow != null) {
         throw exceptionToThrow;
@@ -227,9 +247,10 @@
     }
 
     @Override
+    @Nullable
     public Float deserialize(ByteArrayDataInput in)
     {
-      return in.readFloat();
+      return isNullByteSet(in) ? null : in.readFloat();
     }
   }
 
@@ -253,12 +274,7 @@ public class InputRowSerde
         exceptionToThrow = pe;
       }
 
-      if (ret == null) {
-        // remove null -> zero conversion when https://github.com/apache/incubator-druid/pull/5278 series of patches is merged
-        // we'll also need to change the serialized encoding so that it can represent numeric nulls
-        ret = DimensionHandlerUtils.ZERO_DOUBLE;
-      }
-      out.writeDouble(ret);
+      writeNullableNumeric(ret, out, NullHandling::defaultDoubleValue, out::writeDouble);
 
       if (exceptionToThrow != null) {
         throw exceptionToThrow;
@@ -266,9 +282,10 @@
     }
 
     @Override
+    @Nullable
     public Double deserialize(ByteArrayDataInput in)
     {
-      return in.readDouble();
+      return isNullByteSet(in) ? null : in.readDouble();
     }
   }
 
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
index ce9b95cd185..3d00c15627c 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/InputRowSerdeTest.java
@@ -254,4 +254,59 @@ public class InputRowSerdeTest
       result.getParseExceptionMessages()
     );
   }
+
+  @Test
+  public void testDimensionNullOrDefaultForNumerics()
+  {
+    HashMap<String, Object> eventWithNulls = new HashMap<>();
+    eventWithNulls.put("d1", null);
+    eventWithNulls.put("d2", Arrays.asList("d2v1", "d2v2"));
+    eventWithNulls.put("d3", null);
+    eventWithNulls.put("d4", null);
+    eventWithNulls.put("d5", null);
+
+    InputRow in = new MapBasedInputRow(
+        timestamp,
+        dims,
+        eventWithNulls
+    );
+
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("d1"),
+            new StringDimensionSchema("d2"),
+            new LongDimensionSchema("d3"),
+            new FloatDimensionSchema("d4"),
+            new DoubleDimensionSchema("d5")
+        ),
+        null,
+        null
+    );
+
+    byte[] result = InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, new AggregatorFactory[0]).getSerializedRow();
+
+    if (NullHandling.replaceWithDefault()) {
+      long expected = 0;
+      expected += 9; // timestamp bytes + dims length
+      expected += 18; // dim_non_existing writes: 1 16 1 bytes
+      expected += 4; // d1: writes 1 2 1 bytes
+      expected += 14; // d2: writes 1 2 1 1 4 1 4 bytes
+      expected += 11; // d3: writes 1 2 8 bytes
+      expected += 7; // d4: writes 1 2 4 bytes
+      expected += 11; // d5: writes 1 2 8 bytes
+      expected += 1; // writes aggregator length
+
+      Assert.assertEquals(expected, result.length);
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 48, 56));
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0}, Arrays.copyOfRange(result, 59, 63));
+      Assert.assertArrayEquals(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}, Arrays.copyOfRange(result, 66, 74));
+    } else {
+      long expected = 9 + 18 + 4 + 14 + 4 + 4 + 4 + 1;
+
+      Assert.assertEquals(expected, result.length);
+      Assert.assertEquals(result[48], NullHandling.IS_NULL_BYTE);
+      Assert.assertEquals(result[52], NullHandling.IS_NULL_BYTE);
+      Assert.assertEquals(result[56], NullHandling.IS_NULL_BYTE);
+    }
+  }
 }
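
For reviewers who want to exercise the new wire format outside of a Hadoop job, below is a minimal standalone sketch of the encoding this patch introduces for SQL-compatible mode: each numeric value is prefixed with a marker byte, and the payload is written only when the value is non-null. It uses the same Guava `ByteStreams`/`ByteArrayDataOutput`/`ByteArrayDataInput` types that `InputRowSerde` works with; the class name and the local `NULL_BYTE`/`NOT_NULL_BYTE` constants are illustrative stand-ins for `NullHandling.IS_NULL_BYTE`/`IS_NOT_NULL_BYTE` and are not part of this patch.

```java
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;

public class NullableLongSerdeSketch
{
  // Stand-ins for NullHandling.IS_NULL_BYTE / IS_NOT_NULL_BYTE (assumed values).
  private static final byte NULL_BYTE = 1;
  private static final byte NOT_NULL_BYTE = 0;

  // Mirrors writeNullableNumeric() in SQL-compatible mode:
  // one marker byte, then the payload only for non-null values.
  static byte[] serialize(Long value)
  {
    final ByteArrayDataOutput out = ByteStreams.newDataOutput();
    if (value == null) {
      out.writeByte(NULL_BYTE);
    } else {
      out.writeByte(NOT_NULL_BYTE);
      out.writeLong(value);
    }
    return out.toByteArray();
  }

  // Mirrors the new deserialize(): consume the marker byte first, as isNullByteSet() does.
  static Long deserialize(byte[] bytes)
  {
    final ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
    return in.readByte() == NULL_BYTE ? null : in.readLong();
  }

  public static void main(String[] args)
  {
    System.out.println(deserialize(serialize(42L)));  // prints 42
    System.out.println(deserialize(serialize(null))); // prints null
  }
}
```

In default-value mode (`NullHandling.replaceWithDefault()`) nothing changes on the wire: no marker byte is written and nulls are still replaced by the type's default before serialization. That is why the test expects 11/7/11 bytes for the null d3/d4/d5 dimensions in that mode, but only 4 bytes each in SQL-compatible mode (one name-length byte, two name bytes, one null marker), with the null markers landing at offsets 48, 52, and 56 of the serialized row.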