Add proxy support to RemoteClusterConnection (#33062)

This adds support for connecting to a remote cluster through
a TCP proxy. A remote cluster can be configured with an additional
`search.remote.$clustername.proxy` setting. This proxy is used for
every node connection established to that cluster. We still sniff
the remote cluster and connect to its nodes directly, but each of
those connections goes through the proxy, which therefore has to
support some kind of routing to the individual nodes. Such a routing
mechanism requires the handshake request to carry information about
where to route to, which is not implemented yet. The effort to use
the hostname and an optional node attribute for routing is tracked
in #32517.

Closes #31840
Simon Willnauer 2018-08-25 20:41:32 +02:00 committed by GitHub
parent 9dad82ece8
commit 3376922e8b
10 changed files with 334 additions and 64 deletions
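
To illustrate the new setting, here is a minimal sketch of the per-cluster configuration (the cluster alias and addresses are invented; the builder calls mirror the ones used in the tests further down). If the proxy setting is absent, behaviour is unchanged and the seed nodes are dialled directly.

import org.elasticsearch.common.settings.Settings;

// hypothetical alias and endpoints, for illustration only
Settings settings = Settings.builder()
    // seed nodes of the remote cluster, as before
    .putList("search.remote.cluster_one.seeds", "node0.cluster-one.internal:9300")
    // new: every connection to cluster_one is established through this TCP proxy
    .put("search.remote.cluster_one.proxy", "proxy.cluster-one.internal:9400")
    .build();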


@@ -272,6 +272,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,


@@ -1009,6 +1009,10 @@ public class Setting<T> implements ToXContentObject {
return new Setting<>(key, s -> "", Function.identity(), properties);
}
public static Setting<String> simpleString(String key, Function<String, String> parser, Property... properties) {
return new Setting<>(key, s -> "", parser, properties);
}
public static Setting<String> simpleString(String key, Setting<String> fallback, Property... properties) {
return new Setting<>(key, fallback, Function.identity(), properties);
}


@@ -18,10 +18,14 @@
*/
package org.elasticsearch.transport;
import java.util.EnumSet;
import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@@ -66,6 +70,22 @@ public abstract class RemoteClusterAware extends AbstractComponent {
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
/**
* A proxy address for the remote cluster.
* NOTE: this setting is undocumented until we have at least one transport that supports passing
* on the hostname via a mechanism like SNI.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"search.remote.",
"proxy",
key -> Setting.simpleString(key, s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
return s;
}, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
protected final ClusterNameExpressionResolver clusterNameResolver;
/**
@@ -77,25 +97,42 @@ public abstract class RemoteClusterAware extends AbstractComponent {
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
}
protected static Map<String, List<Supplier<DiscoveryNode>>> buildRemoteClustersSeeds(Settings settings) {
/**
* Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple
* (ProxyAddress, [SeedNodeSuppliers]). If a cluster is configured with a proxy address, all seed nodes will point to
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname of the generated discovery node.
*/
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(Settings settings) {
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<String> addresses = concreteSetting.get(settings);
final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings);
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(() -> {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
});
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
}
return nodes;
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
}));
}
static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) {
if (proxyMode) {
TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0);
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections
.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
}
}
/**
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
@@ -138,20 +175,24 @@ public abstract class RemoteClusterAware extends AbstractComponent {
protected abstract Set<String> getRemoteClusterNames();
/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty, the cluster alias is unregistered and should be removed. The proxy address is the configured proxy for the
* cluster, or null / an empty string if no proxy is set.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses);
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy);
/**
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, this::updateRemoteCluster,
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
(namespace, value) -> {});
}
protected static InetSocketAddress parseSeedAddress(String remoteHost) {
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
InetAddress hostAddress;


@@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;
import java.net.InetSocketAddress;
import java.util.function.Supplier;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
@@ -88,6 +89,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private final ThreadPool threadPool;
private volatile String proxyAddress;
private volatile List<Supplier<DiscoveryNode>> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
@@ -106,6 +108,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate) {
this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null);
}
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
super(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
@@ -130,13 +139,26 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
connectionManager.addListener(this);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
this.proxyAddress = proxyAddress;
}
private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) {
if (proxyAddress == null || proxyAddress.isEmpty()) {
return node;
} else {
// resolve the proxy address lazily here
InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(),
node.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
}
}
/**
* Updates the list of seed nodes for this cluster connection
*/
synchronized void updateSeedNodes(List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
synchronized void updateSeedNodes(String proxyAddress, List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
this.proxyAddress = proxyAddress;
connectHandler.connect(connectListener);
}
@@ -281,6 +303,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
return new ProxyConnection(connection, remoteClusterNode);
}
static final class ProxyConnection implements Transport.Connection {
private final Transport.Connection proxyConnection;
private final DiscoveryNode targetNode;
@@ -461,7 +484,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
try {
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next().get();
final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get());
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = manager.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
@@ -476,7 +499,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
throw ex;
}
final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode));
if (remoteClusterName.get() == null) {
@@ -583,7 +606,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
cancellableThreads.executeIO(() -> {
DiscoveryNodes nodes = response.getState().nodes();
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
for (DiscoveryNode node : nodesIter) {
for (DiscoveryNode n : nodesIter) {
DiscoveryNode node = maybeAddProxyAddress(proxyAddress, n);
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
connectionManager.connectToNode(node, remoteProfile,
@@ -646,7 +670,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public RemoteConnectionInfo getConnectionInfo() {
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect(Collectors.toList());
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress())
.collect(Collectors.toList());
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
initialConnectionTimeout, skipUnavailable);
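
Taken together, buildSeedNode and maybeAddProxyAddress mean that in proxy mode a seed node carries only a placeholder transport address plus the configured host string, and the proxy address is substituted whenever a connection is opened. A rough sketch of that behaviour (the alias and addresses are invented; both methods are package-private or private, so this only runs from code in org.elasticsearch.transport such as the tests below):

DiscoveryNode seed = RemoteClusterAware.buildSeedNode("cluster_one", "node0.cluster-one.internal:9300", true);
// proxy mode: placeholder transport address, the configured value survives as host name and node id
assertEquals(new TransportAddress(TransportAddress.META_ADDRESS, 0), seed.getAddress());
assertEquals("node0.cluster-one.internal", seed.getHostName());
assertEquals("cluster_one#node0.cluster-one.internal:9300", seed.getId());
// at connection time maybeAddProxyAddress(proxyAddress, seed) rewrites the node so that the actual
// TCP connection targets the proxy (e.g. "proxy.cluster-one.internal:9400") while id and host are kept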


@@ -31,10 +31,10 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.internal.io.IOUtils;
@@ -116,8 +116,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
* @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes
* @param connectionListener a listener invoked once every configured cluster has been connected to
*/
private synchronized void updateRemoteClusters(Map<String, List<Supplier<DiscoveryNode>>> seeds,
ActionListener<Void> connectionListener) {
private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> seeds,
ActionListener<Void> connectionListener) {
if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
@@ -127,9 +127,12 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
} else {
CountDown countDown = new CountDown(seeds.size());
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<Supplier<DiscoveryNode>>> entry : seeds.entrySet()) {
for (Map.Entry<String, Tuple<String, List<Supplier<DiscoveryNode>>>> entry : seeds.entrySet()) {
List<Supplier<DiscoveryNode>> seedList = entry.getValue().v2();
String proxyAddress = entry.getValue().v1();
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection
if (seedList.isEmpty()) { // with no seed nodes we just remove the connection
try {
IOUtils.close(remote);
} catch (IOException e) {
@@ -140,15 +143,15 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService,
remote = new RemoteClusterConnection(settings, entry.getKey(), seedList, transportService,
new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections,
getNodePredicate(settings));
getNodePredicate(settings), proxyAddress);
remoteClusters.put(entry.getKey(), remote);
}
// now update the seed nodes no matter if it's new or already existing
RemoteClusterConnection finalRemote = remote;
remote.updateSeedNodes(entry.getValue(), ActionListener.wrap(
remote.updateSeedNodes(proxyAddress, seedList, ActionListener.wrap(
response -> {
if (countDown.countDown()) {
connectionListener.onResponse(response);
@@ -302,8 +305,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
@Override
public void listenForUpdates(ClusterSettings clusterSettings) {
super.listenForUpdates(clusterSettings);
clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable,
(clusterAlias, value) -> {});
clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {});
}
synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
@@ -313,22 +315,21 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
}
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses) {
updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {}));
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {}));
}
void updateRemoteCluster(
final String clusterAlias,
final List<String> addresses,
final String proxyAddress,
final ActionListener<Void> connectionListener) {
final List<Supplier<DiscoveryNode>> nodes = addresses.stream().<Supplier<DiscoveryNode>>map(address -> () -> {
final TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
final String id = clusterAlias + "#" + transportAddress.toString();
final Version version = Version.CURRENT.minimumCompatibilityVersion();
return new DiscoveryNode(id, transportAddress, version);
}).collect(Collectors.toList());
updateRemoteClusters(Collections.singletonMap(clusterAlias, nodes), connectionListener);
final List<Supplier<DiscoveryNode>> nodes = addresses.stream().<Supplier<DiscoveryNode>>map(address -> () ->
buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))
).collect(Collectors.toList());
updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener);
}
/**
@@ -338,7 +339,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Map<String, List<Supplier<DiscoveryNode>>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> seeds = RemoteClusterAware.buildRemoteClustersDynamicConfig(settings);
updateRemoteClusters(seeds, future);
try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
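
For completeness, a sketch of how a proxied cluster could be registered dynamically through this service (alias and addresses are invented; updateRemoteCluster is package-private, so real callers live in org.elasticsearch.transport, as the tests below do):

remoteClusterService.updateRemoteCluster("cluster_one",
    Collections.singletonList("node0.cluster-one.internal:9300"),  // seed addresses
    "proxy.cluster-one.internal:9400",                             // proxy address; null means connect directly
    ActionListener.wrap(
        aVoid -> logger.info("connected to cluster_one"),
        e -> logger.warn("failed to connect to cluster_one", e)));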


@@ -18,6 +18,8 @@
*/
package org.elasticsearch.transport;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
@@ -52,6 +54,7 @@ import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@@ -378,15 +381,19 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
}
private void updateSeedNodes(RemoteClusterConnection connection, List<Supplier<DiscoveryNode>> seedNodes) throws Exception {
updateSeedNodes(connection, seedNodes, null);
}
private void updateSeedNodes(RemoteClusterConnection connection, List<Supplier<DiscoveryNode>> seedNodes, String proxyAddress)
throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
exceptionAtomicReference.set(x);
latch.countDown();
});
connection.updateSeedNodes(seedNodes, listener);
connection.updateSeedNodes(proxyAddress, seedNodes, listener);
latch.await();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
@@ -517,7 +524,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
exceptionReference.set(x);
listenerCalled.countDown();
});
connection.updateSeedNodes(Arrays.asList(() -> seedNode), listener);
connection.updateSeedNodes(null, Arrays.asList(() -> seedNode), listener);
acceptedLatch.await();
connection.close(); // now close it, this should trigger an interrupt on the socket and we can move on
assertTrue(connection.assertNoRunningConnections());
@@ -787,7 +794,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
throw new AssertionError(x);
}
});
connection.updateSeedNodes(seedNodes, listener);
connection.updateSeedNodes(null, seedNodes, listener);
}
latch.await();
} catch (Exception ex) {
@@ -875,7 +882,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
});
try {
connection.updateSeedNodes(seedNodes, listener);
connection.updateSeedNodes(null, seedNodes, listener);
} catch (Exception e) {
// it's ok if we're shutting down
assertThat(e.getMessage(), containsString("threadcontext is already closed"));
@@ -1384,4 +1391,97 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
}
public void testProxyMode() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("node_0", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("node_1", knownNodes, Version.CURRENT)) {
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
final String proxyAddress = "1.1.1.1:99";
Map<String, DiscoveryNode> nodes = new HashMap<>();
nodes.put("node_0", seedTransport.getLocalDiscoNode());
nodes.put("node_1", discoverableTransport.getLocalDiscoNode());
Transport mockTcpTransport = getProxyTransport(threadPool, Collections.singletonMap(proxyAddress, nodes));
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, mockTcpTransport, Version.CURRENT,
threadPool, null, Collections.emptySet())) {
service.start();
service.acceptIncomingRequests();
Supplier<DiscoveryNode> seedSupplier = () ->
RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedSupplier), service, service.getConnectionManager(), Integer.MAX_VALUE, n -> true, proxyAddress)) {
updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress);
assertEquals(2, connection.getNumNodesConnected());
assertNotNull(connection.getConnection(discoverableTransport.getLocalDiscoNode()));
assertNotNull(connection.getConnection(seedTransport.getLocalDiscoNode()));
assertEquals(proxyAddress, connection.getConnection(seedTransport.getLocalDiscoNode())
.getNode().getAddress().toString());
assertEquals(proxyAddress, connection.getConnection(discoverableTransport.getLocalDiscoNode())
.getNode().getAddress().toString());
service.getConnectionManager().disconnectFromNode(knownNodes.get(0));
// ensure we reconnect
assertBusy(() -> {
assertEquals(2, connection.getNumNodesConnected());
});
discoverableTransport.close();
seedTransport.close();
}
}
}
}
public static Transport getProxyTransport(ThreadPool threadPool, Map<String, Map<String, DiscoveryNode>> nodeMap) {
if (nodeMap.isEmpty()) {
throw new IllegalArgumentException("nodeMap must be non-empty");
}
StubbableTransport stubbableTransport = new StubbableTransport(
MockTransportService.newMockTransport(Settings.EMPTY, Version.CURRENT, threadPool));
stubbableTransport.setDefaultConnectBehavior((t, node, profile) -> {
Map<String, DiscoveryNode> proxyMapping = nodeMap.get(node.getAddress().toString());
if (proxyMapping == null) {
throw new IllegalStateException("no proxy mapping for node: " + node);
}
DiscoveryNode proxyNode = proxyMapping.get(node.getName());
if (proxyNode == null) {
// this is a seed node - it is not in the mapping by node name, so route it by its configured hostname
assertEquals("seed node must not have a port in the hostname: " + node.getHostName(),
-1, node.getHostName().lastIndexOf(':'));
assertTrue("missing hostname: " + node, proxyMapping.containsKey(node.getHostName()));
// route by seed hostname
proxyNode = proxyMapping.get(node.getHostName());
}
Transport.Connection connection = t.openConnection(proxyNode, profile);
return new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}
@Override
public boolean isClosed() {
return connection.isClosed();
}
@Override
public void close() {
connection.close();
}
};
});
return stubbableTransport;
}
}


@@ -26,6 +26,7 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@@ -55,6 +56,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -115,25 +117,38 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertEquals("failed to parse port", e.getMessage());
}
public void testBuiltRemoteClustersSeeds() throws Exception {
Map<String, List<Supplier<DiscoveryNode>>> map = RemoteClusterService.buildRemoteClustersSeeds(
Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build());
assertEquals(2, map.size());
public void testBuildRemoteClustersDynamicConfig() throws Exception {
Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> map = RemoteClusterService.buildRemoteClustersDynamicConfig(
Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.bar.seeds", "[::1]:9090")
.put("search.remote.boom.seeds", "boom-node1.internal:1000")
.put("search.remote.boom.proxy", "foo.bar.com:1234").build());
assertEquals(3, map.size());
assertTrue(map.containsKey("foo"));
assertTrue(map.containsKey("bar"));
assertEquals(1, map.get("foo").size());
assertEquals(1, map.get("bar").size());
DiscoveryNode foo = map.get("foo").get(0).get();
assertTrue(map.containsKey("boom"));
assertEquals(1, map.get("foo").v2().size());
assertEquals(1, map.get("bar").v2().size());
assertEquals(1, map.get("boom").v2().size());
DiscoveryNode foo = map.get("foo").v2().get(0).get();
assertEquals("", map.get("foo").v1());
assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080)));
assertEquals(foo.getId(), "foo#192.168.0.1:8080");
assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode bar = map.get("bar").get(0).get();
DiscoveryNode bar = map.get("bar").v2().get(0).get();
assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090)));
assertEquals(bar.getId(), "bar#[::1]:9090");
assertEquals("", map.get("bar").v1());
assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode boom = map.get("boom").v2().get(0).get();
assertEquals(boom.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0));
assertEquals("boom-node1.internal", boom.getHostName());
assertEquals(boom.getId(), "boom#boom-node1.internal:1000");
assertEquals("foo.bar.com:1234", map.get("boom").v1());
assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
}
@@ -204,17 +219,17 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()));
service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString()));
service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString()), null);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
service.updateRemoteCluster("cluster_2", Collections.emptyList());
service.updateRemoteCluster("cluster_2", Collections.emptyList(), null);
assertFalse(service.isRemoteClusterRegistered("cluster_2"));
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
() -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList()));
() -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList(), null));
assertEquals("remote clusters must not have the empty string as its key", iae.getMessage());
}
}
@@ -265,14 +280,14 @@ public class RemoteClusterServiceTests extends ESTestCase {
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()),
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null,
connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()),
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null,
connectionListener(secondLatch));
secondLatch.await();
@@ -330,14 +345,14 @@ public class RemoteClusterServiceTests extends ESTestCase {
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()),
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null,
connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()),
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null,
connectionListener(secondLatch));
secondLatch.await();
@@ -403,14 +418,14 @@ public class RemoteClusterServiceTests extends ESTestCase {
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()),
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null,
connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()),
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null,
connectionListener(secondLatch));
secondLatch.await();
CountDownLatch latch = new CountDownLatch(1);
@@ -822,4 +837,76 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertTrue(nodePredicate.test(node));
}
}
public void testRemoteClusterWithProxy() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT);
MockTransportService cluster_1_node_1 = startTransport("cluster_1_node1", knownNodes, Version.CURRENT);
MockTransportService cluster_2_node0 = startTransport("cluster_2_node0", Collections.emptyList(), Version.CURRENT)) {
knownNodes.add(cluster_1_node0.getLocalDiscoNode());
knownNodes.add(cluster_1_node_1.getLocalDiscoNode());
String cluster1Proxy = "1.1.1.1:99";
String cluster2Proxy = "2.2.2.2:99";
Map<String, DiscoveryNode> nodesCluster1 = new HashMap<>();
nodesCluster1.put("cluster_1_node0", cluster_1_node0.getLocalDiscoNode());
nodesCluster1.put("cluster_1_node1", cluster_1_node_1.getLocalDiscoNode());
Map<String, Map<String, DiscoveryNode>> mapping = new HashMap<>();
mapping.put(cluster1Proxy, nodesCluster1);
mapping.put(cluster2Proxy, Collections.singletonMap("cluster_2_node0", cluster_2_node0.getLocalDiscoNode()));
Collections.shuffle(knownNodes, random());
Transport proxyTransport = RemoteClusterConnectionTests.getProxyTransport(threadPool, mapping);
try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, proxyTransport,
Version.CURRENT, threadPool, null, Collections.emptySet())) {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("search.remote.cluster_1.seeds", "cluster_1_node0:8080");
builder.put("search.remote.cluster_1.proxy", cluster1Proxy);
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertTrue(service.isCrossClusterSearchEnabled());
updateRemoteCluster(service, "cluster_1", Collections.singletonList("cluster_1_node1:8081"), cluster1Proxy);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertFalse(service.isRemoteClusterRegistered("cluster_2"));
updateRemoteCluster(service, "cluster_2", Collections.singletonList("cluster_2_node0:9300"), cluster2Proxy);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
List<RemoteConnectionInfo> infos = service.getRemoteConnectionInfos().collect(Collectors.toList());
for (RemoteConnectionInfo info : infos) {
switch (info.clusterAlias) {
case "cluster_1":
assertEquals(2, info.numNodesConnected);
break;
case "cluster_2":
assertEquals(1, info.numNodesConnected);
break;
default:
fail("unknown cluster: " + info.clusterAlias);
}
}
service.updateRemoteCluster("cluster_2", Collections.emptyList(), randomBoolean() ? cluster2Proxy : null);
assertFalse(service.isRemoteClusterRegistered("cluster_2"));
}
}
}
}
private void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List<String> addresses, String proxyAddress)
throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
exceptionAtomicReference.set(x);
latch.countDown();
});
service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, listener);
latch.await();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
}
}
}


@@ -95,6 +95,12 @@ public final class MockTransportService extends TransportService {
public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool,
@Nullable ClusterSettings clusterSettings) {
MockTcpTransport mockTcpTransport = newMockTransport(settings, version, threadPool);
return createNewService(settings, mockTcpTransport, version, threadPool, clusterSettings,
Collections.emptySet());
}
public static MockTcpTransport newMockTransport(Settings settings, Version version, ThreadPool threadPool) {
// some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means
// concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might
// be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use
@@ -102,9 +108,8 @@ public final class MockTransportService extends TransportService {
int basePort = 10300 + (JVM_ORDINAL * 100); // use a non-default port otherwise some cluster in this JVM might reuse a port
settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build();
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
return new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version);
return createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
}
public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,


@@ -41,7 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class StubbableTransport implements Transport {
public final class StubbableTransport implements Transport {
private final ConcurrentHashMap<TransportAddress, SendRequestBehavior> sendBehaviors = new ConcurrentHashMap<>();
private final ConcurrentHashMap<TransportAddress, OpenConnectionBehavior> connectBehaviors = new ConcurrentHashMap<>();
@@ -60,6 +60,12 @@ public class StubbableTransport implements Transport {
return prior == null;
}
public boolean setDefaultConnectBehavior(OpenConnectionBehavior openConnectionBehavior) {
OpenConnectionBehavior prior = this.defaultConnectBehavior;
this.defaultConnectBehavior = openConnectionBehavior;
return prior == null;
}
boolean addSendBehavior(TransportAddress transportAddress, SendRequestBehavior sendBehavior) {
return sendBehaviors.put(transportAddress, sendBehavior) == null;
}


@@ -418,7 +418,7 @@ class IndicesAndAliasesResolver {
private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) {
super(settings);
clusters = new CopyOnWriteArraySet<>(buildRemoteClustersSeeds(settings).keySet());
clusters = new CopyOnWriteArraySet<>(buildRemoteClustersDynamicConfig(settings).keySet());
listenForUpdates(clusterSettings);
}
@@ -428,7 +428,7 @@
}
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses) {
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
if (addresses.isEmpty()) {
clusters.remove(clusterAlias);
} else {