From d452cbbb821a872a703453cd4ba22ac77f7737dc Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Dec 2019 15:33:14 -0800 Subject: [PATCH] GenericIndexedWriter: Fix issue when writing large values to large columns. (#9029) --- .../segment/data/GenericIndexedWriter.java | 63 +++++++++++++---- .../data/GenericIndexedWriterTest.java | 68 +++++++++++++++++++ 2 files changed, 118 insertions(+), 13 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java index 976c88f3c41..75785066c5a 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.data; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.longs.LongArrayList; @@ -48,7 +49,7 @@ import java.nio.channels.WritableByteChannel; */ public class GenericIndexedWriter implements Serializer { - private static int PAGE_SIZE = 4096; + private static final int PAGE_SIZE = 4096; private static final MetaSerdeHelper SINGLE_FILE_META_SERDE_HELPER = MetaSerdeHelper .firstWriteByte((GenericIndexedWriter x) -> GenericIndexed.VERSION_ONE) @@ -148,9 +149,16 @@ public class GenericIndexedWriter implements Serializer @Nullable private LongList headerOutLong; + // Used by checkedCastNonnegativeLongToInt. Will always be Integer.MAX_VALUE in production. + private int intMaxForCasting = Integer.MAX_VALUE; + private final ByteBuffer getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES); - public GenericIndexedWriter(SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ObjectStrategy strategy) + public GenericIndexedWriter( + SegmentWriteOutMedium segmentWriteOutMedium, + String filenameBase, + ObjectStrategy strategy + ) { this(segmentWriteOutMedium, filenameBase, strategy, Integer.MAX_VALUE & ~PAGE_SIZE); } @@ -210,13 +218,18 @@ public class GenericIndexedWriter implements Serializer objectsSorted = false; } + @VisibleForTesting + void setIntMaxForCasting(final int intMaxForCasting) + { + this.intMaxForCasting = intMaxForCasting; + } + public void write(@Nullable T objectToWrite) throws IOException { if (objectsSorted && prevObject != null && strategy.compare(prevObject, objectToWrite) >= 0) { objectsSorted = false; } - ++numWritten; // for compatibility with the format (see GenericIndexed javadoc for description of the format), // this field is used to store nullness marker, but in a better format this info can take 1 bit. valuesOut.writeInt(objectToWrite == null ? GenericIndexed.NULL_VALUE_SIZE_MARKER : 0); @@ -224,17 +237,28 @@ public class GenericIndexedWriter implements Serializer strategy.writeTo(objectToWrite, valuesOut); } - if (!requireMultipleFiles) { - headerOut.writeInt(Ints.checkedCast(valuesOut.size())); - } else { - headerOutLong.add(valuesOut.size()); - } - + // Before updating the header, check if we need to switch to multi-file mode. if (!requireMultipleFiles && getSerializedSize() > fileSizeLimit) { requireMultipleFiles = true; initializeHeaderOutLong(); } + // Increment number of values written. Important to do this after the check above, since numWritten is + // accessed during "initializeHeaderOutLong" to determine the length of the header. + ++numWritten; + + if (!requireMultipleFiles) { + headerOut.writeInt(checkedCastNonnegativeLongToInt(valuesOut.size())); + + // Check _again_ if we need to switch to multi-file mode. (We might need to after updating the header.) + if (getSerializedSize() > fileSizeLimit) { + requireMultipleFiles = true; + initializeHeaderOutLong(); + } + } else { + headerOutLong.add(valuesOut.size()); + } + if (objectsSorted) { prevObject = objectToWrite; } @@ -250,7 +274,7 @@ public class GenericIndexedWriter implements Serializer startOffset = getOffset(index - 1) + Integer.BYTES; } long endOffset = getOffset(index); - int valueSize = Ints.checkedCast(endOffset - startOffset); + int valueSize = checkedCastNonnegativeLongToInt(endOffset - startOffset); if (valueSize == 0) { return null; } @@ -391,7 +415,7 @@ public class GenericIndexedWriter implements Serializer /** * Checks if candidate value splits can divide value file in such a way no object/element crosses the value splits. * - * @param powerTwo candidate value split expressed as power of 2. + * @param powerTwo candidate value split expressed as power of 2. * * @return true if candidate value split can hold all splits. * @@ -410,7 +434,7 @@ public class GenericIndexedWriter implements Serializer if (headerIndex >= numWritten) { return true; } else if (headerIndex + bagSize <= numWritten) { - currentValueOffset = headerOutLong.getLong(Ints.checkedCast(headerIndex + bagSize - 1)); + currentValueOffset = headerOutLong.getLong(checkedCastNonnegativeLongToInt(headerIndex + bagSize - 1)); } else if (numWritten < headerIndex + bagSize) { currentValueOffset = headerOutLong.getLong(numWritten - 1); } @@ -446,7 +470,7 @@ public class GenericIndexedWriter implements Serializer } currentNumBytes = headerOutLong.getLong(pos); relativeNumBytes = currentNumBytes - relativeRefBytes; - helperBuffer.putInt(0, Ints.checkedCast(relativeNumBytes)); + helperBuffer.putInt(0, checkedCastNonnegativeLongToInt(relativeNumBytes)); helperBuffer.clear(); smooshChannel.write(helperBuffer); } @@ -464,4 +488,17 @@ public class GenericIndexedWriter implements Serializer } } + /** + * Cast a long to an int, throwing an exception if it is out of range. Uses "intMaxForCasting" as the max + * integer value. Only works for nonnegative "n". + */ + private int checkedCastNonnegativeLongToInt(final long n) + { + if (n >= 0 && n <= intMaxForCasting) { + return (int) n; + } else { + // Likely bug. + throw new IAE("Value out of nonnegative int range"); + } + } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java b/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java new file mode 100644 index 00000000000..7be04c30d62 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/data/GenericIndexedWriterTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.data; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; + +public class GenericIndexedWriterTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @BeforeClass + public static void staticSetUp() + { + NullHandling.initializeForTests(); + } + + @Test + public void writeLargeValueIntoLargeColumn() throws IOException + { + // Regression test for https://github.com/apache/incubator-druid/issues/9027. + + final GenericIndexedWriter writer = new GenericIndexedWriter<>( + new OnHeapMemorySegmentWriteOutMedium(), + "test", + GenericIndexed.STRING_STRATEGY, + 100 + ); + + writer.setIntMaxForCasting(150); + writer.open(); + writer.write("i really like writing strings"); + writer.write("i really like writing strings"); + writer.write("i really like writing strings i really like writing strings i really like writing strings"); + writer.write("i really like writing strings"); + writer.writeTo( + FileChannel.open(temporaryFolder.newFile().toPath(), StandardOpenOption.WRITE), + new FileSmoosher(temporaryFolder.newFolder()) + ); + } +}