HBASE-11748 Cleanup and add pool usage tracing to Compression
This commit is contained in:
parent
e1e70a7e2f
commit
9936b86f70
|
@ -168,11 +168,6 @@ public final class Compression {
|
|||
if (downStreamBufferSize > 0) {
|
||||
return new BufferedInputStream(downStream, downStreamBufferSize);
|
||||
}
|
||||
// else {
|
||||
// Make sure we bypass FSInputChecker buffer.
|
||||
// return new BufferedInputStream(downStream, 1024);
|
||||
// }
|
||||
// }
|
||||
return downStream;
|
||||
}
|
||||
|
||||
|
@ -208,8 +203,7 @@ public final class Compression {
|
|||
try {
|
||||
Class<?> externalCodec =
|
||||
getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
|
||||
return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
|
||||
conf);
|
||||
return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -236,8 +230,7 @@ public final class Compression {
|
|||
try {
|
||||
Class<?> externalCodec =
|
||||
getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
|
||||
return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
|
||||
conf);
|
||||
return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -246,9 +239,9 @@ public final class Compression {
|
|||
|
||||
private final Configuration conf;
|
||||
private final String compressName;
|
||||
// data input buffer size to absorb small reads from application.
|
||||
/** data input buffer size to absorb small reads from application. */
|
||||
private static final int DATA_IBUF_SIZE = 1 * 1024;
|
||||
// data output buffer size to absorb small writes from application.
|
||||
/** data output buffer size to absorb small writes from application. */
|
||||
private static final int DATA_OBUF_SIZE = 4 * 1024;
|
||||
|
||||
Algorithm(String name) {
|
||||
|
@ -308,14 +301,11 @@ public final class Compression {
|
|||
CompressionCodec codec = getCodec(conf);
|
||||
if (codec != null) {
|
||||
Compressor compressor = CodecPool.getCompressor(codec);
|
||||
if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
|
||||
if (compressor != null) {
|
||||
if (compressor.finished()) {
|
||||
// Somebody returns the compressor to CodecPool but is still using
|
||||
// it.
|
||||
LOG
|
||||
.warn("Compressor obtained from CodecPool is already finished()");
|
||||
// throw new AssertionError(
|
||||
// "Compressor obtained from CodecPool is already finished()");
|
||||
// Somebody returns the compressor to CodecPool but is still using it.
|
||||
LOG.warn("Compressor obtained from CodecPool is already finished()");
|
||||
}
|
||||
compressor.reset();
|
||||
}
|
||||
|
@ -326,6 +316,7 @@ public final class Compression {
|
|||
|
||||
public void returnCompressor(Compressor compressor) {
|
||||
if (compressor != null) {
|
||||
if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
|
||||
CodecPool.returnCompressor(compressor);
|
||||
}
|
||||
}
|
||||
|
@ -334,14 +325,11 @@ public final class Compression {
|
|||
CompressionCodec codec = getCodec(conf);
|
||||
if (codec != null) {
|
||||
Decompressor decompressor = CodecPool.getDecompressor(codec);
|
||||
if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
|
||||
if (decompressor != null) {
|
||||
if (decompressor.finished()) {
|
||||
// Somebody returns the decompressor to CodecPool but is still using
|
||||
// it.
|
||||
LOG
|
||||
.warn("Deompressor obtained from CodecPool is already finished()");
|
||||
// throw new AssertionError(
|
||||
// "Decompressor obtained from CodecPool is already finished()");
|
||||
// Somebody returns the decompressor to CodecPool but is still using it.
|
||||
LOG.warn("Deompressor obtained from CodecPool is already finished()");
|
||||
}
|
||||
decompressor.reset();
|
||||
}
|
||||
|
@ -353,8 +341,10 @@ public final class Compression {
|
|||
|
||||
public void returnDecompressor(Decompressor decompressor) {
|
||||
if (decompressor != null) {
|
||||
if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
|
||||
CodecPool.returnDecompressor(decompressor);
|
||||
if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
|
||||
if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
|
||||
decompressor.end();
|
||||
}
|
||||
}
|
||||
|
@ -374,8 +364,7 @@ public final class Compression {
|
|||
}
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException(
|
||||
"Unsupported compression algorithm name: " + compressName);
|
||||
throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -442,5 +431,4 @@ public final class Compression {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue