From 53a35ae4c999824b1c79c31954d2b9b719efd997 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 1 Mar 2022 16:41:09 -0500 Subject: [PATCH] NIFI-9689: When checking FlowFile Availability, consider swap queue and trigger data to be swapped in, since calling poll() will no longer happen if no data is available Signed-off-by: Joe Gresock This closes #5821. --- .../queue/SwappablePriorityQueue.java | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java index 92129a58ab..81e96c8699 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java @@ -443,18 +443,45 @@ public class SwappablePriorityQueue { public FlowFileAvailability getFlowFileAvailability() { // If queue is empty, avoid obtaining a lock. - if (isActiveQueueEmpty()) { + final FlowFileQueueSize queueSize = getFlowFileQueueSize(); + if (queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0) { return FlowFileAvailability.ACTIVE_QUEUE_EMPTY; } - final FlowFileRecord top; + boolean mustMigrateSwapToActive = false; + FlowFileRecord top; readLock.lock(); try { top = activeQueue.peek(); + if (top == null) { + if (swapQueue.isEmpty() && queueSize.getSwapFileCount() > 0) { + // Nothing available in the active queue or swap queue, but there is data swapped out. + // We need to trigger that data to be swapped back in. But to do this, we need to hold the write lock. + // Because we cannot obtain the write lock while already holding the read lock, we set a flag so that we + // can migrate swap to active queue only after we've released the read lock. + mustMigrateSwapToActive = true; + } else { + top = swapQueue.get(0); + } + } } finally { readLock.unlock("isFlowFileAvailable"); } + // If we need to migrate swapped data to the active queue, we can do that now that the read lock has been released. + // There may well be multiple threads attempting this concurrently, though, so only use tryLock() and if the lock + // is not obtained, the other thread can swap data in, or the next iteration of #getFlowFileAvailability will. + if (mustMigrateSwapToActive) { + final boolean lockObtained = writeLock.tryLock(); + if (lockObtained) { + try { + migrateSwapToActive(); + } finally { + writeLock.unlock("getFlowFileAvailability"); + } + } + } + if (top == null) { return FlowFileAvailability.ACTIVE_QUEUE_EMPTY; }