Revert "HBASE-26353 Support loadable dictionaries in hbase-compression-zstd (#3748)"
This reverts commit bfa4584125.
This is not ready yet. There are some code paths remaining where store
configuration (CompoundConfiguration) is not passed into the block decoding
context. Found with additional integration tests.
parent 0d4982404c
commit e0813e5402
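
Background for the failure mode described above: the reverted change made the ZStandard compressor and decompressor dictionary-aware by having them re-read the codec Configuration through the CanReinit hook (see the ZstdCompressor and ZstdDecompressor hunks below). A decompressor only sees hbase.io.compress.zstd.dictionary if the caller passes the store-level Configuration into reinit(); on the code paths where the block decoding context never receives that configuration, blocks written with a dictionary may not be readable. The snippet below is a minimal sketch of the call sequence the feature depends on, reusing the CanReinit name from the reverted code; the class and method names here are illustrative only, not HBase's actual block-decoding path.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;

public class DictionaryAwareDecode {
  // Sketch: a dictionary-capable decompressor must be reinitialized with the
  // store configuration (the CompoundConfiguration mentioned above) before use,
  // otherwise it never learns which dictionary the data was written with.
  static Decompressor createDecompressor(CompressionCodec codec, Configuration storeConf) {
    Decompressor d = codec.createDecompressor();
    if (d instanceof CanReinit) {
      ((CanReinit) d).reinit(storeConf);
    }
    return d;
  }
}
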
CompressionUtil.java
@@ -21,8 +21,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class CompressionUtil {
 
-  private CompressionUtil() { }
-
   /**
    * Round up to the next power of two, unless the value would become negative (ints
    * are signed), in which case just return Integer.MAX_VALUE.

DictionaryCache.java (deleted; the entire 164-line file below is removed by this revert)
@@ -1,164 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
 * A utility class for managing compressor/decompressor dictionary loading and caching of load
 * results. Useful for any codec that can support changing dictionaries at runtime,
 * such as ZStandard.
 */
@InterfaceAudience.Private
public class DictionaryCache {

  public static final String DICTIONARY_MAX_SIZE_KEY = "hbase.io.compress.dictionary.max.size";
  public static final int DEFAULT_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024;
  public static final String RESOURCE_SCHEME = "resource://";

  private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class);
  private static volatile LoadingCache<String, byte[]> CACHE;

  private DictionaryCache() { }

  /**
   * Load a dictionary or return a previously cached load.
   * @param conf configuration
   * @param path the hadoop Path where the dictionary is located, as a String
   * @return the dictionary bytes if successful, null otherwise
   */
  public static byte[] getDictionary(final Configuration conf, final String path)
      throws IOException {
    if (path == null || path.isEmpty()) {
      return null;
    }
    // Create the dictionary loading cache if we haven't already
    if (CACHE == null) {
      synchronized (DictionaryCache.class) {
        if (CACHE == null) {
          final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE);
          CACHE = CacheBuilder.newBuilder()
            .maximumSize(100)
            .expireAfterAccess(10, TimeUnit.MINUTES)
            .build(
              new CacheLoader<String, byte[]>() {
                public byte[] load(String s) throws Exception {
                  byte[] bytes;
                  if (path.startsWith(RESOURCE_SCHEME)) {
                    bytes = loadFromResource(conf, path, maxSize);
                  } else {
                    bytes = loadFromHadoopFs(conf, path, maxSize);
                  }
                  LOG.info("Loaded dictionary from {} (size {})", s, bytes.length);
                  return bytes;
                }
              });
        }
      }
    }

    // Get or load the dictionary for the given path
    try {
      return CACHE.get(path);
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }

  // Visible for testing
  public static byte[] loadFromResource(final Configuration conf, final String s,
      final int maxSize) throws IOException {
    if (!s.startsWith(RESOURCE_SCHEME)) {
      throw new IllegalArgumentException("Path does not start with " + RESOURCE_SCHEME);
    }
    final String path = s.substring(RESOURCE_SCHEME.length(), s.length());
    final InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path);
    if (in == null) {
      throw new FileNotFoundException("Resource " + path + " not found");
    }
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final byte[] buffer = new byte[8192];
    try {
      int n, len = 0;
      do {
        n = in.read(buffer);
        if (n > 0) {
          len += n;
          if (len > maxSize) {
            throw new IllegalArgumentException("Dictionary " + s + " is too large" +
              ", limit=" + maxSize);
          }
          baos.write(buffer, 0, n);
        }
      } while (n > 0);
    } finally {
      in.close();
    }
    return baos.toByteArray();
  }

  private static byte[] loadFromHadoopFs(final Configuration conf, final String s,
      final int maxSize) throws IOException {
    final Path path = new Path(s);
    final FileSystem fs = FileSystem.get(path.toUri(), conf);
    final FileStatus stat = fs.getFileStatus(path);
    if (!stat.isFile()) {
      throw new IllegalArgumentException(s + " is not a file");
    }
    if (stat.getLen() > maxSize) {
      throw new IllegalArgumentException("Dictionary " + s + " is too large" +
        ", size=" + stat.getLen() + ", limit=" + maxSize);
    }
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final byte[] buffer = new byte[8192];
    try (final FSDataInputStream in = fs.open(path)) {
      int n;
      do {
        n = in.read(buffer);
        if (n > 0) {
          baos.write(buffer, 0, n);
        }
      } while (n > 0);
    }
    return baos.toByteArray();
  }

  // Visible for testing
  public static boolean contains(String dictionaryPath) {
    if (CACHE != null) {
      return CACHE.asMap().containsKey(dictionaryPath);
    }
    return false;
  }

}

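For orientation, this is how the deleted cache was meant to be driven: a codec resolves a dictionary purely from configuration, and DictionaryCache loads it once from a Hadoop filesystem path (or from the classpath via the resource:// scheme) and memoizes the result. A minimal sketch follows; the configuration key comes from the deleted ZstdCodec code further down, the HDFS path is a made-up placeholder, and the class name here is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;

public class DictionaryCacheSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder path; any Hadoop FileSystem URI (or resource://name) would do.
    String path = "hdfs://namenode:8020/hbase/dictionaries/example.zstd.dict";
    conf.set("hbase.io.compress.zstd.dictionary", path);
    // The first call loads and caches; later calls for the same path hit the cache.
    byte[] dict = DictionaryCache.getDictionary(conf, conf.get("hbase.io.compress.zstd.dictionary"));
    System.out.println(dict == null ? "no dictionary configured" : dict.length + " dictionary bytes");
  }
}
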
CompressionTestBase.java
@@ -17,10 +17,12 @@
 package org.apache.hadoop.hbase.io.compress;
 
 import static org.junit.Assert.assertTrue;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.Arrays;
 import java.util.Random;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -29,8 +31,6 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,11 +39,11 @@ public class CompressionTestBase {
 
   protected static final Logger LOG = LoggerFactory.getLogger(CompressionTestBase.class);
 
-  protected static final int LARGE_SIZE = 10 * 1024 * 1024;
-  protected static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
-  protected static final int BLOCK_SIZE = 4096;
+  static final int LARGE_SIZE = 10 * 1024 * 1024;
+  static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
+  static final int BLOCK_SIZE = 4096;
 
-  protected static final byte[] SMALL_INPUT;
+  static final byte[] SMALL_INPUT;
   static {
     // 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597
    SMALL_INPUT = new byte[1+1+2+3+5+8+13+21+34+55+89+144+233+377+610+987+1597];
@@ -67,21 +67,15 @@ public class CompressionTestBase {
     Arrays.fill(SMALL_INPUT, off, (off+=1597), (byte)'Q');
   }
 
-  protected void codecTest(final CompressionCodec codec, final byte[][] input) throws Exception {
-    codecTest(codec, input, null);
-  }
-
-  protected void codecTest(final CompressionCodec codec, final byte[][] input,
-      final Integer expectedCompressedSize) throws Exception {
+  protected void codecTest(final CompressionCodec codec, final byte[][] input)
+      throws Exception {
     // We do this in Compression.java
     ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
     // Compress
-    long start = EnvironmentEdgeManager.currentTime();
-    Compressor compressor = codec.createCompressor();
-    compressor.reinit(((Configurable)codec).getConf());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    CompressionOutputStream out = codec.createOutputStream(baos, compressor);
+    CompressionOutputStream out = codec.createOutputStream(baos);
     int inLen = 0;
+    long start = EnvironmentEdgeManager.currentTime();
     for (int i = 0; i < input.length; i++) {
       out.write(input[i]);
       inLen += input[i].length;
@@ -91,18 +85,9 @@ public class CompressionTestBase {
     final byte[] compressed = baos.toByteArray();
     LOG.info("{} compressed {} bytes to {} bytes in {} ms", codec.getClass().getSimpleName(),
       inLen, compressed.length, end - start);
-    if (expectedCompressedSize != null) {
-      assertTrue("Expected compressed size does not match: (expected=" + expectedCompressedSize +
-        ", actual=" + compressed.length + ")", expectedCompressedSize == compressed.length);
-    }
     // Decompress
     final byte[] plain = new byte[inLen];
-    Decompressor decompressor = codec.createDecompressor();
-    if (decompressor instanceof CanReinit) {
-      ((CanReinit)decompressor).reinit(((Configurable)codec).getConf());
-    }
-    CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed),
-      decompressor);
+    CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed));
     start = EnvironmentEdgeManager.currentTime();
     IOUtils.readFully(in, plain, 0, plain.length);
     in.close();
@@ -128,37 +113,29 @@ public class CompressionTestBase {
   /**
    * Test with a large input (1MB) divided into blocks of 4KB.
    */
-  protected void codecLargeTest(final CompressionCodec codec, final double sigma)
-      throws Exception {
-    RandomDistribution.DiscreteRNG rng =
+  protected void codecLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
+    RandomDistribution.DiscreteRNG zipf =
       new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
     final byte[][] input = new byte[LARGE_SIZE/BLOCK_SIZE][BLOCK_SIZE];
-    fill(rng, input);
+    for (int i = 0; i < input.length; i++) {
+      for (int j = 0; j < input[i].length; j++) {
+        input[i][j] = (byte)zipf.nextInt();
+      }
+    }
     codecTest(codec, input);
   }
 
   /**
    * Test with a very large input (100MB) as a single input buffer.
   */
-  protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma)
-      throws Exception {
-    RandomDistribution.DiscreteRNG rng =
+  protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
+    RandomDistribution.DiscreteRNG zipf =
      new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
    final byte[][] input = new byte[1][VERY_LARGE_SIZE];
-    fill(rng, input);
+    for (int i = 0; i < VERY_LARGE_SIZE; i++) {
+      input[0][i] = (byte)zipf.nextInt();
+    }
     codecTest(codec, input);
   }
 
-  protected static void fill(RandomDistribution.DiscreteRNG rng, byte[][] input) {
-    for (int i = 0; i < input.length; i++) {
-      fill(rng, input[i]);
-    }
-  }
-
-  protected static void fill(RandomDistribution.DiscreteRNG rng, byte[] input) {
-    for (int i = 0; i < input.length; i++) {
-      input[i] = (byte) rng.nextInt();
-    }
-  }
-
 }

ZstdCodec.java
@@ -22,7 +22,6 @@ import java.io.OutputStream;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hbase.io.compress.DictionaryCache;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -42,7 +41,6 @@ public class ZstdCodec implements Configurable, CompressionCodec {
 
   public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
   public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
-  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
 
   private Configuration conf;
 
@@ -62,12 +60,12 @@ public class ZstdCodec implements Configurable, CompressionCodec {
 
   @Override
   public Compressor createCompressor() {
-    return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
+    return new ZstdCompressor(getLevel(conf), getBufferSize(conf));
   }
 
   @Override
   public Decompressor createDecompressor() {
-    return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
+    return new ZstdDecompressor(getBufferSize(conf));
   }
 
   @Override
@@ -125,13 +123,4 @@ public class ZstdCodec implements Configurable, CompressionCodec {
     return size > 0 ? size : 256 * 1024; // Don't change this default
   }
 
-  static byte[] getDictionary(final Configuration conf) {
-    String path = conf.get(ZSTD_DICTIONARY_KEY);
-    try {
-      return DictionaryCache.getDictionary(conf, path);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to load dictionary at " + path, e);
-    }
-  }
-
 }

ZstdCompressor.java
@@ -26,7 +26,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.github.luben.zstd.Zstd;
-import com.github.luben.zstd.ZstdDictCompress;
 
 /**
  * Hadoop compressor glue for zstd-jni.
@@ -39,21 +38,13 @@ public class ZstdCompressor implements CanReinit, Compressor {
   protected ByteBuffer inBuf, outBuf;
   protected boolean finish, finished;
   protected long bytesRead, bytesWritten;
-  protected ZstdDictCompress dict;
 
-  ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
+  ZstdCompressor(final int level, final int bufferSize) {
     this.level = level;
     this.bufferSize = bufferSize;
     this.inBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf.position(bufferSize);
-    if (dictionary != null) {
-      this.dict = new ZstdDictCompress(dictionary, level);
-    }
-  }
-
-  ZstdCompressor(final int level, final int bufferSize) {
-    this(level, bufferSize, null);
   }
 
   @Override
@@ -81,12 +72,7 @@ public class ZstdCompressor implements CanReinit, Compressor {
        } else {
          outBuf.clear();
        }
-        int written;
-        if (dict != null) {
-          written = Zstd.compress(outBuf, inBuf, dict);
-        } else {
-          written = Zstd.compress(outBuf, inBuf, level);
-        }
+        int written = Zstd.compress(outBuf, inBuf, level);
        bytesWritten += written;
        inBuf.clear();
        LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level);
@@ -145,11 +131,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
     if (conf != null) {
       // Level might have changed
       level = ZstdCodec.getLevel(conf);
-      // Dictionary may have changed
-      byte[] b = ZstdCodec.getDictionary(conf);
-      if (b != null) {
-        dict = new ZstdDictCompress(b, level);
-      }
       // Buffer size might have changed
       int newBufferSize = ZstdCodec.getBufferSize(conf);
       if (bufferSize != newBufferSize) {

ZstdDecompressor.java
@@ -26,7 +26,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.github.luben.zstd.Zstd;
-import com.github.luben.zstd.ZstdDictDecompress;
 
 /**
  * Hadoop decompressor glue for zstd-java.
@@ -39,20 +38,12 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
   protected int bufferSize;
   protected int inLen;
   protected boolean finished;
-  protected ZstdDictDecompress dict;
 
-  ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
+  ZstdDecompressor(final int bufferSize) {
     this.bufferSize = bufferSize;
     this.inBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf.position(bufferSize);
-    if (dictionary != null) {
-      this.dict = new ZstdDictDecompress(dictionary);
-    }
-  }
-
-  ZstdDecompressor(final int bufferSize) {
-    this(bufferSize, null);
   }
 
   @Override
@@ -69,11 +60,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
       inLen -= remaining;
       outBuf.clear();
       int written;
-      if (dict != null) {
-        written = Zstd.decompress(outBuf, inBuf, dict);
-      } else {
-        written = Zstd.decompress(outBuf, inBuf);
-      }
+      written = Zstd.decompress(outBuf, inBuf);
       inBuf.clear();
       LOG.trace("decompress: decompressed {} -> {}", remaining, written);
       outBuf.flip();
@@ -130,7 +117,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
   @Override
   public void setDictionary(final byte[] b, final int off, final int len) {
     LOG.trace("setDictionary: off={} len={}", off, len);
-    this.dict = new ZstdDictDecompress(b, off, len);
+    throw new UnsupportedOperationException("setDictionary not supported");
   }
 
   @Override
@@ -156,11 +143,6 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
   public void reinit(final Configuration conf) {
     LOG.trace("reinit");
     if (conf != null) {
-      // Dictionary may have changed
-      byte[] b = ZstdCodec.getDictionary(conf);
-      if (b != null) {
-        dict = new ZstdDictDecompress(b);
-      }
       // Buffer size might have changed
       int newBufferSize = ZstdCodec.getBufferSize(conf);
       if (bufferSize != newBufferSize) {

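The two hunks above strip the zstd-jni dictionary calls (ZstdDictCompress, ZstdDictDecompress, and the dictionary overloads of Zstd.compress and Zstd.decompress) out of the glue classes. As a standalone illustration of what those calls do, here is a small round trip using the same zstd-jni ByteBuffer API the reverted code used; the dictionary bytes are a stand-in (a real dictionary would come from zstd --train, as in the deleted test's main() below), and the class name is hypothetical.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdDictCompress;
import com.github.luben.zstd.ZstdDictDecompress;

public class ZstdDictRoundTrip {
  public static void main(String[] args) {
    // Stand-in dictionary and payload, for illustration only.
    byte[] dictBytes = "example raw dictionary content".getBytes(StandardCharsets.UTF_8);
    byte[] data = "hello hello hello hello hello hello".getBytes(StandardCharsets.UTF_8);

    ZstdDictCompress cdict = new ZstdDictCompress(dictBytes, 3);  // same constructor the compressor used
    ZstdDictDecompress ddict = new ZstdDictDecompress(dictBytes); // same constructor the decompressor used

    // The ByteBuffer overloads used by ZstdCompressor/ZstdDecompressor require direct buffers.
    ByteBuffer src = ByteBuffer.allocateDirect(data.length);
    src.put(data);
    src.flip();
    ByteBuffer compressed = ByteBuffer.allocateDirect((int) Zstd.compressBound(data.length));
    int written = Zstd.compress(compressed, src, cdict);
    compressed.flip();

    ByteBuffer restored = ByteBuffer.allocateDirect(data.length);
    int read = Zstd.decompress(restored, compressed, ddict);
    System.out.println("compressed to " + written + " bytes, restored " + read + " bytes");
  }
}
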
TestZstdCodec.java
@@ -16,20 +16,11 @@
  */
 package org.apache.hadoop.hbase.io.compress.zstd;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
-import org.apache.hadoop.hbase.io.compress.DictionaryCache;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.RandomDistribution;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -42,20 +33,20 @@ public class TestZstdCodec extends CompressionTestBase {
     HBaseClassTestRule.forClass(TestZstdCodec.class);
 
   @Test
-  public void testZstdCodecSmall() throws Exception {
+  public void testzstdCodecSmall() throws Exception {
     codecSmallTest(new ZstdCodec());
   }
 
   @Test
-  public void testZstdCodecLarge() throws Exception {
+  public void testzstdCodecLarge() throws Exception {
     codecLargeTest(new ZstdCodec(), 1.1); // poor compressability
     codecLargeTest(new ZstdCodec(), 2);
     codecLargeTest(new ZstdCodec(), 10); // very high compressability
   }
 
   @Test
-  public void testZstdCodecVeryLarge() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
+  public void testzstdCodecVeryLarge() throws Exception {
+    Configuration conf = new Configuration();
     // ZStandard levels range from 1 to 22.
     // Level 22 might take up to a minute to complete. 3 is the Hadoop default, and will be fast.
     conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3);
@@ -64,55 +55,4 @@ public class TestZstdCodec extends CompressionTestBase {
     codecVeryLargeTest(codec, 3); // like text
   }
 
-  @Test
-  public void testZstdCodecWithDictionary() throws Exception {
-    // zstd.test.data compressed with zstd.test.dict at level 3 will produce a result of
-    // 365533 bytes
-    final int expectedCompressedSize = 365533;
-    Configuration conf = HBaseConfiguration.create();
-    conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3);
-    // Configure for dictionary available in test resources
-    final String dictionaryPath = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
-    conf.set(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath);
-    // Load test data from test resources
-    // This will throw an IOException if the test data cannot be loaded
-    final byte[] testData = DictionaryCache.loadFromResource(conf,
-      DictionaryCache.RESOURCE_SCHEME + "zstd.test.data", /* maxSize */ 1024*1024);
-    assertNotNull("Failed to load test data", testData);
-    // Run the test
-    // This will throw an IOException of some kind if there is a problem loading or using the
-    // dictionary.
-    ZstdCodec codec = new ZstdCodec();
-    codec.setConf(conf);
-    codecTest(codec, new byte[][] { testData }, expectedCompressedSize);
-    // Assert that the dictionary was actually loaded
-    assertTrue("Dictionary was not loaded by codec", DictionaryCache.contains(dictionaryPath));
-  }
-
-  //
-  // For generating the test data in src/test/resources/
-  //
-
-  public static void main(String[] args) throws IOException {
-    // Write 1000 1k blocks for training to the specified file
-    // Train with:
-    // zstd --train-fastcover=k=32,b=8 -B1024 -o <dictionary_file> <input_file>
-    if (args.length < 1) {
-      System.err.println("Usage: TestZstdCodec <outFile>");
-      System.exit(-1);
-    }
-    final RandomDistribution.DiscreteRNG rng =
-      new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, 2);
-    final File outFile = new File(args[0]);
-    final byte[] buffer = new byte[1024];
-    System.out.println("Generating " + outFile);
-    try (FileOutputStream os = new FileOutputStream(outFile)) {
-      for (int i = 0; i < 1000; i++) {
-        fill(rng, buffer);
-        os.write(buffer);
-      }
-    }
-    System.out.println("Done");
-  }
-
 }

Two binary files under src/test/resources are also removed by this revert and are not shown; they are presumably the zstd.test.data and zstd.test.dict resources referenced by the deleted test above.