Enable simple remote connection strategy (#49561)

This commit back ports three commits related to enabling the simple
connection strategy.

Allow simple connection strategy to be configured (#49066)

Currently the simple connection strategy only exists in the code. It
cannot be configured. This commit moves in the direction of allowing it
to be configured. It introduces settings for the addresses and socket
count. Additionally it introduces new settings for the sniff strategy
so that the more generic number of connections and seed node settings
can be deprecated.

The simple settings are not yet registered as the registration is
dependent on follow-up work to validate the settings.

Ensure at least 1 seed configured in remote test (#49389)

This fixes #49384. Currently when we select a random subset of seed
nodes from a list, it is possible for 0 seeds to be selected. This test
depends on at least 1 seed being selected.

Add the simple strategy to cluster settings (#49414)

This is related to #49067. This commit adds the simple connection
strategy settings and strategy mode setting to the cluster settings
registry. With these changes, the simple connection mode can be used.
Additionally, it adds validation to ensure that settings cannot be
misconfigured.
This commit is contained in:
Tim Brooks 2019-11-25 16:53:07 -07:00 committed by GitHub
parent 99e313695f
commit 416178c7c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1184 additions and 511 deletions

View File

@ -83,7 +83,7 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
String transportAddress = (String) nodesResponse.get("transport_address");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local_cluster.seeds", transportAddress));
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local_cluster.sniff.seeds", transportAddress));
ClusterUpdateSettingsResponse updateSettingsResponse =
highLevelClient().cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
assertThat(updateSettingsResponse.isAcknowledged(), is(true));

View File

@ -87,7 +87,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
String transportAddress = (String) nodesResponse.get("transport_address");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.seeds", transportAddress));
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.sniff.seeds", transportAddress));
ClusterUpdateSettingsResponse updateSettingsResponse =
client.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
assertThat(updateSettingsResponse.isAcknowledged(), is(true));

View File

@ -144,7 +144,7 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
try (MockTransportService remoteTransport = startTransport("node0", new CopyOnWriteArrayList<>(), Version.CURRENT, threadPool)) {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
updateRemoteClusterSettings(Collections.singletonMap("seeds", remoteNode.getAddress().toString()));
updateRemoteClusterSettings(Collections.singletonMap("sniff.seeds", remoteNode.getAddress().toString()));
for (int i = 0; i < 10; i++) {
restHighLevelClient.index(
@ -229,7 +229,7 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
assertSearchConnectFailure();
Map<String, Object> map = new HashMap<>();
map.put("seeds", null);
map.put("sniff.seeds", null);
map.put("skip_unavailable", null);
updateRemoteClusterSettings(map);
}
@ -248,32 +248,32 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(),
containsString("missing required setting [cluster.remote.remote1.seeds] " +
containsString("missing required setting [cluster.remote.remote1.sniff.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}
Map<String, Object> settingsMap = new HashMap<>();
settingsMap.put("seeds", remoteNode.getAddress().toString());
settingsMap.put("sniff.seeds", remoteNode.getAddress().toString());
settingsMap.put("skip_unavailable", randomBoolean());
updateRemoteClusterSettings(settingsMap);
{
//check that seeds cannot be reset alone if skip_unavailable is set
Request request = new Request("PUT", "/_cluster/settings");
request.setEntity(buildUpdateSettingsRequestBody(Collections.singletonMap("seeds", null)));
request.setEntity(buildUpdateSettingsRequestBody(Collections.singletonMap("sniff.seeds", null)));
ResponseException responseException = expectThrows(ResponseException.class,
() -> client().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.seeds] " +
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.sniff.seeds] " +
"for setting [cluster.remote.remote1.skip_unavailable]"));
}
if (randomBoolean()) {
updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", null));
updateRemoteClusterSettings(Collections.singletonMap("seeds", null));
updateRemoteClusterSettings(Collections.singletonMap("sniff.seeds", null));
} else {
Map<String, Object> nullMap = new HashMap<>();
nullMap.put("seeds", null);
nullMap.put("sniff.seeds", null);
nullMap.put("skip_unavailable", null);
updateRemoteClusterSettings(nullMap);
}

View File

@ -30,12 +30,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.SniffConnectionStrategy;
import java.io.IOException;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.transport.RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS;
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE;
import static org.hamcrest.Matchers.equalTo;
@ -91,9 +92,9 @@ public class FullClusterRestartSettingsUpgradeIT extends AbstractFullClusterRest
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").get(settings));
assertFalse(SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertThat(
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").get(settings),
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").get(settings),
equalTo(Collections.singletonList("localhost:9200")));
}
}

View File

@ -0,0 +1,212 @@
---
"Add transient remote cluster in simple mode with invalid sniff settings":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.node_connections: "5"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
---
"Add transient remote cluster in sniff mode with invalid simple settings":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.simple.socket_connections: "20"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" }
---
"Add transient remote cluster using simple connection mode using valid settings":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.simple.socket_connections: "3"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}
- do:
search:
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
---
"Add transient remote cluster using sniff connection mode using valid settings":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.sniff.node_connections: "3"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}
- do:
search:
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
---
"Switch connection mode for configured cluster":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}
- do:
search:
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.seeds: null
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}
- do:
search:
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }

View File

@ -108,8 +108,10 @@ import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteConnectionStrategy;
import org.elasticsearch.transport.SimpleConnectionStrategy;
import org.elasticsearch.transport.SniffConnectionStrategy;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.watcher.ResourceWatcherService;
@ -303,14 +305,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,
@ -319,6 +315,17 @@ public final class ClusterSettings extends AbstractScopedSettings {
RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING,
@ -520,8 +527,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING)));
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
RemoteClusterAware.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER,
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER));
}

View File

