diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java index 5c188a0b9a8..1d1784285c0 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java @@ -35,6 +35,14 @@ import java.nio.ByteOrder; * * otherwise * Long:Integer:bytes + * + * The StringSize can be following: + * -1 : Denotes an empty string + * 0 : Denotes a null string + * >0 : Denotes a non-empty string + * + * Mapping of null and empty string is done weirdly to preserve backward compatibility when nulls were returned all the + * time, and there was no distinction between empty and null string */ public class SerializablePairLongStringDeltaEncodedStagedSerde extends AbstractSerializablePairLongObjectDeltaEncodedStagedSerde { @@ -70,7 +78,13 @@ public class SerializablePairLongStringDeltaEncodedStagedSerde extends AbstractS byteBuffer.putLong(delta); } - byteBuffer.putInt(rhsBytes.length); + if (rhsString == null) { + byteBuffer.putInt(0); + } else if (rhsBytes.length == 0) { + byteBuffer.putInt(-1); + } else { + byteBuffer.putInt(rhsBytes.length); + } if (rhsBytes.length > 0) { byteBuffer.put(rhsBytes); @@ -112,6 +126,8 @@ public class SerializablePairLongStringDeltaEncodedStagedSerde extends AbstractS readOnlyBuffer.get(stringBytes, 0, stringSize); lastString = StringUtils.fromUtf8(stringBytes); + } else if (stringSize < 0) { + lastString = ""; } return new SerializablePairLongString(lhs, lastString); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java 
index 0e67b190bd0..9f9cdfaca8a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java @@ -33,6 +33,14 @@ import java.nio.ByteOrder; *

