HBASE-27019 Minor compression performance improvements (#4420)
TRACE level logging is expensive enough to warrant removal. They were useful during development but are now just overhead. Also we unnecessarily create new compressor and decompressor instances in the reset() methods for the Aircompressor and Lz4 codecs. Remove. Signed-off-by: Viraj Jasani <vjasani@apache.org> Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
parent
7bc7eb5f48
commit
2efaa7d221
|
@ -24,8 +24,6 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.io.compress.CanReinit;
|
import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hadoop compressor glue for aircompressor compressors.
|
* Hadoop compressor glue for aircompressor compressors.
|
||||||
|
@ -34,7 +32,6 @@ import org.slf4j.LoggerFactory;
|
||||||
public abstract class HadoopCompressor<T extends Compressor>
|
public abstract class HadoopCompressor<T extends Compressor>
|
||||||
implements CanReinit, org.apache.hadoop.io.compress.Compressor {
|
implements CanReinit, org.apache.hadoop.io.compress.Compressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(HadoopCompressor.class);
|
|
||||||
protected T compressor;
|
protected T compressor;
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected int bufferSize;
|
protected int bufferSize;
|
||||||
|
@ -56,7 +53,6 @@ public abstract class HadoopCompressor<T extends Compressor>
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
// We don't actually begin compression until our caller calls finish().
|
// We don't actually begin compression until our caller calls finish().
|
||||||
|
@ -77,7 +73,6 @@ public abstract class HadoopCompressor<T extends Compressor>
|
||||||
} else {
|
} else {
|
||||||
if (outBuf.capacity() < needed) {
|
if (outBuf.capacity() < needed) {
|
||||||
needed = CompressionUtil.roundInt2(needed);
|
needed = CompressionUtil.roundInt2(needed);
|
||||||
LOG.trace("compress: resize outBuf {}", needed);
|
|
||||||
outBuf = ByteBuffer.allocate(needed);
|
outBuf = ByteBuffer.allocate(needed);
|
||||||
} else {
|
} else {
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -89,42 +84,34 @@ public abstract class HadoopCompressor<T extends Compressor>
|
||||||
final int written = writeBuffer.position() - oldPos;
|
final int written = writeBuffer.position() - oldPos;
|
||||||
bytesWritten += written;
|
bytesWritten += written;
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
|
|
||||||
finished = true;
|
finished = true;
|
||||||
if (!direct) {
|
if (!direct) {
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("compress: {} bytes direct", written);
|
|
||||||
return written;
|
return written;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
finished = true;
|
finished = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.trace("No output");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finish() {
|
public void finish() {
|
||||||
LOG.trace("finish");
|
|
||||||
finish = true;
|
finish = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
boolean b = finished && !outBuf.hasRemaining();
|
return finished && !outBuf.hasRemaining();
|
||||||
LOG.trace("finished: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -139,14 +126,11 @@ public abstract class HadoopCompressor<T extends Compressor>
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = !finished();
|
return !finished();
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reinit(Configuration conf) {
|
public void reinit(Configuration conf) {
|
||||||
LOG.trace("reinit");
|
|
||||||
if (conf != null) {
|
if (conf != null) {
|
||||||
// Buffer size might have changed
|
// Buffer size might have changed
|
||||||
int newBufferSize = getBufferSize(conf);
|
int newBufferSize = getBufferSize(conf);
|
||||||
|
@ -159,15 +143,8 @@ public abstract class HadoopCompressor<T extends Compressor>
|
||||||
reset();
|
reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
try {
|
|
||||||
compressor = (T) (compressor.getClass().getDeclaredConstructor().newInstance());
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
outBuf.position(outBuf.capacity());
|
outBuf.position(outBuf.capacity());
|
||||||
|
@ -184,13 +161,11 @@ public abstract class HadoopCompressor<T extends Compressor>
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -22,8 +22,6 @@ import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hadoop decompressor glue for aircompressor decompressors.
|
* Hadoop decompressor glue for aircompressor decompressors.
|
||||||
|
@ -32,7 +30,6 @@ import org.slf4j.LoggerFactory;
|
||||||
public class HadoopDecompressor<T extends Decompressor>
|
public class HadoopDecompressor<T extends Decompressor>
|
||||||
implements org.apache.hadoop.io.compress.Decompressor {
|
implements org.apache.hadoop.io.compress.Decompressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(HadoopDecompressor.class);
|
|
||||||
protected T decompressor;
|
protected T decompressor;
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected int inLen;
|
protected int inLen;
|
||||||
|
@ -50,7 +47,6 @@ public class HadoopDecompressor<T extends Decompressor>
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
if (inBuf.position() > 0) {
|
if (inBuf.position() > 0) {
|
||||||
|
@ -63,50 +59,36 @@ public class HadoopDecompressor<T extends Decompressor>
|
||||||
inBuf.rewind();
|
inBuf.rewind();
|
||||||
inBuf.limit(inBuf.capacity());
|
inBuf.limit(inBuf.capacity());
|
||||||
final int written = outBuf.position();
|
final int written = outBuf.position();
|
||||||
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
|
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
LOG.trace("decompress: No output, finished");
|
|
||||||
finished = true;
|
finished = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
LOG.trace("finished");
|
|
||||||
return finished;
|
return finished;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getRemaining() {
|
public int getRemaining() {
|
||||||
LOG.trace("getRemaining: {}", inLen);
|
|
||||||
return inLen;
|
return inLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsDictionary() {
|
public boolean needsDictionary() {
|
||||||
LOG.trace("needsDictionary");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
try {
|
|
||||||
decompressor = (T) (decompressor.getClass().getDeclaredConstructor().newInstance());
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
inBuf.rewind();
|
inBuf.rewind();
|
||||||
inBuf.limit(inBuf.capacity());
|
inBuf.limit(inBuf.capacity());
|
||||||
inLen = 0;
|
inLen = 0;
|
||||||
|
@ -117,9 +99,7 @@ public class HadoopDecompressor<T extends Decompressor>
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = (inBuf.position() == 0);
|
return inBuf.position() == 0;
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -129,13 +109,11 @@ public class HadoopDecompressor<T extends Decompressor>
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Compressor;
|
import org.apache.hadoop.io.compress.Compressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hadoop compressor glue for Brotli4j
|
* Hadoop compressor glue for Brotli4j
|
||||||
|
@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class BrotliCompressor implements CanReinit, Compressor {
|
public class BrotliCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(BrotliCompressor.class);
|
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected int bufferSize;
|
protected int bufferSize;
|
||||||
protected boolean finish, finished;
|
protected boolean finish, finished;
|
||||||
|
@ -64,7 +61,6 @@ public class BrotliCompressor implements CanReinit, Compressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
// We don't actually begin compression until our caller calls finish().
|
// We don't actually begin compression until our caller calls finish().
|
||||||
|
@ -84,7 +80,6 @@ public class BrotliCompressor implements CanReinit, Compressor {
|
||||||
} else {
|
} else {
|
||||||
if (outBuf.capacity() < needed) {
|
if (outBuf.capacity() < needed) {
|
||||||
needed = CompressionUtil.roundInt2(needed);
|
needed = CompressionUtil.roundInt2(needed);
|
||||||
LOG.trace("compress: resize outBuf {}", needed);
|
|
||||||
outBuf = ByteBuffer.allocate(needed);
|
outBuf = ByteBuffer.allocate(needed);
|
||||||
} else {
|
} else {
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -96,42 +91,34 @@ public class BrotliCompressor implements CanReinit, Compressor {
|
||||||
final int written = writeBuf.position() - oldPos;
|
final int written = writeBuf.position() - oldPos;
|
||||||
bytesWritten += written;
|
bytesWritten += written;
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
|
|
||||||
finished = true;
|
finished = true;
|
||||||
if (!direct) {
|
if (!direct) {
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("compress: {} bytes direct", written);
|
|
||||||
return written;
|
return written;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
finished = true;
|
finished = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.trace("No output");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finish() {
|
public void finish() {
|
||||||
LOG.trace("finish");
|
|
||||||
finish = true;
|
finish = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
boolean b = finished && !outBuf.hasRemaining();
|
return finished && !outBuf.hasRemaining();
|
||||||
LOG.trace("finished: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -146,14 +133,11 @@ public class BrotliCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = !finished();
|
return !finished();
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reinit(Configuration conf) {
|
public void reinit(Configuration conf) {
|
||||||
LOG.trace("reinit");
|
|
||||||
if (conf != null) {
|
if (conf != null) {
|
||||||
// Quality or window settings might have changed
|
// Quality or window settings might have changed
|
||||||
params.setQuality(BrotliCodec.getLevel(conf));
|
params.setQuality(BrotliCodec.getLevel(conf));
|
||||||
|
@ -171,7 +155,6 @@ public class BrotliCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
outBuf.position(outBuf.capacity());
|
outBuf.position(outBuf.capacity());
|
||||||
|
@ -188,13 +171,11 @@ public class BrotliCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -25,8 +25,6 @@ import java.nio.ByteBuffer;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Decompressor;
|
import org.apache.hadoop.io.compress.Decompressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hadoop decompressor glue for Brotli4j
|
* Hadoop decompressor glue for Brotli4j
|
||||||
|
@ -34,7 +32,6 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class BrotliDecompressor implements Decompressor {
|
public class BrotliDecompressor implements Decompressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(BrotliDecompressor.class);
|
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected int inLen;
|
protected int inLen;
|
||||||
protected boolean finished;
|
protected boolean finished;
|
||||||
|
@ -54,7 +51,6 @@ public class BrotliDecompressor implements Decompressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
if (inBuf.position() > 0) {
|
if (inBuf.position() > 0) {
|
||||||
|
@ -63,7 +59,6 @@ public class BrotliDecompressor implements Decompressor {
|
||||||
inLen -= remaining;
|
inLen -= remaining;
|
||||||
outBuf.rewind();
|
outBuf.rewind();
|
||||||
outBuf.limit(outBuf.capacity());
|
outBuf.limit(outBuf.capacity());
|
||||||
|
|
||||||
// TODO: More inefficient than it could be, but it doesn't impact decompression speed
|
// TODO: More inefficient than it could be, but it doesn't impact decompression speed
|
||||||
// terribly and the brotli4j API alternatives do not seem to work correctly.
|
// terribly and the brotli4j API alternatives do not seem to work correctly.
|
||||||
// Maybe something more clever can be done as a future improvement.
|
// Maybe something more clever can be done as a future improvement.
|
||||||
|
@ -72,47 +67,38 @@ public class BrotliDecompressor implements Decompressor {
|
||||||
DirectDecompress result = Decoder.decompress(inb);
|
DirectDecompress result = Decoder.decompress(inb);
|
||||||
outBuf.put(result.getDecompressedDataByteBuf().nioBuffer());
|
outBuf.put(result.getDecompressedDataByteBuf().nioBuffer());
|
||||||
final int written = outBuf.position();
|
final int written = outBuf.position();
|
||||||
|
|
||||||
inBuf.rewind();
|
inBuf.rewind();
|
||||||
inBuf.limit(inBuf.capacity());
|
inBuf.limit(inBuf.capacity());
|
||||||
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
|
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
LOG.trace("decompress: No output, finished");
|
|
||||||
finished = true;
|
finished = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
LOG.trace("finished");
|
|
||||||
return finished;
|
return finished;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getRemaining() {
|
public int getRemaining() {
|
||||||
LOG.trace("getRemaining: {}", inLen);
|
|
||||||
return inLen;
|
return inLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsDictionary() {
|
public boolean needsDictionary() {
|
||||||
LOG.trace("needsDictionary");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
inLen = 0;
|
inLen = 0;
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -122,9 +108,7 @@ public class BrotliDecompressor implements Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = (inBuf.position() == 0);
|
return inBuf.position() == 0;
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -134,13 +118,11 @@ public class BrotliDecompressor implements Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Compressor;
|
import org.apache.hadoop.io.compress.Compressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hadoop compressor glue for lz4-java.
|
* Hadoop compressor glue for lz4-java.
|
||||||
|
@ -35,7 +33,6 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class Lz4Compressor implements CanReinit, Compressor {
|
public class Lz4Compressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(Lz4Compressor.class);
|
|
||||||
protected LZ4Compressor compressor;
|
protected LZ4Compressor compressor;
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected int bufferSize;
|
protected int bufferSize;
|
||||||
|
@ -57,7 +54,6 @@ public class Lz4Compressor implements CanReinit, Compressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
// We don't actually begin compression until our caller calls finish().
|
// We don't actually begin compression until our caller calls finish().
|
||||||
|
@ -77,7 +73,6 @@ public class Lz4Compressor implements CanReinit, Compressor {
|
||||||
// allocate a new one which does.
|
// allocate a new one which does.
|
||||||
if (outBuf.capacity() < needed) {
|
if (outBuf.capacity() < needed) {
|
||||||
needed = CompressionUtil.roundInt2(needed);
|
needed = CompressionUtil.roundInt2(needed);
|
||||||
LOG.trace("compress: resize outBuf {}", needed);
|
|
||||||
outBuf = ByteBuffer.allocate(needed);
|
outBuf = ByteBuffer.allocate(needed);
|
||||||
} else {
|
} else {
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -89,42 +84,34 @@ public class Lz4Compressor implements CanReinit, Compressor {
|
||||||
final int written = writeBuffer.position() - oldPos;
|
final int written = writeBuffer.position() - oldPos;
|
||||||
bytesWritten += written;
|
bytesWritten += written;
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
|
|
||||||
finished = true;
|
finished = true;
|
||||||
if (!direct) {
|
if (!direct) {
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("compress: {} bytes direct", written);
|
|
||||||
return written;
|
return written;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
finished = true;
|
finished = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.trace("No output");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finish() {
|
public void finish() {
|
||||||
LOG.trace("finish");
|
|
||||||
finish = true;
|
finish = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
boolean b = finished && !outBuf.hasRemaining();
|
return finished && !outBuf.hasRemaining();
|
||||||
LOG.trace("finished: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -139,14 +126,11 @@ public class Lz4Compressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = !finished();
|
return !finished();
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reinit(Configuration conf) {
|
public void reinit(Configuration conf) {
|
||||||
LOG.trace("reinit");
|
|
||||||
if (conf != null) {
|
if (conf != null) {
|
||||||
// Buffer size might have changed
|
// Buffer size might have changed
|
||||||
int newBufferSize = Lz4Codec.getBufferSize(conf);
|
int newBufferSize = Lz4Codec.getBufferSize(conf);
|
||||||
|
@ -161,8 +145,6 @@ public class Lz4Compressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
compressor = LZ4Factory.fastestInstance().fastCompressor();
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
outBuf.position(outBuf.capacity());
|
outBuf.position(outBuf.capacity());
|
||||||
|
@ -179,13 +161,11 @@ public class Lz4Compressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -24,8 +24,6 @@ import net.jpountz.lz4.LZ4SafeDecompressor;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Decompressor;
|
import org.apache.hadoop.io.compress.Decompressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hadoop decompressor glue for lz4-java.
|
* Hadoop decompressor glue for lz4-java.
|
||||||
|
@ -33,7 +31,6 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class Lz4Decompressor implements Decompressor {
|
public class Lz4Decompressor implements Decompressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(Lz4Decompressor.class);
|
|
||||||
protected LZ4SafeDecompressor decompressor;
|
protected LZ4SafeDecompressor decompressor;
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected int bufferSize, inLen;
|
protected int bufferSize, inLen;
|
||||||
|
@ -52,7 +49,6 @@ public class Lz4Decompressor implements Decompressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
if (inBuf.position() > 0) {
|
if (inBuf.position() > 0) {
|
||||||
|
@ -63,45 +59,36 @@ public class Lz4Decompressor implements Decompressor {
|
||||||
decompressor.decompress(inBuf, outBuf);
|
decompressor.decompress(inBuf, outBuf);
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
final int written = outBuf.position();
|
final int written = outBuf.position();
|
||||||
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
|
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
LOG.trace("decompress: No output, finished");
|
|
||||||
finished = true;
|
finished = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
LOG.trace("finished");
|
|
||||||
return finished;
|
return finished;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getRemaining() {
|
public int getRemaining() {
|
||||||
LOG.trace("getRemaining: {}", inLen);
|
|
||||||
return inLen;
|
return inLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsDictionary() {
|
public boolean needsDictionary() {
|
||||||
LOG.trace("needsDictionary");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
this.decompressor = LZ4Factory.fastestInstance().safeDecompressor();
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
inLen = 0;
|
inLen = 0;
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -111,9 +98,7 @@ public class Lz4Decompressor implements Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = (inBuf.position() == 0);
|
return inBuf.position() == 0;
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -123,13 +108,11 @@ public class Lz4Decompressor implements Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -24,8 +24,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Compressor;
|
import org.apache.hadoop.io.compress.Compressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.xerial.snappy.Snappy;
|
import org.xerial.snappy.Snappy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -34,7 +32,6 @@ import org.xerial.snappy.Snappy;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class SnappyCompressor implements CanReinit, Compressor {
|
public class SnappyCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(SnappyCompressor.class);
|
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected int bufferSize;
|
protected int bufferSize;
|
||||||
protected boolean finish, finished;
|
protected boolean finish, finished;
|
||||||
|
@ -54,7 +51,6 @@ public class SnappyCompressor implements CanReinit, Compressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
// We don't actually begin compression until our caller calls finish().
|
// We don't actually begin compression until our caller calls finish().
|
||||||
|
@ -67,7 +63,6 @@ public class SnappyCompressor implements CanReinit, Compressor {
|
||||||
int needed = maxCompressedLength(uncompressed);
|
int needed = maxCompressedLength(uncompressed);
|
||||||
if (outBuf.capacity() < needed) {
|
if (outBuf.capacity() < needed) {
|
||||||
needed = CompressionUtil.roundInt2(needed);
|
needed = CompressionUtil.roundInt2(needed);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
outBuf = ByteBuffer.allocateDirect(needed);
|
outBuf = ByteBuffer.allocateDirect(needed);
|
||||||
} else {
|
} else {
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -75,36 +70,29 @@ public class SnappyCompressor implements CanReinit, Compressor {
|
||||||
int written = Snappy.compress(inBuf, outBuf);
|
int written = Snappy.compress(inBuf, outBuf);
|
||||||
bytesWritten += written;
|
bytesWritten += written;
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
|
|
||||||
finished = true;
|
finished = true;
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
} else {
|
} else {
|
||||||
finished = true;
|
finished = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.trace("No output");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finish() {
|
public void finish() {
|
||||||
LOG.trace("finish");
|
|
||||||
finish = true;
|
finish = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
boolean b = finished && !outBuf.hasRemaining();
|
return finished && !outBuf.hasRemaining();
|
||||||
LOG.trace("finished: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -119,14 +107,11 @@ public class SnappyCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = !finished();
|
return !finished();
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reinit(Configuration conf) {
|
public void reinit(Configuration conf) {
|
||||||
LOG.trace("reinit");
|
|
||||||
if (conf != null) {
|
if (conf != null) {
|
||||||
// Buffer size might have changed
|
// Buffer size might have changed
|
||||||
int newBufferSize = SnappyCodec.getBufferSize(conf);
|
int newBufferSize = SnappyCodec.getBufferSize(conf);
|
||||||
|
@ -141,7 +126,6 @@ public class SnappyCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
outBuf.position(outBuf.capacity());
|
outBuf.position(outBuf.capacity());
|
||||||
|
@ -158,13 +142,11 @@ public class SnappyCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
|
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Decompressor;
|
import org.apache.hadoop.io.compress.Decompressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.xerial.snappy.Snappy;
|
import org.xerial.snappy.Snappy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -32,7 +30,6 @@ import org.xerial.snappy.Snappy;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class SnappyDecompressor implements Decompressor {
|
public class SnappyDecompressor implements Decompressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(SnappyDecompressor.class);
|
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected int inLen;
|
protected int inLen;
|
||||||
protected boolean finished;
|
protected boolean finished;
|
||||||
|
@ -48,7 +45,6 @@ public class SnappyDecompressor implements Decompressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
if (inBuf.position() > 0) {
|
if (inBuf.position() > 0) {
|
||||||
|
@ -58,43 +54,35 @@ public class SnappyDecompressor implements Decompressor {
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
int written = Snappy.uncompress(inBuf, outBuf);
|
int written = Snappy.uncompress(inBuf, outBuf);
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
|
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
LOG.trace("decompress: No output, finished");
|
|
||||||
finished = true;
|
finished = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
LOG.trace("finished");
|
|
||||||
return finished;
|
return finished;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getRemaining() {
|
public int getRemaining() {
|
||||||
LOG.trace("getRemaining: {}", inLen);
|
|
||||||
return inLen;
|
return inLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsDictionary() {
|
public boolean needsDictionary() {
|
||||||
LOG.trace("needsDictionary");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
inLen = 0;
|
inLen = 0;
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -104,9 +92,7 @@ public class SnappyDecompressor implements Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = (inBuf.position() == 0);
|
return inBuf.position() == 0;
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -116,13 +102,11 @@ public class SnappyDecompressor implements Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
|
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -25,8 +25,6 @@ import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Compressor;
|
import org.apache.hadoop.io.compress.Compressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.tukaani.xz.ArrayCache;
|
import org.tukaani.xz.ArrayCache;
|
||||||
import org.tukaani.xz.BasicArrayCache;
|
import org.tukaani.xz.BasicArrayCache;
|
||||||
import org.tukaani.xz.LZMA2Options;
|
import org.tukaani.xz.LZMA2Options;
|
||||||
|
@ -39,7 +37,6 @@ import org.tukaani.xz.UnsupportedOptionsException;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class LzmaCompressor implements Compressor {
|
public class LzmaCompressor implements Compressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(LzmaCompressor.class);
|
|
||||||
protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache();
|
protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache();
|
||||||
protected ByteBuffer inBuf;
|
protected ByteBuffer inBuf;
|
||||||
protected ByteBuffer outBuf;
|
protected ByteBuffer outBuf;
|
||||||
|
@ -68,7 +65,6 @@ public class LzmaCompressor implements Compressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
// We don't actually begin compression until our caller calls finish().
|
// We don't actually begin compression until our caller calls finish().
|
||||||
|
@ -88,7 +84,6 @@ public class LzmaCompressor implements Compressor {
|
||||||
} else {
|
} else {
|
||||||
if (outBuf.capacity() < needed) {
|
if (outBuf.capacity() < needed) {
|
||||||
needed = CompressionUtil.roundInt2(needed);
|
needed = CompressionUtil.roundInt2(needed);
|
||||||
LOG.trace("compress: resize outBuf {}", needed);
|
|
||||||
outBuf = ByteBuffer.allocate(needed);
|
outBuf = ByteBuffer.allocate(needed);
|
||||||
} else {
|
} else {
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -119,42 +114,34 @@ public class LzmaCompressor implements Compressor {
|
||||||
int written = writeBuffer.position() - oldPos;
|
int written = writeBuffer.position() - oldPos;
|
||||||
bytesWritten += written;
|
bytesWritten += written;
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
|
|
||||||
finished = true;
|
finished = true;
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
if (!direct) {
|
if (!direct) {
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("compress: {} bytes direct", written);
|
|
||||||
return written;
|
return written;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
finished = true;
|
finished = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.trace("No output");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finish() {
|
public void finish() {
|
||||||
LOG.trace("finish");
|
|
||||||
finish = true;
|
finish = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
boolean b = finished && !outBuf.hasRemaining();
|
return finished && !outBuf.hasRemaining();
|
||||||
LOG.trace("finished: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -169,14 +156,11 @@ public class LzmaCompressor implements Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = !finished();
|
return !finished();
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reinit(Configuration conf) {
|
public void reinit(Configuration conf) {
|
||||||
LOG.trace("reinit");
|
|
||||||
if (conf != null) {
|
if (conf != null) {
|
||||||
// Level might have changed
|
// Level might have changed
|
||||||
try {
|
try {
|
||||||
|
@ -199,7 +183,6 @@ public class LzmaCompressor implements Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
outBuf.position(outBuf.capacity());
|
outBuf.position(outBuf.capacity());
|
||||||
|
@ -216,13 +199,11 @@ public class LzmaCompressor implements Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf to {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -23,8 +23,6 @@ import org.apache.hadoop.hbase.io.ByteBufferInputStream;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Decompressor;
|
import org.apache.hadoop.io.compress.Decompressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.tukaani.xz.ArrayCache;
|
import org.tukaani.xz.ArrayCache;
|
||||||
import org.tukaani.xz.BasicArrayCache;
|
import org.tukaani.xz.BasicArrayCache;
|
||||||
import org.tukaani.xz.LZMAInputStream;
|
import org.tukaani.xz.LZMAInputStream;
|
||||||
|
@ -35,7 +33,6 @@ import org.tukaani.xz.LZMAInputStream;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class LzmaDecompressor implements Decompressor {
|
public class LzmaDecompressor implements Decompressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(LzmaDecompressor.class);
|
|
||||||
protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache() {
|
protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache() {
|
||||||
@Override
|
@Override
|
||||||
public byte[] getByteArray(int size, boolean fillWithZeros) {
|
public byte[] getByteArray(int size, boolean fillWithZeros) {
|
||||||
|
@ -59,7 +56,6 @@ public class LzmaDecompressor implements Decompressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
if (inBuf.position() > 0) {
|
if (inBuf.position() > 0) {
|
||||||
|
@ -89,43 +85,35 @@ public class LzmaDecompressor implements Decompressor {
|
||||||
int written = outBuf.position();
|
int written = outBuf.position();
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
|
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
LOG.trace("decompress: No output, finished");
|
|
||||||
finished = true;
|
finished = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
LOG.trace("finished");
|
|
||||||
return finished;
|
return finished;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getRemaining() {
|
public int getRemaining() {
|
||||||
LOG.trace("getRemaining: {}", inLen);
|
|
||||||
return inLen;
|
return inLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsDictionary() {
|
public boolean needsDictionary() {
|
||||||
LOG.trace("needsDictionary");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
inLen = 0;
|
inLen = 0;
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -135,9 +123,7 @@ public class LzmaDecompressor implements Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = (inBuf.position() == 0);
|
return inBuf.position() == 0;
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -147,13 +133,11 @@ public class LzmaDecompressor implements Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(byte[] b, int off, int len) {
|
public void setInput(byte[] b, int off, int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
ByteBuffer newBuf = ByteBuffer.allocate(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Compressor;
|
import org.apache.hadoop.io.compress.Compressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hadoop compressor glue for zstd-jni.
|
* Hadoop compressor glue for zstd-jni.
|
||||||
|
@ -35,7 +33,6 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ZstdCompressor implements CanReinit, Compressor {
|
public class ZstdCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(ZstdCompressor.class);
|
|
||||||
protected int level, bufferSize;
|
protected int level, bufferSize;
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected boolean finish, finished;
|
protected boolean finish, finished;
|
||||||
|
@ -66,7 +63,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
// We don't actually begin compression until our caller calls finish().
|
// We don't actually begin compression until our caller calls finish().
|
||||||
|
@ -79,7 +75,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
|
||||||
int needed = maxCompressedLength(uncompressed);
|
int needed = maxCompressedLength(uncompressed);
|
||||||
if (outBuf.capacity() < needed) {
|
if (outBuf.capacity() < needed) {
|
||||||
needed = CompressionUtil.roundInt2(needed);
|
needed = CompressionUtil.roundInt2(needed);
|
||||||
LOG.trace("compress: resize outBuf {}", needed);
|
|
||||||
outBuf = ByteBuffer.allocateDirect(needed);
|
outBuf = ByteBuffer.allocateDirect(needed);
|
||||||
} else {
|
} else {
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -92,37 +87,30 @@ public class ZstdCompressor implements CanReinit, Compressor {
|
||||||
}
|
}
|
||||||
bytesWritten += written;
|
bytesWritten += written;
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level);
|
|
||||||
finished = true;
|
finished = true;
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("compress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
} else {
|
} else {
|
||||||
finished = true;
|
finished = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.trace("No output");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finish() {
|
public void finish() {
|
||||||
LOG.trace("finish");
|
|
||||||
finish = true;
|
finish = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
boolean b = finished && !outBuf.hasRemaining();
|
return finished && !outBuf.hasRemaining();
|
||||||
LOG.trace("finished: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -137,20 +125,16 @@ public class ZstdCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
boolean b = !finished();
|
return !finished();
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reinit(final Configuration conf) {
|
public void reinit(final Configuration conf) {
|
||||||
LOG.trace("reinit");
|
|
||||||
if (conf != null) {
|
if (conf != null) {
|
||||||
// Level might have changed
|
// Level might have changed
|
||||||
boolean levelChanged = false;
|
boolean levelChanged = false;
|
||||||
int newLevel = ZstdCodec.getLevel(conf);
|
int newLevel = ZstdCodec.getLevel(conf);
|
||||||
if (level != newLevel) {
|
if (level != newLevel) {
|
||||||
LOG.trace("Level changed, was {} now {}", level, newLevel);
|
|
||||||
level = newLevel;
|
level = newLevel;
|
||||||
levelChanged = true;
|
levelChanged = true;
|
||||||
}
|
}
|
||||||
|
@ -162,7 +146,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
|
||||||
if (dict == null || dictId != thisDictId || levelChanged) {
|
if (dict == null || dictId != thisDictId || levelChanged) {
|
||||||
dictId = thisDictId;
|
dictId = thisDictId;
|
||||||
dict = new ZstdDictCompress(b, level);
|
dict = new ZstdDictCompress(b, level);
|
||||||
LOG.trace("Reloaded dictionary, new id is {}", dictId);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dict = null;
|
dict = null;
|
||||||
|
@ -173,7 +156,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
|
||||||
bufferSize = newBufferSize;
|
bufferSize = newBufferSize;
|
||||||
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
|
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||||
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
|
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||||
LOG.trace("Resized buffers, new size is {}", bufferSize);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
reset();
|
reset();
|
||||||
|
@ -181,7 +163,6 @@ public class ZstdCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
outBuf.position(outBuf.capacity());
|
outBuf.position(outBuf.capacity());
|
||||||
|
@ -198,13 +179,11 @@ public class ZstdCompressor implements CanReinit, Compressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(final byte[] b, final int off, final int len) {
|
public void setInput(final byte[] b, final int off, final int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
|
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
|
|
@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.io.compress.CanReinit;
|
||||||
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
|
||||||
import org.apache.hadoop.io.compress.Decompressor;
|
import org.apache.hadoop.io.compress.Decompressor;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hadoop decompressor glue for zstd-java.
|
* Hadoop decompressor glue for zstd-java.
|
||||||
|
@ -35,7 +33,6 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ZstdDecompressor implements CanReinit, Decompressor {
|
public class ZstdDecompressor implements CanReinit, Decompressor {
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(ZstdDecompressor.class);
|
|
||||||
protected ByteBuffer inBuf, outBuf;
|
protected ByteBuffer inBuf, outBuf;
|
||||||
protected int bufferSize;
|
protected int bufferSize;
|
||||||
protected int inLen;
|
protected int inLen;
|
||||||
|
@ -63,7 +60,6 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
|
||||||
if (outBuf.hasRemaining()) {
|
if (outBuf.hasRemaining()) {
|
||||||
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
if (inBuf.position() > 0) {
|
if (inBuf.position() > 0) {
|
||||||
|
@ -78,44 +74,36 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
|
||||||
written = Zstd.decompress(outBuf, inBuf);
|
written = Zstd.decompress(outBuf, inBuf);
|
||||||
}
|
}
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
|
|
||||||
outBuf.flip();
|
outBuf.flip();
|
||||||
int n = Math.min(written, len);
|
int n = Math.min(written, len);
|
||||||
outBuf.get(b, off, n);
|
outBuf.get(b, off, n);
|
||||||
LOG.trace("decompress: {} bytes", n);
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
LOG.trace("decompress: No output, finished");
|
|
||||||
finished = true;
|
finished = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end() {
|
public void end() {
|
||||||
LOG.trace("end");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean finished() {
|
public boolean finished() {
|
||||||
LOG.trace("finished");
|
|
||||||
return finished;
|
return finished;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getRemaining() {
|
public int getRemaining() {
|
||||||
LOG.trace("getRemaining: {}", inLen);
|
|
||||||
return inLen;
|
return inLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsDictionary() {
|
public boolean needsDictionary() {
|
||||||
LOG.trace("needsDictionary");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() {
|
public void reset() {
|
||||||
LOG.trace("reset");
|
|
||||||
inBuf.clear();
|
inBuf.clear();
|
||||||
inLen = 0;
|
inLen = 0;
|
||||||
outBuf.clear();
|
outBuf.clear();
|
||||||
|
@ -125,9 +113,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean needsInput() {
|
public boolean needsInput() {
|
||||||
final boolean b = (inBuf.position() == 0);
|
return (inBuf.position() == 0);
|
||||||
LOG.trace("needsInput: {}", b);
|
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -137,13 +123,11 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setInput(final byte[] b, final int off, final int len) {
|
public void setInput(final byte[] b, final int off, final int len) {
|
||||||
LOG.trace("setInput: off={} len={}", off, len);
|
|
||||||
if (inBuf.remaining() < len) {
|
if (inBuf.remaining() < len) {
|
||||||
// Get a new buffer that can accomodate the accumulated input plus the additional
|
// Get a new buffer that can accomodate the accumulated input plus the additional
|
||||||
// input that would cause a buffer overflow without reallocation.
|
// input that would cause a buffer overflow without reallocation.
|
||||||
// This condition should be fortunately rare, because it is expensive.
|
// This condition should be fortunately rare, because it is expensive.
|
||||||
final int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
final int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
|
||||||
LOG.trace("setInput: resize inBuf {}", needed);
|
|
||||||
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
|
ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
|
||||||
inBuf.flip();
|
inBuf.flip();
|
||||||
newBuf.put(inBuf);
|
newBuf.put(inBuf);
|
||||||
|
@ -156,7 +140,6 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reinit(final Configuration conf) {
|
public void reinit(final Configuration conf) {
|
||||||
LOG.trace("reinit");
|
|
||||||
if (conf != null) {
|
if (conf != null) {
|
||||||
// Dictionary may have changed
|
// Dictionary may have changed
|
||||||
byte[] b = ZstdCodec.getDictionary(conf);
|
byte[] b = ZstdCodec.getDictionary(conf);
|
||||||
|
@ -166,7 +149,6 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
|
||||||
if (dict == null || dictId != thisDictId) {
|
if (dict == null || dictId != thisDictId) {
|
||||||
dictId = thisDictId;
|
dictId = thisDictId;
|
||||||
dict = new ZstdDictDecompress(b);
|
dict = new ZstdDictDecompress(b);
|
||||||
LOG.trace("Reloaded dictionary, new id is {}", dictId);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dict = null;
|
dict = null;
|
||||||
|
@ -177,7 +159,6 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
|
||||||
bufferSize = newBufferSize;
|
bufferSize = newBufferSize;
|
||||||
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
|
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||||
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
|
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
|
||||||
LOG.trace("Resized buffers, new size is {}", bufferSize);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
reset();
|
reset();
|
||||||
|
|
Loading…
Reference in New Issue