From a5d6f88c2e0c6fc753fa753f259aaa6e3475049a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 19 Jun 2015 12:55:04 -0400 Subject: [PATCH] NIFI-703: Do not use prefetch if using FlowFileFilter, as this can result in FlowFiles not being pulled from the queue --- .../nifi/controller/StandardFlowFileQueue.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 8f6c8ed944..f47ea2f1a8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -363,7 +363,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { return true; } - if (maxBytes > 0 && (queueSize.getByteCount() >= maxBytes)) { + if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) { return true; } @@ -437,7 +437,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final List swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size())); final Iterator itr = swapQueue.iterator(); while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) { - FlowFileRecord record = itr.next(); + final FlowFileRecord record = itr.next(); swapRecords.add(record); itr.remove(); } @@ -606,7 +606,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { boolean isExpired; migrateSwapToActive(); - boolean queueFullAtStart = queueFullRef.get(); + final boolean queueFullAtStart = queueFullRef.get(); do { flowFile = this.activeQueue.poll(); @@ -794,9 +794,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { writeLock.lock(); try { migrateSwapToActive(); - if (activeQueue.isEmpty()) { - return Collections.emptyList(); - } final long expirationMillis = this.flowFileExpirationMillis.get(); final boolean queueFullAtStart = queueFullRef.get(); @@ -804,6 +801,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final List selectedFlowFiles = new ArrayList<>(); final List unselected = new ArrayList<>(); + // the prefetch doesn't allow us to add records back. So when this method is used, + // if there are prefetched records, we have to requeue them into the active queue first. + final PreFetch prefetch = preFetchRef.get(); + if (prefetch != null) { + requeueExpiredPrefetch(prefetch); + } + while (true) { FlowFileRecord flowFile = this.activeQueue.poll(); if (flowFile == null) { @@ -970,7 +974,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { boolean updated = false; do { - QueueSize queueSize = unacknowledgedSizeRef.get(); + final QueueSize queueSize = unacknowledgedSizeRef.get(); final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize); updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize); } while (!updated);