NIFI-10203: Fixed bug in which same FlowFile Builder was used repeatedly for multiple FlowFiles; this caused mingling of their attributes when a FlowFile is routed to a relationship that has more than 1 destination (i.e., many connections with the same Relationship)

This closes #6187
Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
Mark Payne 2022-07-08 15:21:44 -04:00 committed by Paul Grey
parent 274e4feeaa
commit 996d8faaf4
No known key found for this signature in database
GPG Key ID: 8DDF32B9C7EE39D0
2 changed files with 49 additions and 6 deletions

View File

@ -348,17 +348,16 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
}
}
} else {
final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
final FlowFileRecord currRec = record.getCurrent();
final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
builder.removeAttributes(retryAttribute);
final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
record.setDestination(finalDestination.getFlowFileQueue());
record.setWorking(builder.build(), false);
incrementConnectionInputCounts(finalDestination, record);
for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed
incrementConnectionInputCounts(destination, record);
final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
builder.removeAttributes(retryAttribute);
builder.id(context.getNextFlowFileSequence());
final String newUuid = UUID.randomUUID().toString();
@ -372,7 +371,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
if (claim != null) {
context.getContentRepository().incrementClaimaintCount(claim);
}
newRecord.setWorking(clone, Collections.<String, String>emptyMap(), false);
newRecord.setWorking(clone, Collections.emptyMap(), false);
newRecord.setDestination(destination.getFlowFileQueue());
newRecord.setTransferRelationship(record.getTransferRelationship());

View File

@ -100,6 +100,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -418,6 +419,49 @@ public class StandardProcessSessionIT {
assertEquals(childUuids, new HashSet<>(fork.getChildUuids()));
}
@Test
public void testCloneOnMultipleDestinations() {
final String originalUuid = "12345678-1234-1234-1234-123456789012";
final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", originalUuid)
.addAttribute("abc", "xyz")
.entryDate(System.currentTimeMillis());
flowFileQueue.put(flowFileRecordBuilder.build());
FlowFile flowFile = session.get();
assertNotNull(flowFile);
final List<Connection> connectionList = new ArrayList<>();
for (int i=0; i < 3; i++) {
connectionList.add(createConnection());
}
when(connectable.getConnections(any(Relationship.class))).thenReturn(new HashSet<>(connectionList));
session.transfer(flowFile, Relationship.ANONYMOUS);
session.commit();
final List<FlowFileRecord> outputFlowFiles = new ArrayList<>();
for (final Connection connection : connectionList) {
final FlowFileRecord outputFlowFile = connection.getFlowFileQueue().poll(Collections.emptySet());
outputFlowFiles.add(outputFlowFile);
}
assertEquals(3, outputFlowFiles.size());
final Set<String> uuids = outputFlowFiles.stream()
.map(ff -> ff.getAttribute("uuid"))
.collect(Collectors.toSet());
assertEquals(3, uuids.size());
assertTrue(uuids.contains(originalUuid));
final Predicate<FlowFileRecord> attributeAbcMatches = ff -> ff.getAttribute("abc").equals("xyz");
assertTrue(outputFlowFiles.stream().allMatch(attributeAbcMatches));
}
@Test
public void testCheckpointOnSessionDoesNotInteractWithFlowFile() {
final Relationship relationship = new Relationship.Builder().name("A").build();