@ -54,6 +54,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
@ -444,6 +445,7 @@ public class Setting<T> implements ToXContentObject {
}
validator.validate(parsed);
validator.validate(parsed, map);
validator.validate(parsed, map, exists(settings));
}
return parsed;
} catch (ElasticsearchParseException ex) {
@ -670,10 +672,11 @@ public class Setting<T> implements ToXContentObject {
public static class AffixSetting<T> extends Setting<T> {
private final AffixKey key;
private final Function<String, Setting<T>> delegateFactory;
private final BiFunction<String, String, Setting<T>> delegateFactory;
private final Set<AffixSetting> dependencies;
public AffixSetting(AffixKey key, Setting<T> delegate, Function<String, Setting<T>> delegateFactory, AffixSetting... dependencies) {
public AffixSetting(AffixKey key, Setting<T> delegate, BiFunction<String, String, Setting<T>> delegateFactory,
AffixSetting... dependencies) {
super(key, delegate.defaultValue, delegate.parser, delegate.properties.toArray(new Property[0]));
this.key = key;
this.delegateFactory = delegateFactory;
@ -688,6 +691,7 @@ public class Setting<T> implements ToXContentObject {
return settings.keySet().stream().filter(this::match).map(key::getConcreteString);
}
@Override
public Set<Setting<?>> getSettingsDependencies(String settingsKey) {
if (dependencies.isEmpty()) {
return Collections.emptySet();
@ -712,7 +716,7 @@ public class Setting<T> implements ToXContentObject {
final Map<AbstractScopedSettings.SettingUpdater<T>, T> result = new IdentityHashMap<>();
Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> {
String namespace = key.getNamespace(aKey);
Setting<T> concreteSetting = getConcreteSetting(aKey);
Setting<T> concreteSetting = getConcreteSetting(namespace, aKey);
AbstractScopedSettings.SettingUpdater<T> updater =
concreteSetting.newUpdater((v) -> consumer.accept(namespace, v), logger,
(v) -> validator.accept(namespace, v));
@ -750,7 +754,7 @@ public class Setting<T> implements ToXContentObject {
final Map<String, T> result = new IdentityHashMap<>();
Stream.concat(matchStream(current), matchStream(previous)).distinct().forEach(aKey -> {
String namespace = key.getNamespace(aKey);
Setting<T> concreteSetting = getConcreteSetting(aKey);
Setting<T> concreteSetting = getConcreteSetting(namespace, aKey);
AbstractScopedSettings.SettingUpdater<T> updater =
concreteSetting.newUpdater((v) -> {}, logger, (v) -> validator.accept(namespace, v));
if (updater.hasChanged(current, previous)) {
@ -785,7 +789,16 @@ public class Setting<T> implements ToXContentObject {
@Override
public Setting<T> getConcreteSetting(String key) {
if (match(key)) {
return delegateFactory.apply(key);
String namespace = this.key.getNamespace(key);
return delegateFactory.apply(namespace, key);
} else {
throw new IllegalArgumentException("key [" + key + "] must match [" + getKey() + "] but didn't.");
}
}
private Setting<T> getConcreteSetting(String namespace, String key) {
if (match(key)) {
return delegateFactory.apply(namespace, key);
} else {
throw new IllegalArgumentException("key [" + key + "] must match [" + getKey() + "] but didn't.");
}
@ -796,7 +809,7 @@ public class Setting<T> implements ToXContentObject {
*/
public Setting<T> getConcreteSettingForNamespace(String namespace) {
String fullKey = key.toConcreteKey(namespace).toString();
return getConcreteSetting(fullKey);
return getConcreteSetting(namespace, fullKey);
}
@Override
@ -833,8 +846,9 @@ public class Setting<T> implements ToXContentObject {
public Map<String, T> getAsMap(Settings settings) {
Map<String, T> map = new HashMap<>();
matchStream(settings).distinct().forEach(key -> {
Setting<T> concreteSetting = getConcreteSetting(key);
map.put(getNamespace(concreteSetting), concreteSetting.get(settings));
String namespace = this.key.getNamespace(key);
Setting<T> concreteSetting = getConcreteSetting(namespace, key);
map.put(namespace, concreteSetting.get(settings));
});
return Collections.unmodifiableMap(map);
}
@ -842,9 +856,9 @@ public class Setting<T> implements ToXContentObject {
/**
* Represents a validator for a setting. The {@link #validate(Object)} method is invoked early in the update setting process with the
* value of this setting for a fail-fast validation. Later on, the {@link #validate(Object, Map)} method is invoked with the value of
* this setting and a map from the settings specified by {@link #settings()}} to their values. All these values come from the same
* {@link Settings} instance.
* value of this setting for a fail-fast validation. Later on, the {@link #validate(Object, Map)} and
* {@link #validate(Object, Map, boolean)} methods are invoked with the value of this setting and a map from the settings specified by
* {@link #settings()}} to their values. All these values come from the same {@link Settings} instance.
*
* @param <T> the type of the {@link Setting}
*/
@ -868,6 +882,18 @@ public class Setting<T> implements ToXContentObject {
default void validate(T value, Map<Setting<?>, Object> settings) {
}
/**
* Validate this setting against its dependencies, specified by {@link #settings()}. This method allows validation logic
* to evaluate whether the setting will be present in the {@link Settings} after the update. The default implementation
* does nothing, accepting any value as valid as long as it passes the validation in {@link #validate(Object)}.
*
* @param value the value of this setting
* @param settings a map from the settings specified by {@link #settings()}} to their values
* @param isPresent boolean indicating if this setting is present
*/
default void validate(T value, Map<Setting<?>, Object> settings, boolean isPresent) {
}
/**
* The settings on which the validity of this setting depends. The values of the specified settings are passed to
* {@link #validate(Object, Map)}. By default this returns an empty iterator, indicating that this setting does not depend on any
@ -1065,6 +1091,12 @@ public class Setting<T> implements ToXContentObject {
properties);
}
public static Setting<Integer> intSetting(String key, int defaultValue, int minValue, Validator<Integer> validator,
Property... properties) {
return new Setting<>(key, Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key, isFiltered(properties)), validator,
properties);
}
public static Setting<Integer> intSetting(String key, Setting<Integer> fallbackSetting, int minValue, Property... properties) {
return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, key, isFiltered(properties)), properties);
}
@ -1088,6 +1120,10 @@ public class Setting<T> implements ToXContentObject {
return new Setting<>(new SimpleKey(key), null, s -> "", Function.identity(), validator, properties);
}
public static Setting<String> simpleString(String key, Validator<String> validator, Setting<String> fallback, Property... properties) {
return new Setting<>(new SimpleKey(key), fallback, fallback::getRaw, Function.identity(), validator, properties);
}
public static Setting<String> simpleString(String key, String defaultValue, Validator<String> validator, Property... properties) {
validator.validate(defaultValue);
return new Setting<>(new SimpleKey(key), null, s -> defaultValue, Function.identity(), validator, properties);
@ -1316,6 +1352,15 @@ public class Setting<T> implements ToXContentObject {
return listSetting(key, null, singleValueParser, defaultStringValue, properties);
}
public static <T> Setting<List<T>> listSetting(
final String key,
final Function<String, T> singleValueParser,
final Function<Settings, List<String>> defaultStringValue,
final Validator<List<T>> validator,
final Property... properties) {
return listSetting(key, null, singleValueParser, defaultStringValue, validator, properties);
}
public static <T> Setting<List<T>> listSetting(
final String key,
final @Nullable Setting<List<T>> fallbackSetting,
@ -1325,7 +1370,7 @@ public class Setting<T> implements ToXContentObject {
return listSetting(key, fallbackSetting, singleValueParser, defaultStringValue, v -> {}, properties);
}
static <T> Setting<List<T>> listSetting(
public static <T> Setting<List<T>> listSetting(
final String key,
final @Nullable Setting<List<T>> fallbackSetting,
final Function<String, T> singleValueParser,
@ -1583,7 +1628,8 @@ public class Setting<T> implements ToXContentObject {
* {@link #getConcreteSetting(String)} is used to pull the updater.
*/
public static <T> AffixSetting<T> prefixKeySetting(String prefix, Function<String, Setting<T>> delegateFactory) {
return affixKeySetting(new AffixKey(prefix), delegateFactory);
BiFunction<String, String, Setting<T>> delegateFactoryWithNamespace = (ns, k) -> delegateFactory.apply(k);
return affixKeySetting(new AffixKey(prefix), delegateFactoryWithNamespace);
}
/**
@ -1593,12 +1639,19 @@ public class Setting<T> implements ToXContentObject {
*/
public static <T> AffixSetting<T> affixKeySetting(String prefix, String suffix, Function<String, Setting<T>> delegateFactory,
AffixSetting... dependencies) {
return affixKeySetting(new AffixKey(prefix, suffix), delegateFactory, dependencies);
BiFunction<String, String, Setting<T>> delegateFactoryWithNamespace = (ns, k) -> delegateFactory.apply(k);
return affixKeySetting(new AffixKey(prefix, suffix), delegateFactoryWithNamespace, dependencies);
}
private static <T> AffixSetting<T> affixKeySetting(AffixKey key, Function<String, Setting<T>> delegateFactory,
public static <T> AffixSetting<T> affixKeySetting(String prefix, String suffix, BiFunction<String, String, Setting<T>> delegateFactory,
AffixSetting... dependencies) {
Setting<T> delegate = delegateFactory.apply("_na_", "_na_");
return new AffixSetting<>(new AffixKey(prefix, suffix), delegate, delegateFactory, dependencies);
}
private static <T> AffixSetting<T> affixKeySetting(AffixKey key, BiFunction<String, String, Setting<T>> delegateFactory,
AffixSetting... dependencies) {
Setting<T> delegate = delegateFactory.apply("_na_");
Setting<T> delegate = delegateFactory.apply("_na_", "_na_");
return new AffixSetting<>(key, delegate, delegateFactory, dependencies);
}

View File

@ -19,153 +19,27 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingUpgrader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Base class for all services and components that need up-to-date information about the registered remote clusters
*/
public abstract class RemoteClusterAware {
static {
// remove search.remote.* settings in 8.0.0
assert Version.CURRENT.major < 8;
}
public static final Setting.AffixSetting<List<String>> SEARCH_REMOTE_CLUSTERS_SEEDS =
Setting.affixKeySetting(
"search.remote.",
"seeds",
key -> Setting.listSetting(
key,
Collections.emptyList(),
s -> {
parsePort(s);
return s;
},
Setting.Property.Deprecated,
Setting.Property.Dynamic,
Setting.Property.NodeScope));
public static final SettingUpgrader<List<String>> SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER = new SettingUpgrader<List<String>>() {
@Override
public Setting<List<String>> getSetting() {
return SEARCH_REMOTE_CLUSTERS_SEEDS;
}
@Override
public String getKey(final String key) {
return key.replaceFirst("^search", "cluster");
}
};
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting(
"cluster.remote.",
"seeds",
key -> Setting.listSetting(
key,
// the default needs to be emptyList() when fallback is removed
"_na_".equals(key)
? SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(key)
: SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSetting(key.replaceAll("^cluster", "search")),
s -> {
// validate seed address
parsePort(s);
return s;
},
Setting.Property.Dynamic,
Setting.Property.NodeScope));
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
public static final Setting.AffixSetting<String> SEARCH_REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"search.remote.",
"proxy",
key -> Setting.simpleString(
key,
s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
},
Setting.Property.Deprecated,
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
public static final SettingUpgrader<String> SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER = new SettingUpgrader<String>() {
@Override
public Setting<String> getSetting() {
return SEARCH_REMOTE_CLUSTERS_PROXY;
}
@Override
public String getKey(final String key) {
return key.replaceFirst("^search", "cluster");
}
};
/**
* A proxy address for the remote cluster. By default this is not set, meaning that Elasticsearch will connect directly to the nodes in
* the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then Elasticsearch will connect
* to the nodes in the remote cluster using this address instead. Use of this setting is not recommended and it is deliberately
* undocumented as it does not work well with all proxies.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"cluster.remote.",
"proxy",
key -> Setting.simpleString(
key,
// no default is needed when fallback is removed, use simple string which gives empty
"_na_".equals(key)
? SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(key)
: SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSetting(key.replaceAll("^cluster", "search")),
s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
return s;
},
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
protected final Settings settings;
private final ClusterNameExpressionResolver clusterNameResolver;
@ -183,71 +57,7 @@ public abstract class RemoteClusterAware {
* Returns remote clusters that are enabled in these settings
*/
protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
final Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings
.map(REMOTE_CLUSTERS_SEEDS::getNamespace)
.filter(clusterAlias -> RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings))
.collect(Collectors.toSet());
}
/**
* Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple
* (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to
* {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node.
*/
protected static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
final Settings settings) {
final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> remoteSeeds =
buildRemoteClustersDynamicConfig(settings, REMOTE_CLUSTERS_SEEDS);
final Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> searchRemoteSeeds =
buildRemoteClustersDynamicConfig(settings, SEARCH_REMOTE_CLUSTERS_SEEDS);
// sort the intersection for predictable output order
final NavigableSet<String> intersection =
new TreeSet<>(Arrays.asList(
searchRemoteSeeds.keySet().stream().filter(s -> remoteSeeds.keySet().contains(s)).sorted().toArray(String[]::new)));
if (intersection.isEmpty() == false) {
final String message = String.format(
Locale.ROOT,
"found duplicate remote cluster configurations for cluster alias%s [%s]",
intersection.size() == 1 ? "" : "es",
String.join(",", intersection));
throw new IllegalArgumentException(message);
}
return Stream
.concat(remoteSeeds.entrySet().stream(), searchRemoteSeeds.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private static Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> buildRemoteClustersDynamicConfig(
final Settings settings, final Setting.AffixSetting<List<String>> seedsSetting) {
final Stream<Setting<List<String>>> allConcreteSettings = seedsSetting.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(seedsSetting::getNamespace, concreteSetting -> {
String clusterName = seedsSetting.getNamespace(concreteSetting);
List<String> addresses = concreteSetting.get(settings);
final boolean proxyMode =
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).existsOrFallbackExists(settings);
List<Tuple<String, Supplier<DiscoveryNode>>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(Tuple.tuple(address, () -> buildSeedNode(clusterName, address, proxyMode)));
}
return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes);
}));
}
static DiscoveryNode buildSeedNode(String clusterName, String address, boolean proxyMode) {
if (proxyMode) {
TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0);
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterName + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
}
return RemoteConnectionStrategy.getRemoteClusters(settings);
}
/**
@ -310,53 +120,21 @@ public abstract class RemoteClusterAware {
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE, RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS);
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS,
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
}
static InetSocketAddress parseSeedAddress(String remoteHost) {
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
final String host = hostPort.v1();
assert hostPort.v2() != null : remoteHost;
final int port = hostPort.v2();
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
return new InetSocketAddress(hostAddress, port);
}
public static Tuple<String, Integer> parseHostPort(final String remoteHost) {
final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
final int port = parsePort(remoteHost);
return Tuple.tuple(host, port);
}
private static int parsePort(String remoteHost) {
try {
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return port;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("failed to parse port", e);
}
}
private static int indexOfPortSeparator(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
return portSeparator;
}
public static String buildRemoteIndexName(String clusterAlias, String indexName) {
return clusterAlias == null || LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)
? indexName : clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName;

View File

@ -47,7 +47,7 @@ import java.util.function.Function;
* in the remote cluster and connects to all eligible nodes, for details see {@link RemoteClusterService#REMOTE_NODE_ATTRIBUTE}.
*
* In the case of a disconnection, this class will issue a re-connect task to establish at most
* {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of
* {@link SniffConnectionStrategy#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of
* connections per cluster has been reached.
*/
final class RemoteClusterConnection implements Closeable {
@ -68,7 +68,7 @@ final class RemoteClusterConnection implements Closeable {
*/
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) {
this(settings, clusterAlias, transportService,
createConnectionManager(buildConnectionProfileFromSettings(settings, clusterAlias), transportService));
createConnectionManager(RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings), transportService));
}
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService,
@ -76,7 +76,7 @@ final class RemoteClusterConnection implements Closeable {
this.transportService = transportService;
this.clusterAlias = clusterAlias;
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
@ -238,21 +238,7 @@ final class RemoteClusterConnection implements Closeable {
return remoteConnectionManager.getConnectionManager();
}
public boolean shouldRebuildConnection(Settings newSettings) {
boolean shouldRebuildConnection(Settings newSettings) {
return connectionStrategy.shouldRebuildConnection(newSettings);
}
static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) {
return new ConnectionProfile.Builder()
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
// we don't want this to be used for anything else but search
.addConnections(0, TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings))
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings))
.build();
}
}

View File

@ -72,20 +72,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
assert Version.CURRENT.major < 8;
}
public static final Setting<Integer> SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER =
Setting.intSetting("search.remote.connections_per_cluster", 3, 1, Setting.Property.NodeScope, Setting.Property.Deprecated);
/**
* The maximum number of connections that will be established to a remote cluster. For instance if there is only a single
* seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
*/
public static final Setting<Integer> REMOTE_CONNECTIONS_PER_CLUSTER =
Setting.intSetting(
"cluster.remote.connections_per_cluster",
SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER, // the default needs to three when fallback is removed
1,
Setting.Property.NodeScope);
public static final Setting<TimeValue> SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING =
Setting.positiveTimeSetting(
"search.remote.initial_connect_timeout",
@ -137,7 +123,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
"search.remote.",
"skip_unavailable",
key -> boolSetting(key, false, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
public static final SettingUpgrader<Boolean> SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER = new SettingUpgrader<Boolean>() {
@ -165,19 +151,19 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
: SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSetting(key.replaceAll("^cluster", "search")),
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
"cluster.remote.",
"transport.ping_schedule",
key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.Dynamic, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
"cluster.remote.",
"transport.compress",
key -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS, Setting.Property.Dynamic, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
@ -290,22 +276,18 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
if (remoteClusters.containsKey(clusterAlias) == false) {
CountDownLatch latch = new CountDownLatch(1);
updateRemoteCluster(clusterAlias, settings, ActionListener.wrap(latch::countDown));
CountDownLatch latch = new CountDownLatch(1);
updateRemoteCluster(clusterAlias, settings, ActionListener.wrap(latch::countDown));
try {
// Wait 10 seconds for a new cluster. We must use a latch instead of a future because we
// are on the cluster state thread and our custom future implementation will throw an
// assertion.
if (latch.await(10, TimeUnit.SECONDS) == false) {
logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
try {
// Wait 10 seconds for a connections. We must use a latch instead of a future because we
// are on the cluster state thread and our custom future implementation will throw an
// assertion.
if (latch.await(10, TimeUnit.SECONDS) == false) {
logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10));
}
} else {
updateRemoteCluster(clusterAlias, settings, noopListener);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@ -333,13 +315,14 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
return;
}
// this is a new cluster we have to add a new representation
if (remote == null) {
// this is a new cluster we have to add a new representation
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener);
} else if (remote.shouldRebuildConnection(newSettings)) {
// New ConnectionProfile. Must tear down existing connection
// Changes to connection configuration. Must tear down existing connection
try {
IOUtils.close(remote);
} catch (IOException e) {
@ -349,9 +332,11 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener);
} else {
// No changes to connection configuration.
listener.onResponse(null);
}
remote.ensureConnected(listener);
}
/**

View File

@ -126,7 +126,7 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
.stream()
.map(
s -> {
final Tuple<String, Integer> hostPort = RemoteClusterAware.parseHostPort(s);
final Tuple<String, Integer> hostPort = RemoteConnectionStrategy.parseHostPort(s);
assert hostPort.v2() != null : s;
try {
return new TransportAddress(

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -33,20 +34,43 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable {
enum ConnectionStrategy {
SNIFF,
SIMPLE
SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings),
SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings);
private final int numberOfChannels;
private final Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings;
ConnectionStrategy(int numberOfChannels, Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings) {
this.numberOfChannels = numberOfChannels;
this.enablementSettings = enablementSettings;
}
public int getNumberOfChannels() {
return numberOfChannels;
}
}
public static final Setting.AffixSetting<ConnectionStrategy> REMOTE_CONNECTION_MODE = Setting.affixKeySetting(
@ -54,6 +78,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
key,
ConnectionStrategy.SNIFF.name(),
value -> ConnectionStrategy.valueOf(value.toUpperCase(Locale.ROOT)),
Setting.Property.NodeScope,
Setting.Property.Dynamic));
@ -75,6 +100,96 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
connectionManager.getConnectionManager().addListener(this);
}
static ConnectionProfile buildConnectionProfile(String clusterAlias, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder()
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings))
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings))
.addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
// TODO: Evaluate if we actually need PING channels?
.addConnections(mode.numberOfChannels, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING);
return builder.build();
}
static RemoteConnectionStrategy buildStrategy(String clusterAlias, TransportService transportService,
RemoteConnectionManager connectionManager, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
switch (mode) {
case SNIFF:
return new SniffConnectionStrategy(clusterAlias, transportService, connectionManager, settings);
case SIMPLE:
return new SimpleConnectionStrategy(clusterAlias, transportService, connectionManager, settings);
default:
throw new AssertionError("Invalid connection strategy" + mode);
}
}
static Set<String> getRemoteClusters(Settings settings) {
final Stream<Setting.AffixSetting<?>> enablementSettings = Arrays.stream(ConnectionStrategy.values())
.flatMap(strategy -> strategy.enablementSettings.get());
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
}
public static boolean isConnectionEnabled(String clusterAlias, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
if (mode.equals(ConnectionStrategy.SNIFF)) {
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return seeds.isEmpty() == false;
} else {
List<String> addresses = SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias)
.get(settings);
return addresses.isEmpty() == false;
}
}
private static <T> Stream<String> getClusterAlias(Settings settings, Setting.AffixSetting<T> affixSetting) {
Stream<Setting<T>> allConcreteSettings = affixSetting.getAllConcreteSettings(settings);
return allConcreteSettings.map(affixSetting::getNamespace);
}
static InetSocketAddress parseSeedAddress(String remoteHost) {
final Tuple<String, Integer> hostPort = parseHostPort(remoteHost);
final String host = hostPort.v1();
assert hostPort.v2() != null : remoteHost;
final int port = hostPort.v2();
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
return new InetSocketAddress(hostAddress, port);
}
static Tuple<String, Integer> parseHostPort(final String remoteHost) {
final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
final int port = parsePort(remoteHost);
return Tuple.tuple(host, port);
}
static int parsePort(String remoteHost) {
try {
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return port;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("failed to parse port", e);
}
}
private static int indexOfPortSeparator(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
return portSeparator;
}
/**
* Triggers a connect round unless there is one running already. If there is a connect round running, the listener will either
* be queued or rejected and failed.
@ -129,16 +244,6 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
}
}
public static boolean isConnectionEnabled(String clusterAlias, Settings settings) {
ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings);
if (mode.equals(ConnectionStrategy.SNIFF)) {
List<String> seeds = RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return seeds.isEmpty() == false;
} else {
return false;
}
}
boolean shouldRebuildConnection(Settings newSettings) {
ConnectionStrategy newMode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
if (newMode.equals(strategyType()) == false) {
@ -222,4 +327,45 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false
|| Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false;
}
static class StrategyValidator<T> implements Setting.Validator<T> {
private final String key;
private final ConnectionStrategy expectedStrategy;
private final String namespace;
private final Consumer<T> valueChecker;
StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy) {
this(namespace, key, expectedStrategy, (v) -> {});
}
StrategyValidator(String namespace, String key, ConnectionStrategy expectedStrategy, Consumer<T> valueChecker) {
this.namespace = namespace;
this.key = key;
this.expectedStrategy = expectedStrategy;
this.valueChecker = valueChecker;
}
@Override
public void validate(T value) {
valueChecker.accept(value);
}
@Override
public void validate(T value, Map<Setting<?>, Object> settings, boolean isPresent) {
Setting<ConnectionStrategy> concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace);
ConnectionStrategy modeType = (ConnectionStrategy) settings.get(concrete);
if (isPresent && modeType.equals(expectedStrategy) == false) {
throw new IllegalArgumentException("Setting \"" + key + "\" cannot be used with the configured \"" + concrete.getKey()
+ "\" [required=" + expectedStrategy.name() + ", configured=" + modeType.name() + "]");
}
}
@Override
public Iterator<Setting<?>> settings() {
Setting<ConnectionStrategy> concrete = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(namespace);
Stream<Setting<?>> settingStream = Stream.of(concrete);
return settingStream.iterator();
}
}
}

View File

@ -26,24 +26,53 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.CountDown;
import java.util.Iterator;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.common.settings.Setting.intSetting;
public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
/**
* A list of addresses for remote cluster connections. The connections will be opened to the configured addresses in a round-robin
* fashion.
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
"cluster.remote.",
"simple.addresses",
(ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> {
// validate address
parsePort(s);
return s;
}, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE),
Setting.Property.Dynamic, Setting.Property.NodeScope));
/**
* The maximum number of socket connections that will be established to a remote cluster. The default is 18.
*/
public static final Setting.AffixSetting<Integer> REMOTE_SOCKET_CONNECTIONS = Setting.affixKeySetting(
"cluster.remote.",
"simple.socket_connections",
(ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE),
Setting.Property.Dynamic, Setting.Property.NodeScope));
static final int CHANNELS_PER_CONNECTION = 1;
private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3;
private static final Logger logger = LogManager.getLogger(SimpleConnectionStrategy.class);
private final int maxNumRemoteConnections;
private final int maxNumConnections;
private final AtomicLong counter = new AtomicLong(0);
private final List<Supplier<TransportAddress>> addresses;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
@ -51,9 +80,26 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
private final ConnectionManager.ConnectionValidator clusterNameValidator;
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumRemoteConnections, List<Supplier<TransportAddress>> addresses) {
Settings settings) {
this(
clusterAlias,
transportService,
connectionManager,
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings));
}
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
configuredAddresses.stream().map(address ->
(Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()));
}
SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses) {
super(clusterAlias, transportService, connectionManager);
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.maxNumConnections = maxNumConnections;
assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses";
this.addresses = addresses;
// TODO: Move into the ConnectionManager
@ -77,9 +123,13 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
}));
}
static Stream<Setting.AffixSetting<?>> enablementSettings() {
return Stream.of(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES);
}
@Override
protected boolean shouldOpenMoreConnections() {
return connectionManager.size() < maxNumRemoteConnections;
return connectionManager.size() < maxNumConnections;
}
@Override
@ -94,10 +144,10 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
@Override
protected void connectImpl(ActionListener<Void> listener) {
performSimpleConnectionProcess(addresses.iterator(), listener);
performSimpleConnectionProcess(listener);
}
private void performSimpleConnectionProcess(Iterator<Supplier<TransportAddress>> addressIter, ActionListener<Void> listener) {
private void performSimpleConnectionProcess(ActionListener<Void> listener) {
openConnections(listener, 1);
}
@ -105,7 +155,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
if (attemptNumber <= MAX_CONNECT_ATTEMPTS_PER_RUN) {
List<TransportAddress> resolved = addresses.stream().map(Supplier::get).collect(Collectors.toList());
int remaining = maxNumRemoteConnections - connectionManager.size();
int remaining = maxNumConnections - connectionManager.size();
ActionListener<Void> compositeListener = new ActionListener<Void>() {
private final AtomicInteger successfulConnections = new AtomicInteger(0);
@ -158,7 +208,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
+ "]"));
} else {
logger.debug("unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]", clusterAlias,
openConnections, maxNumRemoteConnections);
openConnections, maxNumConnections);
finished.onResponse(null);
}
}
@ -169,4 +219,8 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
while ((curr = counter.getAndIncrement()) == Long.MIN_VALUE) ;
return resolvedAddresses.get(Math.toIntExact(Math.floorMod(curr, (long) resolvedAddresses.size())));
}
private static TransportAddress resolveAddress(String address) {
return new TransportAddress(parseSeedAddress(address));
}
}

