From 20c889cf826781b16a95a485b649e1829b070804 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 25 May 2021 11:42:26 -0400 Subject: [PATCH] NIFI-8620: Ensure that we provide appropriate error messages if attempting to migrate FlowFiles from one session to another without including full hierarchy; added tests to verify behavior Signed-off-by: Matthew Burgess This closes #5099 --- .../repository/StandardProcessSession.java | 19 +++- .../StandardProvenanceReporter.java | 4 + .../repository/StandardProcessSessionIT.java | 97 +++++++++++++++++++ 3 files changed, 119 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 3c331c2bf3..e5dbc97cc9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1347,19 +1347,33 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn final ProvenanceEventBuilder eventBuilder = entry.getValue(); for (final String childId : eventBuilder.getChildFlowFileIds()) { if (!flowFileIds.contains(childId)) { - throw new IllegalStateException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size() + throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size() + " children and not all children are being migrated. 
If any FlowFile is forked, all of its children must also be migrated at the same time as the forked FlowFile"); } } + } else { + final ProvenanceEventBuilder eventBuilder = entry.getValue(); + for (final String childId : eventBuilder.getChildFlowFileIds()) { + if (flowFileIds.contains(childId)) { + throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked from a Parent FlowFile, but the parent is not being migrated. " + + "If any FlowFile is forked, the parent and all children must be migrated at the same time."); + } + } } } // If we have a FORK event where a FlowFile is a child of the FORK event, we want to create a FORK // event builder for the new owner of the FlowFile and remove the child from our fork event builder. + final Set forkedFlowFilesMigrated = new HashSet<>(); for (final Map.Entry entry : forkEventBuilders.entrySet()) { final FlowFile eventFlowFile = entry.getKey(); final ProvenanceEventBuilder eventBuilder = entry.getValue(); + // If the FlowFile that the event is attached to is not being migrated, we should not migrate the fork event builder either. 
+ if (!flowFiles.contains(eventFlowFile)) { + continue; + } + final Set childrenIds = new HashSet<>(eventBuilder.getChildFlowFileIds()); ProvenanceEventBuilder copy = null; @@ -1378,9 +1392,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn if (copy != null) { newOwner.forkEventBuilders.put(eventFlowFile, copy); + forkedFlowFilesMigrated.add(eventFlowFile); } } + forkedFlowFilesMigrated.forEach(forkEventBuilders::remove); + newOwner.processingStartTime = Math.min(newOwner.processingStartTime, processingStartTime); for (final FlowFile flowFile : flowFiles) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index 7c31f9c263..7b506ebc6e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -93,6 +93,10 @@ public class StandardProvenanceReporter implements InternalProvenanceReporter { for (final ProvenanceEventRecord event : events) { if (flowFileIds.contains(event.getFlowFileUuid())) { toMove.add(event); + } else if (event.getEventType() == ProvenanceEventType.CLONE) { + if (flowFileIds.containsAll(event.getChildUuids())) { + toMove.add(event); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java index 9c65dce7e3..e8e456f181 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java @@ -40,6 +40,7 @@ import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processor.exception.FlowFileHandlingException; import org.apache.nifi.processor.exception.MissingFlowFileException; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; @@ -92,6 +93,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -307,6 +309,101 @@ public class StandardProcessSessionIT { verify(conn2, times(1)).poll(any(Set.class)); } + @Test + public void testFlowFileHandlingExceptionThrownIfMigratingChildNotParent() { + final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()); + + flowFileQueue.put(flowFileRecordBuilder.build()); + + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + + final List children = new ArrayList<>(); + for (int i=0; i < 3; i++) { + FlowFile child = 
session.create(flowFile); + children.add(child); + } + + final ProcessSession secondSession = new StandardProcessSession(context, () -> false); + try { + session.migrate(secondSession, children); + Assert.fail("Expected a FlowFileHandlingException to be thrown because a child FlowFile was migrated while its parent was not"); + } catch (final FlowFileHandlingException expected) { + } + + try { + session.migrate(secondSession, Collections.singletonList(flowFile)); + Assert.fail("Expected a FlowFileHandlingException to be thrown because parent was forked and then migrated without children"); + } catch (final FlowFileHandlingException expected) { + } + + try { + session.migrate(secondSession, Arrays.asList(flowFile, children.get(0), children.get(1))); + Assert.fail("Expected a FlowFileHandlingException to be thrown because parent was forked and then migrated without children"); + } catch (final FlowFileHandlingException expected) { + } + + // Should succeed when migrating all FlowFiles. + final List allFlowFiles = new ArrayList<>(); + allFlowFiles.add(flowFile); + allFlowFiles.addAll(children); + session.migrate(secondSession, allFlowFiles); + session.commit(); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + secondSession.transfer(allFlowFiles, relationship); + secondSession.commit(); + } + + @Test + public void testCloneForkChildMigrateCommit() throws IOException { + final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()); + + flowFileQueue.put(flowFileRecordBuilder.build()); + + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + + final ProcessSession secondSession = new StandardProcessSession(context, () -> false); + + FlowFile clone = session.clone(flowFile); + session.migrate(secondSession, Collections.singletonList(clone)); + + final List children = new 
ArrayList<>(); + for (int i=0; i < 3; i++) { + FlowFile child = secondSession.create(clone); + children.add(child); + } + + secondSession.transfer(children, Relationship.ANONYMOUS); + secondSession.remove(clone); + secondSession.commit(); + + session.remove(flowFile); + session.commit(); + + final List provEvents = provenanceRepo.getEvents(0L, 1000); + assertEquals(3, provEvents.size()); + + final Map> eventsByType = provEvents.stream().collect(Collectors.groupingBy(ProvenanceEventRecord::getEventType)); + assertEquals(1, eventsByType.get(ProvenanceEventType.CLONE).size()); + assertEquals(1, eventsByType.get(ProvenanceEventType.DROP).size()); + assertEquals(1, eventsByType.get(ProvenanceEventType.FORK).size()); + + final ProvenanceEventRecord fork = eventsByType.get(ProvenanceEventType.FORK).get(0); + assertEquals(clone.getAttribute(CoreAttributes.UUID.key()), fork.getFlowFileUuid()); + assertEquals(Collections.singletonList(clone.getAttribute(CoreAttributes.UUID.key())), fork.getParentUuids()); + + final Set childUuids = children.stream().map(ff -> ff.getAttribute(CoreAttributes.UUID.key())).collect(Collectors.toSet()); + assertEquals(childUuids, new HashSet<>(fork.getChildUuids())); + } + @Test public void testCheckpointOnSessionDoesNotInteractWithFlowFile() { final Relationship relationship = new Relationship.Builder().name("A").build();