diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 10f0d8cd9c..002bac932e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2091,6 +2091,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw cnfe; } finally { recursionSet.remove(source); + bytesRead += countingStream.getBytesRead(); // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. if (!cnfeThrown && ffais.getContentNotFoundException() != null) { @@ -2162,6 +2163,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void close() throws IOException { + StandardProcessSession.this.bytesRead += countingStream.getBytesRead(); + ffais.close(); openInputStreams.remove(source); } @@ -2556,7 +2559,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset(), true); final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); - final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); + final ByteCountingInputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); final OutputStream os = context.getContentRepository().write(newClaim); final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os); final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) { @@ -2577,7 +2580,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE cnfeThrown = true; throw cnfe; } finally { - writtenToFlowFile = countingOut.getBytesWritten(); + this.bytesWritten += countingOut.getBytesWritten(); + this.bytesRead += countingIn.getBytesRead(); recursionSet.remove(source); // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. @@ -2598,8 +2602,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } catch (final Throwable t) { destroyContent(newClaim); throw t; - } finally { - bytesWritten += writtenToFlowFile; } removeTemporaryClaim(record);