View File

@ -33,8 +33,11 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingUpgrader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -53,9 +56,164 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.common.settings.Setting.intSetting;
public class SniffConnectionStrategy extends RemoteConnectionStrategy {
static {
// remove search.remote.* settings in 8.0.0
assert Version.CURRENT.major < 8;
}
public static final Setting.AffixSetting<List<String>> SEARCH_REMOTE_CLUSTERS_SEEDS =
Setting.affixKeySetting(
"search.remote.",
"seeds",
key -> Setting.listSetting(
key,
Collections.emptyList(),
s -> {
parsePort(s);
return s;
},
Setting.Property.Deprecated,
Setting.Property.Dynamic,
Setting.Property.NodeScope));
public static final SettingUpgrader<List<String>> SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER = new SettingUpgrader<List<String>>() {
@Override
public Setting<List<String>> getSetting() {
return SEARCH_REMOTE_CLUSTERS_SEEDS;
}
@Override
public String getKey(final String key) {
return key.replaceFirst("^search", "cluster");
}
};
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_SEEDS_OLD = Setting.affixKeySetting(
"cluster.remote.",
"seeds",
(ns, key) -> Setting.listSetting(
key,
// the default needs to be emptyList() when fallback is removed
SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(ns),
s -> {
// validate seed address
parsePort(s);
return s;
},
s -> SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(ns).get(s),
new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF),
Setting.Property.Dynamic,
Setting.Property.NodeScope));
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_SEEDS = Setting.affixKeySetting(
"cluster.remote.",
"sniff.seeds",
(ns, key) -> Setting.listSetting(key,
REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns),
s -> {
// validate seed address
parsePort(s);
return s;
},
s -> REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(ns).get(s),
new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF),
Setting.Property.Dynamic,
Setting.Property.NodeScope));
public static final Setting.AffixSetting<String> SEARCH_REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"search.remote.",
"proxy",
key -> Setting.simpleString(
key,
s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
},
Setting.Property.Deprecated,
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTER_SEEDS);
/**
* A proxy address for the remote cluster. By default this is not set, meaning that Elasticsearch will connect directly to the nodes in
* the remote cluster using their publish addresses. If this setting is set to an IP address or hostname then Elasticsearch will connect
* to the nodes in the remote cluster using this address instead. Use of this setting is not recommended and it is deliberately
* undocumented as it does not work well with all proxies.
*/
public static final Setting.AffixSetting<String> REMOTE_CLUSTERS_PROXY = Setting.affixKeySetting(
"cluster.remote.",
"proxy",
(ns, key) -> Setting.simpleString(
key,
new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF, s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
}),
SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(ns),
Setting.Property.Dynamic,
Setting.Property.NodeScope),
REMOTE_CLUSTER_SEEDS);
public static final Setting<Integer> SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER =
intSetting("search.remote.connections_per_cluster", 3, 1, Setting.Property.NodeScope, Setting.Property.Deprecated);
/**
* The maximum number of connections that will be established to a remote cluster. For instance if there is only a single
* seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
*/
public static final Setting<Integer> REMOTE_CONNECTIONS_PER_CLUSTER =
intSetting(
"cluster.remote.connections_per_cluster",
SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER, // the default needs to three when fallback is removed,
1,
Setting.Property.NodeScope);
/**
* The maximum number of node connections that will be established to a remote cluster. For instance if there is only a single
* seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
*/
public static final Setting.AffixSetting<Integer> REMOTE_NODE_CONNECTIONS = Setting.affixKeySetting(
"cluster.remote.",
"sniff.node_connections",
(ns, key) -> intSetting(
key,
REMOTE_CONNECTIONS_PER_CLUSTER,
1,
new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF),
Setting.Property.Dynamic,
Setting.Property.NodeScope));
public static final SettingUpgrader<String> SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER = new SettingUpgrader<String>() {
@Override
public Setting<String> getSetting() {
return SEARCH_REMOTE_CLUSTERS_PROXY;
}
@Override
public String getKey(final String key) {
return key.replaceFirst("^search", "cluster");
}
};
static final int CHANNELS_PER_CONNECTION = 6;
private static final Logger logger = LogManager.getLogger(SniffConnectionStrategy.class);
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
@ -75,10 +233,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
clusterAlias,
transportService,
connectionManager,
RemoteClusterAware.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings),
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER.get(settings),
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings),
REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
getNodePredicate(settings),
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings));
REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings));
}
SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
@ -100,6 +258,10 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
this.seedNodes = seedNodes;
}
static Stream<Setting.AffixSetting<?>> enablementSettings() {
return Stream.of(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD);
}
@Override
protected boolean shouldOpenMoreConnections() {
return connectionManager.size() < maxNumRemoteConnections;
@ -107,8 +269,8 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
@Override
protected boolean strategyMustBeRebuilt(Settings newSettings) {
String proxy = RemoteClusterAware.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
List<String> addresses = RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
List<String> addresses = REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return seedsChanged(configuredSeedNodes, addresses) || proxyChanged(proxyAddress, proxy);
}
@ -148,10 +310,9 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
final DiscoveryNode seedNode = seedNodes.next().get();
logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
proxyAddress);
final ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG);
final StepListener<Transport.Connection> openConnectionStep = new StepListener<>();
try {
connectionManager.openConnection(seedNode, profile, openConnectionStep);
connectionManager.openConnection(seedNode, null, openConnectionStep);
} catch (Exception e) {
onFailure.accept(e);
}
@ -318,11 +479,11 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
private static DiscoveryNode resolveSeedNode(String clusterAlias, String address, String proxyAddress) {
if (proxyAddress == null || proxyAddress.isEmpty()) {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
TransportAddress transportAddress = new TransportAddress(parseSeedAddress(address));
return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(), transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
} else {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(proxyAddress));
TransportAddress transportAddress = new TransportAddress(parseSeedAddress(proxyAddress));
String hostName = address.substring(0, indexOfPortSeparator(address));
return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address,
transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES,
@ -353,7 +514,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
return node;
} else {
// resolve proxy address lazy here
InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
InetSocketAddress proxyInetAddress = parseSeedAddress(proxyAddress);
return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node
.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
}

