GenericIndexedWriter: Fix issue when writing large values to large columns. (#9029)

This commit is contained in:
Gian Merlino 2019-12-13 15:33:14 -08:00 committed by Himanshu
parent 3325da1718
commit d452cbbb82
2 changed files with 118 additions and 13 deletions

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.data;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.longs.LongArrayList;
@ -48,7 +49,7 @@ import java.nio.channels.WritableByteChannel;
*/
public class GenericIndexedWriter<T> implements Serializer
{
private static int PAGE_SIZE = 4096;
private static final int PAGE_SIZE = 4096;
private static final MetaSerdeHelper<GenericIndexedWriter> SINGLE_FILE_META_SERDE_HELPER = MetaSerdeHelper
.firstWriteByte((GenericIndexedWriter x) -> GenericIndexed.VERSION_ONE)
@ -148,9 +149,16 @@ public class GenericIndexedWriter<T> implements Serializer
@Nullable
private LongList headerOutLong;
// Used by checkedCastNonnegativeLongToInt. Will always be Integer.MAX_VALUE in production.
private int intMaxForCasting = Integer.MAX_VALUE;
private final ByteBuffer getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES);
public GenericIndexedWriter(SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ObjectStrategy<T> strategy)
public GenericIndexedWriter(
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ObjectStrategy<T> strategy
)
{
this(segmentWriteOutMedium, filenameBase, strategy, Integer.MAX_VALUE & ~PAGE_SIZE);
}
@ -210,13 +218,18 @@ public class GenericIndexedWriter<T> implements Serializer
objectsSorted = false;
}
@VisibleForTesting
void setIntMaxForCasting(final int intMaxForCasting)
{
this.intMaxForCasting = intMaxForCasting;
}
public void write(@Nullable T objectToWrite) throws IOException
{
if (objectsSorted && prevObject != null && strategy.compare(prevObject, objectToWrite) >= 0) {
objectsSorted = false;
}
++numWritten;
// for compatibility with the format (see GenericIndexed javadoc for description of the format),
// this field is used to store nullness marker, but in a better format this info can take 1 bit.
valuesOut.writeInt(objectToWrite == null ? GenericIndexed.NULL_VALUE_SIZE_MARKER : 0);
@ -224,17 +237,28 @@ public class GenericIndexedWriter<T> implements Serializer
strategy.writeTo(objectToWrite, valuesOut);
}
if (!requireMultipleFiles) {
headerOut.writeInt(Ints.checkedCast(valuesOut.size()));
} else {
headerOutLong.add(valuesOut.size());
}
// Before updating the header, check if we need to switch to multi-file mode.
if (!requireMultipleFiles && getSerializedSize() > fileSizeLimit) {
requireMultipleFiles = true;
initializeHeaderOutLong();
}
// Increment number of values written. Important to do this after the check above, since numWritten is
// accessed during "initializeHeaderOutLong" to determine the length of the header.
++numWritten;
if (!requireMultipleFiles) {
headerOut.writeInt(checkedCastNonnegativeLongToInt(valuesOut.size()));
// Check _again_ if we need to switch to multi-file mode. (We might need to after updating the header.)
if (getSerializedSize() > fileSizeLimit) {
requireMultipleFiles = true;
initializeHeaderOutLong();
}
} else {
headerOutLong.add(valuesOut.size());
}
if (objectsSorted) {
prevObject = objectToWrite;
}
@ -250,7 +274,7 @@ public class GenericIndexedWriter<T> implements Serializer
startOffset = getOffset(index - 1) + Integer.BYTES;
}
long endOffset = getOffset(index);
int valueSize = Ints.checkedCast(endOffset - startOffset);
int valueSize = checkedCastNonnegativeLongToInt(endOffset - startOffset);
if (valueSize == 0) {
return null;
}
@ -410,7 +434,7 @@ public class GenericIndexedWriter<T> implements Serializer
if (headerIndex >= numWritten) {
return true;
} else if (headerIndex + bagSize <= numWritten) {
currentValueOffset = headerOutLong.getLong(Ints.checkedCast(headerIndex + bagSize - 1));
currentValueOffset = headerOutLong.getLong(checkedCastNonnegativeLongToInt(headerIndex + bagSize - 1));
} else if (numWritten < headerIndex + bagSize) {
currentValueOffset = headerOutLong.getLong(numWritten - 1);
}
@ -446,7 +470,7 @@ public class GenericIndexedWriter<T> implements Serializer
}
currentNumBytes = headerOutLong.getLong(pos);
relativeNumBytes = currentNumBytes - relativeRefBytes;
helperBuffer.putInt(0, Ints.checkedCast(relativeNumBytes));
helperBuffer.putInt(0, checkedCastNonnegativeLongToInt(relativeNumBytes));
helperBuffer.clear();
smooshChannel.write(helperBuffer);
}
@ -464,4 +488,17 @@ public class GenericIndexedWriter<T> implements Serializer
}
}
/**
* Cast a long to an int, throwing an exception if it is out of range. Uses "intMaxForCasting" as the max
* integer value. Only works for nonnegative "n".
*/
private int checkedCastNonnegativeLongToInt(final long n)
{
if (n >= 0 && n <= intMaxForCasting) {
return (int) n;
} else {
// Likely bug.
throw new IAE("Value out of nonnegative int range");
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.data;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
public class GenericIndexedWriterTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@BeforeClass
public static void staticSetUp()
{
NullHandling.initializeForTests();
}
@Test
public void writeLargeValueIntoLargeColumn() throws IOException
{
// Regression test for https://github.com/apache/incubator-druid/issues/9027.
final GenericIndexedWriter<String> writer = new GenericIndexedWriter<>(
new OnHeapMemorySegmentWriteOutMedium(),
"test",
GenericIndexed.STRING_STRATEGY,
100
);
writer.setIntMaxForCasting(150);
writer.open();
writer.write("i really like writing strings");
writer.write("i really like writing strings");
writer.write("i really like writing strings i really like writing strings i really like writing strings");
writer.write("i really like writing strings");
writer.writeTo(
FileChannel.open(temporaryFolder.newFile().toPath(), StandardOpenOption.WRITE),
new FileSmoosher(temporaryFolder.newFolder())
);
}
}