NIFI-11333: Do not allow components to be removed while a node is disconnected

This closes #7085

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-03-24 15:06:43 -04:00 committed by exceptionfactory
parent 969fc50778
commit 94ae926c42
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 120 additions and 60 deletions

View File

@ -27,11 +27,11 @@ import java.util.Set;
import java.util.regex.Pattern;
public class ParameterContextEndpointMerger extends AbstractSingleEntityEndpoint<ParameterContextEntity> implements EndpointResponseMerger {
private static final Pattern PARAMETER_CONTEXT_URI = Pattern.compile("/nifi-api/parameter-contexts/[a-f0-9\\-]{36}");
public static final Pattern PARAMETER_CONTEXT_URI_PATTERN = Pattern.compile("/nifi-api/parameter-contexts/[a-f0-9\\-]{36}");
@Override
public boolean canHandle(final URI uri, final String method) {
return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && PARAMETER_CONTEXT_URI.matcher(uri.getPath()).matches();
return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && PARAMETER_CONTEXT_URI_PATTERN.matcher(uri.getPath()).matches();
}
@Override

View File

@ -27,11 +27,11 @@ import java.util.Set;
import java.util.regex.Pattern;
public class ParameterProviderEndpointMerger extends AbstractSingleEntityEndpoint<ParameterProviderEntity> implements EndpointResponseMerger {
private static final Pattern PARAMETER_PROVIDER_URI = Pattern.compile("/nifi-api/parameter-providers/[a-f0-9\\-]{36}");
public static final Pattern PARAMETER_PROVIDER_URI_PATTERN = Pattern.compile("/nifi-api/parameter-providers/[a-f0-9\\-]{36}");
@Override
public boolean canHandle(final URI uri, final String method) {
return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PARAMETER_PROVIDER_URI.matcher(uri.getPath()).matches();
return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PARAMETER_PROVIDER_URI_PATTERN.matcher(uri.getPath()).matches();
}
@Override

View File

@ -25,13 +25,23 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.FlowRegistryClientEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.FunnelEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ParameterContextEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ParameterProviderEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@ -51,6 +61,7 @@ import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -73,12 +84,14 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ThreadPoolRequestReplicator implements RequestReplicator {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
private static final Pattern SNIPPET_URI_PATTERN = Pattern.compile("/nifi-api/snippets/[a-f0-9\\-]{36}");
private final int maxConcurrentRequests; // maximum number of concurrent requests
private final HttpResponseMapper responseMapper;
@ -180,10 +193,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
}
if (isDeleteConnection(method, uri.getPath())) {
final List<NodeIdentifier> offloading = stateMap.get(NodeConnectionState.OFFLOADING);
if (offloading != null && !offloading.isEmpty()) {
throw new OffloadedNodeMutableRequestException("Cannot delete conection because the following Nodes are currently being offloaded: " + offloading);
// Do not allow any components to be deleted unless all nodes are connected.
if (isDeleteComponent(method, uri.getPath())) {
final List<NodeIdentifier> nonConnectedNodes = getNonConnectedNodes(stateMap);
if (!nonConnectedNodes.isEmpty()) {
throw new IllegalClusterStateException("Cannot delete component because the following Nodes are not connected: " + nonConnectedNodes);
}
}
}
@ -198,6 +212,25 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
return replicate(nodeIdSet, user, method, uri, entity, headers, true, true);
}
/**
 * Collects the identifiers of all nodes whose connection state is anything other than
 * CONNECTED or REMOVED.
 *
 * @param stateMap the nodes of the cluster, grouped by their connection state
 * @return a new, mutable List holding every node from the map except those that also appear
 *         under the CONNECTED or REMOVED state; empty if there are no such nodes
 */
private List<NodeIdentifier> getNonConnectedNodes(final Map<NodeConnectionState, List<NodeIdentifier>> stateMap) {
    // Start with every node that the map knows about, regardless of its state...
    final List<NodeIdentifier> candidates = new ArrayList<>();
    for (final List<NodeIdentifier> nodesInState : stateMap.values()) {
        candidates.addAll(nodesInState);
    }

    // ...and then discard the nodes that are in one of the two states that we do not care about.
    final NodeConnectionState[] ignoredStates = {NodeConnectionState.CONNECTED, NodeConnectionState.REMOVED};
    for (final NodeConnectionState ignoredState : ignoredStates) {
        final List<NodeIdentifier> ignoredNodes = stateMap.get(ignoredState);
        if (ignoredNodes != null) {
            candidates.removeAll(ignoredNodes);
        }
    }

    return candidates;
}
void updateRequestHeaders(final Map<String, String> headers, final NiFiUser user) {
if (user == null) {
throw new AccessDeniedException("Unknown user");
@ -638,13 +671,33 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
}
private boolean isDeleteConnection(final String method, final String uriPath) {
private boolean isDeleteComponent(final String method, final String uriPath) {
if (!HttpMethod.DELETE.equalsIgnoreCase(method)) {
return false;
}
final boolean isConnectionUri = ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches();
return isConnectionUri;
// Check if the URI indicates that a component should be deleted.
// We cannot simply make our decision based on the fact that the request is a DELETE request.
// This is because we do need to allow deletion of asynchronous requests, such as updating parameters, querying provenance, etc.,
// which create a request, poll until the request completes, and then delete it. Additionally, we want to allow terminating
// Processors, which is done by issuing a request to DELETE /processors/<id>/threads
final boolean componentUri = ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches()
|| ProcessorEndpointMerger.PROCESSOR_URI_PATTERN.matcher(uriPath).matches()
|| FunnelEndpointMerger.FUNNEL_URI_PATTERN.matcher(uriPath).matches()
|| PortEndpointMerger.INPUT_PORT_URI_PATTERN.matcher(uriPath).matches()
|| PortEndpointMerger.OUTPUT_PORT_URI_PATTERN.matcher(uriPath).matches()
|| RemoteProcessGroupEndpointMerger.REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uriPath).matches()
|| ProcessGroupEndpointMerger.PROCESS_GROUP_URI_PATTERN.matcher(uriPath).matches()
|| ControllerServiceEndpointMerger.CONTROLLER_CONTROLLER_SERVICES_URI.equals(uriPath)
|| ControllerServiceEndpointMerger.CONTROLLER_SERVICE_URI_PATTERN.matcher(uriPath).matches()
|| ReportingTaskEndpointMerger.REPORTING_TASK_URI_PATTERN.matcher(uriPath).matches()
|| ParameterContextEndpointMerger.PARAMETER_CONTEXT_URI_PATTERN.matcher(uriPath).matches()
|| LabelEndpointMerger.LABEL_URI_PATTERN.matcher(uriPath).matches()
|| ParameterProviderEndpointMerger.PARAMETER_PROVIDER_URI_PATTERN.matcher(uriPath).matches()
|| FlowRegistryClientEndpointMerger.CONTROLLER_REGISTRY_URI_PATTERN.matcher(uriPath).matches()
|| SNIPPET_URI_PATTERN.matcher(uriPath).matches();
return componentUri;
}
/**
@ -652,7 +705,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
*
* @param httpMethod the HTTP Method
* @param uriPath the URI Path
* @throws IllegalClusterStateException if the cluster is not in a state that allows a request to be made to the given URI Path using the given HTTP Method
*/
private void verifyClusterState(final String httpMethod, final String uriPath) {
final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod);
@ -666,6 +718,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
throw new ConnectingNodeMutableRequestException("Received a mutable request [" + httpMethod + " " + uriPath + "] while a node is trying to connect to the cluster");
}
}
if (isDeleteComponent(httpMethod, uriPath)) {
final List<NodeIdentifier> nonConnectedNodes = getNonConnectedNodes(clusterCoordinator.getConnectionStates());
if (!nonConnectedNodes.isEmpty()) {
throw new IllegalClusterStateException("Cannot delete component because the following Nodes are not connected: " + nonConnectedNodes);
}
}
}
/**

View File

@ -35,7 +35,6 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.NodeEventDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
@ -65,13 +64,13 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FlowSynchronizationIT extends NiFiSystemIT {
@ -383,13 +382,9 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
node2.stop();
// Remove node from the cluster. This way we know when it's attempted to connected
final NodeDTO node2Dto = getNodeEntity(2).getNode();
final String node2Id = node2Dto.getNodeId();
final Integer node2ApiPort = node2Dto.getApiPort();
getNifiClient().getControllerClient().deleteNode(node2Id);
waitFor(() -> isNodeRemoved(node2ApiPort));
// Remove node from the cluster. This way we know when it's attempted to connect
final Integer node2ApiPort = getNodeApiPort(2);
removeNode(2);
removeExtensionsNar(node2);
node2.start(false);
@ -399,6 +394,26 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
// Wait for node to show as disconnected because it doesn't have the necessary nar
waitForNodeState(2, NodeConnectionState.DISCONNECTED);
// We need to either restart Node 2 or remove it from the cluster in order to ensure that we can properly shutdown.
// Reconnecting to the cluster would require restoring the NAR file and restarting, which will take longer than simply removing the
// node from the cluster. So we opt for shutting down the node and removing it from the cluster.
node2.stop();
removeNode(2);
}
/**
 * Deletes the node with the given index from the cluster and blocks until the cluster
 * no longer reports that node.
 *
 * @param index the index of the node to remove
 */
