Adding zstandard compression library (#12408)

* Adding zstandard compression library

* 1. Took @clintropolis's advice to have ZStandard decompressor use the byte array when the buffers are not direct.
2. Cleaned up checkstyle issues.

* Fixing zstandard version to latest stable version in pom's and updating license files

* Removing zstd from benchmarks and adding to processing (poms)

* fix the intellij inspection issue

* Removing the prefix v for the version in the license check for ztsd

* Fixing license checks

Co-authored-by: Rahul Gidwani <r_gidwani@apple.com>
This commit is contained in:
Dr. Sizzles 2022-05-28 17:01:44 -07:00 committed by GitHub
parent 79f86a0511
commit 7291c92f4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 306 additions and 43 deletions

View File

@ -61,7 +61,9 @@ public class BaseColumnarLongsBenchmark
* encoding of values within the block.
*/
@Param({
"zstd-longs",
"lz4-longs",
"zstd-auto",
"lz4-auto"
})
String encoding;
@ -271,6 +273,26 @@ public class BaseColumnarLongsBenchmark
CompressionStrategy.NONE
);
break;
case "zstd-longs":
serializer = CompressionFactory.getLongSerializer(
encoding,
writeOutMedium,
"zstd-longs",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.LONGS,
CompressionStrategy.ZSTD
);
break;
case "zstd-auto":
serializer = CompressionFactory.getLongSerializer(
encoding,
writeOutMedium,
"zstd-auto",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.AUTO,
CompressionStrategy.ZSTD
);
break;
default:
throw new RuntimeException("unknown encoding");
}
@ -290,6 +312,8 @@ public class BaseColumnarLongsBenchmark
case "lz4-auto":
case "none-auto":
case "none-longs":
case "zstd-auto":
case "zstd-longs":
return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN).get();
}

View File

@ -233,7 +233,8 @@ def build_compatible_license_names():
compatible_licenses['BSD-2-Clause License'] = 'BSD-2-Clause License'
compatible_licenses['BSD-2-Clause'] = 'BSD-2-Clause License'
compatible_licenses['BSD 2-Clause license'] = 'BSD 2-Clause License'
compatible_licenses['BSD 2-Clause license'] = 'BSD-2-Clause License'
compatible_licenses['BSD 2-Clause License'] = 'BSD-2-Clause License'
compatible_licenses['BSD-3-Clause License'] = 'BSD-3-Clause License'
compatible_licenses['New BSD license'] = 'BSD-3-Clause License'

View File

