NIFI-8620: Ensure that we provider appropriate error messages if attempting to migrate FlowFiles from one session to another without including full hierarchy; added tests to verify behavior

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5099
This commit is contained in:
Mark Payne 2021-05-25 11:42:26 -04:00 committed by Matthew Burgess
parent 48befe22f6
commit 20c889cf82
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 119 additions and 1 deletions

View File

@ -1347,19 +1347,33 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
final ProvenanceEventBuilder eventBuilder = entry.getValue();
for (final String childId : eventBuilder.getChildFlowFileIds()) {
if (!flowFileIds.contains(childId)) {
throw new IllegalStateException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size()
throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size()
+ " children and not all children are being migrated. If any FlowFile is forked, all of its children must also be migrated at the same time as the forked FlowFile");
}
}
} else {
final ProvenanceEventBuilder eventBuilder = entry.getValue();
for (final String childId : eventBuilder.getChildFlowFileIds()) {
if (flowFileIds.contains(childId)) {
throw new FlowFileHandlingException("Cannot migrate " + eventFlowFile + " to a new session because it was forked from a Parent FlowFile, but the parent is not being migrated. "
+ "If any FlowFile is forked, the parent and all children must be migrated at the same time.");
}
}
}
}
// If we have a FORK event where a FlowFile is a child of the FORK event, we want to create a FORK
// event builder for the new owner of the FlowFile and remove the child from our fork event builder.
final Set<FlowFile> forkedFlowFilesMigrated = new HashSet<>();
for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : forkEventBuilders.entrySet()) {
final FlowFile eventFlowFile = entry.getKey();
final ProvenanceEventBuilder eventBuilder = entry.getValue();
// If the FlowFile that the event is attached to is not being migrated, we should not migrate the fork event builder either.
if (!flowFiles.contains(eventFlowFile)) {
continue;
}
final Set<String> childrenIds = new HashSet<>(eventBuilder.getChildFlowFileIds());
ProvenanceEventBuilder copy = null;
@ -1378,9 +1392,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
if (copy != null) {
newOwner.forkEventBuilders.put(eventFlowFile, copy);
forkedFlowFilesMigrated.add(eventFlowFile);
}
}
forkedFlowFilesMigrated.forEach(forkEventBuilders::remove);
newOwner.processingStartTime = Math.min(newOwner.processingStartTime, processingStartTime);
for (final FlowFile flowFile : flowFiles) {

View File

@ -93,6 +93,10 @@ public class StandardProvenanceReporter implements InternalProvenanceReporter {
for (final ProvenanceEventRecord event : events) {
if (flowFileIds.contains(event.getFlowFileUuid())) {
toMove.add(event);
} else if (event.getEventType() == ProvenanceEventType.CLONE) {
if (flowFileIds.containsAll(event.getChildUuids())) {
toMove.add(event);
}
}
}

View File

@ -40,6 +40,7 @@ import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.exception.MissingFlowFileException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
@ -92,6 +93,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -307,6 +309,101 @@ public class StandardProcessSessionIT {
verify(conn2, times(1)).poll(any(Set.class));
}
@Test
public void testFlowFileHandlingExceptionThrownIfMigratingChildNotParent() {
final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis());
flowFileQueue.put(flowFileRecordBuilder.build());
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final List<FlowFile> children = new ArrayList<>();
for (int i=0; i < 3; i++) {
FlowFile child = session.create(flowFile);
children.add(child);
}
final ProcessSession secondSession = new StandardProcessSession(context, () -> false);
try {
session.migrate(secondSession, children);
Assert.fail("Expected a FlowFileHandlingException to be thrown because a child FlowFile was migrated while its parent was not");
} catch (final FlowFileHandlingException expected) {
}
try {
session.migrate(secondSession, Collections.singletonList(flowFile));
Assert.fail("Expected a FlowFileHandlingException to be thrown because parent was forked and then migrated without children");
} catch (final FlowFileHandlingException expected) {
}
try {
session.migrate(secondSession, Arrays.asList(flowFile, children.get(0), children.get(1)));
Assert.fail("Expected a FlowFileHandlingException to be thrown because parent was forked and then migrated without children");
} catch (final FlowFileHandlingException expected) {
}
// Should succeed when migrating all FlowFiles.
final List<FlowFile> allFlowFiles = new ArrayList<>();
allFlowFiles.add(flowFile);
allFlowFiles.addAll(children);
session.migrate(secondSession, allFlowFiles);
session.commit();
final Relationship relationship = new Relationship.Builder().name("A").build();
secondSession.transfer(allFlowFiles, relationship);
secondSession.commit();
}
@Test
public void testCloneForkChildMigrateCommit() throws IOException {
final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis());
flowFileQueue.put(flowFileRecordBuilder.build());
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final ProcessSession secondSession = new StandardProcessSession(context, () -> false);
FlowFile clone = session.clone(flowFile);
session.migrate(secondSession, Collections.singletonList(clone));
final List<FlowFile> children = new ArrayList<>();
for (int i=0; i < 3; i++) {
FlowFile child = secondSession.create(clone);
children.add(child);
}
secondSession.transfer(children, Relationship.ANONYMOUS);
secondSession.remove(clone);
secondSession.commit();
session.remove(flowFile);
session.commit();
final List<ProvenanceEventRecord> provEvents = provenanceRepo.getEvents(0L, 1000);
assertEquals(3, provEvents.size());
final Map<ProvenanceEventType, List<ProvenanceEventRecord>> eventsByType = provEvents.stream().collect(Collectors.groupingBy(ProvenanceEventRecord::getEventType));
assertEquals(1, eventsByType.get(ProvenanceEventType.CLONE).size());
assertEquals(1, eventsByType.get(ProvenanceEventType.DROP).size());
assertEquals(1, eventsByType.get(ProvenanceEventType.FORK).size());
final ProvenanceEventRecord fork = eventsByType.get(ProvenanceEventType.FORK).get(0);
assertEquals(clone.getAttribute(CoreAttributes.UUID.key()), fork.getFlowFileUuid());
assertEquals(Collections.singletonList(clone.getAttribute(CoreAttributes.UUID.key())), fork.getParentUuids());
final Set<String> childUuids = children.stream().map(ff -> ff.getAttribute(CoreAttributes.UUID.key())).collect(Collectors.toSet());
assertEquals(childUuids, new HashSet<>(fork.getChildUuids()));
}
@Test
public void testCheckpointOnSessionDoesNotInteractWithFlowFile() {
final Relationship relationship = new Relationship.Builder().name("A").build();