HBASE-26324 Reuse compressors and decompressors in WAL CompressionContext (#3728)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Andrew Purtell 2021-10-07 09:45:17 -07:00
parent e4b56fee72
commit 5af02201d2
1 changed files with 18 additions and 2 deletions

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -69,6 +71,8 @@ public class CompressionContext {
static final int IO_BUFFER_SIZE = 4096; static final int IO_BUFFER_SIZE = 4096;
private final Compression.Algorithm algorithm; private final Compression.Algorithm algorithm;
private Compressor compressor;
private Decompressor decompressor;
private BoundedDelegatingInputStream lowerIn; private BoundedDelegatingInputStream lowerIn;
private ByteArrayOutputStream lowerOut; private ByteArrayOutputStream lowerOut;
private InputStream compressedIn; private InputStream compressedIn;
@ -87,7 +91,10 @@ public class CompressionContext {
if (compressedOut == null) { if (compressedOut == null) {
// Create the output streams here the first time around. // Create the output streams here the first time around.
lowerOut = new ByteArrayOutputStream(); lowerOut = new ByteArrayOutputStream();
compressedOut = algorithm.createCompressionStream(lowerOut, algorithm.getCompressor(), if (compressor == null) {
compressor = algorithm.getCompressor();
}
compressedOut = algorithm.createCompressionStream(lowerOut, compressor,
IO_BUFFER_SIZE); IO_BUFFER_SIZE);
} else { } else {
lowerOut.reset(); lowerOut.reset();
@ -107,7 +114,10 @@ public class CompressionContext {
// Create the input streams here the first time around. // Create the input streams here the first time around.
if (compressedIn == null) { if (compressedIn == null) {
lowerIn = new BoundedDelegatingInputStream(in, inLength); lowerIn = new BoundedDelegatingInputStream(in, inLength);
compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(), if (decompressor == null) {
decompressor = algorithm.getDecompressor();
}
compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor,
IO_BUFFER_SIZE); IO_BUFFER_SIZE);
} else { } else {
lowerIn.setDelegate(in, inLength); lowerIn.setDelegate(in, inLength);
@ -152,6 +162,12 @@ public class CompressionContext {
} }
} }
lowerIn = null; lowerIn = null;
if (compressor != null) {
compressor.reset();
}
if (decompressor != null) {
decompressor.reset();
}
} }
} }