From 234ddb0fe1a36ad947c340114058d82c777d791f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 24 Oct 2018 13:06:57 -0400 Subject: [PATCH] NIFI-5745: When determining if backpressure should be applied across nodes for load balancing, only consider if the local partition has reached the threshold limits instead of considering the size of the entire queue This closes #3108. Signed-off-by: Koji Kawamura --- .../nifi/controller/queue/LoadBalancedFlowFileQueue.java | 6 ++++++ .../apache/nifi/controller/queue/AbstractFlowFileQueue.java | 5 ++++- .../queue/clustered/SocketLoadBalancedFlowFileQueue.java | 5 +++++ .../queue/clustered/server/StandardLoadBalanceProtocol.java | 4 +++- 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java index f0eff27ef2..b9f6951960 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java @@ -66,4 +66,10 @@ public interface LoadBalancedFlowFileQueue extends FlowFileQueue { */ boolean isPropagateBackpressureAcrossNodes(); + /** + * Determines whether or not the local partition's size >= backpressure threshold + * + * @return true if the number of FlowFiles or total size of FlowFiles in the local partition alone meets or exceeds the backpressure threshold, false otherwise. + */ + boolean isLocalPartitionFull(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java index 5bf75a4cc8..436b85d396 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java @@ -141,6 +141,10 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue { @Override public boolean isFull() { + return isFull(size()); + } + + protected boolean isFull(final QueueSize queueSize) { final MaxQueueSize maxSize = getMaxQueueSize(); // Check if max size is set @@ -148,7 +152,6 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue { return false; } - final QueueSize queueSize = size(); if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) { return true; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 7b3a21124a..84731f769e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -601,6 +601,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); } + @Override + public boolean isLocalPartitionFull() { + return isFull(localPartition.size()); + } + /** * Determines which QueuePartition the given FlowFile belongs to. Must be called with partition read lock held. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index dda71de58d..f508d12fba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -248,13 +248,15 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { "not configured to allow for Load Balancing"); } + final LoadBalancedFlowFileQueue loadBalancedFlowFileQueue = (LoadBalancedFlowFileQueue) flowFileQueue; + final int spaceCheck = dataIn.read(); if (spaceCheck < 0) { throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + peerDescription); } if (spaceCheck == CHECK_SPACE) { - if (flowFileQueue.isFull()) { + if (loadBalancedFlowFileQueue.isLocalPartitionFull()) { logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", peerDescription, connectionId); out.write(QUEUE_FULL); out.flush();