NIFI-703: Do not use prefetch if using FlowFileFilter, as this can result in FlowFiles not being pulled from the queue

This commit is contained in:
Mark Payne 2015-06-19 12:55:04 -04:00
parent a1f0438451
commit a5d6f88c2e
1 changed files with 11 additions and 7 deletions

View File

@ -363,7 +363,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
return true; return true;
} }
if (maxBytes > 0 && (queueSize.getByteCount() >= maxBytes)) { if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) {
return true; return true;
} }
@ -437,7 +437,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size())); final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size()));
final Iterator<FlowFileRecord> itr = swapQueue.iterator(); final Iterator<FlowFileRecord> itr = swapQueue.iterator();
while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) { while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) {
FlowFileRecord record = itr.next(); final FlowFileRecord record = itr.next();
swapRecords.add(record); swapRecords.add(record);
itr.remove(); itr.remove();
} }
@ -606,7 +606,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
boolean isExpired; boolean isExpired;
migrateSwapToActive(); migrateSwapToActive();
boolean queueFullAtStart = queueFullRef.get(); final boolean queueFullAtStart = queueFullRef.get();
do { do {
flowFile = this.activeQueue.poll(); flowFile = this.activeQueue.poll();
@ -794,9 +794,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.lock(); writeLock.lock();
try { try {
migrateSwapToActive(); migrateSwapToActive();
if (activeQueue.isEmpty()) {
return Collections.emptyList();
}
final long expirationMillis = this.flowFileExpirationMillis.get(); final long expirationMillis = this.flowFileExpirationMillis.get();
final boolean queueFullAtStart = queueFullRef.get(); final boolean queueFullAtStart = queueFullRef.get();
@ -804,6 +801,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>(); final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
final List<FlowFileRecord> unselected = new ArrayList<>(); final List<FlowFileRecord> unselected = new ArrayList<>();
// the prefetch doesn't allow us to add records back. So when this method is used,
// if there are prefetched records, we have to requeue them into the active queue first.
final PreFetch prefetch = preFetchRef.get();
if (prefetch != null) {
requeueExpiredPrefetch(prefetch);
}
while (true) { while (true) {
FlowFileRecord flowFile = this.activeQueue.poll(); FlowFileRecord flowFile = this.activeQueue.poll();
if (flowFile == null) { if (flowFile == null) {
@ -970,7 +974,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
boolean updated = false; boolean updated = false;
do { do {
QueueSize queueSize = unacknowledgedSizeRef.get(); final QueueSize queueSize = unacknowledgedSizeRef.get();
final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize); final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize);
updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize); updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
} while (!updated); } while (!updated);