@ -208,8 +208,8 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|bitmap|Object|Compression format for bitmap indexes. Should be a JSON object. See [Bitmap types](#bitmap-types) below for options.|no (defaults to Roaring)|
|dimensionCompression|String|Compression format for dimension columns. Choose from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
|metricCompression|String|Compression format for primitive type metric columns. Choose from `LZ4`, `LZF`, `uncompressed`, or `none`.|no (default == `LZ4`)|
|dimensionCompression|String|Compression format for dimension columns. Choose from `LZ4`, `LZF`, `ZSTD` or `uncompressed`.|no (default == `LZ4`)|
|metricCompression|String|Compression format for primitive type metric columns. Choose from `LZ4`, `LZF`, `ZSTD`, `uncompressed` or `none`.|no (default == `LZ4`)|
|longEncoding|String|Encoding format for metric and dimension columns with type long. Choose from `auto` or `longs`. `auto` encodes the values using offset or lookup table depending on column cardinality, and store them with variable size. `longs` stores the value as is with 8 bytes each.|no (default == `longs`)|
##### Bitmap types

View File

@ -3964,7 +3964,7 @@ name: JNI binding for Zstd
license_category: binary
module: java-core
license_name: BSD-2-Clause License
version: 1.3.3-1
version: 1.5.2-3
copyright: Luben Karavelov
license_file_path: licenses/bin/zstd-jni.BSD2
libraries:
@ -5028,7 +5028,7 @@ libraries:
name: DNS Java
license_category: binary
module: java-core
license_name: BSD 2-Clause license
license_name: BSD-2-Clause License
version: 2.1.7
libraries:
- dnsjava: dnsjava

View File

@ -449,7 +449,7 @@
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.3.3-1</version>
<version>1.5.2-3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson</groupId>

View File

@ -169,6 +169,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.github.luben.zstd.Zstd;
import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.LZFDecoder;
import com.ning.compress.lzf.LZFEncoder;
@ -75,6 +76,21 @@ public enum CompressionStrategy
return LZ4Compressor.DEFAULT_COMPRESSOR;
}
},
ZSTD((byte) 0x2) {
@Override
public Decompressor getDecompressor()
{
return ZstdDecompressor.DEFAULT_COMPRESSOR;
}
@Override
public Compressor getCompressor()
{
return ZstdCompressor.DEFAULT_COMPRESSOR;
}
},
UNCOMPRESSED((byte) 0xFF) {
@Override
public Decompressor getDecompressor()
@ -171,7 +187,7 @@ public enum CompressionStrategy
/**
* Allocates a buffer that should be passed to {@link #compress} method as input buffer. Different Compressors
* require (or work more efficiently with) different kinds of buffers.
*
* <p>
* If the allocated buffer is a direct buffer, it should be registered to be freed with the given Closer.
*/
ByteBuffer allocateInBuffer(int inputSize, Closer closer)
@ -182,9 +198,9 @@ public enum CompressionStrategy
/**
* Allocates a buffer that should be passed to {@link #compress} method as output buffer. Different Compressors
* require (or work more efficiently with) different kinds of buffers.
*
* <p>
* Allocates a buffer that is always enough to compress a byte sequence of the given size.
*
* <p>
* If the allocated buffer is a direct buffer, it should be registered to be freed with the given Closer.
*/
abstract ByteBuffer allocateOutBuffer(int inputSize, Closer closer);
@ -344,6 +360,81 @@ public enum CompressionStrategy
}
}
public static class ZstdCompressor extends Compressor
{
private static final ZstdCompressor DEFAULT_COMPRESSOR = new ZstdCompressor();
@Override
ByteBuffer allocateInBuffer(int inputSize, Closer closer)
{
ByteBuffer inBuffer = ByteBuffer.allocateDirect(inputSize);
closer.register(() -> ByteBufferUtils.free(inBuffer));
return inBuffer;
}
@Override
ByteBuffer allocateOutBuffer(int inputSize, Closer closer)
{
ByteBuffer outBuffer = ByteBuffer.allocateDirect((int) Zstd.compressBound(inputSize));
closer.register(() -> ByteBufferUtils.free(outBuffer));
return outBuffer;
}
@Override
public ByteBuffer compress(ByteBuffer in, ByteBuffer out)
{
int position = in.position();
out.clear();
long sizeNeeded = Zstd.compressBound(in.remaining());
if (out.remaining() < sizeNeeded) {
throw new RuntimeException("Output buffer too small, please allocate more space. " + sizeNeeded + " required.");
}
Zstd.compress(out, in, Zstd.maxCompressionLevel());
in.position(position);
out.flip();
return out;
}
}
public static class ZstdDecompressor implements Decompressor
{
private static final ZstdDecompressor DEFAULT_COMPRESSOR = new ZstdDecompressor();
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
out.clear();
if (!in.isDirect() || !out.isDirect()) {
// fall back to heap byte arrays if both buffers are not direct
final byte[] inputBytes = new byte[numBytes];
in.get(inputBytes);
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
int decompressedBytes = (int) Zstd.decompressByteArray(
outputBytes,
0,
outputBytes.length,
inputBytes,
0,
numBytes
);
out.put(outputBytes, 0, decompressedBytes);
out.flip();
}
} else {
int decompressedBytes = (int) Zstd.decompressDirectByteBuffer(
out,
out.position(),
out.remaining(),
in,
in.position(),
numBytes
);
out.limit(out.position() + decompressedBytes);
}
}
}
/**
* Logs info relating to whether LZ4 is using native or pure Java implementations
*/

View File

