From 9708f05b9c0302d4c353282068dd02883c07d215 Mon Sep 17 00:00:00 2001 From: Bob Paulin Date: Fri, 15 Nov 2024 13:33:00 -0600 Subject: [PATCH] NIFI-14003: Allow stateless ready queue to update order. (#9515) When more than one processor is ready we should allow the order of the queue to change to prevent execution from getting stuck on a single processor. --- .../session/AsynchronousCommitTracker.java | 13 +++++++------ .../session/TestAsynchronousCommitTracker.java | 7 ++++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java index 793bdfde16..53d13cfe51 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java @@ -30,7 +30,6 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.Stack; import java.util.function.Consumer; @@ -41,7 +40,7 @@ public class AsynchronousCommitTracker { private static final Logger logger = LoggerFactory.getLogger(AsynchronousCommitTracker.class); private final ProcessGroup rootGroup; - private final Set ready = new LinkedHashSet<>(); + private final LinkedHashSet ready = new LinkedHashSet<>(); private final Stack commitCallbacks = new Stack<>(); private int flowFilesProduced = 0; private long bytesProduced = 0L; @@ -70,15 +69,17 @@ public class AsynchronousCommitTracker { } } - public Connectable getNextReady() { if (ready.isEmpty()) { return null; } - Connectable last = null; - for (final Connectable connectable : ready) { - last = connectable; + final Connectable last = ready.getLast(); + + //When selecting the next ready Connectable move the selected Connectable to the first position. The Connectable in the first position will be + //the last to be executed. This way execution of Connectables gets continuously rotated. We only need to do this when there is more than one ready. + if (ready.size() > 1) { + ready.addFirst(last); } return last; diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java index aeedd20ce3..905a1a54db 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/session/TestAsynchronousCommitTracker.java @@ -51,17 +51,18 @@ public class TestAsynchronousCommitTracker { assertEquals(connectable2, tracker.getNextReady()); tracker.addConnectable(connectable3); - assertEquals(Arrays.asList(connectable3, connectable2, connectable1), tracker.getReady()); + assertEquals(Arrays.asList(connectable3, connectable1, connectable2), tracker.getReady()); assertEquals(connectable3, tracker.getNextReady()); // connectable1 should now be moved to the start of the List + // connectable3 should be at the end from the last call to getNextReady tracker.addConnectable(connectable1); - assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady()); + assertEquals(Arrays.asList(connectable1, connectable2, connectable3), tracker.getReady()); assertEquals(connectable1, tracker.getNextReady()); // Adding connectable1 again should now have effect since it is already first tracker.addConnectable(connectable1); - assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady()); + assertEquals(Arrays.asList(connectable1, connectable2, connectable3), tracker.getReady()); assertEquals(connectable1, tracker.getNextReady()); }