NIFI-8739 Penalized flowfiles should be able to be polled from the queue in some cases (#5189)

This commit is contained in:
Hsin-Ying Lee 2021-08-30 23:26:02 +08:00 committed by GitHub
parent 4a3e81531b
commit 59f0b6bac3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 207 additions and 82 deletions

View File

@ -147,18 +147,26 @@ public interface FlowFileQueue {
/**
* @param expiredRecords expired records
* @param pollStrategy strategy that determines whether penalized flow files may be returned
* @return the next flow file on the queue; null if empty
*/
FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
/**
* @param maxResults limits how many results can be polled
* @param expiredRecords for expired records
* @param pollStrategy strategy that determines whether penalized flow files may be returned
* @return the next flow files on the queue up to the max results; null if
* empty
*/
List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
String getFlowFileExpiration();

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.queue;
/**
* Strategy that tells a queue which FlowFiles a poll operation is allowed to return.
*/
public enum PollStrategy {
/**
* Poll FlowFiles regardless of penalization: a penalized FlowFile is returned like any other.
*/
ALL_FLOWFILES,
/**
* Poll only FlowFiles that are not penalized: a penalized FlowFile is left in the queue
* and is not returned to the caller.
*/
UNPENALIZED_FLOWFILES
}

View File

@ -32,6 +32,7 @@ import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
@ -341,12 +342,12 @@ public final class StandardConnection implements Connection, ConnectionEventList
@Override
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
return flowFileQueue.poll(filter, expiredRecords);
return flowFileQueue.poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
return flowFileQueue.poll(expiredRecords);
return flowFileQueue.poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override

View File

@ -24,6 +24,7 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.lifecycle.TaskTermination;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
@ -2284,7 +2285,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
for (final Connection conn : context.getConnectable().getIncomingConnections()) {
do {
expired.clear();
conn.getFlowFileQueue().poll(filter, expired);
conn.getFlowFileQueue().poll(filter, expired, PollStrategy.ALL_FLOWFILES);
removeExpired(expired, conn);
} while (!expired.isEmpty());
}

View File

@ -27,6 +27,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
@ -40,6 +41,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -483,4 +485,19 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
loadBalanceReadLock.unlock();
}
}
/**
* Convenience overload that polls a single FlowFile without returning penalized FlowFiles.
* Delegates to {@code poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES)}.
*
* @param expiredRecords set for expired records, forwarded unchanged to the strategy-aware overload
* @return the result of the strategy-aware overload
*/
@Override
public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
/**
* Convenience overload that polls up to {@code maxResults} FlowFiles without returning penalized FlowFiles.
* Delegates to {@code poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES)}.
*
* @param maxResults limit on how many FlowFiles may be polled, forwarded unchanged
* @param expiredRecords set for expired records, forwarded unchanged
* @return the result of the strategy-aware overload
*/
@Override
public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
/**
* Convenience overload that polls FlowFiles through a filter without returning penalized FlowFiles.
* Delegates to {@code poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES)}.
*
* @param filter filter forwarded unchanged to the strategy-aware overload
* @param expiredRecords set for expired records, forwarded unchanged
* @return the result of the strategy-aware overload
*/
@Override
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
}

View File

@ -51,13 +51,13 @@ public class BlockingSwappablePriorityQueue extends SwappablePriorityQueue {
}
}
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final long waitMillis) throws InterruptedException {
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final long waitMillis, final PollStrategy pollStrategy) throws InterruptedException {
final long maxTimestamp = System.currentTimeMillis() + waitMillis;
synchronized (monitor) {
FlowFileRecord flowFile = null;
do {
flowFile = super.poll(expiredRecords, expirationMillis);
flowFile = super.poll(expiredRecords, expirationMillis, pollStrategy);
if (flowFile != null) {
return flowFile;
}

View File

@ -124,16 +124,16 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
@Override
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
// First check if we have any records Pre-Fetched.
final long expirationMillis = getFlowFileExpiration(TimeUnit.MILLISECONDS);
return queue.poll(expiredRecords, expirationMillis);
return queue.poll(expiredRecords, expirationMillis, pollStrategy);
}
@Override
public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS));
public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS), pollStrategy);
}
@ -184,8 +184,8 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
}
@Override
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS));
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS), pollStrategy);
}
@Override

View File

