NIFI-14003: Allow stateless ready queue to update order. (#9515)

When more than one processor is ready we should allow the order of the
queue to change to prevent execution from getting stuck on a single
processor.
This commit is contained in:
Bob Paulin 2024-11-15 13:33:00 -06:00 committed by GitHub
parent 588e84e5c6
commit 9708f05b9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 11 additions and 9 deletions

View File

@ -30,7 +30,6 @@ import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.Stack;
import java.util.function.Consumer;
@ -41,7 +40,7 @@ public class AsynchronousCommitTracker {
private static final Logger logger = LoggerFactory.getLogger(AsynchronousCommitTracker.class);
private final ProcessGroup rootGroup;
private final Set<Connectable> ready = new LinkedHashSet<>();
private final LinkedHashSet<Connectable> ready = new LinkedHashSet<>();
private final Stack<CommitCallbacks> commitCallbacks = new Stack<>();
private int flowFilesProduced = 0;
private long bytesProduced = 0L;
@ -70,15 +69,17 @@ public class AsynchronousCommitTracker {
}
}
public Connectable getNextReady() {
if (ready.isEmpty()) {
return null;
}
Connectable last = null;
for (final Connectable connectable : ready) {
last = connectable;
final Connectable last = ready.getLast();
//When selecting the next ready Connectable move the selected Connectable to the first position. The Connectable in the first position will be
//the last to be executed. This way execution of Connectables gets continuously rotated. We only need to do this when there is more than one ready.
if (ready.size() > 1) {
ready.addFirst(last);
}
return last;

View File

@ -51,17 +51,18 @@ public class TestAsynchronousCommitTracker {
assertEquals(connectable2, tracker.getNextReady());
tracker.addConnectable(connectable3);
assertEquals(Arrays.asList(connectable3, connectable2, connectable1), tracker.getReady());
assertEquals(Arrays.asList(connectable3, connectable1, connectable2), tracker.getReady());
assertEquals(connectable3, tracker.getNextReady());
// connectable1 should now be moved to the start of the List
// connectable3 should be at the end from the last call to getNextReady
tracker.addConnectable(connectable1);
assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady());
assertEquals(Arrays.asList(connectable1, connectable2, connectable3), tracker.getReady());
assertEquals(connectable1, tracker.getNextReady());
// Adding connectable1 again should now have effect since it is already first
tracker.addConnectable(connectable1);
assertEquals(Arrays.asList(connectable1, connectable3, connectable2), tracker.getReady());
assertEquals(Arrays.asList(connectable1, connectable2, connectable3), tracker.getReady());
assertEquals(connectable1, tracker.getNextReady());
}