From e19940ea7ecff9b88b1fc2ac1aa5c1c8b3c3a777 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Fri, 23 Apr 2021 15:22:28 -0400 Subject: [PATCH] NIFI-8466: Resolving offload bug with Single Node load balanced queues Signed-off-by: Nathan Gough This closes #5025. --- .../SocketLoadBalancedFlowFileQueue.java | 39 +++++++++++-------- .../TestSocketLoadBalancedFlowFileQueue.java | 28 +++++++++++++ 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 983fca258d..0c6c1d899e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -1169,22 +1169,29 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple partitionWriteLock.lock(); try { if (!offloaded) { - return; - } - - switch (newState) { - case CONNECTED: - if (nodeId != null && nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { - // the node with this queue was connected to the cluster, make sure the queue is not offloaded - resetOffloadedQueue(); - } - break; - case OFFLOADED: - case OFFLOADING: - case DISCONNECTED: - case DISCONNECTING: - onNodeRemoved(nodeId); - break; + switch (newState) { + case OFFLOADING: + onNodeRemoved(nodeId); + break; + case CONNECTED: + onNodeAdded(nodeId); + break; + } + } else { + switch (newState) { + case CONNECTED: + if (nodeId != null && nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { + // the node with this queue was connected to the cluster, make sure the queue is not offloaded + resetOffloadedQueue(); + } + break; + case OFFLOADED: + case OFFLOADING: + case DISCONNECTED: + case DISCONNECTING: + onNodeRemoved(nodeId); + break; + } } } finally { partitionWriteLock.unlock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java index 89599599dd..0deb917331 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.queue.clustered; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.MockFlowFileRecord; @@ -428,6 +429,33 @@ public class TestSocketLoadBalancedFlowFileQueue { } } + @Test + public void testOffloadAndReconnectKeepsQueueInCorrectOrder() { + // Simulate FirstNodePartitioner, which always selects the first node in the partition queue + queue.setFlowFilePartitioner(new StaticFlowFilePartitioner(0)); + + QueuePartition firstPartition = queue.putAndGetPartition(new MockFlowFileRecord()); + + final NodeIdentifier node1Identifier = nodeIds.get(0); + final NodeIdentifier node2Identifier = nodeIds.get(1); + + // The local node partition starts out first + Assert.assertEquals("local", firstPartition.getSwapPartitionName()); + + // Simulate offloading the first node + clusterTopologyEventListener.onNodeStateChange(node1Identifier, NodeConnectionState.OFFLOADING); + + // Now the remote partition for the second node should be returned + firstPartition = queue.putAndGetPartition(new MockFlowFileRecord()); + Assert.assertEquals(node2Identifier, firstPartition.getNodeIdentifier().get()); + + // Simulate reconnecting the first node + clusterTopologyEventListener.onNodeStateChange(node1Identifier, NodeConnectionState.CONNECTED); + + // Now the local node partition is returned again + firstPartition = queue.putAndGetPartition(new MockFlowFileRecord()); + Assert.assertEquals("local", firstPartition.getSwapPartitionName()); + } @Test(timeout = 30000) public void testChangeInClusterTopologyTriggersRebalanceOnlyOnRemovedNodeIfNecessary() throws InterruptedException {