diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index fbbb29b2d4..60dcdb3f75 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -538,7 +538,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { continue; } - + if ( isSpuriousRouteEvent(event, checkpoint.records) ) { + continue; + } + + // Check if the event indicates that the FlowFile was routed to the same + // connection from which it was pulled (and only this connection). If so, discard the event. + isSpuriousRouteEvent(event, checkpoint.records); + recordsToSubmit.add(event); addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); } @@ -776,6 +783,45 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return false; } + + /** + * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile + * was routed to a relationship with only 1 connection and that Connection is the Connection from which + * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere. + * + * @param event + * @param records + * @return + */ + private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map records) { + if ( event.getEventType() == ProvenanceEventType.ROUTE ) { + final String relationshipName = event.getRelationship(); + final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + final Collection connectionsForRelationship = this.context.getConnections(relationship); + + // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event, + // as it may be cloning the FlowFile and adding to multiple connections. + if ( connectionsForRelationship.size() == 1 ) { + for ( final Map.Entry entry : records.entrySet() ) { + final FlowFileRecord flowFileRecord = entry.getKey(); + if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) { + final StandardRepositoryRecord repoRecord = entry.getValue(); + if ( repoRecord.getOriginalQueue() == null ) { + return false; + } + + final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier(); + final Connection destinationConnection = connectionsForRelationship.iterator().next(); + final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier(); + return originalQueueId.equals(destinationQueueId); + } + } + } + } + + return false; + } + @Override public void rollback() { rollback(false);