diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java index 69d4aa10298..b26e1dc5892 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java @@ -61,7 +61,9 @@ public class BaseColumnarLongsBenchmark * encoding of values within the block. */ @Param({ + "zstd-longs", "lz4-longs", + "zstd-auto", "lz4-auto" }) String encoding; @@ -271,6 +273,26 @@ public class BaseColumnarLongsBenchmark CompressionStrategy.NONE ); break; + case "zstd-longs": + serializer = CompressionFactory.getLongSerializer( + encoding, + writeOutMedium, + "zstd-longs", + ByteOrder.LITTLE_ENDIAN, + CompressionFactory.LongEncodingStrategy.LONGS, + CompressionStrategy.ZSTD + ); + break; + case "zstd-auto": + serializer = CompressionFactory.getLongSerializer( + encoding, + writeOutMedium, + "zstd-auto", + ByteOrder.LITTLE_ENDIAN, + CompressionFactory.LongEncodingStrategy.AUTO, + CompressionStrategy.ZSTD + ); + break; default: throw new RuntimeException("unknown encoding"); } @@ -290,6 +312,8 @@ public class BaseColumnarLongsBenchmark case "lz4-auto": case "none-auto": case "none-longs": + case "zstd-auto": + case "zstd-longs": return CompressedColumnarLongsSupplier.fromByteBuffer(buffer, ByteOrder.LITTLE_ENDIAN).get(); } diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index 8af9e59226e..1de9b1e22e7 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -233,7 +233,8 @@ def build_compatible_license_names(): compatible_licenses['BSD-2-Clause License'] = 'BSD-2-Clause License' compatible_licenses['BSD-2-Clause'] = 'BSD-2-Clause License' - compatible_licenses['BSD 2-Clause license'] = 'BSD 2-Clause License' + 
compatible_licenses['BSD 2-Clause license'] = 'BSD-2-Clause License' + compatible_licenses['BSD 2-Clause License'] = 'BSD-2-Clause License' compatible_licenses['BSD-3-Clause License'] = 'BSD-3-Clause License' compatible_licenses['New BSD license'] = 'BSD-3-Clause License' diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index a59177aa319..1e8d1b40cae 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -208,8 +208,8 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning |Field|Type|Description|Required| |-----|----|-----------|--------| |bitmap|Object|Compression format for bitmap indexes. Should be a JSON object. See [Bitmap types](#bitmap-types) below for options.|no (defaults to Roaring)| -|dimensionCompression|String|Compression format for dimension columns. Choose from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)| -|metricCompression|String|Compression format for primitive type metric columns. Choose from `LZ4`, `LZF`, `uncompressed`, or `none`.|no (default == `LZ4`)| +|dimensionCompression|String|Compression format for dimension columns. Choose from `LZ4`, `LZF`, `ZSTD` or `uncompressed`.|no (default == `LZ4`)| +|metricCompression|String|Compression format for primitive type metric columns. Choose from `LZ4`, `LZF`, `ZSTD`, `uncompressed` or `none`.|no (default == `LZ4`)| |longEncoding|String|Encoding format for metric and dimension columns with type long. Choose from `auto` or `longs`. `auto` encodes the values using offset or lookup table depending on column cardinality, and store them with variable size. 
`longs` stores the value as is with 8 bytes each.|no (default == `longs`)| ##### Bitmap types diff --git a/licenses.yaml b/licenses.yaml index 68ef74ca8a8..372d452fda0 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -3964,7 +3964,7 @@ name: JNI binding for Zstd license_category: binary module: java-core license_name: BSD-2-Clause License -version: 1.3.3-1 +version: 1.5.2-3 copyright: Luben Karavelov license_file_path: licenses/bin/zstd-jni.BSD2 libraries: @@ -5028,7 +5028,7 @@ libraries: name: DNS Java license_category: binary module: java-core -license_name: BSD 2-Clause license +license_name: BSD-2-Clause License version: 2.1.7 libraries: - dnsjava: dnsjava diff --git a/pom.xml b/pom.xml index 621e0f8d743..1053739519f 100644 --- a/pom.xml +++ b/pom.xml @@ -449,7 +449,7 @@ com.github.luben zstd-jni - 1.3.3-1 + 1.5.2-3 com.fasterxml.jackson diff --git a/processing/pom.xml b/processing/pom.xml index fdcdaaffff3..4bd7a2c3c7d 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -169,6 +169,10 @@ io.netty netty-common + + com.github.luben + zstd-jni + diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java b/processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java index 1d229b51c8a..478983f9417 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressionStrategy.java @@ -21,6 +21,7 @@ package org.apache.druid.segment.data; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import com.github.luben.zstd.Zstd; import com.ning.compress.BufferRecycler; import com.ning.compress.lzf.LZFDecoder; import com.ning.compress.lzf.LZFEncoder; @@ -75,6 +76,21 @@ public enum CompressionStrategy return LZ4Compressor.DEFAULT_COMPRESSOR; } }, + + ZSTD((byte) 0x2) { + @Override + public Decompressor getDecompressor() + { + return ZstdDecompressor.DEFAULT_COMPRESSOR; 
+ } + + @Override + public Compressor getCompressor() + { + return ZstdCompressor.DEFAULT_COMPRESSOR; + } + }, + UNCOMPRESSED((byte) 0xFF) { @Override public Decompressor getDecompressor() @@ -171,7 +187,7 @@ public enum CompressionStrategy /** * Allocates a buffer that should be passed to {@link #compress} method as input buffer. Different Compressors * require (or work more efficiently with) different kinds of buffers. - * + *
<p>
* If the allocated buffer is a direct buffer, it should be registered to be freed with the given Closer. */ ByteBuffer allocateInBuffer(int inputSize, Closer closer) @@ -182,9 +198,9 @@ public enum CompressionStrategy /** * Allocates a buffer that should be passed to {@link #compress} method as output buffer. Different Compressors * require (or work more efficiently with) different kinds of buffers. - * + *
<p>
* Allocates a buffer that is always enough to compress a byte sequence of the given size. - * + *
<p>
* If the allocated buffer is a direct buffer, it should be registered to be freed with the given Closer.
      */
     abstract ByteBuffer allocateOutBuffer(int inputSize, Closer closer);
@@ -344,6 +360,98 @@ public enum CompressionStrategy
     }
   }
 
