HBASE-27019 Minor compression performance improvements (#4420)

TRACE level logging is expensive enough to warrant removal. The trace
statements were useful during development but are now just overhead.

We also unnecessarily create new compressor and decompressor instances
in the reset() methods of the Aircompressor and Lz4 codecs. Remove.

Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
Andrew Purtell 2022-05-13 18:29:10 -07:00
parent 4ba62c82f8
commit 2e4d9a2adf
12 changed files with 18 additions and 248 deletions
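
For background on the logging point: these codecs use SLF4J parameterized logging, which defers string formatting until the level check passes, but each call still costs a method invocation, a level check, and boxing of primitive arguments, and the compress/decompress paths run once per buffer. A minimal sketch of the tradeoff (the TraceCost class and hotPath method are hypothetical, written here for illustration; only the logging pattern comes from the patch):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceCost {
  private static final Logger LOG = LoggerFactory.getLogger(TraceCost.class);

  int hotPath(int n) {
    // Even with TRACE disabled, this still boxes 'n' and pays a call plus a
    // level check on every invocation of a per-buffer hot path.
    LOG.trace("decompress: {} bytes", n);

    // An isTraceEnabled() guard avoids the boxing but keeps the branch and
    // the clutter:
    if (LOG.isTraceEnabled()) {
      LOG.trace("decompress: {} bytes", n);
    }

    // This commit takes the third option and deletes the statements outright.
    return n;
  }
}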

HadoopCompressor.java

@@ -24,8 +24,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hadoop compressor glue for aircompressor compressors.
@@ -34,7 +32,6 @@ import org.slf4j.LoggerFactory;
public abstract class HadoopCompressor<T extends Compressor>
implements CanReinit, org.apache.hadoop.io.compress.Compressor {
protected static final Logger LOG = LoggerFactory.getLogger(HadoopCompressor.class);
protected T compressor;
protected ByteBuffer inBuf, outBuf;
protected int bufferSize;
@@ -56,7 +53,6 @@ public abstract class HadoopCompressor<T extends Compressor>
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -77,7 +73,6 @@ public abstract class HadoopCompressor<T extends Compressor>
} else {
if (outBuf.capacity() < needed) {
needed = CompressionUtil.roundInt2(needed);
LOG.trace("compress: resize outBuf {}", needed);
outBuf = ByteBuffer.allocate(needed);
} else {
outBuf.clear();
@@ -89,42 +84,34 @@ public abstract class HadoopCompressor<T extends Compressor>
final int written = writeBuffer.position() - oldPos;
bytesWritten += written;
inBuf.clear();
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
finished = true;
if (!direct) {
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes", n);
return n;
} else {
LOG.trace("compress: {} bytes direct", written);
return written;
}
} else {
finished = true;
}
}
LOG.trace("No output");
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public void finish() {
LOG.trace("finish");
finish = true;
}
@Override
public boolean finished() {
boolean b = finished && !outBuf.hasRemaining();
LOG.trace("finished: {}", b);
return b;
return finished && !outBuf.hasRemaining();
}
@Override
@@ -139,14 +126,11 @@ public abstract class HadoopCompressor<T extends Compressor>
@Override
public boolean needsInput() {
boolean b = !finished();
LOG.trace("needsInput: {}", b);
return b;
return !finished();
}
@Override
public void reinit(Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Buffer size might have changed
int newBufferSize = getBufferSize(conf);
@@ -159,15 +143,8 @@ public abstract class HadoopCompressor<T extends Compressor>
reset();
}
@SuppressWarnings("unchecked")
@Override
public void reset() {
LOG.trace("reset");
try {
compressor = (T) (compressor.getClass().getDeclaredConstructor().newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
inBuf.clear();
outBuf.clear();
outBuf.position(outBuf.capacity());
@@ -184,13 +161,11 @@ public abstract class HadoopCompressor<T extends Compressor>
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);

HadoopDecompressor.java

@@ -22,8 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hadoop decompressor glue for aircompressor decompressors.
@@ -32,7 +30,6 @@ import org.slf4j.LoggerFactory;
public class HadoopDecompressor<T extends Decompressor>
implements org.apache.hadoop.io.compress.Decompressor {
protected static final Logger LOG = LoggerFactory.getLogger(HadoopDecompressor.class);
protected T decompressor;
protected ByteBuffer inBuf, outBuf;
protected int inLen;
@@ -50,7 +47,6 @@ public class HadoopDecompressor<T extends Decompressor>
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
@@ -63,50 +59,36 @@ public class HadoopDecompressor<T extends Decompressor>
inBuf.rewind();
inBuf.limit(inBuf.capacity());
final int written = outBuf.position();
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("decompress: {} bytes", n);
return n;
}
LOG.trace("decompress: No output, finished");
finished = true;
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public boolean finished() {
LOG.trace("finished");
return finished;
}
@Override
public int getRemaining() {
LOG.trace("getRemaining: {}", inLen);
return inLen;
}
@Override
public boolean needsDictionary() {
LOG.trace("needsDictionary");
return false;
}
@SuppressWarnings("unchecked")
@Override
public void reset() {
LOG.trace("reset");
try {
decompressor = (T) (decompressor.getClass().getDeclaredConstructor().newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
inBuf.rewind();
inBuf.limit(inBuf.capacity());
inLen = 0;
@@ -117,9 +99,7 @@ public class HadoopDecompressor<T extends Decompressor>
@Override
public boolean needsInput() {
boolean b = (inBuf.position() == 0);
LOG.trace("needsInput: {}", b);
return b;
return inBuf.position() == 0;
}
@Override
@@ -129,13 +109,11 @@ public class HadoopDecompressor<T extends Decompressor>
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);
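
Condensing the reset() hunks from the two aircompressor glue classes above into a before/after (state resets beyond the hunk boundary are elided as "// ..."):

// Before: every reset() rebuilt the underlying codec via reflection.
@SuppressWarnings("unchecked")
@Override
public void reset() {
  try {
    compressor = (T) (compressor.getClass().getDeclaredConstructor().newInstance());
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
  inBuf.clear();
  outBuf.clear();
  outBuf.position(outBuf.capacity());
  // ...
}

// After: the existing instance is reused across resets; only buffer state changes.
@Override
public void reset() {
  inBuf.clear();
  outBuf.clear();
  outBuf.position(outBuf.capacity());
  // ...
}

The decompressor follows the same pattern, with decompressor.getClass() in the try block and inBuf.rewind(), inBuf.limit(inBuf.capacity()), inLen = 0 as the surviving state resets.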

BrotliCompressor.java

@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hadoop compressor glue for Brotli4j
@@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class BrotliCompressor implements CanReinit, Compressor {
protected static final Logger LOG = LoggerFactory.getLogger(BrotliCompressor.class);
protected ByteBuffer inBuf, outBuf;
protected int bufferSize;
protected boolean finish, finished;
@@ -64,7 +61,6 @@ public class BrotliCompressor implements CanReinit, Compressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -84,7 +80,6 @@ public class BrotliCompressor implements CanReinit, Compressor {
} else {
if (outBuf.capacity() < needed) {
needed = CompressionUtil.roundInt2(needed);
LOG.trace("compress: resize outBuf {}", needed);
outBuf = ByteBuffer.allocate(needed);
} else {
outBuf.clear();
@@ -96,42 +91,34 @@ public class BrotliCompressor implements CanReinit, Compressor {
final int written = writeBuf.position() - oldPos;
bytesWritten += written;
inBuf.clear();
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
finished = true;
if (!direct) {
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes", n);
return n;
} else {
LOG.trace("compress: {} bytes direct", written);
return written;
}
} else {
finished = true;
}
}
LOG.trace("No output");
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public void finish() {
LOG.trace("finish");
finish = true;
}
@Override
public boolean finished() {
boolean b = finished && !outBuf.hasRemaining();
LOG.trace("finished: {}", b);
return b;
return finished && !outBuf.hasRemaining();
}
@Override
@@ -146,14 +133,11 @@ public class BrotliCompressor implements CanReinit, Compressor {
@Override
public boolean needsInput() {
boolean b = !finished();
LOG.trace("needsInput: {}", b);
return b;
return !finished();
}
@Override
public void reinit(Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Quality or window settings might have changed
params.setQuality(BrotliCodec.getLevel(conf));
@@ -171,7 +155,6 @@ public class BrotliCompressor implements CanReinit, Compressor {
@Override
public void reset() {
LOG.trace("reset");
inBuf.clear();
outBuf.clear();
outBuf.position(outBuf.capacity());
@@ -188,13 +171,11 @@ public class BrotliCompressor implements CanReinit, Compressor {
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);

BrotliDecompressor.java

@@ -25,8 +25,6 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hadoop decompressor glue for Brotli4j
@@ -34,7 +32,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class BrotliDecompressor implements Decompressor {
protected static final Logger LOG = LoggerFactory.getLogger(BrotliDecompressor.class);
protected ByteBuffer inBuf, outBuf;
protected int inLen;
protected boolean finished;
@@ -54,7 +51,6 @@ public class BrotliDecompressor implements Decompressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
@@ -63,7 +59,6 @@ public class BrotliDecompressor implements Decompressor {
inLen -= remaining;
outBuf.rewind();
outBuf.limit(outBuf.capacity());
// TODO: More inefficient than it could be, but it doesn't impact decompression speed
// terribly and the brotli4j API alternatives do not seem to work correctly.
// Maybe something more clever can be done as a future improvement.
@@ -72,47 +67,38 @@ public class BrotliDecompressor implements Decompressor {
DirectDecompress result = Decoder.decompress(inb);
outBuf.put(result.getDecompressedDataByteBuf().nioBuffer());
final int written = outBuf.position();
inBuf.rewind();
inBuf.limit(inBuf.capacity());
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("decompress: {} bytes", n);
return n;
}
LOG.trace("decompress: No output, finished");
finished = true;
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public boolean finished() {
LOG.trace("finished");
return finished;
}
@Override
public int getRemaining() {
LOG.trace("getRemaining: {}", inLen);
return inLen;
}
@Override
public boolean needsDictionary() {
LOG.trace("needsDictionary");
return false;
}
@Override
public void reset() {
LOG.trace("reset");
inBuf.clear();
inLen = 0;
outBuf.clear();
@@ -122,9 +108,7 @@ public class BrotliDecompressor implements Decompressor {
@Override
public boolean needsInput() {
boolean b = (inBuf.position() == 0);
LOG.trace("needsInput: {}", b);
return b;
return inBuf.position() == 0;
}
@Override
@@ -134,13 +118,11 @@ public class BrotliDecompressor implements Decompressor {
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);

Lz4Compressor.java

@@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hadoop compressor glue for lz4-java.
@@ -35,7 +33,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class Lz4Compressor implements CanReinit, Compressor {
protected static final Logger LOG = LoggerFactory.getLogger(Lz4Compressor.class);
protected LZ4Compressor compressor;
protected ByteBuffer inBuf, outBuf;
protected int bufferSize;
@@ -57,7 +54,6 @@ public class Lz4Compressor implements CanReinit, Compressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -77,7 +73,6 @@ public class Lz4Compressor implements CanReinit, Compressor {
// allocate a new one which does.
if (outBuf.capacity() < needed) {
needed = CompressionUtil.roundInt2(needed);
LOG.trace("compress: resize outBuf {}", needed);
outBuf = ByteBuffer.allocate(needed);
} else {
outBuf.clear();
@@ -89,42 +84,34 @@ public class Lz4Compressor implements CanReinit, Compressor {
final int written = writeBuffer.position() - oldPos;
bytesWritten += written;
inBuf.clear();
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
finished = true;
if (!direct) {
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes", n);
return n;
} else {
LOG.trace("compress: {} bytes direct", written);
return written;
}
} else {
finished = true;
}
}
LOG.trace("No output");
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public void finish() {
LOG.trace("finish");
finish = true;
}
@Override
public boolean finished() {
boolean b = finished && !outBuf.hasRemaining();
LOG.trace("finished: {}", b);
return b;
return finished && !outBuf.hasRemaining();
}
@Override
@@ -139,14 +126,11 @@ public class Lz4Compressor implements CanReinit, Compressor {
@Override
public boolean needsInput() {
boolean b = !finished();
LOG.trace("needsInput: {}", b);
return b;
return !finished();
}
@Override
public void reinit(Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Buffer size might have changed
int newBufferSize = Lz4Codec.getBufferSize(conf);
@@ -161,8 +145,6 @@ public class Lz4Compressor implements CanReinit, Compressor {
@Override
public void reset() {
LOG.trace("reset");
compressor = LZ4Factory.fastestInstance().fastCompressor();
inBuf.clear();
outBuf.clear();
outBuf.position(outBuf.capacity());
@@ -179,13 +161,11 @@ public class Lz4Compressor implements CanReinit, Compressor {
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);

Lz4Decompressor.java

@@ -24,8 +24,6 @@ import net.jpountz.lz4.LZ4SafeDecompressor;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hadoop decompressor glue for lz4-java.
@@ -33,7 +31,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class Lz4Decompressor implements Decompressor {
protected static final Logger LOG = LoggerFactory.getLogger(Lz4Decompressor.class);
protected LZ4SafeDecompressor decompressor;
protected ByteBuffer inBuf, outBuf;
protected int bufferSize, inLen;
@@ -52,7 +49,6 @@ public class Lz4Decompressor implements Decompressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
@@ -63,45 +59,36 @@ public class Lz4Decompressor implements Decompressor {
decompressor.decompress(inBuf, outBuf);
inBuf.clear();
final int written = outBuf.position();
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("decompress: {} bytes", n);
return n;
}
LOG.trace("decompress: No output, finished");
finished = true;
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public boolean finished() {
LOG.trace("finished");
return finished;
}
@Override
public int getRemaining() {
LOG.trace("getRemaining: {}", inLen);
return inLen;
}
@Override
public boolean needsDictionary() {
LOG.trace("needsDictionary");
return false;
}
@Override
public void reset() {
LOG.trace("reset");
this.decompressor = LZ4Factory.fastestInstance().safeDecompressor();
inBuf.clear();
inLen = 0;
outBuf.clear();
@@ -111,9 +98,7 @@ public class Lz4Decompressor implements Decompressor {
@Override
public boolean needsInput() {
boolean b = (inBuf.position() == 0);
LOG.trace("needsInput: {}", b);
return b;
return inBuf.position() == 0;
}
@Override
@@ -123,13 +108,11 @@ public class Lz4Decompressor implements Decompressor {
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);
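
The Lz4 glue classes above get the same fix without reflection: reset() no longer asks LZ4Factory for fresh instances on every call. Condensed from the hunks (surrounding state resets elided; where the fields are first initialized is outside the visible hunks, presumably at construction):

// Before, in Lz4Compressor.reset():
compressor = LZ4Factory.fastestInstance().fastCompressor();

// Before, in Lz4Decompressor.reset():
this.decompressor = LZ4Factory.fastestInstance().safeDecompressor();

// After: both assignments are simply deleted and the existing instances are
// reused. lz4-java documents its compressor and decompressor instances as
// thread-safe, so reusing one instance within a codec object is safe.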

SnappyCompressor.java

@@ -24,8 +24,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
/**
@@ -34,7 +32,6 @@ import org.xerial.snappy.Snappy;
@InterfaceAudience.Private
public class SnappyCompressor implements CanReinit, Compressor {
protected static final Logger LOG = LoggerFactory.getLogger(SnappyCompressor.class);
protected ByteBuffer inBuf, outBuf;
protected int bufferSize;
protected boolean finish, finished;
@@ -54,7 +51,6 @@ public class SnappyCompressor implements CanReinit, Compressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -67,7 +63,6 @@ public class SnappyCompressor implements CanReinit, Compressor {
int needed = maxCompressedLength(uncompressed);
if (outBuf.capacity() < needed) {
needed = CompressionUtil.roundInt2(needed);
LOG.trace("setInput: resize inBuf {}", needed);
outBuf = ByteBuffer.allocateDirect(needed);
} else {
outBuf.clear();
@@ -75,36 +70,29 @@ public class SnappyCompressor implements CanReinit, Compressor {
int written = Snappy.compress(inBuf, outBuf);
bytesWritten += written;
inBuf.clear();
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
finished = true;
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes", n);
return n;
} else {
finished = true;
}
}
LOG.trace("No output");
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public void finish() {
LOG.trace("finish");
finish = true;
}
@Override
public boolean finished() {
boolean b = finished && !outBuf.hasRemaining();
LOG.trace("finished: {}", b);
return b;
return finished && !outBuf.hasRemaining();
}
@Override
@@ -119,14 +107,11 @@ public class SnappyCompressor implements CanReinit, Compressor {
@Override
public boolean needsInput() {
boolean b = !finished();
LOG.trace("needsInput: {}", b);
return b;
return !finished();
}
@Override
public void reinit(Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Buffer size might have changed
int newBufferSize = SnappyCodec.getBufferSize(conf);
@@ -141,7 +126,6 @@ public class SnappyCompressor implements CanReinit, Compressor {
@Override
public void reset() {
LOG.trace("reset");
inBuf.clear();
outBuf.clear();
outBuf.position(outBuf.capacity());
@@ -158,13 +142,11 @@ public class SnappyCompressor implements CanReinit, Compressor {
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
inBuf.flip();
newBuf.put(inBuf);

SnappyDecompressor.java

@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
/**
@@ -32,7 +30,6 @@ import org.xerial.snappy.Snappy;
@InterfaceAudience.Private
public class SnappyDecompressor implements Decompressor {
protected static final Logger LOG = LoggerFactory.getLogger(SnappyDecompressor.class);
protected ByteBuffer inBuf, outBuf;
protected int inLen;
protected boolean finished;
@@ -48,7 +45,6 @@ public class SnappyDecompressor implements Decompressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
@@ -58,43 +54,35 @@ public class SnappyDecompressor implements Decompressor {
outBuf.clear();
int written = Snappy.uncompress(inBuf, outBuf);
inBuf.clear();
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("decompress: {} bytes", n);
return n;
}
LOG.trace("decompress: No output, finished");
finished = true;
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public boolean finished() {
LOG.trace("finished");
return finished;
}
@Override
public int getRemaining() {
LOG.trace("getRemaining: {}", inLen);
return inLen;
}
@Override
public boolean needsDictionary() {
LOG.trace("needsDictionary");
return false;
}
@Override
public void reset() {
LOG.trace("reset");
inBuf.clear();
inLen = 0;
outBuf.clear();
@@ -104,9 +92,7 @@ public class SnappyDecompressor implements Decompressor {
@Override
public boolean needsInput() {
boolean b = (inBuf.position() == 0);
LOG.trace("needsInput: {}", b);
return b;
return inBuf.position() == 0;
}
@Override
@@ -116,13 +102,11 @@ public class SnappyDecompressor implements Decompressor {
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
inBuf.flip();
newBuf.put(inBuf);

LzmaCompressor.java

@@ -25,8 +25,6 @@ import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tukaani.xz.ArrayCache;
import org.tukaani.xz.BasicArrayCache;
import org.tukaani.xz.LZMA2Options;
@@ -39,7 +37,6 @@ import org.tukaani.xz.UnsupportedOptionsException;
@InterfaceAudience.Private
public class LzmaCompressor implements Compressor {
protected static final Logger LOG = LoggerFactory.getLogger(LzmaCompressor.class);
protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache();
protected ByteBuffer inBuf;
protected ByteBuffer outBuf;
@@ -68,7 +65,6 @@ public class LzmaCompressor implements Compressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -88,7 +84,6 @@ public class LzmaCompressor implements Compressor {
} else {
if (outBuf.capacity() < needed) {
needed = CompressionUtil.roundInt2(needed);
LOG.trace("compress: resize outBuf {}", needed);
outBuf = ByteBuffer.allocate(needed);
} else {
outBuf.clear();
@@ -119,42 +114,34 @@ public class LzmaCompressor implements Compressor {
int written = writeBuffer.position() - oldPos;
bytesWritten += written;
inBuf.clear();
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
finished = true;
outBuf.flip();
if (!direct) {
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes", n);
return n;
} else {
LOG.trace("compress: {} bytes direct", written);
return written;
}
} else {
finished = true;
}
}
LOG.trace("No output");
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public void finish() {
LOG.trace("finish");
finish = true;
}
@Override
public boolean finished() {
boolean b = finished && !outBuf.hasRemaining();
LOG.trace("finished: {}", b);
return b;
return finished && !outBuf.hasRemaining();
}
@Override
@@ -169,14 +156,11 @@ public class LzmaCompressor implements Compressor {
@Override
public boolean needsInput() {
boolean b = !finished();
LOG.trace("needsInput: {}", b);
return b;
return !finished();
}
@Override
public void reinit(Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Level might have changed
try {
@@ -199,7 +183,6 @@ public class LzmaCompressor implements Compressor {
@Override
public void reset() {
LOG.trace("reset");
inBuf.clear();
outBuf.clear();
outBuf.position(outBuf.capacity());
@@ -216,13 +199,11 @@ public class LzmaCompressor implements Compressor {
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf to {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);

LzmaDecompressor.java

@@ -23,8 +23,6 @@ import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tukaani.xz.ArrayCache;
import org.tukaani.xz.BasicArrayCache;
import org.tukaani.xz.LZMAInputStream;
@@ -35,7 +33,6 @@ import org.tukaani.xz.LZMAInputStream;
@InterfaceAudience.Private
public class LzmaDecompressor implements Decompressor {
protected static final Logger LOG = LoggerFactory.getLogger(LzmaDecompressor.class);
protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache() {
@Override
public byte[] getByteArray(int size, boolean fillWithZeros) {
@@ -59,7 +56,6 @@ public class LzmaDecompressor implements Decompressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
@@ -89,43 +85,35 @@ public class LzmaDecompressor implements Decompressor {
int written = outBuf.position();
outBuf.flip();
inBuf.clear();
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("decompress: {} bytes", n);
return n;
}
LOG.trace("decompress: No output, finished");
finished = true;
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public boolean finished() {
LOG.trace("finished");
return finished;
}
@Override
public int getRemaining() {
LOG.trace("getRemaining: {}", inLen);
return inLen;
}
@Override
public boolean needsDictionary() {
LOG.trace("needsDictionary");
return false;
}
@Override
public void reset() {
LOG.trace("reset");
inBuf.clear();
inLen = 0;
outBuf.clear();
@@ -135,9 +123,7 @@ public class LzmaDecompressor implements Decompressor {
@Override
public boolean needsInput() {
boolean b = (inBuf.position() == 0);
LOG.trace("needsInput: {}", b);
return b;
return inBuf.position() == 0;
}
@Override
@@ -147,13 +133,11 @@ public class LzmaDecompressor implements Decompressor {
@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);

ZstdCompressor.java

@@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hadoop compressor glue for zstd-jni.
@@ -35,7 +33,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class ZstdCompressor implements CanReinit, Compressor {
protected static final Logger LOG = LoggerFactory.getLogger(ZstdCompressor.class);
protected int level, bufferSize;
protected ByteBuffer inBuf, outBuf;
protected boolean finish, finished;
@@ -66,7 +63,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -79,7 +75,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
int needed = maxCompressedLength(uncompressed);
if (outBuf.capacity() < needed) {
needed = CompressionUtil.roundInt2(needed);
LOG.trace("compress: resize outBuf {}", needed);
outBuf = ByteBuffer.allocateDirect(needed);
} else {
outBuf.clear();
@@ -92,37 +87,30 @@ public class ZstdCompressor implements CanReinit, Compressor {
}
bytesWritten += written;
inBuf.clear();
LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level);
finished = true;
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes", n);
return n;
} else {
finished = true;
}
}
LOG.trace("No output");
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public void finish() {
LOG.trace("finish");
finish = true;
}
@Override
public boolean finished() {
boolean b = finished && !outBuf.hasRemaining();
LOG.trace("finished: {}", b);
return b;
return finished && !outBuf.hasRemaining();
}
@Override
@@ -137,20 +125,16 @@ public class ZstdCompressor implements CanReinit, Compressor {
@Override
public boolean needsInput() {
boolean b = !finished();
LOG.trace("needsInput: {}", b);
return b;
return !finished();
}
@Override
public void reinit(final Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Level might have changed
boolean levelChanged = false;
int newLevel = ZstdCodec.getLevel(conf);
if (level != newLevel) {
LOG.trace("Level changed, was {} now {}", level, newLevel);
level = newLevel;
levelChanged = true;
}
@@ -162,7 +146,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
if (dict == null || dictId != thisDictId || levelChanged) {
dictId = thisDictId;
dict = new ZstdDictCompress(b, level);
LOG.trace("Reloaded dictionary, new id is {}", dictId);
}
} else {
dict = null;
@@ -173,7 +156,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
bufferSize = newBufferSize;
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
LOG.trace("Resized buffers, new size is {}", bufferSize);
}
}
reset();
@@ -181,7 +163,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
@Override
public void reset() {
LOG.trace("reset");
inBuf.clear();
outBuf.clear();
outBuf.position(outBuf.capacity());
@@ -198,13 +179,11 @@ public class ZstdCompressor implements CanReinit, Compressor {
@Override
public void setInput(final byte[] b, final int off, final int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
inBuf.flip();
newBuf.put(inBuf);

ZstdDecompressor.java

@@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Hadoop decompressor glue for zstd-java.
@@ -35,7 +33,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class ZstdDecompressor implements CanReinit, Decompressor {
protected static final Logger LOG = LoggerFactory.getLogger(ZstdDecompressor.class);
protected ByteBuffer inBuf, outBuf;
protected int bufferSize;
protected int inLen;
@@ -63,7 +60,6 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
@@ -78,44 +74,36 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
written = Zstd.decompress(outBuf, inBuf);
}
inBuf.clear();
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("decompress: {} bytes", n);
return n;
}
LOG.trace("decompress: No output, finished");
finished = true;
return 0;
}
@Override
public void end() {
LOG.trace("end");
}
@Override
public boolean finished() {
LOG.trace("finished");
return finished;
}
@Override
public int getRemaining() {
LOG.trace("getRemaining: {}", inLen);
return inLen;
}
@Override
public boolean needsDictionary() {
LOG.trace("needsDictionary");
return false;
}
@Override
public void reset() {
LOG.trace("reset");
inBuf.clear();
inLen = 0;
outBuf.clear();
@@ -125,9 +113,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
@Override
public boolean needsInput() {
final boolean b = (inBuf.position() == 0);
LOG.trace("needsInput: {}", b);
return b;
return (inBuf.position() == 0);
}
@Override
@@ -137,13 +123,11 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
@Override
public void setInput(final byte[] b, final int off, final int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
final int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
inBuf.flip();
newBuf.put(inBuf);
@@ -156,7 +140,6 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
@Override
public void reinit(final Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Dictionary may have changed
byte[] b = ZstdCodec.getDictionary(conf);
@@ -166,7 +149,6 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
if (dict == null || dictId != thisDictId) {
dictId = thisDictId;
dict = new ZstdDictDecompress(b);
LOG.trace("Reloaded dictionary, new id is {}", dictId);
}
} else {
dict = null;
@@ -177,7 +159,6 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
bufferSize = newBufferSize;
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
LOG.trace("Resized buffers, new size is {}", bufferSize);
}
}
reset();