diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index a2e17b5c9e..4894fc5396 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -172,6 +172,11 @@ public interface ClusterCoordinator { */ NodeIdentifier getElectedActiveCoordinatorNode(); + /** + * @return the identifier of this node, if it is known, null if the Node Identifier has not yet been established. + */ + NodeIdentifier getLocalNodeIdentifier(); + /** * @return true if this node has been elected the active cluster coordinator, false otherwise. */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java index ae18699c38..48ca41d554 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/DisconnectionCode.java @@ -67,6 +67,11 @@ public enum DisconnectionCode { */ FAILED_TO_SERVICE_REQUEST("Failed to Service Request"), + /** + * Coordinator received a heartbeat from node, but the node is disconnected from the cluster + */ + HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE("Heartbeat Received from Disconnected Node"), + /** * Node is being shut down */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java index a1af0f8f5c..b49f57ca79 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java @@ -25,7 +25,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; import org.apache.nifi.reporting.BulletinRepository; /** - * An interface for sending protocol messages from the cluster manager to nodes. + * An interface for sending protocol messages from the cluster coordinator to nodes. * */ public interface ClusterCoordinationProtocolSender { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index be9559dad9..0bd84d6d42 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -126,9 +126,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { protected synchronized void monitorHeartbeats() { final Map latestHeartbeats = getLatestHeartbeats(); if (latestHeartbeats == null || latestHeartbeats.isEmpty()) { - // failed to fetch heartbeats; don't change anything. - clusterCoordinator.reportEvent(null, Severity.INFO, "Failed to retrieve any new heartbeat information for nodes. " - + "Will not make any decisions based on heartbeats."); + logger.debug("Received no new heartbeats. Will not disconnect any nodes due to lack of heartbeat"); return; } @@ -213,7 +211,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { } else { // disconnected nodes should not heartbeat, so we need to issue a disconnection request. logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request."); - clusterCoordinator.requestNodeDisconnect(nodeId, connectionStatus.getDisconnectCode(), connectionStatus.getDisconnectReason()); + clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString()); removeHeartbeat(nodeId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java index 672490173a..bfe528df88 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java @@ -98,10 +98,13 @@ public interface RequestReplicator { * @param headers any HTTP headers * @param indicateReplicated if true, will add a header indicating to the receiving nodes that the request * has already been replicated, so the receiving node will not replicate the request itself. + * @param performVerification if true, and the request is mutable, will verify that all nodes are connected before + * making the request and that all nodes are able to perform the request before acutally attempting to perform the task. + * If false, will perform no such verification * * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later */ - AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated); + AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated, boolean performVerification); /** *

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index f4fcc851a2..c5a8af54a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -211,11 +211,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final Set nodeIdSet = new HashSet<>(nodeIds); - return replicate(nodeIdSet, method, uri, entity, headers, true); + return replicate(nodeIdSet, method, uri, entity, headers, true, true); } @Override - public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, final boolean indicateReplicated) { + public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, + final boolean indicateReplicated, final boolean performVerification) { final Map updatedHeaders = new HashMap<>(headers); updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, TypeOneUUIDGenerator.generateId().toString()); @@ -242,12 +243,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { lock.lock(); try { logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri); - return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null); + return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null); } finally { lock.unlock(); } } else { - return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null); + return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null); } } @@ -259,13 +260,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @param uri the URI to send the request to * @param entity the entity to use * @param headers the HTTP Headers - * @param performVerification whether or not to use 2-phase commit to verify that all nodes can handle the request. Ignored if request is not mutable. + * @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable. * @param response the response to update with the results * * @return an AsyncClusterResponse that can be used to obtain the result */ private AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean performVerification, - StandardAsyncClusterResponse response) { + StandardAsyncClusterResponse response) { // state validation Objects.requireNonNull(nodeIds); @@ -298,7 +299,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); if (performVerification) { - verifyState(method, uri.getPath()); + verifyClusterState(method, uri.getPath()); } int numRequests = responseMap.size(); @@ -530,7 +531,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * * @throw IllegalClusterStateException if the cluster is not in a state that allows a request to made to the given URI Path using the given HTTP Method */ - private void verifyState(final String httpMethod, final String uriPath) { + private void verifyClusterState(final String httpMethod, final String uriPath) { final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod); // check that the request can be applied diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 3f8fa76bf5..b31530f631 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -138,14 +139,23 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl this.nodeId = nodeId; } - NodeIdentifier getLocalNodeIdentifier() { + @Override + public NodeIdentifier getLocalNodeIdentifier() { return nodeId; } private NodeIdentifier waitForLocalNodeIdentifier() { + return waitForNodeIdentifier(() -> getLocalNodeIdentifier()); + } + + private NodeIdentifier waitForElectedClusterCoordinator() { + return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false)); + } + + private NodeIdentifier waitForNodeIdentifier(final Supplier fetchNodeId) { NodeIdentifier localNodeId = null; while (localNodeId == null) { - localNodeId = getLocalNodeIdentifier(); + localNodeId = fetchNodeId.get(); if (localNodeId == null) { try { Thread.sleep(100L); @@ -279,8 +289,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) { - final int numConnected = getNodeIdentifiers(NodeConnectionState.CONNECTED).size(); - if (numConnected == 1) { + final Set connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED); + if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeId)) { throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected"); } @@ -514,17 +524,27 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public NodeIdentifier getElectedActiveCoordinatorNode() { + return getElectedActiveCoordinatorNode(true); + } + + private NodeIdentifier getElectedActiveCoordinatorNode(final boolean warnOnError) { final String electedNodeAddress; try { electedNodeAddress = getElectedActiveCoordinatorAddress(); } catch (final IOException ioe) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe); + if (warnOnError) { + logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe); + } + return null; } final int colonLoc = electedNodeAddress.indexOf(':'); if (colonLoc < 1) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); + if (warnOnError) { + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); + } + return null; } @@ -534,7 +554,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl try { electedNodePort = Integer.parseInt(portString); } catch (final NumberFormatException nfe) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); + if (warnOnError) { + logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress); + } + return null; } @@ -544,7 +567,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl .findFirst() .orElse(null); - if (electedNodeId == null) { + if (electedNodeId == null && warnOnError) { logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress); } @@ -610,16 +633,37 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl logger.debug("State of cluster nodes is now {}", nodeStatuses); if (currentState == null || currentState != status.getState()) { - notifyOthersOfNodeStatusChange(status); + // We notify all nodes of the status change if either this node is the current cluster coordinator, OR if the node was + // the cluster coordinator and no longer is. This is done because if a user disconnects the cluster coordinator, we need + // to broadcast to the cluster that this node is no longer the coordinator. Otherwise, all nodes but this one will still + // believe that this node is connected to the cluster. + final boolean notifyAllNodes = isActiveClusterCoordinator() || (currentStatus != null && currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR)); + notifyOthersOfNodeStatusChange(status, notifyAllNodes); } } + void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) { + notifyOthersOfNodeStatusChange(updatedStatus, isActiveClusterCoordinator()); + } - private void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) { - final Set nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING); + /** + * Notifies other nodes that the status of a node changed + * + * @param updatedStatus the updated status for a node in the cluster + * @param notifyAllNodes if true will notify all nodes. If false, will notify only the cluster coordinator + */ + void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus, final boolean notifyAllNodes) { + // If this node is the active cluster coordinator, then we are going to replicate to all nodes. + // Otherwise, get the active coordinator (or wait for one to become active) and then notify the coordinator. + final Set nodesToNotify; + if (notifyAllNodes) { + nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING); - // Do not notify ourselves because we already know about the status update. - nodesToNotify.remove(getLocalNodeIdentifier()); + // Do not notify ourselves because we already know about the status update. + nodesToNotify.remove(getLocalNodeIdentifier()); + } else { + nodesToNotify = Collections.singleton(waitForElectedClusterCoordinator()); + } final NodeStatusChangeMessage message = new NodeStatusChangeMessage(); message.setNodeId(updatedStatus.getNodeIdentifier()); @@ -767,6 +811,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl nodeId, updatedStatus, oldStatus); } } + + if (isActiveClusterCoordinator()) { + notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus()); + } } private NodeIdentifier resolveNodeId(final NodeIdentifier proposedIdentifier) { @@ -872,6 +920,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl */ @Override public void afterRequest(final String uriPath, final String method, final Set nodeResponses) { + // if we are not the active cluster coordinator, then we are not responsible for monitoring the responses, + // as the cluster coordinator is responsible for performing the actual request replication. + if (!isActiveClusterCoordinator()) { + return; + } + final boolean mutableRequest = isMutableRequest(method); /* diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 81d72ed432..9ef0a14c96 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -305,6 +305,11 @@ public class TestAbstractHeartbeatMonitor { @Override public void removeRole(String clusterRole) { } + + @Override + public NodeIdentifier getLocalNodeIdentifier() { + return null; + } } public static class ReportedEvent { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 2af3e88d9d..5eac84616c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -87,7 +87,7 @@ public class TestThreadPoolRequestReplicator { final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); - final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true); + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); // We should get back the same response object assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); @@ -115,7 +115,7 @@ public class TestThreadPoolRequestReplicator { final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); - final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true); + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); // We should get back the same response object assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); @@ -151,7 +151,7 @@ public class TestThreadPoolRequestReplicator { final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); - final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true); + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS)); } , null, 0L, new IllegalArgumentException("Exception created for unit test")); } @@ -191,7 +191,7 @@ public class TestThreadPoolRequestReplicator { try { final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, - new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true); + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); clusterResponse.awaitMergedResponse(); // Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not. @@ -235,7 +235,8 @@ public class TestThreadPoolRequestReplicator { Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap); final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) { @Override - public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean indicateReplicated) { + public AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, + boolean indicateReplicated, boolean verify) { return null; } }; @@ -308,7 +309,7 @@ public class TestThreadPoolRequestReplicator { try { final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST, - new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true); + new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true); clusterResponse.awaitMergedResponse(); Assert.fail("Expected to get an IllegalClusterStateException but did not"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index 25c55a06ee..6f0ad0f07e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -59,7 +59,7 @@ import org.mockito.stubbing.Answer; public class TestNodeClusterCoordinator { private NodeClusterCoordinator coordinator; private ClusterCoordinationProtocolSenderListener senderListener; - private List nodeStatusChangeMessages; + private List nodeStatuses; private Properties createProperties() { final Properties props = new Properties(); @@ -68,25 +68,20 @@ public class TestNodeClusterCoordinator { } @Before - @SuppressWarnings("unchecked") public void setup() throws IOException { senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class); - nodeStatusChangeMessages = Collections.synchronizedList(new ArrayList<>()); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - final NodeStatusChangeMessage statusChangeMessage = invocation.getArgumentAt(1, NodeStatusChangeMessage.class); - nodeStatusChangeMessages.add(statusChangeMessage); - return null; - } - }).when(senderListener).notifyNodeStatusChange(Mockito.any(Set.class), Mockito.any(NodeStatusChangeMessage.class)); + nodeStatuses = Collections.synchronizedList(new ArrayList<>()); final EventReporter eventReporter = Mockito.mock(EventReporter.class); final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()); + coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { + @Override + void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) { + nodeStatuses.add(updatedStatus); + } + }; final FlowService flowService = Mockito.mock(FlowService.class); final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]); @@ -136,7 +131,11 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()); + final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { + @Override + void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) { + } + }; final NodeIdentifier requestedNodeId = createNodeId(6); final ConnectionRequest request = new ConnectionRequest(requestedNodeId); @@ -170,7 +169,11 @@ public class TestNodeClusterCoordinator { final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()); + final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) { + @Override + void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) { + } + }; final FlowService flowService = Mockito.mock(FlowService.class); final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]); @@ -200,80 +203,60 @@ public class TestNodeClusterCoordinator { // Create a connection request message and send to the coordinator requestConnection(createNodeId(1), coordinator); - while (nodeStatusChangeMessages.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(20L); } - assertEquals(NodeConnectionState.CONNECTING, nodeStatusChangeMessages.get(0).getNodeConnectionStatus().getState()); - nodeStatusChangeMessages.clear(); + assertEquals(NodeConnectionState.CONNECTING, nodeStatuses.get(0).getState()); + nodeStatuses.clear(); // Finish connecting. This should notify all that the status is now 'CONNECTED' coordinator.finishNodeConnection(nodeId); - while (nodeStatusChangeMessages.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(20L); } - assertEquals(NodeConnectionState.CONNECTED, nodeStatusChangeMessages.get(0).getNodeConnectionStatus().getState()); + assertEquals(NodeConnectionState.CONNECTED, nodeStatuses.get(0).getState()); assertEquals(NodeConnectionState.CONNECTED, coordinator.getConnectionStatus(nodeId).getState()); } @Test(timeout = 5000) - @SuppressWarnings("unchecked") public void testStatusChangesReplicated() throws InterruptedException, IOException { - final ClusterCoordinationProtocolSenderListener senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class); - final List msgs = Collections.synchronizedList(new ArrayList<>()); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - final NodeStatusChangeMessage statusChangeMessage = invocation.getArgumentAt(1, NodeStatusChangeMessage.class); - msgs.add(statusChangeMessage); - return null; - } - }).when(senderListener).notifyNodeStatusChange(Mockito.any(Set.class), Mockito.any(NodeStatusChangeMessage.class)); - - final EventReporter eventReporter = Mockito.mock(EventReporter.class); final RevisionManager revisionManager = Mockito.mock(RevisionManager.class); Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList()); - final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()); - - final FlowService flowService = Mockito.mock(FlowService.class); - final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]); - Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow); - coordinator.setFlowService(flowService); // Create a connection request message and send to the coordinator final NodeIdentifier requestedNodeId = createNodeId(1); requestConnection(requestedNodeId, coordinator); // The above connection request should trigger a 'CONNECTING' state transition to be replicated - while (msgs.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(20L); } - final NodeStatusChangeMessage connectingMsg = msgs.get(0); - assertEquals(NodeConnectionState.CONNECTING, connectingMsg.getNodeConnectionStatus().getState()); - assertEquals(requestedNodeId, connectingMsg.getNodeId()); + final NodeConnectionStatus connectingStatus = nodeStatuses.get(0); + assertEquals(NodeConnectionState.CONNECTING, connectingStatus.getState()); + assertEquals(requestedNodeId, connectingStatus.getNodeIdentifier()); // set node status to connected coordinator.finishNodeConnection(requestedNodeId); // the above method will result in the node identifier becoming 'CONNECTED'. Wait for this to happen and clear the map - while (msgs.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(20L); } - msgs.clear(); + nodeStatuses.clear(); coordinator.disconnectionRequestedByNode(requestedNodeId, DisconnectionCode.NODE_SHUTDOWN, "Unit Test"); - while (msgs.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(20L); } - assertEquals(1, msgs.size()); - final NodeStatusChangeMessage statusChangeMsg = msgs.get(0); - assertNotNull(statusChangeMsg); - assertEquals(createNodeId(1), statusChangeMsg.getNodeId()); - assertEquals(DisconnectionCode.NODE_SHUTDOWN, statusChangeMsg.getNodeConnectionStatus().getDisconnectCode()); - assertEquals("Unit Test", statusChangeMsg.getNodeConnectionStatus().getDisconnectReason()); + assertEquals(1, nodeStatuses.size()); + final NodeConnectionStatus statusChange = nodeStatuses.get(0); + assertNotNull(statusChange); + assertEquals(createNodeId(1), statusChange.getNodeIdentifier()); + assertEquals(DisconnectionCode.NODE_SHUTDOWN, statusChange.getDisconnectCode()); + assertEquals("Unit Test", statusChange.getDisconnectReason()); } @@ -343,20 +326,20 @@ public class TestNodeClusterCoordinator { coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED, Collections.emptySet())); // wait for the status change message and clear it - while (nodeStatusChangeMessages.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(10L); } - nodeStatusChangeMessages.clear(); + nodeStatuses.clear(); coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test"); assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId1).getState()); - while (nodeStatusChangeMessages.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(10L); } - final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0); - assertEquals(nodeId1, msg.getNodeId()); - assertEquals(NodeConnectionState.DISCONNECTED, msg.getNodeConnectionStatus().getState()); + final NodeConnectionStatus status = nodeStatuses.get(0); + assertEquals(nodeId1, status.getNodeIdentifier()); + assertEquals(NodeConnectionState.DISCONNECTED, status.getState()); } @@ -364,13 +347,17 @@ public class TestNodeClusterCoordinator { public void testCannotDisconnectLastNode() throws InterruptedException { // Add a connected node final NodeIdentifier nodeId1 = createNodeId(1); + final NodeIdentifier nodeId2 = createNodeId(2); coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet())); + coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet())); // wait for the status change message and clear it - while (nodeStatusChangeMessages.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(10L); } - nodeStatusChangeMessages.clear(); + nodeStatuses.clear(); + + coordinator.requestNodeDisconnect(nodeId2, DisconnectionCode.USER_DISCONNECTED, "Unit Test"); try { coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test"); @@ -378,6 +365,9 @@ public class TestNodeClusterCoordinator { } catch (final IllegalNodeDisconnectionException inde) { // expected } + + // Should still be able to request that node 2 disconnect, since it's not the node that is connected + coordinator.requestNodeDisconnect(nodeId2, DisconnectionCode.USER_DISCONNECTED, "Unit Test"); } @@ -391,10 +381,10 @@ public class TestNodeClusterCoordinator { coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet())); // wait for the status change message and clear it - while (nodeStatusChangeMessages.size() < 2) { + while (nodeStatuses.size() < 2) { Thread.sleep(10L); } - nodeStatusChangeMessages.clear(); + nodeStatuses.clear(); final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, nodeId1, NodeConnectionState.DISCONNECTED, DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L, null); @@ -405,7 +395,7 @@ public class TestNodeClusterCoordinator { // Ensure that no status change message was send Thread.sleep(1000); - assertTrue(nodeStatusChangeMessages.isEmpty()); + assertTrue(nodeStatuses.isEmpty()); // Status should not have changed because our status id is too small. NodeConnectionStatus curStatus = coordinator.getConnectionStatus(nodeId1); @@ -431,51 +421,51 @@ public class TestNodeClusterCoordinator { coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet())); // wait for the status change message and clear it - while (nodeStatusChangeMessages.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(10L); } - nodeStatusChangeMessages.clear(); + nodeStatuses.clear(); coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet())); // wait for the status change message and clear it - while (nodeStatusChangeMessages.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(10L); } - nodeStatusChangeMessages.clear(); + nodeStatuses.clear(); // Update role of node 1 to primary node coordinator.updateNodeRoles(nodeId1, Collections.singleton(ClusterRoles.PRIMARY_NODE)); // wait for the status change message - while (nodeStatusChangeMessages.isEmpty()) { + while (nodeStatuses.isEmpty()) { Thread.sleep(10L); } // verify the message - final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0); - assertNotNull(msg); - assertEquals(nodeId1, msg.getNodeId()); - assertEquals(NodeConnectionState.CONNECTED, msg.getNodeConnectionStatus().getState()); - assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), msg.getNodeConnectionStatus().getRoles()); - nodeStatusChangeMessages.clear(); + final NodeConnectionStatus status = nodeStatuses.get(0); + assertNotNull(status); + assertEquals(nodeId1, status.getNodeIdentifier()); + assertEquals(NodeConnectionState.CONNECTED, status.getState()); + assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), status.getRoles()); + nodeStatuses.clear(); // Update role of node 2 to primary node. This should trigger 2 status changes - // node 1 should lose primary role & node 2 should gain it coordinator.updateNodeRoles(nodeId2, Collections.singleton(ClusterRoles.PRIMARY_NODE)); // wait for the status change message - while (nodeStatusChangeMessages.size() < 2) { + while (nodeStatuses.size() < 2) { Thread.sleep(10L); } - final NodeStatusChangeMessage msg1 = nodeStatusChangeMessages.get(0); - final NodeStatusChangeMessage msg2 = nodeStatusChangeMessages.get(1); - final NodeStatusChangeMessage id1Msg = (msg1.getNodeId().equals(nodeId1)) ? msg1 : msg2; - final NodeStatusChangeMessage id2Msg = (msg1.getNodeId().equals(nodeId2)) ? msg1 : msg2; + final NodeConnectionStatus status1 = nodeStatuses.get(0); + final NodeConnectionStatus status2 = nodeStatuses.get(1); + final NodeConnectionStatus id1Msg = (status1.getNodeIdentifier().equals(nodeId1)) ? status1 : status2; + final NodeConnectionStatus id2Msg = (status1.getNodeIdentifier().equals(nodeId2)) ? status1 : status2; assertNotSame(id1Msg, id2Msg); - assertTrue(id1Msg.getNodeConnectionStatus().getRoles().isEmpty()); - assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), id2Msg.getNodeConnectionStatus().getRoles()); + assertTrue(id1Msg.getRoles().isEmpty()); + assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), id2Msg.getRoles()); } @@ -513,7 +503,6 @@ public class TestNodeClusterCoordinator { assertEquals(conflictingId.getSocketPort(), conflictingNodeId.getSocketPort()); } - private NodeIdentifier createNodeId(final int index) { return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, false); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index d6e9308040..30e382d378 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3260,7 +3260,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private void registerForClusterCoordinator() { leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override - public void onLeaderRelinquish() { + public synchronized void onLeaderRelinquish() { heartbeatMonitor.stop(); if (clusterCoordinator != null) { @@ -3269,7 +3269,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public void onLeaderElection() { + public synchronized void onLeaderElection() { heartbeatMonitor.start(); if (clusterCoordinator != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java index e2b63a2daf..1786bceaba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java @@ -98,7 +98,7 @@ public class StandardNiFiContentAccess implements ContentAccess { throw new NoClusterCoordinatorException(); } final Set coordinatorNodes = Collections.singleton(coordinatorNode); - nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false).awaitMergedResponse(); + nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false, true).awaitMergedResponse(); } catch (InterruptedException e) { throw new IllegalClusterStateException("Interrupted while waiting for a response from node"); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java index 641ed38bfa..fd3c474b6c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java @@ -297,7 +297,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration } final Set coordinatorNodes = Collections.singleton(coordinatorNode); - return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false).awaitMergedResponse(); + return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false, true).awaitMergedResponse(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index a49ed5d251..6a9e1d08f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -565,11 +565,11 @@ public abstract class ApplicationResource { // If we are to replicate directly to the nodes, we need to indicate that the replication source is // the cluster coordinator so that the node knows to service the request. final Set targetNodes = Collections.singleton(nodeId); - return requestReplicator.replicate(targetNodes, method, path, entity, headers, true).awaitMergedResponse().getResponse(); + return requestReplicator.replicate(targetNodes, method, path, entity, headers, true, true).awaitMergedResponse().getResponse(); } else { headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId()); return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, - path, entity, headers, false).awaitMergedResponse().getResponse(); + path, entity, headers, false, true).awaitMergedResponse().getResponse(); } } catch (final InterruptedException ie) { return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build(); @@ -589,23 +589,38 @@ public abstract class ApplicationResource { return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR; } + protected Response replicate(final String method, final NodeIdentifier targetNode) { + return replicate(method, targetNode, getRequestParameters()); + } + + protected Response replicate(final String method, final NodeIdentifier targetNode, final Object entity) { try { // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly // to the cluster nodes themselves. if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { final Set nodeIds = Collections.singleton(targetNode); - return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), getRequestParameters(), getHeaders(), true).awaitMergedResponse().getResponse(); + return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), entity, getHeaders(), true, true).awaitMergedResponse().getResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); final Map headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId())); - return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), getRequestParameters(), headers, false).awaitMergedResponse().getResponse(); + return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), entity, headers, false, true).awaitMergedResponse().getResponse(); } } catch (final InterruptedException ie) { return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); } } + protected Response replicateToCoordinator(final String method, final Object entity) { + try { + final NodeIdentifier coordinatorNode = getClusterCoordinatorNode(); + final Set coordinatorNodes = Collections.singleton(coordinatorNode); + return getRequestReplicator().replicate(coordinatorNodes, method, getAbsolutePath(), entity, getHeaders(), true, false).awaitMergedResponse().getResponse(); + } catch (final InterruptedException ie) { + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build(); + } + } + /** * Convenience method for calling {@link #replicate(String, Object)} with an entity of * {@link #getRequestParameters() getRequestParameters(true)} @@ -685,7 +700,7 @@ public abstract class ApplicationResource { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse(); } else { - return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false).awaitMergedResponse(); + return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false, true).awaitMergedResponse(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index 2f73c70905..7a5dab74f5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -548,6 +548,10 @@ public class ControllerResource extends ApplicationResource { + "not equal the node id of the requested resource (%s).", requestNodeDTO.getNodeId(), id)); } + if (isReplicateRequest()) { + return replicateToCoordinator(HttpMethod.PUT, nodeEntity); + } + // update the node final NodeDTO node = serviceFacade.updateNode(requestNodeDTO); @@ -600,6 +604,10 @@ public class ControllerResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request."); } + if (isReplicateRequest()) { + return replicateToCoordinator(HttpMethod.DELETE, getRequestParameters()); + } + serviceFacade.deleteNode(id); // create the response entity diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java index 8dfe417fe2..26a708c81d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java @@ -168,7 +168,7 @@ public class CountersResource extends ApplicationResource { nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse(); + nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse(); } final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 1ea373658a..a4933a58fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1980,7 +1980,7 @@ public class ProcessGroupResource extends ApplicationResource { return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false).awaitMergedResponse().getResponse(); + return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false, true).awaitMergedResponse().getResponse(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java index 8a093d0904..d9db9928b2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java @@ -147,7 +147,7 @@ public class SystemDiagnosticsResource extends ApplicationResource { nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse(); } else { final Set coordinatorNode = Collections.singleton(getClusterCoordinatorNode()); - nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse(); + nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse(); } final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();