From 73356ea448dced6a789f2d71c5cb5aceac45520e Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 9 Mar 2022 13:44:08 -0500 Subject: [PATCH] NIFI-9783: This closes #5855. When migrating FlowFiles from one ProcessSession to another, if any FlowFile had already been transferred, and the Relationship to which it was transferred was auto-terminated, we were updating the wrong member variable, which threw off our stats for the processor. Fixed that. Signed-off-by: Joe Witt --- .../repository/StandardProcessSession.java | 21 +++++++++--- .../repository/StandardProcessSessionIT.java | 33 +++++++++++++++++++ 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 5bec8a68e9..bf1a1ab1fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1599,11 +1599,24 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn } if (repoRecord.getTransferRelationship() != null) { - flowFilesOut--; - contentSizeOut -= flowFile.getSize(); + final Relationship transferRelationship = repoRecord.getTransferRelationship(); + final Collection destinations = context.getConnections(transferRelationship); + final int numDestinations = destinations.size(); + final boolean autoTerminated = numDestinations == 0 && context.getConnectable().isAutoTerminated(transferRelationship); - newOwner.flowFilesOut++; - newOwner.contentSizeOut += flowFile.getSize(); + if (autoTerminated) { + removedCount--; + removedBytes -= flowFile.getSize(); + + newOwner.removedCount++; + newOwner.removedBytes += flowFile.getSize(); + } else { + flowFilesOut--; + contentSizeOut -= flowFile.getSize(); + + newOwner.flowFilesOut++; + newOwner.contentSizeOut += flowFile.getSize(); + } } final List events = generatedProvenanceEvents.remove(flowFile); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java index dc51885d7f..4e300d16de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java @@ -2452,6 +2452,39 @@ public class StandardProcessSessionIT { session.commit(); } + @Test + public void testMigrateAfterTransferToAutoTerminatedRelationship() { + final long start = System.currentTimeMillis(); + + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, out -> out.write("Hello".getBytes(StandardCharsets.UTF_8))); + + final StandardProcessSession newSession = new StandardProcessSession(context, () -> false); + + when(connectable.getConnections(any(Relationship.class))).thenReturn(Collections.emptySet()); + when(connectable.isAutoTerminated(any(Relationship.class))).thenReturn(true); + + session.transfer(flowFile, new Relationship.Builder().name("success").build()); + session.migrate(newSession, Collections.singleton(flowFile)); + + session.commit(); + + RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(start - 1); + FlowFileEvent event = report.getReportEntries().values().iterator().next(); + assertEquals(0, event.getFlowFilesRemoved()); + assertEquals(0, event.getContentSizeRemoved()); + assertEquals(0, event.getFlowFilesOut()); + assertEquals(0, event.getContentSizeOut()); + + newSession.commit(); + report = flowFileEventRepository.reportTransferEvents(start - 1); + event = report.getReportEntries().values().iterator().next(); + assertEquals(1, event.getFlowFilesRemoved()); + assertEquals(5, event.getContentSizeRemoved()); + assertEquals(0, event.getFlowFilesOut()); + assertEquals(0, event.getContentSizeOut()); + } + @Test public void testNewFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() { FlowFile flowFile = session.create();