NIFI-5585: Fixed bug that arose when multiple nodes were decommissioning at the same time; the nodes could get into a state where they queued up data for one another, so the data just stayed put

This commit is contained in:
Mark Payne 2018-09-24 09:17:22 -04:00
parent 04d8da8f46
commit be2c24cfaf
4 changed files with 101 additions and 51 deletions

View File

@ -17,15 +17,16 @@
package org.apache.nifi.cluster.coordination; package org.apache.nifi.cluster.coordination;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
public interface ClusterTopologyEventListener { public interface ClusterTopologyEventListener {
void onNodeAdded(NodeIdentifier nodeId); void onNodeAdded(NodeIdentifier nodeId);
void onNodeOffloaded(NodeIdentifier nodeId);
void onNodeRemoved(NodeIdentifier nodeId); void onNodeRemoved(NodeIdentifier nodeId);
void onLocalNodeIdentifierSet(NodeIdentifier localNodeId); void onLocalNodeIdentifierSet(NodeIdentifier localNodeId);
void onNodeStateChange(NodeIdentifier nodeId, NodeConnectionState newState);
} }

View File

@ -341,6 +341,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId, updatedStatus); final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId, updatedStatus);
if (evictedStatus == null) { if (evictedStatus == null) {
onNodeAdded(nodeId, storeState); onNodeAdded(nodeId, storeState);
} else {
onNodeStateChange(nodeId, updatedStatus.getState());
} }
return evictedStatus; return evictedStatus;
@ -359,6 +361,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
updated = nodeStatuses.replace(nodeId, expectedStatus, updatedStatus); updated = nodeStatuses.replace(nodeId, expectedStatus, updatedStatus);
} }
if (updated) {
onNodeStateChange(nodeId, updatedStatus.getState());
}
return updated; return updated;
} }
@ -511,7 +517,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
request.setExplanation(explanation); request.setExplanation(explanation);
addNodeEvent(nodeId, "Offload requested due to " + explanation); addNodeEvent(nodeId, "Offload requested due to " + explanation);
onNodeOffloaded(nodeId);
offloadAsynchronously(request, 10, 5); offloadAsynchronously(request, 10, 5);
} }
@ -572,10 +577,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
storeState(); storeState();
} }
private void onNodeOffloaded(final NodeIdentifier nodeId) {
eventListeners.forEach(listener -> listener.onNodeOffloaded(nodeId));
}
private void onNodeRemoved(final NodeIdentifier nodeId) { private void onNodeRemoved(final NodeIdentifier nodeId) {
eventListeners.forEach(listener -> listener.onNodeRemoved(nodeId)); eventListeners.forEach(listener -> listener.onNodeRemoved(nodeId));
} }
@ -587,6 +588,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
eventListeners.forEach(listener -> listener.onNodeAdded(nodeId)); eventListeners.forEach(listener -> listener.onNodeAdded(nodeId));
} }
private void onNodeStateChange(final NodeIdentifier nodeId, final NodeConnectionState nodeConnectionState) {
eventListeners.forEach(listener -> listener.onNodeStateChange(nodeId, nodeConnectionState));
}
@Override @Override
public NodeConnectionStatus getConnectionStatus(final NodeIdentifier nodeId) { public NodeConnectionStatus getConnectionStatus(final NodeIdentifier nodeId) {
return nodeStatuses.get(nodeId); return nodeStatuses.get(nodeId);

View File

@ -19,6 +19,8 @@ package org.apache.nifi.controller.queue.clustered;
import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.queue.AbstractFlowFileQueue; import org.apache.nifi.controller.queue.AbstractFlowFileQueue;
@ -114,7 +116,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
private QueuePartition[] queuePartitions; private QueuePartition[] queuePartitions;
private FlowFilePartitioner partitioner; private FlowFilePartitioner partitioner;
private boolean stopped = true; private boolean stopped = true;
private boolean offloaded = false; private volatile boolean offloaded = false;
public SocketLoadBalancedFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo, public SocketLoadBalancedFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo,
@ -184,26 +186,28 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
return; return;
} }
// We are already load balancing but are changing how we are load balancing. if (!offloaded) {
final FlowFilePartitioner partitioner; // We are already load balancing but are changing how we are load balancing.
switch (strategy) { final FlowFilePartitioner partitioner;
case DO_NOT_LOAD_BALANCE: switch (strategy) {
partitioner = new LocalPartitionPartitioner(); case DO_NOT_LOAD_BALANCE:
break; partitioner = new LocalPartitionPartitioner();
case PARTITION_BY_ATTRIBUTE: break;
partitioner = new CorrelationAttributePartitioner(partitioningAttribute); case PARTITION_BY_ATTRIBUTE:
break; partitioner = new CorrelationAttributePartitioner(partitioningAttribute);
case ROUND_ROBIN: break;
partitioner = new RoundRobinPartitioner(); case ROUND_ROBIN:
break; partitioner = new RoundRobinPartitioner();
case SINGLE_NODE: break;
partitioner = new FirstNodePartitioner(); case SINGLE_NODE:
break; partitioner = new FirstNodePartitioner();
default: break;
throw new IllegalArgumentException(); default:
} throw new IllegalArgumentException();
}
setFlowFilePartitioner(partitioner); setFlowFilePartitioner(partitioner);
}
} }
@Override @Override
@ -215,8 +219,33 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
offloaded = true; offloaded = true;
// TODO need to be able to reset the partitioner to the previous partitioner if this node is reconnected to the cluster partitionWriteLock.lock();
setFlowFilePartitioner(new NonLocalPartitionPartitioner()); try {
final Set<NodeIdentifier> nodesToKeep = new HashSet<>();
// If we have any nodes that are connected, we only want to send data to the connected nodes.
for (final QueuePartition partition : queuePartitions) {
final Optional<NodeIdentifier> nodeIdOption = partition.getNodeIdentifier();
if (!nodeIdOption.isPresent()) {
continue;
}
final NodeIdentifier nodeId = nodeIdOption.get();
final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
if (status != null && status.getState() == NodeConnectionState.CONNECTED) {
nodesToKeep.add(nodeId);
}
}
if (!nodesToKeep.isEmpty()) {
setNodeIdentifiers(nodesToKeep, false);
}
// Update our partitioner so that we don't keep any data on the local partition
setFlowFilePartitioner(new NonLocalPartitionPartitioner());
} finally {
partitionWriteLock.unlock();
}
} }
public synchronized void startLoadBalancing() { public synchronized void startLoadBalancing() {
@ -566,11 +595,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
return; return;
} }
if (offloaded) {
logger.debug("{} Not going to rebalance Queue even though setNodeIdentifiers was called, because the queue has been offloaded", this);
return;
}
logger.debug("{} Stopping the {} queue partitions in order to change node identifiers from {} to {}", this, queuePartitions.length, this.nodeIdentifiers, updatedNodeIdentifiers); logger.debug("{} Stopping the {} queue partitions in order to change node identifiers from {} to {}", this, queuePartitions.length, this.nodeIdentifiers, updatedNodeIdentifiers);
for (final QueuePartition queuePartition : queuePartitions) { for (final QueuePartition queuePartition : queuePartitions) {
queuePartition.stop(); queuePartition.stop();
@ -593,7 +617,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers); final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers);
sortedNodeIdentifiers.sort(Comparator.comparing(nodeId -> nodeId.getApiAddress() + ":" + nodeId.getApiPort())); sortedNodeIdentifiers.sort(Comparator.comparing(nodeId -> nodeId.getApiAddress() + ":" + nodeId.getApiPort()));
final QueuePartition[] updatedQueuePartitions; QueuePartition[] updatedQueuePartitions;
if (sortedNodeIdentifiers.isEmpty()) { if (sortedNodeIdentifiers.isEmpty()) {
updatedQueuePartitions = new QueuePartition[] { localPartition }; updatedQueuePartitions = new QueuePartition[] { localPartition };
} else { } else {
@ -601,10 +625,12 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
} }
// Populate the new QueuePartitions. // Populate the new QueuePartitions.
boolean localPartitionIncluded = false;
for (int i = 0; i < sortedNodeIdentifiers.size(); i++) { for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i); final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
updatedQueuePartitions[i] = localPartition; updatedQueuePartitions[i] = localPartition;
localPartitionIncluded = true;
// If we have RemoteQueuePartition with this Node ID with data, that data must be migrated to the local partition. // If we have RemoteQueuePartition with this Node ID with data, that data must be migrated to the local partition.
// This can happen if we didn't previously know our Node UUID. // This can happen if we didn't previously know our Node UUID.
@ -622,6 +648,13 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
updatedQueuePartitions[i] = existingPartition == null ? createRemotePartition(nodeId) : existingPartition; updatedQueuePartitions[i] = existingPartition == null ? createRemotePartition(nodeId) : existingPartition;
} }
if (!localPartitionIncluded) {
final QueuePartition[] withLocal = new QueuePartition[updatedQueuePartitions.length + 1];
System.arraycopy(updatedQueuePartitions, 0, withLocal, 0, updatedQueuePartitions.length);
withLocal[withLocal.length - 1] = localPartition;
updatedQueuePartitions = withLocal;
}
// If the partition requires that all partitions be re-balanced when the number of partitions changes, then do so. // If the partition requires that all partitions be re-balanced when the number of partitions changes, then do so.
// Otherwise, just rebalance the data from any Partitions that were removed, if any. // Otherwise, just rebalance the data from any Partitions that were removed, if any.
if (partitioner.isRebalanceOnClusterResize()) { if (partitioner.isRebalanceOnClusterResize()) {
@ -669,6 +702,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
} }
protected void rebalance(final QueuePartition partition) { protected void rebalance(final QueuePartition partition) {
logger.debug("Rebalancing Partition {}", partition);
final FlowFileQueueContents contents = partition.packageForRebalance(rebalancingPartition.getSwapPartitionName()); final FlowFileQueueContents contents = partition.packageForRebalance(rebalancingPartition.getSwapPartitionName());
rebalancingPartition.rebalance(contents); rebalancingPartition.rebalance(contents);
} }
@ -988,26 +1022,15 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
} }
} }
@Override
public void onNodeOffloaded(final NodeIdentifier nodeId) {
partitionWriteLock.lock();
try {
final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers);
updatedNodeIds.remove(nodeId);
logger.debug("Node Identifier {} offloaded. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds);
setNodeIdentifiers(updatedNodeIds, false);
} finally {
partitionWriteLock.unlock();
}
}
@Override @Override
public void onNodeRemoved(final NodeIdentifier nodeId) { public void onNodeRemoved(final NodeIdentifier nodeId) {
partitionWriteLock.lock(); partitionWriteLock.lock();
try { try {
final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers); final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers);
updatedNodeIds.remove(nodeId); final boolean removed = updatedNodeIds.remove(nodeId);
if (!removed) {
return;
}
logger.debug("Node Identifier {} removed from cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds); logger.debug("Node Identifier {} removed from cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds);
setNodeIdentifiers(updatedNodeIds, false); setNodeIdentifiers(updatedNodeIds, false);
@ -1063,6 +1086,27 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
partitionWriteLock.unlock(); partitionWriteLock.unlock();
} }
} }
@Override
public void onNodeStateChange(final NodeIdentifier nodeId, final NodeConnectionState newState) {
partitionWriteLock.lock();
try {
if (!offloaded) {
return;
}
switch (newState) {
case OFFLOADED:
case OFFLOADING:
case DISCONNECTED:
case DISCONNECTING:
onNodeRemoved(nodeId);
break;
}
} finally {
partitionWriteLock.unlock();
}
}
} }
} }

View File

@ -47,7 +47,7 @@ public class NonLocalPartitionPartitioner implements FlowFilePartitioner {
@Override @Override
public boolean isRebalanceOnClusterResize() { public boolean isRebalanceOnClusterResize() {
return false; return true;
} }