HBASE-26959 Brotli compression support (#4353)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>

parent: 8d17241ce0
commit: 0f9f6f2cde
@@ -314,6 +314,10 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-compression-aircompressor</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-compression-brotli</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-compression-lz4</artifactId>
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 public final class Compression {
   private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
 
-
   // LZO
 
   public static final String LZO_CODEC_CLASS_KEY =
@@ -97,6 +96,13 @@ public final class Compression {
   public static final String LZMA_CODEC_CLASS_DEFAULT =
       "org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";
 
+  // Brotli
+
+  public static final String BROTLI_CODEC_CLASS_KEY =
+      "hbase.io.compress.brotli.codec";
+  public static final String BROTLI_CODEC_CLASS_DEFAULT =
+      "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";
+
   /**
    * Prevent the instantiation of class.
    */
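For reference, the key/default pair above follows the same pattern as the other pluggable codecs: a site configuration may name an alternative implementation under the key, and the shipped class is used otherwise. A minimal sketch (not part of the patch):

    Configuration conf = HBaseConfiguration.create();
    // Optional; BROTLI_CODEC_CLASS_DEFAULT already points at the bundled codec.
    conf.set(Compression.BROTLI_CODEC_CLASS_KEY,
      "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec");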
@@ -148,6 +154,7 @@ public final class Compression {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       value="SE_TRANSIENT_FIELD_NOT_RESTORED",
       justification="We are not serializing so doesn't apply (not sure why transient though)")
+  @SuppressWarnings("ImmutableEnumChecker")
   @InterfaceAudience.Public
   public static enum Algorithm {
     // LZO is GPL and requires extra install to setup. See
@@ -352,6 +359,31 @@ public final class Compression {
         return lzmaCodec;
       }
     }
+  },
+
+  BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
+    // Use base type to avoid compile-time dependencies.
+    private volatile transient CompressionCodec brotliCodec;
+    private final transient Object lock = new Object();
+    @Override
+    CompressionCodec getCodec(Configuration conf) {
+      if (brotliCodec == null) {
+        synchronized (lock) {
+          if (brotliCodec == null) {
+            brotliCodec = buildCodec(conf, this);
+          }
+        }
+      }
+      return brotliCodec;
+    }
+    @Override
+    public CompressionCodec reload(Configuration conf) {
+      synchronized (lock) {
+        brotliCodec = buildCodec(conf, this);
+        LOG.warn("Reloaded configuration for {}", name());
+        return brotliCodec;
+      }
+    }
   };
 
   private final Configuration conf;
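The getCodec()/reload() pair above is standard double-checked locking: brotliCodec is volatile and re-checked under the lock, so concurrent callers construct the codec at most once, while reload() lets tests swap configuration at runtime. Selecting the new algorithm then looks like any other (usage sketch, assuming HBase's public column-family API; not part of the patch):

    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("cf"))
      .setCompressionType(Compression.Algorithm.BROTLI)
      .build();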
@@ -35,4 +35,16 @@ public final class CompressionUtil {
     return v;
   }
 
+  /**
+   * Most compression algorithms can be presented with pathological input that causes an
+   * expansion rather than a compression. Hadoop's compression API requires that we calculate
+   * additional buffer space required for the worst case. There is a formula developed for
+   * gzip that applies as a ballpark to all LZ variants. It should be good enough for now and
+   * has been tested as such with a range of different inputs.
+   */
+  public static int compressionOverhead(int bufferSize) {
+    // Given an input buffer of 'bufferSize' bytes we presume a worst case expansion of
+    // 32 bytes (block header) and an additional 1/6th of the input size.
+    return (bufferSize / 6) + 32;
+  }
 }
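A quick worked check of the formula (not part of the patch): for the default 256 KiB buffer used throughout these codecs,

    int bufferSize = 256 * 1024;           // 262144 bytes
    int overhead = (bufferSize / 6) + 32;  // 43690 + 32 = 43722 bytes
    // Worst-case output budget: 262144 + 43722 = 305866 bytes, about 1.17x the
    // input, which covers even fully incompressible blocks.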
@@ -57,7 +57,7 @@ public abstract class HadoopCompressor<T extends Compressor>
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("compress: {} bytes from outBuf", n);
+      LOG.trace("compress: read {} remaining bytes from outBuf", n);
       return n;
     }
     // We don't actually begin compression until our caller calls finish().
@@ -51,7 +51,7 @@ public class HadoopDecompressor<T extends Decompressor>
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("decompress: {} bytes from outBuf", n);
+      LOG.trace("decompress: read {} remaining bytes from outBuf", n);
       return n;
     }
     if (inBuf.position() > 0) {
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
       throws IOException {
     int bufferSize = getBufferSize(conf);
-    int compressionOverhead = (bufferSize / 6) + 32;
-    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+    return new BlockCompressorStream(out, c, bufferSize,
+        CompressionUtil.compressionOverhead(bufferSize));
   }
 
   @Override
@@ -149,10 +150,9 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   // Package private
 
   static int getBufferSize(Configuration conf) {
-    int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
+    return conf.getInt(LZ4_BUFFER_SIZE_KEY,
       conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
-    return size > 0 ? size : 256 * 1024; // Don't change this default
   }
 
 }
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public class LzoCodec implements Configurable, CompressionCodec {
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
       throws IOException {
     int bufferSize = getBufferSize(conf);
-    int compressionOverhead = (bufferSize / 6) + 32;
-    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+    return new BlockCompressorStream(out, c, bufferSize,
+        CompressionUtil.compressionOverhead(bufferSize));
   }
 
   @Override
@@ -149,10 +150,9 @@ public class LzoCodec implements Configurable, CompressionCodec {
   // Package private
 
   static int getBufferSize(Configuration conf) {
-    int size = conf.getInt(LZO_BUFFER_SIZE_KEY,
+    return conf.getInt(LZO_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT));
-    return size > 0 ? size : 256 * 1024; // Don't change this default
   }
 
 }
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
       throws IOException {
     int bufferSize = getBufferSize(conf);
-    int compressionOverhead = (bufferSize / 6) + 32;
-    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+    return new BlockCompressorStream(out, c, bufferSize,
+        CompressionUtil.compressionOverhead(bufferSize));
   }
 
   @Override
@@ -149,10 +150,9 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   // Package private
 
   static int getBufferSize(Configuration conf) {
-    int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+    return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
-    return size > 0 ? size : 256 * 1024; // Don't change this default
   }
 
 }
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -52,6 +53,7 @@ import io.airlift.compress.zstd.ZstdDecompressor;
 public class ZstdCodec implements Configurable, CompressionCodec {
 
   public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
+  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
 
   private Configuration conf;
 
@@ -99,8 +101,8 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
       throws IOException {
     int bufferSize = getBufferSize(conf);
-    int compressionOverhead = (bufferSize / 6) + 32;
-    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+    return new BlockCompressorStream(out, c, bufferSize,
+        CompressionUtil.compressionOverhead(bufferSize));
   }
 
   @Override
@@ -157,10 +159,10 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   // Package private
 
   static int getBufferSize(Configuration conf) {
-    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
+    return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
-        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
-    return size > 0 ? size : 256 * 1024; // Don't change this default
+        // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
+        ZSTD_BUFFER_SIZE_DEFAULT));
   }
 
 }
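The zstd change differs from the lz4/lzo/snappy cleanups in one detail: Hadoop's default for its zstd buffer-size key is 0 (meaning "defer to the library"), which the block stream machinery here can't use, so the codec substitutes its new ZSTD_BUFFER_SIZE_DEFAULT at the innermost fallback. The resulting lookup order, as a sketch (assuming standard Hadoop Configuration semantics):

    // 1. hbase.io.compress.zstd.buffersize, if set
    // 2. io.compression.codec.zstd.buffersize (the Hadoop key), if set
    // 3. ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024
    int size = conf.getInt(ZstdCodec.ZSTD_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
        ZstdCodec.ZSTD_BUFFER_SIZE_DEFAULT));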
hbase-compression/hbase-compression-brotli/pom.xml (new file, 167 lines)
@@ -0,0 +1,167 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+  /**
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements. See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership. The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License. You may obtain a copy of the License at
+   *
+   *     http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing, software
+   * distributed under the License is distributed on an "AS IS" BASIS,
+   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   * See the License for the specific language governing permissions and
+   * limitations under the License.
+   */
+  -->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-compression</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>2.6.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>hbase-compression-brotli</artifactId>
+  <name>Apache HBase - Compression - Brotli</name>
+  <description>Compression support using Brotli4j</description>
+  <build>
+    <plugins>
+      <!-- Testing plugins -->
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.revelc.code</groupId>
+        <artifactId>warbucks-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <!--Make it so assembly:single does nothing in here-->
+          <artifactId>maven-assembly-plugin</artifactId>
+          <configuration>
+            <skipAssembly>true</skipAssembly>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-checkstyle-plugin</artifactId>
+          <configuration>
+            <failOnViolation>true</failOnViolation>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>net.revelc.code</groupId>
+          <artifactId>warbucks-maven-plugin</artifactId>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <dependencies>
+    <!-- Intra-project dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-logging</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <!-- native Java compression codecs -->
+    <dependency>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>brotli4j</artifactId>
+      <version>${brotli4j.version}</version>
+    </dependency>
+    <!--Test-->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <profiles>
+    <profile>
+      <id>build-with-jdk11</id>
+      <activation>
+        <jdk>[1.11,)</jdk>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>javax.annotation</groupId>
+          <artifactId>javax.annotation-api</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java (new file)
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Hadoop brotli codec implemented with Brotli4j
+ */
+@InterfaceAudience.Private
+public class BrotliCodec implements Configurable, CompressionCodec {
+
+  public static final String BROTLI_LEVEL_KEY = "hbase.io.compress.brotli.level";
+  // Our default is 6, based on https://blog.cloudflare.com/results-experimenting-brotli/
+  public static final int BROTLI_LEVEL_DEFAULT = 6; // [0,11] or -1
+  public static final String BROTLI_WINDOW_KEY = "hbase.io.compress.brotli.window";
+  public static final int BROTLI_WINDOW_DEFAULT = -1; // [10-24] or -1
+  public static final String BROTLI_BUFFERSIZE_KEY = "hbase.io.compress.brotli.buffersize";
+  public static final int BROTLI_BUFFERSIZE_DEFAULT = 256 * 1024;
+
+  private Configuration conf;
+
+  public BrotliCodec() {
+    conf = new Configuration();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new BrotliCompressor(getLevel(conf), getWindow(conf), getBufferSize(conf));
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new BrotliDecompressor(getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    int bufferSize = getBufferSize(conf);
+    return new BlockCompressorStream(out, c, bufferSize,
+        CompressionUtil.compressionOverhead(bufferSize));
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return BrotliCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return BrotliDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".br";
+  }
+
+  // Package private
+
+  static int getLevel(Configuration conf) {
+    return conf.getInt(BROTLI_LEVEL_KEY, BROTLI_LEVEL_DEFAULT);
+  }
+
+  static int getWindow(Configuration conf) {
+    return conf.getInt(BROTLI_WINDOW_KEY, BROTLI_WINDOW_DEFAULT);
+  }
+
+  static int getBufferSize(Configuration conf) {
+    return conf.getInt(BROTLI_BUFFERSIZE_KEY, BROTLI_BUFFERSIZE_DEFAULT);
+  }
+
+}
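As a quick orientation to the class above, a round-trip usage sketch (not part of the patch; assumes the Brotli4j native library loads on the host platform, and Java 9+ for readAllBytes):

    BrotliCodec codec = new BrotliCodec();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (CompressionOutputStream out = codec.createOutputStream(bos)) {
      out.write(data);   // data: byte[] to compress
      out.finish();
    }
    byte[] compressed = bos.toByteArray();
    try (CompressionInputStream in =
        codec.createInputStream(new ByteArrayInputStream(compressed))) {
      byte[] restored = in.readAllBytes();   // equals data
    }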
hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCompressor.java (new file)
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.encoder.Encoder;
+import com.aayushatharva.brotli4j.encoder.Encoders;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CanReinit;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hadoop compressor glue for Brotli4j
+ */
+@InterfaceAudience.Private
+public class BrotliCompressor implements CanReinit, Compressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(BrotliCompressor.class);
+  protected ByteBuffer inBuf, outBuf;
+  protected int bufferSize;
+  protected boolean finish, finished;
+  protected long bytesRead, bytesWritten;
+  protected Encoder.Parameters params;
+
+  static {
+    Brotli4jLoader.ensureAvailability();
+  }
+
+  BrotliCompressor(int level, int window, int bufferSize) {
+    this.bufferSize = bufferSize;
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf.position(bufferSize);
+    params = new Encoder.Parameters();
+    params.setQuality(level);
+    params.setWindow(window);
+  }
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    // If we have previously compressed our input and still have some buffered bytes
+    // remaining, provide them to the caller.
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("compress: read {} remaining bytes from outBuf", n);
+      return n;
+    }
+    // We don't actually begin compression until our caller calls finish().
+    if (finish) {
+      if (inBuf.position() > 0) {
+        inBuf.flip();
+        int uncompressed = inBuf.remaining();
+        // If we don't have enough capacity in our currently allocated output buffer,
+        // allocate a new one which does.
+        int needed = maxCompressedLength(uncompressed);
+        // Can we compress directly into the provided array?
+        boolean direct = false;
+        ByteBuffer writeBuf;
+        if (len <= needed) {
+          direct = true;
+          writeBuf = ByteBuffer.wrap(b, off, len);
+        } else {
+          if (outBuf.capacity() < needed) {
+            needed = CompressionUtil.roundInt2(needed);
+            LOG.trace("compress: resize outBuf {}", needed);
+            outBuf = ByteBuffer.allocate(needed);
+          } else {
+            outBuf.clear();
+          }
+          writeBuf = outBuf;
+        }
+        final int oldPos = writeBuf.position();
+        Encoders.compress(inBuf, writeBuf, params);
+        final int written = writeBuf.position() - oldPos;
+        bytesWritten += written;
+        inBuf.clear();
+        LOG.trace("compress: compressed {} -> {}", uncompressed, written);
+        finished = true;
+        if (!direct) {
+          outBuf.flip();
+          int n = Math.min(written, len);
+          outBuf.get(b, off, n);
+          LOG.trace("compress: {} bytes", n);
+          return n;
+        } else {
+          LOG.trace("compress: {} bytes direct", written);
+          return written;
+        }
+      } else {
+        finished = true;
+      }
+    }
+    LOG.trace("No output");
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public void finish() {
+    LOG.trace("finish");
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    boolean b = finished && !outBuf.hasRemaining();
+    LOG.trace("finished: {}", b);
+    return b;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = !finished();
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    LOG.trace("reinit");
+    if (conf != null) {
+      // Quality or window settings might have changed
+      params.setQuality(BrotliCodec.getLevel(conf));
+      params.setWindow(BrotliCodec.getWindow(conf));
+      // Buffer size might have changed
+      int newBufferSize = BrotliCodec.getBufferSize(conf);
+      if (bufferSize != newBufferSize) {
+        bufferSize = newBufferSize;
+        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
+        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+      }
+    }
+    reset();
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    bytesRead = 0;
+    bytesWritten = 0;
+    finish = false;
+    finished = false;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // This condition should fortunately be rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocate(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    bytesRead += len;
+    finished = false;
+  }
+
+  // Package private
+
+  int maxCompressedLength(int len) {
+    return len + CompressionUtil.compressionOverhead(len);
+  }
+
+}
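The compressor is a two-phase state machine dictated by Hadoop's Compressor contract: setInput() only accumulates bytes into inBuf, and the whole block is Brotli-encoded on the first compress() call after finish(). A driver sketch (assumption: invoked from the same package, since the constructor is package private; BlockCompressorStream normally performs this sequence):

    BrotliCompressor compressor = new BrotliCompressor(6, -1, 64 * 1024);
    compressor.setInput(block, 0, block.length);  // buffered only, no encoding yet
    compressor.finish();                          // mark end of input
    byte[] out = new byte[compressor.maxCompressedLength(block.length)];
    int n = compressor.compress(out, 0, out.length);  // Brotli encode happens here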
hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliDecompressor.java (new file)
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.decoder.Decoder;
+import com.aayushatharva.brotli4j.decoder.DirectDecompress;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hadoop decompressor glue for Brotli4j
+ */
+@InterfaceAudience.Private
+public class BrotliDecompressor implements Decompressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(BrotliDecompressor.class);
+  protected ByteBuffer inBuf, outBuf;
+  protected int inLen;
+  protected boolean finished;
+
+  static {
+    Brotli4jLoader.ensureAvailability();
+  }
+
+  BrotliDecompressor(int bufferSize) {
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf.position(bufferSize);
+  }
+
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: read {} remaining bytes from outBuf", n);
+      return n;
+    }
+    if (inBuf.position() > 0) {
+      inBuf.flip();
+      int remaining = inBuf.remaining();
+      inLen -= remaining;
+      outBuf.rewind();
+      outBuf.limit(outBuf.capacity());
+
+      // TODO: More inefficient than it could be, but it doesn't impact decompression speed
+      // terribly and the brotli4j API alternatives do not seem to work correctly.
+      // Maybe something more clever can be done as a future improvement.
+      final byte[] inb = new byte[remaining];
+      inBuf.get(inb);
+      DirectDecompress result = Decoder.decompress(inb);
+      outBuf.put(result.getDecompressedDataByteBuf().nioBuffer());
+      final int written = outBuf.position();
+
+      inBuf.rewind();
+      inBuf.limit(inBuf.capacity());
+      LOG.trace("decompress: decompressed {} -> {}", remaining, written);
+      outBuf.flip();
+      int n = Math.min(written, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: {} bytes", n);
+      return n;
+    }
+    LOG.trace("decompress: No output, finished");
+    finished = true;
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public boolean finished() {
+    LOG.trace("finished");
+    return finished;
+  }
+
+  @Override
+  public int getRemaining() {
+    LOG.trace("getRemaining: {}", inLen);
+    return inLen;
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    LOG.trace("needsDictionary");
+    return false;
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    inLen = 0;
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    finished = false;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = (inBuf.position() == 0);
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // This condition should fortunately be rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocate(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    inLen += len;
+    finished = false;
+  }
+
+}
hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestBrotliCodec.java (new file)
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestBrotliCodec extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBrotliCodec.class);
+
+  @Test
+  public void testBrotliCodecSmall() throws Exception {
+    codecSmallTest(new BrotliCodec());
+  }
+
+  @Test
+  public void testBrotliCodecLarge() throws Exception {
+    codecLargeTest(new BrotliCodec(), 1.1); // poor compressibility
+    codecLargeTest(new BrotliCodec(), 2);
+    codecLargeTest(new BrotliCodec(), 10);  // very high compressibility
+  }
+
+  @Test
+  public void testBrotliCodecVeryLarge() throws Exception {
+    codecVeryLargeTest(new BrotliCodec(), 3); // like text
+  }
+
+}
hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestHFileCompressionBrotli.java (new file)
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionBrotli extends HFileTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestHFileCompressionBrotli.class);
+
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.BROTLI_CODEC_CLASS_KEY, BrotliCodec.class.getCanonicalName());
+    Compression.Algorithm.BROTLI.reload(conf);
+    HFileTestBase.setUpBeforeClass();
+  }
+
+  @Test
+  public void test() throws Exception {
+    Path path = new Path(TEST_UTIL.getDataTestDir(),
+      HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
+    doTest(conf, path, Compression.Algorithm.BROTLI);
+  }
+
+}
hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestWALCompressionBrotli.java (new file)
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionBrotli extends CompressedWALTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALCompressionBrotli.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.BROTLI_CODEC_CLASS_KEY, BrotliCodec.class.getCanonicalName());
+    Compression.Algorithm.BROTLI.reload(conf);
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, Compression.Algorithm.BROTLI.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
+    doTest(tableName);
+  }
+
+}
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -88,8 +89,8 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
       throws IOException {
     int bufferSize = getBufferSize(conf);
-    int compressionOverhead = (bufferSize / 6) + 32;
-    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+    return new BlockCompressorStream(out, c, bufferSize,
+        CompressionUtil.compressionOverhead(bufferSize));
   }
 
   @Override
@@ -110,10 +111,9 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   // Package private
 
   static int getBufferSize(Configuration conf) {
-    int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
+    return conf.getInt(LZ4_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
-    return size > 0 ? size : 256 * 1024; // Don't change this default
   }
 
 }
@@ -58,7 +58,7 @@ public class Lz4Compressor implements CanReinit, Compressor {
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("compress: {} bytes from outBuf", n);
+      LOG.trace("compress: read {} remaining bytes from outBuf", n);
       return n;
     }
     // We don't actually begin compression until our caller calls finish().
@@ -53,7 +53,7 @@ public class Lz4Decompressor implements Decompressor {
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
      outBuf.get(b, off, n);
-      LOG.trace("decompress: {} bytes from outBuf", n);
+      LOG.trace("decompress: read {} remaining bytes from outBuf", n);
       return n;
     }
     if (inBuf.position() > 0) {
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.xerial.snappy.Snappy;
 
 /**
  * Hadoop Snappy codec implemented with Xerial Snappy.
@@ -88,8 +89,8 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
       throws IOException {
     int bufferSize = getBufferSize(conf);
-    int compressionOverhead = (bufferSize / 6) + 32;
-    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+    return new BlockCompressorStream(out, c, bufferSize,
+        Snappy.maxCompressedLength(bufferSize) - bufferSize); // overhead only
   }
 
   @Override
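Unlike the other codecs, the xerial Snappy codec computes overhead from the library's exact worst-case bound rather than CompressionUtil's ballpark. A comparison sketch (not in the patch; snappy documents maxCompressedLength(n) as 32 + n + n/6):

    int bufferSize = 256 * 1024;
    int exact = Snappy.maxCompressedLength(bufferSize) - bufferSize; // 32 + n/6 = 43722
    int ballpark = CompressionUtil.compressionOverhead(bufferSize);  // (n/6) + 32 = 43722
    // The two agree here because the generic formula was modeled on this bound.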
@@ -110,10 +111,9 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   // Package private
 
   static int getBufferSize(Configuration conf) {
-    int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+    return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
-    return size > 0 ? size : 256 * 1024; // Don't change this default
   }
 
 }
@ -55,7 +55,7 @@ public class SnappyCompressor implements CanReinit, Compressor {
|
|||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: {} bytes from outBuf", n);
|
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
// We don't actually begin compression until our caller calls finish().
|
// We don't actually begin compression until our caller calls finish().
|
||||||
@@ -49,7 +49,7 @@ public class SnappyDecompressor implements Decompressor {
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("decompress: {} bytes from outBuf", n);
+      LOG.trace("decompress: read {} remaining bytes from outBuf", n);
       return n;
     }
     if (inBuf.position() > 0) {
@@ -22,6 +22,7 @@ import java.io.OutputStream;

 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -88,8 +89,8 @@ public class LzmaCodec implements Configurable, CompressionCodec {
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
       throws IOException {
     int bufferSize = getBufferSize(conf);
-    int compressionOverhead = (bufferSize / 6) + 32;
-    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+    return new BlockCompressorStream(out, c, bufferSize,
+      CompressionUtil.compressionOverhead(bufferSize));
   }

   @Override
@@ -68,7 +68,7 @@ public class LzmaCompressor implements Compressor {
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("compress: {} bytes from outBuf", n);
+      LOG.trace("compress: read {} remaining bytes from outBuf", n);
       return n;
     }
     // We don't actually begin compression until our caller calls finish().
@@ -236,7 +236,7 @@ public class LzmaCompressor implements Compressor {
   // Package private

   int maxCompressedLength(int len) {
-    return len + 32 + (len/6);
+    return len + CompressionUtil.compressionOverhead(len);
   }

 }
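Read together with the LzmaCodec hunk above, this refactor replaces two copies of the inline worst-case arithmetic with one shared helper. Judging only from the expressions it replaces, CompressionUtil.compressionOverhead(len) presumably computes something equivalent to the old (len / 6) + 32; a sketch of that presumed shape (the real body lives in CompressionUtil, which this diff does not show):

// Presumed shape of the shared helper, inferred from the replaced expressions
// "(bufferSize / 6) + 32" and "len + 32 + (len/6)"; not copied from the patch.
final class CompressionUtilSketch {
  static int compressionOverhead(int bufferSize) {
    return (bufferSize / 6) + 32; // generous worst-case expansion allowance
  }
}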
@@ -59,7 +59,7 @@ public class LzmaDecompressor implements Decompressor {
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("decompress: {} bytes from outBuf", n);
+      LOG.trace("decompress: read {} remaining bytes from outBuf", n);
       return n;
     }
     if (inBuf.position() > 0) {
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.hbase.io.compress.zstd;

+import com.github.luben.zstd.Zstd;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -44,6 +45,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {

   public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
   public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
+  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
   public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";

   private Configuration conf;
@@ -92,8 +94,8 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
       throws IOException {
     int bufferSize = getBufferSize(conf);
-    int compressionOverhead = (bufferSize / 6) + 32;
-    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+    return new BlockCompressorStream(out, c, bufferSize,
+      (int)Zstd.compressBound(bufferSize) - bufferSize); // overhead only
   }

   @Override
@@ -121,10 +123,10 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   }

   static int getBufferSize(Configuration conf) {
-    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
+    return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
       conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
-        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
-    return size > 0 ? size : 256 * 1024; // Don't change this default
+        // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
+        ZSTD_BUFFER_SIZE_DEFAULT));
   }

   static byte[] getDictionary(final Configuration conf) {
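The zstd changes mirror the snappy ones, with one extra wrinkle called out in the new comment: Hadoop's IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0, so falling through to it would configure zero-length working buffers; HBase now substitutes its own ZSTD_BUFFER_SIZE_DEFAULT of 256 KB instead. A small sketch of the "overhead only" value the new createOutputStream hands to BlockCompressorStream, assuming zstd-jni's Zstd.compressBound(long) as the diff uses it (the class name below is illustrative):

import com.github.luben.zstd.Zstd;

// Illustrative only: the worst-case overhead passed to BlockCompressorStream
// for the default 256 KB buffer.
final class ZstdOverheadExample {
  public static void main(String[] args) {
    int bufferSize = 256 * 1024; // ZSTD_BUFFER_SIZE_DEFAULT
    int overhead = (int) Zstd.compressBound(bufferSize) - bufferSize;
    System.out.println("zstd overhead for 256 KB buffer = " + overhead);
  }
}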
@@ -67,7 +67,7 @@ public class ZstdCompressor implements CanReinit, Compressor {
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("compress: {} bytes from outBuf", n);
+      LOG.trace("compress: read {} remaining bytes from outBuf", n);
       return n;
     }
     // We don't actually begin compression until our caller calls finish().
@@ -62,7 +62,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
     if (outBuf.hasRemaining()) {
       int remaining = outBuf.remaining(), n = Math.min(remaining, len);
       outBuf.get(b, off, n);
-      LOG.trace("decompress: {} bytes from outBuf", n);
+      LOG.trace("decompress: read {} remaining bytes from outBuf", n);
       return n;
     }
     if (inBuf.position() > 0) {
@@ -43,9 +43,9 @@ public class TestZstdDictionary extends CompressionTestBase {
       HBaseClassTestRule.forClass(TestZstdDictionary.class);

   private static final String DICTIONARY_PATH = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
-  // zstd.test.data compressed with zstd.test.dict at level 3 will produce a result of
-  // 358555 bytes
-  private static final int EXPECTED_COMPRESSED_SIZE = 358555;
+  // zstd.test.data compressed with zstd.test.dict at level 3 with a default buffer size of 262144
+  // will produce a result of 359909 bytes
+  private static final int EXPECTED_COMPRESSED_SIZE = 359909;

   private static byte[] TEST_DATA;

@@ -33,6 +33,7 @@

   <modules>
     <module>hbase-compression-aircompressor</module>
+    <module>hbase-compression-brotli</module>
     <module>hbase-compression-lz4</module>
     <module>hbase-compression-snappy</module>
     <module>hbase-compression-xz</module>
@@ -3378,4 +3378,71 @@ Copyright (c) 2007-2017 The JRuby project
       </licenses>
     </project>
   </supplement>
+  <!-- Brotli4j license information is malformed -->
+  <!-- It has been incorrectly called Apache 2.0 in the original poms -->
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>brotli4j</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>native-linux-aarch64</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>native-linux-x86_64</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>native-osx-x86_64</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
+  <supplement>
+    <project>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>native-windows-x86_64</artifactId>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
 </supplementalDataModels>
pom.xml
@@ -633,6 +633,7 @@
     <spotless.version>2.21.0</spotless.version>
     <!-- compression -->
     <aircompressor.version>0.21</aircompressor.version>
+    <brotli4j.version>1.7.1</brotli4j.version>
     <lz4.version>1.8.0</lz4.version>
     <snappy.version>1.1.8.4</snappy.version>
     <xz.version>1.9</xz.version>
@@ -1011,6 +1012,11 @@
       <artifactId>hbase-compression-aircompressor</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-compression-brotli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-compression-lz4</artifactId>
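With hbase-compression-brotli declared here and pulled into the assembly, enabling the codec works like any of the other pluggable compression modules. A hedged usage sketch against the standard HBase client API (the class and family names below are illustrative, not from this commit):

import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative only: request Brotli block compression for a column family,
// assuming the hbase-compression-brotli jar is on the region server classpath.
final class BrotliUsageExample {
  static ColumnFamilyDescriptor brotliFamily() {
    return ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
      .setCompressionType(Compression.Algorithm.BROTLI)
      .build();
  }
}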