diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 002bac932e..80c917cebe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2580,7 +2580,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE cnfeThrown = true; throw cnfe; } finally { - this.bytesWritten += countingOut.getBytesWritten(); + writtenToFlowFile = countingOut.getBytesWritten(); + this.bytesWritten += writtenToFlowFile; this.bytesRead += countingIn.getBytesRead(); recursionSet.remove(source); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 8cc088d480..6f94994ef2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -276,6 +276,65 @@ public class TestStandardProcessSession { assertEquals(0, numClaims); } + @Test + public void testModifyContentWithStreamCallbackHasCorrectSize() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.write(original, (in, out) -> out.write("hello".getBytes())); + session.transfer(child); + session.commit(); + + final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet()); + assertEquals(5, onQueue.getSize()); + } + + @Test + public void testModifyContentWithOutputStreamCallbackHasCorrectSize() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.write(original, out -> out.write("hello".getBytes())); + session.transfer(child); + session.commit(); + + final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet()); + assertEquals(5, onQueue.getSize()); + } + + @Test + public void testModifyContentWithAppendHasCorrectSize() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.append(original, out -> out.write("hello".getBytes())); + session.transfer(child); + session.commit(); + + final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet()); + assertEquals(5, onQueue.getSize()); + } + + + @Test public void testModifyContentThenRollback() throws IOException { assertEquals(0, contentRepo.getExistingClaims().size());