vectorize 'auto' long decoding (#11004)

* Vectorize LongDeserializers.

Also, add many more tests.

* more faster

* more more faster

* more cleanup

* fixes

* forbidden

* benchmark style

* idk why

* adjust

* add preconditions for value >= 0 for writers

* add 64 bit exception

Co-authored-by: Gian Merlino <gian@imply.io>
This commit is contained in:
Clint Wylie 2021-03-26 18:39:13 -07:00 committed by GitHub
parent e2c4466fbd
commit c0e6d1c7f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 2300 additions and 149 deletions

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.compression;
import org.apache.druid.collections.bitmap.WrappedImmutableRoaringBitmap;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.segment.data.ColumnarLongs;
import org.apache.druid.segment.data.ColumnarLongsSerializer;
import org.apache.druid.segment.data.CompressedColumnarLongsSupplier;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.vector.BitmapVectorOffset;
import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.VectorOffset;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@State(Scope.Benchmark)
public class BaseColumnarLongsBenchmark
{
static final int VECTOR_SIZE = 512;
/**
* Name of the long encoding strategy. For longs, this is a composite of both byte level block compression and
* encoding of values within the block.
*/
@Param({
"lz4-longs",
"lz4-auto"
})
String encoding;
Random rand = new Random(0);
long[] vals;
long minValue;
long maxValue;
@Nullable
BitSet filter;
VectorOffset vectorOffset;
void setupFilters(int rows, double filteredRowCountPercentage)
{
// todo: filter set distributions to simulate different select patterns?
// (because benchmarks don't take long enough already..)
filter = null;
final int filteredRowCount = (int) Math.floor(rows * filteredRowCountPercentage);
if (filteredRowCount < rows) {
// setup bitset filter
filter = new BitSet();
MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
for (int i = 0; i < filteredRowCount; i++) {
int rowToAccess = rand.nextInt(rows);
// Skip already selected rows if any
while (filter.get(rowToAccess)) {
rowToAccess = rand.nextInt(rows);
}
filter.set(rowToAccess);
bitmap.add(rowToAccess);
}
vectorOffset = new BitmapVectorOffset(
VECTOR_SIZE,
new WrappedImmutableRoaringBitmap(bitmap.toImmutableRoaringBitmap()),
0,
rows
);
} else {
vectorOffset = new NoFilterVectorOffset(VECTOR_SIZE, 0, rows);
}
}
static int encodeToFile(long[] vals, String encoding, FileChannel output)throws IOException
{
SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
ColumnarLongsSerializer serializer;
switch (encoding) {
case "lz4-longs":
serializer = CompressionFactory.getLongSerializer(
encoding,
writeOutMedium,
"lz4-longs",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.LONGS,
CompressionStrategy.LZ4
);
break;
case "lz4-auto":
serializer = CompressionFactory.getLongSerializer(
encoding,
writeOutMedium,
"lz4-auto",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.AUTO,
CompressionStrategy.LZ4
);
break;
case "none-longs":
serializer = CompressionFactory.getLongSerializer(
encoding,
writeOutMedium,
"none-longs",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.LONGS,
CompressionStrategy.NONE
);
break;
case "none-auto":
serializer = CompressionFactory.getLongSerializer(
encoding,
writeOutMedium,
"none-auto",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.AUTO,
CompressionStrategy.NONE
);
break;
default:
throw new RuntimeException("unknown encoding");
}
serializer.open();
for (long val : vals) {
serializer.add(val);
}
serializer.writeTo(output, null);
return (int) serializer.getSerializedSize();
}
static ColumnarLongs createColumnarLongs(String encoding, ByteBuffer buffer)
{
switch (encoding) {
case "lz4-longs":
case "lz4-auto":
case "none-auto":
case "none-longs":
return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN).get();
}
throw new IllegalArgumentException("unknown encoding");
}
// for testing encodings: validate that all encoders read the same values
// noinspection unused
static void checkSanity(Map<String, ColumnarLongs> encoders, List<String> encodings, int rows)
{
for (int i = 0; i < rows; i++) {
checkRowSanity(encoders, encodings, i);
}
}
static void checkRowSanity(Map<String, ColumnarLongs> encoders, List<String> encodings, int row)
{
if (encodings.size() > 1) {
for (int i = 0; i < encodings.size() - 1; i++) {
String currentKey = encodings.get(i);
String nextKey = encodings.get(i + 1);
ColumnarLongs current = encoders.get(currentKey);
ColumnarLongs next = encoders.get(nextKey);
long vCurrent = current.get(row);
long vNext = next.get(row);
if (vCurrent != vNext) {
throw new RE(
"values do not match at row %s - %s:%s %s:%s",
row,
currentKey,
vCurrent,
nextKey,
vNext
);
}
}
}
}
}

View File

