From 36e7bd6164f6d7294ec17fe6487a74b042ba2f25 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Thu, 18 May 2017 10:05:08 +0900 Subject: [PATCH] NIFI-3894: This closes #1820. Fixed close and consume order with compression. Before this fix, 'NullPointerException: Inflater has been closed' can be thrown as the Inflater is closed before input stream is consumed. Also, calling close from AbstractTransaction.receive is removed, because the DataPacket is exposed as its return value and this class will not be able to know when to close the stream. Signed-off-by: joewitt --- .../org/apache/nifi/remote/AbstractTransaction.java | 5 ----- .../protocol/AbstractFlowFileServerProtocol.java | 10 +++++----- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java index 2d6b2e12d4..826cf00df0 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java @@ -148,11 +148,6 @@ public abstract class AbstractTransaction implements Transaction { final InputStream dataIn = compress ? new CompressionInputStream(is) : is; final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc)); - if (compress) { - // Close CompressionInputStream to free acquired memory, without closing underlying stream. - dataIn.close(); - } - if (packet == null) { this.dataAvailable = false; } else { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java index f5398083ee..3a5f23ca41 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java @@ -441,11 +441,6 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { final DataPacket dataPacket = codec.decode(checkedInputStream); - if (handshakeProperties.isUseGzip()) { - // Close CompressionInputStream to free acquired memory, without closing underlying stream. - checkedInputStream.close(); - } - if (dataPacket == null) { logger.debug("{} Received null dataPacket indicating the end of transaction from {}", this, peer); break; @@ -454,6 +449,11 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { flowFile = session.importFrom(dataPacket.getData(), flowFile); flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + if (handshakeProperties.isUseGzip()) { + // Close CompressionInputStream to free acquired memory, without closing underlying stream. + checkedInputStream.close(); + } + final long transferNanos = System.nanoTime() - startNanos; final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());