+  /**
+   * {@link Compressor} backed by the zstd-jni binding. Input and output buffers are allocated as direct buffers
+   * and registered with the supplied Closer so that they are freed when it is closed.
+   */
+  public static class ZstdCompressor extends Compressor
+  {
+    private static final ZstdCompressor DEFAULT_COMPRESSOR = new ZstdCompressor();
+
+    @Override
+    ByteBuffer allocateInBuffer(int inputSize, Closer closer)
+    {
+      ByteBuffer inBuffer = ByteBuffer.allocateDirect(inputSize);
+      closer.register(() -> ByteBufferUtils.free(inBuffer));
+      return inBuffer;
+    }
+
+    @Override
+    ByteBuffer allocateOutBuffer(int inputSize, Closer closer)
+    {
+      // sized with compressBound, the same bound compress() checks the output buffer against
+      ByteBuffer outBuffer = ByteBuffer.allocateDirect((int) Zstd.compressBound(inputSize));
+      closer.register(() -> ByteBufferUtils.free(outBuffer));
+      return outBuffer;
+    }
+
+    /**
+     * Compresses the remaining bytes of {@code in} into {@code out}. {@code out} is cleared first and flipped
+     * afterwards; the position of {@code in} is restored to its value on entry.
+     *
+     * @throws RuntimeException if {@code out} is smaller than {@code Zstd.compressBound(in.remaining())}
+     */
+    @Override
+    public ByteBuffer compress(ByteBuffer in, ByteBuffer out)
+    {
+      int position = in.position();
+      out.clear();
+      long sizeNeeded = Zstd.compressBound(in.remaining());
+      if (out.remaining() < sizeNeeded) {
+        throw new RuntimeException("Output buffer too small, please allocate more space. " + sizeNeeded + " required.");
+      }
+      Zstd.compress(out, in, Zstd.maxCompressionLevel());
+      in.position(position);
+      out.flip();
+      return out;
+    }
+  }
+
+  /**
+   * {@link Decompressor} backed by the zstd-jni binding. Decompresses buffer-to-buffer when both buffers are direct
+   * and copies through heap byte arrays otherwise.
+   */
+  public static class ZstdDecompressor implements Decompressor
+  {
+    private static final ZstdDecompressor DEFAULT_COMPRESSOR = new ZstdDecompressor();
+
+    @Override
+    public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
+    {
+      out.clear();
+      if (!in.isDirect() || !out.isDirect()) {
+        // fall back to heap byte arrays unless both buffers are direct
+        final byte[] inputBytes = new byte[numBytes];
+        in.get(inputBytes);
+        try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
+          final byte[] outputBytes = outputBytesHolder.get();
+          final int decompressedBytes = checkedSize(
+              Zstd.decompressByteArray(outputBytes, 0, outputBytes.length, inputBytes, 0, numBytes)
+          );
+          out.put(outputBytes, 0, decompressedBytes);
+          out.flip();
+        }
+      } else {
+        final int decompressedBytes = checkedSize(
+            Zstd.decompressDirectByteBuffer(out, out.position(), out.remaining(), in, in.position(), numBytes)
+        );
+        out.limit(out.position() + decompressedBytes);
+      }
+    }
+
+    /**
+     * Converts a zstd-jni return value into a byte count, failing with the zstd error name when the value is an
+     * error code instead of letting the narrowed value be used as a length.
+     */
+    private static int checkedSize(long result)
+    {
+      if (Zstd.isError(result)) {
+        throw new RuntimeException("Zstd decompression failed: " + Zstd.getErrorName(result));
+      }
+      return (int) result;
+    }
+  }
+
   /**
    * Logs info relating to whether LZ4 is using native or pure Java implementations
    */
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java
index 44a97d01e67..23c0d64e5b2 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressionStrategyTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.segment.data;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.java.util.common.ByteBufferUtils;
@@ -40,7 +39,6 @@ import java.util.Collection;
 import java.util.Random;
 import
