NIFI-9017: Update Load Balanced Connection logic so that if a node connects to the cluster with a different load balancing hostname/port, it starts sending to the new endpoint instead of failing to send to the old endpoint (#5287)

Self-merging based on +1 feedback from multiple active community members who have reviewed & tested code
This commit is contained in:
markap14 2021-08-27 21:37:06 -04:00 committed by GitHub
parent e436381c3a
commit d90ef06752
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 158 additions and 9 deletions

View File

@ -3882,7 +3882,9 @@ by the `nifi.cluster.flow.election.max.candidates` property, the cluster will no
|`nifi.cluster.flow.election.max.candidates`|Specifies the number of Nodes required in the cluster to cause early election of Flows. This allows the Nodes in the cluster to avoid having to wait a |`nifi.cluster.flow.election.max.candidates`|Specifies the number of Nodes required in the cluster to cause early election of Flows. This allows the Nodes in the cluster to avoid having to wait a
long time before starting processing if we reach at least this number of nodes in the cluster. long time before starting processing if we reach at least this number of nodes in the cluster.
|`nifi.cluster.load.balance.port`|Specifies the port to listen on for incoming connections for load balancing data across the cluster. The default value is `6342`. |`nifi.cluster.load.balance.port`|Specifies the port to listen on for incoming connections for load balancing data across the cluster. The default value is `6342`.
|`nifi.cluster.load.balance.host`|Specifies the hostname to listen on for incoming connections for load balancing data across the cluster. If not specified, will default to the value used by the `nifi.cluster.node.address` property. |`nifi.cluster.load.balance.host`|Specifies the hostname to listen on for incoming connections for load balancing data across the cluster. If not specified, will default to the value used by the
`nifi.cluster.node.address` property. The value set here does not have to be a hostname/IP address that is addressable outside of the cluster. However, all nodes within the cluster must be able to
connect to the node using this hostname/IP address.
|`nifi.cluster.load.balance.connections.per.node`|The maximum number of connections to create between this node and each other node in the cluster. For example, if there are 5 nodes in the cluster and this value is set to 4, there will be up to 20 socket connections established for load-balancing purposes (5 x 4 = 20). The default value is `1`. |`nifi.cluster.load.balance.connections.per.node`|The maximum number of connections to create between this node and each other node in the cluster. For example, if there are 5 nodes in the cluster and this value is set to 4, there will be up to 20 socket connections established for load-balancing purposes (5 x 4 = 20). The default value is `1`.
|`nifi.cluster.load.balance.max.thread.count`|The maximum number of threads to use for transferring data from this node to other nodes in the cluster. While a given thread can only write to a single socket at a time, a single thread is capable of servicing multiple connections simultaneously because a given connection may not be available for reading/writing at any given time. The default value is `8`—i.e., up to 8 threads will be responsible for transferring data to other nodes, regardless of how many nodes are in the cluster. |`nifi.cluster.load.balance.max.thread.count`|The maximum number of threads to use for transferring data from this node to other nodes in the cluster. While a given thread can only write to a single socket at a time, a single thread is capable of servicing multiple connections simultaneously because a given connection may not be available for reading/writing at any given time. The default value is `8`—i.e., up to 8 threads will be responsible for transferring data to other nodes, regardless of how many nodes are in the cluster.

View File

@ -453,6 +453,50 @@ public class TestNodeClusterCoordinator {
assertEquals(conflictingId.getSocketPort(), conflictingNodeId.getSocketPort()); assertEquals(conflictingId.getSocketPort(), conflictingNodeId.getSocketPort());
} }
@Test
public void testAddNodeIdentifierWithSameAddressDifferentLoadBalanceEndpoint() {
// Add Node 1 to the cluster
final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false);
final ConnectionRequest connectionRequest = new ConnectionRequest(id1, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>()));
final ConnectionRequestMessage crm = new ConnectionRequestMessage();
crm.setConnectionRequest(connectionRequest);
final ProtocolMessage response = coordinator.handle(crm, Collections.emptySet());
assertNotNull(response);
assertTrue(response instanceof ConnectionResponseMessage);
final ConnectionResponseMessage responseMessage = (ConnectionResponseMessage) response;
final NodeIdentifier resolvedNodeId = responseMessage.getConnectionResponse().getNodeIdentifier();
assertEquals(id1, resolvedNodeId);
// Add in a conflicting ID
final NodeIdentifier conflictingId = new NodeIdentifier("1234", "localhost", 8001, "localhost", 9000, "loadbalance-2", 4848, "localhost", 10000, 11000, false, null);
final ConnectionRequest conRequest2 = new ConnectionRequest(conflictingId, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>()));
final ConnectionRequestMessage crm2 = new ConnectionRequestMessage();
crm2.setConnectionRequest(conRequest2);
final ProtocolMessage conflictingResponse = coordinator.handle(crm2, Collections.emptySet());
assertNotNull(conflictingResponse);
assertTrue(conflictingResponse instanceof ConnectionResponseMessage);
final ConnectionResponseMessage conflictingResponseMessage = (ConnectionResponseMessage) conflictingResponse;
final NodeIdentifier conflictingNodeId = conflictingResponseMessage.getConnectionResponse().getNodeIdentifier();
assertEquals(id1.getId(), conflictingNodeId.getId());
assertEquals(conflictingId.getApiAddress(), conflictingNodeId.getApiAddress());
assertEquals(conflictingId.getApiPort(), conflictingNodeId.getApiPort());
assertEquals(conflictingId.getSiteToSiteAddress(), conflictingNodeId.getSiteToSiteAddress());
assertEquals(conflictingId.getSiteToSitePort(), conflictingNodeId.getSiteToSitePort());
assertEquals(conflictingId.getSocketAddress(), conflictingNodeId.getSocketAddress());
assertEquals(conflictingId.getSocketPort(), conflictingNodeId.getSocketPort());
// Ensure that the values were updated
final Set<NodeIdentifier> registeredNodeIds = coordinator.getNodeIdentifiers();
assertEquals(1, registeredNodeIds.size());
final NodeIdentifier registeredId = registeredNodeIds.iterator().next();
assertEquals("loadbalance-2", registeredId.getLoadBalanceAddress());
assertEquals(4848, registeredId.getLoadBalancePort());
}
private NodeIdentifier createNodeId(final int index) { private NodeIdentifier createNodeId(final int index) {
return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, false); return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, false);
} }

