HBASE-27672 Read RPC threads may BLOCKED at the Configuration.get when using java compression (#5075)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
parent
59fdaa28f7
commit
3eedc0987a
|
@ -46,9 +46,11 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";
|
||||
|
||||
private Configuration conf;
|
||||
private int bufferSize;
|
||||
|
||||
public Lz4Codec() {
|
||||
conf = new Configuration();
|
||||
bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,6 +61,7 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -79,7 +82,7 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
return new BlockDecompressorStream(in, d, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,7 +93,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
|
|
@ -46,9 +46,11 @@ public class LzoCodec implements Configurable, CompressionCodec {
|
|||
public static final String LZO_BUFFER_SIZE_KEY = "hbase.io.compress.lzo.buffersize";
|
||||
|
||||
private Configuration conf;
|
||||
private int bufferSize;
|
||||
|
||||
public LzoCodec() {
|
||||
conf = new Configuration();
|
||||
bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,6 +61,7 @@ public class LzoCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -79,7 +82,7 @@ public class LzoCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
return new BlockDecompressorStream(in, d, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,7 +93,6 @@ public class LzoCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
|
|
@ -46,9 +46,11 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";
|
||||
|
||||
private Configuration conf;
|
||||
private int bufferSize;
|
||||
|
||||
public SnappyCodec() {
|
||||
conf = new Configuration();
|
||||
bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,6 +61,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -79,7 +82,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
return new BlockDecompressorStream(in, d, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,7 +93,6 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
|
|
@ -54,14 +54,17 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
|
||||
|
||||
private Configuration conf;
|
||||
private int bufferSize;
|
||||
|
||||
public ZstdCodec() {
|
||||
conf = new Configuration();
|
||||
bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -87,7 +90,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
return new BlockDecompressorStream(in, d, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -98,7 +101,6 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
|
|
@ -47,9 +47,15 @@ public class BrotliCodec implements Configurable, CompressionCodec {
|
|||
public static final int BROTLI_BUFFERSIZE_DEFAULT = 256 * 1024;
|
||||
|
||||
private Configuration conf;
|
||||
private int bufferSize;
|
||||
private int level;
|
||||
private int window;
|
||||
|
||||
public BrotliCodec() {
|
||||
conf = new Configuration();
|
||||
bufferSize = getBufferSize(conf);
|
||||
level = getLevel(conf);
|
||||
window = getWindow(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -60,16 +66,19 @@ public class BrotliCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
this.level = getLevel(conf);
|
||||
this.window = getWindow(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Compressor createCompressor() {
|
||||
return new BrotliCompressor(getLevel(conf), getWindow(conf), getBufferSize(conf));
|
||||
return new BrotliCompressor(level, window, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decompressor createDecompressor() {
|
||||
return new BrotliDecompressor(getBufferSize(conf));
|
||||
return new BrotliDecompressor(bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,7 +89,7 @@ public class BrotliCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
return new BlockDecompressorStream(in, d, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -91,7 +100,6 @@ public class BrotliCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
|
|
@ -44,9 +44,11 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";
|
||||
|
||||
private Configuration conf;
|
||||
private int bufferSize;
|
||||
|
||||
public Lz4Codec() {
|
||||
conf = new Configuration();
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -57,16 +59,17 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Compressor createCompressor() {
|
||||
return new Lz4Compressor(getBufferSize(conf));
|
||||
return new Lz4Compressor(bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decompressor createDecompressor() {
|
||||
return new Lz4Decompressor(getBufferSize(conf));
|
||||
return new Lz4Decompressor(bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,7 +80,7 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
return new BlockDecompressorStream(in, d, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,7 +91,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
|
|
@ -44,9 +44,11 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";
|
||||
|
||||
private Configuration conf;
|
||||
private int bufferSize;
|
||||
|
||||
public SnappyCodec() {
|
||||
conf = new Configuration();
|
||||
bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -57,16 +59,17 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Compressor createCompressor() {
|
||||
return new SnappyCompressor(getBufferSize(conf));
|
||||
return new SnappyCompressor(bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decompressor createDecompressor() {
|
||||
return new SnappyDecompressor(getBufferSize(conf));
|
||||
return new SnappyDecompressor(bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,7 +80,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
return new BlockDecompressorStream(in, d, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,7 +91,6 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
Snappy.maxCompressedLength(bufferSize) - bufferSize); // overhead only
|
||||
}
|
||||
|
|
|
@ -44,9 +44,13 @@ public class LzmaCodec implements Configurable, CompressionCodec {
|
|||
public static final int LZMA_BUFFERSIZE_DEFAULT = 256 * 1024;
|
||||
|
||||
private Configuration conf;
|
||||
private int bufferSize;
|
||||
private int level;
|
||||
|
||||
public LzmaCodec() {
|
||||
conf = new Configuration();
|
||||
bufferSize = getBufferSize(conf);
|
||||
level = getLevel(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -57,16 +61,18 @@ public class LzmaCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
this.level = getLevel(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Compressor createCompressor() {
|
||||
return new LzmaCompressor(getLevel(conf), getBufferSize(conf));
|
||||
return new LzmaCompressor(level, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decompressor createDecompressor() {
|
||||
return new LzmaDecompressor(getBufferSize(conf));
|
||||
return new LzmaDecompressor(bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,7 +83,7 @@ public class LzmaCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
return new BlockDecompressorStream(in, d, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,7 +94,6 @@ public class LzmaCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
CompressionUtil.compressionOverhead(bufferSize));
|
||||
}
|
||||
|
|
|
@ -50,9 +50,13 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
|
||||
|
||||
private Configuration conf;
|
||||
private int bufferSize;
|
||||
private int level;
|
||||
private byte[] dictionary;
|
||||
|
||||
public ZstdCodec() {
|
||||
conf = new Configuration();
|
||||
init();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -63,16 +67,17 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
init();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Compressor createCompressor() {
|
||||
return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
|
||||
return new ZstdCompressor(level, bufferSize, dictionary);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Decompressor createDecompressor() {
|
||||
return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
|
||||
return new ZstdDecompressor(bufferSize, dictionary);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -83,7 +88,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
|
||||
throws IOException {
|
||||
return new BlockDecompressorStream(in, d, getBufferSize(conf));
|
||||
return new BlockDecompressorStream(in, d, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,7 +99,6 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
@Override
|
||||
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
|
||||
throws IOException {
|
||||
int bufferSize = getBufferSize(conf);
|
||||
return new BlockCompressorStream(out, c, bufferSize,
|
||||
(int) Zstd.compressBound(bufferSize) - bufferSize); // overhead only
|
||||
}
|
||||
|
@ -154,4 +158,9 @@ public class ZstdCodec implements Configurable, CompressionCodec {
|
|||
return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
this.bufferSize = getBufferSize(conf);
|
||||
this.level = getLevel(conf);
|
||||
this.dictionary = getDictionary(conf);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue