diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 84731f769e..353af49f2c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -769,8 +769,26 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple putAll(flowFiles); } else { logger.debug("Received the following FlowFiles from Peer: {}. Will accept FlowFiles to the local partition", flowFiles); - localPartition.putAll(flowFiles); + + // As explained in the putAllAndGetPartitions() method, we must ensure that we call adjustSize() before we + // put the FlowFiles on the queue. Otherwise, we will encounter a race condition. Specifically, that race condition + // can play out like so: + // + // Thread 1: Call localPartition.putAll() when the queue is empty (has a queue size of 0) but has not yet adjusted the size. + // Thread 2: Call poll() to obtain the FlowFile just received. + // Thread 2: Transfer the FlowFile to some Relationship + // Thread 2: Commit the session, which will call acknowledge on this queue. + // Thread 2: The acknowledge() method attempts to decrement the size of the queue to -1. + // This causes an Exception to be thrown and the queue size to remain at 0. + // However, the FlowFile has already been successfully transferred to the next Queue. + // Thread 1: Call adjustSize() to increment the size of the queue to 1 FlowFile. + // + // In this scenario, we now have no FlowFiles in the queue. However, the queue size is set to 1. + // We can avoid this race condition by simply ensuring that we call adjustSize() before making the FlowFiles + // available on the queue. This way, we cannot possibly obtain the FlowFiles and process/acknowledge them before the queue + // size has been updated to account for them and therefore we will not attempt to assign a negative queue size. adjustSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); + localPartition.putAll(flowFiles); } } finally { partitionReadLock.unlock();