View File

@ -85,6 +85,7 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
@ -95,6 +96,9 @@ import java.util.stream.Collectors;
public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue implements LoadBalancedFlowFileQueue { public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue implements LoadBalancedFlowFileQueue {
private static final Logger logger = LoggerFactory.getLogger(SocketLoadBalancedFlowFileQueue.class); private static final Logger logger = LoggerFactory.getLogger(SocketLoadBalancedFlowFileQueue.class);
private static final int NODE_SWAP_THRESHOLD = 1000; private static final int NODE_SWAP_THRESHOLD = 1000;
private static final Comparator<NodeIdentifier> loadBalanceEndpointComparator =
Comparator.comparing(NodeIdentifier::getLoadBalanceAddress)
.thenComparing(NodeIdentifier::getLoadBalancePort);
private final List<FlowFilePrioritizer> prioritizers = new ArrayList<>(); private final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
private final ConnectionEventListener eventListener; private final ConnectionEventListener eventListener;
@ -139,7 +143,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
rebalancingPartition = new StandardRebalancingPartition(swapManager, swapThreshold, eventReporter, this, this::drop); rebalancingPartition = new StandardRebalancingPartition(swapManager, swapThreshold, eventReporter, this, this::drop);
// Create a RemoteQueuePartition for each node // Create a RemoteQueuePartition for each node
nodeIdentifiers = clusterCoordinator == null ? Collections.emptySet() : clusterCoordinator.getNodeIdentifiers(); nodeIdentifiers = clusterCoordinator == null ? Collections.emptySet() : new TreeSet<>(loadBalanceEndpointComparator);
if (clusterCoordinator != null) {
nodeIdentifiers.addAll(clusterCoordinator.getNodeIdentifiers());
}
final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(nodeIdentifiers); final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(nodeIdentifiers);
sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress)); sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));
@ -656,16 +664,19 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
} }
// Determine which Node Identifiers, if any, were removed. // Determine which Node Identifiers, if any, were removed.
final Set<NodeIdentifier> removedNodeIds = new HashSet<>(this.nodeIdentifiers); final Set<NodeIdentifier> removedNodeIds = new TreeSet<>(loadBalanceEndpointComparator);
removedNodeIds.addAll(this.nodeIdentifiers);
removedNodeIds.removeAll(updatedNodeIdentifiers); removedNodeIds.removeAll(updatedNodeIdentifiers);
logger.debug("{} The following Node Identifiers were removed from the cluster: {}", this, removedNodeIds); logger.debug("{} The following Node Identifiers were removed from the cluster: {}", this, removedNodeIds);
final Function<NodeIdentifier, String> mapKeyTransform = nodeId -> nodeId.getLoadBalanceAddress() + ":" + nodeId.getLoadBalancePort();
// Build up a Map of Node ID to Queue Partition so that we can easily pull over the existing // Build up a Map of Node ID to Queue Partition so that we can easily pull over the existing
// QueuePartition objects instead of having to create new ones. // QueuePartition objects instead of having to create new ones.
final Map<NodeIdentifier, QueuePartition> partitionMap = new HashMap<>(); final Map<String, QueuePartition> partitionMap = new HashMap<>();
for (final QueuePartition partition : this.queuePartitions) { for (final QueuePartition partition : this.queuePartitions) {
final Optional<NodeIdentifier> nodeIdOption = partition.getNodeIdentifier(); final Optional<NodeIdentifier> nodeIdOption = partition.getNodeIdentifier();
nodeIdOption.ifPresent(nodeIdentifier -> partitionMap.put(nodeIdentifier, partition)); nodeIdOption.ifPresent(nodeIdentifier -> partitionMap.put(mapKeyTransform.apply(nodeIdentifier), partition));
} }
// Re-define 'queuePartitions' array // Re-define 'queuePartitions' array
@ -683,13 +694,15 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
boolean localPartitionIncluded = false; boolean localPartitionIncluded = false;
for (int i = 0; i < sortedNodeIdentifiers.size(); i++) { for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i); final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
final String nodeIdMapKey = mapKeyTransform.apply(nodeId);
if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
updatedQueuePartitions[i] = localPartition; updatedQueuePartitions[i] = localPartition;
localPartitionIncluded = true; localPartitionIncluded = true;
// If we have RemoteQueuePartition with this Node ID with data, that data must be migrated to the local partition. // If we have RemoteQueuePartition with this Node ID with data, that data must be migrated to the local partition.
// This can happen if we didn't previously know our Node UUID. // This can happen if we didn't previously know our Node UUID.
final QueuePartition existingPartition = partitionMap.get(nodeId); final QueuePartition existingPartition = partitionMap.get(nodeIdMapKey);
if (existingPartition != null && existingPartition != localPartition) { if (existingPartition != null && existingPartition != localPartition) {
final FlowFileQueueContents partitionContents = existingPartition.packageForRebalance(localPartition.getSwapPartitionName()); final FlowFileQueueContents partitionContents = existingPartition.packageForRebalance(localPartition.getSwapPartitionName());
logger.debug("Transferred data from {} to {}", existingPartition, localPartition); logger.debug("Transferred data from {} to {}", existingPartition, localPartition);
@ -699,7 +712,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
continue; continue;
} }
final QueuePartition existingPartition = partitionMap.get(nodeId); final QueuePartition existingPartition = partitionMap.get(nodeIdMapKey);
updatedQueuePartitions[i] = existingPartition == null ? createRemotePartition(nodeId) : existingPartition; updatedQueuePartitions[i] = existingPartition == null ? createRemotePartition(nodeId) : existingPartition;
} }
@ -721,7 +734,9 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
// Not all partitions need to be rebalanced, so just ensure that we rebalance any FlowFiles that are destined // Not all partitions need to be rebalanced, so just ensure that we rebalance any FlowFiles that are destined
// for a node that is no longer in the cluster. // for a node that is no longer in the cluster.
for (final NodeIdentifier removedNodeId : removedNodeIds) { for (final NodeIdentifier removedNodeId : removedNodeIds) {
final QueuePartition removedPartition = partitionMap.get(removedNodeId); final String removedNodeMapKey = mapKeyTransform.apply(removedNodeId);
final QueuePartition removedPartition = partitionMap.get(removedNodeMapKey);
if (removedPartition == null) { if (removedPartition == null) {
continue; continue;
} }
@ -733,7 +748,9 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
// Unregister any client for which the node was removed from the cluster // Unregister any client for which the node was removed from the cluster
for (final NodeIdentifier removedNodeId : removedNodeIds) { for (final NodeIdentifier removedNodeId : removedNodeIds) {
final QueuePartition removedPartition = partitionMap.get(removedNodeId); final String removedNodeMapKey = mapKeyTransform.apply(removedNodeId);
final QueuePartition removedPartition = partitionMap.get(removedNodeMapKey);
if (removedPartition instanceof RemoteQueuePartition) { if (removedPartition instanceof RemoteQueuePartition) {
((RemoteQueuePartition) removedPartition).onRemoved(); ((RemoteQueuePartition) removedPartition).onRemoved();
} }
@ -1089,7 +1106,16 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
public void onNodeAdded(final NodeIdentifier nodeId) { public void onNodeAdded(final NodeIdentifier nodeId) {
partitionWriteLock.lock(); partitionWriteLock.lock();
try { try {
if (nodeIdentifiers.contains(nodeId)) {
logger.debug("Node Identifier {} added to cluster but already known in set: {}", nodeId, nodeIdentifiers);
return;
}
final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers); final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers);
// If there is any Node Identifier already that has the same identifier as the new one, remove it. This allows us to ensure that we
// have the correct Node Identifier in terms of Load Balancing host/port, even if the newly connected node changed its load balancing host/port
updatedNodeIds.removeIf(id -> id.getId().equals(nodeId.getId()));
updatedNodeIds.add(nodeId); updatedNodeIds.add(nodeId);
logger.debug("Node Identifier {} added to cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds); logger.debug("Node Identifier {} added to cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds);

View File

@ -19,6 +19,7 @@ package org.apache.nifi.tests.system.loadbalance;
import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.tests.system.NiFiInstance;
import org.apache.nifi.tests.system.NiFiInstanceFactory; import org.apache.nifi.tests.system.NiFiInstanceFactory;
import org.apache.nifi.tests.system.NiFiSystemIT; import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory; import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
@ -335,4 +336,80 @@ public class LoadBalanceIT extends NiFiSystemIT {
return stats.getMin() == stats.getMax(); return stats.getMin() == stats.getMax();
} }
@Test
public void testRoundRobinWithRestartAndPortChange() throws NiFiClientException, IOException, InterruptedException {
ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity count = getClientUtil().createProcessor("CountEvents");
final ConnectionEntity connection = getClientUtil().createConnection(generate, count, "success");
getClientUtil().setAutoTerminatedRelationships(count, "success");
// Configure Processor to generate 20 FlowFiles, each 1 MB and run on Primary Node.
final Map<String, String> generateProperties = new HashMap<>();
generateProperties.put("File Size", "1 MB");
generateProperties.put("Batch Size", "20");
getClientUtil().updateProcessorProperties(generate, generateProperties);
getClientUtil().updateProcessorExecutionNode(generate, ExecutionNode.PRIMARY);
// Round Robin between nodes. This should result in 10 FlowFiles on each node.
getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, LoadBalanceCompression.DO_NOT_COMPRESS, null);
// Generate the data.
getNifiClient().getProcessorClient().startProcessor(generate);
// Wait until all 20 FlowFiles are queued up.
waitFor(() -> {
final ConnectionStatusEntity statusEntity = getConnectionStatus(connection.getId());
return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == 20;
});
// Wait until load balancing is complete
waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
// Ensure that the FlowFiles are evenly distributed between the nodes.
final ConnectionStatusEntity statusEntity = getConnectionStatus(connection.getId());
assertTrue(isEvenlyDistributed(statusEntity));
assertEquals(20, getQueueSize(connection.getId()));
assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));
getNifiClient().getProcessorClient().stopProcessor(generate);
// Empty the queue because on restart, Node 2 will rebalance all of its data using the Load-Balance strategy, and we don't want
// the data to start out lopsided.
getClientUtil().emptyQueue(connection.getId());
final NiFiInstance instance2 = this.getNiFiInstance().getNodeInstance(2);
instance2.stop();
final Map<String, String> updatedLoadBalanceProperties = new HashMap<>();
updatedLoadBalanceProperties.put("nifi.cluster.load.balance.host", "127.0.0.1");
updatedLoadBalanceProperties.put("nifi.cluster.load.balance.port", "7676");
instance2.setProperties(updatedLoadBalanceProperties);
instance2.start(true);
waitForAllNodesConnected();
// Generate the data again
generate = getNifiClient().getProcessorClient().getProcessor(generate.getId());
getNifiClient().getProcessorClient().startProcessor(generate);
// Wait until all 20 FlowFiles are queued up
waitFor(() -> {
final ConnectionStatusEntity secondRoundStatusEntity = getConnectionStatus(connection.getId());
return secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == 20;
});
// Wait until load balancing is complete
waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
// Ensure that the FlowFiles are evenly distributed between the nodes.
final ConnectionStatusEntity afterSecondDataGenerationStatusEntity = getConnectionStatus(connection.getId());
assertTrue(isEvenlyDistributed(afterSecondDataGenerationStatusEntity));
assertEquals(20, getQueueSize(connection.getId()));
assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));
}
} }