error on value counter overflow instead of writing sad segments (#9559)

Clint Wylie 2020-03-26 16:54:48 -07:00 committed by GitHub
parent e6e2836b0e
commit 2c49f6d89a
34 changed files with 1044 additions and 158 deletions
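
The change, repeated across every columnar serializer below, is twofold: each serializer now receives the name of the column it is writing, and it checks its int value counter after every increment. Once more than Integer.MAX_VALUE values have been added, the counter wraps negative, and the serializer now fails fast with the new ColumnCapacityExceededException instead of silently writing a corrupt ("sad") segment. A minimal standalone sketch of the guard, with illustrative names only (this is not Druid code):

// Why "counter < 0" detects overflow: a Java int wraps to Integer.MIN_VALUE
// after Integer.MAX_VALUE increments, so a negative counter means more values
// were added than an int-addressed column can hold.
public class OverflowGuardSketch
{
  private int numInserted = 0;

  void add()
  {
    ++numInserted;
    if (numInserted < 0) {
      // the serializers in this commit throw ColumnCapacityExceededException(columnName) here
      throw new IllegalStateException("column capacity exceeded");
    }
  }

  public static void main(String[] args)
  {
    int counter = Integer.MAX_VALUE;
    ++counter;                        // wraps to Integer.MIN_VALUE
    System.out.println(counter < 0);  // prints "true": the guard condition fires
  }
}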

@ -65,21 +65,26 @@ public class FloatCompressionBenchmarkFileGenerator
dirPath = args[0];
}
BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated("", ValueType.FLOAT, true, 1, 0d,
ImmutableList.of(
0f,
1.1f,
2.2f,
3.3f,
4.4f
),
ImmutableList.of(
0.95,
0.001,
0.0189,
0.03,
0.0001
)
BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated(
"",
ValueType.FLOAT,
true,
1,
0d,
ImmutableList.of(
0f,
1.1f,
2.2f,
3.3f,
4.4f
),
ImmutableList.of(
0.95,
0.001,
0.0189,
0.03,
0.0001
)
);
BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf(
"",
@ -151,6 +156,7 @@ public class FloatCompressionBenchmarkFileGenerator
File dataFile = new File(dir, entry.getKey());
ColumnarFloatsSerializer writer = CompressionFactory.getFloatSerializer(
"float-benchmark",
new OffHeapMemorySegmentWriteOutMedium(),
"float",
ByteOrder.nativeOrder(),

@ -66,21 +66,26 @@ public class LongCompressionBenchmarkFileGenerator
dirPath = args[0];
}
BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated("", ValueType.LONG, true, 1, 0d,
ImmutableList.of(
0,
1,
2,
3,
4
),
ImmutableList.of(
0.95,
0.001,
0.0189,
0.03,
0.0001
)
BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated(
"",
ValueType.LONG,
true,
1,
0d,
ImmutableList.of(
0,
1,
2,
3,
4
),
ImmutableList.of(
0.95,
0.001,
0.0189,
0.03,
0.0001
)
);
BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf("", ValueType.LONG, true, 1, 0d, -1, 1000, 1d);
BenchmarkColumnSchema zipfHighSchema = BenchmarkColumnSchema.makeZipf(
@ -144,6 +149,7 @@ public class LongCompressionBenchmarkFileGenerator
File dataFile = new File(dir, entry.getKey());
ColumnarLongsSerializer writer = CompressionFactory.getLongSerializer(
"long-benchmark",
new OffHeapMemorySegmentWriteOutMedium(),
"long",
ByteOrder.nativeOrder(),

@ -33,14 +33,16 @@ import java.nio.channels.WritableByteChannel;
public class DoubleColumnSerializer implements GenericColumnSerializer<Object>
{
public static DoubleColumnSerializer create(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression
)
{
return new DoubleColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
return new DoubleColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
}
private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@ -48,12 +50,14 @@ public class DoubleColumnSerializer implements GenericColumnSerializer<Object>
private ColumnarDoublesSerializer writer;
private DoubleColumnSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@ -64,6 +68,7 @@ public class DoubleColumnSerializer implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getDoubleSerializer(
columnName,
segmentWriteOutMedium,
StringUtils.format("%s.double_column", filenameBase),
byteOrder,

@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel;
public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
{
public static DoubleColumnSerializerV2 create(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression,
@ -52,6 +53,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
)
{
return new DoubleColumnSerializerV2(
columnName,
segmentWriteOutMedium,
filenameBase,
IndexIO.BYTE_ORDER,
@ -60,6 +62,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
);
}
private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@ -72,6 +75,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
private int rowCount = 0;
private DoubleColumnSerializerV2(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@ -79,6 +83,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
BitmapSerdeFactory bitmapSerdeFactory
)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@ -90,6 +95,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getDoubleSerializer(
columnName,
segmentWriteOutMedium,
StringUtils.format("%s.double_column", filenameBase),
byteOrder,

@ -33,14 +33,16 @@ import java.nio.channels.WritableByteChannel;
public class FloatColumnSerializer implements GenericColumnSerializer<Object>
{
public static FloatColumnSerializer create(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression
)
{
return new FloatColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
return new FloatColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
}
private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@ -48,12 +50,14 @@ public class FloatColumnSerializer implements GenericColumnSerializer<Object>
private ColumnarFloatsSerializer writer;
private FloatColumnSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@ -64,6 +68,7 @@ public class FloatColumnSerializer implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getFloatSerializer(
columnName,
segmentWriteOutMedium,
StringUtils.format("%s.float_column", filenameBase),
byteOrder,

@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel;
public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
{
public static FloatColumnSerializerV2 create(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression,
@ -52,6 +53,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
)
{
return new FloatColumnSerializerV2(
columnName,
segmentWriteOutMedium,
filenameBase,
IndexIO.BYTE_ORDER,
@ -60,6 +62,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
);
}
private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@ -72,6 +75,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
private int rowCount = 0;
private FloatColumnSerializerV2(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@ -79,6 +83,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
BitmapSerdeFactory bitmapSerdeFactory
)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@ -90,6 +95,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getFloatSerializer(
columnName,
segmentWriteOutMedium,
StringUtils.format("%s.float_column", filenameBase),
byteOrder,

@ -621,6 +621,7 @@ public class IndexMergerV9 implements IndexMerger
// If using default values for null use LongColumnSerializer to allow rollback to previous versions.
if (NullHandling.replaceWithDefault()) {
return LongColumnSerializer.create(
columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression(),
@ -628,6 +629,7 @@ public class IndexMergerV9 implements IndexMerger
);
} else {
return LongColumnSerializerV2.create(
columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression(),
@ -646,12 +648,14 @@ public class IndexMergerV9 implements IndexMerger
// If using default values for null use DoubleColumnSerializer to allow rollback to previous versions.
if (NullHandling.replaceWithDefault()) {
return DoubleColumnSerializer.create(
columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression()
);
} else {
return DoubleColumnSerializerV2.create(
columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression(),
@ -669,12 +673,14 @@ public class IndexMergerV9 implements IndexMerger
// If using default values for null use FloatColumnSerializer to allow rollback to previous versions.
if (NullHandling.replaceWithDefault()) {
return FloatColumnSerializer.create(
columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression()
);
} else {
return FloatColumnSerializerV2.create(
columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression(),

@ -36,15 +36,17 @@ import java.nio.channels.WritableByteChannel;
public class LongColumnSerializer implements GenericColumnSerializer<Object>
{
public static LongColumnSerializer create(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression,
CompressionFactory.LongEncodingStrategy encoding
)
{
return new LongColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression, encoding);
return new LongColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression, encoding);
}
private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@ -53,6 +55,7 @@ public class LongColumnSerializer implements GenericColumnSerializer<Object>
private ColumnarLongsSerializer writer;
private LongColumnSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@ -60,6 +63,7 @@ public class LongColumnSerializer implements GenericColumnSerializer<Object>
CompressionFactory.LongEncodingStrategy encoding
)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@ -71,6 +75,7 @@ public class LongColumnSerializer implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getLongSerializer(
columnName,
segmentWriteOutMedium,
StringUtils.format("%s.long_column", filenameBase),
byteOrder,

@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel;
public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
{
public static LongColumnSerializerV2 create(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression,
@ -53,6 +54,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
)
{
return new LongColumnSerializerV2(
columnName,
segmentWriteOutMedium,
filenameBase,
IndexIO.BYTE_ORDER,
@ -62,6 +64,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
);
}
private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@ -75,6 +78,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
private int rowCount = 0;
private LongColumnSerializerV2(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@ -83,6 +87,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
BitmapSerdeFactory bitmapSerdeFactory
)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@ -95,6 +100,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getLongSerializer(
columnName,
segmentWriteOutMedium,
StringUtils.format("%s.long_column", filenameBase),
byteOrder,

@ -224,17 +224,20 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
if (capabilities.hasMultipleValues()) {
if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
encodedValueSerializer = V3CompressedVSizeColumnarMultiIntsSerializer.create(
dimensionName,
segmentWriteOutMedium,
filenameBase,
cardinality,
compressionStrategy
);
} else {
encodedValueSerializer = new VSizeColumnarMultiIntsSerializer(segmentWriteOutMedium, cardinality);
encodedValueSerializer =
new VSizeColumnarMultiIntsSerializer(dimensionName, segmentWriteOutMedium, cardinality);
}
} else {
if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
dimensionName,
segmentWriteOutMedium,
filenameBase,
cardinality,

@ -43,6 +43,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
.writeInt(x -> CompressedPools.BUFFER_SIZE / Double.BYTES)
.writeByte(x -> x.compression.getId());
private final String columnName;
private final GenericIndexedWriter<ByteBuffer> flattener;
private final CompressionStrategy compression;
@ -51,12 +52,14 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
private ByteBuffer endBuffer;
BlockLayoutColumnarDoublesSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
)
{
this.columnName = columnName;
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
filenameBase,
@ -75,6 +78,12 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
flattener.open();
}
@Override
public int size()
{
return numInserted;
}
@Override
public void add(double value) throws IOException
{
@ -89,6 +98,9 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
endBuffer.putDouble(value);
++numInserted;
if (numInserted < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -43,6 +43,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
.writeInt(x -> CompressedPools.BUFFER_SIZE / Float.BYTES)
.writeByte(x -> x.compression.getId());
private final String columnName;
private final GenericIndexedWriter<ByteBuffer> flattener;
private final CompressionStrategy compression;
@ -51,12 +52,14 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
private ByteBuffer endBuffer;
BlockLayoutColumnarFloatsSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
)
{
this.columnName = columnName;
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
filenameBase,
@ -94,6 +97,9 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
}
endBuffer.putFloat(value);
++numInserted;
if (numInserted < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -42,6 +42,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
.writeInt(x -> x.sizePer)
.writeSomething(CompressionFactory.longEncodingWriter(x -> x.writer, x -> x.compression));
private final String columnName;
private final int sizePer;
private final CompressionFactory.LongEncodingWriter writer;
private final GenericIndexedWriter<ByteBuffer> flattener;
@ -53,6 +54,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
private ByteBuffer endBuffer;
BlockLayoutColumnarLongsSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@ -60,6 +62,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
CompressionStrategy compression
)
{
this.columnName = columnName;
this.sizePer = writer.getBlockSize(CompressedPools.BUFFER_SIZE);
int bufferSize = writer.getNumBytes(sizePer);
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(segmentWriteOutMedium, filenameBase, compression, bufferSize);
@ -100,6 +103,9 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
writer.write(value);
++numInserted;
if (numInserted < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.data;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.StringUtils;
public class ColumnCapacityExceededException extends RuntimeException
{
@VisibleForTesting
public static String formatMessage(String columnName)
{
return StringUtils.format(
"Too many values to store for %s column, try reducing maxRowsPerSegment",
columnName
);
}
public ColumnCapacityExceededException(String columnName)
{
super(formatMessage(columnName));
}
}
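
As a quick illustration of the class added above, the message operators see is built from formatMessage; a tiny hypothetical usage (the column name "myColumn" is made up):

import org.apache.druid.segment.data.ColumnCapacityExceededException;

public class CapacityMessageExample
{
  public static void main(String[] args)
  {
    // Prints: Too many values to store for myColumn column, try reducing maxRowsPerSegment
    System.out.println(new ColumnCapacityExceededException("myColumn").getMessage());
  }
}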

@ -29,5 +29,6 @@ import java.io.IOException;
public interface ColumnarDoublesSerializer extends Serializer
{
void open() throws IOException;
int size();
void add(double value) throws IOException;
}

@ -25,7 +25,6 @@ import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -44,6 +43,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
.writeInt(x -> x.chunkFactor)
.writeByte(x -> x.compression.getId());
private final String columnName;
private final int chunkFactor;
private final CompressionStrategy compression;
private final GenericIndexedWriter<ByteBuffer> flattener;
@ -53,6 +53,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
private ByteBuffer endBuffer;
CompressedColumnarIntsSerializer(
final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final int chunkFactor,
@ -61,6 +62,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
)
{
this(
columnName,
segmentWriteOutMedium,
chunkFactor,
byteOrder,
@ -75,6 +77,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
}
CompressedColumnarIntsSerializer(
final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final int chunkFactor,
final ByteOrder byteOrder,
@ -82,6 +85,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
final GenericIndexedWriter<ByteBuffer> flattener
)
{
this.columnName = columnName;
this.chunkFactor = chunkFactor;
this.compression = compression;
this.flattener = flattener;
@ -110,6 +114,9 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
}
endBuffer.putInt(val);
numInserted++;
if (numInserted < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
@ -133,6 +134,25 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
public static CompressedColumnarIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper)
{
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == VERSION) {
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressionStrategy compression = CompressionStrategy.forId(buffer.get());
return new CompressedColumnarIntsSupplier(
totalSize,
sizePer,
GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper),
compression
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
@VisibleForTesting
static CompressedColumnarIntsSupplier fromIntBuffer(
final IntBuffer buffer,

@ -47,6 +47,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
.writeByte(x -> x.compression.getId());
public static CompressedVSizeColumnarIntsSerializer create(
final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final int maxValue,
@ -54,6 +55,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
)
{
return new CompressedVSizeColumnarIntsSerializer(
columnName,
segmentWriteOutMedium,
filenameBase,
maxValue,
@ -63,6 +65,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
);
}
private final String columnName;
private final int numBytes;
private final int chunkFactor;
private final boolean isBigEndian;
@ -75,6 +78,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
private ByteBuffer endBuffer;
CompressedVSizeColumnarIntsSerializer(
final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final int maxValue,
@ -84,6 +88,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
)
{
this(
columnName,
segmentWriteOutMedium,
maxValue,
chunkFactor,
@ -99,6 +104,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
}
CompressedVSizeColumnarIntsSerializer(
final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final int maxValue,
final int chunkFactor,
@ -107,6 +113,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
final GenericIndexedWriter<ByteBuffer> flattener
)
{
this.columnName = columnName;
this.numBytes = VSizeColumnarInts.getNumBytesForMax(maxValue);
this.chunkFactor = chunkFactor;
int chunkBytes = chunkFactor * numBytes;
@ -149,6 +156,9 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
endBuffer.put(intBuffer.array(), 0, numBytes);
}
numInserted++;
if (numInserted < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
@ -167,6 +168,34 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
public static CompressedVSizeColumnarIntsSupplier fromByteBuffer(
ByteBuffer buffer,
ByteOrder order,
SmooshedFileMapper mapper
)
{
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == VERSION) {
final int numBytes = buffer.get();
final int totalSize = buffer.getInt();
final int sizePer = buffer.getInt();
final CompressionStrategy compression = CompressionStrategy.forId(buffer.get());
return new CompressedVSizeColumnarIntsSupplier(
totalSize,
sizePer,
numBytes,
GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper),
compression
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
@VisibleForTesting
public static CompressedVSizeColumnarIntsSupplier fromList(
final IntList list,

@ -331,6 +331,7 @@ public class CompressionFactory
}
public static ColumnarLongsSerializer getLongSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder order,
@ -339,12 +340,23 @@ public class CompressionFactory
)
{
if (encodingStrategy == LongEncodingStrategy.AUTO) {
return new IntermediateColumnarLongsSerializer(segmentWriteOutMedium, filenameBase, order, compressionStrategy);
return new IntermediateColumnarLongsSerializer(
columnName,
segmentWriteOutMedium,
filenameBase,
order,
compressionStrategy
);
} else if (encodingStrategy == LongEncodingStrategy.LONGS) {
if (compressionStrategy == CompressionStrategy.NONE) {
return new EntireLayoutColumnarLongsSerializer(segmentWriteOutMedium, new LongsLongEncodingWriter(order));
return new EntireLayoutColumnarLongsSerializer(
columnName,
segmentWriteOutMedium,
new LongsLongEncodingWriter(order)
);
} else {
return new BlockLayoutColumnarLongsSerializer(
columnName,
segmentWriteOutMedium,
filenameBase,
order,
@ -375,6 +387,7 @@ public class CompressionFactory
}
public static ColumnarFloatsSerializer getFloatSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder order,
@ -382,9 +395,15 @@ public class CompressionFactory
)
{
if (compressionStrategy == CompressionStrategy.NONE) {
return new EntireLayoutColumnarFloatsSerializer(segmentWriteOutMedium, order);
return new EntireLayoutColumnarFloatsSerializer(columnName, segmentWriteOutMedium, order);
} else {
return new BlockLayoutColumnarFloatsSerializer(segmentWriteOutMedium, filenameBase, order, compressionStrategy);
return new BlockLayoutColumnarFloatsSerializer(
columnName,
segmentWriteOutMedium,
filenameBase,
order,
compressionStrategy
);
}
}
@ -406,6 +425,7 @@ public class CompressionFactory
}
public static ColumnarDoublesSerializer getDoubleSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@ -413,9 +433,15 @@ public class CompressionFactory
)
{
if (compression == CompressionStrategy.NONE) {
return new EntireLayoutColumnarDoublesSerializer(segmentWriteOutMedium, byteOrder);
return new EntireLayoutColumnarDoublesSerializer(columnName, segmentWriteOutMedium, byteOrder);
} else {
return new BlockLayoutColumnarDoublesSerializer(segmentWriteOutMedium, filenameBase, byteOrder, compression);
return new BlockLayoutColumnarDoublesSerializer(
columnName,
segmentWriteOutMedium,
filenameBase,
byteOrder,
compression
);
}
}
}
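
For context, here is a hedged sketch of how a caller obtains a long serializer from the factory after this change, with the column name as the new first argument; the column name, file name, and strategy choices below are illustrative, not taken from the commit:

import java.nio.ByteOrder;
import org.apache.druid.segment.data.ColumnarLongsSerializer;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;

public class LongSerializerCallSketch
{
  public static void main(String[] args) throws Exception
  {
    ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
        "myColumn",                               // new: column name, used only for error reporting
        new OffHeapMemorySegmentWriteOutMedium(),
        "myColumn.long_column",
        ByteOrder.nativeOrder(),
        CompressionFactory.LongEncodingStrategy.LONGS,
        CompressionStrategy.LZ4
    );
    serializer.open();
    serializer.add(42L);  // add() now throws ColumnCapacityExceededException if the counter overflows
  }
}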

@ -40,14 +40,16 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
.writeInt(x -> 0)
.writeByte(x -> CompressionStrategy.NONE.getId());
private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final ByteBuffer orderBuffer;
private WriteOutBytes valuesOut;
private int numInserted = 0;
public EntireLayoutColumnarDoublesSerializer(SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
public EntireLayoutColumnarDoublesSerializer(String columnName, SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.orderBuffer = ByteBuffer.allocate(Double.BYTES);
orderBuffer.order(order);
@ -59,6 +61,12 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
valuesOut = segmentWriteOutMedium.makeWriteOutBytes();
}
@Override
public int size()
{
return numInserted;
}
@Override
public void add(double value) throws IOException
{
@ -66,6 +74,9 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
orderBuffer.putDouble(value);
valuesOut.write(orderBuffer.array());
++numInserted;
if (numInserted < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -39,16 +39,18 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
.writeInt(x -> 0)
.writeByte(x -> CompressionStrategy.NONE.getId());
private final String columnName;
private final boolean isLittleEndian;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private WriteOutBytes valuesOut;
private int numInserted = 0;
EntireLayoutColumnarFloatsSerializer(SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
EntireLayoutColumnarFloatsSerializer(String columnName, SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
isLittleEndian = order.equals(ByteOrder.LITTLE_ENDIAN);
this.isLittleEndian = order.equals(ByteOrder.LITTLE_ENDIAN);
}
@Override
@ -73,6 +75,9 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
}
valuesOut.writeInt(valueBits);
++numInserted;
if (numInserted < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -38,6 +38,7 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali
.writeInt(x -> 0)
.writeSomething(CompressionFactory.longEncodingWriter(x -> x.writer, x -> CompressionStrategy.NONE));
private final String columnName;
private final CompressionFactory.LongEncodingWriter writer;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private WriteOutBytes valuesOut;
@ -45,10 +46,12 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali
private int numInserted = 0;
EntireLayoutColumnarLongsSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
CompressionFactory.LongEncodingWriter writer
)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.writer = writer;
}
@ -71,6 +74,9 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali
{
writer.write(value);
++numInserted;
if (numInserted < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -40,6 +40,7 @@ import java.nio.channels.WritableByteChannel;
*/
public class IntermediateColumnarLongsSerializer implements ColumnarLongsSerializer
{
private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder order;
@ -59,12 +60,14 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
private ColumnarLongsSerializer delegate;
IntermediateColumnarLongsSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder order,
CompressionStrategy compression
)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.order = order;
@ -92,6 +95,9 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
}
tempOut.add(value);
++numInserted;
if (numInserted < 0) {
throw new ColumnCapacityExceededException(columnName);
}
if (uniqueValues.size() <= CompressionFactory.MAX_TABLE_SIZE && !uniqueValues.containsKey(value)) {
uniqueValues.put(value, uniqueValues.size());
valuesAddedInOrder.add(value);
@ -127,9 +133,10 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
}
if (compression == CompressionStrategy.NONE) {
delegate = new EntireLayoutColumnarLongsSerializer(segmentWriteOutMedium, writer);
delegate = new EntireLayoutColumnarLongsSerializer(columnName, segmentWriteOutMedium, writer);
} else {
delegate = new BlockLayoutColumnarLongsSerializer(
columnName,
segmentWriteOutMedium,
filenameBase,
order,

@ -36,6 +36,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
private static final byte VERSION = V3CompressedVSizeColumnarMultiIntsSupplier.VERSION;
public static V3CompressedVSizeColumnarMultiIntsSerializer create(
final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final int maxValue,
@ -43,7 +44,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
)
{
return new V3CompressedVSizeColumnarMultiIntsSerializer(
columnName,
new CompressedColumnarIntsSerializer(
columnName,
segmentWriteOutMedium,
filenameBase,
CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
@ -51,6 +54,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
compression
),
new CompressedVSizeColumnarIntsSerializer(
columnName,
segmentWriteOutMedium,
filenameBase,
maxValue,
@ -61,16 +65,19 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
);
}
private final String columnName;
private final CompressedColumnarIntsSerializer offsetWriter;
private final CompressedVSizeColumnarIntsSerializer valueWriter;
private int offset;
private boolean lastOffsetWritten = false;
V3CompressedVSizeColumnarMultiIntsSerializer(
String columnName,
CompressedColumnarIntsSerializer offsetWriter,
CompressedVSizeColumnarIntsSerializer valueWriter
)
{
this.columnName = columnName;
this.offsetWriter = offsetWriter;
this.valueWriter = valueWriter;
this.offset = 0;
@ -95,6 +102,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
valueWriter.addValue(ints.get(i));
}
offset += numValues;
if (offset < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -24,6 +24,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -73,6 +74,26 @@ public class V3CompressedVSizeColumnarMultiIntsSupplier implements WritableSuppl
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
public static V3CompressedVSizeColumnarMultiIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper)
{
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == VERSION) {
CompressedColumnarIntsSupplier offsetSupplier = CompressedColumnarIntsSupplier.fromByteBuffer(
buffer,
order,
mapper
);
CompressedVSizeColumnarIntsSupplier valueSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
buffer,
order,
mapper
);
return new V3CompressedVSizeColumnarMultiIntsSupplier(offsetSupplier, valueSupplier);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
@VisibleForTesting
public static V3CompressedVSizeColumnarMultiIntsSupplier fromIterable(
final Iterable<IndexedInts> objectsIterable,

@ -81,6 +81,7 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
abstract void write(WriteOutBytes out, int v) throws IOException;
}
private final String columnName;
private final int maxId;
private final WriteInt writeInt;
@ -92,8 +93,13 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
private int numWritten = 0;
private boolean numBytesForMaxWritten = false;
public VSizeColumnarMultiIntsSerializer(SegmentWriteOutMedium segmentWriteOutMedium, int maxId)
public VSizeColumnarMultiIntsSerializer(
String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
int maxId
)
{
this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.maxId = maxId;
this.writeInt = WriteInt.values()[VSizeColumnarInts.getNumBytesForMax(maxId) - 1];
@ -120,6 +126,9 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
headerOut.writeInt(Ints.checkedCast(valuesOut.size()));
++numWritten;
if (numWritten < 0) {
throw new ColumnCapacityExceededException(columnName);
}
}
@Override

@ -33,22 +33,27 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.WriteOutBytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
@RunWith(Parameterized.class)
public class CompressedColumnarIntsSerializerTest
@ -64,6 +69,9 @@ public class CompressedColumnarIntsSerializerTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
public CompressedColumnarIntsSerializerTest(
CompressionStrategy compressionStrategy,
ByteOrder byteOrder
@ -99,6 +107,77 @@ public class CompressedColumnarIntsSerializerTest
segmentWriteOutMedium.close();
}
@Test
public void testSmallData() throws Exception
{
// less than one chunk
for (int maxValue : MAX_VALUES) {
for (int chunkFactor : CHUNK_FACTORS) {
generateVals(rand.nextInt(chunkFactor), maxValue);
checkSerializedSizeAndData(chunkFactor);
}
}
}
@Test
public void testLargeData() throws Exception
{
// more than one chunk
for (int maxValue : MAX_VALUES) {
for (int chunkFactor : CHUNK_FACTORS) {
generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
checkSerializedSizeAndData(chunkFactor);
}
}
}
@Test
public void testWriteEmpty() throws Exception
{
vals = new int[0];
checkSerializedSizeAndData(2);
}
@Test
public void testMultiValueFileLargeData() throws Exception
{
// more than one chunk
for (int maxValue : MAX_VALUES) {
for (int chunkFactor : CHUNK_FACTORS) {
generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
checkV2SerializedSizeAndData(chunkFactor);
}
}
}
// this test takes ~30 minutes to run
@Ignore
@Test
public void testTooManyValues() throws IOException
{
expectedException.expect(ColumnCapacityExceededException.class);
expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
try (
SegmentWriteOutMedium segmentWriteOutMedium =
TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
) {
CompressedColumnarIntsSerializer serializer = new CompressedColumnarIntsSerializer(
"test",
segmentWriteOutMedium,
"test",
CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
byteOrder,
compressionStrategy
);
serializer.open();
final long numRows = Integer.MAX_VALUE + 100L;
for (long i = 0L; i < numRows; i++) {
serializer.addValue(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE));
}
}
}
private void generateVals(final int totalSize, final int maxValue)
{
vals = new int[totalSize];
@ -112,6 +191,7 @@ public class CompressedColumnarIntsSerializerTest
FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder());
CompressedColumnarIntsSerializer writer = new CompressedColumnarIntsSerializer(
"test",
segmentWriteOutMedium,
"test",
chunkFactor,
@ -149,44 +229,13 @@ public class CompressedColumnarIntsSerializerTest
CloseQuietly.close(columnarInts);
}
@Test
public void testSmallData() throws Exception
{
// less than one chunk
for (int maxValue : MAX_VALUES) {
for (int chunkFactor : CHUNK_FACTORS) {
generateVals(rand.nextInt(chunkFactor), maxValue);
checkSerializedSizeAndData(chunkFactor);
}
}
}
@Test
public void testLargeData() throws Exception
{
// more than one chunk
for (int maxValue : MAX_VALUES) {
for (int chunkFactor : CHUNK_FACTORS) {
generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
checkSerializedSizeAndData(chunkFactor);
}
}
}
@Test
public void testWriteEmpty() throws Exception
{
vals = new int[0];
checkSerializedSizeAndData(2);
}
private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception
{
File tmpDirectory = FileUtils.createTempDir(StringUtils.format("CompressedIntsIndexedWriterTest_%d", chunkFactor));
FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
CompressedColumnarIntsSerializer writer = new CompressedColumnarIntsSerializer(
"test",
segmentWriteOutMedium,
chunkFactor,
byteOrder,
@ -223,16 +272,4 @@ public class CompressedColumnarIntsSerializerTest
CloseQuietly.close(columnarInts);
mapper.close();
}
@Test
public void testMultiValueFileLargeData() throws Exception
{
// more than one chunk
for (int maxValue : MAX_VALUES) {
for (int chunkFactor : CHUNK_FACTORS) {
generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
checkV2SerializedSizeAndData(chunkFactor);
}
}
}
}

View File

@ -0,0 +1,331 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.data;
import com.google.common.base.Supplier;
import com.google.common.primitives.Doubles;
import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* This is a copy-pasta of {@link CompressedFloatsSerdeTest} without {@link CompressedFloatsSerdeTest#testSupplierSerde}
* because doubles do not have a supplier serde (e.g. {@link CompressedColumnarFloatsSupplier} or
* {@link CompressedColumnarLongsSupplier}).
*
* It is not important that it remain a copy, the committer is just lazy
*/
@RunWith(Parameterized.class)
public class CompressedDoublesSerdeTest
{
@Parameterized.Parameters(name = "{0} {1} {2}")
public static Iterable<Object[]> compressionStrategies()
{
List<Object[]> data = new ArrayList<>();
for (CompressionStrategy strategy : CompressionStrategy.values()) {
data.add(new Object[]{strategy, ByteOrder.BIG_ENDIAN});
data.add(new Object[]{strategy, ByteOrder.LITTLE_ENDIAN});
}
return data;
}
private static final double DELTA = 0.00001;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
protected final CompressionStrategy compressionStrategy;
protected final ByteOrder order;
private final double[] values0 = {};
private final double[] values1 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1};
private final double[] values2 = {13.2, 6.1, 0.001, 123, 12572, 123.1, 784.4, 6892.8634, 8.341111};
private final double[] values3 = {0.001, 0.001, 0.001, 0.001, 0.001, 100, 100, 100, 100, 100};
private final double[] values4 = {0, 0, 0, 0, 0.01, 0, 0, 0, 21.22, 0, 0, 0, 0, 0, 0};
private final double[] values5 = {123.16, 1.12, 62.00, 462.12, 517.71, 56.54, 971.32, 824.22, 472.12, 625.26};
private final double[] values6 = {1000000, 1000001, 1000002, 1000003, 1000004, 1000005, 1000006, 1000007, 1000008};
private final double[] values7 = {
Double.POSITIVE_INFINITY,
Double.NEGATIVE_INFINITY,
12378.5734,
-12718243.7496,
-93653653.1,
12743153.385534,
21431.414538,
65487435436632.123,
-43734526234564.65
};
public CompressedDoublesSerdeTest(
CompressionStrategy compressionStrategy,
ByteOrder order
)
{
this.compressionStrategy = compressionStrategy;
this.order = order;
}
@Test
public void testValueSerde() throws Exception
{
testWithValues(values0);
testWithValues(values1);
testWithValues(values2);
testWithValues(values3);
testWithValues(values4);
testWithValues(values5);
testWithValues(values6);
testWithValues(values7);
}
@Test
public void testChunkSerde() throws Exception
{
double[] chunk = new double[10000];
for (int i = 0; i < 10000; i++) {
chunk[i] = i;
}
testWithValues(chunk);
}
// this test takes ~45 minutes to run
@Ignore
@Test
public void testTooManyValues() throws IOException
{
expectedException.expect(ColumnCapacityExceededException.class);
expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
try (
SegmentWriteOutMedium segmentWriteOutMedium =
TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
) {
ColumnarDoublesSerializer serializer = CompressionFactory.getDoubleSerializer(
"test",
segmentWriteOutMedium,
"test",
order,
compressionStrategy
);
serializer.open();
final long numRows = Integer.MAX_VALUE + 100L;
for (long i = 0L; i < numRows; i++) {
serializer.add(ThreadLocalRandom.current().nextDouble());
}
}
}
public void testWithValues(double[] values) throws Exception
{
ColumnarDoublesSerializer serializer = CompressionFactory.getDoubleSerializer(
"test",
new OffHeapMemorySegmentWriteOutMedium(),
"test",
order,
compressionStrategy
);
serializer.open();
for (double value : values) {
serializer.add(value);
}
Assert.assertEquals(values.length, serializer.size());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializer.writeTo(Channels.newChannel(baos), null);
Assert.assertEquals(baos.size(), serializer.getSerializedSize());
Supplier<ColumnarDoubles> supplier = CompressedColumnarDoublesSuppliers
.fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order);
ColumnarDoubles doubles = supplier.get();
assertIndexMatchesVals(doubles, values);
for (int i = 0; i < 10; i++) {
int a = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
int b = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
int start = a < b ? a : b;
int end = a < b ? b : a;
tryFill(doubles, values, start, end - start);
}
testConcurrentThreadReads(supplier, doubles, values);
doubles.close();
}
private void tryFill(ColumnarDoubles indexed, double[] vals, final int startIndex, final int size)
{
double[] filled = new double[size];
indexed.get(filled, startIndex, filled.length);
for (int i = startIndex; i < filled.length; i++) {
Assert.assertEquals(vals[i + startIndex], filled[i], DELTA);
}
}
private void assertIndexMatchesVals(ColumnarDoubles indexed, double[] vals)
{
Assert.assertEquals(vals.length, indexed.size());
// sequential access
int[] indices = new int[vals.length];
for (int i = 0; i < indexed.size(); ++i) {
Assert.assertEquals(vals[i], indexed.get(i), DELTA);
indices[i] = i;
}
// random access, limited to 1000 elements for large lists (every element would take too long)
IntArrays.shuffle(indices, ThreadLocalRandom.current());
final int limit = Math.min(indexed.size(), 1000);
for (int i = 0; i < limit; ++i) {
int k = indices[i];
Assert.assertEquals(vals[k], indexed.get(k), DELTA);
}
}
// This test attempts to cause a race condition with the DirectByteBuffers, it's non-deterministic in causing it,
// which sucks but I can't think of a way to deterministically cause it...
private void testConcurrentThreadReads(
final Supplier<ColumnarDoubles> supplier,
final ColumnarDoubles indexed,
final double[] vals
) throws Exception
{
final AtomicReference<String> reason = new AtomicReference<String>("none");
final int numRuns = 1000;
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch stopLatch = new CountDownLatch(2);
final AtomicBoolean failureHappened = new AtomicBoolean(false);
new Thread(new Runnable()
{
@Override
public void run()
{
try {
startLatch.await();
}
catch (InterruptedException e) {
failureHappened.set(true);
reason.set("interrupt.");
stopLatch.countDown();
return;
}
try {
for (int i = 0; i < numRuns; ++i) {
for (int j = 0; j < indexed.size(); ++j) {
final double val = vals[j];
final double indexedVal = indexed.get(j);
if (Doubles.compare(val, indexedVal) != 0) {
failureHappened.set(true);
reason.set(StringUtils.format("Thread1[%d]: %f != %f", j, val, indexedVal));
stopLatch.countDown();
return;
}
}
}
}
catch (Exception e) {
e.printStackTrace();
failureHappened.set(true);
reason.set(e.getMessage());
}
stopLatch.countDown();
}
}).start();
final ColumnarDoubles indexed2 = supplier.get();
try {
new Thread(new Runnable()
{
@Override
public void run()
{
try {
startLatch.await();
}
catch (InterruptedException e) {
stopLatch.countDown();
return;
}
try {
for (int i = 0; i < numRuns; ++i) {
for (int j = indexed2.size() - 1; j >= 0; --j) {
final double val = vals[j];
final double indexedVal = indexed2.get(j);
if (Doubles.compare(val, indexedVal) != 0) {
failureHappened.set(true);
reason.set(StringUtils.format("Thread2[%d]: %f != %f", j, val, indexedVal));
stopLatch.countDown();
return;
}
}
}
}
catch (Exception e) {
e.printStackTrace();
reason.set(e.getMessage());
failureHappened.set(true);
}
stopLatch.countDown();
}
}).start();
startLatch.countDown();
stopLatch.await();
}
finally {
CloseQuietly.close(indexed2);
}
if (failureHappened.get()) {
Assert.fail("Failure happened. Reason: " + reason.get());
}
}
}

@ -25,8 +25,14 @@ import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -58,6 +64,12 @@ public class CompressedFloatsSerdeTest
private static final double DELTA = 0.00001;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
protected final CompressionStrategy compressionStrategy;
protected final ByteOrder order;
@ -105,9 +117,37 @@ public class CompressedFloatsSerdeTest
testWithValues(chunk);
}
// this test takes ~30 minutes to run
@Ignore
@Test
public void testTooManyValues() throws IOException
{
expectedException.expect(ColumnCapacityExceededException.class);
expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
try (
SegmentWriteOutMedium segmentWriteOutMedium =
TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
) {
ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer(
"test",
segmentWriteOutMedium,
"test",
order,
compressionStrategy
);
serializer.open();
final long numRows = Integer.MAX_VALUE + 100L;
for (long i = 0L; i < numRows; i++) {
serializer.add(ThreadLocalRandom.current().nextFloat());
}
}
}
public void testWithValues(float[] values) throws Exception
{
ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer(
"test",
new OffHeapMemorySegmentWriteOutMedium(),
"test",
order,

@ -95,6 +95,7 @@ public class CompressedLongsAutoEncodingSerdeTest
public void testValues(long[] values) throws Exception
{
ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
"test",
new OffHeapMemorySegmentWriteOutMedium(),
"test",
order,

@ -25,8 +25,14 @@ import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -58,6 +64,12 @@ public class CompressedLongsSerdeTest
return data;
}
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
protected final CompressionFactory.LongEncodingStrategy encodingStrategy;
protected final CompressionStrategy compressionStrategy;
protected final ByteOrder order;
@ -121,6 +133,38 @@ public class CompressedLongsSerdeTest
testWithValues(chunk);
}
// this test takes ~50 minutes to run (even skipping 'auto')
@Ignore
@Test
public void testTooManyValues() throws IOException
{
// uncomment this if 'auto' encoded long unbounded heap usage gets put in check and this can actually pass
if (encodingStrategy.equals(CompressionFactory.LongEncodingStrategy.AUTO)) {
return;
}
expectedException.expect(ColumnCapacityExceededException.class);
expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
try (
SegmentWriteOutMedium segmentWriteOutMedium =
TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
) {
ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
"test",
segmentWriteOutMedium,
"test",
order,
encodingStrategy,
compressionStrategy
);
serializer.open();
final long numRows = Integer.MAX_VALUE + 100L;
for (long i = 0L; i < numRows; i++) {
serializer.add(ThreadLocalRandom.current().nextLong());
}
}
}
public void testWithValues(long[] values) throws Exception
{
testValues(values);
@ -130,6 +174,7 @@ public class CompressedLongsSerdeTest
public void testValues(long[] values) throws Exception
{
ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
"test",
new OffHeapMemorySegmentWriteOutMedium(),
"test",
order,

@ -32,22 +32,27 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.WriteOutBytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
@RunWith(Parameterized.class)
public class CompressedVSizeColumnarIntsSerializerTest
@ -62,6 +67,9 @@ public class CompressedVSizeColumnarIntsSerializerTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
public CompressedVSizeColumnarIntsSerializerTest(
CompressionStrategy compressionStrategy,
ByteOrder byteOrder
@ -108,8 +116,9 @@ public class CompressedVSizeColumnarIntsSerializerTest
private void checkSerializedSizeAndData(int chunkSize) throws Exception
{
FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder());
final String columnName = "test";
CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer(
columnName,
segmentWriteOutMedium,
"test",
vals.length > 0 ? Ints.max(vals) : 0,
@ -170,6 +179,44 @@ public class CompressedVSizeColumnarIntsSerializerTest
}
}
// this test takes ~18 minutes to run
@Ignore
@Test
public void testTooManyValues() throws IOException
{
final int maxValue = 0x0FFFFFFF;
final int maxChunkSize = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
expectedException.expect(ColumnCapacityExceededException.class);
expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
try (
SegmentWriteOutMedium segmentWriteOutMedium =
TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
) {
GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
"test",
compressionStrategy,
Long.BYTES * 10000
);
CompressedVSizeColumnarIntsSerializer serializer = new CompressedVSizeColumnarIntsSerializer(
"test",
segmentWriteOutMedium,
maxValue,
maxChunkSize,
byteOrder,
compressionStrategy,
genericIndexed
);
serializer.open();
final long numRows = Integer.MAX_VALUE + 100L;
for (long i = 0L; i < numRows; i++) {
serializer.addValue(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE));
}
}
}
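
The maxValue of 0x0FFFFFFF used above falls in the 4-byte bucket of the usual variable-size int encoding. As a rough illustration (a common byte-width scheme, not necessarily the library's exact rule) of how the largest stored value drives the per-value width, and with it how many ints fit in one compressed chunk:

public class VSizeWidthSketch
{
  // Rough illustration: the largest value to store sets the byte width of every
  // value in the column, and with it how many values fit in one compressed chunk.
  static int bytesForMaxValue(int maxValue)
  {
    if (maxValue <= 0xFF) {
      return 1;
    } else if (maxValue <= 0xFFFF) {
      return 2;
    } else if (maxValue <= 0xFFFFFF) {
      return 3;
    }
    return 4; // 0x0FFFFFFF used in the test lands here
  }

  public static void main(String[] args)
  {
    System.out.println(bytesForMaxValue(0x0FFFFFFF)); // prints 4
  }
}
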
@Test
public void testEmpty() throws Exception
{
@ -181,7 +228,7 @@ public class CompressedVSizeColumnarIntsSerializerTest
{
File tmpDirectory = temporaryFolder.newFolder();
FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
final String columnName = "test";
GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
"test",
@ -189,6 +236,7 @@ public class CompressedVSizeColumnarIntsSerializerTest
Long.BYTES * 10000
);
CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer(
columnName,
segmentWriteOutMedium,
vals.length > 0 ? Ints.max(vals) : 0,
chunkSize,

View File

@ -32,11 +32,14 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.WriteOutBytes;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -54,6 +57,7 @@ import java.util.stream.IntStream;
@RunWith(Parameterized.class)
public class V3CompressedVSizeColumnarMultiIntsSerializerTest
{
private static final String TEST_COLUMN_NAME = "test";
private static final int[] OFFSET_CHUNK_FACTORS = new int[]{
1,
2,
@ -69,6 +73,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
public V3CompressedVSizeColumnarMultiIntsSerializerTest(
CompressionStrategy compressionStrategy,
ByteOrder byteOrder
@ -92,19 +99,101 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
);
}
private void generateVals(final int totalSize, final int maxValue)
{
vals = new ArrayList<>(totalSize);
for (int i = 0; i < totalSize; ++i) {
int len = rand.nextInt(2) + 1;
int[] subVals = new int[len];
for (int j = 0; j < len; ++j) {
subVals[j] = rand.nextInt(maxValue);
}
vals.add(subVals);
}
}
@Before
public void setUp()
{
vals = null;
}
@Test
public void testSmallData() throws Exception
{
// less than one chunk
for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
for (int maxValue : MAX_VALUES) {
final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
generateVals(rand.nextInt(valueChunk), maxValue, 2);
checkSerializedSizeAndData(offsetChunk, valueChunk);
}
}
}
@Test
public void testLargeData() throws Exception
{
// more than one chunk
for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
for (int maxValue : MAX_VALUES) {
final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue, 2);
checkSerializedSizeAndData(offsetChunk, valueChunk);
}
}
}
@Test
public void testEmpty() throws Exception
{
vals = new ArrayList<>();
checkSerializedSizeAndData(1, 2);
}
@Test
public void testMultiValueFileLargeData() throws Exception
{
// more than one chunk
for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
for (int maxValue : MAX_VALUES) {
final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue, 2);
checkV2SerializedSizeAndData(offsetChunk, valueChunk);
}
}
}
// this test takes ~30 minutes to run
@Ignore
@Test
public void testTooManyValues() throws Exception
{
expectedException.expect(ColumnCapacityExceededException.class);
expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
// more than one chunk
final int offsetChunk = CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER;
final int maxValue = 0x0FFFFFFF;
final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
final int numRows = 10000000;
final int maxValuesPerRow = 1000;
generateV2SerializedSizeAndData(numRows, maxValue, maxValuesPerRow, offsetChunk, valueChunk);
}
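
Rough arithmetic on why that scenario overflows: 10,000,000 rows with between 1 and 1,000 values per row averages about 500 values per row, i.e. roughly 5 billion values in the value column, comfortably past Integer.MAX_VALUE (2,147,483,647), so it is the value-side serializer that should hit the capacity check first, well before the 10-million-entry offset column does.
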
private void generateVals(final int rowCount, final int maxValue, final int numValuesPerRow)
{
vals = new ArrayList<>(rowCount);
for (int i = 0; i < rowCount; ++i) {
vals.add(generateRow(rand, maxValue, numValuesPerRow));
}
}
private int[] generateRow(Random rand, final int maxValue, final int numValuesPerRow)
{
int len = rand.nextInt(numValuesPerRow) + 1;
int[] subVals = new int[len];
for (int j = 0; j < len; ++j) {
subVals[j] = rand.nextInt(maxValue);
}
return subVals;
}
private int getMaxValue(final List<int[]> vals)
{
return vals
.stream()
.mapToInt(array -> IntStream.of(array).max().orElse(0))
.max()
.orElseThrow(NoSuchElementException::new);
}
private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception
{
FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder());
@ -112,6 +201,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
try (SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium()) {
int maxValue = vals.size() > 0 ? getMaxValue(vals) : 0;
CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer(
TEST_COLUMN_NAME,
segmentWriteOutMedium,
"offset",
offsetChunkFactor,
@ -119,6 +209,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
compressionStrategy
);
CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer(
TEST_COLUMN_NAME,
segmentWriteOutMedium,
"value",
maxValue,
@ -127,7 +218,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
compressionStrategy
);
V3CompressedVSizeColumnarMultiIntsSerializer writer =
new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter);
new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);
V3CompressedVSizeColumnarMultiIntsSupplier supplierFromIterable =
V3CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
Iterables.transform(vals, ArrayBasedIndexedInts::new),
@ -167,54 +258,6 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
}
}
private int getMaxValue(final List<int[]> vals)
{
return vals
.stream()
.mapToInt(array -> IntStream.of(array).max().orElse(0))
.max()
.orElseThrow(NoSuchElementException::new);
}
@Before
public void setUp()
{
vals = null;
}
@Test
public void testSmallData() throws Exception
{
// less than one chunk
for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
for (int maxValue : MAX_VALUES) {
final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
generateVals(rand.nextInt(valueChunk), maxValue);
checkSerializedSizeAndData(offsetChunk, valueChunk);
}
}
}
@Test
public void testLargeData() throws Exception
{
// more than one chunk
for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
for (int maxValue : MAX_VALUES) {
final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue);
checkSerializedSizeAndData(offsetChunk, valueChunk);
}
}
}
@Test
public void testEmpty() throws Exception
{
vals = new ArrayList<>();
checkSerializedSizeAndData(1, 2);
}
private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception
{
File tmpDirectory = FileUtils.createTempDir(StringUtils.format(
@ -227,6 +270,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
try (SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium()) {
CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer(
TEST_COLUMN_NAME,
segmentWriteOutMedium,
offsetChunkFactor,
byteOrder,
@ -246,6 +290,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
Long.BYTES * 250000
);
CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer(
TEST_COLUMN_NAME,
segmentWriteOutMedium,
maxValue,
valueChunkFactor,
@ -254,7 +299,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
genericIndexed
);
V3CompressedVSizeColumnarMultiIntsSerializer writer =
new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter);
new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);
writer.open();
for (int[] val : vals) {
writer.addValues(new ArrayBasedIndexedInts(val));
@ -282,16 +327,76 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
}
}
@Test
public void testMultiValueFileLargeData() throws Exception
{
// more than one chunk
for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
for (int maxValue : MAX_VALUES) {
final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue);
checkV2SerializedSizeAndData(offsetChunk, valueChunk);
}
}
}
private void generateV2SerializedSizeAndData(long numRows, int maxValue, int maxValuesPerRow, int offsetChunkFactor, int valueChunkFactor) throws Exception
{
File tmpDirectory = FileUtils.createTempDir(StringUtils.format(
"CompressedVSizeIndexedV3WriterTest_%d_%d",
offsetChunkFactor,
offsetChunkFactor
));
FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
try (
SegmentWriteOutMedium segmentWriteOutMedium =
TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
) {
CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer(
TEST_COLUMN_NAME,
segmentWriteOutMedium,
offsetChunkFactor,
byteOrder,
compressionStrategy,
GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
"offset",
compressionStrategy,
Long.BYTES * 250000
)
);
GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
"value",
compressionStrategy,
Long.BYTES * 250000
);
CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer(
TEST_COLUMN_NAME,
segmentWriteOutMedium,
maxValue,
valueChunkFactor,
byteOrder,
compressionStrategy,
genericIndexed
);
V3CompressedVSizeColumnarMultiIntsSerializer writer =
new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);
writer.open();
for (long l = 0L; l < numRows; l++) {
writer.addValues(new ArrayBasedIndexedInts(generateRow(rand, maxValue, maxValuesPerRow)));
}
final SmooshedWriter channel = smoosher.addWithSmooshedWriter("test", writer.getSerializedSize());
writer.writeTo(channel, smoosher);
channel.close();
smoosher.close();
SmooshedFileMapper mapper = Smoosh.map(tmpDirectory);
V3CompressedVSizeColumnarMultiIntsSupplier supplierFromByteBuffer =
V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(mapper.mapFile("test"), byteOrder, mapper);
ColumnarMultiInts columnarMultiInts = supplierFromByteBuffer.get();
Assert.assertEquals(columnarMultiInts.size(), numRows);
Random verifier = new Random(0);
for (int i = 0; i < numRows; ++i) {
IndexedInts subVals = columnarMultiInts.get(i);
int[] expected = generateRow(verifier, maxValue, maxValuesPerRow);
Assert.assertEquals(subVals.size(), expected.length);
for (int j = 0, size = subVals.size(); j < size; ++j) {
Assert.assertEquals(subVals.get(j), expected[j]);
}
}
CloseQuietly.close(columnarMultiInts);
mapper.close();
}
}
}
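
The verification loop above leans on a seeded-regeneration trick: rather than holding millions of generated rows in memory for comparison, it replays generateRow with a Random seeded the same way as the writer side (the test's rand is presumably seeded with 0 as well, which this hunk does not show) and expects identical output. A standalone sketch of the idea, with illustrative names:

// Illustrative sketch: two PRNGs with the same seed produce the same row stream,
// so the "verifier" can regenerate what the "writer" produced without storing it.
import java.util.Random;

public class SeededReplaySketch
{
  static int[] generateRow(Random rand, int maxValue, int maxValuesPerRow)
  {
    int len = rand.nextInt(maxValuesPerRow) + 1;
    int[] row = new int[len];
    for (int i = 0; i < len; i++) {
      row[i] = rand.nextInt(maxValue);
    }
    return row;
  }

  public static void main(String[] args)
  {
    Random writer = new Random(0);
    Random verifier = new Random(0);
    for (int i = 0; i < 1_000; i++) {
      int[] written = generateRow(writer, 0x0FFFFFFF, 1000);
      int[] replayed = generateRow(verifier, 0x0FFFFFFF, 1000);
      if (written.length != replayed.length) {
        throw new AssertionError("row " + i + " length mismatch");
      }
      for (int j = 0; j < written.length; j++) {
        if (written[j] != replayed[j]) {
          throw new AssertionError("row " + i + " value mismatch at " + j);
        }
      }
    }
    System.out.println("identical seeds reproduce identical rows");
  }
}
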