NIFI-8466: Resolving offload bug with Single Node load balanced queues

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5025.
This commit is contained in:
Joe Gresock 2021-04-23 15:22:28 -04:00 committed by Nathan Gough
parent ecacfdaa4c
commit e19940ea7e
2 changed files with 51 additions and 16 deletions

View File

@ -1169,22 +1169,29 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
partitionWriteLock.lock();
try {
if (!offloaded) {
return;
}
switch (newState) {
case CONNECTED:
if (nodeId != null && nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
// the node with this queue was connected to the cluster, make sure the queue is not offloaded
resetOffloadedQueue();
}
break;
case OFFLOADED:
case OFFLOADING:
case DISCONNECTED:
case DISCONNECTING:
onNodeRemoved(nodeId);
break;
switch (newState) {
case OFFLOADING:
onNodeRemoved(nodeId);
break;
case CONNECTED:
onNodeAdded(nodeId);
break;
}
} else {
switch (newState) {
case CONNECTED:
if (nodeId != null && nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
// the node with this queue was connected to the cluster, make sure the queue is not offloaded
resetOffloadedQueue();
}
break;
case OFFLOADED:
case OFFLOADING:
case DISCONNECTED:
case DISCONNECTING:
onNodeRemoved(nodeId);
break;
}
}
} finally {
partitionWriteLock.unlock();

View File

@ -19,6 +19,7 @@ package org.apache.nifi.controller.queue.clustered;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.MockFlowFileRecord;
@ -428,6 +429,33 @@ public class TestSocketLoadBalancedFlowFileQueue {
}
}
@Test
public void testOffloadAndReconnectKeepsQueueInCorrectOrder() {
// Simulate FirstNodePartitioner, which always selects the first node in the partition queue
queue.setFlowFilePartitioner(new StaticFlowFilePartitioner(0));
QueuePartition firstPartition = queue.putAndGetPartition(new MockFlowFileRecord());
final NodeIdentifier node1Identifier = nodeIds.get(0);
final NodeIdentifier node2Identifier = nodeIds.get(1);
// The local node partition starts out first
Assert.assertEquals("local", firstPartition.getSwapPartitionName());
// Simulate offloading the first node
clusterTopologyEventListener.onNodeStateChange(node1Identifier, NodeConnectionState.OFFLOADING);
// Now the remote partition for the second node should be returned
firstPartition = queue.putAndGetPartition(new MockFlowFileRecord());
Assert.assertEquals(node2Identifier, firstPartition.getNodeIdentifier().get());
// Simulate reconnecting the first node
clusterTopologyEventListener.onNodeStateChange(node1Identifier, NodeConnectionState.CONNECTED);
// Now the local node partition is returned again
firstPartition = queue.putAndGetPartition(new MockFlowFileRecord());
Assert.assertEquals("local", firstPartition.getSwapPartitionName());
}
@Test(timeout = 30000)
public void testChangeInClusterTopologyTriggersRebalanceOnlyOnRemovedNodeIfNecessary() throws InterruptedException {