diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 08b4d4d5df9..331606eb449 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -336,15 +336,11 @@ public void write(byte[] b, int off, int len) throws IOException {
     }
 
     public void close() throws IOException {
-      if (needsReset) {
-        // In the case that nothing is written to this stream, we still need to
-        // write out the header before closing, otherwise the stream won't be
-        // recognized by BZip2CompressionInputStream.
-        internalReset();
+      try {
+        super.close();
+      } finally {
+        output.close();
       }
-      this.output.flush();
-      this.output.close();
-      needsReset = true;
     }
 
   }// end of class BZip2CompressionOutputStream
@@ -454,8 +450,12 @@ private BufferedInputStream readStreamHeader() throws IOException {
 
     public void close() throws IOException {
       if (!needsReset) {
-        input.close();
-        needsReset = true;
+        try {
+          input.close();
+          needsReset = true;
+        } finally {
+          super.close();
+        }
       }
     }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
index bb566ded68d..01bffa78a1d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
@@ -157,7 +157,10 @@ public static Compressor getCompressor(CompressionCodec codec, Configuration con
         LOG.debug("Got recycled compressor");
       }
     }
-    updateLeaseCount(compressorCounts, compressor, 1);
+    if (compressor != null &&
+        !compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      updateLeaseCount(compressorCounts, compressor, 1);
+    }
     return compressor;
   }
 
@@ -184,7 +187,10 @@ public static Decompressor getDecompressor(CompressionCodec codec) {
         LOG.debug("Got recycled decompressor");
       }
     }