@ -435,33 +435,6 @@ public class SwappablePriorityQueue {
return getFlowFileQueueSize().isEmpty();
}
public boolean isFlowFileAvailable() {
if (isEmpty()) {
return false;
}
readLock.lock();
try {
// If we have data in the active or swap queue that is penalized, then we know that all FlowFiles
// are penalized. As a result, we can say that no FlowFile is available.
FlowFileRecord firstRecord = activeQueue.peek();
if (firstRecord == null && !swapQueue.isEmpty()) {
firstRecord = swapQueue.get(0);
}
if (firstRecord == null) {
// If the queue is not empty, then all data is swapped out. We don't actually know whether or not the swapped out data is penalized, so we assume
// that it is not penalized and is therefore available.
return !isEmpty();
}
// We do have a FlowFile that was retrieved from the active or swap queue. It is available if it is not penalized.
return !firstRecord.isPenalized();
} finally {
readLock.unlock("isFlowFileAvailable");
}
}
public boolean isActiveQueueEmpty() {
final FlowFileQueueSize queueSize = getFlowFileQueueSize();
return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0;
@ -524,12 +497,16 @@ public class SwappablePriorityQueue {
}
/**
* Polls a single FlowFile using {@link PollStrategy#UNPENALIZED_FLOWFILES}, so a penalized FlowFile is not returned.
*
* @param expiredRecords set for expired records, forwarded unchanged
* @param expirationMillis expiration value in milliseconds, forwarded unchanged
* @return the result of the strategy-aware overload
*/
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
return poll(expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
}
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
FlowFileRecord flowFile;
// First check if we have any records Pre-Fetched.
writeLock.lock();
try {
flowFile = doPoll(expiredRecords, expirationMillis);
flowFile = doPoll(expiredRecords, expirationMillis, pollStrategy);
if (flowFile != null) {
logger.trace("{} poll() returning {}", this, flowFile);
@ -543,7 +520,7 @@ public class SwappablePriorityQueue {
}
private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
FlowFileRecord flowFile;
boolean isExpired;
@ -562,7 +539,7 @@ public class SwappablePriorityQueue {
if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
break;
}
} else if (flowFile != null && flowFile.isPenalized()) {
} else if (flowFile != null && flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
this.activeQueue.add(flowFile);
flowFile = null;
break;
@ -581,12 +558,16 @@ public class SwappablePriorityQueue {
}
/**
* Polls up to {@code maxResults} FlowFiles using {@link PollStrategy#UNPENALIZED_FLOWFILES},
* so penalized FlowFiles are not returned.
*
* @param maxResults limit on how many FlowFiles may be polled, forwarded unchanged
* @param expiredRecords set for expired records, forwarded unchanged
* @param expirationMillis expiration value in milliseconds, forwarded unchanged
* @return the result of the strategy-aware overload
*/
public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
return poll(maxResults, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
}
public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
final List<FlowFileRecord> records = new ArrayList<>(Math.min(1, maxResults));
// First check if we have any records Pre-Fetched.
writeLock.lock();
try {
doPoll(records, maxResults, expiredRecords, expirationMillis);
doPoll(records, maxResults, expiredRecords, expirationMillis, pollStrategy);
} finally {
writeLock.unlock("poll(int, Set)");
}
@ -599,6 +580,10 @@ public class SwappablePriorityQueue {
}
/**
* Polls FlowFiles through a filter using {@link PollStrategy#UNPENALIZED_FLOWFILES},
* so penalized FlowFiles are not returned.
*
* @param filter filter forwarded unchanged to the strategy-aware overload
* @param expiredRecords set for expired records, forwarded unchanged
* @param expirationMillis expiration value in milliseconds, forwarded unchanged
* @return the result of the strategy-aware overload
*/
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
return poll(filter, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
}
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
long bytesPulled = 0L;
int flowFilesPulled = 0;
@ -626,7 +611,7 @@ public class SwappablePriorityQueue {
} else {
continue;
}
} else if (flowFile.isPenalized()) {
} else if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
this.activeQueue.add(flowFile);
break; // just stop searching because the rest are all penalized.
}
@ -660,10 +645,10 @@ public class SwappablePriorityQueue {
}
}
private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
migrateSwapToActive();
final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis);
final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis, pollStrategy);
long expiredBytes = 0L;
for (final FlowFileRecord record : expiredRecords) {
@ -701,7 +686,9 @@ public class SwappablePriorityQueue {
}
private long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
private long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination,
int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis,
final PollStrategy pollStrategy) {
long drainedSize = 0L;
FlowFileRecord pulled;
@ -712,7 +699,7 @@ public class SwappablePriorityQueue {
break;
}
} else {
if (pulled.isPenalized()) {
if (pulled.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
sourceQueue.add(pulled);
break;
}

View File

@ -32,6 +32,7 @@ import org.apache.nifi.controller.queue.IllegalClusterStateException;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
@ -930,22 +931,22 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
}
@Override
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
final FlowFileRecord flowFile = localPartition.poll(expiredRecords);
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
final FlowFileRecord flowFile = localPartition.poll(expiredRecords, pollStrategy);
onAbort(expiredRecords);
return flowFile;
}
@Override
public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
final List<FlowFileRecord> flowFiles = localPartition.poll(maxResults, expiredRecords);
public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
final List<FlowFileRecord> flowFiles = localPartition.poll(maxResults, expiredRecords, pollStrategy);
onAbort(expiredRecords);
return flowFiles;
}
@Override
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
final List<FlowFileRecord> flowFiles = localPartition.poll(filter, expiredRecords);
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
final List<FlowFileRecord> flowFiles = localPartition.poll(filter, expiredRecords, pollStrategy);
onAbort(expiredRecords);
return flowFiles;
}

