NIFI-11333: Do not allow components to be removed while a node is disconnected

This closes #7085

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-03-24 15:06:43 -04:00 committed by exceptionfactory
parent 474e124667
commit d2fdff7b93
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 120 additions and 60 deletions

View File

@ -27,11 +27,11 @@ import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class ParameterContextEndpointMerger extends AbstractSingleEntityEndpoint<ParameterContextEntity> implements EndpointResponseMerger { public class ParameterContextEndpointMerger extends AbstractSingleEntityEndpoint<ParameterContextEntity> implements EndpointResponseMerger {
private static final Pattern PARAMETER_CONTEXT_URI = Pattern.compile("/nifi-api/parameter-contexts/[a-f0-9\\-]{36}"); public static final Pattern PARAMETER_CONTEXT_URI_PATTERN = Pattern.compile("/nifi-api/parameter-contexts/[a-f0-9\\-]{36}");
@Override @Override
public boolean canHandle(final URI uri, final String method) { public boolean canHandle(final URI uri, final String method) {
return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && PARAMETER_CONTEXT_URI.matcher(uri.getPath()).matches(); return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && PARAMETER_CONTEXT_URI_PATTERN.matcher(uri.getPath()).matches();
} }
@Override @Override

View File

@ -27,11 +27,11 @@ import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class ParameterProviderEndpointMerger extends AbstractSingleEntityEndpoint<ParameterProviderEntity> implements EndpointResponseMerger { public class ParameterProviderEndpointMerger extends AbstractSingleEntityEndpoint<ParameterProviderEntity> implements EndpointResponseMerger {
private static final Pattern PARAMETER_PROVIDER_URI = Pattern.compile("/nifi-api/parameter-providers/[a-f0-9\\-]{36}"); public static final Pattern PARAMETER_PROVIDER_URI_PATTERN = Pattern.compile("/nifi-api/parameter-providers/[a-f0-9\\-]{36}");
@Override @Override
public boolean canHandle(final URI uri, final String method) { public boolean canHandle(final URI uri, final String method) {
return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PARAMETER_PROVIDER_URI.matcher(uri.getPath()).matches(); return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PARAMETER_PROVIDER_URI_PATTERN.matcher(uri.getPath()).matches();
} }
@Override @Override

View File

@ -25,13 +25,23 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.FlowRegistryClientEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.FunnelEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ParameterContextEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ParameterProviderEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException; import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -51,6 +61,7 @@ import javax.ws.rs.core.Response.Status;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -73,12 +84,14 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
public class ThreadPoolRequestReplicator implements RequestReplicator { public class ThreadPoolRequestReplicator implements RequestReplicator {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
private static final Pattern SNIPPET_URI_PATTERN = Pattern.compile("/nifi-api/snippets/[a-f0-9\\-]{36}");
private final int maxConcurrentRequests; // maximum number of concurrent requests private final int maxConcurrentRequests; // maximum number of concurrent requests
private final HttpResponseMapper responseMapper; private final HttpResponseMapper responseMapper;
@ -180,10 +193,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
} }
} }
if (isDeleteConnection(method, uri.getPath())) { // Do not allow any components to be deleted unless all nodes are connected.
final List<NodeIdentifier> offloading = stateMap.get(NodeConnectionState.OFFLOADING); if (isDeleteComponent(method, uri.getPath())) {
if (offloading != null && !offloading.isEmpty()) { final List<NodeIdentifier> nonConnectedNodes = getNonConnectedNodes(stateMap);
throw new OffloadedNodeMutableRequestException("Cannot delete conection because the following Nodes are currently being offloaded: " + offloading); if (!nonConnectedNodes.isEmpty()) {
throw new IllegalClusterStateException("Cannot delete component because the following Nodes are not connected: " + nonConnectedNodes);
} }
} }
} }
@ -198,6 +212,25 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
return replicate(nodeIdSet, user, method, uri, entity, headers, true, true); return replicate(nodeIdSet, user, method, uri, entity, headers, true, true);
} }
private List<NodeIdentifier> getNonConnectedNodes(final Map<NodeConnectionState, List<NodeIdentifier>> stateMap) {
// We want to include all nodes except for those that are CONNECTED or REMOVED.
// We do that by first adding all nodes to the List and then removing those that are CONNECTED or REMOVED.
final List<NodeIdentifier> nonConnectedNodes = new ArrayList<>();
stateMap.values().forEach(nonConnectedNodes::addAll);
final List<NodeIdentifier> connectedNodes = stateMap.get(NodeConnectionState.CONNECTED);
if (connectedNodes != null) {
nonConnectedNodes.removeAll(connectedNodes);
}
final List<NodeIdentifier> removedNodes = stateMap.get(NodeConnectionState.REMOVED);
if (removedNodes != null) {
nonConnectedNodes.removeAll(removedNodes);
}
return nonConnectedNodes;
}
void updateRequestHeaders(final Map<String, String> headers, final NiFiUser user) { void updateRequestHeaders(final Map<String, String> headers, final NiFiUser user) {
if (user == null) { if (user == null) {
throw new AccessDeniedException("Unknown user"); throw new AccessDeniedException("Unknown user");
@ -638,13 +671,33 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
} }
} }
private boolean isDeleteConnection(final String method, final String uriPath) { private boolean isDeleteComponent(final String method, final String uriPath) {
if (!HttpMethod.DELETE.equalsIgnoreCase(method)) { if (!HttpMethod.DELETE.equalsIgnoreCase(method)) {
return false; return false;
} }
final boolean isConnectionUri = ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches(); // Check if the URI indicates that a component should be deleted.
return isConnectionUri; // We cannot simply make our decision based on the fact that the request is a DELETE request.
// This is because we do need to allow deletion of asynchronous requests, such as updating parameters, querying provenance, etc.
// which create a request, poll until the request completes, and then deletes it. Additionally, we want to allow terminating
// Processors, which is done by issuing a request to DELETE /processors/<id>/threads
final boolean componentUri = ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches()
|| ProcessorEndpointMerger.PROCESSOR_URI_PATTERN.matcher(uriPath).matches()
|| FunnelEndpointMerger.FUNNEL_URI_PATTERN.matcher(uriPath).matches()
|| PortEndpointMerger.INPUT_PORT_URI_PATTERN.matcher(uriPath).matches()
|| PortEndpointMerger.OUTPUT_PORT_URI_PATTERN.matcher(uriPath).matches()
|| RemoteProcessGroupEndpointMerger.REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uriPath).matches()
|| ProcessGroupEndpointMerger.PROCESS_GROUP_URI_PATTERN.matcher(uriPath).matches()
|| ControllerServiceEndpointMerger.CONTROLLER_CONTROLLER_SERVICES_URI.equals(uriPath)
|| ControllerServiceEndpointMerger.CONTROLLER_SERVICE_URI_PATTERN.matcher(uriPath).matches()
|| ReportingTaskEndpointMerger.REPORTING_TASK_URI_PATTERN.matcher(uriPath).matches()
|| ParameterContextEndpointMerger.PARAMETER_CONTEXT_URI_PATTERN.matcher(uriPath).matches()
|| LabelEndpointMerger.LABEL_URI_PATTERN.matcher(uriPath).matches()
|| ParameterProviderEndpointMerger.PARAMETER_PROVIDER_URI_PATTERN.matcher(uriPath).matches()
|| FlowRegistryClientEndpointMerger.CONTROLLER_REGISTRY_URI_PATTERN.matcher(uriPath).matches()
|| SNIPPET_URI_PATTERN.matcher(uriPath).matches();
return componentUri;
} }
/** /**
@ -652,7 +705,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* *
* @param httpMethod the HTTP Method * @param httpMethod the HTTP Method
* @param uriPath the URI Path * @param uriPath the URI Path
* @throw IllegalClusterStateException if the cluster is not in a state that allows a request to made to the given URI Path using the given HTTP Method
*/ */
private void verifyClusterState(final String httpMethod, final String uriPath) { private void verifyClusterState(final String httpMethod, final String uriPath) {
final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod); final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod);
@ -666,6 +718,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
throw new ConnectingNodeMutableRequestException("Received a mutable request [" + httpMethod + " " + uriPath + "] while a node is trying to connect to the cluster"); throw new ConnectingNodeMutableRequestException("Received a mutable request [" + httpMethod + " " + uriPath + "] while a node is trying to connect to the cluster");
} }
} }
if (isDeleteComponent(httpMethod, uriPath)) {
final List<NodeIdentifier> nonConnectedNodes = getNonConnectedNodes(clusterCoordinator.getConnectionStates());
if (!nonConnectedNodes.isEmpty()) {
throw new IllegalClusterStateException("Cannot delete component because the following Nodes are not connected: " + nonConnectedNodes);
}
}
} }
/** /**

View File

@ -35,7 +35,6 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig; import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.NodeEventDTO;
import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO;
@ -65,13 +64,13 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
public class FlowSynchronizationIT extends NiFiSystemIT { public class FlowSynchronizationIT extends NiFiSystemIT {
@ -383,13 +382,9 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2); final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
node2.stop(); node2.stop();
// Remove node from the cluster. This way we know when it's attempted to connected // Remove node from the cluster. This way we know when it's attempted to connect
final NodeDTO node2Dto = getNodeEntity(2).getNode(); final Integer node2ApiPort = getNodeApiPort(2);
final String node2Id = node2Dto.getNodeId(); removeNode(2);
final Integer node2ApiPort = node2Dto.getApiPort();
getNifiClient().getControllerClient().deleteNode(node2Id);
waitFor(() -> isNodeRemoved(node2ApiPort));
removeExtensionsNar(node2); removeExtensionsNar(node2);
node2.start(false); node2.start(false);
@ -399,6 +394,26 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
// Wait for node to show as disconnected because it doesn't have the necessary nar // Wait for node to show as disconnected because it doesn't have the necessary nar
waitForNodeState(2, NodeConnectionState.DISCONNECTED); waitForNodeState(2, NodeConnectionState.DISCONNECTED);
// We need to either restart Node 2 or remove it from the cluster in order to ensure that we can properly shutdown.
// Reconnecting to the cluster would require restoring the NAR file and restarting, which will take longer than simply removing the
// node from the cluster. So we opt for shutting down the node and removing it from the cluster.
node2.stop();
removeNode(2);
}
private void removeNode(final int index) throws NiFiClientException, IOException, InterruptedException {
final NodeDTO nodeDto = getNodeEntity(index).getNode();
final String nodeId = nodeDto.getNodeId();
final Integer apiPort = nodeDto.getApiPort();
getNifiClient().getControllerClient().deleteNode(nodeId);
waitFor(() -> isNodeRemoved(apiPort));
}
private Integer getNodeApiPort(final int index) throws NiFiClientException, IOException {
final NodeDTO nodeDto = getNodeEntity(index).getNode();
final Integer apiPort = nodeDto.getApiPort();
return apiPort;
} }
@Test @Test
@ -424,30 +439,34 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing()); assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing());
} }
@Test @Test
public void testCannotJoinIfMissingConnectionHasData() throws NiFiClientException, IOException, InterruptedException { public void testCannotRemoveComponentsWhileNodeDisconnected() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
final ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success"); final ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success");
getClientUtil().updateProcessorSchedulingPeriod(generate, "60 sec");
// Shut down node 2 // Shut down node 2
disconnectNode(2); disconnectNode(2);
waitForNodeState(2, NodeConnectionState.DISCONNECTED);
switchClientToNode(2); // Attempt to delete connection. It should throw an Exception.
getClientUtil().startProcessor(generate); assertThrows(Exception.class, () -> getNifiClient().getConnectionClient().deleteConnection(connection));
waitForQueueCount(connection.getId(), 1);
switchClientToNode(1); // Attempt to delete processor. It should throw an Exception.
getNifiClient().getConnectionClient().deleteConnection(connection); assertThrows(Exception.class, () -> getNifiClient().getProcessorClient().deleteProcessor(generate));
// Reconnect the node
reconnectNode(2); reconnectNode(2);
waitForAllNodesConnected();
// Wait for node to be disconnected due to connection containing data // Ensure that we can delete the connection and the processors
waitFor(() -> isNodeDisconnectedDueToMissingConnection(5672, connection.getId())); getNifiClient().getConnectionClient().deleteConnection(connection);
getNifiClient().getProcessorClient().deleteProcessor(generate);
getNifiClient().getProcessorClient().deleteProcessor(terminate);
} }
@Test @Test
public void testComponentStatesRestoredOnReconnect() throws NiFiClientException, IOException, InterruptedException { public void testComponentStatesRestoredOnReconnect() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
@ -485,40 +504,22 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
}); });
} }
private boolean isNodeDisconnectedDueToMissingConnection(final int nodeApiPort, final String connectionId) throws NiFiClientException, IOException {
final NodeDTO node2Dto = getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
.filter(dto -> dto.getApiPort() == nodeApiPort)
.findFirst()
.orElse(null);
if (node2Dto == null) {
return false;
}
if (!NodeConnectionState.DISCONNECTED.name().equals(node2Dto.getStatus())) {
return false;
}
// We should have an event indicating the ID of the connection that could not be removed
final List<NodeEventDTO> nodeEvents = node2Dto.getEvents();
for (final NodeEventDTO event : nodeEvents) {
if (event.getMessage().contains(connectionId)) {
return true;
}
}
return false;
}
private void removeExtensionsNar(final NiFiInstance nifiInstance) { private void removeExtensionsNar(final NiFiInstance nifiInstance) {
final File node2Lib = new File(nifiInstance.getInstanceDirectory(), "lib"); final File extensionsNar = getExtensionsNar(nifiInstance);
final File[] testExtensionsNar = node2Lib.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-")); final File backupFile = new File(extensionsNar.getParentFile(), extensionsNar.getName() + ".backup");
assertTrue(extensionsNar.renameTo(backupFile));
}
private File getExtensionsNar(final NiFiInstance nifiInstance) {
final File libDir = new File(nifiInstance.getInstanceDirectory(), "lib");
final File[] testExtensionsNar = libDir.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-"));
assertEquals(1, testExtensionsNar.length); assertEquals(1, testExtensionsNar.length);
final File extensionsNar = testExtensionsNar[0]; return testExtensionsNar[0];
assertTrue(extensionsNar.delete());
} }
private boolean isNodeRemoved(final int apiPort) { private boolean isNodeRemoved(final int apiPort) {
try { try {
return getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream() return getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()