diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 799cab8cc2..f851fdd311 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1250,7 +1250,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final List connections = context.getPollableConnections(); final int numConnections = connections.size(); for (int numAttempts = 0; numAttempts < numConnections; numAttempts++) { - final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % connections.size()); + final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % numConnections); final Set expired = new HashSet<>(); final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired); removeExpired(expired, conn); @@ -1273,7 +1273,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return Collections.emptyList(); } - return get(new QueuePoller() { + // get batch of flow files in a round-robin manner + final List connections = context.getPollableConnections(); + final Connection connection = connections.get(context.getNextIncomingConnectionIndex() % connections.size()); + + return get(connection, new QueuePoller() { @Override public List poll(final FlowFileQueue queue, final Set expiredRecords) { return queue.poll(new FlowFileFilter() { @@ -1302,6 +1306,32 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE }, true); } + private List get(final Connection connection, final QueuePoller poller, final boolean lockQueue) { + if (lockQueue) { + connection.lock(); + } + + try { + final Set expired = new HashSet<>(); + final List newlySelected = poller.poll(connection.getFlowFileQueue(), expired); + removeExpired(expired, connection); + + if (newlySelected.isEmpty() && expired.isEmpty()) { + return new ArrayList<>(); + } + + for (final FlowFileRecord flowFile : newlySelected) { + registerDequeuedRecord(flowFile, connection); + } + + return new ArrayList(newlySelected); + } finally { + if (lockQueue) { + connection.unlock(); + } + } + } + private List get(final QueuePoller poller, final boolean lockAllQueues) { final List connections = context.getPollableConnections(); if (lockAllQueues) {