diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java index b49f57ca79..a1af0f8f5c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java @@ -25,7 +25,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; import org.apache.nifi.reporting.BulletinRepository; /** - * An interface for sending protocol messages from the cluster coordinator to nodes. + * An interface for sending protocol messages from the cluster manager to nodes. * */ public interface ClusterCoordinationProtocolSender { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index 3bf5c0e37b..be9559dad9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -126,11 +126,9 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { protected synchronized void monitorHeartbeats() { final Map latestHeartbeats = getLatestHeartbeats(); if (latestHeartbeats == null || latestHeartbeats.isEmpty()) { - if (!clusterCoordinator.isActiveClusterCoordinator()) { - logger.info("This node is no longer the Cluster Coordinator, so will stop monitoring heartbeats"); - stop(); - } - + // failed to fetch heartbeats; don't change anything. + clusterCoordinator.reportEvent(null, Severity.INFO, "Failed to retrieve any new heartbeat information for nodes. " + + "Will not make any decisions based on heartbeats."); return; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java index bfe528df88..672490173a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java @@ -98,13 +98,10 @@ public interface RequestReplicator { * @param headers any HTTP headers * @param indicateReplicated if true, will add a header indicating to the receiving nodes that the request * has already been replicated, so the receiving node will not replicate the request itself. - * @param performVerification if true, and the request is mutable, will verify that all nodes are connected before - * making the request and that all nodes are able to perform the request before acutally attempting to perform the task. - * If false, will perform no such verification * * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later */ - AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated, boolean performVerification); + AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated); /** *

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index c5a8af54a5..f4fcc851a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -211,12 +211,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final Set nodeIdSet = new HashSet<>(nodeIds); - return replicate(nodeIdSet, method, uri, entity, headers, true, true); + return replicate(nodeIdSet, method, uri, entity, headers, true); } @Override - public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, - final boolean indicateReplicated, final boolean performVerification) { + public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, final boolean indicateReplicated) { final Map updatedHeaders = new HashMap<>(headers); updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, TypeOneUUIDGenerator.generateId().toString()); @@ -243,12 +242,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { lock.lock(); try { logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri); - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null); + return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null); } finally { lock.unlock(); } } else { - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null); + return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null); } } @@ -260,13 +259,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @param uri the URI to send the request to * @param entity the entity to use * @param headers the HTTP Headers - * @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable. + * @param performVerification whether or not to use 2-phase commit to verify that all nodes can handle the request. Ignored if request is not mutable. * @param response the response to update with the results * * @return an AsyncClusterResponse that can be used to obtain the result */ private AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean performVerification, - StandardAsyncClusterResponse response) { + StandardAsyncClusterResponse response) { // state validation Objects.requireNonNull(nodeIds); @@ -299,7 +298,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); if (performVerification) { - verifyClusterState(method, uri.getPath()); + verifyState(method, uri.getPath()); } int numRequests = responseMap.size(); @@ -531,7 +530,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * * @throw IllegalClusterStateException if the cluster is not in a state that allows a request to made to the given URI Path using the given HTTP Method */ - private void verifyClusterState(final String httpMethod, final String uriPath) { + private void verifyState(final String httpMethod, final String uriPath) { final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod); // check that the request can be applied diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index c98a8885b6..3f8fa76bf5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -30,7 +30,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -144,17 +143,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } private NodeIdentifier waitForLocalNodeIdentifier() { - return waitForNodeIdentifier(() -> getLocalNodeIdentifier()); - } - - private NodeIdentifier waitForElectedClusterCoordinator() { - return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false)); - } - - private NodeIdentifier waitForNodeIdentifier(final Supplier fetchNodeId) { NodeIdentifier localNodeId = null; while (localNodeId == null) { - localNodeId = fetchNodeId.get(); + localNodeId = getLocalNodeIdentifier(); if (localNodeId == null) { try { Thread.sleep(100L); @@ -523,27 +514,17 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public NodeIdentifier getElectedActiveCoordinatorNode() { - return getElectedActiveCoordinatorNode(true); - } - - private NodeIdentifier getElectedActiveCoordinatorNode(final boolean warnOnError) { final String electedNodeAddress; try { electedNodeAddress = getElectedActiveCoordinatorAddress(); } catch (final IOException ioe) { - if (warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe); - } - + logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe); return null; } final int colonLoc = electedNodeAddress.indexOf(':'); if (colonLoc < 1) { - if (warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); - } - + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); return null; } @@ -553,10 +534,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl try { electedNodePort = Integer.parseInt(portString); } catch (final NumberFormatException nfe) { - if (warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); - } - + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); return null; } @@ -566,7 +544,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl .findFirst() .orElse(null); - if (electedNodeId == null && warnOnError) { + if (electedNodeId == null) { logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress); } @@ -632,37 +610,16 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl logger.debug("State of cluster nodes is now {}", nodeStatuses); if (currentState == null || currentState != status.getState()) { - // We notify all nodes of the status change if either this node is the current cluster coordinator, OR if the node was - // the cluster coordinator and no longer is. This is done because if a user disconnects the cluster coordinator, we need - // to broadcast to the cluster that this node is no longer the coordinator. Otherwise, all nodes but this one will still - // believe that this node is connected to the cluster. - final boolean notifyAllNodes = isActiveClusterCoordinator() || (currentStatus != null && currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)); - notifyOthersOfNodeStatusChange(status, notifyAllNodes); + notifyOthersOfNodeStatusChange(status); } } + private void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) { - notifyOthersOfNodeStatusChange(updatedStatus, isActiveClusterCoordinator()); - } + final Set nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING); - /** - * Notifies other nodes that the status of a node changed - * - * @param updatedStatus the updated status for a node in the cluster - * @param notifyAllNodes if true will notify all nodes. If false, will notify only the cluster coordinator - */ - private void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus, final boolean notifyAllNodes) { - // If this node is the active cluster coordinator, then we are going to replicate to all nodes. - // Otherwise, get the active coordinator (or wait for one to become active) and then notify the coordinator. - final Set nodesToNotify; - if (notifyAllNodes) { - nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING); - - // Do not notify ourselves because we already know about the status update. - nodesToNotify.remove(getLocalNodeIdentifier()); - } else { - nodesToNotify = Collections.singleton(waitForElectedClusterCoordinator()); - } + // Do not notify ourselves because we already know about the status update. + nodesToNotify.remove(getLocalNodeIdentifier()); final NodeStatusChangeMessage message = new NodeStatusChangeMessage(); message.setNodeId(updatedStatus.getNodeIdentifier()); @@ -810,10 +767,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl nodeId, updatedStatus, oldStatus); } } - - if (isActiveClusterCoordinator()) { - notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus()); - } } private NodeIdentifier resolveNodeId(final NodeIdentifier proposedIdentifier) { @@ -919,12 +872,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl */ @Override public void afterRequest(final String uriPath, final String method, final Set nodeResponses) { - // if we are not the active cluster coordinator, then we are not responsible for monitoring the responses, - // as the cluster coordinator is responsible for performing the actual request replication. - if (!isActiveClusterCoordinator()) { - return; - } - final boolean mutableRequest = isMutableRequest(method); /* diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 5eac84616c..2af3e88d9d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -87,7 +87,7 @@ public class TestThreadPoolRequestReplicator { final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); - final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true); // We should get back the same response object assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); @@ -115,7 +115,7 @@ public class TestThreadPoolRequestReplicator { final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); - final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true); // We should get back the same response object assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); @@ -151,7 +151,7 @@ public class TestThreadPoolRequestReplicator { final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); - final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true); assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS)); } , null, 0L, new IllegalArgumentException("Exception created for unit test")); } @@ -191,7 +191,7 @@ public class TestThreadPoolRequestReplicator { try { final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, - new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true); clusterResponse.awaitMergedResponse(); // Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not. @@ -235,8 +235,7 @@ public class TestThreadPoolRequestReplicator { Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap); final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) { @Override - public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, - boolean indicateReplicated, boolean verify) { + public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated) { return null; } }; @@ -309,7 +308,7 @@ public class TestThreadPoolRequestReplicator { try { final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, - new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true); clusterResponse.awaitMergedResponse(); Assert.fail("Expected to get an IllegalClusterStateException but did not"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 30e382d378..d6e9308040 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3260,7 +3260,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private void registerForClusterCoordinator() { leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override - public synchronized void onLeaderRelinquish() { + public void onLeaderRelinquish() { heartbeatMonitor.stop(); if (clusterCoordinator != null) { @@ -3269,7 +3269,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public synchronized void onLeaderElection() { + public void onLeaderElection() { heartbeatMonitor.start(); if (clusterCoordinator != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java index 1786bceaba..e2b63a2daf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java @@ -98,7 +98,7 @@ public class StandardNiFiContentAccess implements ContentAccess { throw new NoClusterCoordinatorException(); } final Set coordinatorNodes = Collections.singleton(coordinatorNode); - nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false, true).awaitMergedResponse(); + nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false).awaitMergedResponse(); } catch (InterruptedException e) { throw new IllegalClusterStateException("Interrupted while waiting for a response from node"); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java index fd3c474b6c..641ed38bfa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java @@ -297,7 +297,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration } final Set coordinatorNodes = Collections.singleton(coordinatorNode); - return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false, true).awaitMergedResponse(); + return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false).awaitMergedResponse(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 6a9e1d08f8..a49ed5d251 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -565,11 +565,11 @@ public abstract class ApplicationResource { // If we are to replicate directly to the nodes, we need to indicate that the replication source is // the cluster coordinator so that the node knows to service the request. final Set targetNodes = Collections.singleton(nodeId); - return requestReplicator.replicate(targetNodes, method, path, entity, headers, true, true).awaitMergedResponse().getResponse(); + return requestReplicator.replicate(targetNodes, method, path, entity, headers, true).awaitMergedResponse().getResponse(); } else { headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId()); return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, - path, entity, headers, false, true).awaitMergedResponse().getResponse(); + path, entity, headers, false).awaitMergedResponse().getResponse(); } } catch (final InterruptedException ie) { return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build(); @@ -589,38 +589,23 @@ public abstract class ApplicationResource { return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR; } - protected Response replicate(final String method, final NodeIdentifier targetNode) { - return replicate(method, targetNode, getRequestParameters()); - } - - protected Response replicate(final String method, final NodeIdentifier targetNode, final Object entity) { try { // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly // to the cluster nodes themselves. if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { final Set nodeIds = Collections.singleton(targetNode); - return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), entity, getHeaders(), true, true).awaitMergedResponse().getResponse(); + return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), getRequestParameters(), getHeaders(), true).awaitMergedResponse().getResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); final Map headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId())); - return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), entity, headers, false, true).awaitMergedResponse().getResponse(); + return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), getRequestParameters(), headers, false).awaitMergedResponse().getResponse(); } } catch (final InterruptedException ie) { return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); } } - protected Response replicateToCoordinator(final String method, final Object entity) { - try { - final NodeIdentifier coordinatorNode = getClusterCoordinatorNode(); - final Set coordinatorNodes = Collections.singleton(coordinatorNode); - return getRequestReplicator().replicate(coordinatorNodes, method, getAbsolutePath(), entity, getHeaders(), true, false).awaitMergedResponse().getResponse(); - } catch (final InterruptedException ie) { - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); - } - } - /** * Convenience method for calling {@link #replicate(String, Object)} with an entity of * {@link #getRequestParameters() getRequestParameters(true)} @@ -700,7 +685,7 @@ public abstract class ApplicationResource { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse(); } else { - return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false, true).awaitMergedResponse(); + return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false).awaitMergedResponse(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index 7a5dab74f5..2f73c70905 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -548,10 +548,6 @@ public class ControllerResource extends ApplicationResource { + "not equal the node id of the requested resource (%s).", requestNodeDTO.getNodeId(), id)); } - if (isReplicateRequest()) { - return replicateToCoordinator(HttpMethod.PUT, nodeEntity); - } - // update the node final NodeDTO node = serviceFacade.updateNode(requestNodeDTO); @@ -604,10 +600,6 @@ public class ControllerResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request."); } - if (isReplicateRequest()) { - return replicateToCoordinator(HttpMethod.DELETE, getRequestParameters()); - } - serviceFacade.deleteNode(id); // create the response entity diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java index 26a708c81d..8dfe417fe2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java @@ -168,7 +168,7 @@ public class CountersResource extends ApplicationResource { nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse(); + nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse(); } final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index a4933a58fb..1ea373658a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1980,7 +1980,7 @@ public class ProcessGroupResource extends ApplicationResource { return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false, true).awaitMergedResponse().getResponse(); + return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false).awaitMergedResponse().getResponse(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java index d9db9928b2..8a093d0904 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java @@ -147,7 +147,7 @@ public class SystemDiagnosticsResource extends ApplicationResource { nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse(); + nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse(); } final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();