From 049970af3ef4de127601474ce9db33c590e40e54 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 12 Mar 2019 19:26:25 +0000 Subject: [PATCH] Only connect to new nodes on new cluster state (#39629) Today, when applying new cluster state we attempt to connect to all of its nodes as a blocking part of the application process. This is the right thing to do with new nodes, and is a no-op on any already-connected nodes, but is questionable on known nodes from which we are currently disconnected: there is a risk that we are partitioned from these nodes so that any attempt to connect to them will hang until it times out. This can dramatically slow down the application of new cluster states which hinders the recovery of the cluster during certain kinds of partition. If nodes are disconnected from the master then it is likely that they are to be removed as part of a subsequent cluster state update, so there's no need to try and reconnect to them like this. Moreover there is no need to attempt to reconnect to disconnected nodes as part of the cluster state application process, because we periodically try and reconnect to any disconnected nodes, and handle their disconnectedness reasonably gracefully in the meantime. This commit alters this behaviour to avoid reconnecting to known nodes during cluster state application. Resolves #29025. --- .../cluster/NodeConnectionsService.java | 436 +++++++++++++----- .../service/ClusterApplierService.java | 15 +- .../cluster/NodeConnectionsServiceTests.java | 340 ++++++++++---- .../coordination/CoordinatorTests.java | 12 +- .../service/ClusterApplierServiceTests.java | 18 +- .../snapshots/SnapshotResiliencyTests.java | 23 +- .../test/ClusterServiceUtils.java | 27 +- .../test/disruption/NetworkDisruption.java | 15 +- 8 files changed, 643 insertions(+), 243 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index 28249754b50..dcb24008a28 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -18,58 +18,75 @@ */ package org.elasticsearch.cluster; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.cluster.coordination.FollowersChecker; -import org.elasticsearch.cluster.coordination.LeaderChecker; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.KeyedLock; -import 
org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.common.settings.Setting.Property; import static org.elasticsearch.common.settings.Setting.positiveTimeSetting; - /** - * This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are - * removed. Also, it periodically checks that all connections are still open and if needed restores them. - * Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond - * to pings. This is done by {@link FollowersChecker}. Master fault detection is done by {@link LeaderChecker}. + * This component is responsible for maintaining connections from this node to all the nodes listed in the cluster state, and for + * disconnecting from nodes once they are removed from the cluster state. It periodically checks that all connections are still open and + * restores them if needed. Note that this component is *not* responsible for removing nodes from the cluster state if they disconnect or + * are unresponsive: this is the job of the master's fault detection components, particularly {@link FollowersChecker}. + *
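A rough JDK-only sketch of the periodic re-check loop described above (illustrative only, not part of this patch; the class, field and method names below are invented): each pass runs one reconnection attempt and then reschedules itself, and it stops as soon as it is no longer the current checker, which is how stopping the service can cancel it simply by clearing a volatile field.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    final class ReconnectLoopSketch {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private volatile Runnable currentChecker; // analogous to the volatile connectionChecker field

        void start(long intervalMillis, Runnable ensureConnections) {
            final Runnable[] holder = new Runnable[1];
            holder[0] = () -> {
                if (currentChecker == holder[0]) {          // a stale checker silently stops rescheduling
                    ensureConnections.run();                // one reconnection pass
                    scheduler.schedule(holder[0], intervalMillis, TimeUnit.MILLISECONDS);
                }
            };
            currentChecker = holder[0];
            scheduler.schedule(holder[0], intervalMillis, TimeUnit.MILLISECONDS);
        }

        void stop() {
            currentChecker = null;  // the next pass notices it is stale and does not reschedule
            scheduler.shutdown();
        }
    }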

+ * The {@link NodeConnectionsService#connectToNodes(DiscoveryNodes, Runnable)} and {@link + * NodeConnectionsService#disconnectFromNodesExcept(DiscoveryNodes)} methods are called on the {@link ClusterApplier} thread. This component + * allows the {@code ClusterApplier} to block on forming connections to _new_ nodes, because the rest of the system treats a missing + * connection to another node in the cluster state as an exceptional condition and we don't want this to happen to new nodes. However, we + * need not block on re-establishing existing connections, because if a connection is down then we are already in an exceptional situation + * and it doesn't matter much if we stay in this situation a little longer. + *
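How a caller on the applier thread is expected to block on this (a sketch that simply mirrors the connectToNodesAndWait() helper added to ClusterApplierService later in this patch; only the wrapper class name is invented):

    import java.util.concurrent.CountDownLatch;

    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.cluster.NodeConnectionsService;

    final class ConnectAndWaitSketch {
        // Blocks the calling (cluster applier) thread until connection attempts to any *new* nodes
        // have completed; already-known but disconnected nodes are retried in the background instead.
        static void connectToNodesAndWait(NodeConnectionsService service, ClusterState newClusterState) {
            final CountDownLatch latch = new CountDownLatch(1);
            service.connectToNodes(newClusterState.nodes(), latch::countDown);
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and carry on applying the state
            }
        }
    }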

+ * This component does not block on disconnections at all, because a disconnection might need to wait for an ongoing (background) connection + * attempt to complete first. */ public class NodeConnectionsService extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(NodeConnectionsService.class); public static final Setting CLUSTER_NODE_RECONNECT_INTERVAL_SETTING = - positiveTimeSetting("cluster.nodes.reconnect_interval", TimeValue.timeValueSeconds(10), Property.NodeScope); + positiveTimeSetting("cluster.nodes.reconnect_interval", TimeValue.timeValueSeconds(10), Property.NodeScope); + private final ThreadPool threadPool; private final TransportService transportService; - // map between current node and the number of failed connection attempts. 0 means successfully connected. - // if a node doesn't appear in this list it shouldn't be monitored - private ConcurrentMap nodes = ConcurrentCollections.newConcurrentMap(); + // Protects changes to targetsByNode and its values (i.e. ConnectionTarget#activityType and ConnectionTarget#listener). + // Crucially there are no blocking calls under this mutex: it is not held while connecting or disconnecting. + private final Object mutex = new Object(); - private final KeyedLock nodeLocks = new KeyedLock<>(); + // contains an entry for every node in the latest cluster state, as well as for nodes from which we are in the process of + // disconnecting + private final Map targetsByNode = new HashMap<>(); private final TimeValue reconnectInterval; - - private volatile Scheduler.Cancellable backgroundCancellable = null; + private volatile ConnectionChecker connectionChecker; @Inject public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) { @@ -78,132 +95,337 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings); } - public void connectToNodes(DiscoveryNodes discoveryNodes) { - CountDownLatch latch = new CountDownLatch(discoveryNodes.getSize()); - for (final DiscoveryNode node : discoveryNodes) { - final boolean connected; - try (Releasable ignored = nodeLocks.acquire(node)) { - nodes.putIfAbsent(node, 0); - connected = transportService.nodeConnected(node); - } - if (connected) { - latch.countDown(); - } else { - // spawn to another thread to do in parallel - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - // both errors and rejections are logged here. the service - // will try again after `cluster.nodes.reconnect_interval` on all nodes but the current master. - // On the master, node fault detection will remove these nodes from the cluster as their are not - // connected. Note that it is very rare that we end up here on the master. - logger.warn(() -> new ParameterizedMessage("failed to connect to {}", node), e); - } + /** + * Connect to all the given nodes, but do not disconnect from any extra nodes. Calls the completion handler on completion of all + * connection attempts to _new_ nodes, but not on attempts to re-establish connections to nodes that are already known. 
+ */ + public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) { - @Override - protected void doRun() { - try (Releasable ignored = nodeLocks.acquire(node)) { - validateAndConnectIfNeeded(node); - } - } + if (discoveryNodes.getSize() == 0) { + onCompletion.run(); + return; + } - @Override - public void onAfter() { - latch.countDown(); - } - }); + final GroupedActionListener listener + = new GroupedActionListener<>(ActionListener.wrap(onCompletion), discoveryNodes.getSize()); + + final List runnables = new ArrayList<>(discoveryNodes.getSize()); + synchronized (mutex) { + for (final DiscoveryNode discoveryNode : discoveryNodes) { + ConnectionTarget connectionTarget = targetsByNode.get(discoveryNode); + final boolean isNewNode; + if (connectionTarget == null) { + // new node, set up target and listener + connectionTarget = new ConnectionTarget(discoveryNode); + targetsByNode.put(discoveryNode, connectionTarget); + isNewNode = true; + } else { + // existing node, but maybe we're disconnecting from it, in which case it was recently removed from the cluster + // state and has now been re-added so we should wait for the re-connection + isNewNode = connectionTarget.isPendingDisconnection(); + } + + if (isNewNode) { + runnables.add(connectionTarget.connect(listener)); + } else { + // known node, try and ensure it's connected but do not wait + runnables.add(connectionTarget.connect(null)); + runnables.add(() -> listener.onResponse(null)); + } } } - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + runnables.forEach(Runnable::run); } /** - * Disconnects from all nodes except the ones provided as parameter + * Disconnect from any nodes to which we are currently connected which do not appear in the given nodes. Does not wait for the + * disconnections to complete, because they might have to wait for ongoing connection attempts first. */ - public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { - Set currentNodes = new HashSet<>(nodes.keySet()); - for (DiscoveryNode node : nodesToKeep) { - currentNodes.remove(node); - } - for (final DiscoveryNode node : currentNodes) { - try (Releasable ignored = nodeLocks.acquire(node)) { - Integer current = nodes.remove(node); - assert current != null : "node " + node + " was removed in event but not in internal nodes"; - try { - transportService.disconnectFromNode(node); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to disconnect to node [{}]", node), e); - } + public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) { + final List runnables = new ArrayList<>(); + synchronized (mutex) { + final Set nodesToDisconnect = new HashSet<>(targetsByNode.keySet()); + for (final DiscoveryNode discoveryNode : discoveryNodes) { + nodesToDisconnect.remove(discoveryNode); + } + + for (final DiscoveryNode discoveryNode : nodesToDisconnect) { + runnables.add(targetsByNode.get(discoveryNode).disconnect()); } } + runnables.forEach(Runnable::run); } - void validateAndConnectIfNeeded(DiscoveryNode node) { - assert nodeLocks.isHeldByCurrentThread(node) : "validateAndConnectIfNeeded must be called under lock"; - if (lifecycle.stoppedOrClosed() || - nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time... 
- // nothing to do - } else { - try { - // connecting to an already connected node is a noop - transportService.connectToNode(node); - nodes.put(node, 0); - } catch (Exception e) { - Integer nodeFailureCount = nodes.get(node); - assert nodeFailureCount != null : node + " didn't have a counter in nodes map"; - nodeFailureCount = nodeFailureCount + 1; - // log every 6th failure - if ((nodeFailureCount % 6) == 1) { - final int finalNodeFailureCount = nodeFailureCount; - logger.warn(() -> new ParameterizedMessage( - "failed to connect to node {} (tried [{}] times)", node, finalNodeFailureCount), e); + /** + * Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any + * nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection + * attempts have completed. + */ + void ensureConnections(Runnable onCompletion) { + final List runnables = new ArrayList<>(); + synchronized (mutex) { + final Collection connectionTargets = targetsByNode.values(); + if (connectionTargets.isEmpty()) { + runnables.add(onCompletion); + } else { + logger.trace("ensuring connections to {}", targetsByNode); + final GroupedActionListener listener = new GroupedActionListener<>( + ActionListener.wrap(onCompletion), connectionTargets.size()); + for (final ConnectionTarget connectionTarget : connectionTargets) { + runnables.add(connectionTarget.ensureConnected(listener)); } - nodes.put(node, nodeFailureCount); } } + runnables.forEach(Runnable::run); } class ConnectionChecker extends AbstractRunnable { + protected void doRun() { + if (connectionChecker == this) { + ensureConnections(this::scheduleNextCheck); + } + } + + void scheduleNextCheck() { + if (connectionChecker == this) { + threadPool.scheduleUnlessShuttingDown(reconnectInterval, ThreadPool.Names.GENERIC, this); + } + } @Override public void onFailure(Exception e) { logger.warn("unexpected error while checking for node reconnects", e); - } - - protected void doRun() { - for (DiscoveryNode node : nodes.keySet()) { - try (Releasable ignored = nodeLocks.acquire(node)) { - validateAndConnectIfNeeded(node); - } - } + scheduleNextCheck(); } @Override - public void onAfter() { - if (lifecycle.started()) { - backgroundCancellable = threadPool.schedule(this, reconnectInterval, ThreadPool.Names.GENERIC); - } + public String toString() { + return "periodic reconnection checker"; } } @Override protected void doStart() { - backgroundCancellable = threadPool.schedule(new ConnectionChecker(), reconnectInterval, ThreadPool.Names.GENERIC); + final ConnectionChecker connectionChecker = new ConnectionChecker(); + this.connectionChecker = connectionChecker; + connectionChecker.scheduleNextCheck(); } @Override protected void doStop() { - if (backgroundCancellable != null) { - backgroundCancellable.cancel(); - } + connectionChecker = null; } @Override protected void doClose() { + } + // for disruption tests, re-establish any disrupted connections + public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) { + connectToNodes(discoveryNodes, () -> { + disconnectFromNodesExcept(discoveryNodes); + ensureConnections(onCompletion); + }); + } + + private enum ActivityType { + IDLE, + CONNECTING, + DISCONNECTING + } + + /** + * {@link ConnectionTarget} ensures that we are never concurrently connecting to and disconnecting from a node, and that we eventually + * either connect to or disconnect from it according to whether {@link 
ConnectionTarget#connect(ActionListener)} or + * {@link ConnectionTarget#disconnect()} was called last. + *

+ * Each {@link ConnectionTarget} is in one of these states: + *

+ * - idle ({@link ConnectionTarget#future} has no listeners) + * - awaiting connection ({@link ConnectionTarget#future} may contain listeners awaiting a connection) + * - awaiting disconnection ({@link ConnectionTarget#future} may contain listeners awaiting a disconnection) + *
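A compact sketch of the transitions between these three states (illustrative only; the real bookkeeping lives in addListenerAndStartActivity() and onCompletion() below, which additionally track the listeners awaiting each activity):

    // names here are invented for illustration
    enum Activity { IDLE, CONNECTING, DISCONNECTING }

    final class TargetStateSketch {
        private Activity activity = Activity.IDLE;

        // Returns true iff the caller should actually start the requested activity now.
        synchronized boolean request(Activity requested) {
            if (activity == Activity.IDLE) {
                activity = requested;   // idle -> start the work
                return true;
            }
            if (activity == requested) {
                return false;           // same activity already in flight -> just add a listener
            }
            activity = requested;       // opposite activity in flight -> it will chain into ours
            return false;
        }

        synchronized void completed(Activity finished) {
            if (activity == finished) {
                activity = Activity.IDLE;   // nobody changed their mind -> back to idle
            }
            // otherwise an opposite request arrived meanwhile; in the real code onCompletion()
            // starts that opposite activity next
        }
    }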

+ * It will be awaiting connection (respectively disconnection) after calling {@code connect()} (respectively {@code disconnect()}). It + * will eventually become idle provided that these methods are only called finitely often. + *

+ * These methods return a {@link Runnable} which starts the connection/disconnection process iff the target was idle before the method was + * called, and which notifies the listeners of the superseded activity of their failure if the {@code ConnectionTarget} went from + * {@code CONNECTING} to {@code DISCONNECTING} or vice versa. The connection/disconnection process continues until all listeners have been + * removed, at which point the target becomes idle again. + *
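Why these methods hand back a {@link Runnable} instead of doing the work directly: the caller collects the runnables under the mutex and only runs them after releasing it, so no blocking connect/disconnect call ever happens while the mutex is held. A minimal standalone sketch of that collect-then-run pattern (illustrative; names are invented):

    import java.util.ArrayList;
    import java.util.List;

    final class CollectThenRunSketch {
        private final Object mutex = new Object();

        void applyNodes(List<String> nodeNames) {
            final List<Runnable> runnables = new ArrayList<>();
            synchronized (mutex) {
                // mutate bookkeeping and queue the work here, but do not perform it yet
                for (final String nodeName : nodeNames) {
                    runnables.add(() -> System.out.println("connecting to " + nodeName));
                }
            }
            // the potentially slow connect/disconnect work runs strictly outside the mutex
            runnables.forEach(Runnable::run);
        }
    }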

+ * Additionally if the last step of the process was a disconnection then this target is removed from the current set of targets. Thus + * if this {@link ConnectionTarget} is idle and in the current set of targets then it should be connected. + *

+ * All of the {@code listeners} are awaiting the completion of the same activity, which is either a connection or a disconnection. If + * we are currently connecting and then {@link ConnectionTarget#disconnect()} is called then all connection listeners are + * removed from the list so they can be notified of failure; once the connecting process has finished a disconnection will be started. + * Similarly if we are currently disconnecting and then {@link ConnectionTarget#connect(ActionListener)} is called then all + * disconnection listeners are immediately removed for failure notification and a connection is started once the disconnection is + * complete. + */ + private class ConnectionTarget { + private final DiscoveryNode discoveryNode; + + private PlainListenableActionFuture future = PlainListenableActionFuture.newListenableFuture(); + private ActivityType activityType = ActivityType.IDLE; // indicates what any listeners are awaiting + + private final AtomicInteger consecutiveFailureCount = new AtomicInteger(); + + private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + @Override + protected void doRun() { + assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; + transportService.connectToNode(discoveryNode); + consecutiveFailureCount.set(0); + logger.debug("connected to {}", discoveryNode); + onCompletion(ActivityType.CONNECTING, null, disconnectActivity); + } + + @Override + public void onFailure(Exception e) { + assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; + final int currentFailureCount = consecutiveFailureCount.incrementAndGet(); + // only warn every 6th failure + final Level level = currentFailureCount % 6 == 1 ? Level.WARN : Level.DEBUG; + logger.log(level, new ParameterizedMessage("failed to connect to {} (tried [{}] times)", + discoveryNode, currentFailureCount), e); + onCompletion(ActivityType.CONNECTING, e, disconnectActivity); + } + + @Override + public String toString() { + return "connect to " + discoveryNode; + } + }); + + private final Runnable disconnectActivity = new AbstractRunnable() { + @Override + protected void doRun() { + assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; + transportService.disconnectFromNode(discoveryNode); + consecutiveFailureCount.set(0); + logger.debug("disconnected from {}", discoveryNode); + onCompletion(ActivityType.DISCONNECTING, null, connectActivity); + } + + @Override + public void onFailure(Exception e) { + assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; + consecutiveFailureCount.incrementAndGet(); + // we may not have disconnected, but will not retry, so this connection might have leaked + logger.warn(new ParameterizedMessage("failed to disconnect from {}, possible connection leak", discoveryNode), e); + assert false : "failed to disconnect from " + discoveryNode + ", possible connection leak\n" + e; + onCompletion(ActivityType.DISCONNECTING, e, connectActivity); + } + }; + + ConnectionTarget(DiscoveryNode discoveryNode) { + this.discoveryNode = discoveryNode; + } + + Runnable connect(@Nullable ActionListener listener) { + return addListenerAndStartActivity(listener, ActivityType.CONNECTING, connectActivity, + "disconnection cancelled by reconnection"); + } + + Runnable disconnect() { + return addListenerAndStartActivity(null, ActivityType.DISCONNECTING, disconnectActivity, + "connection cancelled by disconnection"); + } + + Runnable ensureConnected(@Nullable ActionListener listener) 
{ + assert Thread.holdsLock(mutex) : "mutex not held"; + + if (activityType == ActivityType.IDLE) { + if (transportService.nodeConnected(discoveryNode)) { + return () -> listener.onResponse(null); + } else { + // target is disconnected, and we are currently idle, so start a connection process. + activityType = ActivityType.CONNECTING; + addListener(listener); + return connectActivity; + } + } else { + addListener(listener); + return () -> { + }; + } + } + + private void addListener(@Nullable ActionListener listener) { + assert Thread.holdsLock(mutex) : "mutex not held"; + assert activityType != ActivityType.IDLE; + if (listener != null) { + future.addListener(listener); + } + } + + private PlainListenableActionFuture getAndClearFuture() { + assert Thread.holdsLock(mutex) : "mutex not held"; + final PlainListenableActionFuture drainedFuture = future; + future = PlainListenableActionFuture.newListenableFuture(); + return drainedFuture; + } + + private Runnable addListenerAndStartActivity(@Nullable ActionListener listener, ActivityType newActivityType, + Runnable activity, String cancellationMessage) { + assert Thread.holdsLock(mutex) : "mutex not held"; + assert newActivityType.equals(ActivityType.IDLE) == false; + + if (activityType == ActivityType.IDLE) { + activityType = newActivityType; + addListener(listener); + return activity; + } + + if (activityType == newActivityType) { + addListener(listener); + return () -> { + }; + } + + activityType = newActivityType; + final PlainListenableActionFuture oldFuture = getAndClearFuture(); + addListener(listener); + return () -> oldFuture.onFailure(new ElasticsearchException(cancellationMessage)); + } + + private void onCompletion(ActivityType completedActivityType, @Nullable Exception e, Runnable oppositeActivity) { + assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; + + final Runnable cleanup; + synchronized (mutex) { + assert activityType != ActivityType.IDLE; + if (activityType == completedActivityType) { + final PlainListenableActionFuture oldFuture = getAndClearFuture(); + activityType = ActivityType.IDLE; + + cleanup = e == null ? 
() -> oldFuture.onResponse(null) : () -> oldFuture.onFailure(e); + + if (completedActivityType.equals(ActivityType.DISCONNECTING)) { + final ConnectionTarget removedTarget = targetsByNode.remove(discoveryNode); + assert removedTarget == this : removedTarget + " vs " + this; + } + } else { + cleanup = oppositeActivity; + } + } + cleanup.run(); + } + + boolean isPendingDisconnection() { + assert Thread.holdsLock(mutex) : "mutex not held"; + return activityType == ActivityType.DISCONNECTING; + } + + @Override + public String toString() { + synchronized (mutex) { + return "ConnectionTarget{" + + "discoveryNode=" + discoveryNode + + ", activityType=" + activityType + + '}'; + } + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index f2b0756d3d8..11b6f451d59 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -55,6 +55,7 @@ import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -450,7 +451,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements } logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version()); - nodeConnectionsService.connectToNodes(newClusterState.nodes()); + connectToNodesAndWait(newClusterState); // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) { @@ -470,6 +471,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements callClusterStateListeners(clusterChangedEvent); } + protected void connectToNodesAndWait(ClusterState newClusterState) { + // can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch. 
+ final CountDownLatch countDownLatch = new CountDownLatch(1); + nodeConnectionsService.connectToNodes(newClusterState.nodes(), countDownLatch::countDown); + try { + countDownLatch.await(); + } catch (InterruptedException e) { + logger.debug("interrupted while connecting to nodes, continuing", e); + Thread.currentThread().interrupt(); + } + } + private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) { clusterStateAppliers.forEach(applier -> { logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version()); diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index d0299a2858c..65e17972f0c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -19,15 +19,18 @@ package org.elasticsearch.cluster; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; @@ -39,7 +42,6 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -55,10 +57,17 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; +import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING; +import static org.elasticsearch.common.settings.Settings.builder; +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; public class NodeConnectionsServiceTests extends ESTestCase { @@ -66,6 +75,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { private ThreadPool threadPool; private MockTransport transport; private TransportService transportService; + private Map> nodeConnectionBlocks; private List generateNodes() { List nodes = new ArrayList<>(); @@ -77,65 +87,226 @@ public class 
NodeConnectionsServiceTests extends ESTestCase { return nodes; } - private ClusterState clusterStateFromNodes(List nodes) { + private DiscoveryNodes discoveryNodesFromList(List discoveryNodes) { final DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); - for (DiscoveryNode node : nodes) { - builder.add(node); + for (final DiscoveryNode discoveryNode : discoveryNodes) { + builder.add(discoveryNode); } - return ClusterState.builder(new ClusterName("test")).nodes(builder).build(); + return builder.build(); } - public void testConnectAndDisconnect() { - List nodes = generateNodes(); - NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); + public void testConnectAndDisconnect() throws Exception { + final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); - ClusterState current = clusterStateFromNodes(Collections.emptyList()); - ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); + final AtomicBoolean stopReconnecting = new AtomicBoolean(); + final Thread reconnectionThread = new Thread(() -> { + while (stopReconnecting.get() == false) { + final PlainActionFuture future = new PlainActionFuture<>(); + service.ensureConnections(() -> future.onResponse(null)); + future.actionGet(); + } + }); + reconnectionThread.start(); - service.connectToNodes(event.state().nodes()); - assertConnected(event.state().nodes()); + final List allNodes = generateNodes(); + for (int iteration = 0; iteration < 3; iteration++) { - service.disconnectFromNodesExcept(event.state().nodes()); - assertConnectedExactlyToNodes(event.state()); + final boolean isDisrupting = randomBoolean(); + final AtomicBoolean stopDisrupting = new AtomicBoolean(); + final Thread disruptionThread = new Thread(() -> { + while (isDisrupting && stopDisrupting.get() == false) { + transportService.disconnectFromNode(randomFrom(allNodes)); + } + }); + disruptionThread.start(); - current = event.state(); - event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); + final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes)); + final PlainActionFuture future = new PlainActionFuture<>(); + service.connectToNodes(nodes, () -> future.onResponse(null)); + future.actionGet(); + if (isDisrupting == false) { + assertConnected(nodes); + } + service.disconnectFromNodesExcept(nodes); - service.connectToNodes(event.state().nodes()); - assertConnected(event.state().nodes()); + assertTrue(stopDisrupting.compareAndSet(false, true)); + disruptionThread.join(); - service.disconnectFromNodesExcept(event.state().nodes()); - assertConnectedExactlyToNodes(event.state()); + if (randomBoolean()) { + // sometimes do not wait for the disconnections to complete before starting the next connections + if (usually()) { + ensureConnections(service); + assertConnectedExactlyToNodes(nodes); + } else { + assertBusy(() -> assertConnectedExactlyToNodes(nodes)); + } + } + } + + assertTrue(stopReconnecting.compareAndSet(false, true)); + reconnectionThread.join(); } - public void testReconnect() { - List nodes = generateNodes(); - NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); + public void testPeriodicReconnection() { + final Settings.Builder settings = Settings.builder(); + final long reconnectIntervalMillis; + if (randomBoolean()) { + reconnectIntervalMillis = 
CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY).millis(); + } else { + reconnectIntervalMillis = randomLongBetween(1, 100000); + settings.put(CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), reconnectIntervalMillis + "ms"); + } - ClusterState current = clusterStateFromNodes(Collections.emptyList()); - ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); + final DeterministicTaskQueue deterministicTaskQueue + = new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); + + final NodeConnectionsService service + = new NodeConnectionsService(settings.build(), deterministicTaskQueue.getThreadPool(), transportService); + service.start(); + + final List allNodes = generateNodes(); + final DiscoveryNodes targetNodes = discoveryNodesFromList(randomSubsetOf(allNodes)); transport.randomConnectionExceptions = true; - service.connectToNodes(event.state().nodes()); + final AtomicBoolean connectionCompleted = new AtomicBoolean(); + service.connectToNodes(targetNodes, () -> connectionCompleted.set(true)); + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(connectionCompleted.get()); - for (int i = 0; i < 3; i++) { + long maxDisconnectionTime = 0; + for (int iteration = 0; iteration < 3; iteration++) { // simulate disconnects - for (DiscoveryNode node : randomSubsetOf(nodes)) { - transportService.disconnectFromNode(node); + for (DiscoveryNode node : allNodes) { + if (randomBoolean()) { + final long disconnectionTime = randomLongBetween(0, 120000); + maxDisconnectionTime = Math.max(maxDisconnectionTime, disconnectionTime); + deterministicTaskQueue.scheduleAt(disconnectionTime, new Runnable() { + @Override + public void run() { + transportService.disconnectFromNode(node); + } + + @Override + public String toString() { + return "scheduled disconnection of " + node; + } + }); + } } - service.new ConnectionChecker().run(); } + runTasksUntil(deterministicTaskQueue, maxDisconnectionTime); + // disable exceptions so things can be restored transport.randomConnectionExceptions = false; - service.new ConnectionChecker().run(); - assertConnectedExactlyToNodes(event.state()); + logger.info("renewing connections"); + runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + reconnectIntervalMillis); + assertConnectedExactlyToNodes(targetNodes); } - private void assertConnectedExactlyToNodes(ClusterState state) { - assertConnected(state.nodes()); - assertThat(transportService.getConnectionManager().size(), equalTo(state.nodes().getSize())); + public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { + final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); + + // connect to one node + final DiscoveryNode node0 = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNodes nodes0 = DiscoveryNodes.builder().add(node0).build(); + final PlainActionFuture future0 = new PlainActionFuture<>(); + service.connectToNodes(nodes0, () -> future0.onResponse(null)); + future0.actionGet(); + assertConnectedExactlyToNodes(nodes0); + + // connection attempts to node0 block indefinitely + final CyclicBarrier connectionBarrier = new CyclicBarrier(2); + try { + nodeConnectionBlocks.put(node0, connectionBarrier::await); + transportService.disconnectFromNode(node0); + + // can still connect to another node without blocking + final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), 
Version.CURRENT); + final DiscoveryNodes nodes1 = DiscoveryNodes.builder().add(node1).build(); + final DiscoveryNodes nodes01 = DiscoveryNodes.builder(nodes0).add(node1).build(); + final PlainActionFuture future1 = new PlainActionFuture<>(); + service.connectToNodes(nodes01, () -> future1.onResponse(null)); + future1.actionGet(); + assertConnectedExactlyToNodes(nodes1); + + // can also disconnect from node0 without blocking + final PlainActionFuture future2 = new PlainActionFuture<>(); + service.connectToNodes(nodes1, () -> future2.onResponse(null)); + future2.actionGet(); + service.disconnectFromNodesExcept(nodes1); + assertConnectedExactlyToNodes(nodes1); + + // however, now node0 is considered to be a new node so we will block on a subsequent attempt to connect to it + final PlainActionFuture future3 = new PlainActionFuture<>(); + service.connectToNodes(nodes01, () -> future3.onResponse(null)); + expectThrows(ElasticsearchTimeoutException.class, () -> future3.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000)))); + + // once the connection is unblocked we successfully connect to it. + nodeConnectionBlocks.clear(); + connectionBarrier.await(0, TimeUnit.SECONDS); + future3.actionGet(); + assertConnectedExactlyToNodes(nodes01); + + // if we disconnect from a node while blocked trying to connect to it then we do eventually disconnect from it + nodeConnectionBlocks.put(node0, connectionBarrier::await); + transportService.disconnectFromNode(node0); + final PlainActionFuture future4 = new PlainActionFuture<>(); + service.connectToNodes(nodes01, () -> future4.onResponse(null)); + future4.actionGet(); + assertConnectedExactlyToNodes(nodes1); + + service.disconnectFromNodesExcept(nodes1); + connectionBarrier.await(); + if (randomBoolean()) { + // assertBusy because the connection completes before disconnecting, so we might briefly observe a connection to node0 + assertBusy(() -> assertConnectedExactlyToNodes(nodes1)); + } + + // use ensureConnections() to wait until the service is idle + ensureConnections(service); + assertConnectedExactlyToNodes(nodes1); + + // if we disconnect from a node while blocked trying to connect to it then the listener is notified + final PlainActionFuture future6 = new PlainActionFuture<>(); + service.connectToNodes(nodes01, () -> future6.onResponse(null)); + expectThrows(ElasticsearchTimeoutException.class, () -> future6.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000)))); + + service.disconnectFromNodesExcept(nodes1); + future6.actionGet(); // completed even though the connection attempt is still blocked + assertConnectedExactlyToNodes(nodes1); + + nodeConnectionBlocks.clear(); + connectionBarrier.await(10, TimeUnit.SECONDS); + ensureConnections(service); + assertConnectedExactlyToNodes(nodes1); + } finally { + nodeConnectionBlocks.clear(); + connectionBarrier.reset(); + } + } + + private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) { + while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) { + if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) { + deterministicTaskQueue.runRandomTask(); + } else if (deterministicTaskQueue.hasDeferredTasks()) { + deterministicTaskQueue.advanceTime(); + } + } + deterministicTaskQueue.runAllRunnableTasks(); + } + + private void ensureConnections(NodeConnectionsService service) { + final PlainActionFuture future = new PlainActionFuture<>(); + service.ensureConnections(() -> future.onResponse(null)); + future.actionGet(); + } + + private void 
assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) { + assertConnected(discoveryNodes); + assertThat(transportService.getConnectionManager().size(), equalTo(discoveryNodes.getSize())); } private void assertConnected(Iterable nodes) { @@ -150,10 +321,8 @@ public class NodeConnectionsServiceTests extends ESTestCase { super.setUp(); this.threadPool = new TestThreadPool(getClass().getName()); this.transport = new MockTransport(); - transportService = new NoHandshakeTransportService(Settings.EMPTY, transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null, - Collections.emptySet()); + nodeConnectionBlocks = newConcurrentMap(); + transportService = new TestTransportService(transport, threadPool); transportService.start(); transportService.acceptIncomingRequests(); } @@ -167,22 +336,31 @@ public class NodeConnectionsServiceTests extends ESTestCase { super.tearDown(); } - private final class NoHandshakeTransportService extends TransportService { + private final class TestTransportService extends TransportService { - private NoHandshakeTransportService(Settings settings, - Transport transport, - ThreadPool threadPool, - TransportInterceptor transportInterceptor, - Function localNodeFactory, - ClusterSettings clusterSettings, - Set taskHeaders) { - super(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders); + private TestTransportService(Transport transport, ThreadPool threadPool) { + super(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), + null, emptySet()); } @Override public HandshakeResponse handshake(Transport.Connection connection, long timeout, Predicate clusterNamePredicate) { return new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT); } + + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + final CheckedRunnable connectionBlock = nodeConnectionBlocks.get(node); + if (connectionBlock != null) { + try { + connectionBlock.run(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + super.connectToNode(node); + } } private final class MockTransport implements Transport { @@ -224,40 +402,36 @@ public class NodeConnectionsServiceTests extends ESTestCase { @Override public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener listener) { - if (profile == null) { - if (randomConnectionExceptions && randomBoolean()) { - listener.onFailure(new ConnectTransportException(node, "simulated")); - return () -> {}; - } + if (profile == null && randomConnectionExceptions && randomBoolean()) { + threadPool.generic().execute(() -> listener.onFailure(new ConnectTransportException(node, "simulated"))); + } else { + threadPool.generic().execute(() -> listener.onResponse(new Connection() { + @Override + public DiscoveryNode getNode() { + return node; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws TransportException { + } + + @Override + public void addCloseListener(ActionListener listener) { + } + + @Override + public void close() { + } + + @Override + public boolean isClosed() { + return false; + } + })); } - listener.onResponse(new Connection() { - 
@Override - public DiscoveryNode getNode() { - return node; - } - - @Override - public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) - throws TransportException { - - } - - @Override - public void addCloseListener(ActionListener listener) { - - } - - @Override - public void close() { - - } - - @Override - public boolean isClosed() { - return false; - } - }); - return () -> {}; + return () -> { + }; } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 623d3fdc34b..d36423e3886 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -44,7 +44,6 @@ import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -1755,12 +1754,7 @@ public class CoordinatorTests extends ESTestCase { clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); clusterService.setNodeConnectionsService( new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode), - transportService) { - @Override - public void connectToNodes(DiscoveryNodes discoveryNodes) { - // override this method as it does blocking calls - } - }); + transportService)); final Collection> onJoinValidators = Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs))); coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), @@ -2149,6 +2143,10 @@ public class CoordinatorTests extends ESTestCase { } } + @Override + protected void connectToNodesAndWait(ClusterState newClusterState) { + // don't do anything, and don't block + } } private static DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) { diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 740283736a2..0736d29cf66 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -19,14 +19,13 @@ package org.elasticsearch.cluster.service; import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.LocalNodeMasterListener; -import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.NoMasterBlockService; import org.elasticsearch.cluster.metadata.MetaData; @@ -54,6 +53,7 @@ import 
java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.test.ClusterServiceUtils.createNoOpNodeConnectionsService; import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; @@ -88,23 +88,13 @@ public class ClusterApplierServiceTests extends ESTestCase { super.tearDown(); } - TimedClusterApplierService createTimedClusterService(boolean makeMaster) { + private TimedClusterApplierService createTimedClusterService(boolean makeMaster) { DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(Settings.builder().put("cluster.name", "ClusterApplierServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool); - timedClusterApplierService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { - @Override - public void connectToNodes(DiscoveryNodes discoveryNodes) { - // skip - } - - @Override - public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { - // skip - } - }); + timedClusterApplierService.setNodeConnectionsService(createNoOpNodeConnectionsService()); timedClusterApplierService.setInitialState(ClusterState.builder(new ClusterName("ClusterApplierServiceTests")) .nodes(DiscoveryNodes.builder() .add(localNode) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index ece8bbd7194..9c1d256b552 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -748,6 +748,11 @@ public class SnapshotResiliencyTests extends ESTestCase { protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue); } + + @Override + protected void connectToNodesAndWait(ClusterState newClusterState) { + // don't do anything, and don't block + } }); mockTransport = new DisruptableMockTransport(node, logger) { @Override @@ -992,23 +997,7 @@ public class SnapshotResiliencyTests extends ESTestCase { coordinator.start(); masterService.start(); clusterService.getClusterApplierService().setNodeConnectionsService( - new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService) { - @Override - public void connectToNodes(DiscoveryNodes discoveryNodes) { - // override this method as it does blocking calls - boolean callSuper = true; - for (final DiscoveryNode node : discoveryNodes) { - try { - transportService.connectToNode(node); - } catch (Exception e) { - callSuper = false; - } - } - if (callSuper) { - super.connectToNodes(discoveryNodes); - } - } - }); + new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService)); clusterService.getClusterApplierService().start(); indicesService.start(); indicesClusterStateService.start(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index aae232594a1..5477d292cc2 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -137,17 +137,7 @@ public class ClusterServiceUtils { .put("cluster.name", "ClusterServiceTests") .build(); ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool); - clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { - @Override - public void connectToNodes(DiscoveryNodes discoveryNodes) { - // skip - } - - @Override - public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { - // skip - } - }); + clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService()); ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) .nodes(DiscoveryNodes.builder() .add(localNode) @@ -162,6 +152,21 @@ public class ClusterServiceUtils { return clusterService; } + public static NodeConnectionsService createNoOpNodeConnectionsService() { + return new NodeConnectionsService(Settings.EMPTY, null, null) { + @Override + public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) { + // don't do anything + onCompletion.run(); + } + + @Override + public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { + // don't do anything + } + }; + } + public static ClusterStatePublisher createClusterStatePublisher(ClusterApplier clusterApplier) { return (event, publishListener, ackListener) -> clusterApplier.onNewClusterState("mock_publish_to_self[" + event.source() + "]", () -> event.state(), diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java index d620e7633f2..66514b1b274 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Random; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.function.BiConsumer; import static org.junit.Assert.assertFalse; @@ -49,7 +50,7 @@ import static org.junit.Assert.assertFalse; */ public class NetworkDisruption implements ServiceDisruptionScheme { - private final Logger logger = LogManager.getLogger(NetworkDisruption.class); + private static final Logger logger = LogManager.getLogger(NetworkDisruption.class); private final DisruptedLinks disruptedLinks; private final NetworkLinkDisruptionType networkLinkDisruptionType; @@ -103,9 +104,17 @@ public class NetworkDisruption implements ServiceDisruptionScheme { * handy to be able to ensure this happens faster */ public static void ensureFullyConnectedCluster(InternalTestCluster cluster) { - for (String node: cluster.getNodeNames()) { + final String[] nodeNames = cluster.getNodeNames(); + final CountDownLatch countDownLatch = new CountDownLatch(nodeNames.length); + for (String node : nodeNames) { ClusterState stateOnNode = cluster.getInstance(ClusterService.class, node).state(); - cluster.getInstance(NodeConnectionsService.class, node).connectToNodes(stateOnNode.nodes()); + cluster.getInstance(NodeConnectionsService.class, node).reconnectToNodes(stateOnNode.nodes(), countDownLatch::countDown); + } + + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); } }
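For reference, the completion handlers used throughout this patch (connectToNodes, ensureConnections, and the reconnectToNodes helper used in ensureFullyConnectedCluster above) rely on {@code GroupedActionListener} to turn the per-node outcomes into a single callback. A minimal sketch of that aggregation, mirroring the constructor shape used in connectToNodes() (the class and method names here are invented):

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.support.GroupedActionListener;

    final class GroupedCompletionSketch {
        // Runs onCompletion exactly once, after nodeCount per-node outcomes have been reported.
        static void demo(Runnable onCompletion, int nodeCount) {
            final GroupedActionListener<Void> listener =
                new GroupedActionListener<>(ActionListener.wrap(onCompletion), nodeCount);
            for (int i = 0; i < nodeCount; i++) {
                listener.onResponse(null); // each per-node connection attempt reports in exactly once
            }
        }
    }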