From b082858595c8207b124c52c765a6b93a0188d643 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 25 Jul 2016 12:35:43 -0400 Subject: [PATCH] Revert "NIFI-2292: Funnel all cluster node status changes through the cluster coordinator instead of having each node broadcast changes to the whole cluster. This gives us the ability to increment the updateId consistently without race conditions." This reverts commit 35ff0975bf1193a1c787af365e15a4e0da970ed9. --- .../ClusterCoordinationProtocolSender.java | 2 +- .../heartbeat/AbstractHeartbeatMonitor.java | 8 +- .../http/replication/RequestReplicator.java | 5 +- .../ThreadPoolRequestReplicator.java | 17 ++--- .../node/NodeClusterCoordinator.java | 73 +++---------------- .../TestThreadPoolRequestReplicator.java | 13 ++-- .../nifi/controller/FlowController.java | 4 +- .../nifi/web/StandardNiFiContentAccess.java | 2 +- .../StandardNiFiWebConfigurationContext.java | 2 +- .../nifi/web/api/ApplicationResource.java | 25 ++----- .../nifi/web/api/ControllerResource.java | 8 -- .../apache/nifi/web/api/CountersResource.java | 2 +- .../nifi/web/api/ProcessGroupResource.java | 2 +- .../web/api/SystemDiagnosticsResource.java | 2 +- 14 files changed, 41 insertions(+), 124 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java index b49f57ca79..a1af0f8f5c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java @@ -25,7 +25,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; import org.apache.nifi.reporting.BulletinRepository; /** - * An interface for sending protocol messages from the cluster coordinator to nodes. + * An interface for sending protocol messages from the cluster manager to nodes. * */ public interface ClusterCoordinationProtocolSender { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index 3bf5c0e37b..be9559dad9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -126,11 +126,9 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { protected synchronized void monitorHeartbeats() { final Map latestHeartbeats = getLatestHeartbeats(); if (latestHeartbeats == null || latestHeartbeats.isEmpty()) { - if (!clusterCoordinator.isActiveClusterCoordinator()) { - logger.info("This node is no longer the Cluster Coordinator, so will stop monitoring heartbeats"); - stop(); - } - + // failed to fetch heartbeats; don't change anything. + clusterCoordinator.reportEvent(null, Severity.INFO, "Failed to retrieve any new heartbeat information for nodes. " + + "Will not make any decisions based on heartbeats."); return; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java index bfe528df88..672490173a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java @@ -98,13 +98,10 @@ public interface RequestReplicator { * @param headers any HTTP headers * @param indicateReplicated if true, will add a header indicating to the receiving nodes that the request * has already been replicated, so the receiving node will not replicate the request itself. - * @param performVerification if true, and the request is mutable, will verify that all nodes are connected before - * making the request and that all nodes are able to perform the request before acutally attempting to perform the task. - * If false, will perform no such verification * * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later */ - AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated, boolean performVerification); + AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated); /** *

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index c5a8af54a5..f4fcc851a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -211,12 +211,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final Set nodeIdSet = new HashSet<>(nodeIds); - return replicate(nodeIdSet, method, uri, entity, headers, true, true); + return replicate(nodeIdSet, method, uri, entity, headers, true); } @Override - public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, - final boolean indicateReplicated, final boolean performVerification) { + public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, final boolean indicateReplicated) { final Map updatedHeaders = new HashMap<>(headers); updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, TypeOneUUIDGenerator.generateId().toString()); @@ -243,12 +242,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { lock.lock(); try { logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri); - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null); + return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null); } finally { lock.unlock(); } } else { - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null); + return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null); } } @@ -260,13 +259,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @param uri the URI to send the request to * @param entity the entity to use * @param headers the HTTP Headers - * @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable. + * @param performVerification whether or not to use 2-phase commit to verify that all nodes can handle the request. Ignored if request is not mutable. * @param response the response to update with the results * * @return an AsyncClusterResponse that can be used to obtain the result */ private AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean performVerification, - StandardAsyncClusterResponse response) { + StandardAsyncClusterResponse response) { // state validation Objects.requireNonNull(nodeIds); @@ -299,7 +298,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); if (performVerification) { - verifyClusterState(method, uri.getPath()); + verifyState(method, uri.getPath()); } int numRequests = responseMap.size(); @@ -531,7 +530,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * * @throw IllegalClusterStateException if the cluster is not in a state that allows a request to made to the given URI Path using the given HTTP Method */ - private void verifyClusterState(final String httpMethod, final String uriPath) { + private void verifyState(final String httpMethod, final String uriPath) { final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod); // check that the request can be applied diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index c98a8885b6..3f8fa76bf5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -30,7 +30,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -144,17 +143,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } private NodeIdentifier waitForLocalNodeIdentifier() { - return waitForNodeIdentifier(() -> getLocalNodeIdentifier()); - } - - private NodeIdentifier waitForElectedClusterCoordinator() { - return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false)); - } - - private NodeIdentifier waitForNodeIdentifier(final Supplier fetchNodeId) { NodeIdentifier localNodeId = null; while (localNodeId == null) { - localNodeId = fetchNodeId.get(); + localNodeId = getLocalNodeIdentifier(); if (localNodeId == null) { try { Thread.sleep(100L); @@ -523,27 +514,17 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public NodeIdentifier getElectedActiveCoordinatorNode() { - return getElectedActiveCoordinatorNode(true); - } - - private NodeIdentifier getElectedActiveCoordinatorNode(final boolean warnOnError) { final String electedNodeAddress; try { electedNodeAddress = getElectedActiveCoordinatorAddress(); } catch (final IOException ioe) { - if (warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe); - } - + logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe); return null; } final int colonLoc = electedNodeAddress.indexOf(':'); if (colonLoc < 1) { - if (warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); - } - + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); return null; } @@ -553,10 +534,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl try { electedNodePort = Integer.parseInt(portString); } catch (final NumberFormatException nfe) { - if (warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); - } - + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); return null; } @@ -566,7 +544,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl .findFirst() .orElse(null); - if (electedNodeId == null && warnOnError) { + if (electedNodeId == null) { logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress); } @@ -632,37 +610,16 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl logger.debug("State of cluster nodes is now {}", nodeStatuses); if (currentState == null || currentState != status.getState()) { - // We notify all nodes of the status change if either this node is the current cluster coordinator, OR if the node was - // the cluster coordinator and no longer is. This is done because if a user disconnects the cluster coordinator, we need - // to broadcast to the cluster that this node is no longer the coordinator. Otherwise, all nodes but this one will still - // believe that this node is connected to the cluster. - final boolean notifyAllNodes = isActiveClusterCoordinator() || (currentStatus != null && currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)); - notifyOthersOfNodeStatusChange(status, notifyAllNodes); + notifyOthersOfNodeStatusChange(status); } } + private void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) { - notifyOthersOfNodeStatusChange(updatedStatus, isActiveClusterCoordinator()); - } + final Set nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING); - /** - * Notifies other nodes that the status of a node changed - * - * @param updatedStatus the updated status for a node in the cluster - * @param notifyAllNodes if true will notify all nodes. If false, will notify only the cluster coordinator - */ - private void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus, final boolean notifyAllNodes) { - // If this node is the active cluster coordinator, then we are going to replicate to all nodes. - // Otherwise, get the active coordinator (or wait for one to become active) and then notify the coordinator. - final Set nodesToNotify; - if (notifyAllNodes) { - nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING); - - // Do not notify ourselves because we already know about the status update. - nodesToNotify.remove(getLocalNodeIdentifier()); - } else { - nodesToNotify = Collections.singleton(waitForElectedClusterCoordinator()); - } + // Do not notify ourselves because we already know about the status update. + nodesToNotify.remove(getLocalNodeIdentifier()); final NodeStatusChangeMessage message = new NodeStatusChangeMessage(); message.setNodeId(updatedStatus.getNodeIdentifier()); @@ -810,10 +767,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl nodeId, updatedStatus, oldStatus); } } - - if (isActiveClusterCoordinator()) { - notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus()); - } } private NodeIdentifier resolveNodeId(final NodeIdentifier proposedIdentifier) { @@ -919,12 +872,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl */ @Override public void afterRequest(final String uriPath, final String method, final Set nodeResponses) { - // if we are not the active cluster coordinator, then we are not responsible for monitoring the responses, - // as the cluster coordinator is responsible for performing the actual request replication. - if (!isActiveClusterCoordinator()) { - return; - } - final boolean mutableRequest = isMutableRequest(method); /* diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 5eac84616c..2af3e88d9d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -87,7 +87,7 @@ public class TestThreadPoolRequestReplicator { final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); - final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true); // We should get back the same response object assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); @@ -115,7 +115,7 @@ public class TestThreadPoolRequestReplicator { final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); - final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true); // We should get back the same response object assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); @@ -151,7 +151,7 @@ public class TestThreadPoolRequestReplicator { final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); - final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true); assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS)); } , null, 0L, new IllegalArgumentException("Exception created for unit test")); } @@ -191,7 +191,7 @@ public class TestThreadPoolRequestReplicator { try { final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, - new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true); clusterResponse.awaitMergedResponse(); // Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not. @@ -235,8 +235,7 @@ public class TestThreadPoolRequestReplicator { Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap); final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) { @Override - public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, - boolean indicateReplicated, boolean verify) { + public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated) { return null; } }; @@ -309,7 +308,7 @@ public class TestThreadPoolRequestReplicator { try { final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, - new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true); clusterResponse.awaitMergedResponse(); Assert.fail("Expected to get an IllegalClusterStateException but did not"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 30e382d378..d6e9308040 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3260,7 +3260,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private void registerForClusterCoordinator() { leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override - public synchronized void onLeaderRelinquish() { + public void onLeaderRelinquish() { heartbeatMonitor.stop(); if (clusterCoordinator != null) { @@ -3269,7 +3269,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public synchronized void onLeaderElection() { + public void onLeaderElection() { heartbeatMonitor.start(); if (clusterCoordinator != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java index 1786bceaba..e2b63a2daf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java @@ -98,7 +98,7 @@ public class StandardNiFiContentAccess implements ContentAccess { throw new NoClusterCoordinatorException(); } final Set coordinatorNodes = Collections.singleton(coordinatorNode); - nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false, true).awaitMergedResponse(); + nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false).awaitMergedResponse(); } catch (InterruptedException e) { throw new IllegalClusterStateException("Interrupted while waiting for a response from node"); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java index fd3c474b6c..641ed38bfa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java @@ -297,7 +297,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration } final Set coordinatorNodes = Collections.singleton(coordinatorNode); - return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false, true).awaitMergedResponse(); + return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false).awaitMergedResponse(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 6a9e1d08f8..a49ed5d251 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -565,11 +565,11 @@ public abstract class ApplicationResource { // If we are to replicate directly to the nodes, we need to indicate that the replication source is // the cluster coordinator so that the node knows to service the request. final Set targetNodes = Collections.singleton(nodeId); - return requestReplicator.replicate(targetNodes, method, path, entity, headers, true, true).awaitMergedResponse().getResponse(); + return requestReplicator.replicate(targetNodes, method, path, entity, headers, true).awaitMergedResponse().getResponse(); } else { headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId()); return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, - path, entity, headers, false, true).awaitMergedResponse().getResponse(); + path, entity, headers, false).awaitMergedResponse().getResponse(); } } catch (final InterruptedException ie) { return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build(); @@ -589,38 +589,23 @@ public abstract class ApplicationResource { return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR; } - protected Response replicate(final String method, final NodeIdentifier targetNode) { - return replicate(method, targetNode, getRequestParameters()); - } - - protected Response replicate(final String method, final NodeIdentifier targetNode, final Object entity) { try { // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly // to the cluster nodes themselves. if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { final Set nodeIds = Collections.singleton(targetNode); - return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), entity, getHeaders(), true, true).awaitMergedResponse().getResponse(); + return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), getRequestParameters(), getHeaders(), true).awaitMergedResponse().getResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); final Map headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId())); - return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), entity, headers, false, true).awaitMergedResponse().getResponse(); + return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), getRequestParameters(), headers, false).awaitMergedResponse().getResponse(); } } catch (final InterruptedException ie) { return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); } } - protected Response replicateToCoordinator(final String method, final Object entity) { - try { - final NodeIdentifier coordinatorNode = getClusterCoordinatorNode(); - final Set coordinatorNodes = Collections.singleton(coordinatorNode); - return getRequestReplicator().replicate(coordinatorNodes, method, getAbsolutePath(), entity, getHeaders(), true, false).awaitMergedResponse().getResponse(); - } catch (final InterruptedException ie) { - return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); - } - } - /** * Convenience method for calling {@link #replicate(String, Object)} with an entity of * {@link #getRequestParameters() getRequestParameters(true)} @@ -700,7 +685,7 @@ public abstract class ApplicationResource { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse(); } else { - return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false, true).awaitMergedResponse(); + return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false).awaitMergedResponse(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index 7a5dab74f5..2f73c70905 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -548,10 +548,6 @@ public class ControllerResource extends ApplicationResource { + "not equal the node id of the requested resource (%s).", requestNodeDTO.getNodeId(), id)); } - if (isReplicateRequest()) { - return replicateToCoordinator(HttpMethod.PUT, nodeEntity); - } - // update the node final NodeDTO node = serviceFacade.updateNode(requestNodeDTO); @@ -604,10 +600,6 @@ public class ControllerResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request."); } - if (isReplicateRequest()) { - return replicateToCoordinator(HttpMethod.DELETE, getRequestParameters()); - } - serviceFacade.deleteNode(id); // create the response entity diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java index 26a708c81d..8dfe417fe2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java @@ -168,7 +168,7 @@ public class CountersResource extends ApplicationResource { nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse(); + nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse(); } final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index a4933a58fb..1ea373658a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1980,7 +1980,7 @@ public class ProcessGroupResource extends ApplicationResource { return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false, true).awaitMergedResponse().getResponse(); + return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false).awaitMergedResponse().getResponse(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java index d9db9928b2..8a093d0904 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java @@ -147,7 +147,7 @@ public class SystemDiagnosticsResource extends ApplicationResource { nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse(); + nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse(); } final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();