From c0e6d1c7f81c8e19fdcf6f1bd2c11fd3eb926bb0 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 26 Mar 2021 18:39:13 -0700 Subject: [PATCH] vectorize 'auto' long decoding (#11004) * Vectorize LongDeserializers. Also, add many more tests. * more faster * more more faster * more cleanup * fixes * forbidden * benchmark style * idk why * adjust * add preconditions for value >= 0 for writers * add 64 bit exception Co-authored-by: Gian Merlino --- .../BaseColumnarLongsBenchmark.java | 211 ++++++++ ...seColumnarLongsFromGeneratorBenchmark.java | 395 ++++++++++++++ ...aseColumnarLongsFromSegmentsBenchmark.java | 88 +++ ...LongsEncodeDataFromGeneratorBenchmark.java | 135 +++++ ...arLongsEncodeDataFromSegmentBenchmark.java | 165 ++++++ ...LongsSelectRowsFromGeneratorBenchmark.java | 169 ++++++ ...arLongsSelectRowsFromSegmentBenchmark.java | 164 ++++++ .../CompressedColumnarIntsBenchmark.java | 2 +- ...ressedVSizeColumnarMultiIntsBenchmark.java | 2 +- .../compression/EncodingSizeProfiler.java | 76 +++ .../FloatCompressionBenchmark.java | 2 +- ...loatCompressionBenchmarkFileGenerator.java | 2 +- .../LongCompressionBenchmark.java | 23 +- ...LongCompressionBenchmarkFileGenerator.java | 2 +- .../VSizeSerdeBenchmark.java | 2 +- .../segment/data/CompressionFactory.java | 21 +- .../segment/data/DeltaLongEncodingReader.java | 12 + .../segment/data/LongsLongEncodingReader.java | 40 +- .../segment/data/TableLongEncodingReader.java | 12 + .../druid/segment/data/VSizeLongSerde.java | 511 +++++++++++++++++- .../generator/ColumnValueGenerator.java | 9 +- .../data/CompressedLongsSerdeTest.java | 6 + .../segment/data/VSizeLongSerdeTest.java | 400 +++++++++++--- 23 files changed, 2300 insertions(+), 149 deletions(-) create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsFromGeneratorBenchmark.java create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsFromSegmentsBenchmark.java create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsEncodeDataFromGeneratorBenchmark.java create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsEncodeDataFromSegmentBenchmark.java create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromGeneratorBenchmark.java create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromSegmentBenchmark.java rename benchmarks/src/test/java/org/apache/druid/benchmark/{ => compression}/CompressedColumnarIntsBenchmark.java (99%) rename benchmarks/src/test/java/org/apache/druid/benchmark/{ => compression}/CompressedVSizeColumnarMultiIntsBenchmark.java (99%) create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/compression/EncodingSizeProfiler.java rename benchmarks/src/test/java/org/apache/druid/benchmark/{ => compression}/FloatCompressionBenchmark.java (98%) rename benchmarks/src/test/java/org/apache/druid/benchmark/{ => compression}/FloatCompressionBenchmarkFileGenerator.java (99%) rename benchmarks/src/test/java/org/apache/druid/benchmark/{ => compression}/LongCompressionBenchmark.java (84%) rename benchmarks/src/test/java/org/apache/druid/benchmark/{ => compression}/LongCompressionBenchmarkFileGenerator.java (99%) rename benchmarks/src/test/java/org/apache/druid/benchmark/{ => 
compression}/VSizeSerdeBenchmark.java (99%) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java new file mode 100644 index 00000000000..9093171ff9e --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark.compression; + +import org.apache.druid.collections.bitmap.WrappedImmutableRoaringBitmap; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.segment.data.ColumnarLongs; +import org.apache.druid.segment.data.ColumnarLongsSerializer; +import org.apache.druid.segment.data.CompressedColumnarLongsSupplier; +import org.apache.druid.segment.data.CompressionFactory; +import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.vector.BitmapVectorOffset; +import org.apache.druid.segment.vector.NoFilterVectorOffset; +import org.apache.druid.segment.vector.VectorOffset; +import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.util.BitSet; +import java.util.List; +import java.util.Map; +import java.util.Random; + +@State(Scope.Benchmark) +public class BaseColumnarLongsBenchmark +{ + static final int VECTOR_SIZE = 512; + + /** + * Name of the long encoding strategy. For longs, this is a composite of both byte level block compression and + * encoding of values within the block. + */ + @Param({ + "lz4-longs", + "lz4-auto" + }) + String encoding; + + Random rand = new Random(0); + + long[] vals; + + long minValue; + long maxValue; + + @Nullable + BitSet filter; + + VectorOffset vectorOffset; + + void setupFilters(int rows, double filteredRowCountPercentage) + { + // todo: filter set distributions to simulate different select patterns? + // (because benchmarks don't take long enough already..) 
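+    // A null filter means "select all rows". Otherwise, randomly chosen rows are collected into both a BitSet
+    // (driving the non-vectorized selectRows benchmarks) and a BitmapVectorOffset built from an equivalent roaring
+    // bitmap (driving the vectorized benchmarks), so both read paths see the same selection pattern.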
+    filter = null;
+    final int filteredRowCount = (int) Math.floor(rows * filteredRowCountPercentage);
+
+    if (filteredRowCount < rows) {
+      // setup bitset filter
+      filter = new BitSet();
+      MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+      for (int i = 0; i < filteredRowCount; i++) {
+        int rowToAccess = rand.nextInt(rows);
+        // Skip already selected rows if any
+        while (filter.get(rowToAccess)) {
+          rowToAccess = rand.nextInt(rows);
+        }
+        filter.set(rowToAccess);
+        bitmap.add(rowToAccess);
+      }
+      vectorOffset = new BitmapVectorOffset(
+          VECTOR_SIZE,
+          new WrappedImmutableRoaringBitmap(bitmap.toImmutableRoaringBitmap()),
+          0,
+          rows
+      );
+    } else {
+      vectorOffset = new NoFilterVectorOffset(VECTOR_SIZE, 0, rows);
+    }
+  }
+
+  static int encodeToFile(long[] vals, String encoding, FileChannel output) throws IOException
+  {
+    SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
+
+    ColumnarLongsSerializer serializer;
+    switch (encoding) {
+      case "lz4-longs":
+        serializer = CompressionFactory.getLongSerializer(
+            encoding,
+            writeOutMedium,
+            "lz4-longs",
+            ByteOrder.LITTLE_ENDIAN,
+            CompressionFactory.LongEncodingStrategy.LONGS,
+            CompressionStrategy.LZ4
+        );
+        break;
+      case "lz4-auto":
+        serializer = CompressionFactory.getLongSerializer(
+            encoding,
+            writeOutMedium,
+            "lz4-auto",
+            ByteOrder.LITTLE_ENDIAN,
+            CompressionFactory.LongEncodingStrategy.AUTO,
+            CompressionStrategy.LZ4
+        );
+        break;
+      case "none-longs":
+        serializer = CompressionFactory.getLongSerializer(
+            encoding,
+            writeOutMedium,
+            "none-longs",
+            ByteOrder.LITTLE_ENDIAN,
+            CompressionFactory.LongEncodingStrategy.LONGS,
+            CompressionStrategy.NONE
+        );
+        break;
+      case "none-auto":
+        serializer = CompressionFactory.getLongSerializer(
+            encoding,
+            writeOutMedium,
+            "none-auto",
+            ByteOrder.LITTLE_ENDIAN,
+            CompressionFactory.LongEncodingStrategy.AUTO,
+            CompressionStrategy.NONE
+        );
+        break;
+      default:
+        throw new RuntimeException("unknown encoding");
+    }
+
+    serializer.open();
+    for (long val : vals) {
+      serializer.add(val);
+    }
+    serializer.writeTo(output, null);
+    return (int) serializer.getSerializedSize();
+  }
+
+  static ColumnarLongs createColumnarLongs(String encoding, ByteBuffer buffer)
+  {
+    switch (encoding) {
+      case "lz4-longs":
+      case "lz4-auto":
+      case "none-auto":
+      case "none-longs":
+        return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN).get();
+    }
+
+    throw new IllegalArgumentException("unknown encoding");
+  }
+
+
+  // for testing encodings: validate that all encoders read the same values
+  // noinspection unused
+  static void checkSanity(Map<String, ColumnarLongs> encoders, List<String> encodings, int rows)
+  {
+    for (int i = 0; i < rows; i++) {
+      checkRowSanity(encoders, encodings, i);
+    }
+  }
+
+  static void checkRowSanity(Map<String, ColumnarLongs> encoders, List<String> encodings, int row)
+  {
+    if (encodings.size() > 1) {
+      for (int i = 0; i < encodings.size() - 1; i++) {
+        String currentKey = encodings.get(i);
+        String nextKey = encodings.get(i + 1);
+        ColumnarLongs current = encoders.get(currentKey);
+        ColumnarLongs next = encoders.get(nextKey);
+        long vCurrent = current.get(row);
+        long vNext = next.get(row);
+        if (vCurrent != vNext) {
+          throw new RE(
+              "values do not match at row %s - %s:%s %s:%s",
+              row,
+              currentKey,
+              vCurrent,
+              nextKey,
+              vNext
+          );
+        }
+      }
+    }
+  }
+}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsFromGeneratorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsFromGeneratorBenchmark.java
new file mode 100644
index 00000000000..5e1c85e534b
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsFromGeneratorBenchmark.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.compression;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.generator.ColumnValueGenerator;
+import org.apache.druid.segment.generator.GeneratorColumnSchema;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+
+import java.io.File;
+import java.util.List;
+
+@State(Scope.Benchmark)
+public class BaseColumnarLongsFromGeneratorBenchmark extends BaseColumnarLongsBenchmark
+{
+  static int SEED = 1;
+
+  /**
+   * Controls the probability that any generated value will be a zero, to simulate sparsely populated columns
+   */
+  @Param({
+      "0.0",
+      "0.25",
+      "0.5",
+      "0.75",
+      "0.95"
+  })
+  double zeroProbability;
+
+  /**
+   * Number of rows generated for the value distribution
+   */
+  @Param({"5000000"})
+  int rows;
+
+  /**
+   * Value distributions to simulate various patterns of long columns
+   */
+  @Param({
+      "enumerated-0-1",
+      "enumerated-full",
+      "normal-1-32",
+      "normal-40-1000",
+      "sequential-1000",
+      "sequential-unique",
+      "uniform-1",
+      "uniform-2",
+      "uniform-3",
+      "uniform-4",
+      "uniform-8",
+      "uniform-12",
+      "uniform-16",
+      "uniform-20",
+      "uniform-24",
+      "uniform-32",
+      "uniform-40",
+      "uniform-48",
+      "uniform-56",
+      "uniform-64",
+      "zipf-low-100",
+      "zipf-low-100000",
+      "zipf-low-32-bit",
+      "zipf-high-100",
+      "zipf-high-100000",
+      "zipf-high-32-bit"
+  })
+  String distribution;
+
+
+  static ColumnValueGenerator makeGenerator(
+      String distribution,
+      int rows,
+      double zeroProbability
+  )
+  {
+    List<Object> enumerated;
+    List<Double> probability;
+    switch (distribution) {
+      case "enumerated-0-1":
+        enumerated = ImmutableList.of(0, 1);
+        probability = ImmutableList.of(0.6, 0.4);
+        return GeneratorColumnSchema.makeEnumerated(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            enumerated,
+            probability
+        ).makeGenerator(SEED);
+      case "enumerated-full":
+        enumerated = ImmutableList.of(
+            0,
+            1,
+            Long.MAX_VALUE - 1,
+            Long.MIN_VALUE + 1,
+            Long.MIN_VALUE / 2,
+            Long.MAX_VALUE / 2
+        );
+        probability = ImmutableList.of(0.4, 0.2, 0.1, 0.1, 0.1, 0.1);
+        return GeneratorColumnSchema.makeEnumerated(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            enumerated,
+            probability
+        ).makeGenerator(SEED);
+      case "normal-1-32":
+        return GeneratorColumnSchema.makeNormal(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            1.0,
+            (double) (1L << 32),
+            true
+        ).makeGenerator(SEED);
+      case "normal-40-1000":
+        return GeneratorColumnSchema.makeNormal(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            (double) (1L << 40),
+            1000.0,
+            true
+        ).makeGenerator(SEED);
+      case "sequential-1000":
+        return GeneratorColumnSchema.makeSequential(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            Integer.MAX_VALUE - 1001,
+            Integer.MAX_VALUE - 1
+        ).makeGenerator(SEED);
+      case "sequential-unique":
+        return GeneratorColumnSchema.makeSequential(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            rows
+        ).makeGenerator(SEED);
+      case "uniform-1":
+        return GeneratorColumnSchema.makeDiscreteUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            1
+        ).makeGenerator(SEED);
+      case "uniform-2":
+        return GeneratorColumnSchema.makeDiscreteUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            4
+        ).makeGenerator(SEED);
+      case "uniform-3":
+        return GeneratorColumnSchema.makeDiscreteUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            1000000,
+            1000008
+        ).makeGenerator(SEED);
+      case "uniform-4":
+        return GeneratorColumnSchema.makeDiscreteUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            1 << 4
+        ).makeGenerator(SEED);
+      case "uniform-8":
+        return GeneratorColumnSchema.makeDiscreteUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            1 << 8
+        ).makeGenerator(SEED);
+      case "uniform-12":
+        return GeneratorColumnSchema.makeDiscreteUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            1 << 12
+        ).makeGenerator(SEED);
+      case "uniform-16":
+        return GeneratorColumnSchema.makeDiscreteUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            1 << 16
+        ).makeGenerator(SEED);
+      case "uniform-20":
+        return GeneratorColumnSchema.makeContinuousUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            1 << 20
+        ).makeGenerator(SEED);
+      case "uniform-24":
+        return GeneratorColumnSchema.makeContinuousUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            (1 << 24) - 1
+        ).makeGenerator(SEED);
+      case "uniform-32":
+        return GeneratorColumnSchema.makeContinuousUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            Integer.MAX_VALUE - 1
+        ).makeGenerator(SEED);
+      case "uniform-40":
+        return GeneratorColumnSchema.makeContinuousUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0L,
+            (1L << 40) - 1
+        ).makeGenerator(SEED);
+      case "uniform-48":
+        return GeneratorColumnSchema.makeContinuousUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            (1L << 48) - 1
+        ).makeGenerator(SEED);
+      case "uniform-56":
+        return GeneratorColumnSchema.makeContinuousUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            (1L << 56) - 1
+        ).makeGenerator(SEED);
+      case "uniform-64":
+        return GeneratorColumnSchema.makeContinuousUniform(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            Long.MAX_VALUE - 1
+        ).makeGenerator(SEED);
+      case "zipf-low-100":
+        return GeneratorColumnSchema.makeLazyZipf(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            100,
+            1d
+        ).makeGenerator(SEED);
+      case "zipf-low-100000":
+        return GeneratorColumnSchema.makeLazyZipf(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            -50000,
+            50000,
+            1d
+        ).makeGenerator(SEED);
+      case "zipf-low-32-bit":
+        return GeneratorColumnSchema.makeLazyZipf(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            0d,
+            0,
+            Integer.MAX_VALUE,
+            1d
+        ).makeGenerator(SEED);
+      case "zipf-high-100":
+        return GeneratorColumnSchema.makeLazyZipf(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            0,
+            100,
+            3d
+        ).makeGenerator(SEED);
+      case "zipf-high-100000":
+        return GeneratorColumnSchema.makeLazyZipf(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            zeroProbability,
+            -50000,
+            50000,
+            3d
+        ).makeGenerator(SEED);
+      case "zipf-high-32-bit":
+        return GeneratorColumnSchema.makeLazyZipf(
+            distribution,
+            ValueType.LONG,
+            true,
+            1,
+            0d,
+            0,
+            Integer.MAX_VALUE,
+            3d
+        ).makeGenerator(SEED);
+    }
+    throw new IllegalArgumentException("unknown distribution");
+  }
+
+  static String getGeneratorEncodedFilename(String encoding, String distribution, int rows, double nullProbability)
+  {
+    return StringUtils.format("%s-%s-%s-%s.bin", encoding, distribution, rows, nullProbability);
+  }
+
+  static File getTmpDir()
+  {
+    final String dirPath = "tmp/encoding/longs/";
+    File dir = new File(dirPath);
+    dir.mkdirs();
+    return dir;
+  }
+}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsFromSegmentsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsFromSegmentsBenchmark.java
new file mode 100644
index 00000000000..0c0fd2df600
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsFromSegmentsBenchmark.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.compression;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+
+import java.io.File;
+
+@State(Scope.Benchmark)
+public class BaseColumnarLongsFromSegmentsBenchmark extends BaseColumnarLongsBenchmark
+{
+  /**
+   * Long columns to read from the segment file specified by {@link #segmentPath}
+   */
+  @Param({
+      "__time",
+      "followers",
+      "friends",
+      "max_followers",
+      "max_retweets",
+      "max_statuses",
+      "retweets",
+      "statuses",
+      "tweets"
+  })
+  String columnName;
+
+  /**
+   * Number of rows in the segment. This should match the actual number of rows in the segment specified by
+   * {@link #segmentPath}. If it is smaller, only this many rows will be read; if it is larger, the benchmark will
+   * explode trying to read more rows than exist.
+   *
+   * This is a hassle, but ensures that the row count ends up in the output measurements.
+   */
+  @Param({"3259585"})
+  int rows;
+
+
+  /**
+   * Path to a segment file to read long columns from.
+   * This shouldn't really be used as a parameter, but it is nice to have it included in the output measurements.
+   *
+   * This is a BYO segment; this file probably doesn't exist for you, so replace it and the other parameters with the
+   * segment to test.
+   */
+  @Param({"tmp/segments/twitter-ticker-1/"})
+  String segmentPath;
+
+  /**
+   * Friendly name of the segment. Like {@link #segmentPath}, this shouldn't really be used as a parameter, but is
+   * also nice to be included in the output measurements.
+   */
+  @Param({"twitter-ticker"})
+  String segmentName;
+
+  String getColumnEncodedFileName(String encoding, String segmentName, String columnName)
+  {
+    return StringUtils.format("%s-%s-longs-%s.bin", encoding, segmentName, columnName);
+  }
+
+  File getTmpDir()
+  {
+    final String dirPath = StringUtils.format("tmp/encoding/%s", segmentName);
+    File dir = new File(dirPath);
+    dir.mkdirs();
+    return dir;
+  }
+}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsEncodeDataFromGeneratorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsEncodeDataFromGeneratorBenchmark.java
new file mode 100644
index 00000000000..f02d25d06f5
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsEncodeDataFromGeneratorBenchmark.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.druid.benchmark.compression; + +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.generator.ColumnValueGenerator; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.Writer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 2) +public class ColumnarLongsEncodeDataFromGeneratorBenchmark extends BaseColumnarLongsFromGeneratorBenchmark +{ + @Setup + public void setup() throws Exception + { + vals = new long[rows]; + final String filename = getGeneratorValueFilename(distribution, rows, zeroProbability); + File dir = getTmpDir(); + File dataFile = new File(dir, filename); + + if (dataFile.exists()) { + System.out.println("Data files already exist, re-using"); + try (BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8)) { + int lineNum = 0; + String line; + while ((line = br.readLine()) != null) { + vals[lineNum] = Long.parseLong(line); + if (vals[lineNum] < minValue) { + minValue = vals[lineNum]; + } + if (vals[lineNum] > maxValue) { + maxValue = vals[lineNum]; + } + lineNum++; + } + } + } else { + try (Writer writer = Files.newBufferedWriter(dataFile.toPath(), StandardCharsets.UTF_8)) { + ColumnValueGenerator valueGenerator = makeGenerator(distribution, rows, zeroProbability); + + for (int i = 0; i < rows; i++) { + long value; + Object rowValue = valueGenerator.generateRowValue(); + value = rowValue != null ? 
(long) rowValue : 0; + vals[i] = value; + if (vals[i] < minValue) { + minValue = vals[i]; + } + if (vals[i] > maxValue) { + maxValue = vals[i]; + } + writer.write(vals[i] + "\n"); + } + } + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void encodeColumn(Blackhole blackhole) throws IOException + { + File dir = getTmpDir(); + File columnDataFile = new File(dir, getGeneratorEncodedFilename(encoding, distribution, rows, zeroProbability)); + columnDataFile.delete(); + FileChannel output = + FileChannel.open(columnDataFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + + int size = encodeToFile(vals, encoding, output); + EncodingSizeProfiler.encodedSize = size; + blackhole.consume(size); + output.close(); + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(ColumnarLongsEncodeDataFromGeneratorBenchmark.class.getSimpleName()) + .addProfiler(EncodingSizeProfiler.class) + .resultFormat(ResultFormatType.CSV) + .result("column-longs-encode-speed.csv") + .build(); + + new Runner(opt).run(); + } + + private static String getGeneratorValueFilename(String distribution, int rows, double nullProbability) + { + return StringUtils.format("values-%s-%s-%s.bin", distribution, rows, nullProbability); + } +} diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsEncodeDataFromSegmentBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsEncodeDataFromSegmentBenchmark.java new file mode 100644 index 00000000000..aa2f119754b --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsEncodeDataFromSegmentBenchmark.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.benchmark.compression; + +import com.google.common.collect.Iterables; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.LongsColumn; +import org.apache.druid.segment.column.ValueType; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.Writer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 1) +public class ColumnarLongsEncodeDataFromSegmentBenchmark extends BaseColumnarLongsFromSegmentsBenchmark +{ + @Setup + public void setup() throws Exception + { + initializeSegmentValueIntermediaryFile(); + File dir = getTmpDir(); + File dataFile = new File(dir, getColumnDataFileName(segmentName, columnName)); + + List values = new ArrayList<>(); + try (BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8)) { + String line; + while ((line = br.readLine()) != null) { + long value = Long.parseLong(line); + if (value < minValue) { + minValue = value; + } + if (value > maxValue) { + maxValue = value; + } + values.add(value); + rows++; + } + } + + vals = values.stream().mapToLong(i -> i).toArray(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void encodeColumn(Blackhole blackhole) throws IOException + { + File dir = getTmpDir(); + File columnDataFile = new File(dir, getColumnEncodedFileName(encoding, segmentName, columnName)); + columnDataFile.delete(); + FileChannel output = + FileChannel.open(columnDataFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + + int size = BaseColumnarLongsBenchmark.encodeToFile(vals, encoding, output); + EncodingSizeProfiler.encodedSize = size; + blackhole.consume(size); + output.close(); + } + + /** + * writes column values to an intermediary text file, 1 per line, encoders read from this file as input to write + * encoded column files. 
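+ * The format is one base-10 long per line, so the intermediary file can be inspected by hand, and deleting it
+ * forces it to be regenerated from the segment on the next run.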
+ */ + private void initializeSegmentValueIntermediaryFile() throws IOException + { + File dir = getTmpDir(); + File dataFile = new File(dir, getColumnDataFileName(segmentName, columnName)); + + if (!dataFile.exists()) { + final IndexIO indexIO = new IndexIO( + new DefaultObjectMapper(), + () -> 0 + ); + try (final QueryableIndex index = indexIO.loadIndex(new File(segmentPath))) { + final Set columnNames = new LinkedHashSet<>(); + columnNames.add(ColumnHolder.TIME_COLUMN_NAME); + Iterables.addAll(columnNames, index.getColumnNames()); + final ColumnHolder column = index.getColumnHolder(columnName); + final ColumnCapabilities capabilities = column.getCapabilities(); + final ValueType columnType = capabilities.getType(); + try (Writer writer = Files.newBufferedWriter(dataFile.toPath(), StandardCharsets.UTF_8)) { + if (columnType != ValueType.LONG) { + throw new RuntimeException("Invalid column type, expected 'Long'"); + } + LongsColumn theColumn = (LongsColumn) column.getColumn(); + + + for (int i = 0; i < theColumn.length(); i++) { + long value = theColumn.getLongSingleValueRow(i); + writer.write(value + "\n"); + } + } + } + } + } + + private String getColumnDataFileName(String segmentName, String columnName) + { + return StringUtils.format("%s-longs-%s.txt", segmentName, columnName); + } + + public static void main(String[] args) throws RunnerException + { + System.out.println("main happened"); + Options opt = new OptionsBuilder() + .include(ColumnarLongsEncodeDataFromSegmentBenchmark.class.getSimpleName()) + .addProfiler(EncodingSizeProfiler.class) + .resultFormat(ResultFormatType.CSV) + .result("column-longs-encode-speed-segments.csv") + .build(); + + new Runner(opt).run(); + } +} diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromGeneratorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromGeneratorBenchmark.java new file mode 100644 index 00000000000..d25ded77892 --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromGeneratorBenchmark.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.benchmark.compression; + +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.segment.data.ColumnarLongs; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +public class ColumnarLongsSelectRowsFromGeneratorBenchmark extends BaseColumnarLongsFromGeneratorBenchmark +{ + private Map decoders; + private Map encodedSize; + + /** + * Number of rows to read, the test will randomly set positions in a simulated offset of the specified density in + * {@link #setupFilters(int, double)} + */ + @Param({ + "0.1", + "0.25", + "0.5", + "0.75", + "0.95", + "1.0" + }) + private double filteredRowCountPercentage; + + @Setup + public void setup() throws IOException + { + decoders = new HashMap<>(); + encodedSize = new HashMap<>(); + + setupFromFile(encoding); + setupFilters(rows, filteredRowCountPercentage); + + // uncomment this block to run sanity check to ensure all specified encodings produce the same set of results + //CHECKSTYLE.OFF: Regexp +// ImmutableList all = ImmutableList.of("lz4-longs", "lz4-auto"); +// for (String _enc : all) { +// if (!_enc.equalsIgnoreCase(encoding)) { +// setupFromFile(_enc); +// } +// } +// +// checkSanity(decoders, all, rows); + //CHECKSTYLE.ON: Regexp + } + + @TearDown + public void teardown() + { + for (ColumnarLongs longs : decoders.values()) { + longs.close(); + } + } + + private void setupFromFile(String encoding) throws IOException + { + File dir = getTmpDir(); + File compFile = new File(dir, getGeneratorEncodedFilename(encoding, distribution, rows, zeroProbability)); + ByteBuffer buffer = FileUtils.map(compFile).get(); + + int size = (int) compFile.length(); + encodedSize.put(encoding, size); + ColumnarLongs data = createColumnarLongs(encoding, buffer); + decoders.put(encoding, data); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void selectRows(Blackhole blackhole) + { + EncodingSizeProfiler.encodedSize = encodedSize.get(encoding); + ColumnarLongs encoder = decoders.get(encoding); + if (filter == null) { + for (int i = 0; i < rows; i++) { + blackhole.consume(encoder.get(i)); + } + } else { + for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) { + blackhole.consume(encoder.get(i)); + } + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void selectRowsVectorized(Blackhole blackhole) + { + EncodingSizeProfiler.encodedSize = 
encodedSize.get(encoding); + ColumnarLongs columnDecoder = decoders.get(encoding); + long[] vector = new long[VECTOR_SIZE]; + while (!vectorOffset.isDone()) { + if (vectorOffset.isContiguous()) { + columnDecoder.get(vector, vectorOffset.getStartOffset(), vectorOffset.getCurrentVectorSize()); + } else { + columnDecoder.get(vector, vectorOffset.getOffsets(), vectorOffset.getCurrentVectorSize()); + } + for (int i = 0; i < vectorOffset.getCurrentVectorSize(); i++) { + blackhole.consume(vector[i]); + } + vectorOffset.advance(); + } + blackhole.consume(vector); + blackhole.consume(vectorOffset); + vectorOffset.reset(); + columnDecoder.close(); + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(ColumnarLongsSelectRowsFromGeneratorBenchmark.class.getSimpleName()) + .addProfiler(EncodingSizeProfiler.class) + .resultFormat(ResultFormatType.CSV) + .result("column-longs-select-speed.csv") + .build(); + + new Runner(opt).run(); + } +} diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromSegmentBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromSegmentBenchmark.java new file mode 100644 index 00000000000..0fadb6249e7 --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromSegmentBenchmark.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.benchmark.compression; + +import com.google.common.io.Files; +import org.apache.druid.segment.data.ColumnarLongs; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 1) +public class ColumnarLongsSelectRowsFromSegmentBenchmark extends BaseColumnarLongsFromSegmentsBenchmark +{ + private Map decoders; + private Map encodedSize; + + /** + * Number of rows to read, the test will randomly set positions in a simulated offset of the specified density in + * {@link #setupFilters(int, double)} + */ + @Param({"0.01", "0.1", "0.33", "0.66", "0.95", "1.0"}) + private double filteredRowCountPercentage; + + @Setup + public void setup() throws Exception + { + decoders = new HashMap<>(); + encodedSize = new HashMap<>(); + setupFilters(rows, filteredRowCountPercentage); + + setupFromFile(encoding); + + + // uncomment this block to run sanity check to ensure all specified encodings produce the same set of results + //CHECKSTYLE.OFF: Regexp +// List all = ImmutableList.of("lz4-longs", "lz4-auto"); +// for (String _enc : all) { +// if (!_enc.equals(encoding)) { +// setupFromFile(_enc); +// } +// } +// +// checkSanity(decoders, all, rows); + //CHECKSTYLE.ON: Regexp + } + + @TearDown + public void teardown() + { + for (ColumnarLongs longs : decoders.values()) { + longs.close(); + } + } + + private void setupFromFile(String encoding) throws IOException + { + File dir = getTmpDir(); + File compFile = new File(dir, getColumnEncodedFileName(encoding, segmentName, columnName)); + ByteBuffer buffer = Files.map(compFile); + + int size = (int) compFile.length(); + encodedSize.put(encoding, size); + ColumnarLongs data = BaseColumnarLongsBenchmark.createColumnarLongs(encoding, buffer); + decoders.put(encoding, data); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void selectRows(Blackhole blackhole) + { + EncodingSizeProfiler.encodedSize = encodedSize.get(encoding); + ColumnarLongs encoder = decoders.get(encoding); + if (filter == null) { + for (int i = 0; i < rows; i++) { + blackhole.consume(encoder.get(i)); + } + } else { + for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) { + blackhole.consume(encoder.get(i)); + } + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void selectRowsVectorized(Blackhole blackhole) + { + EncodingSizeProfiler.encodedSize = encodedSize.get(encoding); + ColumnarLongs columnDecoder = 
decoders.get(encoding); + long[] vector = new long[VECTOR_SIZE]; + while (!vectorOffset.isDone()) { + if (vectorOffset.isContiguous()) { + columnDecoder.get(vector, vectorOffset.getStartOffset(), vectorOffset.getCurrentVectorSize()); + } else { + columnDecoder.get(vector, vectorOffset.getOffsets(), vectorOffset.getCurrentVectorSize()); + } + for (int i = 0; i < vectorOffset.getCurrentVectorSize(); i++) { + blackhole.consume(vector[i]); + } + vectorOffset.advance(); + } + blackhole.consume(vector); + blackhole.consume(vectorOffset); + vectorOffset.reset(); + columnDecoder.close(); + } + + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(ColumnarLongsSelectRowsFromSegmentBenchmark.class.getSimpleName()) + .addProfiler(EncodingSizeProfiler.class) + .resultFormat(ResultFormatType.CSV) + .result("column-longs-select-speed-segments.csv") + .build(); + + new Runner(opt).run(); + } +} diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/CompressedColumnarIntsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedColumnarIntsBenchmark.java similarity index 99% rename from benchmarks/src/test/java/org/apache/druid/benchmark/CompressedColumnarIntsBenchmark.java rename to benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedColumnarIntsBenchmark.java index 5e283ae55d9..bbcd2fc1395 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/CompressedColumnarIntsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedColumnarIntsBenchmark.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.benchmark; +package org.apache.druid.benchmark.compression; import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.druid.common.config.NullHandling; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/CompressedVSizeColumnarMultiIntsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedVSizeColumnarMultiIntsBenchmark.java similarity index 99% rename from benchmarks/src/test/java/org/apache/druid/benchmark/CompressedVSizeColumnarMultiIntsBenchmark.java rename to benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedVSizeColumnarMultiIntsBenchmark.java index bbf5b537c83..5eb88195f92 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/CompressedVSizeColumnarMultiIntsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/CompressedVSizeColumnarMultiIntsBenchmark.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.benchmark; +package org.apache.druid.benchmark.compression; import com.google.common.base.Function; import com.google.common.collect.Iterables; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/EncodingSizeProfiler.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/EncodingSizeProfiler.java new file mode 100644 index 00000000000..e241de1da1a --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/EncodingSizeProfiler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.compression;
+
+import org.openjdk.jmh.infra.BenchmarkParams;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.infra.IterationParams;
+import org.openjdk.jmh.profile.InternalProfiler;
+import org.openjdk.jmh.results.AggregationPolicy;
+import org.openjdk.jmh.results.IterationResult;
+import org.openjdk.jmh.results.Result;
+import org.openjdk.jmh.results.ScalarResult;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Crude jmh 'profiler' that allows benchmark methods to set this static value in a benchmark run; if this profiler
+ * is added to the run, the additional measurement shows up in the results.
+ *
+ * This allows two measurements to be collected for the result set: the timing of the test, and the size in bytes
+ * set here.
+ *
+ * @see ColumnarLongsSelectRowsFromGeneratorBenchmark#selectRows(Blackhole)
+ * @see ColumnarLongsSelectRowsFromGeneratorBenchmark#selectRowsVectorized(Blackhole)
+ * @see ColumnarLongsEncodeDataFromGeneratorBenchmark#encodeColumn(Blackhole)
+ * @see ColumnarLongsSelectRowsFromSegmentBenchmark#selectRows(Blackhole)
+ * @see ColumnarLongsSelectRowsFromSegmentBenchmark#selectRowsVectorized(Blackhole)
+ * @see ColumnarLongsEncodeDataFromSegmentBenchmark#encodeColumn(Blackhole)
+ */
+public class EncodingSizeProfiler implements InternalProfiler
+{
+  public static int encodedSize;
+
+  @Override
+  public void beforeIteration(
+      BenchmarkParams benchmarkParams,
+      IterationParams iterationParams
+  )
+  {
+  }
+
+  @Override
+  public Collection<? extends Result> afterIteration(
+      BenchmarkParams benchmarkParams,
+      IterationParams iterationParams,
+      IterationResult result
+  )
+  {
+    return Collections.singletonList(
+        new ScalarResult("encoded size", encodedSize, "bytes", AggregationPolicy.MAX)
+    );
+  }
+
+  @Override
+  public String getDescription()
+  {
+    return "super janky encoding size result collector";
+  }
+}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmark.java
similarity index 98%
rename from benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmark.java
rename to benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmark.java
index 1663c0eba37..a8063ab667a 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmark.java
@@ -17,7 +17,7 @@
 * under the License.
*/ -package org.apache.druid.benchmark; +package org.apache.druid.benchmark.compression; import com.google.common.base.Supplier; import org.apache.druid.common.config.NullHandling; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java similarity index 99% rename from benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java rename to benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java index 424f2977104..82709a6c4b3 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.benchmark; +package org.apache.druid.benchmark.compression; import com.google.common.collect.ImmutableList; import org.apache.druid.common.config.NullHandling; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmark.java similarity index 84% rename from benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmark.java rename to benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmark.java index 7bd057311a4..e059c8a7ea7 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmark.java @@ -17,12 +17,13 @@ * under the License. 
*/ -package org.apache.druid.benchmark; +package org.apache.druid.benchmark.compression; import com.google.common.base.Supplier; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.MappedByteBufferHandler; +import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.data.ColumnarLongs; import org.apache.druid.segment.data.CompressedColumnarLongsSupplier; import org.openjdk.jmh.annotations.Benchmark; @@ -50,8 +51,8 @@ import java.util.concurrent.TimeUnit; */ @State(Scope.Benchmark) @Fork(value = 1) -@Warmup(iterations = 10) -@Measurement(iterations = 25) +@Warmup(iterations = 3) +@Measurement(iterations = 5) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class LongCompressionBenchmark @@ -69,7 +70,7 @@ public class LongCompressionBenchmark @Param({"auto", "longs"}) private static String format; - @Param({"lz4", "none"}) + @Param({"lz4"}) private static String strategy; private Supplier supplier; @@ -114,4 +115,18 @@ public class LongCompressionBenchmark columnarLongs.close(); } + @Benchmark + public void readVectorizedContinuous(Blackhole bh) + { + long[] vector = new long[QueryableIndexStorageAdapter.DEFAULT_VECTOR_SIZE]; + ColumnarLongs columnarLongs = supplier.get(); + int count = columnarLongs.size(); + for (int i = 0; i < count; i++) { + if (i % vector.length == 0) { + columnarLongs.get(vector, i, Math.min(vector.length, count - i)); + } + bh.consume(vector[i % vector.length]); + } + columnarLongs.close(); + } } diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java similarity index 99% rename from benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java rename to benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java index b9bca954de4..55d5f6b82bb 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.benchmark; +package org.apache.druid.benchmark.compression; import com.google.common.collect.ImmutableList; import org.apache.druid.common.config.NullHandling; diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/VSizeSerdeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/VSizeSerdeBenchmark.java similarity index 99% rename from benchmarks/src/test/java/org/apache/druid/benchmark/VSizeSerdeBenchmark.java rename to benchmarks/src/test/java/org/apache/druid/benchmark/compression/VSizeSerdeBenchmark.java index 1738046205e..995925e2d69 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/VSizeSerdeBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/VSizeSerdeBenchmark.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.druid.benchmark; +package org.apache.druid.benchmark.compression; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.FileUtils; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java index 7bf647a2711..0b45cdb8d5c 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java @@ -283,26 +283,9 @@ public class CompressionFactory long read(int index); - default void read(long[] out, int outPosition, int startIndex, int length) - { - for (int i = 0; i < length; i++) { - out[outPosition + i] = read(startIndex + i); - } - } + void read(long[] out, int outPosition, int startIndex, int length); - default int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit) - { - for (int i = 0; i < length; i++) { - int index = indexes[outPosition + i] - indexOffset; - if (index >= limit) { - return i; - } - - out[outPosition + i] = read(index); - } - - return length; - } + int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit); LongEncodingReader duplicate(); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/DeltaLongEncodingReader.java b/processing/src/main/java/org/apache/druid/segment/data/DeltaLongEncodingReader.java index c76e919c119..435aa2ddfd1 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/DeltaLongEncodingReader.java +++ b/processing/src/main/java/org/apache/druid/segment/data/DeltaLongEncodingReader.java @@ -65,6 +65,18 @@ public class DeltaLongEncodingReader implements CompressionFactory.LongEncodingR return base + deserializer.get(index); } + @Override + public void read(long[] out, int outPosition, int startIndex, int length) + { + deserializer.getDelta(out, outPosition, startIndex, length, base); + } + + @Override + public int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit) + { + return deserializer.getDelta(out, outPosition, indexes, length, indexOffset, limit, base); + } + @Override public CompressionFactory.LongEncodingReader duplicate() { diff --git a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingReader.java b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingReader.java index aaf2c0ef202..7fd1aef58b4 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingReader.java +++ b/processing/src/main/java/org/apache/druid/segment/data/LongsLongEncodingReader.java @@ -19,52 +19,56 @@ package org.apache.druid.segment.data; +import org.apache.datasketches.memory.Memory; + import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.LongBuffer; public class LongsLongEncodingReader implements CompressionFactory.LongEncodingReader { - private LongBuffer buffer; + private Memory buffer; public LongsLongEncodingReader(ByteBuffer fromBuffer, ByteOrder order) { - this.buffer = fromBuffer.asReadOnlyBuffer().order(order).asLongBuffer(); - } - - private LongsLongEncodingReader(LongBuffer buffer) - { - this.buffer = buffer; + this.buffer = Memory.wrap(fromBuffer.slice(), order); } @Override public void setBuffer(ByteBuffer buffer) { - this.buffer = buffer.asLongBuffer(); + this.buffer = Memory.wrap(buffer.slice(), buffer.order()); } @Override public long read(int index) { - return 
buffer.get(buffer.position() + index); + return buffer.getLong((long) index << 3); } @Override public void read(final long[] out, final int outPosition, final int startIndex, final int length) { - final int oldPosition = buffer.position(); - try { - buffer.position(oldPosition + startIndex); - buffer.get(out, outPosition, length); - } - finally { - buffer.position(oldPosition); + buffer.getLongArray((long) startIndex << 3, out, outPosition, length); + } + + @Override + public int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit) + { + for (int i = 0; i < length; i++) { + int index = indexes[outPosition + i] - indexOffset; + if (index >= limit) { + return i; + } + + out[outPosition + i] = buffer.getLong((long) index << 3); } + + return length; } @Override public CompressionFactory.LongEncodingReader duplicate() { - return new LongsLongEncodingReader(buffer.duplicate()); + return this; } } diff --git a/processing/src/main/java/org/apache/druid/segment/data/TableLongEncodingReader.java b/processing/src/main/java/org/apache/druid/segment/data/TableLongEncodingReader.java index 0a20e7b8293..6a5e17b1080 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/TableLongEncodingReader.java +++ b/processing/src/main/java/org/apache/druid/segment/data/TableLongEncodingReader.java @@ -71,6 +71,18 @@ public class TableLongEncodingReader implements CompressionFactory.LongEncodingR return table[(int) deserializer.get(index)]; } + @Override + public void read(long[] out, int outPosition, int startIndex, int length) + { + deserializer.getTable(out, outPosition, startIndex, length, table); + } + + @Override + public int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit) + { + return deserializer.getTable(out, outPosition, indexes, length, indexOffset, limit, table); + } + @Override public CompressionFactory.LongEncodingReader duplicate() { diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java index c0c3c08bbef..ff0316d0f15 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeLongSerde.java @@ -19,10 +19,11 @@ package org.apache.druid.segment.data; +import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.UOE; import javax.annotation.Nullable; - import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; @@ -213,6 +214,7 @@ public class VSizeLongSerde @Override public void write(long value) throws IOException { + Preconditions.checkArgument(value >= 0); if (count == 8) { buffer.put(curByte); count = 0; @@ -267,6 +269,7 @@ public class VSizeLongSerde @Override public void write(long value) throws IOException { + Preconditions.checkArgument(value >= 0); if (count == 8) { buffer.put(curByte); count = 0; @@ -324,6 +327,7 @@ public class VSizeLongSerde @Override public void write(long value) throws IOException { + Preconditions.checkArgument(value >= 0); int shift = 0; if (first) { shift = 4; @@ -388,6 +392,10 @@ public class VSizeLongSerde @Override public void write(long value) throws IOException { + if (numBytes != 8) { + // if the value is not stored in a full long, ensure it is zero or positive + Preconditions.checkArgument(value >= 0); + } for (int i = numBytes - 1; i >= 0; i--) { buffer.put((byte) (value >>> (i * 
8))); if (output != null) { @@ -413,9 +421,74 @@ public class VSizeLongSerde } } + /** + * Unpack bitpacked long values from an underlying contiguous memory block + */ public interface LongDeserializer { + /** + * Unpack the long value at the specified row index + */ long get(int index); + + /** + * Unpack a contiguous vector of {@code length} long values, beginning at the specified start index, and adjust + * each by the supplied delta base value. + */ + void getDelta(long[] out, int outPosition, int startIndex, int length, long base); + + /** + * Unpack a non-contiguous vector of long values at the specified indexes and adjust them by the supplied delta base + * value. + */ + default int getDelta(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long base) + { + for (int i = 0; i < length; i++) { + int index = indexes[outPosition + i] - indexOffset; + if (index >= limit) { + return i; + } + + out[outPosition + i] = base + get(index); + } + + return length; + } + + /** + * Unpack a contiguous vector of {@code length} stored values, beginning at the specified start index, replacing + * each with its entry in the supplied value lookup 'table' + */ + default void getTable(long[] out, int outPosition, int startIndex, int length, long[] table) + { + throw new UOE("Table decoding not supported for %s", this.getClass().getSimpleName()); + } + + /** + * Unpack a non-contiguous vector of stored values at the specified indexes, replacing each with its entry in the + * supplied value lookup 'table' + */ + default int getTable( + long[] out, + int outPosition, + int[] indexes, + int length, + int indexOffset, + int limit, + long[] table + ) + { + for (int i = 0; i < length; i++) { + int index = indexes[outPosition + i] - indexOffset; + if (index >= limit) { + return i; + } + + out[outPosition + i] = table[(int) get(index)]; + } + + return length; + } } private static final class Size1Des implements LongDeserializer @@ -435,6 +508,58 @@ int shift = 7 - (index & 7); return (buffer.get(offset + (index >> 3)) >> shift) & 1; } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + int index = startIndex; + int i = 0; + + // byte align + while ((index & 0x7) != 0 && i < length) { + out[outPosition + i++] = base + get(index++); + } + for ( ; i + Byte.SIZE < length; index += Byte.SIZE) { + final byte unpack = buffer.get(offset + (index >> 3)); + out[outPosition + i++] = base + ((unpack >> 7) & 1); + out[outPosition + i++] = base + ((unpack >> 6) & 1); + out[outPosition + i++] = base + ((unpack >> 5) & 1); + out[outPosition + i++] = base + ((unpack >> 4) & 1); + out[outPosition + i++] = base + ((unpack >> 3) & 1); + out[outPosition + i++] = base + ((unpack >> 2) & 1); + out[outPosition + i++] = base + ((unpack >> 1) & 1); + out[outPosition + i++] = base + (unpack & 1); + } + while (i < length) { + out[outPosition + i++] = base + get(index++); + } + } + + @Override + public void getTable(long[] out, int outPosition, int startIndex, int length, long[] table) + { + int index = startIndex; + int i = 0; + + // byte align + while ((index & 0x7) != 0 && i < length) { + out[outPosition + i++] = table[(int) get(index++)]; + } + for ( ; i + Byte.SIZE < length; index += Byte.SIZE) { + final byte unpack = buffer.get(offset + (index >> 3)); + out[outPosition + i++] = table[(unpack >> 7) & 1]; + out[outPosition + i++] = table[(unpack >> 6) & 1]; + out[outPosition + i++] = table[(unpack >> 5) & 1]; + out[outPosition + i++] = table[(unpack >> 4) & 1]; + out[outPosition + i++] = table[(unpack >> 3) & 1]; + out[outPosition + i++] = table[(unpack >> 2) & 1]; + out[outPosition + i++] = table[(unpack >> 1) & 1]; + out[outPosition + i++] = table[unpack & 1]; + } + while (i < length) { + out[outPosition + i++] = table[(int) get(index++)]; + } + } }
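Size1Des above, and Size2Des and Size4Des below, all follow the same three-phase decode: scalar reads until the bit cursor is byte aligned, an unrolled loop that unpacks eight values per aligned word, then scalar reads for the tail. A minimal stand-alone sketch of the 1-bit case (hypothetical class and method names, reading from a plain byte[] instead of the column's ByteBuffer):

    class BitUnpackSketch
    {
      // Decode 'length' 1-bit values starting at bit index 'startIndex', adding 'base' to each;
      // mirrors the align / unroll / tail structure of Size1Des.getDelta.
      static void unpack1BitDelta(byte[] packed, long[] out, int startIndex, int length, long base)
      {
        int index = startIndex;
        int i = 0;
        // scalar phase: consume values one at a time until index is a multiple of 8
        while ((index & 0x7) != 0 && i < length) {
          out[i++] = base + ((packed[index >> 3] >> (7 - (index & 7))) & 1);
          index++;
        }
        // vector phase: each aligned byte holds exactly eight packed values
        for ( ; i + Byte.SIZE < length; index += Byte.SIZE) {
          final byte unpack = packed[index >> 3];
          for (int bit = 7; bit >= 0; bit--) {
            out[i++] = base + ((unpack >> bit) & 1); // the real code unrolls this inner loop
          }
        }
        // tail phase: scalar reads for whatever is left
        while (i < length) {
          out[i++] = base + ((packed[index >> 3] >> (7 - (index & 7))) & 1);
          index++;
        }
      }
    }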
private static final class Size2Des implements LongDeserializer @@ -454,6 +579,58 @@ int shift = 6 - ((index & 3) << 1); return (buffer.get(offset + (index >> 2)) >> shift) & 3; } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + int index = startIndex; + int i = 0; + + // byte align + while ((index & 0x3) != 0 && i < length) { + out[outPosition + i++] = base + get(index++); + } + for ( ; i + 8 < length; index += 8) { + final short unpack = buffer.getShort(offset + (index >> 2)); + out[outPosition + i++] = base + ((unpack >> 14) & 3); + out[outPosition + i++] = base + ((unpack >> 12) & 3); + out[outPosition + i++] = base + ((unpack >> 10) & 3); + out[outPosition + i++] = base + ((unpack >> 8) & 3); + out[outPosition + i++] = base + ((unpack >> 6) & 3); + out[outPosition + i++] = base + ((unpack >> 4) & 3); + out[outPosition + i++] = base + ((unpack >> 2) & 3); + out[outPosition + i++] = base + (unpack & 3); + } + while (i < length) { + out[outPosition + i++] = base + get(index++); + } + } + + @Override + public void getTable(long[] out, int outPosition, int startIndex, int length, long[] table) + { + int index = startIndex; + int i = 0; + + // byte align + while ((index & 0x3) != 0 && i < length) { + out[outPosition + i++] = table[(int) get(index++)]; + } + for ( ; i + 8 < length; index += 8) { + final short unpack = buffer.getShort(offset + (index >> 2)); + out[outPosition + i++] = table[(unpack >> 14) & 3]; + out[outPosition + i++] = table[(unpack >> 12) & 3]; + out[outPosition + i++] = table[(unpack >> 10) & 3]; + out[outPosition + i++] = table[(unpack >> 8) & 3]; + out[outPosition + i++] = table[(unpack >> 6) & 3]; + out[outPosition + i++] = table[(unpack >> 4) & 3]; + out[outPosition + i++] = table[(unpack >> 2) & 3]; + out[outPosition + i++] = table[unpack & 3]; + } + while (i < length) { + out[outPosition + i++] = table[(int) get(index++)]; + } + } } private static final class Size4Des implements LongDeserializer @@ -473,6 +650,58 @@ int shift = ((index + 1) & 1) << 2; return (buffer.get(offset + (index >> 1)) >> shift) & 0xF; } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + int index = startIndex; + int i = 0; + + // byte align + while ((index & 0x1) != 0 && i < length) { + out[outPosition + i++] = base + get(index++); + } + for ( ; i + 8 < length; index += 8) { + final int unpack = buffer.getInt(offset + (index >> 1)); + out[outPosition + i++] = base + ((unpack >> 28) & 0xF); + out[outPosition + i++] = base + ((unpack >> 24) & 0xF); + out[outPosition + i++] = base + ((unpack >> 20) & 0xF); + out[outPosition + i++] = base + ((unpack >> 16) & 0xF); + out[outPosition + i++] = base + ((unpack >> 12) & 0xF); + out[outPosition + i++] = base + ((unpack >> 8) & 0xF); + out[outPosition + i++] = base + ((unpack >> 4) & 0xF); + out[outPosition + i++] = base + (unpack & 0xF); + } + while (i < length) { + out[outPosition + i++] = base + get(index++); + } + } + + @Override + public void getTable(long[] out, int outPosition, int startIndex, int length, long[] table) + { + int index = startIndex; + int i = 0; + + // byte align + while ((index & 0x1) != 0 && i < length) { + out[outPosition + i++] = table[(int) get(index++)]; + } + for ( ; i + 8 < length; index += 8) { + final int unpack = buffer.getInt(offset + (index >> 1)); + out[outPosition + i++] = table[(unpack >> 28) & 0xF]; + out[outPosition + i++] = table[(unpack >> 24) & 0xF]; + out[outPosition + i++] = table[(unpack >> 20) & 0xF]; + out[outPosition + i++] = table[(unpack >> 16) & 0xF]; + out[outPosition + i++] = table[(unpack >> 12) & 0xF]; + out[outPosition + i++] = table[(unpack >> 8) & 0xF]; + out[outPosition + i++] = table[(unpack >> 4) & 0xF]; + out[outPosition + i++] = table[unpack & 0xF]; + } + while (i < length) { + out[outPosition + i++] = table[(int) get(index++)]; + } + } }
private static final class Size8Des implements LongDeserializer @@ -491,6 +720,52 @@ { return buffer.get(offset + index) & 0xFF; } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + for (int i = 0, indexOffset = startIndex; i < length; i++, indexOffset++) { + out[outPosition + i] = base + (buffer.get(offset + indexOffset) & 0xFF); + } + } + + @Override + public int getDelta(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long base) + { + for (int i = 0; i < length; i++) { + int index = indexes[outPosition + i] - indexOffset; + if (index >= limit) { + return i; + } + + out[outPosition + i] = base + (buffer.get(offset + index) & 0xFF); + } + + return length; + } + + @Override + public void getTable(long[] out, int outPosition, int startIndex, int length, long[] table) + { + for (int i = 0, indexOffset = startIndex; i < length; i++, indexOffset++) { + out[outPosition + i] = table[buffer.get(offset + indexOffset) & 0xFF]; + } + } + + @Override + public int getTable(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long[] table) + { + for (int i = 0; i < length; i++) { + int index = indexes[outPosition + i] - indexOffset; + if (index >= limit) { + return i; + } + + out[outPosition + i] = table[buffer.get(offset + index) & 0xFF]; + } + + return length; + } } private static final class Size12Des implements LongDeserializer @@ -508,8 +783,37 @@ public long get(int index) { int shift = ((index + 1) & 1) << 2; - int offset = (index * 3) >> 1; - return (buffer.getShort(this.offset + offset) >> shift) & 0xFFF; + int indexOffset = (index * 3) >> 1; + return (buffer.getShort(offset + indexOffset) >> shift) & 0xFFF; + } + + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + int i = 0; + int index = startIndex; + // every other value is byte aligned + if ((index & 0x1) != 0) { + out[outPosition + i++] = base + get(index++); + } + final int unpackSize = Long.BYTES + Integer.BYTES; + for (int indexOffset = (index * 3) >> 1; i + 8 < length; indexOffset += unpackSize) { + final long unpack = buffer.getLong(offset + indexOffset); + final int unpack2 = buffer.getInt(offset + indexOffset + Long.BYTES); + out[outPosition + i++] = base + ((unpack >> 52) & 0xFFF); + out[outPosition + i++] = base + ((unpack >> 40) & 0xFFF); + out[outPosition + i++] = base + ((unpack >> 28) & 0xFFF); + out[outPosition + i++] = base + ((unpack >> 16) & 0xFFF); + out[outPosition + i++] = base + ((unpack >> 4) & 0xFFF); + out[outPosition + i++] = base + (((unpack & 0xF) << 8) | ((unpack2 >>> 24) & 0xFF)); + out[outPosition + i++] = base + ((unpack2 >> 12) & 0xFFF); + out[outPosition + i++] = base + (unpack2 & 0xFFF); + } + while (i < length) { + out[outPosition + i] = base + get(startIndex + i); + i++; + } } }
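The indexed (non-contiguous) read variants above share a calling convention worth spelling out: the indexes array holds row ids biased upward by indexOffset, and the method decodes until it reaches a row at or beyond limit, the end of the currently decompressed block, returning how many values it actually wrote. A stand-alone illustration of the contract (gatherDelta is a hypothetical name; the functional parameter stands in for LongDeserializer.get):

    import java.util.function.IntToLongFunction;

    class IndexedReadSketch
    {
      // Mirrors the loop in the default LongDeserializer.getDelta(long[], int, int[], ...) method.
      static int gatherDelta(IntToLongFunction decode, long[] out, int outPosition,
                             int[] indexes, int length, int indexOffset, int limit, long base)
      {
        for (int i = 0; i < length; i++) {
          final int index = indexes[outPosition + i] - indexOffset; // un-bias the row id
          if (index >= limit) {
            return i; // partial read: the caller advances to the next block and retries
          }
          out[outPosition + i] = base + decode.applyAsLong(index);
        }
        return length; // every requested row fell inside this block
      }
    }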
@@ -529,6 +833,29 @@ { return buffer.getShort(offset + (index << 1)) & 0xFFFF; } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + for (int i = 0, indexOffset = (startIndex << 1); i < length; i++, indexOffset += Short.BYTES) { + out[outPosition + i] = base + (buffer.getShort(offset + indexOffset) & 0xFFFF); + } + } + + @Override + public int getDelta(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long base) + { + for (int i = 0; i < length; i++) { + int index = indexes[outPosition + i] - indexOffset; + if (index >= limit) { + return i; + } + + out[outPosition + i] = base + (buffer.getShort(offset + (index << 1)) & 0xFFFF); + } + + return length; + } } private static final class Size20Des implements LongDeserializer @@ -546,8 +873,37 @@ public long get(int index) { int shift = (((index + 1) & 1) << 2) + 8; - int offset = (index * 5) >> 1; - return (buffer.getInt(this.offset + offset) >> shift) & 0xFFFFF; + int indexOffset = (index * 5) >> 1; + return (buffer.getInt(offset + indexOffset) >> shift) & 0xFFFFF; + } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + int i = 0; + int index = startIndex; + // every other value is byte aligned + if ((index & 0x1) != 0) { + out[outPosition + i++] = base + get(index++); + } + final int unpackSize = Long.BYTES + Long.BYTES + Integer.BYTES; + for (int indexOffset = (index * 5) >> 1; i + 8 < length; indexOffset += unpackSize) { + final long unpack = buffer.getLong(offset + indexOffset); + final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES); + final int unpack3 = buffer.getInt(offset + indexOffset + Long.BYTES + Long.BYTES); + out[outPosition + i++] = base + ((unpack >> 44) & 0xFFFFF); + out[outPosition + i++] = base + ((unpack >> 24) & 0xFFFFF); + out[outPosition + i++] = base + ((unpack >> 4) & 0xFFFFF); + out[outPosition + i++] = base + (((unpack & 0xF) << 16) | ((unpack2 >>> 48) & 0xFFFF)); + out[outPosition + i++] = base + ((unpack2 >> 28) & 0xFFFFF); + out[outPosition + i++] = base + ((unpack2 >> 8) & 0xFFFFF); + out[outPosition + i++] = base + (((unpack2 & 0xFF) << 12) | ((unpack3 >>> 20) & 0xFFF)); + out[outPosition + i++] = base + (unpack3 & 0xFFFFF); + } + while (i < length) { + out[outPosition + i] = base + get(startIndex + i); + i++; + } } } private static final class Size24Des implements LongDeserializer @@ -565,7 +921,31 @@ @Override public long get(int index) { - return buffer.getInt(offset + index * 3) >>> 8; + return buffer.getInt(offset + (index * 3)) >>> 8; + } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + int i = 0; + final int unpackSize = 3 * Long.BYTES; + for (int indexOffset = startIndex * 3; i + 8 < length; indexOffset += unpackSize) { + final long unpack = buffer.getLong(offset + indexOffset); + final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES); + final long unpack3 = buffer.getLong(offset + indexOffset + Long.BYTES + Long.BYTES); + out[outPosition + i++] = base + ((unpack >> 40) & 0xFFFFFF); + out[outPosition + i++] = base + ((unpack >> 16) & 0xFFFFFF); + out[outPosition + i++] = base + (((unpack & 0xFFFF) << 8) | ((unpack2 >>> 56) & 0xFF)); + out[outPosition + i++] = base + ((unpack2 >> 32) & 0xFFFFFF); + out[outPosition + i++] = base + ((unpack2 >> 8) & 0xFFFFFF); + out[outPosition + i++] = base + (((unpack2 & 0xFF) << 16) | ((unpack3 >>> 48) & 0xFFFF)); + out[outPosition + i++] = base + ((unpack3 >> 24) & 0xFFFFFF); + out[outPosition + i++] = base + (unpack3 & 0xFFFFFF); + } + while (i < length) { + out[outPosition + i] = base + get(startIndex + i); + i++; + } } }
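In the odd-width cases (12, 20, and 24 bits), some values straddle the boundary between two fetched words, so the unrolled loops reassemble them from the low bits of one word and the high bits of the next. The 12-bit constants can be checked by hand: eight values occupy 96 bits, read as one long (values 0 through 4 plus the top 4 bits of value 5) and one int (the low 8 bits of value 5, then values 6 and 7). A throwaway check of the value-5 reassembly, illustrative only and not part of the patch:

    // Place a known 12-bit value so it straddles the long/int boundary, then re-read it
    // with the same expression Size12Des.getDelta uses.
    long v5 = 0xABC;                          // 12-bit test value
    long unpack = v5 >>> 8;                   // low 4 bits of the long = v5's top nibble
    int unpack2 = (int) ((v5 & 0xFF) << 24);  // high byte of the int = v5's low 8 bits
    long decoded = ((unpack & 0xF) << 8) | ((unpack2 >>> 24) & 0xFF);
    assert decoded == v5;                     // reassembles to 0xABC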
@@ -583,7 +963,15 @@ @Override public long get(int index) { - return buffer.getInt(offset + (index << 2)) & 0xFFFFFFFFL; + return buffer.getInt((offset + (index << 2))) & 0xFFFFFFFFL; + } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + for (int i = 0, indexOffset = (startIndex << 2); i < length; i++, indexOffset += Integer.BYTES) { + out[outPosition + i] = base + (buffer.getInt(offset + indexOffset) & 0xFFFFFFFFL); + } + } } private static final class Size40Des implements LongDeserializer @@ -601,7 +989,33 @@ @Override public long get(int index) { - return buffer.getLong(offset + index * 5) >>> 24; + return buffer.getLong(offset + (index * 5)) >>> 24; + } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + int i = 0; + final int unpackSize = 5 * Long.BYTES; + for (int indexOffset = startIndex * 5; i + 8 < length; indexOffset += unpackSize) { + final long unpack = buffer.getLong(offset + indexOffset); + final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES); + final long unpack3 = buffer.getLong(offset + indexOffset + (2 * Long.BYTES)); + final long unpack4 = buffer.getLong(offset + indexOffset + (3 * Long.BYTES)); + final long unpack5 = buffer.getLong(offset + indexOffset + (4 * Long.BYTES)); + out[outPosition + i++] = base + ((unpack >>> 24) & 0xFFFFFFFFFFL); + out[outPosition + i++] = base + (((unpack & 0xFFFFFFL) << 16) | ((unpack2 >>> 48) & 0xFFFFL)); + out[outPosition + i++] = base + ((unpack2 >>> 8) & 0xFFFFFFFFFFL); + out[outPosition + i++] = base + (((unpack2 & 0xFFL) << 32) | ((unpack3 >>> 32) & 0xFFFFFFFFL)); + out[outPosition + i++] = base + (((unpack3 & 0xFFFFFFFFL) << 8) | ((unpack4 >>> 56) & 0xFFL)); + out[outPosition + i++] = base + ((unpack4 >>> 16) & 0xFFFFFFFFFFL); + out[outPosition + i++] = base + (((unpack4 & 0xFFFFL) << 24) | ((unpack5 >>> 40) & 0xFFFFFFL)); + out[outPosition + i++] = base + (unpack5 & 0xFFFFFFFFFFL); + } + while (i < length) { + out[outPosition + i] = base + get(startIndex + i); + i++; + } } } private static final class Size48Des implements LongDeserializer @@ -619,7 +1033,34 @@ @Override public long get(int index) { - return buffer.getLong(offset + index * 6) >>> 16; + return buffer.getLong(offset + (index * 6)) >>> 16; + } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + int i = 0; + final int unpackSize = 6 * Long.BYTES; + for (int indexOffset = startIndex * 6; i + 8 < length; indexOffset += unpackSize) { + final long unpack = buffer.getLong(offset + indexOffset); + final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES); + final long unpack3 = buffer.getLong(offset + indexOffset + (2 * Long.BYTES)); + final long unpack4 = buffer.getLong(offset + indexOffset + (3 * Long.BYTES)); + final long unpack5 = buffer.getLong(offset + indexOffset + (4 * Long.BYTES)); + final long unpack6 = buffer.getLong(offset + indexOffset + (5 * Long.BYTES)); + out[outPosition + i++] = base + ((unpack >>> 16) & 0xFFFFFFFFFFFFL); + out[outPosition + i++] = base + (((unpack & 0xFFFFL) << 32) | ((unpack2 >>> 32) & 0xFFFFFFFFL)); + out[outPosition + 
i++] = base + (((unpack2 & 0xFFFFFFFFL) << 16) | ((unpack3 >>> 48) & 0xFFFFL)); + out[outPosition + i++] = base + (unpack3 & 0xFFFFFFFFFFFFL); + out[outPosition + i++] = base + ((unpack4 >>> 16) & 0xFFFFFFFFFFFFL); + out[outPosition + i++] = base + (((unpack4 & 0xFFFFL) << 32) | ((unpack5 >>> 32) & 0xFFFFFFFFL)); + out[outPosition + i++] = base + (((unpack5 & 0xFFFFFFFFL) << 16) | ((unpack6 >>> 48) & 0xFFFFL)); + out[outPosition + i++] = base + (unpack6 & 0xFFFFFFFFFFFFL); + } + while (i < length) { + out[outPosition + i] = base + get(startIndex + i); + i++; + } } } @@ -637,7 +1078,35 @@ public class VSizeLongSerde @Override public long get(int index) { - return buffer.getLong(offset + index * 7) >>> 8; + return buffer.getLong(offset + (index * 7)) >>> 8; + } + + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + int i = 0; + final int unpackSize = 7 * Long.BYTES; + for (int indexOffset = startIndex * 7; i + 8 < length; indexOffset += unpackSize) { + final long unpack = buffer.getLong(offset + indexOffset); + final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES); + final long unpack3 = buffer.getLong(offset + indexOffset + (2 * Long.BYTES)); + final long unpack4 = buffer.getLong(offset + indexOffset + (3 * Long.BYTES)); + final long unpack5 = buffer.getLong(offset + indexOffset + (4 * Long.BYTES)); + final long unpack6 = buffer.getLong(offset + indexOffset + (5 * Long.BYTES)); + final long unpack7 = buffer.getLong(offset + indexOffset + (6 * Long.BYTES)); + out[outPosition + i++] = base + ((unpack >>> 8) & 0xFFFFFFFFFFFFFFL); + out[outPosition + i++] = base + (((unpack & 0xFFL) << 48) | ((unpack2 >>> 16) & 0xFFFFFFFFFFFFL)); + out[outPosition + i++] = base + (((unpack2 & 0xFFFFL) << 40) | ((unpack3 >>> 24) & 0xFFFFFFFFFFL)); + out[outPosition + i++] = base + (((unpack3 & 0xFFFFFFL) << 32) | ((unpack4 >>> 32) & 0xFFFFFFFFL)); + out[outPosition + i++] = base + (((unpack4 & 0xFFFFFFFFL) << 24) | ((unpack5 >>> 40) & 0xFFFFFFL)); + out[outPosition + i++] = base + (((unpack5 & 0xFFFFFFFFFFL) << 16) | ((unpack6 >>> 48) & 0xFFFFL)); + out[outPosition + i++] = base + (((unpack6 & 0xFFFFFFFFFFFFL) << 8) | ((unpack7 >>> 56) & 0xFFL)); + out[outPosition + i++] = base + (unpack7 & 0xFFFFFFFFFFFFFFL); + } + while (i < length) { + out[outPosition + i] = base + get(startIndex + i); + i++; + } } } @@ -657,6 +1126,26 @@ public class VSizeLongSerde { return buffer.getLong(offset + (index << 3)); } - } + @Override + public void getDelta(long[] out, int outPosition, int startIndex, int length, long base) + { + for (int i = 0, indexOffset = (startIndex << 3); i < length; i++, indexOffset += Long.BYTES) { + out[outPosition + i] = base + buffer.getLong(offset + indexOffset); + } + } + + @Override + public int getDelta(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long base) + { + for (int i = 0; i < length; i++) { + int index = indexes[outPosition + i] - indexOffset; + if (index >= limit) { + return i; + } + out[outPosition + i] = base + buffer.getLong(offset + (index << 3)); + } + return length; + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/generator/ColumnValueGenerator.java b/processing/src/main/java/org/apache/druid/segment/generator/ColumnValueGenerator.java index fbfc1a9f3c1..1fc09f57be6 100644 --- a/processing/src/main/java/org/apache/druid/segment/generator/ColumnValueGenerator.java +++ 
b/processing/src/main/java/org/apache/druid/segment/generator/ColumnValueGenerator.java @@ -33,8 +33,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.function.Supplier; -public class ColumnValueGenerator +public class ColumnValueGenerator implements Supplier<Object> { private final GeneratorColumnSchema schema; private final long seed; @@ -224,4 +225,10 @@ public class ColumnValueGenerator ((EnumeratedDistribution) distribution).reseedRandomGenerator(seed); } } + + @Override + public Object get() + { + return generateRowValue(); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java index 675c49420cb..f0effe78a2d 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java @@ -224,12 +224,18 @@ public class CompressedLongsSerdeTest Assert.assertEquals(vals.length, indexed.size()); // sequential access + long[] vector = new long[256]; int[] indices = new int[vals.length]; for (int i = 0; i < indexed.size(); ++i) { + if (i % 256 == 0) { + indexed.get(vector, i, Math.min(256, indexed.size() - i)); + } Assert.assertEquals(vals[i], indexed.get(i)); + Assert.assertEquals(vals[i], vector[i % 256]); indices[i] = i; } + // random access, limited to 1000 elements for large lists (every element would take too long) IntArrays.shuffle(indices, ThreadLocalRandom.current()); final int limit = Math.min(indexed.size(), 1000); diff --git a/processing/src/test/java/org/apache/druid/segment/data/VSizeLongSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/VSizeLongSerdeTest.java index 879077f6091..6168769efd1 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/VSizeLongSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/VSizeLongSerdeTest.java @@ -20,91 +20,161 @@ package org.apache.druid.segment.data; +import com.google.common.primitives.Ints; +import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +@RunWith(Enclosed.class) public class VSizeLongSerdeTest { - private ByteBuffer buffer; - private ByteArrayOutputStream outStream; - private ByteBuffer outBuffer; - private final long[] values0 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1}; - private final long[] values1 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1}; - private final long[] values2 = {12, 5, 2, 9, 3, 2, 5, 1, 0, 6, 13, 10, 15}; - private final long[] values3 = {1, 1, 1, 1, 1, 11, 11, 11, 11}; - private final long[] values4 = {200, 200, 200, 401, 200, 301, 200, 200, 200, 404, 200, 200, 200, 200}; - private final long[] values5 = {123, 632, 12, 39, 536, 0, 1023, 52, 777, 526, 214, 562, 823, 346}; - private final long[] values6 = {1000000, 1000001, 1000002, 1000003, 1000004, 1000005, 1000006, 1000007, 1000008}; - - @Before - public void setUp + @RunWith(Parameterized.class) + public static class EveryLittleBitTest { - outStream = new ByteArrayOutputStream(); - outBuffer = 
ByteBuffer.allocate(500000); - } + private final int numBits; - @Test - public void testGetBitsForMax() - { - Assert.assertEquals(1, VSizeLongSerde.getBitsForMax(1)); - Assert.assertEquals(1, VSizeLongSerde.getBitsForMax(2)); - Assert.assertEquals(2, VSizeLongSerde.getBitsForMax(3)); - Assert.assertEquals(4, VSizeLongSerde.getBitsForMax(16)); - Assert.assertEquals(8, VSizeLongSerde.getBitsForMax(200)); - Assert.assertEquals(12, VSizeLongSerde.getBitsForMax(999)); - Assert.assertEquals(24, VSizeLongSerde.getBitsForMax(12345678)); - Assert.assertEquals(32, VSizeLongSerde.getBitsForMax(Integer.MAX_VALUE)); - Assert.assertEquals(64, VSizeLongSerde.getBitsForMax(Long.MAX_VALUE)); - } + public EveryLittleBitTest(int numBits) + { + this.numBits = numBits; + } - @Test - public void testSerdeValues() throws IOException - { - for (int i : VSizeLongSerde.SUPPORTED_SIZES) { - testSerde(i, values0); - if (i >= 1) { - testSerde(i, values1); + @Parameterized.Parameters(name = "numBits={0}") + public static Collection<Object[]> data() + { + return Arrays.stream(VSizeLongSerde.SUPPORTED_SIZES) + .mapToObj(value -> new Object[]{value}) + .collect(Collectors.toList()); + } + + @Test + public void testEveryPowerOfTwo() throws IOException + { + // Test every long that has a single bit set. + + final int numLongs = Math.min(64, numBits); + final long[] longs = new long[numLongs]; + + for (int bit = 0; bit < numLongs; bit++) { + longs[bit] = 1L << bit; } - if (i >= 4) { - testSerde(i, values2); - testSerde(i, values3); - } - if (i >= 9) { - testSerde(i, values4); - } - if (i >= 10) { - testSerde(i, values5); - } - if (i >= 20) { - testSerde(i, values6); + + testSerde(numBits, longs); + } + + @Test + public void testEveryPowerOfTwoMinusOne() throws IOException + { + // Test every long with runs of low bits set. 
+ + final int numLongs = Math.min(64, numBits + 1); + final long[] longs = new long[numLongs]; + + for (int bit = 0; bit < numLongs; bit++) { + longs[bit] = (1L << bit) - 1; } + + testSerde(numBits, longs); } } - @Test - public void testSerdeLoop() throws IOException + public static class SpecificValuesTest { - for (int i : VSizeLongSerde.SUPPORTED_SIZES) { - if (i >= 8) { - testSerdeIncLoop(i, 0, 256); + private final long[] values0 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1}; + private final long[] values1 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1}; + private final long[] values2 = {12, 5, 2, 9, 3, 2, 5, 1, 0, 6, 13, 10, 15}; + private final long[] values3 = {1, 1, 1, 1, 1, 11, 11, 11, 11}; + private final long[] values4 = {200, 200, 200, 401, 200, 301, 200, 200, 200, 404, 200, 200, 200, 200}; + private final long[] values5 = {123, 632, 12, 39, 536, 0, 1023, 52, 777, 526, 214, 562, 823, 346}; + private final long[] values6 = {1000000, 1000001, 1000002, 1000003, 1000004, 1000005, 1000006, 1000007, 1000008}; + + @Test + public void testGetBitsForMax() + { + Assert.assertEquals(1, VSizeLongSerde.getBitsForMax(1)); + Assert.assertEquals(1, VSizeLongSerde.getBitsForMax(2)); + Assert.assertEquals(2, VSizeLongSerde.getBitsForMax(3)); + Assert.assertEquals(4, VSizeLongSerde.getBitsForMax(16)); + Assert.assertEquals(8, VSizeLongSerde.getBitsForMax(200)); + Assert.assertEquals(12, VSizeLongSerde.getBitsForMax(999)); + Assert.assertEquals(24, VSizeLongSerde.getBitsForMax(12345678)); + Assert.assertEquals(32, VSizeLongSerde.getBitsForMax(Integer.MAX_VALUE)); + Assert.assertEquals(64, VSizeLongSerde.getBitsForMax(Long.MAX_VALUE)); + } + + @Test + public void testSerdeValues() throws IOException + { + for (int i : VSizeLongSerde.SUPPORTED_SIZES) { + testSerde(i, values0); + if (i >= 1) { + testSerde(i, values1); + } + if (i >= 4) { + testSerde(i, values2); + testSerde(i, values3); + } + if (i >= 9) { + testSerde(i, values4); + } + if (i >= 10) { + testSerde(i, values5); + } + if (i >= 20) { + testSerde(i, values6); + } } - if (i >= 16) { - testSerdeIncLoop(i, 0, 50000); + } + + @Test + public void testSerdeLoop() throws IOException + { + final long[] zeroTo256 = generateSequentialLongs(0, 256); + final long[] zeroTo50000 = generateSequentialLongs(0, 50000); + + for (int i : VSizeLongSerde.SUPPORTED_SIZES) { + if (i >= 8) { + testSerde(i, zeroTo256); + } + if (i >= 16) { + testSerde(i, zeroTo50000); + } } } + + private long[] generateSequentialLongs(final long start, final long end) + { + final long[] values = new long[Ints.checkedCast(end - start)]; + + for (int i = 0; i < values.length; i++) { + values[i] = start + i; + } + + return values; + } } - public void testSerde(int longSize, long[] values) throws IOException + public static void testSerde(int numBits, long[] values) throws IOException { - outBuffer.rewind(); - outStream.reset(); - VSizeLongSerde.LongSerializer streamSer = VSizeLongSerde.getSerializer(longSize, outStream); - VSizeLongSerde.LongSerializer bufferSer = VSizeLongSerde.getSerializer(longSize, outBuffer, 0); + final int bufferOffset = 1; + final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + outStream.write(0xAF); // Dummy byte so the real stuff starts at bufferOffset + + final ByteBuffer buffer = + ByteBuffer.allocate(VSizeLongSerde.getSerializedSize(numBits, values.length) + bufferOffset); + buffer.rewind(); + buffer.put(0, (byte) 0xAF); // Dummy byte again. 
+ VSizeLongSerde.LongSerializer streamSer = VSizeLongSerde.getSerializer(numBits, outStream); + VSizeLongSerde.LongSerializer bufferSer = VSizeLongSerde.getSerializer(numBits, buffer, bufferOffset); for (long value : values) { streamSer.write(value); bufferSer.write(value); @@ -112,40 +182,190 @@ public class VSizeLongSerdeTest streamSer.close(); bufferSer.close(); - buffer = ByteBuffer.wrap(outStream.toByteArray()); - Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, values.length), buffer.capacity()); - Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, values.length), outBuffer.position()); - VSizeLongSerde.LongDeserializer streamDes = VSizeLongSerde.getDeserializer(longSize, buffer, 0); - VSizeLongSerde.LongDeserializer bufferDes = VSizeLongSerde.getDeserializer(longSize, outBuffer, 0); - for (int i = 0; i < values.length; i++) { - Assert.assertEquals(values[i], streamDes.get(i)); - Assert.assertEquals(values[i], bufferDes.get(i)); - } + // Verify serialized sizes. + final ByteBuffer bufferFromStream = ByteBuffer.wrap(outStream.toByteArray()); + Assert.assertEquals( + StringUtils.format("Serialized size (stream, numBits = %d)", numBits), + VSizeLongSerde.getSerializedSize(numBits, values.length), + bufferFromStream.capacity() - bufferOffset + ); + Assert.assertEquals( + StringUtils.format("Serialized size (buffer, numBits = %d)", numBits), + VSizeLongSerde.getSerializedSize(numBits, values.length), + buffer.position() - bufferOffset + ); + + // Verify the actual serialized contents. + Assert.assertArrayEquals( + StringUtils.format("Stream and buffer serialized images are equal (numBits = %d)", numBits), + bufferFromStream.array(), + buffer.array() + ); + + // Verify deserialization. We know the two serialized buffers are equal, so from this point on, just use one. 
+ VSizeLongSerde.LongDeserializer deserializer = VSizeLongSerde.getDeserializer(numBits, buffer, bufferOffset); + + testGetSingleRow(deserializer, numBits, values); + testContiguousGetSingleRow(deserializer, numBits, values); + testContiguousGetWholeRegion(deserializer, numBits, values); + testNoncontiguousGetSingleRow(deserializer, numBits, values); + testNoncontiguousGetEveryOtherValue(deserializer, numBits, values); + testNoncontiguousGetEveryOtherValueWithLimit(deserializer, numBits, values); } - public void testSerdeIncLoop(int longSize, long start, long end) throws IOException + private static void testGetSingleRow( + final VSizeLongSerde.LongDeserializer deserializer, + final int numBits, + final long[] values + ) { - outBuffer.rewind(); - outStream.reset(); - VSizeLongSerde.LongSerializer streamSer = VSizeLongSerde.getSerializer(longSize, outStream); - VSizeLongSerde.LongSerializer bufferSer = VSizeLongSerde.getSerializer(longSize, outBuffer, 0); - for (long i = start; i < end; i++) { - streamSer.write(i); - bufferSer.write(i); - } - streamSer.close(); - bufferSer.close(); - - buffer = ByteBuffer.wrap(outStream.toByteArray()); - Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, (int) (end - start)), buffer.capacity()); - Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, (int) (end - start)), outBuffer.position()); - VSizeLongSerde.LongDeserializer streamDes = VSizeLongSerde.getDeserializer(longSize, buffer, 0); - VSizeLongSerde.LongDeserializer bufferDes = VSizeLongSerde.getDeserializer(longSize, outBuffer, 0); - for (int i = 0; i < end - start; i++) { - Assert.assertEquals(start + i, streamDes.get(i)); - Assert.assertEquals(start + i, bufferDes.get(i)); + for (int i = 0; i < values.length; i++) { + Assert.assertEquals( + StringUtils.format("Deserializer (testGetSingleRow, numBits = %d, position = %d)", numBits, i), + values[i], + deserializer.get(i) + ); } } + private static void testContiguousGetSingleRow( + final VSizeLongSerde.LongDeserializer deserializer, + final int numBits, + final long[] values + ) + { + final int outPosition = 1; + final long[] out = new long[values.length + outPosition]; + for (int i = 0; i < values.length; i++) { + + Arrays.fill(out, -1); + deserializer.getDelta(out, outPosition, i, 1, 0); + + Assert.assertEquals( + StringUtils.format("Deserializer (testContiguousGetSingleRow, numBits = %d, position = %d)", numBits, i), + values[i], + out[outPosition] + ); + } + } + + private static void testContiguousGetWholeRegion( + final VSizeLongSerde.LongDeserializer deserializer, + final int numBits, + final long[] values + ) + { + final int outPosition = 1; + final long[] out = new long[values.length + outPosition]; + Arrays.fill(out, -1); + deserializer.getDelta(out, outPosition, 0, values.length, 0); + + Assert.assertArrayEquals( + StringUtils.format("Deserializer (testContiguousGetWholeRegion, numBits = %d)", numBits), + values, + Arrays.stream(out).skip(outPosition).toArray() + ); + } + + private static void testNoncontiguousGetSingleRow( + final VSizeLongSerde.LongDeserializer deserializer, + final int numBits, + final long[] values + ) + { + final int indexOffset = 1; + final int outPosition = 1; + final long[] out = new long[values.length + outPosition]; + final int[] indexes = new int[values.length + outPosition]; + + for (int i = 0; i < values.length; i++) { + Arrays.fill(out, -1); + Arrays.fill(indexes, -1); + indexes[outPosition] = i + indexOffset; + + deserializer.getDelta(out, outPosition, indexes, 1, indexOffset, 
values.length, 0); + + Assert.assertEquals( + StringUtils.format("Deserializer (testNoncontiguousGetSingleRow, numBits = %d, position = %d)", numBits, i), + values[i], + out[outPosition] + ); + } + } + + private static void testNoncontiguousGetEveryOtherValue( + final VSizeLongSerde.LongDeserializer deserializer, + final int numBits, + final long[] values + ) + { + final int indexOffset = 1; + final int outPosition = 1; + final long[] out = new long[values.length + outPosition]; + final long[] expectedOut = new long[values.length + outPosition]; + final int[] indexes = new int[values.length + outPosition]; + + Arrays.fill(out, -1); + Arrays.fill(expectedOut, -1); + Arrays.fill(indexes, -1); + + int cnt = 0; + for (int i = 0; i < values.length; i++) { + if (i % 2 == 0) { + indexes[outPosition + i / 2] = i + indexOffset; + expectedOut[outPosition + i / 2] = values[i]; + cnt++; + } + } + + deserializer.getDelta(out, outPosition, indexes, cnt, indexOffset, values.length, 0); + + Assert.assertArrayEquals( + StringUtils.format("Deserializer (testNoncontiguousGetEveryOtherValue, numBits = %d)", numBits), + expectedOut, + out + ); + } + + private static void testNoncontiguousGetEveryOtherValueWithLimit( + final VSizeLongSerde.LongDeserializer deserializer, + final int numBits, + final long[] values + ) + { + final int indexOffset = 1; + final int outPosition = 1; + final long[] out = new long[values.length + outPosition]; + final long[] expectedOut = new long[values.length + outPosition]; + final int[] indexes = new int[values.length + outPosition]; + final int limit = values.length - 2; // Exclude the last selected value + + Arrays.fill(out, -1); + Arrays.fill(expectedOut, -1); + Arrays.fill(indexes, -1); + + int cnt = 0; + for (int i = 0; i < values.length; i++) { + if (i % 2 == 0) { + indexes[outPosition + i / 2] = i + indexOffset; + + if (i < limit) { + expectedOut[outPosition + i / 2] = values[i]; + } + + cnt++; + } + } + + final int ret = deserializer.getDelta(out, outPosition, indexes, cnt, indexOffset, limit, 0); + + Assert.assertArrayEquals( + StringUtils.format("Deserializer (testNoncontiguousGetEveryOtherValueWithLimit, numBits = %d)", numBits), + expectedOut, + out + ); + + Assert.assertEquals(Math.max(0, cnt - 1), ret); + } }
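The Math.max(0, cnt - 1) expectation in the last test follows directly from the partial-read contract: with limit set to values.length - 2, the final selected index falls outside the block, so the deserializer reports one fewer decoded value than requested. A hypothetical caller-side loop (not the actual Druid read path, which lives in the vectorized ColumnarLongs selectors) would consume that return value roughly like this:

    // Hypothetical consumer of the partial-read contract: re-invoke until every requested
    // row is decoded, moving to the next decompressed block whenever the call returns early.
    int decoded = 0;
    while (decoded < totalRows) {
      decoded += deserializer.getDelta(out, decoded, indexes, totalRows - decoded, indexOffset, limit, base);
      if (decoded < totalRows) {
        // load the next block here and refresh indexOffset/limit (elided in this sketch)
      }
    }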