@ -19,7 +19,6 @@
package org.apache.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.ByteBufferUtils;
@ -40,7 +39,6 @@ import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -53,14 +51,7 @@ public class CompressionStrategyTest
{
return Iterables.transform(
Arrays.asList(CompressionStrategy.noNoneValues()),
new Function<CompressionStrategy, Object[]>()
{
@Override
public Object[] apply(CompressionStrategy compressionStrategy)
{
return new Object[]{compressionStrategy};
}
}
compressionStrategy -> new Object[]{compressionStrategy}
);
}
@ -101,22 +92,12 @@ public class CompressionStrategyTest
public void testBasicOperations()
{
ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer);
ByteBuffer compressed = compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), compressionOut);
ByteBuffer output = ByteBuffer.allocate(originalData.length);
compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
}
@Test
public void testDirectMemoryOperations()
{
ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer);
ByteBuffer compressed = compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), compressionOut);
ByteBuffer compressionIn = compressionStrategy.getCompressor().allocateInBuffer(originalData.length, closer);
try (final ResourceHolder<ByteBuffer> holder = ByteBufferUtils.allocateDirect(originalData.length)) {
final ByteBuffer output = holder.get();
compressionIn.put(originalData);
compressionIn.rewind();
ByteBuffer compressed = compressionStrategy.getCompressor().compress(compressionIn, compressionOut);
compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
@ -140,27 +121,33 @@ public class CompressionStrategyTest
for (int i = 0; i < numThreads; ++i) {
results.add(
threadPoolExecutor.submit(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer);
ByteBuffer compressed = compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), compressionOut);
ByteBuffer output = ByteBuffer.allocate(originalData.length);
() -> {
ByteBuffer compressionOut = compressionStrategy.getCompressor()
.allocateOutBuffer(originalData.length, closer);
ByteBuffer compressionIn = compressionStrategy.getCompressor()
.allocateInBuffer(originalData.length, closer);
try {
compressionIn.put(originalData);
compressionIn.position(0);
ByteBuffer compressed = compressionStrategy.getCompressor().compress(compressionIn, compressionOut);
ByteBuffer output = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer);
compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output);
byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray);
return true;
}
finally {
ByteBufferUtils.free(compressionIn);
ByteBufferUtils.free(compressionOut);
}
}
)
);
}
threadPoolExecutor.shutdown();
for (Future result : results) {
Assert.assertTrue((Boolean) result.get());
for (Future<Boolean> result : results) {
Assert.assertTrue(result.get());
}
}
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class TestColumnCompression
{
private final CompressionStrategy compressionType;
private ColumnarMultiInts compressed;
int bytes = 1;
int valuesPerRowBound = 5;
int filteredRowCount = 1000;
private BitSet filter;
private ByteBuffer buffer;
public TestColumnCompression(CompressionStrategy strategy)
{
compressionType = strategy;
}
@Parameterized.Parameters
public static Iterable<Object[]> compressionStrategies()
{
return Arrays.stream(CompressionStrategy.values())
.filter(x -> !CompressionStrategy.NONE.equals(x))
.map(strategy -> new Object[]{strategy}).collect(Collectors.toList());
}
@Before
public void setUp() throws Exception
{
Random rand = ThreadLocalRandom.current();
List<int[]> rows = new ArrayList<>();
final int bound = 1 << bytes;
for (int i = 0; i < 0x100000; i++) {
int count = rand.nextInt(valuesPerRowBound) + 1;
int[] row = new int[rand.nextInt(count)];
for (int j = 0; j < row.length; j++) {
row[j] = rand.nextInt(bound);
}
rows.add(row);
}
buffer = serialize(
CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
Iterables.transform(rows, (Function<int[], ColumnarInts>) input -> VSizeColumnarInts.fromArray(input, 20)),
bound - 1,
ByteOrder.nativeOrder(),
compressionType,
Closer.create()
)
);
this.compressed = CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
buffer,
ByteOrder.nativeOrder()
).get();
filter = new BitSet();
for (int i = 0; i < filteredRowCount; i++) {
int rowToAccess = rand.nextInt(rows.size());
// Skip already selected rows if any
while (filter.get(rowToAccess)) {
rowToAccess = (rowToAccess + 1) % rows.size();
}
filter.set(rowToAccess);
}
}
@After
public void tearDown()
{
ByteBufferUtils.free(buffer);
}
private static ByteBuffer serialize(WritableSupplier<ColumnarMultiInts> writableSupplier) throws IOException
{
final ByteBuffer buffer = ByteBuffer.allocateDirect((int) writableSupplier.getSerializedSize());
WritableByteChannel channel = new WritableByteChannel()
{
@Override
public int write(ByteBuffer src)
{
int size = src.remaining();
buffer.put(src);
return size;
}
@Override
public boolean isOpen()
{
return true;
}
@Override
public void close()
{
}
};
writableSupplier.writeTo(channel, null);
buffer.rewind();
return buffer;
}
@Test
public void testCompressed()
{
for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) {
IndexedInts row = compressed.get(i);
for (int j = 0, rowSize = row != null ? row.size() : 0; j < rowSize; j++) {
row.get(j);
}
}
}
}