diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java index ad9be3d738..d31339b91a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java @@ -17,15 +17,16 @@ package org.apache.nifi.cluster.coordination; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.protocol.NodeIdentifier; public interface ClusterTopologyEventListener { void onNodeAdded(NodeIdentifier nodeId); - void onNodeOffloaded(NodeIdentifier nodeId); - void onNodeRemoved(NodeIdentifier nodeId); void onLocalNodeIdentifierSet(NodeIdentifier localNodeId); + + void onNodeStateChange(NodeIdentifier nodeId, NodeConnectionState newState); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index e165041775..5b90e76250 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -341,6 +341,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId, updatedStatus); if (evictedStatus == null) { onNodeAdded(nodeId, storeState); + } else { + onNodeStateChange(nodeId, updatedStatus.getState()); } return evictedStatus; @@ -359,6 +361,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl updated = nodeStatuses.replace(nodeId, expectedStatus, updatedStatus); } + if (updated) { + onNodeStateChange(nodeId, updatedStatus.getState()); + } + return updated; } @@ -511,7 +517,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl request.setExplanation(explanation); addNodeEvent(nodeId, "Offload requested due to " + explanation); - onNodeOffloaded(nodeId); offloadAsynchronously(request, 10, 5); } @@ -572,10 +577,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl storeState(); } - private void onNodeOffloaded(final NodeIdentifier nodeId) { - eventListeners.forEach(listener -> listener.onNodeOffloaded(nodeId)); - } - private void onNodeRemoved(final NodeIdentifier nodeId) { eventListeners.forEach(listener -> listener.onNodeRemoved(nodeId)); } @@ -587,6 +588,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl eventListeners.forEach(listener -> listener.onNodeAdded(nodeId)); } + private void onNodeStateChange(final NodeIdentifier nodeId, final NodeConnectionState nodeConnectionState) { + eventListeners.forEach(listener -> listener.onNodeStateChange(nodeId, nodeConnectionState)); + } + @Override public NodeConnectionStatus getConnectionStatus(final NodeIdentifier nodeId) { return nodeStatuses.get(nodeId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 4c9188b4ec..e99d17dd6b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -19,6 +19,8 @@ package org.apache.nifi.controller.queue.clustered; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.queue.AbstractFlowFileQueue; @@ -114,7 +116,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple private QueuePartition[] queuePartitions; private FlowFilePartitioner partitioner; private boolean stopped = true; - private boolean offloaded = false; + private volatile boolean offloaded = false; public SocketLoadBalancedFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo, @@ -184,26 +186,28 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple return; } - // We are already load balancing but are changing how we are load balancing. - final FlowFilePartitioner partitioner; - switch (strategy) { - case DO_NOT_LOAD_BALANCE: - partitioner = new LocalPartitionPartitioner(); - break; - case PARTITION_BY_ATTRIBUTE: - partitioner = new CorrelationAttributePartitioner(partitioningAttribute); - break; - case ROUND_ROBIN: - partitioner = new RoundRobinPartitioner(); - break; - case SINGLE_NODE: - partitioner = new FirstNodePartitioner(); - break; - default: - throw new IllegalArgumentException(); - } + if (!offloaded) { + // We are already load balancing but are changing how we are load balancing. + final FlowFilePartitioner partitioner; + switch (strategy) { + case DO_NOT_LOAD_BALANCE: + partitioner = new LocalPartitionPartitioner(); + break; + case PARTITION_BY_ATTRIBUTE: + partitioner = new CorrelationAttributePartitioner(partitioningAttribute); + break; + case ROUND_ROBIN: + partitioner = new RoundRobinPartitioner(); + break; + case SINGLE_NODE: + partitioner = new FirstNodePartitioner(); + break; + default: + throw new IllegalArgumentException(); + } - setFlowFilePartitioner(partitioner); + setFlowFilePartitioner(partitioner); + } } @Override @@ -215,8 +219,33 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple offloaded = true; - // TODO need to be able to reset the partitioner to the previous partitioner if this node is reconnected to the cluster - setFlowFilePartitioner(new NonLocalPartitionPartitioner()); + partitionWriteLock.lock(); + try { + final Set nodesToKeep = new HashSet<>(); + + // If we have any nodes that are connected, we only want to send data to the connected nodes. + for (final QueuePartition partition : queuePartitions) { + final Optional nodeIdOption = partition.getNodeIdentifier(); + if (!nodeIdOption.isPresent()) { + continue; + } + + final NodeIdentifier nodeId = nodeIdOption.get(); + final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); + if (status != null && status.getState() == NodeConnectionState.CONNECTED) { + nodesToKeep.add(nodeId); + } + } + + if (!nodesToKeep.isEmpty()) { + setNodeIdentifiers(nodesToKeep, false); + } + + // Update our partitioner so that we don't keep any data on the local partition + setFlowFilePartitioner(new NonLocalPartitionPartitioner()); + } finally { + partitionWriteLock.unlock(); + } } public synchronized void startLoadBalancing() { @@ -566,11 +595,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple return; } - if (offloaded) { - logger.debug("{} Not going to rebalance Queue even though setNodeIdentifiers was called, because the queue has been offloaded", this); - return; - } - logger.debug("{} Stopping the {} queue partitions in order to change node identifiers from {} to {}", this, queuePartitions.length, this.nodeIdentifiers, updatedNodeIdentifiers); for (final QueuePartition queuePartition : queuePartitions) { queuePartition.stop(); @@ -593,7 +617,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple final List sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers); sortedNodeIdentifiers.sort(Comparator.comparing(nodeId -> nodeId.getApiAddress() + ":" + nodeId.getApiPort())); - final QueuePartition[] updatedQueuePartitions; + QueuePartition[] updatedQueuePartitions; if (sortedNodeIdentifiers.isEmpty()) { updatedQueuePartitions = new QueuePartition[] { localPartition }; } else { @@ -601,10 +625,12 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } // Populate the new QueuePartitions. + boolean localPartitionIncluded = false; for (int i = 0; i < sortedNodeIdentifiers.size(); i++) { final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i); if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { updatedQueuePartitions[i] = localPartition; + localPartitionIncluded = true; // If we have RemoteQueuePartition with this Node ID with data, that data must be migrated to the local partition. // This can happen if we didn't previously know our Node UUID. @@ -622,6 +648,13 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple updatedQueuePartitions[i] = existingPartition == null ? createRemotePartition(nodeId) : existingPartition; } + if (!localPartitionIncluded) { + final QueuePartition[] withLocal = new QueuePartition[updatedQueuePartitions.length + 1]; + System.arraycopy(updatedQueuePartitions, 0, withLocal, 0, updatedQueuePartitions.length); + withLocal[withLocal.length - 1] = localPartition; + updatedQueuePartitions = withLocal; + } + // If the partition requires that all partitions be re-balanced when the number of partitions changes, then do so. // Otherwise, just rebalance the data from any Partitions that were removed, if any. if (partitioner.isRebalanceOnClusterResize()) { @@ -669,6 +702,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } protected void rebalance(final QueuePartition partition) { + logger.debug("Rebalancing Partition {}", partition); final FlowFileQueueContents contents = partition.packageForRebalance(rebalancingPartition.getSwapPartitionName()); rebalancingPartition.rebalance(contents); } @@ -988,26 +1022,15 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple } } - @Override - public void onNodeOffloaded(final NodeIdentifier nodeId) { - partitionWriteLock.lock(); - try { - final Set updatedNodeIds = new HashSet<>(nodeIdentifiers); - updatedNodeIds.remove(nodeId); - - logger.debug("Node Identifier {} offloaded. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds); - setNodeIdentifiers(updatedNodeIds, false); - } finally { - partitionWriteLock.unlock(); - } - } - @Override public void onNodeRemoved(final NodeIdentifier nodeId) { partitionWriteLock.lock(); try { final Set updatedNodeIds = new HashSet<>(nodeIdentifiers); - updatedNodeIds.remove(nodeId); + final boolean removed = updatedNodeIds.remove(nodeId); + if (!removed) { + return; + } logger.debug("Node Identifier {} removed from cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds); setNodeIdentifiers(updatedNodeIds, false); @@ -1063,6 +1086,27 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple partitionWriteLock.unlock(); } } + + @Override + public void onNodeStateChange(final NodeIdentifier nodeId, final NodeConnectionState newState) { + partitionWriteLock.lock(); + try { + if (!offloaded) { + return; + } + + switch (newState) { + case OFFLOADED: + case OFFLOADING: + case DISCONNECTED: + case DISCONNECTING: + onNodeRemoved(nodeId); + break; + } + } finally { + partitionWriteLock.unlock(); + } + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java index cffaefd5cb..0953ce2c4d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java @@ -47,7 +47,7 @@ public class NonLocalPartitionPartitioner implements FlowFilePartitioner { @Override public boolean isRebalanceOnClusterResize() { - return false; + return true; }