diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
index 0fe5a99a8d..754483e5ed 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
@@ -44,6 +44,7 @@ public class ConnectionStatus implements Cloneable {
private long maxQueuedBytes;
private long totalQueuedDuration;
private long maxQueuedDuration;
+ private FlowFileAvailability flowFileAvailability;
public String getId() {
return id;
@@ -214,6 +215,14 @@ public class ConnectionStatus implements Cloneable {
this.maxQueuedDuration = maxQueuedDuration;
}
+ public FlowFileAvailability getFlowFileAvailability() {
+ return flowFileAvailability;
+ }
+
+ public void setFlowFileAvailability(final FlowFileAvailability availability) {
+ this.flowFileAvailability = availability;
+ }
+
@Override
public ConnectionStatus clone() {
final ConnectionStatus clonedObj = new ConnectionStatus();
@@ -230,6 +239,7 @@ public class ConnectionStatus implements Cloneable {
clonedObj.sourceName = sourceName;
clonedObj.destinationId = destinationId;
clonedObj.destinationName = destinationName;
+ clonedObj.flowFileAvailability = flowFileAvailability;
if (predictions != null) {
clonedObj.setPredictions(predictions.clone());
@@ -265,6 +275,8 @@ public class ConnectionStatus implements Cloneable {
builder.append(backPressureDataSizeThreshold);
builder.append(", backPressureObjectThreshold=");
builder.append(backPressureObjectThreshold);
+ builder.append(", flowFileAvailability=");
+ builder.append(flowFileAvailability);
builder.append(", inputCount=");
builder.append(inputCount);
builder.append(", inputBytes=");
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/FlowFileAvailability.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/FlowFileAvailability.java
new file mode 100644
index 0000000000..d805312dfa
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/FlowFileAvailability.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status;
+
+public enum FlowFileAvailability {
+ ACTIVE_QUEUE_EMPTY,
+
+ HEAD_OF_QUEUE_PENALIZED,
+
+ FLOWFILE_AVAILABLE;
+}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
index 758a059802..3e4720ef6b 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
@@ -16,11 +16,12 @@
*/
package org.apache.nifi.controller.status;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.apache.nifi.registry.flow.VersionedFlowState;
/**
*/
@@ -255,7 +256,6 @@ public class ProcessGroupStatus implements Cloneable {
@Override
public ProcessGroupStatus clone() {
-
final ProcessGroupStatus clonedObj = new ProcessGroupStatus();
clonedObj.id = id;
@@ -447,6 +447,7 @@ public class ProcessGroupStatus implements Cloneable {
merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
+ merged.setFlowFileAvailability(mergeFlowFileAvailability(merged.getFlowFileAvailability(), statusToMerge.getFlowFileAvailability()));
}
target.setConnectionStatus(mergedConnectionMap.values());
@@ -588,4 +589,26 @@ public class ProcessGroupStatus implements Cloneable {
target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values());
}
+
+ public static FlowFileAvailability mergeFlowFileAvailability(final FlowFileAvailability availabilityA, final FlowFileAvailability availabilityB) {
+ if (availabilityA == availabilityB) {
+ return availabilityA;
+ }
+ if (availabilityA == null) {
+ return availabilityB;
+ }
+ if (availabilityB == null) {
+ return availabilityA;
+ }
+
+ if (availabilityA == FlowFileAvailability.FLOWFILE_AVAILABLE || availabilityB == FlowFileAvailability.FLOWFILE_AVAILABLE) {
+ return FlowFileAvailability.FLOWFILE_AVAILABLE;
+ }
+
+ if (availabilityA == FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED || availabilityB == FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED) {
+ return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED;
+ }
+
+ return FlowFileAvailability.FLOWFILE_AVAILABLE;
+ }
}
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index d4b6b2ed65..8ed2d6c149 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.queue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
@@ -109,6 +110,11 @@ public interface FlowFileQueue {
*/
boolean isEmpty();
+ /**
+ * @return the FlowFile Availability for this queue
+ */
+ FlowFileAvailability getFlowFileAvailability();
+
/**
* @return true
if the queue is empty or contains only FlowFiles that already are being processed
* by others, false
if the queue contains at least one FlowFile that is available for processing,
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
index e8c9df611a..ba507ec935 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java
@@ -48,6 +48,7 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
private String queuedCount;
private Integer percentUseCount;
private Integer percentUseBytes;
+ private String flowFileAvailability;
/* getters / setters */
/**
@@ -283,6 +284,15 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
this.percentUseBytes = percentUseBytes;
}
+ @ApiModelProperty("The availability of FlowFiles in this connection")
+ public String getFlowFileAvailability() {
+ return flowFileAvailability;
+ }
+
+ public void setFlowFileAvailability(final String availability) {
+ this.flowFileAvailability = availability;
+ }
+
@Override
public ConnectionStatusSnapshotDTO clone() {
final ConnectionStatusSnapshotDTO other = new ConnectionStatusSnapshotDTO();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpointMerger.java
index db455d4bd7..d993187696 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpointMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpointMerger.java
@@ -52,7 +52,7 @@ public class ConnectionStatusEndpointMerger extends AbstractSingleEntityEndpoint
final NodeIdentifier selectedNodeId = entityMap.entrySet().stream()
.filter(e -> e.getValue() == clientEntity)
- .map(e -> e.getKey())
+ .map(Map.Entry::getKey)
.findFirst()
.orElse(null);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
index bf702773ff..0ad131fe79 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java
@@ -17,6 +17,8 @@
package org.apache.nifi.cluster.manager;
+import org.apache.nifi.controller.status.FlowFileAvailability;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.registry.flow.VersionedFlowState;
@@ -274,7 +276,7 @@ public class StatusMerger {
}
private static Collection replaceNull(final Collection collection) {
- return (collection == null) ? Collections.emptyList() : collection;
+ return (collection == null) ? Collections.emptyList() : collection;
}
@@ -490,6 +492,11 @@ public class StatusMerger {
target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
+ final FlowFileAvailability targetFlowFileAvailability = target.getFlowFileAvailability() == null ? null : FlowFileAvailability.valueOf(target.getFlowFileAvailability());
+ final FlowFileAvailability toMergeFlowFileAvailability = toMerge.getFlowFileAvailability() == null ? null : FlowFileAvailability.valueOf(toMerge.getFlowFileAvailability());
+ final FlowFileAvailability mergedFlowFileAvailability = ProcessGroupStatus.mergeFlowFileAvailability(targetFlowFileAvailability, toMergeFlowFileAvailability);
+ target.setFlowFileAvailability(mergedFlowFileAvailability == null ? null : mergedFlowFileAvailability.name());
+
if (target.getPercentUseBytes() == null) {
target.setPercentUseBytes(toMerge.getPercentUseBytes());
} else if (toMerge.getPercentUseBytes() != null) {
@@ -543,14 +550,15 @@ public class StatusMerger {
}
private static long minNonNegative(long a, long b){
- if(a < 0){
+ if (a < 0) {
return b;
- }else if(b < 0){
+ } else if (b < 0) {
return a;
- }else{
+ } else {
return Math.min(a, b);
}
}
+
public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) {
target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued()));
target.setQueuedCount(formatCount(target.getFlowFilesQueued()));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index 56e87c6d5e..e98a3f7e71 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -247,6 +247,7 @@ public abstract class AbstractEventAccess implements EventAccess {
connStatus.setTotalQueuedDuration(conn.getFlowFileQueue().getTotalQueuedDuration(now));
long minLastQueueDate = conn.getFlowFileQueue().getMinLastQueueDate();
connStatus.setMaxQueuedDuration(minLastQueueDate == 0 ? 0 : now - minLastQueueDate);
+ connStatus.setFlowFileAvailability(conn.getFlowFileQueue().getFlowFileAvailability());
final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier());
if (connectionStatusReport != null) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/Connectables.java
index 5d74ebb481..4a36be5a85 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/Connectables.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/Connectables.java
@@ -16,18 +16,19 @@
*/
package org.apache.nifi.util;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.processor.Relationship;
+import java.util.Collection;
+import java.util.List;
+
public class Connectables {
public static boolean flowFilesQueued(final Connectable connectable) {
for (final Connection conn : connectable.getIncomingConnections()) {
- if (!conn.getFlowFileQueue().isActiveQueueEmpty()) {
+ if (conn.getFlowFileQueue().getFlowFileAvailability() == FlowFileAvailability.FLOWFILE_AVAILABLE) {
return true;
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
index a87fcfe46d..3828588b35 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
@@ -22,6 +22,7 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
@@ -177,6 +178,11 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
return queue.getFlowFileQueueSize().isEmpty();
}
+ @Override
+ public FlowFileAvailability getFlowFileAvailability() {
+ return queue.getFlowFileAvailability();
+ }
+
@Override
public boolean isActiveQueueEmpty() {
final FlowFileQueueSize queueSize = queue.getFlowFileQueueSize();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index 34da62c615..92129a58ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -23,6 +23,7 @@ import org.apache.nifi.controller.repository.IncompleteSwapFileException;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
@@ -440,6 +441,31 @@ public class SwappablePriorityQueue {
return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0;
}
+ public FlowFileAvailability getFlowFileAvailability() {
+ // If queue is empty, avoid obtaining a lock.
+ if (isActiveQueueEmpty()) {
+ return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
+ }
+
+ final FlowFileRecord top;
+ readLock.lock();
+ try {
+ top = activeQueue.peek();
+ } finally {
+ readLock.unlock("isFlowFileAvailable");
+ }
+
+ if (top == null) {
+ return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
+ }
+
+ if (top.isPenalized()) {
+ return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED;
+ }
+
+ return FlowFileAvailability.FLOWFILE_AVAILABLE;
+ }
+
public void acknowledge(final FlowFileRecord flowFile) {
logger.trace("{} Acknowledging {}", this, flowFile);
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 6b8d418cfe..2a0c504afd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -27,6 +27,7 @@ import org.apache.nifi.controller.queue.AbstractFlowFileQueue;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.IllegalClusterStateException;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
@@ -559,6 +560,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
return size().getObjectCount() == 0;
}
+ @Override
+ public FlowFileAvailability getFlowFileAvailability() {
+ return localPartition.getFlowFileAvailability();
+ }
+
@Override
public boolean isActiveQueueEmpty() {
return localPartition.isActiveQueueEmpty();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
index 84f0bab3bc..0679b509ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.queue.clustered.partition;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.PollStrategy;
@@ -38,6 +39,11 @@ public interface LocalQueuePartition extends QueuePartition {
*/
boolean isActiveQueueEmpty();
+ /**
+ * @return the availability of FlowFiles in the queue
+ */
+ FlowFileAvailability getFlowFileAvailability();
+
/**
* @return true
if there is at least one FlowFile that has not yet been acknowledged, false
if all FlowFiles have been acknowledged.
*/
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
index 03e8e1848a..ae41e554bc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
@@ -20,6 +20,7 @@ package org.apache.nifi.controller.queue.clustered.partition;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.DropFlowFileAction;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
@@ -102,6 +103,11 @@ public class SwappablePriorityQueueLocalPartition implements LocalQueuePartition
return priorityQueue.isActiveQueueEmpty();
}
+ @Override
+ public FlowFileAvailability getFlowFileAvailability() {
+ return priorityQueue.getFlowFileAvailability();
+ }
+
@Override
public FlowFileRecord poll(final Set expiredRecords, final PollStrategy pollStrategy) {
return priorityQueue.poll(expiredRecords, getExpiration(), pollStrategy);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java
index 4369fa21ae..1b62b69af4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java
@@ -17,16 +17,16 @@
package org.apache.nifi.controller;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
public class MockFlowFileRecord implements FlowFileRecord {
private static final AtomicLong idGenerator = new AtomicLong(0L);
@@ -37,6 +37,9 @@ public class MockFlowFileRecord implements FlowFileRecord {
private final ContentClaim contentClaim;
private long lastQueuedDate = System.currentTimeMillis() + 1;
+ private volatile long penaltyExpiration = 0L;
+
+
public MockFlowFileRecord() {
this(1L);
}
@@ -85,7 +88,7 @@ public class MockFlowFileRecord implements FlowFileRecord {
@Override
public boolean isPenalized() {
- return false;
+ return penaltyExpiration > System.currentTimeMillis();
}
@Override
@@ -110,7 +113,11 @@ public class MockFlowFileRecord implements FlowFileRecord {
@Override
public long getPenaltyExpirationMillis() {
- return 0;
+ return penaltyExpiration;
+ }
+
+ public void setPenaltyExpiration(final long timestamp) {
+ penaltyExpiration = timestamp;
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
index 0deb917331..83eea46a34 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -25,6 +25,7 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.MockFlowFileRecord;
import org.apache.nifi.controller.MockSwapManager;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
@@ -139,6 +140,22 @@ public class TestSocketLoadBalancedFlowFileQueue {
"localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
}
+ @Test
+ public void testFlowFileAvailability() {
+ assertTrue(queue.isEmpty());
+ assertSame(FlowFileAvailability.ACTIVE_QUEUE_EMPTY, queue.getFlowFileAvailability());
+
+ final MockFlowFileRecord penalizedFlowFile = new MockFlowFileRecord(0L);
+ penalizedFlowFile.setPenaltyExpiration(System.currentTimeMillis() + 500_000L);
+ queue.put(penalizedFlowFile);
+
+ assertFalse(queue.isEmpty());
+ assertSame(FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED, queue.getFlowFileAvailability());
+
+ penalizedFlowFile.setPenaltyExpiration(System.currentTimeMillis() - 1);
+ assertFalse(queue.isEmpty());
+ assertSame(FlowFileAvailability.FLOWFILE_AVAILABLE, queue.getFlowFileAvailability());
+ }
@Test
public void testPriorities() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 712a9e7c3a..39696bff4a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -16,31 +16,10 @@
*/
package org.apache.nifi.controller.repository;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueSize;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
@@ -76,6 +55,29 @@ import org.mockito.stubbing.Answer;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.WriteAheadRepository;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
@SuppressWarnings("deprecation")
public class TestWriteAheadFlowFileRepository {
@@ -188,6 +190,11 @@ public class TestWriteAheadFlowFileRepository {
return false;
}
+ @Override
+ public FlowFileAvailability getFlowFileAvailability() {
+ return isActiveQueueEmpty() ? FlowFileAvailability.ACTIVE_QUEUE_EMPTY : FlowFileAvailability.FLOWFILE_AVAILABLE;
+ }
+
@Override
public boolean isActiveQueueEmpty() {
return false;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
index ca4e136948..92f922712f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
@@ -17,16 +17,6 @@
package org.apache.nifi.controller.tasks;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@@ -38,14 +28,25 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardRepositoryContext;
-import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.LifecycleState;
+import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.processor.Processor;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
public class TestConnectableTask {
@@ -90,10 +91,10 @@ public class TestConnectableTask {
// Test with only a single connection that is self-looping and empty
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
- when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true);
+ when(flowFileQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.ACTIVE_QUEUE_EMPTY);
final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
- when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
+ when(nonEmptyQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.FLOWFILE_AVAILABLE);
when(selfLoopingConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue);
assertFalse(task.invoke().isYield());
@@ -139,7 +140,7 @@ public class TestConnectableTask {
when(funnel.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));
final FlowFileQueue emptyQueue = Mockito.mock(FlowFileQueue.class);
- when(emptyQueue.isActiveQueueEmpty()).thenReturn(true);
+ when(emptyQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.ACTIVE_QUEUE_EMPTY);
when(selfLoopingConnection.getFlowFileQueue()).thenReturn(emptyQueue);
final Set outgoingConnections = new HashSet<>();
@@ -173,7 +174,7 @@ public class TestConnectableTask {
// Adding input FlowFiles.
final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
- when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
+ when(nonEmptyQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.FLOWFILE_AVAILABLE);
when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(nonEmptyQueue);
assertFalse("When a Funnel has both incoming and outgoing connections and FlowFiles to process, then it should be executed.",
task.invoke().isYield());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 0b0d7f8177..ae2fd44ace 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -708,7 +708,7 @@ public final class DtoFactory {
dto.setBackPressureObjectThreshold(flowFileQueue.getBackPressureObjectThreshold());
dto.setBackPressureDataSizeThreshold(flowFileQueue.getBackPressureDataSizeThreshold());
dto.setFlowFileExpiration(flowFileQueue.getFlowFileExpiration());
- dto.setPrioritizers(new ArrayList());
+ dto.setPrioritizers(new ArrayList<>());
for (final FlowFilePrioritizer comparator : flowFileQueue.getPriorities()) {
dto.getPrioritizers().add(comparator.getClass().getCanonicalName());
}
@@ -717,7 +717,7 @@ public final class DtoFactory {
for (final Relationship selectedRelationship : connection.getRelationships()) {
if (!Relationship.ANONYMOUS.equals(selectedRelationship)) {
if (dto.getSelectedRelationships() == null) {
- dto.setSelectedRelationships(new TreeSet(Collator.getInstance(Locale.US)));
+ dto.setSelectedRelationships(new TreeSet<>(Collator.getInstance(Locale.US)));
}
dto.getSelectedRelationships().add(selectedRelationship.getName());
@@ -728,7 +728,7 @@ public final class DtoFactory {
for (final Relationship availableRelationship : connection.getSource().getRelationships()) {
if (!Relationship.ANONYMOUS.equals(availableRelationship)) {
if (dto.getAvailableRelationships() == null) {
- dto.setAvailableRelationships(new TreeSet(Collator.getInstance(Locale.US)));
+ dto.setAvailableRelationships(new TreeSet<>(Collator.getInstance(Locale.US)));
}
dto.getAvailableRelationships().add(availableRelationship.getName());
@@ -1177,6 +1177,7 @@ public final class DtoFactory {
snapshot.setFlowFilesQueued(connectionStatus.getQueuedCount());
snapshot.setBytesQueued(connectionStatus.getQueuedBytes());
+ snapshot.setFlowFileAvailability(connectionStatus.getFlowFileAvailability().name());
snapshot.setFlowFilesIn(connectionStatus.getInputCount());
snapshot.setBytesIn(connectionStatus.getInputBytes());
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
index 3b4a621d35..9981228497 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
@@ -18,6 +18,7 @@
package org.apache.nifi.stateless.queue;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
@@ -123,6 +124,12 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
return flowFiles.isEmpty() && unacknowledgedCount.get() == 0;
}
+ @Override
+ public FlowFileAvailability getFlowFileAvailability() {
+ // Penalization is ignored in stateless so we can just rely on whether or not the active queue is empty
+ return isActiveQueueEmpty() ? FlowFileAvailability.ACTIVE_QUEUE_EMPTY : FlowFileAvailability.FLOWFILE_AVAILABLE;
+ }
+
@Override
public boolean isActiveQueueEmpty() {
return flowFiles.isEmpty();