From d90522163ffbf17da4c5f4d5cc418ebf3808f99e Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Fri, 22 Apr 2022 16:42:12 -0700
Subject: [PATCH] HBASE-26959 Brotli compression support (#4353)

Signed-off-by: Nick Dimiduk
---
 hbase-assembly/pom.xml                           |   4 +
 .../hadoop/hbase/io/compress/Compression.java    |  34 ++-
 .../hbase/io/compress/CompressionUtil.java       |  12 +
 .../aircompressor/HadoopCompressor.java          |   2 +-
 .../aircompressor/HadoopDecompressor.java        |   2 +-
 .../io/compress/aircompressor/Lz4Codec.java      |   8 +-
 .../io/compress/aircompressor/LzoCodec.java      |   8 +-
 .../compress/aircompressor/SnappyCodec.java      |   8 +-
 .../io/compress/aircompressor/ZstdCodec.java     |  12 +-
 .../hbase-compression-brotli/pom.xml             | 167 ++++++++++++++
 .../hbase/io/compress/brotli/BrotliCodec.java    | 127 +++++++++++
 .../io/compress/brotli/BrotliCompressor.java     | 214 ++++++++++++++++++
 .../compress/brotli/BrotliDecompressor.java      | 154 +++++++++++++
 .../io/compress/brotli/TestBrotliCodec.java      |  50 ++++
 .../brotli/TestHFileCompressionBrotli.java       |  57 +++++
 .../brotli/TestWALCompressionBrotli.java         |  69 ++++++
 .../hbase/io/compress/lz4/Lz4Codec.java          |   8 +-
 .../hbase/io/compress/lz4/Lz4Compressor.java     |   2 +-
 .../io/compress/lz4/Lz4Decompressor.java         |   2 +-
 .../hbase/io/compress/xerial/SnappyCodec.java    |   8 +-
 .../io/compress/xerial/SnappyCompressor.java     |   2 +-
 .../compress/xerial/SnappyDecompressor.java      |   2 +-
 .../hbase/io/compress/xz/LzmaCodec.java          |   5 +-
 .../hbase/io/compress/xz/LzmaCompressor.java     |   4 +-
 .../io/compress/xz/LzmaDecompressor.java         |   2 +-
 .../hbase/io/compress/zstd/ZstdCodec.java        |  12 +-
 .../io/compress/zstd/ZstdCompressor.java         |   2 +-
 .../io/compress/zstd/ZstdDecompressor.java       |   2 +-
 .../io/compress/zstd/TestZstdDictionary.java     |   6 +-
 hbase-compression/pom.xml                        |   1 +
 .../main/resources/supplemental-models.xml       |  67 ++++++
 pom.xml                                          |   6 +
 32 files changed, 1012 insertions(+), 47 deletions(-)
 create mode 100644 hbase-compression/hbase-compression-brotli/pom.xml
 create mode 100644 hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java
 create mode 100644 hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCompressor.java
 create mode 100644 hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliDecompressor.java
 create mode 100644 hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestBrotliCodec.java
 create mode 100644 hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestHFileCompressionBrotli.java
 create mode 100644 hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestWALCompressionBrotli.java

diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml
index 796f443e9b3..298be40c0b5 100644
--- a/hbase-assembly/pom.xml
+++ b/hbase-assembly/pom.xml
@@ -314,6 +314,10 @@
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-compression-aircompressor</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-compression-brotli</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-compression-lz4</artifactId>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
index 8bff2944ccc..e60b9ce3b7d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 public final class Compression {
   private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
-
   // LZO
   public static final String LZO_CODEC_CLASS_KEY =
@@ -97,6 +96,13 @@ public final class Compression {
   public static final String LZMA_CODEC_CLASS_DEFAULT =
     "org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";
 
+  // Brotli
+
+  public static final String BROTLI_CODEC_CLASS_KEY =
+    "hbase.io.compress.brotli.codec";
+  public static final String BROTLI_CODEC_CLASS_DEFAULT =
+    "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";
+
   /**
    * Prevent the instantiation of class.
    */
@@ -148,6 +154,7 @@ public final class Compression {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       value="SE_TRANSIENT_FIELD_NOT_RESTORED",
       justification="We are not serializing so doesn't apply (not sure why transient though)")
+  @SuppressWarnings("ImmutableEnumChecker")
   @InterfaceAudience.Public
   public static enum Algorithm {
     // LZO is GPL and requires extra install to setup. See
@@ -352,6 +359,31 @@ public final class Compression {
         return lzmaCodec;
       }
     }
+  },
+
+  BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
+    // Use base type to avoid compile-time dependencies.
+    private volatile transient CompressionCodec brotliCodec;
+    private final transient Object lock = new Object();
+    @Override
+    CompressionCodec getCodec(Configuration conf) {
+      if (brotliCodec == null) {
+        synchronized (lock) {
+          if (brotliCodec == null) {
+            brotliCodec = buildCodec(conf, this);
+          }
+        }
+      }
+      return brotliCodec;
+    }
+    @Override
+    public CompressionCodec reload(Configuration conf) {
+      synchronized (lock) {
+        brotliCodec = buildCodec(conf, this);
+        LOG.warn("Reloaded configuration for {}", name());
+        return brotliCodec;
+      }
+    }
   };
 
   private final Configuration conf;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
index 70b959a1172..718cc70f1e4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
@@ -35,4 +35,16 @@ public final class CompressionUtil {
     return v;
   }
 
+  /**
+   * Most compression algorithms can be presented with pathological input that causes an
+   * expansion rather than a compression. Hadoop's compression API requires that we calculate
+   * additional buffer space required for the worst case. There is a formula developed for
+   * gzip that applies as a ballpark to all LZ variants. It should be good enough for now and
+   * has been tested as such with a range of different inputs.
+   */
+  public static int compressionOverhead(int bufferSize) {
+    // Given an input buffer of 'bufferSize' bytes we presume a worst case expansion of
+    // 32 bytes (block header) and an additional 1/6th of the input size.
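+    // For example (illustrative arithmetic, not part of the original patch): for the
+    // common 256 KB buffer, compressionOverhead(262144) = (262144 / 6) + 32 = 43722.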
+ return (bufferSize / 6) + 32; + } } diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java index c448f58dbf5..d5fd3cfdd3d 100644 --- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java +++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java @@ -57,7 +57,7 @@ public abstract class HadoopCompressor if (outBuf.hasRemaining()) { int remaining = outBuf.remaining(), n = Math.min(remaining, len); outBuf.get(b, off, n); - LOG.trace("compress: {} bytes from outBuf", n); + LOG.trace("compress: read {} remaining bytes from outBuf", n); return n; } // We don't actually begin compression until our caller calls finish(). diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java index f5f5b83ab30..868094f32fc 100644 --- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java +++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java @@ -51,7 +51,7 @@ public class HadoopDecompressor if (outBuf.hasRemaining()) { int remaining = outBuf.remaining(), n = Math.min(remaining, len); outBuf.get(b, off, n); - LOG.trace("decompress: {} bytes from outBuf", n); + LOG.trace("decompress: read {} remaining bytes from outBuf", n); return n; } if (inBuf.position() > 0) { diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java index c1766dc0456..81199531ad9 100644 --- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java +++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.io.compress.CompressionUtil; import org.apache.hadoop.io.compress.BlockCompressorStream; import org.apache.hadoop.io.compress.BlockDecompressorStream; import org.apache.hadoop.io.compress.CompressionCodec; @@ -91,8 +92,8 @@ public class Lz4Codec implements Configurable, CompressionCodec { public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) throws IOException { int bufferSize = getBufferSize(conf); - int compressionOverhead = (bufferSize / 6) + 32; - return new BlockCompressorStream(out, c, bufferSize, compressionOverhead); + return new BlockCompressorStream(out, c, bufferSize, + CompressionUtil.compressionOverhead(bufferSize)); } @Override @@ -149,10 +150,9 @@ public class Lz4Codec implements Configurable, CompressionCodec { // Package private static 
int getBufferSize(Configuration conf) { - int size = conf.getInt(LZ4_BUFFER_SIZE_KEY, + return conf.getInt(LZ4_BUFFER_SIZE_KEY, conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT)); - return size > 0 ? size : 256 * 1024; // Don't change this default } } diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java index 3e5ab049e95..57ac8daada7 100644 --- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java +++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.io.compress.CompressionUtil; import org.apache.hadoop.io.compress.BlockCompressorStream; import org.apache.hadoop.io.compress.BlockDecompressorStream; import org.apache.hadoop.io.compress.CompressionCodec; @@ -91,8 +92,8 @@ public class LzoCodec implements Configurable, CompressionCodec { public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) throws IOException { int bufferSize = getBufferSize(conf); - int compressionOverhead = (bufferSize / 6) + 32; - return new BlockCompressorStream(out, c, bufferSize, compressionOverhead); + return new BlockCompressorStream(out, c, bufferSize, + CompressionUtil.compressionOverhead(bufferSize)); } @Override @@ -149,10 +150,9 @@ public class LzoCodec implements Configurable, CompressionCodec { // Package private static int getBufferSize(Configuration conf) { - int size = conf.getInt(LZO_BUFFER_SIZE_KEY, + return conf.getInt(LZO_BUFFER_SIZE_KEY, conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT)); - return size > 0 ? 
size : 256 * 1024; // Don't change this default } } diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java index e325b8b625a..3669b1d9d2a 100644 --- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java +++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.io.compress.CompressionUtil; import org.apache.hadoop.io.compress.BlockCompressorStream; import org.apache.hadoop.io.compress.BlockDecompressorStream; import org.apache.hadoop.io.compress.CompressionCodec; @@ -91,8 +92,8 @@ public class SnappyCodec implements Configurable, CompressionCodec { public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) throws IOException { int bufferSize = getBufferSize(conf); - int compressionOverhead = (bufferSize / 6) + 32; - return new BlockCompressorStream(out, c, bufferSize, compressionOverhead); + return new BlockCompressorStream(out, c, bufferSize, + CompressionUtil.compressionOverhead(bufferSize)); } @Override @@ -149,10 +150,9 @@ public class SnappyCodec implements Configurable, CompressionCodec { // Package private static int getBufferSize(Configuration conf) { - int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY, + return conf.getInt(SNAPPY_BUFFER_SIZE_KEY, conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT)); - return size > 0 ? 
size : 256 * 1024; // Don't change this default
   }
 }
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
index a25943fbb48..f653dc0f676 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -52,6 +53,7 @@ import io.airlift.compress.zstd.ZstdDecompressor;
 public class ZstdCodec implements Configurable, CompressionCodec {
 
   public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
+  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
 
   private Configuration conf;
 
@@ -99,8 +101,8 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
       throws IOException {
     int bufferSize = getBufferSize(conf);
-    int compressionOverhead = (bufferSize / 6) + 32;
-    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+    return new BlockCompressorStream(out, c, bufferSize,
+      CompressionUtil.compressionOverhead(bufferSize));
   }
 
   @Override
@@ -157,10 +159,10 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   // Package private
 
   static int getBufferSize(Configuration conf) {
-    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
+    return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
       conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
-        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
-    return size > 0 ? size : 256 * 1024; // Don't change this default
+        // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
+        ZSTD_BUFFER_SIZE_DEFAULT));
   }
 
 }
diff --git a/hbase-compression/hbase-compression-brotli/pom.xml b/hbase-compression/hbase-compression-brotli/pom.xml
new file mode 100644
index 00000000000..4f60bd36711
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/pom.xml
@@ -0,0 +1,167 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-compression</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>hbase-compression-brotli</artifactId>
+  <name>Apache HBase - Compression - Brotli</name>
+  <description>Compression support using Brotli4j</description>
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <artifactId>maven-surefire-plugin</artifactId>
+        </plugin>
+        <plugin>
+          <groupId>net.revelc.code</groupId>
+          <artifactId>warbucks-maven-plugin</artifactId>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <failOnViolation>true</failOnViolation>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>net.revelc.code</groupId>
+        <artifactId>warbucks-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-logging</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>brotli4j</artifactId>
+      <version>${brotli4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <profiles>
+    <profile>
+      <id>build-with-jdk11</id>
+      <activation>
+        <jdk>[1.11,)</jdk>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>javax.annotation</groupId>
+          <artifactId>javax.annotation-api</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java
new file mode 100644
index 00000000000..d052d6a0838
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.hadoop.hbase.io.compress.brotli; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.compress.CompressionUtil; +import org.apache.hadoop.io.compress.BlockCompressorStream; +import org.apache.hadoop.io.compress.BlockDecompressorStream; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Hadoop brotli codec implemented with Brotli4j + */ +@InterfaceAudience.Private +public class BrotliCodec implements Configurable, CompressionCodec { + + public static final String BROTLI_LEVEL_KEY = "hbase.io.compress.brotli.level"; + // Our default is 6, based on https://blog.cloudflare.com/results-experimenting-brotli/ + public static final int BROTLI_LEVEL_DEFAULT = 6; // [0,11] or -1 + public static final String BROTLI_WINDOW_KEY = "hbase.io.compress.brotli.window"; + public static final int BROTLI_WINDOW_DEFAULT = -1; // [10-24] or -1 + public static final String BROTLI_BUFFERSIZE_KEY = "hbase.io.compress.brotli.buffersize"; + public static final int BROTLI_BUFFERSIZE_DEFAULT = 256 * 1024; + + private Configuration conf; + + public BrotliCodec() { + conf = new Configuration(); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Compressor createCompressor() { + return new BrotliCompressor(getLevel(conf), getWindow(conf), getBufferSize(conf)); + } + + @Override + public Decompressor createDecompressor() { + return new BrotliDecompressor(getBufferSize(conf)); + } + + @Override + public CompressionInputStream createInputStream(InputStream in) throws IOException { + return createInputStream(in, createDecompressor()); + } + + @Override + public CompressionInputStream createInputStream(InputStream in, Decompressor d) + throws IOException { + return new BlockDecompressorStream(in, d, getBufferSize(conf)); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { + return createOutputStream(out, createCompressor()); + } + + @Override + public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) + throws IOException { + int bufferSize = getBufferSize(conf); + return new BlockCompressorStream(out, c, bufferSize, + CompressionUtil.compressionOverhead(bufferSize)); + } + + @Override + public Class getCompressorType() { + return BrotliCompressor.class; + } + + @Override + public Class getDecompressorType() { + return BrotliDecompressor.class; + } + + @Override + public String getDefaultExtension() { + return ".br"; + } + + // Package private + + static int getLevel(Configuration conf) { + return conf.getInt(BROTLI_LEVEL_KEY, BROTLI_LEVEL_DEFAULT); + } + + static int getWindow(Configuration conf) { + return conf.getInt(BROTLI_WINDOW_KEY, BROTLI_WINDOW_DEFAULT); + } + + static int getBufferSize(Configuration conf) { + return conf.getInt(BROTLI_BUFFERSIZE_KEY, BROTLI_BUFFERSIZE_DEFAULT); + } + +} diff --git a/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCompressor.java 
b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCompressor.java new file mode 100644 index 00000000000..c45eb0d1401 --- /dev/null +++ b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCompressor.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.compress.brotli; + +import com.aayushatharva.brotli4j.Brotli4jLoader; +import com.aayushatharva.brotli4j.encoder.Encoder; +import com.aayushatharva.brotli4j.encoder.Encoders; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.compress.CanReinit; +import org.apache.hadoop.hbase.io.compress.CompressionUtil; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Hadoop compressor glue for Brotli4j + */ +@InterfaceAudience.Private +public class BrotliCompressor implements CanReinit, Compressor { + + protected static final Logger LOG = LoggerFactory.getLogger(BrotliCompressor.class); + protected ByteBuffer inBuf, outBuf; + protected int bufferSize; + protected boolean finish, finished; + protected long bytesRead, bytesWritten; + protected Encoder.Parameters params; + + static { + Brotli4jLoader.ensureAvailability(); + } + + BrotliCompressor(int level, int window, int bufferSize) { + this.bufferSize = bufferSize; + this.inBuf = ByteBuffer.allocate(bufferSize); + this.outBuf = ByteBuffer.allocate(bufferSize); + this.outBuf.position(bufferSize); + params = new Encoder.Parameters(); + params.setQuality(level); + params.setWindow(window); + } + + @Override + public int compress(byte[] b, int off, int len) throws IOException { + // If we have previously compressed our input and still have some buffered bytes + // remaining, provide them to the caller. + if (outBuf.hasRemaining()) { + int remaining = outBuf.remaining(), n = Math.min(remaining, len); + outBuf.get(b, off, n); + LOG.trace("compress: read {} remaining bytes from outBuf", n); + return n; + } + // We don't actually begin compression until our caller calls finish(). + if (finish) { + if (inBuf.position() > 0) { + inBuf.flip(); + int uncompressed = inBuf.remaining(); + // If we don't have enough capacity in our currently allocated output buffer, + // allocate a new one which does. + int needed = maxCompressedLength(uncompressed); + // Can we compress directly into the provided array? 
+        boolean direct = false;
+        ByteBuffer writeBuf;
+        if (len <= needed) {
+          direct = true;
+          writeBuf = ByteBuffer.wrap(b, off, len);
+        } else {
+          if (outBuf.capacity() < needed) {
+            needed = CompressionUtil.roundInt2(needed);
+            LOG.trace("compress: resize outBuf {}", needed);
+            outBuf = ByteBuffer.allocate(needed);
+          } else {
+            outBuf.clear();
+          }
+          writeBuf = outBuf;
+        }
+        final int oldPos = writeBuf.position();
+        Encoders.compress(inBuf, writeBuf, params);
+        final int written = writeBuf.position() - oldPos;
+        bytesWritten += written;
+        inBuf.clear();
+        LOG.trace("compress: compressed {} -> {}", uncompressed, written);
+        finished = true;
+        if (!direct) {
+          outBuf.flip();
+          int n = Math.min(written, len);
+          outBuf.get(b, off, n);
+          LOG.trace("compress: {} bytes", n);
+          return n;
+        } else {
+          LOG.trace("compress: {} bytes direct", written);
+          return written;
+        }
+      } else {
+        finished = true;
+      }
+    }
+    LOG.trace("No output");
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public void finish() {
+    LOG.trace("finish");
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    boolean b = finished && !outBuf.hasRemaining();
+    LOG.trace("finished: {}", b);
+    return b;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = !finished();
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    LOG.trace("reinit");
+    if (conf != null) {
+      // Quality or window settings might have changed
+      params.setQuality(BrotliCodec.getLevel(conf));
+      params.setWindow(BrotliCodec.getWindow(conf));
+      // Buffer size might have changed
+      int newBufferSize = BrotliCodec.getBufferSize(conf);
+      if (bufferSize != newBufferSize) {
+        bufferSize = newBufferSize;
+        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
+        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+      }
+    }
+    reset();
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    bytesRead = 0;
+    bytesWritten = 0;
+    finish = false;
+    finished = false;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // Fortunately, this condition should be rare, because it is expensive.
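+      // (Illustrative note, not part of the original patch: CompressionUtil.roundInt2 is
+      // assumed to round up to the next power of two, so a 256 KB buffer overflowing by a
+      // 1 KB write is reallocated once to 512 KB rather than growing in small steps.)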
+ int needed = CompressionUtil.roundInt2(inBuf.capacity() + len); + LOG.trace("setInput: resize inBuf {}", needed); + ByteBuffer newBuf = ByteBuffer.allocate(needed); + inBuf.flip(); + newBuf.put(inBuf); + inBuf = newBuf; + } + inBuf.put(b, off, len); + bytesRead += len; + finished = false; + } + + // Package private + + int maxCompressedLength(int len) { + return len + CompressionUtil.compressionOverhead(len); + } + +} diff --git a/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliDecompressor.java b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliDecompressor.java new file mode 100644 index 00000000000..8f167cd3960 --- /dev/null +++ b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliDecompressor.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.io.compress.brotli; + +import com.aayushatharva.brotli4j.Brotli4jLoader; +import com.aayushatharva.brotli4j.decoder.Decoder; +import com.aayushatharva.brotli4j.decoder.DirectDecompress; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.io.compress.CompressionUtil; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Hadoop decompressor glue for Brotli4j + */ +@InterfaceAudience.Private +public class BrotliDecompressor implements Decompressor { + + protected static final Logger LOG = LoggerFactory.getLogger(BrotliDecompressor.class); + protected ByteBuffer inBuf, outBuf; + protected int inLen; + protected boolean finished; + + static { + Brotli4jLoader.ensureAvailability(); + } + + BrotliDecompressor(int bufferSize) { + this.inBuf = ByteBuffer.allocate(bufferSize); + this.outBuf = ByteBuffer.allocate(bufferSize); + this.outBuf.position(bufferSize); + } + + @Override + public int decompress(byte[] b, int off, int len) throws IOException { + if (outBuf.hasRemaining()) { + int remaining = outBuf.remaining(), n = Math.min(remaining, len); + outBuf.get(b, off, n); + LOG.trace("decompress: read {} remaining bytes from outBuf", n); + return n; + } + if (inBuf.position() > 0) { + inBuf.flip(); + int remaining = inBuf.remaining(); + inLen -= remaining; + outBuf.rewind(); + outBuf.limit(outBuf.capacity()); + + // TODO: More inefficient than it could be, but it doesn't impact decompression speed + // terribly and the brotli4j API alternatives do not seem to work correctly. + // Maybe something more clever can be done as a future improvement. 
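+      // (Illustrative note, not part of the original patch: Decoder.decompress below is
+      // assumed to be brotli4j's one-shot byte[] entry point, so each call copies inBuf
+      // into a temporary array and then copies the decoded bytes back out of the result.)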
+      final byte[] inb = new byte[remaining];
+      inBuf.get(inb);
+      DirectDecompress result = Decoder.decompress(inb);
+      outBuf.put(result.getDecompressedDataByteBuf().nioBuffer());
+      final int written = outBuf.position();
+
+      inBuf.rewind();
+      inBuf.limit(inBuf.capacity());
+      LOG.trace("decompress: decompressed {} -> {}", remaining, written);
+      outBuf.flip();
+      int n = Math.min(written, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: {} bytes", n);
+      return n;
+    }
+    LOG.trace("decompress: No output, finished");
+    finished = true;
+    return 0;
+  }
+
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public boolean finished() {
+    LOG.trace("finished");
+    return finished;
+  }
+
+  @Override
+  public int getRemaining() {
+    LOG.trace("getRemaining: {}", inLen);
+    return inLen;
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    LOG.trace("needsDictionary");
+    return false;
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    inLen = 0;
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    finished = false;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = (inBuf.position() == 0);
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // Fortunately, this condition should be rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocate(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    inLen += len;
+    finished = false;
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestBrotliCodec.java b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestBrotliCodec.java
new file mode 100644
index 00000000000..50de8aae607
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestBrotliCodec.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestBrotliCodec extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBrotliCodec.class);
+
+  @Test
+  public void testBrotliCodecSmall() throws Exception {
+    codecSmallTest(new BrotliCodec());
+  }
+
+  @Test
+  public void testBrotliCodecLarge() throws Exception {
+    codecLargeTest(new BrotliCodec(), 1.1); // poor compressibility
+    codecLargeTest(new BrotliCodec(), 2);
+    codecLargeTest(new BrotliCodec(), 10); // very high compressibility
+  }
+
+  @Test
+  public void testBrotliCodecVeryLarge() throws Exception {
+    codecVeryLargeTest(new BrotliCodec(), 3); // like text
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestHFileCompressionBrotli.java b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestHFileCompressionBrotli.java
new file mode 100644
index 00000000000..7feb26ed1f2
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestHFileCompressionBrotli.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.io.compress.brotli; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.compress.HFileTestBase; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({IOTests.class, SmallTests.class}) +public class TestHFileCompressionBrotli extends HFileTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHFileCompressionBrotli.class); + + private static Configuration conf; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.set(Compression.BROTLI_CODEC_CLASS_KEY, BrotliCodec.class.getCanonicalName()); + Compression.Algorithm.BROTLI.reload(conf); + HFileTestBase.setUpBeforeClass(); + } + + @Test + public void test() throws Exception { + Path path = new Path(TEST_UTIL.getDataTestDir(), + HBaseTestingUtility.getRandomUUID().toString() + ".hfile"); + doTest(conf, path, Compression.Algorithm.BROTLI); + } + +} diff --git a/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestWALCompressionBrotli.java b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestWALCompressionBrotli.java new file mode 100644 index 00000000000..ac25951d2d4 --- /dev/null +++ b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestWALCompressionBrotli.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.compress.brotli; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.wal.CompressedWALTestBase; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestWALCompressionBrotli extends CompressedWALTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALCompressionBrotli.class); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(Compression.BROTLI_CODEC_CLASS_KEY, BrotliCodec.class.getCanonicalName()); + Compression.Algorithm.BROTLI.reload(conf); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, Compression.Algorithm.BROTLI.getName()); + TEST_UTIL.startMiniDFSCluster(3); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_")); + doTest(tableName); + } + +} diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java index a218954b6f2..d6b0365d63d 100644 --- a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java +++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.io.compress.CompressionUtil; import org.apache.hadoop.io.compress.BlockCompressorStream; import org.apache.hadoop.io.compress.BlockDecompressorStream; import org.apache.hadoop.io.compress.CompressionCodec; @@ -88,8 +89,8 @@ public class Lz4Codec implements Configurable, CompressionCodec { public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) throws IOException { int bufferSize = getBufferSize(conf); - int compressionOverhead = (bufferSize / 6) + 32; - return new BlockCompressorStream(out, c, bufferSize, compressionOverhead); + return new BlockCompressorStream(out, c, bufferSize, + CompressionUtil.compressionOverhead(bufferSize)); } @Override @@ -110,10 +111,9 @@ public class Lz4Codec implements Configurable, CompressionCodec { // Package private static int getBufferSize(Configuration conf) { - int size = conf.getInt(LZ4_BUFFER_SIZE_KEY, + return 
conf.getInt(LZ4_BUFFER_SIZE_KEY, conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT)); - return size > 0 ? size : 256 * 1024; // Don't change this default } } diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java index 71b5164f116..61046cd2050 100644 --- a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java +++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java @@ -58,7 +58,7 @@ public class Lz4Compressor implements CanReinit, Compressor { if (outBuf.hasRemaining()) { int remaining = outBuf.remaining(), n = Math.min(remaining, len); outBuf.get(b, off, n); - LOG.trace("compress: {} bytes from outBuf", n); + LOG.trace("compress: read {} remaining bytes from outBuf", n); return n; } // We don't actually begin compression until our caller calls finish(). diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java index efb8c846d92..5c46671ab91 100644 --- a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java +++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java @@ -53,7 +53,7 @@ public class Lz4Decompressor implements Decompressor { if (outBuf.hasRemaining()) { int remaining = outBuf.remaining(), n = Math.min(remaining, len); outBuf.get(b, off, n); - LOG.trace("decompress: {} bytes from outBuf", n); + LOG.trace("decompress: read {} remaining bytes from outBuf", n); return n; } if (inBuf.position() > 0) { diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java index e7c62c507c1..aae07b4d4ed 100644 --- a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java +++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java @@ -31,6 +31,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; import org.apache.yetus.audience.InterfaceAudience; +import org.xerial.snappy.Snappy; /** * Hadoop Snappy codec implemented with Xerial Snappy. 
@@ -88,8 +89,8 @@ public class SnappyCodec implements Configurable, CompressionCodec { public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) throws IOException { int bufferSize = getBufferSize(conf); - int compressionOverhead = (bufferSize / 6) + 32; - return new BlockCompressorStream(out, c, bufferSize, compressionOverhead); + return new BlockCompressorStream(out, c, bufferSize, + Snappy.maxCompressedLength(bufferSize) - bufferSize); // overhead only } @Override @@ -110,10 +111,9 @@ public class SnappyCodec implements Configurable, CompressionCodec { // Package private static int getBufferSize(Configuration conf) { - int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY, + return conf.getInt(SNAPPY_BUFFER_SIZE_KEY, conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY, CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT)); - return size > 0 ? size : 256 * 1024; // Don't change this default } } diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java index fd999426508..2a43ca61dca 100644 --- a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java +++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java @@ -55,7 +55,7 @@ public class SnappyCompressor implements CanReinit, Compressor { if (outBuf.hasRemaining()) { int remaining = outBuf.remaining(), n = Math.min(remaining, len); outBuf.get(b, off, n); - LOG.trace("compress: {} bytes from outBuf", n); + LOG.trace("compress: read {} remaining bytes from outBuf", n); return n; } // We don't actually begin compression until our caller calls finish(). 
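Aside (illustrative, not part of the patch): the xerial codec above sizes its worst case with the library's own bound rather than the generic formula. For Snappy the two happen to agree, assuming snappy-java's documented bound of 32 + len + len / 6:

  int bufferSize = 256 * 1024;
  int generic = CompressionUtil.compressionOverhead(bufferSize);   // (262144 / 6) + 32 = 43722
  int exact = Snappy.maxCompressedLength(bufferSize) - bufferSize; // 32 + (262144 / 6) = 43722
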
diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java index e9119216168..0bad64971d6 100644 --- a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java +++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java @@ -49,7 +49,7 @@ public class SnappyDecompressor implements Decompressor { if (outBuf.hasRemaining()) { int remaining = outBuf.remaining(), n = Math.min(remaining, len); outBuf.get(b, off, n); - LOG.trace("decompress: {} bytes from outBuf", n); + LOG.trace("decompress: read {} remaining bytes from outBuf", n); return n; } if (inBuf.position() > 0) { diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java index 99f29a2695b..8509aa05ddc 100644 --- a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java +++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.compress.CompressionUtil; import org.apache.hadoop.io.compress.BlockCompressorStream; import org.apache.hadoop.io.compress.BlockDecompressorStream; import org.apache.hadoop.io.compress.CompressionCodec; @@ -88,8 +89,8 @@ public class LzmaCodec implements Configurable, CompressionCodec { public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) throws IOException { int bufferSize = getBufferSize(conf); - int compressionOverhead = (bufferSize / 6) + 32; - return new BlockCompressorStream(out, c, bufferSize, compressionOverhead); + return new BlockCompressorStream(out, c, bufferSize, + CompressionUtil.compressionOverhead(bufferSize)); } @Override diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java index dd4d9990954..7174942bc7d 100644 --- a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java +++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java @@ -68,7 +68,7 @@ public class LzmaCompressor implements Compressor { if (outBuf.hasRemaining()) { int remaining = outBuf.remaining(), n = Math.min(remaining, len); outBuf.get(b, off, n); - LOG.trace("compress: {} bytes from outBuf", n); + LOG.trace("compress: read {} remaining bytes from outBuf", n); return n; } // We don't actually begin compression until our caller calls finish(). 
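Aside (illustrative, not part of the patch): the hunk below folds LzmaCompressor's hand-rolled worst-case bound into the shared helper; the arithmetic is unchanged, for example:

  int len = 4096;
  int before = len + 32 + (len / 6);                          // 4810
  int after = len + CompressionUtil.compressionOverhead(len); // 4096 + 682 + 32 = 4810
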
@@ -236,7 +236,7 @@ public class LzmaCompressor implements Compressor { // Package private int maxCompressedLength(int len) { - return len + 32 + (len/6); + return len + CompressionUtil.compressionOverhead(len); } } diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java index be450b3be16..6c3399dfb26 100644 --- a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java +++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java @@ -59,7 +59,7 @@ public class LzmaDecompressor implements Decompressor { if (outBuf.hasRemaining()) { int remaining = outBuf.remaining(), n = Math.min(remaining, len); outBuf.get(b, off, n); - LOG.trace("decompress: {} bytes from outBuf", n); + LOG.trace("decompress: read {} remaining bytes from outBuf", n); return n; } if (inBuf.position() > 0) { diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java index 07b26d0c4bf..521af5b25dd 100644 --- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java +++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.hbase.io.compress.zstd; +import com.github.luben.zstd.Zstd; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -44,6 +45,7 @@ public class ZstdCodec implements Configurable, CompressionCodec { public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level"; public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize"; + public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024; public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary"; private Configuration conf; @@ -92,8 +94,8 @@ public class ZstdCodec implements Configurable, CompressionCodec { public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) throws IOException { int bufferSize = getBufferSize(conf); - int compressionOverhead = (bufferSize / 6) + 32; - return new BlockCompressorStream(out, c, bufferSize, compressionOverhead); + return new BlockCompressorStream(out, c, bufferSize, + (int)Zstd.compressBound(bufferSize) - bufferSize); // overhead only } @Override @@ -121,10 +123,10 @@ public class ZstdCodec implements Configurable, CompressionCodec { } static int getBufferSize(Configuration conf) { - int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY, + return conf.getInt(ZSTD_BUFFER_SIZE_KEY, conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY, - CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT)); - return size > 0 ? size : 256 * 1024; // Don't change this default + // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that. 
+        ZSTD_BUFFER_SIZE_DEFAULT));
   }
 
   static byte[] getDictionary(final Configuration conf) {
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
index deaf7e1ea83..ea45414ccb9 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
@@ -67,7 +67,7 @@ public class ZstdCompressor implements CanReinit, Compressor {
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("compress: {} bytes from outBuf", n);
+      LOG.trace("compress: read {} remaining bytes from outBuf", n);
       return n;
     }
     // We don't actually begin compression until our caller calls finish().
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
index dfa37db636a..6bfa84e1c59 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
@@ -62,7 +62,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("decompress: {} bytes from outBuf", n);
+      LOG.trace("decompress: read {} remaining bytes from outBuf", n);
       return n;
     }
     if (inBuf.position() > 0) {
diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java
index 0a17ef997d2..5a76a4531f2 100644
--- a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java
+++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java
@@ -43,9 +43,9 @@ public class TestZstdDictionary extends CompressionTestBase {
     HBaseClassTestRule.forClass(TestZstdDictionary.class);
 
   private static final String DICTIONARY_PATH = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
-  // zstd.test.data compressed with zstd.test.dict at level 3 will produce a result of
-  // 358555 bytes
-  private static final int EXPECTED_COMPRESSED_SIZE = 358555;
+  // zstd.test.data compressed with zstd.test.dict at level 3 with a default buffer size of 262144
+  // will produce a result of 359909 bytes
+  private static final int EXPECTED_COMPRESSED_SIZE = 359909;
 
   private static byte[] TEST_DATA;
 
diff --git a/hbase-compression/pom.xml b/hbase-compression/pom.xml
index 4f65df1ef73..23a0fa091e5 100644
--- a/hbase-compression/pom.xml
+++ b/hbase-compression/pom.xml
@@ -33,6 +33,7 @@
     <module>hbase-compression-aircompressor</module>
+    <module>hbase-compression-brotli</module>
    <module>hbase-compression-lz4</module>
     <module>hbase-compression-snappy</module>
     <module>hbase-compression-xz</module>
diff --git a/hbase-resource-bundle/src/main/resources/supplemental-models.xml b/hbase-resource-bundle/src/main/resources/supplemental-models.xml
index 42cf49fb673..2c6b2a1a0a2 100644
--- a/hbase-resource-bundle/src/main/resources/supplemental-models.xml
+++ b/hbase-resource-bundle/src/main/resources/supplemental-models.xml
@@ -3378,4 +3378,71 @@ Copyright (c) 2007-2017 The JRuby project
     </project>
   </supplement>
+
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>brotli4j</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
+
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>native-linux-aarch64</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
+
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>native-linux-x86_64</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
+
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>native-osx-x86_64</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
+
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>native-windows-x86_64</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
diff --git a/pom.xml b/pom.xml
index cd3b8bd3d4d..79bbe277998 100644
--- a/pom.xml
+++ b/pom.xml
@@ -633,6 +633,7 @@
     <aircompressor.version>0.21</aircompressor.version>
+    <brotli4j.version>1.7.1</brotli4j.version>
     <lz4.version>1.8.0</lz4.version>
     <snappy.version>1.1.8.4</snappy.version>
     <xz.version>1.9</xz.version>
@@ -1011,6 +1012,11 @@
       <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-compression-aircompressor</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-compression-brotli</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-compression-lz4</artifactId>
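
Usage sketch (illustrative, not part of the patch; names follow the HBase 2.x client API, and `admin` is assumed to be an open Admin handle): once hbase-compression-brotli and its brotli4j natives are on the server classpath, BROTLI can be selected like any other compression algorithm:

  import java.io.IOException;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
  import org.apache.hadoop.hbase.client.TableDescriptor;
  import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
  import org.apache.hadoop.hbase.io.compress.Compression;
  import org.apache.hadoop.hbase.util.Bytes;

  // Create a table whose 'f' family writes Brotli-compressed HFile blocks.
  static void createBrotliTable(Admin admin) throws IOException {
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f"))
        .setCompressionType(Compression.Algorithm.BROTLI)
        .build())
      .build();
    admin.createTable(td);
  }

The codec's tunables added in BrotliCodec above can be set in hbase-site.xml, for example:

  <property>
    <name>hbase.io.compress.brotli.level</name>
    <value>6</value>
  </property>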