NIFI-3894: Call Inflater/Deflater.end to free up memory

This closes #1796.
This commit is contained in:
Koji Kawamura 2017-05-15 09:45:48 +09:00 committed by Mark Payne
parent 4fdea680ec
commit 77a676bf92
4 changed files with 19 additions and 2 deletions

View File

@ -148,6 +148,11 @@ public abstract class AbstractTransaction implements Transaction {
final InputStream dataIn = compress ? new CompressionInputStream(is) : is;
final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
if (compress) {
// Close CompressionInputStream to free acquired memory, without closing underlying stream.
dataIn.close();
}
if (packet == null) {
this.dataAvailable = false;
} else {

View File

@ -174,12 +174,13 @@ public class CompressionInputStream extends InputStream {
}
/**
* Does nothing. Does NOT close underlying InputStream
* Calls {@link Inflater#end()} to free acquired memory to prevent OutOfMemory error.
* However, does NOT close underlying InputStream.
*
* @throws java.io.IOException for any issues closing underlying stream
*/
@Override
public void close() throws IOException {
inflater.end();
}
}

View File

@ -137,10 +137,15 @@ public class CompressionOutputStream extends OutputStream {
super.flush();
}
/**
* Flushes remaining buffer and calls {@link Deflater#end()} to free acquired memory to prevent OutOfMemory error.
* @throws IOException for any issues closing underlying stream
*/
@Override
public void close() throws IOException {
compressAndWrite();
out.write(0); // indicate that the stream is finished.
out.flush();
deflater.end();
}
}

View File

@ -440,6 +440,12 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc);
final DataPacket dataPacket = codec.decode(checkedInputStream);
if (handshakeProperties.isUseGzip()) {
// Close CompressionInputStream to free acquired memory, without closing underlying stream.
checkedInputStream.close();
}
if (dataPacket == null) {
logger.debug("{} Received null dataPacket indicating the end of transaction from {}", this, peer);
break;