NIFI-9794: If a node is OFFLOADING, do not allow connections to be deleted. Also if we fail to notify the node that it needs to offload its data, change its state back to DISCONNECTED. (#5865)

This commit is contained in:
markap14 2022-03-14 15:45:02 -04:00 committed by GitHub
parent a7edabb4ff
commit 21922af90c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 5 deletions

View File

@ -24,12 +24,14 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -164,9 +166,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
@Override
public AsyncClusterResponse replicate(NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers) {
final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = clusterCoordinator.getConnectionStates();
final boolean mutable = isMutableRequest(method, uri.getPath());
final boolean mutable = isMutableRequest(method);
// If the request is mutable, ensure that all nodes are connected.
// If the request is mutable, ensure the appropriate state: there can be no Connecting Nodes (in order to avoid confusion where a node gets the dataflow, and then gets modified before the
// node fully loads the dataflow), and we cannot delete a connection while a node is OFFLOADING (otherwise, we could delete a connection while a node is trying to push data to it).
if (mutable) {
final List<NodeIdentifier> connecting = stateMap.get(NodeConnectionState.CONNECTING);
if (connecting != null && !connecting.isEmpty()) {
@ -176,6 +179,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
throw new ConnectingNodeMutableRequestException(connecting.size() + " Nodes are currently connecting");
}
}
// A connection must not be deleted while any node is OFFLOADING: the offloading node may still be
// trying to push its data to that connection, so reject the request up front.
if (isDeleteConnection(method, uri.getPath())) {
    final List<NodeIdentifier> offloading = stateMap.get(NodeConnectionState.OFFLOADING);
    if (offloading != null && !offloading.isEmpty()) {
        throw new OffloadedNodeMutableRequestException("Cannot delete connection because the following Nodes are currently being offloaded: " + offloading);
    }
}
}
final List<NodeIdentifier> nodeIds = stateMap.get(NodeConnectionState.CONNECTED);
@ -243,7 +253,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// performing an action, rather than simply proxying the request to the cluster coordinator. In this case,
// we need to ensure that we use proper locking. We don't want two requests modifying the flow at the same
// time, so we use a write lock if the request is mutable and a read lock otherwise.
final Lock lock = isMutableRequest(method, uri.getPath()) ? writeLock : readLock;
final Lock lock = isMutableRequest(method) ? writeLock : readLock;
logger.debug("Obtaining lock {} in order to replicate request {} {}", lock, method, uri);
lock.lock();
try {
@ -394,7 +404,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// issue the request. This is all handled by calling performVerification, which will replicate
// the 'vote' request to all nodes and then if successful will call back into this method to
// replicate the actual request.
final boolean mutableRequest = isMutableRequest(method, uri.getPath());
final boolean mutableRequest = isMutableRequest(method);
if (mutableRequest && performVerification) {
logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor);
@ -617,7 +627,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
return nodeResponse;
}
private boolean isMutableRequest(final String method, final String uriPath) {
private boolean isMutableRequest(final String method) {
switch (method.toUpperCase()) {
case HttpMethod.GET:
case HttpMethod.HEAD:
@ -628,6 +638,15 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
}
/**
 * Determines whether the given request is a DELETE issued against a Connection resource.
 *
 * @param method the HTTP method of the request; compared case-insensitively against DELETE
 * @param uriPath the path portion of the request URI; only evaluated when the method is DELETE
 * @return <code>true</code> if the method is DELETE and the path matches
 *         {@link ConnectionEndpointMerger#CONNECTION_URI_PATTERN}, <code>false</code> otherwise
 */
private boolean isDeleteConnection(final String method, final String uriPath) {
    // Short-circuit: the URI pattern is only consulted for DELETE requests.
    return HttpMethod.DELETE.equalsIgnoreCase(method)
        && ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches();
}
/**
* Verifies that the cluster is in a state that will allow requests to be made using the given HTTP Method and URI path
*

View File

@ -950,6 +950,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
try {
senderListener.offload(request);
reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation());
future.complete(null);
return;
} catch (final Exception e) {
@ -965,6 +966,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
}
updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, null,
"Attempted to offload node but failed to notify node that it was to offload its data. State reset to disconnected."));
addNodeEvent(nodeId, "Failed to initiate node offload: " + lastException);
future.completeExceptionally(lastException);
}
}, "Offload " + request.getNodeId());