diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java
index 28249754b50..dcb24008a28 100644
--- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java
@@ -18,58 +18,75 @@
*/
package org.elasticsearch.cluster;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.GroupedActionListener;
+import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.cluster.coordination.FollowersChecker;
-import org.elasticsearch.cluster.coordination.LeaderChecker;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterApplier;
+import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.KeyedLock;
-import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.Setting.Property;
import static org.elasticsearch.common.settings.Setting.positiveTimeSetting;
-
/**
- * This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are
- * removed. Also, it periodically checks that all connections are still open and if needed restores them.
- * Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond
- * to pings. This is done by {@link FollowersChecker}. Master fault detection is done by {@link LeaderChecker}.
+ * This component is responsible for maintaining connections from this node to all the nodes listed in the cluster state, and for
+ * disconnecting from nodes once they are removed from the cluster state. It periodically checks that all connections are still open and
+ * restores them if needed. Note that this component is *not* responsible for removing nodes from the cluster state if they disconnect or
+ * are unresponsive: this is the job of the master's fault detection components, particularly {@link FollowersChecker}.
+ *
+ * The {@link NodeConnectionsService#connectToNodes(DiscoveryNodes, Runnable)} and {@link
+ * NodeConnectionsService#disconnectFromNodesExcept(DiscoveryNodes)} methods are called on the {@link ClusterApplier} thread. This component
+ * allows the {@code ClusterApplier} to block on forming connections to _new_ nodes, because the rest of the system treats a missing
+ * connection with another node in the cluster state as an exceptional condition and we don't want this to happen to new nodes. However we
+ * need not block on re-establishing existing connections because if a connection is down then we are already in an exceptional situation
+ * and it doesn't matter much if we stay in this situation a little longer.
+ *
+ * This component does not block on disconnections at all, because a disconnection might need to wait for an ongoing (background) connection
+ * attempt to complete first.
*/
public class NodeConnectionsService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(NodeConnectionsService.class);
    public static final Setting<TimeValue> CLUSTER_NODE_RECONNECT_INTERVAL_SETTING =
- positiveTimeSetting("cluster.nodes.reconnect_interval", TimeValue.timeValueSeconds(10), Property.NodeScope);
+ positiveTimeSetting("cluster.nodes.reconnect_interval", TimeValue.timeValueSeconds(10), Property.NodeScope);
+
private final ThreadPool threadPool;
private final TransportService transportService;
- // map between current node and the number of failed connection attempts. 0 means successfully connected.
- // if a node doesn't appear in this list it shouldn't be monitored
-    private ConcurrentMap<DiscoveryNode, Integer> nodes = ConcurrentCollections.newConcurrentMap();
+ // Protects changes to targetsByNode and its values (i.e. ConnectionTarget#activityType and ConnectionTarget#listener).
+ // Crucially there are no blocking calls under this mutex: it is not held while connecting or disconnecting.
+ private final Object mutex = new Object();
-    private final KeyedLock<DiscoveryNode> nodeLocks = new KeyedLock<>();
+ // contains an entry for every node in the latest cluster state, as well as for nodes from which we are in the process of
+ // disconnecting
+    private final Map<DiscoveryNode, ConnectionTarget> targetsByNode = new HashMap<>();
private final TimeValue reconnectInterval;
-
- private volatile Scheduler.Cancellable backgroundCancellable = null;
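+    // the currently-active background checker; doStop() clears this so that a running checker stops rescheduling itself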
+ private volatile ConnectionChecker connectionChecker;
@Inject
public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
@@ -78,132 +95,337 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
}
- public void connectToNodes(DiscoveryNodes discoveryNodes) {
- CountDownLatch latch = new CountDownLatch(discoveryNodes.getSize());
- for (final DiscoveryNode node : discoveryNodes) {
- final boolean connected;
- try (Releasable ignored = nodeLocks.acquire(node)) {
- nodes.putIfAbsent(node, 0);
- connected = transportService.nodeConnected(node);
- }
- if (connected) {
- latch.countDown();
- } else {
- // spawn to another thread to do in parallel
- threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
- @Override
- public void onFailure(Exception e) {
- // both errors and rejections are logged here. the service
- // will try again after `cluster.nodes.reconnect_interval` on all nodes but the current master.
- // On the master, node fault detection will remove these nodes from the cluster as their are not
- // connected. Note that it is very rare that we end up here on the master.
- logger.warn(() -> new ParameterizedMessage("failed to connect to {}", node), e);
- }
+ /**
+ * Connect to all the given nodes, but do not disconnect from any extra nodes. Calls the completion handler on completion of all
+ * connection attempts to _new_ nodes, but not on attempts to re-establish connections to nodes that are already known.
+ */
+ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
- @Override
- protected void doRun() {
- try (Releasable ignored = nodeLocks.acquire(node)) {
- validateAndConnectIfNeeded(node);
- }
- }
+ if (discoveryNodes.getSize() == 0) {
+ onCompletion.run();
+ return;
+ }
- @Override
- public void onAfter() {
- latch.countDown();
- }
- });
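+        // the listener is notified once per node (immediately, for already-known nodes), so onCompletion runs after all nodes are handled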
+        final GroupedActionListener<Void> listener
+ = new GroupedActionListener<>(ActionListener.wrap(onCompletion), discoveryNodes.getSize());
+
+        final List<Runnable> runnables = new ArrayList<>(discoveryNodes.getSize());
+ synchronized (mutex) {
+ for (final DiscoveryNode discoveryNode : discoveryNodes) {
+ ConnectionTarget connectionTarget = targetsByNode.get(discoveryNode);
+ final boolean isNewNode;
+ if (connectionTarget == null) {
+ // new node, set up target and listener
+ connectionTarget = new ConnectionTarget(discoveryNode);
+ targetsByNode.put(discoveryNode, connectionTarget);
+ isNewNode = true;
+ } else {
+ // existing node, but maybe we're disconnecting from it, in which case it was recently removed from the cluster
+ // state and has now been re-added so we should wait for the re-connection
+ isNewNode = connectionTarget.isPendingDisconnection();
+ }
+
+ if (isNewNode) {
+ runnables.add(connectionTarget.connect(listener));
+ } else {
+ // known node, try and ensure it's connected but do not wait
+ runnables.add(connectionTarget.connect(null));
+ runnables.add(() -> listener.onResponse(null));
+ }
}
}
- try {
- latch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
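+        // run the collected activities outside the mutex, since connecting and disconnecting must not happen under the lock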
+ runnables.forEach(Runnable::run);
}
/**
- * Disconnects from all nodes except the ones provided as parameter
+ * Disconnect from any nodes to which we are currently connected which do not appear in the given nodes. Does not wait for the
+ * disconnections to complete, because they might have to wait for ongoing connection attempts first.
*/
- public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
-        Set<DiscoveryNode> currentNodes = new HashSet<>(nodes.keySet());
- for (DiscoveryNode node : nodesToKeep) {
- currentNodes.remove(node);
- }
- for (final DiscoveryNode node : currentNodes) {
- try (Releasable ignored = nodeLocks.acquire(node)) {
- Integer current = nodes.remove(node);
- assert current != null : "node " + node + " was removed in event but not in internal nodes";
- try {
- transportService.disconnectFromNode(node);
- } catch (Exception e) {
- logger.warn(() -> new ParameterizedMessage("failed to disconnect to node [{}]", node), e);
- }
+ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
+        final List<Runnable> runnables = new ArrayList<>();
+ synchronized (mutex) {
+            final Set<DiscoveryNode> nodesToDisconnect = new HashSet<>(targetsByNode.keySet());
+ for (final DiscoveryNode discoveryNode : discoveryNodes) {
+ nodesToDisconnect.remove(discoveryNode);
+ }
+
+ for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
+ runnables.add(targetsByNode.get(discoveryNode).disconnect());
}
}
+ runnables.forEach(Runnable::run);
}
- void validateAndConnectIfNeeded(DiscoveryNode node) {
- assert nodeLocks.isHeldByCurrentThread(node) : "validateAndConnectIfNeeded must be called under lock";
- if (lifecycle.stoppedOrClosed() ||
- nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time...
- // nothing to do
- } else {
- try {
- // connecting to an already connected node is a noop
- transportService.connectToNode(node);
- nodes.put(node, 0);
- } catch (Exception e) {
- Integer nodeFailureCount = nodes.get(node);
- assert nodeFailureCount != null : node + " didn't have a counter in nodes map";
- nodeFailureCount = nodeFailureCount + 1;
- // log every 6th failure
- if ((nodeFailureCount % 6) == 1) {
- final int finalNodeFailureCount = nodeFailureCount;
- logger.warn(() -> new ParameterizedMessage(
- "failed to connect to node {} (tried [{}] times)", node, finalNodeFailureCount), e);
+ /**
+ * Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any
+ * nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection
+ * attempts have completed.
+ */
+ void ensureConnections(Runnable onCompletion) {
+        final List<Runnable> runnables = new ArrayList<>();
+ synchronized (mutex) {
+            final Collection<ConnectionTarget> connectionTargets = targetsByNode.values();
+ if (connectionTargets.isEmpty()) {
+ runnables.add(onCompletion);
+ } else {
+ logger.trace("ensuring connections to {}", targetsByNode);
+                final GroupedActionListener<Void> listener = new GroupedActionListener<>(
+ ActionListener.wrap(onCompletion), connectionTargets.size());
+ for (final ConnectionTarget connectionTarget : connectionTargets) {
+ runnables.add(connectionTarget.ensureConnected(listener));
}
- nodes.put(node, nodeFailureCount);
}
}
+ runnables.forEach(Runnable::run);
}
class ConnectionChecker extends AbstractRunnable {
+ protected void doRun() {
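+            // do nothing if this checker has been cancelled, which happens when the service is stopped and connectionChecker is cleared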
+ if (connectionChecker == this) {
+ ensureConnections(this::scheduleNextCheck);
+ }
+ }
+
+ void scheduleNextCheck() {
+ if (connectionChecker == this) {
+ threadPool.scheduleUnlessShuttingDown(reconnectInterval, ThreadPool.Names.GENERIC, this);
+ }
+ }
@Override
public void onFailure(Exception e) {
logger.warn("unexpected error while checking for node reconnects", e);
- }
-
- protected void doRun() {
- for (DiscoveryNode node : nodes.keySet()) {
- try (Releasable ignored = nodeLocks.acquire(node)) {
- validateAndConnectIfNeeded(node);
- }
- }
+ scheduleNextCheck();
}
@Override
- public void onAfter() {
- if (lifecycle.started()) {
- backgroundCancellable = threadPool.schedule(this, reconnectInterval, ThreadPool.Names.GENERIC);
- }
+ public String toString() {
+ return "periodic reconnection checker";
}
}
@Override
protected void doStart() {
- backgroundCancellable = threadPool.schedule(new ConnectionChecker(), reconnectInterval, ThreadPool.Names.GENERIC);
+ final ConnectionChecker connectionChecker = new ConnectionChecker();
+ this.connectionChecker = connectionChecker;
+ connectionChecker.scheduleNextCheck();
}
@Override
protected void doStop() {
- if (backgroundCancellable != null) {
- backgroundCancellable.cancel();
- }
+ connectionChecker = null;
}
@Override
protected void doClose() {
+ }
+ // for disruption tests, re-establish any disrupted connections
+ public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
+ connectToNodes(discoveryNodes, () -> {
+ disconnectFromNodesExcept(discoveryNodes);
+ ensureConnections(onCompletion);
+ });
+ }
+
+ private enum ActivityType {
+ IDLE,
+ CONNECTING,
+ DISCONNECTING
+ }
+
+ /**
+ * {@link ConnectionTarget} ensures that we are never concurrently connecting to and disconnecting from a node, and that we eventually
+ * either connect to or disconnect from it according to whether {@link ConnectionTarget#connect(ActionListener)} or
+ * {@link ConnectionTarget#disconnect()} was called last.
+ *
+ * Each {@link ConnectionTarget} is in one of these states:
+ *
+ * - idle ({@link ConnectionTarget#future} has no listeners)
+ * - awaiting connection ({@link ConnectionTarget#future} may contain listeners awaiting a connection)
+ * - awaiting disconnection ({@link ConnectionTarget#future} may contain listeners awaiting a disconnection)
+ *
+ * It will be awaiting connection (respectively disconnection) after calling {@code connect()} (respectively {@code disconnect()}). It
+ * will eventually become idle if these methods are not called infinitely often.
+ *
+ * These methods return a {@link Runnable} which starts the connection/disconnection process iff it was idle before the method was
+     * called, and which fails any listeners that were waiting for the previous activity if the {@code ConnectionTarget} switched from
+     * {@code CONNECTING} to {@code DISCONNECTING} or vice versa. The connection/disconnection process continues until all listeners have
+     * been removed, at which point it becomes idle
+ * again.
+ *
+ * Additionally if the last step of the process was a disconnection then this target is removed from the current set of targets. Thus
+ * if this {@link ConnectionTarget} is idle and in the current set of targets then it should be connected.
+ *
+ * All of the {@code listeners} are awaiting the completion of the same activity, which is either a connection or a disconnection. If
+ * we are currently connecting and then {@link ConnectionTarget#disconnect()} is called then all connection listeners are
+ * removed from the list so they can be notified of failure; once the connecting process has finished a disconnection will be started.
+ * Similarly if we are currently disconnecting and then {@link ConnectionTarget#connect(ActionListener)} is called then all
+ * disconnection listeners are immediately removed for failure notification and a connection is started once the disconnection is
+ * complete.
+ */
+ private class ConnectionTarget {
+ private final DiscoveryNode discoveryNode;
+
+        private PlainListenableActionFuture<Void> future = PlainListenableActionFuture.newListenableFuture();
+ private ActivityType activityType = ActivityType.IDLE; // indicates what any listeners are awaiting
+
+ private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
+
+ private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
+ @Override
+ protected void doRun() {
+ assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
+ transportService.connectToNode(discoveryNode);
+ consecutiveFailureCount.set(0);
+ logger.debug("connected to {}", discoveryNode);
+ onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
+ final int currentFailureCount = consecutiveFailureCount.incrementAndGet();
+ // only warn every 6th failure
+ final Level level = currentFailureCount % 6 == 1 ? Level.WARN : Level.DEBUG;
+ logger.log(level, new ParameterizedMessage("failed to connect to {} (tried [{}] times)",
+ discoveryNode, currentFailureCount), e);
+ onCompletion(ActivityType.CONNECTING, e, disconnectActivity);
+ }
+
+ @Override
+ public String toString() {
+ return "connect to " + discoveryNode;
+ }
+ });
+
+ private final Runnable disconnectActivity = new AbstractRunnable() {
+ @Override
+ protected void doRun() {
+ assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
+ transportService.disconnectFromNode(discoveryNode);
+ consecutiveFailureCount.set(0);
+ logger.debug("disconnected from {}", discoveryNode);
+ onCompletion(ActivityType.DISCONNECTING, null, connectActivity);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
+ consecutiveFailureCount.incrementAndGet();
+ // we may not have disconnected, but will not retry, so this connection might have leaked
+ logger.warn(new ParameterizedMessage("failed to disconnect from {}, possible connection leak", discoveryNode), e);
+ assert false : "failed to disconnect from " + discoveryNode + ", possible connection leak\n" + e;
+ onCompletion(ActivityType.DISCONNECTING, e, connectActivity);
+ }
+ };
+
+ ConnectionTarget(DiscoveryNode discoveryNode) {
+ this.discoveryNode = discoveryNode;
+ }
+
+        Runnable connect(@Nullable ActionListener<Void> listener) {
+ return addListenerAndStartActivity(listener, ActivityType.CONNECTING, connectActivity,
+ "disconnection cancelled by reconnection");
+ }
+
+ Runnable disconnect() {
+ return addListenerAndStartActivity(null, ActivityType.DISCONNECTING, disconnectActivity,
+ "connection cancelled by disconnection");
+ }
+
+        Runnable ensureConnected(@Nullable ActionListener<Void> listener) {
+ assert Thread.holdsLock(mutex) : "mutex not held";
+
+ if (activityType == ActivityType.IDLE) {
+ if (transportService.nodeConnected(discoveryNode)) {
+ return () -> listener.onResponse(null);
+ } else {
+ // target is disconnected, and we are currently idle, so start a connection process.
+ activityType = ActivityType.CONNECTING;
+ addListener(listener);
+ return connectActivity;
+ }
+ } else {
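+                // a connection or disconnection is already in progress, so wait for it to complete rather than starting another activity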
+ addListener(listener);
+ return () -> {
+ };
+ }
+ }
+
+        private void addListener(@Nullable ActionListener<Void> listener) {
+ assert Thread.holdsLock(mutex) : "mutex not held";
+ assert activityType != ActivityType.IDLE;
+ if (listener != null) {
+ future.addListener(listener);
+ }
+ }
+
+        private PlainListenableActionFuture<Void> getAndClearFuture() {
+            assert Thread.holdsLock(mutex) : "mutex not held";
+            final PlainListenableActionFuture<Void> drainedFuture = future;
+ future = PlainListenableActionFuture.newListenableFuture();
+ return drainedFuture;
+ }
+
+        private Runnable addListenerAndStartActivity(@Nullable ActionListener<Void> listener, ActivityType newActivityType,
+ Runnable activity, String cancellationMessage) {
+ assert Thread.holdsLock(mutex) : "mutex not held";
+ assert newActivityType.equals(ActivityType.IDLE) == false;
+
+ if (activityType == ActivityType.IDLE) {
+ activityType = newActivityType;
+ addListener(listener);
+ return activity;
+ }
+
+ if (activityType == newActivityType) {
+ addListener(listener);
+ return () -> {
+ };
+ }
+
+ activityType = newActivityType;
+            final PlainListenableActionFuture<Void> oldFuture = getAndClearFuture();
+ addListener(listener);
+ return () -> oldFuture.onFailure(new ElasticsearchException(cancellationMessage));
+ }
+
+ private void onCompletion(ActivityType completedActivityType, @Nullable Exception e, Runnable oppositeActivity) {
+ assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
+
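+            // If the activity that just completed is still the one the listeners are waiting for then notify them and become idle,
+            // removing this target from targetsByNode if it was a disconnection. Otherwise the opposite activity was requested while
+            // this one was in flight, so start it now that the connection is in a known state.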
+ final Runnable cleanup;
+ synchronized (mutex) {
+ assert activityType != ActivityType.IDLE;
+ if (activityType == completedActivityType) {
+                    final PlainListenableActionFuture<Void> oldFuture = getAndClearFuture();
+ activityType = ActivityType.IDLE;
+
+ cleanup = e == null ? () -> oldFuture.onResponse(null) : () -> oldFuture.onFailure(e);
+
+ if (completedActivityType.equals(ActivityType.DISCONNECTING)) {
+ final ConnectionTarget removedTarget = targetsByNode.remove(discoveryNode);
+ assert removedTarget == this : removedTarget + " vs " + this;
+ }
+ } else {
+ cleanup = oppositeActivity;
+ }
+ }
+ cleanup.run();
+ }
+
+ boolean isPendingDisconnection() {
+ assert Thread.holdsLock(mutex) : "mutex not held";
+ return activityType == ActivityType.DISCONNECTING;
+ }
+
+ @Override
+ public String toString() {
+ synchronized (mutex) {
+ return "ConnectionTarget{" +
+ "discoveryNode=" + discoveryNode +
+ ", activityType=" + activityType +
+ '}';
+ }
+ }
}
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
index f2b0756d3d8..11b6f451d59 100644
--- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
@@ -55,6 +55,7 @@ import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -450,7 +451,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
- nodeConnectionsService.connectToNodes(newClusterState.nodes());
+ connectToNodesAndWait(newClusterState);
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
@@ -470,6 +471,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
callClusterStateListeners(clusterChangedEvent);
}
+ protected void connectToNodesAndWait(ClusterState newClusterState) {
+ // can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch.
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ nodeConnectionsService.connectToNodes(newClusterState.nodes(), countDownLatch::countDown);
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ logger.debug("interrupted while connecting to nodes, continuing", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
clusterStateAppliers.forEach(applier -> {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java
index d0299a2858c..65e17972f0c 100644
--- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java
@@ -19,15 +19,18 @@
package org.elasticsearch.cluster;
+import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@@ -39,7 +42,6 @@ import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@@ -55,10 +57,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
+import static java.util.Collections.emptySet;
+import static org.elasticsearch.cluster.NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING;
+import static org.elasticsearch.common.settings.Settings.builder;
+import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
+import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
+import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
public class NodeConnectionsServiceTests extends ESTestCase {
@@ -66,6 +75,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
private ThreadPool threadPool;
private MockTransport transport;
private TransportService transportService;
+    private Map<DiscoveryNode, CheckedRunnable<Exception>> nodeConnectionBlocks;
    private List<DiscoveryNode> generateNodes() {
        List<DiscoveryNode> nodes = new ArrayList<>();
@@ -77,65 +87,226 @@ public class NodeConnectionsServiceTests extends ESTestCase {
return nodes;
}
-    private ClusterState clusterStateFromNodes(List<DiscoveryNode> nodes) {
+    private DiscoveryNodes discoveryNodesFromList(List<DiscoveryNode> discoveryNodes) {
final DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
- for (DiscoveryNode node : nodes) {
- builder.add(node);
+ for (final DiscoveryNode discoveryNode : discoveryNodes) {
+ builder.add(discoveryNode);
}
- return ClusterState.builder(new ClusterName("test")).nodes(builder).build();
+ return builder.build();
}
- public void testConnectAndDisconnect() {
-        List<DiscoveryNode> nodes = generateNodes();
- NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
+ public void testConnectAndDisconnect() throws Exception {
+ final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
- ClusterState current = clusterStateFromNodes(Collections.emptyList());
- ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
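+        // repeatedly call ensureConnections() on a background thread throughout the test, exercising it concurrently with the
+        // connectToNodes and disconnectFromNodesExcept calls below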
+ final AtomicBoolean stopReconnecting = new AtomicBoolean();
+ final Thread reconnectionThread = new Thread(() -> {
+ while (stopReconnecting.get() == false) {
+                final PlainActionFuture<Void> future = new PlainActionFuture<>();
+ service.ensureConnections(() -> future.onResponse(null));
+ future.actionGet();
+ }
+ });
+ reconnectionThread.start();
- service.connectToNodes(event.state().nodes());
- assertConnected(event.state().nodes());
+        final List<DiscoveryNode> allNodes = generateNodes();
+ for (int iteration = 0; iteration < 3; iteration++) {
- service.disconnectFromNodesExcept(event.state().nodes());
- assertConnectedExactlyToNodes(event.state());
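+            // sometimes run a second background thread that randomly closes connections, simulating disruption while we connect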
+ final boolean isDisrupting = randomBoolean();
+ final AtomicBoolean stopDisrupting = new AtomicBoolean();
+ final Thread disruptionThread = new Thread(() -> {
+ while (isDisrupting && stopDisrupting.get() == false) {
+ transportService.disconnectFromNode(randomFrom(allNodes));
+ }
+ });
+ disruptionThread.start();
- current = event.state();
- event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
+ final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes));
+            final PlainActionFuture<Void> future = new PlainActionFuture<>();
+ service.connectToNodes(nodes, () -> future.onResponse(null));
+ future.actionGet();
+ if (isDisrupting == false) {
+ assertConnected(nodes);
+ }
+ service.disconnectFromNodesExcept(nodes);
- service.connectToNodes(event.state().nodes());
- assertConnected(event.state().nodes());
+ assertTrue(stopDisrupting.compareAndSet(false, true));
+ disruptionThread.join();
- service.disconnectFromNodesExcept(event.state().nodes());
- assertConnectedExactlyToNodes(event.state());
+ if (randomBoolean()) {
+ // sometimes do not wait for the disconnections to complete before starting the next connections
+ if (usually()) {
+ ensureConnections(service);
+ assertConnectedExactlyToNodes(nodes);
+ } else {
+ assertBusy(() -> assertConnectedExactlyToNodes(nodes));
+ }
+ }
+ }
+
+ assertTrue(stopReconnecting.compareAndSet(false, true));
+ reconnectionThread.join();
}
- public void testReconnect() {
-        List<DiscoveryNode> nodes = generateNodes();
- NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
+ public void testPeriodicReconnection() {
+ final Settings.Builder settings = Settings.builder();
+ final long reconnectIntervalMillis;
+ if (randomBoolean()) {
+ reconnectIntervalMillis = CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY).millis();
+ } else {
+ reconnectIntervalMillis = randomLongBetween(1, 100000);
+ settings.put(CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), reconnectIntervalMillis + "ms");
+ }
- ClusterState current = clusterStateFromNodes(Collections.emptyList());
- ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
+ final DeterministicTaskQueue deterministicTaskQueue
+ = new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
+
+ final NodeConnectionsService service
+ = new NodeConnectionsService(settings.build(), deterministicTaskQueue.getThreadPool(), transportService);
+ service.start();
+
+        final List<DiscoveryNode> allNodes = generateNodes();
+ final DiscoveryNodes targetNodes = discoveryNodesFromList(randomSubsetOf(allNodes));
transport.randomConnectionExceptions = true;
- service.connectToNodes(event.state().nodes());
+ final AtomicBoolean connectionCompleted = new AtomicBoolean();
+ service.connectToNodes(targetNodes, () -> connectionCompleted.set(true));
+ deterministicTaskQueue.runAllRunnableTasks();
+ assertTrue(connectionCompleted.get());
- for (int i = 0; i < 3; i++) {
+ long maxDisconnectionTime = 0;
+ for (int iteration = 0; iteration < 3; iteration++) {
// simulate disconnects
- for (DiscoveryNode node : randomSubsetOf(nodes)) {
- transportService.disconnectFromNode(node);
+ for (DiscoveryNode node : allNodes) {
+ if (randomBoolean()) {
+ final long disconnectionTime = randomLongBetween(0, 120000);
+ maxDisconnectionTime = Math.max(maxDisconnectionTime, disconnectionTime);
+ deterministicTaskQueue.scheduleAt(disconnectionTime, new Runnable() {
+ @Override
+ public void run() {
+ transportService.disconnectFromNode(node);
+ }
+
+ @Override
+ public String toString() {
+ return "scheduled disconnection of " + node;
+ }
+ });
+ }
}
- service.new ConnectionChecker().run();
}
+ runTasksUntil(deterministicTaskQueue, maxDisconnectionTime);
+
// disable exceptions so things can be restored
transport.randomConnectionExceptions = false;
- service.new ConnectionChecker().run();
- assertConnectedExactlyToNodes(event.state());
+ logger.info("renewing connections");
+ runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + reconnectIntervalMillis);
+ assertConnectedExactlyToNodes(targetNodes);
}
- private void assertConnectedExactlyToNodes(ClusterState state) {
- assertConnected(state.nodes());
- assertThat(transportService.getConnectionManager().size(), equalTo(state.nodes().getSize()));
+ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
+ final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
+
+ // connect to one node
+ final DiscoveryNode node0 = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
+ final DiscoveryNodes nodes0 = DiscoveryNodes.builder().add(node0).build();
+        final PlainActionFuture<Void> future0 = new PlainActionFuture<>();
+ service.connectToNodes(nodes0, () -> future0.onResponse(null));
+ future0.actionGet();
+ assertConnectedExactlyToNodes(nodes0);
+
+ // connection attempts to node0 block indefinitely
+ final CyclicBarrier connectionBarrier = new CyclicBarrier(2);
+ try {
+ nodeConnectionBlocks.put(node0, connectionBarrier::await);
+ transportService.disconnectFromNode(node0);
+
+ // can still connect to another node without blocking
+ final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
+ final DiscoveryNodes nodes1 = DiscoveryNodes.builder().add(node1).build();
+ final DiscoveryNodes nodes01 = DiscoveryNodes.builder(nodes0).add(node1).build();
+            final PlainActionFuture<Void> future1 = new PlainActionFuture<>();
+ service.connectToNodes(nodes01, () -> future1.onResponse(null));
+ future1.actionGet();
+ assertConnectedExactlyToNodes(nodes1);
+
+ // can also disconnect from node0 without blocking
+            final PlainActionFuture<Void> future2 = new PlainActionFuture<>();
+ service.connectToNodes(nodes1, () -> future2.onResponse(null));
+ future2.actionGet();
+ service.disconnectFromNodesExcept(nodes1);
+ assertConnectedExactlyToNodes(nodes1);
+
+ // however, now node0 is considered to be a new node so we will block on a subsequent attempt to connect to it
+            final PlainActionFuture<Void> future3 = new PlainActionFuture<>();
+ service.connectToNodes(nodes01, () -> future3.onResponse(null));
+ expectThrows(ElasticsearchTimeoutException.class, () -> future3.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000))));
+
+ // once the connection is unblocked we successfully connect to it.
+ nodeConnectionBlocks.clear();
+ connectionBarrier.await(0, TimeUnit.SECONDS);
+ future3.actionGet();
+ assertConnectedExactlyToNodes(nodes01);
+
+ // if we disconnect from a node while blocked trying to connect to it then we do eventually disconnect from it
+ nodeConnectionBlocks.put(node0, connectionBarrier::await);
+ transportService.disconnectFromNode(node0);
+            final PlainActionFuture<Void> future4 = new PlainActionFuture<>();
+ service.connectToNodes(nodes01, () -> future4.onResponse(null));
+ future4.actionGet();
+ assertConnectedExactlyToNodes(nodes1);
+
+ service.disconnectFromNodesExcept(nodes1);
+ connectionBarrier.await();
+ if (randomBoolean()) {
+ // assertBusy because the connection completes before disconnecting, so we might briefly observe a connection to node0
+ assertBusy(() -> assertConnectedExactlyToNodes(nodes1));
+ }
+
+ // use ensureConnections() to wait until the service is idle
+ ensureConnections(service);
+ assertConnectedExactlyToNodes(nodes1);
+
+ // if we disconnect from a node while blocked trying to connect to it then the listener is notified
+            final PlainActionFuture<Void> future6 = new PlainActionFuture<>();
+ service.connectToNodes(nodes01, () -> future6.onResponse(null));
+ expectThrows(ElasticsearchTimeoutException.class, () -> future6.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000))));
+
+ service.disconnectFromNodesExcept(nodes1);
+ future6.actionGet(); // completed even though the connection attempt is still blocked
+ assertConnectedExactlyToNodes(nodes1);
+
+ nodeConnectionBlocks.clear();
+ connectionBarrier.await(10, TimeUnit.SECONDS);
+ ensureConnections(service);
+ assertConnectedExactlyToNodes(nodes1);
+ } finally {
+ nodeConnectionBlocks.clear();
+ connectionBarrier.reset();
+ }
+ }
+
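+    // advance the deterministic task queue, randomly interleaving task execution with time advances, until the given time is reached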
+ private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) {
+ while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) {
+ if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) {
+ deterministicTaskQueue.runRandomTask();
+ } else if (deterministicTaskQueue.hasDeferredTasks()) {
+ deterministicTaskQueue.advanceTime();
+ }
+ }
+ deterministicTaskQueue.runAllRunnableTasks();
+ }
+
+ private void ensureConnections(NodeConnectionsService service) {
+        final PlainActionFuture<Void> future = new PlainActionFuture<>();
+ service.ensureConnections(() -> future.onResponse(null));
+ future.actionGet();
+ }
+
+ private void assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) {
+ assertConnected(discoveryNodes);
+ assertThat(transportService.getConnectionManager().size(), equalTo(discoveryNodes.getSize()));
}
    private void assertConnected(Iterable<DiscoveryNode> nodes) {
@@ -150,10 +321,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
super.setUp();
this.threadPool = new TestThreadPool(getClass().getName());
this.transport = new MockTransport();
- transportService = new NoHandshakeTransportService(Settings.EMPTY, transport, threadPool,
- TransportService.NOOP_TRANSPORT_INTERCEPTOR,
- boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null,
- Collections.emptySet());
+ nodeConnectionBlocks = newConcurrentMap();
+ transportService = new TestTransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
}
@@ -167,22 +336,31 @@ public class NodeConnectionsServiceTests extends ESTestCase {
super.tearDown();
}
- private final class NoHandshakeTransportService extends TransportService {
+ private final class TestTransportService extends TransportService {
- private NoHandshakeTransportService(Settings settings,
- Transport transport,
- ThreadPool threadPool,
- TransportInterceptor transportInterceptor,
-                                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
- ClusterSettings clusterSettings,
-                                        Set<String> taskHeaders) {
- super(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders);
+ private TestTransportService(Transport transport, ThreadPool threadPool) {
+ super(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+ boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()),
+ null, emptySet());
}
@Override
        public HandshakeResponse handshake(Transport.Connection connection, long timeout, Predicate<ClusterName> clusterNamePredicate) {
return new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT);
}
+
+ @Override
+ public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
+            final CheckedRunnable<Exception> connectionBlock = nodeConnectionBlocks.get(node);
+ if (connectionBlock != null) {
+ try {
+ connectionBlock.run();
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ }
+ super.connectToNode(node);
+ }
}
private final class MockTransport implements Transport {
@@ -224,40 +402,36 @@ public class NodeConnectionsServiceTests extends ESTestCase {
@Override
        public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
- if (profile == null) {
- if (randomConnectionExceptions && randomBoolean()) {
- listener.onFailure(new ConnectTransportException(node, "simulated"));
- return () -> {};
- }
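+        // complete the listener asynchronously on the generic threadpool, sometimes simulating a connection failure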
+ if (profile == null && randomConnectionExceptions && randomBoolean()) {
+ threadPool.generic().execute(() -> listener.onFailure(new ConnectTransportException(node, "simulated")));
+ } else {
+ threadPool.generic().execute(() -> listener.onResponse(new Connection() {
+ @Override
+ public DiscoveryNode getNode() {
+ return node;
+ }
+
+ @Override
+ public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
+ throws TransportException {
+ }
+
+ @Override
+                public void addCloseListener(ActionListener<Void> listener) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean isClosed() {
+ return false;
+ }
+ }));
}
- listener.onResponse(new Connection() {
- @Override
- public DiscoveryNode getNode() {
- return node;
- }
-
- @Override
- public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
- throws TransportException {
-
- }
-
- @Override
-                    public void addCloseListener(ActionListener<Void> listener) {
-
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public boolean isClosed() {
- return false;
- }
- });
- return () -> {};
+ return () -> {
+ };
}
@Override
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
index 623d3fdc34b..d36423e3886 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
@@ -44,7 +44,6 @@ import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@@ -1755,12 +1754,7 @@ public class CoordinatorTests extends ESTestCase {
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
clusterService.setNodeConnectionsService(
new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
- transportService) {
- @Override
- public void connectToNodes(DiscoveryNodes discoveryNodes) {
- // override this method as it does blocking calls
- }
- });
+ transportService));
        final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
@@ -2149,6 +2143,10 @@ public class CoordinatorTests extends ESTestCase {
}
}
+ @Override
+ protected void connectToNodesAndWait(ClusterState newClusterState) {
+ // don't do anything, and don't block
+ }
}
private static DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {
diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java
index 740283736a2..0736d29cf66 100644
--- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java
@@ -19,14 +19,13 @@
package org.elasticsearch.cluster.service;
import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.LocalNodeMasterListener;
-import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
import org.elasticsearch.cluster.metadata.MetaData;
@@ -54,6 +53,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
+import static org.elasticsearch.test.ClusterServiceUtils.createNoOpNodeConnectionsService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@@ -88,23 +88,13 @@ public class ClusterApplierServiceTests extends ESTestCase {
super.tearDown();
}
- TimedClusterApplierService createTimedClusterService(boolean makeMaster) {
+ private TimedClusterApplierService createTimedClusterService(boolean makeMaster) {
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
emptySet(), Version.CURRENT);
TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(Settings.builder().put("cluster.name",
"ClusterApplierServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool);
- timedClusterApplierService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
- @Override
- public void connectToNodes(DiscoveryNodes discoveryNodes) {
- // skip
- }
-
- @Override
- public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
- // skip
- }
- });
+ timedClusterApplierService.setNodeConnectionsService(createNoOpNodeConnectionsService());
timedClusterApplierService.setInitialState(ClusterState.builder(new ClusterName("ClusterApplierServiceTests"))
.nodes(DiscoveryNodes.builder()
.add(localNode)
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index ece8bbd7194..9c1d256b552 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -748,6 +748,11 @@ public class SnapshotResiliencyTests extends ESTestCase {
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
}
+
+ @Override
+ protected void connectToNodesAndWait(ClusterState newClusterState) {
+ // don't do anything, and don't block
+ }
});
mockTransport = new DisruptableMockTransport(node, logger) {
@Override
@@ -992,23 +997,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
coordinator.start();
masterService.start();
clusterService.getClusterApplierService().setNodeConnectionsService(
- new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService) {
- @Override
- public void connectToNodes(DiscoveryNodes discoveryNodes) {
- // override this method as it does blocking calls
- boolean callSuper = true;
- for (final DiscoveryNode node : discoveryNodes) {
- try {
- transportService.connectToNode(node);
- } catch (Exception e) {
- callSuper = false;
- }
- }
- if (callSuper) {
- super.connectToNodes(discoveryNodes);
- }
- }
- });
+ new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
clusterService.getClusterApplierService().start();
indicesService.start();
indicesClusterStateService.start();
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
index aae232594a1..5477d292cc2 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
@@ -137,17 +137,7 @@ public class ClusterServiceUtils {
.put("cluster.name", "ClusterServiceTests")
.build();
ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
- clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
- @Override
- public void connectToNodes(DiscoveryNodes discoveryNodes) {
- // skip
- }
-
- @Override
- public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
- // skip
- }
- });
+ clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService());
ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
.nodes(DiscoveryNodes.builder()
.add(localNode)
@@ -162,6 +152,21 @@ public class ClusterServiceUtils {
return clusterService;
}
+ public static NodeConnectionsService createNoOpNodeConnectionsService() {
+ return new NodeConnectionsService(Settings.EMPTY, null, null) {
+ @Override
+ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
+ // don't do anything
+ onCompletion.run();
+ }
+
+ @Override
+ public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
+ // don't do anything
+ }
+ };
+ }
+
public static ClusterStatePublisher createClusterStatePublisher(ClusterApplier clusterApplier) {
return (event, publishListener, ackListener) ->
clusterApplier.onNewClusterState("mock_publish_to_self[" + event.source() + "]", () -> event.state(),
diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java
index d620e7633f2..66514b1b274 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java
@@ -38,6 +38,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import static org.junit.Assert.assertFalse;
@@ -49,7 +50,7 @@ import static org.junit.Assert.assertFalse;
*/
public class NetworkDisruption implements ServiceDisruptionScheme {
- private final Logger logger = LogManager.getLogger(NetworkDisruption.class);
+ private static final Logger logger = LogManager.getLogger(NetworkDisruption.class);
private final DisruptedLinks disruptedLinks;
private final NetworkLinkDisruptionType networkLinkDisruptionType;
@@ -103,9 +104,17 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
* handy to be able to ensure this happens faster
*/
public static void ensureFullyConnectedCluster(InternalTestCluster cluster) {
- for (String node: cluster.getNodeNames()) {
+ final String[] nodeNames = cluster.getNodeNames();
+ final CountDownLatch countDownLatch = new CountDownLatch(nodeNames.length);
+ for (String node : nodeNames) {
ClusterState stateOnNode = cluster.getInstance(ClusterService.class, node).state();
- cluster.getInstance(NodeConnectionsService.class, node).connectToNodes(stateOnNode.nodes());
+ cluster.getInstance(NodeConnectionsService.class, node).reconnectToNodes(stateOnNode.nodes(), countDownLatch::countDown);
+ }
+
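+        // wait for every node to finish re-establishing its connections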
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
}
}