@ -0,0 +1,395 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.compression;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.generator.ColumnValueGenerator;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import java.io.File;
import java.util.List;
@State(Scope.Benchmark)
public class BaseColumnarLongsFromGeneratorBenchmark extends BaseColumnarLongsBenchmark
{
static int SEED = 1;
/**
* Controls the probability that any generated value will be a zero, to simulate sparely populated columns
*/
@Param({
"0.0",
"0.25",
"0.5",
"0.75",
"0.95"
})
double zeroProbability;
/**
* Number of rows generated for the value distribution
*/
@Param({"5000000"})
int rows;
/**
* Value distributions to simulate various patterns of long column
*/
@Param({
"enumerated-0-1",
"enumerated-full",
"normal-1-32",
"normal-40-1000",
"sequential-1000",
"sequential-unique",
"uniform-1",
"uniform-2",
"uniform-3",
"uniform-4",
"uniform-8",
"uniform-12",
"uniform-16",
"uniform-20",
"uniform-24",
"uinform-32",
"uniform-40",
"uniform-48",
"uniform-56",
"uniform-64",
"zipf-low-100",
"zipf-low-100000",
"zipf-low-32-bit",
"zipf-high-100",
"zipf-high-100000",
"zipf-high-32-bit"
})
String distribution;
static ColumnValueGenerator makeGenerator(
String distribution,
int rows,
double zeroProbability
)
{
List<Object> enumerated;
List<Double> probability;
switch (distribution) {
case "enumerated-0-1":
enumerated = ImmutableList.of(0, 1);
probability = ImmutableList.of(0.6, 0.4);
return GeneratorColumnSchema.makeEnumerated(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
enumerated,
probability
).makeGenerator(SEED);
case "enumerated-full":
enumerated = ImmutableList.of(
0,
1,
Long.MAX_VALUE - 1,
Long.MIN_VALUE + 1,
Long.MIN_VALUE / 2,
Long.MAX_VALUE / 2
);
probability = ImmutableList.of(0.4, 0.2, 0.1, 0.1, 0.1, 0.1);
return GeneratorColumnSchema.makeEnumerated(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
enumerated,
probability
).makeGenerator(SEED);
case "normal-1-32":
return GeneratorColumnSchema.makeNormal(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
1.0,
(double) (1L << 32),
true
).makeGenerator(SEED);
case "normal-40-1000":
return GeneratorColumnSchema.makeNormal(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
(double) (1L << 40),
1000.0,
true
).makeGenerator(SEED);
case "sequential-1000":
return GeneratorColumnSchema.makeSequential(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
Integer.MAX_VALUE - 1001,
Integer.MAX_VALUE - 1
).makeGenerator(SEED);
case "sequential-unique":
return GeneratorColumnSchema.makeSequential(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
rows
).makeGenerator(SEED);
case "uniform-1":
return GeneratorColumnSchema.makeDiscreteUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
1
).makeGenerator(SEED);
case "uniform-2":
return GeneratorColumnSchema.makeDiscreteUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
4
).makeGenerator(SEED);
case "uniform-3":
return GeneratorColumnSchema.makeDiscreteUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
1000000,
1000008
).makeGenerator(SEED);
case "uniform-4":
return GeneratorColumnSchema.makeDiscreteUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
1 << 4
).makeGenerator(SEED);
case "uniform-8":
return GeneratorColumnSchema.makeDiscreteUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
1 << 8
).makeGenerator(SEED);
case "uniform-12":
return GeneratorColumnSchema.makeDiscreteUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
1 << 12
).makeGenerator(SEED);
case "uniform-16":
return GeneratorColumnSchema.makeDiscreteUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
1 << 16
).makeGenerator(SEED);
case "uniform-20":
return GeneratorColumnSchema.makeContinuousUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
1 << 20
).makeGenerator(SEED);
case "uniform-24":
return GeneratorColumnSchema.makeContinuousUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
(1 << 24) - 1
).makeGenerator(SEED);
case "uinform-32":
return GeneratorColumnSchema.makeContinuousUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
Integer.MAX_VALUE - 1
).makeGenerator(SEED);
case "uniform-40":
return GeneratorColumnSchema.makeContinuousUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0L,
(1L << 40) - 1
).makeGenerator(SEED);
case "uniform-48":
return GeneratorColumnSchema.makeContinuousUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
(1L << 48) - 1
).makeGenerator(SEED);
case "uniform-56":
return GeneratorColumnSchema.makeContinuousUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
(1L << 56 - 1)
).makeGenerator(SEED);
case "uniform-64":
return GeneratorColumnSchema.makeContinuousUniform(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
Long.MAX_VALUE - 1
).makeGenerator(SEED);
case "zipf-low-100":
return GeneratorColumnSchema.makeLazyZipf(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
100,
1d
).makeGenerator(SEED);
case "zipf-low-100000":
return GeneratorColumnSchema.makeLazyZipf(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
-50000,
50000,
1d
).makeGenerator(SEED);
case "zipf-low-32-bit":
return GeneratorColumnSchema.makeLazyZipf(
distribution,
ValueType.LONG,
true,
1,
0d,
0,
Integer.MAX_VALUE,
1d
).makeGenerator(SEED);
case "zipf-high-100":
return GeneratorColumnSchema.makeLazyZipf(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
0,
100,
3d
).makeGenerator(SEED);
case "zipf-high-100000":
return GeneratorColumnSchema.makeLazyZipf(
distribution,
ValueType.LONG,
true,
1,
zeroProbability,
-50000,
50000,
3d
).makeGenerator(SEED);
case "zipf-high-32-bit":
return GeneratorColumnSchema.makeLazyZipf(
distribution,
ValueType.LONG,
true,
1,
0d,
0,
Integer.MAX_VALUE,
3d
).makeGenerator(SEED);
}
throw new IllegalArgumentException("unknown distribution");
}
static String getGeneratorEncodedFilename(String encoding, String distribution, int rows, double nullProbability)
{
return StringUtils.format("%s-%s-%s-%s.bin", encoding, distribution, rows, nullProbability);
}
static File getTmpDir()
{
final String dirPath = "tmp/encoding/longs/";
File dir = new File(dirPath);
dir.mkdirs();
return dir;
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.compression;
import org.apache.druid.java.util.common.StringUtils;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import java.io.File;
@State(Scope.Benchmark)
public class BaseColumnarLongsFromSegmentsBenchmark extends BaseColumnarLongsBenchmark
{
/**
* Long columns to read from the segment file specified by {@link #segmentPath}
*/
@Param({
"__time",
"followers",
"friends",
"max_followers",
"max_retweets",
"max_statuses",
"retweets",
"statuses",
"tweets"
})
String columnName;
/**
* Number of rows in the segment. This should actually match the number of rows specified in {@link #segmentPath}. If
* it is smaller than only this many rows will be read, if larger then the benchmark will explode trying to read more
* data than exists rows.
*
* This is a hassle, but ensures that the row count ends up in the output measurements.
*/
@Param({"3259585"})
int rows;
/**
* Path to a segment file to read long columns from. This shouldn't really be used as a parameter, but is nice to
* be included in the output measurements.
*
* This is BYO segment, as this file doesn't probably exist for you, replace it and other parameters with the segment
* to test.
*/
@Param({"tmp/segments/twitter-ticker-1/"})
String segmentPath;
/**
* Friendly name of the segment. Like {@link #segmentPath}, this shouldn't really be used as a parameter, but is also
* nice to be included in the output measurements.
*/
@Param({"twitter-ticker"})
String segmentName;
String getColumnEncodedFileName(String encoding, String segmentName, String columnName)
{
return StringUtils.format("%s-%s-longs-%s.bin", encoding, segmentName, columnName);
}
File getTmpDir()
{
final String dirPath = StringUtils.format("tmp/encoding/%s", segmentName);
File dir = new File(dirPath);
dir.mkdirs();
return dir;
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.compression;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.generator.ColumnValueGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.results.format.ResultFormatType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 1)
@Measurement(iterations = 2)
public class ColumnarLongsEncodeDataFromGeneratorBenchmark extends BaseColumnarLongsFromGeneratorBenchmark
{
@Setup
public void setup() throws Exception
{
vals = new long[rows];
final String filename = getGeneratorValueFilename(distribution, rows, zeroProbability);
File dir = getTmpDir();
File dataFile = new File(dir, filename);
if (dataFile.exists()) {
System.out.println("Data files already exist, re-using");
try (BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8)) {
int lineNum = 0;
String line;
while ((line = br.readLine()) != null) {
vals[lineNum] = Long.parseLong(line);
if (vals[lineNum] < minValue) {
minValue = vals[lineNum];
}
if (vals[lineNum] > maxValue) {
maxValue = vals[lineNum];
}
lineNum++;
}
}
} else {
try (Writer writer = Files.newBufferedWriter(dataFile.toPath(), StandardCharsets.UTF_8)) {
ColumnValueGenerator valueGenerator = makeGenerator(distribution, rows, zeroProbability);
for (int i = 0; i < rows; i++) {
long value;
Object rowValue = valueGenerator.generateRowValue();
value = rowValue != null ? (long) rowValue : 0;
vals[i] = value;
if (vals[i] < minValue) {
minValue = vals[i];
}
if (vals[i] > maxValue) {
maxValue = vals[i];
}
writer.write(vals[i] + "\n");
}
}
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void encodeColumn(Blackhole blackhole) throws IOException
{
File dir = getTmpDir();
File columnDataFile = new File(dir, getGeneratorEncodedFilename(encoding, distribution, rows, zeroProbability));
columnDataFile.delete();
FileChannel output =
FileChannel.open(columnDataFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
int size = encodeToFile(vals, encoding, output);
EncodingSizeProfiler.encodedSize = size;
blackhole.consume(size);
output.close();
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(ColumnarLongsEncodeDataFromGeneratorBenchmark.class.getSimpleName())
.addProfiler(EncodingSizeProfiler.class)
.resultFormat(ResultFormatType.CSV)
.result("column-longs-encode-speed.csv")
.build();
new Runner(opt).run();
}
private static String getGeneratorValueFilename(String distribution, int rows, double nullProbability)
{
return StringUtils.format("values-%s-%s-%s.bin", distribution, rows, nullProbability);
}
}

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.compression;
import com.google.common.collect.Iterables;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.LongsColumn;
import org.apache.druid.segment.column.ValueType;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.results.format.ResultFormatType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 1)
@Measurement(iterations = 1)
public class ColumnarLongsEncodeDataFromSegmentBenchmark extends BaseColumnarLongsFromSegmentsBenchmark
{
@Setup
public void setup() throws Exception
{
initializeSegmentValueIntermediaryFile();
File dir = getTmpDir();
File dataFile = new File(dir, getColumnDataFileName(segmentName, columnName));
List<Long> values = new ArrayList<>();
try (BufferedReader br = Files.newBufferedReader(dataFile.toPath(), StandardCharsets.UTF_8)) {
String line;
while ((line = br.readLine()) != null) {
long value = Long.parseLong(line);
if (value < minValue) {
minValue = value;
}
if (value > maxValue) {
maxValue = value;
}
values.add(value);
rows++;
}
}
vals = values.stream().mapToLong(i -> i).toArray();
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void encodeColumn(Blackhole blackhole) throws IOException
{
File dir = getTmpDir();
File columnDataFile = new File(dir, getColumnEncodedFileName(encoding, segmentName, columnName));
columnDataFile.delete();
FileChannel output =
FileChannel.open(columnDataFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
int size = BaseColumnarLongsBenchmark.encodeToFile(vals, encoding, output);
EncodingSizeProfiler.encodedSize = size;
blackhole.consume(size);
output.close();
}
/**
* writes column values to an intermediary text file, 1 per line, encoders read from this file as input to write
* encoded column files.
*/
private void initializeSegmentValueIntermediaryFile() throws IOException
{
File dir = getTmpDir();
File dataFile = new File(dir, getColumnDataFileName(segmentName, columnName));
if (!dataFile.exists()) {
final IndexIO indexIO = new IndexIO(
new DefaultObjectMapper(),
() -> 0
);
try (final QueryableIndex index = indexIO.loadIndex(new File(segmentPath))) {
final Set<String> columnNames = new LinkedHashSet<>();
columnNames.add(ColumnHolder.TIME_COLUMN_NAME);
Iterables.addAll(columnNames, index.getColumnNames());
final ColumnHolder column = index.getColumnHolder(columnName);
final ColumnCapabilities capabilities = column.getCapabilities();
final ValueType columnType = capabilities.getType();
try (Writer writer = Files.newBufferedWriter(dataFile.toPath(), StandardCharsets.UTF_8)) {
if (columnType != ValueType.LONG) {
throw new RuntimeException("Invalid column type, expected 'Long'");
}
LongsColumn theColumn = (LongsColumn) column.getColumn();
for (int i = 0; i < theColumn.length(); i++) {
long value = theColumn.getLongSingleValueRow(i);
writer.write(value + "\n");
}
}
}
}
}
private String getColumnDataFileName(String segmentName, String columnName)
{
return StringUtils.format("%s-longs-%s.txt", segmentName, columnName);
}
public static void main(String[] args) throws RunnerException
{
System.out.println("main happened");
Options opt = new OptionsBuilder()
.include(ColumnarLongsEncodeDataFromSegmentBenchmark.class.getSimpleName())
.addProfiler(EncodingSizeProfiler.class)
.resultFormat(ResultFormatType.CSV)
.result("column-longs-encode-speed-segments.csv")
.build();
new Runner(opt).run();
}
}

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.compression;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.data.ColumnarLongs;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.results.format.ResultFormatType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
public class ColumnarLongsSelectRowsFromGeneratorBenchmark extends BaseColumnarLongsFromGeneratorBenchmark
{
private Map<String, ColumnarLongs> decoders;
private Map<String, Integer> encodedSize;
/**
* Number of rows to read, the test will randomly set positions in a simulated offset of the specified density in
* {@link #setupFilters(int, double)}
*/
@Param({
"0.1",
"0.25",
"0.5",
"0.75",
"0.95",
"1.0"
})
private double filteredRowCountPercentage;
@Setup
public void setup() throws IOException
{
decoders = new HashMap<>();
encodedSize = new HashMap<>();
setupFromFile(encoding);
setupFilters(rows, filteredRowCountPercentage);
// uncomment this block to run sanity check to ensure all specified encodings produce the same set of results
//CHECKSTYLE.OFF: Regexp
// ImmutableList<String> all = ImmutableList.of("lz4-longs", "lz4-auto");
// for (String _enc : all) {
// if (!_enc.equalsIgnoreCase(encoding)) {
// setupFromFile(_enc);
// }
// }
//
// checkSanity(decoders, all, rows);
//CHECKSTYLE.ON: Regexp
}
@TearDown
public void teardown()
{
for (ColumnarLongs longs : decoders.values()) {
longs.close();
}
}
private void setupFromFile(String encoding) throws IOException
{
File dir = getTmpDir();
File compFile = new File(dir, getGeneratorEncodedFilename(encoding, distribution, rows, zeroProbability));
ByteBuffer buffer = FileUtils.map(compFile).get();
int size = (int) compFile.length();
encodedSize.put(encoding, size);
ColumnarLongs data = createColumnarLongs(encoding, buffer);
decoders.put(encoding, data);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void selectRows(Blackhole blackhole)
{
EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
ColumnarLongs encoder = decoders.get(encoding);
if (filter == null) {
for (int i = 0; i < rows; i++) {
blackhole.consume(encoder.get(i));
}
} else {
for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) {
blackhole.consume(encoder.get(i));
}
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void selectRowsVectorized(Blackhole blackhole)
{
EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
ColumnarLongs columnDecoder = decoders.get(encoding);
long[] vector = new long[VECTOR_SIZE];
while (!vectorOffset.isDone()) {
if (vectorOffset.isContiguous()) {
columnDecoder.get(vector, vectorOffset.getStartOffset(), vectorOffset.getCurrentVectorSize());
} else {
columnDecoder.get(vector, vectorOffset.getOffsets(), vectorOffset.getCurrentVectorSize());
}
for (int i = 0; i < vectorOffset.getCurrentVectorSize(); i++) {
blackhole.consume(vector[i]);
}
vectorOffset.advance();
}
blackhole.consume(vector);
blackhole.consume(vectorOffset);
vectorOffset.reset();
columnDecoder.close();
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(ColumnarLongsSelectRowsFromGeneratorBenchmark.class.getSimpleName())
.addProfiler(EncodingSizeProfiler.class)
.resultFormat(ResultFormatType.CSV)
.result("column-longs-select-speed.csv")
.build();
new Runner(opt).run();
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.compression;
import com.google.common.io.Files;
import org.apache.druid.segment.data.ColumnarLongs;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.results.format.ResultFormatType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 1)
@Measurement(iterations = 1)
public class ColumnarLongsSelectRowsFromSegmentBenchmark extends BaseColumnarLongsFromSegmentsBenchmark
{
private Map<String, ColumnarLongs> decoders;
private Map<String, Integer> encodedSize;
/**
* Number of rows to read, the test will randomly set positions in a simulated offset of the specified density in
* {@link #setupFilters(int, double)}
*/
@Param({"0.01", "0.1", "0.33", "0.66", "0.95", "1.0"})
private double filteredRowCountPercentage;
@Setup
public void setup() throws Exception
{
decoders = new HashMap<>();
encodedSize = new HashMap<>();
setupFilters(rows, filteredRowCountPercentage);
setupFromFile(encoding);
// uncomment this block to run sanity check to ensure all specified encodings produce the same set of results
//CHECKSTYLE.OFF: Regexp
// List<String> all = ImmutableList.of("lz4-longs", "lz4-auto");
// for (String _enc : all) {
// if (!_enc.equals(encoding)) {
// setupFromFile(_enc);
// }
// }
//
// checkSanity(decoders, all, rows);
//CHECKSTYLE.ON: Regexp
}
@TearDown
public void teardown()
{
for (ColumnarLongs longs : decoders.values()) {
longs.close();
}
}
private void setupFromFile(String encoding) throws IOException
{
File dir = getTmpDir();
File compFile = new File(dir, getColumnEncodedFileName(encoding, segmentName, columnName));
ByteBuffer buffer = Files.map(compFile);
int size = (int) compFile.length();
encodedSize.put(encoding, size);
ColumnarLongs data = BaseColumnarLongsBenchmark.createColumnarLongs(encoding, buffer);
decoders.put(encoding, data);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void selectRows(Blackhole blackhole)
{
EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
ColumnarLongs encoder = decoders.get(encoding);
if (filter == null) {
for (int i = 0; i < rows; i++) {
blackhole.consume(encoder.get(i));
}
} else {
for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) {
blackhole.consume(encoder.get(i));
}
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void selectRowsVectorized(Blackhole blackhole)
{
EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
ColumnarLongs columnDecoder = decoders.get(encoding);
long[] vector = new long[VECTOR_SIZE];
while (!vectorOffset.isDone()) {
if (vectorOffset.isContiguous()) {
columnDecoder.get(vector, vectorOffset.getStartOffset(), vectorOffset.getCurrentVectorSize());
} else {
columnDecoder.get(vector, vectorOffset.getOffsets(), vectorOffset.getCurrentVectorSize());
}
for (int i = 0; i < vectorOffset.getCurrentVectorSize(); i++) {
blackhole.consume(vector[i]);
}
vectorOffset.advance();
}
blackhole.consume(vector);
blackhole.consume(vectorOffset);
vectorOffset.reset();
columnDecoder.close();
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(ColumnarLongsSelectRowsFromSegmentBenchmark.class.getSimpleName())
.addProfiler(EncodingSizeProfiler.class)
.resultFormat(ResultFormatType.CSV)
.result("column-longs-select-speed-segments.csv")
.build();
new Runner(opt).run();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.benchmark;
package org.apache.druid.benchmark.compression;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.druid.common.config.NullHandling;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.benchmark;
package org.apache.druid.benchmark.compression;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.compression;
import org.openjdk.jmh.infra.BenchmarkParams;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.infra.IterationParams;
import org.openjdk.jmh.profile.InternalProfiler;
import org.openjdk.jmh.results.AggregationPolicy;
import org.openjdk.jmh.results.IterationResult;
import org.openjdk.jmh.results.Result;
import org.openjdk.jmh.results.ScalarResult;
import java.util.Collection;
import java.util.Collections;
/**
* Crude jmh 'profiler' that allows calling benchmark methods to set this static value in a benchmark run, and if
* this profiler to the run and have this additional measurement show up in the results.
*
* This allows 2 measurements to be collected for the result set, timing of the test, and size in bytes set here.
*
* @see ColumnarLongsSelectRowsFromGeneratorBenchmark#selectRows(Blackhole)
* @see ColumnarLongsSelectRowsFromGeneratorBenchmark#selectRowsVectorized(Blackhole)
* @see ColumnarLongsEncodeDataFromGeneratorBenchmark#encodeColumn(Blackhole)
* @see ColumnarLongsSelectRowsFromSegmentBenchmark#selectRows(Blackhole)
* @see ColumnarLongsSelectRowsFromSegmentBenchmark#selectRowsVectorized(Blackhole)
* @see ColumnarLongsEncodeDataFromSegmentBenchmark#encodeColumn(Blackhole)
*/
public class EncodingSizeProfiler implements InternalProfiler
{
public static int encodedSize;
@Override
public void beforeIteration(
BenchmarkParams benchmarkParams,
IterationParams iterationParams
)
{
}
@Override
public Collection<? extends Result> afterIteration(
BenchmarkParams benchmarkParams,
IterationParams iterationParams,
IterationResult result
)
{
return Collections.singletonList(
new ScalarResult("encoded size", encodedSize, "bytes", AggregationPolicy.MAX)
);
}
@Override
public String getDescription()
{
return "super janky encoding size result collector";
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.benchmark;
package org.apache.druid.benchmark.compression;
import com.google.common.base.Supplier;
import org.apache.druid.common.config.NullHandling;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.benchmark;
package org.apache.druid.benchmark.compression;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;

View File

@ -17,12 +17,13 @@
* under the License.
*/
package org.apache.druid.benchmark;
package org.apache.druid.benchmark.compression;
import com.google.common.base.Supplier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.MappedByteBufferHandler;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.data.ColumnarLongs;
import org.apache.druid.segment.data.CompressedColumnarLongsSupplier;
import org.openjdk.jmh.annotations.Benchmark;
@ -50,8 +51,8 @@ import java.util.concurrent.TimeUnit;
*/
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 10)
@Measurement(iterations = 25)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class LongCompressionBenchmark
@ -69,7 +70,7 @@ public class LongCompressionBenchmark
@Param({"auto", "longs"})
private static String format;
@Param({"lz4", "none"})
@Param({"lz4"})
private static String strategy;
private Supplier<ColumnarLongs> supplier;
@ -114,4 +115,18 @@ public class LongCompressionBenchmark
columnarLongs.close();
}
@Benchmark
public void readVectorizedContinuous(Blackhole bh)
{
long[] vector = new long[QueryableIndexStorageAdapter.DEFAULT_VECTOR_SIZE];
ColumnarLongs columnarLongs = supplier.get();
int count = columnarLongs.size();
for (int i = 0; i < count; i++) {
if (i % vector.length == 0) {
columnarLongs.get(vector, i, Math.min(vector.length, count - i));
}
bh.consume(vector[i % vector.length]);
}
columnarLongs.close();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.benchmark;
package org.apache.druid.benchmark.compression;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.benchmark;
package org.apache.druid.benchmark.compression;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;

View File

@ -283,26 +283,9 @@ public class CompressionFactory
long read(int index);
default void read(long[] out, int outPosition, int startIndex, int length)
{
for (int i = 0; i < length; i++) {
out[outPosition + i] = read(startIndex + i);
}
}
void read(long[] out, int outPosition, int startIndex, int length);
default int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit)
{
for (int i = 0; i < length; i++) {
int index = indexes[outPosition + i] - indexOffset;
if (index >= limit) {
return i;
}
out[outPosition + i] = read(index);
}
return length;
}
int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit);
LongEncodingReader duplicate();
}

View File

@ -65,6 +65,18 @@ public class DeltaLongEncodingReader implements CompressionFactory.LongEncodingR
return base + deserializer.get(index);
}
@Override
public void read(long[] out, int outPosition, int startIndex, int length)
{
deserializer.getDelta(out, outPosition, startIndex, length, base);
}
@Override
public int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit)
{
return deserializer.getDelta(out, outPosition, indexes, length, indexOffset, limit, base);
}
@Override
public CompressionFactory.LongEncodingReader duplicate()
{

View File

@ -19,52 +19,56 @@
package org.apache.druid.segment.data;
import org.apache.datasketches.memory.Memory;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
public class LongsLongEncodingReader implements CompressionFactory.LongEncodingReader
{
private LongBuffer buffer;
private Memory buffer;
public LongsLongEncodingReader(ByteBuffer fromBuffer, ByteOrder order)
{
this.buffer = fromBuffer.asReadOnlyBuffer().order(order).asLongBuffer();
}
private LongsLongEncodingReader(LongBuffer buffer)
{
this.buffer = buffer;
this.buffer = Memory.wrap(fromBuffer.slice(), order);
}
@Override
public void setBuffer(ByteBuffer buffer)
{
this.buffer = buffer.asLongBuffer();
this.buffer = Memory.wrap(buffer.slice(), buffer.order());
}
@Override
public long read(int index)
{
return buffer.get(buffer.position() + index);
return buffer.getLong((long) index << 3);
}
@Override
public void read(final long[] out, final int outPosition, final int startIndex, final int length)
{
final int oldPosition = buffer.position();
try {
buffer.position(oldPosition + startIndex);
buffer.get(out, outPosition, length);
buffer.getLongArray((long) startIndex << 3, out, outPosition, length);
}
finally {
buffer.position(oldPosition);
@Override
public int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit)
{
for (int i = 0; i < length; i++) {
int index = indexes[outPosition + i] - indexOffset;
if (index >= limit) {
return i;
}
out[outPosition + i] = buffer.getLong((long) index << 3);
}
return length;
}
@Override
public CompressionFactory.LongEncodingReader duplicate()
{
return new LongsLongEncodingReader(buffer.duplicate());
return this;
}
}

View File

@ -71,6 +71,18 @@ public class TableLongEncodingReader implements CompressionFactory.LongEncodingR
return table[(int) deserializer.get(index)];
}
@Override
public void read(long[] out, int outPosition, int startIndex, int length)
{
deserializer.getTable(out, outPosition, startIndex, length, table);
}
@Override
public int read(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit)
{
return deserializer.getTable(out, outPosition, indexes, length, indexOffset, limit, table);
}
@Override
public CompressionFactory.LongEncodingReader duplicate()
{

View File

@ -19,10 +19,11 @@
package org.apache.druid.segment.data;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.UOE;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
@ -213,6 +214,7 @@ public class VSizeLongSerde
@Override
public void write(long value) throws IOException
{
Preconditions.checkArgument(value >= 0);
if (count == 8) {
buffer.put(curByte);
count = 0;
@ -267,6 +269,7 @@ public class VSizeLongSerde
@Override
public void write(long value) throws IOException
{
Preconditions.checkArgument(value >= 0);
if (count == 8) {
buffer.put(curByte);
count = 0;
@ -324,6 +327,7 @@ public class VSizeLongSerde
@Override
public void write(long value) throws IOException
{
Preconditions.checkArgument(value >= 0);
int shift = 0;
if (first) {
shift = 4;
@ -388,6 +392,10 @@ public class VSizeLongSerde
@Override
public void write(long value) throws IOException
{
if (numBytes != 8) {
// if the value is not stored in a full long, ensure it is zero or positive
Preconditions.checkArgument(value >= 0);
}
for (int i = numBytes - 1; i >= 0; i--) {
buffer.put((byte) (value >>> (i * 8)));
if (output != null) {
@ -413,9 +421,74 @@ public class VSizeLongSerde
}
}
/**
* Unpack bitpacked long values from an underlying contiguous memory block
*/
public interface LongDeserializer
{
/**
* Unpack long value at the specified row index
*/
long get(int index);
/**
* Unpack a contiguous vector of long values at the specified start index of length and adjust them by the supplied
* delta base value.
*/
void getDelta(long[] out, int outPosition, int startIndex, int length, long base);
/**
* Unpack a non-contiguous vector of long values at the specified indexes and adjust them by the supplied delta base
* value.
*/
default int getDelta(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long base)
{
for (int i = 0; i < length; i++) {
int index = indexes[outPosition + i] - indexOffset;
if (index >= limit) {
return i;
}
out[outPosition + i] = base + get(index);
}
return length;
}
/**
* Unpack a contiguous vector of long values at the specified start index of length and lookup and replace stored
* values based on their index in the supplied value lookup 'table'
*/
default void getTable(long[] out, int outPosition, int startIndex, int length, long[] table)
{
throw new UOE("Table decoding not supported for %s", this.getClass().getSimpleName());
}
/**
* Unpack a contiguous vector of long values at the specified indexes and lookup and replace stored values based on
* their index in the supplied value lookup 'table'
*/
default int getTable(
long[] out,
int outPosition,
int[] indexes,
int length,
int indexOffset,
int limit,
long[] table
)
{
for (int i = 0; i < length; i++) {
int index = indexes[outPosition + i] - indexOffset;
if (index >= limit) {
return i;
}
out[outPosition + i] = table[(int) get(index)];
}
return length;
}
}
private static final class Size1Des implements LongDeserializer
@ -435,6 +508,58 @@ public class VSizeLongSerde
int shift = 7 - (index & 7);
return (buffer.get(offset + (index >> 3)) >> shift) & 1;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
int index = startIndex;
int i = 0;
// byte align
while ((index & 0x7) != 0 && i < length) {
out[outPosition + i++] = base + get(index++);
}
for ( ; i + Byte.SIZE < length; index += Byte.SIZE) {
final byte unpack = buffer.get(offset + (index >> 3));
out[outPosition + i++] = base + (unpack >> 7) & 1;
out[outPosition + i++] = base + (unpack >> 6) & 1;
out[outPosition + i++] = base + (unpack >> 5) & 1;
out[outPosition + i++] = base + (unpack >> 4) & 1;
out[outPosition + i++] = base + (unpack >> 3) & 1;
out[outPosition + i++] = base + (unpack >> 2) & 1;
out[outPosition + i++] = base + (unpack >> 1) & 1;
out[outPosition + i++] = base + unpack & 1;
}
while (i < length) {
out[outPosition + i++] = base + get(index++);
}
}
@Override
public void getTable(long[] out, int outPosition, int startIndex, int length, long[] table)
{
int index = startIndex;
int i = 0;
// byte align
while ((index & 0x7) != 0 && i < length) {
out[outPosition + i++] = table[(int) get(index++)];
}
for ( ; i + Byte.SIZE < length; index += Byte.SIZE) {
final byte unpack = buffer.get(offset + (index >> 3));
out[outPosition + i++] = table[(unpack >> 7) & 1];
out[outPosition + i++] = table[(unpack >> 6) & 1];
out[outPosition + i++] = table[(unpack >> 5) & 1];
out[outPosition + i++] = table[(unpack >> 4) & 1];
out[outPosition + i++] = table[(unpack >> 3) & 1];
out[outPosition + i++] = table[(unpack >> 2) & 1];
out[outPosition + i++] = table[(unpack >> 1) & 1];
out[outPosition + i++] = table[unpack & 1];
}
while (i < length) {
out[outPosition + i++] = table[(int) get(index++)];
}
}
}
private static final class Size2Des implements LongDeserializer
@ -454,6 +579,58 @@ public class VSizeLongSerde
int shift = 6 - ((index & 3) << 1);
return (buffer.get(offset + (index >> 2)) >> shift) & 3;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
int index = startIndex;
int i = 0;
// byte align
while ((index & 0x3) != 0 && i < length) {
out[outPosition + i++] = base + get(index++);
}
for ( ; i + 8 < length; index += 8) {
final short unpack = buffer.getShort(offset + (index >> 2));
out[outPosition + i++] = base + (unpack >> 14) & 3;
out[outPosition + i++] = base + (unpack >> 12) & 3;
out[outPosition + i++] = base + (unpack >> 10) & 3;
out[outPosition + i++] = base + (unpack >> 8) & 3;
out[outPosition + i++] = base + (unpack >> 6) & 3;
out[outPosition + i++] = base + (unpack >> 4) & 3;
out[outPosition + i++] = base + (unpack >> 2) & 3;
out[outPosition + i++] = base + unpack & 3;
}
while (i < length) {
out[outPosition + i++] = base + get(index++);
}
}
@Override
public void getTable(long[] out, int outPosition, int startIndex, int length, long[] table)
{
int index = startIndex;
int i = 0;
// byte align
while ((index & 0x3) != 0 && i < length) {
out[outPosition + i++] = table[(int) get(index++)];
}
for ( ; i + 8 < length; index += 8) {
final short unpack = buffer.getShort(offset + (index >> 2));
out[outPosition + i++] = table[(unpack >> 14) & 3];
out[outPosition + i++] = table[(unpack >> 12) & 3];
out[outPosition + i++] = table[(unpack >> 10) & 3];
out[outPosition + i++] = table[(unpack >> 8) & 3];
out[outPosition + i++] = table[(unpack >> 6) & 3];
out[outPosition + i++] = table[(unpack >> 4) & 3];
out[outPosition + i++] = table[(unpack >> 2) & 3];
out[outPosition + i++] = table[unpack & 3];
}
while (i < length) {
out[outPosition + i++] = table[(int) get(index++)];
}
}
}
private static final class Size4Des implements LongDeserializer
@ -473,6 +650,58 @@ public class VSizeLongSerde
int shift = ((index + 1) & 1) << 2;
return (buffer.get(offset + (index >> 1)) >> shift) & 0xF;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
int index = startIndex;
int i = 0;
// byte align
while ((index & 0x1) != 0 && i < length) {
out[outPosition + i++] = base + get(index++) & 0xF;
}
for ( ; i + 8 < length; index += 8) {
final int unpack = buffer.getInt(offset + (index >> 1));
out[outPosition + i++] = base + (unpack >> 28) & 0xF;
out[outPosition + i++] = base + (unpack >> 24) & 0xF;
out[outPosition + i++] = base + (unpack >> 20) & 0xF;
out[outPosition + i++] = base + (unpack >> 16) & 0xF;
out[outPosition + i++] = base + (unpack >> 12) & 0xF;
out[outPosition + i++] = base + (unpack >> 8) & 0xF;
out[outPosition + i++] = base + (unpack >> 4) & 0xF;
out[outPosition + i++] = base + unpack & 0xF;
}
while (i < length) {
out[outPosition + i++] = base + get(index++);
}
}
@Override
public void getTable(long[] out, int outPosition, int startIndex, int length, long[] table)
{
int index = startIndex;
int i = 0;
// byte align
while ((index & 0x1) != 0 && i < length) {
out[outPosition + i++] = table[(int) get(index++)];
}
for ( ; i + 8 < length; index += 8) {
final int unpack = buffer.getInt(offset + (index >> 1));
out[outPosition + i++] = table[(unpack >> 28) & 0xF];
out[outPosition + i++] = table[(unpack >> 24) & 0xF];
out[outPosition + i++] = table[(unpack >> 20) & 0xF];
out[outPosition + i++] = table[(unpack >> 16) & 0xF];
out[outPosition + i++] = table[(unpack >> 12) & 0xF];
out[outPosition + i++] = table[(unpack >> 8) & 0xF];
out[outPosition + i++] = table[(unpack >> 4) & 0xF];
out[outPosition + i++] = table[unpack & 0xF];
}
while (i < length) {
out[outPosition + i++] = table[(int) get(index++)];
}
}
}
private static final class Size8Des implements LongDeserializer
@ -491,6 +720,52 @@ public class VSizeLongSerde
{
return buffer.get(offset + index) & 0xFF;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
for (int i = 0, indexOffset = startIndex; i < length; i++, indexOffset++) {
out[outPosition + i] = base + buffer.get(offset + indexOffset) & 0xFF;
}
}
@Override
public int getDelta(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long base)
{
for (int i = 0; i < length; i++) {
int index = indexes[outPosition + i] - indexOffset;
if (index >= limit) {
return i;
}
out[outPosition + i] = base + (buffer.get(offset + index) & 0xFF);
}
return length;
}
@Override
public void getTable(long[] out, int outPosition, int startIndex, int length, long[] table)
{
for (int i = 0, indexOffset = startIndex; i < length; i++, indexOffset++) {
out[outPosition + i] = table[buffer.get(offset + indexOffset) & 0xFF];
}
}
@Override
public int getTable(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long[] table)
{
for (int i = 0; i < length; i++) {
int index = indexes[outPosition + i] - indexOffset;
if (index >= limit) {
return i;
}
out[outPosition + i] = table[buffer.get(offset + index) & 0xFF];
}
return length;
}
}
private static final class Size12Des implements LongDeserializer
@ -508,8 +783,37 @@ public class VSizeLongSerde
public long get(int index)
{
int shift = ((index + 1) & 1) << 2;
int offset = (index * 3) >> 1;
return (buffer.getShort(this.offset + offset) >> shift) & 0xFFF;
int indexOffset = (index * 3) >> 1;
return (buffer.getShort(offset + indexOffset) >> shift) & 0xFFF;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
int i = 0;
int index = startIndex;
// every other value is byte aligned
if ((index & 0x1) != 0) {
out[outPosition + i++] = get(index++);
}
final int unpackSize = Long.BYTES + Integer.BYTES;
for (int indexOffset = (index * 3) >> 1; i + 8 < length; indexOffset += unpackSize) {
final long unpack = buffer.getLong(offset + indexOffset);
final int unpack2 = buffer.getInt(offset + indexOffset + Long.BYTES);
out[outPosition + i++] = base + ((unpack >> 52) & 0xFFF);
out[outPosition + i++] = base + ((unpack >> 40) & 0xFFF);
out[outPosition + i++] = base + ((unpack >> 28) & 0xFFF);
out[outPosition + i++] = base + ((unpack >> 16) & 0xFFF);
out[outPosition + i++] = base + ((unpack >> 4) & 0xFFF);
out[outPosition + i++] = base + (((unpack & 0xF) << 8) | ((unpack2 >>> 24) & 0xFF));
out[outPosition + i++] = base + ((unpack2 >> 12) & 0xFFF);
out[outPosition + i++] = base + (unpack2 & 0xFFF);
}
while (i < length) {
out[outPosition + i] = base + get(startIndex + i);
i++;
}
}
}
@ -529,6 +833,29 @@ public class VSizeLongSerde
{
return buffer.getShort(offset + (index << 1)) & 0xFFFF;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
for (int i = 0, indexOffset = (startIndex << 1); i < length; i++, indexOffset += Short.BYTES) {
out[outPosition + i] = base + buffer.getShort(offset + indexOffset) & 0xFFFF;
}
}
@Override
public int getDelta(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long base)
{
for (int i = 0; i < length; i++) {
int index = indexes[outPosition + i] - indexOffset;
if (index >= limit) {
return i;
}
out[outPosition + i] = base + buffer.getShort(offset + (index << 1)) & 0xFFFF;
}
return length;
}
}
private static final class Size20Des implements LongDeserializer
@ -546,8 +873,37 @@ public class VSizeLongSerde
public long get(int index)
{
int shift = (((index + 1) & 1) << 2) + 8;
int offset = (index * 5) >> 1;
return (buffer.getInt(this.offset + offset) >> shift) & 0xFFFFF;
int indexOffset = (index * 5) >> 1;
return (buffer.getInt(offset + indexOffset) >> shift) & 0xFFFFF;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
int i = 0;
int index = startIndex;
// every other value is byte aligned
if ((index & 0x1) != 0) {
out[outPosition + i++] = get(index++);
}
final int unpackSize = Long.BYTES + Long.BYTES + Integer.BYTES;
for (int indexOffset = (index * 5) >> 1; i + 8 < length; indexOffset += unpackSize) {
final long unpack = buffer.getLong(offset + indexOffset);
final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES);
final int unpack3 = buffer.getInt(offset + indexOffset + Long.BYTES + Long.BYTES);
out[outPosition + i++] = base + ((unpack >> 44) & 0xFFFFF);
out[outPosition + i++] = base + ((unpack >> 24) & 0xFFFFF);
out[outPosition + i++] = base + ((unpack >> 4) & 0xFFFFF);
out[outPosition + i++] = base + (((unpack & 0xF) << 16) | ((unpack2 >>> 48) & 0xFFFF));
out[outPosition + i++] = base + ((unpack2 >> 28) & 0xFFFFF);
out[outPosition + i++] = base + ((unpack2 >> 8) & 0xFFFFF);
out[outPosition + i++] = base + (((unpack2 & 0xFF) << 12) | ((unpack3 >>> 20) & 0xFFF));
out[outPosition + i++] = base + (unpack3 & 0xFFFFF);
}
while (i < length) {
out[outPosition + i] = base + get(startIndex + i);
i++;
}
}
}
@ -565,7 +921,31 @@ public class VSizeLongSerde
@Override
public long get(int index)
{
return buffer.getInt(offset + index * 3) >>> 8;
return buffer.getInt(offset + (index * 3)) >>> 8;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
int i = 0;
final int unpackSize = 3 * Long.BYTES;
for (int indexOffset = startIndex * 3; i + 8 < length; indexOffset += unpackSize) {
final long unpack = buffer.getLong(offset + indexOffset);
final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES);
final long unpack3 = buffer.getLong(offset + indexOffset + Long.BYTES + Long.BYTES);
out[outPosition + i++] = base + ((unpack >> 40) & 0xFFFFFF);
out[outPosition + i++] = base + ((unpack >> 16) & 0xFFFFFF);
out[outPosition + i++] = base + (((unpack & 0xFFFF) << 8) | ((unpack2 >>> 56) & 0xFF));
out[outPosition + i++] = base + ((unpack2 >> 32) & 0xFFFFFF);
out[outPosition + i++] = base + ((unpack2 >> 8) & 0xFFFFFF);
out[outPosition + i++] = base + (((unpack2 & 0xFF) << 16) | ((unpack3 >>> 48) & 0xFFFF));
out[outPosition + i++] = base + ((unpack3 >> 24) & 0xFFFFFF);
out[outPosition + i++] = base + (unpack3 & 0xFFFFFF);
}
while (i < length) {
out[outPosition + i] = base + get(startIndex + i);
i++;
}
}
}
@ -583,7 +963,15 @@ public class VSizeLongSerde
@Override
public long get(int index)
{
return buffer.getInt(offset + (index << 2)) & 0xFFFFFFFFL;
return buffer.getInt((offset + (index << 2))) & 0xFFFFFFFFL;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
for (int i = 0, indexOffset = (startIndex << 2); i < length; i++, indexOffset += Integer.BYTES) {
out[outPosition + i] = base + buffer.getInt(offset + indexOffset) & 0xFFFFFFFFL;
}
}
}
@ -601,7 +989,33 @@ public class VSizeLongSerde
@Override
public long get(int index)
{
return buffer.getLong(offset + index * 5) >>> 24;
return buffer.getLong(offset + (index * 5)) >>> 24;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
int i = 0;
final int unpackSize = 5 * Long.BYTES;
for (int indexOffset = startIndex * 5; i + 8 < length; indexOffset += unpackSize) {
final long unpack = buffer.getLong(offset + indexOffset);
final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES);
final long unpack3 = buffer.getLong(offset + indexOffset + (2 * Long.BYTES));
final long unpack4 = buffer.getLong(offset + indexOffset + (3 * Long.BYTES));
final long unpack5 = buffer.getLong(offset + indexOffset + (4 * Long.BYTES));
out[outPosition + i++] = base + ((unpack >>> 24) & 0xFFFFFFFFFFL);
out[outPosition + i++] = base + (((unpack & 0xFFFFFFL) << 16) | ((unpack2 >>> 48) & 0xFFFFL));
out[outPosition + i++] = base + ((unpack2 >>> 8) & 0xFFFFFFFFFFL);
out[outPosition + i++] = base + (((unpack2 & 0xFFL) << 32) | ((unpack3 >>> 32) & 0xFFFFFFFFL));
out[outPosition + i++] = base + (((unpack3 & 0xFFFFFFFFL) << 8) | ((unpack4 >>> 56) & 0xFFL));
out[outPosition + i++] = base + ((unpack4 >>> 16) & 0xFFFFFFFFFFL);
out[outPosition + i++] = base + (((unpack4 & 0xFFFFL) << 24) | ((unpack5 >>> 40) & 0xFFFFFFL));
out[outPosition + i++] = base + (unpack5 & 0xFFFFFFFFFFL);
}
while (i < length) {
out[outPosition + i] = base + get(startIndex + i);
i++;
}
}
}
@ -619,7 +1033,34 @@ public class VSizeLongSerde
@Override
public long get(int index)
{
return buffer.getLong(offset + index * 6) >>> 16;
return buffer.getLong(offset + (index * 6)) >>> 16;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
int i = 0;
final int unpackSize = 6 * Long.BYTES;
for (int indexOffset = startIndex * 6; i + 8 < length; indexOffset += unpackSize) {
final long unpack = buffer.getLong(offset + indexOffset);
final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES);
final long unpack3 = buffer.getLong(offset + indexOffset + (2 * Long.BYTES));
final long unpack4 = buffer.getLong(offset + indexOffset + (3 * Long.BYTES));
final long unpack5 = buffer.getLong(offset + indexOffset + (4 * Long.BYTES));
final long unpack6 = buffer.getLong(offset + indexOffset + (5 * Long.BYTES));
out[outPosition + i++] = base + ((unpack >>> 16) & 0xFFFFFFFFFFFFL);
out[outPosition + i++] = base + (((unpack & 0xFFFFL) << 32) | ((unpack2 >>> 32) & 0xFFFFFFFFL));
out[outPosition + i++] = base + (((unpack2 & 0xFFFFFFFFL) << 16) | ((unpack3 >>> 48) & 0xFFFFL));
out[outPosition + i++] = base + (unpack3 & 0xFFFFFFFFFFFFL);
out[outPosition + i++] = base + ((unpack4 >>> 16) & 0xFFFFFFFFFFFFL);
out[outPosition + i++] = base + (((unpack4 & 0xFFFFL) << 32) | ((unpack5 >>> 32) & 0xFFFFFFFFL));
out[outPosition + i++] = base + (((unpack5 & 0xFFFFFFFFL) << 16) | ((unpack6 >>> 48) & 0xFFFFL));
out[outPosition + i++] = base + (unpack6 & 0xFFFFFFFFFFFFL);
}
while (i < length) {
out[outPosition + i] = base + get(startIndex + i);
i++;
}
}
}
@ -637,7 +1078,35 @@ public class VSizeLongSerde
@Override
public long get(int index)
{
return buffer.getLong(offset + index * 7) >>> 8;
return buffer.getLong(offset + (index * 7)) >>> 8;
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
int i = 0;
final int unpackSize = 7 * Long.BYTES;
for (int indexOffset = startIndex * 7; i + 8 < length; indexOffset += unpackSize) {
final long unpack = buffer.getLong(offset + indexOffset);
final long unpack2 = buffer.getLong(offset + indexOffset + Long.BYTES);
final long unpack3 = buffer.getLong(offset + indexOffset + (2 * Long.BYTES));
final long unpack4 = buffer.getLong(offset + indexOffset + (3 * Long.BYTES));
final long unpack5 = buffer.getLong(offset + indexOffset + (4 * Long.BYTES));
final long unpack6 = buffer.getLong(offset + indexOffset + (5 * Long.BYTES));
final long unpack7 = buffer.getLong(offset + indexOffset + (6 * Long.BYTES));
out[outPosition + i++] = base + ((unpack >>> 8) & 0xFFFFFFFFFFFFFFL);
out[outPosition + i++] = base + (((unpack & 0xFFL) << 48) | ((unpack2 >>> 16) & 0xFFFFFFFFFFFFL));
out[outPosition + i++] = base + (((unpack2 & 0xFFFFL) << 40) | ((unpack3 >>> 24) & 0xFFFFFFFFFFL));
out[outPosition + i++] = base + (((unpack3 & 0xFFFFFFL) << 32) | ((unpack4 >>> 32) & 0xFFFFFFFFL));
out[outPosition + i++] = base + (((unpack4 & 0xFFFFFFFFL) << 24) | ((unpack5 >>> 40) & 0xFFFFFFL));
out[outPosition + i++] = base + (((unpack5 & 0xFFFFFFFFFFL) << 16) | ((unpack6 >>> 48) & 0xFFFFL));
out[outPosition + i++] = base + (((unpack6 & 0xFFFFFFFFFFFFL) << 8) | ((unpack7 >>> 56) & 0xFFL));
out[outPosition + i++] = base + (unpack7 & 0xFFFFFFFFFFFFFFL);
}
while (i < length) {
out[outPosition + i] = base + get(startIndex + i);
i++;
}
}
}
@ -657,6 +1126,26 @@ public class VSizeLongSerde
{
return buffer.getLong(offset + (index << 3));
}
@Override
public void getDelta(long[] out, int outPosition, int startIndex, int length, long base)
{
for (int i = 0, indexOffset = (startIndex << 3); i < length; i++, indexOffset += Long.BYTES) {
out[outPosition + i] = base + buffer.getLong(offset + indexOffset);
}
}
@Override
public int getDelta(long[] out, int outPosition, int[] indexes, int length, int indexOffset, int limit, long base)
{
for (int i = 0; i < length; i++) {
int index = indexes[outPosition + i] - indexOffset;
if (index >= limit) {
return i;
}
out[outPosition + i] = base + buffer.getLong(offset + (index << 3));
}
return length;
}
}
}