private void removeNode(final int index) throws NiFiClientException, IOException, InterruptedException {
    final NodeDTO node = getNodeEntity(index).getNode();

    // Capture the API port up front; it is what identifies the node when polling for its removal.
    final Integer nodeApiPort = node.getApiPort();

    getNifiClient().getControllerClient().deleteNode(node.getNodeId());
    waitFor(() -> isNodeRemoved(nodeApiPort));
}
/**
 * Looks up the API port of the node with the given index.
 *
 * @param index the index of the node of interest
 * @return the API port reported by the node's DTO
 */
private Integer getNodeApiPort(final int index) throws NiFiClientException, IOException {
    return getNodeEntity(index).getNode().getApiPort();
}
@Test
@ -424,30 +439,34 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing());
}
@Test
public void testCannotJoinIfMissingConnectionHasData() throws NiFiClientException, IOException, InterruptedException {
public void testCannotRemoveComponentsWhileNodeDisconnected() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
final ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success");
getClientUtil().updateProcessorSchedulingPeriod(generate, "60 sec");
// Disconnect node 2
disconnectNode(2);
waitForNodeState(2, NodeConnectionState.DISCONNECTED);
switchClientToNode(2);
getClientUtil().startProcessor(generate);
waitForQueueCount(connection.getId(), 1);
// Attempt to delete connection. It should throw an Exception.
assertThrows(Exception.class, () -> getNifiClient().getConnectionClient().deleteConnection(connection));
switchClientToNode(1);
getNifiClient().getConnectionClient().deleteConnection(connection);
// Attempt to delete processor. It should throw an Exception.
assertThrows(Exception.class, () -> getNifiClient().getProcessorClient().deleteProcessor(generate));
// Reconnect the node
reconnectNode(2);
waitForAllNodesConnected();
// Wait for node to be disconnected due to connection containing data
waitFor(() -> isNodeDisconnectedDueToMissingConnection(5672, connection.getId()));
// Ensure that we can delete the connection and the processors
getNifiClient().getConnectionClient().deleteConnection(connection);
getNifiClient().getProcessorClient().deleteProcessor(generate);
getNifiClient().getProcessorClient().deleteProcessor(terminate);
}
@Test
public void testComponentStatesRestoredOnReconnect() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
@ -485,40 +504,22 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
});
}
/**
 * Determines whether the node with the given API port is DISCONNECTED and has an event
 * whose message mentions the given connection.
 *
 * @param nodeApiPort the API port that identifies the node of interest
 * @param connectionId the ID of the connection that is expected to appear in an event message
 * @return <code>true</code> if the first node with the given API port is DISCONNECTED and at least one of its
 *         events contains the connection ID; <code>false</code> if there is no such node, the node is in any
 *         other state, or none of its events mention the connection
 */
