diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 84c337399d5..0c6be15fd92 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -62,6 +63,7 @@ public class ConnectionManager implements Closeable { private final TimeValue pingSchedule; private final ConnectionProfile defaultProfile; private final Lifecycle lifecycle = new Lifecycle(); + private final AtomicBoolean closed = new AtomicBoolean(false); private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); @@ -83,7 +85,9 @@ public class ConnectionManager implements Closeable { } public void addListener(TransportConnectionListener listener) { - this.connectionListener.listeners.add(listener); + if (connectionListener.listeners.contains(listener) == false) { + this.connectionListener.listeners.add(listener); + } } public void removeListener(TransportConnectionListener listener) { @@ -186,45 +190,50 @@ public class ConnectionManager implements Closeable { } } - public int connectedNodeCount() { + /** + * Returns the number of nodes this manager is connected to. + */ + public int size() { return connectedNodes.size(); } @Override public void close() { - lifecycle.moveToStopped(); - CountDownLatch latch = new CountDownLatch(1); + if (closed.compareAndSet(false, true)) { + lifecycle.moveToStopped(); + CountDownLatch latch = new CountDownLatch(1); - // TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService - threadPool.generic().execute(() -> { - closeLock.writeLock().lock(); - try { - // we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close - // all instances and then clear them maps - Iterator> iterator = connectedNodes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry next = iterator.next(); - try { - IOUtils.closeWhileHandlingException(next.getValue()); - } finally { - iterator.remove(); + // TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService + threadPool.generic().execute(() -> { + closeLock.writeLock().lock(); + try { + // we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close + // all instances and then clear them maps + Iterator> iterator = connectedNodes.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + try { + IOUtils.closeWhileHandlingException(next.getValue()); + } finally { + iterator.remove(); + } } + } finally { + closeLock.writeLock().unlock(); + latch.countDown(); + } + }); + + try { + try { + latch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // ignore } } finally { - closeLock.writeLock().unlock(); - latch.countDown(); + lifecycle.moveToClosed(); } - }); - - try { - try { - latch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // ignore - } - } finally { - lifecycle.moveToClosed(); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 15cf7899dc0..5621b385578 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -80,16 +81,17 @@ import java.util.stream.Collectors; final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener, Closeable { private final TransportService transportService; + private final ConnectionManager connectionManager; private final ConnectionProfile remoteProfile; private final ConnectedNodes connectedNodes; private final String clusterAlias; private final int maxNumRemoteConnections; private final Predicate nodePredicate; + private final ThreadPool threadPool; private volatile List> seedNodes; private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; private SetOnce remoteClusterName = new SetOnce<>(); - private final ClusterName localClusterName; /** * Creates a new {@link RemoteClusterConnection} @@ -97,13 +99,14 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * @param clusterAlias the configured alias of the cluster to connect to * @param seedNodes a list of seed nodes to discover eligible nodes from * @param transportService the local nodes transport service + * @param connectionManager the connection manager to use for this remote connection * @param maxNumRemoteConnections the maximum number of connections to the remote cluster * @param nodePredicate a predicate to filter eligible remote nodes to connect to */ RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, - TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate) { + TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, + Predicate nodePredicate) { super(settings); - this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; @@ -122,7 +125,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE .getConcreteSettingForNamespace(clusterAlias).get(settings); this.connectHandler = new ConnectHandler(); - transportService.addConnectionListener(this); + this.threadPool = transportService.threadPool; + this.connectionManager = connectionManager; + connectionManager.addListener(this); + // we register the transport service here as a listener to make sure we notify handlers on disconnect etc. + connectionManager.addListener(transportService); } /** @@ -183,8 +190,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, final ActionListener listener) { - final DiscoveryNode node = connectedNodes.getAny(); - transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest, + final DiscoveryNode node = getAnyConnectedNode(); + Transport.Connection connection = connectionManager.getConnection(node); + transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override @@ -219,12 +227,16 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo request.clear(); request.nodes(true); request.local(true); // run this on the node that gets the request it's as good as any other - final DiscoveryNode node = connectedNodes.getAny(); - transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, + final DiscoveryNode node = getAnyConnectedNode(); + Transport.Connection connection = connectionManager.getConnection(node); + transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, new TransportResponseHandler() { + @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); + public ClusterStateResponse read(StreamInput in) throws IOException { + ClusterStateResponse response = new ClusterStateResponse(); + response.readFrom(in); + return response; } @Override @@ -261,11 +273,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * If such node is not connected, the returned connection will be a proxy connection that redirects to it. */ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { - if (transportService.nodeConnected(remoteClusterNode)) { - return transportService.getConnection(remoteClusterNode); + if (connectionManager.nodeConnected(remoteClusterNode)) { + return connectionManager.getConnection(remoteClusterNode); } - DiscoveryNode discoveryNode = connectedNodes.getAny(); - Transport.Connection connection = transportService.getConnection(discoveryNode); + DiscoveryNode discoveryNode = getAnyConnectedNode(); + Transport.Connection connection = connectionManager.getConnection(discoveryNode); return new ProxyConnection(connection, remoteClusterNode); } @@ -317,33 +329,18 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } Transport.Connection getConnection() { - return transportService.getConnection(getAnyConnectedNode()); + return connectionManager.getConnection(getAnyConnectedNode()); } @Override public void close() throws IOException { - connectHandler.close(); + IOUtils.close(connectHandler, connectionManager); } public boolean isClosed() { return connectHandler.isClosed(); } - private ConnectionProfile getRemoteProfile(ClusterName name) { - // we can only compare the cluster name to make a decision if we should use a remote profile - // we can't use a cluster UUID here since we could be connecting to that remote cluster before - // the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a - // rather smallish optimization on the connection layer under certain situations where remote clusters - // have the same name as the local one is minor here. - // the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster, - // gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity - if (this.localClusterName.equals(name)) { - return null; - } else { - return remoteProfile; - } - } - /** * The connect handler manages node discovery and the actual connect to the remote cluster. * There is at most one connect job running at any time. If such a connect job is triggered @@ -387,7 +384,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo final boolean runConnect; final Collection> toNotify; final ActionListener listener = connectListener == null ? null : - ContextPreservingActionListener.wrapPreservingContext(connectListener, transportService.getThreadPool().getThreadContext()); + ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext()); synchronized (queue) { if (listener != null && queue.offer(listener) == false) { listener.onFailure(new RejectedExecutionException("connect queue is full")); @@ -415,7 +412,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } private void forkConnect(final Collection> toNotify) { - ThreadPool threadPool = transportService.getThreadPool(); ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT); executor.submit(new AbstractRunnable() { @Override @@ -452,13 +448,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo maybeConnect(); } }); - collectRemoteNodes(seedNodes.iterator(), transportService, listener); + collectRemoteNodes(seedNodes.iterator(), transportService, connectionManager, listener); } }); } private void collectRemoteNodes(Iterator> seedNodes, - final TransportService transportService, ActionListener listener) { + final TransportService transportService, final ConnectionManager manager, ActionListener listener) { if (Thread.currentThread().isInterrupted()) { listener.onFailure(new InterruptedException("remote connect thread got interrupted")); } @@ -467,7 +463,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo cancellableThreads.executeIO(() -> { final DiscoveryNode seedNode = seedNodes.next().get(); final TransportService.HandshakeResponse handshakeResponse; - Transport.Connection connection = transportService.openConnection(seedNode, + Transport.Connection connection = manager.openConnection(seedNode, ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); boolean success = false; try { @@ -482,7 +478,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode(); if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) { - transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName())); + manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode)); if (remoteClusterName.get() == null) { assert handshakeResponse.getClusterName().value() != null; remoteClusterName.set(handshakeResponse.getClusterName()); @@ -524,7 +520,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo // ISE if we fail the handshake with an version incompatible node if (seedNodes.hasNext()) { logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex); - collectRemoteNodes(seedNodes, transportService, listener); + collectRemoteNodes(seedNodes, transportService, manager, listener); } else { listener.onFailure(ex); } @@ -552,7 +548,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo /* This class handles the _state response from the remote cluster when sniffing nodes to connect to */ private class SniffClusterStateResponseHandler implements TransportResponseHandler { - private final TransportService transportService; private final Transport.Connection connection; private final ActionListener listener; private final Iterator> seedNodes; @@ -561,7 +556,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection, ActionListener listener, Iterator> seedNodes, CancellableThreads cancellableThreads) { - this.transportService = transportService; this.connection = connection; this.listener = listener; this.seedNodes = seedNodes; @@ -592,8 +586,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo for (DiscoveryNode node : nodesIter) { if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { try { - transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is - // connected + connectionManager.connectToNode(node, remoteProfile, + transportService.connectionValidator(node)); // noop if node is connected connectedNodes.add(node); } catch (ConnectTransportException | IllegalStateException ex) { // ISE if we fail the handshake with an version incompatible node @@ -609,7 +603,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo listener.onFailure(ex); // we got canceled - fail the listener and step out } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex); - collectRemoteNodes(seedNodes, transportService, listener); + collectRemoteNodes(seedNodes, transportService, connectionManager, listener); } } @@ -620,7 +614,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo IOUtils.closeWhileHandlingException(connection); } finally { // once the connection is closed lets try the next node - collectRemoteNodes(seedNodes, transportService, listener); + collectRemoteNodes(seedNodes, transportService, connectionManager, listener); } } @@ -715,4 +709,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } } } + + ConnectionManager getConnectionManager() { + return connectionManager; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 956a0d94179..34f13b67287 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import java.util.Collection; import java.util.function.Supplier; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -139,7 +140,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl } if (remote == null) { // this is a new cluster we have to add a new representation - remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections, + remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, + new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections, getNodePredicate(settings)); remoteClusters.put(entry.getKey(), remote); } @@ -411,4 +413,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl } return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias); } + + Collection getConnections() { + return remoteClusters.values(); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index fb14ae96dbf..e37ea81211a 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -28,6 +28,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -56,6 +57,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; @@ -268,8 +270,9 @@ public class TransportService extends AbstractLifecycleComponent implements Tran @Override protected void doStop() { try { - connectionManager.close(); - transport.stop(); + IOUtils.close(connectionManager, remoteClusterService, transport::stop); + } catch (IOException e) { + throw new UncheckedIOException(e); } finally { // in case the transport is not connected to our local node (thus cleaned on node disconnect) // make sure to clean any leftover on going handles @@ -306,7 +309,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran @Override protected void doClose() throws IOException { - IOUtils.close(remoteClusterService, transport); + transport.close(); } /** @@ -364,14 +367,18 @@ public class TransportService extends AbstractLifecycleComponent implements Tran if (isLocalNode(node)) { return; } + connectionManager.connectToNode(node, connectionProfile, connectionValidator(node)); + } - connectionManager.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> { + public CheckedBiConsumer connectionValidator(DiscoveryNode node) { + return (newConnection, actualProfile) -> { // We don't validate cluster names to allow for CCS connections. final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode; if (validateConnections && node.equals(remote) == false) { throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote); } - }); + }; + } /** @@ -562,8 +569,12 @@ public class TransportService extends AbstractLifecycleComponent implements Tran final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { - - asyncSender.sendRequest(connection, action, request, options, handler); + try { + asyncSender.sendRequest(connection, action, request, options, handler); + } catch (NodeNotConnectedException ex) { + // the caller might not handle this so we invoke the handler + handler.handleException(ex); + } } /** diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index afc6b47483e..9baf2e1c956 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -379,10 +379,10 @@ public class TransportClientNodesServiceTests extends ESTestCase { transportClientNodesService.addTransportAddresses(remoteService.getLocalDiscoNode().getAddress()); assertEquals(1, transportClientNodesService.connectedNodes().size()); - assertEquals(1, clientService.connectionManager().connectedNodeCount()); + assertEquals(1, clientService.connectionManager().size()); transportClientNodesService.doSample(); - assertEquals(1, clientService.connectionManager().connectedNodeCount()); + assertEquals(1, clientService.connectionManager().size()); establishedConnections.clear(); handler.blockRequest(); diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 473f5152e8f..b8c39c48f88 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -134,7 +134,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { private void assertConnectedExactlyToNodes(ClusterState state) { assertConnected(state.nodes()); - assertThat(transportService.getConnectionManager().connectedNodeCount(), equalTo(state.nodes().getSize())); + assertThat(transportService.getConnectionManager().size(), equalTo(state.nodes().getSize())); } private void assertConnected(Iterable nodes) { diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java index 3c099c32bde..bff5a2b122d 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java @@ -159,7 +159,7 @@ public class ConnectionManagerTests extends ESTestCase { assertFalse(connection.isClosed()); assertTrue(connectionManager.nodeConnected(node)); assertSame(connection, connectionManager.getConnection(node)); - assertEquals(1, connectionManager.connectedNodeCount()); + assertEquals(1, connectionManager.size()); assertEquals(1, nodeConnectedCount.get()); assertEquals(0, nodeDisconnectedCount.get()); @@ -169,7 +169,7 @@ public class ConnectionManagerTests extends ESTestCase { connection.close(); } assertTrue(connection.isClosed()); - assertEquals(0, connectionManager.connectedNodeCount()); + assertEquals(0, connectionManager.size()); assertEquals(1, nodeConnectedCount.get()); assertEquals(1, nodeDisconnectedCount.get()); } @@ -205,7 +205,7 @@ public class ConnectionManagerTests extends ESTestCase { assertTrue(connection.isClosed()); assertFalse(connectionManager.nodeConnected(node)); expectThrows(NodeNotConnectedException.class, () -> connectionManager.getConnection(node)); - assertEquals(0, connectionManager.connectedNodeCount()); + assertEquals(0, connectionManager.size()); assertEquals(0, nodeConnectedCount.get()); assertEquals(0, nodeDisconnectedCount.get()); } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 8cfec0a07f9..34e22fd20de 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -81,13 +81,15 @@ public class RemoteClusterClientTests extends ESTestCase { try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) { Semaphore semaphore = new Semaphore(1); service.start(); - service.addConnectionListener(new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (remoteNode.equals(node)) { - semaphore.release(); + service.getRemoteClusterService().getConnections().forEach(con -> { + con.getConnectionManager().addListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (remoteNode.equals(node)) { + semaphore.release(); + } } - } + }); }); // this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have // the right calls in place in the RemoteAwareClient @@ -95,7 +97,9 @@ public class RemoteClusterClientTests extends ESTestCase { for (int i = 0; i < 10; i++) { semaphore.acquire(); try { - service.disconnectFromNode(remoteNode); + service.getRemoteClusterService().getConnections().forEach(con -> { + con.getConnectionManager().disconnectFromNode(remoteNode); + }); semaphore.acquire(); RemoteClusterService remoteClusterService = service.getRemoteClusterService(); Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 3d0388ccfad..e40486d63dc 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -145,7 +145,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { } } - public void testLocalProfileIsUsedForLocalCluster() throws Exception { + public void testRemoteProfileIsUsedForLocalCluster() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { @@ -159,7 +159,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -175,9 +175,12 @@ public class RemoteClusterConnectionTests extends ESTestCase { }); TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) .build(); - service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), - options, futureHandler); - futureHandler.txGet(); + IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> { + service.sendRequest(discoverableNode, + ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler); + futureHandler.txGet(); + }).getCause(); + assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]"); } } } @@ -199,7 +202,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -255,7 +258,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -284,7 +287,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { updateSeedNodes(connection, seedNodes); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -311,7 +314,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -360,7 +363,8 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, + n -> n.equals(rejectedNode) == false)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); if (rejectedNode.equals(seedNode)) { assertFalse(service.nodeConnected(seedNode)); @@ -399,7 +403,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode))); assertFalse(service.nodeConnected(seedNode)); assertTrue(connection.assertNoRunningConnections()); @@ -462,7 +466,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { connection.addConnectedNode(seedNode); for (DiscoveryNode node : knownNodes) { final Transport.Connection transportConnection = connection.getConnection(node); @@ -505,7 +509,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -542,7 +546,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, Integer.MAX_VALUE, n -> true)) { + nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { if (randomBoolean()) { updateSeedNodes(connection, nodes); } @@ -582,7 +586,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.acceptIncomingRequests(); List> nodes = Collections.singletonList(() -> seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, Integer.MAX_VALUE, n -> true)) { + nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { SearchRequest request = new SearchRequest("test-index"); Thread[] threads = new Thread[10]; for (int i = 0; i < threads.length; i++) { @@ -636,7 +640,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { SearchRequest request = new SearchRequest("test-index"); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") @@ -746,7 +750,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -824,7 +828,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -913,7 +917,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, maxNumConnections, n -> true)) { + seedNodes, service, service.connectionManager(), maxNumConnections, n -> true)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); @@ -1060,7 +1064,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { assertFalse(service.nodeConnected(seedNode)); assertFalse(service.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -1109,7 +1113,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { if (randomBoolean()) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); } @@ -1157,7 +1161,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true)) { + seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -1247,7 +1251,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList( () -> seedNode), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { updateSeedNodes(connection, Arrays.asList(() -> seedNode)); assertTrue(service.nodeConnected(seedNode)); assertTrue(service.nodeConnected(discoverableNode)); @@ -1327,7 +1331,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> connectedNode), service, Integer.MAX_VALUE, n -> true)) { + Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected @@ -1335,9 +1339,9 @@ public class RemoteClusterConnectionTests extends ESTestCase { assertSame(seedConnection, remoteConnection); } for (int i = 0; i < 10; i++) { - //always a direct connection as the remote node is already connected + // we don't use the transport service connection manager so we will get a proxy connection for the local node Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode()); - assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class))); + assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class)); assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode())); } for (int i = 0; i < 10; i++) { @@ -1369,7 +1373,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { return seedNode; }; try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true)) { + Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) { updateSeedNodes(connection, Arrays.asList(seedSupplier)); // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes // being called again so we try to resolve the same seed node's host twice diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index c94b1cbdef5..f1929e72d8b 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -283,6 +283,7 @@ public class RemoteClusterServiceTests extends ESTestCase { assertTrue(service.isRemoteClusterRegistered("cluster_2")); assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node)); assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node)); + assertEquals(0, transportService.getConnectionManager().size()); } } } @@ -347,6 +348,7 @@ public class RemoteClusterServiceTests extends ESTestCase { assertTrue(service.isRemoteClusterRegistered("cluster_2")); assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node)); assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node)); + assertEquals(0, transportService.getConnectionManager().size()); } } } @@ -579,14 +581,16 @@ public class RemoteClusterServiceTests extends ESTestCase { } CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); - service.addConnectionListener(new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (disconnectedNodes.remove(node)) { - disconnectedLatch.countDown(); + for (RemoteClusterConnection connection : remoteClusterService.getConnections()) { + connection.getConnectionManager().addListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); + } } - } - }); + }); + } for (DiscoveryNode disconnectedNode : disconnectedNodes) { service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); @@ -664,6 +668,7 @@ public class RemoteClusterServiceTests extends ESTestCase { assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY); } } + assertEquals(0, service.getConnectionManager().size()); } } } finally { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index 486ccc805d0..012369feb83 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -120,8 +120,8 @@ public class StubbableConnectionManager extends ConnectionManager { } @Override - public int connectedNodeCount() { - return delegate.connectedNodeCount(); + public int size() { + return delegate.size(); } @Override