View File

@ -33,8 +33,9 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
public class ColumnValueGenerator
public class ColumnValueGenerator implements Supplier<Object>
{
private final GeneratorColumnSchema schema;
private final long seed;
@ -224,4 +225,10 @@ public class ColumnValueGenerator
((EnumeratedDistribution) distribution).reseedRandomGenerator(seed);
}
}
@Override
public Object get()
{
return generateRowValue();
}
}

View File

@ -224,12 +224,18 @@ public class CompressedLongsSerdeTest
Assert.assertEquals(vals.length, indexed.size());
// sequential access
long[] vector = new long[256];
int[] indices = new int[vals.length];
for (int i = 0; i < indexed.size(); ++i) {
if (i % 256 == 0) {
indexed.get(vector, i, Math.min(256, indexed.size() - i));
}
Assert.assertEquals(vals[i], indexed.get(i));
Assert.assertEquals(vals[i], vector[i % 256]);
indices[i] = i;
}
// random access, limited to 1000 elements for large lists (every element would take too long)
IntArrays.shuffle(indices, ThreadLocalRandom.current());
final int limit = Math.min(indexed.size(), 1000);

View File

@ -20,19 +20,75 @@
package org.apache.druid.segment.data;
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
@RunWith(Enclosed.class)
public class VSizeLongSerdeTest
{
private ByteBuffer buffer;
private ByteArrayOutputStream outStream;
private ByteBuffer outBuffer;
@RunWith(Parameterized.class)
public static class EveryLittleBitTest
{
private final int numBits;
public EveryLittleBitTest(int numBits)
{
this.numBits = numBits;
}
@Parameterized.Parameters(name = "numBits={0}")
public static Collection<Object[]> data()
{
return Arrays.stream(VSizeLongSerde.SUPPORTED_SIZES)
.mapToObj(value -> new Object[]{value})
.collect(Collectors.toList());
}
@Test
public void testEveryPowerOfTwo() throws IOException
{
// Test every long that has a single bit set.
final int numLongs = Math.min(64, numBits);
final long[] longs = new long[numLongs];
for (int bit = 0; bit < numLongs; bit++) {
longs[bit] = 1L << bit;
}
testSerde(numBits, longs);
}
@Test
public void testEveryPowerOfTwoMinusOne() throws IOException
{
// Test every long with runs of low bits set.
final int numLongs = Math.min(64, numBits + 1);
final long[] longs = new long[numLongs];
for (int bit = 0; bit < numLongs; bit++) {
longs[bit] = (1L << bit) - 1;
}
testSerde(numBits, longs);
}
}
public static class SpecificValuesTest
{
private final long[] values0 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1};
private final long[] values1 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1};
private final long[] values2 = {12, 5, 2, 9, 3, 2, 5, 1, 0, 6, 13, 10, 15};
@ -41,13 +97,6 @@ public class VSizeLongSerdeTest
private final long[] values5 = {123, 632, 12, 39, 536, 0, 1023, 52, 777, 526, 214, 562, 823, 346};
private final long[] values6 = {1000000, 1000001, 1000002, 1000003, 1000004, 1000005, 1000006, 1000007, 1000008};
@Before
public void setUp()
{
outStream = new ByteArrayOutputStream();
outBuffer = ByteBuffer.allocate(500000);
}
@Test
public void testGetBitsForMax()
{
@ -89,22 +138,43 @@ public class VSizeLongSerdeTest
@Test
public void testSerdeLoop() throws IOException
{
final long[] zeroTo256 = generateSequentialLongs(0, 256);
final long[] zeroTo50000 = generateSequentialLongs(0, 50000);
for (int i : VSizeLongSerde.SUPPORTED_SIZES) {
if (i >= 8) {
testSerdeIncLoop(i, 0, 256);
testSerde(i, zeroTo256);
}
if (i >= 16) {
testSerdeIncLoop(i, 0, 50000);
testSerde(i, zeroTo50000);
}
}
}
public void testSerde(int longSize, long[] values) throws IOException
private long[] generateSequentialLongs(final long start, final long end)
{
outBuffer.rewind();
outStream.reset();
VSizeLongSerde.LongSerializer streamSer = VSizeLongSerde.getSerializer(longSize, outStream);
VSizeLongSerde.LongSerializer bufferSer = VSizeLongSerde.getSerializer(longSize, outBuffer, 0);
final long[] values = new long[Ints.checkedCast(end - start)];
for (int i = 0; i < values.length; i++) {
values[i] = start + i;
}
return values;
}
}
public static void testSerde(int numBits, long[] values) throws IOException
{
final int bufferOffset = 1;
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
outStream.write(0xAF); // Dummy byte so the real stuff starts at bufferOffset
final ByteBuffer buffer =
ByteBuffer.allocate(VSizeLongSerde.getSerializedSize(numBits, values.length) + bufferOffset);
buffer.rewind();
buffer.put(0, (byte) 0xAF); // Dummy byte again.
VSizeLongSerde.LongSerializer streamSer = VSizeLongSerde.getSerializer(numBits, outStream);
VSizeLongSerde.LongSerializer bufferSer = VSizeLongSerde.getSerializer(numBits, buffer, bufferOffset);
for (long value : values) {
streamSer.write(value);
bufferSer.write(value);
@ -112,40 +182,190 @@ public class VSizeLongSerdeTest
streamSer.close();
bufferSer.close();
buffer = ByteBuffer.wrap(outStream.toByteArray());
Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, values.length), buffer.capacity());
Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, values.length), outBuffer.position());
VSizeLongSerde.LongDeserializer streamDes = VSizeLongSerde.getDeserializer(longSize, buffer, 0);
VSizeLongSerde.LongDeserializer bufferDes = VSizeLongSerde.getDeserializer(longSize, outBuffer, 0);
for (int i = 0; i < values.length; i++) {
Assert.assertEquals(values[i], streamDes.get(i));
Assert.assertEquals(values[i], bufferDes.get(i));
}
// Verify serialized sizes.
final ByteBuffer bufferFromStream = ByteBuffer.wrap(outStream.toByteArray());
Assert.assertEquals(
StringUtils.format("Serialized size (stream, numBits = %d)", numBits),
VSizeLongSerde.getSerializedSize(numBits, values.length),
bufferFromStream.capacity() - bufferOffset
);
Assert.assertEquals(
StringUtils.format("Serialized size (buffer, numBits = %d)", numBits),
VSizeLongSerde.getSerializedSize(numBits, values.length),
buffer.position() - bufferOffset
);
// Verify the actual serialized contents.
Assert.assertArrayEquals(
StringUtils.format("Stream and buffer serialized images are equal (numBits = %d)", numBits),
bufferFromStream.array(),
buffer.array()
);
// Verify deserialization. We know the two serialized buffers are equal, so from this point on, just use one.
VSizeLongSerde.LongDeserializer deserializer = VSizeLongSerde.getDeserializer(numBits, buffer, bufferOffset);
testGetSingleRow(deserializer, numBits, values);
testContiguousGetSingleRow(deserializer, numBits, values);
testContiguousGetWholeRegion(deserializer, numBits, values);
testNoncontiguousGetSingleRow(deserializer, numBits, values);
testNoncontiguousGetEveryOtherValue(deserializer, numBits, values);
testNoncontiguousGetEveryOtherValueWithLimit(deserializer, numBits, values);
}
public void testSerdeIncLoop(int longSize, long start, long end) throws IOException
private static void testGetSingleRow(
final VSizeLongSerde.LongDeserializer deserializer,
final int numBits,
final long[] values
)
{
outBuffer.rewind();
outStream.reset();
VSizeLongSerde.LongSerializer streamSer = VSizeLongSerde.getSerializer(longSize, outStream);
VSizeLongSerde.LongSerializer bufferSer = VSizeLongSerde.getSerializer(longSize, outBuffer, 0);
for (long i = start; i < end; i++) {
streamSer.write(i);
bufferSer.write(i);
}
streamSer.close();
bufferSer.close();
buffer = ByteBuffer.wrap(outStream.toByteArray());
Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, (int) (end - start)), buffer.capacity());
Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, (int) (end - start)), outBuffer.position());
VSizeLongSerde.LongDeserializer streamDes = VSizeLongSerde.getDeserializer(longSize, buffer, 0);
VSizeLongSerde.LongDeserializer bufferDes = VSizeLongSerde.getDeserializer(longSize, outBuffer, 0);
for (int i = 0; i < end - start; i++) {
Assert.assertEquals(start + i, streamDes.get(i));
Assert.assertEquals(start + i, bufferDes.get(i));
for (int i = 0; i < values.length; i++) {
Assert.assertEquals(
StringUtils.format("Deserializer (testGetSingleRow, numBits = %d, position = %d)", numBits, i),
values[i],
deserializer.get(i)
);
}
}
private static void testContiguousGetSingleRow(
final VSizeLongSerde.LongDeserializer deserializer,
final int numBits,
final long[] values
)
{
final int outPosition = 1;
final long[] out = new long[values.length + outPosition];
for (int i = 0; i < values.length; i++) {
Arrays.fill(out, -1);
deserializer.getDelta(out, outPosition, i, 1, 0);
Assert.assertEquals(
StringUtils.format("Deserializer (testContiguousGetSingleRow, numBits = %d, position = %d)", numBits, i),
values[i],
out[outPosition]
);
}
}
private static void testContiguousGetWholeRegion(
final VSizeLongSerde.LongDeserializer deserializer,
final int numBits,
final long[] values
)
{
final int outPosition = 1;
final long[] out = new long[values.length + outPosition];
Arrays.fill(out, -1);
deserializer.getDelta(out, outPosition, 0, values.length, 0);
Assert.assertArrayEquals(
StringUtils.format("Deserializer (testContiguousGetWholeRegion, numBits = %d)", numBits),
values,
Arrays.stream(out).skip(outPosition).toArray()
);
}
private static void testNoncontiguousGetSingleRow(
final VSizeLongSerde.LongDeserializer deserializer,
final int numBits,
final long[] values
)
{
final int indexOffset = 1;
final int outPosition = 1;
final long[] out = new long[values.length + outPosition];
final int[] indexes = new int[values.length + outPosition];
for (int i = 0; i < values.length; i++) {
Arrays.fill(out, -1);
Arrays.fill(indexes, -1);
indexes[outPosition] = i + indexOffset;
deserializer.getDelta(out, outPosition, indexes, 1, indexOffset, values.length, 0);
Assert.assertEquals(
StringUtils.format("Deserializer (testNoncontiguousGetSingleRow, numBits = %d, position = %d)", numBits, i),
values[i],
out[outPosition]
);
}
}
private static void testNoncontiguousGetEveryOtherValue(
final VSizeLongSerde.LongDeserializer deserializer,
final int numBits,
final long[] values
)
{
final int indexOffset = 1;
final int outPosition = 1;
final long[] out = new long[values.length + outPosition];
final long[] expectedOut = new long[values.length + outPosition];
final int[] indexes = new int[values.length + outPosition];
Arrays.fill(out, -1);
Arrays.fill(expectedOut, -1);
Arrays.fill(indexes, -1);
int cnt = 0;
for (int i = 0; i < values.length; i++) {
if (i % 2 == 0) {
indexes[outPosition + i / 2] = i + indexOffset;
expectedOut[outPosition + i / 2] = values[i];
cnt++;
}
}
deserializer.getDelta(out, outPosition, indexes, cnt, indexOffset, values.length, 0);
Assert.assertArrayEquals(
StringUtils.format("Deserializer (testNoncontiguousGetEveryOtherValue, numBits = %d)", numBits),
expectedOut,
out
);
}
private static void testNoncontiguousGetEveryOtherValueWithLimit(
final VSizeLongSerde.LongDeserializer deserializer,
final int numBits,
final long[] values
)
{
final int indexOffset = 1;
final int outPosition = 1;
final long[] out = new long[values.length + outPosition];
final long[] expectedOut = new long[values.length + outPosition];
final int[] indexes = new int[values.length + outPosition];
final int limit = values.length - 2; // Don't do the last value
Arrays.fill(out, -1);
Arrays.fill(expectedOut, -1);
Arrays.fill(indexes, -1);
int cnt = 0;
for (int i = 0; i < values.length; i++) {
if (i % 2 == 0) {
indexes[outPosition + i / 2] = i + indexOffset;
if (i < limit) {
expectedOut[outPosition + i / 2] = values[i];
}
cnt++;
}
}
final int ret = deserializer.getDelta(out, outPosition, indexes, cnt, indexOffset, limit, 0);
Assert.assertArrayEquals(
StringUtils.format("Deserializer (testNoncontiguousGetEveryOtherValue, numBits = %d)", numBits),
expectedOut,
out
);
Assert.assertEquals(Math.max(0, cnt - 1), ret);
}
}