private boolean isNodeDisconnectedDueToMissingConnection(final int nodeApiPort, final String connectionId) throws NiFiClientException, IOException {
    for (final NodeDTO candidate : getNifiClient().getControllerClient().getNodes().getCluster().getNodes()) {
        if (candidate.getApiPort() != nodeApiPort) {
            continue;
        }

        // Only the first node with a matching port is considered; the answer is decided here.
        final boolean disconnected = NodeConnectionState.DISCONNECTED.name().equals(candidate.getStatus());
        if (!disconnected) {
            return false;
        }

        // We should have an event indicating the ID of the connection that could not be removed
        return candidate.getEvents().stream()
            .anyMatch(event -> event.getMessage().contains(connectionId));
    }

    // No node is listening on the given API port
    return false;
}
/**
 * Takes the system-test extensions NAR of the given instance out of service by renaming it
 * with a ".backup" suffix within the same directory. The file is renamed rather than deleted.
 *
 * @param nifiInstance the NiFi instance whose extensions NAR should be moved aside
 */
private void removeExtensionsNar(final NiFiInstance nifiInstance) {
    // The lookup of the NAR is delegated entirely to getExtensionsNar; no separate directory listing is needed here.
    final File extensionsNar = getExtensionsNar(nifiInstance);
    final File backupFile = new File(extensionsNar.getParentFile(), extensionsNar.getName() + ".backup");
    assertTrue(extensionsNar.renameTo(backupFile));
}
/**
 * Locates the system-test extensions NAR in the "lib" directory of the given instance.
 * Asserts that exactly one file whose name starts with "nifi-system-test-extensions-nar-" exists.
 * This method only looks the file up; it must not modify or delete it, because callers go on
 * to operate on the returned file (e.g., renaming it to a backup).
 *
 * @param nifiInstance the NiFi instance whose lib directory should be searched
 * @return the single matching NAR file
 */
private File getExtensionsNar(final NiFiInstance nifiInstance) {
    final File libDir = new File(nifiInstance.getInstanceDirectory(), "lib");
    final File[] testExtensionsNar = libDir.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-"));
    assertEquals(1, testExtensionsNar.length);
    return testExtensionsNar[0];
}
private boolean isNodeRemoved(final int apiPort) {
try {
return getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()