View File

@ -355,7 +355,7 @@ public class TransportSearchActionTests extends ESTestCase {
DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode();
knownNodes.add(remoteSeedNode);
nodes[i] = remoteSeedNode;
settingsBuilder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
settingsBuilder.put("cluster.remote.remote" + i + ".sniff.seeds", remoteSeedNode.getAddress().toString());
remoteIndices.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen()));
}
return mockTransportServices;

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.SniffConnectionStrategy;
import org.junit.After;
import java.util.Arrays;
@ -145,15 +146,18 @@ public class UpgradeSettingsIT extends ESSingleNodeTestCase {
assertThat(
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").get(settings),
equalTo(skipUnavailable));
assertFalse(RemoteClusterService.SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertFalse(SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace("foo").exists(settings));
assertFalse(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").existsOrFallbackExists(settings));
assertThat(
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").get(settings),
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("foo").get(settings),
equalTo(Collections.singletonList("localhost:9200")));
assertFalse(RemoteClusterService.SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(RemoteClusterService.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("foo").exists(settings));
assertFalse(SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("foo").exists(settings));
assertTrue(SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("foo").exists(settings));
assertThat(
RemoteClusterService.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("foo").get(settings), equalTo("localhost:9200"));
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("foo").get(settings),
equalTo("localhost:9200"));
}
}

View File

@ -65,7 +65,7 @@ public class RemoteClusterAwareClientTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
builder.putList("cluster.remote.cluster1.sniff.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
@ -96,7 +96,7 @@ public class RemoteClusterAwareClientTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
builder.putList("cluster.remote.cluster1.sniff.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();

View File

@ -52,7 +52,8 @@ public class RemoteClusterClientTests extends ESTestCase {
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("cluster.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
.put("cluster.remote.test.sniff.seeds",
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
service.start();
// following two log lines added to investigate #41745, can be removed once issue is closed
@ -80,7 +81,8 @@ public class RemoteClusterClientTests extends ESTestCase {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
Settings localSettings = Settings.builder()
.put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true)
.put("cluster.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
.put("cluster.remote.test.sniff.seeds",
remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build();
try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) {
Semaphore semaphore = new Semaphore(1);
service.start();

View File

@ -196,7 +196,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
CountDownLatch listenerCalled = new CountDownLatch(1);
AtomicReference<Exception> exceptionReference = new AtomicReference<>();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes(seedNode));
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
ActionListener<Void> listener = ActionListener.wrap(x -> {
listenerCalled.countDown();
@ -221,7 +221,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
private static List<String> seedNodes(final DiscoveryNode... seedNodes) {
private static List<String> addresses(final DiscoveryNode... seedNodes) {
return Arrays.stream(seedNodes).map(s -> s.getAddress().toString()).collect(Collectors.toList());
}
@ -236,14 +236,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
knownNodes.add(discoverableTransport.getLocalDiscoNode());
knownNodes.add(seedTransport1.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<String> seedNodes = seedNodes(seedNode1, seedNode);
List<String> seedNodes = addresses(seedNode1, seedNode);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes);
Settings settings = buildRandomSettings(clusterAlias, seedNodes);
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
int numThreads = randomIntBetween(4, 10);
Thread[] threads = new Thread[numThreads];
@ -323,7 +323,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
knownNodes.add(transport3.getLocalDiscoNode());
knownNodes.add(transport2.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<String> seedNodes = seedNodes(node3, node1, node2);
List<String> seedNodes = addresses(node3, node1, node2);
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
@ -332,7 +332,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
int maxNumConnections = randomIntBetween(1, 5);
String clusterAlias = "test-cluster";
Settings settings = Settings.builder().put(buildSniffSettings(clusterAlias, seedNodes))
.put(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build();
.put(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build();
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
// test no nodes connected
RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo());
@ -450,7 +450,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes(seedNode));
Settings settings = buildRandomSettings(clusterAlias, addresses(seedNode));
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<Function<String, DiscoveryNode>> reference = new AtomicReference<>();
@ -489,8 +490,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
discoverableTransports.add(transportService);
}
List<String> seedNodes = new CopyOnWriteArrayList<>(randomSubsetOf(discoverableNodes.stream()
.map(d -> d.getAddress().toString()).collect(Collectors.toList())));
List<String> seedNodes = new CopyOnWriteArrayList<>(randomSubsetOf(randomIntBetween(1, discoverableNodes.size()),
discoverableNodes.stream().map(d -> d.getAddress().toString()).collect(Collectors.toList())));
Collections.shuffle(seedNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
@ -498,7 +499,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes);
Settings settings = buildRandomSettings(clusterAlias, seedNodes);
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) {
final int numGetThreads = randomIntBetween(4, 10);
final Thread[] getThreads = new Thread[numGetThreads];
@ -599,7 +600,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
service.start();
service.acceptIncomingRequests();
String clusterAlias = "test-cluster";
Settings settings = buildSniffSettings(clusterAlias, seedNodes(connectedNode));
Settings settings = buildRandomSettings(clusterAlias, addresses(connectedNode));
try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, connectionManager)) {
PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null)));
for (int i = 0; i < 10; i++) {
@ -624,10 +625,32 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
private Settings buildRandomSettings(String clusterAlias, List<String> addresses) {
if (randomBoolean()) {
return buildSimpleSettings(clusterAlias, addresses);
} else {
return buildSniffSettings(clusterAlias, addresses);
}
}
private static Settings buildSimpleSettings(String clusterAlias, List<String> addresses) {
Settings.Builder builder = Settings.builder();
builder.put(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(addresses));
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "simple");
return builder.build();
}
private static Settings buildSniffSettings(String clusterAlias, List<String> seedNodes) {
Settings.Builder builder = Settings.builder();
builder.put(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seedNodes));
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "sniff");
if (randomBoolean()) {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seedNodes));
} else {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seedNodes));
}
return builder.build();
}
}

