1
0
mirror of https://github.com/apache/nifi.git synced 2025-02-27 14:09:52 +00:00

NIFI-9689: When checking FlowFile Availability, consider swap queue and trigger data to be swapped in, since calling poll() will no longer happen if no data is available

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes .
This commit is contained in:
Mark Payne 2022-03-01 16:41:09 -05:00 committed by Joe Gresock
parent a327ba478a
commit 53a35ae4c9
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7

@ -443,18 +443,45 @@ public class SwappablePriorityQueue {
public FlowFileAvailability getFlowFileAvailability() {
// If queue is empty, avoid obtaining a lock.
if (isActiveQueueEmpty()) {
final FlowFileQueueSize queueSize = getFlowFileQueueSize();
if (queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0) {
return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
}
final FlowFileRecord top;
boolean mustMigrateSwapToActive = false;
FlowFileRecord top;
readLock.lock();
try {
top = activeQueue.peek();
if (top == null) {
if (swapQueue.isEmpty() && queueSize.getSwapFileCount() > 0) {
// Nothing available in the active queue or swap queue, but there is data swapped out.
// We need to trigger that data to be swapped back in. But to do this, we need to hold the write lock.
// Because we cannot obtain the write lock while already holding the read lock, we set a flag so that we
// can migrate swap to active queue only after we've released the read lock.
mustMigrateSwapToActive = true;
} else {
top = swapQueue.get(0);
}
}
} finally {
readLock.unlock("isFlowFileAvailable");
}
// If we need to migrate swapped data to the active queue, we can do that now that the read lock has been released.
// There may well be multiple threads attempting this concurrently, though, so only use tryLock() and if the lock
// is not obtained, the other thread can swap data in, or the next iteration of #getFlowFileAvailability will.
if (mustMigrateSwapToActive) {
final boolean lockObtained = writeLock.tryLock();
if (lockObtained) {
try {
migrateSwapToActive();
} finally {
writeLock.unlock("getFlowFileAvailability");
}
}
}
if (top == null) {
return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
}