NIFI-5745: When determining if backpressure should be applied across nodes for load balancing, only consider if the local partition has reached the threshold limits instead of considering the size of the entire queue

This closes #3108.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Mark Payne 2018-10-24 13:06:57 -04:00 committed by Koji Kawamura
parent c7ff2fc5db
commit 234ddb0fe1
4 changed files with 18 additions and 2 deletions

View File

@ -66,4 +66,10 @@ public interface LoadBalancedFlowFileQueue extends FlowFileQueue {
*/ */
boolean isPropagateBackpressureAcrossNodes(); boolean isPropagateBackpressureAcrossNodes();
/**
* Determines whether or not the local partition's size >= backpressure threshold
*
* @return <code>true</code> if the number of FlowFiles or total size of FlowFiles in the local partition alone meets or exceeds the backpressure threshold, <code>false</code> otherwise.
*/
boolean isLocalPartitionFull();
} }

View File

@ -141,6 +141,10 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
@Override @Override
public boolean isFull() { public boolean isFull() {
return isFull(size());
}
protected boolean isFull(final QueueSize queueSize) {
final MaxQueueSize maxSize = getMaxQueueSize(); final MaxQueueSize maxSize = getMaxQueueSize();
// Check if max size is set // Check if max size is set
@ -148,7 +152,6 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
return false; return false;
} }
final QueueSize queueSize = size();
if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) { if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
return true; return true;
} }

View File

@ -601,6 +601,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
} }
@Override
public boolean isLocalPartitionFull() {
return isFull(localPartition.size());
}
/** /**
* Determines which QueuePartition the given FlowFile belongs to. Must be called with partition read lock held. * Determines which QueuePartition the given FlowFile belongs to. Must be called with partition read lock held.
* *

View File

@ -248,13 +248,15 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
"not configured to allow for Load Balancing"); "not configured to allow for Load Balancing");
} }
final LoadBalancedFlowFileQueue loadBalancedFlowFileQueue = (LoadBalancedFlowFileQueue) flowFileQueue;
final int spaceCheck = dataIn.read(); final int spaceCheck = dataIn.read();
if (spaceCheck < 0) { if (spaceCheck < 0) {
throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + peerDescription); throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + peerDescription);
} }
if (spaceCheck == CHECK_SPACE) { if (spaceCheck == CHECK_SPACE) {
if (flowFileQueue.isFull()) { if (loadBalancedFlowFileQueue.isLocalPartitionFull()) {
logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", peerDescription, connectionId); logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", peerDescription, connectionId);
out.write(QUEUE_FULL); out.write(QUEUE_FULL);
out.flush(); out.flush();