View File

@ -25,11 +25,9 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.test.ESTestCase;
@ -38,8 +36,6 @@ import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -51,14 +47,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
public class RemoteClusterServiceTests extends ESTestCase {
@ -85,109 +76,43 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
public void testSettingsAreRegistered() {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_COMPRESS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS));
}
public void testRemoteClusterSeedSetting() {
// simple validation
Settings settings = Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seed", "[::1]:9090").build();
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
.put("cluster.remote.foo.sniff.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.sniff.seed", "[::1]:9090").build();
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
Settings brokenSettings = Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1").build();
.put("cluster.remote.foo.sniff.seeds", "192.168.0.1").build();
expectThrows(IllegalArgumentException.class, () ->
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS
.getAllConcreteSettings(brokenSettings).forEach(setting -> setting.get(brokenSettings)));
Settings brokenPortSettings = Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:123456789123456789").build();
.put("cluster.remote.foo.sniff.seeds", "192.168.0.1:123456789123456789").build();
Exception e = expectThrows(
IllegalArgumentException.class,
() -> RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(brokenSettings)
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getAllConcreteSettings(brokenSettings)
.forEach(setting -> setting.get(brokenPortSettings))
);
assertEquals("failed to parse port", e.getMessage());
}
public void testBuildRemoteClustersDynamicConfig() throws Exception {
Map<String, Tuple<String, List<Tuple<String, Supplier<DiscoveryNode>>>>> map =
RemoteClusterService.buildRemoteClustersDynamicConfig(
Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seeds", "[::1]:9090")
.put("cluster.remote.boom.seeds", "boom-node1.internal:1000")
.put("cluster.remote.boom.proxy", "foo.bar.com:1234")
.put("search.remote.quux.seeds", "quux:9300")
.put("search.remote.quux.proxy", "quux-proxy:19300")
.build());
assertThat(map.keySet(), containsInAnyOrder(equalTo("foo"), equalTo("bar"), equalTo("boom"), equalTo("quux")));
assertThat(map.get("foo").v2(), hasSize(1));
assertThat(map.get("bar").v2(), hasSize(1));
assertThat(map.get("boom").v2(), hasSize(1));
assertThat(map.get("quux").v2(), hasSize(1));
DiscoveryNode foo = map.get("foo").v2().get(0).v2().get();
assertEquals("", map.get("foo").v1());
assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080)));
assertEquals(foo.getId(), "foo#192.168.0.1:8080");
assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode bar = map.get("bar").v2().get(0).v2().get();
assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090)));
assertEquals(bar.getId(), "bar#[::1]:9090");
assertEquals("", map.get("bar").v1());
assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode boom = map.get("boom").v2().get(0).v2().get();
assertEquals(boom.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0));
assertEquals("boom-node1.internal", boom.getHostName());
assertEquals(boom.getId(), "boom#boom-node1.internal:1000");
assertEquals("foo.bar.com:1234", map.get("boom").v1());
assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode quux = map.get("quux").v2().get(0).v2().get();
assertEquals(quux.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0));
assertEquals("quux", quux.getHostName());
assertEquals(quux.getId(), "quux#quux:9300");
assertEquals("quux-proxy:19300", map.get("quux").v1());
assertEquals(quux.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
assertSettingDeprecationsAndWarnings(new String[]{"search.remote.quux.seeds", "search.remote.quux.proxy"});
}
public void testBuildRemoteClustersDynamicConfigWithDuplicate() {
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> RemoteClusterService.buildRemoteClustersDynamicConfig(
Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.foo.seeds", "192.168.0.1:8080")
.build()));
assertThat(e, hasToString(containsString("found duplicate remote cluster configurations for cluster alias [foo]")));
assertSettingDeprecationsAndWarnings(new String[]{"search.remote.foo.seeds"});
}
public void testBuildRemoteClustersDynamicConfigWithDuplicates() {
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> RemoteClusterService.buildRemoteClustersDynamicConfig(
Settings.builder()
.put("cluster.remote.foo.seeds", "192.168.0.1:8080")
.put("search.remote.foo.seeds", "192.168.0.1:8080")
.put("cluster.remote.bar.seeds", "192.168.0.1:8080")
.put("search.remote.bar.seeds", "192.168.0.1:8080")
.build()));
assertThat(e, hasToString(containsString("found duplicate remote cluster configurations for cluster aliases [bar,foo]")));
assertSettingDeprecationsAndWarnings(new String[]{"search.remote.bar.seeds", "search.remote.foo.seeds"});
}
public void testGroupClusterIndices() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
@ -203,8 +128,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_1.sniff.seeds", cluster1Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_2.sniff.seeds", cluster2Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -436,7 +361,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
builder.putList("cluster.remote.cluster_1.sniff.seeds", cluster1Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
service.initializeRemoteClusters();
RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
@ -445,7 +370,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule);
boolean compressionEnabled = true;
settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled);
settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
settingsChange.putList("cluster.remote.cluster_1.sniff.seeds", cluster1Seed.getAddress().toString());
service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build());
assertBusy(remoteClusterConnection::isClosed);
@ -490,9 +415,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList(
"cluster.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
"cluster.remote.cluster_1.sniff.seeds", c1N1Node.getAddress().toString());
builder.putList(
"cluster.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
"cluster.remote.cluster_2.sniff.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service =
new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
@ -557,8 +482,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.start();
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_1.sniff.seeds", c1N1Node.getAddress().toString());
builder.putList("cluster.remote.cluster_2.sniff.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -628,9 +553,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList(
"cluster.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
"cluster.remote.cluster_1.sniff.seeds", c1N1Node.getAddress().toString());
builder.putList(
"cluster.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
"cluster.remote.cluster_2.sniff.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service =
new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
@ -760,12 +685,12 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY,
new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
{
Settings settings = Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(settings, true));
assertEquals("missing required setting [cluster.remote.foo.seeds] for setting [cluster.remote.foo.skip_unavailable]",
assertEquals("missing required setting [cluster.remote.foo.sniff.seeds] for setting [cluster.remote.foo.skip_unavailable]",
iae.getMessage());
}
{
@ -773,10 +698,10 @@ public class RemoteClusterServiceTests extends ESTestCase {
String seed = remoteSeedTransport.getLocalDiscoNode().getAddress().toString();
service.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean())
.put("cluster.remote.foo.seeds", seed).build(), true);
service.validate(Settings.builder().put("cluster.remote.foo.seeds", seed).build(), true);
service.validate(Settings.builder().put("cluster.remote.foo.sniff.seeds", seed).build(), true);
AbstractScopedSettings service2 = new ClusterSettings(Settings.builder().put("cluster.remote.foo.seeds", seed).build(),
new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
service2.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build(), false);
}
@ -800,7 +725,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
builder.putList("cluster.remote.cluster_test.sniff.seeds", Collections.singletonList(node0.getAddress().toString()));
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
@ -896,8 +821,13 @@ public class RemoteClusterServiceTests extends ESTestCase {
private static Settings createSettings(String clusterAlias, List<String> seeds) {
Settings.Builder builder = Settings.builder();
builder.put(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seeds));
if (randomBoolean()) {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seeds));
} else {
builder.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(),
Strings.collectionToCommaDelimitedString(seeds));
}
return builder.build();
}
}

