Connect to new nodes concurrently (#22984)

When a node receives a new cluster state from the master, it opens up connections to any new node in the cluster state. That has always been done serially on the cluster state thread but it has been a long standing TODO to do this concurrently, which is done by this PR.

This is spin off of #22828, where an extra handshake is done whenever connecting to a node, which may slow down connecting. Also, the handshake is done in a blocking fashion which triggers assertions w.r.t blocking requests on the cluster state thread. Instead of adding an exception, I opted to implement concurrent connections which both side steps the assertion and compensates for the extra handshake.
This commit is contained in:
Boaz Leskes 2017-02-06 16:32:41 +01:00 committed by GitHub
parent e4663d6263
commit 5e7d22357f
4 changed files with 57 additions and 28 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
@ -37,9 +38,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.common.settings.Setting.Property; import static org.elasticsearch.common.settings.Setting.Property;
@ -78,21 +79,53 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings); this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
} }
public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) { public void connectToNodes(DiscoveryNodes discoveryNodes) {
CountDownLatch latch = new CountDownLatch(discoveryNodes.getSize());
// TODO: do this in parallel (and wait)
for (final DiscoveryNode node : discoveryNodes) { for (final DiscoveryNode node : discoveryNodes) {
final boolean connected;
try (Releasable ignored = nodeLocks.acquire(node)) { try (Releasable ignored = nodeLocks.acquire(node)) {
nodes.putIfAbsent(node, 0); nodes.putIfAbsent(node, 0);
validateNodeConnected(node); connected = transportService.nodeConnected(node);
} }
if (connected) {
latch.countDown();
} else {
// spawn to another thread to do in parallel
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
// both errors and rejections are logged here. the service
// will try again after `cluster.nodes.reconnect_interval` on all nodes but the current master.
// On the master, node fault detection will remove these nodes from the cluster as their are not
// connected. Note that it is very rare that we end up here on the master.
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to connect to {}", node), e);
}
@Override
protected void doRun() throws Exception {
try (Releasable ignored = nodeLocks.acquire(node)) {
validateAndConnectIfNeeded(node);
}
}
@Override
public void onAfter() {
latch.countDown();
}
});
}
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} }
} }
/** /**
* Disconnects from all nodes except the ones provided as parameter * Disconnects from all nodes except the ones provided as parameter
*/ */
public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) { public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
Set<DiscoveryNode> currentNodes = new HashSet<>(nodes.keySet()); Set<DiscoveryNode> currentNodes = new HashSet<>(nodes.keySet());
for (DiscoveryNode node : nodesToKeep) { for (DiscoveryNode node : nodesToKeep) {
currentNodes.remove(node); currentNodes.remove(node);
@ -110,8 +143,8 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
} }
} }
void validateNodeConnected(DiscoveryNode node) { void validateAndConnectIfNeeded(DiscoveryNode node) {
assert nodeLocks.isHeldByCurrentThread(node) : "validateNodeConnected must be called under lock"; assert nodeLocks.isHeldByCurrentThread(node) : "validateAndConnectIfNeeded must be called under lock";
if (lifecycle.stoppedOrClosed() || if (lifecycle.stoppedOrClosed() ||
nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time... nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time...
// nothing to do // nothing to do
@ -147,7 +180,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
protected void doRun() { protected void doRun() {
for (DiscoveryNode node : nodes.keySet()) { for (DiscoveryNode node : nodes.keySet()) {
try (Releasable ignored = nodeLocks.acquire(node)) { try (Releasable ignored = nodeLocks.acquire(node)) {
validateNodeConnected(node); validateAndConnectIfNeeded(node);
} }
} }
} }

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.ConnectionProfile;
@ -40,7 +41,6 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.TransportServiceAdapter;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
@ -59,7 +59,7 @@ import static org.hamcrest.Matchers.equalTo;
public class NodeConnectionsServiceTests extends ESTestCase { public class NodeConnectionsServiceTests extends ESTestCase {
private static ThreadPool THREAD_POOL; private ThreadPool threadPool;
private MockTransport transport; private MockTransport transport;
private TransportService transportService; private TransportService transportService;
@ -83,7 +83,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
public void testConnectAndDisconnect() { public void testConnectAndDisconnect() {
List<DiscoveryNode> nodes = generateNodes(); List<DiscoveryNode> nodes = generateNodes();
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService); NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
ClusterState current = clusterStateFromNodes(Collections.emptyList()); ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
@ -107,14 +107,14 @@ public class NodeConnectionsServiceTests extends ESTestCase {
public void testReconnect() { public void testReconnect() {
List<DiscoveryNode> nodes = generateNodes(); List<DiscoveryNode> nodes = generateNodes();
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService); NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
ClusterState current = clusterStateFromNodes(Collections.emptyList()); ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
transport.randomConnectionExceptions = true; transport.randomConnectionExceptions = true;
service.connectToNodes(event.nodesDelta().addedNodes()); service.connectToNodes(event.state().nodes());
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
// simulate disconnects // simulate disconnects
@ -151,8 +151,9 @@ public class NodeConnectionsServiceTests extends ESTestCase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
this.threadPool = new TestThreadPool(getClass().getName());
this.transport = new MockTransport(); this.transport = new MockTransport();
transportService = new TransportService(Settings.EMPTY, transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR, transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null); boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null);
transportService.start(); transportService.start();
transportService.acceptIncomingRequests(); transportService.acceptIncomingRequests();
@ -162,16 +163,11 @@ public class NodeConnectionsServiceTests extends ESTestCase {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
transportService.stop(); transportService.stop();
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
super.tearDown(); super.tearDown();
} }
@AfterClass
public static void stopThreadPool() {
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
THREAD_POOL = null;
}
final class MockTransport implements Transport { final class MockTransport implements Transport {
private final AtomicLong requestId = new AtomicLong(); private final AtomicLong requestId = new AtomicLong();
Set<DiscoveryNode> connectedNodes = ConcurrentCollections.newConcurrentSet(); Set<DiscoveryNode> connectedNodes = ConcurrentCollections.newConcurrentSet();

View File

@ -126,12 +126,12 @@ public class ClusterServiceTests extends ESTestCase {
emptySet(), Version.CURRENT)); emptySet(), Version.CURRENT));
timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override @Override
public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) { public void connectToNodes(DiscoveryNodes discoveryNodes) {
// skip // skip
} }
@Override @Override
public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) { public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
// skip // skip
} }
}); });
@ -1059,12 +1059,12 @@ public class ClusterServiceTests extends ESTestCase {
Set<DiscoveryNode> currentNodes = new HashSet<>(); Set<DiscoveryNode> currentNodes = new HashSet<>();
timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override @Override
public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) { public void connectToNodes(DiscoveryNodes discoveryNodes) {
discoveryNodes.forEach(currentNodes::add); discoveryNodes.forEach(currentNodes::add);
} }
@Override @Override
public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) { public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
Set<DiscoveryNode> nodeSet = new HashSet<>(); Set<DiscoveryNode> nodeSet = new HashSet<>();
nodesToKeep.iterator().forEachRemaining(nodeSet::add); nodesToKeep.iterator().forEachRemaining(nodeSet::add);
currentNodes.removeIf(node -> nodeSet.contains(node) == false); currentNodes.removeIf(node -> nodeSet.contains(node) == false);

View File

@ -53,12 +53,12 @@ public class ClusterServiceUtils {
threadPool, () -> localNode); threadPool, () -> localNode);
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override @Override
public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) { public void connectToNodes(DiscoveryNodes discoveryNodes) {
// skip // skip
} }
@Override @Override
public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) { public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
// skip // skip
} }
}); });