mirror of https://github.com/apache/nifi.git
NIFI-2292: Funnel all cluster node status changes through the cluster coordinator instead of having each node broadcast changes to the whole cluster. This gives us the ability to increment the updateId consistently without race conditions.
This closes #717 Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
parent
3e9867d5da
commit
7779af69b4
|
@ -172,6 +172,11 @@ public interface ClusterCoordinator {
|
||||||
*/
|
*/
|
||||||
NodeIdentifier getElectedActiveCoordinatorNode();
|
NodeIdentifier getElectedActiveCoordinatorNode();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the identifier of this node, if it is known, <code>null</code> if the Node Identifier has not yet been established.
|
||||||
|
*/
|
||||||
|
NodeIdentifier getLocalNodeIdentifier();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return <code>true</code> if this node has been elected the active cluster coordinator, <code>false</code> otherwise.
|
* @return <code>true</code> if this node has been elected the active cluster coordinator, <code>false</code> otherwise.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -67,6 +67,11 @@ public enum DisconnectionCode {
|
||||||
*/
|
*/
|
||||||
FAILED_TO_SERVICE_REQUEST("Failed to Service Request"),
|
FAILED_TO_SERVICE_REQUEST("Failed to Service Request"),
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Coordinator received a heartbeat from node, but the node is disconnected from the cluster
|
||||||
|
*/
|
||||||
|
HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE("Heartbeat Received from Disconnected Node"),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Node is being shut down
|
* Node is being shut down
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
|
||||||
import org.apache.nifi.reporting.BulletinRepository;
|
import org.apache.nifi.reporting.BulletinRepository;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An interface for sending protocol messages from the cluster manager to nodes.
|
* An interface for sending protocol messages from the cluster coordinator to nodes.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public interface ClusterCoordinationProtocolSender {
|
public interface ClusterCoordinationProtocolSender {
|
||||||
|
|
|
@ -126,9 +126,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
|
||||||
protected synchronized void monitorHeartbeats() {
|
protected synchronized void monitorHeartbeats() {
|
||||||
final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = getLatestHeartbeats();
|
final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = getLatestHeartbeats();
|
||||||
if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
|
if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
|
||||||
// failed to fetch heartbeats; don't change anything.
|
logger.debug("Received no new heartbeats. Will not disconnect any nodes due to lack of heartbeat");
|
||||||
clusterCoordinator.reportEvent(null, Severity.INFO, "Failed to retrieve any new heartbeat information for nodes. "
|
|
||||||
+ "Will not make any decisions based on heartbeats.");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,7 +211,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
|
||||||
} else {
|
} else {
|
||||||
// disconnected nodes should not heartbeat, so we need to issue a disconnection request.
|
// disconnected nodes should not heartbeat, so we need to issue a disconnection request.
|
||||||
logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request.");
|
logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ". Issuing disconnection request.");
|
||||||
clusterCoordinator.requestNodeDisconnect(nodeId, connectionStatus.getDisconnectCode(), connectionStatus.getDisconnectReason());
|
clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE, DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString());
|
||||||
removeHeartbeat(nodeId);
|
removeHeartbeat(nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -98,10 +98,13 @@ public interface RequestReplicator {
|
||||||
* @param headers any HTTP headers
|
* @param headers any HTTP headers
|
||||||
* @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request
|
* @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request
|
||||||
* has already been replicated, so the receiving node will not replicate the request itself.
|
* has already been replicated, so the receiving node will not replicate the request itself.
|
||||||
|
* @param performVerification if <code>true</code>, and the request is mutable, will verify that all nodes are connected before
|
||||||
|
* making the request and that all nodes are able to perform the request before acutally attempting to perform the task.
|
||||||
|
* If false, will perform no such verification
|
||||||
*
|
*
|
||||||
* @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
|
* @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
|
||||||
*/
|
*/
|
||||||
AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated);
|
AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean performVerification);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
|
|
@ -211,11 +211,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
|
|
||||||
final Set<NodeIdentifier> nodeIdSet = new HashSet<>(nodeIds);
|
final Set<NodeIdentifier> nodeIdSet = new HashSet<>(nodeIds);
|
||||||
|
|
||||||
return replicate(nodeIdSet, method, uri, entity, headers, true);
|
return replicate(nodeIdSet, method, uri, entity, headers, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, final boolean indicateReplicated) {
|
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
|
||||||
|
final boolean indicateReplicated, final boolean performVerification) {
|
||||||
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
||||||
|
|
||||||
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, TypeOneUUIDGenerator.generateId().toString());
|
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, TypeOneUUIDGenerator.generateId().toString());
|
||||||
|
@ -242,12 +243,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
|
logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
|
||||||
return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null);
|
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null);
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null);
|
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,13 +260,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
* @param uri the URI to send the request to
|
* @param uri the URI to send the request to
|
||||||
* @param entity the entity to use
|
* @param entity the entity to use
|
||||||
* @param headers the HTTP Headers
|
* @param headers the HTTP Headers
|
||||||
* @param performVerification whether or not to use 2-phase commit to verify that all nodes can handle the request. Ignored if request is not mutable.
|
* @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable.
|
||||||
* @param response the response to update with the results
|
* @param response the response to update with the results
|
||||||
*
|
*
|
||||||
* @return an AsyncClusterResponse that can be used to obtain the result
|
* @return an AsyncClusterResponse that can be used to obtain the result
|
||||||
*/
|
*/
|
||||||
private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification,
|
private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification,
|
||||||
StandardAsyncClusterResponse response) {
|
StandardAsyncClusterResponse response) {
|
||||||
|
|
||||||
// state validation
|
// state validation
|
||||||
Objects.requireNonNull(nodeIds);
|
Objects.requireNonNull(nodeIds);
|
||||||
|
@ -298,7 +299,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
|
final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
|
||||||
|
|
||||||
if (performVerification) {
|
if (performVerification) {
|
||||||
verifyState(method, uri.getPath());
|
verifyClusterState(method, uri.getPath());
|
||||||
}
|
}
|
||||||
|
|
||||||
int numRequests = responseMap.size();
|
int numRequests = responseMap.size();
|
||||||
|
@ -530,7 +531,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
*
|
*
|
||||||
* @throw IllegalClusterStateException if the cluster is not in a state that allows a request to made to the given URI Path using the given HTTP Method
|
* @throw IllegalClusterStateException if the cluster is not in a state that allows a request to made to the given URI Path using the given HTTP Method
|
||||||
*/
|
*/
|
||||||
private void verifyState(final String httpMethod, final String uriPath) {
|
private void verifyClusterState(final String httpMethod, final String uriPath) {
|
||||||
final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod);
|
final boolean mutableRequest = HttpMethod.DELETE.equals(httpMethod) || HttpMethod.POST.equals(httpMethod) || HttpMethod.PUT.equals(httpMethod);
|
||||||
|
|
||||||
// check that the request can be applied
|
// check that the request can be applied
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -138,14 +139,23 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
||||||
this.nodeId = nodeId;
|
this.nodeId = nodeId;
|
||||||
}
|
}
|
||||||
|
|
||||||
NodeIdentifier getLocalNodeIdentifier() {
|
@Override
|
||||||
|
public NodeIdentifier getLocalNodeIdentifier() {
|
||||||
return nodeId;
|
return nodeId;
|
||||||
}
|
}
|
||||||
|
|
||||||
private NodeIdentifier waitForLocalNodeIdentifier() {
|
private NodeIdentifier waitForLocalNodeIdentifier() {
|
||||||
|
return waitForNodeIdentifier(() -> getLocalNodeIdentifier());
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeIdentifier waitForElectedClusterCoordinator() {
|
||||||
|
return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeIdentifier waitForNodeIdentifier(final Supplier<NodeIdentifier> fetchNodeId) {
|
||||||
NodeIdentifier localNodeId = null;
|
NodeIdentifier localNodeId = null;
|
||||||
while (localNodeId == null) {
|
while (localNodeId == null) {
|
||||||
localNodeId = getLocalNodeIdentifier();
|
localNodeId = fetchNodeId.get();
|
||||||
if (localNodeId == null) {
|
if (localNodeId == null) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(100L);
|
Thread.sleep(100L);
|
||||||
|
@ -279,8 +289,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
|
public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
|
||||||
final int numConnected = getNodeIdentifiers(NodeConnectionState.CONNECTED).size();
|
final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED);
|
||||||
if (numConnected == 1) {
|
if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeId)) {
|
||||||
throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
|
throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -514,17 +524,27 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NodeIdentifier getElectedActiveCoordinatorNode() {
|
public NodeIdentifier getElectedActiveCoordinatorNode() {
|
||||||
|
return getElectedActiveCoordinatorNode(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeIdentifier getElectedActiveCoordinatorNode(final boolean warnOnError) {
|
||||||
final String electedNodeAddress;
|
final String electedNodeAddress;
|
||||||
try {
|
try {
|
||||||
electedNodeAddress = getElectedActiveCoordinatorAddress();
|
electedNodeAddress = getElectedActiveCoordinatorAddress();
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe);
|
if (warnOnError) {
|
||||||
|
logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently:", ioe);
|
||||||
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final int colonLoc = electedNodeAddress.indexOf(':');
|
final int colonLoc = electedNodeAddress.indexOf(':');
|
||||||
if (colonLoc < 1) {
|
if (colonLoc < 1) {
|
||||||
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress);
|
if (warnOnError) {
|
||||||
|
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress);
|
||||||
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -534,7 +554,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
||||||
try {
|
try {
|
||||||
electedNodePort = Integer.parseInt(portString);
|
electedNodePort = Integer.parseInt(portString);
|
||||||
} catch (final NumberFormatException nfe) {
|
} catch (final NumberFormatException nfe) {
|
||||||
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress);
|
if (warnOnError) {
|
||||||
|
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedNodeAddress);
|
||||||
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -544,7 +567,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
||||||
.findFirst()
|
.findFirst()
|
||||||
.orElse(null);
|
.orElse(null);
|
||||||
|
|
||||||
if (electedNodeId == null) {
|
if (electedNodeId == null && warnOnError) {
|
||||||
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress);
|
logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address", electedNodeAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -610,16 +633,37 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
||||||
logger.debug("State of cluster nodes is now {}", nodeStatuses);
|
logger.debug("State of cluster nodes is now {}", nodeStatuses);
|
||||||
|
|
||||||
if (currentState == null || currentState != status.getState()) {
|
if (currentState == null || currentState != status.getState()) {
|
||||||
notifyOthersOfNodeStatusChange(status);
|
// We notify all nodes of the status change if either this node is the current cluster coordinator, OR if the node was
|
||||||
|
// the cluster coordinator and no longer is. This is done because if a user disconnects the cluster coordinator, we need
|
||||||
|
// to broadcast to the cluster that this node is no longer the coordinator. Otherwise, all nodes but this one will still
|
||||||
|
// believe that this node is connected to the cluster.
|
||||||
|
final boolean notifyAllNodes = isActiveClusterCoordinator() || (currentStatus != null && currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR));
|
||||||
|
notifyOthersOfNodeStatusChange(status, notifyAllNodes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) {
|
||||||
|
notifyOthersOfNodeStatusChange(updatedStatus, isActiveClusterCoordinator());
|
||||||
|
}
|
||||||
|
|
||||||
private void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus) {
|
/**
|
||||||
final Set<NodeIdentifier> nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING);
|
* Notifies other nodes that the status of a node changed
|
||||||
|
*
|
||||||
|
* @param updatedStatus the updated status for a node in the cluster
|
||||||
|
* @param notifyAllNodes if <code>true</code> will notify all nodes. If <code>false</code>, will notify only the cluster coordinator
|
||||||
|
*/
|
||||||
|
void notifyOthersOfNodeStatusChange(final NodeConnectionStatus updatedStatus, final boolean notifyAllNodes) {
|
||||||
|
// If this node is the active cluster coordinator, then we are going to replicate to all nodes.
|
||||||
|
// Otherwise, get the active coordinator (or wait for one to become active) and then notify the coordinator.
|
||||||
|
final Set<NodeIdentifier> nodesToNotify;
|
||||||
|
if (notifyAllNodes) {
|
||||||
|
nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING);
|
||||||
|
|
||||||
// Do not notify ourselves because we already know about the status update.
|
// Do not notify ourselves because we already know about the status update.
|
||||||
nodesToNotify.remove(getLocalNodeIdentifier());
|
nodesToNotify.remove(getLocalNodeIdentifier());
|
||||||
|
} else {
|
||||||
|
nodesToNotify = Collections.singleton(waitForElectedClusterCoordinator());
|
||||||
|
}
|
||||||
|
|
||||||
final NodeStatusChangeMessage message = new NodeStatusChangeMessage();
|
final NodeStatusChangeMessage message = new NodeStatusChangeMessage();
|
||||||
message.setNodeId(updatedStatus.getNodeIdentifier());
|
message.setNodeId(updatedStatus.getNodeIdentifier());
|
||||||
|
@ -767,6 +811,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
||||||
nodeId, updatedStatus, oldStatus);
|
nodeId, updatedStatus, oldStatus);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isActiveClusterCoordinator()) {
|
||||||
|
notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private NodeIdentifier resolveNodeId(final NodeIdentifier proposedIdentifier) {
|
private NodeIdentifier resolveNodeId(final NodeIdentifier proposedIdentifier) {
|
||||||
|
@ -872,6 +920,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void afterRequest(final String uriPath, final String method, final Set<NodeResponse> nodeResponses) {
|
public void afterRequest(final String uriPath, final String method, final Set<NodeResponse> nodeResponses) {
|
||||||
|
// if we are not the active cluster coordinator, then we are not responsible for monitoring the responses,
|
||||||
|
// as the cluster coordinator is responsible for performing the actual request replication.
|
||||||
|
if (!isActiveClusterCoordinator()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final boolean mutableRequest = isMutableRequest(method);
|
final boolean mutableRequest = isMutableRequest(method);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -305,6 +305,11 @@ public class TestAbstractHeartbeatMonitor {
|
||||||
@Override
|
@Override
|
||||||
public void removeRole(String clusterRole) {
|
public void removeRole(String clusterRole) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeIdentifier getLocalNodeIdentifier() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class ReportedEvent {
|
public static class ReportedEvent {
|
||||||
|
|
|
@ -87,7 +87,7 @@ public class TestThreadPoolRequestReplicator {
|
||||||
final URI uri = new URI("http://localhost:8080/processors/1");
|
final URI uri = new URI("http://localhost:8080/processors/1");
|
||||||
final Entity entity = new ProcessorEntity();
|
final Entity entity = new ProcessorEntity();
|
||||||
|
|
||||||
final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true);
|
final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);
|
||||||
|
|
||||||
// We should get back the same response object
|
// We should get back the same response object
|
||||||
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
|
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
|
||||||
|
@ -115,7 +115,7 @@ public class TestThreadPoolRequestReplicator {
|
||||||
final URI uri = new URI("http://localhost:8080/processors/1");
|
final URI uri = new URI("http://localhost:8080/processors/1");
|
||||||
final Entity entity = new ProcessorEntity();
|
final Entity entity = new ProcessorEntity();
|
||||||
|
|
||||||
final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true);
|
final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);
|
||||||
|
|
||||||
// We should get back the same response object
|
// We should get back the same response object
|
||||||
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
|
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
|
||||||
|
@ -151,7 +151,7 @@ public class TestThreadPoolRequestReplicator {
|
||||||
final URI uri = new URI("http://localhost:8080/processors/1");
|
final URI uri = new URI("http://localhost:8080/processors/1");
|
||||||
final Entity entity = new ProcessorEntity();
|
final Entity entity = new ProcessorEntity();
|
||||||
|
|
||||||
final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true);
|
final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);
|
||||||
assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS));
|
assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS));
|
||||||
} , null, 0L, new IllegalArgumentException("Exception created for unit test"));
|
} , null, 0L, new IllegalArgumentException("Exception created for unit test"));
|
||||||
}
|
}
|
||||||
|
@ -191,7 +191,7 @@ public class TestThreadPoolRequestReplicator {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST,
|
final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST,
|
||||||
new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true);
|
new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true);
|
||||||
clusterResponse.awaitMergedResponse();
|
clusterResponse.awaitMergedResponse();
|
||||||
|
|
||||||
// Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not.
|
// Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not.
|
||||||
|
@ -235,7 +235,8 @@ public class TestThreadPoolRequestReplicator {
|
||||||
Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
|
Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
|
||||||
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) {
|
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) {
|
||||||
@Override
|
@Override
|
||||||
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated) {
|
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
|
||||||
|
boolean indicateReplicated, boolean verify) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -308,7 +309,7 @@ public class TestThreadPoolRequestReplicator {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST,
|
final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST,
|
||||||
new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true);
|
new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true, true);
|
||||||
clusterResponse.awaitMergedResponse();
|
clusterResponse.awaitMergedResponse();
|
||||||
|
|
||||||
Assert.fail("Expected to get an IllegalClusterStateException but did not");
|
Assert.fail("Expected to get an IllegalClusterStateException but did not");
|
||||||
|
|
|
@ -59,7 +59,7 @@ import org.mockito.stubbing.Answer;
|
||||||
public class TestNodeClusterCoordinator {
|
public class TestNodeClusterCoordinator {
|
||||||
private NodeClusterCoordinator coordinator;
|
private NodeClusterCoordinator coordinator;
|
||||||
private ClusterCoordinationProtocolSenderListener senderListener;
|
private ClusterCoordinationProtocolSenderListener senderListener;
|
||||||
private List<NodeStatusChangeMessage> nodeStatusChangeMessages;
|
private List<NodeConnectionStatus> nodeStatuses;
|
||||||
|
|
||||||
private Properties createProperties() {
|
private Properties createProperties() {
|
||||||
final Properties props = new Properties();
|
final Properties props = new Properties();
|
||||||
|
@ -68,25 +68,20 @@ public class TestNodeClusterCoordinator {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
|
senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
|
||||||
nodeStatusChangeMessages = Collections.synchronizedList(new ArrayList<>());
|
nodeStatuses = Collections.synchronizedList(new ArrayList<>());
|
||||||
|
|
||||||
Mockito.doAnswer(new Answer<Object>() {
|
|
||||||
@Override
|
|
||||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
||||||
final NodeStatusChangeMessage statusChangeMessage = invocation.getArgumentAt(1, NodeStatusChangeMessage.class);
|
|
||||||
nodeStatusChangeMessages.add(statusChangeMessage);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}).when(senderListener).notifyNodeStatusChange(Mockito.any(Set.class), Mockito.any(NodeStatusChangeMessage.class));
|
|
||||||
|
|
||||||
final EventReporter eventReporter = Mockito.mock(EventReporter.class);
|
final EventReporter eventReporter = Mockito.mock(EventReporter.class);
|
||||||
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
|
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
|
||||||
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
|
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
|
||||||
|
|
||||||
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
|
coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) {
|
||||||
|
@Override
|
||||||
|
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) {
|
||||||
|
nodeStatuses.add(updatedStatus);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
final FlowService flowService = Mockito.mock(FlowService.class);
|
final FlowService flowService = Mockito.mock(FlowService.class);
|
||||||
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
|
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
|
||||||
|
@ -136,7 +131,11 @@ public class TestNodeClusterCoordinator {
|
||||||
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
|
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
|
||||||
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
|
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
|
||||||
|
|
||||||
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
|
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) {
|
||||||
|
@Override
|
||||||
|
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
final NodeIdentifier requestedNodeId = createNodeId(6);
|
final NodeIdentifier requestedNodeId = createNodeId(6);
|
||||||
final ConnectionRequest request = new ConnectionRequest(requestedNodeId);
|
final ConnectionRequest request = new ConnectionRequest(requestedNodeId);
|
||||||
|
@ -170,7 +169,11 @@ public class TestNodeClusterCoordinator {
|
||||||
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
|
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
|
||||||
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
|
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
|
||||||
|
|
||||||
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
|
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties()) {
|
||||||
|
@Override
|
||||||
|
void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
final FlowService flowService = Mockito.mock(FlowService.class);
|
final FlowService flowService = Mockito.mock(FlowService.class);
|
||||||
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
|
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
|
||||||
|
@ -200,80 +203,60 @@ public class TestNodeClusterCoordinator {
|
||||||
// Create a connection request message and send to the coordinator
|
// Create a connection request message and send to the coordinator
|
||||||
requestConnection(createNodeId(1), coordinator);
|
requestConnection(createNodeId(1), coordinator);
|
||||||
|
|
||||||
while (nodeStatusChangeMessages.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(20L);
|
Thread.sleep(20L);
|
||||||
}
|
}
|
||||||
assertEquals(NodeConnectionState.CONNECTING, nodeStatusChangeMessages.get(0).getNodeConnectionStatus().getState());
|
assertEquals(NodeConnectionState.CONNECTING, nodeStatuses.get(0).getState());
|
||||||
nodeStatusChangeMessages.clear();
|
nodeStatuses.clear();
|
||||||
|
|
||||||
// Finish connecting. This should notify all that the status is now 'CONNECTED'
|
// Finish connecting. This should notify all that the status is now 'CONNECTED'
|
||||||
coordinator.finishNodeConnection(nodeId);
|
coordinator.finishNodeConnection(nodeId);
|
||||||
|
|
||||||
while (nodeStatusChangeMessages.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(20L);
|
Thread.sleep(20L);
|
||||||
}
|
}
|
||||||
assertEquals(NodeConnectionState.CONNECTED, nodeStatusChangeMessages.get(0).getNodeConnectionStatus().getState());
|
assertEquals(NodeConnectionState.CONNECTED, nodeStatuses.get(0).getState());
|
||||||
assertEquals(NodeConnectionState.CONNECTED, coordinator.getConnectionStatus(nodeId).getState());
|
assertEquals(NodeConnectionState.CONNECTED, coordinator.getConnectionStatus(nodeId).getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 5000)
|
@Test(timeout = 5000)
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public void testStatusChangesReplicated() throws InterruptedException, IOException {
|
public void testStatusChangesReplicated() throws InterruptedException, IOException {
|
||||||
final ClusterCoordinationProtocolSenderListener senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
|
|
||||||
final List<NodeStatusChangeMessage> msgs = Collections.synchronizedList(new ArrayList<>());
|
|
||||||
|
|
||||||
Mockito.doAnswer(new Answer<Object>() {
|
|
||||||
@Override
|
|
||||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
|
||||||
final NodeStatusChangeMessage statusChangeMessage = invocation.getArgumentAt(1, NodeStatusChangeMessage.class);
|
|
||||||
msgs.add(statusChangeMessage);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}).when(senderListener).notifyNodeStatusChange(Mockito.any(Set.class), Mockito.any(NodeStatusChangeMessage.class));
|
|
||||||
|
|
||||||
final EventReporter eventReporter = Mockito.mock(EventReporter.class);
|
|
||||||
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
|
final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
|
||||||
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
|
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
|
||||||
final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, revisionManager, createProperties());
|
|
||||||
|
|
||||||
final FlowService flowService = Mockito.mock(FlowService.class);
|
|
||||||
final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50]);
|
|
||||||
Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
|
|
||||||
coordinator.setFlowService(flowService);
|
|
||||||
|
|
||||||
// Create a connection request message and send to the coordinator
|
// Create a connection request message and send to the coordinator
|
||||||
final NodeIdentifier requestedNodeId = createNodeId(1);
|
final NodeIdentifier requestedNodeId = createNodeId(1);
|
||||||
requestConnection(requestedNodeId, coordinator);
|
requestConnection(requestedNodeId, coordinator);
|
||||||
|
|
||||||
// The above connection request should trigger a 'CONNECTING' state transition to be replicated
|
// The above connection request should trigger a 'CONNECTING' state transition to be replicated
|
||||||
while (msgs.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(20L);
|
Thread.sleep(20L);
|
||||||
}
|
}
|
||||||
final NodeStatusChangeMessage connectingMsg = msgs.get(0);
|
final NodeConnectionStatus connectingStatus = nodeStatuses.get(0);
|
||||||
assertEquals(NodeConnectionState.CONNECTING, connectingMsg.getNodeConnectionStatus().getState());
|
assertEquals(NodeConnectionState.CONNECTING, connectingStatus.getState());
|
||||||
assertEquals(requestedNodeId, connectingMsg.getNodeId());
|
assertEquals(requestedNodeId, connectingStatus.getNodeIdentifier());
|
||||||
|
|
||||||
// set node status to connected
|
// set node status to connected
|
||||||
coordinator.finishNodeConnection(requestedNodeId);
|
coordinator.finishNodeConnection(requestedNodeId);
|
||||||
|
|
||||||
// the above method will result in the node identifier becoming 'CONNECTED'. Wait for this to happen and clear the map
|
// the above method will result in the node identifier becoming 'CONNECTED'. Wait for this to happen and clear the map
|
||||||
while (msgs.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(20L);
|
Thread.sleep(20L);
|
||||||
}
|
}
|
||||||
msgs.clear();
|
nodeStatuses.clear();
|
||||||
|
|
||||||
coordinator.disconnectionRequestedByNode(requestedNodeId, DisconnectionCode.NODE_SHUTDOWN, "Unit Test");
|
coordinator.disconnectionRequestedByNode(requestedNodeId, DisconnectionCode.NODE_SHUTDOWN, "Unit Test");
|
||||||
|
|
||||||
while (msgs.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(20L);
|
Thread.sleep(20L);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(1, msgs.size());
|
assertEquals(1, nodeStatuses.size());
|
||||||
final NodeStatusChangeMessage statusChangeMsg = msgs.get(0);
|
final NodeConnectionStatus statusChange = nodeStatuses.get(0);
|
||||||
assertNotNull(statusChangeMsg);
|
assertNotNull(statusChange);
|
||||||
assertEquals(createNodeId(1), statusChangeMsg.getNodeId());
|
assertEquals(createNodeId(1), statusChange.getNodeIdentifier());
|
||||||
assertEquals(DisconnectionCode.NODE_SHUTDOWN, statusChangeMsg.getNodeConnectionStatus().getDisconnectCode());
|
assertEquals(DisconnectionCode.NODE_SHUTDOWN, statusChange.getDisconnectCode());
|
||||||
assertEquals("Unit Test", statusChangeMsg.getNodeConnectionStatus().getDisconnectReason());
|
assertEquals("Unit Test", statusChange.getDisconnectReason());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -343,20 +326,20 @@ public class TestNodeClusterCoordinator {
|
||||||
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED, Collections.emptySet()));
|
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.CONNECTED, Collections.emptySet()));
|
||||||
|
|
||||||
// wait for the status change message and clear it
|
// wait for the status change message and clear it
|
||||||
while (nodeStatusChangeMessages.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(10L);
|
Thread.sleep(10L);
|
||||||
}
|
}
|
||||||
nodeStatusChangeMessages.clear();
|
nodeStatuses.clear();
|
||||||
|
|
||||||
coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
|
coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
|
||||||
assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId1).getState());
|
assertEquals(NodeConnectionState.DISCONNECTED, coordinator.getConnectionStatus(nodeId1).getState());
|
||||||
|
|
||||||
while (nodeStatusChangeMessages.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(10L);
|
Thread.sleep(10L);
|
||||||
}
|
}
|
||||||
final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0);
|
final NodeConnectionStatus status = nodeStatuses.get(0);
|
||||||
assertEquals(nodeId1, msg.getNodeId());
|
assertEquals(nodeId1, status.getNodeIdentifier());
|
||||||
assertEquals(NodeConnectionState.DISCONNECTED, msg.getNodeConnectionStatus().getState());
|
assertEquals(NodeConnectionState.DISCONNECTED, status.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -364,13 +347,17 @@ public class TestNodeClusterCoordinator {
|
||||||
public void testCannotDisconnectLastNode() throws InterruptedException {
|
public void testCannotDisconnectLastNode() throws InterruptedException {
|
||||||
// Add a connected node
|
// Add a connected node
|
||||||
final NodeIdentifier nodeId1 = createNodeId(1);
|
final NodeIdentifier nodeId1 = createNodeId(1);
|
||||||
|
final NodeIdentifier nodeId2 = createNodeId(2);
|
||||||
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
|
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
|
||||||
|
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
|
||||||
|
|
||||||
// wait for the status change message and clear it
|
// wait for the status change message and clear it
|
||||||
while (nodeStatusChangeMessages.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(10L);
|
Thread.sleep(10L);
|
||||||
}
|
}
|
||||||
nodeStatusChangeMessages.clear();
|
nodeStatuses.clear();
|
||||||
|
|
||||||
|
coordinator.requestNodeDisconnect(nodeId2, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
|
coordinator.requestNodeDisconnect(nodeId1, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
|
||||||
|
@ -378,6 +365,9 @@ public class TestNodeClusterCoordinator {
|
||||||
} catch (final IllegalNodeDisconnectionException inde) {
|
} catch (final IllegalNodeDisconnectionException inde) {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Should still be able to request that node 2 disconnect, since it's not the node that is connected
|
||||||
|
coordinator.requestNodeDisconnect(nodeId2, DisconnectionCode.USER_DISCONNECTED, "Unit Test");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -391,10 +381,10 @@ public class TestNodeClusterCoordinator {
|
||||||
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
|
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
|
||||||
|
|
||||||
// wait for the status change message and clear it
|
// wait for the status change message and clear it
|
||||||
while (nodeStatusChangeMessages.size() < 2) {
|
while (nodeStatuses.size() < 2) {
|
||||||
Thread.sleep(10L);
|
Thread.sleep(10L);
|
||||||
}
|
}
|
||||||
nodeStatusChangeMessages.clear();
|
nodeStatuses.clear();
|
||||||
|
|
||||||
final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, nodeId1, NodeConnectionState.DISCONNECTED,
|
final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, nodeId1, NodeConnectionState.DISCONNECTED,
|
||||||
DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L, null);
|
DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L, null);
|
||||||
|
@ -405,7 +395,7 @@ public class TestNodeClusterCoordinator {
|
||||||
|
|
||||||
// Ensure that no status change message was send
|
// Ensure that no status change message was send
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
assertTrue(nodeStatusChangeMessages.isEmpty());
|
assertTrue(nodeStatuses.isEmpty());
|
||||||
|
|
||||||
// Status should not have changed because our status id is too small.
|
// Status should not have changed because our status id is too small.
|
||||||
NodeConnectionStatus curStatus = coordinator.getConnectionStatus(nodeId1);
|
NodeConnectionStatus curStatus = coordinator.getConnectionStatus(nodeId1);
|
||||||
|
@ -431,51 +421,51 @@ public class TestNodeClusterCoordinator {
|
||||||
|
|
||||||
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
|
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
|
||||||
// wait for the status change message and clear it
|
// wait for the status change message and clear it
|
||||||
while (nodeStatusChangeMessages.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(10L);
|
Thread.sleep(10L);
|
||||||
}
|
}
|
||||||
nodeStatusChangeMessages.clear();
|
nodeStatuses.clear();
|
||||||
|
|
||||||
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
|
coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
|
||||||
// wait for the status change message and clear it
|
// wait for the status change message and clear it
|
||||||
while (nodeStatusChangeMessages.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(10L);
|
Thread.sleep(10L);
|
||||||
}
|
}
|
||||||
nodeStatusChangeMessages.clear();
|
nodeStatuses.clear();
|
||||||
|
|
||||||
// Update role of node 1 to primary node
|
// Update role of node 1 to primary node
|
||||||
coordinator.updateNodeRoles(nodeId1, Collections.singleton(ClusterRoles.PRIMARY_NODE));
|
coordinator.updateNodeRoles(nodeId1, Collections.singleton(ClusterRoles.PRIMARY_NODE));
|
||||||
|
|
||||||
// wait for the status change message
|
// wait for the status change message
|
||||||
while (nodeStatusChangeMessages.isEmpty()) {
|
while (nodeStatuses.isEmpty()) {
|
||||||
Thread.sleep(10L);
|
Thread.sleep(10L);
|
||||||
}
|
}
|
||||||
// verify the message
|
// verify the message
|
||||||
final NodeStatusChangeMessage msg = nodeStatusChangeMessages.get(0);
|
final NodeConnectionStatus status = nodeStatuses.get(0);
|
||||||
assertNotNull(msg);
|
assertNotNull(status);
|
||||||
assertEquals(nodeId1, msg.getNodeId());
|
assertEquals(nodeId1, status.getNodeIdentifier());
|
||||||
assertEquals(NodeConnectionState.CONNECTED, msg.getNodeConnectionStatus().getState());
|
assertEquals(NodeConnectionState.CONNECTED, status.getState());
|
||||||
assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), msg.getNodeConnectionStatus().getRoles());
|
assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), status.getRoles());
|
||||||
nodeStatusChangeMessages.clear();
|
nodeStatuses.clear();
|
||||||
|
|
||||||
// Update role of node 2 to primary node. This should trigger 2 status changes -
|
// Update role of node 2 to primary node. This should trigger 2 status changes -
|
||||||
// node 1 should lose primary role & node 2 should gain it
|
// node 1 should lose primary role & node 2 should gain it
|
||||||
coordinator.updateNodeRoles(nodeId2, Collections.singleton(ClusterRoles.PRIMARY_NODE));
|
coordinator.updateNodeRoles(nodeId2, Collections.singleton(ClusterRoles.PRIMARY_NODE));
|
||||||
|
|
||||||
// wait for the status change message
|
// wait for the status change message
|
||||||
while (nodeStatusChangeMessages.size() < 2) {
|
while (nodeStatuses.size() < 2) {
|
||||||
Thread.sleep(10L);
|
Thread.sleep(10L);
|
||||||
}
|
}
|
||||||
|
|
||||||
final NodeStatusChangeMessage msg1 = nodeStatusChangeMessages.get(0);
|
final NodeConnectionStatus status1 = nodeStatuses.get(0);
|
||||||
final NodeStatusChangeMessage msg2 = nodeStatusChangeMessages.get(1);
|
final NodeConnectionStatus status2 = nodeStatuses.get(1);
|
||||||
final NodeStatusChangeMessage id1Msg = (msg1.getNodeId().equals(nodeId1)) ? msg1 : msg2;
|
final NodeConnectionStatus id1Msg = (status1.getNodeIdentifier().equals(nodeId1)) ? status1 : status2;
|
||||||
final NodeStatusChangeMessage id2Msg = (msg1.getNodeId().equals(nodeId2)) ? msg1 : msg2;
|
final NodeConnectionStatus id2Msg = (status1.getNodeIdentifier().equals(nodeId2)) ? status1 : status2;
|
||||||
|
|
||||||
assertNotSame(id1Msg, id2Msg);
|
assertNotSame(id1Msg, id2Msg);
|
||||||
|
|
||||||
assertTrue(id1Msg.getNodeConnectionStatus().getRoles().isEmpty());
|
assertTrue(id1Msg.getRoles().isEmpty());
|
||||||
assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), id2Msg.getNodeConnectionStatus().getRoles());
|
assertEquals(Collections.singleton(ClusterRoles.PRIMARY_NODE), id2Msg.getRoles());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -513,7 +503,6 @@ public class TestNodeClusterCoordinator {
|
||||||
assertEquals(conflictingId.getSocketPort(), conflictingNodeId.getSocketPort());
|
assertEquals(conflictingId.getSocketPort(), conflictingNodeId.getSocketPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private NodeIdentifier createNodeId(final int index) {
|
private NodeIdentifier createNodeId(final int index) {
|
||||||
return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, false);
|
return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3260,7 +3260,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
private void registerForClusterCoordinator() {
|
private void registerForClusterCoordinator() {
|
||||||
leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() {
|
leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onLeaderRelinquish() {
|
public synchronized void onLeaderRelinquish() {
|
||||||
heartbeatMonitor.stop();
|
heartbeatMonitor.stop();
|
||||||
|
|
||||||
if (clusterCoordinator != null) {
|
if (clusterCoordinator != null) {
|
||||||
|
@ -3269,7 +3269,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onLeaderElection() {
|
public synchronized void onLeaderElection() {
|
||||||
heartbeatMonitor.start();
|
heartbeatMonitor.start();
|
||||||
|
|
||||||
if (clusterCoordinator != null) {
|
if (clusterCoordinator != null) {
|
||||||
|
|
|
@ -98,7 +98,7 @@ public class StandardNiFiContentAccess implements ContentAccess {
|
||||||
throw new NoClusterCoordinatorException();
|
throw new NoClusterCoordinatorException();
|
||||||
}
|
}
|
||||||
final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
|
final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
|
||||||
nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false).awaitMergedResponse();
|
nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false, true).awaitMergedResponse();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new IllegalClusterStateException("Interrupted while waiting for a response from node");
|
throw new IllegalClusterStateException("Interrupted while waiting for a response from node");
|
||||||
}
|
}
|
||||||
|
|
|
@ -297,7 +297,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
|
final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
|
||||||
return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false).awaitMergedResponse();
|
return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false, true).awaitMergedResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -565,11 +565,11 @@ public abstract class ApplicationResource {
|
||||||
// If we are to replicate directly to the nodes, we need to indicate that the replication source is
|
// If we are to replicate directly to the nodes, we need to indicate that the replication source is
|
||||||
// the cluster coordinator so that the node knows to service the request.
|
// the cluster coordinator so that the node knows to service the request.
|
||||||
final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId);
|
final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId);
|
||||||
return requestReplicator.replicate(targetNodes, method, path, entity, headers, true).awaitMergedResponse().getResponse();
|
return requestReplicator.replicate(targetNodes, method, path, entity, headers, true, true).awaitMergedResponse().getResponse();
|
||||||
} else {
|
} else {
|
||||||
headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
|
headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
|
||||||
return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method,
|
return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method,
|
||||||
path, entity, headers, false).awaitMergedResponse().getResponse();
|
path, entity, headers, false, true).awaitMergedResponse().getResponse();
|
||||||
}
|
}
|
||||||
} catch (final InterruptedException ie) {
|
} catch (final InterruptedException ie) {
|
||||||
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build();
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build();
|
||||||
|
@ -589,23 +589,38 @@ public abstract class ApplicationResource {
|
||||||
return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
|
return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected Response replicate(final String method, final NodeIdentifier targetNode) {
|
protected Response replicate(final String method, final NodeIdentifier targetNode) {
|
||||||
|
return replicate(method, targetNode, getRequestParameters());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Response replicate(final String method, final NodeIdentifier targetNode, final Object entity) {
|
||||||
try {
|
try {
|
||||||
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
|
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
|
||||||
// to the cluster nodes themselves.
|
// to the cluster nodes themselves.
|
||||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
final Set<NodeIdentifier> nodeIds = Collections.singleton(targetNode);
|
final Set<NodeIdentifier> nodeIds = Collections.singleton(targetNode);
|
||||||
return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), getRequestParameters(), getHeaders(), true).awaitMergedResponse().getResponse();
|
return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), entity, getHeaders(), true, true).awaitMergedResponse().getResponse();
|
||||||
} else {
|
} else {
|
||||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||||
final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId()));
|
final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId()));
|
||||||
return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), getRequestParameters(), headers, false).awaitMergedResponse().getResponse();
|
return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), entity, headers, false, true).awaitMergedResponse().getResponse();
|
||||||
}
|
}
|
||||||
} catch (final InterruptedException ie) {
|
} catch (final InterruptedException ie) {
|
||||||
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Response replicateToCoordinator(final String method, final Object entity) {
|
||||||
|
try {
|
||||||
|
final NodeIdentifier coordinatorNode = getClusterCoordinatorNode();
|
||||||
|
final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
|
||||||
|
return getRequestReplicator().replicate(coordinatorNodes, method, getAbsolutePath(), entity, getHeaders(), true, false).awaitMergedResponse().getResponse();
|
||||||
|
} catch (final InterruptedException ie) {
|
||||||
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convenience method for calling {@link #replicate(String, Object)} with an entity of
|
* Convenience method for calling {@link #replicate(String, Object)} with an entity of
|
||||||
* {@link #getRequestParameters() getRequestParameters(true)}
|
* {@link #getRequestParameters() getRequestParameters(true)}
|
||||||
|
@ -685,7 +700,7 @@ public abstract class ApplicationResource {
|
||||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
|
return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
|
||||||
} else {
|
} else {
|
||||||
return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false).awaitMergedResponse();
|
return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false, true).awaitMergedResponse();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -548,6 +548,10 @@ public class ControllerResource extends ApplicationResource {
|
||||||
+ "not equal the node id of the requested resource (%s).", requestNodeDTO.getNodeId(), id));
|
+ "not equal the node id of the requested resource (%s).", requestNodeDTO.getNodeId(), id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isReplicateRequest()) {
|
||||||
|
return replicateToCoordinator(HttpMethod.PUT, nodeEntity);
|
||||||
|
}
|
||||||
|
|
||||||
// update the node
|
// update the node
|
||||||
final NodeDTO node = serviceFacade.updateNode(requestNodeDTO);
|
final NodeDTO node = serviceFacade.updateNode(requestNodeDTO);
|
||||||
|
|
||||||
|
@ -600,6 +604,10 @@ public class ControllerResource extends ApplicationResource {
|
||||||
throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request.");
|
throw new IllegalClusterResourceRequestException("Only a node connected to a cluster can process the request.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isReplicateRequest()) {
|
||||||
|
return replicateToCoordinator(HttpMethod.DELETE, getRequestParameters());
|
||||||
|
}
|
||||||
|
|
||||||
serviceFacade.deleteNode(id);
|
serviceFacade.deleteNode(id);
|
||||||
|
|
||||||
// create the response entity
|
// create the response entity
|
||||||
|
|
|
@ -168,7 +168,7 @@ public class CountersResource extends ApplicationResource {
|
||||||
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
||||||
} else {
|
} else {
|
||||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||||
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse();
|
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity();
|
final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity();
|
||||||
|
|
|
@ -1980,7 +1980,7 @@ public class ProcessGroupResource extends ApplicationResource {
|
||||||
return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
|
return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
|
||||||
} else {
|
} else {
|
||||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||||
return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false).awaitMergedResponse().getResponse();
|
return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false, true).awaitMergedResponse().getResponse();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -147,7 +147,7 @@ public class SystemDiagnosticsResource extends ApplicationResource {
|
||||||
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders()).awaitMergedResponse();
|
||||||
} else {
|
} else {
|
||||||
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
|
||||||
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false).awaitMergedResponse();
|
nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(), getHeaders(), false, true).awaitMergedResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();
|
final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();
|
||||||
|
|
Loading…
Reference in New Issue