From bd8470e738c5bf460963b7300e3c06bdeb023de8 Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Thu, 25 Jul 2019 22:51:04 +0200
Subject: [PATCH] Asynchronously connect to remote clusters (#44825)

Refactors RemoteClusterConnection so that it no longer blockingly
connects to remote clusters.

Relates to #40150
---
 .../transport/RemoteClusterConnection.java    | 393 ++++++++----------
 .../RemoteClusterConnectionTests.java         |  30 +-
 2 files changed, 186 insertions(+), 237 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
index eee1312d952..eecb6ef0dba 100644
--- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
+++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
@@ -25,11 +25,11 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -38,7 +38,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -48,17 +47,14 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -138,7 +134,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
         if (proxyAddress == null || proxyAddress.isEmpty()) {
             return node;
         } else {
-            // resovle proxy address lazy here
+            // resolve proxy address lazy here
             InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
             return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
                 .getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
@@ -175,7 +171,9 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
     public void onNodeDisconnected(DiscoveryNode node) {
         if (connectionManager.size() < maxNumRemoteConnections) {
             // try to reconnect and fill up the slot of the disconnected node
-            connectHandler.forceConnect();
+            connectHandler.connect(ActionListener.wrap(
+                ignore -> logger.trace("successfully connected after disconnect of {}", node),
+                e -> logger.trace(() -> new ParameterizedMessage("failed to connect after disconnect of {}", node), e)));
         }
     }
 
@@ -344,201 +342,178 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
      * we will just reject the connect trigger which will lead to failing searches.
      */
     private class ConnectHandler implements Closeable {
-        private final Semaphore running = new Semaphore(1);
+        private static final int MAX_LISTENERS = 100;
         private final AtomicBoolean closed = new AtomicBoolean(false);
-        private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(100);
-        private final CancellableThreads cancellableThreads = new CancellableThreads();
-
-        /**
-         * Triggers a connect round iff there are pending requests queued up and if there is no
-         * connect round currently running.
-         */
-        void maybeConnect() {
-            connect(null);
-        }
+        private final Object mutex = new Object();
+        private List<ActionListener<Void>> listeners = new ArrayList<>();
 
         /**
          * Triggers a connect round unless there is one running already. If there is a connect round running, the listener will either
         * be queued or rejected and failed.
         */
        void connect(ActionListener<Void> connectListener) {
-            connect(connectListener, false);
-        }
-
-        /**
-         * Triggers a connect round unless there is one already running. In contrast to {@link #maybeConnect()} will this method also
-         * trigger a connect round if there is no listener queued up.
-         */
-        void forceConnect() {
-            connect(null, true);
-        }
-
-        private void connect(ActionListener<Void> connectListener, boolean forceRun) {
-            final boolean runConnect;
-            final Collection<ActionListener<Void>> toNotify;
-            final ActionListener<Void> listener = connectListener == null ? null :
+            boolean runConnect = false;
+            final ActionListener<Void> listener =
                 ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext());
-            synchronized (queue) {
-                if (listener != null && queue.offer(listener) == false) {
-                    listener.onFailure(new RejectedExecutionException("connect queue is full"));
-                    return;
-                }
-                if (forceRun == false && queue.isEmpty()) {
-                    return;
-                }
-                runConnect = running.tryAcquire();
-                if (runConnect) {
-                    toNotify = new ArrayList<>();
-                    queue.drainTo(toNotify);
-                    if (closed.get()) {
-                        running.release();
-                        ActionListener.onFailure(toNotify, new AlreadyClosedException("connect handler is already closed"));
-                        return;
-                    }
+            synchronized (mutex) {
+                if (closed.get()) {
+                    assert listeners.isEmpty();
                 } else {
-                    toNotify = Collections.emptyList();
+                    if (listeners.size() >= MAX_LISTENERS) {
+                        assert listeners.size() == MAX_LISTENERS;
+                        listener.onFailure(new RejectedExecutionException("connect queue is full"));
+                        return;
+                    } else {
+                        listeners.add(listener);
+                    }
+                    runConnect = listeners.size() == 1;
                 }
             }
+            if (closed.get()) {
+                connectListener.onFailure(new AlreadyClosedException("connect handler is already closed"));
+                return;
+            }
             if (runConnect) {
-                forkConnect(toNotify);
+                ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
+                executor.submit(new AbstractRunnable() {
+                    @Override
+                    public void onFailure(Exception e) {
+                        ActionListener.onFailure(getAndClearListeners(), e);
+                    }
+
+                    @Override
+                    protected void doRun() {
+                        collectRemoteNodes(seedNodes.stream().map(Tuple::v2).iterator(),
+                            new ActionListener<Void>() {
+                                @Override
+                                public void onResponse(Void aVoid) {
+                                    ActionListener.onResponse(getAndClearListeners(), aVoid);
+                                }
+
+                                @Override
+                                public void onFailure(Exception e) {
+                                    ActionListener.onFailure(getAndClearListeners(), e);
+                                }
+                            });
+                    }
+                });
             }
         }
 
-        private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
-            ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
-            executor.submit(new AbstractRunnable() {
-                @Override
-                public void onFailure(Exception e) {
-                    synchronized (queue) {
-                        running.release();
-                    }
-                    try {
-                        ActionListener.onFailure(toNotify, e);
-                    } finally {
-                        maybeConnect();
-                    }
+        private List<ActionListener<Void>> getAndClearListeners() {
+            final List<ActionListener<Void>> result;
+            synchronized (mutex) {
+                if (listeners.isEmpty()) {
+                    result = Collections.emptyList();
+                } else {
+                    result = listeners;
+                    listeners = new ArrayList<>();
                 }
-
-                @Override
-                protected void doRun() {
-                    ActionListener<Void> listener = ActionListener.wrap((x) -> {
-                        synchronized (queue) {
-                            running.release();
-                        }
-                        try {
-                            ActionListener.onResponse(toNotify, x);
-                        } finally {
-                            maybeConnect();
-                        }
-
-                    }, (e) -> {
-                        synchronized (queue) {
-                            running.release();
-                        }
-                        try {
-                            ActionListener.onFailure(toNotify, e);
-                        } finally {
-                            maybeConnect();
-                        }
-                    });
-                    collectRemoteNodes(seedNodes.stream().map(Tuple::v2).iterator(), transportService, connectionManager, listener);
-                }
-            });
+            }
+            return result;
         }
 
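The connect()/getAndClearListeners() pair above replaces the old semaphore-and-queue bookkeeping with a simple coalescing scheme: the caller that makes the listener list non-empty starts the round, later callers are only queued (capped at MAX_LISTENERS), and whoever finishes the round drains the list and notifies everyone. As an aside that is not part of the patch, here is a minimal, self-contained sketch of that pattern in plain Java; CoalescingRunner, Listener and MAX_WAITERS are made-up names for illustration only.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

final class CoalescingRunner {

    interface Listener {
        void onResponse();
        void onFailure(Exception e);
    }

    private static final int MAX_WAITERS = 100;

    private final Object mutex = new Object();
    private final Executor executor;
    private final Runnable work;              // one "connect round"; may throw an unchecked exception
    private List<Listener> waiters = new ArrayList<>();

    CoalescingRunner(Executor executor, Runnable work) {
        this.executor = executor;
        this.work = work;
    }

    /** Queue the listener; only the caller that makes the queue non-empty actually starts a round. */
    void run(Listener listener) {
        final boolean start;
        synchronized (mutex) {
            if (waiters.size() >= MAX_WAITERS) {
                listener.onFailure(new RejectedExecutionException("queue is full"));
                return;
            }
            waiters.add(listener);
            start = waiters.size() == 1;
        }
        if (start) {
            executor.execute(() -> {
                try {
                    work.run();
                } catch (Exception e) {
                    for (Listener l : drain()) {
                        l.onFailure(e);
                    }
                    return;
                }
                for (Listener l : drain()) {
                    l.onResponse();
                }
            });
        }
    }

    /** Hand back everything queued so far and reset, so the next caller starts a fresh round. */
    private List<Listener> drain() {
        synchronized (mutex) {
            if (waiters.isEmpty()) {
                return Collections.emptyList();
            }
            List<Listener> drained = waiters;
            waiters = new ArrayList<>();
            return drained;
        }
    }
}

The effect is that concurrent triggers collapse into a single in-flight round while every caller is still guaranteed to hear back, without any blocking acquire on a semaphore.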
-        private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, final TransportService transportService,
-                                        final ConnectionManager manager, ActionListener<Void> listener) {
+        private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> listener) {
             if (Thread.currentThread().isInterrupted()) {
                 listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
             }
-            try {
-                if (seedNodes.hasNext()) {
-                    cancellableThreads.executeIO(() -> {
-                        final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get());
-                        logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
-                            proxyAddress);
-                        final TransportService.HandshakeResponse handshakeResponse;
-                        final ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG);
-                        final Transport.Connection connection = PlainActionFuture.get(
-                            fut -> manager.openConnection(seedNode, profile, fut));
-                        boolean success = false;
-                        try {
-                            try {
-                                ConnectionProfile connectionProfile = connectionManager.getConnectionProfile();
-                                handshakeResponse = PlainActionFuture.get(fut ->
-                                    transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(),
-                                        getRemoteClusterNamePredicate(), fut));
-                            } catch (IllegalStateException ex) {
-                                logger.warn(new ParameterizedMessage("failed to connect to seed node [{}]", connection.getNode()), ex);
-                                throw ex;
-                            }
-                            final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
-                            if (nodePredicate.test(handshakeNode) && manager.size() < maxNumRemoteConnections) {
-                                PlainActionFuture.get(fut -> manager.connectToNode(handshakeNode, null,
-                                    transportService.connectionValidator(handshakeNode), ActionListener.map(fut, x -> null)));
-                                if (remoteClusterName.get() == null) {
-                                    assert handshakeResponse.getClusterName().value() != null;
-                                    remoteClusterName.set(handshakeResponse.getClusterName());
-                                }
-                            }
-                            ClusterStateRequest request = new ClusterStateRequest();
-                            request.clear();
-                            request.nodes(true);
-                            // here we pass on the connection since we can only close it once the sendRequest returns otherwise
-                            // due to the async nature (it will return before it's actually sent) this can cause the request to fail
-                            // due to an already closed connection.
-                            ThreadPool threadPool = transportService.getThreadPool();
-                            ThreadContext threadContext = threadPool.getThreadContext();
-                            TransportService.ContextRestoreResponseHandler<ClusterStateResponse> responseHandler = new TransportService
-                                .ContextRestoreResponseHandler<>(threadContext.newRestorableContext(false),
-                                new SniffClusterStateResponseHandler(connection, listener, seedNodes,
-                                    cancellableThreads));
-                            try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
-                                // we stash any context here since this is an internal execution and should not leak any
-                                // existing context information.
-                                threadContext.markAsSystemContext();
-                                transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
-                                    responseHandler);
-                            }
-                            success = true;
-                        } finally {
-                            if (success == false) {
-                                connection.close();
-                            }
+            if (seedNodes.hasNext()) {
+                final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get());
+                logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
+                    proxyAddress);
+                final ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG);
+
+                final StepListener<Transport.Connection> openConnectionStep = new StepListener<>();
+                connectionManager.openConnection(seedNode, profile, openConnectionStep);
+
+                final Consumer<Exception> onFailure = e -> {
+                    if (e instanceof ConnectTransportException ||
+                        e instanceof IOException ||
+                        e instanceof IllegalStateException) {
+                        // ISE if we fail the handshake with an version incompatible node
+                        if (seedNodes.hasNext()) {
+                            logger.debug(() -> new ParameterizedMessage(
+                                "fetching nodes from external cluster [{}] failed moving to next node", clusterAlias), e);
+                            collectRemoteNodes(seedNodes, listener);
+                            return;
                         }
-                    });
-                } else {
-                    listener.onFailure(new IllegalStateException("no seed node left"));
-                }
-            } catch (CancellableThreads.ExecutionCancelledException ex) {
-                logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), ex);
-                listener.onFailure(ex); // we got canceled - fail the listener and step out
-            } catch (ConnectTransportException | IOException | IllegalStateException ex) {
-                // ISE if we fail the handshake with an version incompatible node
-                if (seedNodes.hasNext()) {
-                    logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed moving to next node",
-                        clusterAlias), ex);
-                    collectRemoteNodes(seedNodes, transportService, manager, listener);
-                } else {
-                    logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), ex);
-                    listener.onFailure(ex);
-                }
+                    }
+                    logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", clusterAlias), e);
+                    listener.onFailure(e);
+                };
+
+                final StepListener<TransportService.HandshakeResponse> handShakeStep = new StepListener<>();
+                openConnectionStep.whenComplete(connection -> {
+                    ConnectionProfile connectionProfile = connectionManager.getConnectionProfile();
+                    transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(),
+                        getRemoteClusterNamePredicate(), handShakeStep);
+                }, onFailure);
+
+                final StepListener<Void> fullConnectionStep = new StepListener<>();
+                handShakeStep.whenComplete(handshakeResponse -> {
+                    final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
+
+                    if (nodePredicate.test(handshakeNode) && connectionManager.size() < maxNumRemoteConnections) {
+                        connectionManager.connectToNode(handshakeNode, null,
+                            transportService.connectionValidator(handshakeNode), fullConnectionStep);
+                    } else {
+                        fullConnectionStep.onResponse(null);
+                    }
+                }, e -> {
+                    final Transport.Connection connection = openConnectionStep.result();
+                    logger.warn(new ParameterizedMessage("failed to connect to seed node [{}]", connection.getNode()), e);
+                    IOUtils.closeWhileHandlingException(connection);
+                    onFailure.accept(e);
+                });
+
+                fullConnectionStep.whenComplete(aVoid -> {
+                    if (remoteClusterName.get() == null) {
+                        TransportService.HandshakeResponse handshakeResponse = handShakeStep.result();
+                        assert handshakeResponse.getClusterName().value() != null;
+                        remoteClusterName.set(handshakeResponse.getClusterName());
+                    }
+                    final Transport.Connection connection = openConnectionStep.result();
+
+                    ClusterStateRequest request = new ClusterStateRequest();
+                    request.clear();
+                    request.nodes(true);
+                    // here we pass on the connection since we can only close it once the sendRequest returns otherwise
+                    // due to the async nature (it will return before it's actually sent) this can cause the request to fail
+                    // due to an already closed connection.
+                    ThreadPool threadPool = transportService.getThreadPool();
+                    ThreadContext threadContext = threadPool.getThreadContext();
+                    TransportService.ContextRestoreResponseHandler<ClusterStateResponse> responseHandler = new TransportService
+                        .ContextRestoreResponseHandler<>(threadContext.newRestorableContext(false),
+                            new SniffClusterStateResponseHandler(connection, listener, seedNodes));
+                    try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
+                        // we stash any context here since this is an internal execution and should not leak any
+                        // existing context information.
+                        threadContext.markAsSystemContext();
+                        transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
+                            responseHandler);
+                    }
+                }, e -> {
+                    IOUtils.closeWhileHandlingException(openConnectionStep.result());
+                    onFailure.accept(e);
+                });
+            } else {
+                listener.onFailure(new IllegalStateException("no seed node left"));
             }
         }
 
         @Override
         public void close() throws IOException {
-            try {
+            final List<ActionListener<Void>> toNotify;
+            synchronized (mutex) {
                 if (closed.compareAndSet(false, true)) {
-                    cancellableThreads.cancel("connect handler is closed");
-                    running.acquire(); // acquire the semaphore to ensure all connections are closed and all thread joined
-                    running.release();
-                    maybeConnect(); // now go and notify pending listeners
+                    toNotify = listeners;
+                    listeners = Collections.emptyList();
+                } else {
+                    toNotify = Collections.emptyList();
                 }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
             }
+            ActionListener.onFailure(toNotify, new AlreadyClosedException("connect handler is already closed"));
         }
 
         final boolean isClosed() {
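The rewritten collectRemoteNodes in the hunk above chains three asynchronous steps - open a connection to the seed node, handshake on it, then connect fully - by letting each StepListener's callback kick off the next step instead of blocking on a PlainActionFuture. The sketch below is not part of the patch and does not reproduce the real StepListener; Step, AsyncTransport and connectToSeed are hypothetical stand-ins in plain Java, shown only to make the shape of the chaining explicit.

import java.util.function.Consumer;

final class StepSketch {

    /** A stripped-down stand-in for StepListener: a single-use asynchronous result. */
    static final class Step<T> {
        private T value;
        private Exception failure;
        private boolean completed;
        private Consumer<T> onSuccess;
        private Consumer<Exception> onError;

        synchronized void onResponse(T v) { value = v; complete(); }
        synchronized void onFailure(Exception e) { failure = e; complete(); }

        /** Register the continuation; it runs immediately if the step already completed. */
        synchronized void whenComplete(Consumer<T> onSuccess, Consumer<Exception> onError) {
            this.onSuccess = onSuccess;
            this.onError = onError;
            notifyIfReady();
        }

        /** Read the value of an already successfully completed step. */
        synchronized T result() {
            if (completed == false || failure != null) {
                throw new IllegalStateException("step has not completed successfully");
            }
            return value;
        }

        private void complete() { completed = true; notifyIfReady(); }

        private void notifyIfReady() {
            if (completed && onSuccess != null) {
                if (failure == null) {
                    onSuccess.accept(value);
                } else {
                    onError.accept(failure);
                }
            }
        }
    }

    /** Hypothetical asynchronous transport API used only for this sketch. */
    interface AsyncTransport {
        void openConnection(String seed, Step<String> step);
        void handshake(String connection, Step<String> step);
        void connectFully(String node, Step<Void> step);
    }

    /** Mirrors the shape of collectRemoteNodes: each step starts the next from its callback. */
    static void connectToSeed(AsyncTransport transport, String seed, Consumer<Exception> onFailure) {
        Step<String> openStep = new Step<>();
        transport.openConnection(seed, openStep);

        Step<String> handshakeStep = new Step<>();
        openStep.whenComplete(connection -> transport.handshake(connection, handshakeStep), onFailure);

        Step<Void> fullStep = new Step<>();
        handshakeStep.whenComplete(node -> transport.connectFully(node, fullStep), e -> {
            // on a failed handshake the connection opened in the first step would be closed here
            onFailure.accept(e);
        });

        fullStep.whenComplete(ignored -> System.out.println("connected via " + openStep.result()), onFailure);
    }
}

A later step can also read an earlier step's value through result(), which is how the code above retrieves the opened connection and the handshake response once the full-connection step has completed.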
@@ -551,15 +526,12 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
         private final Transport.Connection connection;
         private final ActionListener<Void> listener;
         private final Iterator<Supplier<DiscoveryNode>> seedNodes;
-        private final CancellableThreads cancellableThreads;
 
         SniffClusterStateResponseHandler(Transport.Connection connection, ActionListener<Void> listener,
-                                         Iterator<Supplier<DiscoveryNode>> seedNodes,
-                                         CancellableThreads cancellableThreads) {
+                                         Iterator<Supplier<DiscoveryNode>> seedNodes) {
             this.connection = connection;
             this.listener = listener;
             this.seedNodes = seedNodes;
-            this.cancellableThreads = cancellableThreads;
         }
 
         @Override
@@ -569,43 +541,44 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
         @Override
         public void handleResponse(ClusterStateResponse response) {
-            try {
-                if (remoteClusterName.get() == null) {
-                    assert response.getClusterName().value() != null;
-                    remoteClusterName.set(response.getClusterName());
-                }
-                try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes
-                    // we have to close this connection before we notify listeners - this is mainly needed for test correctness
-                    // since if we do it afterwards we might fail assertions that check if all high level connections are closed.
-                    // from a code correctness perspective we could also close it afterwards. This try/with block will
-                    // maintain the possibly exceptions thrown from within the try block and suppress the ones that are possible thrown
-                    // by closing the connection
-                    cancellableThreads.executeIO(() -> {
-                        DiscoveryNodes nodes = response.getState().nodes();
-                        Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
-                        for (DiscoveryNode n : nodesIter) {
-                            DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n);
-                            if (nodePredicate.test(node) && connectionManager.size() < maxNumRemoteConnections) {
-                                try {
-                                    // noop if node is connected
-                                    PlainActionFuture.get(fut -> connectionManager.connectToNode(node, null,
-                                        transportService.connectionValidator(node), ActionListener.map(fut, x -> null)));
-                                } catch (ConnectTransportException | IllegalStateException ex) {
+            handleNodes(response.getState().nodes().getNodes().valuesIt());
+        }
+
+        private void handleNodes(Iterator<DiscoveryNode> nodesIter) {
+            while (nodesIter.hasNext()) {
+                final DiscoveryNode node = maybeAddProxyAddress(proxyAddress, nodesIter.next());
+                if (nodePredicate.test(node) && connectionManager.size() < maxNumRemoteConnections) {
+                    connectionManager.connectToNode(node, null,
+                        transportService.connectionValidator(node), new ActionListener<Void>() {
+                            @Override
+                            public void onResponse(Void aVoid) {
+                                handleNodes(nodesIter);
+                            }
+
+                            @Override
+                            public void onFailure(Exception e) {
+                                if (e instanceof ConnectTransportException ||
+                                    e instanceof IllegalStateException) {
                                     // ISE if we fail the handshake with an version incompatible node
                                     // fair enough we can't connect just move on
-                                    logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", node), ex);
+                                    logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", node), e);
+                                    handleNodes(nodesIter);
+                                } else {
+                                    logger.warn(() ->
+                                        new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), e);
+                                    IOUtils.closeWhileHandlingException(connection);
+                                    collectRemoteNodes(seedNodes, listener);
                                 }
                             }
-                        }
-                    });
+                        });
+                    return;
                 }
-                listener.onResponse(null);
-            } catch (CancellableThreads.ExecutionCancelledException ex) {
-                listener.onFailure(ex); // we got canceled - fail the listener and step out
-            } catch (Exception ex) {
-                logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex);
-                collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
             }
+            // We have to close this connection before we notify listeners - this is mainly needed for test correctness
+            // since if we do it afterwards we might fail assertions that check if all high level connections are closed.
+            // from a code correctness perspective we could also close it afterwards.
+            IOUtils.closeWhileHandlingException(connection);
+            listener.onResponse(null);
         }
 
         @Override
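handleNodes above turns what used to be a blocking for-loop over the remote cluster state into an asynchronous iteration: each connectToNode callback re-invokes handleNodes with the same iterator, so the loop advances one node per completed connection without tying up a thread. As an illustration only (not part of the patch), here is a plain-Java reduction of that pattern; connectAsync, Callback and the node names are hypothetical.

import java.util.Arrays;
import java.util.Iterator;
import java.util.function.BiConsumer;

final class AsyncIteration {

    interface Callback {
        void done();
        void failed(Exception e);
    }

    /** connectAsync.accept(node, callback) starts one asynchronous connection attempt. */
    static void connectAll(Iterator<String> nodes, BiConsumer<String, Callback> connectAsync, Callback whenFinished) {
        while (nodes.hasNext()) {
            String node = nodes.next();
            if (node.isEmpty()) {
                continue; // stands in for the node predicate / max-connections check in the real code
            }
            connectAsync.accept(node, new Callback() {
                // with a truly asynchronous connectAsync the callback runs later, so this "recursion" does not grow the stack
                @Override public void done() { connectAll(nodes, connectAsync, whenFinished); }
                @Override public void failed(Exception e) { whenFinished.failed(e); }
            });
            return; // the callback resumes the iteration; nothing blocks here
        }
        whenFinished.done(); // no elements left
    }

    public static void main(String[] args) {
        connectAll(Arrays.asList("node-1", "node-2").iterator(),
            (node, cb) -> { System.out.println("connecting to " + node); cb.done(); },
            new Callback() {
                @Override public void done() { System.out.println("all nodes handled"); }
                @Override public void failed(Exception e) { e.printStackTrace(); }
            });
    }
}

The while-loop plus early return is what lets the real code skip nodes that fail the node predicate or would exceed maxNumRemoteConnections while still handing control back as soon as one asynchronous connect is in flight.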
@@ -615,7 +588,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
                 IOUtils.closeWhileHandlingException(connection);
             } finally {
                 // once the connection is closed lets try the next node
-                collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
+                collectRemoteNodes(seedNodes, listener);
             }
         }
 
@@ -627,7 +600,9 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
     }
 
     boolean assertNoRunningConnections() { // for testing only
-        assert connectHandler.running.availablePermits() == 1;
+        synchronized (connectHandler.mutex) {
+            assert connectHandler.listeners.isEmpty();
+        }
         return true;
     }
 
diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java
index c44f609f77b..2488551f7d6 100644
--- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java
+++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java
@@ -47,7 +47,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -432,20 +431,6 @@ public class RemoteClusterConnectionTests extends ESTestCase {
             ActionListener<Void> listener = ActionListener.wrap(
                 x -> latch.countDown(),
                 x -> {
-                    /*
-                     * This can occur on a thread submitted to the thread pool while we are closing the
-                     * remote cluster connection at the end of the test.
-                     */
-                    if (x instanceof CancellableThreads.ExecutionCancelledException) {
-                        try {
-                            // we should already be shutting down
-                            assertEquals(0L, latch.getCount());
-                        } finally {
-                            // ensure we count down the latch on failure as well to not prevent failing tests from ending
-                            latch.countDown();
-                        }
-                        return;
-                    }
                     exceptionAtomicReference.set(x);
                     latch.countDown();
                 }
@@ -580,7 +565,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
             closeRemote.countDown();
             listenerCalled.await();
             assertNotNull(exceptionReference.get());
-            expectThrows(CancellableThreads.ExecutionCancelledException.class, () -> {
+            expectThrows(AlreadyClosedException.class, () -> {
                 throw exceptionReference.get();
             });
 
@@ -640,16 +625,6 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                     latch.countDown();
                 },
                 x -> {
-                    /*
-                     * This can occur on a thread submitted to the thread pool while we are closing the
-                     * remote cluster connection at the end of the test.
-                     */
-                    if (x instanceof CancellableThreads.ExecutionCancelledException) {
-                        // we should already be shutting down
-                        assertTrue(executed.get());
-                        return;
-                    }
-
                     assertTrue(executed.compareAndSet(false, true));
                     latch.countDown();
 
@@ -737,8 +712,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
                             throw assertionError;
                         }
                     }
-                    if (x instanceof RejectedExecutionException || x instanceof AlreadyClosedException
-                        || x instanceof CancellableThreads.ExecutionCancelledException) {
+                    if (x instanceof RejectedExecutionException || x instanceof AlreadyClosedException) {
                         // that's fine
                     } else {
                         throw new AssertionError(x);