HBASE-26353 Support loadable dictionaries in hbase-compression-zstd (#3787)
ZStandard supports initializing compressors and decompressors with a precomputed dictionary, which can dramatically improve both the compression ratio and the speed of compression for tables with small values. For more details, please see "The Case For Small Data Compression": https://github.com/facebook/zstd#the-case-for-small-data-compression

Signed-off-by: Duo Zhang <zhangduo@apache.org>

Conflicts:
    hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
parent 82fe021b5f
commit f2c58fcf68
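Before the per-file diffs, here is a minimal sketch of how the new per-column-family dictionary setting introduced by this change might be used. The table name, family name, and HDFS path are illustrative, not part of this commit, and the dictionary is assumed to have been trained offline (for example with `zstd --train`):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

public class ZstdDictionaryTableExample {
  public static TableDescriptor exampleTable() {
    return TableDescriptorBuilder.newBuilder(TableName.valueOf("example"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
        // Compress this family with ZStandard ...
        .setCompressionType(Compression.Algorithm.ZSTD)
        // ... using a pre-trained dictionary stored on the cluster filesystem (illustrative path)
        .setConfiguration("hbase.io.compress.zstd.dictionary", "hdfs:///path/to/zstd.dict")
        .build())
      .build();
  }
}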
CompressionUtil.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.io.compress;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class CompressionUtil {
+public final class CompressionUtil {
+
+  private CompressionUtil() { }
 
   /**
    * Round up to the next power of two, unless the value would become negative (ints
DictionaryCache.java (new file)
@@ -0,0 +1,164 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
 * A utility class for managing compressor/decompressor dictionary loading and caching of load
 * results. Useful for any codec that can support changing dictionaries at runtime,
 * such as ZStandard.
 */
@InterfaceAudience.Private
public final class DictionaryCache {

  public static final String DICTIONARY_MAX_SIZE_KEY = "hbase.io.compress.dictionary.max.size";
  public static final int DEFAULT_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024;
  public static final String RESOURCE_SCHEME = "resource://";

  private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class);
  private static LoadingCache<String, byte[]> CACHE;

  private DictionaryCache() { }

  /**
   * Load a dictionary or return a previously cached load.
   * @param conf configuration
   * @param path the hadoop Path where the dictionary is located, as a String
   * @return the dictionary bytes if successful, null otherwise
   */
  public static byte[] getDictionary(final Configuration conf, final String path)
      throws IOException {
    if (path == null || path.isEmpty()) {
      return null;
    }
    // Create the dictionary loading cache if we haven't already
    if (CACHE == null) {
      synchronized (DictionaryCache.class) {
        if (CACHE == null) {
          final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE);
          CACHE = CacheBuilder.newBuilder()
            .maximumSize(100)
            .expireAfterAccess(10, TimeUnit.MINUTES)
            .build(
              new CacheLoader<String, byte[]>() {
                @Override
                public byte[] load(String s) throws Exception {
                  byte[] bytes;
                  if (path.startsWith(RESOURCE_SCHEME)) {
                    bytes = loadFromResource(conf, path, maxSize);
                  } else {
                    bytes = loadFromHadoopFs(conf, path, maxSize);
                  }
                  LOG.info("Loaded dictionary from {} (size {})", s, bytes.length);
                  return bytes;
                }
              });
        }
      }
    }

    // Get or load the dictionary for the given path
    try {
      return CACHE.get(path);
    } catch (ExecutionException e) {
      throw new IOException(e);
    }
  }

  // Visible for testing
  public static byte[] loadFromResource(final Configuration conf, final String s,
      final int maxSize) throws IOException {
    if (!s.startsWith(RESOURCE_SCHEME)) {
      throw new IOException("Path does not start with " + RESOURCE_SCHEME);
    }
    final String path = s.substring(RESOURCE_SCHEME.length(), s.length());
    LOG.info("Loading resource {}", path);
    final InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path);
    if (in == null) {
      throw new FileNotFoundException("Resource " + path + " not found");
    }
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
      final byte[] buffer = new byte[8192];
      int n, len = 0;
      do {
        n = in.read(buffer);
        if (n > 0) {
          len += n;
          if (len > maxSize) {
            throw new IOException("Dictionary " + s + " is too large, limit=" + maxSize);
          }
          baos.write(buffer, 0, n);
        }
      } while (n > 0);
    } finally {
      in.close();
    }
    return baos.toByteArray();
  }

  private static byte[] loadFromHadoopFs(final Configuration conf, final String s,
      final int maxSize) throws IOException {
    final Path path = new Path(s);
    final FileSystem fs = FileSystem.get(path.toUri(), conf);
    LOG.info("Loading file {}", path);
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final FSDataInputStream in = fs.open(path);
    try {
      final byte[] buffer = new byte[8192];
      int n, len = 0;
      do {
        n = in.read(buffer);
        if (n > 0) {
          len += n;
          if (len > maxSize) {
            throw new IOException("Dictionary " + s + " is too large, limit=" + maxSize);
          }
          baos.write(buffer, 0, n);
        }
      } while (n > 0);
    } finally {
      in.close();
    }
    return baos.toByteArray();
  }

  // Visible for testing
  public static boolean contains(String dictionaryPath) {
    if (CACHE != null) {
      return CACHE.asMap().containsKey(dictionaryPath);
    }
    return false;
  }

}
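A minimal usage sketch of the cache above, assuming an illustrative HDFS path: the first call reads and caches the dictionary bytes (enforcing hbase.io.compress.dictionary.max.size), and subsequent calls for the same path are served from the cache.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;

public class DictionaryCacheExample {
  public static byte[] loadDictionary(Configuration conf) throws IOException {
    // Classpath resources are also accepted via the "resource://" scheme,
    // e.g. "resource://zstd.test.dict"; the HDFS path here is illustrative.
    return DictionaryCache.getDictionary(conf, "hdfs:///path/to/zstd.dict");
  }
}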
CompressionTestBase.java
@@ -17,12 +17,10 @@
 package org.apache.hadoop.hbase.io.compress;
 
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.Arrays;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -31,6 +29,8 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,11 +39,11 @@ public class CompressionTestBase {
 
   protected static final Logger LOG = LoggerFactory.getLogger(CompressionTestBase.class);
 
-  static final int LARGE_SIZE = 10 * 1024 * 1024;
-  static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
-  static final int BLOCK_SIZE = 4096;
+  protected static final int LARGE_SIZE = 10 * 1024 * 1024;
+  protected static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
+  protected static final int BLOCK_SIZE = 4096;
 
-  static final byte[] SMALL_INPUT;
+  protected static final byte[] SMALL_INPUT;
   static {
     // 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597
     SMALL_INPUT = new byte[1+1+2+3+5+8+13+21+34+55+89+144+233+377+610+987+1597];
@@ -67,15 +67,20 @@ public class CompressionTestBase {
     Arrays.fill(SMALL_INPUT, off, (off+=1597), (byte)'Q');
   }
 
-  protected void codecTest(final CompressionCodec codec, final byte[][] input)
-      throws Exception {
+  protected void codecTest(final CompressionCodec codec, final byte[][] input) throws Exception {
+    codecTest(codec, input, null);
+  }
+
+  protected void codecTest(final CompressionCodec codec, final byte[][] input,
+      final Integer expectedCompressedSize) throws Exception {
     // We do this in Compression.java
     ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
     // Compress
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    CompressionOutputStream out = codec.createOutputStream(baos);
-    int inLen = 0;
     long start = EnvironmentEdgeManager.currentTime();
+    Compressor compressor = codec.createCompressor();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CompressionOutputStream out = codec.createOutputStream(baos, compressor);
+    int inLen = 0;
     for (int i = 0; i < input.length; i++) {
       out.write(input[i]);
       inLen += input[i].length;
@@ -85,9 +90,15 @@ public class CompressionTestBase {
     final byte[] compressed = baos.toByteArray();
     LOG.info("{} compressed {} bytes to {} bytes in {} ms", codec.getClass().getSimpleName(),
       inLen, compressed.length, end - start);
+    if (expectedCompressedSize != null) {
+      assertTrue("Expected compressed size does not match: (expected=" + expectedCompressedSize +
+        ", actual=" + compressed.length + ")", expectedCompressedSize == compressed.length);
+    }
     // Decompress
     final byte[] plain = new byte[inLen];
-    CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed));
+    Decompressor decompressor = codec.createDecompressor();
+    CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed),
+      decompressor);
     start = EnvironmentEdgeManager.currentTime();
     IOUtils.readFully(in, plain, 0, plain.length);
     in.close();
@@ -113,29 +124,37 @@ public class CompressionTestBase {
   /**
    * Test with a large input (1MB) divided into blocks of 4KB.
    */
-  protected void codecLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
-    RandomDistribution.DiscreteRNG zipf =
+  protected void codecLargeTest(final CompressionCodec codec, final double sigma)
+      throws Exception {
+    RandomDistribution.DiscreteRNG rng =
       new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
     final byte[][] input = new byte[LARGE_SIZE/BLOCK_SIZE][BLOCK_SIZE];
-    for (int i = 0; i < input.length; i++) {
-      for (int j = 0; j < input[i].length; j++) {
-        input[i][j] = (byte)zipf.nextInt();
-      }
-    }
+    fill(rng, input);
     codecTest(codec, input);
   }
 
   /**
    * Test with a very large input (100MB) as a single input buffer.
    */
-  protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
-    RandomDistribution.DiscreteRNG zipf =
+  protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma)
+      throws Exception {
+    RandomDistribution.DiscreteRNG rng =
      new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
     final byte[][] input = new byte[1][VERY_LARGE_SIZE];
-    for (int i = 0; i < VERY_LARGE_SIZE; i++) {
-      input[0][i] = (byte)zipf.nextInt();
-    }
+    fill(rng, input);
     codecTest(codec, input);
   }
 
+  protected static void fill(RandomDistribution.DiscreteRNG rng, byte[][] input) {
+    for (int i = 0; i < input.length; i++) {
+      fill(rng, input[i]);
+    }
+  }
+
+  protected static void fill(RandomDistribution.DiscreteRNG rng, byte[] input) {
+    for (int i = 0; i < input.length; i++) {
+      input[i] = (byte) rng.nextInt();
+    }
+  }
+
 }
ZstdCodec.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.io.compress.zstd;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.DictionaryCache;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -42,6 +44,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {
 
   public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
   public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
+  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
 
   private Configuration conf;
 
@@ -61,12 +64,12 @@ public class ZstdCodec implements Configurable, CompressionCodec {
 
   @Override
   public Compressor createCompressor() {
-    return new ZstdCompressor(getLevel(conf), getBufferSize(conf));
+    return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
   }
 
   @Override
   public Decompressor createDecompressor() {
-    return new ZstdDecompressor(getBufferSize(conf));
+    return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
   }
 
   @Override
@@ -124,4 +127,31 @@ public class ZstdCodec implements Configurable, CompressionCodec {
     return size > 0 ? size : 256 * 1024; // Don't change this default
   }
+
+  static byte[] getDictionary(final Configuration conf) {
+    String path = conf.get(ZSTD_DICTIONARY_KEY);
+    try {
+      return DictionaryCache.getDictionary(conf, path);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to load dictionary at " + path, e);
+    }
+  }
+
+  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
+  // format, followed by a 32-bit identifier also in little-endian format.
+  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md
+
+  static boolean isDictionary(byte[] dictionary) {
+    return (dictionary[0] == (byte)0x37 &&
+        dictionary[1] == (byte)0xA4 &&
+        dictionary[2] == (byte)0x30 &&
+        dictionary[3] == (byte)0xEC);
+  }
+
+  static int getDictionaryId(byte[] dictionary) {
+    if (!isDictionary(dictionary)) {
+      throw new IllegalArgumentException("Not a ZStandard dictionary");
+    }
+    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+  }
+
 }
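A small sketch of how the helpers above could be exercised. It must live in the same package because isDictionary and getDictionaryId are package-private, and the dictionary path is illustrative:

package org.apache.hadoop.hbase.io.compress.zstd;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;

public class ZstdDictionaryIdExample {
  public static int dictionaryIdOf(Configuration conf, String path) throws IOException {
    byte[] dictionary = DictionaryCache.getDictionary(conf, path);
    // Reject anything that does not start with the 0xEC30A437 magic (little-endian on disk)
    if (dictionary == null || !ZstdCodec.isDictionary(dictionary)) {
      throw new IOException("Not a ZStandard dictionary: " + path);
    }
    // The 32-bit identifier that follows the magic; the compressor/decompressor use it
    // to detect when a different dictionary has been configured
    return ZstdCodec.getDictionaryId(dictionary);
  }
}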
ZstdCompressor.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdDictCompress;
 
 /**
  * Hadoop compressor glue for zstd-jni.
@@ -40,13 +41,23 @@ public class ZstdCompressor implements CanReinit, Compressor {
   protected ByteBuffer inBuf, outBuf;
   protected boolean finish, finished;
   protected long bytesRead, bytesWritten;
+  protected int dictId;
+  protected ZstdDictCompress dict;
 
-  ZstdCompressor(final int level, final int bufferSize) {
+  ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
     this.level = level;
     this.bufferSize = bufferSize;
     this.inBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf.position(bufferSize);
+    if (dictionary != null) {
+      this.dictId = ZstdCodec.getDictionaryId(dictionary);
+      this.dict = new ZstdDictCompress(dictionary, level);
+    }
+  }
+
+  ZstdCompressor(final int level, final int bufferSize) {
+    this(level, bufferSize, null);
   }
 
   @Override
@@ -74,7 +85,12 @@ public class ZstdCompressor implements CanReinit, Compressor {
       } else {
         outBuf.clear();
       }
-      int written = Zstd.compress(outBuf, inBuf, level);
+      int written;
+      if (dict != null) {
+        written = Zstd.compress(outBuf, inBuf, dict);
+      } else {
+        written = Zstd.compress(outBuf, inBuf, level);
+      }
       bytesWritten += written;
       inBuf.clear();
       LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level);
@@ -132,13 +148,33 @@ public class ZstdCompressor implements CanReinit, Compressor {
     LOG.trace("reinit");
     if (conf != null) {
       // Level might have changed
-      level = ZstdCodec.getLevel(conf);
+      boolean levelChanged = false;
+      int newLevel = ZstdCodec.getLevel(conf);
+      if (level != newLevel) {
+        LOG.trace("Level changed, was {} now {}", level, newLevel);
+        level = newLevel;
+        levelChanged = true;
+      }
+      // Dictionary may have changed
+      byte[] b = ZstdCodec.getDictionary(conf);
+      if (b != null) {
+        // Don't casually create dictionary objects; they consume native memory
+        int thisDictId = ZstdCodec.getDictionaryId(b);
+        if (dict == null || dictId != thisDictId || levelChanged) {
+          dictId = thisDictId;
+          dict = new ZstdDictCompress(b, level);
+          LOG.trace("Reloaded dictionary, new id is {}", dictId);
+        }
+      } else {
+        dict = null;
+      }
       // Buffer size might have changed
       int newBufferSize = ZstdCodec.getBufferSize(conf);
       if (bufferSize != newBufferSize) {
         bufferSize = newBufferSize;
         this.inBuf = ByteBuffer.allocateDirect(bufferSize);
         this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+        LOG.trace("Resized buffers, new size is {}", bufferSize);
       }
     }
     reset();
@@ -182,7 +218,7 @@ public class ZstdCompressor implements CanReinit, Compressor {
 
   // Package private
 
-  int maxCompressedLength(final int len) {
+  static int maxCompressedLength(final int len) {
     return (int) Zstd.compressBound(len);
   }
 
ZstdDecompressor.java
@@ -26,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdDictDecompress;
 
 /**
  * Hadoop decompressor glue for zstd-java.
@@ -38,12 +39,22 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
   protected int bufferSize;
   protected int inLen;
   protected boolean finished;
+  protected int dictId;
+  protected ZstdDictDecompress dict;
 
-  ZstdDecompressor(final int bufferSize) {
+  ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
     this.bufferSize = bufferSize;
     this.inBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf.position(bufferSize);
+    if (dictionary != null) {
+      this.dictId = ZstdCodec.getDictionaryId(dictionary);
+      this.dict = new ZstdDictDecompress(dictionary);
+    }
+  }
+
+  ZstdDecompressor(final int bufferSize) {
+    this(bufferSize, null);
   }
 
   @Override
@@ -60,7 +71,11 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
       inLen -= remaining;
       outBuf.clear();
       int written;
-      written = Zstd.decompress(outBuf, inBuf);
+      if (dict != null) {
+        written = Zstd.decompress(outBuf, inBuf, dict);
+      } else {
+        written = Zstd.decompress(outBuf, inBuf);
+      }
       inBuf.clear();
       LOG.trace("decompress: decompressed {} -> {}", remaining, written);
       outBuf.flip();
@@ -116,8 +131,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
 
   @Override
   public void setDictionary(final byte[] b, final int off, final int len) {
-    LOG.trace("setDictionary: off={} len={}", off, len);
-    throw new UnsupportedOperationException("setDictionary not supported");
+    throw new UnsupportedOperationException("setDictionary is not supported");
   }
 
   @Override
@@ -143,12 +157,26 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
   public void reinit(final Configuration conf) {
     LOG.trace("reinit");
     if (conf != null) {
+      // Dictionary may have changed
+      byte[] b = ZstdCodec.getDictionary(conf);
+      if (b != null) {
+        // Don't casually create dictionary objects; they consume native memory
+        int thisDictId = ZstdCodec.getDictionaryId(b);
+        if (dict == null || dictId != thisDictId) {
+          dictId = thisDictId;
+          dict = new ZstdDictDecompress(b);
+          LOG.trace("Reloaded dictionary, new id is {}", dictId);
+        }
+      } else {
+        dict = null;
+      }
       // Buffer size might have changed
       int newBufferSize = ZstdCodec.getBufferSize(conf);
       if (bufferSize != newBufferSize) {
         bufferSize = newBufferSize;
         this.inBuf = ByteBuffer.allocateDirect(bufferSize);
         this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+        LOG.trace("Resized buffers, new size is {}", bufferSize);
       }
     }
     reset();
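To tie the compressor and decompressor changes together, here is a round-trip sketch in the style of CompressionTestBase. The dictionary path is illustrative, and the same path must be configured on both the compressing and the decompressing side:

package org.apache.hadoop.hbase.io.compress.zstd;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class ZstdDictionaryRoundTrip {
  public static byte[] roundTrip(byte[] data, String dictionaryPath) throws IOException {
    Configuration conf = new Configuration();
    conf.set(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath); // e.g. "hdfs:///path/to/zstd.dict"
    ZstdCodec codec = new ZstdCodec();
    codec.setConf(conf);

    // Compress through a dictionary-aware compressor
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(baos, codec.createCompressor());
    out.write(data);
    out.close();

    // Decompress through a dictionary-aware decompressor
    byte[] plain = new byte[data.length];
    CompressionInputStream in = codec.createInputStream(
      new ByteArrayInputStream(baos.toByteArray()), codec.createDecompressor());
    IOUtils.readFully(in, plain, 0, plain.length);
    in.close();
    return plain;
  }
}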
TestZstdCodec.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.compress.zstd;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.ClassRule;
@@ -33,20 +34,20 @@ public class TestZstdCodec extends CompressionTestBase {
     HBaseClassTestRule.forClass(TestZstdCodec.class);
 
   @Test
-  public void testzstdCodecSmall() throws Exception {
+  public void testZstdCodecSmall() throws Exception {
     codecSmallTest(new ZstdCodec());
   }
 
   @Test
-  public void testzstdCodecLarge() throws Exception {
+  public void testZstdCodecLarge() throws Exception {
     codecLargeTest(new ZstdCodec(), 1.1); // poor compressability
     codecLargeTest(new ZstdCodec(), 2);
     codecLargeTest(new ZstdCodec(), 10); // very high compressability
   }
 
   @Test
-  public void testzstdCodecVeryLarge() throws Exception {
-    Configuration conf = new Configuration();
+  public void testZstdCodecVeryLarge() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
     // ZStandard levels range from 1 to 22.
     // Level 22 might take up to a minute to complete. 3 is the Hadoop default, and will be fast.
     conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3);
TestZstdDictionary.java (new file)
@@ -0,0 +1,98 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.RandomDistribution;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(SmallTests.class)
public class TestZstdDictionary extends CompressionTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestZstdDictionary.class);

  private static final String DICTIONARY_PATH = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
  // zstd.test.data compressed with zstd.test.dict at level 3 will produce a result of
  // 358555 bytes
  private static final int EXPECTED_COMPRESSED_SIZE = 358555;

  private static byte[] TEST_DATA;

  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = new Configuration();
    TEST_DATA = DictionaryCache.loadFromResource(conf,
      DictionaryCache.RESOURCE_SCHEME + "zstd.test.data", /* maxSize */ 1024*1024);
    assertNotNull("Failed to load test data", TEST_DATA);
  }

  @Test
  public void test() throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3);
    conf.set(ZstdCodec.ZSTD_DICTIONARY_KEY, DICTIONARY_PATH);
    ZstdCodec codec = new ZstdCodec();
    codec.setConf(conf);
    codecTest(codec, new byte[][] { TEST_DATA }, EXPECTED_COMPRESSED_SIZE);
    // Assert that the dictionary was actually loaded
    assertTrue("Dictionary was not loaded by codec", DictionaryCache.contains(DICTIONARY_PATH));
  }

  //
  // For generating the test data in src/test/resources/
  //

  public static void main(String[] args) throws IOException {
    // Write 1000 1k blocks for training to the specified file
    // Train with:
    //   zstd --train -B1024 -o <dictionary_file> <input_file>
    if (args.length < 1) {
      System.err.println("Usage: TestZstdCodec <outFile>");
      System.exit(-1);
    }
    final RandomDistribution.DiscreteRNG rng =
        new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, 2);
    final File outFile = new File(args[0]);
    final byte[] buffer = new byte[1024];
    System.out.println("Generating " + outFile);
    try (FileOutputStream os = new FileOutputStream(outFile)) {
      for (int i = 0; i < 1000; i++) {
        fill(rng, buffer);
        os.write(buffer);
      }
    }
    System.out.println("Done");
  }

}
TestZstdDictionarySplitMerge.java (new file)
@@ -0,0 +1,148 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress.zstd;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, LargeTests.class })
public class TestZstdDictionarySplitMerge {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestZstdDictionarySplitMerge.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Configuration conf;

  @BeforeClass
  public static void setUp() throws Exception {
    // NOTE: Don't put configuration settings in global site schema. We are testing if per
    // CF or per table schema settings are applied correctly.
    conf = TEST_UTIL.getConfiguration();
    conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
    Compression.Algorithm.ZSTD.reload(conf);
    conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 1000);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void test() throws Exception {
    // Create the table

    final TableName tableName = TableName.valueOf("TestZstdDictionarySplitMerge");
    final byte[] cfName = Bytes.toBytes("info");
    final String dictionaryPath = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
    final TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfName)
        .setCompressionType(Compression.Algorithm.ZSTD)
        .setConfiguration(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath)
        .build())
      .build();
    final Admin admin = TEST_UTIL.getAdmin();
    admin.createTable(td, new byte[][] { Bytes.toBytes(1) });
    TEST_UTIL.waitTableAvailable(tableName);

    // Load some data

    Table t = ConnectionFactory.createConnection(conf).getTable(tableName);
    TEST_UTIL.loadNumericRows(t, cfName, 0, 100_000);
    admin.flush(tableName);
    assertTrue("Dictionary was not loaded", DictionaryCache.contains(dictionaryPath));
    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);

    // Test split procedure

    admin.split(tableName, Bytes.toBytes(50_000));
    TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return TEST_UTIL.getMiniHBaseCluster().getRegions(tableName).size() == 3;
      }
      @Override
      public String explainFailure() throws Exception {
        return "Split has not finished yet";
      }
    });
    TEST_UTIL.waitUntilNoRegionsInTransition();
    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);

    // Test merge procedure

    RegionInfo regionA = null;
    RegionInfo regionB = null;
    for (RegionInfo region: admin.getRegions(tableName)) {
      if (region.getStartKey().length == 0) {
        regionA = region;
      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(1))) {
        regionB = region;
      }
    }
    assertNotNull(regionA);
    assertNotNull(regionB);
    admin.mergeRegionsAsync(new byte[][] {
      regionA.getRegionName(),
      regionB.getRegionName()
    }, false).get(30, TimeUnit.SECONDS);
    assertEquals(2, admin.getRegions(tableName).size());
    ServerName expected = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName();
    assertEquals(expected, TEST_UTIL.getConnection().getRegionLocator(tableName)
      .getRegionLocation(Bytes.toBytes(1), true).getServerName());
    try (AsyncConnection asyncConn =
        ConnectionFactory.createAsyncConnection(conf).get()) {
      assertEquals(expected, asyncConn.getRegionLocator(tableName)
        .getRegionLocation(Bytes.toBytes(1), true).get().getServerName());
    }
    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
  }

}
Binary file not shown.
Binary file not shown.
MergeTableRegionsProcedure.java
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hbase.quotas.QuotaExceededException;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
@@ -610,10 +613,16 @@ public class MergeTableRegionsProcedure
       String family = hcd.getNameAsString();
       final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
       if (storeFiles != null && storeFiles.size() > 0) {
+        final Configuration storeConfiguration =
+          StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd);
         for (StoreFileInfo storeFileInfo : storeFiles) {
           // Create reference file(s) to parent region file here in mergedDir.
           // As this procedure is running on master, use CacheConfig.DISABLED means
           // don't cache any block.
+          // We also need to pass through a suitable CompoundConfiguration as if this
+          // is running in a regionserver's Store context, or we might not be able
+          // to read the hfiles.
+          storeFileInfo.setConf(storeConfiguration);
           mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
             new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
         }
SplitTableRegionProcedure.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -711,12 +712,17 @@ public class SplitTableRegionProcedure
       final ColumnFamilyDescriptor hcd = htd.getColumnFamily(familyName);
       final Collection<StoreFileInfo> storeFiles = e.getValue();
       if (storeFiles != null && storeFiles.size() > 0) {
+        final Configuration storeConfiguration =
+          StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd);
         for (StoreFileInfo storeFileInfo : storeFiles) {
           // As this procedure is running on master, use CacheConfig.DISABLED means
           // don't cache any block.
-          StoreFileSplitter sfs =
-            new StoreFileSplitter(regionFs, familyName, new HStoreFile(
-              storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
+          // We also need to pass through a suitable CompoundConfiguration as if this
+          // is running in a regionserver's Store context, or we might not be able
+          // to read the hfiles.
+          storeFileInfo.setConf(storeConfiguration);
+          StoreFileSplitter sfs = new StoreFileSplitter(regionFs, familyName,
+            new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
           futures.add(threadPool.submit(sfs));
         }
       }
HStore.java
@@ -531,6 +531,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
     int totalValidStoreFile = 0;
     for (StoreFileInfo storeFileInfo : files) {
+      // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
+      // our store's CompoundConfiguration here.
+      storeFileInfo.setConf(conf);
       // open each store file in parallel
       completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
       totalValidStoreFile++;
StoreFileInfo.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * Describe a StoreFile (hfile, reference, link)
  */
 @InterfaceAudience.Private
-public class StoreFileInfo {
+public class StoreFileInfo implements Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(StoreFileInfo.class);
 
   /**
@@ -87,7 +88,7 @@ public class StoreFileInfo {
   public static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
 
   // Configuration
-  private final Configuration conf;
+  private Configuration conf;
 
   // FileSystem handle
   private final FileSystem fs;
@@ -234,6 +235,16 @@ public class StoreFileInfo {
       DEFAULT_STORE_FILE_READER_NO_READAHEAD);
   }
 
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
   /**
    * Size of the Hfile
    * @return size
@@ -632,10 +643,6 @@ public class StoreFileInfo {
     return this.fs;
   }
 
-  Configuration getConf() {
-    return this.conf;
-  }
-
   boolean isNoReadahead() {
     return this.noReadahead;
   }
@@ -673,4 +680,5 @@ public class StoreFileInfo {
   public void initHFileInfo(ReaderContext context) throws IOException {
     this.hfileInfo = new HFileInfo(context, conf);
   }
+
 }