Use a dedicated ConnectionManger for RemoteClusterConnection (#32988)

This change introduces a dedicated ConnectionManager for every RemoteClusterConnection
such that there is not state shared with the TransportService internal ConnectionManager.
All connections to a remote cluster are isolated from the TransportService but still uses
the TransportService and it's internal properties like the Transport, tracing and internal
listener actions on disconnects etc.
This allows a remote cluster connection to have a different lifecycle than a local cluster connection,
also local discovery code doesn't get notified if there is a disconnect on from a remote cluster and
each connection can use it's own dedicated connection profile which allows to have a reduced set of
connections per cluster without conflicting with the local cluster.

Closes #31835
This commit is contained in:
Simon Willnauer 2018-08-21 12:43:25 +02:00 committed by GitHub
parent 65d4f27873
commit 92076497e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 168 additions and 131 deletions

View File

@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -62,6 +63,7 @@ public class ConnectionManager implements Closeable {
private final TimeValue pingSchedule;
private final ConnectionProfile defaultProfile;
private final Lifecycle lifecycle = new Lifecycle();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
@ -83,7 +85,9 @@ public class ConnectionManager implements Closeable {
}
public void addListener(TransportConnectionListener listener) {
this.connectionListener.listeners.add(listener);
if (connectionListener.listeners.contains(listener) == false) {
this.connectionListener.listeners.add(listener);
}
}
public void removeListener(TransportConnectionListener listener) {
@ -186,45 +190,50 @@ public class ConnectionManager implements Closeable {
}
}
public int connectedNodeCount() {
/**
* Returns the number of nodes this manager is connected to.
*/
public int size() {
return connectedNodes.size();
}
@Override
public void close() {
lifecycle.moveToStopped();
CountDownLatch latch = new CountDownLatch(1);
if (closed.compareAndSet(false, true)) {
lifecycle.moveToStopped();
CountDownLatch latch = new CountDownLatch(1);
// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
}
});
try {
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
lifecycle.moveToClosed();
}
});
try {
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
}
} finally {
lifecycle.moveToClosed();
}
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
@ -80,16 +81,17 @@ import java.util.stream.Collectors;
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener, Closeable {
private final TransportService transportService;
private final ConnectionManager connectionManager;
private final ConnectionProfile remoteProfile;
private final ConnectedNodes connectedNodes;
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private final ThreadPool threadPool;
private volatile List<Supplier<DiscoveryNode>> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
private final ClusterName localClusterName;
/**
* Creates a new {@link RemoteClusterConnection}
@ -97,13 +99,14 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param clusterAlias the configured alias of the cluster to connect to
* @param seedNodes a list of seed nodes to discover eligible nodes from
* @param transportService the local nodes transport service
* @param connectionManager the connection manager to use for this remote connection
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
@ -122,7 +125,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
this.connectHandler = new ConnectHandler();
transportService.addConnectionListener(this);
this.threadPool = transportService.threadPool;
this.connectionManager = connectionManager;
connectionManager.addListener(this);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
}
/**
@ -183,8 +190,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
final DiscoveryNode node = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(node);
transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterSearchShardsResponse>() {
@Override
@ -219,12 +227,16 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
final DiscoveryNode node = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(node);
transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {
@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
public ClusterStateResponse read(StreamInput in) throws IOException {
ClusterStateResponse response = new ClusterStateResponse();
response.readFrom(in);
return response;
}
@Override
@ -261,11 +273,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
*/
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
if (transportService.nodeConnected(remoteClusterNode)) {
return transportService.getConnection(remoteClusterNode);
if (connectionManager.nodeConnected(remoteClusterNode)) {
return connectionManager.getConnection(remoteClusterNode);
}
DiscoveryNode discoveryNode = connectedNodes.getAny();
Transport.Connection connection = transportService.getConnection(discoveryNode);
DiscoveryNode discoveryNode = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(discoveryNode);
return new ProxyConnection(connection, remoteClusterNode);
}
@ -317,33 +329,18 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
Transport.Connection getConnection() {
return transportService.getConnection(getAnyConnectedNode());
return connectionManager.getConnection(getAnyConnectedNode());
}
@Override
public void close() throws IOException {
connectHandler.close();
IOUtils.close(connectHandler, connectionManager);
}
public boolean isClosed() {
return connectHandler.isClosed();
}
private ConnectionProfile getRemoteProfile(ClusterName name) {
// we can only compare the cluster name to make a decision if we should use a remote profile
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
// the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
// rather smallish optimization on the connection layer under certain situations where remote clusters
// have the same name as the local one is minor here.
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
if (this.localClusterName.equals(name)) {
return null;
} else {
return remoteProfile;
}
}
/**
* The connect handler manages node discovery and the actual connect to the remote cluster.
* There is at most one connect job running at any time. If such a connect job is triggered
@ -387,7 +384,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
final boolean runConnect;
final Collection<ActionListener<Void>> toNotify;
final ActionListener<Void> listener = connectListener == null ? null :
ContextPreservingActionListener.wrapPreservingContext(connectListener, transportService.getThreadPool().getThreadContext());
ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext());
synchronized (queue) {
if (listener != null && queue.offer(listener) == false) {
listener.onFailure(new RejectedExecutionException("connect queue is full"));
@ -415,7 +412,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
ThreadPool threadPool = transportService.getThreadPool();
ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
executor.submit(new AbstractRunnable() {
@Override
@ -452,13 +448,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
maybeConnect();
}
});
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
collectRemoteNodes(seedNodes.iterator(), transportService, connectionManager, listener);
}
});
}
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
final TransportService transportService, final ConnectionManager manager, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
}
@ -467,7 +463,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next().get();
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
Transport.Connection connection = manager.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
boolean success = false;
try {
@ -482,7 +478,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode));
if (remoteClusterName.get() == null) {
assert handshakeResponse.getClusterName().value() != null;
remoteClusterName.set(handshakeResponse.getClusterName());
@ -524,7 +520,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
// ISE if we fail the handshake with an version incompatible node
if (seedNodes.hasNext()) {
logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex);
collectRemoteNodes(seedNodes, transportService, listener);
collectRemoteNodes(seedNodes, transportService, manager, listener);
} else {
listener.onFailure(ex);
}
@ -552,7 +548,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
/* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
private final TransportService transportService;
private final Transport.Connection connection;
private final ActionListener<Void> listener;
private final Iterator<Supplier<DiscoveryNode>> seedNodes;
@ -561,7 +556,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes,
CancellableThreads cancellableThreads) {
this.transportService = transportService;
this.connection = connection;
this.listener = listener;
this.seedNodes = seedNodes;
@ -592,8 +586,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
for (DiscoveryNode node : nodesIter) {
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
// connected
connectionManager.connectToNode(node, remoteProfile,
transportService.connectionValidator(node)); // noop if node is connected
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
// ISE if we fail the handshake with an version incompatible node
@ -609,7 +603,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
listener.onFailure(ex); // we got canceled - fail the listener and step out
} catch (Exception ex) {
logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex);
collectRemoteNodes(seedNodes, transportService, listener);
collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
}
}
@ -620,7 +614,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
IOUtils.closeWhileHandlingException(connection);
} finally {
// once the connection is closed lets try the next node
collectRemoteNodes(seedNodes, transportService, listener);
collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
}
}
@ -715,4 +709,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
}
}
ConnectionManager getConnectionManager() {
return connectionManager;
}
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;
import java.util.Collection;
import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -139,7 +140,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections,
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService,
new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections,
getNodePredicate(settings));
remoteClusters.put(entry.getKey(), remote);
}
@ -411,4 +413,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias);
}
Collection<RemoteClusterConnection> getConnections() {
return remoteClusters.values();
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -56,6 +57,7 @@ import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
@ -268,8 +270,9 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
@Override
protected void doStop() {
try {
connectionManager.close();
transport.stop();
IOUtils.close(connectionManager, remoteClusterService, transport::stop);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
// in case the transport is not connected to our local node (thus cleaned on node disconnect)
// make sure to clean any leftover on going handles
@ -306,7 +309,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
@Override
protected void doClose() throws IOException {
IOUtils.close(remoteClusterService, transport);
transport.close();
}
/**
@ -364,14 +367,18 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
if (isLocalNode(node)) {
return;
}
connectionManager.connectToNode(node, connectionProfile, connectionValidator(node));
}
connectionManager.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
public CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> connectionValidator(DiscoveryNode node) {
return (newConnection, actualProfile) -> {
// We don't validate cluster names to allow for CCS connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
if (validateConnections && node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}
});
};
}
/**
@ -562,8 +569,12 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
asyncSender.sendRequest(connection, action, request, options, handler);
try {
asyncSender.sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}
}
/**

View File

@ -379,10 +379,10 @@ public class TransportClientNodesServiceTests extends ESTestCase {
transportClientNodesService.addTransportAddresses(remoteService.getLocalDiscoNode().getAddress());
assertEquals(1, transportClientNodesService.connectedNodes().size());
assertEquals(1, clientService.connectionManager().connectedNodeCount());
assertEquals(1, clientService.connectionManager().size());
transportClientNodesService.doSample();
assertEquals(1, clientService.connectionManager().connectedNodeCount());
assertEquals(1, clientService.connectionManager().size());
establishedConnections.clear();
handler.blockRequest();

View File

@ -134,7 +134,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
private void assertConnectedExactlyToNodes(ClusterState state) {
assertConnected(state.nodes());
assertThat(transportService.getConnectionManager().connectedNodeCount(), equalTo(state.nodes().getSize()));
assertThat(transportService.getConnectionManager().size(), equalTo(state.nodes().getSize()));
}
private void assertConnected(Iterable<DiscoveryNode> nodes) {

View File

@ -159,7 +159,7 @@ public class ConnectionManagerTests extends ESTestCase {
assertFalse(connection.isClosed());
assertTrue(connectionManager.nodeConnected(node));
assertSame(connection, connectionManager.getConnection(node));
assertEquals(1, connectionManager.connectedNodeCount());
assertEquals(1, connectionManager.size());
assertEquals(1, nodeConnectedCount.get());
assertEquals(0, nodeDisconnectedCount.get());
@ -169,7 +169,7 @@ public class ConnectionManagerTests extends ESTestCase {
connection.close();
}
assertTrue(connection.isClosed());
assertEquals(0, connectionManager.connectedNodeCount());
assertEquals(0, connectionManager.size());
assertEquals(1, nodeConnectedCount.get());
assertEquals(1, nodeDisconnectedCount.get());
}
@ -205,7 +205,7 @@ public class ConnectionManagerTests extends ESTestCase {
assertTrue(connection.isClosed());
assertFalse(connectionManager.nodeConnected(node));
expectThrows(NodeNotConnectedException.class, () -> connectionManager.getConnection(node));
assertEquals(0, connectionManager.connectedNodeCount());
assertEquals(0, connectionManager.size());
assertEquals(0, nodeConnectedCount.get());
assertEquals(0, nodeDisconnectedCount.get());
}

View File

@ -81,13 +81,15 @@ public class RemoteClusterClientTests extends ESTestCase {
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
Semaphore semaphore = new Semaphore(1);
service.start();
service.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (remoteNode.equals(node)) {
semaphore.release();
service.getRemoteClusterService().getConnections().forEach(con -> {
con.getConnectionManager().addListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (remoteNode.equals(node)) {
semaphore.release();
}
}
}
});
});
// this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have
// the right calls in place in the RemoteAwareClient
@ -95,7 +97,9 @@ public class RemoteClusterClientTests extends ESTestCase {
for (int i = 0; i < 10; i++) {
semaphore.acquire();
try {
service.disconnectFromNode(remoteNode);
service.getRemoteClusterService().getConnections().forEach(con -> {
con.getConnectionManager().disconnectFromNode(remoteNode);
});
semaphore.acquire();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");

View File

@ -145,7 +145,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
public void testLocalProfileIsUsedForLocalCluster() throws Exception {
public void testRemoteProfileIsUsedForLocalCluster() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
@ -159,7 +159,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
@ -175,9 +175,12 @@ public class RemoteClusterConnectionTests extends ESTestCase {
});
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
.build();
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
options, futureHandler);
futureHandler.txGet();
IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
service.sendRequest(discoverableNode,
ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler);
futureHandler.txGet();
}).getCause();
assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]");
}
}
}
@ -199,7 +202,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
@ -255,7 +258,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
@ -284,7 +287,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true)) {
seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, seedNodes);
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
@ -311,7 +314,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
@ -360,7 +363,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE,
n -> n.equals(rejectedNode) == false)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
if (rejectedNode.equals(seedNode)) {
assertFalse(service.nodeConnected(seedNode));
@ -399,7 +403,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode)));
assertFalse(service.nodeConnected(seedNode));
assertTrue(connection.assertNoRunningConnections());
@ -462,7 +466,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
connection.addConnectedNode(seedNode);
for (DiscoveryNode node : knownNodes) {
final Transport.Connection transportConnection = connection.getConnection(node);
@ -505,7 +509,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
CountDownLatch listenerCalled = new CountDownLatch(1);
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
ActionListener<Void> listener = ActionListener.wrap(x -> {
listenerCalled.countDown();
fail("expected exception");
@ -542,7 +546,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.acceptIncomingRequests();
List<Supplier<DiscoveryNode>> nodes = Collections.singletonList(() -> seedNode);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
nodes, service, Integer.MAX_VALUE, n -> true)) {
nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
if (randomBoolean()) {
updateSeedNodes(connection, nodes);
}
@ -582,7 +586,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.acceptIncomingRequests();
List<Supplier<DiscoveryNode>> nodes = Collections.singletonList(() -> seedNode);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
nodes, service, Integer.MAX_VALUE, n -> true)) {
nodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
SearchRequest request = new SearchRequest("test-index");
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
@ -636,7 +640,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Collections.singletonList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Collections.singletonList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
SearchRequest request = new SearchRequest("test-index");
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
@ -746,7 +750,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true)) {
seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
int numThreads = randomIntBetween(4, 10);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads);
@ -824,7 +828,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true)) {
seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
int numThreads = randomIntBetween(4, 10);
Thread[] threads = new Thread[numThreads];
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
@ -913,7 +917,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.acceptIncomingRequests();
int maxNumConnections = randomIntBetween(1, 5);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, maxNumConnections, n -> true)) {
seedNodes, service, service.connectionManager(), maxNumConnections, n -> true)) {
// test no nodes connected
RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo());
assertNotNull(remoteConnectionInfo);
@ -1060,7 +1064,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
assertFalse(service.nodeConnected(seedNode));
assertFalse(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
@ -1109,7 +1113,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
if (randomBoolean()) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
}
@ -1157,7 +1161,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true)) {
seedNodes, service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
final int numGetThreads = randomIntBetween(4, 10);
final Thread[] getThreads = new Thread[numGetThreads];
final int numModifyingThreads = randomIntBetween(4, 10);
@ -1247,7 +1251,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList( () -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList( () -> seedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
@ -1327,7 +1331,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Collections.singletonList(() -> connectedNode), service, Integer.MAX_VALUE, n -> true)) {
Collections.singletonList(() -> connectedNode), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
connection.addConnectedNode(connectedNode);
for (int i = 0; i < 10; i++) {
//always a direct connection as the remote node is already connected
@ -1335,9 +1339,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
assertSame(seedConnection, remoteConnection);
}
for (int i = 0; i < 10; i++) {
//always a direct connection as the remote node is already connected
// we don't use the transport service connection manager so we will get a proxy connection for the local node
Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode());
assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class)));
assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class));
assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode()));
}
for (int i = 0; i < 10; i++) {
@ -1369,7 +1373,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
return seedNode;
};
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedSupplier));
// Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes
// being called again so we try to resolve the same seed node's host twice

View File

@ -283,6 +283,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node));
assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node));
assertEquals(0, transportService.getConnectionManager().size());
}
}
}
@ -347,6 +348,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node));
assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node));
assertEquals(0, transportService.getConnectionManager().size());
}
}
}
@ -579,14 +581,16 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
service.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
for (RemoteClusterConnection connection : remoteClusterService.getConnections()) {
connection.getConnectionManager().addListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
}
}
});
});
}
for (DiscoveryNode disconnectedNode : disconnectedNodes) {
service.addFailToSendNoConnectRule(disconnectedNode.getAddress());
@ -664,6 +668,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY);
}
}
assertEquals(0, service.getConnectionManager().size());
}
}
} finally {

View File

@ -120,8 +120,8 @@ public class StubbableConnectionManager extends ConnectionManager {
}
@Override
public int connectedNodeCount() {
return delegate.connectedNodeCount();
public int size() {
return delegate.size();
}
@Override