From 4c0555a47624cd9776d6d9fd06eca9aecae553c3 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 12 Jul 2022 17:15:05 -0400 Subject: [PATCH] NIFI-10203: Ensure that when a FlowFile is transferred and is not retried that we remove the retryCount. attribute This closes #6201 Signed-off-by: Paul Grey --- .../controller/repository/StandardProcessSession.java | 10 ++++++++-- .../repository/StandardProcessSessionIT.java | 6 ++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 309b5da16b..fda62384e2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -348,7 +348,14 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn } } } else { - final FlowFileRecord currRec = record.getCurrent(); + FlowFileRecord currRec = record.getCurrent(); + + // If there's a retry attribute present, remove it. The attribute should only live while the FlowFile is being processed by the current component + if (currRec.getAttribute(retryAttribute) != null) { + currRec = new StandardFlowFileRecord.Builder().fromFlowFile(currRec).removeAttributes(retryAttribute).build(); + record.setWorking(currRec, retryAttribute, null, false); + } + final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element record.setDestination(finalDestination.getFlowFileQueue()); incrementConnectionInputCounts(finalDestination, record); @@ -357,7 +364,6 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn incrementConnectionInputCounts(destination, record); final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec); - builder.removeAttributes(retryAttribute); builder.id(context.getNextFlowFileSequence()); final String newUuid = UUID.randomUUID().toString(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java index a4e6b399b7..d94724c9a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java @@ -2968,7 +2968,8 @@ public class StandardProcessSessionIT { StandardProcessSession session = createSessionForRetry(processor); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .build(); flowFileQueue.put(flowFileRecord); @@ -2991,7 +2992,8 @@ public class StandardProcessSessionIT { final StandardProcessSession session = createSessionForRetry(processor); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .build(); flowFileQueue.put(flowFileRecord);