HBASE-26324 Reuse compressors and decompressors in WAL CompressionContext (#3728)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Andrew Purtell 2021-10-07 09:45:17 -07:00 committed by GitHub
parent 7ae71e880d
commit 39a20c528e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 18 additions and 2 deletions

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,6 +71,8 @@ public class CompressionContext {
static final int IO_BUFFER_SIZE = 4096;
private final Compression.Algorithm algorithm;
private Compressor compressor;
private Decompressor decompressor;
private BoundedDelegatingInputStream lowerIn;
private ByteArrayOutputStream lowerOut;
private InputStream compressedIn;
@ -87,7 +91,10 @@ public class CompressionContext {
if (compressedOut == null) {
// Create the output streams here the first time around.
lowerOut = new ByteArrayOutputStream();
compressedOut = algorithm.createCompressionStream(lowerOut, algorithm.getCompressor(),
if (compressor == null) {
compressor = algorithm.getCompressor();
}
compressedOut = algorithm.createCompressionStream(lowerOut, compressor,
IO_BUFFER_SIZE);
} else {
lowerOut.reset();
@ -107,7 +114,10 @@ public class CompressionContext {
// Create the input streams here the first time around.
if (compressedIn == null) {
lowerIn = new BoundedDelegatingInputStream(in, inLength);
compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(),
if (decompressor == null) {
decompressor = algorithm.getDecompressor();
}
compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor,
IO_BUFFER_SIZE);
} else {
lowerIn.setDelegate(in, inLength);
@ -152,6 +162,12 @@ public class CompressionContext {
}
}
lowerIn = null;
if (compressor != null) {
compressor.reset();
}
if (decompressor != null) {
decompressor.reset();
}
}
}