From 9fce60d90899504410bc9e413a17535762f7b1ec Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Thu, 17 Jul 2014 18:20:53 +0000 Subject: [PATCH] HADOOP-10591. Compression codecs must used pooled direct buffers or deallocate direct buffers when stream is closed (cmccabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1611427 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../apache/hadoop/io/compress/BZip2Codec.java | 6 +- .../hadoop/io/compress/CompressionCodec.java | 55 +++++++++++++++++++ .../io/compress/CompressionInputStream.java | 10 ++++ .../io/compress/CompressionOutputStream.java | 18 +++++- .../hadoop/io/compress/DefaultCodec.java | 14 ++--- .../apache/hadoop/io/compress/GzipCodec.java | 14 +++-- .../apache/hadoop/io/compress/Lz4Codec.java | 6 +- .../hadoop/io/compress/SnappyCodec.java | 6 +- 9 files changed, 108 insertions(+), 24 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index fad562529a5..9bb4fe43e71 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -51,6 +51,9 @@ Release 2.6.0 - UNRELEASED HADOOP-9921. daemon scripts should remove pid file on stop call after stop or process is found not running ( vinayakumarb ) + HADOOP-10591. Compression codecs must used pooled direct buffers or + deallocate direct buffers when stream is closed (cmccabe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java index 42e96cfdc50..37b97f2a641 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java @@ -100,7 +100,8 @@ public BZip2Codec() { } @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return createOutputStream(out, createCompressor()); + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } /** @@ -153,7 +154,8 @@ public Compressor createCompressor() { @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return createInputStream(in, createDecompressor()); + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java index af2ff20b39d..f37aadfcb57 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; /** * This class encapsulates a streaming compression/decompression pair. @@ -113,4 +114,58 @@ CompressionInputStream createInputStream(InputStream in, * @return the extension including the '.' */ String getDefaultExtension(); + + static class Util { + /** + * Create an output stream with a codec taken from the global CodecPool. + * + * @param codec The codec to use to create the output stream. + * @param conf The configuration to use if we need to create a new codec. + * @param out The output stream to wrap. + * @return The new output stream + * @throws IOException + */ + static CompressionOutputStream createOutputStreamWithCodecPool( + CompressionCodec codec, Configuration conf, OutputStream out) + throws IOException { + Compressor compressor = CodecPool.getCompressor(codec, conf); + CompressionOutputStream stream = null; + try { + stream = codec.createOutputStream(out, compressor); + } finally { + if (stream == null) { + CodecPool.returnCompressor(compressor); + } else { + stream.setTrackedCompressor(compressor); + } + } + return stream; + } + + /** + * Create an input stream with a codec taken from the global CodecPool. + * + * @param codec The codec to use to create the input stream. + * @param conf The configuration to use if we need to create a new codec. + * @param in The input stream to wrap. + * @return The new input stream + * @throws IOException + */ + static CompressionInputStream createInputStreamWithCodecPool( + CompressionCodec codec, Configuration conf, InputStream in) + throws IOException { + Decompressor decompressor = CodecPool.getDecompressor(codec); + CompressionInputStream stream = null; + try { + stream = codec.createInputStream(in, decompressor); + } finally { + if (stream == null) { + CodecPool.returnDecompressor(decompressor); + } else { + stream.setTrackedDecompressor(decompressor); + } + } + return stream; + } + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java index 4491819d72c..cf3ac401cdd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java @@ -41,6 +41,8 @@ public abstract class CompressionInputStream extends InputStream implements Seek protected final InputStream in; protected long maxAvailableData = 0L; + private Decompressor trackedDecompressor; + /** * Create a compression input stream that reads * the decompressed bytes from the given stream. @@ -58,6 +60,10 @@ protected CompressionInputStream(InputStream in) throws IOException { @Override public void close() throws IOException { in.close(); + if (trackedDecompressor != null) { + CodecPool.returnDecompressor(trackedDecompressor); + trackedDecompressor = null; + } } /** @@ -112,4 +118,8 @@ public void seek(long pos) throws UnsupportedOperationException { public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException { throw new UnsupportedOperationException(); } + + void setTrackedDecompressor(Decompressor decompressor) { + trackedDecompressor = decompressor; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java index 9bd6b84f988..00e272a9cc5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java @@ -34,7 +34,13 @@ public abstract class CompressionOutputStream extends OutputStream { * The output stream to be compressed. */ protected final OutputStream out; - + + /** + * If non-null, this is the Compressor object that we should call + * CodecPool#returnCompressor on when this stream is closed. + */ + private Compressor trackedCompressor; + /** * Create a compression output stream that writes * the compressed bytes to the given stream. @@ -43,11 +49,19 @@ public abstract class CompressionOutputStream extends OutputStream { protected CompressionOutputStream(OutputStream out) { this.out = out; } - + + void setTrackedCompressor(Compressor compressor) { + trackedCompressor = compressor; + } + @Override public void close() throws IOException { finish(); out.close(); + if (trackedCompressor != null) { + CodecPool.returnCompressor(trackedCompressor); + trackedCompressor = null; + } } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java index dc02dcaf429..0e6f02cc9f4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java @@ -51,14 +51,8 @@ public Configuration getConf() { @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - // This may leak memory if called in a loop. The createCompressor() call - // may cause allocation of an untracked direct-backed buffer if native - // libs are being used (even if you close the stream). A Compressor - // object should be reused between successive calls. - LOG.warn("DefaultCodec.createOutputStream() may leak memory. " - + "Create a compressor first."); - return new CompressorStream(out, createCompressor(), - conf.getInt("io.file.buffer.size", 4*1024)); + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } @Override @@ -82,8 +76,8 @@ public Compressor createCompressor() { @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return new DecompressorStream(in, createDecompressor(), - conf.getInt("io.file.buffer.size", 4*1024)); + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java index e7e94a0b3d9..da0c5ad8478 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java @@ -104,10 +104,11 @@ public void resetState() throws IOException { @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return (ZlibFactory.isNativeZlibLoaded(conf)) ? - new CompressorStream(out, createCompressor(), - conf.getInt("io.file.buffer.size", 4*1024)) : - new GzipOutputStream(out); + if (!ZlibFactory.isNativeZlibLoaded(conf)) { + return new GzipOutputStream(out); + } + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } @Override @@ -137,8 +138,9 @@ public Class getCompressorType() { @Override public CompressionInputStream createInputStream(InputStream in) - throws IOException { - return createInputStream(in, null); + throws IOException { + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java index 4b0ea796b71..61462c08ddc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java @@ -84,7 +84,8 @@ public static String getLibraryName() { @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return createOutputStream(out, createCompressor()); + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } /** @@ -157,7 +158,8 @@ public Compressor createCompressor() { @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return createInputStream(in, createDecompressor()); + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java index 402f8c8e99f..8d2fa1a6fb4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java @@ -95,7 +95,8 @@ public static String getLibraryName() { @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return createOutputStream(out, createCompressor()); + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); } /** @@ -158,7 +159,8 @@ public Compressor createCompressor() { @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return createInputStream(in, createDecompressor()); + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); } /**