From dce24b5a1055074864506064a9b53c76d4b6426e Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 21 Dec 2016 10:00:04 +0100 Subject: [PATCH] make connection to nodes async and ensure that if we are not fully connected a search will fork or a reconnect --- .../search/RemoteClusterConnection.java | 254 +++++++++++------- .../action/search/SearchTransportService.java | 25 +- .../action/search/TransportSearchAction.java | 16 +- .../transport/TransportService.java | 7 + .../search/RemoteClusterConnectionTests.java | 46 ++-- 5 files changed, 212 insertions(+), 136 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java index 533f1c85f5e..6f719239857 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.Transport; @@ -43,26 +44,29 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; -class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener { +final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener { private final TransportService transportService; private final ConnectionProfile remoteProfile; private final CopyOnWriteArrayList clusterNodes = new CopyOnWriteArrayList(); private final Supplier nodeSupplier; private final String clusterName; - private final CountDownLatch connected; private volatile List seedNodes; + private final ConnectHandler connectHandler; RemoteClusterConnection(Settings settings, String clusterName, List seedNodes, TransportService transportService) { super(settings); - this.connected = new CountDownLatch(1); this.transportService = transportService; this.clusterName = clusterName; ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); @@ -86,91 +90,12 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn } }; this.seedNodes = seedNodes; + this.connectHandler = new ConnectHandler(); } - public synchronized void connectWithSeeds(ActionListener connectListener) { - if (clusterNodes.isEmpty()) { - TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable - Iterator iterator = Collections.synchronizedList(seedNodes).iterator(); - handshakeAndConnect(iterator, transportService, connectTimeout, connectListener, true); - } else { - connectListener.onResponse(null); - } - - } - - public synchronized void updateSeedNodes(List seedNodes) { - if (this.seedNodes.containsAll(seedNodes) == false || this.seedNodes.size() != seedNodes.size()) { - this.seedNodes = new ArrayList<>(seedNodes); - ActionListener listener = ActionListener.wrap(x -> {}, - e -> logger.error("failed to establish connection to remote cluster", e)); - connectWithSeeds(listener); - } - } - - private void handshakeAndConnect(Iterator seedNodes, - final TransportService transportService, TimeValue connectTimeout, ActionListener listener, - boolean connect) { - try { - if (seedNodes.hasNext()) { - final DiscoveryNode seedNode = seedNodes.next(); - final DiscoveryNode handshakeNode; - if (connect) { - try (Transport.Connection connection = transportService.openConnection(seedNode, ConnectionProfile.LIGHT_PROFILE)) { - handshakeNode = transportService.handshake(connection, connectTimeout.millis(), (c) -> true); - transportService.connectToNode(handshakeNode, remoteProfile); - clusterNodes.add(handshakeNode); - } - } else { - handshakeNode = seedNode; - } - ClusterStateRequest request = new ClusterStateRequest(); - request.clear(); - request.nodes(true); - transportService.sendRequest(transportService.getConnection(handshakeNode), - ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, - new TransportResponseHandler() { - - @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); - } - - @Override - public void handleResponse(ClusterStateResponse response) { - DiscoveryNodes nodes = response.getState().nodes(); - Iterable nodesIter = nodes.getDataNodes()::valuesIt; - for (DiscoveryNode node : nodesIter) { - transportService.connectToNode(node); // noop if node is connected - clusterNodes.add(node); - } - listener.onResponse(null); - } - - @Override - public void handleException(TransportException exp) { - logger.warn((Supplier) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed", - clusterName), exp); - handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect); - } - - @Override - public String executor() { - return ThreadPool.Names.MANAGEMENT; - } - }); - } else { - listener.onFailure(new IllegalStateException("no seed node left")); - } - } catch (IOException ex) { - if (seedNodes.hasNext()) { - logger.debug((Supplier) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed", - clusterName), ex); - handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect); - } else { - listener.onFailure(ex); - } - } + synchronized void updateSeedNodes(List seedNodes, ActionListener connectListener) { + this.seedNodes = new ArrayList<>(seedNodes); + connectHandler.handshake(connectListener); } @Override @@ -180,7 +105,7 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn // try to reconnect ActionListener listener = ActionListener.wrap(x -> {}, e -> logger.error("failed to establish connection to remote cluster", e)); - connectWithSeeds(listener); + connectHandler.handshake(listener); } } @@ -193,8 +118,8 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn } } if (seenNotConnectedNode) { - final TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable - handshakeAndConnect(clusterNodes.iterator(), transportService, connectTimeout, + TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable + connectHandler.handshakeAndConnect(clusterNodes.iterator(), transportService, connectTimeout, ActionListener.wrap((x) -> { }, x -> { }), false); // nocommit handle exceptions here what should we do @@ -203,6 +128,14 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn public void fetchSearchShards(SearchRequest searchRequest, final List indices, ActionListener listener) { + if (clusterNodes.isEmpty()) { + connectHandler.handshake(ActionListener.wrap((x) -> fetchSearchShards(searchRequest, indices, listener), listener::onFailure)); + } else { + fetchShardsInternal(searchRequest, indices, listener); + } + } + + private void fetchShardsInternal(SearchRequest searchRequest, List indices, final ActionListener listener) { final DiscoveryNode node = nodeSupplier.get(); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()])) .indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference()) @@ -236,4 +169,145 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn public String getClusterName() { return clusterName; } + + + private class ConnectHandler { + private Semaphore running = new Semaphore(1); + private BlockingQueue> queue = new ArrayBlockingQueue<>(100); + + public void handshake(ActionListener connectListener) { + final boolean runConnect; + final Collection> toNotify; + synchronized (queue) { + if (connectListener != null && queue.offer(connectListener) == false) { + throw new IllegalStateException("connect queue is full"); + } + if (queue.isEmpty()) { + return; + } + runConnect = running.tryAcquire(); + if (runConnect) { + toNotify = new ArrayList<>(); + queue.drainTo(toNotify); + } else { + toNotify = Collections.emptyList(); + } + } + if (runConnect) { + forkConnect(toNotify); + } + + } + + private void forkConnect(final Collection> toNotify) { + ThreadPool threadPool = transportService.getThreadPool(); + ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT); + executor.submit(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + synchronized (queue) { + running.release(); + } + for (ActionListener queuedListener : toNotify) { + queuedListener.onFailure(e); + } + } + + @Override + protected void doRun() throws Exception { + ActionListener listener = ActionListener.wrap((x) -> { + + synchronized (queue) { + running.release(); + } + for (ActionListener queuedListener : toNotify) { + queuedListener.onResponse(x); + } + handshake(null); + }, + (e) -> { + synchronized (queue) { + running.release(); + } + for (ActionListener queuedListener : toNotify) { + queuedListener.onFailure(e); + } + handshake(null); + }); + TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable + Iterator iterator = Collections.synchronizedList(seedNodes).iterator(); + handshakeAndConnect(iterator, transportService, connectTimeout, listener, true); + } + }); + + } + + + + void handshakeAndConnect(Iterator seedNodes, + final TransportService transportService, TimeValue connectTimeout, ActionListener listener, + boolean connect) { + try { + if (seedNodes.hasNext()) { + final DiscoveryNode seedNode = seedNodes.next(); + final DiscoveryNode handshakeNode; + if (connect) { + try (Transport.Connection connection = transportService.openConnection(seedNode, ConnectionProfile.LIGHT_PROFILE)) { + handshakeNode = transportService.handshake(connection, connectTimeout.millis(), (c) -> true); + transportService.connectToNode(handshakeNode, remoteProfile); + clusterNodes.add(handshakeNode); + } + } else { + handshakeNode = seedNode; + } + ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.nodes(true); + transportService.sendRequest(transportService.getConnection(handshakeNode), + ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, + new TransportResponseHandler() { + + @Override + public ClusterStateResponse newInstance() { + return new ClusterStateResponse(); + } + + @Override + public void handleResponse(ClusterStateResponse response) { + DiscoveryNodes nodes = response.getState().nodes(); + Iterable nodesIter = nodes.getDataNodes()::valuesIt; + for (DiscoveryNode node : nodesIter) { + transportService.connectToNode(node); // noop if node is connected + clusterNodes.add(node); + } + listener.onResponse(null); + } + + @Override + public void handleException(TransportException exp) { + logger.warn((Supplier) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed", + clusterName), exp); + handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect); + } + + @Override + public String executor() { + return ThreadPool.Names.MANAGEMENT; + } + }); + } else { + listener.onFailure(new IllegalStateException("no seed node left")); + } + } catch (IOException ex) { + if (seedNodes.hasNext()) { + logger.debug((Supplier) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed", + clusterName), ex); + handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect); + } else { + listener.onFailure(ex); + } + } + } + } + } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 740f4e6d23c..cf44fb37b95 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -106,25 +106,9 @@ public class SearchTransportService extends AbstractComponent { } public void setupRemoteClusters() { - // nocommit we have to figure out a good way to set-up these connections setRemoteClusters(REMOTE_CLUSTERS_SEEDS.get(settings)); } - private void connect() { - int size = remoteClusters.size(); - CountDownLatch latch = new CountDownLatch(size); - for (RemoteClusterConnection connection : remoteClusters.values()) { - connection.connectWithSeeds(ActionListener.wrap(x -> latch.countDown(), ex -> { - throw new Error("failed to connect to to remote cluster " + connection.getClusterName(), ex); - })); - } - try { - latch.await(); // NOCOMMIT timeout? - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - private static void validateRemoteClustersSeeds(Settings settings) { //TODO do we need a static whitelist like in reindex from remote? for (String clusterName : settings.names()) { @@ -195,14 +179,13 @@ public class SearchTransportService extends AbstractComponent { if (remote == null) { remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService); remoteClusters.put(entry.getKey(), remote); - } else { - remote.updateSeedNodes(entry.getValue()); } + remote.updateSeedNodes(entry.getValue(), ActionListener.wrap((x) -> {}, + e -> logger.error("failed to update seed list for cluster: " + entry.getKey(), e) )); } if (remoteClusters.isEmpty() == false) { remoteClusters.putAll(this.remoteClusters); this.remoteClusters = Collections.unmodifiableMap(remoteClusters); - connect(); //nocommit this sucks as it's executed on the state update thread } } @@ -214,10 +197,6 @@ public class SearchTransportService extends AbstractComponent { return remoteClusters.containsKey(clusterName); } - void connectToRemoteNode(DiscoveryNode remoteNode) { - transportService.connectToNode(remoteNode); - } - void sendSearchShards(SearchRequest searchRequest, Map> remoteIndicesByCluster, ActionListener> listener) { final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index f655fadf5b7..9b2d967f65c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -279,13 +279,21 @@ public class TransportSearchAction extends HandledTransportAction builder = ImmutableOpenMap.builder(nodes.getNodes()); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); for (DiscoveryNode remoteNode : remoteNodes) { - //TODO shall we catch connect exceptions here? Otherwise we will return an error but we could rather return partial results? - searchTransportService.connectToRemoteNode(remoteNode); builder.put(remoteNode.getId(), remoteNode); } - return builder.build()::get; + ImmutableOpenMap remoteNodesMap = builder.build(); + return (nodeId) -> { + DiscoveryNode discoveryNode = nodes.get(nodeId); + if (discoveryNode == null) { + discoveryNode = remoteNodesMap.get(nodeId); + } + if (discoveryNode == null) { + throw new IllegalArgumentException("no node found for id: " + nodeId); + } + return discoveryNode; + }; } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index b526390b7e5..9ed0603bdf4 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1182,4 +1182,11 @@ public class TransportService extends AbstractLifecycleComponent { return "direct"; } } + + /** + * Returns the internal thread pool + */ + public ThreadPool getThreadPool() { + return threadPool; + } } diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java index 4d0e0af4488..07f88134b22 100644 --- a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -59,7 +60,7 @@ public class RemoteClusterConnectionTests extends ESIntegTestCase { exceptionAtomicReference.set(x); latch.countDown(); }); - connection.connectWithSeeds(listener); + connection.updateSeedNodes(Arrays.asList(node),listener); latch.await(); assertTrue(service.nodeConnected(node)); Iterable nodesIterable = dataNodes::valuesIt; @@ -79,28 +80,35 @@ public class RemoteClusterConnectionTests extends ESIntegTestCase { try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); - RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(node), service); + final boolean hasInitialNodes = randomBoolean(); + RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + hasInitialNodes ? Arrays.asList(node) : Collections.emptyList(), service); CountDownLatch latch = new CountDownLatch(1); - AtomicReference exceptionAtomicReference = new AtomicReference<>(); - ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { - exceptionAtomicReference.set(x); - latch.countDown(); - }); - connection.connectWithSeeds(listener); - latch.await(); - String newNode = internalCluster().startDataOnlyNode(); - createIndex("test-index"); - assertTrue(service.nodeConnected(node)); - Iterable nodesIterable = dataNodes::valuesIt; - for (DiscoveryNode dataNode : nodesIterable) { - if (dataNode.getName().equals(newNode)) { - assertFalse(service.nodeConnected(dataNode)); - } else { - assertTrue(service.nodeConnected(dataNode)); + if (hasInitialNodes == false) { + AtomicReference exceptionAtomicReference = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { + exceptionAtomicReference.set(x); + latch.countDown(); + }); + connection.updateSeedNodes(Arrays.asList(node), listener); + latch.await(); + + String newNode = internalCluster().startDataOnlyNode(); + createIndex("test-index"); + assertTrue(service.nodeConnected(node)); + Iterable nodesIterable = dataNodes::valuesIt; + for (DiscoveryNode dataNode : nodesIterable) { + if (dataNode.getName().equals(newNode)) { + assertFalse(service.nodeConnected(dataNode)); + } else { + assertTrue(service.nodeConnected(dataNode)); + } } + assertNull(exceptionAtomicReference.get()); + } else { + createIndex("test-index"); } - assertNull(exceptionAtomicReference.get()); SearchRequest request = new SearchRequest("test-index"); CountDownLatch responseLatch = new CountDownLatch(1);