NETWORKING: Make RemoteClusterConn. Lazy Resolve DNS (#32764)

* Lazy resolve DNS (i.e. `String` to `DiscoveryNode`) to not run into indefinitely caching lookup issues (provided the JVM dns cache is configured correctly as explained in https://www.elastic.co/guide/en/elasticsearch/reference/6.3/networkaddress-cache-ttl.html)
   * Changed `InetAddress` type to `String` for that higher up the stack
   * Passed down `Supplier<DiscoveryNode>` instead of outright `DiscoveryNode` from `RemoteClusterAware#buildRemoteClustersSeeds` on to lazy resolve DNS when the `DiscoveryNode` is actually used (could've also passed down the value of `clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting)` together with the `List<String>` of hosts, but this route seemed to introduce less duplication and resulted in a significantly smaller changeset).
* Closes #28858
This commit is contained in:
Armin Braun 2018-08-18 08:46:44 +02:00 committed by GitHub
parent 532d552ffd
commit f82bb64feb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 153 additions and 102 deletions

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;
import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -48,9 +49,20 @@ public abstract class RemoteClusterAware extends AbstractComponent {
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting(
"search.remote.",
"seeds",
key -> Setting.listSetting(
key, Collections.emptyList(),
s -> {
// validate seed address
parsePort(s);
return s;
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
);
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
@ -65,18 +77,20 @@ public abstract class RemoteClusterAware extends AbstractComponent {
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
}
protected static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<InetSocketAddress>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
protected static Map<String, List<Supplier<DiscoveryNode>>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<DiscoveryNode> nodes = new ArrayList<>();
for (InetSocketAddress address : concreteSetting.get(settings)) {
TransportAddress transportAddress = new TransportAddress(address);
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
List<String> addresses = concreteSetting.get(settings);
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(() -> {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
nodes.add(node);
});
}
return nodes;
}));
@ -128,7 +142,7 @@ public abstract class RemoteClusterAware extends AbstractComponent {
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses);
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses);
/**
* Registers this instance to listen to updates on the cluster settings.
@ -138,29 +152,37 @@ public abstract class RemoteClusterAware extends AbstractComponent {
(namespace, value) -> {});
}
private static InetSocketAddress parseSeedAddress(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
String host = remoteHost.substring(0, portSeparator);
protected static InetSocketAddress parseSeedAddress(String remoteHost) {
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
return new InetSocketAddress(hostAddress, parsePort(remoteHost));
}
private static int parsePort(String remoteHost) {
try {
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1));
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return new InetSocketAddress(hostAddress, port);
return port;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port must be a number", e);
throw new IllegalArgumentException("failed to parse port", e);
}
}
private static int indexOfPortSeparator(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
return portSeparator;
}
public static String buildRemoteIndexName(String clusterAlias, String indexName) {
return clusterAlias != null ? clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName : indexName;
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;
import java.util.function.Supplier;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
@ -84,7 +85,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private volatile List<DiscoveryNode> seedNodes;
private volatile List<Supplier<DiscoveryNode>> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
@ -99,7 +100,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
@ -127,7 +128,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
/**
* Updates the list of seed nodes for this cluster connection
*/
synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes, ActionListener<Void> connectListener) {
synchronized void updateSeedNodes(List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
connectHandler.connect(connectListener);
}
@ -456,7 +457,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
});
}
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
@ -464,7 +465,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
try {
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next();
final DiscoveryNode seedNode = seedNodes.next().get();
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
@ -554,11 +555,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final TransportService transportService;
private final Transport.Connection connection;
private final ActionListener<Void> listener;
private final Iterator<DiscoveryNode> seedNodes;
private final Iterator<Supplier<DiscoveryNode>> seedNodes;
private final CancellableThreads cancellableThreads;
SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
ActionListener<Void> listener, Iterator<DiscoveryNode> seedNodes,
ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes,
CancellableThreads cancellableThreads) {
this.transportService = transportService;
this.connection = connection;
@ -651,7 +652,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public RemoteConnectionInfo getConnectionInfo() {
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList());
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect(Collectors.toList());
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
initialConnectionTimeout, skipUnavailable);

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;
import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
@ -40,7 +41,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -115,7 +115,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
* @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes
* @param connectionListener a listener invoked once every configured cluster has been connected to
*/
private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>> seeds, ActionListener<Void> connectionListener) {
private synchronized void updateRemoteClusters(Map<String, List<Supplier<DiscoveryNode>>> seeds,
ActionListener<Void> connectionListener) {
if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
@ -125,7 +126,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
} else {
CountDown countDown = new CountDown(seeds.size());
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
for (Map.Entry<String, List<Supplier<DiscoveryNode>>> entry : seeds.entrySet()) {
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection
try {
@ -310,16 +311,17 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
}
protected void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses) {
updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {}));
}
void updateRemoteCluster(
final String clusterAlias,
final List<InetSocketAddress> addresses,
final List<String> addresses,
final ActionListener<Void> connectionListener) {
final List<DiscoveryNode> nodes = addresses.stream().map(address -> {
final TransportAddress transportAddress = new TransportAddress(address);
final List<Supplier<DiscoveryNode>> nodes = addresses.stream().<Supplier<DiscoveryNode>>map(address -> () -> {
final TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
final String id = clusterAlias + "#" + transportAddress.toString();
final Version version = Version.CURRENT.minimumCompatibilityVersion();
return new DiscoveryNode(id, transportAddress, version);
@ -334,7 +336,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Map<String, List<DiscoveryNode>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
Map<String, List<Supplier<DiscoveryNode>>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
updateRemoteClusters(seeds, future);
try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;
import java.util.function.Supplier;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -158,8 +159,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
@ -198,8 +199,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
@ -254,8 +255,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
@ -276,7 +277,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
knownNodes.add(incompatibleTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<DiscoveryNode> seedNodes = Arrays.asList(incompatibleSeedNode, seedNode);
List<Supplier<DiscoveryNode>> seedNodes = Arrays.asList(() -> incompatibleSeedNode, () -> seedNode);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
@ -310,8 +311,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertFalse(service.nodeConnected(spareNode));
@ -359,8 +360,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
if (rejectedNode.equals(seedNode)) {
assertFalse(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
@ -374,7 +375,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
private void updateSeedNodes(RemoteClusterConnection connection, List<DiscoveryNode> seedNodes) throws Exception {
private void updateSeedNodes(RemoteClusterConnection connection, List<Supplier<DiscoveryNode>> seedNodes) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
@ -398,8 +399,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(seedNode)));
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode)));
assertFalse(service.nodeConnected(seedNode));
assertTrue(connection.assertNoRunningConnections());
}
@ -461,7 +462,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
connection.addConnectedNode(seedNode);
for (DiscoveryNode node : knownNodes) {
final Transport.Connection transportConnection = connection.getConnection(node);
@ -504,7 +505,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
CountDownLatch listenerCalled = new CountDownLatch(1);
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
ActionListener<Void> listener = ActionListener.wrap(x -> {
listenerCalled.countDown();
fail("expected exception");
@ -512,7 +513,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
exceptionReference.set(x);
listenerCalled.countDown();
});
connection.updateSeedNodes(Arrays.asList(seedNode), listener);
connection.updateSeedNodes(Arrays.asList(() -> seedNode), listener);
acceptedLatch.await();
connection.close(); // now close it, this should trigger an interrupt on the socket and we can move on
assertTrue(connection.assertNoRunningConnections());
@ -539,7 +540,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
List<DiscoveryNode> nodes = Collections.singletonList(seedNode);
List<Supplier<DiscoveryNode>> nodes = Collections.singletonList(() -> seedNode);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
nodes, service, Integer.MAX_VALUE, n -> true)) {
if (randomBoolean()) {
@ -579,7 +580,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
List<DiscoveryNode> nodes = Collections.singletonList(seedNode);
List<Supplier<DiscoveryNode>> nodes = Collections.singletonList(() -> seedNode);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
nodes, service, Integer.MAX_VALUE, n -> true)) {
SearchRequest request = new SearchRequest("test-index");
@ -635,7 +636,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
Collections.singletonList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
SearchRequest request = new SearchRequest("test-index");
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
@ -738,7 +739,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
knownNodes.add(seedTransport1.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<DiscoveryNode> seedNodes = Arrays.asList(seedNode1, seedNode);
List<Supplier<DiscoveryNode>> seedNodes = Arrays.asList(() -> seedNode1, () -> seedNode);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
@ -816,7 +817,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
knownNodes.add(seedTransport1.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<DiscoveryNode> seedNodes = Arrays.asList(seedNode1, seedNode);
List<Supplier<DiscoveryNode>> seedNodes = Arrays.asList(() -> seedNode1, () -> seedNode);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
@ -904,7 +905,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
knownNodes.add(transport3.getLocalDiscoNode());
knownNodes.add(transport2.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<DiscoveryNode> seedNodes = Arrays.asList(node3, node1, node2);
List<Supplier<DiscoveryNode>> seedNodes = Arrays.asList(() -> node3, () -> node1, () -> node2);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
@ -1059,7 +1060,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
assertFalse(service.nodeConnected(seedNode));
assertFalse(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
@ -1108,9 +1109,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
if (randomBoolean()) {
updateSeedNodes(connection, Arrays.asList(seedNode));
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
}
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<Function<String, DiscoveryNode>> reference = new AtomicReference<>();
@ -1142,14 +1143,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
List<MockTransportService> discoverableTransports = new CopyOnWriteArrayList<>();
try {
final int numDiscoverableNodes = randomIntBetween(5, 20);
List<DiscoveryNode> discoverableNodes = new ArrayList<>(numDiscoverableNodes);
for (int i = 0; i < numDiscoverableNodes; i++) {
List<Supplier<DiscoveryNode>> discoverableNodes = new ArrayList<>(numDiscoverableNodes);
for (int i = 0; i < numDiscoverableNodes; i++ ) {
MockTransportService transportService = startTransport("discoverable_node" + i, knownNodes, Version.CURRENT);
discoverableNodes.add(transportService.getLocalDiscoNode());
discoverableNodes.add(transportService::getLocalDiscoNode);
discoverableTransports.add(transportService);
}
List<DiscoveryNode> seedNodes = randomSubsetOf(discoverableNodes);
List<Supplier<DiscoveryNode>> seedNodes = randomSubsetOf(discoverableNodes);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
@ -1198,7 +1199,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
discoverableTransports.add(transportService);
connection.addConnectedNode(transportService.getLocalDiscoNode());
} else {
DiscoveryNode node = randomFrom(discoverableNodes);
DiscoveryNode node = randomFrom(discoverableNodes).get();
connection.onNodeDisconnected(node);
}
}
@ -1246,12 +1247,13 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
Arrays.asList( () -> seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
List<DiscoveryNode> discoveryNodes = Arrays.asList(otherClusterTransport.getLocalDiscoNode(), seedNode);
List<Supplier<DiscoveryNode>> discoveryNodes =
Arrays.asList(() -> otherClusterTransport.getLocalDiscoNode(), () -> seedNode);
Collections.shuffle(discoveryNodes, random());
updateSeedNodes(connection, discoveryNodes);
assertTrue(service.nodeConnected(seedNode));
@ -1262,7 +1264,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () ->
updateSeedNodes(connection, Arrays.asList(otherClusterTransport.getLocalDiscoNode())));
updateSeedNodes(connection, Arrays.asList(() -> otherClusterTransport.getLocalDiscoNode())));
assertThat(illegalStateException.getMessage(),
startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" +
" - {other_cluster_discoverable_node}"));
@ -1325,7 +1327,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) {
Collections.singletonList(() -> connectedNode), service, Integer.MAX_VALUE, n -> true)) {
connection.addConnectedNode(connectedNode);
for (int i = 0; i < 10; i++) {
//always a direct connection as the remote node is already connected
@ -1348,4 +1350,34 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
}
public void testLazyResolveTransportAddress() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
CountDownLatch multipleResolveLatch = new CountDownLatch(2);
Supplier<DiscoveryNode> seedSupplier = () -> {
multipleResolveLatch.countDown();
return seedNode;
};
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedSupplier));
// Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes
// being called again so we try to resolve the same seed node's host twice
discoverableTransport.close();
seedTransport.close();
assertTrue(multipleResolveLatch.await(30L, TimeUnit.SECONDS));
}
}
}
}
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;
import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
@ -103,10 +104,19 @@ public class RemoteClusterServiceTests extends ESTestCase {
.put("search.remote.foo.seeds", "192.168.0.1").build();
expectThrows(IllegalArgumentException.class, () ->
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
Settings brokenPortSettings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1:123456789123456789").build();
Exception e = expectThrows(
IllegalArgumentException.class,
() -> RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings)
.forEach(setting -> setting.get(brokenPortSettings))
);
assertEquals("failed to parse port", e.getMessage());
}
public void testBuiltRemoteClustersSeeds() throws Exception {
Map<String, List<DiscoveryNode>> map = RemoteClusterService.buildRemoteClustersSeeds(
Map<String, List<Supplier<DiscoveryNode>>> map = RemoteClusterService.buildRemoteClustersSeeds(
Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080").put("search.remote.bar.seeds", "[::1]:9090").build());
assertEquals(2, map.size());
assertTrue(map.containsKey("foo"));
@ -114,13 +124,13 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertEquals(1, map.get("foo").size());
assertEquals(1, map.get("bar").size());
DiscoveryNode foo = map.get("foo").get(0);
DiscoveryNode foo = map.get("foo").get(0).get();
assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080)));
assertEquals(foo.getId(), "foo#192.168.0.1:8080");
assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode bar = map.get("bar").get(0);
DiscoveryNode bar = map.get("bar").get(0).get();
assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090)));
assertEquals(bar.getId(), "bar#[::1]:9090");
assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
@ -194,10 +204,10 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address()));
service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()));
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().address()));
service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().toString()));
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
@ -252,22 +262,17 @@ public class RemoteClusterServiceTests extends ESTestCase {
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
final InetSocketAddress c1N1Address = c1N1Node.getAddress().address();
final InetSocketAddress c1N2Address = c1N2Node.getAddress().address();
final InetSocketAddress c2N1Address = c2N1Node.getAddress().address();
final InetSocketAddress c2N2Address = c2N2Node.getAddress().address();
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Address, c1N2Address),
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()),
connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Address, c2N2Address),
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()),
connectionListener(secondLatch));
secondLatch.await();
@ -321,22 +326,17 @@ public class RemoteClusterServiceTests extends ESTestCase {
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
final InetSocketAddress c1N1Address = c1N1Node.getAddress().address();
final InetSocketAddress c1N2Address = c1N2Node.getAddress().address();
final InetSocketAddress c2N1Address = c2N1Node.getAddress().address();
final InetSocketAddress c2N2Address = c2N2Node.getAddress().address();
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Address, c1N2Address),
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()),
connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Address, c2N2Address),
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()),
connectionListener(secondLatch));
secondLatch.await();
@ -398,22 +398,17 @@ public class RemoteClusterServiceTests extends ESTestCase {
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
final InetSocketAddress c1N1Address = c1N1Node.getAddress().address();
final InetSocketAddress c1N2Address = c1N2Node.getAddress().address();
final InetSocketAddress c2N1Address = c2N1Node.getAddress().address();
final InetSocketAddress c2N2Address = c2N2Node.getAddress().address();
final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Address, c1N2Address),
Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()),
connectionListener(firstLatch));
firstLatch.await();
final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Address, c2N2Address),
Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()),
connectionListener(secondLatch));
secondLatch.await();
CountDownLatch latch = new CountDownLatch(1);

View File

@ -30,7 +30,6 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.core.graph.action.GraphExploreRequest;
import org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -429,7 +428,7 @@ class IndicesAndAliasesResolver {
}
@Override
protected void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
protected void updateRemoteCluster(String clusterAlias, List<String> addresses) {
if (addresses.isEmpty()) {
clusters.remove(clusterAlias);
} else {