From a742c58d45dfe77848e4603415f3754eecc30dc2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 31 Jan 2020 09:43:25 -0700 Subject: [PATCH] Extract a ConnectionManager interface (#51722) Currently we have three different implementations representing a `ConnectionManager`. There is the basic `ConnectionManager` which holds all connections for a cluster. And a remote connection manager which support proxy behavior. And a stubbable connection manager for tests. The remote and stubbable instances use the delegate pattern, so this commit extracts an interface for them all to implement. --- .../transport/ClusterConnectionManager.java | 281 ++++++++++++++++++ .../transport/ConnectionManager.java | 270 ++--------------- .../transport/RemoteClusterConnection.java | 10 +- .../transport/RemoteConnectionManager.java | 62 +++- .../transport/RemoteConnectionStrategy.java | 6 +- .../transport/SniffConnectionStrategy.java | 2 +- .../transport/TransportService.java | 2 +- .../discovery/PeerFinderTests.java | 5 +- ...ava => ClusterConnectionManagerTests.java} | 13 +- .../ProxyConnectionStrategyTests.java | 14 +- .../RemoteClusterConnectionTests.java | 2 +- .../RemoteConnectionManagerTests.java | 23 +- .../RemoteConnectionStrategyTests.java | 6 +- .../SniffConnectionStrategyTests.java | 20 +- .../test/transport/MockTransport.java | 5 +- .../test/transport/MockTransportService.java | 4 +- .../transport/StubbableConnectionManager.java | 18 +- 17 files changed, 431 insertions(+), 312 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java rename server/src/test/java/org/elasticsearch/transport/{ConnectionManagerTests.java => ClusterConnectionManagerTests.java} (97%) diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java new file mode 100644 index 00000000000..e8dba1c28f1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -0,0 +1,281 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class manages node connections within a cluster. The connection is opened by the underlying transport. + * Once the connection is opened, this class manages the connection. This includes closing the connection when + * the connection manager is closed. + */ +public class ClusterConnectionManager implements ConnectionManager { + + private static final Logger logger = LogManager.getLogger(ClusterConnectionManager.class); + + private final ConcurrentMap connectedNodes = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap> pendingConnections = ConcurrentCollections.newConcurrentMap(); + private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") { + @Override + protected void closeInternal() { + Iterator> iterator = connectedNodes.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + try { + IOUtils.closeWhileHandlingException(next.getValue()); + } finally { + iterator.remove(); + } + } + closeLatch.countDown(); + } + }; + private final Transport transport; + private final ConnectionProfile defaultProfile; + private final AtomicBoolean closing = new AtomicBoolean(false); + private final CountDownLatch closeLatch = new CountDownLatch(1); + private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); + + public ClusterConnectionManager(Settings settings, Transport transport) { + this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport); + } + + public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport) { + this.transport = transport; + this.defaultProfile = connectionProfile; + } + + @Override + public void addListener(TransportConnectionListener listener) { + this.connectionListener.addListener(listener); + } + + @Override + public void removeListener(TransportConnectionListener listener) { + this.connectionListener.removeListener(listener); + } + + @Override + public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { + ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); + internalOpenConnection(node, resolvedProfile, listener); + } + + /** + * Connects to a node with the given connection profile. If the node is already connected this method has no effect. + * Once a successful is established, it can be validated before being exposed. + * The ActionListener will be called on the calling thread or the generic thread pool. + */ + @Override + public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, + ConnectionValidator connectionValidator, + ActionListener listener) throws ConnectTransportException { + ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); + if (node == null) { + listener.onFailure(new ConnectTransportException(null, "can't connect to a null node")); + return; + } + + if (connectingRefCounter.tryIncRef() == false) { + listener.onFailure(new IllegalStateException("connection manager is closed")); + return; + } + + if (connectedNodes.containsKey(node)) { + connectingRefCounter.decRef(); + listener.onResponse(null); + return; + } + + final ListenableFuture currentListener = new ListenableFuture<>(); + final ListenableFuture existingListener = pendingConnections.putIfAbsent(node, currentListener); + if (existingListener != null) { + try { + // wait on previous entry to complete connection attempt + existingListener.addListener(listener, EsExecutors.newDirectExecutorService()); + } finally { + connectingRefCounter.decRef(); + } + return; + } + + currentListener.addListener(listener, EsExecutors.newDirectExecutorService()); + + final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef); + internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> { + connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap( + ignored -> { + assert Transports.assertNotTransportThread("connection validator success"); + try { + if (connectedNodes.putIfAbsent(node, conn) != null) { + logger.debug("existing connection to node [{}], closing new redundant connection", node); + IOUtils.closeWhileHandlingException(conn); + } else { + logger.debug("connected to node [{}]", node); + try { + connectionListener.onNodeConnected(node, conn); + } finally { + final Transport.Connection finalConnection = conn; + conn.addCloseListener(ActionListener.wrap(() -> { + logger.trace("unregistering {} after connection close and marking as disconnected", node); + connectedNodes.remove(node, finalConnection); + connectionListener.onNodeDisconnected(node, conn); + })); + } + } + } finally { + ListenableFuture future = pendingConnections.remove(node); + assert future == currentListener : "Listener in pending map is different than the expected listener"; + releaseOnce.run(); + future.onResponse(null); + } + }, e -> { + assert Transports.assertNotTransportThread("connection validator failure"); + IOUtils.closeWhileHandlingException(conn); + failConnectionListeners(node, releaseOnce, e, currentListener); + })); + }, e -> { + assert Transports.assertNotTransportThread("internalOpenConnection failure"); + failConnectionListeners(node, releaseOnce, e, currentListener); + })); + } + + /** + * Returns a connection for the given node if the node is connected. + * Connections returned from this method must not be closed. The lifecycle of this connection is + * maintained by this connection manager + * + * @throws NodeNotConnectedException if the node is not connected + * @see #connectToNode(DiscoveryNode, ConnectionProfile, ConnectionValidator, ActionListener) + */ + @Override + public Transport.Connection getConnection(DiscoveryNode node) { + Transport.Connection connection = connectedNodes.get(node); + if (connection == null) { + throw new NodeNotConnectedException(node, "Node not connected"); + } + return connection; + } + + /** + * Returns {@code true} if the node is connected. + */ + @Override + public boolean nodeConnected(DiscoveryNode node) { + return connectedNodes.containsKey(node); + } + + /** + * Disconnected from the given node, if not connected, will do nothing. + */ + @Override + public void disconnectFromNode(DiscoveryNode node) { + Transport.Connection nodeChannels = connectedNodes.remove(node); + if (nodeChannels != null) { + // if we found it and removed it we close + nodeChannels.close(); + } + } + + /** + * Returns the number of nodes this manager is connected to. + */ + @Override + public int size() { + return connectedNodes.size(); + } + + public Set getAllConnectedNodes() { + return Collections.unmodifiableSet(connectedNodes.keySet()); + } + + @Override + public void close() { + internalClose(true); + } + + @Override + public void closeNoBlock() { + internalClose(false); + } + + private void internalClose(boolean waitForPendingConnections) { + assert Transports.assertNotTransportThread("Closing ConnectionManager"); + if (closing.compareAndSet(false, true)) { + connectingRefCounter.decRef(); + if (waitForPendingConnections) { + try { + closeLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + } + } + + private void internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile, + ActionListener listener) { + transport.openConnection(node, connectionProfile, ActionListener.map(listener, connection -> { + assert Transports.assertNotTransportThread("internalOpenConnection success"); + try { + connectionListener.onConnectionOpened(connection); + } finally { + connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection))); + } + if (connection.isClosed()) { + throw new ConnectTransportException(node, "a channel closed while connecting"); + } + return connection; + })); + } + + private void failConnectionListeners(DiscoveryNode node, RunOnce releaseOnce, Exception e, ListenableFuture expectedListener) { + ListenableFuture future = pendingConnections.remove(node); + releaseOnce.run(); + if (future != null) { + assert future == expectedListener : "Listener in pending map is different than the expected listener"; + future.onFailure(e); + } + } + + @Override + public ConnectionProfile getConnectionProfile() { + return defaultProfile; + } + +} diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 110053bcee7..1f8a73d575d 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -16,266 +16,48 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.transport; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AbstractRefCounted; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ListenableFuture; -import org.elasticsearch.common.util.concurrent.RunOnce; -import org.elasticsearch.core.internal.io.IOUtils; import java.io.Closeable; -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -/** - * This class manages node connections. The connection is opened by the underlying transport. Once the - * connection is opened, this class manages the connection. This includes keep-alive pings and closing - * the connection when the connection manager is closed. - */ -public class ConnectionManager implements Closeable { +public interface ConnectionManager extends Closeable { - private static final Logger logger = LogManager.getLogger(ConnectionManager.class); + void addListener(TransportConnectionListener listener); - private final ConcurrentMap connectedNodes = ConcurrentCollections.newConcurrentMap(); - private final ConcurrentMap> pendingConnections = ConcurrentCollections.newConcurrentMap(); - private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") { - @Override - protected void closeInternal() { - Iterator> iterator = connectedNodes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry next = iterator.next(); - try { - IOUtils.closeWhileHandlingException(next.getValue()); - } finally { - iterator.remove(); - } - } - closeLatch.countDown(); - } - }; - private final Transport transport; - private final ConnectionProfile defaultProfile; - private final AtomicBoolean closing = new AtomicBoolean(false); - private final CountDownLatch closeLatch = new CountDownLatch(1); - private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); + void removeListener(TransportConnectionListener listener); - public ConnectionManager(Settings settings, Transport transport) { - this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport); - } + void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener); - public ConnectionManager(ConnectionProfile connectionProfile, Transport transport) { - this.transport = transport; - this.defaultProfile = connectionProfile; - } + void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, + ConnectionValidator connectionValidator, + ActionListener listener) throws ConnectTransportException; - public void addListener(TransportConnectionListener listener) { - this.connectionListener.listeners.addIfAbsent(listener); - } + Transport.Connection getConnection(DiscoveryNode node); - public void removeListener(TransportConnectionListener listener) { - this.connectionListener.listeners.remove(listener); - } + boolean nodeConnected(DiscoveryNode node); - public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { - ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); - internalOpenConnection(node, resolvedProfile, listener); - } + void disconnectFromNode(DiscoveryNode node); + + int size(); + + @Override + void close(); + + void closeNoBlock(); + + ConnectionProfile getConnectionProfile(); @FunctionalInterface - public interface ConnectionValidator { + interface ConnectionValidator { void validate(Transport.Connection connection, ConnectionProfile profile, ActionListener listener); } - /** - * Connects to a node with the given connection profile. If the node is already connected this method has no effect. - * Once a successful is established, it can be validated before being exposed. - * The ActionListener will be called on the calling thread or the generic thread pool. - */ - public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, - ConnectionValidator connectionValidator, - ActionListener listener) throws ConnectTransportException { - ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); - if (node == null) { - listener.onFailure(new ConnectTransportException(null, "can't connect to a null node")); - return; - } - - if (connectingRefCounter.tryIncRef() == false) { - listener.onFailure(new IllegalStateException("connection manager is closed")); - return; - } - - if (connectedNodes.containsKey(node)) { - connectingRefCounter.decRef(); - listener.onResponse(null); - return; - } - - final ListenableFuture currentListener = new ListenableFuture<>(); - final ListenableFuture existingListener = pendingConnections.putIfAbsent(node, currentListener); - if (existingListener != null) { - try { - // wait on previous entry to complete connection attempt - existingListener.addListener(listener, EsExecutors.newDirectExecutorService()); - } finally { - connectingRefCounter.decRef(); - } - return; - } - - currentListener.addListener(listener, EsExecutors.newDirectExecutorService()); - - final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef); - internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> { - connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap( - ignored -> { - assert Transports.assertNotTransportThread("connection validator success"); - try { - if (connectedNodes.putIfAbsent(node, conn) != null) { - logger.debug("existing connection to node [{}], closing new redundant connection", node); - IOUtils.closeWhileHandlingException(conn); - } else { - logger.debug("connected to node [{}]", node); - try { - connectionListener.onNodeConnected(node, conn); - } finally { - final Transport.Connection finalConnection = conn; - conn.addCloseListener(ActionListener.wrap(() -> { - logger.trace("unregistering {} after connection close and marking as disconnected", node); - connectedNodes.remove(node, finalConnection); - connectionListener.onNodeDisconnected(node, conn); - })); - } - } - } finally { - ListenableFuture future = pendingConnections.remove(node); - assert future == currentListener : "Listener in pending map is different than the expected listener"; - releaseOnce.run(); - future.onResponse(null); - } - }, e -> { - assert Transports.assertNotTransportThread("connection validator failure"); - IOUtils.closeWhileHandlingException(conn); - failConnectionListeners(node, releaseOnce, e, currentListener); - })); - }, e -> { - assert Transports.assertNotTransportThread("internalOpenConnection failure"); - failConnectionListeners(node, releaseOnce, e, currentListener); - })); - } - - /** - * Returns a connection for the given node if the node is connected. - * Connections returned from this method must not be closed. The lifecycle of this connection is - * maintained by this connection manager - * - * @throws NodeNotConnectedException if the node is not connected - * @see #connectToNode(DiscoveryNode, ConnectionProfile, ConnectionValidator, ActionListener) - */ - public Transport.Connection getConnection(DiscoveryNode node) { - Transport.Connection connection = connectedNodes.get(node); - if (connection == null) { - throw new NodeNotConnectedException(node, "Node not connected"); - } - return connection; - } - - /** - * Returns {@code true} if the node is connected. - */ - public boolean nodeConnected(DiscoveryNode node) { - return connectedNodes.containsKey(node); - } - - /** - * Disconnected from the given node, if not connected, will do nothing. - */ - public void disconnectFromNode(DiscoveryNode node) { - Transport.Connection nodeChannels = connectedNodes.remove(node); - if (nodeChannels != null) { - // if we found it and removed it we close - nodeChannels.close(); - } - } - - /** - * Returns the number of nodes this manager is connected to. - */ - public int size() { - return connectedNodes.size(); - } - - public Set getAllConnectedNodes() { - return Collections.unmodifiableSet(connectedNodes.keySet()); - } - - @Override - public void close() { - internalClose(true); - } - - public void closeNoBlock() { - internalClose(false); - } - - private void internalClose(boolean waitForPendingConnections) { - assert Transports.assertNotTransportThread("Closing ConnectionManager"); - if (closing.compareAndSet(false, true)) { - connectingRefCounter.decRef(); - if (waitForPendingConnections) { - try { - closeLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); - } - } - } - } - - private void internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile, - ActionListener listener) { - transport.openConnection(node, connectionProfile, ActionListener.map(listener, connection -> { - assert Transports.assertNotTransportThread("internalOpenConnection success"); - try { - connectionListener.onConnectionOpened(connection); - } finally { - connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection))); - } - if (connection.isClosed()) { - throw new ConnectTransportException(node, "a channel closed while connecting"); - } - return connection; - })); - } - - private void failConnectionListeners(DiscoveryNode node, RunOnce releaseOnce, Exception e, ListenableFuture expectedListener) { - ListenableFuture future = pendingConnections.remove(node); - releaseOnce.run(); - if (future != null) { - assert future == expectedListener : "Listener in pending map is different than the expected listener"; - future.onFailure(e); - } - } - - ConnectionProfile getConnectionProfile() { - return defaultProfile; - } - - private static final class DelegatingNodeConnectionListener implements TransportConnectionListener { + final class DelegatingNodeConnectionListener implements TransportConnectionListener { private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); @@ -306,5 +88,13 @@ public class ConnectionManager implements Closeable { listener.onConnectionClosed(connection); } } + + public void addListener(TransportConnectionListener listener) { + listeners.addIfAbsent(listener); + } + + public void removeListener(TransportConnectionListener listener) { + listeners.remove(listener); + } } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index c87872317de..be663117154 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -72,7 +72,7 @@ final class RemoteClusterConnection implements Closeable { this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService)); this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings); // we register the transport service here as a listener to make sure we notify handlers on disconnect etc. - this.remoteConnectionManager.getConnectionManager().addListener(transportService); + this.remoteConnectionManager.addListener(transportService); this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE .getConcreteSettingForNamespace(clusterAlias).get(settings); this.threadPool = transportService.threadPool; @@ -171,7 +171,7 @@ final class RemoteClusterConnection implements Closeable { * If such node is not connected, the returned connection will be a proxy connection that redirects to it. */ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { - return remoteConnectionManager.getRemoteConnection(remoteClusterNode); + return remoteConnectionManager.getConnection(remoteClusterNode); } Transport.Connection getConnection() { @@ -193,7 +193,7 @@ final class RemoteClusterConnection implements Closeable { } boolean isNodeConnected(final DiscoveryNode node) { - return remoteConnectionManager.getConnectionManager().nodeConnected(node); + return remoteConnectionManager.nodeConnected(node); } /** @@ -208,11 +208,11 @@ final class RemoteClusterConnection implements Closeable { } private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) { - return new ConnectionManager(connectionProfile, transportService.transport); + return new ClusterConnectionManager(connectionProfile, transportService.transport); } ConnectionManager getConnectionManager() { - return remoteConnectionManager.getConnectionManager(); + return remoteConnectionManager; } boolean shouldRebuildConnection(Settings newSettings) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java index 90340d841ba..de6f8d1300d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java @@ -22,24 +22,23 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicLong; -public class RemoteConnectionManager implements Closeable { +public class RemoteConnectionManager implements ConnectionManager { private final String clusterAlias; - private final ConnectionManager connectionManager; + private final ConnectionManager delegate; private final AtomicLong counter = new AtomicLong(); private volatile List connections = Collections.emptyList(); - RemoteConnectionManager(String clusterAlias, ConnectionManager connectionManager) { + RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) { this.clusterAlias = clusterAlias; - this.connectionManager = connectionManager; - this.connectionManager.addListener(new TransportConnectionListener() { + this.delegate = delegate; + this.delegate.addListener(new TransportConnectionListener() { @Override public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) { addConnection(connection); @@ -52,24 +51,52 @@ public class RemoteConnectionManager implements Closeable { }); } + @Override public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, ConnectionManager.ConnectionValidator connectionValidator, ActionListener listener) throws ConnectTransportException { - connectionManager.connectToNode(node, connectionProfile, connectionValidator, listener); + delegate.connectToNode(node, connectionProfile, connectionValidator, listener); } + @Override + public void addListener(TransportConnectionListener listener) { + delegate.addListener(listener); + } + + @Override + public void removeListener(TransportConnectionListener listener) { + delegate.removeListener(listener); + } + + @Override public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener listener) { - connectionManager.openConnection(node, profile, listener); + delegate.openConnection(node, profile, listener); } - public Transport.Connection getRemoteConnection(DiscoveryNode node) { + @Override + public Transport.Connection getConnection(DiscoveryNode node) { try { - return connectionManager.getConnection(node); + return delegate.getConnection(node); } catch (NodeNotConnectedException e) { return new ProxyConnection(getAnyRemoteConnection(), node); } } + @Override + public boolean nodeConnected(DiscoveryNode node) { + return delegate.nodeConnected(node); + } + + @Override + public void disconnectFromNode(DiscoveryNode node) { + delegate.disconnectFromNode(node); + } + + @Override + public ConnectionProfile getConnectionProfile() { + return delegate.getConnectionProfile(); + } + public Transport.Connection getAnyRemoteConnection() { List localConnections = this.connections; if (localConnections.isEmpty()) { @@ -81,16 +108,19 @@ public class RemoteConnectionManager implements Closeable { } } - public ConnectionManager getConnectionManager() { - return connectionManager; - } - + @Override public int size() { - return connectionManager.size(); + return delegate.size(); } + @Override public void close() { - connectionManager.closeNoBlock(); + delegate.closeNoBlock(); + } + + @Override + public void closeNoBlock() { + delegate.closeNoBlock(); } private synchronized void addConnection(Transport.Connection addedConnection) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 42b7e8c31f9..848c59c2255 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -117,7 +117,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis this.clusterAlias = clusterAlias; this.transportService = transportService; this.connectionManager = connectionManager; - connectionManager.getConnectionManager().addListener(this); + connectionManager.addListener(this); } static ConnectionProfile buildConnectionProfile(String clusterAlias, Settings settings) { @@ -271,7 +271,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis .getConcreteSettingForNamespace(clusterAlias) .get(newSettings); - ConnectionProfile oldProfile = connectionManager.getConnectionManager().getConnectionProfile(); + ConnectionProfile oldProfile = connectionManager.getConnectionProfile(); ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); builder.setCompressionEnabled(compressionEnabled); builder.setPingInterval(pingSchedule); @@ -299,7 +299,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis final List> toNotify; synchronized (mutex) { if (closed.compareAndSet(false, true)) { - connectionManager.getConnectionManager().removeListener(this); + connectionManager.removeListener(this); toNotify = listeners; listeners = Collections.emptyList(); } else { diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 0489582df30..8835fe86a69 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -316,7 +316,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { final StepListener handshakeStep = new StepListener<>(); openConnectionStep.whenComplete(connection -> { - ConnectionProfile connectionProfile = connectionManager.getConnectionManager().getConnectionProfile(); + ConnectionProfile connectionProfile = connectionManager.getConnectionProfile(); transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(), getRemoteClusterNamePredicate(), handshakeStep); }, onFailure); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index a83e138f04c..08c22966d36 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -152,7 +152,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders, - new ConnectionManager(settings, transport)); + new ClusterConnectionManager(settings, transport)); } public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 33397ae5802..1c268233d6c 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; import org.elasticsearch.test.transport.StubbableConnectionManager; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponseHandler; @@ -211,9 +212,9 @@ public class PeerFinderTests extends ESTestCase { localNode = newDiscoveryNode("local-node"); ConnectionManager innerConnectionManager - = new ConnectionManager(settings, capturingTransport); + = new ClusterConnectionManager(settings, capturingTransport); StubbableConnectionManager connectionManager - = new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport); + = new StubbableConnectionManager(innerConnectionManager); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> { final boolean isConnected = connectedNodes.contains(discoveryNode); final boolean isDisconnected = disconnectedNodes.contains(discoveryNode); diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java similarity index 97% rename from server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java rename to server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java index e82158a3c15..30dbd21899e 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java @@ -49,9 +49,9 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -public class ConnectionManagerTests extends ESTestCase { +public class ClusterConnectionManagerTests extends ESTestCase { - private ConnectionManager connectionManager; + private ClusterConnectionManager connectionManager; private ThreadPool threadPool; private Transport transport; private ConnectionProfile connectionProfile; @@ -59,11 +59,11 @@ public class ConnectionManagerTests extends ESTestCase { @Before public void createConnectionManager() { Settings settings = Settings.builder() - .put("node.name", ConnectionManagerTests.class.getSimpleName()) + .put("node.name", ClusterConnectionManagerTests.class.getSimpleName()) .build(); threadPool = new ThreadPool(settings); transport = mock(Transport.class); - connectionManager = new ConnectionManager(settings, transport); + connectionManager = new ClusterConnectionManager(settings, transport); TimeValue oneSecond = new TimeValue(1000); TimeValue oneMinute = TimeValue.timeValueMinutes(1); connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond, @@ -321,6 +321,11 @@ public class ConnectionManagerTests extends ESTestCase { return node; } + @Override + public Version getVersion() { + return node.getVersion(); + } + @Override public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws TransportException { diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index 4654918d898..47ba6b17205 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -90,7 +90,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, @@ -119,7 +119,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); AtomicBoolean useAddress1 = new AtomicBoolean(true); @@ -169,7 +169,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, @@ -199,7 +199,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); AtomicBoolean useAddress1 = new AtomicBoolean(true); @@ -251,7 +251,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, @@ -276,7 +276,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, @@ -359,7 +359,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { String address = "localhost:" + address1.getPort(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 11e0dfbce90..0abcca40811 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -547,7 +547,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { DiscoveryNode node = randomFrom(discoverableNodes); try { connection.getConnectionManager().getConnection(node); - } catch (NodeNotConnectedException e) { + } catch (NoSuchRemoteClusterException e) { // Ignore } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java index e2d33a6263a..ec60546b80e 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java @@ -46,7 +46,7 @@ public class RemoteConnectionManagerTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); transport = mock(Transport.class); - remoteConnectionManager = new RemoteConnectionManager("remote-cluster", new ConnectionManager(Settings.EMPTY, transport)); + remoteConnectionManager = new RemoteConnectionManager("remote-cluster", new ClusterConnectionManager(Settings.EMPTY, transport)); } @SuppressWarnings("unchecked") @@ -72,25 +72,25 @@ public class RemoteConnectionManagerTests extends ESTestCase { remoteConnectionManager.connectToNode(node2, null, validator, future2); assertTrue(future2.isDone()); - assertEquals(node1, remoteConnectionManager.getRemoteConnection(node1).getNode()); - assertEquals(node2, remoteConnectionManager.getRemoteConnection(node2).getNode()); + assertEquals(node1, remoteConnectionManager.getConnection(node1).getNode()); + assertEquals(node2, remoteConnectionManager.getConnection(node2).getNode()); DiscoveryNode node4 = new DiscoveryNode("node-4", address, Version.CURRENT); - assertThat(remoteConnectionManager.getRemoteConnection(node4), instanceOf(RemoteConnectionManager.ProxyConnection.class)); + assertThat(remoteConnectionManager.getConnection(node4), instanceOf(RemoteConnectionManager.ProxyConnection.class)); // Test round robin Set versions = new HashSet<>(); - versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion()); - versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion()); + versions.add(remoteConnectionManager.getConnection(node4).getVersion()); + versions.add(remoteConnectionManager.getConnection(node4).getVersion()); assertThat(versions, hasItems(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion())); // Test that the connection is cleared from the round robin list when it is closed - remoteConnectionManager.getRemoteConnection(node1).close(); + remoteConnectionManager.getConnection(node1).close(); versions.clear(); - versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion()); - versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion()); + versions.add(remoteConnectionManager.getConnection(node4).getVersion()); + versions.add(remoteConnectionManager.getConnection(node4).getVersion()); assertThat(versions, hasItems(Version.CURRENT.minimumCompatibilityVersion())); assertEquals(1, versions.size()); @@ -109,6 +109,11 @@ public class RemoteConnectionManagerTests extends ESTestCase { return node; } + @Override + public Version getVersion() { + return node.getVersion(); + } + @Override public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws TransportException { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 814a3bd1913..c9ce9a4c9a6 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -30,7 +30,7 @@ import static org.mockito.Mockito.mock; public class RemoteConnectionStrategyTests extends ESTestCase { public void testStrategyChangeMeansThatStrategyMustBeRebuilt() { - ConnectionManager connectionManager = new ConnectionManager(Settings.EMPTY, mock(Transport.class)); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class)); RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager, RemoteConnectionStrategy.ConnectionStrategy.PROXY); @@ -41,7 +41,7 @@ public class RemoteConnectionStrategyTests extends ESTestCase { } public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() { - ConnectionManager connectionManager = new ConnectionManager(Settings.EMPTY, mock(Transport.class)); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class)); RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager, RemoteConnectionStrategy.ConnectionStrategy.PROXY); @@ -52,7 +52,7 @@ public class RemoteConnectionStrategyTests extends ESTestCase { } public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() { - ConnectionManager connectionManager = new ConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class)); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class)); assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval()); assertEquals(false, connectionManager.getConnectionProfile().getCompressionEnabled()); RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index 80c11eef158..297c705b0e0 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -125,7 +125,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, null, 3, n -> true, seedNodes(seedNode))) { @@ -162,7 +162,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, null, 3, n -> true, seedNodes(seedNode), Collections.singletonList(seedNodeSupplier))) { @@ -198,7 +198,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, null, 2, n -> true, seedNodes(seedNode))) { @@ -244,7 +244,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, null, 3, n -> true, seedNodes(seedNode))) { @@ -273,7 +273,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, null, 3, n -> true, seedNodes(incompatibleSeedNode))) { @@ -303,7 +303,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, null, 3, n -> n.equals(rejectedNode) == false, seedNodes(seedNode))) { @@ -347,7 +347,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, null, 3, n -> true, seedNodes(seedNode, otherSeedNode))) { @@ -395,7 +395,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, null, 3, n -> true, seedNodes(seedNode))) { @@ -465,7 +465,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { List seedNodes = Collections.singletonList(accessibleNode.toString()); TransportAddress proxyAddress = accessibleNode.getAddress(); - ConnectionManager connectionManager = new ConnectionManager(profile, transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, proxyAddress.toString(), 3, n -> true, seedNodes)) { @@ -502,7 +502,7 @@ public class SniffConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, null, 3, n -> true, seedNodes(seedNode))) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index f796b3037a7..ccb10d3a5e4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -38,7 +38,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.CloseableConnection; -import org.elasticsearch.transport.ConnectionManager; +import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.RequestHandlerRegistry; @@ -79,8 +79,7 @@ public class MockTransport implements Transport, LifecycleComponent { public TransportService createTransportService(Settings settings, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { - StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this), - settings, this); + StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ClusterConnectionManager(settings, this)); connectionManager.setDefaultNodeConnectedBehavior((cm, node) -> false); connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode)); return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 806b78b7673..0653bcd8daa 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -48,7 +48,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.tasks.MockTaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.ConnectionManager; +import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.Transport; @@ -160,7 +160,7 @@ public final class MockTransportService extends TransportService { Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { super(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, - new StubbableConnectionManager(new ConnectionManager(settings, transport), settings, transport)); + new StubbableConnectionManager(new ClusterConnectionManager(settings, transport))); this.original = transport.getDelegate(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index ea6e145d882..9bdc5df6542 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -20,26 +20,24 @@ package org.elasticsearch.test.transport; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -public class StubbableConnectionManager extends ConnectionManager { +public class StubbableConnectionManager implements ConnectionManager { private final ConnectionManager delegate; private final ConcurrentMap getConnectionBehaviors; private volatile GetConnectionBehavior defaultGetConnectionBehavior = ConnectionManager::getConnection; private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected; - public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport) { - super(settings, transport); + public StubbableConnectionManager(ConnectionManager delegate) { this.delegate = delegate; this.getConnectionBehaviors = new ConcurrentHashMap<>(); } @@ -118,6 +116,16 @@ public class StubbableConnectionManager extends ConnectionManager { delegate.close(); } + @Override + public void closeNoBlock() { + delegate.closeNoBlock(); + } + + @Override + public ConnectionProfile getConnectionProfile() { + return delegate.getConnectionProfile(); + } + @FunctionalInterface public interface GetConnectionBehavior { Transport.Connection getConnection(ConnectionManager connectionManager, DiscoveryNode discoveryNode);