* or * Long:StringSize:StringData + * + * The StringSize can be following: + * -1 : Denotes an empty string + * 0 : Denotes a null string + * >0 : Denotes a non-empty string + * + * Mapping of null and empty string is done weirdly to preserve backward compatibility when nulls were returned all the + * time, and there was no distinction between empty and null string */ public class SerializablePairLongStringSimpleStagedSerde extends AbstractSerializablePairLongObjectSimpleStagedSerde { @@ -60,7 +68,13 @@ public class SerializablePairLongStringSimpleStagedSerde extends AbstractSeriali Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null"); byteBuffer.putLong(value.lhs); - byteBuffer.putInt(rhsBytes.length); + if (rhsString == null) { + byteBuffer.putInt(0); + } else if (rhsBytes.length == 0) { + byteBuffer.putInt(-1); + } else { + byteBuffer.putInt(rhsBytes.length); + } if (rhsBytes.length > 0) { byteBuffer.put(rhsBytes); @@ -93,6 +107,8 @@ public class SerializablePairLongStringSimpleStagedSerde extends AbstractSeriali readOnlyBuffer.get(stringBytes, 0, stringSize); lastString = StringUtils.fromUtf8(stringBytes); + } else if (stringSize < 0) { + lastString = ""; } return new SerializablePairLongString(lhs, lastString); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/BackwardCompatibleSerializablePairLongStringDeltaEncodedStagedSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/BackwardCompatibleSerializablePairLongStringDeltaEncodedStagedSerdeTest.java new file mode 100644 index 00000000000..bc8cd8a1627 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/BackwardCompatibleSerializablePairLongStringDeltaEncodedStagedSerdeTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.serde.cell.RandomStringUtils; +import org.apache.druid.segment.serde.cell.StagedSerde; +import org.apache.druid.segment.serde.cell.StorableBuffer; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Random; + +public class BackwardCompatibleSerializablePairLongStringDeltaEncodedStagedSerdeTest +{ + private static final OlderSerializablePairLongStringDeltaEncodedStagedSerde OLDER_INTEGER_SERDE = + new OlderSerializablePairLongStringDeltaEncodedStagedSerde(0L, true); + private static final SerializablePairLongStringDeltaEncodedStagedSerde INTEGER_SERDE = + new SerializablePairLongStringDeltaEncodedStagedSerde(0L, true); + + private static final OlderSerializablePairLongStringDeltaEncodedStagedSerde OLDER_LONG_SERDE = + new OlderSerializablePairLongStringDeltaEncodedStagedSerde(0L, false); + private static final SerializablePairLongStringDeltaEncodedStagedSerde LONG_SERDE = + new SerializablePairLongStringDeltaEncodedStagedSerde(0L, false); + 
+ private static final Long TIMESTAMP = 100L; + + private final RandomStringUtils randomStringUtils = new RandomStringUtils(new Random(0)); + + @Test + public void testSimple() + { + SerializablePairLongString value = new SerializablePairLongString(TIMESTAMP, "fuu"); + testValue(value, OLDER_INTEGER_SERDE, INTEGER_SERDE); + testValue(value, OLDER_LONG_SERDE, LONG_SERDE); + } + + @Test + public void testNull() + { + testValue(null, OLDER_INTEGER_SERDE, INTEGER_SERDE); + testValue(null, OLDER_LONG_SERDE, LONG_SERDE); + } + + @Test + public void testNullString() + { + SerializablePairLongString value = new SerializablePairLongString(TIMESTAMP, null); + + // Write using the older serde, read using the newer serde + Assert.assertEquals( + new SerializablePairLongString(TIMESTAMP, null), + readUsingSerde(writeUsingSerde(value, OLDER_INTEGER_SERDE), INTEGER_SERDE) + ); + Assert.assertEquals( + new SerializablePairLongString(TIMESTAMP, null), + readUsingSerde(writeUsingSerde(value, OLDER_LONG_SERDE), LONG_SERDE) + ); + + // Write using the newer serde, read using the older serde + Assert.assertEquals( + new SerializablePairLongString(TIMESTAMP, null), + readUsingSerde(writeUsingSerde(value, INTEGER_SERDE), OLDER_INTEGER_SERDE) + ); + Assert.assertEquals( + new SerializablePairLongString(TIMESTAMP, null), + readUsingSerde(writeUsingSerde(value, LONG_SERDE), OLDER_LONG_SERDE) + ); + + // Compare the length of the serialized bytes for the value + Assert.assertEquals( + writeUsingSerde(value, OLDER_INTEGER_SERDE).length, + writeUsingSerde(value, INTEGER_SERDE).length + ); + Assert.assertEquals(writeUsingSerde(value, OLDER_LONG_SERDE).length, writeUsingSerde(value, LONG_SERDE).length); + } + + @Test + public void testEmptyString() + { + SerializablePairLongString value = new SerializablePairLongString(TIMESTAMP, ""); + + // Write using the older serde, read using the newer serde + Assert.assertEquals( + new SerializablePairLongString(TIMESTAMP, null), + 
readUsingSerde(writeUsingSerde(value, OLDER_INTEGER_SERDE), INTEGER_SERDE) + ); + Assert.assertEquals( + new SerializablePairLongString(TIMESTAMP, null), + readUsingSerde(writeUsingSerde(value, OLDER_LONG_SERDE), LONG_SERDE) + ); + + // Write using the newer serde, read using the older serde + Assert.assertEquals( + new SerializablePairLongString(TIMESTAMP, null), + readUsingSerde(writeUsingSerde(value, INTEGER_SERDE), OLDER_INTEGER_SERDE) + ); + Assert.assertEquals( + new SerializablePairLongString(TIMESTAMP, null), + readUsingSerde(writeUsingSerde(value, LONG_SERDE), OLDER_LONG_SERDE) + ); + + // Compare the length of the serialized bytes for the value + Assert.assertEquals( + writeUsingSerde(value, OLDER_INTEGER_SERDE).length, + writeUsingSerde(value, INTEGER_SERDE).length + ); + Assert.assertEquals(writeUsingSerde(value, OLDER_LONG_SERDE).length, writeUsingSerde(value, LONG_SERDE).length); + } + + @Test + public void testLargeString() + { + SerializablePairLongString value = new SerializablePairLongString( + TIMESTAMP, + randomStringUtils.randomAlphanumeric(1024 * 1024) + ); + testValue(value, OLDER_INTEGER_SERDE, INTEGER_SERDE); + testValue(value, OLDER_LONG_SERDE, LONG_SERDE); + } + + private void testValue( + @Nullable SerializablePairLongString value, + StagedSerde<SerializablePairLongString> olderSerde, + StagedSerde<SerializablePairLongString> serde + ) + { + // Write using the older serde, read using the newer serde + Assert.assertEquals( + value, + readUsingSerde(writeUsingSerde(value, olderSerde), serde) + ); + // Write using the newer serde, read using the older serde + Assert.assertEquals( + value, + readUsingSerde(writeUsingSerde(value, serde), olderSerde) + ); + // Compare the length of the serialized bytes for the value + Assert.assertEquals(writeUsingSerde(value, olderSerde).length, writeUsingSerde(value, serde).length); + } + + private static byte[] writeUsingSerde( + @Nullable SerializablePairLongString value, + StagedSerde<SerializablePairLongString> serde + ) + { + return serde.serialize(value); + } + + private static
SerializablePairLongString readUsingSerde( + byte[] bytes, + StagedSerde<SerializablePairLongString> serde + ) + { + return serde.deserialize(bytes); + } + + /** + * Older serde class for delta encoded long-string pair serde, that treated empty and null strings equivalently and + * coerced both to null + */ + private static class OlderSerializablePairLongStringDeltaEncodedStagedSerde + implements StagedSerde<SerializablePairLongString> + { + private final long minValue; + private final boolean useIntegerDelta; + + public OlderSerializablePairLongStringDeltaEncodedStagedSerde(long minValue, boolean useIntegerDelta) + { + this.minValue = minValue; + this.useIntegerDelta = useIntegerDelta; + } + + @Override + public StorableBuffer serializeDelayed(@Nullable SerializablePairLongString value) + { + if (value == null) { + return StorableBuffer.EMPTY; + } + + String rhsString = value.rhs; + byte[] rhsBytes = StringUtils.toUtf8WithNullToEmpty(rhsString); + + return new StorableBuffer() + { + @Override + public void store(ByteBuffer byteBuffer) + { + Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null"); + + long delta = value.lhs - minValue; + + Preconditions.checkState(delta >= 0 || delta == value.lhs); + + if (useIntegerDelta) { + byteBuffer.putInt(Ints.checkedCast(delta)); + } else { + byteBuffer.putLong(delta); + } + + byteBuffer.putInt(rhsBytes.length); + + if (rhsBytes.length > 0) { + byteBuffer.put(rhsBytes); + } + } + + @Override + public int getSerializedSize() + { + return (useIntegerDelta ? 
Integer.BYTES : Long.BYTES) + Integer.BYTES + rhsBytes.length; + } + }; + } + + @Nullable + @Override + public SerializablePairLongString deserialize(ByteBuffer byteBuffer) + { + if (byteBuffer.remaining() == 0) { + return null; + } + + ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder()); + long lhs; + + if (useIntegerDelta) { + lhs = readOnlyBuffer.getInt(); + } else { + lhs = readOnlyBuffer.getLong(); + } + + lhs += minValue; + + int stringSize = readOnlyBuffer.getInt(); + String lastString = null; + + if (stringSize > 0) { + byte[] stringBytes = new byte[stringSize]; + + readOnlyBuffer.get(stringBytes, 0, stringSize); + lastString = StringUtils.fromUtf8(stringBytes); + } + + return new SerializablePairLongString(lhs, lastString); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/BackwardCompatibleSerializablePairLongStringSimpleStagedSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/BackwardCompatibleSerializablePairLongStringSimpleStagedSerdeTest.java new file mode 100644 index 00000000000..db5e5490f65 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/BackwardCompatibleSerializablePairLongStringSimpleStagedSerdeTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.serde.cell.RandomStringUtils; +import org.apache.druid.segment.serde.cell.StagedSerde; +import org.apache.druid.segment.serde.cell.StorableBuffer; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Random; + +public class BackwardCompatibleSerializablePairLongStringSimpleStagedSerdeTest +{ + private static final OlderSerializablePairLongStringSimpleStagedSerde OLDER_SERDE = + new OlderSerializablePairLongStringSimpleStagedSerde(); + private static final SerializablePairLongStringSimpleStagedSerde SERDE = + new SerializablePairLongStringSimpleStagedSerde(); + + private final RandomStringUtils randomStringUtils = new RandomStringUtils(new Random(0)); + + @Test + public void testSimple() + { + testValue(new SerializablePairLongString(Long.MAX_VALUE, "fuu")); + } + + @Test + public void testNull() + { + testValue(null); + } + + @Test + public void testNullString() + { + SerializablePairLongString value = new SerializablePairLongString(Long.MAX_VALUE, null); + // Write using the older serde, read using the newer serde (older writes size 0, which the newer reads as null) + Assert.assertEquals( + new SerializablePairLongString(Long.MAX_VALUE, null), + readUsingSerde(writeUsingSerde(value, OLDER_SERDE), SERDE) + ); + // Write using the newer serde, read using the older serde + Assert.assertEquals( + new SerializablePairLongString(Long.MAX_VALUE, null), + readUsingSerde(writeUsingSerde(value, SERDE), OLDER_SERDE) + ); + // Compare the length of the serialized bytes for the value + Assert.assertEquals(writeUsingSerde(value, OLDER_SERDE).length, writeUsingSerde(value, SERDE).length); + } + + @Test + public void testEmptyString() + 
{ + SerializablePairLongString value = new SerializablePairLongString(Long.MAX_VALUE, ""); + // Write using the older serde, read using the newer serde (older writes size 0, which the newer reads as null) + Assert.assertEquals( + new SerializablePairLongString(Long.MAX_VALUE, null), + readUsingSerde(writeUsingSerde(value, OLDER_SERDE), SERDE) + ); + // Write using the newer serde, read using the older serde + Assert.assertEquals( + new SerializablePairLongString(Long.MAX_VALUE, null), + readUsingSerde(writeUsingSerde(value, SERDE), OLDER_SERDE) + ); + // Compare the length of the serialized bytes for the value + Assert.assertEquals(writeUsingSerde(value, OLDER_SERDE).length, writeUsingSerde(value, SERDE).length); + + } + + @Test + public void testLargeString() + { + testValue(new SerializablePairLongString(Long.MAX_VALUE, randomStringUtils.randomAlphanumeric(1024 * 1024))); + } + + private void testValue(@Nullable SerializablePairLongString value) + { + // Write using the older serde, read using the newer serde + Assert.assertEquals( + value, + readUsingSerde(writeUsingSerde(value, OLDER_SERDE), SERDE) + ); + // Write using the newer serde, read using the older serde + Assert.assertEquals( + value, + readUsingSerde(writeUsingSerde(value, SERDE), OLDER_SERDE) + ); + // Compare the length of the serialized bytes for the value + Assert.assertEquals(writeUsingSerde(value, OLDER_SERDE).length, writeUsingSerde(value, SERDE).length); + } + + private static byte[] writeUsingSerde( + @Nullable SerializablePairLongString value, + StagedSerde<SerializablePairLongString> serde + ) + { + return serde.serialize(value); + } + + private static SerializablePairLongString readUsingSerde( + byte[] bytes, + StagedSerde<SerializablePairLongString> serde + ) + { + return serde.deserialize(bytes); + } + + /** + * Older serde class for simple long-string pair serde, that treated empty and null strings equivalently and + * coerced both to null + */ + private static class OlderSerializablePairLongStringSimpleStagedSerde + implements StagedSerde<SerializablePairLongString> + { + @Override + public StorableBuffer 
serializeDelayed(@Nullable SerializablePairLongString value) + { + if (value == null) { + return StorableBuffer.EMPTY; + } + + String rhsString = value.rhs; + byte[] rhsBytes = StringUtils.toUtf8WithNullToEmpty(rhsString); + + return new StorableBuffer() + { + @Override + public void store(ByteBuffer byteBuffer) + { + Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null"); + + byteBuffer.putLong(value.lhs); + byteBuffer.putInt(rhsBytes.length); + + if (rhsBytes.length > 0) { + byteBuffer.put(rhsBytes); + } + } + + @Override + public int getSerializedSize() + { + return Long.BYTES + Integer.BYTES + rhsBytes.length; + } + }; + } + + @Nullable + @Override + public SerializablePairLongString deserialize(ByteBuffer byteBuffer) + { + if (byteBuffer.remaining() == 0) { + return null; + } + + ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder()); + long lhs = readOnlyBuffer.getLong(); + int stringSize = readOnlyBuffer.getInt(); + String lastString = null; + + if (stringSize > 0) { + byte[] stringBytes = new byte[stringSize]; + + readOnlyBuffer.get(stringBytes, 0, stringSize); + lastString = StringUtils.fromUtf8(stringBytes); + } + + return new SerializablePairLongString(lhs, lastString); + } + } + +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java index d0489cf9281..d9966412d25 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java @@ -53,6 +53,12 @@ public class SerializablePairLongStringDeltaEncodedStagedSerdeTest assertValueEquals(new SerializablePairLongString(100L, null), 8, INTEGER_SERDE); } + 
@Test + public void testEmptyStringInteger() + { + assertValueEquals(new SerializablePairLongString(100L, ""), 8, INTEGER_SERDE); + } + @Test public void testLargeStringInteger() { @@ -75,6 +81,13 @@ public class SerializablePairLongStringDeltaEncodedStagedSerdeTest assertValueEquals(new SerializablePairLongString(100L, null), 12, LONG_SERDE); } + @Test + public void testEmptyStringLong() + { + assertValueEquals(new SerializablePairLongString(100L, ""), 12, LONG_SERDE); + } + + @Test public void testLargeStringLong() { diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java index 23d57f0aa9e..59c77f10b28 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java @@ -51,6 +51,12 @@ public class SerializablePairLongStringSimpleStagedSerdeTest assertValueEquals(new SerializablePairLongString(Long.MAX_VALUE, null), 12); } + @Test + public void testEmptyString() + { + assertValueEquals(new SerializablePairLongString(Long.MAX_VALUE, ""), 12); + } + @Test public void testLargeString() {