diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 29daa0a8192..7410a05a272 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -323,10 +323,11 @@ public final class ClusterSettings extends AbstractScopedSettings { RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, - ProxyConnectionStrategy.INCLUDE_SERVER_NAME, + ProxyConnectionStrategy.SERVER_NAME, SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS, SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY, SniffConnectionStrategy.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER, + ProxyConnectionStrategy.SERVER_NAME, SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index 8432e4670aa..f75010f5660 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -39,16 +39,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Stream; -import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.common.settings.Setting.intSetting; public class ProxyConnectionStrategy extends RemoteConnectionStrategy { @@ -75,12 +72,12 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { Setting.Property.Dynamic, Setting.Property.NodeScope)); /** - * Whether to include the hostname as a server_name attribute + * A configurable server_name attribute */ - public static final Setting.AffixSetting INCLUDE_SERVER_NAME = Setting.affixKeySetting( + public static final Setting.AffixSetting SERVER_NAME = Setting.affixKeySetting( "cluster.remote.", - "include_server_name", - (ns, key) -> boolSetting(key, false, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY), + "server_name", + (ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY), Setting.Property.Dynamic, Setting.Property.NodeScope)); static final int CHANNELS_PER_CONNECTION = 1; @@ -89,9 +86,8 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { private static final Logger logger = LogManager.getLogger(ProxyConnectionStrategy.class); private final int maxNumConnections; - private final AtomicLong counter = new AtomicLong(0); private final String configuredAddress; - private final boolean includeServerName; + private final String configuredServerName; private final Supplier address; private final AtomicReference remoteClusterName = new AtomicReference<>(); private final ConnectionManager.ConnectionValidator clusterNameValidator; @@ -104,28 +100,28 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { connectionManager, REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings), REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings), - INCLUDE_SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings)); + SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings)); } ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, int maxNumConnections, String configuredAddress) { this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress, - () -> resolveAddress(configuredAddress), false); + () -> resolveAddress(configuredAddress), null); } ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, - int maxNumConnections, String configuredAddress, boolean includeServerName) { + int maxNumConnections, String configuredAddress, String configuredServerName) { this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress, - () -> resolveAddress(configuredAddress), includeServerName); + () -> resolveAddress(configuredAddress), configuredServerName); } ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, int maxNumConnections, String configuredAddress, Supplier address, - boolean includeServerName) { + String configuredServerName) { super(clusterAlias, transportService, connectionManager); this.maxNumConnections = maxNumConnections; this.configuredAddress = configuredAddress; - this.includeServerName = includeServerName; + this.configuredServerName = configuredServerName; assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses"; this.address = address; this.clusterNameValidator = (newConnection, actualProfile, listener) -> @@ -217,10 +213,10 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { for (int i = 0; i < remaining; ++i) { String id = clusterAlias + "#" + resolved; Map attributes; - if (includeServerName) { - attributes = Collections.singletonMap("server_name", resolved.address().getHostString()); - } else { + if (Strings.isNullOrEmpty(configuredServerName)) { attributes = Collections.emptyMap(); + } else { + attributes = Collections.singletonMap("server_name", configuredServerName); } DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT.minimumCompatibilityVersion()); @@ -252,12 +248,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { } } - private TransportAddress nextAddress(List resolvedAddresses) { - long curr; - while ((curr = counter.getAndIncrement()) == Long.MIN_VALUE) ; - return resolvedAddresses.get(Math.toIntExact(Math.floorMod(curr, (long) resolvedAddresses.size()))); - } - private static TransportAddress resolveAddress(String address) { return new TransportAddress(parseConfiguredAddress(address)); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 6aca1a783da..b3632d460ad 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -131,7 +131,7 @@ public abstract class RemoteClusterAware { SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, - ProxyConnectionStrategy.INCLUDE_SERVER_NAME); + ProxyConnectionStrategy.SERVER_NAME); clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index 814a20ea99a..9a3c0b0ad00 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -132,12 +131,11 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable .stream() .map( s -> { - final Tuple hostPort = RemoteConnectionStrategy.parseHostPort(s); - assert hostPort.v2() != null : s; + final String host = RemoteConnectionStrategy.parseHost(s); + final int port = RemoteConnectionStrategy.parsePort(s); try { return new TransportAddress( - InetAddress.getByAddress(hostPort.v1(), TransportAddress.META_ADDRESS.getAddress()), - hostPort.v2()); + InetAddress.getByAddress(host, TransportAddress.META_ADDRESS.getAddress()), port); } catch (final UnknownHostException e) { throw new AssertionError(e); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 6a5b0ec10ef..42b7e8c31f9 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -170,11 +169,9 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis return allConcreteSettings.map(affixSetting::getNamespace); } - static InetSocketAddress parseConfiguredAddress(String remoteHost) { - final Tuple hostPort = parseHostPort(remoteHost); - final String host = hostPort.v1(); - assert hostPort.v2() != null : remoteHost; - final int port = hostPort.v2(); + static InetSocketAddress parseConfiguredAddress(String configuredAddress) { + final String host = parseHost(configuredAddress); + final int port = parsePort(configuredAddress); InetAddress hostAddress; try { hostAddress = InetAddress.getByName(host); @@ -184,10 +181,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis return new InetSocketAddress(hostAddress, port); } - static Tuple parseHostPort(final String remoteHost) { - final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost)); - final int port = parsePort(remoteHost); - return Tuple.tuple(host, port); + static String parseHost(final String configuredAddress) { + return configuredAddress.substring(0, indexOfPortSeparator(configuredAddress)); } static int parsePort(String remoteHost) { diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 52c5a1a86ed..0489582df30 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -481,7 +481,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { Version.CURRENT.minimumCompatibilityVersion()); } else { TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(proxyAddress)); - String hostName = address.substring(0, indexOfPortSeparator(address)); + String hostName = RemoteConnectionStrategy.parseHost(proxyAddress); return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address, transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT.minimumCompatibilityVersion()); @@ -498,14 +498,6 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { return DEFAULT_NODE_PREDICATE; } - private static int indexOfPortSeparator(String remoteHost) { - int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 - if (portSeparator == -1 || portSeparator == remoteHost.length()) { - throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead"); - } - return portSeparator; - } - private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) { if (proxyAddress == null || proxyAddress.isEmpty()) { return node; diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index 0aa2c881429..4654918d898 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -126,7 +126,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), false)) { + numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); @@ -206,7 +206,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), false)) { + numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); @@ -255,7 +255,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase { int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, address.toString(), addressSupplier, false)) { + numOfConnections, address.toString(), addressSupplier, null)) { PlainActionFuture connectFuture = PlainActionFuture.newFuture(); strategy.connect(connectFuture); connectFuture.actionGet(); @@ -357,13 +357,13 @@ public class ProxyConnectionStrategyTests extends ESTestCase { localService.start(); localService.acceptIncomingRequests(); - String serverName = "localhost:" + address1.getPort(); + String address = "localhost:" + address1.getPort(); ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, serverName, true)) { + numOfConnections, address, "localhost")) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); PlainActionFuture connectFuture = PlainActionFuture.newFuture();