NIFI-9783: This closes #5855. When migrating FlowFiles from one ProcessSession to another, if any FlowFile had already been transferred, and the Relationship to which it was transferred was auto-terminated, we were updating the wrong member variable, which threw off our stats for the processor. Fixed that.

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2022-03-09 13:44:08 -05:00 committed by Joe Witt
parent c73573b325
commit 73356ea448
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
2 changed files with 50 additions and 4 deletions

View File

@ -1599,12 +1599,25 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
} }
if (repoRecord.getTransferRelationship() != null) { if (repoRecord.getTransferRelationship() != null) {
final Relationship transferRelationship = repoRecord.getTransferRelationship();
final Collection<Connection> destinations = context.getConnections(transferRelationship);
final int numDestinations = destinations.size();
final boolean autoTerminated = numDestinations == 0 && context.getConnectable().isAutoTerminated(transferRelationship);
if (autoTerminated) {
removedCount--;
removedBytes -= flowFile.getSize();
newOwner.removedCount++;
newOwner.removedBytes += flowFile.getSize();
} else {
flowFilesOut--; flowFilesOut--;
contentSizeOut -= flowFile.getSize(); contentSizeOut -= flowFile.getSize();
newOwner.flowFilesOut++; newOwner.flowFilesOut++;
newOwner.contentSizeOut += flowFile.getSize(); newOwner.contentSizeOut += flowFile.getSize();
} }
}
final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile); final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile);
if (events != null) { if (events != null) {

View File

@ -2452,6 +2452,39 @@ public class StandardProcessSessionIT {
session.commit(); session.commit();
} }
@Test
public void testMigrateAfterTransferToAutoTerminatedRelationship() {
final long start = System.currentTimeMillis();
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, out -> out.write("Hello".getBytes(StandardCharsets.UTF_8)));
final StandardProcessSession newSession = new StandardProcessSession(context, () -> false);
when(connectable.getConnections(any(Relationship.class))).thenReturn(Collections.emptySet());
when(connectable.isAutoTerminated(any(Relationship.class))).thenReturn(true);
session.transfer(flowFile, new Relationship.Builder().name("success").build());
session.migrate(newSession, Collections.singleton(flowFile));
session.commit();
RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(start - 1);
FlowFileEvent event = report.getReportEntries().values().iterator().next();
assertEquals(0, event.getFlowFilesRemoved());
assertEquals(0, event.getContentSizeRemoved());
assertEquals(0, event.getFlowFilesOut());
assertEquals(0, event.getContentSizeOut());
newSession.commit();
report = flowFileEventRepository.reportTransferEvents(start - 1);
event = report.getReportEntries().values().iterator().next();
assertEquals(1, event.getFlowFilesRemoved());
assertEquals(5, event.getContentSizeRemoved());
assertEquals(0, event.getFlowFilesOut());
assertEquals(0, event.getContentSizeOut());
}
@Test @Test
public void testNewFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() { public void testNewFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();