View File

@ -39,6 +39,7 @@ public class StandardLoadBalanceFlowFileCodec implements LoadBalanceFlowFileCode
out.writeLong(flowFile.getLineageStartDate());
out.writeLong(flowFile.getEntryDate());
out.writeLong(flowFile.getPenaltyExpirationMillis());
}
private void writeString(final String value, final DataOutputStream out) throws IOException {

View File

@ -19,6 +19,7 @@ package org.apache.nifi.controller.queue.clustered.partition;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.processor.FlowFileFilter;
@ -46,8 +47,11 @@ public interface LocalQueuePartition extends QueuePartition {
* Returns a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available
*
* @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added
* @param pollStrategy the strategy that determines whether penalized FlowFiles may be returned
* @return a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available
*/
FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
/**
@ -55,8 +59,11 @@ public interface LocalQueuePartition extends QueuePartition {
*
* @param maxResults the maximum number of FlowFiles to return
* @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added
* @param pollStrategy the strategy that determines whether penalized FlowFiles may be returned
* @return a List of FlowFiles (possibly empty) with the highest priority FlowFiles that are available in the partition
*/
List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
/**
@ -64,8 +71,11 @@ public interface LocalQueuePartition extends QueuePartition {
*
* @param filter the filter to determine whether or not a given FlowFile should be returned
* @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added
* @param pollStrategy the strategy that determines whether penalized FlowFiles may be returned
* @return a List of FlowFiles (possibly empty) with FlowFiles that matched the given filter
*/
List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
/**

View File

@ -21,6 +21,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics;
@ -150,7 +151,7 @@ public class RemoteQueuePartition implements QueuePartition {
private FlowFileRecord getFlowFile() {
final Set<FlowFileRecord> expired = new HashSet<>();
final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS));
final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS), PollStrategy.ALL_FLOWFILES);
flowFileQueue.handleExpiredRecords(expired);
return flowFile;
}
@ -225,22 +226,13 @@ public class RemoteQueuePartition implements QueuePartition {
}
};
// Consider the queue empty unless a FlowFile is available. This means that if the queue has only penalized FlowFiles, it will be considered empty.
// This is what we want for the purpose of load balancing the data. Otherwise, we would have a situation where we create a connection to the other node,
// determine that no FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over
// until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother
// creating the connection to send data.
final BooleanSupplier emptySupplier = this::isQueueEmpty;
final BooleanSupplier emptySupplier = priorityQueue::isEmpty;
clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier, this::getFlowFile,
failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes);
running = true;
}
private boolean isQueueEmpty() {
return !priorityQueue.isFlowFileAvailable();
}
/**
* Unregisters from the client registry, passing the FlowFile queue's identifier and the node identifier.
*/
public void onRemoved() {
clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.controller.queue.DropFlowFileAction;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
@ -191,7 +192,7 @@ public class StandardRebalancingPartition implements RebalancingPartition {
// Wait up to #pollWaitMillis milliseconds to get a FlowFile. If none, then check if stopped
// and if not, poll again.
try {
polled = queue.poll(expiredRecords, -1, pollWaitMillis);
polled = queue.poll(expiredRecords, -1, pollWaitMillis, PollStrategy.ALL_FLOWFILES);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
continue;
@ -211,7 +212,7 @@ public class StandardRebalancingPartition implements RebalancingPartition {
final List<FlowFileRecord> toDistribute = new ArrayList<>();
toDistribute.add(polled);
final List<FlowFileRecord> additionalRecords = queue.poll(999, expiredRecords, -1);
final List<FlowFileRecord> additionalRecords = queue.poll(999, expiredRecords, -1, PollStrategy.ALL_FLOWFILES);
toDistribute.addAll(additionalRecords);
flowFileQueue.handleExpiredRecords(expiredRecords);

View File

@ -23,6 +23,7 @@ import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
@ -102,18 +103,33 @@ public class SwappablePriorityQueueLocalPartition implements LocalQueuePartition
}
@Override
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
return priorityQueue.poll(expiredRecords, getExpiration());
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
return priorityQueue.poll(expiredRecords, getExpiration(), pollStrategy);
}
@Override
public List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords) {
return priorityQueue.poll(maxResults, expiredRecords, getExpiration());
public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
return priorityQueue.poll(filter, expiredRecords, getExpiration());
public List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
return priorityQueue.poll(maxResults, expiredRecords, getExpiration(), pollStrategy);
}
@Override
public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
return priorityQueue.poll(filter, expiredRecords, getExpiration(), pollStrategy);
}
@Override
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
private int getExpiration() {

View File

@ -509,6 +509,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
final long lineageStartDate = metadataIn.readLong();
final long entryDate = metadataIn.readLong();
final long penaltyExpirationMillis = metadataIn.readLong();
final ContentClaimTriple contentClaimTriple = consumeContent(dis, out, contentClaim, claimOffset, peerDescription, compression == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);
@ -521,6 +522,7 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
.size(contentClaimTriple.getContentLength())
.entryDate(entryDate)
.lineageStart(lineageStartDate, lineageStartIndex.getAndIncrement())
.penaltyExpirationTime(penaltyExpirationMillis)
.build();
logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFileRecord, attributes.size(), contentClaimTriple.getContentLength());

View File

@ -22,6 +22,7 @@ import org.apache.nifi.controller.MockSwapManager;
import org.apache.nifi.controller.queue.DropFlowFileAction;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
@ -325,6 +326,29 @@ public class TestSwappablePriorityQueue {
}
}
/**
* Verifies that a penalized FlowFile is withheld when polling with {@link PollStrategy#UNPENALIZED_FLOWFILES}
* and is returned when polling with {@link PollStrategy#ALL_FLOWFILES}.
*/
@Test
public void testPollWithPenalizedFlowFile() {
// Mock a FlowFile that always reports itself as penalized.
final FlowFileRecord penalizedFlowFile = mock(FlowFileRecord.class);
when(penalizedFlowFile.isPenalized()).thenReturn(true);
assertTrue(queue.isEmpty());
queue.put(penalizedFlowFile);
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
// With UNPENALIZED_FLOWFILES the penalized FlowFile must not be handed out, and it must remain queued.
FlowFileRecord polled = queue.poll(expiredRecords, 0, PollStrategy.UNPENALIZED_FLOWFILES);
assertNull(polled);
assertFalse(queue.isEmpty());
// With ALL_FLOWFILES the very same penalized FlowFile is returned.
polled = queue.poll(expiredRecords, 0, PollStrategy.ALL_FLOWFILES);
assertNotNull(polled);
assertSame(penalizedFlowFile, polled);
// The polled FlowFile has not been acknowledged yet, so the queue is still not empty at this point;
// acknowledge it before asserting that the queue is empty.
queue.acknowledge(polled);
assertTrue(queue.isEmpty());
}
@Test
public void testPollWithOnlyExpiredFlowFile() {
final FlowFileRecord expiredFlowFile = mock(FlowFileRecord.class);

View File

@ -148,7 +148,7 @@ public class TestLoadBalanceSession {
expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
expectedDos.writeInt(68); // metadata length
expectedDos.writeInt(76); // metadata length
expectedDos.writeInt(1); // 1 attribute
expectedDos.writeInt(4); // length of attribute
expectedDos.write("uuid".getBytes());
@ -156,13 +156,14 @@ public class TestLoadBalanceSession {
expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeInt(5);
expectedDos.write("hello".getBytes());
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
expectedDos.writeInt(68); // metadata length
expectedDos.writeInt(76); // metadata length
expectedDos.writeInt(1); // 1 attribute
expectedDos.writeInt(4); // length of attribute
expectedDos.write("uuid".getBytes());
@ -170,6 +171,7 @@ public class TestLoadBalanceSession {
expectedDos.write(flowFile2.getAttribute("uuid").getBytes());
expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile2.getEntryDate()); // entry date
expectedDos.writeLong(flowFile2.getPenaltyExpirationMillis()); // penalty expiration time
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeInt(8);
expectedDos.write("good-bye".getBytes());
@ -235,7 +237,7 @@ public class TestLoadBalanceSession {
expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
expectedDos.writeInt(68); // metadata length
expectedDos.writeInt(76); // metadata length
expectedDos.writeInt(1); // 1 attribute
expectedDos.writeInt(4); // length of attribute
expectedDos.write("uuid".getBytes());
@ -243,6 +245,7 @@ public class TestLoadBalanceSession {
expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time
// first data frame
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);

View File

@ -633,6 +633,7 @@ public class TestStandardLoadBalanceProtocol {
out.writeLong(0L); // lineage start date
out.writeLong(0L); // entry date
out.writeLong(0L); // penalty expiration time
dos.writeInt(baos.size());
baos.writeTo(dos);

View File

@ -275,7 +275,7 @@ public class StandardProcessSessionIT {
Mockito.doAnswer(new Answer<List<FlowFileRecord>>() {
@Override
public List<FlowFileRecord> answer(InvocationOnMock invocation) throws Throwable {
return localFlowFileQueue.poll(invocation.getArgument(0), invocation.getArgument(1));
return localFlowFileQueue.poll((FlowFileFilter) invocation.getArgument(0), invocation.getArgument(1));
}
}).when(connection).poll(any(FlowFileFilter.class), any(Set.class));

View File

@ -47,6 +47,7 @@ import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
@ -218,16 +219,31 @@ public class TestWriteAheadFlowFileRepository {
public void putAll(Collection<FlowFileRecord> files) {
}
@Override
public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
return null;
}
@Override
public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
return null;
}
@Override
public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
return null;
}
@Override
public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
return null;
}
@Override
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
return null;
}
@Override
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
return null;

View File

@ -21,6 +21,7 @@ import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
@ -162,7 +163,7 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
}
@Override
public synchronized FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
public synchronized FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
while (!flowFiles.isEmpty()) {
final FlowFileRecord flowFile = flowFiles.peek();
if (flowFile == null) {
@ -178,7 +179,7 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
continue;
}
if (flowFile.isPenalized()) {
if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
return null;
}
@ -189,6 +190,11 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
return null;
}
/**
* Convenience overload that polls a single FlowFile without returning penalized FlowFiles.
* Delegates to {@code poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES)}.
*
* @param expiredRecords set for expired records, forwarded unchanged
* @return the result of the strategy-aware overload
*/
@Override
public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
private boolean isExpired(final FlowFileRecord flowFile) {
if (expirationMillis == 0L) {
return false;
@ -199,10 +205,10 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
}
@Override
public synchronized List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords) {
public synchronized List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
final List<FlowFileRecord> selected = new ArrayList<>(Math.min(maxResults, flowFiles.size()));
for (int i=0; i < maxResults; i++) {
final FlowFileRecord flowFile = poll(expiredRecords);
final FlowFileRecord flowFile = poll(expiredRecords, pollStrategy);
if (flowFile != null) {
selected.add(flowFile);
}
@ -216,7 +222,12 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
}
@Override
public synchronized List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
public synchronized List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
final List<FlowFileRecord> selected = new ArrayList<>();
// Use an iterator to iterate over all FlowFiles in the queue. This allows us to
@ -235,7 +246,7 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
continue;
}
if (flowFile.isPenalized()) {
if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
break;
}
@ -254,6 +265,11 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
return selected;
}
/**
* Convenience overload that polls FlowFiles through a filter without returning penalized FlowFiles.
* Delegates to {@code poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES)}.
*
* @param filter filter forwarded unchanged to the strategy-aware overload
* @param expiredRecords set for expired records, forwarded unchanged
* @return the result of the strategy-aware overload
*/
@Override
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
public String getFlowFileExpiration() {
return expirationMillis + " millis";