Use default profile for remote connections (#50947)

Currently, the connection manager is configured with a default profile
for both the sniff and proxy connection stratgies. This profile
correctly reflects the expected number of connection (6 for sniff, 18
for proxy). This commit removes the proxy strategy usages of the per
connection attempt profile configuration.

Additionally, it refactors other unnecessary code around the connection
manager. The connection manager now can always be built inside the
remote connection.
This commit is contained in:
Tim Brooks 2020-01-13 21:46:23 -06:00 committed by GitHub
parent 90ba77951a
commit 50cb770315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 53 deletions

View File

@ -94,7 +94,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
private final boolean includeServerName;
private final Supplier<TransportAddress> address;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
private final ConnectionProfile profile;
private final ConnectionManager.ConnectionValidator clusterNameValidator;
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
@ -129,11 +128,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
this.includeServerName = includeServerName;
assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
this.address = address;
// TODO: Move into the ConnectionManager
this.profile = new ConnectionProfile.Builder()
.addConnections(1, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING)
.addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY)
.build();
this.clusterNameValidator = (newConnection, actualProfile, listener) ->
transportService.handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true,
ActionListener.map(listener, resp -> {
@ -231,7 +225,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());
connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<Void>() {
connectionManager.connectToNode(node, null, clusterNameValidator, new ActionListener<Void>() {
@Override
public void onResponse(Void v) {
compositeListener.onResponse(v);

View File

@ -66,18 +66,13 @@ final class RemoteClusterConnection implements Closeable {
* @param transportService the local nodes transport service
*/
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) {
this(settings, clusterAlias, transportService,
createConnectionManager(RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings), transportService));
}
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService,
ConnectionManager connectionManager) {
this.transportService = transportService;
this.clusterAlias = clusterAlias;
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings);
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService));
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
this.remoteConnectionManager.getConnectionManager().addListener(transportService);
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
this.threadPool = transportService.threadPool;

View File

@ -57,7 +57,6 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableConnectionManager;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -86,7 +85,6 @@ import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
public class RemoteClusterConnectionTests extends ESTestCase {
@ -576,51 +574,24 @@ public class RemoteClusterConnectionTests extends ESTestCase {
public void testGetConnection() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
MockTransportService disconnectedTransport = startTransport("disconnected_node", knownNodes, Version.CURRENT)) {
DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode();
assertThat(connectedNode, notNullValue());
knownNodes.add(connectedNode);
DiscoveryNode seedNode = seedTransport.getLocalNode();
knownNodes.add(seedNode);
DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode();
assertThat(disconnectedNode, notNullValue());
knownNodes.add(disconnectedNode);
DiscoveryNode disconnectedNode = disconnectedTransport.getLocalNode();
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
Transport.Connection seedConnection = new CloseableConnection() {
@Override
public DiscoveryNode getNode() {
return connectedNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
// no-op
}
};
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport);
connectionManager.setDefaultNodeConnectedBehavior((cm, node) -> connectedNode.equals(node));
connectionManager.addGetConnectionBehavior(connectedNode.getAddress(), (cm, discoveryNode) -> seedConnection);
connectionManager.addGetConnectionBehavior(disconnectedNode.getAddress(), (cm, discoveryNode) -> {
throw new NodeNotConnectedException(discoveryNode, "");
});
service.start();
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildRandomSettings(clusterAlias, addresses(connectedNode));
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, connectionManager)) {
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null)));
for (int i = 0; i < 10; i++) {
//always a direct connection as the remote node is already connected
Transport.Connection remoteConnection = connection.getConnection(connectedNode);
assertSame(seedConnection, remoteConnection);
Transport.Connection remoteConnection = connection.getConnection(seedNode);
assertEquals(seedNode, remoteConnection.getNode());
}
for (int i = 0; i < 10; i++) {
// we don't use the transport service connection manager so we will get a proxy connection for the local node
@ -629,7 +600,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode()));
}
for (int i = 0; i < 10; i++) {
//always a proxy connection as the target node is not connected
// always a proxy connection as the target node is not connected
Transport.Connection remoteConnection = connection.getConnection(disconnectedNode);
assertThat(remoteConnection, instanceOf(RemoteConnectionManager.ProxyConnection.class));
assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode));