HBASE-26353 Support loadable dictionaries in hbase-compression-zstd (#3748)

ZStandard supports initialization of compressors and decompressors with a
precomputed dictionary, which can dramatically improve and speed up compression
of tables with small values. For more details, please see

  The Case For Small Data Compression
  https://github.com/facebook/zstd#the-case-for-small-data-compression

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Andrew Purtell 2021-10-19 13:37:24 -07:00
parent b6bb18022f
commit 8ac0b5ed7f
9 changed files with 332 additions and 35 deletions

View File

@ -21,6 +21,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class CompressionUtil {
private CompressionUtil() { }
/**
* Round up to the next power of two, unless the value would become negative (ints
* are signed), in which case just return Integer.MAX_VALUE.

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hbase.io.compress;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
/**
* A utility class for managing compressor/decompressor dictionary loading and caching of load
* results. Useful for any codec that can support changing dictionaries at runtime,
* such as ZStandard.
*/
@InterfaceAudience.Private
public class DictionaryCache {
public static final String DICTIONARY_MAX_SIZE_KEY = "hbase.io.compress.dictionary.max.size";
public static final int DEFAULT_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024;
public static final String RESOURCE_SCHEME = "resource://";
private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class);
private static volatile LoadingCache<String, byte[]> CACHE;
private DictionaryCache() { }
/**
* Load a dictionary or return a previously cached load.
* @param conf configuration
* @param path the hadoop Path where the dictionary is located, as a String
* @return the dictionary bytes if successful, null otherwise
*/
public static byte[] getDictionary(final Configuration conf, final String path)
throws IOException {
if (path == null || path.isEmpty()) {
return null;
}
// Create the dictionary loading cache if we haven't already
if (CACHE == null) {
synchronized (DictionaryCache.class) {
if (CACHE == null) {
final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE);
CACHE = CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterAccess(10, TimeUnit.MINUTES)
.build(
new CacheLoader<String, byte[]>() {
public byte[] load(String s) throws Exception {
byte[] bytes;
if (path.startsWith(RESOURCE_SCHEME)) {
bytes = loadFromResource(conf, path, maxSize);
} else {
bytes = loadFromHadoopFs(conf, path, maxSize);
}
LOG.info("Loaded dictionary from {} (size {})", s, bytes.length);
return bytes;
}
});
}
}
}
// Get or load the dictionary for the given path
try {
return CACHE.get(path);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
// Visible for testing
public static byte[] loadFromResource(final Configuration conf, final String s,
final int maxSize) throws IOException {
if (!s.startsWith(RESOURCE_SCHEME)) {
throw new IllegalArgumentException("Path does not start with " + RESOURCE_SCHEME);
}
final String path = s.substring(RESOURCE_SCHEME.length(), s.length());
final InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path);
if (in == null) {
throw new FileNotFoundException("Resource " + path + " not found");
}
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final byte[] buffer = new byte[8192];
try {
int n, len = 0;
do {
n = in.read(buffer);
if (n > 0) {
len += n;
if (len > maxSize) {
throw new IllegalArgumentException("Dictionary " + s + " is too large" +
", limit=" + maxSize);
}
baos.write(buffer, 0, n);
}
} while (n > 0);
} finally {
in.close();
}
return baos.toByteArray();
}
private static byte[] loadFromHadoopFs(final Configuration conf, final String s,
final int maxSize) throws IOException {
final Path path = new Path(s);
final FileSystem fs = FileSystem.get(path.toUri(), conf);
final FileStatus stat = fs.getFileStatus(path);
if (!stat.isFile()) {
throw new IllegalArgumentException(s + " is not a file");
}
if (stat.getLen() > maxSize) {
throw new IllegalArgumentException("Dictionary " + s + " is too large" +
", size=" + stat.getLen() + ", limit=" + maxSize);
}
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final byte[] buffer = new byte[8192];
try (final FSDataInputStream in = fs.open(path)) {
int n;
do {
n = in.read(buffer);
if (n > 0) {
baos.write(buffer, 0, n);
}
} while (n > 0);
}
return baos.toByteArray();
}
// Visible for testing
public static boolean contains(String dictionaryPath) {
if (CACHE != null) {
return CACHE.asMap().containsKey(dictionaryPath);
}
return false;
}
}

View File

@ -17,12 +17,10 @@
package org.apache.hadoop.hbase.io.compress;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Random;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -31,6 +29,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,11 +39,11 @@ public class CompressionTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(CompressionTestBase.class);
static final int LARGE_SIZE = 10 * 1024 * 1024;
static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
static final int BLOCK_SIZE = 4096;
protected static final int LARGE_SIZE = 10 * 1024 * 1024;
protected static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
protected static final int BLOCK_SIZE = 4096;
static final byte[] SMALL_INPUT;
protected static final byte[] SMALL_INPUT;
static {
// 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597
SMALL_INPUT = new byte[1+1+2+3+5+8+13+21+34+55+89+144+233+377+610+987+1597];
@ -67,15 +67,21 @@ public class CompressionTestBase {
Arrays.fill(SMALL_INPUT, off, (off+=1597), (byte)'Q');
}
protected void codecTest(final CompressionCodec codec, final byte[][] input)
throws Exception {
protected void codecTest(final CompressionCodec codec, final byte[][] input) throws Exception {
codecTest(codec, input, null);
}
protected void codecTest(final CompressionCodec codec, final byte[][] input,
final Integer expectedCompressedSize) throws Exception {
// We do this in Compression.java
((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
// Compress
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CompressionOutputStream out = codec.createOutputStream(baos);
int inLen = 0;
long start = EnvironmentEdgeManager.currentTime();
Compressor compressor = codec.createCompressor();
compressor.reinit(((Configurable)codec).getConf());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CompressionOutputStream out = codec.createOutputStream(baos, compressor);
int inLen = 0;
for (int i = 0; i < input.length; i++) {
out.write(input[i]);
inLen += input[i].length;
@ -85,9 +91,18 @@ public class CompressionTestBase {
final byte[] compressed = baos.toByteArray();
LOG.info("{} compressed {} bytes to {} bytes in {} ms", codec.getClass().getSimpleName(),
inLen, compressed.length, end - start);
if (expectedCompressedSize != null) {
assertTrue("Expected compressed size does not match: (expected=" + expectedCompressedSize +
", actual=" + compressed.length + ")", expectedCompressedSize == compressed.length);
}
// Decompress
final byte[] plain = new byte[inLen];
CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed));
Decompressor decompressor = codec.createDecompressor();
if (decompressor instanceof CanReinit) {
((CanReinit)decompressor).reinit(((Configurable)codec).getConf());
}
CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed),
decompressor);
start = EnvironmentEdgeManager.currentTime();
IOUtils.readFully(in, plain, 0, plain.length);
in.close();
@ -113,29 +128,37 @@ public class CompressionTestBase {
/**
* Test with a large input (1MB) divided into blocks of 4KB.
*/
protected void codecLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
RandomDistribution.DiscreteRNG zipf =
protected void codecLargeTest(final CompressionCodec codec, final double sigma)
throws Exception {
RandomDistribution.DiscreteRNG rng =
new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
final byte[][] input = new byte[LARGE_SIZE/BLOCK_SIZE][BLOCK_SIZE];
for (int i = 0; i < input.length; i++) {
for (int j = 0; j < input[i].length; j++) {
input[i][j] = (byte)zipf.nextInt();
}
}
fill(rng, input);
codecTest(codec, input);
}
/**
* Test with a very large input (100MB) as a single input buffer.
*/
protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
RandomDistribution.DiscreteRNG zipf =
protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma)
throws Exception {
RandomDistribution.DiscreteRNG rng =
new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
final byte[][] input = new byte[1][VERY_LARGE_SIZE];
for (int i = 0; i < VERY_LARGE_SIZE; i++) {
input[0][i] = (byte)zipf.nextInt();
}
fill(rng, input);
codecTest(codec, input);
}
protected static void fill(RandomDistribution.DiscreteRNG rng, byte[][] input) {
for (int i = 0; i < input.length; i++) {
fill(rng, input[i]);
}
}
protected static void fill(RandomDistribution.DiscreteRNG rng, byte[] input) {
for (int i = 0; i < input.length; i++) {
input[i] = (byte) rng.nextInt();
}
}
}

View File

@ -23,6 +23,7 @@ import java.io.OutputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@ -42,6 +43,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {
public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
private Configuration conf;
@ -61,12 +63,12 @@ public class ZstdCodec implements Configurable, CompressionCodec {
@Override
public Compressor createCompressor() {
return new ZstdCompressor(getLevel(conf), getBufferSize(conf));
return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
}
@Override
public Decompressor createDecompressor() {
return new ZstdDecompressor(getBufferSize(conf));
return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
}
@Override
@ -124,4 +126,13 @@ public class ZstdCodec implements Configurable, CompressionCodec {
return size > 0 ? size : 256 * 1024; // Don't change this default
}
static byte[] getDictionary(final Configuration conf) {
String path = conf.get(ZSTD_DICTIONARY_KEY);
try {
return DictionaryCache.getDictionary(conf, path);
} catch (IOException e) {
throw new RuntimeException("Unable to load dictionary at " + path, e);
}
}
}

View File

@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdDictCompress;
/**
* Hadoop compressor glue for zstd-jni.
@ -40,13 +41,21 @@ public class ZstdCompressor implements CanReinit, Compressor {
protected ByteBuffer inBuf, outBuf;
protected boolean finish, finished;
protected long bytesRead, bytesWritten;
protected ZstdDictCompress dict;
ZstdCompressor(final int level, final int bufferSize) {
ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
this.level = level;
this.bufferSize = bufferSize;
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf.position(bufferSize);
if (dictionary != null) {
this.dict = new ZstdDictCompress(dictionary, level);
}
}
ZstdCompressor(final int level, final int bufferSize) {
this(level, bufferSize, null);
}
@Override
@ -74,7 +83,12 @@ public class ZstdCompressor implements CanReinit, Compressor {
} else {
outBuf.clear();
}
int written = Zstd.compress(outBuf, inBuf, level);
int written;
if (dict != null) {
written = Zstd.compress(outBuf, inBuf, dict);
} else {
written = Zstd.compress(outBuf, inBuf, level);
}
bytesWritten += written;
inBuf.clear();
LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level);
@ -133,6 +147,11 @@ public class ZstdCompressor implements CanReinit, Compressor {
if (conf != null) {
// Level might have changed
level = ZstdCodec.getLevel(conf);
// Dictionary may have changed
byte[] b = ZstdCodec.getDictionary(conf);
if (b != null) {
dict = new ZstdDictCompress(b, level);
}
// Buffer size might have changed
int newBufferSize = ZstdCodec.getBufferSize(conf);
if (bufferSize != newBufferSize) {

View File

@ -26,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdDictDecompress;
/**
* Hadoop decompressor glue for zstd-java.
@ -38,12 +39,20 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
protected int bufferSize;
protected int inLen;
protected boolean finished;
protected ZstdDictDecompress dict;
ZstdDecompressor(final int bufferSize) {
ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
this.bufferSize = bufferSize;
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf.position(bufferSize);
if (dictionary != null) {
this.dict = new ZstdDictDecompress(dictionary);
}
}
ZstdDecompressor(final int bufferSize) {
this(bufferSize, null);
}
@Override
@ -60,7 +69,11 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
inLen -= remaining;
outBuf.clear();
int written;
written = Zstd.decompress(outBuf, inBuf);
if (dict != null) {
written = Zstd.decompress(outBuf, inBuf, dict);
} else {
written = Zstd.decompress(outBuf, inBuf);
}
inBuf.clear();
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
outBuf.flip();
@ -117,7 +130,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
@Override
public void setDictionary(final byte[] b, final int off, final int len) {
LOG.trace("setDictionary: off={} len={}", off, len);
throw new UnsupportedOperationException("setDictionary not supported");
this.dict = new ZstdDictDecompress(b, off, len);
}
@Override
@ -143,6 +156,11 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
public void reinit(final Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Dictionary may have changed
byte[] b = ZstdCodec.getDictionary(conf);
if (b != null) {
dict = new ZstdDictDecompress(b);
}
// Buffer size might have changed
int newBufferSize = ZstdCodec.getBufferSize(conf);
if (bufferSize != newBufferSize) {

View File

@ -16,11 +16,20 @@
*/
package org.apache.hadoop.hbase.io.compress.zstd;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.RandomDistribution;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -33,20 +42,20 @@ public class TestZstdCodec extends CompressionTestBase {
HBaseClassTestRule.forClass(TestZstdCodec.class);
@Test
public void testzstdCodecSmall() throws Exception {
public void testZstdCodecSmall() throws Exception {
codecSmallTest(new ZstdCodec());
}
@Test
public void testzstdCodecLarge() throws Exception {
public void testZstdCodecLarge() throws Exception {
codecLargeTest(new ZstdCodec(), 1.1); // poor compressability
codecLargeTest(new ZstdCodec(), 2);
codecLargeTest(new ZstdCodec(), 10); // very high compressability
}
@Test
public void testzstdCodecVeryLarge() throws Exception {
Configuration conf = new Configuration();
public void testZstdCodecVeryLarge() throws Exception {
Configuration conf = HBaseConfiguration.create();
// ZStandard levels range from 1 to 22.
// Level 22 might take up to a minute to complete. 3 is the Hadoop default, and will be fast.
conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3);
@ -55,4 +64,55 @@ public class TestZstdCodec extends CompressionTestBase {
codecVeryLargeTest(codec, 3); // like text
}
@Test
public void testZstdCodecWithDictionary() throws Exception {
// zstd.test.data compressed with zstd.test.dict at level 3 will produce a result of
// 365533 bytes
final int expectedCompressedSize = 365533;
Configuration conf = HBaseConfiguration.create();
conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3);
// Configure for dictionary available in test resources
final String dictionaryPath = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
conf.set(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath);
// Load test data from test resources
// This will throw an IOException if the test data cannot be loaded
final byte[] testData = DictionaryCache.loadFromResource(conf,
DictionaryCache.RESOURCE_SCHEME + "zstd.test.data", /* maxSize */ 1024*1024);
assertNotNull("Failed to load test data", testData);
// Run the test
// This will throw an IOException of some kind if there is a problem loading or using the
// dictionary.
ZstdCodec codec = new ZstdCodec();
codec.setConf(conf);
codecTest(codec, new byte[][] { testData }, expectedCompressedSize);
// Assert that the dictionary was actually loaded
assertTrue("Dictionary was not loaded by codec", DictionaryCache.contains(dictionaryPath));
}
//
// For generating the test data in src/test/resources/
//
public static void main(String[] args) throws IOException {
// Write 1000 1k blocks for training to the specified file
// Train with:
// zstd --train-fastcover=k=32,b=8 -B1024 -o <dictionary_file> <input_file>
if (args.length < 1) {
System.err.println("Usage: TestZstdCodec <outFile>");
System.exit(-1);
}
final RandomDistribution.DiscreteRNG rng =
new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, 2);
final File outFile = new File(args[0]);
final byte[] buffer = new byte[1024];
System.out.println("Generating " + outFile);
try (FileOutputStream os = new FileOutputStream(outFile)) {
for (int i = 0; i < 1000; i++) {
fill(rng, buffer);
os.write(buffer);
}
}
System.out.println("Done");
}
}