diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ParameterContextEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ParameterContextEndpointMerger.java index 4719965812..f6222aa74f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ParameterContextEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ParameterContextEndpointMerger.java @@ -27,11 +27,11 @@ import java.util.Set; import java.util.regex.Pattern; public class ParameterContextEndpointMerger extends AbstractSingleEntityEndpoint implements EndpointResponseMerger { - private static final Pattern PARAMETER_CONTEXT_URI = Pattern.compile("/nifi-api/parameter-contexts/[a-f0-9\\-]{36}"); + public static final Pattern PARAMETER_CONTEXT_URI_PATTERN = Pattern.compile("/nifi-api/parameter-contexts/[a-f0-9\\-]{36}"); @Override public boolean canHandle(final URI uri, final String method) { - return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && PARAMETER_CONTEXT_URI.matcher(uri.getPath()).matches(); + return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && PARAMETER_CONTEXT_URI_PATTERN.matcher(uri.getPath()).matches(); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ParameterProviderEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ParameterProviderEndpointMerger.java index 14d5ad0328..7714a79ec4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ParameterProviderEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ParameterProviderEndpointMerger.java @@ -27,11 +27,11 @@ import java.util.Set; import java.util.regex.Pattern; public class ParameterProviderEndpointMerger extends AbstractSingleEntityEndpoint implements EndpointResponseMerger { - private static final Pattern PARAMETER_PROVIDER_URI = Pattern.compile("/nifi-api/parameter-providers/[a-f0-9\\-]{36}"); + public static final Pattern PARAMETER_PROVIDER_URI_PATTERN = Pattern.compile("/nifi-api/parameter-providers/[a-f0-9\\-]{36}"); @Override public boolean canHandle(final URI uri, final String method) { - return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PARAMETER_PROVIDER_URI.matcher(uri.getPath()).matches(); + return ("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PARAMETER_PROVIDER_URI_PATTERN.matcher(uri.getPath()).matches(); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index fe39c8a76f..1c346f5d8c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -25,13 +25,23 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.FlowRegistryClientEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.FunnelEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ParameterContextEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ParameterProviderEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; -import org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.exception.UriConstructionException; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -51,6 +61,7 @@ import javax.ws.rs.core.Response.Status; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -73,12 +84,14 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; public class ThreadPoolRequestReplicator implements RequestReplicator { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); + private static final Pattern SNIPPET_URI_PATTERN = Pattern.compile("/nifi-api/snippets/[a-f0-9\\-]{36}"); private final int maxConcurrentRequests; // maximum number of concurrent requests private final HttpResponseMapper responseMapper; @@ -180,10 +193,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } } - if (isDeleteConnection(method, uri.getPath())) { - final List offloading = stateMap.get(NodeConnectionState.OFFLOADING); - if (offloading != null && !offloading.isEmpty()) { - throw new OffloadedNodeMutableRequestException("Cannot delete conection because the following Nodes are currently being offloaded: " + offloading); + // Do not allow any components to be deleted unless all nodes are connected. + if (isDeleteComponent(method, uri.getPath())) { + final List nonConnectedNodes = getNonConnectedNodes(stateMap); + if (!nonConnectedNodes.isEmpty()) { + throw new IllegalClusterStateException("Cannot delete component because the following Nodes are not connected: " + nonConnectedNodes); } } } @@ -198,6 +212,25 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { return replicate(nodeIdSet, user, method, uri, entity, headers, true, true); } + private List getNonConnectedNodes(final Map> stateMap) { + // We want to include all nodes except for those that are CONNECTED or REMOVED. + // We do that by first adding all nodes to the List and then removing those that are CONNECTED or REMOVED. + final List nonConnectedNodes = new ArrayList<>(); + stateMap.values().forEach(nonConnectedNodes::addAll); + + final List connectedNodes = stateMap.get(NodeConnectionState.CONNECTED); + if (connectedNodes != null) { + nonConnectedNodes.removeAll(connectedNodes); + } + + final List removedNodes = stateMap.get(NodeConnectionState.REMOVED); + if (removedNodes != null) { + nonConnectedNodes.removeAll(removedNodes); + } + + return nonConnectedNodes; + } + void updateRequestHeaders(final Map headers, final NiFiUser user) { if (user == null) { throw new AccessDeniedException("Unknown user"); @@ -638,13 +671,33 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } } - private boolean isDeleteConnection(final String method, final String uriPath) { + private boolean isDeleteComponent(final String method, final String uriPath) { if (!HttpMethod.DELETE.equalsIgnoreCase(method)) { return false; } - final boolean isConnectionUri = ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches(); - return isConnectionUri; + // Check if the URI indicates that a component should be deleted. + // We cannot simply make our decision based on the fact that the request is a DELETE request. + // This is because we do need to allow deletion of asynchronous requests, such as updating parameters, querying provenance, etc. + // which create a request, poll until the request completes, and then deletes it. Additionally, we want to allow terminating + // Processors, which is done by issuing a request to DELETE /processors//threads + final boolean componentUri = ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches() + || ProcessorEndpointMerger.PROCESSOR_URI_PATTERN.matcher(uriPath).matches() + || FunnelEndpointMerger.FUNNEL_URI_PATTERN.matcher(uriPath).matches() + || PortEndpointMerger.INPUT_PORT_URI_PATTERN.matcher(uriPath).matches() + || PortEndpointMerger.OUTPUT_PORT_URI_PATTERN.matcher(uriPath).matches() + || RemoteProcessGroupEndpointMerger.REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uriPath).matches() + || ProcessGroupEndpointMerger.PROCESS_GROUP_URI_PATTERN.matcher(uriPath).matches() + || ControllerServiceEndpointMerger.CONTROLLER_CONTROLLER_SERVICES_URI.equals(uriPath) + || ControllerServiceEndpointMerger.CONTROLLER_SERVICE_URI_PATTERN.matcher(uriPath).matches() + || ReportingTaskEndpointMerger.REPORTING_TASK_URI_PATTERN.matcher(uriPath).matches() + || ParameterContextEndpointMerger.PARAMETER_CONTEXT_URI_PATTERN.matcher(uriPath).matches() + || LabelEndpointMerger.LABEL_URI_PATTERN.matcher(uriPath).matches() + || ParameterProviderEndpointMerger.PARAMETER_PROVIDER_URI_PATTERN.matcher(uriPath).matches() + || FlowRegistryClientEndpointMerger.CONTROLLER_REGISTRY_URI_PATTERN.matcher(uriPath).matches() + || SNIPPET_URI_PATTERN.matcher(uriPath).matches(); + + return componentUri; } /** @@ -652,7 +705,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * * @param httpMethod the HTTP Method * @param uriPath the URI Path - * @throw IllegalClusterStateException if the cluster is not in a state that allows a request to made to the given URI Path using the given HTTP Method */ private void verifyClusterState(final String httpMethod, final String uriPath) { final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod); @@ -666,6 +718,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new ConnectingNodeMutableRequestException("Received a mutable request [" + httpMethod + " " + uriPath + "] while a node is trying to connect to the cluster"); } } + + if (isDeleteComponent(httpMethod, uriPath)) { + final List nonConnectedNodes = getNonConnectedNodes(clusterCoordinator.getConnectionStates()); + if (!nonConnectedNodes.isEmpty()) { + throw new IllegalClusterStateException("Cannot delete component because the following Nodes are not connected: " + nonConnectedNodes); + } + } } /** diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java index a6da0107ed..aff856f05d 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java @@ -35,7 +35,6 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.NodeDTO; -import org.apache.nifi.web.api.dto.NodeEventDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; @@ -65,13 +64,13 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.zip.GZIPInputStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class FlowSynchronizationIT extends NiFiSystemIT { @@ -383,13 +382,9 @@ public class FlowSynchronizationIT extends NiFiSystemIT { final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2); node2.stop(); - // Remove node from the cluster. This way we know when it's attempted to connected - final NodeDTO node2Dto = getNodeEntity(2).getNode(); - final String node2Id = node2Dto.getNodeId(); - final Integer node2ApiPort = node2Dto.getApiPort(); - getNifiClient().getControllerClient().deleteNode(node2Id); - waitFor(() -> isNodeRemoved(node2ApiPort)); - + // Remove node from the cluster. This way we know when it's attempted to connect + final Integer node2ApiPort = getNodeApiPort(2); + removeNode(2); removeExtensionsNar(node2); node2.start(false); @@ -399,6 +394,26 @@ public class FlowSynchronizationIT extends NiFiSystemIT { // Wait for node to show as disconnected because it doesn't have the necessary nar waitForNodeState(2, NodeConnectionState.DISCONNECTED); + + // We need to either restart Node 2 or remove it from the cluster in order to ensure that we can properly shutdown. + // Reconnecting to the cluster would require restoring the NAR file and restarting, which will take longer than simply removing the + // node from the cluster. So we opt for shutting down the node and removing it from the cluster. + node2.stop(); + removeNode(2); + } + + private void removeNode(final int index) throws NiFiClientException, IOException, InterruptedException { + final NodeDTO nodeDto = getNodeEntity(index).getNode(); + final String nodeId = nodeDto.getNodeId(); + final Integer apiPort = nodeDto.getApiPort(); + getNifiClient().getControllerClient().deleteNode(nodeId); + waitFor(() -> isNodeRemoved(apiPort)); + } + + private Integer getNodeApiPort(final int index) throws NiFiClientException, IOException { + final NodeDTO nodeDto = getNodeEntity(index).getNode(); + final Integer apiPort = nodeDto.getApiPort(); + return apiPort; } @Test @@ -424,30 +439,34 @@ public class FlowSynchronizationIT extends NiFiSystemIT { assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing()); } + @Test - public void testCannotJoinIfMissingConnectionHasData() throws NiFiClientException, IOException, InterruptedException { + public void testCannotRemoveComponentsWhileNodeDisconnected() throws NiFiClientException, IOException, InterruptedException { final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); final ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success"); - getClientUtil().updateProcessorSchedulingPeriod(generate, "60 sec"); - // Shut down node 2 disconnectNode(2); + waitForNodeState(2, NodeConnectionState.DISCONNECTED); - switchClientToNode(2); - getClientUtil().startProcessor(generate); - waitForQueueCount(connection.getId(), 1); + // Attempt to delete connection. It should throw an Exception. + assertThrows(Exception.class, () -> getNifiClient().getConnectionClient().deleteConnection(connection)); - switchClientToNode(1); - getNifiClient().getConnectionClient().deleteConnection(connection); + // Attempt to delete processor. It should throw an Exception. + assertThrows(Exception.class, () -> getNifiClient().getProcessorClient().deleteProcessor(generate)); + // Reconnect the node reconnectNode(2); + waitForAllNodesConnected(); - // Wait for node to be disconnected due to connection containing data - waitFor(() -> isNodeDisconnectedDueToMissingConnection(5672, connection.getId())); + // Ensure that we can delete the connection and the processors + getNifiClient().getConnectionClient().deleteConnection(connection); + getNifiClient().getProcessorClient().deleteProcessor(generate); + getNifiClient().getProcessorClient().deleteProcessor(terminate); } + @Test public void testComponentStatesRestoredOnReconnect() throws NiFiClientException, IOException, InterruptedException { final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); @@ -485,40 +504,22 @@ public class FlowSynchronizationIT extends NiFiSystemIT { }); } - private boolean isNodeDisconnectedDueToMissingConnection(final int nodeApiPort, final String connectionId) throws NiFiClientException, IOException { - final NodeDTO node2Dto = getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream() - .filter(dto -> dto.getApiPort() == nodeApiPort) - .findFirst() - .orElse(null); - - if (node2Dto == null) { - return false; - } - - if (!NodeConnectionState.DISCONNECTED.name().equals(node2Dto.getStatus())) { - return false; - } - - // We should have an event indicating the ID of the connection that could not be removed - final List nodeEvents = node2Dto.getEvents(); - for (final NodeEventDTO event : nodeEvents) { - if (event.getMessage().contains(connectionId)) { - return true; - } - } - - return false; - } private void removeExtensionsNar(final NiFiInstance nifiInstance) { - final File node2Lib = new File(nifiInstance.getInstanceDirectory(), "lib"); - final File[] testExtensionsNar = node2Lib.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-")); + final File extensionsNar = getExtensionsNar(nifiInstance); + final File backupFile = new File(extensionsNar.getParentFile(), extensionsNar.getName() + ".backup"); + assertTrue(extensionsNar.renameTo(backupFile)); + } + + private File getExtensionsNar(final NiFiInstance nifiInstance) { + final File libDir = new File(nifiInstance.getInstanceDirectory(), "lib"); + final File[] testExtensionsNar = libDir.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-")); assertEquals(1, testExtensionsNar.length); - final File extensionsNar = testExtensionsNar[0]; - assertTrue(extensionsNar.delete()); + return testExtensionsNar[0]; } + private boolean isNodeRemoved(final int apiPort) { try { return getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()