From 5af02201d261c95dbc0c2f69ded23d372db1f223 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Thu, 7 Oct 2021 09:45:17 -0700 Subject: [PATCH] HBASE-26324 Reuse compressors and decompressors in WAL CompressionContext (#3728) Signed-off-by: Viraj Jasani --- .../regionserver/wal/CompressionContext.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 82bad934393..bfb7f9a85a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream; import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.Dictionary; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +71,8 @@ public class CompressionContext { static final int IO_BUFFER_SIZE = 4096; private final Compression.Algorithm algorithm; + private Compressor compressor; + private Decompressor decompressor; private BoundedDelegatingInputStream lowerIn; private ByteArrayOutputStream lowerOut; private InputStream compressedIn; @@ -87,7 +91,10 @@ public class CompressionContext { if (compressedOut == null) { // Create the output streams here the first time around. lowerOut = new ByteArrayOutputStream(); - compressedOut = algorithm.createCompressionStream(lowerOut, algorithm.getCompressor(), + if (compressor == null) { + compressor = algorithm.getCompressor(); + } + compressedOut = algorithm.createCompressionStream(lowerOut, compressor, IO_BUFFER_SIZE); } else { lowerOut.reset(); @@ -107,7 +114,10 @@ public class CompressionContext { // Create the input streams here the first time around. if (compressedIn == null) { lowerIn = new BoundedDelegatingInputStream(in, inLength); - compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(), + if (decompressor == null) { + decompressor = algorithm.getDecompressor(); + } + compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, IO_BUFFER_SIZE); } else { lowerIn.setDelegate(in, inLength); @@ -152,6 +162,12 @@ public class CompressionContext { } } lowerIn = null; + if (compressor != null) { + compressor.reset(); + } + if (decompressor != null) { + decompressor.reset(); + } } }