From b737869e01fe3334b948a38fe3835e48873bf3a6 Mon Sep 17 00:00:00 2001 From: kevins-29 <100220899+kevins-29@users.noreply.github.com> Date: Sat, 13 Aug 2022 01:05:13 +0200 Subject: [PATCH] HADOOP-18383. Codecs with @DoNotPool annotation are not closed causing memory leak (#4585) --- .../io/compress/AlreadyClosedException.java | 33 +++++++++++ .../apache/hadoop/io/compress/CodecPool.java | 2 + .../compress/zlib/BuiltInGzipCompressor.java | 7 +++ .../zlib/BuiltInGzipDecompressor.java | 13 +++- .../hadoop/io/compress/TestCodecPool.java | 59 +++++++++++++++++++ 5 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java new file mode 100644 index 00000000000..993d2678d2a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; + +/** + * An exception class for when a closed compressor/decompressor is being used. + * {@link org.apache.hadoop.io.compress.Compressor} + * {@link org.apache.hadoop.io.compress.Decompressor} + */ +public class AlreadyClosedException extends IOException { + + public AlreadyClosedException(String message) { + super(message); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java index 1f095c6c673..5b1826f9e30 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java @@ -205,6 +205,7 @@ public static void returnCompressor(Compressor compressor) { } // if the compressor can't be reused, don't pool it. if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { + compressor.end(); return; } compressor.reset(); @@ -225,6 +226,7 @@ public static void returnDecompressor(Decompressor decompressor) { } // if the decompressor can't be reused, don't pool it. 
if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { + decompressor.end(); return; } decompressor.reset(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java index fcb431dce86..d44413cc309 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java @@ -24,6 +24,7 @@ import java.util.zip.GZIPOutputStream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.AlreadyClosedException; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.DoNotPool; import org.apache.hadoop.util.DataChecksum; @@ -83,6 +84,10 @@ public int compress(byte[] b, int off, int len) throws IOException { throw new IOException("compress called on finished compressor"); } + if (state == BuiltInGzipDecompressor.GzipStateLabel.ENDED) { + throw new AlreadyClosedException("compress called on closed compressor"); + } + int compressedBytesWritten = 0; // If we are not within uncompressed data yet, output the header. 
@@ -139,6 +144,8 @@ public long getBytesWritten() { @Override public void end() { deflater.end(); + + state = BuiltInGzipDecompressor.GzipStateLabel.ENDED; } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java index 47c21b4e3ea..d47864a71f4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java @@ -23,6 +23,7 @@ import java.util.zip.DataFormatException; import java.util.zip.Inflater; +import org.apache.hadoop.io.compress.AlreadyClosedException; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.DoNotPool; import org.apache.hadoop.util.DataChecksum; @@ -109,7 +110,11 @@ public enum GzipStateLabel { * Immediately after the trailer (and potentially prior to the next gzip * member/substream header), without reset() having been called. */ - FINISHED; + FINISHED, + /** + * Immediately after end() has been called. 
+ */ + ENDED; } /** @@ -186,6 +191,10 @@ public synchronized int decompress(byte[] b, int off, int len) throws IOException { int numAvailBytes = 0; + if (state == GzipStateLabel.ENDED) { + throw new AlreadyClosedException("decompress called on closed decompressor"); + } + if (state != GzipStateLabel.DEFLATE_STREAM) { executeHeaderState(); @@ -476,6 +485,8 @@ public synchronized void reset() { @Override public synchronized void end() { inflater.end(); + + state = GzipStateLabel.ENDED; } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index ec99598e79c..4b18ee6047b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -19,6 +19,10 @@ import static org.junit.Assert.assertEquals; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -26,6 +30,9 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor; +import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Before; import org.junit.Test; @@ -189,4 +196,56 @@ public void testDecompressorNotReturnSameInstance() { CodecPool.returnDecompressor(decompressor); } } + + @Test(timeout = 10000) + public void testDoNotPoolCompressorNotUseableAfterReturn() throws Exception { + + final GzipCodec gzipCodec = new GzipCodec(); + gzipCodec.setConf(new Configuration()); + + // BuiltInGzipCompressor is an explicit example of a Compressor with 
the @DoNotPool annotation + final Compressor compressor = new BuiltInGzipCompressor(new Configuration()); + CodecPool.returnCompressor(compressor); + + final CompressionOutputStream outputStream = + gzipCodec.createOutputStream(new ByteArrayOutputStream(), compressor); + LambdaTestUtils.intercept( + AlreadyClosedException.class, + "compress called on closed compressor", + "Compressor from Codec with @DoNotPool should not be " + + "useable after returning to CodecPool", + () -> outputStream.write(1)); + } + + @Test(timeout = 10000) + public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception { + + final GzipCodec gzipCodec = new GzipCodec(); + gzipCodec.setConf(new Configuration()); + + final Random random = new Random(); + final byte[] bytes = new byte[1024]; + random.nextBytes(bytes); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) { + outputStream.write(bytes); + } + + final byte[] gzipBytes = baos.toByteArray(); + final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes); + + // BuiltInGzipDecompressor is an explicit example of a Decompressor + // with the @DoNotPool annotation + final Decompressor decompressor = new BuiltInGzipDecompressor(); + CodecPool.returnDecompressor(decompressor); + + final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor); + LambdaTestUtils.intercept( + AlreadyClosedException.class, + "decompress called on closed decompressor", + "Decompressor from Codec with @DoNotPool should not be " + + "useable after returning to CodecPool", + () -> inputStream.read()); + } }