diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 1bb54a5871..46f25aed7d 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -3882,7 +3882,9 @@ by the `nifi.cluster.flow.election.max.candidates` property, the cluster will no |`nifi.cluster.flow.election.max.candidates`|Specifies the number of Nodes required in the cluster to cause early election of Flows. This allows the Nodes in the cluster to avoid having to wait a long time before starting processing if we reach at least this number of nodes in the cluster. |`nifi.cluster.load.balance.port`|Specifies the port to listen on for incoming connections for load balancing data across the cluster. The default value is `6342`. -|`nifi.cluster.load.balance.host`|Specifies the hostname to listen on for incoming connections for load balancing data across the cluster. If not specified, will default to the value used by the `nifi.cluster.node.address` property. +|`nifi.cluster.load.balance.host`|Specifies the hostname to listen on for incoming connections for load balancing data across the cluster. If not specified, will default to the value used by the +`nifi.cluster.node.address` property. The value set here does not have to be a hostname/IP address that is addressable outside of the cluster. However, all nodes within the cluster must be able to +connect to the node using this hostname/IP address. |`nifi.cluster.load.balance.connections.per.node`|The maximum number of connections to create between this node and each other node in the cluster. For example, if there are 5 nodes in the cluster and this value is set to 4, there will be up to 20 socket connections established for load-balancing purposes (5 x 4 = 20). The default value is `1`. 
|`nifi.cluster.load.balance.max.thread.count`|The maximum number of threads to use for transferring data from this node to other nodes in the cluster. While a given thread can only write to a single socket at a time, a single thread is capable of servicing multiple connections simultaneously because a given connection may not be available for reading/writing at any given time. The default value is `8`—i.e., up to 8 threads will be responsible for transferring data to other nodes, regardless of how many nodes are in the cluster.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 0d7b18370e..511620c461 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -453,6 +453,55 @@ public class TestNodeClusterCoordinator {
         assertEquals(conflictingId.getSocketPort(), conflictingNodeId.getSocketPort());
     }

