Allow proxy mode server name to be configured (#50951)
Currently, proxy mode allows a remote cluster connection to be setup by expecting all open connections to be routed through an intermediate proxy. The proxy must use some logic to ensure that the connections end up on the correct remote cluster. One mechanism provided is that the default distribution TLS implementations will forward the host component of the configured address to the remote connection using the SNI extension. This is limiting as it requires that the proxy be configured in a way that always uses a valid hostname as the proxy address. Instead, this commit adds an additional setting to allow the server_name to be configured independently. This allows the proxy address to be specified as a IP literal, but the server_name specified as an arbitrary string which still must be a valid hostname. It also decouples the server_name from the requirement of being a DNS resolvable domain.
This commit is contained in:
parent
1fe2d76a91
commit
6e7478b846
|
@ -323,10 +323,11 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
|
||||
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
|
||||
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
|
||||
ProxyConnectionStrategy.INCLUDE_SERVER_NAME,
|
||||
ProxyConnectionStrategy.SERVER_NAME,
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS,
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY,
|
||||
SniffConnectionStrategy.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER,
|
||||
ProxyConnectionStrategy.SERVER_NAME,
|
||||
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
|
||||
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
|
||||
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
|
||||
|
|
|
@ -39,16 +39,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.common.settings.Setting.boolSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.intSetting;
|
||||
|
||||
public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
||||
|
@ -75,12 +72,12 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
|||
Setting.Property.Dynamic, Setting.Property.NodeScope));
|
||||
|
||||
/**
|
||||
* Whether to include the hostname as a server_name attribute
|
||||
* A configurable server_name attribute
|
||||
*/
|
||||
public static final Setting.AffixSetting<Boolean> INCLUDE_SERVER_NAME = Setting.affixKeySetting(
|
||||
public static final Setting.AffixSetting<String> SERVER_NAME = Setting.affixKeySetting(
|
||||
"cluster.remote.",
|
||||
"include_server_name",
|
||||
(ns, key) -> boolSetting(key, false, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
|
||||
"server_name",
|
||||
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
|
||||
Setting.Property.Dynamic, Setting.Property.NodeScope));
|
||||
|
||||
static final int CHANNELS_PER_CONNECTION = 1;
|
||||
|
@ -89,9 +86,8 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
|||
private static final Logger logger = LogManager.getLogger(ProxyConnectionStrategy.class);
|
||||
|
||||
private final int maxNumConnections;
|
||||
private final AtomicLong counter = new AtomicLong(0);
|
||||
private final String configuredAddress;
|
||||
private final boolean includeServerName;
|
||||
private final String configuredServerName;
|
||||
private final Supplier<TransportAddress> address;
|
||||
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
|
||||
private final ConnectionManager.ConnectionValidator clusterNameValidator;
|
||||
|
@ -104,28 +100,28 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
|||
connectionManager,
|
||||
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
|
||||
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings),
|
||||
INCLUDE_SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
|
||||
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
|
||||
}
|
||||
|
||||
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
|
||||
int maxNumConnections, String configuredAddress) {
|
||||
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
|
||||
() -> resolveAddress(configuredAddress), false);
|
||||
() -> resolveAddress(configuredAddress), null);
|
||||
}
|
||||
|
||||
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
|
||||
int maxNumConnections, String configuredAddress, boolean includeServerName) {
|
||||
int maxNumConnections, String configuredAddress, String configuredServerName) {
|
||||
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
|
||||
() -> resolveAddress(configuredAddress), includeServerName);
|
||||
() -> resolveAddress(configuredAddress), configuredServerName);
|
||||
}
|
||||
|
||||
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
|
||||
int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
|
||||
boolean includeServerName) {
|
||||
String configuredServerName) {
|
||||
super(clusterAlias, transportService, connectionManager);
|
||||
this.maxNumConnections = maxNumConnections;
|
||||
this.configuredAddress = configuredAddress;
|
||||
this.includeServerName = includeServerName;
|
||||
this.configuredServerName = configuredServerName;
|
||||
assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
|
||||
this.address = address;
|
||||
this.clusterNameValidator = (newConnection, actualProfile, listener) ->
|
||||
|
@ -217,10 +213,10 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
|||
for (int i = 0; i < remaining; ++i) {
|
||||
String id = clusterAlias + "#" + resolved;
|
||||
Map<String, String> attributes;
|
||||
if (includeServerName) {
|
||||
attributes = Collections.singletonMap("server_name", resolved.address().getHostString());
|
||||
} else {
|
||||
if (Strings.isNullOrEmpty(configuredServerName)) {
|
||||
attributes = Collections.emptyMap();
|
||||
} else {
|
||||
attributes = Collections.singletonMap("server_name", configuredServerName);
|
||||
}
|
||||
DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
|
||||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
|
@ -252,12 +248,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
|||
}
|
||||
}
|
||||
|
||||
private TransportAddress nextAddress(List<TransportAddress> resolvedAddresses) {
|
||||
long curr;
|
||||
while ((curr = counter.getAndIncrement()) == Long.MIN_VALUE) ;
|
||||
return resolvedAddresses.get(Math.toIntExact(Math.floorMod(curr, (long) resolvedAddresses.size())));
|
||||
}
|
||||
|
||||
private static TransportAddress resolveAddress(String address) {
|
||||
return new TransportAddress(parseConfiguredAddress(address));
|
||||
}
|
||||
|
|
|
@ -131,7 +131,7 @@ public abstract class RemoteClusterAware {
|
|||
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
|
||||
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
|
||||
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
|
||||
ProxyConnectionStrategy.INCLUDE_SERVER_NAME);
|
||||
ProxyConnectionStrategy.SERVER_NAME);
|
||||
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
|
@ -132,12 +131,11 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
|
|||
.stream()
|
||||
.map(
|
||||
s -> {
|
||||
final Tuple<String, Integer> hostPort = RemoteConnectionStrategy.parseHostPort(s);
|
||||
assert hostPort.v2() != null : s;
|
||||
final String host = RemoteConnectionStrategy.parseHost(s);
|
||||
final int port = RemoteConnectionStrategy.parsePort(s);
|
||||
try {
|
||||
return new TransportAddress(
|
||||
InetAddress.getByAddress(hostPort.v1(), TransportAddress.META_ADDRESS.getAddress()),
|
||||
hostPort.v2());
|
||||
InetAddress.getByAddress(host, TransportAddress.META_ADDRESS.getAddress()), port);
|
||||
} catch (final UnknownHostException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -170,11 +169,9 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
|
|||
return allConcreteSettings.map(affixSetting::getNamespace);
|
||||
}
|
||||
|
||||
static InetSocketAddress parseConfiguredAddress(String remoteHost) {
|
||||
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
|
||||
final String host = hostPort.v1();
|
||||
assert hostPort.v2() != null : remoteHost;
|
||||
final int port = hostPort.v2();
|
||||
static InetSocketAddress parseConfiguredAddress(String configuredAddress) {
|
||||
final String host = parseHost(configuredAddress);
|
||||
final int port = parsePort(configuredAddress);
|
||||
InetAddress hostAddress;
|
||||
try {
|
||||
hostAddress = InetAddress.getByName(host);
|
||||
|
@ -184,10 +181,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
|
|||
return new InetSocketAddress(hostAddress, port);
|
||||
}
|
||||
|
||||
static Tuple<String, Integer> parseHostPort(final String remoteHost) {
|
||||
final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
|
||||
final int port = parsePort(remoteHost);
|
||||
return Tuple.tuple(host, port);
|
||||
static String parseHost(final String configuredAddress) {
|
||||
return configuredAddress.substring(0, indexOfPortSeparator(configuredAddress));
|
||||
}
|
||||
|
||||
static int parsePort(String remoteHost) {
|
||||
|
|
|
@ -481,7 +481,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
} else {
|
||||
TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(proxyAddress));
|
||||
String hostName = address.substring(0, indexOfPortSeparator(address));
|
||||
String hostName = RemoteConnectionStrategy.parseHost(proxyAddress);
|
||||
return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
|
||||
transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES,
|
||||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
|
@ -498,14 +498,6 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|||
return DEFAULT_NODE_PREDICATE;
|
||||
}
|
||||
|
||||
private static int indexOfPortSeparator(String remoteHost) {
|
||||
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
|
||||
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
|
||||
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
|
||||
}
|
||||
return portSeparator;
|
||||
}
|
||||
|
||||
private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) {
|
||||
if (proxyAddress == null || proxyAddress.isEmpty()) {
|
||||
return node;
|
||||
|
|
|
@ -126,7 +126,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), false)) {
|
||||
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
|
||||
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
|
||||
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
|
||||
|
||||
|
@ -206,7 +206,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), false)) {
|
||||
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
|
||||
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
|
||||
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
|
||||
|
||||
|
@ -255,7 +255,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
int numOfConnections = randomIntBetween(4, 8);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
numOfConnections, address.toString(), addressSupplier, false)) {
|
||||
numOfConnections, address.toString(), addressSupplier, null)) {
|
||||
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
|
||||
strategy.connect(connectFuture);
|
||||
connectFuture.actionGet();
|
||||
|
@ -357,13 +357,13 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
|||
localService.start();
|
||||
localService.acceptIncomingRequests();
|
||||
|
||||
String serverName = "localhost:" + address1.getPort();
|
||||
String address = "localhost:" + address1.getPort();
|
||||
|
||||
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
|
||||
int numOfConnections = randomIntBetween(4, 8);
|
||||
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
|
||||
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
|
||||
numOfConnections, serverName, true)) {
|
||||
numOfConnections, address, "localhost")) {
|
||||
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
|
||||
|
||||
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
|
||||
|
|
Loading…
Reference in New Issue