HBASE-11748 Cleanup and add pool usage tracing to Compression

This commit is contained in:
Nick Dimiduk 2014-08-14 14:30:46 -07:00
parent 7df6f58001
commit fe7f1320a0
1 changed files with 14 additions and 26 deletions

View File

@ -168,11 +168,6 @@ public final class Compression {
if (downStreamBufferSize > 0) {
return new BufferedInputStream(downStream, downStreamBufferSize);
}
// else {
// Make sure we bypass FSInputChecker buffer.
// return new BufferedInputStream(downStream, 1024);
// }
// }
return downStream;
}
@ -208,8 +203,7 @@ public final class Compression {
try {
Class<?> externalCodec =
getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
conf);
return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
@ -236,8 +230,7 @@ public final class Compression {
try {
Class<?> externalCodec =
getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
conf);
return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
@ -246,9 +239,9 @@ public final class Compression {
private final Configuration conf;
private final String compressName;
// data input buffer size to absorb small reads from application.
/** data input buffer size to absorb small reads from application. */
private static final int DATA_IBUF_SIZE = 1 * 1024;
// data output buffer size to absorb small writes from application.
/** data output buffer size to absorb small writes from application. */
private static final int DATA_OBUF_SIZE = 4 * 1024;
Algorithm(String name) {
@ -308,14 +301,11 @@ public final class Compression {
CompressionCodec codec = getCodec(conf);
if (codec != null) {
Compressor compressor = CodecPool.getCompressor(codec);
if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
if (compressor != null) {
if (compressor.finished()) {
// Somebody returns the compressor to CodecPool but is still using
// it.
LOG
.warn("Compressor obtained from CodecPool is already finished()");
// throw new AssertionError(
// "Compressor obtained from CodecPool is already finished()");
// Somebody returns the compressor to CodecPool but is still using it.
LOG.warn("Compressor obtained from CodecPool is already finished()");
}
compressor.reset();
}
@ -326,6 +316,7 @@ public final class Compression {
public void returnCompressor(Compressor compressor) {
if (compressor != null) {
if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
CodecPool.returnCompressor(compressor);
}
}
@ -334,14 +325,11 @@ public final class Compression {
CompressionCodec codec = getCodec(conf);
if (codec != null) {
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
if (decompressor != null) {
if (decompressor.finished()) {
// Somebody returns the decompressor to CodecPool but is still using
// it.
LOG
.warn("Deompressor obtained from CodecPool is already finished()");
// throw new AssertionError(
// "Decompressor obtained from CodecPool is already finished()");
// Somebody returns the decompressor to CodecPool but is still using it.
LOG.warn("Deompressor obtained from CodecPool is already finished()");
}
decompressor.reset();
}
@ -353,8 +341,10 @@ public final class Compression {
public void returnDecompressor(Decompressor decompressor) {
if (decompressor != null) {
if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
CodecPool.returnDecompressor(decompressor);
if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
decompressor.end();
}
}
@ -374,8 +364,7 @@ public final class Compression {
}
}
throw new IllegalArgumentException(
"Unsupported compression algorithm name: " + compressName);
throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
}
/**
@ -442,5 +431,4 @@ public final class Compression {
}
}
}
}