diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index fbbd4bb615..d4b6b2ed65 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -147,18 +147,26 @@ public interface FlowFileQueue { /** * @param expiredRecords expired records + * @param pollStrategy strategy of polling * @return the next flow file on the queue; null if empty */ + FlowFileRecord poll(Set expiredRecords, final PollStrategy pollStrategy); + FlowFileRecord poll(Set expiredRecords); /** * @param maxResults limits how many results can be polled * @param expiredRecords for expired records + * @param pollStrategy strategy of polling * @return the next flow files on the queue up to the max results; null if * empty */ + List poll(int maxResults, Set expiredRecords, final PollStrategy pollStrategy); + List poll(int maxResults, Set expiredRecords); + List poll(FlowFileFilter filter, Set expiredRecords, final PollStrategy pollStrategy); + List poll(FlowFileFilter filter, Set expiredRecords); String getFlowFileExpiration(); diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/PollStrategy.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/PollStrategy.java new file mode 100644 index 0000000000..2f45ba2a90 --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/PollStrategy.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.queue; + +/** + * Represents a strategy that how to poll the queue. + */ +public enum PollStrategy { + + ALL_FLOWFILES, + + UNPENALIZED_FLOWFILES +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java index bea95723be..1cf78b641b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -32,6 +32,7 @@ import org.apache.nifi.controller.queue.ConnectionEventListener; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueueFactory; import org.apache.nifi.controller.queue.LoadBalanceStrategy; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.FlowFileFilter; @@ -341,12 +342,12 @@ public final class StandardConnection implements Connection, ConnectionEventList @Override public List poll(final FlowFileFilter filter, final Set expiredRecords) { - return flowFileQueue.poll(filter, expiredRecords); + return flowFileQueue.poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); } @Override public FlowFileRecord poll(final Set expiredRecords) { - return flowFileQueue.poll(expiredRecords); + return flowFileQueue.poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 614abc4f20..769fad58b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -24,6 +24,7 @@ import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.lifecycle.TaskTermination; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache; @@ -2284,7 +2285,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn for (final Connection conn : context.getConnectable().getIncomingConnections()) { do { expired.clear(); - conn.getFlowFileQueue().poll(filter, expired); + conn.getFlowFileQueue().poll(filter, expired, PollStrategy.ALL_FLOWFILES); removeExpired(expired, conn); } while (!expired.isEmpty()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java index aac06597d4..de49ad2701 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java @@ -27,6 +27,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.provenance.ProvenanceEventBuilder; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; @@ -40,6 +41,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -483,4 +485,19 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue { loadBalanceReadLock.unlock(); } } + + @Override + public FlowFileRecord poll(Set expiredRecords) { + return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); + } + + @Override + public List poll(int maxResults, Set expiredRecords) { + return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); + } + + @Override + public List poll(FlowFileFilter filter, Set expiredRecords) { + return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java index 9a220ae385..db0e30e041 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java @@ -51,13 +51,13 @@ public class BlockingSwappablePriorityQueue extends SwappablePriorityQueue { } } - public FlowFileRecord poll(final Set expiredRecords, final long expirationMillis, final long waitMillis) throws InterruptedException { + public FlowFileRecord poll(final Set expiredRecords, final long expirationMillis, final long waitMillis, final PollStrategy pollStrategy) throws InterruptedException { final long maxTimestamp = System.currentTimeMillis() + waitMillis; synchronized (monitor) { FlowFileRecord flowFile = null; do { - flowFile = super.poll(expiredRecords, expirationMillis); + flowFile = super.poll(expiredRecords, expirationMillis, pollStrategy); if (flowFile != null) { return flowFile; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java index 903ed493ac..a87fcfe46d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java @@ -124,16 +124,16 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow @Override - public FlowFileRecord poll(final Set expiredRecords) { + public FlowFileRecord poll(final Set expiredRecords, final PollStrategy pollStrategy) { // First check if we have any records Pre-Fetched. final long expirationMillis = getFlowFileExpiration(TimeUnit.MILLISECONDS); - return queue.poll(expiredRecords, expirationMillis); + return queue.poll(expiredRecords, expirationMillis, pollStrategy); } @Override - public List poll(int maxResults, final Set expiredRecords) { - return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS)); + public List poll(int maxResults, final Set expiredRecords, final PollStrategy pollStrategy) { + return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS), pollStrategy); } @@ -184,8 +184,8 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow } @Override - public List poll(final FlowFileFilter filter, final Set expiredRecords) { - return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS)); + public List poll(final FlowFileFilter filter, final Set expiredRecords, final PollStrategy pollStrategy) { + return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS), pollStrategy); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java index ec02009260..34da62c615 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java @@ -435,33 +435,6 @@ public class SwappablePriorityQueue { return getFlowFileQueueSize().isEmpty(); } - public boolean isFlowFileAvailable() { - if (isEmpty()) { - return false; - } - - readLock.lock(); - try { - // If we have data in the active or swap queue that is penalized, then we know that all FlowFiles - // are penalized. As a result, we can say that no FlowFile is available. - FlowFileRecord firstRecord = activeQueue.peek(); - if (firstRecord == null && !swapQueue.isEmpty()) { - firstRecord = swapQueue.get(0); - } - - if (firstRecord == null) { - // If the queue is not empty, then all data is swapped out. We don't actually know whether or not the swapped out data is penalized, so we assume - // that it is not penalized and is therefore available. - return !isEmpty(); - } - - // We do have a FlowFile that was retrieved from the active or swap queue. It is available if it is not penalized. - return !firstRecord.isPenalized(); - } finally { - readLock.unlock("isFlowFileAvailable"); - } - } - public boolean isActiveQueueEmpty() { final FlowFileQueueSize queueSize = getFlowFileQueueSize(); return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0; @@ -524,12 +497,16 @@ public class SwappablePriorityQueue { } public FlowFileRecord poll(final Set expiredRecords, final long expirationMillis) { + return poll(expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES); + } + + public FlowFileRecord poll(final Set expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { FlowFileRecord flowFile; // First check if we have any records Pre-Fetched. writeLock.lock(); try { - flowFile = doPoll(expiredRecords, expirationMillis); + flowFile = doPoll(expiredRecords, expirationMillis, pollStrategy); if (flowFile != null) { logger.trace("{} poll() returning {}", this, flowFile); @@ -543,7 +520,7 @@ public class SwappablePriorityQueue { } - private FlowFileRecord doPoll(final Set expiredRecords, final long expirationMillis) { + private FlowFileRecord doPoll(final Set expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { FlowFileRecord flowFile; boolean isExpired; @@ -562,7 +539,7 @@ public class SwappablePriorityQueue { if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { break; } - } else if (flowFile != null && flowFile.isPenalized()) { + } else if (flowFile != null && flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) { this.activeQueue.add(flowFile); flowFile = null; break; @@ -581,12 +558,16 @@ public class SwappablePriorityQueue { } public List poll(int maxResults, final Set expiredRecords, final long expirationMillis) { + return poll(maxResults, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES); + } + + public List poll(int maxResults, final Set expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { final List records = new ArrayList<>(Math.min(1, maxResults)); // First check if we have any records Pre-Fetched. writeLock.lock(); try { - doPoll(records, maxResults, expiredRecords, expirationMillis); + doPoll(records, maxResults, expiredRecords, expirationMillis, pollStrategy); } finally { writeLock.unlock("poll(int, Set)"); } @@ -599,6 +580,10 @@ public class SwappablePriorityQueue { } public List poll(final FlowFileFilter filter, final Set expiredRecords, final long expirationMillis) { + return poll(filter, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES); + } + + public List poll(final FlowFileFilter filter, final Set expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { long bytesPulled = 0L; int flowFilesPulled = 0; @@ -626,7 +611,7 @@ public class SwappablePriorityQueue { } else { continue; } - } else if (flowFile.isPenalized()) { + } else if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) { this.activeQueue.add(flowFile); break; // just stop searching because the rest are all penalized. } @@ -660,10 +645,10 @@ public class SwappablePriorityQueue { } } - private void doPoll(final List records, int maxResults, final Set expiredRecords, final long expirationMillis) { + private void doPoll(final List records, int maxResults, final Set expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { migrateSwapToActive(); - final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis); + final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis, pollStrategy); long expiredBytes = 0L; for (final FlowFileRecord record : expiredRecords) { @@ -701,7 +686,9 @@ public class SwappablePriorityQueue { } - private long drainQueue(final Queue sourceQueue, final List destination, int maxResults, final Set expiredRecords, final long expirationMillis) { + private long drainQueue(final Queue sourceQueue, final List destination, + int maxResults, final Set expiredRecords, final long expirationMillis, + final PollStrategy pollStrategy) { long drainedSize = 0L; FlowFileRecord pulled; @@ -712,7 +699,7 @@ public class SwappablePriorityQueue { break; } } else { - if (pulled.isPenalized()) { + if (pulled.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) { sourceQueue.add(pulled); break; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 1a1e187536..6b8d418cfe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -32,6 +32,7 @@ import org.apache.nifi.controller.queue.IllegalClusterStateException; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.queue.QueueDiagnostics; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics; @@ -930,22 +931,22 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } @Override - public FlowFileRecord poll(final Set expiredRecords) { - final FlowFileRecord flowFile = localPartition.poll(expiredRecords); + public FlowFileRecord poll(final Set expiredRecords, final PollStrategy pollStrategy) { + final FlowFileRecord flowFile = localPartition.poll(expiredRecords, pollStrategy); onAbort(expiredRecords); return flowFile; } @Override - public List poll(int maxResults, Set expiredRecords) { - final List flowFiles = localPartition.poll(maxResults, expiredRecords); + public List poll(int maxResults, Set expiredRecords, final PollStrategy pollStrategy) { + final List flowFiles = localPartition.poll(maxResults, expiredRecords, pollStrategy); onAbort(expiredRecords); return flowFiles; } @Override - public List poll(FlowFileFilter filter, Set expiredRecords) { - final List flowFiles = localPartition.poll(filter, expiredRecords); + public List poll(FlowFileFilter filter, Set expiredRecords, final PollStrategy pollStrategy) { + final List flowFiles = localPartition.poll(filter, expiredRecords, pollStrategy); onAbort(expiredRecords); return flowFiles; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java index 8e9b165f1e..fe1baac7f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java @@ -39,6 +39,7 @@ public class StandardLoadBalanceFlowFileCodec implements LoadBalanceFlowFileCode out.writeLong(flowFile.getLineageStartDate()); out.writeLong(flowFile.getEntryDate()); + out.writeLong(flowFile.getPenaltyExpirationMillis()); } private void writeString(final String value, final DataOutputStream out) throws IOException { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java index 9ee0e0ea6d..84f0bab3bc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.queue.clustered.partition; import org.apache.nifi.controller.queue.FlowFileQueueContents; import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.processor.FlowFileFilter; @@ -46,8 +47,11 @@ public interface LocalQueuePartition extends QueuePartition { * Returns a single FlowFile with the highest priority that is available in the partition, or null if no FlowFile is available * * @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added + * @param pollStrategy strategy of polling * @return a single FlowFile with the highest priority that is available in the partition, or null if no FlowFile is available */ + FlowFileRecord poll(Set expiredRecords, final PollStrategy pollStrategy); + FlowFileRecord poll(Set expiredRecords); /** @@ -55,8 +59,11 @@ public interface LocalQueuePartition extends QueuePartition { * * @param maxResults the maximum number of FlowFiles to return * @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added + * @param pollStrategy strategy of polling * @return a List of FlowFiles (possibly empty) with the highest priority FlowFiles that are available in the partition */ + List poll(int maxResults, Set expiredRecords, final PollStrategy pollStrategy); + List poll(int maxResults, Set expiredRecords); /** @@ -64,8 +71,11 @@ public interface LocalQueuePartition extends QueuePartition { * * @param filter the filter to determine whether or not a given FlowFile should be returned * @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added + * @param pollStrategy strategy of polling * @return a List of FlowFiles (possibly empty) with FlowFiles that matched the given filter */ + List poll(FlowFileFilter filter, Set expiredRecords, final PollStrategy pollStrategy); + List poll(FlowFileFilter filter, Set expiredRecords); /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java index 53f0f9f898..5f69905e0f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java @@ -21,6 +21,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.queue.DropFlowFileRequest; import org.apache.nifi.controller.queue.FlowFileQueueContents; import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics; import org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics; @@ -150,7 +151,7 @@ public class RemoteQueuePartition implements QueuePartition { private FlowFileRecord getFlowFile() { final Set expired = new HashSet<>(); - final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS)); + final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS), PollStrategy.ALL_FLOWFILES); flowFileQueue.handleExpiredRecords(expired); return flowFile; } @@ -225,22 +226,13 @@ public class RemoteQueuePartition implements QueuePartition { } }; - // Consider the queue empty unless a FlowFile is available. This means that if the queue has only penalized FlowFiles, it will be considered empty. - // This is what we want for the purpose of load balancing the data. Otherwise, we would have a situation where we create a connection to the other node, - // determine that now FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over - // until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother - // creating the connection to send data. - final BooleanSupplier emptySupplier = this::isQueueEmpty; + final BooleanSupplier emptySupplier = priorityQueue::isEmpty; clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier, this::getFlowFile, failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes); running = true; } - private boolean isQueueEmpty() { - return !priorityQueue.isFlowFileAvailable(); - } - public void onRemoved() { clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java index 7ecac18d40..de46d1716d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java @@ -23,6 +23,7 @@ import org.apache.nifi.controller.queue.DropFlowFileAction; import org.apache.nifi.controller.queue.DropFlowFileRequest; import org.apache.nifi.controller.queue.FlowFileQueueContents; import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileSwapManager; @@ -191,7 +192,7 @@ public class StandardRebalancingPartition implements RebalancingPartition { // Wait up to #pollWaitMillis milliseconds to get a FlowFile. If none, then check if stopped // and if not, poll again. try { - polled = queue.poll(expiredRecords, -1, pollWaitMillis); + polled = queue.poll(expiredRecords, -1, pollWaitMillis, PollStrategy.ALL_FLOWFILES); } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); continue; @@ -211,7 +212,7 @@ public class StandardRebalancingPartition implements RebalancingPartition { final List toDistribute = new ArrayList<>(); toDistribute.add(polled); - final List additionalRecords = queue.poll(999, expiredRecords, -1); + final List additionalRecords = queue.poll(999, expiredRecords, -1, PollStrategy.ALL_FLOWFILES); toDistribute.addAll(additionalRecords); flowFileQueue.handleExpiredRecords(expiredRecords); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java index 075039c788..03e8e1848a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java @@ -23,6 +23,7 @@ import org.apache.nifi.controller.queue.DropFlowFileRequest; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueueContents; import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.SwappablePriorityQueue; import org.apache.nifi.controller.repository.FlowFileRecord; @@ -102,18 +103,33 @@ public class SwappablePriorityQueueLocalPartition implements LocalQueuePartition } @Override - public FlowFileRecord poll(final Set expiredRecords) { - return priorityQueue.poll(expiredRecords, getExpiration()); + public FlowFileRecord poll(final Set expiredRecords, final PollStrategy pollStrategy) { + return priorityQueue.poll(expiredRecords, getExpiration(), pollStrategy); } @Override - public List poll(final int maxResults, final Set expiredRecords) { - return priorityQueue.poll(maxResults, expiredRecords, getExpiration()); + public FlowFileRecord poll(Set expiredRecords) { + return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); } @Override - public List poll(final FlowFileFilter filter, final Set expiredRecords) { - return priorityQueue.poll(filter, expiredRecords, getExpiration()); + public List poll(final int maxResults, final Set expiredRecords, final PollStrategy pollStrategy) { + return priorityQueue.poll(maxResults, expiredRecords, getExpiration(), pollStrategy); + } + + @Override + public List poll(int maxResults, Set expiredRecords) { + return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); + } + + @Override + public List poll(final FlowFileFilter filter, final Set expiredRecords, final PollStrategy pollStrategy) { + return priorityQueue.poll(filter, expiredRecords, getExpiration(), pollStrategy); + } + + @Override + public List poll(FlowFileFilter filter, Set expiredRecords) { + return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); } private int getExpiration() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index 1e35bc71ac..858b76afc7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -509,6 +509,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { final long lineageStartDate = metadataIn.readLong(); final long entryDate = metadataIn.readLong(); + final long penaltyExpirationMillis = metadataIn.readLong(); final ContentClaimTriple contentClaimTriple = consumeContent(dis, out, contentClaim, claimOffset, peerDescription, compression == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT); @@ -521,6 +522,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { .size(contentClaimTriple.getContentLength()) .entryDate(entryDate) .lineageStart(lineageStartDate, lineageStartIndex.getAndIncrement()) + .penaltyExpirationTime(penaltyExpirationMillis) .build(); logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFileRecord, attributes.size(), contentClaimTriple.getContentLength()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java index 5cc3d7ef68..95a204b63c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java @@ -22,6 +22,7 @@ import org.apache.nifi.controller.MockSwapManager; import org.apache.nifi.controller.queue.DropFlowFileAction; import org.apache.nifi.controller.queue.DropFlowFileRequest; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.SwappablePriorityQueue; import org.apache.nifi.controller.repository.FlowFileRecord; @@ -325,6 +326,29 @@ public class TestSwappablePriorityQueue { } } + @Test + public void testPollWithPenalizedFlowFile() { + final FlowFileRecord penalizedFlowFile = mock(FlowFileRecord.class); + when(penalizedFlowFile.isPenalized()).thenReturn(true); + assertTrue(queue.isEmpty()); + queue.put(penalizedFlowFile); + + final Set expiredRecords = new HashSet<>(); + FlowFileRecord polled = queue.poll(expiredRecords, 0, PollStrategy.UNPENALIZED_FLOWFILES); + assertNull(polled); + + assertFalse(queue.isEmpty()); + + polled = queue.poll(expiredRecords, 0, PollStrategy.ALL_FLOWFILES); + assertNotNull(polled); + assertSame(penalizedFlowFile, polled); + + // queue is still not empty because FlowFile has not yet been acknowledged. + queue.acknowledge(polled); + + assertTrue(queue.isEmpty()); + } + @Test public void testPollWithOnlyExpiredFlowFile() { final FlowFileRecord expiredFlowFile = mock(FlowFileRecord.class); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java index cfe4e2bdfd..2ae0f0bc77 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java @@ -148,7 +148,7 @@ public class TestLoadBalanceSession { expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE); expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES); - expectedDos.writeInt(68); // metadata length + expectedDos.writeInt(76); // metadata length expectedDos.writeInt(1); // 1 attribute expectedDos.writeInt(4); // length of attribute expectedDos.write("uuid".getBytes()); @@ -156,13 +156,14 @@ public class TestLoadBalanceSession { expectedDos.write(flowFile1.getAttribute("uuid").getBytes()); expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date expectedDos.writeLong(flowFile1.getEntryDate()); // entry date + expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); expectedDos.writeInt(5); expectedDos.write("hello".getBytes()); expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME); expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES); - expectedDos.writeInt(68); // metadata length + expectedDos.writeInt(76); // metadata length expectedDos.writeInt(1); // 1 attribute expectedDos.writeInt(4); // length of attribute expectedDos.write("uuid".getBytes()); @@ -170,6 +171,7 @@ public class TestLoadBalanceSession { expectedDos.write(flowFile2.getAttribute("uuid").getBytes()); expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage start date expectedDos.writeLong(flowFile2.getEntryDate()); // entry date + expectedDos.writeLong(flowFile2.getPenaltyExpirationMillis()); // penalty expiration time expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); expectedDos.writeInt(8); expectedDos.write("good-bye".getBytes()); @@ -235,7 +237,7 @@ public class TestLoadBalanceSession { expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE); expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES); - expectedDos.writeInt(68); // metadata length + expectedDos.writeInt(76); // metadata length expectedDos.writeInt(1); // 1 attribute expectedDos.writeInt(4); // length of attribute expectedDos.write("uuid".getBytes()); @@ -243,6 +245,7 @@ public class TestLoadBalanceSession { expectedDos.write(flowFile1.getAttribute("uuid").getBytes()); expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date expectedDos.writeLong(flowFile1.getEntryDate()); // entry date + expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time // first data frame expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java index bfd6f36c98..d74c2136df 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java @@ -633,6 +633,7 @@ public class TestStandardLoadBalanceProtocol { out.writeLong(0L); // lineage start date out.writeLong(0L); // entry date + out.writeLong(0L); // penalty expiration time dos.writeInt(baos.size()); baos.writeTo(dos); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java index 99de3a35b2..af26727999 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java @@ -275,7 +275,7 @@ public class StandardProcessSessionIT { Mockito.doAnswer(new Answer>() { @Override public List answer(InvocationOnMock invocation) throws Throwable { - return localFlowFileQueue.poll(invocation.getArgument(0), invocation.getArgument(1)); + return localFlowFileQueue.poll((FlowFileFilter) invocation.getArgument(0), invocation.getArgument(1)); } }).when(connection).poll(any(FlowFileFilter.class), any(Set.class)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 44db6ccc96..712a9e7c3a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -47,6 +47,7 @@ import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.NopConnectionEventListener; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.queue.QueueDiagnostics; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.StandardFlowFileQueue; @@ -218,16 +219,31 @@ public class TestWriteAheadFlowFileRepository { public void putAll(Collection files) { } + @Override + public FlowFileRecord poll(Set expiredRecords, PollStrategy pollStrategy) { + return null; + } + @Override public FlowFileRecord poll(Set expiredRecords) { return null; } + @Override + public List poll(int maxResults, Set expiredRecords, PollStrategy pollStrategy) { + return null; + } + @Override public List poll(int maxResults, Set expiredRecords) { return null; } + @Override + public List poll(FlowFileFilter filter, Set expiredRecords, PollStrategy pollStrategy) { + return null; + } + @Override public List poll(FlowFileFilter filter, Set expiredRecords) { return null; diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java index f2d78a28b9..3b4a621d35 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java @@ -21,6 +21,7 @@ import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalanceStrategy; +import org.apache.nifi.controller.queue.PollStrategy; import org.apache.nifi.controller.queue.QueueDiagnostics; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; @@ -162,7 +163,7 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue { } @Override - public synchronized FlowFileRecord poll(final Set expiredRecords) { + public synchronized FlowFileRecord poll(final Set expiredRecords, final PollStrategy pollStrategy) { while (!flowFiles.isEmpty()) { final FlowFileRecord flowFile = flowFiles.peek(); if (flowFile == null) { @@ -178,7 +179,7 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue { continue; } - if (flowFile.isPenalized()) { + if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) { return null; } @@ -189,6 +190,11 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue { return null; } + @Override + public FlowFileRecord poll(Set expiredRecords) { + return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); + } + private boolean isExpired(final FlowFileRecord flowFile) { if (expirationMillis == 0L) { return false; @@ -199,10 +205,10 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue { } @Override - public synchronized List poll(final int maxResults, final Set expiredRecords) { + public synchronized List poll(final int maxResults, final Set expiredRecords, final PollStrategy pollStrategy) { final List selected = new ArrayList<>(Math.min(maxResults, flowFiles.size())); for (int i=0; i < maxResults; i++) { - final FlowFileRecord flowFile = poll(expiredRecords); + final FlowFileRecord flowFile = poll(expiredRecords, pollStrategy); if (flowFile != null) { selected.add(flowFile); } @@ -216,7 +222,12 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue { } @Override - public synchronized List poll(final FlowFileFilter filter, final Set expiredRecords) { + public List poll(int maxResults, Set expiredRecords) { + return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); + } + + @Override + public synchronized List poll(final FlowFileFilter filter, final Set expiredRecords, final PollStrategy pollStrategy) { final List selected = new ArrayList<>(); // Use an iterator to iterate over all FlowFiles in the queue. This allows us to @@ -235,7 +246,7 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue { continue; } - if (flowFile.isPenalized()) { + if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) { break; } @@ -254,6 +265,11 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue { return selected; } + @Override + public List poll(FlowFileFilter filter, Set expiredRecords) { + return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES); + } + @Override public String getFlowFileExpiration() { return expirationMillis + " millis";