-    updateLeaseCount(decompressorCounts, decompressor, 1);
+    if (decompressor != null &&
+        !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      updateLeaseCount(decompressorCounts, decompressor, 1);
+    }
     return decompressor;
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
index cf3ac401cdd..2dfa30bf76e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
@@ -59,10 +59,13 @@ protected CompressionInputStream(InputStream in) throws IOException {
 
   @Override
   public void close() throws IOException {
-    in.close();
-    if (trackedDecompressor != null) {
-      CodecPool.returnDecompressor(trackedDecompressor);
-      trackedDecompressor = null;
+    try {
+      in.close();
+    } finally {
+      if (trackedDecompressor != null) {
+        CodecPool.returnDecompressor(trackedDecompressor);
+        trackedDecompressor = null;
+      }
     }
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
index 00e272a9cc5..71c7f32e665 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
@@ -56,11 +56,17 @@ void setTrackedCompressor(Compressor compressor) {
 
   @Override
   public void close() throws IOException {
-    finish();
-    out.close();
-    if (trackedCompressor != null) {
-      CodecPool.returnCompressor(trackedCompressor);
-      trackedCompressor = null;
+    try {
+      finish();
+    } finally {
+      try {
+        out.close();
+      } finally {
+        if (trackedCompressor != null) {
+          CodecPool.returnCompressor(trackedCompressor);
+          trackedCompressor = null;
+        }
+      }
     }
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
index 34426f84024..be5eee00bb2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
@@ -103,10 +103,9 @@ public void resetState() throws IOException {
   public void close() throws IOException {
     if (!closed) {
       try {
-        finish();
+        super.close();
       } finally {
-        out.close();
         closed = true;
       }
     }
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
index dab366adbfb..756ccf3c8ed 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
@@ -221,8 +221,11 @@ public int available() throws IOException {
   @Override
   public void close() throws IOException {
     if (!closed) {
-      in.close();
-      closed = true;
+      try {
+        super.close();
+      } finally {
+        closed = true;
+      }
     }
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 10295174519..bc0d21ed2b5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -195,66 +195,83 @@ private static void codecTest(Configuration conf, int seed, int count,
 
     // Compress data
     DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
-    CompressionOutputStream deflateFilter =
+    int leasedCompressorsBefore = codec.getCompressorType() == null ? -1
+        : CodecPool.getLeasedCompressorsCount(codec);
+    try (CompressionOutputStream deflateFilter =
       codec.createOutputStream(compressedDataBuffer);
-    DataOutputStream deflateOut =
-      new DataOutputStream(new BufferedOutputStream(deflateFilter));
-    deflateOut.write(data.getData(), 0, data.getLength());
-    deflateOut.flush();
-    deflateFilter.finish();
+        DataOutputStream deflateOut =
+            new DataOutputStream(new BufferedOutputStream(deflateFilter))) {
+      deflateOut.write(data.getData(), 0, data.getLength());
+      deflateOut.flush();
+      deflateFilter.finish();
+    }
+    if (leasedCompressorsBefore > -1) {
+      assertEquals("leased compressor not returned to the codec pool",
+          leasedCompressorsBefore, CodecPool.getLeasedCompressorsCount(codec));
+    }
     LOG.info("Finished compressing data");
 
     // De-compress data
     DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
     deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
        compressedDataBuffer.getLength());
-    CompressionInputStream inflateFilter =
-      codec.createInputStream(deCompressedDataBuffer);
-    DataInputStream inflateIn =
-      new DataInputStream(new BufferedInputStream(inflateFilter));
-
-    // Check
     DataInputBuffer originalData = new DataInputBuffer();
-    originalData.reset(data.getData(), 0, data.getLength());
-    DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
-    for(int i=0; i < count; ++i) {
-      RandomDatum k1 = new RandomDatum();
-      RandomDatum v1 = new RandomDatum();
-      k1.readFields(originalIn);
-      v1.readFields(originalIn);
+    int leasedDecompressorsBefore =
+        CodecPool.getLeasedDecompressorsCount(codec);
+    try (CompressionInputStream inflateFilter =
+        codec.createInputStream(deCompressedDataBuffer);
+        DataInputStream inflateIn =
+            new DataInputStream(new BufferedInputStream(inflateFilter))) {
+
+      // Check
+      originalData.reset(data.getData(), 0, data.getLength());
+      DataInputStream originalIn =
+          new DataInputStream(new BufferedInputStream(originalData));
+      for(int i=0; i < count; ++i) {
+        RandomDatum k1 = new RandomDatum();
+        RandomDatum v1 = new RandomDatum();
+        k1.readFields(originalIn);
+        v1.readFields(originalIn);
 
-      RandomDatum k2 = new RandomDatum();
-      RandomDatum v2 = new RandomDatum();
-      k2.readFields(inflateIn);
-      v2.readFields(inflateIn);
-      assertTrue("original and compressed-then-decompressed-output not equal",
-                 k1.equals(k2) && v1.equals(v2));
+        RandomDatum k2 = new RandomDatum();
+        RandomDatum v2 = new RandomDatum();
+        k2.readFields(inflateIn);
+        v2.readFields(inflateIn);
+        assertTrue("original and compressed-then-decompressed-output not equal",
+            k1.equals(k2) && v1.equals(v2));
 
-      // original and compressed-then-decompressed-output have the same hashCode
-      Map m = new HashMap();
-      m.put(k1, k1.toString());
-      m.put(v1, v1.toString());
-      String result = m.get(k2);
-      assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
-      result = m.get(v2);
-      assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
+        // original and compressed-then-decompressed-output have the same
+        // hashCode
+        Map m = new HashMap();
+        m.put(k1, k1.toString());
+        m.put(v1, v1.toString());
+        String result = m.get(k2);
+        assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
+        result = m.get(v2);
+        assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
+      }
     }
+    assertEquals("leased decompressor not returned to the codec pool",
+        leasedDecompressorsBefore,
+        CodecPool.getLeasedDecompressorsCount(codec));
 
     // De-compress data byte-at-a-time
    originalData.reset(data.getData(), 0, data.getLength());
     deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
        compressedDataBuffer.getLength());
-    inflateFilter =
+    try (CompressionInputStream inflateFilter =
      codec.createInputStream(deCompressedDataBuffer);
+        DataInputStream originalIn =
+            new DataInputStream(new BufferedInputStream(originalData))) {
 
-    // Check
-    originalIn = new DataInputStream(new BufferedInputStream(originalData));
-    int expected;
-    do {
-      expected = originalIn.read();
-      assertEquals("Inflated stream read by byte does not match",
-        expected, inflateFilter.read());
-    } while (expected != -1);
+      // Check
+      int expected;
+      do {
+        expected = originalIn.read();
+        assertEquals("Inflated stream read by byte does not match",
+            expected, inflateFilter.read());
+      } while (expected != -1);
+    }
     LOG.info("SUCCESS! Completed checking " + count + " records");
   }
 