NIFI-3894: This closes #1820. Fixed close and consume order with compression.

Before this fix, 'NullPointerException: Inflater has been closed' can be thrown as the Inflater is closed before input stream is consumed.

Also, calling close from AbstractTransaction.receive is removed, because the DataPacket is exposed as its return value and this class will not be able to know when to close the stream.

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Koji Kawamura 2017-05-18 10:05:08 +09:00 committed by joewitt
parent 8e1c79eaaf
commit 36e7bd6164
2 changed files with 5 additions and 10 deletions

View File

@ -148,11 +148,6 @@ public abstract class AbstractTransaction implements Transaction {
final InputStream dataIn = compress ? new CompressionInputStream(is) : is; final InputStream dataIn = compress ? new CompressionInputStream(is) : is;
final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc)); final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
if (compress) {
// Close CompressionInputStream to free acquired memory, without closing underlying stream.
dataIn.close();
}
if (packet == null) { if (packet == null) {
this.dataAvailable = false; this.dataAvailable = false;
} else { } else {

View File

@ -441,11 +441,6 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
final DataPacket dataPacket = codec.decode(checkedInputStream); final DataPacket dataPacket = codec.decode(checkedInputStream);
if (handshakeProperties.isUseGzip()) {
// Close CompressionInputStream to free acquired memory, without closing underlying stream.
checkedInputStream.close();
}
if (dataPacket == null) { if (dataPacket == null) {
logger.debug("{} Received null dataPacket indicating the end of transaction from {}", this, peer); logger.debug("{} Received null dataPacket indicating the end of transaction from {}", this, peer);
break; break;
@ -454,6 +449,11 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
flowFile = session.importFrom(dataPacket.getData(), flowFile); flowFile = session.importFrom(dataPacket.getData(), flowFile);
flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
if (handshakeProperties.isUseGzip()) {
// Close CompressionInputStream to free acquired memory, without closing underlying stream.
checkedInputStream.close();
}
final long transferNanos = System.nanoTime() - startNanos; final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());