View File

@ -28,18 +28,19 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.transport.RemoteClusterAware.REMOTE_CLUSTERS_PROXY;
import static org.elasticsearch.transport.RemoteClusterAware.REMOTE_CLUSTERS_SEEDS;
import static org.elasticsearch.transport.RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY;
import static org.elasticsearch.transport.RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS;
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY;
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS;
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_PROXY;
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS;
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD;
import static org.elasticsearch.transport.RemoteClusterService.ENABLE_REMOTE_CLUSTERS;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER;
import static org.elasticsearch.transport.SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_NODE_ATTRIBUTE;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER;
import static org.elasticsearch.transport.SniffConnectionStrategy.SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING;
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE;
import static org.hamcrest.Matchers.emptyCollectionOf;
@ -117,13 +118,13 @@ public class RemoteClusterSettingsTests extends ESTestCase {
final Settings settings =
Settings.builder()
.put(SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(alias).getKey(), String.join(",", seeds)).build();
assertThat(REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(alias).get(settings), equalTo(seeds));
assertThat(REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(alias).get(settings), equalTo(seeds));
assertSettingDeprecationsAndWarnings(new Setting[]{SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(alias)});
}
public void testSeedsDefault() {
final String alias = randomAlphaOfLength(8);
assertThat(REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), emptyCollectionOf(String.class));
assertThat(REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), emptyCollectionOf(String.class));
}
public void testProxyFallback() {
@ -143,4 +144,4 @@ public class RemoteClusterSettingsTests extends ESTestCase {
assertThat(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), equalTo(""));
}
}
}

