diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java index f0eff27ef2..b9f6951960 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java @@ -66,4 +66,10 @@ public interface LoadBalancedFlowFileQueue extends FlowFileQueue { */ boolean isPropagateBackpressureAcrossNodes(); + /** + * Determines whether or not the local partition's size >= backpressure threshold + * + * @return true if the number of FlowFiles or total size of FlowFiles in the local partition alone meets or exceeds the backpressure threshold, false otherwise. + */ + boolean isLocalPartitionFull(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java index 5bf75a4cc8..436b85d396 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java @@ -141,6 +141,10 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue { @Override public boolean isFull() { + return isFull(size()); + } + + protected boolean isFull(final QueueSize queueSize) { final MaxQueueSize maxSize = getMaxQueueSize(); // Check if max size is set @@ -148,7 +152,6 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue { return false; } - final QueueSize queueSize = size(); if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) { return true; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 7b3a21124a..84731f769e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -601,6 +601,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); } + @Override + public boolean isLocalPartitionFull() { + return isFull(localPartition.size()); + } + /** * Determines which QueuePartition the given FlowFile belongs to. Must be called with partition read lock held. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index dda71de58d..f508d12fba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -248,13 +248,15 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { "not configured to allow for Load Balancing"); } + final LoadBalancedFlowFileQueue loadBalancedFlowFileQueue = (LoadBalancedFlowFileQueue) flowFileQueue; + final int spaceCheck = dataIn.read(); if (spaceCheck < 0) { throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + peerDescription); } if (spaceCheck == CHECK_SPACE) { - if (flowFileQueue.isFull()) { + if (loadBalancedFlowFileQueue.isLocalPartitionFull()) { logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", peerDescription, connectionId); out.write(QUEUE_FULL); out.flush();