java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -53,14 +51,7 @@ public class CompressionStrategyTest { return Iterables.transform( Arrays.asList(CompressionStrategy.noNoneValues()), - new Function() - { - @Override - public Object[] apply(CompressionStrategy compressionStrategy) - { - return new Object[]{compressionStrategy}; - } - } + compressionStrategy -> new Object[]{compressionStrategy} ); } @@ -101,22 +92,12 @@ public class CompressionStrategyTest public void testBasicOperations() { ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer); - ByteBuffer compressed = compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), compressionOut); - ByteBuffer output = ByteBuffer.allocate(originalData.length); - compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output); - byte[] checkArray = new byte[DATA_SIZER]; - output.get(checkArray); - Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray); - } - - @Test - public void testDirectMemoryOperations() - { - ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer); - ByteBuffer compressed = compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), compressionOut); - + ByteBuffer compressionIn = compressionStrategy.getCompressor().allocateInBuffer(originalData.length, closer); try (final ResourceHolder holder = ByteBufferUtils.allocateDirect(originalData.length)) { final ByteBuffer output = holder.get(); + compressionIn.put(originalData); + compressionIn.rewind(); + ByteBuffer compressed = compressionStrategy.getCompressor().compress(compressionIn, compressionOut); 
compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output); byte[] checkArray = new byte[DATA_SIZER]; output.get(checkArray); @@ -140,27 +121,33 @@ public class CompressionStrategyTest for (int i = 0; i < numThreads; ++i) { results.add( threadPoolExecutor.submit( - new Callable() - { - @Override - public Boolean call() - { - ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer); - ByteBuffer compressed = compressionStrategy.getCompressor().compress(ByteBuffer.wrap(originalData), compressionOut); - ByteBuffer output = ByteBuffer.allocate(originalData.length); + () -> { + ByteBuffer compressionOut = compressionStrategy.getCompressor() + .allocateOutBuffer(originalData.length, closer); + ByteBuffer compressionIn = compressionStrategy.getCompressor() + .allocateInBuffer(originalData.length, closer); + try { + compressionIn.put(originalData); + compressionIn.position(0); + ByteBuffer compressed = compressionStrategy.getCompressor().compress(compressionIn, compressionOut); + ByteBuffer output = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer); compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output); byte[] checkArray = new byte[DATA_SIZER]; output.get(checkArray); Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray); return true; } + finally { + ByteBufferUtils.free(compressionIn); + ByteBufferUtils.free(compressionOut); + } } ) ); } threadPoolExecutor.shutdown(); - for (Future result : results) { - Assert.assertTrue((Boolean) result.get()); + for (Future result : results) { + Assert.assertTrue(result.get()); } } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java b/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java new file mode 100644 index 00000000000..5833bdd538d --- /dev/null +++ 
b/processing/src/test/java/org/apache/druid/segment/data/TestColumnCompression.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * Serializes randomly generated multi-value int rows with {@link CompressedVSizeColumnarMultiIntsSupplier} under each
+ * {@link CompressionStrategy} other than {@link CompressionStrategy#NONE} and checks that a random sample of rows
+ * reads back unchanged.
+ */
+@RunWith(Parameterized.class)
+public class TestColumnCompression
+{
+  private static final int ROW_COUNT = 0x100000;
+  private static final int BYTES = 1;
+  private static final int VALUES_PER_ROW_BOUND = 5;
+  private static final int FILTERED_ROW_COUNT = 1000;
+
+  private final CompressionStrategy compressionType;
+  // handed to the serializer in setUp and closed in tearDown so anything registered on it is released
+  private final Closer closer = Closer.create();
+  private List<int[]> rows;
+  private ColumnarMultiInts compressed;
+  private BitSet filter;
+  private ByteBuffer buffer;
+
+  public TestColumnCompression(CompressionStrategy strategy)
+  {
+    compressionType = strategy;
+  }
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> compressionStrategies()
+  {
+    return Arrays.stream(CompressionStrategy.values())
+                 .filter(x -> !CompressionStrategy.NONE.equals(x))
+                 .map(strategy -> new Object[]{strategy})
+                 .collect(Collectors.toList());
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    Random rand = ThreadLocalRandom.current();
+    rows = new ArrayList<>(ROW_COUNT);
+    final int bound = 1 << BYTES;
+    for (int i = 0; i < ROW_COUNT; i++) {
+      int count = rand.nextInt(VALUES_PER_ROW_BOUND) + 1;
+      int[] row = new int[rand.nextInt(count)];
+      for (int j = 0; j < row.length; j++) {
+        row[j] = rand.nextInt(bound);
+      }
+      rows.add(row);
+    }
+
+    buffer = serialize(
+        CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
+            Iterables.transform(rows, (Function<int[], IndexedInts>) input -> VSizeColumnarInts.fromArray(input, 20)),
+            bound - 1,
+            ByteOrder.nativeOrder(),
+            compressionType,
+            closer
+        )
+    );
+    this.compressed = CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(
+        buffer,
+        ByteOrder.nativeOrder()
+    ).get();
+
+    filter = new BitSet();
+    for (int i = 0; i < FILTERED_ROW_COUNT; i++) {
+      int rowToAccess = rand.nextInt(rows.size());
+      // Skip already selected rows if any
+      while (filter.get(rowToAccess)) {
+        rowToAccess = (rowToAccess + 1) % rows.size();
+      }
+      filter.set(rowToAccess);
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    try {
+      closer.close();
+    }
+    finally {
+      // buffer is still null when setUp failed before serialization finished
+      if (buffer != null) {
+        ByteBufferUtils.free(buffer);
+      }
+    }
+  }
+
+  private static ByteBuffer serialize(WritableSupplier<ColumnarMultiInts> writableSupplier) throws IOException
+  {
+    final ByteBuffer buffer = ByteBuffer.allocateDirect((int) writableSupplier.getSerializedSize());
+    WritableByteChannel channel = new WritableByteChannel()
+    {
+      @Override
+      public int write(ByteBuffer src)
+      {
+        int size = src.remaining();
+        buffer.put(src);
+        return size;
+      }
+
+      @Override
+      public boolean isOpen()
+      {
+        return true;
+      }
+
+      @Override
+      public void close()
+      {
+
+      }
+    };
+
+    writableSupplier.writeTo(channel, null);
+    buffer.rewind();
+    return buffer;
+  }
+
+  @Test
+  public void testCompressed()
+  {
+    for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) {
+      final int[] expected = rows.get(i);
+      final IndexedInts row = compressed.get(i);
+      Assert.assertEquals("size of row " + i, expected.length, row.size());
+      for (int j = 0; j < expected.length; j++) {
+        Assert.assertEquals("row " + i + ", value " + j, expected[j], row.get(j));
+      }
+    }
+  }
+}