From 2c49f6d89acdf1e15f6bc339321f0e78066e2e42 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 26 Mar 2020 16:54:48 -0700 Subject: [PATCH] error on value counter overflow instead of writing sad segments (#9559) --- ...loatCompressionBenchmarkFileGenerator.java | 36 +- ...LongCompressionBenchmarkFileGenerator.java | 36 +- .../druid/segment/DoubleColumnSerializer.java | 7 +- .../segment/DoubleColumnSerializerV2.java | 6 + .../druid/segment/FloatColumnSerializer.java | 7 +- .../segment/FloatColumnSerializerV2.java | 6 + .../apache/druid/segment/IndexMergerV9.java | 6 + .../druid/segment/LongColumnSerializer.java | 7 +- .../druid/segment/LongColumnSerializerV2.java | 6 + .../segment/StringDimensionMergerV9.java | 5 +- .../BlockLayoutColumnarDoublesSerializer.java | 12 + .../BlockLayoutColumnarFloatsSerializer.java | 6 + .../BlockLayoutColumnarLongsSerializer.java | 6 + .../data/ColumnCapacityExceededException.java | 39 +++ .../data/ColumnarDoublesSerializer.java | 1 + .../CompressedColumnarIntsSerializer.java | 9 +- .../data/CompressedColumnarIntsSupplier.java | 20 ++ ...CompressedVSizeColumnarIntsSerializer.java | 10 + .../CompressedVSizeColumnarIntsSupplier.java | 29 ++ .../segment/data/CompressionFactory.java | 38 +- ...EntireLayoutColumnarDoublesSerializer.java | 13 +- .../EntireLayoutColumnarFloatsSerializer.java | 9 +- .../EntireLayoutColumnarLongsSerializer.java | 6 + .../IntermediateColumnarLongsSerializer.java | 9 +- ...essedVSizeColumnarMultiIntsSerializer.java | 10 + ...pressedVSizeColumnarMultiIntsSupplier.java | 21 ++ .../VSizeColumnarMultiIntsSerializer.java | 11 +- .../CompressedColumnarIntsSerializerTest.java | 125 ++++--- .../data/CompressedDoublesSerdeTest.java | 331 ++++++++++++++++++ .../data/CompressedFloatsSerdeTest.java | 40 +++ .../CompressedLongsAutoEncodingSerdeTest.java | 1 + .../data/CompressedLongsSerdeTest.java | 45 +++ ...ressedVSizeColumnarIntsSerializerTest.java | 52 ++- ...dVSizeColumnarMultiIntsSerializerTest.java | 237 +++++++++---- 34 files changed, 1044 insertions(+), 158 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java create mode 100644 processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java index ef3bffc6500..1570619b902 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java @@ -65,21 +65,26 @@ public class FloatCompressionBenchmarkFileGenerator dirPath = args[0]; } - BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated("", ValueType.FLOAT, true, 1, 0d, - ImmutableList.of( - 0f, - 1.1f, - 2.2f, - 3.3f, - 4.4f - ), - ImmutableList.of( - 0.95, - 0.001, - 0.0189, - 0.03, - 0.0001 - ) + BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated( + "", + ValueType.FLOAT, + true, + 1, + 0d, + ImmutableList.of( + 0f, + 1.1f, + 2.2f, + 3.3f, + 4.4f + ), + ImmutableList.of( + 0.95, + 0.001, + 0.0189, + 0.03, + 0.0001 + ) ); BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf( "", @@ -151,6 +156,7 @@ public class FloatCompressionBenchmarkFileGenerator File dataFile = new File(dir, entry.getKey()); ColumnarFloatsSerializer writer = 
CompressionFactory.getFloatSerializer( + "float-benchmark", new OffHeapMemorySegmentWriteOutMedium(), "float", ByteOrder.nativeOrder(), diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java index 31c39fe040b..ead3dd0e5cc 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java @@ -66,21 +66,26 @@ public class LongCompressionBenchmarkFileGenerator dirPath = args[0]; } - BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated("", ValueType.LONG, true, 1, 0d, - ImmutableList.of( - 0, - 1, - 2, - 3, - 4 - ), - ImmutableList.of( - 0.95, - 0.001, - 0.0189, - 0.03, - 0.0001 - ) + BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated( + "", + ValueType.LONG, + true, + 1, + 0d, + ImmutableList.of( + 0, + 1, + 2, + 3, + 4 + ), + ImmutableList.of( + 0.95, + 0.001, + 0.0189, + 0.03, + 0.0001 + ) ); BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf("", ValueType.LONG, true, 1, 0d, -1, 1000, 1d); BenchmarkColumnSchema zipfHighSchema = BenchmarkColumnSchema.makeZipf( @@ -144,6 +149,7 @@ public class LongCompressionBenchmarkFileGenerator File dataFile = new File(dir, entry.getKey()); ColumnarLongsSerializer writer = CompressionFactory.getLongSerializer( + "long-benchmark", new OffHeapMemorySegmentWriteOutMedium(), "long", ByteOrder.nativeOrder(), diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java index 67b68eda5a1..cc525cb7ba8 100644 --- a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java @@ -33,14 +33,16 @@ import java.nio.channels.WritableByteChannel; public class DoubleColumnSerializer implements GenericColumnSerializer { public static DoubleColumnSerializer create( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, CompressionStrategy compression ) { - return new DoubleColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression); + return new DoubleColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression); } + private final String columnName; private final SegmentWriteOutMedium segmentWriteOutMedium; private final String filenameBase; private final ByteOrder byteOrder; @@ -48,12 +50,14 @@ public class DoubleColumnSerializer implements GenericColumnSerializer private ColumnarDoublesSerializer writer; private DoubleColumnSerializer( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, CompressionStrategy compression ) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.filenameBase = filenameBase; this.byteOrder = byteOrder; @@ -64,6 +68,7 @@ public class DoubleColumnSerializer implements GenericColumnSerializer public void open() throws IOException { writer = CompressionFactory.getDoubleSerializer( + columnName, segmentWriteOutMedium, StringUtils.format("%s.double_column", filenameBase), byteOrder, diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java 
b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java index 3078a6aaad0..02903eb4a27 100644 --- a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java +++ b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java @@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel; public class DoubleColumnSerializerV2 implements GenericColumnSerializer { public static DoubleColumnSerializerV2 create( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, CompressionStrategy compression, @@ -52,6 +53,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer ) { return new DoubleColumnSerializerV2( + columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, @@ -60,6 +62,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer ); } + private final String columnName; private final SegmentWriteOutMedium segmentWriteOutMedium; private final String filenameBase; private final ByteOrder byteOrder; @@ -72,6 +75,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer private int rowCount = 0; private DoubleColumnSerializerV2( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, @@ -79,6 +83,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer BitmapSerdeFactory bitmapSerdeFactory ) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.filenameBase = filenameBase; this.byteOrder = byteOrder; @@ -90,6 +95,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer public void open() throws IOException { writer = CompressionFactory.getDoubleSerializer( + columnName, segmentWriteOutMedium, StringUtils.format("%s.double_column", filenameBase), byteOrder, diff --git a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java index 8d413d54c1f..b96d520e2e2 100644 --- a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java @@ -33,14 +33,16 @@ import java.nio.channels.WritableByteChannel; public class FloatColumnSerializer implements GenericColumnSerializer { public static FloatColumnSerializer create( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, CompressionStrategy compression ) { - return new FloatColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression); + return new FloatColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression); } + private final String columnName; private final SegmentWriteOutMedium segmentWriteOutMedium; private final String filenameBase; private final ByteOrder byteOrder; @@ -48,12 +50,14 @@ public class FloatColumnSerializer implements GenericColumnSerializer private ColumnarFloatsSerializer writer; private FloatColumnSerializer( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, CompressionStrategy compression ) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.filenameBase = filenameBase; this.byteOrder = byteOrder; @@ -64,6 +68,7 @@ public class FloatColumnSerializer implements GenericColumnSerializer public void open() throws IOException { writer = 
CompressionFactory.getFloatSerializer( + columnName, segmentWriteOutMedium, StringUtils.format("%s.float_column", filenameBase), byteOrder, diff --git a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java index 41e7b685a90..b5371a0aac9 100644 --- a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java +++ b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java @@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel; public class FloatColumnSerializerV2 implements GenericColumnSerializer { public static FloatColumnSerializerV2 create( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, CompressionStrategy compression, @@ -52,6 +53,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer ) { return new FloatColumnSerializerV2( + columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, @@ -60,6 +62,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer ); } + private final String columnName; private final SegmentWriteOutMedium segmentWriteOutMedium; private final String filenameBase; private final ByteOrder byteOrder; @@ -72,6 +75,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer private int rowCount = 0; private FloatColumnSerializerV2( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, @@ -79,6 +83,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer BitmapSerdeFactory bitmapSerdeFactory ) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.filenameBase = filenameBase; this.byteOrder = byteOrder; @@ -90,6 +95,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer public void open() throws IOException { writer = CompressionFactory.getFloatSerializer( + columnName, segmentWriteOutMedium, StringUtils.format("%s.float_column", filenameBase), byteOrder, diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index 91e202ab183..e1a92f7d1a3 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -621,6 +621,7 @@ public class IndexMergerV9 implements IndexMerger // If using default values for null use LongColumnSerializer to allow rollback to previous versions. if (NullHandling.replaceWithDefault()) { return LongColumnSerializer.create( + columnName, segmentWriteOutMedium, columnName, indexSpec.getMetricCompression(), @@ -628,6 +629,7 @@ public class IndexMergerV9 implements IndexMerger ); } else { return LongColumnSerializerV2.create( + columnName, segmentWriteOutMedium, columnName, indexSpec.getMetricCompression(), @@ -646,12 +648,14 @@ public class IndexMergerV9 implements IndexMerger // If using default values for null use DoubleColumnSerializer to allow rollback to previous versions. 
if (NullHandling.replaceWithDefault()) { return DoubleColumnSerializer.create( + columnName, segmentWriteOutMedium, columnName, indexSpec.getMetricCompression() ); } else { return DoubleColumnSerializerV2.create( + columnName, segmentWriteOutMedium, columnName, indexSpec.getMetricCompression(), @@ -669,12 +673,14 @@ public class IndexMergerV9 implements IndexMerger // If using default values for null use FloatColumnSerializer to allow rollback to previous versions. if (NullHandling.replaceWithDefault()) { return FloatColumnSerializer.create( + columnName, segmentWriteOutMedium, columnName, indexSpec.getMetricCompression() ); } else { return FloatColumnSerializerV2.create( + columnName, segmentWriteOutMedium, columnName, indexSpec.getMetricCompression(), diff --git a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java index 8f4cc586bc3..1f8d03bebc6 100644 --- a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java @@ -36,15 +36,17 @@ import java.nio.channels.WritableByteChannel; public class LongColumnSerializer implements GenericColumnSerializer { public static LongColumnSerializer create( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, CompressionStrategy compression, CompressionFactory.LongEncodingStrategy encoding ) { - return new LongColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression, encoding); + return new LongColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression, encoding); } + private final String columnName; private final SegmentWriteOutMedium segmentWriteOutMedium; private final String filenameBase; private final ByteOrder byteOrder; @@ -53,6 +55,7 @@ public class LongColumnSerializer implements GenericColumnSerializer private ColumnarLongsSerializer writer; private LongColumnSerializer( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, @@ -60,6 +63,7 @@ public class LongColumnSerializer implements GenericColumnSerializer CompressionFactory.LongEncodingStrategy encoding ) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.filenameBase = filenameBase; this.byteOrder = byteOrder; @@ -71,6 +75,7 @@ public class LongColumnSerializer implements GenericColumnSerializer public void open() throws IOException { writer = CompressionFactory.getLongSerializer( + columnName, segmentWriteOutMedium, StringUtils.format("%s.long_column", filenameBase), byteOrder, diff --git a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java index cacac5910ca..364a7af6825 100644 --- a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java +++ b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java @@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel; public class LongColumnSerializerV2 implements GenericColumnSerializer { public static LongColumnSerializerV2 create( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, CompressionStrategy compression, @@ -53,6 +54,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer ) { return new LongColumnSerializerV2( + columnName, 
segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, @@ -62,6 +64,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer ); } + private final String columnName; private final SegmentWriteOutMedium segmentWriteOutMedium; private final String filenameBase; private final ByteOrder byteOrder; @@ -75,6 +78,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer private int rowCount = 0; private LongColumnSerializerV2( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, @@ -83,6 +87,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer BitmapSerdeFactory bitmapSerdeFactory ) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.filenameBase = filenameBase; this.byteOrder = byteOrder; @@ -95,6 +100,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer public void open() throws IOException { writer = CompressionFactory.getLongSerializer( + columnName, segmentWriteOutMedium, StringUtils.format("%s.long_column", filenameBase), byteOrder, diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java index 2ccc0dd4085..cff7ef99f80 100644 --- a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java @@ -224,17 +224,20 @@ public class StringDimensionMergerV9 implements DimensionMergerV9 if (capabilities.hasMultipleValues()) { if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) { encodedValueSerializer = V3CompressedVSizeColumnarMultiIntsSerializer.create( + dimensionName, segmentWriteOutMedium, filenameBase, cardinality, compressionStrategy ); } else { - encodedValueSerializer = new VSizeColumnarMultiIntsSerializer(segmentWriteOutMedium, cardinality); + encodedValueSerializer = + new VSizeColumnarMultiIntsSerializer(dimensionName, segmentWriteOutMedium, cardinality); } } else { if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) { encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create( + dimensionName, segmentWriteOutMedium, filenameBase, cardinality, diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java index c2247ac18d9..2b4612ecb3a 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java @@ -43,6 +43,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri .writeInt(x -> CompressedPools.BUFFER_SIZE / Double.BYTES) .writeByte(x -> x.compression.getId()); + private final String columnName; private final GenericIndexedWriter flattener; private final CompressionStrategy compression; @@ -51,12 +52,14 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri private ByteBuffer endBuffer; BlockLayoutColumnarDoublesSerializer( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, CompressionStrategy compression ) { + this.columnName = columnName; this.flattener = GenericIndexedWriter.ofCompressedByteBuffers( segmentWriteOutMedium, filenameBase, @@ -75,6 +78,12 @@ public 
class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri flattener.open(); } + @Override + public int size() + { + return numInserted; + } + @Override public void add(double value) throws IOException { @@ -89,6 +98,9 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri endBuffer.putDouble(value); ++numInserted; + if (numInserted < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java index e247252f6b3..94a3ef6319f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java @@ -43,6 +43,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial .writeInt(x -> CompressedPools.BUFFER_SIZE / Float.BYTES) .writeByte(x -> x.compression.getId()); + private final String columnName; private final GenericIndexedWriter flattener; private final CompressionStrategy compression; @@ -51,12 +52,14 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial private ByteBuffer endBuffer; BlockLayoutColumnarFloatsSerializer( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, CompressionStrategy compression ) { + this.columnName = columnName; this.flattener = GenericIndexedWriter.ofCompressedByteBuffers( segmentWriteOutMedium, filenameBase, @@ -94,6 +97,9 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial } endBuffer.putFloat(value); ++numInserted; + if (numInserted < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java index cb404025cc8..ff47a34cca4 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java @@ -42,6 +42,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ .writeInt(x -> x.sizePer) .writeSomething(CompressionFactory.longEncodingWriter(x -> x.writer, x -> x.compression)); + private final String columnName; private final int sizePer; private final CompressionFactory.LongEncodingWriter writer; private final GenericIndexedWriter flattener; @@ -53,6 +54,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ private ByteBuffer endBuffer; BlockLayoutColumnarLongsSerializer( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder byteOrder, @@ -60,6 +62,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ CompressionStrategy compression ) { + this.columnName = columnName; this.sizePer = writer.getBlockSize(CompressedPools.BUFFER_SIZE); int bufferSize = writer.getNumBytes(sizePer); this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(segmentWriteOutMedium, filenameBase, compression, bufferSize); @@ -100,6 +103,9 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ writer.write(value); 
++numInserted; + if (numInserted < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java new file mode 100644 index 00000000000..f7a2a6054be --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.data; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.StringUtils; + +public class ColumnCapacityExceededException extends RuntimeException +{ + @VisibleForTesting + public static String formatMessage(String columnName) + { + return StringUtils.format( + "Too many values to store for %s column, try reducing maxRowsPerSegment", + columnName + ); + } + public ColumnCapacityExceededException(String columnName) + { + super(formatMessage(columnName)); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java index da1b9779d1d..f33cdee4c4e 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java @@ -29,5 +29,6 @@ import java.io.IOException; public interface ColumnarDoublesSerializer extends Serializer { void open() throws IOException; + int size(); void add(double value) throws IOException; } diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java index 0d82b4ca226..a0442d95adf 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java @@ -25,7 +25,6 @@ import org.apache.druid.segment.serde.MetaSerdeHelper; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import javax.annotation.Nullable; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -44,6 +43,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer .writeInt(x -> x.chunkFactor) .writeByte(x -> x.compression.getId()); + private final String columnName; private final int chunkFactor; private final CompressionStrategy compression; private final GenericIndexedWriter flattener; @@ -53,6 +53,7 @@ public class CompressedColumnarIntsSerializer 
extends SingleValueColumnarIntsSer private ByteBuffer endBuffer; CompressedColumnarIntsSerializer( + final String columnName, final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, final int chunkFactor, @@ -61,6 +62,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer ) { this( + columnName, segmentWriteOutMedium, chunkFactor, byteOrder, @@ -75,6 +77,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer } CompressedColumnarIntsSerializer( + final String columnName, final SegmentWriteOutMedium segmentWriteOutMedium, final int chunkFactor, final ByteOrder byteOrder, @@ -82,6 +85,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer final GenericIndexedWriter flattener ) { + this.columnName = columnName; this.chunkFactor = chunkFactor; this.compression = compression; this.flattener = flattener; @@ -110,6 +114,9 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer } endBuffer.putInt(val); numInserted++; + if (numInserted < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java index f58ea592c7e..d9b3cf97085 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; @@ -133,6 +134,25 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier x.compression.getId()); public static CompressedVSizeColumnarIntsSerializer create( + final String columnName, final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, final int maxValue, @@ -54,6 +55,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn ) { return new CompressedVSizeColumnarIntsSerializer( + columnName, segmentWriteOutMedium, filenameBase, maxValue, @@ -63,6 +65,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn ); } + private final String columnName; private final int numBytes; private final int chunkFactor; private final boolean isBigEndian; @@ -75,6 +78,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn private ByteBuffer endBuffer; CompressedVSizeColumnarIntsSerializer( + final String columnName, final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, final int maxValue, @@ -84,6 +88,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn ) { this( + columnName, segmentWriteOutMedium, maxValue, chunkFactor, @@ -99,6 +104,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn } CompressedVSizeColumnarIntsSerializer( + final String columnName, final SegmentWriteOutMedium segmentWriteOutMedium, final int maxValue, final int 
chunkFactor, @@ -107,6 +113,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn final GenericIndexedWriter flattener ) { + this.columnName = columnName; this.numBytes = VSizeColumnarInts.getNumBytesForMax(maxValue); this.chunkFactor = chunkFactor; int chunkBytes = chunkFactor * numBytes; @@ -149,6 +156,9 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn endBuffer.put(intBuffer.array(), 0, numBytes); } numInserted++; + if (numInserted < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java index 323945c523c..84d8b8d7ad7 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.serde.MetaSerdeHelper; @@ -167,6 +168,34 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier 0) .writeByte(x -> CompressionStrategy.NONE.getId()); + private final String columnName; private final SegmentWriteOutMedium segmentWriteOutMedium; private final ByteBuffer orderBuffer; private WriteOutBytes valuesOut; private int numInserted = 0; - public EntireLayoutColumnarDoublesSerializer(SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order) + public EntireLayoutColumnarDoublesSerializer(String columnName, SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.orderBuffer = ByteBuffer.allocate(Double.BYTES); orderBuffer.order(order); @@ -59,6 +61,12 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer valuesOut = segmentWriteOutMedium.makeWriteOutBytes(); } + @Override + public int size() + { + return numInserted; + } + @Override public void add(double value) throws IOException { @@ -66,6 +74,9 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer orderBuffer.putDouble(value); valuesOut.write(orderBuffer.array()); ++numInserted; + if (numInserted < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java index 1209b76217f..75b7290c67f 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java @@ -39,16 +39,18 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria .writeInt(x -> 0) .writeByte(x -> CompressionStrategy.NONE.getId()); + private final String columnName; private final boolean isLittleEndian; private final 
SegmentWriteOutMedium segmentWriteOutMedium; private WriteOutBytes valuesOut; private int numInserted = 0; - EntireLayoutColumnarFloatsSerializer(SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order) + EntireLayoutColumnarFloatsSerializer(String columnName, SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; - isLittleEndian = order.equals(ByteOrder.LITTLE_ENDIAN); + this.isLittleEndian = order.equals(ByteOrder.LITTLE_ENDIAN); } @Override @@ -73,6 +75,9 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria } valuesOut.writeInt(valueBits); ++numInserted; + if (numInserted < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java index b29081bd0a5..9513836c016 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java @@ -38,6 +38,7 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali .writeInt(x -> 0) .writeSomething(CompressionFactory.longEncodingWriter(x -> x.writer, x -> CompressionStrategy.NONE)); + private final String columnName; private final CompressionFactory.LongEncodingWriter writer; private final SegmentWriteOutMedium segmentWriteOutMedium; private WriteOutBytes valuesOut; @@ -45,10 +46,12 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali private int numInserted = 0; EntireLayoutColumnarLongsSerializer( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, CompressionFactory.LongEncodingWriter writer ) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.writer = writer; } @@ -71,6 +74,9 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali { writer.write(value); ++numInserted; + if (numInserted < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java index e08e463dd44..c0f9355350a 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java @@ -40,6 +40,7 @@ import java.nio.channels.WritableByteChannel; */ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSerializer { + private final String columnName; private final SegmentWriteOutMedium segmentWriteOutMedium; private final String filenameBase; private final ByteOrder order; @@ -59,12 +60,14 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali private ColumnarLongsSerializer delegate; IntermediateColumnarLongsSerializer( + String columnName, SegmentWriteOutMedium segmentWriteOutMedium, String filenameBase, ByteOrder order, CompressionStrategy compression ) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.filenameBase = filenameBase; this.order = order; @@ -92,6 +95,9 @@ public class 
IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali } tempOut.add(value); ++numInserted; + if (numInserted < 0) { + throw new ColumnCapacityExceededException(columnName); + } if (uniqueValues.size() <= CompressionFactory.MAX_TABLE_SIZE && !uniqueValues.containsKey(value)) { uniqueValues.put(value, uniqueValues.size()); valuesAddedInOrder.add(value); @@ -127,9 +133,10 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali } if (compression == CompressionStrategy.NONE) { - delegate = new EntireLayoutColumnarLongsSerializer(segmentWriteOutMedium, writer); + delegate = new EntireLayoutColumnarLongsSerializer(columnName, segmentWriteOutMedium, writer); } else { delegate = new BlockLayoutColumnarLongsSerializer( + columnName, segmentWriteOutMedium, filenameBase, order, diff --git a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java index ebf0c4b44cf..f6690293012 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java @@ -36,6 +36,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI private static final byte VERSION = V3CompressedVSizeColumnarMultiIntsSupplier.VERSION; public static V3CompressedVSizeColumnarMultiIntsSerializer create( + final String columnName, final SegmentWriteOutMedium segmentWriteOutMedium, final String filenameBase, final int maxValue, @@ -43,7 +44,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI ) { return new V3CompressedVSizeColumnarMultiIntsSerializer( + columnName, new CompressedColumnarIntsSerializer( + columnName, segmentWriteOutMedium, filenameBase, CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER, @@ -51,6 +54,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI compression ), new CompressedVSizeColumnarIntsSerializer( + columnName, segmentWriteOutMedium, filenameBase, maxValue, @@ -61,16 +65,19 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI ); } + private final String columnName; private final CompressedColumnarIntsSerializer offsetWriter; private final CompressedVSizeColumnarIntsSerializer valueWriter; private int offset; private boolean lastOffsetWritten = false; V3CompressedVSizeColumnarMultiIntsSerializer( + String columnName, CompressedColumnarIntsSerializer offsetWriter, CompressedVSizeColumnarIntsSerializer valueWriter ) { + this.columnName = columnName; this.offsetWriter = offsetWriter; this.valueWriter = valueWriter; this.offset = 0; @@ -95,6 +102,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI valueWriter.addValue(ints.get(i)); } offset += numValues; + if (offset < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java index 2beb43d9611..3bb934cd296 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java +++ 
b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java @@ -24,6 +24,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import java.io.IOException; import java.nio.ByteBuffer; @@ -73,6 +74,26 @@ public class V3CompressedVSizeColumnarMultiIntsSupplier implements WritableSuppl throw new IAE("Unknown version[%s]", versionFromBuffer); } + public static V3CompressedVSizeColumnarMultiIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper) + { + byte versionFromBuffer = buffer.get(); + + if (versionFromBuffer == VERSION) { + CompressedColumnarIntsSupplier offsetSupplier = CompressedColumnarIntsSupplier.fromByteBuffer( + buffer, + order, + mapper + ); + CompressedVSizeColumnarIntsSupplier valueSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( + buffer, + order, + mapper + ); + return new V3CompressedVSizeColumnarMultiIntsSupplier(offsetSupplier, valueSupplier); + } + throw new IAE("Unknown version[%s]", versionFromBuffer); + } + @VisibleForTesting public static V3CompressedVSizeColumnarMultiIntsSupplier fromIterable( final Iterable objectsIterable, diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java index 16fe48ddf68..088f2b5c9ee 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java @@ -81,6 +81,7 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize abstract void write(WriteOutBytes out, int v) throws IOException; } + private final String columnName; private final int maxId; private final WriteInt writeInt; @@ -92,8 +93,13 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize private int numWritten = 0; private boolean numBytesForMaxWritten = false; - public VSizeColumnarMultiIntsSerializer(SegmentWriteOutMedium segmentWriteOutMedium, int maxId) + public VSizeColumnarMultiIntsSerializer( + String columnName, + SegmentWriteOutMedium segmentWriteOutMedium, + int maxId + ) { + this.columnName = columnName; this.segmentWriteOutMedium = segmentWriteOutMedium; this.maxId = maxId; this.writeInt = WriteInt.values()[VSizeColumnarInts.getNumBytesForMax(maxId) - 1]; @@ -120,6 +126,9 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize headerOut.writeInt(Ints.checkedCast(valuesOut.size())); ++numWritten; + if (numWritten < 0) { + throw new ColumnCapacityExceededException(columnName); + } } @Override diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java index 9c852ed3f32..43ea5eb0863 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java @@ -33,22 +33,27 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import 
org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; @RunWith(Parameterized.class) public class CompressedColumnarIntsSerializerTest @@ -64,6 +69,9 @@ public class CompressedColumnarIntsSerializerTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + public CompressedColumnarIntsSerializerTest( CompressionStrategy compressionStrategy, ByteOrder byteOrder @@ -99,6 +107,77 @@ public class CompressedColumnarIntsSerializerTest segmentWriteOutMedium.close(); } + @Test + public void testSmallData() throws Exception + { + // less than one chunk + for (int maxValue : MAX_VALUES) { + for (int chunkFactor : CHUNK_FACTORS) { + generateVals(rand.nextInt(chunkFactor), maxValue); + checkSerializedSizeAndData(chunkFactor); + } + } + } + + @Test + public void testLargeData() throws Exception + { + // more than one chunk + for (int maxValue : MAX_VALUES) { + for (int chunkFactor : CHUNK_FACTORS) { + generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue); + checkSerializedSizeAndData(chunkFactor); + } + } + } + + @Test + public void testWriteEmpty() throws Exception + { + vals = new int[0]; + checkSerializedSizeAndData(2); + } + + @Test + public void testMultiValueFileLargeData() throws Exception + { + // more than one chunk + for (int maxValue : MAX_VALUES) { + for (int chunkFactor : CHUNK_FACTORS) { + generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue); + checkV2SerializedSizeAndData(chunkFactor); + } + } + } + + // this test takes ~30 minutes to run + @Ignore + @Test + public void testTooManyValues() throws IOException + { + expectedException.expect(ColumnCapacityExceededException.class); + expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test")); + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()) + ) { + CompressedColumnarIntsSerializer serializer = new CompressedColumnarIntsSerializer( + "test", + segmentWriteOutMedium, + "test", + CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER, + byteOrder, + compressionStrategy + ); + serializer.open(); + + final long numRows = Integer.MAX_VALUE + 100L; + for (long i = 0L; i < numRows; i++) { + serializer.addValue(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE)); + } + } + } + private void generateVals(final int totalSize, final int maxValue) { vals = new int[totalSize]; @@ -112,6 +191,7 @@ public class CompressedColumnarIntsSerializerTest FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder()); CompressedColumnarIntsSerializer writer = new 
CompressedColumnarIntsSerializer( + "test", segmentWriteOutMedium, "test", chunkFactor, @@ -149,44 +229,13 @@ public class CompressedColumnarIntsSerializerTest CloseQuietly.close(columnarInts); } - @Test - public void testSmallData() throws Exception - { - // less than one chunk - for (int maxValue : MAX_VALUES) { - for (int chunkFactor : CHUNK_FACTORS) { - generateVals(rand.nextInt(chunkFactor), maxValue); - checkSerializedSizeAndData(chunkFactor); - } - } - } - - @Test - public void testLargeData() throws Exception - { - // more than one chunk - for (int maxValue : MAX_VALUES) { - for (int chunkFactor : CHUNK_FACTORS) { - generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue); - checkSerializedSizeAndData(chunkFactor); - } - } - } - - @Test - public void testWriteEmpty() throws Exception - { - vals = new int[0]; - checkSerializedSizeAndData(2); - } - - private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception { File tmpDirectory = FileUtils.createTempDir(StringUtils.format("CompressedIntsIndexedWriterTest_%d", chunkFactor)); FileSmoosher smoosher = new FileSmoosher(tmpDirectory); CompressedColumnarIntsSerializer writer = new CompressedColumnarIntsSerializer( + "test", segmentWriteOutMedium, chunkFactor, byteOrder, @@ -223,16 +272,4 @@ public class CompressedColumnarIntsSerializerTest CloseQuietly.close(columnarInts); mapper.close(); } - - @Test - public void testMultiValueFileLargeData() throws Exception - { - // more than one chunk - for (int maxValue : MAX_VALUES) { - for (int chunkFactor : CHUNK_FACTORS) { - generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue); - checkV2SerializedSizeAndData(chunkFactor); - } - } - } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java new file mode 100644 index 00000000000..284aea33b75 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.data; + +import com.google.common.base.Supplier; +import com.google.common.primitives.Doubles; +import it.unimi.dsi.fastutil.ints.IntArrays; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This is a copy-pasta of {@link CompressedFloatsSerdeTest} without {@link CompressedFloatsSerdeTest#testSupplierSerde} + * because doubles do not have a supplier serde (e.g. {@link CompressedColumnarFloatsSupplier} or + * {@link CompressedColumnarLongsSupplier}). + * + * It is not important that it remain a copy, the committer is just lazy + */ +@RunWith(Parameterized.class) +public class CompressedDoublesSerdeTest +{ + @Parameterized.Parameters(name = "{0} {1} {2}") + public static Iterable compressionStrategies() + { + List data = new ArrayList<>(); + for (CompressionStrategy strategy : CompressionStrategy.values()) { + data.add(new Object[]{strategy, ByteOrder.BIG_ENDIAN}); + data.add(new Object[]{strategy, ByteOrder.LITTLE_ENDIAN}); + } + return data; + } + + private static final double DELTA = 0.00001; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + protected final CompressionStrategy compressionStrategy; + protected final ByteOrder order; + + private final double[] values0 = {}; + private final double[] values1 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1}; + private final double[] values2 = {13.2, 6.1, 0.001, 123, 12572, 123.1, 784.4, 6892.8634, 8.341111}; + private final double[] values3 = {0.001, 0.001, 0.001, 0.001, 0.001, 100, 100, 100, 100, 100}; + private final double[] values4 = {0, 0, 0, 0, 0.01, 0, 0, 0, 21.22, 0, 0, 0, 0, 0, 0}; + private final double[] values5 = {123.16, 1.12, 62.00, 462.12, 517.71, 56.54, 971.32, 824.22, 472.12, 625.26}; + private final double[] values6 = {1000000, 1000001, 1000002, 1000003, 1000004, 1000005, 1000006, 1000007, 1000008}; + private final double[] values7 = { + Double.POSITIVE_INFINITY, + Double.NEGATIVE_INFINITY, + 12378.5734, + -12718243.7496, + -93653653.1, + 12743153.385534, + 21431.414538, + 65487435436632.123, + -43734526234564.65 + }; + + public CompressedDoublesSerdeTest( + CompressionStrategy compressionStrategy, + ByteOrder order + ) + { + this.compressionStrategy = compressionStrategy; + this.order = order; + } + + @Test + public void testValueSerde() throws Exception + { + testWithValues(values0); + testWithValues(values1); + testWithValues(values2); + testWithValues(values3); + testWithValues(values4); + testWithValues(values5); + 
testWithValues(values6); + testWithValues(values7); + } + + @Test + public void testChunkSerde() throws Exception + { + double[] chunk = new double[10000]; + for (int i = 0; i < 10000; i++) { + chunk[i] = i; + } + testWithValues(chunk); + } + + // this test takes ~45 minutes to run + @Ignore + @Test + public void testTooManyValues() throws IOException + { + expectedException.expect(ColumnCapacityExceededException.class); + expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test")); + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()) + ) { + ColumnarDoublesSerializer serializer = CompressionFactory.getDoubleSerializer( + "test", + segmentWriteOutMedium, + "test", + order, + compressionStrategy + ); + serializer.open(); + + final long numRows = Integer.MAX_VALUE + 100L; + for (long i = 0L; i < numRows; i++) { + serializer.add(ThreadLocalRandom.current().nextDouble()); + } + } + } + + public void testWithValues(double[] values) throws Exception + { + ColumnarDoublesSerializer serializer = CompressionFactory.getDoubleSerializer( + "test", + new OffHeapMemorySegmentWriteOutMedium(), + "test", + order, + compressionStrategy + ); + serializer.open(); + + for (double value : values) { + serializer.add(value); + } + Assert.assertEquals(values.length, serializer.size()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializer.writeTo(Channels.newChannel(baos), null); + Assert.assertEquals(baos.size(), serializer.getSerializedSize()); + Supplier supplier = CompressedColumnarDoublesSuppliers + .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order); + ColumnarDoubles doubles = supplier.get(); + + assertIndexMatchesVals(doubles, values); + for (int i = 0; i < 10; i++) { + int a = (int) (ThreadLocalRandom.current().nextDouble() * values.length); + int b = (int) (ThreadLocalRandom.current().nextDouble() * values.length); + int start = a < b ? a : b; + int end = a < b ? b : a; + tryFill(doubles, values, start, end - start); + } + testConcurrentThreadReads(supplier, doubles, values); + + doubles.close(); + } + + private void tryFill(ColumnarDoubles indexed, double[] vals, final int startIndex, final int size) + { + double[] filled = new double[size]; + indexed.get(filled, startIndex, filled.length); + + for (int i = startIndex; i < filled.length; i++) { + Assert.assertEquals(vals[i + startIndex], filled[i], DELTA); + } + } + + private void assertIndexMatchesVals(ColumnarDoubles indexed, double[] vals) + { + Assert.assertEquals(vals.length, indexed.size()); + + // sequential access + int[] indices = new int[vals.length]; + for (int i = 0; i < indexed.size(); ++i) { + Assert.assertEquals(vals[i], indexed.get(i), DELTA); + indices[i] = i; + } + + // random access, limited to 1000 elements for large lists (every element would take too long) + IntArrays.shuffle(indices, ThreadLocalRandom.current()); + final int limit = Math.min(indexed.size(), 1000); + for (int i = 0; i < limit; ++i) { + int k = indices[i]; + Assert.assertEquals(vals[k], indexed.get(k), DELTA); + } + } + + // This test attempts to cause a race condition with the DirectByteBuffers, it's non-deterministic in causing it, + // which sucks but I can't think of a way to deterministically cause it... 
+  private void testConcurrentThreadReads(
+      final Supplier<ColumnarDoubles> supplier,
+      final ColumnarDoubles indexed,
+      final double[] vals
+  ) throws Exception
+  {
+    final AtomicReference<String> reason = new AtomicReference<>("none");
+
+    final int numRuns = 1000;
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch stopLatch = new CountDownLatch(2);
+    final AtomicBoolean failureHappened = new AtomicBoolean(false);
+    new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        try {
+          startLatch.await();
+        }
+        catch (InterruptedException e) {
+          failureHappened.set(true);
+          reason.set("interrupt.");
+          stopLatch.countDown();
+          return;
+        }
+
+        try {
+          for (int i = 0; i < numRuns; ++i) {
+            for (int j = 0; j < indexed.size(); ++j) {
+              final double val = vals[j];
+              final double indexedVal = indexed.get(j);
+              if (Doubles.compare(val, indexedVal) != 0) {
+                failureHappened.set(true);
+                reason.set(StringUtils.format("Thread1[%d]: %f != %f", j, val, indexedVal));
+                stopLatch.countDown();
+                return;
+              }
+            }
+          }
+        }
+        catch (Exception e) {
+          e.printStackTrace();
+          failureHappened.set(true);
+          reason.set(e.getMessage());
+        }
+
+        stopLatch.countDown();
+      }
+    }).start();
+
+    final ColumnarDoubles indexed2 = supplier.get();
+    try {
+      new Thread(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            startLatch.await();
+          }
+          catch (InterruptedException e) {
+            stopLatch.countDown();
+            return;
+          }
+
+          try {
+            for (int i = 0; i < numRuns; ++i) {
+              for (int j = indexed2.size() - 1; j >= 0; --j) {
+                final double val = vals[j];
+                final double indexedVal = indexed2.get(j);
+                if (Doubles.compare(val, indexedVal) != 0) {
+                  failureHappened.set(true);
+                  reason.set(StringUtils.format("Thread2[%d]: %f != %f", j, val, indexedVal));
+                  stopLatch.countDown();
+                  return;
+                }
+              }
+            }
+          }
+          catch (Exception e) {
+            e.printStackTrace();
+            reason.set(e.getMessage());
+            failureHappened.set(true);
+          }
+
+          stopLatch.countDown();
+        }
+      }).start();
+
+      startLatch.countDown();
+
+      stopLatch.await();
+    }
+    finally {
+      CloseQuietly.close(indexed2);
+    }
+
+    if (failureHappened.get()) {
+      Assert.fail("Failure happened. 
Reason: " + reason.get()); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java index 4b78b5295ec..d11c089abf2 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java @@ -25,8 +25,14 @@ import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -58,6 +64,12 @@ public class CompressedFloatsSerdeTest private static final double DELTA = 0.00001; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + protected final CompressionStrategy compressionStrategy; protected final ByteOrder order; @@ -105,9 +117,37 @@ public class CompressedFloatsSerdeTest testWithValues(chunk); } + // this test takes ~30 minutes to run + @Ignore + @Test + public void testTooManyValues() throws IOException + { + expectedException.expect(ColumnCapacityExceededException.class); + expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test")); + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()) + ) { + ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer( + "test", + segmentWriteOutMedium, + "test", + order, + compressionStrategy + ); + serializer.open(); + + final long numRows = Integer.MAX_VALUE + 100L; + for (long i = 0L; i < numRows; i++) { + serializer.add(ThreadLocalRandom.current().nextFloat()); + } + } + } + public void testWithValues(float[] values) throws Exception { ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer( + "test", new OffHeapMemorySegmentWriteOutMedium(), "test", order, diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java index ca6dd69bd70..1186d32a363 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java @@ -95,6 +95,7 @@ public class CompressedLongsAutoEncodingSerdeTest public void testValues(long[] values) throws Exception { ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer( + "test", new OffHeapMemorySegmentWriteOutMedium(), "test", order, diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java index 0fde252188f..675c49420cb 100644 --- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java @@ -25,8 +25,14 @@ import it.unimi.dsi.fastutil.ints.IntArrays; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -58,6 +64,12 @@ public class CompressedLongsSerdeTest return data; } + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + protected final CompressionFactory.LongEncodingStrategy encodingStrategy; protected final CompressionStrategy compressionStrategy; protected final ByteOrder order; @@ -121,6 +133,38 @@ public class CompressedLongsSerdeTest testWithValues(chunk); } + // this test takes ~50 minutes to run (even skipping 'auto') + @Ignore + @Test + public void testTooManyValues() throws IOException + { + // uncomment this if 'auto' encoded long unbounded heap usage gets put in check and this can actually pass + if (encodingStrategy.equals(CompressionFactory.LongEncodingStrategy.AUTO)) { + return; + } + expectedException.expect(ColumnCapacityExceededException.class); + expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test")); + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()) + ) { + ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer( + "test", + segmentWriteOutMedium, + "test", + order, + encodingStrategy, + compressionStrategy + ); + serializer.open(); + + final long numRows = Integer.MAX_VALUE + 100L; + for (long i = 0L; i < numRows; i++) { + serializer.add(ThreadLocalRandom.current().nextLong()); + } + } + } + public void testWithValues(long[] values) throws Exception { testValues(values); @@ -130,6 +174,7 @@ public class CompressedLongsSerdeTest public void testValues(long[] values) throws Exception { ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer( + "test", new OffHeapMemorySegmentWriteOutMedium(), "test", order, diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java index 150dd0d8955..5ba842013bb 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java @@ -32,22 +32,27 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import 
org.apache.druid.segment.writeout.WriteOutBytes; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; @RunWith(Parameterized.class) public class CompressedVSizeColumnarIntsSerializerTest @@ -62,6 +67,9 @@ public class CompressedVSizeColumnarIntsSerializerTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + public CompressedVSizeColumnarIntsSerializerTest( CompressionStrategy compressionStrategy, ByteOrder byteOrder @@ -108,8 +116,9 @@ public class CompressedVSizeColumnarIntsSerializerTest private void checkSerializedSizeAndData(int chunkSize) throws Exception { FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder()); - + final String columnName = "test"; CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer( + columnName, segmentWriteOutMedium, "test", vals.length > 0 ? Ints.max(vals) : 0, @@ -170,6 +179,44 @@ public class CompressedVSizeColumnarIntsSerializerTest } } + + // this test takes ~18 minutes to run + @Ignore + @Test + public void testTooManyValues() throws IOException + { + final int maxValue = 0x0FFFFFFF; + final int maxChunkSize = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue); + expectedException.expect(ColumnCapacityExceededException.class); + expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test")); + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()) + ) { + GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers( + segmentWriteOutMedium, + "test", + compressionStrategy, + Long.BYTES * 10000 + ); + CompressedVSizeColumnarIntsSerializer serializer = new CompressedVSizeColumnarIntsSerializer( + "test", + segmentWriteOutMedium, + maxValue, + maxChunkSize, + byteOrder, + compressionStrategy, + genericIndexed + ); + serializer.open(); + + final long numRows = Integer.MAX_VALUE + 100L; + for (long i = 0L; i < numRows; i++) { + serializer.addValue(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE)); + } + } + } + @Test public void testEmpty() throws Exception { @@ -181,7 +228,7 @@ public class CompressedVSizeColumnarIntsSerializerTest { File tmpDirectory = temporaryFolder.newFolder(); FileSmoosher smoosher = new FileSmoosher(tmpDirectory); - + final String columnName = "test"; GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers( segmentWriteOutMedium, "test", @@ -189,6 +236,7 @@ public class CompressedVSizeColumnarIntsSerializerTest Long.BYTES * 10000 ); CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer( + columnName, segmentWriteOutMedium, vals.length > 0 ? 
Ints.max(vals) : 0, chunkSize, diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java index bb9f0b27902..de6485fff1f 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java @@ -32,11 +32,14 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.WriteOutBytes; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -54,6 +57,7 @@ import java.util.stream.IntStream; @RunWith(Parameterized.class) public class V3CompressedVSizeColumnarMultiIntsSerializerTest { + private static final String TEST_COLUMN_NAME = "test"; private static final int[] OFFSET_CHUNK_FACTORS = new int[]{ 1, 2, @@ -69,6 +73,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + public V3CompressedVSizeColumnarMultiIntsSerializerTest( CompressionStrategy compressionStrategy, ByteOrder byteOrder @@ -92,19 +99,101 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest ); } - private void generateVals(final int totalSize, final int maxValue) + @Before + public void setUp() { - vals = new ArrayList<>(totalSize); - for (int i = 0; i < totalSize; ++i) { - int len = rand.nextInt(2) + 1; - int[] subVals = new int[len]; - for (int j = 0; j < len; ++j) { - subVals[j] = rand.nextInt(maxValue); + vals = null; + } + + @Test + public void testSmallData() throws Exception + { + // less than one chunk + for (int offsetChunk : OFFSET_CHUNK_FACTORS) { + for (int maxValue : MAX_VALUES) { + final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue); + generateVals(rand.nextInt(valueChunk), maxValue, 2); + checkSerializedSizeAndData(offsetChunk, valueChunk); } - vals.add(subVals); } } + @Test + public void testLargeData() throws Exception + { + // more than one chunk + for (int offsetChunk : OFFSET_CHUNK_FACTORS) { + for (int maxValue : MAX_VALUES) { + final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue); + generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue, 2); + checkSerializedSizeAndData(offsetChunk, valueChunk); + } + } + } + + @Test + public void testEmpty() throws Exception + { + vals = new ArrayList<>(); + checkSerializedSizeAndData(1, 2); + } + + @Test + public void testMultiValueFileLargeData() throws Exception + { + // more than one chunk + for (int offsetChunk : OFFSET_CHUNK_FACTORS) { + for (int maxValue : MAX_VALUES) { + final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue); + 
generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue, 2); + checkV2SerializedSizeAndData(offsetChunk, valueChunk); + } + } + } + + // this test takes ~30 minutes to run + @Ignore + @Test + public void testTooManyValues() throws Exception + { + expectedException.expect(ColumnCapacityExceededException.class); + expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test")); + // more than one chunk + final int offsetChunk = CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER; + final int maxValue = 0x0FFFFFFF; + final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue); + final int numRows = 10000000; + final int maxValuesPerRow = 1000; + generateV2SerializedSizeAndData(numRows, maxValue, maxValuesPerRow, offsetChunk, valueChunk); + } + + private void generateVals(final int rowCount, final int maxValue, final int numValuesPerRow) + { + vals = new ArrayList<>(rowCount); + for (int i = 0; i < rowCount; ++i) { + vals.add(generateRow(rand, maxValue, numValuesPerRow)); + } + } + + private int[] generateRow(Random rand, final int maxValue, final int numValuesPerRow) + { + int len = rand.nextInt(numValuesPerRow) + 1; + int[] subVals = new int[len]; + for (int j = 0; j < len; ++j) { + subVals[j] = rand.nextInt(maxValue); + } + return subVals; + } + + private int getMaxValue(final List vals) + { + return vals + .stream() + .mapToInt(array -> IntStream.of(array).max().orElse(0)) + .max() + .orElseThrow(NoSuchElementException::new); + } + private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception { FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder()); @@ -112,6 +201,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest try (SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium()) { int maxValue = vals.size() > 0 ? 
getMaxValue(vals) : 0; CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer( + TEST_COLUMN_NAME, segmentWriteOutMedium, "offset", offsetChunkFactor, @@ -119,6 +209,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest compressionStrategy ); CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer( + TEST_COLUMN_NAME, segmentWriteOutMedium, "value", maxValue, @@ -127,7 +218,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest compressionStrategy ); V3CompressedVSizeColumnarMultiIntsSerializer writer = - new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter); + new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter); V3CompressedVSizeColumnarMultiIntsSupplier supplierFromIterable = V3CompressedVSizeColumnarMultiIntsSupplier.fromIterable( Iterables.transform(vals, ArrayBasedIndexedInts::new), @@ -167,54 +258,6 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest } } - private int getMaxValue(final List vals) - { - return vals - .stream() - .mapToInt(array -> IntStream.of(array).max().orElse(0)) - .max() - .orElseThrow(NoSuchElementException::new); - } - - @Before - public void setUp() - { - vals = null; - } - - @Test - public void testSmallData() throws Exception - { - // less than one chunk - for (int offsetChunk : OFFSET_CHUNK_FACTORS) { - for (int maxValue : MAX_VALUES) { - final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue); - generateVals(rand.nextInt(valueChunk), maxValue); - checkSerializedSizeAndData(offsetChunk, valueChunk); - } - } - } - - @Test - public void testLargeData() throws Exception - { - // more than one chunk - for (int offsetChunk : OFFSET_CHUNK_FACTORS) { - for (int maxValue : MAX_VALUES) { - final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue); - generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue); - checkSerializedSizeAndData(offsetChunk, valueChunk); - } - } - } - - @Test - public void testEmpty() throws Exception - { - vals = new ArrayList<>(); - checkSerializedSizeAndData(1, 2); - } - private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception { File tmpDirectory = FileUtils.createTempDir(StringUtils.format( @@ -227,6 +270,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest try (SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium()) { CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer( + TEST_COLUMN_NAME, segmentWriteOutMedium, offsetChunkFactor, byteOrder, @@ -246,6 +290,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest Long.BYTES * 250000 ); CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer( + TEST_COLUMN_NAME, segmentWriteOutMedium, maxValue, valueChunkFactor, @@ -254,7 +299,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest genericIndexed ); V3CompressedVSizeColumnarMultiIntsSerializer writer = - new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter); + new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter); writer.open(); for (int[] val : vals) { writer.addValues(new ArrayBasedIndexedInts(val)); @@ -282,16 +327,76 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest } } - @Test - public void 
testMultiValueFileLargeData() throws Exception + private void generateV2SerializedSizeAndData(long numRows, int maxValue, int maxValuesPerRow, int offsetChunkFactor, int valueChunkFactor) throws Exception { - // more than one chunk - for (int offsetChunk : OFFSET_CHUNK_FACTORS) { - for (int maxValue : MAX_VALUES) { - final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue); - generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue); - checkV2SerializedSizeAndData(offsetChunk, valueChunk); + File tmpDirectory = FileUtils.createTempDir(StringUtils.format( + "CompressedVSizeIndexedV3WriterTest_%d_%d", + offsetChunkFactor, + offsetChunkFactor + )); + FileSmoosher smoosher = new FileSmoosher(tmpDirectory); + + try ( + SegmentWriteOutMedium segmentWriteOutMedium = + TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder()) + ) { + CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer( + TEST_COLUMN_NAME, + segmentWriteOutMedium, + offsetChunkFactor, + byteOrder, + compressionStrategy, + GenericIndexedWriter.ofCompressedByteBuffers( + segmentWriteOutMedium, + "offset", + compressionStrategy, + Long.BYTES * 250000 + ) + ); + + GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers( + segmentWriteOutMedium, + "value", + compressionStrategy, + Long.BYTES * 250000 + ); + CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer( + TEST_COLUMN_NAME, + segmentWriteOutMedium, + maxValue, + valueChunkFactor, + byteOrder, + compressionStrategy, + genericIndexed + ); + V3CompressedVSizeColumnarMultiIntsSerializer writer = + new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter); + writer.open(); + for (long l = 0L; l < numRows; l++) { + writer.addValues(new ArrayBasedIndexedInts(generateRow(rand, maxValue, maxValuesPerRow))); } + + final SmooshedWriter channel = smoosher.addWithSmooshedWriter("test", writer.getSerializedSize()); + writer.writeTo(channel, smoosher); + channel.close(); + smoosher.close(); + SmooshedFileMapper mapper = Smoosh.map(tmpDirectory); + + V3CompressedVSizeColumnarMultiIntsSupplier supplierFromByteBuffer = + V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(mapper.mapFile("test"), byteOrder, mapper); + ColumnarMultiInts columnarMultiInts = supplierFromByteBuffer.get(); + Assert.assertEquals(columnarMultiInts.size(), numRows); + Random verifier = new Random(0); + for (int i = 0; i < numRows; ++i) { + IndexedInts subVals = columnarMultiInts.get(i); + int[] expected = generateRow(verifier, maxValue, maxValuesPerRow); + Assert.assertEquals(subVals.size(), expected.length); + for (int j = 0, size = subVals.size(); j < size; ++j) { + Assert.assertEquals(subVals.get(j), expected[j]); + } + } + CloseQuietly.close(columnarMultiInts); + mapper.close(); } } }