HADOOP-14376. Memory leak when reading a compressed file using the native library. Contributed by Eli Acherkan

This commit is contained in:
Jason Lowe 2017-05-12 16:54:08 -05:00 committed by Xiaoyu Yao
parent 15c7526e2c
commit 192f1e6318
7 changed files with 101 additions and 67 deletions

View File

@ -336,15 +336,11 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
}
public void close() throws IOException {
if (needsReset) {
// In the case that nothing is written to this stream, we still need to
// write out the header before closing, otherwise the stream won't be
// recognized by BZip2CompressionInputStream.
internalReset();
try {
super.close();
} finally {
output.close();
}
this.output.flush();
this.output.close();
needsReset = true;
}
}// end of class BZip2CompressionOutputStream
@ -454,8 +450,12 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
public void close() throws IOException {
if (!needsReset) {
input.close();
needsReset = true;
try {
input.close();
needsReset = true;
} finally {
super.close();
}
}
}

View File

@ -157,7 +157,10 @@ public class CodecPool {
LOG.debug("Got recycled compressor");
}
}
updateLeaseCount(compressorCounts, compressor, 1);
if (compressor != null &&
!compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
updateLeaseCount(compressorCounts, compressor, 1);
}
return compressor;
}
@ -184,7 +187,10 @@ public class CodecPool {
LOG.debug("Got recycled decompressor");
}
}
updateLeaseCount(decompressorCounts, decompressor, 1);
if (decompressor != null &&
!decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
updateLeaseCount(decompressorCounts, decompressor, 1);
}
return decompressor;
}

View File

@ -59,10 +59,13 @@ public abstract class CompressionInputStream extends InputStream implements Seek
@Override
public void close() throws IOException {
in.close();
if (trackedDecompressor != null) {
CodecPool.returnDecompressor(trackedDecompressor);
trackedDecompressor = null;
try {
in.close();
} finally {
if (trackedDecompressor != null) {
CodecPool.returnDecompressor(trackedDecompressor);
trackedDecompressor = null;
}
}
}

View File

@ -56,11 +56,17 @@ public abstract class CompressionOutputStream extends OutputStream {
@Override
public void close() throws IOException {
finish();
out.close();
if (trackedCompressor != null) {
CodecPool.returnCompressor(trackedCompressor);
trackedCompressor = null;
try {
finish();
} finally {
try {
out.close();
} finally {
if (trackedCompressor != null) {
CodecPool.returnCompressor(trackedCompressor);
trackedCompressor = null;
}
}
}
}

View File

@ -103,10 +103,9 @@ public class CompressorStream extends CompressionOutputStream {
public void close() throws IOException {
if (!closed) {
try {
finish();
super.close();
}
finally {
out.close();
closed = true;
}
}

View File

@ -221,8 +221,11 @@ public class DecompressorStream extends CompressionInputStream {
@Override
public void close() throws IOException {
if (!closed) {
in.close();
closed = true;
try {
super.close();
} finally {
closed = true;
}
}
}

View File

@ -205,66 +205,83 @@ public class TestCodec {
// Compress data
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
CompressionOutputStream deflateFilter =
int leasedCompressorsBefore = codec.getCompressorType() == null ? -1
: CodecPool.getLeasedCompressorsCount(codec);
try (CompressionOutputStream deflateFilter =
codec.createOutputStream(compressedDataBuffer);
DataOutputStream deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter));
deflateOut.write(data.getData(), 0, data.getLength());
deflateOut.flush();
deflateFilter.finish();
DataOutputStream deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter))) {
deflateOut.write(data.getData(), 0, data.getLength());
deflateOut.flush();
deflateFilter.finish();
}
if (leasedCompressorsBefore > -1) {
assertEquals("leased compressor not returned to the codec pool",
leasedCompressorsBefore, CodecPool.getLeasedCompressorsCount(codec));
}
LOG.info("Finished compressing data");
// De-compress data
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
CompressionInputStream inflateFilter =
codec.createInputStream(deCompressedDataBuffer);
DataInputStream inflateIn =
new DataInputStream(new BufferedInputStream(inflateFilter));
// Check
DataInputBuffer originalData = new DataInputBuffer();
originalData.reset(data.getData(), 0, data.getLength());
DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
for(int i=0; i < count; ++i) {
RandomDatum k1 = new RandomDatum();
RandomDatum v1 = new RandomDatum();
k1.readFields(originalIn);
v1.readFields(originalIn);
int leasedDecompressorsBefore =
CodecPool.getLeasedDecompressorsCount(codec);
try (CompressionInputStream inflateFilter =
codec.createInputStream(deCompressedDataBuffer);
DataInputStream inflateIn =
new DataInputStream(new BufferedInputStream(inflateFilter))) {
// Check
originalData.reset(data.getData(), 0, data.getLength());
DataInputStream originalIn =
new DataInputStream(new BufferedInputStream(originalData));
for(int i=0; i < count; ++i) {
RandomDatum k1 = new RandomDatum();
RandomDatum v1 = new RandomDatum();
k1.readFields(originalIn);
v1.readFields(originalIn);
RandomDatum k2 = new RandomDatum();
RandomDatum v2 = new RandomDatum();
k2.readFields(inflateIn);
v2.readFields(inflateIn);
assertTrue("original and compressed-then-decompressed-output not equal",
k1.equals(k2) && v1.equals(v2));
RandomDatum k2 = new RandomDatum();
RandomDatum v2 = new RandomDatum();
k2.readFields(inflateIn);
v2.readFields(inflateIn);
assertTrue("original and compressed-then-decompressed-output not equal",
k1.equals(k2) && v1.equals(v2));
// original and compressed-then-decompressed-output have the same hashCode
Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
m.put(k1, k1.toString());
m.put(v1, v1.toString());
String result = m.get(k2);
assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
result = m.get(v2);
assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
// original and compressed-then-decompressed-output have the same
// hashCode
Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
m.put(k1, k1.toString());
m.put(v1, v1.toString());
String result = m.get(k2);
assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
result = m.get(v2);
assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
}
}
assertEquals("leased decompressor not returned to the codec pool",
leasedDecompressorsBefore,
CodecPool.getLeasedDecompressorsCount(codec));
// De-compress data byte-at-a-time
originalData.reset(data.getData(), 0, data.getLength());
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
inflateFilter =
try (CompressionInputStream inflateFilter =
codec.createInputStream(deCompressedDataBuffer);
DataInputStream originalIn =
new DataInputStream(new BufferedInputStream(originalData))) {
// Check
originalIn = new DataInputStream(new BufferedInputStream(originalData));
int expected;
do {
expected = originalIn.read();
assertEquals("Inflated stream read by byte does not match",
expected, inflateFilter.read());
} while (expected != -1);
// Check
int expected;
do {
expected = originalIn.read();
assertEquals("Inflated stream read by byte does not match",
expected, inflateFilter.read());
} while (expected != -1);
}
LOG.info("SUCCESS! Completed checking " + count + " records");
}