View File

@ -59,8 +59,6 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager,
RemoteConnectionStrategy.ConnectionStrategy.SIMPLE);
ConnectionProfile profile = connectionManager.getConnectionProfile();
Settings.Builder newBuilder = Settings.builder();
newBuilder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "simple");
if (randomBoolean()) {
@ -72,6 +70,18 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
assertTrue(first.shouldRebuildConnection(newBuilder.build()));
}
public void testCorrectChannelNumber() {
String clusterAlias = "cluster-alias";
for (RemoteConnectionStrategy.ConnectionStrategy strategy : RemoteConnectionStrategy.ConnectionStrategy.values()) {
String settingKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey();
Settings simpleSettings = Settings.builder().put(settingKey, strategy.name()).build();
ConnectionProfile simpleProfile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, simpleSettings);
assertEquals("Incorrect number of channels for " + strategy.name(),
strategy.getNumberOfChannels(), simpleProfile.getNumConnections());
}
}
private static class FakeConnectionStrategy extends RemoteConnectionStrategy {
private final ConnectionStrategy strategy;

View File

@ -22,6 +22,10 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
@ -31,7 +35,11 @@ import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@ -40,7 +48,9 @@ import java.util.stream.Collectors;
public class SimpleConnectionStrategyTests extends ESTestCase {
private final String clusterAlias = "cluster-alias";
private final ConnectionProfile profile = RemoteClusterConnection.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster");
private final String modeKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey();
private final Settings settings = Settings.builder().put(modeKey, "simple").build();
private final ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile("cluster", settings);
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Override
@ -60,7 +70,7 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
.put("node.name", id)
.put(settings)
.build();
MockTransportService newService = MockTransportService.createNewService(settings, version, threadPool);
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool);
try {
newService.start();
newService.acceptIncomingRequests();
@ -262,7 +272,68 @@ public class SimpleConnectionStrategyTests extends ESTestCase {
}
}
private static List<Supplier<TransportAddress>> addresses(final TransportAddress... addresses) {
return Arrays.stream(addresses).map(s -> (Supplier<TransportAddress>) () -> s).collect(Collectors.toList());
public void testSimpleStrategyWillResolveAddressesEachConnect() throws Exception {
try (MockTransportService transport1 = startTransport("seed_node", Version.CURRENT)) {
TransportAddress address = transport1.boundAddress().publishAddress();
CountDownLatch multipleResolveLatch = new CountDownLatch(2);
Supplier<TransportAddress> addressSupplier = () -> {
multipleResolveLatch.countDown();
return address;
};
try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
localService.acceptIncomingRequests();
ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address), Collections.singletonList(addressSupplier))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
remoteConnectionManager.getAnyRemoteConnection().close();
assertTrue(multipleResolveLatch.await(30L, TimeUnit.SECONDS));
}
}
}
}
public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
new Tuple<>(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),
new Tuple<>(SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3"));
RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
String clusterName = "cluster_name";
Settings settings = Settings.builder()
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), sniff.name())
.build();
Set<Setting<?>> clusterSettings = new HashSet<>();
clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE);
clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).collect(Collectors.toList()));
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings);
// Should validate successfully
service.validate(settings, true);
for (Tuple<Setting.AffixSetting<?>, String> restrictedSetting : restrictedSettings) {
Setting<?> concreteSetting = restrictedSetting.v1().getConcreteSettingForNamespace(clusterName);
Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true));
String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " +
"\"cluster.remote.cluster_name.mode\" [required=SIMPLE, configured=SNIFF]";
assertEquals(expected, iae.getMessage());
}
}
private static List<String> addresses(final TransportAddress... addresses) {
return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList());
}
}

