HADOOP-14376. Memory leak when reading a compressed file using the native library. Contributed by Eli Acherkan

This commit is contained in:
Jason Lowe 2017-05-12 16:54:08 -05:00
parent 6c35001b9f
commit 7bc2172248
7 changed files with 101 additions and 67 deletions

View File

@ -336,15 +336,11 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
} }
public void close() throws IOException { public void close() throws IOException {
if (needsReset) { try {
// In the case that nothing is written to this stream, we still need to super.close();
// write out the header before closing, otherwise the stream won't be } finally {
// recognized by BZip2CompressionInputStream. output.close();
internalReset();
} }
this.output.flush();
this.output.close();
needsReset = true;
} }
}// end of class BZip2CompressionOutputStream }// end of class BZip2CompressionOutputStream
@ -454,8 +450,12 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
public void close() throws IOException { public void close() throws IOException {
if (!needsReset) { if (!needsReset) {
try {
input.close(); input.close();
needsReset = true; needsReset = true;
} finally {
super.close();
}
} }
} }

View File

@ -157,7 +157,10 @@ public class CodecPool {
LOG.debug("Got recycled compressor"); LOG.debug("Got recycled compressor");
} }
} }
if (compressor != null &&
!compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
updateLeaseCount(compressorCounts, compressor, 1); updateLeaseCount(compressorCounts, compressor, 1);
}
return compressor; return compressor;
} }
@ -184,7 +187,10 @@ public class CodecPool {
LOG.debug("Got recycled decompressor"); LOG.debug("Got recycled decompressor");
} }
} }
if (decompressor != null &&
!decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
updateLeaseCount(decompressorCounts, decompressor, 1); updateLeaseCount(decompressorCounts, decompressor, 1);
}
return decompressor; return decompressor;
} }

View File

@ -59,12 +59,15 @@ public abstract class CompressionInputStream extends InputStream implements Seek
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try {
in.close(); in.close();
} finally {
if (trackedDecompressor != null) { if (trackedDecompressor != null) {
CodecPool.returnDecompressor(trackedDecompressor); CodecPool.returnDecompressor(trackedDecompressor);
trackedDecompressor = null; trackedDecompressor = null;
} }
} }
}
/** /**
* Read bytes from the stream. * Read bytes from the stream.

View File

@ -56,13 +56,19 @@ public abstract class CompressionOutputStream extends OutputStream {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try {
finish(); finish();
} finally {
try {
out.close(); out.close();
} finally {
if (trackedCompressor != null) { if (trackedCompressor != null) {
CodecPool.returnCompressor(trackedCompressor); CodecPool.returnCompressor(trackedCompressor);
trackedCompressor = null; trackedCompressor = null;
} }
} }
}
}
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {

View File

@ -103,10 +103,9 @@ public class CompressorStream extends CompressionOutputStream {
public void close() throws IOException { public void close() throws IOException {
if (!closed) { if (!closed) {
try { try {
finish(); super.close();
} }
finally { finally {
out.close();
closed = true; closed = true;
} }
} }

View File

@ -221,10 +221,13 @@ public class DecompressorStream extends CompressionInputStream {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (!closed) { if (!closed) {
in.close(); try {
super.close();
} finally {
closed = true; closed = true;
} }
} }
}
@Override @Override
public boolean markSupported() { public boolean markSupported() {

View File

@ -205,28 +205,38 @@ public class TestCodec {
// Compress data // Compress data
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer(); DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
CompressionOutputStream deflateFilter = int leasedCompressorsBefore = codec.getCompressorType() == null ? -1
: CodecPool.getLeasedCompressorsCount(codec);
try (CompressionOutputStream deflateFilter =
codec.createOutputStream(compressedDataBuffer); codec.createOutputStream(compressedDataBuffer);
DataOutputStream deflateOut = DataOutputStream deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter)); new DataOutputStream(new BufferedOutputStream(deflateFilter))) {
deflateOut.write(data.getData(), 0, data.getLength()); deflateOut.write(data.getData(), 0, data.getLength());
deflateOut.flush(); deflateOut.flush();
deflateFilter.finish(); deflateFilter.finish();
}
if (leasedCompressorsBefore > -1) {
assertEquals("leased compressor not returned to the codec pool",
leasedCompressorsBefore, CodecPool.getLeasedCompressorsCount(codec));
}
LOG.info("Finished compressing data"); LOG.info("Finished compressing data");
// De-compress data // De-compress data
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer(); DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength()); compressedDataBuffer.getLength());
CompressionInputStream inflateFilter = DataInputBuffer originalData = new DataInputBuffer();
int leasedDecompressorsBefore =
CodecPool.getLeasedDecompressorsCount(codec);
try (CompressionInputStream inflateFilter =
codec.createInputStream(deCompressedDataBuffer); codec.createInputStream(deCompressedDataBuffer);
DataInputStream inflateIn = DataInputStream inflateIn =
new DataInputStream(new BufferedInputStream(inflateFilter)); new DataInputStream(new BufferedInputStream(inflateFilter))) {
// Check // Check
DataInputBuffer originalData = new DataInputBuffer();
originalData.reset(data.getData(), 0, data.getLength()); originalData.reset(data.getData(), 0, data.getLength());
DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData)); DataInputStream originalIn =
new DataInputStream(new BufferedInputStream(originalData));
for(int i=0; i < count; ++i) { for(int i=0; i < count; ++i) {
RandomDatum k1 = new RandomDatum(); RandomDatum k1 = new RandomDatum();
RandomDatum v1 = new RandomDatum(); RandomDatum v1 = new RandomDatum();
@ -240,7 +250,8 @@ public class TestCodec {
assertTrue("original and compressed-then-decompressed-output not equal", assertTrue("original and compressed-then-decompressed-output not equal",
k1.equals(k2) && v1.equals(v2)); k1.equals(k2) && v1.equals(v2));
// original and compressed-then-decompressed-output have the same hashCode // original and compressed-then-decompressed-output have the same
// hashCode
Map<RandomDatum, String> m = new HashMap<RandomDatum, String>(); Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
m.put(k1, k1.toString()); m.put(k1, k1.toString());
m.put(v1, v1.toString()); m.put(v1, v1.toString());
@ -249,22 +260,28 @@ public class TestCodec {
result = m.get(v2); result = m.get(v2);
assertEquals("v1 and v2 hashcode not equal", result, v1.toString()); assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
} }
}
assertEquals("leased decompressor not returned to the codec pool",
leasedDecompressorsBefore,
CodecPool.getLeasedDecompressorsCount(codec));
// De-compress data byte-at-a-time // De-compress data byte-at-a-time
originalData.reset(data.getData(), 0, data.getLength()); originalData.reset(data.getData(), 0, data.getLength());
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength()); compressedDataBuffer.getLength());
inflateFilter = try (CompressionInputStream inflateFilter =
codec.createInputStream(deCompressedDataBuffer); codec.createInputStream(deCompressedDataBuffer);
DataInputStream originalIn =
new DataInputStream(new BufferedInputStream(originalData))) {
// Check // Check
originalIn = new DataInputStream(new BufferedInputStream(originalData));
int expected; int expected;
do { do {
expected = originalIn.read(); expected = originalIn.read();
assertEquals("Inflated stream read by byte does not match", assertEquals("Inflated stream read by byte does not match",
expected, inflateFilter.read()); expected, inflateFilter.read());
} while (expected != -1); } while (expected != -1);
}
LOG.info("SUCCESS! Completed checking " + count + " records"); LOG.info("SUCCESS! Completed checking " + count + " records");
} }