diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java index 4c2372caf9..1d4ac0ff30 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java @@ -25,7 +25,8 @@ public class ByteCountingInputStream extends InputStream { private long bytesRead = 0L; private long bytesSkipped = 0L; - private long bytesSinceMark = 0L; + private long bytesReadSinceMark = 0L; + private long bytesSkippedSinceMark = 0L; public ByteCountingInputStream(final InputStream in) { this.in = in; @@ -41,7 +42,7 @@ public class ByteCountingInputStream extends InputStream { final int fromSuper = in.read(); if (fromSuper >= 0) { bytesRead++; - bytesSinceMark++; + bytesReadSinceMark++; } return fromSuper; } @@ -51,7 +52,7 @@ public class ByteCountingInputStream extends InputStream { final int fromSuper = in.read(b, off, len); if (fromSuper >= 0) { bytesRead += fromSuper; - bytesSinceMark += fromSuper; + bytesReadSinceMark += fromSuper; } return fromSuper; @@ -67,7 +68,7 @@ public class ByteCountingInputStream extends InputStream { final long skipped = in.skip(n); if (skipped >= 0) { bytesSkipped += skipped; - bytesSinceMark += skipped; + bytesSkippedSinceMark += skipped; } return skipped; } @@ -88,7 +89,8 @@ public class ByteCountingInputStream extends InputStream { public void mark(final int readlimit) { in.mark(readlimit); - bytesSinceMark = 0L; + bytesReadSinceMark = 0L; + bytesSkippedSinceMark = 0L; } @Override @@ -99,8 +101,11 @@ public class ByteCountingInputStream extends InputStream { @Override public void reset() throws IOException { in.reset(); - bytesRead -= bytesSinceMark; - bytesSinceMark = 0L; + bytesRead -= bytesReadSinceMark; + bytesSkipped -= bytesSkippedSinceMark; + + bytesReadSinceMark = 0L; + bytesSkippedSinceMark = 0L; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 267f22d168..e347c15bb0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -29,6 +29,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.groups.ProcessGroup; @@ -114,6 +115,7 @@ public class TestStandardProcessSession { private ProvenanceEventRepository provenanceRepo; private MockFlowFileRepository flowFileRepo; private CounterRepository counterRepository; + private FlowFileEventRepository flowFileEventRepo; private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build(); private static StandardResourceClaimManager resourceClaimManager; @@ -155,7 +157,7 @@ public class TestStandardProcessSession { resourceClaimManager = new StandardResourceClaimManager(); System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile()); - final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class); + flowFileEventRepo = new RingBufferEventRepository(1); counterRepository = new StandardCounterRepository(); provenanceRepo = new MockProvenanceRepository(); @@ -340,6 +342,68 @@ public class TestStandardProcessSession { assertEquals(2, bCounters); } + @Test + public void testReadCountCorrectWhenSkippingWithReadCallback() throws IOException { + final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(contentRepo.create(content)) + .size(content.length) + .build(); + + flowFileQueue.put(flowFileRecord); + + FlowFile flowFile = session.get(); + session.read(flowFile, in -> { + assertEquals('T', (char) in.read()); + in.mark(10); + assertEquals(5, in.skip(5L)); + assertEquals('n', (char) in.read()); + in.reset(); + }); + + session.transfer(flowFile); + session.commit(); + + final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L); + final long bytesRead = report.getReportEntry("connectable-1").getBytesRead(); + assertEquals(1, bytesRead); + } + + @Test + public void testReadCountCorrectWhenSkippingWithReadInputStream() throws IOException { + final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(contentRepo.create(content)) + .size(content.length) + .build(); + + flowFileQueue.put(flowFileRecord); + + FlowFile flowFile = session.get(); + try (InputStream in = session.read(flowFile)) { + assertEquals('T', (char) in.read()); + in.mark(10); + assertEquals(5, in.skip(5L)); + assertEquals('n', (char) in.read()); + in.reset(); + }; + + session.transfer(flowFile); + session.commit(); + + final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L); + final long bytesRead = report.getReportEntry("connectable-1").getBytesRead(); + assertEquals(1, bytesRead); + } + @Test public void testHandlingOfMultipleFlowFilesWithSameId() { // Add two FlowFiles with the same ID