NIFI-9689: When all FlowFiles in a FlowFile Queue are penalized, do not schedule the destination to run. Also expose this fact via the ConnectionStatusSnapshotDTO, as this allows the front-end to render this information to the user in order to avoid confusion when it appears that the Processor has data but does nothing

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5771
This commit is contained in:
Mark Payne 2022-02-15 14:25:36 -05:00 committed by Matthew Burgess
parent 4210e30047
commit 2aa6bd1e13
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
20 changed files with 235 additions and 58 deletions

View File

@ -44,6 +44,7 @@ public class ConnectionStatus implements Cloneable {
private long maxQueuedBytes;
private long totalQueuedDuration;
private long maxQueuedDuration;
private FlowFileAvailability flowFileAvailability;
public String getId() {
return id;
@ -214,6 +215,14 @@ public class ConnectionStatus implements Cloneable {
this.maxQueuedDuration = maxQueuedDuration;
}
public FlowFileAvailability getFlowFileAvailability() {
return flowFileAvailability;
}
public void setFlowFileAvailability(final FlowFileAvailability availability) {
this.flowFileAvailability = availability;
}
@Override
public ConnectionStatus clone() {
final ConnectionStatus clonedObj = new ConnectionStatus();
@ -230,6 +239,7 @@ public class ConnectionStatus implements Cloneable {
clonedObj.sourceName = sourceName;
clonedObj.destinationId = destinationId;
clonedObj.destinationName = destinationName;
clonedObj.flowFileAvailability = flowFileAvailability;
if (predictions != null) {
clonedObj.setPredictions(predictions.clone());
@ -265,6 +275,8 @@ public class ConnectionStatus implements Cloneable {
builder.append(backPressureDataSizeThreshold);
builder.append(", backPressureObjectThreshold=");
builder.append(backPressureObjectThreshold);
builder.append(", flowFileAvailability=");
builder.append(flowFileAvailability);
builder.append(", inputCount=");
builder.append(inputCount);
builder.append(", inputBytes=");

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status;
public enum FlowFileAvailability {
ACTIVE_QUEUE_EMPTY,
HEAD_OF_QUEUE_PENALIZED,
FLOWFILE_AVAILABLE;
}

View File

@ -16,11 +16,12 @@
*/
package org.apache.nifi.controller.status;
import org.apache.nifi.registry.flow.VersionedFlowState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.registry.flow.VersionedFlowState;
/**
*/
@ -255,7 +256,6 @@ public class ProcessGroupStatus implements Cloneable {
@Override
public ProcessGroupStatus clone() {
final ProcessGroupStatus clonedObj = new ProcessGroupStatus();
clonedObj.id = id;
@ -447,6 +447,7 @@ public class ProcessGroupStatus implements Cloneable {
merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
merged.setFlowFileAvailability(mergeFlowFileAvailability(merged.getFlowFileAvailability(), statusToMerge.getFlowFileAvailability()));
}
target.setConnectionStatus(mergedConnectionMap.values());
@ -588,4 +589,26 @@ public class ProcessGroupStatus implements Cloneable {
target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values());
}
public static FlowFileAvailability mergeFlowFileAvailability(final FlowFileAvailability availabilityA, final FlowFileAvailability availabilityB) {
if (availabilityA == availabilityB) {
return availabilityA;
}
if (availabilityA == null) {
return availabilityB;
}
if (availabilityB == null) {
return availabilityA;
}
if (availabilityA == FlowFileAvailability.FLOWFILE_AVAILABLE || availabilityB == FlowFileAvailability.FLOWFILE_AVAILABLE) {
return FlowFileAvailability.FLOWFILE_AVAILABLE;
}
if (availabilityA == FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED || availabilityB == FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED) {
return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED;
}
return FlowFileAvailability.FLOWFILE_AVAILABLE;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.controller.queue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
@ -109,6 +110,11 @@ public interface FlowFileQueue {
*/
boolean isEmpty();
/**
* @return the FlowFile Availability for this queue
*/
FlowFileAvailability getFlowFileAvailability();
/**
* @return <code>true</code> if the queue is empty or contains only FlowFiles that already are being processed
* by others, <code>false</code> if the queue contains at least one FlowFile that is available for processing,

View File

@ -48,6 +48,7 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
private String queuedCount;
private Integer percentUseCount;
private Integer percentUseBytes;
private String flowFileAvailability;
/* getters / setters */
/**
@ -283,6 +284,15 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
this.percentUseBytes = percentUseBytes;
}
@ApiModelProperty("The availability of FlowFiles in this connection")
public String getFlowFileAvailability() {
return flowFileAvailability;
}
public void setFlowFileAvailability(final String availability) {
this.flowFileAvailability = availability;
}
@Override
public ConnectionStatusSnapshotDTO clone() {
final ConnectionStatusSnapshotDTO other = new ConnectionStatusSnapshotDTO();

View File

@ -52,7 +52,7 @@ public class ConnectionStatusEndpointMerger extends AbstractSingleEntityEndpoint
final NodeIdentifier selectedNodeId = entityMap.entrySet().stream()
.filter(e -> e.getValue() == clientEntity)
.map(e -> e.getKey())
.map(Map.Entry::getKey)
.findFirst()
.orElse(null);

View File

@ -17,6 +17,8 @@
package org.apache.nifi.cluster.manager;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.registry.flow.VersionedFlowState;
@ -274,7 +276,7 @@ public class StatusMerger {
}
private static <T> Collection<T> replaceNull(final Collection<T> collection) {
return (collection == null) ? Collections.<T>emptyList() : collection;
return (collection == null) ? Collections.emptyList() : collection;
}
@ -490,6 +492,11 @@ public class StatusMerger {
target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
final FlowFileAvailability targetFlowFileAvailability = target.getFlowFileAvailability() == null ? null : FlowFileAvailability.valueOf(target.getFlowFileAvailability());
final FlowFileAvailability toMergeFlowFileAvailability = toMerge.getFlowFileAvailability() == null ? null : FlowFileAvailability.valueOf(toMerge.getFlowFileAvailability());
final FlowFileAvailability mergedFlowFileAvailability = ProcessGroupStatus.mergeFlowFileAvailability(targetFlowFileAvailability, toMergeFlowFileAvailability);
target.setFlowFileAvailability(mergedFlowFileAvailability == null ? null : mergedFlowFileAvailability.name());
if (target.getPercentUseBytes() == null) {
target.setPercentUseBytes(toMerge.getPercentUseBytes());
} else if (toMerge.getPercentUseBytes() != null) {
@ -543,14 +550,15 @@ public class StatusMerger {
}
private static long minNonNegative(long a, long b){
if(a < 0){
if (a < 0) {
return b;
}else if(b < 0){
} else if (b < 0) {
return a;
}else{
} else {
return Math.min(a, b);
}
}
public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) {
target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued()));
target.setQueuedCount(formatCount(target.getFlowFilesQueued()));

View File

@ -247,6 +247,7 @@ public abstract class AbstractEventAccess implements EventAccess {
connStatus.setTotalQueuedDuration(conn.getFlowFileQueue().getTotalQueuedDuration(now));
long minLastQueueDate = conn.getFlowFileQueue().getMinLastQueueDate();
connStatus.setMaxQueuedDuration(minLastQueueDate == 0 ? 0 : now - minLastQueueDate);
connStatus.setFlowFileAvailability(conn.getFlowFileQueue().getFlowFileAvailability());
final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier());
if (connectionStatusReport != null) {

View File

@ -16,18 +16,19 @@
*/
package org.apache.nifi.util;
import java.util.Collection;
import java.util.List;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.processor.Relationship;
import java.util.Collection;
import java.util.List;
public class Connectables {
public static boolean flowFilesQueued(final Connectable connectable) {
for (final Connection conn : connectable.getIncomingConnections()) {
if (!conn.getFlowFileQueue().isActiveQueueEmpty()) {
if (conn.getFlowFileQueue().getFlowFileAvailability() == FlowFileAvailability.FLOWFILE_AVAILABLE) {
return true;
}
}

View File

@ -22,6 +22,7 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
@ -177,6 +178,11 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
return queue.getFlowFileQueueSize().isEmpty();
}
@Override
public FlowFileAvailability getFlowFileAvailability() {
return queue.getFlowFileAvailability();
}
@Override
public boolean isActiveQueueEmpty() {
final FlowFileQueueSize queueSize = queue.getFlowFileQueueSize();

View File

@ -23,6 +23,7 @@ import org.apache.nifi.controller.repository.IncompleteSwapFileException;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
@ -440,6 +441,31 @@ public class SwappablePriorityQueue {
return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0;
}
public FlowFileAvailability getFlowFileAvailability() {
// If queue is empty, avoid obtaining a lock.
if (isActiveQueueEmpty()) {
return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
}
final FlowFileRecord top;
readLock.lock();
try {
top = activeQueue.peek();
} finally {
readLock.unlock("isFlowFileAvailable");
}
if (top == null) {
return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
}
if (top.isPenalized()) {
return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED;
}
return FlowFileAvailability.FLOWFILE_AVAILABLE;
}
public void acknowledge(final FlowFileRecord flowFile) {
logger.trace("{} Acknowledging {}", this, flowFile);
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());

View File

@ -27,6 +27,7 @@ import org.apache.nifi.controller.queue.AbstractFlowFileQueue;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.IllegalClusterStateException;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
@ -559,6 +560,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
return size().getObjectCount() == 0;
}
@Override
public FlowFileAvailability getFlowFileAvailability() {
return localPartition.getFlowFileAvailability();
}
@Override
public boolean isActiveQueueEmpty() {
return localPartition.isActiveQueueEmpty();

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.queue.clustered.partition;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.PollStrategy;
@ -38,6 +39,11 @@ public interface LocalQueuePartition extends QueuePartition {
*/
boolean isActiveQueueEmpty();
/**
* @return the availability of FlowFiles in the queue
*/
FlowFileAvailability getFlowFileAvailability();
/**
* @return <code>true</code> if there is at least one FlowFile that has not yet been acknowledged, <code>false</code> if all FlowFiles have been acknowledged.
*/

View File

@ -20,6 +20,7 @@ package org.apache.nifi.controller.queue.clustered.partition;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.DropFlowFileAction;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
@ -102,6 +103,11 @@ public class SwappablePriorityQueueLocalPartition implements LocalQueuePartition
return priorityQueue.isActiveQueueEmpty();
}
@Override
public FlowFileAvailability getFlowFileAvailability() {
return priorityQueue.getFlowFileAvailability();
}
@Override
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
return priorityQueue.poll(expiredRecords, getExpiration(), pollStrategy);

View File

@ -17,16 +17,16 @@
package org.apache.nifi.controller;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class MockFlowFileRecord implements FlowFileRecord {
private static final AtomicLong idGenerator = new AtomicLong(0L);
@ -37,6 +37,9 @@ public class MockFlowFileRecord implements FlowFileRecord {
private final ContentClaim contentClaim;
private long lastQueuedDate = System.currentTimeMillis() + 1;
private volatile long penaltyExpiration = 0L;
public MockFlowFileRecord() {
this(1L);
}
@ -85,7 +88,7 @@ public class MockFlowFileRecord implements FlowFileRecord {
@Override
public boolean isPenalized() {
return false;
return penaltyExpiration > System.currentTimeMillis();
}
@Override
@ -110,7 +113,11 @@ public class MockFlowFileRecord implements FlowFileRecord {
@Override
public long getPenaltyExpirationMillis() {
return 0;
return penaltyExpiration;
}
public void setPenaltyExpiration(final long timestamp) {
penaltyExpiration = timestamp;
}
@Override

View File

@ -25,6 +25,7 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.MockFlowFileRecord;
import org.apache.nifi.controller.MockSwapManager;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
@ -139,6 +140,22 @@ public class TestSocketLoadBalancedFlowFileQueue {
"localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
}
@Test
public void testFlowFileAvailability() {
assertTrue(queue.isEmpty());
assertSame(FlowFileAvailability.ACTIVE_QUEUE_EMPTY, queue.getFlowFileAvailability());
final MockFlowFileRecord penalizedFlowFile = new MockFlowFileRecord(0L);
penalizedFlowFile.setPenaltyExpiration(System.currentTimeMillis() + 500_000L);
queue.put(penalizedFlowFile);
assertFalse(queue.isEmpty());
assertSame(FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED, queue.getFlowFileAvailability());
penalizedFlowFile.setPenaltyExpiration(System.currentTimeMillis() - 1);
assertFalse(queue.isEmpty());
assertSame(FlowFileAvailability.FLOWFILE_AVAILABLE, queue.getFlowFileAvailability());
}
@Test
public void testPriorities() {

View File

@ -16,31 +16,10 @@
*/
package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueSize;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
@ -76,6 +55,29 @@ import org.mockito.stubbing.Answer;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.WriteAheadRepository;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@SuppressWarnings("deprecation")
public class TestWriteAheadFlowFileRepository {
@ -188,6 +190,11 @@ public class TestWriteAheadFlowFileRepository {
return false;
}
@Override
public FlowFileAvailability getFlowFileAvailability() {
return isActiveQueueEmpty() ? FlowFileAvailability.ACTIVE_QUEUE_EMPTY : FlowFileAvailability.FLOWFILE_AVAILABLE;
}
@Override
public boolean isActiveQueueEmpty() {
return false;

View File

@ -17,16 +17,6 @@
package org.apache.nifi.controller.tasks;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@ -38,14 +28,25 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardRepositoryContext;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.processor.Processor;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
public class TestConnectableTask {
@ -90,10 +91,10 @@ public class TestConnectableTask {
// Test with only a single connection that is self-looping and empty
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true);
when(flowFileQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.ACTIVE_QUEUE_EMPTY);
final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
when(nonEmptyQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.FLOWFILE_AVAILABLE);
when(selfLoopingConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue);
assertFalse(task.invoke().isYield());
@ -139,7 +140,7 @@ public class TestConnectableTask {
when(funnel.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));
final FlowFileQueue emptyQueue = Mockito.mock(FlowFileQueue.class);
when(emptyQueue.isActiveQueueEmpty()).thenReturn(true);
when(emptyQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.ACTIVE_QUEUE_EMPTY);
when(selfLoopingConnection.getFlowFileQueue()).thenReturn(emptyQueue);
final Set<Connection> outgoingConnections = new HashSet<>();
@ -173,7 +174,7 @@ public class TestConnectableTask {
// Adding input FlowFiles.
final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
when(nonEmptyQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.FLOWFILE_AVAILABLE);
when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(nonEmptyQueue);
assertFalse("When a Funnel has both incoming and outgoing connections and FlowFiles to process, then it should be executed.",
task.invoke().isYield());

View File

@ -708,7 +708,7 @@ public final class DtoFactory {
dto.setBackPressureObjectThreshold(flowFileQueue.getBackPressureObjectThreshold());
dto.setBackPressureDataSizeThreshold(flowFileQueue.getBackPressureDataSizeThreshold());
dto.setFlowFileExpiration(flowFileQueue.getFlowFileExpiration());
dto.setPrioritizers(new ArrayList<String>());
dto.setPrioritizers(new ArrayList<>());
for (final FlowFilePrioritizer comparator : flowFileQueue.getPriorities()) {
dto.getPrioritizers().add(comparator.getClass().getCanonicalName());
}
@ -717,7 +717,7 @@ public final class DtoFactory {
for (final Relationship selectedRelationship : connection.getRelationships()) {
if (!Relationship.ANONYMOUS.equals(selectedRelationship)) {
if (dto.getSelectedRelationships() == null) {
dto.setSelectedRelationships(new TreeSet<String>(Collator.getInstance(Locale.US)));
dto.setSelectedRelationships(new TreeSet<>(Collator.getInstance(Locale.US)));
}
dto.getSelectedRelationships().add(selectedRelationship.getName());
@ -728,7 +728,7 @@ public final class DtoFactory {
for (final Relationship availableRelationship : connection.getSource().getRelationships()) {
if (!Relationship.ANONYMOUS.equals(availableRelationship)) {
if (dto.getAvailableRelationships() == null) {
dto.setAvailableRelationships(new TreeSet<String>(Collator.getInstance(Locale.US)));
dto.setAvailableRelationships(new TreeSet<>(Collator.getInstance(Locale.US)));
}
dto.getAvailableRelationships().add(availableRelationship.getName());
@ -1177,6 +1177,7 @@ public final class DtoFactory {
snapshot.setFlowFilesQueued(connectionStatus.getQueuedCount());
snapshot.setBytesQueued(connectionStatus.getQueuedBytes());
snapshot.setFlowFileAvailability(connectionStatus.getFlowFileAvailability().name());
snapshot.setFlowFilesIn(connectionStatus.getInputCount());
snapshot.setBytesIn(connectionStatus.getInputBytes());

View File

@ -18,6 +18,7 @@
package org.apache.nifi.stateless.queue;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
@ -123,6 +124,12 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
return flowFiles.isEmpty() && unacknowledgedCount.get() == 0;
}
@Override
public FlowFileAvailability getFlowFileAvailability() {
// Penalization is ignored in stateless so we can just rely on whether or not the active queue is empty
return isActiveQueueEmpty() ? FlowFileAvailability.ACTIVE_QUEUE_EMPTY : FlowFileAvailability.FLOWFILE_AVAILABLE;
}
@Override
public boolean isActiveQueueEmpty() {
return flowFiles.isEmpty();