diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java index 2d6b2e12d4..826cf00df0 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java @@ -148,11 +148,6 @@ public abstract class AbstractTransaction implements Transaction { final InputStream dataIn = compress ? new CompressionInputStream(is) : is; final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc)); - if (compress) { - // Close CompressionInputStream to free acquired memory, without closing underlying stream. - dataIn.close(); - } - if (packet == null) { this.dataAvailable = false; } else { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java index f5398083ee..3a5f23ca41 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java @@ -441,11 +441,6 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { final DataPacket dataPacket = codec.decode(checkedInputStream); - if (handshakeProperties.isUseGzip()) { - // Close CompressionInputStream to free acquired memory, without closing underlying stream. - checkedInputStream.close(); - } - if (dataPacket == null) { logger.debug("{} Received null dataPacket indicating the end of transaction from {}", this, peer); break; @@ -454,6 +449,11 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { flowFile = session.importFrom(dataPacket.getData(), flowFile); flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + if (handshakeProperties.isUseGzip()) { + // Close CompressionInputStream to free acquired memory, without closing underlying stream. + checkedInputStream.close(); + } + final long transferNanos = System.nanoTime() - startNanos; final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());