diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java new file mode 100644 index 00000000000..104ad24577f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; + +/** + * An exception class for when a closed compressor/decopressor is being used. + * {@link org.apache.hadoop.io.compress.Compressor} + * {@link org.apache.hadoop.io.compress.Decompressor} + */ +public class AlreadyClosedException extends IOException { + + public AlreadyClosedException(String message) { + super(message); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java index 69e8c99a1f4..9a18f9c52c7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java @@ -205,6 +205,7 @@ public class CodecPool { } // if the compressor can't be reused, don't pool it. if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { + compressor.end(); return; } compressor.reset(); @@ -225,6 +226,7 @@ public class CodecPool { } // if the decompressor can't be reused, don't pool it. if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { + decompressor.end(); return; } decompressor.reset(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java index 896d35eb180..24b8c392f76 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java @@ -23,6 +23,7 @@ import java.util.zip.Checksum; import java.util.zip.DataFormatException; import java.util.zip.Inflater; +import org.apache.hadoop.io.compress.AlreadyClosedException; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.DoNotPool; import org.apache.hadoop.util.DataChecksum; @@ -105,7 +106,11 @@ public class BuiltInGzipDecompressor implements Decompressor { * Immediately after the trailer (and potentially prior to the next gzip * member/substream header), without reset() having been called. */ - FINISHED; + FINISHED, + /** + * Immediately after end() has been called. + */ + ENDED; } /** @@ -182,6 +187,10 @@ public class BuiltInGzipDecompressor implements Decompressor { throws IOException { int numAvailBytes = 0; + if (state == GzipStateLabel.ENDED) { + throw new AlreadyClosedException("decompress called on closed decompressor"); + } + if (state != GzipStateLabel.DEFLATE_STREAM) { executeHeaderState(); @@ -472,6 +481,8 @@ public class BuiltInGzipDecompressor implements Decompressor { @Override public synchronized void end() { inflater.end(); + + state = GzipStateLabel.ENDED; } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java index 1fb25cb9087..367b85862e8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java @@ -19,6 +19,10 @@ package org.apache.hadoop.io.compress; import static org.junit.Assert.assertEquals; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -26,6 +30,8 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Before; import org.junit.Test; @@ -189,4 +195,36 @@ public class TestCodecPool { CodecPool.returnDecompressor(decompressor); } } + + @Test(timeout = 10000) + public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception { + + final GzipCodec gzipCodec = new GzipCodec(); + gzipCodec.setConf(new Configuration()); + + final Random random = new Random(); + final byte[] bytes = new byte[1024]; + random.nextBytes(bytes); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) { + outputStream.write(bytes); + } + + final byte[] gzipBytes = baos.toByteArray(); + final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes); + + // BuiltInGzipDecompressor is an explicit example of a Decompressor + // with the @DoNotPool annotation + final Decompressor decompressor = new BuiltInGzipDecompressor(); + CodecPool.returnDecompressor(decompressor); + + final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor); + LambdaTestUtils.intercept( + AlreadyClosedException.class, + "decompress called on closed decompressor", + "Decompressor from Codec with @DoNotPool should not be " + + "useable after returning to CodecPool", + () -> inputStream.read()); + } }