Backport remote proxy mode stats and naming ()

* Update remote cluster stats to support simple mode ()

Remote cluster stats API currently only returns useful information if
the strategy in use is the SNIFF mode. This PR modifies the API to
provide relevant information if the user is in the SIMPLE mode. This
information is the configured addresses, max socket connections, and
open socket connections.

* Send hostname in SNI header in simple remote mode ()

Currently an intermediate proxy must route conncctions to the
appropriate remote cluster when using simple mode. This commit offers
a additional mechanism for the proxy to route the connections by
including the hostname in the TLS SNI header.

* Rename the remote connection mode simple to proxy ()

This commit renames the simple connection mode to the proxy connection
mode for remote cluster connections. In order to do this, the mode specific
settings which we namespaced by their mode (ex: sniff.seed and
proxy.addresses) have been reverted.

* Modify proxy mode to support a single address ()

Currently, the remote proxy connection mode uses a list setting for the
proxy address. This commit modifies this so that the setting is
proxy_address and only supports a single remote proxy address.
This commit is contained in:
Tim Brooks 2019-12-19 18:02:48 -07:00 committed by GitHub
parent 878852352d
commit cb73fb0f9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 729 additions and 518 deletions
client/rest-high-level/src/test/java/org/elasticsearch/client
docs/reference/ccr
qa
ccs-unavailable-clusters/src/test/java/org/elasticsearch/search
full-cluster-restart/src/test/java/org/elasticsearch/upgrades
multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster
server/src
x-pack
plugin/ccr/src/test/java/org/elasticsearch/xpack
qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster

@ -83,7 +83,7 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
String transportAddress = (String) nodesResponse.get("transport_address");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local_cluster.sniff.seeds", transportAddress));
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local_cluster.seeds", transportAddress));
ClusterUpdateSettingsResponse updateSettingsResponse =
highLevelClient().cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
assertThat(updateSettingsResponse.isAcknowledged(), is(true));

@ -87,7 +87,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
String transportAddress = (String) nodesResponse.get("transport_address");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.sniff.seeds", transportAddress));
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.seeds", transportAddress));
ClusterUpdateSettingsResponse updateSettingsResponse =
client.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
assertThat(updateSettingsResponse.isAcknowledged(), is(true));

@ -135,7 +135,8 @@ remote cluster.
"num_nodes_connected" : 1, <2>
"max_connections_per_cluster" : 3,
"initial_connect_timeout" : "30s",
"skip_unavailable" : false
"skip_unavailable" : false,
"mode" : "sniff"
}
}
--------------------------------------------------
@ -146,7 +147,7 @@ remote cluster.
alias `leader`
<2> This shows the number of nodes in the remote cluster the local cluster is
connected to.
Alternatively, you can manage remote clusters on the
*Management / Elasticsearch / Remote Clusters* page in {kib}:

@ -144,7 +144,7 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
try (MockTransportService remoteTransport = startTransport("node0", new CopyOnWriteArrayList<>(), Version.CURRENT, threadPool)) {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
updateRemoteClusterSettings(Collections.singletonMap("sniff.seeds", remoteNode.getAddress().toString()));
updateRemoteClusterSettings(Collections.singletonMap("seeds", remoteNode.getAddress().toString()));
for (int i = 0; i < 10; i++) {
restHighLevelClient.index(
@ -229,7 +229,7 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
assertSearchConnectFailure();
Map<String, Object> map = new HashMap<>();
map.put("sniff.seeds", null);
map.put("seeds", null);
map.put("skip_unavailable", null);
updateRemoteClusterSettings(map);
}
@ -248,32 +248,32 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(),
containsString("missing required setting [cluster.remote.remote1.sniff.seeds] " +
containsString("missing required setting [cluster.remote.remote1.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}
Map<String, Object> settingsMap = new HashMap<>();
settingsMap.put("sniff.seeds", remoteNode.getAddress().toString());
settingsMap.put("seeds", remoteNode.getAddress().toString());
settingsMap.put("skip_unavailable", randomBoolean());
updateRemoteClusterSettings(settingsMap);
{
//check that seeds cannot be reset alone if skip_unavailable is set
Request request = new Request("PUT", "/_cluster/settings");
request.setEntity(buildUpdateSettingsRequestBody(Collections.singletonMap("sniff.seeds", null)));
request.setEntity(buildUpdateSettingsRequestBody(Collections.singletonMap("seeds", null)));
ResponseException responseException = expectThrows(ResponseException.class,
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.sniff.seeds] " +
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}
if (randomBoolean()) {
updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", null));
updateRemoteClusterSettings(Collections.singletonMap("sniff.seeds", null));
updateRemoteClusterSettings(Collections.singletonMap("seeds", null));
} else {
Map<String, Object> nullMap = new HashMap<>();
nullMap.put("sniff.seeds", null);
nullMap.put("seeds", null);
nullMap.put("skip_unavailable", null);
updateRemoteClusterSettings(nullMap);
}

@ -92,16 +92,10 @@ public class FullClusterRestartSettingsUpgradeIT extends AbstractFullClusterRest
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").get(settings));
assertFalse(SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace("foo").exists(settings));
assertFalse(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo")
.existsOrFallbackExists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertThat(
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").get(settings),
equalTo(Collections.singletonList("localhost:9200")));
assertThat(
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace("foo").get(settings),
equalTo(Collections.singletonList("localhost:9200")));
}
}
}

@ -1,5 +1,5 @@
---
"Add transient remote cluster in simple mode with invalid sniff settings":
"Add transient remote cluster in proxy mode with invalid sniff settings":
- do:
cluster.get_settings:
include_defaults: true
@ -12,14 +12,14 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.node_connections: "5"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.node_connections: "5"
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.node_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=PROXY]" }
- do:
catch: bad_request
@ -27,17 +27,17 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.seeds: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=PROXY]" }
---
"Add transient remote cluster in sniff mode with invalid simple settings":
"Add transient remote cluster in sniff mode with invalid proxy settings":
- do:
cluster.get_settings:
include_defaults: true
@ -50,13 +50,13 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.simple.socket_connections: "20"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.proxy_socket_connections: "20"
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_socket_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=PROXY, configured=SNIFF]" }
- do:
catch: bad_request
@ -64,16 +64,16 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_address\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=PROXY, configured=SNIFF]" }
---
"Add transient remote cluster using simple connection mode using valid settings":
"Add transient remote cluster using proxy connection mode using valid settings":
- do:
cluster.get_settings:
include_defaults: true
@ -85,13 +85,13 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.simple.socket_connections: "3"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.proxy_socket_connections: "3"
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
- do:
search:
@ -120,12 +120,12 @@
body:
transient:
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.sniff.node_connections: "3"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.node_connections: "3"
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.node_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.seeds: $remote_ip}
- do:
search:
@ -154,10 +154,10 @@
body:
transient:
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.seeds: $remote_ip}
- do:
search:
@ -178,25 +178,25 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=PROXY]" }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.seeds: null
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.seeds: null
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
- do:
search:

@ -1,5 +1,5 @@
---
"Fetch remote cluster info for existing cluster":
"Fetch remote cluster sniff info for existing cluster":
- do:
cluster.remote_info: {}
@ -7,6 +7,7 @@
- match: { my_remote_cluster.num_nodes_connected: 1}
- match: { my_remote_cluster.max_connections_per_cluster: 1}
- match: { my_remote_cluster.initial_connect_timeout: "30s" }
- match: { my_remote_cluster.mode: "sniff" }
---
"Add transient remote cluster based on the preset cluster and check remote info":
@ -21,9 +22,13 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.node_connections: "2"
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}}
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.node_connections: "2"}
- match: {transient.cluster\.remote\.test_remote_cluster\.seeds: $remote_ip}
# we do another search here since this will enforce the connection to be established
# otherwise the cluster might not have been connected yet.
@ -45,19 +50,49 @@
- match: { my_remote_cluster.seeds.0: $remote_ip }
- match: { my_remote_cluster.num_nodes_connected: 1}
- match: { test_remote_cluster.num_nodes_connected: 1}
- gt: { test_remote_cluster.num_nodes_connected: 0}
- match: { my_remote_cluster.max_connections_per_cluster: 1}
- match: { test_remote_cluster.max_connections_per_cluster: 1}
- match: { test_remote_cluster.max_connections_per_cluster: 2}
- match: { my_remote_cluster.initial_connect_timeout: "30s" }
- match: { test_remote_cluster.initial_connect_timeout: "30s" }
- match: { my_remote_cluster.mode: "sniff" }
- match: { test_remote_cluster.mode: "sniff" }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.seeds: null
cluster.remote.test_remote_cluster.node_connections: null
cluster.remote.test_remote_cluster.proxy_socket_connections: "10"
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "10"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
- do:
cluster.remote_info: {}
- match: { test_remote_cluster.connected: true }
- match: { test_remote_cluster.address: $remote_ip }
- gt: { test_remote_cluster.num_sockets_connected: 0}
- match: { test_remote_cluster.max_socket_connections: 10}
- match: { test_remote_cluster.initial_connect_timeout: "30s" }
- match: { test_remote_cluster.mode: "proxy" }
- do:
cluster.put_settings:
body:
transient:
cluster.remote.test_remote_cluster.seeds: null
cluster.remote.test_remote_cluster.mode: null
cluster.remote.test_remote_cluster.proxy_socket_connections: null
cluster.remote.test_remote_cluster.proxy_address: null
---
"skip_unavailable is returned as part of _remote/info response":

@ -110,7 +110,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteConnectionStrategy;
import org.elasticsearch.transport.SimpleConnectionStrategy;
import org.elasticsearch.transport.ProxyConnectionStrategy;
import org.elasticsearch.transport.SniffConnectionStrategy;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.watcher.ResourceWatcherService;
@ -308,6 +308,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,
@ -317,13 +318,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.INCLUDE_SERVER_NAME,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,

@ -1260,6 +1260,10 @@ public class Setting<T> implements ToXContentObject {
return new Setting<>(key, fallbackSetting, b -> parseBoolean(b, key, isFiltered(properties)), properties);
}
public static Setting<Boolean> boolSetting(String key, boolean defaultValue, Validator<Boolean> validator, Property... properties) {
return new Setting<>(key, Boolean.toString(defaultValue), b -> parseBoolean(b, key, isFiltered(properties)), validator, properties);
}
public static Setting<Boolean> boolSetting(String key, Function<Settings, String> defaultValueFn, Property... properties) {
return new Setting<>(key, defaultValueFn, b -> parseBoolean(b, key, isFiltered(properties)), properties);
}

@ -26,86 +26,109 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.common.settings.Setting.intSetting;
public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
/**
* A list of addresses for remote cluster connections. The connections will be opened to the configured addresses in a round-robin
* fashion.
* The remote address for the proxy. The connections will be opened to the configured address.
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
public static final Setting.AffixSetting<String> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
"cluster.remote.",
"simple.addresses",
(ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> {
// validate address
parsePort(s);
return s;
}, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE),
Setting.Property.Dynamic, Setting.Property.NodeScope));
"proxy_address",
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
}), Setting.Property.Dynamic, Setting.Property.NodeScope));
/**
* The maximum number of socket connections that will be established to a remote cluster. The default is 18.
*/
public static final Setting.AffixSetting<Integer> REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting(
"cluster.remote.",
"simple.socket_connections",
(ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE),
"proxy_socket_connections",
(ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
Setting.Property.Dynamic, Setting.Property.NodeScope));
/**
* Whether to include the hostname as a server_name attribute
*/
public static final Setting.AffixSetting<Boolean> INCLUDE_SERVER_NAME = Setting.affixKeySetting(
"cluster.remote.",
"include_server_name",
(ns, key) -> boolSetting(key, false, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
Setting.Property.Dynamic, Setting.Property.NodeScope));
static final int CHANNELS_PER_CONNECTION = 1;
private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3;
private static final Logger logger = LogManager.getLogger(SimpleConnectionStrategy.class);
private static final Logger logger = LogManager.getLogger(ProxyConnectionStrategy.class);
private final int maxNumConnections;
private final AtomicLong counter = new AtomicLong(0);
private final List<String> configuredAddresses;
private final List<Supplier<TransportAddress>> addresses;
private final String configuredAddress;
private final boolean includeServerName;
private final Supplier<TransportAddress> address;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
private final ConnectionProfile profile;
private final ConnectionManager.ConnectionValidator clusterNameValidator;
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
Settings settings) {
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
Settings settings) {
this(
clusterAlias,
transportService,
connectionManager,
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings));
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings),
INCLUDE_SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
}
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
configuredAddresses.stream().map(address ->
(Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()));
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, String configuredAddress) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
() -> resolveAddress(configuredAddress), false);
}
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses) {
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, String configuredAddress, boolean includeServerName) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
() -> resolveAddress(configuredAddress), includeServerName);
}
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
boolean includeServerName) {
super(clusterAlias, transportService, connectionManager);
this.maxNumConnections = maxNumConnections;
this.configuredAddresses = configuredAddresses;
assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses";
this.addresses = addresses;
this.configuredAddress = configuredAddress;
this.includeServerName = includeServerName;
assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
this.address = address;
// TODO: Move into the ConnectionManager
this.profile = new ConnectionProfile.Builder()
.addConnections(1, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING)
@ -128,7 +151,11 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
}
static Stream<Setting.AffixSetting<?>> enablementSettings() {
return Stream.of(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES);
return Stream.of(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES);
}
static Writeable.Reader<RemoteConnectionInfo.ModeInfo> infoReader() {
return ProxyModeInfo::new;
}
@Override
@ -138,28 +165,33 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
@Override
protected boolean strategyMustBeRebuilt(Settings newSettings) {
List<String> addresses = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
String address = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return numOfSockets != maxNumConnections || addressesChanged(configuredAddresses, addresses);
return numOfSockets != maxNumConnections || configuredAddress.equals(address) == false;
}
@Override
protected ConnectionStrategy strategyType() {
return ConnectionStrategy.SIMPLE;
return ConnectionStrategy.PROXY;
}
@Override
protected void connectImpl(ActionListener<Void> listener) {
performSimpleConnectionProcess(listener);
performProxyConnectionProcess(listener);
}
private void performSimpleConnectionProcess(ActionListener<Void> listener) {
@Override
public RemoteConnectionInfo.ModeInfo getModeInfo() {
return new ProxyModeInfo(configuredAddress, maxNumConnections, connectionManager.size());
}
private void performProxyConnectionProcess(ActionListener<Void> listener) {
openConnections(listener, 1);
}
private void openConnections(ActionListener<Void> finished, int attemptNumber) {
if (attemptNumber <= MAX_CONNECT_ATTEMPTS_PER_RUN) {
List<TransportAddress> resolved = addresses.stream().map(Supplier::get).collect(Collectors.toList());
TransportAddress resolved = address.get();
int remaining = maxNumConnections - connectionManager.size();
ActionListener<Void> compositeListener = new ActionListener<Void>() {
@ -189,9 +221,15 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
for (int i = 0; i < remaining; ++i) {
TransportAddress address = nextAddress(resolved);
String id = clusterAlias + "#" + address;
DiscoveryNode node = new DiscoveryNode(id, address, Version.CURRENT.minimumCompatibilityVersion());
String id = clusterAlias + "#" + resolved;
Map<String, String> attributes;
if (includeServerName) {
attributes = Collections.singletonMap("server_name", resolved.address().getHostString());
} else {
attributes = Collections.emptyMap();
}
DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());
connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<Void>() {
@Override
@ -202,7 +240,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]",
clusterAlias, address), e);
clusterAlias, resolved), e);
compositeListener.onFailure(e);
}
});
@ -210,7 +248,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
} else {
int openConnections = connectionManager.size();
if (openConnections == 0) {
finished.onFailure(new IllegalStateException("Unable to open any simple connections to remote cluster [" + clusterAlias
finished.onFailure(new IllegalStateException("Unable to open any proxy connections to remote cluster [" + clusterAlias
+ "]"));
} else {
logger.debug("unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]", clusterAlias,
@ -227,15 +265,70 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
}
private static TransportAddress resolveAddress(String address) {
return new TransportAddress(parseSeedAddress(address));
return new TransportAddress(parseConfiguredAddress(address));
}
private boolean addressesChanged(final List<String> oldAddresses, final List<String> newAddresses) {
if (oldAddresses.size() != newAddresses.size()) {
return true;
static class ProxyModeInfo implements RemoteConnectionInfo.ModeInfo {
private final String address;
private final int maxSocketConnections;
private final int numSocketsConnected;
ProxyModeInfo(String address, int maxSocketConnections, int numSocketsConnected) {
this.address = address;
this.maxSocketConnections = maxSocketConnections;
this.numSocketsConnected = numSocketsConnected;
}
private ProxyModeInfo(StreamInput input) throws IOException {
address = input.readString();
maxSocketConnections = input.readVInt();
numSocketsConnected = input.readVInt();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("address", address);
builder.field("num_sockets_connected", numSocketsConnected);
builder.field("max_socket_connections", maxSocketConnections);
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(address);
out.writeVInt(maxSocketConnections);
out.writeVInt(numSocketsConnected);
}
@Override
public boolean isConnected() {
return numSocketsConnected > 0;
}
@Override
public String modeName() {
return "proxy";
}
@Override
public RemoteConnectionStrategy.ConnectionStrategy modeType() {
return RemoteConnectionStrategy.ConnectionStrategy.PROXY;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ProxyModeInfo otherProxy = (ProxyModeInfo) o;
return maxSocketConnections == otherProxy.maxSocketConnections &&
numSocketsConnected == otherProxy.numSocketsConnected &&
Objects.equals(address, otherProxy.address);
}
@Override
public int hashCode() {
return Objects.hash(address, maxSocketConnections, numSocketsConnected);
}
Set<String> oldSeeds = new HashSet<>(oldAddresses);
Set<String> newSeeds = new HashSet<>(newAddresses);
return oldSeeds.equals(newSeeds) == false;
}
}

@ -128,10 +128,10 @@ public abstract class RemoteClusterAware {
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS);
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
ProxyConnectionStrategy.INCLUDE_SERVER_NAME);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
}

@ -34,7 +34,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.function.Function;
/**
@ -206,24 +205,7 @@ final class RemoteClusterConnection implements Closeable {
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public RemoteConnectionInfo getConnectionInfo() {
if (connectionStrategy instanceof SniffConnectionStrategy) {
SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy;
return new RemoteConnectionInfo(
clusterAlias,
sniffStrategy.getSeedNodes(),
sniffStrategy.getMaxConnections(),
getNumNodesConnected(),
initialConnectionTimeout,
skipUnavailable);
} else {
return new RemoteConnectionInfo(
clusterAlias,
Collections.emptyList(),
0,
getNumNodesConnected(),
initialConnectionTimeout,
skipUnavailable);
}
return new RemoteConnectionInfo(clusterAlias, connectionStrategy.getModeInfo(), initialConnectionTimeout, skipUnavailable);
}
int getNumNodesConnected() {

@ -44,116 +44,131 @@ import static java.util.Collections.emptyList;
* {@code _remote/info} requests.
*/
public final class RemoteConnectionInfo implements ToXContentFragment, Writeable {
final List<String> seedNodes;
final int connectionsPerCluster;
final ModeInfo modeInfo;
final TimeValue initialConnectionTimeout;
final int numNodesConnected;
final String clusterAlias;
final boolean skipUnavailable;
RemoteConnectionInfo(String clusterAlias, List<String> seedNodes,
int connectionsPerCluster, int numNodesConnected,
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
RemoteConnectionInfo(String clusterAlias, ModeInfo modeInfo, TimeValue initialConnectionTimeout, boolean skipUnavailable) {
this.clusterAlias = clusterAlias;
this.seedNodes = seedNodes;
this.connectionsPerCluster = connectionsPerCluster;
this.numNodesConnected = numNodesConnected;
this.modeInfo = modeInfo;
this.initialConnectionTimeout = initialConnectionTimeout;
this.skipUnavailable = skipUnavailable;
}
public RemoteConnectionInfo(StreamInput input) throws IOException {
if (input.getVersion().onOrAfter(Version.V_7_0_0)) {
seedNodes = Arrays.asList(input.readStringArray());
if (input.getVersion().onOrAfter(Version.V_7_6_0)) {
RemoteConnectionStrategy.ConnectionStrategy mode = input.readEnum(RemoteConnectionStrategy.ConnectionStrategy.class);
modeInfo = mode.getReader().read(input);
initialConnectionTimeout = input.readTimeValue();
clusterAlias = input.readString();
skipUnavailable = input.readBoolean();
} else {
// versions prior to 7.0.0 sent the resolved transport address of the seed nodes
final List<TransportAddress> transportAddresses = input.readList(TransportAddress::new);
seedNodes =
transportAddresses
.stream()
.map(a -> a.address().getHostString() + ":" + a.address().getPort())
.collect(Collectors.toList());
List<String> seedNodes;
if (input.getVersion().onOrAfter(Version.V_7_0_0)) {
seedNodes = Arrays.asList(input.readStringArray());
} else {
// versions prior to 7.0.0 sent the resolved transport address of the seed nodes
final List<TransportAddress> transportAddresses = input.readList(TransportAddress::new);
seedNodes = transportAddresses
.stream()
.map(a -> a.address().getHostString() + ":" + a.address().getPort())
.collect(Collectors.toList());
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* We just throw any HTTP addresses received here on the floor
* because we don't need to do anything with them.
*/
input.readList(TransportAddress::new);
}
int connectionsPerCluster = input.readVInt();
initialConnectionTimeout = input.readTimeValue();
int numNodesConnected = input.readVInt();
clusterAlias = input.readString();
skipUnavailable = input.readBoolean();
modeInfo = new SniffConnectionStrategy.SniffModeInfo(seedNodes, connectionsPerCluster, numNodesConnected);
}
if (input.getVersion().before(Version.V_7_0_0)) {
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* We just throw any HTTP addresses received here on the floor
* because we don't need to do anything with them.
*/
input.readList(TransportAddress::new);
}
connectionsPerCluster = input.readVInt();
initialConnectionTimeout = input.readTimeValue();
numNodesConnected = input.readVInt();
clusterAlias = input.readString();
skipUnavailable = input.readBoolean();
}
public List<String> getSeedNodes() {
return seedNodes;
}
public int getConnectionsPerCluster() {
return connectionsPerCluster;
}
public TimeValue getInitialConnectionTimeout() {
return initialConnectionTimeout;
}
public int getNumNodesConnected() {
return numNodesConnected;
public boolean isConnected() {
return modeInfo.isConnected();
}
public String getClusterAlias() {
return clusterAlias;
}
public boolean isSkipUnavailable() {
return skipUnavailable;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeStringArray(seedNodes.toArray(new String[0]));
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeEnum(modeInfo.modeType());
modeInfo.writeTo(out);
out.writeTimeValue(initialConnectionTimeout);
} else {
// versions prior to 7.0.0 received the resolved transport address of the seed nodes
out.writeList(seedNodes
.stream()
.map(
if (modeInfo.modeType() == RemoteConnectionStrategy.ConnectionStrategy.SNIFF) {
SniffConnectionStrategy.SniffModeInfo sniffInfo = (SniffConnectionStrategy.SniffModeInfo) this.modeInfo;
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeStringArray(sniffInfo.seedNodes.toArray(new String[0]));
} else {
// versions prior to 7.0.0 received the resolved transport address of the seed nodes
out.writeList(sniffInfo.seedNodes
.stream()
.map(
s -> {
final Tuple<String, Integer> hostPort = RemoteConnectionStrategy.parseHostPort(s);
assert hostPort.v2() != null : s;
try {
return new TransportAddress(
InetAddress.getByAddress(hostPort.v1(), TransportAddress.META_ADDRESS.getAddress()),
hostPort.v2());
InetAddress.getByAddress(hostPort.v1(), TransportAddress.META_ADDRESS.getAddress()),
hostPort.v2());
} catch (final UnknownHostException e) {
throw new AssertionError(e);
}
})
.collect(Collectors.toList()));
.collect(Collectors.toList()));
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* When sending this request to a node that expects HTTP addresses
* here we pretend that we didn't find any. This *should* be fine
* because, after all, we haven't been using this information for
* a while.
*/
out.writeList(emptyList());
}
out.writeVInt(sniffInfo.maxConnectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(sniffInfo.numNodesConnected);
} else {
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeStringArray(new String[0]);
} else {
// versions prior to 7.0.0 received the resolved transport address of the seed nodes
out.writeList(emptyList());
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* When sending this request to a node that expects HTTP addresses
* here we pretend that we didn't find any. This *should* be fine
* because, after all, we haven't been using this information for
* a while.
*/
out.writeList(emptyList());
}
out.writeVInt(0);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(0);
}
}
if (out.getVersion().before(Version.V_7_0_0)) {
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* When sending this request to a node that expects HTTP addresses
* here we pretend that we didn't find any. This *should* be fine
* because, after all, we haven't been using this information for
* a while.
*/
out.writeList(emptyList());
}
out.writeVInt(connectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(numNodesConnected);
out.writeString(clusterAlias);
out.writeBoolean(skipUnavailable);
}
@ -162,14 +177,9 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(clusterAlias);
{
builder.startArray("seeds");
for (String addr : seedNodes) {
builder.value(addr);
}
builder.endArray();
builder.field("connected", numNodesConnected > 0);
builder.field("num_nodes_connected", numNodesConnected);
builder.field("max_connections_per_cluster", connectionsPerCluster);
builder.field("connected", modeInfo.isConnected());
builder.field("mode", modeInfo.modeName());
modeInfo.toXContent(builder, params);
builder.field("initial_connect_timeout", initialConnectionTimeout);
builder.field("skip_unavailable", skipUnavailable);
}
@ -182,18 +192,23 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RemoteConnectionInfo that = (RemoteConnectionInfo) o;
return connectionsPerCluster == that.connectionsPerCluster &&
numNodesConnected == that.numNodesConnected &&
Objects.equals(seedNodes, that.seedNodes) &&
return skipUnavailable == that.skipUnavailable &&
Objects.equals(modeInfo, that.modeInfo) &&
Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) &&
Objects.equals(clusterAlias, that.clusterAlias) &&
skipUnavailable == that.skipUnavailable;
Objects.equals(clusterAlias, that.clusterAlias);
}
@Override
public int hashCode() {
return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout,
numNodesConnected, clusterAlias, skipUnavailable);
return Objects.hash(modeInfo, initialConnectionTimeout, clusterAlias, skipUnavailable);
}
public interface ModeInfo extends ToXContentFragment, Writeable {
boolean isConnected();
String modeName();
RemoteConnectionStrategy.ConnectionStrategy modeType();
}
}

@ -26,7 +26,9 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -57,20 +59,39 @@ import java.util.stream.Stream;
public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable {
enum ConnectionStrategy {
SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings),
SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings);
SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings,
SniffConnectionStrategy::infoReader) {
@Override
public String toString() {
return "sniff";
}
},
PROXY(ProxyConnectionStrategy.CHANNELS_PER_CONNECTION, ProxyConnectionStrategy::enablementSettings,
ProxyConnectionStrategy::infoReader) {
@Override
public String toString() {
return "proxy";
}
};
private final int numberOfChannels;
private final Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings;
private final Supplier<Writeable.Reader<RemoteConnectionInfo.ModeInfo>> reader;
ConnectionStrategy(int numberOfChannels, Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings) {
ConnectionStrategy(int numberOfChannels, Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings,
Supplier<Writeable.Reader<RemoteConnectionInfo.ModeInfo>> reader) {
this.numberOfChannels = numberOfChannels;
this.enablementSettings = enablementSettings;
this.reader = reader;
}
public int getNumberOfChannels() {
return numberOfChannels;
}
public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
return reader.get();
}
}
public static final Setting.AffixSetting<ConnectionStrategy> REMOTE_CONNECTION_MODE = Setting.affixKeySetting(
@ -120,8 +141,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
switch (mode) {
case SNIFF:
return new SniffConnectionStrategy(clusterAlias, transportService, connectionManager, settings);
case SIMPLE:
return new SimpleConnectionStrategy(clusterAlias, transportService, connectionManager, settings);
case PROXY:
return new ProxyConnectionStrategy(clusterAlias, transportService, connectionManager, settings);
default:
throw new AssertionError("Invalid connection strategy" + mode);
}
@ -139,9 +160,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return seeds.isEmpty() == false;
} else {
List<String> addresses = SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias)
.get(settings);
return addresses.isEmpty() == false;
String address = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings);
return Strings.isEmpty(address) == false;
}
}
@ -150,7 +170,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
return allConcreteSettings.map(affixSetting::getNamespace);
}
static InetSocketAddress parseSeedAddress(String remoteHost) {
static InetSocketAddress parseConfiguredAddress(String remoteHost) {
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
final String host = hostPort.v1();
assert hostPort.v2() != null : remoteHost;
@ -310,6 +330,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
protected abstract void connectImpl(ActionListener<Void> listener);
protected abstract RemoteConnectionInfo.ModeInfo getModeInfo();
private List<ActionListener<Void>> getAndClearListeners() {
final List<ActionListener<Void>> result;
synchronized (mutex) {

@ -36,16 +36,20 @@ import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingUpgrader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@ -99,7 +103,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_SEEDS_OLD = Setting.affixKeySetting(
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting(
"cluster.remote.",
"seeds",
(ns, key) -> Setting.listSetting(
@ -116,24 +120,6 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
Setting.Property.Dynamic,
Setting.Property.NodeScope));
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting(
"cluster.remote.",
"sniff.seeds",
(ns, key) -> Setting.listSetting(key,
REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns),
s -> {
// validate seed address
parsePort(s);
return s;
},
s -> REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns).get(s),
new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF),
Setting.Property.Dynamic,
Setting.Property.NodeScope));
public static final Setting.AffixSetting<String> SEARCH_REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"search.remote.",
"proxy",
@ -189,7 +175,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
*/
public static final Setting.AffixSetting<Integer> REMOTE_NODE_CONNECTIONS = Setting.affixKeySetting(
"cluster.remote.",
"sniff.node_connections",
"node_connections",
(ns, key) -> intSetting(
key,
REMOTE_CONNECTIONS_PER_CLUSTER,
@ -259,7 +245,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
}
static Stream<Setting.AffixSetting<?>> enablementSettings() {
return Stream.of(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD);
return Stream.of(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
}
static Writeable.Reader<RemoteConnectionInfo.ModeInfo> infoReader() {
return SniffModeInfo::new;
}
@Override
@ -286,6 +276,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
collectRemoteNodes(seedNodes.iterator(), listener);
}
@Override
protected RemoteConnectionInfo.ModeInfo getModeInfo() {
return new SniffModeInfo(configuredSeedNodes, maxNumRemoteConnections, connectionManager.size());
}
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
@ -481,11 +476,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
private static DiscoveryNode resolveSeedNode(String clusterAlias, String address, String proxyAddress) {
if (proxyAddress == null || proxyAddress.isEmpty()) {
TransportAddress transportAddress = new TransportAddress(parseSeedAddress(address));
TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(address));
return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(), transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(parseSeedAddress(proxyAddress));
TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(proxyAddress));
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES,
@ -516,7 +511,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
return node;
} else {
// resolve proxy address lazy here
InetSocketAddress proxyInetAddress = parseSeedAddress(proxyAddress);
InetSocketAddress proxyInetAddress = parseConfiguredAddress(proxyAddress);
return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
}
@ -538,4 +533,72 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
return Objects.equals(oldProxy, newProxy) == false;
}
static class SniffModeInfo implements RemoteConnectionInfo.ModeInfo {
final List<String> seedNodes;
final int maxConnectionsPerCluster;
final int numNodesConnected;
SniffModeInfo(List<String> seedNodes, int maxConnectionsPerCluster, int numNodesConnected) {
this.seedNodes = seedNodes;
this.maxConnectionsPerCluster = maxConnectionsPerCluster;
this.numNodesConnected = numNodesConnected;
}
private SniffModeInfo(StreamInput input) throws IOException {
seedNodes = Arrays.asList(input.readStringArray());
maxConnectionsPerCluster = input.readVInt();
numNodesConnected = input.readVInt();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray("seeds");
for (String address : seedNodes) {
builder.value(address);
}
builder.endArray();
builder.field("num_nodes_connected", numNodesConnected);
builder.field("max_connections_per_cluster", maxConnectionsPerCluster);
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(seedNodes.toArray(new String[0]));
out.writeVInt(maxConnectionsPerCluster);
out.writeVInt(numNodesConnected);
}
@Override
public boolean isConnected() {
return numNodesConnected > 0;
}
@Override
public String modeName() {
return "sniff";
}
@Override
public RemoteConnectionStrategy.ConnectionStrategy modeType() {
return RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SniffModeInfo sniff = (SniffModeInfo) o;
return maxConnectionsPerCluster == sniff.maxConnectionsPerCluster &&
numNodesConnected == sniff.numNodesConnected &&
Objects.equals(seedNodes, sniff.seedNodes);
}
@Override
public int hashCode() {
return Objects.hash(seedNodes, maxConnectionsPerCluster, numNodesConnected);
}
}
}

@ -351,7 +351,7 @@ public class TransportSearchActionTests extends ESTestCase {
DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode();
knownNodes.add(remoteSeedNode);
nodes[i] = remoteSeedNode;
settingsBuilder.put("cluster.remote.remote" + i + ".sniff.seeds", remoteSeedNode.getAddress().toString());
settingsBuilder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
remoteIndices.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen()));
}
return mockTransportServices;

@ -147,9 +147,7 @@ public class UpgradeSettingsIT extends ESSingleNodeTestCase {
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").get(settings),
equalTo(skipUnavailable));
assertFalse(SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace("foo").exists(settings));
assertFalse(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").existsOrFallbackExists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertThat(
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").get(settings),
equalTo(Collections.singletonList("localhost:9200")));

@ -22,7 +22,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
@ -31,26 +31,24 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class SimpleConnectionStrategyTests extends ESTestCase {
public class ProxyConnectionStrategyTests extends ESTestCase {
private final String clusterAlias = "cluster-alias";
private final String modeKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey();
private final Settings settings = Settings.builder().put(modeKey, "simple").build();
private final Settings settings = Settings.builder().put(modeKey, "proxy").build();
private final ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile("cluster", settings);
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@ -84,11 +82,9 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
}
}
public void testSimpleStrategyWillOpenExpectedNumberOfConnectionsToAddresses() {
try (MockTransportService transport1 = startTransport("node1", Version.CURRENT);
MockTransportService transport2 = startTransport("node2", Version.CURRENT)) {
public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddress() {
try (MockTransportService transport1 = startTransport("node1", Version.CURRENT)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();
TransportAddress address2 = transport2.boundAddress().publishAddress();
try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
@ -97,17 +93,15 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address1, address2))) {
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString())) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
assertEquals(numOfConnections, connectionManager.size());
assertTrue(strategy.assertNoRunningConnections());
}
@ -115,7 +109,7 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
}
}
public void testSimpleStrategyWillOpenNewConnectionsOnDisconnect() throws Exception {
public void testProxyStrategyWillOpenNewConnectionsOnDisconnect() throws Exception {
try (MockTransportService transport1 = startTransport("node1", Version.CURRENT);
MockTransportService transport2 = startTransport("node2", Version.CURRENT)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();
@ -127,9 +121,10 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address1, address2))) {
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString(), alternatingResolver(address1, address2), false)) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
@ -141,7 +136,7 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
long initialConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream()
.filter(n -> n.getAddress().equals(address2))
.count();
assertNotEquals(0, initialConnectionsToTransport2);
assertEquals(0, initialConnectionsToTransport2);
assertEquals(numOfConnections, connectionManager.size());
assertTrue(strategy.assertNoRunningConnections());
@ -149,11 +144,12 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
assertBusy(() -> {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
// More connections now pointing to transport2
// Connections now pointing to transport2
long finalConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream()
.filter(n -> n.getAddress().equals(address2))
.count();
assertTrue(finalConnectionsToTransport2 > initialConnectionsToTransport2);
assertNotEquals(0, finalConnectionsToTransport2);
assertEquals(numOfConnections, connectionManager.size());
assertTrue(strategy.assertNoRunningConnections());
});
}
@ -161,56 +157,6 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
}
}
public void testConnectWithSingleIncompatibleNode() {
Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();
try (MockTransportService transport1 = startTransport("compatible-node", Version.CURRENT);
MockTransportService transport2 = startTransport("incompatible-node", incompatibleVersion)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();
TransportAddress address2 = transport2.boundAddress().publishAddress();
try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
localService.acceptIncomingRequests();
StubbableTransport stubbableTransport = new StubbableTransport(localService.transport);
ConnectionManager connectionManager = new ConnectionManager(profile, stubbableTransport);
AtomicInteger address1Attempts = new AtomicInteger(0);
AtomicInteger address2Attempts = new AtomicInteger(0);
stubbableTransport.setDefaultConnectBehavior((transport, discoveryNode, profile, listener) -> {
if (discoveryNode.getAddress().equals(address1)) {
address1Attempts.incrementAndGet();
transport.openConnection(discoveryNode, profile, listener);
} else if (discoveryNode.getAddress().equals(address2)) {
address2Attempts.incrementAndGet();
transport.openConnection(discoveryNode, profile, listener);
} else {
throw new AssertionError("Unexpected address");
}
});
int numOfConnections = 5;
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address1, address2))) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
assertEquals(4 ,connectionManager.size());
assertEquals(4 ,connectionManager.getAllConnectedNodes().stream().map(n -> n.getAddress().equals(address1)).count());
// Three attempts on first round, one attempts on second round, zero attempts on third round
assertEquals(4, address1Attempts.get());
// Two attempts on first round, one attempt on second round, one attempt on third round
assertEquals(4, address2Attempts.get());
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
assertTrue(strategy.assertNoRunningConnections());
}
}
}
}
public void testConnectFailsWithIncompatibleNodes() {
Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();
try (MockTransportService transport1 = startTransport("incompatible-node", incompatibleVersion)) {
@ -223,8 +169,8 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address1))) {
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString())) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
@ -252,9 +198,11 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
Supplier<TransportAddress> resolver = alternatingResolver(address1, address2);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address1, address2))) {
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString(), resolver, false)) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
@ -262,18 +210,29 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
strategy.connect(connectFuture);
connectFuture.actionGet();
if (connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
} else {
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
}
assertTrue(strategy.assertNoRunningConnections());
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
transport1.close();
assertBusy(() -> {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertTrue(strategy.assertNoRunningConnections());
long finalConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream()
.filter(n -> n.getAddress().equals(address2))
.count();
// Connections not pointing to transport2 because the cluster name is different
assertEquals(0, finalConnectionsToTransport2);
assertEquals(0, connectionManager.size());
});
}
}
}
}
public void testSimpleStrategyWillResolveAddressesEachConnect() throws Exception {
public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception {
try (MockTransportService transport1 = startTransport("seed_node", Version.CURRENT)) {
TransportAddress address = transport1.boundAddress().publishAddress();
@ -290,8 +249,8 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address), Collections.singletonList(addressSupplier))) {
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address.toString(), addressSupplier, false)) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
@ -304,11 +263,9 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
}
}
public void testSimpleStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange() {
try (MockTransportService transport1 = startTransport("node1", Version.CURRENT);
MockTransportService transport2 = startTransport("node2", Version.CURRENT)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();
TransportAddress address2 = transport2.boundAddress().publishAddress();
public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange() {
try (MockTransportService remoteTransport = startTransport("node1", Version.CURRENT)) {
TransportAddress remoteAddress = remoteTransport.boundAddress().publishAddress();
try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
@ -317,38 +274,37 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address1, address2))) {
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, remoteAddress.toString())) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(remoteAddress)));
assertEquals(numOfConnections, connectionManager.size());
assertTrue(strategy.assertNoRunningConnections());
Setting<?> modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> addressesSetting = SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES
Setting<?> addressesSetting = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> socketConnections = SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
Setting<?> socketConnections = ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
.getConcreteSettingForNamespace("cluster-alias");
Settings noChange = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
.put(modeSetting.getKey(), "proxy")
.put(addressesSetting.getKey(), remoteAddress.toString())
.put(socketConnections.getKey(), numOfConnections)
.build();
assertFalse(strategy.shouldRebuildConnection(noChange));
Settings addressesChanged = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1).toArray()))
.put(modeSetting.getKey(), "proxy")
.put(addressesSetting.getKey(), remoteAddress.toString())
.build();
assertTrue(strategy.shouldRebuildConnection(addressesChanged));
Settings socketsChanged = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
.put(modeSetting.getKey(), "proxy")
.put(addressesSetting.getKey(), remoteAddress.toString())
.put(socketConnections.getKey(), numOfConnections + 1)
.build();
assertTrue(strategy.shouldRebuildConnection(socketsChanged));
@ -359,8 +315,8 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
new Tuple<>(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),
new Tuple<>(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3"));
new Tuple<>(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),
new Tuple<>(ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3"));
RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
@ -382,12 +338,55 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true));
String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " +
"\"cluster.remote.cluster_name.mode\" [required=SIMPLE, configured=SNIFF]";
"\"cluster.remote.cluster_name.mode\" [required=PROXY, configured=SNIFF]";
assertEquals(expected, iae.getMessage());
}
}
private static List<String> addresses(final TransportAddress... addresses) {
return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList());
public void testServerNameAttributes() {
Settings bindSettings = Settings.builder().put(TransportSettings.BIND_HOST.getKey(), "localhost").build();
try (MockTransportService transport1 = startTransport("node1", Version.CURRENT, bindSettings)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();
try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
localService.acceptIncomingRequests();
String serverName = "localhost:" + address1.getPort();
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, serverName, true)) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertTrue(strategy.assertNoRunningConnections());
DiscoveryNode discoveryNode = connectionManager.getAllConnectedNodes().stream().findFirst().get();
assertEquals("localhost", discoveryNode.getAttributes().get("server_name"));
}
}
}
}
private Supplier<TransportAddress> alternatingResolver(TransportAddress address1, TransportAddress address2) {
// On the first connection round, the connections will be routed to transport1. On the second
//connection round, the connections will be routed to transport2
AtomicBoolean transportSwitch = new AtomicBoolean(true);
return () -> {
if (transportSwitch.get()) {
transportSwitch.set(false);
return address1;
} else {
transportSwitch.set(true);
return address2;
}
};
}
}

@ -65,7 +65,7 @@ public class RemoteClusterAwareClientTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster1.sniff.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
@ -96,7 +96,7 @@ public class RemoteClusterAwareClientTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster1.sniff.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();

@ -52,7 +52,7 @@ public class RemoteClusterClientTests extends ESTestCase {
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("cluster.remote.test.sniff.seeds",
.put("cluster.remote.test.seeds",
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
service.start();
@ -81,7 +81,7 @@ public class RemoteClusterClientTests extends ESTestCase {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("cluster.remote.test.sniff.seeds",
.put("cluster.remote.test.seeds",
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
Semaphore semaphore = new Semaphore(1);

@ -326,7 +326,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
List<String> seedNodes = addresses(node3, node1, node2);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT,
threadPool, null)) {
service.start();
service.acceptIncomingRequests();
int maxNumConnections = randomIntBetween(1, 5);
@ -337,9 +338,10 @@ public class RemoteClusterConnectionTests extends ESTestCase {
// test no nodes connected
RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo());
assertNotNull(remoteConnectionInfo);
assertEquals(0, remoteConnectionInfo.numNodesConnected);
assertEquals(3, remoteConnectionInfo.seedNodes.size());
assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster);
SniffConnectionStrategy.SniffModeInfo sniffInfo = (SniffConnectionStrategy.SniffModeInfo) remoteConnectionInfo.modeInfo;
assertEquals(0, sniffInfo.numNodesConnected);
assertEquals(3, sniffInfo.seedNodes.size());
assertEquals(maxNumConnections, sniffInfo.maxConnectionsPerCluster);
assertEquals(clusterAlias, remoteConnectionInfo.clusterAlias);
}
}
@ -347,32 +349,37 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
public void testRemoteConnectionInfo() throws IOException {
List<String> remoteAddresses = Collections.singletonList("seed:1");
RemoteConnectionInfo.ModeInfo modeInfo1;
RemoteConnectionInfo.ModeInfo modeInfo2;
if (randomBoolean()) {
modeInfo1 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 4);
modeInfo2 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 3);
} else {
modeInfo1 = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses.get(0), 18, 18);
modeInfo2 = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses.get(0), 18, 17);
}
RemoteConnectionInfo stats =
new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false);
new RemoteConnectionInfo("test_cluster", modeInfo1, TimeValue.timeValueMinutes(30), false);
assertSerialization(stats);
RemoteConnectionInfo stats1 =
new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 4, TimeValue.timeValueMinutes(30), true);
new RemoteConnectionInfo("test_cluster", modeInfo1, TimeValue.timeValueMinutes(30), true);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster_1", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false);
stats1 = new RemoteConnectionInfo("test_cluster_1", modeInfo1, TimeValue.timeValueMinutes(30), false);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:15"), 4, 3, TimeValue.timeValueMinutes(30), false);
stats1 = new RemoteConnectionInfo("test_cluster", modeInfo1, TimeValue.timeValueMinutes(325), false);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(325), true);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 5, 3, TimeValue.timeValueMinutes(30), false);
stats1 = new RemoteConnectionInfo("test_cluster", modeInfo2, TimeValue.timeValueMinutes(30), false);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
}
@ -393,8 +400,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
public void testRemoteConnectionInfoBwComp() throws IOException {
final Version version = VersionUtils.randomVersionBetween(random(),
Version.V_6_1_0, VersionUtils.getPreviousVersion(Version.V_7_0_0));
SniffConnectionStrategy.SniffModeInfo modeInfo = new SniffConnectionStrategy.SniffModeInfo(Arrays.asList("0.0.0.0:1"), 4, 4);
RemoteConnectionInfo expected =
new RemoteConnectionInfo("test_cluster", Arrays.asList("0.0.0.0:1"), 4, 4, new TimeValue(30, TimeUnit.MINUTES), false);
new RemoteConnectionInfo("test_cluster", modeInfo, new TimeValue(30, TimeUnit.MINUTES), false);
// This version was created using the serialization code in use from 6.1 but before 7.0
String encoded = "AQQAAAAABzAuMC4wLjAAAAABAQQAAAAABzAuMC4wLjAAAABQBDwEBAx0ZXN0X2NsdXN0ZXIA";
@ -418,27 +426,33 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
public void testRenderConnectionInfoXContent() throws IOException {
RemoteConnectionInfo stats =
new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true);
List<String> remoteAddresses = Arrays.asList("seed:1", "seed:2");
RemoteConnectionInfo.ModeInfo modeInfo;
boolean sniff = randomBoolean();
if (sniff) {
modeInfo = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 3, 2);
} else {
modeInfo = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses.get(0), 18, 16);
}
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", modeInfo, TimeValue.timeValueMinutes(30), true);
stats = assertSerialization(stats);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
stats.toXContent(builder, null);
builder.endObject();
assertEquals("{\"test_cluster\":{\"seeds\":[\"seed:1\"],\"connected\":true," +
"\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," +
"\"skip_unavailable\":true}}", Strings.toString(builder));
stats = new RemoteConnectionInfo(
"some_other_cluster", Arrays.asList("seed:1", "seed:2"), 2, 0, TimeValue.timeValueSeconds(30), false);
stats = assertSerialization(stats);
builder = XContentFactory.jsonBuilder();
builder.startObject();
stats.toXContent(builder, null);
builder.endObject();
assertEquals("{\"some_other_cluster\":{\"seeds\":[\"seed:1\",\"seed:2\"],"
+ "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," +
"\"skip_unavailable\":false}}", Strings.toString(builder));
if (sniff) {
assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"sniff\",\"seeds\":[\"seed:1\",\"seed:2\"]," +
"\"num_nodes_connected\":2,\"max_connections_per_cluster\":3,\"initial_connect_timeout\":\"30m\"," +
"\"skip_unavailable\":true}}", Strings.toString(builder));
} else {
assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"proxy\",\"address\":\"seed:1\"," +
"\"num_sockets_connected\":16,\"max_socket_connections\":18,\"initial_connect_timeout\":\"30m\"," +
"\"skip_unavailable\":true}}", Strings.toString(builder));
}
}
public void testCollectNodes() throws Exception {
@ -627,30 +641,25 @@ public class RemoteClusterConnectionTests extends ESTestCase {
private Settings buildRandomSettings(String clusterAlias, List<String> addresses) {
if (randomBoolean()) {
return buildSimpleSettings(clusterAlias, addresses);
return buildProxySettings(clusterAlias, addresses);
} else {
return buildSniffSettings(clusterAlias, addresses);
}
}
private static Settings buildSimpleSettings(String clusterAlias, List<String> addresses) {
private static Settings buildProxySettings(String clusterAlias, List<String> addresses) {
Settings.Builder builder = Settings.builder();
builder.put(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(addresses));
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "simple");
builder.put(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(),
addresses.get(0));
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "proxy");
return builder.build();
}
private static Settings buildSniffSettings(String clusterAlias, List<String> seedNodes) {
Settings.Builder builder = Settings.builder();
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "sniff");
if (randomBoolean()) {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seedNodes));
} else {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seedNodes));
}
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seedNodes));
return builder.build();
}
}

@ -84,27 +84,26 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS));
}
public void testRemoteClusterSeedSetting() {
// simple validation
Settings settings = Settings.builder()
.put("cluster.remote.foo.sniff.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.sniff.seed", "[::1]:9090").build();
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seeds", "[::1]:9090").build();
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
Settings brokenSettings = Settings.builder()
.put("cluster.remote.foo.sniff.seeds", "192.168.0.1").build();
.put("cluster.remote.foo.seeds", "192.168.0.1").build();
expectThrows(IllegalArgumentException.class, () ->
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS
.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
Settings brokenPortSettings = Settings.builder()
.put("cluster.remote.foo.sniff.seeds", "192.168.0.1:123456789123456789").build();
.put("cluster.remote.foo.seeds", "192.168.0.1:123456789123456789").build();
Exception e = expectThrows(
IllegalArgumentException.class,
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getAllConcreteSettings(brokenSettings)
@ -128,8 +127,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.sniff.seeds", cluster1Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_2.sniff.seeds", cluster2Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -361,7 +360,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.sniff.seeds", cluster1Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
service.initializeRemoteClusters();
RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
@ -370,7 +369,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule);
boolean compressionEnabled = true;
settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled);
settingsChange.putList("cluster.remote.cluster_1.sniff.seeds", cluster1Seed.getAddress().toString());
settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build());
assertBusy(remoteClusterConnection::isClosed);
@ -415,9 +414,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList(
"cluster.remote.cluster_1.sniff.seeds", c1N1Node.getAddress().toString());
"cluster.remote.cluster_1.seed", c1N1Node.getAddress().toString());
builder.putList(
"cluster.remote.cluster_2.sniff.seeds", c2N1Node.getAddress().toString());
"cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString());
try (RemoteClusterService service =
new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
@ -482,8 +481,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.sniff.seeds", c1N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_2.sniff.seeds", c2N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_1.seed", c1N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -553,9 +552,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList(
"cluster.remote.cluster_1.sniff.seeds", c1N1Node.getAddress().toString());
"cluster.remote.cluster_1.seed", c1N1Node.getAddress().toString());
builder.putList(
"cluster.remote.cluster_2.sniff.seeds", c2N1Node.getAddress().toString());
"cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString());
try (RemoteClusterService service =
new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
@ -685,12 +684,12 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY,
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
{
Settings settings = Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(settings, true));
assertEquals("missing required setting [cluster.remote.foo.sniff.seeds] for setting [cluster.remote.foo.skip_unavailable]",
assertEquals("missing required setting [cluster.remote.foo.seeds] for setting [cluster.remote.foo.skip_unavailable]",
iae.getMessage());
}
{
@ -698,11 +697,11 @@ public class RemoteClusterServiceTests extends ESTestCase {
String seed = remoteSeedTransport.getLocalDiscoNode().getAddress().toString();
service.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean())
.put("cluster.remote.foo.seeds", seed).build(), true);
service.validate(Settings.builder().put("cluster.remote.foo.sniff.seeds", seed).build(), true);
service.validate(Settings.builder().put("cluster.remote.foo.seeds", seed).build(), true);
AbstractScopedSettings service2 = new ClusterSettings(Settings.builder().put("cluster.remote.foo.seeds", seed).build(),
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
service2.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build(), false);
}
}
@ -725,7 +724,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_test.sniff.seeds", Collections.singletonList(node0.getAddress().toString()));
builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -821,13 +820,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
private static Settings createSettings(String clusterAlias, List<String> seeds) {
Settings.Builder builder = Settings.builder();
if (randomBoolean()) {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seeds));
} else {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seeds));
}
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seeds));
return builder.build();
}
}

@ -32,7 +32,6 @@ import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS;
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY;
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS;
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD;
import static org.elasticsearch.transport.RemoteClusterService.ENABLE_REMOTE_CLUSTERS;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE;
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER;
@ -124,7 +123,7 @@ public class RemoteClusterSettingsTests extends ESTestCase {
public void testSeedsDefault() {
final String alias = randomAlphaOfLength(8);
assertThat(REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), emptyCollectionOf(String.class));
assertThat(REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), emptyCollectionOf(String.class));
}
public void testProxyFallback() {

@ -33,7 +33,7 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
ConnectionManager connectionManager = new ConnectionManager(Settings.EMPTY, mock(Transport.class));
RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager,
RemoteConnectionStrategy.ConnectionStrategy.SIMPLE);
RemoteConnectionStrategy.ConnectionStrategy.PROXY);
Settings newSettings = Settings.builder()
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "sniff")
.build();
@ -44,9 +44,9 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
ConnectionManager connectionManager = new ConnectionManager(Settings.EMPTY, mock(Transport.class));
RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager,
RemoteConnectionStrategy.ConnectionStrategy.SIMPLE);
RemoteConnectionStrategy.ConnectionStrategy.PROXY);
Settings newSettings = Settings.builder()
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "simple")
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "proxy")
.build();
assertFalse(first.shouldRebuildConnection(newSettings));
}
@ -57,10 +57,10 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
assertEquals(false, connectionManager.getConnectionProfile().getCompressionEnabled());
RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager,
RemoteConnectionStrategy.ConnectionStrategy.SIMPLE);
RemoteConnectionStrategy.ConnectionStrategy.PROXY);
Settings.Builder newBuilder = Settings.builder();
newBuilder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "simple");
newBuilder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "proxy");
if (randomBoolean()) {
newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace("cluster-alias").getKey(),
TimeValue.timeValueSeconds(5));
@ -75,10 +75,10 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) {
String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey();
Settings simpleSettings = Settings.builder().put(settingKey, strategy.name()).build();
ConnectionProfile simpleProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, simpleSettings);
Settings proxySettings = Settings.builder().put(settingKey, strategy.name()).build();
ConnectionProfile proxyProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, proxySettings);
assertEquals("Incorrect number of channels for " + strategy.name(),
strategy.getNumberOfChannels(), simpleProfile.getNumConnections());
strategy.getNumberOfChannels(), proxyProfile.getNumConnections());
}
}
@ -111,5 +111,10 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
protected void connectImpl(ActionListener<Void> listener) {
}
@Override
protected RemoteConnectionInfo.ModeInfo getModeInfo() {
return null;
}
}
}

@ -652,14 +652,13 @@ public class SniffConnectionStrategyTests extends ESTestCase {
public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, "192.168.0.1:8080"),
new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, "192.168.0.1:8080"),
new Tuple<>(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, "2"));
RemoteConnectionStrategy.ConnectionStrategy simple = RemoteConnectionStrategy.ConnectionStrategy.SIMPLE;
RemoteConnectionStrategy.ConnectionStrategy proxy = RemoteConnectionStrategy.ConnectionStrategy.PROXY;
String clusterName = "cluster_name";
Settings settings = Settings.builder()
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), simple.name())
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), proxy.name())
.build();
Set<Setting<?>> clusterSettings = new HashSet<>();
@ -675,7 +674,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true));
String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " +
"\"cluster.remote.cluster_name.mode\" [required=SNIFF, configured=SIMPLE]";
"\"cluster.remote.cluster_name.mode\" [required=SNIFF, configured=PROXY]";
assertEquals(expected, iae.getMessage());
}
}

@ -69,7 +69,7 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
List<RemoteConnectionInfo> infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
assertThat(infos.size(), equalTo(1));
assertThat(infos.get(0).getNumNodesConnected(), equalTo(1));
assertTrue(infos.get(0).isConnected());
}
@Before

@ -1393,7 +1393,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster");
settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), compress.getDefault(Settings.EMPTY))
.put(seeds.getKey(), address));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());

@ -23,7 +23,6 @@ import java.util.Locale;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class RestartIndexFollowingIT extends CcrIntegTestCase {
@ -96,7 +95,7 @@ public class RestartIndexFollowingIT extends CcrIntegTestCase {
List<RemoteConnectionInfo> infos =
followerClient().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
assertThat(infos.size(), equalTo(1));
assertThat(infos.get(0).getNumNodesConnected(), greaterThanOrEqualTo(1));
assertTrue(infos.get(0).isConnected());
}
private void cleanRemoteCluster() throws Exception {

@ -1,5 +1,5 @@
---
"Add transient remote cluster in simple mode with invalid sniff settings":
"Add transient remote cluster in proxy mode with invalid sniff settings":
- do:
cluster.get_settings:
include_defaults: true
@ -12,14 +12,14 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.node_connections: "5"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.node_connections: "5"
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.node_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=PROXY]" }
- do:
catch: bad_request
@ -27,17 +27,17 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.seeds: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=PROXY]" }
---
"Add transient remote cluster in sniff mode with invalid simple settings":
"Add transient remote cluster in sniff mode with invalid proxy settings":
- do:
cluster.get_settings:
include_defaults: true
@ -50,13 +50,13 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.simple.socket_connections: "20"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.proxy_socket_connections: "20"
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_socket_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=PROXY, configured=SNIFF]" }
- do:
catch: bad_request
@ -64,16 +64,16 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_address\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=PROXY, configured=SNIFF]" }
---
"Add transient remote cluster using simple connection mode using valid settings":
"Add transient remote cluster using proxy connection mode using valid settings":
- do:
cluster.get_settings:
include_defaults: true
@ -85,13 +85,13 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.simple.socket_connections: "3"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.proxy_socket_connections: "3"
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
- do:
search:
@ -120,12 +120,12 @@
body:
transient:
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.sniff.node_connections: "3"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.node_connections: "3"
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.node_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.seeds: $remote_ip}
- do:
search:
@ -154,10 +154,10 @@
body:
transient:
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.seeds: $remote_ip}
- do:
search:
@ -178,25 +178,25 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=PROXY]" }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.seeds: null
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.seeds: null
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}
- do:
search: