HBASE-26959 Brotli compression support (#4353)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
parent
9a88092817
commit
f5b10e0115
|
@ -314,6 +314,10 @@
|
|||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-compression-aircompressor</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-compression-brotli</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-compression-lz4</artifactId>
|
||||
|
|
|
@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
|
|||
public final class Compression {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
|
||||
|
||||
|
||||
// LZO
|
||||
|
||||
public static final String LZO_CODEC_CLASS_KEY =
|
||||
|
@ -97,6 +96,13 @@ public final class Compression {
|
|||
public static final String LZMA_CODEC_CLASS_DEFAULT =
|
||||
"org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";
|
||||
|
||||
// Brotli
|
||||
|
||||
public static final String BROTLI_CODEC_CLASS_KEY =
|
||||
"hbase.io.compress.brotli.codec";
|
||||
public static final String BROTLI_CODEC_CLASS_DEFAULT =
|
||||
"org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";
|
||||
|
||||
/**
|
||||
* Prevent the instantiation of class.
|
||||
*/
|
||||
|
@ -148,6 +154,7 @@ public final class Compression {
|
|||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
|
||||
value="SE_TRANSIENT_FIELD_NOT_RESTORED",
|
||||
justification="We are not serializing so doesn't apply (not sure why transient though)")
|
||||
@SuppressWarnings("ImmutableEnumChecker")
|
||||
@InterfaceAudience.Public
|
||||
public static enum Algorithm {
|
||||
// LZO is GPL and requires extra install to setup. See
|
||||
|
@ -352,6 +359,31 @@ public final class Compression {
|
|||
return lzmaCodec;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
|
||||
// Use base type to avoid compile-time dependencies.
|
||||
private volatile transient CompressionCodec brotliCodec;
|
||||
private final transient Object lock = new Object();
|
||||
@Override
|
||||
CompressionCodec getCodec(Configuration conf) {
|
||||
if (brotliCodec == null) {
|
||||
synchronized (lock) {
|
||||
if (brotliCodec == null) {
|
||||
brotliCodec = buildCodec(conf, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
return brotliCodec;
|
||||
}
|
||||
@Override
|
||||
public CompressionCodec reload(Configuration conf) {
|
||||
synchronized (lock) {
|
||||
brotliCodec = buildCodec(conf, this);
|
||||
LOG.warn("Reloaded configuration for {}", name());
|
||||
return brotliCodec;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
private final Configuration conf;
|
||||
|
|
|
@ -35,4 +35,16 @@ public final class CompressionUtil {
|
|||
return v;
|
||||
}
|
||||
|
||||
/**
|
||||
* Most compression algorithms can be presented with pathological input that causes an
|
||||
* expansion rather than a compression. Hadoop's compression API requires that we calculate
|
||||
* additional buffer space required for the worst case. There is a formula developed for
|
||||
* gzip that applies as a ballpark to all LZ variants. It should be good enough for now and
|
||||
* has been tested as such with a range of different inputs.
|
||||
*/
|
||||
public static int compressionOverhead(int bufferSize) {
|
||||
// Given an input buffer of 'buffersize' bytes we presume a worst case expansion of
|
||||
// 32 bytes (block header) and addition 1/6th of the input size.
|
||||
return (bufferSize / 6) + 32;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ public abstract class HadoopCompressor<T extends Compressor>
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("compress: {} bytes from outBuf", n);
|
||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
// We don't actually begin compression until our caller calls finish().
|
||||
|
|
|
@ -49,7 +49,7 @@ public class HadoopDecompressor<T extends Decompressor>
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("decompress: {} bytes from outBuf", n);
|
||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
if (inBuf.position() > 0) {
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.OutputStream;
|
|||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.BlockCompressorStream;
|
||||
import org.apache.hadoop.io.compress.BlockDecompressorStream;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
|
@ -89,8 +90,8 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
int compressionOverhead = (bufferSize / 6) + 32;
|
||||
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -147,10 +148,9 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
// Package private
|
||||
|
||||
static int getBufferSize(Configuration conf) {
|
||||
int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
|
||||
return conf.getInt(LZ4_BUFFER_SIZE_KEY,
|
||||
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
|
||||
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
|
||||
return size > 0 ? size : 256 * 1024; // Don't change this default
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.OutputStream;
|
|||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.BlockCompressorStream;
|
||||
import org.apache.hadoop.io.compress.BlockDecompressorStream;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
|
@ -89,8 +90,8 @@ public class LzoCodec implements Configurable, CompressionCodec {
|
|||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
int compressionOverhead = (bufferSize / 6) + 32;
|
||||
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -147,10 +148,9 @@ public class LzoCodec implements Configurable, CompressionCodec {
|
|||
// Package private
|
||||
|
||||
static int getBufferSize(Configuration conf) {
|
||||
int size = conf.getInt(LZO_BUFFER_SIZE_KEY,
|
||||
return conf.getInt(LZO_BUFFER_SIZE_KEY,
|
||||
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
|
||||
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT));
|
||||
return size > 0 ? size : 256 * 1024; // Don't change this default
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.OutputStream;
|
|||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.BlockCompressorStream;
|
||||
import org.apache.hadoop.io.compress.BlockDecompressorStream;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
|
@ -89,8 +90,8 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
int compressionOverhead = (bufferSize / 6) + 32;
|
||||
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -147,10 +148,9 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
// Package private
|
||||
|
||||
static int getBufferSize(Configuration conf) {
|
||||
int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
|
||||
return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
|
||||
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
|
||||
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
|
||||
return size > 0 ? size : 256 * 1024; // Don't change this default
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.OutputStream;
|
|||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.BlockCompressorStream;
|
||||
import org.apache.hadoop.io.compress.BlockDecompressorStream;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
|
@ -50,6 +51,7 @@ import io.airlift.compress.zstd.ZstdDecompressor;
|
|||
public class ZstdCodec implements Configurable, CompressionCodec {
|
||||
|
||||
public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
|
||||
public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
|
@ -97,8 +99,8 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
int compressionOverhead = (bufferSize / 6) + 32;
|
||||
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -155,10 +157,10 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
// Package private
|
||||
|
||||
static int getBufferSize(Configuration conf) {
|
||||
int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
|
||||
return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
|
||||
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
|
||||
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
|
||||
return size > 0 ? size : 256 * 1024; // Don't change this default
|
||||
// IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
|
||||
ZSTD_BUFFER_SIZE_DEFAULT));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,167 @@
|
|||
<?xml version="1.0"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<!--
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
-->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<artifactId>hbase-compression</artifactId>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<version>3.0.0-alpha-3-SNAPSHOT</version>
|
||||
<relativePath>..</relativePath>
|
||||
</parent>
|
||||
<artifactId>hbase-compression-brotli</artifactId>
|
||||
<name>Apache HBase - Compression - Brotli</name>
|
||||
<description>Compression support using Brotli4j</description>
|
||||
<build>
|
||||
<plugins>
|
||||
<!-- Testing plugins -->
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>net.revelc.code</groupId>
|
||||
<artifactId>warbucks-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<!--Make it so assembly:single does nothing in here-->
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<skipAssembly>true</skipAssembly>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<configuration>
|
||||
<failOnViolation>true</failOnViolation>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>net.revelc.code</groupId>
|
||||
<artifactId>warbucks-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
<dependencies>
|
||||
<!-- Intra-project dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-logging</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-common</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-testing-util</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-annotations</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.stephenc.findbugs</groupId>
|
||||
<artifactId>findbugs-annotations</artifactId>
|
||||
<scope>compile</scope>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<!-- native Java compression codecs -->
|
||||
<dependency>
|
||||
<groupId>com.aayushatharva.brotli4j</groupId>
|
||||
<artifactId>brotli4j</artifactId>
|
||||
<version>${brotli4j.version}</version>
|
||||
</dependency>
|
||||
<!--Test-->
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>jcl-over-slf4j</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>jul-to-slf4j</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-slf4j-impl</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-1.2-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-library</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>build-with-jdk11</id>
|
||||
<activation>
|
||||
<jdk>[1.11,)</jdk>
|
||||
</activation>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>javax.annotation</groupId>
|
||||
<artifactId>javax.annotation-api</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.compress.brotli;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.BlockCompressorStream;
|
||||
import org.apache.hadoop.io.compress.BlockDecompressorStream;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.io.compress.CompressionInputStream;
|
||||
import org.apache.hadoop.io.compress.CompressionOutputStream;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.hadoop.io.compress.Decompressor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Hadoop brotli codec implemented with Brotli4j
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BrotliCodec implements Configurable, CompressionCodec {
|
||||
|
||||
public static final String BROTLI_LEVEL_KEY = "hbase.io.compress.brotli.level";
|
||||
// Our default is 6, based on https://blog.cloudflare.com/results-experimenting-brotli/
|
||||
public static final int BROTLI_LEVEL_DEFAULT = 6; // [0,11] or -1
|
||||
public static final String BROTLI_WINDOW_KEY = "hbase.io.compress.brotli.window";
|
||||
public static final int BROTLI_WINDOW_DEFAULT = -1; // [10-24] or -1
|
||||
public static final String BROTLI_BUFFERSIZE_KEY = "hbase.io.compress.brotli.buffersize";
|
||||
public static final int BROTLI_BUFFERSIZE_DEFAULT = 256 * 1024;
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
public BrotliCodec() {
|
||||
conf = new Configuration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Compressor createCompressor() {
|
||||
return new BrotliCompressor(getLevel(conf), getWindow(conf), getBufferSize(conf));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decompressor createDecompressor() {
|
||||
return new BrotliDecompressor(getBufferSize(conf));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in) throws IOException {
|
||||
return createInputStream(in, createDecompressor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
|
||||
return createOutputStream(out, createCompressor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends Compressor> getCompressorType() {
|
||||
return BrotliCompressor.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends Decompressor> getDecompressorType() {
|
||||
return BrotliDecompressor.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDefaultExtension() {
|
||||
return ".br";
|
||||
}
|
||||
|
||||
// Package private
|
||||
|
||||
static int getLevel(Configuration conf) {
|
||||
return conf.getInt(BROTLI_LEVEL_KEY, BROTLI_LEVEL_DEFAULT);
|
||||
}
|
||||
|
||||
static int getWindow(Configuration conf) {
|
||||
return conf.getInt(BROTLI_WINDOW_KEY, BROTLI_WINDOW_DEFAULT);
|
||||
}
|
||||
|
||||
static int getBufferSize(Configuration conf) {
|
||||
return conf.getInt(BROTLI_BUFFERSIZE_KEY, BROTLI_BUFFERSIZE_DEFAULT);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,214 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.compress.brotli;
|
||||
|
||||
import com.aayushatharva.brotli4j.Brotli4jLoader;
|
||||
import com.aayushatharva.brotli4j.encoder.Encoder;
|
||||
import com.aayushatharva.brotli4j.encoder.Encoders;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Hadoop compressor glue for Brotli4j
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BrotliCompressor implements CanReinit, Compressor {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(BrotliCompressor.class);
|
||||
protected ByteBuffer inBuf, outBuf;
|
||||
protected int bufferSize;
|
||||
protected boolean finish, finished;
|
||||
protected long bytesRead, bytesWritten;
|
||||
protected Encoder.Parameters params;
|
||||
|
||||
static {
|
||||
Brotli4jLoader.ensureAvailability();
|
||||
}
|
||||
|
||||
BrotliCompressor(int level, int window, int bufferSize) {
|
||||
this.bufferSize = bufferSize;
|
||||
this.inBuf = ByteBuffer.allocate(bufferSize);
|
||||
this.outBuf = ByteBuffer.allocate(bufferSize);
|
||||
this.outBuf.position(bufferSize);
|
||||
params = new Encoder.Parameters();
|
||||
params.setQuality(level);
|
||||
params.setWindow(window);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compress(byte[] b, int off, int len) throws IOException {
|
||||
// If we have previously compressed our input and still have some buffered bytes
|
||||
// remaining, provide them to the caller.
|
||||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
// We don't actually begin compression until our caller calls finish().
|
||||
if (finish) {
|
||||
if (inBuf.position() > 0) {
|
||||
inBuf.flip();
|
||||
int uncompressed = inBuf.remaining();
|
||||
// If we don't have enough capacity in our currently allocated output buffer,
|
||||
// allocate a new one which does.
|
||||
int needed = maxCompressedLength(uncompressed);
|
||||
// Can we compress directly into the provided array?
|
||||
boolean direct = false;
|
||||
ByteBuffer writeBuf;
|
||||
if (len <= needed) {
|
||||
direct = true;
|
||||
writeBuf = ByteBuffer.wrap(b, off, len);
|
||||
} else {
|
||||
if (outBuf.capacity() < needed) {
|
||||
needed = CompressionUtil.roundInt2(needed);
|
||||
LOG.trace("compress: resize outBuf {}", needed);
|
||||
outBuf = ByteBuffer.allocate(needed);
|
||||
} else {
|
||||
outBuf.clear();
|
||||
}
|
||||
writeBuf = outBuf;
|
||||
}
|
||||
final int oldPos = writeBuf.position();
|
||||
Encoders.compress(inBuf, writeBuf, params);
|
||||
final int written = writeBuf.position() - oldPos;
|
||||
bytesWritten += written;
|
||||
inBuf.clear();
|
||||
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
|
||||
finished = true;
|
||||
if (!direct) {
|
||||
outBuf.flip();
|
||||
int n = Math.min(written, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("compress: {} bytes", n);
|
||||
return n;
|
||||
} else {
|
||||
LOG.trace("compress: {} bytes direct", written);
|
||||
return written;
|
||||
}
|
||||
} else {
|
||||
finished = true;
|
||||
}
|
||||
}
|
||||
LOG.trace("No output");
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void end() {
|
||||
LOG.trace("end");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finish() {
|
||||
LOG.trace("finish");
|
||||
finish = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean finished() {
|
||||
boolean b = finished && !outBuf.hasRemaining();
|
||||
LOG.trace("finished: {}", b);
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBytesRead() {
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBytesWritten() {
|
||||
return bytesWritten;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsInput() {
|
||||
boolean b = !finished();
|
||||
LOG.trace("needsInput: {}", b);
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reinit(Configuration conf) {
|
||||
LOG.trace("reinit");
|
||||
if (conf != null) {
|
||||
// Quality or window settings might have changed
|
||||
params.setQuality(BrotliCodec.getLevel(conf));
|
||||
params.setWindow(BrotliCodec.getWindow(conf));
|
||||
// Buffer size might have changed
|
||||
int newBufferSize = BrotliCodec.getBufferSize(conf);
|
||||
if (bufferSize != newBufferSize) {
|
||||
bufferSize = newBufferSize;
|
||||
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||
}
|
||||
}
|
||||
reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
LOG.trace("reset");
|
||||
inBuf.clear();
|
||||
outBuf.clear();
|
||||
outBuf.position(outBuf.capacity());
|
||||
bytesRead = 0;
|
||||
bytesWritten = 0;
|
||||
finish = false;
|
||||
finished = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDictionary(byte[] b, int off, int len) {
|
||||
throw new UnsupportedOperationException("setDictionary is not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setInput(byte[] b, int off, int len) {
|
||||
LOG.trace("setInput: off={} len={}", off, len);
|
||||
if (inBuf.remaining() < len) {
|
||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||
// input that would cause a buffer overflow without reallocation.
|
||||
// This condition should be fortunately rare, because it is expensive.
|
||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||
LOG.trace("setInput: resize inBuf {}", needed);
|
||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||
inBuf.flip();
|
||||
newBuf.put(inBuf);
|
||||
inBuf = newBuf;
|
||||
}
|
||||
inBuf.put(b, off, len);
|
||||
bytesRead += len;
|
||||
finished = false;
|
||||
}
|
||||
|
||||
// Package private
|
||||
|
||||
int maxCompressedLength(int len) {
|
||||
return len + CompressionUtil.compressionOverhead(len);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.compress.brotli;
|
||||
|
||||
import com.aayushatharva.brotli4j.Brotli4jLoader;
|
||||
import com.aayushatharva.brotli4j.decoder.Decoder;
|
||||
import com.aayushatharva.brotli4j.decoder.DirectDecompress;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.Decompressor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Hadoop decompressor glue for Brotli4j
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BrotliDecompressor implements Decompressor {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(BrotliDecompressor.class);
|
||||
protected ByteBuffer inBuf, outBuf;
|
||||
protected int inLen;
|
||||
protected boolean finished;
|
||||
|
||||
static {
|
||||
Brotli4jLoader.ensureAvailability();
|
||||
}
|
||||
|
||||
BrotliDecompressor(int bufferSize) {
|
||||
this.inBuf = ByteBuffer.allocate(bufferSize);
|
||||
this.outBuf = ByteBuffer.allocate(bufferSize);
|
||||
this.outBuf.position(bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int decompress(byte[] b, int off, int len) throws IOException {
|
||||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
if (inBuf.position() > 0) {
|
||||
inBuf.flip();
|
||||
int remaining = inBuf.remaining();
|
||||
inLen -= remaining;
|
||||
outBuf.rewind();
|
||||
outBuf.limit(outBuf.capacity());
|
||||
|
||||
// TODO: More inefficient than it could be, but it doesn't impact decompression speed
|
||||
// terribly and the brotli4j API alternatives do not seem to work correctly.
|
||||
// Maybe something more clever can be done as a future improvement.
|
||||
final byte[] inb = new byte[remaining];
|
||||
inBuf.get(inb);
|
||||
DirectDecompress result = Decoder.decompress(inb);
|
||||
outBuf.put(result.getDecompressedDataByteBuf().nioBuffer());
|
||||
final int written = outBuf.position();
|
||||
|
||||
inBuf.rewind();
|
||||
inBuf.limit(inBuf.capacity());
|
||||
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
|
||||
outBuf.flip();
|
||||
int n = Math.min(written, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("decompress: {} bytes", n);
|
||||
return n;
|
||||
}
|
||||
LOG.trace("decompress: No output, finished");
|
||||
finished = true;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void end() {
|
||||
LOG.trace("end");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean finished() {
|
||||
LOG.trace("finished");
|
||||
return finished;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemaining() {
|
||||
LOG.trace("getRemaining: {}", inLen);
|
||||
return inLen;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsDictionary() {
|
||||
LOG.trace("needsDictionary");
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
LOG.trace("reset");
|
||||
inBuf.clear();
|
||||
inLen = 0;
|
||||
outBuf.clear();
|
||||
outBuf.position(outBuf.capacity());
|
||||
finished = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsInput() {
|
||||
boolean b = (inBuf.position() == 0);
|
||||
LOG.trace("needsInput: {}", b);
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDictionary(byte[] b, int off, int len) {
|
||||
throw new UnsupportedOperationException("setDictionary is not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setInput(byte[] b, int off, int len) {
|
||||
LOG.trace("setInput: off={} len={}", off, len);
|
||||
if (inBuf.remaining() < len) {
|
||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||
// input that would cause a buffer overflow without reallocation.
|
||||
// This condition should be fortunately rare, because it is expensive.
|
||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||
LOG.trace("setInput: resize inBuf {}", needed);
|
||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||
inBuf.flip();
|
||||
newBuf.put(inBuf);
|
||||
inBuf = newBuf;
|
||||
}
|
||||
inBuf.put(b, off, len);
|
||||
inLen += len;
|
||||
finished = false;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.compress.brotli;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestBrotliCodec extends CompressionTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestBrotliCodec.class);
|
||||
|
||||
@Test
|
||||
public void testBrotliCodecSmall() throws Exception {
|
||||
codecSmallTest(new BrotliCodec());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBrotliCodecLarge() throws Exception {
|
||||
codecLargeTest(new BrotliCodec(), 1.1); // poor compressability
|
||||
codecLargeTest(new BrotliCodec(), 2);
|
||||
codecLargeTest(new BrotliCodec(), 10); // very high compressability
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBrotliCodecVeryLarge() throws Exception {
|
||||
codecVeryLargeTest(new BrotliCodec(), 3); // like text
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.compress.brotli;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.HFileTestBase;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({IOTests.class, SmallTests.class})
|
||||
public class TestHFileCompressionBrotli extends HFileTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileCompressionBrotli.class);
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.BROTLI_CODEC_CLASS_KEY, BrotliCodec.class.getCanonicalName());
|
||||
Compression.Algorithm.BROTLI.reload(conf);
|
||||
HFileTestBase.setUpBeforeClass();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
Path path = new Path(TEST_UTIL.getDataTestDir(),
|
||||
HBaseTestingUtil.getRandomUUID().toString() + ".hfile");
|
||||
doTest(conf, path, Compression.Algorithm.BROTLI);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.compress.brotli;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
@Category({ RegionServerTests.class, MediumTests.class })
|
||||
public class TestWALCompressionBrotli extends CompressedWALTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestWALCompressionBrotli.class);
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.set(Compression.BROTLI_CODEC_CLASS_KEY, BrotliCodec.class.getCanonicalName());
|
||||
Compression.Algorithm.BROTLI.reload(conf);
|
||||
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
|
||||
conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
|
||||
conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, Compression.Algorithm.BROTLI.getName());
|
||||
TEST_UTIL.startMiniDFSCluster(3);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
|
||||
doTest(tableName);
|
||||
}
|
||||
|
||||
}
|
|
@ -22,6 +22,7 @@ import java.io.OutputStream;
|
|||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.BlockCompressorStream;
|
||||
import org.apache.hadoop.io.compress.BlockDecompressorStream;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
|
@ -87,8 +88,8 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
int compressionOverhead = (bufferSize / 6) + 32;
|
||||
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -109,10 +110,9 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
// Package private
|
||||
|
||||
static int getBufferSize(Configuration conf) {
|
||||
int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
|
||||
return conf.getInt(LZ4_BUFFER_SIZE_KEY,
|
||||
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
|
||||
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
|
||||
return size > 0 ? size : 256 * 1024; // Don't change this default
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ public class Lz4Compressor implements CanReinit, Compressor {
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("compress: {} bytes from outBuf", n);
|
||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
// We don't actually begin compression until our caller calls finish().
|
||||
|
|
|
@ -51,7 +51,7 @@ public class Lz4Decompressor implements Decompressor {
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("decompress: {} bytes from outBuf", n);
|
||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
if (inBuf.position() > 0) {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream;
|
|||
import org.apache.hadoop.io.compress.Compressor;
|
||||
import org.apache.hadoop.io.compress.Decompressor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.xerial.snappy.Snappy;
|
||||
|
||||
/**
|
||||
* Hadoop Snappy codec implemented with Xerial Snappy.
|
||||
|
@ -87,8 +88,8 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
int compressionOverhead = (bufferSize / 6) + 32;
|
||||
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
Snappy.maxCompressedLength(bufferSize) - bufferSize); // overhead only
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -109,10 +110,9 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
// Package private
|
||||
|
||||
static int getBufferSize(Configuration conf) {
|
||||
int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
|
||||
return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
|
||||
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
|
||||
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
|
||||
return size > 0 ? size : 256 * 1024; // Don't change this default
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ public class SnappyCompressor implements CanReinit, Compressor {
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("compress: {} bytes from outBuf", n);
|
||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
// We don't actually begin compression until our caller calls finish().
|
||||
|
|
|
@ -47,7 +47,7 @@ public class SnappyDecompressor implements Decompressor {
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("decompress: {} bytes from outBuf", n);
|
||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
if (inBuf.position() > 0) {
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.InputStream;
|
|||
import java.io.OutputStream;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||
import org.apache.hadoop.io.compress.BlockCompressorStream;
|
||||
import org.apache.hadoop.io.compress.BlockDecompressorStream;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
|
@ -87,8 +88,8 @@ public class LzmaCodec implements Configurable, CompressionCodec {
|
|||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
int compressionOverhead = (bufferSize / 6) + 32;
|
||||
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -67,7 +67,7 @@ public class LzmaCompressor implements Compressor {
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("compress: {} bytes from outBuf", n);
|
||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
// We don't actually begin compression until our caller calls finish().
|
||||
|
@ -235,7 +235,7 @@ public class LzmaCompressor implements Compressor {
|
|||
// Package private
|
||||
|
||||
int maxCompressedLength(int len) {
|
||||
return len + 32 + (len/6);
|
||||
return len + CompressionUtil.compressionOverhead(len);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ public class LzmaDecompressor implements Decompressor {
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("decompress: {} bytes from outBuf", n);
|
||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
if (inBuf.position() > 0) {
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.compress.zstd;
|
||||
|
||||
import com.github.luben.zstd.Zstd;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
@ -44,6 +45,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
|
||||
public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
|
||||
public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
|
||||
public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
|
||||
public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
|
||||
|
||||
private Configuration conf;
|
||||
|
@ -92,8 +94,8 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
int compressionOverhead = (bufferSize / 6) + 32;
|
||||
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
(int)Zstd.compressBound(bufferSize) - bufferSize); // overhead only
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -121,10 +123,10 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
}
|
||||
|
||||
static int getBufferSize(Configuration conf) {
|
||||
int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
|
||||
return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
|
||||
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
|
||||
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
|
||||
return size > 0 ? size : 256 * 1024; // Don't change this default
|
||||
// IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
|
||||
ZSTD_BUFFER_SIZE_DEFAULT));
|
||||
}
|
||||
|
||||
static byte[] getDictionary(final Configuration conf) {
|
||||
|
|
|
@ -65,7 +65,7 @@ public class ZstdCompressor implements CanReinit, Compressor {
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("compress: {} bytes from outBuf", n);
|
||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
// We don't actually begin compression until our caller calls finish().
|
||||
|
|
|
@ -62,7 +62,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
|
|||
if (outBuf.hasRemaining()) {
|
||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||
outBuf.get(b, off, n);
|
||||
LOG.trace("decompress: {} bytes from outBuf", n);
|
||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
||||
return n;
|
||||
}
|
||||
if (inBuf.position() > 0) {
|
||||
|
|
|
@ -43,9 +43,9 @@ public class TestZstdDictionary extends CompressionTestBase {
|
|||
HBaseClassTestRule.forClass(TestZstdDictionary.class);
|
||||
|
||||
private static final String DICTIONARY_PATH = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
|
||||
// zstd.test.data compressed with zstd.test.dict at level 3 will produce a result of
|
||||
// 358555 bytes
|
||||
private static final int EXPECTED_COMPRESSED_SIZE = 358555;
|
||||
// zstd.test.data compressed with zstd.test.dict at level 3 with a default buffer size of 262144
|
||||
// will produce a result of 359909 bytes
|
||||
private static final int EXPECTED_COMPRESSED_SIZE = 359909;
|
||||
|
||||
private static byte[] TEST_DATA;
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
|
||||
<modules>
|
||||
<module>hbase-compression-aircompressor</module>
|
||||
<module>hbase-compression-brotli</module>
|
||||
<module>hbase-compression-lz4</module>
|
||||
<module>hbase-compression-snappy</module>
|
||||
<module>hbase-compression-xz</module>
|
||||
|
|
|
@ -2322,4 +2322,71 @@ Copyright (c) 2007-2017 The JRuby project
|
|||
</licenses>
|
||||
</project>
|
||||
</supplement>
|
||||
<!-- Brotli4j license information is malformed -->
|
||||
<!-- It has been incorrectly called Apache 2.0 in the original poms-->
|
||||
<supplement>
|
||||
<project>
|
||||
<groupId>com.aayushatharva.brotli4j</groupId>
|
||||
<artifactId>brotli4j</artifactId>
|
||||
<licenses>
|
||||
<license>
|
||||
<name>Apache License, Version 2.0</name>
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
</licenses>
|
||||
</project>
|
||||
</supplement>
|
||||
<supplement>
|
||||
<project>
|
||||
<groupId>com.aayushatharva.brotli4j</groupId>
|
||||
<artifactId>native-linux-aarch64</artifactId>
|
||||
<licenses>
|
||||
<license>
|
||||
<name>Apache License, Version 2.0</name>
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
</licenses>
|
||||
</project>
|
||||
</supplement>
|
||||
<supplement>
|
||||
<project>
|
||||
<groupId>com.aayushatharva.brotli4j</groupId>
|
||||
<artifactId>native-linux-x86_64</artifactId>
|
||||
<licenses>
|
||||
<license>
|
||||
<name>Apache License, Version 2.0</name>
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
</licenses>
|
||||
</project>
|
||||
</supplement>
|
||||
<supplement>
|
||||
<project>
|
||||
<groupId>com.aayushatharva.brotli4j</groupId>
|
||||
<artifactId>native-osx-x86_64</artifactId>
|
||||
<licenses>
|
||||
<license>
|
||||
<name>Apache License, Version 2.0</name>
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
</licenses>
|
||||
</project>
|
||||
</supplement>
|
||||
<supplement>
|
||||
<project>
|
||||
<groupId>com.aayushatharva.brotli4j</groupId>
|
||||
<artifactId>native-windows-x86_64</artifactId>
|
||||
<licenses>
|
||||
<license>
|
||||
<name>Apache License, Version 2.0</name>
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
</licenses>
|
||||
</project>
|
||||
</supplement>
|
||||
</supplementalDataModels>
|
||||
|
|
6
pom.xml
6
pom.xml
|
@ -854,6 +854,7 @@
|
|||
<spotless.version>2.21.0</spotless.version>
|
||||
<!-- compression -->
|
||||
<aircompressor.version>0.21</aircompressor.version>
|
||||
<brotli4j.version>1.7.1</brotli4j.version>
|
||||
<lz4.version>1.8.0</lz4.version>
|
||||
<snappy.version>1.1.8.4</snappy.version>
|
||||
<xz.version>1.9</xz.version>
|
||||
|
@ -1219,6 +1220,11 @@
|
|||
<artifactId>hbase-compression-aircompressor</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-compression-brotli</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-compression-lz4</artifactId>
|
||||
|
|
Loading…
Reference in New Issue