NIFI-1105: Only trigger a processor that requires input to run if data is available for it process

This commit is contained in:
Mark Payne 2015-11-04 08:41:10 -05:00
parent dbf0c7893f
commit 2b1d093120
3 changed files with 6 additions and 6 deletions

View File

@ -100,10 +100,9 @@ public interface FlowFileQueue {
boolean isEmpty(); boolean isEmpty();
/** /**
* @return true if the active queue is empty; false otherwise. The Active * @return <code>true</code> if the queue is empty or contains only FlowFiles that already are being processed
* queue contains those FlowFiles that can be processed immediately and does * by others, <code>false</code> if the queue contains at least one FlowFile that is available for processing,
* not include those FlowFiles that have been swapped out or are currently * regardless of whether that FlowFile(s) is in-memory or swapped out.
* being processed
*/ */
boolean isActiveQueueEmpty(); boolean isActiveQueueEmpty();

View File

@ -210,7 +210,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public boolean isActiveQueueEmpty() { public boolean isActiveQueueEmpty() {
return size.get().activeQueueCount == 0; final FlowFileQueueSize queueSize = size.get();
return queueSize.activeQueueCount == 0 && queueSize.swappedCount == 0;
} }
public QueueSize getActiveQueueSize() { public QueueSize getActiveQueueSize() {

View File

@ -26,7 +26,7 @@ public class Connectables {
public static boolean flowFilesQueued(final Connectable connectable) { public static boolean flowFilesQueued(final Connectable connectable) {
for (final Connection conn : connectable.getIncomingConnections()) { for (final Connection conn : connectable.getIncomingConnections()) {
if (!conn.getFlowFileQueue().isEmpty()) { if (!conn.getFlowFileQueue().isActiveQueueEmpty()) {
return true; return true;
} }
} }