+    /**
+     * Verifies that a connection request reusing an existing node ID with a different API port and
+     * load-balance endpoint results in a single registered node whose load-balance address and port
+     * are the newly supplied values.
+     */
+    @Test
+    public void testAddNodeIdentifierWithSameAddressDifferentLoadBalanceEndpoint() {
+        // Add Node 1 to the cluster
+        final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false);
+
+        final ConnectionRequest connectionRequest = new ConnectionRequest(id1, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>()));
+        final ConnectionRequestMessage crm = new ConnectionRequestMessage();
+        crm.setConnectionRequest(connectionRequest);
+
+        final ProtocolMessage response = coordinator.handle(crm, Collections.emptySet());
+        assertNotNull(response);
+        assertTrue(response instanceof ConnectionResponseMessage);
+        final ConnectionResponseMessage responseMessage = (ConnectionResponseMessage) response;
+        final NodeIdentifier resolvedNodeId = responseMessage.getConnectionResponse().getNodeIdentifier();
+        assertEquals(id1, resolvedNodeId);
+
+        // Add in a conflicting ID
+        final NodeIdentifier conflictingId = new NodeIdentifier("1234", "localhost", 8001, "localhost", 9000, "loadbalance-2", 4848, "localhost", 10000, 11000, false, null);
+        final ConnectionRequest conRequest2 = new ConnectionRequest(conflictingId, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>()));
+        final ConnectionRequestMessage crm2 = new ConnectionRequestMessage();
+        crm2.setConnectionRequest(conRequest2);
+
+        final ProtocolMessage conflictingResponse = coordinator.handle(crm2, Collections.emptySet());
+        assertNotNull(conflictingResponse);
+        assertTrue(conflictingResponse instanceof ConnectionResponseMessage);
+        final ConnectionResponseMessage conflictingResponseMessage = (ConnectionResponseMessage) conflictingResponse;
+        final NodeIdentifier conflictingNodeId = conflictingResponseMessage.getConnectionResponse().getNodeIdentifier();
+        assertEquals(id1.getId(), conflictingNodeId.getId());
+        assertEquals(conflictingId.getApiAddress(), conflictingNodeId.getApiAddress());
+        assertEquals(conflictingId.getApiPort(), conflictingNodeId.getApiPort());
+        assertEquals(conflictingId.getSiteToSiteAddress(), conflictingNodeId.getSiteToSiteAddress());
+        assertEquals(conflictingId.getSiteToSitePort(), conflictingNodeId.getSiteToSitePort());
+        assertEquals(conflictingId.getSocketAddress(), conflictingNodeId.getSocketAddress());
+        assertEquals(conflictingId.getSocketPort(), conflictingNodeId.getSocketPort());
+
+        // Ensure that the values were updated
+        final Set<NodeIdentifier> registeredNodeIds = coordinator.getNodeIdentifiers();
+        assertEquals(1, registeredNodeIds.size());
+
+        final NodeIdentifier registeredId = registeredNodeIds.iterator().next();
+        assertEquals("loadbalance-2", registeredId.getLoadBalanceAddress());
+        assertEquals(4848, registeredId.getLoadBalancePort());
+    }
+
     private NodeIdentifier createNodeId(final int index) {
         return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, false);
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 0c6c1d899e..1a1e187536 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -85,6 +85,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -95,6 +96,9 @@ import java.util.stream.Collectors;
 public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue implements LoadBalancedFlowFileQueue {
     private static final Logger logger = LoggerFactory.getLogger(SocketLoadBalancedFlowFileQueue.class);
     private static final int NODE_SWAP_THRESHOLD = 1000;
+    private static final Comparator<NodeIdentifier> loadBalanceEndpointComparator =
+            Comparator.comparing(NodeIdentifier::getLoadBalanceAddress)
+                    .thenComparing(NodeIdentifier::getLoadBalancePort);

     private final List prioritizers = new ArrayList<>();
     private final
ConnectionEventListener eventListener; @@ -139,7 +143,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple rebalancingPartition = new StandardRebalancingPartition(swapManager, swapThreshold, eventReporter, this, this::drop); // Create a RemoteQueuePartition for each node - nodeIdentifiers = clusterCoordinator == null ? Collections.emptySet() : clusterCoordinator.getNodeIdentifiers(); + nodeIdentifiers = clusterCoordinator == null ? Collections.emptySet() : new TreeSet<>(loadBalanceEndpointComparator); + + if (clusterCoordinator != null) { + nodeIdentifiers.addAll(clusterCoordinator.getNodeIdentifiers()); + } final List sortedNodeIdentifiers = new ArrayList<>(nodeIdentifiers); sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress)); @@ -656,16 +664,19 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } // Determine which Node Identifiers, if any, were removed. - final Set removedNodeIds = new HashSet<>(this.nodeIdentifiers); + final Set removedNodeIds = new TreeSet<>(loadBalanceEndpointComparator); + removedNodeIds.addAll(this.nodeIdentifiers); removedNodeIds.removeAll(updatedNodeIdentifiers); logger.debug("{} The following Node Identifiers were removed from the cluster: {}", this, removedNodeIds); + final Function mapKeyTransform = nodeId -> nodeId.getLoadBalanceAddress() + ":" + nodeId.getLoadBalancePort(); + // Build up a Map of Node ID to Queue Partition so that we can easily pull over the existing // QueuePartition objects instead of having to create new ones. 
- final Map partitionMap = new HashMap<>(); + final Map partitionMap = new HashMap<>(); for (final QueuePartition partition : this.queuePartitions) { final Optional nodeIdOption = partition.getNodeIdentifier(); - nodeIdOption.ifPresent(nodeIdentifier -> partitionMap.put(nodeIdentifier, partition)); + nodeIdOption.ifPresent(nodeIdentifier -> partitionMap.put(mapKeyTransform.apply(nodeIdentifier), partition)); } // Re-define 'queuePartitions' array @@ -683,13 +694,15 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple boolean localPartitionIncluded = false; for (int i = 0; i < sortedNodeIdentifiers.size(); i++) { final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i); + final String nodeIdMapKey = mapKeyTransform.apply(nodeId); + if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { updatedQueuePartitions[i] = localPartition; localPartitionIncluded = true; // If we have RemoteQueuePartition with this Node ID with data, that data must be migrated to the local partition. // This can happen if we didn't previously know our Node UUID. - final QueuePartition existingPartition = partitionMap.get(nodeId); + final QueuePartition existingPartition = partitionMap.get(nodeIdMapKey); if (existingPartition != null && existingPartition != localPartition) { final FlowFileQueueContents partitionContents = existingPartition.packageForRebalance(localPartition.getSwapPartitionName()); logger.debug("Transferred data from {} to {}", existingPartition, localPartition); @@ -699,7 +712,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple continue; } - final QueuePartition existingPartition = partitionMap.get(nodeId); + final QueuePartition existingPartition = partitionMap.get(nodeIdMapKey); updatedQueuePartitions[i] = existingPartition == null ? 
createRemotePartition(nodeId) : existingPartition; } @@ -721,7 +734,9 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple // Not all partitions need to be rebalanced, so just ensure that we rebalance any FlowFiles that are destined // for a node that is no longer in the cluster. for (final NodeIdentifier removedNodeId : removedNodeIds) { - final QueuePartition removedPartition = partitionMap.get(removedNodeId); + final String removedNodeMapKey = mapKeyTransform.apply(removedNodeId); + + final QueuePartition removedPartition = partitionMap.get(removedNodeMapKey); if (removedPartition == null) { continue; } @@ -733,7 +748,9 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple // Unregister any client for which the node was removed from the cluster for (final NodeIdentifier removedNodeId : removedNodeIds) { - final QueuePartition removedPartition = partitionMap.get(removedNodeId); + final String removedNodeMapKey = mapKeyTransform.apply(removedNodeId); + + final QueuePartition removedPartition = partitionMap.get(removedNodeMapKey); if (removedPartition instanceof RemoteQueuePartition) { ((RemoteQueuePartition) removedPartition).onRemoved(); } @@ -1089,7 +1106,16 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple public void onNodeAdded(final NodeIdentifier nodeId) { partitionWriteLock.lock(); try { + if (nodeIdentifiers.contains(nodeId)) { + logger.debug("Node Identifier {} added to cluster but already known in set: {}", nodeId, nodeIdentifiers); + return; + } + final Set updatedNodeIds = new HashSet<>(nodeIdentifiers); + + // If there is any Node Identifier already that has the same identifier as the new one, remove it. 
This allows us to ensure that we
+            // have the correct Node Identifier in terms of Load Balancing host/port, even if the newly connected node changed its load balancing host/port
+            updatedNodeIds.removeIf(id -> id.getId().equals(nodeId.getId()));
             updatedNodeIds.add(nodeId);

             logger.debug("Node Identifier {} added to cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
index a0e93aa07b..04716b9beb 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
@@ -19,6 +19,7 @@ package org.apache.nifi.tests.system.loadbalance;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.tests.system.NiFiInstance;
 import org.apache.nifi.tests.system.NiFiInstanceFactory;
 import org.apache.nifi.tests.system.NiFiSystemIT;
 import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
@@ -335,4 +336,84 @@ public class LoadBalanceIT extends NiFiSystemIT {

         return stats.getMin() == stats.getMax();
     }
+
+    /**
+     * Verifies that Round Robin load balancing still distributes FlowFiles evenly after Node 2
+     * is stopped, reconfigured with a different load-balance host and port, and restarted.
+     */
+    @Test
+    public void testRoundRobinWithRestartAndPortChange() throws NiFiClientException, IOException, InterruptedException {
+        ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity count = getClientUtil().createProcessor("CountEvents");
+
+        final ConnectionEntity connection = getClientUtil().createConnection(generate, count, "success");
+        getClientUtil().setAutoTerminatedRelationships(count, "success");
+
+        // Configure Processor to generate 20 FlowFiles, each 1 MB and run on Primary Node.
+        final Map<String, String> generateProperties = new HashMap<>();
+        generateProperties.put("File Size", "1 MB");
+        generateProperties.put("Batch Size", "20");
+        getClientUtil().updateProcessorProperties(generate, generateProperties);
+        getClientUtil().updateProcessorExecutionNode(generate, ExecutionNode.PRIMARY);
+
+        // Round Robin between nodes. This should result in 10 FlowFiles on each node.
+        getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, LoadBalanceCompression.DO_NOT_COMPRESS, null);
+
+        // Generate the data.
+        getNifiClient().getProcessorClient().startProcessor(generate);
+
+        // Wait until all 20 FlowFiles are queued up.
+        waitFor(() -> {
+            final ConnectionStatusEntity statusEntity = getConnectionStatus(connection.getId());
+            return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == 20;
+        });
+
+        // Wait until load balancing is complete
+        waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+
+        // Ensure that the FlowFiles are evenly distributed between the nodes.
+        final ConnectionStatusEntity statusEntity = getConnectionStatus(connection.getId());
+        assertTrue(isEvenlyDistributed(statusEntity));
+
+        assertEquals(20, getQueueSize(connection.getId()));
+        assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));
+
+        getNifiClient().getProcessorClient().stopProcessor(generate);
+
+        // Empty the queue because on restart, Node 2 will rebalance all of its data using the Load-Balance strategy, and we don't want
+        // the data to start out lopsided.
+        getClientUtil().emptyQueue(connection.getId());
+
+        final NiFiInstance instance2 = this.getNiFiInstance().getNodeInstance(2);
+        instance2.stop();
+
+        final Map<String, String> updatedLoadBalanceProperties = new HashMap<>();
+        updatedLoadBalanceProperties.put("nifi.cluster.load.balance.host", "127.0.0.1");
+        updatedLoadBalanceProperties.put("nifi.cluster.load.balance.port", "7676");
+        instance2.setProperties(updatedLoadBalanceProperties);
+
+        instance2.start(true);
+        waitForAllNodesConnected();
+
+        // Generate the data again
+        generate = getNifiClient().getProcessorClient().getProcessor(generate.getId());
+        getNifiClient().getProcessorClient().startProcessor(generate);
+
+        // Wait until all 20 FlowFiles are queued up
+        waitFor(() -> {
+            final ConnectionStatusEntity secondRoundStatusEntity = getConnectionStatus(connection.getId());
+            return secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == 20;
+        });
+
+        // Wait until load balancing is complete
+        waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+
+        // Ensure that the FlowFiles are evenly distributed between the nodes.
+        final ConnectionStatusEntity afterSecondDataGenerationStatusEntity = getConnectionStatus(connection.getId());
+        assertTrue(isEvenlyDistributed(afterSecondDataGenerationStatusEntity));
+
+        assertEquals(20, getQueueSize(connection.getId()));
+        assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));
+    }
+
 }