View File

@ -30,6 +30,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -61,7 +64,9 @@ import static org.hamcrest.Matchers.equalTo;
public class SniffConnectionStrategyTests extends ESTestCase {
private final String clusterAlias = "cluster-alias";
private final ConnectionProfile profile = RemoteClusterConnection.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster");
private final String modeKey = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey();
private final Settings settings = Settings.builder().put(modeKey, "sniff").build();
private final ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile("cluster", settings);
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Override
@ -509,8 +514,8 @@ public class SniffConnectionStrategyTests extends ESTestCase {
assertTrue(connectionManager.nodeConnected(discoverableNode));
assertTrue(strategy.assertNoRunningConnections());
Setting<?> seedSetting = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("cluster-alias");
Setting<?> proxySetting = RemoteClusterService.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster-alias");
Setting<?> seedSetting = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("cluster-alias");
Setting<?> proxySetting = SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster-alias");
Settings noChange = Settings.builder()
.put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray()))
@ -636,6 +641,37 @@ public class SniffConnectionStrategyTests extends ESTestCase {
}
}
public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, "192.168.0.1:8080"),
new Tuple<>(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, "192.168.0.1:8080"),
new Tuple<>(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, "2"));
RemoteConnectionStrategy.ConnectionStrategy simple = RemoteConnectionStrategy.ConnectionStrategy.SIMPLE;
String clusterName = "cluster_name";
Settings settings = Settings.builder()
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterName).getKey(), simple.name())
.build();
Set<Setting<?>> clusterSettings = new HashSet<>();
clusterSettings.add(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE);
clusterSettings.addAll(restrictedSettings.stream().map(Tuple::v1).collect(Collectors.toList()));
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, clusterSettings);
// Should validate successfully
service.validate(settings, true);
for (Tuple<Setting.AffixSetting<?>, String> restrictedSetting : restrictedSettings) {
Setting<?> concreteSetting = restrictedSetting.v1().getConcreteSettingForNamespace(clusterName);
Settings invalid = Settings.builder().put(settings).put(concreteSetting.getKey(), restrictedSetting.v2()).build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(invalid, true));
String expected = "Setting \"" + concreteSetting.getKey() + "\" cannot be used with the configured " +
"\"cluster.remote.cluster_name.mode\" [required=SNIFF, configured=SIMPLE]";
assertEquals(expected, iae.getMessage());
}
}
private static List<String> seedNodes(final DiscoveryNode... seedNodes) {
return Arrays.stream(seedNodes).map(s -> s.getAddress().toString()).collect(Collectors.toList());
}

View File

@ -81,6 +81,7 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.SniffConnectionStrategy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
@ -1361,7 +1362,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster");
settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), true).put(seeds.getKey(), address));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@ -1392,7 +1393,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("leader_cluster");
Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD.getConcreteSettingForNamespace("leader_cluster");
settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), compress.getDefault(Settings.EMPTY))
.put(seeds.getKey(), address));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());

View File

@ -74,7 +74,7 @@ public class RealmSettings {
* The {@code Function} takes the <em>realm-type</em> as an argument.
* @param suffix The suffix of the setting (everything following the realm name in the affix setting)
* @param delegateFactory A factory to produce the concrete setting.
* See {@link Setting#affixKeySetting(Setting.AffixKey, Function, Setting.AffixSetting[])}
* See {@link Setting#affixKeySetting(String, String, Function, Setting.AffixSetting[])}
*/
public static <T> Function<String, Setting.AffixSetting<T>> affixSetting(String suffix, Function<String, Setting<T>> delegateFactory) {
return realmType -> Setting.affixKeySetting(realmSettingPrefix(realmType), suffix, delegateFactory);

View File

@ -0,0 +1,212 @@
---
"Add transient remote cluster in simple mode with invalid sniff settings":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.node_connections: "5"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.node_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
---
"Add transient remote cluster in sniff mode with invalid simple settings":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.simple.socket_connections: "20"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.socket_connections\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.simple.addresses\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SIMPLE, configured=SNIFF]" }
---
"Add transient remote cluster using simple connection mode using valid settings":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.simple.socket_connections: "3"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}
- do:
search:
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
---
"Add transient remote cluster using sniff connection mode using valid settings":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.sniff.node_connections: "3"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}
- do:
search:
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
---
"Switch connection mode for configured cluster":
- do:
cluster.get_settings:
include_defaults: true
- set: { defaults.cluster.remote.my_remote_cluster.seeds.0: remote_ip }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "sniff"
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}
- do:
search:
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
- do:
catch: bad_request
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.sniff.seeds\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=SNIFF, configured=SIMPLE]" }
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.mode: "simple"
cluster.remote.test_remote_cluster.sniff.seeds: null
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}
- do:
search:
rest_total_hits_as_int: true
index: test_remote_cluster:test_index
- is_false: num_reduce_phases
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 3 }
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }