Only connect to new nodes on new cluster state (#39629)

Today, when applying new cluster state we attempt to connect to all of its
nodes as a blocking part of the application process. This is the right thing to
do with new nodes, and is a no-op on any already-connected nodes, but is
questionable on known nodes from which we are currently disconnected: there is
a risk that we are partitioned from these nodes so that any attempt to connect
to them will hang until it times out. This can dramatically slow down the
application of new cluster states which hinders the recovery of the cluster
during certain kinds of partition.

If nodes are disconnected from the master then it is likely that they are to be
removed as part of a subsequent cluster state update, so there's no need to try
and reconnect to them like this. Moreover there is no need to attempt to
reconnect to disconnected nodes as part of the cluster state application
process, because we periodically try and reconnect to any disconnected nodes,
and handle their disconnectedness reasonably gracefully in the meantime.

This commit alters this behaviour to avoid reconnecting to known nodes during
cluster state application.

Resolves #29025.
This commit is contained in:
David Turner 2019-03-12 19:26:25 +00:00
parent aeb0116355
commit 049970af3e
8 changed files with 643 additions and 243 deletions

View File

@ -18,58 +18,75 @@
*/
package org.elasticsearch.cluster;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.LeaderChecker;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.Setting.Property;
import static org.elasticsearch.common.settings.Setting.positiveTimeSetting;
/**
* This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are
* removed. Also, it periodically checks that all connections are still open and if needed restores them.
* Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond
* to pings. This is done by {@link FollowersChecker}. Master fault detection is done by {@link LeaderChecker}.
* This component is responsible for maintaining connections from this node to all the nodes listed in the cluster state, and for
* disconnecting from nodes once they are removed from the cluster state. It periodically checks that all connections are still open and
* restores them if needed. Note that this component is *not* responsible for removing nodes from the cluster state if they disconnect or
* are unresponsive: this is the job of the master's fault detection components, particularly {@link FollowersChecker}.
* <p>
* The {@link NodeConnectionsService#connectToNodes(DiscoveryNodes, Runnable)} and {@link
* NodeConnectionsService#disconnectFromNodesExcept(DiscoveryNodes)} methods are called on the {@link ClusterApplier} thread. This component
* allows the {@code ClusterApplier} to block on forming connections to _new_ nodes, because the rest of the system treats a missing
* connection with another node in the cluster state as an exceptional condition and we don't want this to happen to new nodes. However we
* need not block on re-establishing existing connections because if a connection is down then we are already in an exceptional situation
* and it doesn't matter much if we stay in this situation a little longer.
* <p>
* This component does not block on disconnections at all, because a disconnection might need to wait for an ongoing (background) connection
* attempt to complete first.
*/
public class NodeConnectionsService extends AbstractLifecycleComponent {
private static final Logger logger = LogManager.getLogger(NodeConnectionsService.class);
public static final Setting<TimeValue> CLUSTER_NODE_RECONNECT_INTERVAL_SETTING =
positiveTimeSetting("cluster.nodes.reconnect_interval", TimeValue.timeValueSeconds(10), Property.NodeScope);
positiveTimeSetting("cluster.nodes.reconnect_interval", TimeValue.timeValueSeconds(10), Property.NodeScope);
private final ThreadPool threadPool;
private final TransportService transportService;
// map between current node and the number of failed connection attempts. 0 means successfully connected.
// if a node doesn't appear in this list it shouldn't be monitored
private ConcurrentMap<DiscoveryNode, Integer> nodes = ConcurrentCollections.newConcurrentMap();
// Protects changes to targetsByNode and its values (i.e. ConnectionTarget#activityType and ConnectionTarget#listener).
// Crucially there are no blocking calls under this mutex: it is not held while connecting or disconnecting.
private final Object mutex = new Object();
private final KeyedLock<DiscoveryNode> nodeLocks = new KeyedLock<>();
// contains an entry for every node in the latest cluster state, as well as for nodes from which we are in the process of
// disconnecting
private final Map<DiscoveryNode, ConnectionTarget> targetsByNode = new HashMap<>();
private final TimeValue reconnectInterval;
private volatile Scheduler.Cancellable backgroundCancellable = null;
private volatile ConnectionChecker connectionChecker;
@Inject
public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
@ -78,132 +95,337 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
}
public void connectToNodes(DiscoveryNodes discoveryNodes) {
CountDownLatch latch = new CountDownLatch(discoveryNodes.getSize());
for (final DiscoveryNode node : discoveryNodes) {
final boolean connected;
try (Releasable ignored = nodeLocks.acquire(node)) {
nodes.putIfAbsent(node, 0);
connected = transportService.nodeConnected(node);
}
if (connected) {
latch.countDown();
} else {
// spawn to another thread to do in parallel
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
// both errors and rejections are logged here. the service
// will try again after `cluster.nodes.reconnect_interval` on all nodes but the current master.
// On the master, node fault detection will remove these nodes from the cluster as their are not
// connected. Note that it is very rare that we end up here on the master.
logger.warn(() -> new ParameterizedMessage("failed to connect to {}", node), e);
}
/**
* Connect to all the given nodes, but do not disconnect from any extra nodes. Calls the completion handler on completion of all
* connection attempts to _new_ nodes, but not on attempts to re-establish connections to nodes that are already known.
*/
public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
@Override
protected void doRun() {
try (Releasable ignored = nodeLocks.acquire(node)) {
validateAndConnectIfNeeded(node);
}
}
if (discoveryNodes.getSize() == 0) {
onCompletion.run();
return;
}
@Override
public void onAfter() {
latch.countDown();
}
});
final GroupedActionListener<Void> listener
= new GroupedActionListener<>(ActionListener.wrap(onCompletion), discoveryNodes.getSize());
final List<Runnable> runnables = new ArrayList<>(discoveryNodes.getSize());
synchronized (mutex) {
for (final DiscoveryNode discoveryNode : discoveryNodes) {
ConnectionTarget connectionTarget = targetsByNode.get(discoveryNode);
final boolean isNewNode;
if (connectionTarget == null) {
// new node, set up target and listener
connectionTarget = new ConnectionTarget(discoveryNode);
targetsByNode.put(discoveryNode, connectionTarget);
isNewNode = true;
} else {
// existing node, but maybe we're disconnecting from it, in which case it was recently removed from the cluster
// state and has now been re-added so we should wait for the re-connection
isNewNode = connectionTarget.isPendingDisconnection();
}
if (isNewNode) {
runnables.add(connectionTarget.connect(listener));
} else {
// known node, try and ensure it's connected but do not wait
runnables.add(connectionTarget.connect(null));
runnables.add(() -> listener.onResponse(null));
}
}
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
runnables.forEach(Runnable::run);
}
/**
* Disconnects from all nodes except the ones provided as parameter
* Disconnect from any nodes to which we are currently connected which do not appear in the given nodes. Does not wait for the
* disconnections to complete, because they might have to wait for ongoing connection attempts first.
*/
public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
Set<DiscoveryNode> currentNodes = new HashSet<>(nodes.keySet());
for (DiscoveryNode node : nodesToKeep) {
currentNodes.remove(node);
}
for (final DiscoveryNode node : currentNodes) {
try (Releasable ignored = nodeLocks.acquire(node)) {
Integer current = nodes.remove(node);
assert current != null : "node " + node + " was removed in event but not in internal nodes";
try {
transportService.disconnectFromNode(node);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to disconnect to node [{}]", node), e);
}
public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
final List<Runnable> runnables = new ArrayList<>();
synchronized (mutex) {
final Set<DiscoveryNode> nodesToDisconnect = new HashSet<>(targetsByNode.keySet());
for (final DiscoveryNode discoveryNode : discoveryNodes) {
nodesToDisconnect.remove(discoveryNode);
}
for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
runnables.add(targetsByNode.get(discoveryNode).disconnect());
}
}
runnables.forEach(Runnable::run);
}
void validateAndConnectIfNeeded(DiscoveryNode node) {
assert nodeLocks.isHeldByCurrentThread(node) : "validateAndConnectIfNeeded must be called under lock";
if (lifecycle.stoppedOrClosed() ||
nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time...
// nothing to do
} else {
try {
// connecting to an already connected node is a noop
transportService.connectToNode(node);
nodes.put(node, 0);
} catch (Exception e) {
Integer nodeFailureCount = nodes.get(node);
assert nodeFailureCount != null : node + " didn't have a counter in nodes map";
nodeFailureCount = nodeFailureCount + 1;
// log every 6th failure
if ((nodeFailureCount % 6) == 1) {
final int finalNodeFailureCount = nodeFailureCount;
logger.warn(() -> new ParameterizedMessage(
"failed to connect to node {} (tried [{}] times)", node, finalNodeFailureCount), e);
/**
* Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any
* nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection
* attempts have completed.
*/
void ensureConnections(Runnable onCompletion) {
final List<Runnable> runnables = new ArrayList<>();
synchronized (mutex) {
final Collection<ConnectionTarget> connectionTargets = targetsByNode.values();
if (connectionTargets.isEmpty()) {
runnables.add(onCompletion);
} else {
logger.trace("ensuring connections to {}", targetsByNode);
final GroupedActionListener<Void> listener = new GroupedActionListener<>(
ActionListener.wrap(onCompletion), connectionTargets.size());
for (final ConnectionTarget connectionTarget : connectionTargets) {
runnables.add(connectionTarget.ensureConnected(listener));
}
nodes.put(node, nodeFailureCount);
}
}
runnables.forEach(Runnable::run);
}
class ConnectionChecker extends AbstractRunnable {
protected void doRun() {
if (connectionChecker == this) {
ensureConnections(this::scheduleNextCheck);
}
}
void scheduleNextCheck() {
if (connectionChecker == this) {
threadPool.scheduleUnlessShuttingDown(reconnectInterval, ThreadPool.Names.GENERIC, this);
}
}
@Override
public void onFailure(Exception e) {
logger.warn("unexpected error while checking for node reconnects", e);
}
protected void doRun() {
for (DiscoveryNode node : nodes.keySet()) {
try (Releasable ignored = nodeLocks.acquire(node)) {
validateAndConnectIfNeeded(node);
}
}
scheduleNextCheck();
}
@Override
public void onAfter() {
if (lifecycle.started()) {
backgroundCancellable = threadPool.schedule(this, reconnectInterval, ThreadPool.Names.GENERIC);
}
public String toString() {
return "periodic reconnection checker";
}
}
@Override
protected void doStart() {
backgroundCancellable = threadPool.schedule(new ConnectionChecker(), reconnectInterval, ThreadPool.Names.GENERIC);
final ConnectionChecker connectionChecker = new ConnectionChecker();
this.connectionChecker = connectionChecker;
connectionChecker.scheduleNextCheck();
}
@Override
protected void doStop() {
if (backgroundCancellable != null) {
backgroundCancellable.cancel();
}
connectionChecker = null;
}
@Override
protected void doClose() {
}
// for disruption tests, re-establish any disrupted connections
public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
connectToNodes(discoveryNodes, () -> {
disconnectFromNodesExcept(discoveryNodes);
ensureConnections(onCompletion);
});
}
private enum ActivityType {
IDLE,
CONNECTING,
DISCONNECTING
}
/**
* {@link ConnectionTarget} ensures that we are never concurrently connecting to and disconnecting from a node, and that we eventually
* either connect to or disconnect from it according to whether {@link ConnectionTarget#connect(ActionListener)} or
* {@link ConnectionTarget#disconnect()} was called last.
* <p>
* Each {@link ConnectionTarget} is in one of these states:
* <p>
* - idle ({@link ConnectionTarget#future} has no listeners)
* - awaiting connection ({@link ConnectionTarget#future} may contain listeners awaiting a connection)
* - awaiting disconnection ({@link ConnectionTarget#future} may contain listeners awaiting a disconnection)
* <p>
* It will be awaiting connection (respectively disconnection) after calling {@code connect()} (respectively {@code disconnect()}). It
* will eventually become idle if these methods are not called infinitely often.
* <p>
* These methods return a {@link Runnable} which starts the connection/disconnection process iff it was idle before the method was
* called, and which notifies any failed listeners if the {@code ConnectionTarget} went from {@code CONNECTING} to {@code DISCONNECTING}
* or vice versa. The connection/disconnection process continues until all listeners have been removed, at which point it becomes idle
* again.
* <p>
* Additionally if the last step of the process was a disconnection then this target is removed from the current set of targets. Thus
* if this {@link ConnectionTarget} is idle and in the current set of targets then it should be connected.
* <p>
* All of the {@code listeners} are awaiting the completion of the same activity, which is either a connection or a disconnection. If
* we are currently connecting and then {@link ConnectionTarget#disconnect()} is called then all connection listeners are
* removed from the list so they can be notified of failure; once the connecting process has finished a disconnection will be started.
* Similarly if we are currently disconnecting and then {@link ConnectionTarget#connect(ActionListener)} is called then all
* disconnection listeners are immediately removed for failure notification and a connection is started once the disconnection is
* complete.
*/
private class ConnectionTarget {
private final DiscoveryNode discoveryNode;
private PlainListenableActionFuture<Void> future = PlainListenableActionFuture.newListenableFuture();
private ActivityType activityType = ActivityType.IDLE; // indicates what any listeners are awaiting
private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
protected void doRun() {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
transportService.connectToNode(discoveryNode);
consecutiveFailureCount.set(0);
logger.debug("connected to {}", discoveryNode);
onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
}
@Override
public void onFailure(Exception e) {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
final int currentFailureCount = consecutiveFailureCount.incrementAndGet();
// only warn every 6th failure
final Level level = currentFailureCount % 6 == 1 ? Level.WARN : Level.DEBUG;
logger.log(level, new ParameterizedMessage("failed to connect to {} (tried [{}] times)",
discoveryNode, currentFailureCount), e);
onCompletion(ActivityType.CONNECTING, e, disconnectActivity);
}
@Override
public String toString() {
return "connect to " + discoveryNode;
}
});
private final Runnable disconnectActivity = new AbstractRunnable() {
@Override
protected void doRun() {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
transportService.disconnectFromNode(discoveryNode);
consecutiveFailureCount.set(0);
logger.debug("disconnected from {}", discoveryNode);
onCompletion(ActivityType.DISCONNECTING, null, connectActivity);
}
@Override
public void onFailure(Exception e) {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
consecutiveFailureCount.incrementAndGet();
// we may not have disconnected, but will not retry, so this connection might have leaked
logger.warn(new ParameterizedMessage("failed to disconnect from {}, possible connection leak", discoveryNode), e);
assert false : "failed to disconnect from " + discoveryNode + ", possible connection leak\n" + e;
onCompletion(ActivityType.DISCONNECTING, e, connectActivity);
}
};
ConnectionTarget(DiscoveryNode discoveryNode) {
this.discoveryNode = discoveryNode;
}
Runnable connect(@Nullable ActionListener<Void> listener) {
return addListenerAndStartActivity(listener, ActivityType.CONNECTING, connectActivity,
"disconnection cancelled by reconnection");
}
Runnable disconnect() {
return addListenerAndStartActivity(null, ActivityType.DISCONNECTING, disconnectActivity,
"connection cancelled by disconnection");
}
Runnable ensureConnected(@Nullable ActionListener<Void> listener) {
assert Thread.holdsLock(mutex) : "mutex not held";
if (activityType == ActivityType.IDLE) {
if (transportService.nodeConnected(discoveryNode)) {
return () -> listener.onResponse(null);
} else {
// target is disconnected, and we are currently idle, so start a connection process.
activityType = ActivityType.CONNECTING;
addListener(listener);
return connectActivity;
}
} else {
addListener(listener);
return () -> {
};
}
}
private void addListener(@Nullable ActionListener<Void> listener) {
assert Thread.holdsLock(mutex) : "mutex not held";
assert activityType != ActivityType.IDLE;
if (listener != null) {
future.addListener(listener);
}
}
private PlainListenableActionFuture<Void> getAndClearFuture() {
assert Thread.holdsLock(mutex) : "mutex not held";
final PlainListenableActionFuture<Void> drainedFuture = future;
future = PlainListenableActionFuture.newListenableFuture();
return drainedFuture;
}
private Runnable addListenerAndStartActivity(@Nullable ActionListener<Void> listener, ActivityType newActivityType,
Runnable activity, String cancellationMessage) {
assert Thread.holdsLock(mutex) : "mutex not held";
assert newActivityType.equals(ActivityType.IDLE) == false;
if (activityType == ActivityType.IDLE) {
activityType = newActivityType;
addListener(listener);
return activity;
}
if (activityType == newActivityType) {
addListener(listener);
return () -> {
};
}
activityType = newActivityType;
final PlainListenableActionFuture<Void> oldFuture = getAndClearFuture();
addListener(listener);
return () -> oldFuture.onFailure(new ElasticsearchException(cancellationMessage));
}
private void onCompletion(ActivityType completedActivityType, @Nullable Exception e, Runnable oppositeActivity) {
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
final Runnable cleanup;
synchronized (mutex) {
assert activityType != ActivityType.IDLE;
if (activityType == completedActivityType) {
final PlainListenableActionFuture<Void> oldFuture = getAndClearFuture();
activityType = ActivityType.IDLE;
cleanup = e == null ? () -> oldFuture.onResponse(null) : () -> oldFuture.onFailure(e);
if (completedActivityType.equals(ActivityType.DISCONNECTING)) {
final ConnectionTarget removedTarget = targetsByNode.remove(discoveryNode);
assert removedTarget == this : removedTarget + " vs " + this;
}
} else {
cleanup = oppositeActivity;
}
}
cleanup.run();
}
boolean isPendingDisconnection() {
assert Thread.holdsLock(mutex) : "mutex not held";
return activityType == ActivityType.DISCONNECTING;
}
@Override
public String toString() {
synchronized (mutex) {
return "ConnectionTarget{" +
"discoveryNode=" + discoveryNode +
", activityType=" + activityType +
'}';
}
}
}
}

View File

@ -55,6 +55,7 @@ import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@ -450,7 +451,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
nodeConnectionsService.connectToNodes(newClusterState.nodes());
connectToNodesAndWait(newClusterState);
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
@ -470,6 +471,18 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
callClusterStateListeners(clusterChangedEvent);
}
protected void connectToNodesAndWait(ClusterState newClusterState) {
// can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch.
final CountDownLatch countDownLatch = new CountDownLatch(1);
nodeConnectionsService.connectToNodes(newClusterState.nodes(), countDownLatch::countDown);
try {
countDownLatch.await();
} catch (InterruptedException e) {
logger.debug("interrupted while connecting to nodes, continuing", e);
Thread.currentThread().interrupt();
}
}
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
clusterStateAppliers.forEach(applier -> {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());

View File

@ -19,15 +19,18 @@
package org.elasticsearch.cluster;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@ -39,7 +42,6 @@ import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -55,10 +57,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING;
import static org.elasticsearch.common.settings.Settings.builder;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
public class NodeConnectionsServiceTests extends ESTestCase {
@ -66,6 +75,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
private ThreadPool threadPool;
private MockTransport transport;
private TransportService transportService;
private Map<DiscoveryNode, CheckedRunnable<Exception>> nodeConnectionBlocks;
private List<DiscoveryNode> generateNodes() {
List<DiscoveryNode> nodes = new ArrayList<>();
@ -77,65 +87,226 @@ public class NodeConnectionsServiceTests extends ESTestCase {
return nodes;
}
private ClusterState clusterStateFromNodes(List<DiscoveryNode> nodes) {
private DiscoveryNodes discoveryNodesFromList(List<DiscoveryNode> discoveryNodes) {
final DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : nodes) {
builder.add(node);
for (final DiscoveryNode discoveryNode : discoveryNodes) {
builder.add(discoveryNode);
}
return ClusterState.builder(new ClusterName("test")).nodes(builder).build();
return builder.build();
}
public void testConnectAndDisconnect() {
List<DiscoveryNode> nodes = generateNodes();
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
public void testConnectAndDisconnect() throws Exception {
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
final AtomicBoolean stopReconnecting = new AtomicBoolean();
final Thread reconnectionThread = new Thread(() -> {
while (stopReconnecting.get() == false) {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
service.ensureConnections(() -> future.onResponse(null));
future.actionGet();
}
});
reconnectionThread.start();
service.connectToNodes(event.state().nodes());
assertConnected(event.state().nodes());
final List<DiscoveryNode> allNodes = generateNodes();
for (int iteration = 0; iteration < 3; iteration++) {
service.disconnectFromNodesExcept(event.state().nodes());
assertConnectedExactlyToNodes(event.state());
final boolean isDisrupting = randomBoolean();
final AtomicBoolean stopDisrupting = new AtomicBoolean();
final Thread disruptionThread = new Thread(() -> {
while (isDisrupting && stopDisrupting.get() == false) {
transportService.disconnectFromNode(randomFrom(allNodes));
}
});
disruptionThread.start();
current = event.state();
event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes));
final PlainActionFuture<Void> future = new PlainActionFuture<>();
service.connectToNodes(nodes, () -> future.onResponse(null));
future.actionGet();
if (isDisrupting == false) {
assertConnected(nodes);
}
service.disconnectFromNodesExcept(nodes);
service.connectToNodes(event.state().nodes());
assertConnected(event.state().nodes());
assertTrue(stopDisrupting.compareAndSet(false, true));
disruptionThread.join();
service.disconnectFromNodesExcept(event.state().nodes());
assertConnectedExactlyToNodes(event.state());
if (randomBoolean()) {
// sometimes do not wait for the disconnections to complete before starting the next connections
if (usually()) {
ensureConnections(service);
assertConnectedExactlyToNodes(nodes);
} else {
assertBusy(() -> assertConnectedExactlyToNodes(nodes));
}
}
}
assertTrue(stopReconnecting.compareAndSet(false, true));
reconnectionThread.join();
}
public void testReconnect() {
List<DiscoveryNode> nodes = generateNodes();
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
public void testPeriodicReconnection() {
final Settings.Builder settings = Settings.builder();
final long reconnectIntervalMillis;
if (randomBoolean()) {
reconnectIntervalMillis = CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY).millis();
} else {
reconnectIntervalMillis = randomLongBetween(1, 100000);
settings.put(CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), reconnectIntervalMillis + "ms");
}
ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
final DeterministicTaskQueue deterministicTaskQueue
= new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
final NodeConnectionsService service
= new NodeConnectionsService(settings.build(), deterministicTaskQueue.getThreadPool(), transportService);
service.start();
final List<DiscoveryNode> allNodes = generateNodes();
final DiscoveryNodes targetNodes = discoveryNodesFromList(randomSubsetOf(allNodes));
transport.randomConnectionExceptions = true;
service.connectToNodes(event.state().nodes());
final AtomicBoolean connectionCompleted = new AtomicBoolean();
service.connectToNodes(targetNodes, () -> connectionCompleted.set(true));
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(connectionCompleted.get());
for (int i = 0; i < 3; i++) {
long maxDisconnectionTime = 0;
for (int iteration = 0; iteration < 3; iteration++) {
// simulate disconnects
for (DiscoveryNode node : randomSubsetOf(nodes)) {
transportService.disconnectFromNode(node);
for (DiscoveryNode node : allNodes) {
if (randomBoolean()) {
final long disconnectionTime = randomLongBetween(0, 120000);
maxDisconnectionTime = Math.max(maxDisconnectionTime, disconnectionTime);
deterministicTaskQueue.scheduleAt(disconnectionTime, new Runnable() {
@Override
public void run() {
transportService.disconnectFromNode(node);
}
@Override
public String toString() {
return "scheduled disconnection of " + node;
}
});
}
}
service.new ConnectionChecker().run();
}
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime);
// disable exceptions so things can be restored
transport.randomConnectionExceptions = false;
service.new ConnectionChecker().run();
assertConnectedExactlyToNodes(event.state());
logger.info("renewing connections");
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + reconnectIntervalMillis);
assertConnectedExactlyToNodes(targetNodes);
}
private void assertConnectedExactlyToNodes(ClusterState state) {
assertConnected(state.nodes());
assertThat(transportService.getConnectionManager().size(), equalTo(state.nodes().getSize()));
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
// connect to one node
final DiscoveryNode node0 = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNodes nodes0 = DiscoveryNodes.builder().add(node0).build();
final PlainActionFuture<Void> future0 = new PlainActionFuture<>();
service.connectToNodes(nodes0, () -> future0.onResponse(null));
future0.actionGet();
assertConnectedExactlyToNodes(nodes0);
// connection attempts to node0 block indefinitely
final CyclicBarrier connectionBarrier = new CyclicBarrier(2);
try {
nodeConnectionBlocks.put(node0, connectionBarrier::await);
transportService.disconnectFromNode(node0);
// can still connect to another node without blocking
final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNodes nodes1 = DiscoveryNodes.builder().add(node1).build();
final DiscoveryNodes nodes01 = DiscoveryNodes.builder(nodes0).add(node1).build();
final PlainActionFuture<Void> future1 = new PlainActionFuture<>();
service.connectToNodes(nodes01, () -> future1.onResponse(null));
future1.actionGet();
assertConnectedExactlyToNodes(nodes1);
// can also disconnect from node0 without blocking
final PlainActionFuture<Void> future2 = new PlainActionFuture<>();
service.connectToNodes(nodes1, () -> future2.onResponse(null));
future2.actionGet();
service.disconnectFromNodesExcept(nodes1);
assertConnectedExactlyToNodes(nodes1);
// however, now node0 is considered to be a new node so we will block on a subsequent attempt to connect to it
final PlainActionFuture<Void> future3 = new PlainActionFuture<>();
service.connectToNodes(nodes01, () -> future3.onResponse(null));
expectThrows(ElasticsearchTimeoutException.class, () -> future3.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000))));
// once the connection is unblocked we successfully connect to it.
nodeConnectionBlocks.clear();
connectionBarrier.await(0, TimeUnit.SECONDS);
future3.actionGet();
assertConnectedExactlyToNodes(nodes01);
// if we disconnect from a node while blocked trying to connect to it then we do eventually disconnect from it
nodeConnectionBlocks.put(node0, connectionBarrier::await);
transportService.disconnectFromNode(node0);
final PlainActionFuture<Void> future4 = new PlainActionFuture<>();
service.connectToNodes(nodes01, () -> future4.onResponse(null));
future4.actionGet();
assertConnectedExactlyToNodes(nodes1);
service.disconnectFromNodesExcept(nodes1);
connectionBarrier.await();
if (randomBoolean()) {
// assertBusy because the connection completes before disconnecting, so we might briefly observe a connection to node0
assertBusy(() -> assertConnectedExactlyToNodes(nodes1));
}
// use ensureConnections() to wait until the service is idle
ensureConnections(service);
assertConnectedExactlyToNodes(nodes1);
// if we disconnect from a node while blocked trying to connect to it then the listener is notified
final PlainActionFuture<Void> future6 = new PlainActionFuture<>();
service.connectToNodes(nodes01, () -> future6.onResponse(null));
expectThrows(ElasticsearchTimeoutException.class, () -> future6.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000))));
service.disconnectFromNodesExcept(nodes1);
future6.actionGet(); // completed even though the connection attempt is still blocked
assertConnectedExactlyToNodes(nodes1);
nodeConnectionBlocks.clear();
connectionBarrier.await(10, TimeUnit.SECONDS);
ensureConnections(service);
assertConnectedExactlyToNodes(nodes1);
} finally {
nodeConnectionBlocks.clear();
connectionBarrier.reset();
}
}
private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) {
while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) {
if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) {
deterministicTaskQueue.runRandomTask();
} else if (deterministicTaskQueue.hasDeferredTasks()) {
deterministicTaskQueue.advanceTime();
}
}
deterministicTaskQueue.runAllRunnableTasks();
}
private void ensureConnections(NodeConnectionsService service) {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
service.ensureConnections(() -> future.onResponse(null));
future.actionGet();
}
private void assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) {
assertConnected(discoveryNodes);
assertThat(transportService.getConnectionManager().size(), equalTo(discoveryNodes.getSize()));
}
private void assertConnected(Iterable<DiscoveryNode> nodes) {
@ -150,10 +321,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
super.setUp();
this.threadPool = new TestThreadPool(getClass().getName());
this.transport = new MockTransport();
transportService = new NoHandshakeTransportService(Settings.EMPTY, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null,
Collections.emptySet());
nodeConnectionBlocks = newConcurrentMap();
transportService = new TestTransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
}
@ -167,22 +336,31 @@ public class NodeConnectionsServiceTests extends ESTestCase {
super.tearDown();
}
private final class NoHandshakeTransportService extends TransportService {
private final class TestTransportService extends TransportService {
private NoHandshakeTransportService(Settings settings,
Transport transport,
ThreadPool threadPool,
TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
Set<String> taskHeaders) {
super(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders);
private TestTransportService(Transport transport, ThreadPool threadPool) {
super(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()),
null, emptySet());
}
@Override
public HandshakeResponse handshake(Transport.Connection connection, long timeout, Predicate<ClusterName> clusterNamePredicate) {
return new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT);
}
@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
final CheckedRunnable<Exception> connectionBlock = nodeConnectionBlocks.get(node);
if (connectionBlock != null) {
try {
connectionBlock.run();
} catch (Exception e) {
throw new AssertionError(e);
}
}
super.connectToNode(node);
}
}
private final class MockTransport implements Transport {
@ -224,40 +402,36 @@ public class NodeConnectionsServiceTests extends ESTestCase {
@Override
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
if (profile == null) {
if (randomConnectionExceptions && randomBoolean()) {
listener.onFailure(new ConnectTransportException(node, "simulated"));
return () -> {};
}
if (profile == null && randomConnectionExceptions && randomBoolean()) {
threadPool.generic().execute(() -> listener.onFailure(new ConnectTransportException(node, "simulated")));
} else {
threadPool.generic().execute(() -> listener.onResponse(new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public void close() {
}
@Override
public boolean isClosed() {
return false;
}
}));
}
listener.onResponse(new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public void close() {
}
@Override
public boolean isClosed() {
return false;
}
});
return () -> {};
return () -> {
};
}
@Override

View File

@ -44,7 +44,6 @@ import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -1755,12 +1754,7 @@ public class CoordinatorTests extends ESTestCase {
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
clusterService.setNodeConnectionsService(
new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
transportService) {
@Override
public void connectToNodes(DiscoveryNodes discoveryNodes) {
// override this method as it does blocking calls
}
});
transportService));
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
@ -2149,6 +2143,10 @@ public class CoordinatorTests extends ESTestCase {
}
}
@Override
protected void connectToNodesAndWait(ClusterState newClusterState) {
// don't do anything, and don't block
}
}
private static DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {

View File

@ -19,14 +19,13 @@
package org.elasticsearch.cluster.service;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
import org.elasticsearch.cluster.metadata.MetaData;
@ -54,6 +53,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.test.ClusterServiceUtils.createNoOpNodeConnectionsService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@ -88,23 +88,13 @@ public class ClusterApplierServiceTests extends ESTestCase {
super.tearDown();
}
TimedClusterApplierService createTimedClusterService(boolean makeMaster) {
private TimedClusterApplierService createTimedClusterService(boolean makeMaster) {
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
emptySet(), Version.CURRENT);
TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(Settings.builder().put("cluster.name",
"ClusterApplierServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool);
timedClusterApplierService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(DiscoveryNodes discoveryNodes) {
// skip
}
@Override
public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
// skip
}
});
timedClusterApplierService.setNodeConnectionsService(createNoOpNodeConnectionsService());
timedClusterApplierService.setInitialState(ClusterState.builder(new ClusterName("ClusterApplierServiceTests"))
.nodes(DiscoveryNodes.builder()
.add(localNode)

View File

@ -748,6 +748,11 @@ public class SnapshotResiliencyTests extends ESTestCase {
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
}
@Override
protected void connectToNodesAndWait(ClusterState newClusterState) {
// don't do anything, and don't block
}
});
mockTransport = new DisruptableMockTransport(node, logger) {
@Override
@ -992,23 +997,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
coordinator.start();
masterService.start();
clusterService.getClusterApplierService().setNodeConnectionsService(
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService) {
@Override
public void connectToNodes(DiscoveryNodes discoveryNodes) {
// override this method as it does blocking calls
boolean callSuper = true;
for (final DiscoveryNode node : discoveryNodes) {
try {
transportService.connectToNode(node);
} catch (Exception e) {
callSuper = false;
}
}
if (callSuper) {
super.connectToNodes(discoveryNodes);
}
}
});
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
clusterService.getClusterApplierService().start();
indicesService.start();
indicesClusterStateService.start();

View File

@ -137,17 +137,7 @@ public class ClusterServiceUtils {
.put("cluster.name", "ClusterServiceTests")
.build();
ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(DiscoveryNodes discoveryNodes) {
// skip
}
@Override
public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
// skip
}
});
clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService());
ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
.nodes(DiscoveryNodes.builder()
.add(localNode)
@ -162,6 +152,21 @@ public class ClusterServiceUtils {
return clusterService;
}
public static NodeConnectionsService createNoOpNodeConnectionsService() {
return new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
// don't do anything
onCompletion.run();
}
@Override
public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
// don't do anything
}
};
}
public static ClusterStatePublisher createClusterStatePublisher(ClusterApplier clusterApplier) {
return (event, publishListener, ackListener) ->
clusterApplier.onNewClusterState("mock_publish_to_self[" + event.source() + "]", () -> event.state(),

View File

@ -38,6 +38,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import static org.junit.Assert.assertFalse;
@ -49,7 +50,7 @@ import static org.junit.Assert.assertFalse;
*/
public class NetworkDisruption implements ServiceDisruptionScheme {
private final Logger logger = LogManager.getLogger(NetworkDisruption.class);
private static final Logger logger = LogManager.getLogger(NetworkDisruption.class);
private final DisruptedLinks disruptedLinks;
private final NetworkLinkDisruptionType networkLinkDisruptionType;
@ -103,9 +104,17 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
* handy to be able to ensure this happens faster
*/
public static void ensureFullyConnectedCluster(InternalTestCluster cluster) {
for (String node: cluster.getNodeNames()) {
final String[] nodeNames = cluster.getNodeNames();
final CountDownLatch countDownLatch = new CountDownLatch(nodeNames.length);
for (String node : nodeNames) {
ClusterState stateOnNode = cluster.getInstance(ClusterService.class, node).state();
cluster.getInstance(NodeConnectionsService.class, node).connectToNodes(stateOnNode.nodes());
cluster.getInstance(NodeConnectionsService.class, node).reconnectToNodes(stateOnNode.nodes(), countDownLatch::countDown);
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}