Do not resolve addresses in remote connection info (#36671)
The remote connection info API leads to resolving addresses of seed nodes when invoked. This is problematic because if a hostname fails to resolve, we would not display any remote connection info. Yet, a hostname not resolving can happen across remote clusters, especially in the modern world of cloud services with dynamically chaning IPs. Instead, the remote connection info API should be providing the configured seed nodes. This commit changes the remote connection info to display the configured seed nodes, avoiding a hostname resolution. Note that care was taken to preserve backwards compatibility with previous versions that expect the remote connection info to serialize a transport address instead of a string representing the hostname.
This commit is contained in:
parent
1f80c80b3a
commit
cd632de116
|
@ -183,10 +183,11 @@ public abstract class RemoteClusterAware {
|
|||
* (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to
|
||||
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
|
||||
*/
|
||||
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(Settings settings) {
|
||||
final Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> remoteSeeds =
|
||||
protected static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
|
||||
final Settings settings) {
|
||||
final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> remoteSeeds =
|
||||
buildRemoteClustersDynamicConfig(settings, REMOTE_CLUSTERS_SEEDS);
|
||||
final Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> searchRemoteSeeds =
|
||||
final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> searchRemoteSeeds =
|
||||
buildRemoteClustersDynamicConfig(settings, SEARCH_REMOTE_CLUSTERS_SEEDS);
|
||||
// sort the intersection for predictable output order
|
||||
final NavigableSet<String> intersection =
|
||||
|
@ -205,7 +206,7 @@ public abstract class RemoteClusterAware {
|
|||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
private static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(
|
||||
private static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
|
||||
final Settings settings, final Setting.AffixSetting<List<String>> seedsSetting) {
|
||||
final Stream<Setting<List<String>>> allConcreteSettings = seedsSetting.getAllConcreteSettings(settings);
|
||||
return allConcreteSettings.collect(
|
||||
|
@ -214,9 +215,9 @@ public abstract class RemoteClusterAware {
|
|||
List<String> addresses = concreteSetting.get(settings);
|
||||
final boolean proxyMode =
|
||||
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).existsOrFallbackExists(settings);
|
||||
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
|
||||
List<Tuple<String, Supplier<DiscoveryNode>>> nodes = new ArrayList<>(addresses.size());
|
||||
for (String address : addresses) {
|
||||
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
|
||||
nodes.add(Tuple.tuple(address, () -> buildSeedNode(clusterName, address, proxyMode)));
|
||||
}
|
||||
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
|
||||
}));
|
||||
|
@ -304,16 +305,24 @@ public abstract class RemoteClusterAware {
|
|||
(namespace, value) -> {});
|
||||
}
|
||||
|
||||
|
||||
protected static InetSocketAddress parseSeedAddress(String remoteHost) {
|
||||
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
|
||||
static InetSocketAddress parseSeedAddress(String remoteHost) {
|
||||
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
|
||||
final String host = hostPort.v1();
|
||||
assert hostPort.v2() != null : remoteHost;
|
||||
final int port = hostPort.v2();
|
||||
InetAddress hostAddress;
|
||||
try {
|
||||
hostAddress = InetAddress.getByName(host);
|
||||
} catch (UnknownHostException e) {
|
||||
throw new IllegalArgumentException("unknown host [" + host + "]", e);
|
||||
}
|
||||
return new InetSocketAddress(hostAddress, parsePort(remoteHost));
|
||||
return new InetSocketAddress(hostAddress, port);
|
||||
}
|
||||
|
||||
public static Tuple<String, Integer> parseHostPort(final String remoteHost) {
|
||||
final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
|
||||
final int port = parsePort(remoteHost);
|
||||
return Tuple.tuple(host, port);
|
||||
}
|
||||
|
||||
private static int parsePort(String remoteHost) {
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.action.support.ContextPreservingActionListener;
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
|
@ -95,7 +96,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
|
|||
private final Predicate<DiscoveryNode> nodePredicate;
|
||||
private final ThreadPool threadPool;
|
||||
private volatile String proxyAddress;
|
||||
private volatile List<Supplier<DiscoveryNode>> seedNodes;
|
||||
private volatile List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes;
|
||||
private volatile boolean skipUnavailable;
|
||||
private final ConnectHandler connectHandler;
|
||||
private final TimeValue initialConnectionTimeout;
|
||||
|
@ -111,7 +112,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
|
|||
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
|
||||
* @param proxyAddress the proxy address
|
||||
*/
|
||||
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
|
||||
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
|
||||
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
|
||||
String proxyAddress) {
|
||||
this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress,
|
||||
|
@ -119,7 +120,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
|
|||
}
|
||||
|
||||
// Public for tests to pass a StubbableConnectionManager
|
||||
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
|
||||
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
|
||||
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
|
||||
String proxyAddress, ConnectionManager connectionManager) {
|
||||
this.transportService = transportService;
|
||||
|
@ -155,7 +156,10 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
|
|||
/**
|
||||
* Updates the list of seed nodes for this cluster connection
|
||||
*/
|
||||
synchronized void updateSeedNodes(String proxyAddress, List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
|
||||
synchronized void updateSeedNodes(
|
||||
final String proxyAddress,
|
||||
final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
|
||||
final ActionListener<Void> connectListener) {
|
||||
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
|
||||
this.proxyAddress = proxyAddress;
|
||||
connectHandler.connect(connectListener);
|
||||
|
@ -465,7 +469,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
|
|||
maybeConnect();
|
||||
}
|
||||
});
|
||||
collectRemoteNodes(seedNodes.iterator(), transportService, connectionManager, listener);
|
||||
collectRemoteNodes(seedNodes.stream().map(Tuple::v2).iterator(), transportService, connectionManager, listener);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -672,10 +676,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
|
|||
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
|
||||
*/
|
||||
public RemoteConnectionInfo getConnectionInfo() {
|
||||
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect
|
||||
(Collectors.toList());
|
||||
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
|
||||
initialConnectionTimeout, skipUnavailable);
|
||||
return new RemoteConnectionInfo(
|
||||
clusterAlias,
|
||||
seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()),
|
||||
maxNumRemoteConnections,
|
||||
connectedNodes.size(),
|
||||
initialConnectionTimeout,
|
||||
skipUnavailable);
|
||||
}
|
||||
|
||||
int getNumNodesConnected() {
|
||||
|
|
|
@ -201,7 +201,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
* @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes
|
||||
* @param connectionListener a listener invoked once every configured cluster has been connected to
|
||||
*/
|
||||
private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> seeds,
|
||||
private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> seeds,
|
||||
ActionListener<Void> connectionListener) {
|
||||
if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
|
||||
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
|
||||
|
@ -212,8 +212,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
} else {
|
||||
CountDown countDown = new CountDown(seeds.size());
|
||||
remoteClusters.putAll(this.remoteClusters);
|
||||
for (Map.Entry<String, Tuple<String, List<Supplier<DiscoveryNode>>>> entry : seeds.entrySet()) {
|
||||
List<Supplier<DiscoveryNode>> seedList = entry.getValue().v2();
|
||||
for (Map.Entry<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> entry : seeds.entrySet()) {
|
||||
List<Tuple<String, Supplier<DiscoveryNode>>> seedList = entry.getValue().v2();
|
||||
String proxyAddress = entry.getValue().v1();
|
||||
|
||||
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
|
||||
|
@ -408,9 +408,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
final List<String> addresses,
|
||||
final String proxyAddress,
|
||||
final ActionListener<Void> connectionListener) {
|
||||
final List<Supplier<DiscoveryNode>> nodes = addresses.stream().<Supplier<DiscoveryNode>>map(address -> () ->
|
||||
buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))
|
||||
).collect(Collectors.toList());
|
||||
final List<Tuple<String, Supplier<DiscoveryNode>>> nodes =
|
||||
addresses.stream().<Tuple<String, Supplier<DiscoveryNode>>>map(address -> Tuple.tuple(address, () ->
|
||||
buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress)))
|
||||
).collect(Collectors.toList());
|
||||
updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener);
|
||||
}
|
||||
|
||||
|
@ -421,7 +422,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
void initializeRemoteClusters() {
|
||||
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
|
||||
final PlainActionFuture<Void> future = new PlainActionFuture<>();
|
||||
Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> seeds = RemoteClusterAware.buildRemoteClustersDynamicConfig(settings);
|
||||
Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> seeds =
|
||||
RemoteClusterAware.buildRemoteClustersDynamicConfig(settings);
|
||||
updateRemoteClusters(seeds, future);
|
||||
try {
|
||||
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
|
||||
|
|
|
@ -16,9 +16,11 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
|
@ -27,25 +29,29 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.xcontent.ToXContentFragment;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
/**
|
||||
* This class encapsulates all remote cluster information to be rendered on
|
||||
* {@code _remote/info} requests.
|
||||
*/
|
||||
public final class RemoteConnectionInfo implements ToXContentFragment, Writeable {
|
||||
final List<TransportAddress> seedNodes;
|
||||
final List<String> seedNodes;
|
||||
final int connectionsPerCluster;
|
||||
final TimeValue initialConnectionTimeout;
|
||||
final int numNodesConnected;
|
||||
final String clusterAlias;
|
||||
final boolean skipUnavailable;
|
||||
|
||||
RemoteConnectionInfo(String clusterAlias, List<TransportAddress> seedNodes,
|
||||
RemoteConnectionInfo(String clusterAlias, List<String> seedNodes,
|
||||
int connectionsPerCluster, int numNodesConnected,
|
||||
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
|
||||
this.clusterAlias = clusterAlias;
|
||||
|
@ -57,7 +63,17 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
|
|||
}
|
||||
|
||||
public RemoteConnectionInfo(StreamInput input) throws IOException {
|
||||
seedNodes = input.readList(TransportAddress::new);
|
||||
if (input.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
seedNodes = Arrays.asList(input.readStringArray());
|
||||
} else {
|
||||
// versions prior to 7.0.0 sent the resolved transport address of the seed nodes
|
||||
final List<TransportAddress> transportAddresses = input.readList(TransportAddress::new);
|
||||
seedNodes =
|
||||
transportAddresses
|
||||
.stream()
|
||||
.map(a -> a.address().getHostString() + ":" + a.address().getPort())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
if (input.getVersion().before(Version.V_7_0_0)) {
|
||||
/*
|
||||
* Versions before 7.0 sent the HTTP addresses of all nodes in the
|
||||
|
@ -78,7 +94,26 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
|
|||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeList(seedNodes);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
out.writeStringArray(seedNodes.toArray(new String[0]));
|
||||
} else {
|
||||
// versions prior to 7.0.0 received the resolved transport address of the seed nodes
|
||||
out.writeList(seedNodes
|
||||
.stream()
|
||||
.map(
|
||||
s -> {
|
||||
final Tuple<String, Integer> hostPort = RemoteClusterAware.parseHostPort(s);
|
||||
assert hostPort.v2() != null : s;
|
||||
try {
|
||||
return new TransportAddress(
|
||||
InetAddress.getByAddress(hostPort.v1(), TransportAddress.META_ADDRESS.getAddress()),
|
||||
hostPort.v2());
|
||||
} catch (final UnknownHostException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
if (out.getVersion().before(Version.V_7_0_0)) {
|
||||
/*
|
||||
* Versions before 7.0 sent the HTTP addresses of all nodes in the
|
||||
|
@ -104,8 +139,8 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
|
|||
builder.startObject(clusterAlias);
|
||||
{
|
||||
builder.startArray("seeds");
|
||||
for (TransportAddress addr : seedNodes) {
|
||||
builder.value(addr.toString());
|
||||
for (String addr : seedNodes) {
|
||||
builder.value(addr);
|
||||
}
|
||||
builder.endArray();
|
||||
builder.field("connected", numNodesConnected > 0);
|
||||
|
@ -136,4 +171,5 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
|
|||
return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout,
|
||||
numNodesConnected, clusterAlias, skipUnavailable);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -80,6 +81,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
|
@ -164,9 +166,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
ConnectionManager connectionManager = connection.getConnectionManager();
|
||||
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
|
||||
updateSeedNodes(connection, seedNodes(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(discoverableNode));
|
||||
assertTrue(connection.assertNoRunningConnections());
|
||||
|
@ -206,9 +208,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
ConnectionManager connectionManager = connection.getConnectionManager();
|
||||
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
|
||||
updateSeedNodes(connection, seedNodes(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(discoverableNode));
|
||||
assertTrue(connection.assertNoRunningConnections());
|
||||
|
@ -259,9 +261,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
ConnectionManager connectionManager = connection.getConnectionManager();
|
||||
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
|
||||
updateSeedNodes(connection, seedNodes(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(discoverableNode));
|
||||
assertTrue(connection.assertNoRunningConnections());
|
||||
|
@ -282,7 +284,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
knownNodes.add(discoverableTransport.getLocalDiscoNode());
|
||||
knownNodes.add(incompatibleTransport.getLocalDiscoNode());
|
||||
Collections.shuffle(knownNodes, random());
|
||||
List<Supplier<DiscoveryNode>> seedNodes = Arrays.asList(() -> incompatibleSeedNode, () -> seedNode);
|
||||
List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = Arrays.asList(
|
||||
Tuple.tuple(incompatibleSeedNode.toString(), () -> incompatibleSeedNode),
|
||||
Tuple.tuple(seedNode.toString(), () -> seedNode));
|
||||
Collections.shuffle(seedNodes, random());
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
|
@ -317,9 +321,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
ConnectionManager connectionManager = connection.getConnectionManager();
|
||||
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
|
||||
updateSeedNodes(connection, seedNodes(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(discoverableNode));
|
||||
assertFalse(connectionManager.nodeConnected(spareNode));
|
||||
|
@ -367,9 +371,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null)) {
|
||||
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null)) {
|
||||
ConnectionManager connectionManager = connection.getConnectionManager();
|
||||
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
|
||||
updateSeedNodes(connection, seedNodes(seedNode));
|
||||
if (rejectedNode.equals(seedNode)) {
|
||||
assertFalse(connectionManager.nodeConnected(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(discoverableNode));
|
||||
|
@ -382,11 +386,15 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
private void updateSeedNodes(RemoteClusterConnection connection, List<Supplier<DiscoveryNode>> seedNodes) throws Exception {
|
||||
private void updateSeedNodes(
|
||||
final RemoteClusterConnection connection, final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes) throws Exception {
|
||||
updateSeedNodes(connection, seedNodes, null);
|
||||
}
|
||||
|
||||
private void updateSeedNodes(RemoteClusterConnection connection, List<Supplier<DiscoveryNode>> seedNodes, String proxyAddress)
|
||||
private void updateSeedNodes(
|
||||
final RemoteClusterConnection connection,
|
||||
final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
|
||||
final String proxyAddress)
|
||||
throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
|
||||
|
@ -428,9 +436,11 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
ConnectionManager connectionManager = connection.getConnectionManager();
|
||||
expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode)));
|
||||
expectThrows(
|
||||
Exception.class,
|
||||
() -> updateSeedNodes(connection, Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode))));
|
||||
assertFalse(connectionManager.nodeConnected(seedNode));
|
||||
assertTrue(connection.assertNoRunningConnections());
|
||||
}
|
||||
|
@ -481,7 +491,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) {
|
||||
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) {
|
||||
connection.addConnectedNode(seedNode);
|
||||
for (DiscoveryNode node : knownNodes) {
|
||||
final Transport.Connection transportConnection = connection.getConnection(node);
|
||||
|
@ -524,7 +534,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
CountDownLatch listenerCalled = new CountDownLatch(1);
|
||||
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> {
|
||||
listenerCalled.countDown();
|
||||
fail("expected exception");
|
||||
|
@ -532,7 +542,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
exceptionReference.set(x);
|
||||
listenerCalled.countDown();
|
||||
});
|
||||
connection.updateSeedNodes(null, Arrays.asList(() -> seedNode), listener);
|
||||
connection.updateSeedNodes(null, seedNodes(seedNode), listener);
|
||||
acceptedLatch.await();
|
||||
connection.close(); // now close it, this should trigger an interrupt on the socket and we can move on
|
||||
assertTrue(connection.assertNoRunningConnections());
|
||||
|
@ -548,6 +558,18 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes(final DiscoveryNode... seedNodes) {
|
||||
if (seedNodes.length == 0) {
|
||||
return Collections.emptyList();
|
||||
} else if (seedNodes.length == 1) {
|
||||
return Collections.singletonList(Tuple.tuple(seedNodes[0].toString(), () -> seedNodes[0]));
|
||||
} else {
|
||||
return Arrays.stream(seedNodes)
|
||||
.map(s -> Tuple.tuple(s.toString(), (Supplier<DiscoveryNode>)() -> s))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
public void testFetchShards() throws Exception {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
|
||||
|
@ -559,11 +581,11 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
List<Supplier<DiscoveryNode>> nodes = Collections.singletonList(() -> seedNode);
|
||||
final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = seedNodes(seedNode);
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
nodes, service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
seedNodes, service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
if (randomBoolean()) {
|
||||
updateSeedNodes(connection, nodes);
|
||||
updateSeedNodes(connection, seedNodes);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
connection.updateSkipUnavailable(randomBoolean());
|
||||
|
@ -599,9 +621,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
List<Supplier<DiscoveryNode>> nodes = Collections.singletonList(() -> seedNode);
|
||||
final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = seedNodes(seedNode);
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
nodes, service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
seedNodes, service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
SearchRequest request = new SearchRequest("test-index");
|
||||
Thread[] threads = new Thread[10];
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
|
@ -655,7 +677,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Collections.singletonList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
ConnectionManager connectionManager = connection.getConnectionManager();
|
||||
|
||||
SearchRequest request = new SearchRequest("test-index");
|
||||
|
@ -759,7 +781,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
knownNodes.add(discoverableTransport.getLocalDiscoNode());
|
||||
knownNodes.add(seedTransport1.getLocalDiscoNode());
|
||||
Collections.shuffle(knownNodes, random());
|
||||
List<Supplier<DiscoveryNode>> seedNodes = Arrays.asList(() -> seedNode1, () -> seedNode);
|
||||
List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = seedNodes(seedNode1, seedNode);
|
||||
Collections.shuffle(seedNodes, random());
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
|
@ -839,7 +861,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
knownNodes.add(discoverableTransport.getLocalDiscoNode());
|
||||
knownNodes.add(seedTransport1.getLocalDiscoNode());
|
||||
Collections.shuffle(knownNodes, random());
|
||||
List<Supplier<DiscoveryNode>> seedNodes = Arrays.asList(() -> seedNode1, () -> seedNode);
|
||||
List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = seedNodes(seedNode1, seedNode);
|
||||
Collections.shuffle(seedNodes, random());
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
|
@ -926,7 +948,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
knownNodes.add(transport3.getLocalDiscoNode());
|
||||
knownNodes.add(transport2.getLocalDiscoNode());
|
||||
Collections.shuffle(knownNodes, random());
|
||||
List<Supplier<DiscoveryNode>> seedNodes = Arrays.asList(() -> node3, () -> node1, () -> node2);
|
||||
List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = seedNodes(node3, node1, node2);
|
||||
Collections.shuffle(seedNodes, random());
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
|
@ -958,44 +980,32 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testRemoteConnectionInfo() throws IOException {
|
||||
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
4, 3, TimeValue.timeValueMinutes(30), false);
|
||||
RemoteConnectionInfo stats =
|
||||
new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false);
|
||||
assertSerialization(stats);
|
||||
|
||||
RemoteConnectionInfo stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
4, 4, TimeValue.timeValueMinutes(30), true);
|
||||
RemoteConnectionInfo stats1 =
|
||||
new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 4, TimeValue.timeValueMinutes(30), true);
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster_1",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
4, 3, TimeValue.timeValueMinutes(30), false);
|
||||
stats1 = new RemoteConnectionInfo("test_cluster_1", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false);
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 15)),
|
||||
4, 3, TimeValue.timeValueMinutes(30), false);
|
||||
stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:15"), 4, 3, TimeValue.timeValueMinutes(30), false);
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
4, 3, TimeValue.timeValueMinutes(30), true);
|
||||
stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true);
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
4, 3, TimeValue.timeValueMinutes(325), true);
|
||||
stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(325), true);
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
5, 3, TimeValue.timeValueMinutes(30), false);
|
||||
stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 5, 3, TimeValue.timeValueMinutes(30), false);
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
}
|
||||
|
@ -1016,9 +1026,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
public void testRemoteConnectionInfoBwComp() throws IOException {
|
||||
final Version version = VersionUtils.randomVersionBetween(random(),
|
||||
Version.V_6_1_0, VersionUtils.getPreviousVersion(Version.V_7_0_0));
|
||||
RemoteConnectionInfo expected = new RemoteConnectionInfo("test_cluster",
|
||||
Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
4, 4, new TimeValue(30, TimeUnit.MINUTES), false);
|
||||
RemoteConnectionInfo expected =
|
||||
new RemoteConnectionInfo("test_cluster", Arrays.asList("0.0.0.0:1"), 4, 4, new TimeValue(30, TimeUnit.MINUTES), false);
|
||||
|
||||
// This version was created using the serialization code in use from 6.1 but before 7.0
|
||||
String encoded = "AQQAAAAABzAuMC4wLjAAAAABAQQAAAAABzAuMC4wLjAAAABQBDwEBAx0ZXN0X2NsdXN0ZXIA";
|
||||
|
@ -1042,27 +1051,25 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testRenderConnectionInfoXContent() throws IOException {
|
||||
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
4, 3, TimeValue.timeValueMinutes(30), true);
|
||||
RemoteConnectionInfo stats =
|
||||
new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true);
|
||||
stats = assertSerialization(stats);
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
stats.toXContent(builder, null);
|
||||
builder.endObject();
|
||||
assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"connected\":true," +
|
||||
assertEquals("{\"test_cluster\":{\"seeds\":[\"seed:1\"],\"connected\":true," +
|
||||
"\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," +
|
||||
"\"skip_unavailable\":true}}", Strings.toString(builder));
|
||||
|
||||
stats = new RemoteConnectionInfo("some_other_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1), new TransportAddress(TransportAddress.META_ADDRESS, 2)),
|
||||
2, 0, TimeValue.timeValueSeconds(30), false);
|
||||
stats = new RemoteConnectionInfo(
|
||||
"some_other_cluster", Arrays.asList("seed:1", "seed:2"), 2, 0, TimeValue.timeValueSeconds(30), false);
|
||||
stats = assertSerialization(stats);
|
||||
builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
stats.toXContent(builder, null);
|
||||
builder.endObject();
|
||||
assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],"
|
||||
assertEquals("{\"some_other_cluster\":{\"seeds\":[\"seed:1\",\"seed:2\"],"
|
||||
+ "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," +
|
||||
"\"skip_unavailable\":false}}", Strings.toString(builder));
|
||||
}
|
||||
|
@ -1081,7 +1088,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
ConnectionManager connectionManager = connection.getConnectionManager();
|
||||
assertFalse(connectionManager.nodeConnected(seedNode));
|
||||
assertFalse(connectionManager.nodeConnected(discoverableNode));
|
||||
|
@ -1131,9 +1138,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
if (randomBoolean()) {
|
||||
updateSeedNodes(connection, Arrays.asList(() -> seedNode));
|
||||
updateSeedNodes(connection, seedNodes(seedNode));
|
||||
}
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
AtomicReference<Function<String, DiscoveryNode>> reference = new AtomicReference<>();
|
||||
|
@ -1165,14 +1172,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
List<MockTransportService> discoverableTransports = new CopyOnWriteArrayList<>();
|
||||
try {
|
||||
final int numDiscoverableNodes = randomIntBetween(5, 20);
|
||||
List<Supplier<DiscoveryNode>> discoverableNodes = new ArrayList<>(numDiscoverableNodes);
|
||||
List<Tuple<String, Supplier<DiscoveryNode>>> discoverableNodes = new ArrayList<>(numDiscoverableNodes);
|
||||
for (int i = 0; i < numDiscoverableNodes; i++ ) {
|
||||
MockTransportService transportService = startTransport("discoverable_node" + i, knownNodes, Version.CURRENT);
|
||||
discoverableNodes.add(transportService::getLocalDiscoNode);
|
||||
discoverableNodes.add(Tuple.tuple("discoverable_node" + i, transportService::getLocalDiscoNode));
|
||||
discoverableTransports.add(transportService);
|
||||
}
|
||||
|
||||
List<Supplier<DiscoveryNode>> seedNodes = randomSubsetOf(discoverableNodes);
|
||||
List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = randomSubsetOf(discoverableNodes);
|
||||
Collections.shuffle(seedNodes, random());
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
|
@ -1221,7 +1228,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
discoverableTransports.add(transportService);
|
||||
connection.addConnectedNode(transportService.getLocalDiscoNode());
|
||||
} else {
|
||||
DiscoveryNode node = randomFrom(discoverableNodes).get();
|
||||
DiscoveryNode node = randomFrom(discoverableNodes).v2().get();
|
||||
connection.onNodeDisconnected(node);
|
||||
}
|
||||
}
|
||||
|
@ -1269,14 +1276,16 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList( () -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
ConnectionManager connectionManager = connection.getConnectionManager();
|
||||
updateSeedNodes(connection, Collections.singletonList(() -> seedNode));
|
||||
updateSeedNodes(connection, seedNodes(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(seedNode));
|
||||
assertTrue(connectionManager.nodeConnected(discoverableNode));
|
||||
assertTrue(connection.assertNoRunningConnections());
|
||||
List<Supplier<DiscoveryNode>> discoveryNodes =
|
||||
Arrays.asList(otherClusterTransport::getLocalDiscoNode, () -> seedNode);
|
||||
List<Tuple<String, Supplier<DiscoveryNode>>> discoveryNodes =
|
||||
Arrays.asList(
|
||||
Tuple.tuple("other", otherClusterTransport::getLocalDiscoNode),
|
||||
Tuple.tuple(seedNode.toString(), () -> seedNode));
|
||||
Collections.shuffle(discoveryNodes, random());
|
||||
updateSeedNodes(connection, discoveryNodes);
|
||||
assertTrue(connectionManager.nodeConnected(seedNode));
|
||||
|
@ -1287,7 +1296,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
assertTrue(connectionManager.nodeConnected(discoverableNode));
|
||||
assertTrue(connection.assertNoRunningConnections());
|
||||
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () ->
|
||||
updateSeedNodes(connection, Arrays.asList(() -> otherClusterTransport.getLocalDiscoNode())));
|
||||
updateSeedNodes(connection, Arrays.asList(Tuple.tuple("other", otherClusterTransport::getLocalDiscoNode))));
|
||||
assertThat(illegalStateException.getMessage(),
|
||||
startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" +
|
||||
" - {other_cluster_discoverable_node}"));
|
||||
|
@ -1339,7 +1348,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Collections.singletonList(() -> connectedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) {
|
||||
seedNodes(connectedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) {
|
||||
connection.addConnectedNode(connectedNode);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
//always a direct connection as the remote node is already connected
|
||||
|
@ -1376,10 +1385,10 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
CountDownLatch multipleResolveLatch = new CountDownLatch(2);
|
||||
Supplier<DiscoveryNode> seedSupplier = () -> {
|
||||
Tuple<String, Supplier<DiscoveryNode>> seedSupplier = Tuple.tuple(seedNode.toString(), () -> {
|
||||
multipleResolveLatch.countDown();
|
||||
return seedNode;
|
||||
};
|
||||
});
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null)) {
|
||||
updateSeedNodes(connection, Arrays.asList(seedSupplier));
|
||||
|
@ -1409,9 +1418,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
threadPool, null, Collections.emptySet())) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
Supplier<DiscoveryNode> seedSupplier = () ->
|
||||
RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true);
|
||||
assertEquals("node_0", seedSupplier.get().getAttributes().get("server_name"));
|
||||
Tuple<String, Supplier<DiscoveryNode>> seedSupplier = Tuple.tuple("node_0", () ->
|
||||
RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true));
|
||||
assertEquals("node_0", seedSupplier.v2().get().getAttributes().get("server_name"));
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress)) {
|
||||
updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress);
|
||||
|
|
|
@ -125,41 +125,42 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testBuildRemoteClustersDynamicConfig() throws Exception {
|
||||
Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> map = RemoteClusterService.buildRemoteClustersDynamicConfig(
|
||||
Settings.builder()
|
||||
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
|
||||
.put("cluster.remote.bar.seeds", "[::1]:9090")
|
||||
.put("cluster.remote.boom.seeds", "boom-node1.internal:1000")
|
||||
.put("cluster.remote.boom.proxy", "foo.bar.com:1234")
|
||||
.put("search.remote.quux.seeds", "quux:9300")
|
||||
.put("search.remote.quux.proxy", "quux-proxy:19300")
|
||||
.build());
|
||||
Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> map =
|
||||
RemoteClusterService.buildRemoteClustersDynamicConfig(
|
||||
Settings.builder()
|
||||
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
|
||||
.put("cluster.remote.bar.seeds", "[::1]:9090")
|
||||
.put("cluster.remote.boom.seeds", "boom-node1.internal:1000")
|
||||
.put("cluster.remote.boom.proxy", "foo.bar.com:1234")
|
||||
.put("search.remote.quux.seeds", "quux:9300")
|
||||
.put("search.remote.quux.proxy", "quux-proxy:19300")
|
||||
.build());
|
||||
assertThat(map.keySet(), containsInAnyOrder(equalTo("foo"), equalTo("bar"), equalTo("boom"), equalTo("quux")));
|
||||
assertThat(map.get("foo").v2(), hasSize(1));
|
||||
assertThat(map.get("bar").v2(), hasSize(1));
|
||||
assertThat(map.get("boom").v2(), hasSize(1));
|
||||
assertThat(map.get("quux").v2(), hasSize(1));
|
||||
|
||||
DiscoveryNode foo = map.get("foo").v2().get(0).get();
|
||||
DiscoveryNode foo = map.get("foo").v2().get(0).v2().get();
|
||||
assertEquals("", map.get("foo").v1());
|
||||
assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080)));
|
||||
assertEquals(foo.getId(), "foo#192.168.0.1:8080");
|
||||
assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
|
||||
|
||||
DiscoveryNode bar = map.get("bar").v2().get(0).get();
|
||||
DiscoveryNode bar = map.get("bar").v2().get(0).v2().get();
|
||||
assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090)));
|
||||
assertEquals(bar.getId(), "bar#[::1]:9090");
|
||||
assertEquals("", map.get("bar").v1());
|
||||
assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
|
||||
|
||||
DiscoveryNode boom = map.get("boom").v2().get(0).get();
|
||||
DiscoveryNode boom = map.get("boom").v2().get(0).v2().get();
|
||||
assertEquals(boom.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0));
|
||||
assertEquals("boom-node1.internal", boom.getHostName());
|
||||
assertEquals(boom.getId(), "boom#boom-node1.internal:1000");
|
||||
assertEquals("foo.bar.com:1234", map.get("boom").v1());
|
||||
assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
|
||||
|
||||
DiscoveryNode quux = map.get("quux").v2().get(0).get();
|
||||
DiscoveryNode quux = map.get("quux").v2().get(0).v2().get();
|
||||
assertEquals(quux.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0));
|
||||
assertEquals("quux", quux.getHostName());
|
||||
assertEquals(quux.getId(), "quux#quux:9300");
|
||||
|
|
Loading…
Reference in New Issue