NIFI-6841: Fixed bug that resulted in the wrong number of 'Bytes Read' being reported by ByteCountingInputStream in the event that #skip was called between calls to #mark and #reset

This closes #3868
This commit is contained in:
Mark Payne 2019-11-05 09:38:01 -05:00 committed by Matt Gilman
parent 84a05c8595
commit a9db5a8cb7
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 77 additions and 8 deletions

View File

@ -25,7 +25,8 @@ public class ByteCountingInputStream extends InputStream {
private long bytesRead = 0L; private long bytesRead = 0L;
private long bytesSkipped = 0L; private long bytesSkipped = 0L;
private long bytesSinceMark = 0L; private long bytesReadSinceMark = 0L;
private long bytesSkippedSinceMark = 0L;
public ByteCountingInputStream(final InputStream in) { public ByteCountingInputStream(final InputStream in) {
this.in = in; this.in = in;
@ -41,7 +42,7 @@ public class ByteCountingInputStream extends InputStream {
final int fromSuper = in.read(); final int fromSuper = in.read();
if (fromSuper >= 0) { if (fromSuper >= 0) {
bytesRead++; bytesRead++;
bytesSinceMark++; bytesReadSinceMark++;
} }
return fromSuper; return fromSuper;
} }
@ -51,7 +52,7 @@ public class ByteCountingInputStream extends InputStream {
final int fromSuper = in.read(b, off, len); final int fromSuper = in.read(b, off, len);
if (fromSuper >= 0) { if (fromSuper >= 0) {
bytesRead += fromSuper; bytesRead += fromSuper;
bytesSinceMark += fromSuper; bytesReadSinceMark += fromSuper;
} }
return fromSuper; return fromSuper;
@ -67,7 +68,7 @@ public class ByteCountingInputStream extends InputStream {
final long skipped = in.skip(n); final long skipped = in.skip(n);
if (skipped >= 0) { if (skipped >= 0) {
bytesSkipped += skipped; bytesSkipped += skipped;
bytesSinceMark += skipped; bytesSkippedSinceMark += skipped;
} }
return skipped; return skipped;
} }
@ -88,7 +89,8 @@ public class ByteCountingInputStream extends InputStream {
public void mark(final int readlimit) { public void mark(final int readlimit) {
in.mark(readlimit); in.mark(readlimit);
bytesSinceMark = 0L; bytesReadSinceMark = 0L;
bytesSkippedSinceMark = 0L;
} }
@Override @Override
@ -99,8 +101,11 @@ public class ByteCountingInputStream extends InputStream {
@Override @Override
public void reset() throws IOException { public void reset() throws IOException {
in.reset(); in.reset();
bytesRead -= bytesSinceMark; bytesRead -= bytesReadSinceMark;
bytesSinceMark = 0L; bytesSkipped -= bytesSkippedSinceMark;
bytesReadSinceMark = 0L;
bytesSkippedSinceMark = 0L;
} }
@Override @Override

View File

@ -29,6 +29,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
@ -114,6 +115,7 @@ public class TestStandardProcessSession {
private ProvenanceEventRepository provenanceRepo; private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo; private MockFlowFileRepository flowFileRepo;
private CounterRepository counterRepository; private CounterRepository counterRepository;
private FlowFileEventRepository flowFileEventRepo;
private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build(); private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
private static StandardResourceClaimManager resourceClaimManager; private static StandardResourceClaimManager resourceClaimManager;
@ -155,7 +157,7 @@ public class TestStandardProcessSession {
resourceClaimManager = new StandardResourceClaimManager(); resourceClaimManager = new StandardResourceClaimManager();
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile()); System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile());
final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class); flowFileEventRepo = new RingBufferEventRepository(1);
counterRepository = new StandardCounterRepository(); counterRepository = new StandardCounterRepository();
provenanceRepo = new MockProvenanceRepository(); provenanceRepo = new MockProvenanceRepository();
@ -340,6 +342,68 @@ public class TestStandardProcessSession {
assertEquals(2, bCounters); assertEquals(2, bCounters);
} }
@Test
public void testReadCountCorrectWhenSkippingWithReadCallback() throws IOException {
final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(contentRepo.create(content))
.size(content.length)
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
session.read(flowFile, in -> {
assertEquals('T', (char) in.read());
in.mark(10);
assertEquals(5, in.skip(5L));
assertEquals('n', (char) in.read());
in.reset();
});
session.transfer(flowFile);
session.commit();
final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
assertEquals(1, bytesRead);
}
@Test
public void testReadCountCorrectWhenSkippingWithReadInputStream() throws IOException {
final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(contentRepo.create(content))
.size(content.length)
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
try (InputStream in = session.read(flowFile)) {
assertEquals('T', (char) in.read());
in.mark(10);
assertEquals(5, in.skip(5L));
assertEquals('n', (char) in.read());
in.reset();
};
session.transfer(flowFile);
session.commit();
final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
assertEquals(1, bytesRead);
}
@Test @Test
public void testHandlingOfMultipleFlowFilesWithSameId() { public void testHandlingOfMultipleFlowFilesWithSameId() {
// Add two FlowFiles with the same ID // Add two FlowFiles with the same ID