From 6e7478b8461699093be6dd0162512ae5d28393f2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 14 Jan 2020 10:57:44 -0600 Subject: [PATCH] Allow proxy mode server name to be configured (#50951) Currently, proxy mode allows a remote cluster connection to be setup by expecting all open connections to be routed through an intermediate proxy. The proxy must use some logic to ensure that the connections end up on the correct remote cluster. One mechanism provided is that the default distribution TLS implementations will forward the host component of the configured address to the remote connection using the SNI extension. This is limiting as it requires that the proxy be configured in a way that always uses a valid hostname as the proxy address. Instead, this commit adds an additional setting to allow the server_name to be configured independently. This allows the proxy address to be specified as a IP literal, but the server_name specified as an arbitrary string which still must be a valid hostname. It also decouples the server_name from the requirement of being a DNS resolvable domain. --- .../common/settings/ClusterSettings.java | 3 +- .../transport/ProxyConnectionStrategy.java | 38 +++++++------------ .../transport/RemoteClusterAware.java | 2 +- .../transport/RemoteConnectionInfo.java | 8 ++-- .../transport/RemoteConnectionStrategy.java | 15 +++----- .../transport/SniffConnectionStrategy.java | 10 +---- .../ProxyConnectionStrategyTests.java | 10 ++--- 7 files changed, 31 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 29daa0a8192..7410a05a272 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -323,10 +323,11 @@ public final class ClusterSettings extends AbstractScopedSettings { RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, - ProxyConnectionStrategy.INCLUDE_SERVER_NAME, + ProxyConnectionStrategy.SERVER_NAME, SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS, SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY, SniffConnectionStrategy.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER, + ProxyConnectionStrategy.SERVER_NAME, SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index 8432e4670aa..f75010f5660 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -39,16 +39,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Stream; -import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.common.settings.Setting.intSetting; public class ProxyConnectionStrategy extends RemoteConnectionStrategy { @@ -75,12 +72,12 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { Setting.Property.Dynamic, Setting.Property.NodeScope)); /** - * Whether to include the hostname as a server_name attribute + * A configurable server_name attribute */ - public static final Setting.AffixSetting INCLUDE_SERVER_NAME = Setting.affixKeySetting( + public static final Setting.AffixSetting SERVER_NAME = Setting.affixKeySetting( "cluster.remote.", - "include_server_name", - (ns, key) -> boolSetting(key, false, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY), + "server_name", + (ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY), Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; @@ -89,9 +86,8 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { private static final Logger logger = LogManager.getLogger(ProxyConnectionStrategy.class); private final int maxNumConnections; - private final AtomicLong counter = new AtomicLong(0); private final String configuredAddress; - private final boolean includeServerName; + private final String configuredServerName; private final Supplier address; private final AtomicReference remoteClusterName = new AtomicReference<>(); private final ConnectionManager.ConnectionValidator clusterNameValidator; @@ -104,28 +100,28 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { connectionManager, REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings), REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings), - INCLUDE_SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings)); + SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings)); } ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, int maxNumConnections, String configuredAddress) { this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress, - () -> resolveAddress(configuredAddress), false); + () -> resolveAddress(configuredAddress), null); } ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, - int maxNumConnections, String configuredAddress, boolean includeServerName) { + int maxNumConnections, String configuredAddress, String configuredServerName) { this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress, - () -> resolveAddress(configuredAddress), includeServerName); + () -> resolveAddress(configuredAddress), configuredServerName); } ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, int maxNumConnections, String configuredAddress, Supplier address, - boolean includeServerName) { + String configuredServerName) { super(clusterAlias, transportService, connectionManager); this.maxNumConnections = maxNumConnections; this.configuredAddress = configuredAddress; - this.includeServerName = includeServerName; + this.configuredServerName = configuredServerName; assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses"; this.address = address; this.clusterNameValidator = (newConnection, actualProfile, listener) -> @@ -217,10 +213,10 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { for (int i = 0; i < remaining; ++i) { String id = clusterAlias + "#" + resolved; Map attributes; - if (includeServerName) { - attributes = Collections.singletonMap("server_name", resolved.address().getHostString()); - } else { + if (Strings.isNullOrEmpty(configuredServerName)) { attributes = Collections.emptyMap(); + } else { + attributes = Collections.singletonMap("server_name", configuredServerName); } DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT.minimumCompatibilityVersion()); @@ -252,12 +248,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { } } - private TransportAddress nextAddress(List resolvedAddresses) { - long curr; - while ((curr = counter.getAndIncrement()) == Long.MIN_VALUE) ; - return resolvedAddresses.get(Math.toIntExact(Math.floorMod(curr, (long) resolvedAddresses.size()))); - } - private static TransportAddress resolveAddress(String address) { return new TransportAddress(parseConfiguredAddress(address)); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 6aca1a783da..b3632d460ad 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -131,7 +131,7 @@ public abstract class RemoteClusterAware { SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, - ProxyConnectionStrategy.INCLUDE_SERVER_NAME); + ProxyConnectionStrategy.SERVER_NAME); clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index 814a20ea99a..9a3c0b0ad00 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -132,12 +131,11 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable .stream() .map( s -> { - final Tuple hostPort = RemoteConnectionStrategy.parseHostPort(s); - assert hostPort.v2() != null : s; + final String host = RemoteConnectionStrategy.parseHost(s); + final int port = RemoteConnectionStrategy.parsePort(s); try { return new TransportAddress( - InetAddress.getByAddress(hostPort.v1(), TransportAddress.META_ADDRESS.getAddress()), - hostPort.v2()); + InetAddress.getByAddress(host, TransportAddress.META_ADDRESS.getAddress()), port); } catch (final UnknownHostException e) { throw new AssertionError(e); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 6a5b0ec10ef..42b7e8c31f9 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -170,11 +169,9 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis return allConcreteSettings.map(affixSetting::getNamespace); } - static InetSocketAddress parseConfiguredAddress(String remoteHost) { - final Tuple hostPort = parseHostPort(remoteHost); - final String host = hostPort.v1(); - assert hostPort.v2() != null : remoteHost; - final int port = hostPort.v2(); + static InetSocketAddress parseConfiguredAddress(String configuredAddress) { + final String host = parseHost(configuredAddress); + final int port = parsePort(configuredAddress); InetAddress hostAddress; try { hostAddress = InetAddress.getByName(host); @@ -184,10 +181,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis return new InetSocketAddress(hostAddress, port); } - static Tuple parseHostPort(final String remoteHost) { - final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost)); - final int port = parsePort(remoteHost); - return Tuple.tuple(host, port); + static String parseHost(final String configuredAddress) { + return configuredAddress.substring(0, indexOfPortSeparator(configuredAddress)); } static int parsePort(String remoteHost) { diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 52c5a1a86ed..0489582df30 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -481,7 +481,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { Version.CURRENT.minimumCompatibilityVersion()); } else { TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(proxyAddress)); - String hostName = address.substring(0, indexOfPortSeparator(address)); + String hostName = RemoteConnectionStrategy.parseHost(proxyAddress); return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address, transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT.minimumCompatibilityVersion()); @@ -498,14 +498,6 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { return DEFAULT_NODE_PREDICATE; } - private static int indexOfPortSeparator(String remoteHost) { - int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 - if (portSeparator == -1 || portSeparator == remoteHost.length()) { - throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead"); - } - return portSeparator; - } - private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) { if (proxyAddress == null || proxyAddress.isEmpty()) { return node; diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index 0aa2c881429..4654918d898 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -126,7 +126,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), false)) { + numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); @@ -206,7 +206,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), false)) { + numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); @@ -255,7 +255,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, address.toString(), addressSupplier, false)) { + numOfConnections, address.toString(), addressSupplier, null)) { PlainActionFuture connectFuture = PlainActionFuture.newFuture(); strategy.connect(connectFuture); connectFuture.actionGet(); @@ -357,13 +357,13 @@ public class ProxyConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - String serverName = "localhost:" + address1.getPort(); + String address = "localhost:" + address1.getPort(); ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, serverName, true)) { + numOfConnections, address, "localhost")) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); PlainActionFuture connectFuture = PlainActionFuture.newFuture();