Generalize search.remote settings to cluster.remote (#33413)

With features like CCR building on the CCS infrastructure, the settings
prefix search.remote makes less sense as the namespace for these remote
cluster settings than does a more general namespace like
cluster.remote. This commit replaces these settings with cluster.remote
with a fallback to the deprecated settings search.remote.
This commit is contained in:
Jason Tedor 2018-09-05 20:43:44 -04:00 committed by GitHub
parent 39e3bd93c7
commit d71ced1b00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 527 additions and 230 deletions

View File

@ -38,7 +38,7 @@ remote clusters that should be connected to, for instance:
[source,yaml]
--------------------------------
search:
cluster:
remote:
cluster_one: <1>
seeds: 127.0.0.1:9300
@ -58,7 +58,7 @@ following:
PUT _cluster/settings
{
"persistent": {
"search": {
"cluster": {
"remote": {
"cluster_one": {
"seeds": [
@ -94,7 +94,7 @@ because we'll use them later.
{
"acknowledged" : true,
"persistent": {
"search": {
"cluster": {
"remote": {
"cluster_one": {
"seeds": [
@ -129,7 +129,7 @@ A remote cluster can be deleted from the cluster settings by setting its seeds t
PUT _cluster/settings
{
"persistent": {
"search": {
"cluster": {
"remote": {
"cluster_three": {
"seeds": null <1>
@ -309,7 +309,7 @@ boolean `skip_unavailable` setting, set to `false` by default.
PUT _cluster/settings
{
"persistent": {
"search.remote.cluster_two.skip_unavailable": true <1>
"cluster.remote.cluster_two.skip_unavailable": true <1>
}
}
--------------------------------
@ -391,30 +391,30 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
[[cross-cluster-search-settings]]
=== Cross cluster search settings
`search.remote.connections_per_cluster`::
`cluster.remote.connections_per_cluster`::
The number of nodes to connect to per remote cluster. The default is `3`.
`search.remote.initial_connect_timeout`::
`cluster.remote.initial_connect_timeout`::
The time to wait for remote connections to be established when the node starts. The default is `30s`.
`search.remote.node.attr`::
`cluster.remote.node.attr`::
A node attribute to filter out nodes that are eligible as a gateway node in
the remote cluster. For instance a node can have a node attribute
`node.attr.gateway: true` such that only nodes with this attribute will be
connected to if `search.remote.node.attr` is set to `gateway`.
connected to if `cluster.remote.node.attr` is set to `gateway`.
`search.remote.connect`::
`cluster.remote.connect`::
By default, any node in the cluster can act as a cross-cluster client and
connect to remote clusters. The `search.remote.connect` setting can be set
connect to remote clusters. The `cluster.remote.connect` setting can be set
to `false` (defaults to `true`) to prevent certain nodes from connecting to
remote clusters. Cross-cluster search requests must be sent to a node that
is allowed to act as a cross-cluster client.
`search.remote.${cluster_alias}.skip_unavailable`::
`cluster.remote.${cluster_alias}.skip_unavailable`::
Per cluster boolean setting that allows to skip specific clusters when no
nodes belonging to them are available and they are searched as part of a

View File

@ -59,7 +59,7 @@ To create a dedicated ingest node when {xpack} is installed, set:
node.master: false <1>
node.data: false <2>
node.ingest: true <3>
search.remote.connect: false <4>
cluster.remote.connect: false <4>
node.ml: false <5>
-------------------
<1> Disable the `node.master` role (enabled by default).
@ -75,7 +75,7 @@ To create a dedicated coordinating node when {xpack} is installed, set:
node.master: false <1>
node.data: false <2>
node.ingest: false <3>
search.remote.connect: false <4>
cluster.remote.connect: false <4>
node.ml: false <5>
-------------------
<1> Disable the `node.master` role (enabled by default).
@ -105,7 +105,7 @@ To create a dedicated {ml} node, set:
node.master: false <1>
node.data: false <2>
node.ingest: false <3>
search.remote.connect: false <4>
cluster.remote.connect: false <4>
node.ml: true <5>
xpack.ml.enabled: true <6>
-------------------

View File

@ -93,7 +93,7 @@ To create a dedicated master-eligible node, set:
node.master: true <1>
node.data: false <2>
node.ingest: false <3>
search.remote.connect: false <4>
cluster.remote.connect: false <4>
-------------------
<1> The `node.master` role is enabled by default.
<2> Disable the `node.data` role (enabled by default).
@ -192,7 +192,7 @@ To create a dedicated data node, set:
node.master: false <1>
node.data: true <2>
node.ingest: false <3>
search.remote.connect: false <4>
cluster.remote.connect: false <4>
-------------------
<1> Disable the `node.master` role (enabled by default).
<2> The `node.data` role is enabled by default.
@ -220,7 +220,7 @@ To create a dedicated ingest node, set:
node.master: false <1>
node.data: false <2>
node.ingest: true <3>
search.remote.connect: false <4>
cluster.remote.connect: false <4>
-------------------
<1> Disable the `node.master` role (enabled by default).
<2> Disable the `node.data` role (enabled by default).
@ -260,7 +260,7 @@ To create a dedicated coordinating node, set:
node.master: false <1>
node.data: false <2>
node.ingest: false <3>
search.remote.connect: false <4>
cluster.remote.connect: false <4>
-------------------
<1> Disable the `node.master` role (enabled by default).
<2> Disable the `node.data` role (enabled by default).

View File

@ -235,8 +235,8 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(),
containsString("Missing required setting [search.remote.remote1.seeds] " +
"for setting [search.remote.remote1.skip_unavailable]"));
containsString("Missing required setting [cluster.remote.remote1.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}
Map<String, Object> settingsMap = new HashMap<>();
@ -251,8 +251,8 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
ResponseException responseException = expectThrows(ResponseException.class,
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("Missing required setting [search.remote.remote1.seeds] " +
"for setting [search.remote.remote1.skip_unavailable]"));
assertThat(responseException.getMessage(), containsString("Missing required setting [cluster.remote.remote1.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}
if (randomBoolean()) {
@ -304,7 +304,7 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
{
builder.startObject("persistent");
{
builder.startObject("search.remote.remote1");
builder.startObject("cluster.remote.remote1");
{
for (Map.Entry<String, Object> entry : settings.entrySet()) {
builder.field(entry.getKey(), entry.getValue());

View File

@ -28,7 +28,7 @@ task remoteClusterTest(type: RestIntegTestTask) {
remoteClusterTestCluster {
numNodes = 2
clusterName = 'remote-cluster'
setting 'search.remote.connect', false
setting 'cluster.remote.connect', false
}
remoteClusterTestRunner {
@ -39,9 +39,9 @@ task mixedClusterTest(type: RestIntegTestTask) {}
mixedClusterTestCluster {
dependsOn remoteClusterTestRunner
setting 'search.remote.my_remote_cluster.seeds', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\""
setting 'search.remote.connections_per_cluster', 1
setting 'search.remote.connect', true
setting 'cluster.remote.my_remote_cluster.seeds', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\""
setting 'cluster.remote.connections_per_cluster', 1
setting 'cluster.remote.connect', true
}
mixedClusterTestRunner {

View File

@ -99,16 +99,16 @@
cluster.get_settings:
include_defaults: true
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.test_remote_cluster.seeds: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
- match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}}
- do:
search:
@ -124,16 +124,16 @@
cluster.get_settings:
include_defaults: true
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.test_remote_cluster.seeds: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
- match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}}
- do:
search:

View File

@ -14,16 +14,16 @@
cluster.get_settings:
include_defaults: true
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.test_remote_cluster.seeds: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
- match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}}
# we do another search here since this will enforce the connection to be established
# otherwise the cluster might not have been connected yet.
@ -56,7 +56,7 @@
cluster.put_settings:
body:
transient:
search.remote.test_remote_cluster.seeds: null
cluster.remote.test_remote_cluster.seeds: null
---
"skip_unavailable is returned as part of _remote/info response":
@ -68,16 +68,16 @@
cluster.get_settings:
include_defaults: true
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.remote1.seeds: $remote_ip
cluster.remote.remote1.seeds: $remote_ip
- match: {transient: {search.remote.remote1.seeds: $remote_ip}}
- match: {transient: {cluster.remote.remote1.seeds: $remote_ip}}
- do:
cluster.remote_info: {}
@ -87,9 +87,9 @@
cluster.put_settings:
body:
transient:
search.remote.remote1.skip_unavailable: true
cluster.remote.remote1.skip_unavailable: true
- is_true: transient.search.remote.remote1.skip_unavailable
- is_true: transient.cluster.remote.remote1.skip_unavailable
- do:
cluster.remote_info: {}
@ -100,9 +100,9 @@
cluster.put_settings:
body:
transient:
search.remote.remote1.skip_unavailable: false
cluster.remote.remote1.skip_unavailable: false
- is_false: transient.search.remote.remote1.skip_unavailable
- is_false: transient.cluster.remote.remote1.skip_unavailable
- do:
cluster.remote_info: {}
@ -113,7 +113,7 @@
cluster.put_settings:
body:
transient:
search.remote.remote1.skip_unavailable: null
cluster.remote.remote1.skip_unavailable: null
- match: {transient: {}}
@ -126,5 +126,5 @@
cluster.put_settings:
body:
transient:
search.remote.remote1.seeds: null
search.remote.remote1.skip_unavailable: null
cluster.remote.remote1.seeds: null
cluster.remote.remote1.skip_unavailable: null

View File

@ -273,12 +273,19 @@ public final class ClusterSettings extends AbstractScopedSettings {
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,
RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE,
RemoteClusterService.ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS,
TransportService.TRACE_LOG_EXCLUDE_SETTING,
TransportService.TRACE_LOG_INCLUDE_SETTING,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.settings;
import org.apache.logging.log4j.Logger;
@ -753,7 +754,7 @@ public class Setting<T> implements ToXContentObject {
/**
* Returns the namespace for a concrete setting. Ie. an affix setting with prefix: {@code search.} and suffix: {@code username}
* will return {@code remote} as a namespace for the setting {@code search.remote.username}
* will return {@code remote} as a namespace for the setting {@code cluster.remote.username}
*/
public String getNamespace(Setting<T> concreteSetting) {
return key.getNamespace(concreteSetting.getKey());
@ -1043,7 +1044,15 @@ public class Setting<T> implements ToXContentObject {
}
public static Setting<String> simpleString(String key, Setting<String> fallback, Property... properties) {
return new Setting<>(key, fallback, Function.identity(), properties);
return simpleString(key, fallback, Function.identity(), properties);
}
public static Setting<String> simpleString(
final String key,
final Setting<String> fallback,
final Function<String, String> parser,
final Property... properties) {
return new Setting<>(key, fallback, parser, properties);
}
public static Setting<String> simpleString(String key, Validator<String> validator, Property... properties) {
@ -1275,15 +1284,41 @@ public class Setting<T> implements ToXContentObject {
return new GroupSetting(key, validator, properties);
}
public static Setting<TimeValue> timeSetting(String key, Function<Settings, TimeValue> defaultValue, TimeValue minValue,
Property... properties) {
return new Setting<>(key, (s) -> defaultValue.apply(s).getStringRep(), (s) -> {
TimeValue timeValue = TimeValue.parseTimeValue(s, null, key);
if (timeValue.millis() < minValue.millis()) {
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
public static Setting<TimeValue> timeSetting(
final String key,
final Setting<TimeValue> fallbackSetting,
final TimeValue minValue,
final Property... properties) {
final SimpleKey simpleKey = new SimpleKey(key);
return new Setting<>(
simpleKey,
fallbackSetting,
fallbackSetting::getRaw,
minTimeValueParser(key, minValue),
(v, s) -> {},
properties);
}
public static Setting<TimeValue> timeSetting(
final String key, Function<Settings, TimeValue> defaultValue, final TimeValue minValue, final Property... properties) {
final SimpleKey simpleKey = new SimpleKey(key);
return new Setting<>(simpleKey, s -> defaultValue.apply(s).getStringRep(), minTimeValueParser(key, minValue), properties);
}
private static Function<String, TimeValue> minTimeValueParser(final String key, final TimeValue minValue) {
return s -> {
final TimeValue value = TimeValue.parseTimeValue(s, null, key);
if (value.millis() < minValue.millis()) {
final String message = String.format(
Locale.ROOT,
"failed to parse value [%s] for setting [%s], must be >= [%s]",
s,
key,
minValue.getStringRep());
throw new IllegalArgumentException(message);
}
return timeValue;
}, properties);
return value;
};
}
public static Setting<TimeValue> timeSetting(String key, TimeValue defaultValue, TimeValue minValue, Property... properties) {
@ -1302,6 +1337,14 @@ public class Setting<T> implements ToXContentObject {
return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
}
public static Setting<TimeValue> positiveTimeSetting(
final String key,
final Setting<TimeValue> fallbackSetting,
final TimeValue minValue,
final Property... properties) {
return timeSetting(key, fallbackSetting, minValue, properties);
}
public static Setting<Double> doubleSetting(String key, double defaultValue, double minValue, Property... properties) {
return doubleSetting(key, defaultValue, minValue, Double.POSITIVE_INFINITY, properties);
}

View File

@ -16,10 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import java.util.EnumSet;
import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -37,11 +36,13 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -50,40 +51,83 @@ import java.util.stream.Stream;
*/
public abstract class RemoteClusterAware extends AbstractComponent {
public static final Setting.AffixSetting<List<String>> SEARCH_REMOTE_CLUSTERS_SEEDS =
Setting.affixKeySetting(
"search.remote.",
"seeds",
key -> Setting.listSetting(
key,
Collections.emptyList(),
s -> {
parsePort(s);
return s;
},
Setting.Property.Deprecated,
Setting.Property.Dynamic,
Setting.Property.NodeScope));
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting(
"search.remote.",
"seeds",
key -> Setting.listSetting(
key, Collections.emptyList(),
s -> {
// validate seed address
parsePort(s);
return s;
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
);
"cluster.remote.",
"seeds",
key -> Setting.listSetting(
key,
// the default needs to be emptyList() when fallback is removed
"_na_".equals(key)
? SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(key)
: SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSetting(key.replaceAll("^cluster", "search")),
s -> {
// validate seed address
parsePort(s);
return s;
},
Setting.Property.Dynamic,
Setting.Property.NodeScope));
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
public static final Setting.AffixSetting<String> SEARCH_REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"search.remote.",
"proxy",
key -> Setting.simpleString(
key,
s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
return s;
},
Setting.Property.Deprecated,
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
/**
* A proxy address for the remote cluster.
* NOTE: this settings is undocumented until we have at last one transport that supports passing
* on the hostname via a mechanism like SNI.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"search.remote.",
"proxy",
key -> Setting.simpleString(key, s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
return s;
}, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
"cluster.remote.",
"proxy",
key -> Setting.simpleString(
key,
// no default is needed when fallback is removed, use simple string which gives empty
"_na_".equals(key)
? SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(key)
: SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSetting(key.replaceAll("^cluster", "search")),
s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
return s;
},
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
protected final ClusterNameExpressionResolver clusterNameResolver;
@ -105,16 +149,16 @@ public abstract class RemoteClusterAware extends AbstractComponent {
protected static Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> buildRemoteClustersDynamicConfig(Settings settings) {
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<String> addresses = concreteSetting.get(settings);
final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings);
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
}
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
}));
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<String> addresses = concreteSetting.get(settings);
final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).exists(settings);
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(() -> buildSeedNode(clusterName, address, proxyMode));
}
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
}));
}
static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) {
@ -122,14 +166,14 @@ public abstract class RemoteClusterAware extends AbstractComponent {
TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0);
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections
.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
Version.CURRENT.minimumCompatibilityVersion());
transportAddress, Collections
.emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class),
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
}
}
@ -157,8 +201,8 @@ public abstract class RemoteClusterAware extends AbstractComponent {
// remote_cluster_alias:index_name - for this case we fail the request. the user can easily change the cluster alias
// if that happens
throw new IllegalArgumentException("Can not filter indices; index " + index +
" exists but there is also a remote cluster named: " + remoteClusterName);
}
" exists but there is also a remote cluster named: " + remoteClusterName);
}
String indexName = index.substring(i + 1);
for (String clusterName : clusters) {
perClusterIndices.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(indexName);
@ -186,10 +230,16 @@ public abstract class RemoteClusterAware extends AbstractComponent {
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
(namespace, value) -> {});
clusterSettings.addAffixUpdateConsumer(
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
(namespace, value) -> {});
clusterSettings.addAffixUpdateConsumer(
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
(namespace, value) -> {});
}
@ -227,4 +277,5 @@ public abstract class RemoteClusterAware extends AbstractComponent {
public static String buildRemoteIndexName(String clusterAlias, String indexName) {
return clusterAlias != null ? clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName : indexName;
}
}

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import java.util.Collection;
@ -64,18 +65,39 @@ import static org.elasticsearch.common.settings.Setting.boolSetting;
*/
public final class RemoteClusterService extends RemoteClusterAware implements Closeable {
public static final Setting<Integer> SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER =
Setting.intSetting("search.remote.connections_per_cluster", 3, 1, Setting.Property.NodeScope, Setting.Property.Deprecated);
/**
* The maximum number of connections that will be established to a remote cluster. For instance if there is only a single
* seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
*/
public static final Setting<Integer> REMOTE_CONNECTIONS_PER_CLUSTER = Setting.intSetting("search.remote.connections_per_cluster",
3, 1, Setting.Property.NodeScope);
public static final Setting<Integer> REMOTE_CONNECTIONS_PER_CLUSTER =
Setting.intSetting(
"cluster.remote.connections_per_cluster",
SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER, // the default needs to three when fallback is removed
1,
Setting.Property.NodeScope);
public static final Setting<TimeValue> SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING =
Setting.positiveTimeSetting(
"search.remote.initial_connect_timeout",
TimeValue.timeValueSeconds(30),
Setting.Property.NodeScope,
Setting.Property.Deprecated);
/**
* The initial connect timeout for remote cluster connections
*/
public static final Setting<TimeValue> REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING =
Setting.positiveTimeSetting("search.remote.initial_connect_timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
Setting.positiveTimeSetting(
"cluster.remote.initial_connect_timeout",
SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, // the default needs to be thirty seconds when fallback is removed
TimeValue.timeValueSeconds(30),
Setting.Property.NodeScope);
public static final Setting<String> SEARCH_REMOTE_NODE_ATTRIBUTE =
Setting.simpleString("search.remote.node.attr", Setting.Property.NodeScope, Setting.Property.Deprecated);
/**
* The name of a node attribute to select nodes that should be connected to in the remote cluster.
@ -83,20 +105,46 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
* clusters. In that case {@code search.remote.node.attr: gateway} can be used to filter out other nodes in the remote cluster.
* The value of the setting is expected to be a boolean, {@code true} for nodes that can become gateways, {@code false} otherwise.
*/
public static final Setting<String> REMOTE_NODE_ATTRIBUTE = Setting.simpleString("search.remote.node.attr",
Setting.Property.NodeScope);
public static final Setting<String> REMOTE_NODE_ATTRIBUTE =
Setting.simpleString(
"cluster.remote.node.attr",
SEARCH_REMOTE_NODE_ATTRIBUTE, // no default is needed when fallback is removed, use simple string which gives empty
Setting.Property.NodeScope);
public static final Setting<Boolean> SEARCH_ENABLE_REMOTE_CLUSTERS =
Setting.boolSetting("search.remote.connect", true, Setting.Property.NodeScope, Setting.Property.Deprecated);
/**
* If <code>true</code> connecting to remote clusters is supported on this node. If <code>false</code> this node will not establish
* connections to any remote clusters configured. Search requests executed against this node (where this node is the coordinating node)
* will fail if remote cluster syntax is used as an index pattern. The default is <code>true</code>
*/
public static final Setting<Boolean> ENABLE_REMOTE_CLUSTERS = Setting.boolSetting("search.remote.connect", true,
Setting.Property.NodeScope);
public static final Setting<Boolean> ENABLE_REMOTE_CLUSTERS =
Setting.boolSetting(
"cluster.remote.connect",
SEARCH_ENABLE_REMOTE_CLUSTERS, // the default needs to be true when fallback is removed
Setting.Property.NodeScope);
public static final Setting.AffixSetting<Boolean> SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE =
Setting.affixKeySetting(
"search.remote.",
"skip_unavailable",
key -> boolSetting(key, false, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_SKIP_UNAVAILABLE =
Setting.affixKeySetting("search.remote.", "skip_unavailable",
key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
Setting.affixKeySetting(
"cluster.remote.",
"skip_unavailable",
key -> boolSetting(
key,
// the default needs to be false when fallback is removed
"_na_".equals(key)
? SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(key)
: SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSetting(key.replaceAll("^cluster", "search")),
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());
@ -144,27 +192,27 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), seedList, transportService,
new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections,
getNodePredicate(settings), proxyAddress);
new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections,
getNodePredicate(settings), proxyAddress);
remoteClusters.put(entry.getKey(), remote);
}
// now update the seed nodes no matter if it's new or already existing
RemoteClusterConnection finalRemote = remote;
remote.updateSeedNodes(proxyAddress, seedList, ActionListener.wrap(
response -> {
if (countDown.countDown()) {
connectionListener.onResponse(response);
}
},
exception -> {
if (countDown.fastForward()) {
connectionListener.onFailure(exception);
}
if (finalRemote.isClosed() == false) {
logger.warn("failed to update seed list for cluster: " + entry.getKey(), exception);
}
}));
response -> {
if (countDown.countDown()) {
connectionListener.onResponse(response);
}
},
exception -> {
if (countDown.fastForward()) {
connectionListener.onFailure(exception);
}
if (finalRemote.isClosed() == false) {
logger.warn("failed to update seed list for cluster: " + entry.getKey(), exception);
}
}));
}
}
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
@ -198,7 +246,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
String clusterAlias = entry.getKey();
List<String> originalIndices = entry.getValue();
originalIndicesMap.put(clusterAlias,
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
}
if (originalIndicesMap.containsKey(LOCAL_CLUSTER_GROUP_KEY) == false) {
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
@ -230,38 +278,38 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
final String[] indices = entry.getValue().indices();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
.indicesOptions(indicesOptions).local(true).preference(preference)
.routing(routing);
.indicesOptions(indicesOptions).local(true).preference(preference)
.routing(routing);
remoteClusterConnection.fetchSearchShards(searchShardsRequest,
new ActionListener<ClusterSearchShardsResponse>() {
@Override
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
if (responsesCountDown.countDown()) {
RemoteTransportException exception = transportException.get();
if (exception == null) {
listener.onResponse(searchShardsResponses);
} else {
listener.onFailure(transportException.get());
new ActionListener<ClusterSearchShardsResponse>() {
@Override
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
if (responsesCountDown.countDown()) {
RemoteTransportException exception = transportException.get();
if (exception == null) {
listener.onResponse(searchShardsResponses);
} else {
listener.onFailure(transportException.get());
}
}
}
}
@Override
public void onFailure(Exception e) {
RemoteTransportException exception = new RemoteTransportException("error while communicating with remote cluster ["
+ clusterName + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
@Override
public void onFailure(Exception e) {
RemoteTransportException exception =
new RemoteTransportException("error while communicating with remote cluster [" + clusterName + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
}
if (responsesCountDown.countDown()) {
listener.onFailure(exception);
}
}
if (responsesCountDown.countDown()) {
listener.onFailure(exception);
}
}
});
});
}
}
@ -306,6 +354,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
public void listenForUpdates(ClusterSettings clusterSettings) {
super.listenForUpdates(clusterSettings);
clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {});
clusterSettings.addAffixUpdateConsumer(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {});
}
synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
@ -327,7 +376,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
final String proxyAddress,
final ActionListener<Void> connectionListener) {
final List<Supplier<DiscoveryNode>> nodes = addresses.stream().<Supplier<DiscoveryNode>>map(address -> () ->
buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))
buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))
).collect(Collectors.toList());
updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener);
}
@ -387,7 +436,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
if (countDown.countDown()) {
listener.onResponse((clusterAlias, nodeId)
-> clusterMap.getOrDefault(clusterAlias, nullFunction).apply(nodeId));
-> clusterMap.getOrDefault(clusterAlias, nullFunction).apply(nodeId));
}
}
@ -418,4 +467,5 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
Collection<RemoteClusterConnection> getConnections() {
return remoteClusters.values();
}
}

View File

@ -60,7 +60,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
final boolean shard2 = randomBoolean();
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
@ -119,7 +119,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
final boolean shard1 = randomBoolean();
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
@ -186,7 +186,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
final SearchTransportService searchTransportService =
new SearchTransportService(Settings.builder().put("search.remote.connect", false).build(), null, null) {
new SearchTransportService(Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendCanMatch(
Transport.Connection connection,

View File

@ -60,7 +60,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
@ -118,7 +118,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
@ -175,7 +175,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,

View File

@ -70,7 +70,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
.collect(Collectors.toList()))));
mockSearchPhaseContext.getRequest().source().query(originalQuery);
mockSearchPhaseContext.searchTransport = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
@ -145,7 +145,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
mockSearchPhaseContext.getRequest().source(new SearchSourceBuilder()
.collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz"))));
mockSearchPhaseContext.searchTransport = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
@ -187,7 +187,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
public void testSkipPhase() throws IOException {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
mockSearchPhaseContext.searchTransport = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
@ -218,7 +218,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
public void testSkipExpandCollapseNoHits() throws IOException {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
mockSearchPhaseContext.searchTransport = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
@ -250,7 +250,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
boolean version = randomBoolean();
mockSearchPhaseContext.searchTransport = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {

View File

@ -106,7 +106,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -161,7 +161,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -215,7 +215,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
}
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -277,7 +277,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
AtomicInteger numFetches = new AtomicInteger(0);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -331,7 +331,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("cluster.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {

View File

@ -286,7 +286,7 @@ public class ClusterSettingsIT extends ESIntegTestCase {
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getMessage(), "Failed to parse value [-1] for setting [discovery.zen.publish_timeout] must be >= 0s");
assertEquals(ex.getMessage(), "failed to parse value [-1] for setting [discovery.zen.publish_timeout], must be >= [0ms]");
}
assertThat(discoverySettings.getPublishTimeout().seconds(), equalTo(1L));

View File

@ -318,7 +318,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
createIndex("test", settings);
fail();
} catch (IllegalArgumentException ex) {
assertEquals("Failed to parse value [0ms] for setting [index.translog.sync_interval] must be >= 100ms", ex.getMessage());
assertEquals("failed to parse value [0ms] for setting [index.translog.sync_interval], must be >= [100ms]", ex.getMessage());
}
}
}

View File

@ -258,7 +258,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
Exception e = expectThrows(IllegalArgumentException.class,
() -> new MockController(Settings.builder()
.put("indices.memory.interval", "-42s").build()));
assertEquals("Failed to parse value [-42s] for setting [indices.memory.interval] must be >= 0s", e.getMessage());
assertEquals("failed to parse value [-42s] for setting [indices.memory.interval], must be >= [0ms]", e.getMessage());
}
@ -266,7 +266,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
Exception e = expectThrows(IllegalArgumentException.class,
() -> new MockController(Settings.builder()
.put("indices.memory.shard_inactive_time", "-42s").build()));
assertEquals("Failed to parse value [-42s] for setting [indices.memory.shard_inactive_time] must be >= 0s", e.getMessage());
assertEquals("failed to parse value [-42s] for setting [indices.memory.shard_inactive_time], must be >= [0ms]", e.getMessage());
}

View File

@ -52,7 +52,7 @@ public class RemoteClusterClientTests extends ESTestCase {
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
.put("cluster.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
@ -77,7 +77,7 @@ public class RemoteClusterClientTests extends ESTestCase {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
.put("cluster.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
Semaphore semaphore = new Semaphore(1);
service.start();

View File

@ -98,17 +98,17 @@ public class RemoteClusterServiceTests extends ESTestCase {
public void testRemoteClusterSeedSetting() {
// simple validation
Settings settings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.bar.seed", "[::1]:9090").build();
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seed", "[::1]:9090").build();
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
Settings brokenSettings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1").build();
.put("cluster.remote.foo.seeds", "192.168.0.1").build();
expectThrows(IllegalArgumentException.class, () ->
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
Settings brokenPortSettings = Settings.builder()
.put("search.remote.foo.seeds", "192.168.0.1:123456789123456789").build();
.put("cluster.remote.foo.seeds", "192.168.0.1:123456789123456789").build();
Exception e = expectThrows(
IllegalArgumentException.class,
() -> RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings)
@ -119,10 +119,10 @@ public class RemoteClusterServiceTests extends ESTestCase {
public void testBuildRemoteClustersDynamicConfig() throws Exception {
Map<String, Tuple<String, List<Supplier<DiscoveryNode>>>> map = RemoteClusterService.buildRemoteClustersDynamicConfig(
Settings.builder().put("search.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.bar.seeds", "[::1]:9090")
.put("search.remote.boom.seeds", "boom-node1.internal:1000")
.put("search.remote.boom.proxy", "foo.bar.com:1234").build());
Settings.builder().put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seeds", "[::1]:9090")
.put("cluster.remote.boom.seeds", "boom-node1.internal:1000")
.put("cluster.remote.boom.proxy", "foo.bar.com:1234").build());
assertEquals(3, map.size());
assertTrue(map.containsKey("foo"));
assertTrue(map.containsKey("bar"));
@ -167,8 +167,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("search.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putList("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -213,8 +213,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("search.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putList("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -238,7 +238,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
public void testRemoteNodeAttribute() throws IOException, InterruptedException {
final Settings settings =
Settings.builder().put("search.remote.node.attr", "gateway").build();
Settings.builder().put("cluster.remote.node.attr", "gateway").build();
final List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
final Settings gateway = Settings.builder().put("node.attr.gateway", true).build();
try (MockTransportService c1N1 =
@ -268,9 +268,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList(
"search.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
"cluster.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
builder.putList(
"search.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
"cluster.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service =
new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
@ -335,8 +335,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("search.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
builder.putList("search.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -406,9 +406,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList(
"search.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
"cluster.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
builder.putList(
"search.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
"cluster.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service =
new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
@ -540,7 +540,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode();
knownNodes.add(remoteSeedNode);
nodes[i] = remoteSeedNode;
builder.put("search.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen()));
}
Settings settings = builder.build();
@ -696,13 +696,13 @@ public class RemoteClusterServiceTests extends ESTestCase {
public void testRemoteClusterSkipIfDisconnectedSetting() {
{
Settings settings = Settings.builder()
.put("search.remote.foo.skip_unavailable", true)
.put("search.remote.bar.skip_unavailable", false).build();
.put("cluster.remote.foo.skip_unavailable", true)
.put("cluster.remote.bar.skip_unavailable", false).build();
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
}
{
Settings brokenSettings = Settings.builder()
.put("search.remote.foo.skip_unavailable", "broken").build();
.put("cluster.remote.foo.skip_unavailable", "broken").build();
expectThrows(IllegalArgumentException.class, () ->
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(brokenSettings)
.forEach(setting -> setting.get(brokenSettings)));
@ -712,22 +712,22 @@ public class RemoteClusterServiceTests extends ESTestCase {
new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
{
Settings settings = Settings.builder().put("search.remote.foo.skip_unavailable", randomBoolean()).build();
Settings settings = Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(settings, true));
assertEquals("Missing required setting [search.remote.foo.seeds] for setting [search.remote.foo.skip_unavailable]",
assertEquals("Missing required setting [cluster.remote.foo.seeds] for setting [cluster.remote.foo.skip_unavailable]",
iae.getMessage());
}
{
try (MockTransportService remoteSeedTransport = startTransport("seed", new CopyOnWriteArrayList<>(), Version.CURRENT)) {
String seed = remoteSeedTransport.getLocalDiscoNode().getAddress().toString();
service.validate(Settings.builder().put("search.remote.foo.skip_unavailable", randomBoolean())
.put("search.remote.foo.seeds", seed).build(), true);
service.validate(Settings.builder().put("search.remote.foo.seeds", seed).build(), true);
service.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean())
.put("cluster.remote.foo.seeds", seed).build(), true);
service.validate(Settings.builder().put("cluster.remote.foo.seeds", seed).build(), true);
AbstractScopedSettings service2 = new ClusterSettings(Settings.builder().put("search.remote.foo.seeds", seed).build(),
AbstractScopedSettings service2 = new ClusterSettings(Settings.builder().put("cluster.remote.foo.seeds", seed).build(),
new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
service2.validate(Settings.builder().put("search.remote.foo.skip_unavailable", randomBoolean()).build(), false);
service2.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build(), false);
}
}
}
@ -789,7 +789,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
public void testGetNodePredicateNodeAttrs() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Set<DiscoveryNode.Role> roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build();
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
{
DiscoveryNode nonGatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
@ -812,7 +812,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
public void testGetNodePredicatesCombination() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build();
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
Set<DiscoveryNode.Role> allRoles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
Set<DiscoveryNode.Role> dedicatedMasterRoles = new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER));
@ -861,8 +861,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("search.remote.cluster_1.seeds", "cluster_1_node0:8080");
builder.put("search.remote.cluster_1.proxy", cluster1Proxy);
builder.putList("cluster.remote.cluster_1.seeds", "cluster_1_node0:8080");
builder.put("cluster.remote.cluster_1.proxy", cluster1Proxy);
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();

View File

@ -0,0 +1,146 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.transport.RemoteClusterAware.REMOTE_CLUSTERS_PROXY;
import static org.elasticsearch.transport.RemoteClusterAware.REMOTE_CLUSTERS_SEEDS;
import static org.elasticsearch.transport.RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY;
import static org.elasticsearch.transport.RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS;
import static org.elasticsearch.transport.RemoteClusterService.ENABLE_REMOTE_CLUSTERS;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_NODE_ATTRIBUTE;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.equalTo;
public class RemoteClusterSettingsTests extends ESTestCase {
public void testConnectionsPerClusterFallback() {
final int value = randomIntBetween(1, 8);
final Settings settings = Settings.builder().put(SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), value).build();
assertThat(REMOTE_CONNECTIONS_PER_CLUSTER.get(settings), equalTo(value));
assertSettingDeprecationsAndWarnings(new Setting[]{SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER});
}
public void testConnectionsPerClusterDefault() {
assertThat(REMOTE_CONNECTIONS_PER_CLUSTER.get(Settings.EMPTY), equalTo(3));
}
public void testInitialConnectTimeoutFallback() {
final String value = randomTimeValue(30, 300, "s");
final Settings settings = Settings.builder().put(SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.getKey(), value).build();
assertThat(
REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings),
equalTo(TimeValue.parseTimeValue(value, SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.getKey())));
assertSettingDeprecationsAndWarnings(new Setting[]{SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING});
}
public void testInitialConnectTimeoutDefault() {
assertThat(REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(Settings.EMPTY), equalTo(new TimeValue(30, TimeUnit.SECONDS)));
}
public void testRemoteNodeAttributeFallback() {
final String attribute = randomAlphaOfLength(8);
final Settings settings = Settings.builder().put(SEARCH_REMOTE_NODE_ATTRIBUTE.getKey(), attribute).build();
assertThat(REMOTE_NODE_ATTRIBUTE.get(settings), equalTo(attribute));
assertSettingDeprecationsAndWarnings(new Setting[]{SEARCH_REMOTE_NODE_ATTRIBUTE});
}
public void testRemoteNodeAttributeDefault() {
assertThat(REMOTE_NODE_ATTRIBUTE.get(Settings.EMPTY), equalTo(""));
}
public void testEnableRemoteClustersFallback() {
final boolean enable = randomBoolean();
final Settings settings = Settings.builder().put(SEARCH_ENABLE_REMOTE_CLUSTERS.getKey(), enable).build();
assertThat(ENABLE_REMOTE_CLUSTERS.get(settings), equalTo(enable));
assertSettingDeprecationsAndWarnings(new Setting[]{SEARCH_ENABLE_REMOTE_CLUSTERS});
}
public void testEnableRemoteClustersDefault() {
assertTrue(ENABLE_REMOTE_CLUSTERS.get(Settings.EMPTY));
}
public void testSkipUnavailableFallback() {
final String alias = randomAlphaOfLength(8);
final boolean skip = randomBoolean();
final Settings settings =
Settings.builder().put(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(alias).getKey(), skip).build();
assertThat(REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(alias).get(settings), equalTo(skip));
assertSettingDeprecationsAndWarnings(new Setting[]{SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(alias)});
}
public void testSkipUnavailableDefault() {
final String alias = randomAlphaOfLength(8);
assertFalse(REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(alias).get(Settings.EMPTY));
}
public void testSeedsFallback() {
final String alias = randomAlphaOfLength(8);
final int numberOfSeeds = randomIntBetween(1, 8);
final List<String> seeds = new ArrayList<>(numberOfSeeds);
for (int i = 0; i < numberOfSeeds; i++) {
seeds.add("localhost:" + Integer.toString(9200 + i));
}
final Settings settings =
Settings.builder()
.put(SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(alias).getKey(), String.join(",", seeds)).build();
assertThat(REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(alias).get(settings), equalTo(seeds));
assertSettingDeprecationsAndWarnings(new Setting[]{SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(alias)});
}
public void testSeedsDefault() {
final String alias = randomAlphaOfLength(8);
assertThat(REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), emptyCollectionOf(String.class));
}
public void testProxyFallback() {
final String alias = randomAlphaOfLength(8);
final String proxy = randomAlphaOfLength(8);
final int port = randomIntBetween(9200, 9300);
final String value = proxy + ":" + port;
final Settings settings =
Settings.builder()
.put(SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(alias).getKey(), value).build();
assertThat(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(alias).get(settings), equalTo(value));
assertSettingDeprecationsAndWarnings(new Setting[]{SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(alias)});
}
public void testProxyDefault() {
final String alias = randomAlphaOfLength(8);
assertThat(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), equalTo(""));
}
}

View File

@ -49,7 +49,7 @@ information about the `xpack.security.enabled` setting, see
PUT _cluster/settings
{
"persistent": {
"search": {
"cluster": {
"remote": {
"cluster_one": {
"seeds": [ "10.0.1.1:9300" ]
@ -82,7 +82,7 @@ First, enable cluster `one` to perform cross cluster search on remote cluster
PUT _cluster/settings
{
"persistent": {
"search.remote.cluster_two.seeds": [ "10.0.2.1:9300" ]
"cluster.remote.cluster_two.seeds": [ "10.0.2.1:9300" ]
}
}
-----------------------------------------------------------

View File

@ -27,7 +27,7 @@ followClusterTestCluster {
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
}
followClusterTestRunner {

View File

@ -44,7 +44,7 @@ followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
setting 'xpack.monitoring.enabled', 'false'

View File

@ -28,7 +28,7 @@ followClusterTestCluster {
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
}
followClusterTestRunner {

View File

@ -183,7 +183,7 @@ public class AuthorizationServiceTests extends ESTestCase {
rolesStore = mock(CompositeRolesStore.class);
clusterService = mock(ClusterService.class);
final Settings settings = Settings.builder()
.put("search.remote.other_cluster.seeds", "localhost:9999")
.put("cluster.remote.other_cluster.seeds", "localhost:9999")
.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

View File

@ -110,8 +110,8 @@ public class IndicesAndAliasesResolverTests extends ESTestCase {
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 2))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1))
.put("search.remote.remote.seeds", "127.0.0.1:" + randomIntBetween(9301, 9350))
.put("search.remote.other_remote.seeds", "127.0.0.1:" + randomIntBetween(9351, 9399))
.put("cluster.remote.remote.seeds", "127.0.0.1:" + randomIntBetween(9301, 9350))
.put("cluster.remote.other_remote.seeds", "127.0.0.1:" + randomIntBetween(9351, 9399))
.build();
indexNameExpressionResolver = new IndexNameExpressionResolver(Settings.EMPTY);

View File

@ -15,7 +15,7 @@ task remoteClusterTest(type: RestIntegTestTask) {
remoteClusterTestCluster {
numNodes = 2
clusterName = 'remote-cluster'
setting 'search.remote.connect', false
setting 'cluster.remote.connect', false
setting 'xpack.security.enabled', 'true'
setting 'xpack.watcher.enabled', 'false'
setting 'xpack.monitoring.enabled', 'false'
@ -60,9 +60,9 @@ mixedClusterTestCluster {
retries: 10)
return tmpFile.exists()
}
setting 'search.remote.my_remote_cluster.seeds', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\""
setting 'search.remote.connections_per_cluster', 1
setting 'search.remote.connect', true
setting 'cluster.remote.my_remote_cluster.seeds', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\""
setting 'cluster.remote.connections_per_cluster', 1
setting 'cluster.remote.connect', true
}
mixedClusterTestRunner {

View File

@ -160,16 +160,16 @@ teardown:
cluster.get_settings:
include_defaults: true
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.test_remote_cluster.seeds: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
- match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}}
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }

View File

@ -48,16 +48,16 @@ teardown:
cluster.get_settings:
include_defaults: true
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.test_remote_cluster.seeds: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip
- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
- match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}}
# we do another search here since this will enforce the connection to be established
# otherwise the cluster might not have been connected yet.