Remove seeds depedency for remote cluster settings (#52829)
Currently 3 remote cluster settings (ping interval, skip unavailable, and compression) have a dependency on the seeds setting being comfigured. With proxy mode, it is now possible that these settings the seeds setting has not been configured. This commit removes this dependency and adds new validation for these settings.
This commit is contained in:
parent
2d01c005ba
commit
be8d704e2b
|
@ -248,8 +248,8 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
|
||||||
() -> client().performRequest(request));
|
() -> client().performRequest(request));
|
||||||
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
|
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
|
||||||
assertThat(responseException.getMessage(),
|
assertThat(responseException.getMessage(),
|
||||||
containsString("missing required setting [cluster.remote.remote1.seeds] " +
|
containsString("Cannot configure setting [cluster.remote.remote1.skip_unavailable] if remote cluster is " +
|
||||||
"for setting [cluster.remote.remote1.skip_unavailable]"));
|
"not enabled."));
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, Object> settingsMap = new HashMap<>();
|
Map<String, Object> settingsMap = new HashMap<>();
|
||||||
|
@ -264,8 +264,8 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
|
||||||
ResponseException responseException = expectThrows(ResponseException.class,
|
ResponseException responseException = expectThrows(ResponseException.class,
|
||||||
() -> client().performRequest(request));
|
() -> client().performRequest(request));
|
||||||
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
|
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
|
||||||
assertThat(responseException.getMessage(), containsString("missing required setting [cluster.remote.remote1.seeds] " +
|
assertThat(responseException.getMessage(), containsString("Cannot configure setting " +
|
||||||
"for setting [cluster.remote.remote1.skip_unavailable]"));
|
"[cluster.remote.remote1.skip_unavailable] if remote cluster is not enabled."));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
|
|
|
@ -325,7 +325,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
|
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
|
||||||
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
|
RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
|
||||||
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
|
RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
|
||||||
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
|
ProxyConnectionStrategy.PROXY_ADDRESS,
|
||||||
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
|
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
|
||||||
ProxyConnectionStrategy.SERVER_NAME,
|
ProxyConnectionStrategy.SERVER_NAME,
|
||||||
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS,
|
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTERS_SEEDS,
|
||||||
|
|
|
@ -1260,6 +1260,12 @@ public class Setting<T> implements ToXContentObject {
|
||||||
return new Setting<>(key, fallbackSetting, b -> parseBoolean(b, key, isFiltered(properties)), properties);
|
return new Setting<>(key, fallbackSetting, b -> parseBoolean(b, key, isFiltered(properties)), properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Setting<Boolean> boolSetting(String key, Setting<Boolean> fallbackSetting, Validator<Boolean> validator,
|
||||||
|
Property... properties) {
|
||||||
|
return new Setting<>(new SimpleKey(key), fallbackSetting, fallbackSetting::getRaw, b -> parseBoolean(b, key,
|
||||||
|
isFiltered(properties)), validator, properties);
|
||||||
|
}
|
||||||
|
|
||||||
public static Setting<Boolean> boolSetting(String key, boolean defaultValue, Validator<Boolean> validator, Property... properties) {
|
public static Setting<Boolean> boolSetting(String key, boolean defaultValue, Validator<Boolean> validator, Property... properties) {
|
||||||
return new Setting<>(key, Boolean.toString(defaultValue), b -> parseBoolean(b, key, isFiltered(properties)), validator, properties);
|
return new Setting<>(key, Boolean.toString(defaultValue), b -> parseBoolean(b, key, isFiltered(properties)), validator, properties);
|
||||||
}
|
}
|
||||||
|
@ -1629,6 +1635,12 @@ public class Setting<T> implements ToXContentObject {
|
||||||
return new Setting<>(key, fallbackSetting, (s) -> TimeValue.parseTimeValue(s, key), properties);
|
return new Setting<>(key, fallbackSetting, (s) -> TimeValue.parseTimeValue(s, key), properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Setting<TimeValue> timeSetting(String key, Setting<TimeValue> fallBackSetting, Validator<TimeValue> validator,
|
||||||
|
Property... properties) {
|
||||||
|
return new Setting<>(new SimpleKey(key), fallBackSetting, fallBackSetting::getRaw, (s) -> TimeValue.parseTimeValue(s, key),
|
||||||
|
validator, properties);
|
||||||
|
}
|
||||||
|
|
||||||
public static Setting<TimeValue> positiveTimeSetting(String key, TimeValue defaultValue, Property... properties) {
|
public static Setting<TimeValue> positiveTimeSetting(String key, TimeValue defaultValue, Property... properties) {
|
||||||
return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
|
return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
||||||
/**
|
/**
|
||||||
* The remote address for the proxy. The connections will be opened to the configured address.
|
* The remote address for the proxy. The connections will be opened to the configured address.
|
||||||
*/
|
*/
|
||||||
public static final Setting.AffixSetting<String> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
|
public static final Setting.AffixSetting<String> PROXY_ADDRESS = Setting.affixKeySetting(
|
||||||
"cluster.remote.",
|
"cluster.remote.",
|
||||||
"proxy_address",
|
"proxy_address",
|
||||||
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> {
|
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> {
|
||||||
|
@ -99,7 +99,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
||||||
transportService,
|
transportService,
|
||||||
connectionManager,
|
connectionManager,
|
||||||
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
|
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
|
||||||
REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings),
|
PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings),
|
||||||
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
|
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
||||||
}
|
}
|
||||||
|
|
||||||
static Stream<Setting.AffixSetting<?>> enablementSettings() {
|
static Stream<Setting.AffixSetting<?>> enablementSettings() {
|
||||||
return Stream.of(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES);
|
return Stream.of(ProxyConnectionStrategy.PROXY_ADDRESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Writeable.Reader<RemoteConnectionInfo.ModeInfo> infoReader() {
|
static Writeable.Reader<RemoteConnectionInfo.ModeInfo> infoReader() {
|
||||||
|
@ -155,7 +155,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean strategyMustBeRebuilt(Settings newSettings) {
|
protected boolean strategyMustBeRebuilt(Settings newSettings) {
|
||||||
String address = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
|
String address = PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
|
||||||
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
|
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
|
||||||
return numOfSockets != maxNumConnections || configuredAddress.equals(address) == false;
|
return numOfSockets != maxNumConnections || configuredAddress.equals(address) == false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,7 +129,7 @@ public abstract class RemoteClusterAware {
|
||||||
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
|
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
|
||||||
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
|
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
|
||||||
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
|
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
|
||||||
ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES,
|
ProxyConnectionStrategy.PROXY_ADDRESS,
|
||||||
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
|
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
|
||||||
ProxyConnectionStrategy.SERVER_NAME);
|
ProxyConnectionStrategy.SERVER_NAME);
|
||||||
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
|
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
|
||||||
|
|
|
@ -42,8 +42,10 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -120,8 +122,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
||||||
Setting.affixKeySetting(
|
Setting.affixKeySetting(
|
||||||
"search.remote.",
|
"search.remote.",
|
||||||
"skip_unavailable",
|
"skip_unavailable",
|
||||||
key -> boolSetting(key, false, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope),
|
key -> boolSetting(key, false, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope));
|
||||||
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
|
|
||||||
|
|
||||||
public static final SettingUpgrader<Boolean> SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER = new SettingUpgrader<Boolean>() {
|
public static final SettingUpgrader<Boolean> SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER = new SettingUpgrader<Boolean>() {
|
||||||
|
|
||||||
|
@ -141,27 +142,27 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
||||||
Setting.affixKeySetting(
|
Setting.affixKeySetting(
|
||||||
"cluster.remote.",
|
"cluster.remote.",
|
||||||
"skip_unavailable",
|
"skip_unavailable",
|
||||||
key -> boolSetting(
|
(ns, key) -> boolSetting(
|
||||||
key,
|
key,
|
||||||
// the default needs to be false when fallback is removed
|
// the default needs to be false when fallback is removed
|
||||||
"_na_".equals(key)
|
"_na_".equals(key)
|
||||||
? SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(key)
|
? SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(key)
|
||||||
: SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSetting(key.replaceAll("^cluster", "search")),
|
: SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSetting(key.replaceAll("^cluster", "search")),
|
||||||
|
new RemoteConnectionEnabled<>(ns, key),
|
||||||
Setting.Property.Dynamic,
|
Setting.Property.Dynamic,
|
||||||
Setting.Property.NodeScope),
|
Setting.Property.NodeScope));
|
||||||
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
|
|
||||||
|
|
||||||
public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
|
public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
|
||||||
"cluster.remote.",
|
"cluster.remote.",
|
||||||
"transport.ping_schedule",
|
"transport.ping_schedule",
|
||||||
key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.Dynamic, Setting.Property.NodeScope),
|
(ns, key) -> timeSetting(key, TransportSettings.PING_SCHEDULE, new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic,
|
||||||
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
|
Setting.Property.NodeScope));
|
||||||
|
|
||||||
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
|
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
|
||||||
"cluster.remote.",
|
"cluster.remote.",
|
||||||
"transport.compress",
|
"transport.compress",
|
||||||
key -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS, Setting.Property.Dynamic, Setting.Property.NodeScope),
|
(ns, key) -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS,
|
||||||
() -> SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS);
|
new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope));
|
||||||
|
|
||||||
private final TransportService transportService;
|
private final TransportService transportService;
|
||||||
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
|
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
|
||||||
|
@ -436,4 +437,38 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
||||||
Collection<RemoteClusterConnection> getConnections() {
|
Collection<RemoteClusterConnection> getConnections() {
|
||||||
return remoteClusters.values();
|
return remoteClusters.values();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class RemoteConnectionEnabled<T> implements Setting.Validator<T> {
|
||||||
|
|
||||||
|
private final String clusterAlias;
|
||||||
|
private final String key;
|
||||||
|
|
||||||
|
private RemoteConnectionEnabled(String clusterAlias, String key) {
|
||||||
|
this.clusterAlias = clusterAlias;
|
||||||
|
this.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void validate(T value) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void validate(T value, Map<Setting<?>, Object> settings, boolean isPresent) {
|
||||||
|
if (isPresent && RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings) == false) {
|
||||||
|
throw new IllegalArgumentException("Cannot configure setting [" + key + "] if remote cluster is not enabled.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<Setting<?>> settings() {
|
||||||
|
return Stream.concat(Stream.of(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias)),
|
||||||
|
settingsStream()).iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Stream<Setting<?>> settingsStream() {
|
||||||
|
return Arrays.stream(RemoteConnectionStrategy.ConnectionStrategy.values())
|
||||||
|
.flatMap(strategy -> strategy.getEnablementSettings().get())
|
||||||
|
.map(as -> as.getConcreteSettingForNamespace(clusterAlias));
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,6 +88,10 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
|
||||||
return numberOfChannels;
|
return numberOfChannels;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Supplier<Stream<Setting.AffixSetting<?>>> getEnablementSettings() {
|
||||||
|
return enablementSettings;
|
||||||
|
}
|
||||||
|
|
||||||
public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
|
public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
|
||||||
return reader.get();
|
return reader.get();
|
||||||
}
|
}
|
||||||
|
@ -149,7 +153,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
|
||||||
|
|
||||||
static Set<String> getRemoteClusters(Settings settings) {
|
static Set<String> getRemoteClusters(Settings settings) {
|
||||||
final Stream<Setting.AffixSetting<?>> enablementSettings = Arrays.stream(ConnectionStrategy.values())
|
final Stream<Setting.AffixSetting<?>> enablementSettings = Arrays.stream(ConnectionStrategy.values())
|
||||||
.flatMap(strategy -> strategy.enablementSettings.get());
|
.flatMap(strategy -> strategy.getEnablementSettings().get());
|
||||||
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
|
return enablementSettings.flatMap(s -> getClusterAlias(settings, s)).collect(Collectors.toSet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -159,7 +163,21 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
|
||||||
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
|
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
|
||||||
return seeds.isEmpty() == false;
|
return seeds.isEmpty() == false;
|
||||||
} else {
|
} else {
|
||||||
String address = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings);
|
String address = ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
|
||||||
|
return Strings.isEmpty(address) == false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static boolean isConnectionEnabled(String clusterAlias, Map<Setting<?>, Object> settings) {
|
||||||
|
ConnectionStrategy mode = (ConnectionStrategy) settings.get(REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias));
|
||||||
|
if (mode.equals(ConnectionStrategy.SNIFF)) {
|
||||||
|
List<String> seeds = (List<String>) settings.get(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS
|
||||||
|
.getConcreteSettingForNamespace(clusterAlias));
|
||||||
|
return seeds.isEmpty() == false;
|
||||||
|
} else {
|
||||||
|
String address = (String) settings.get(ProxyConnectionStrategy.PROXY_ADDRESS
|
||||||
|
.getConcreteSettingForNamespace(clusterAlias));
|
||||||
return Strings.isEmpty(address) == false;
|
return Strings.isEmpty(address) == false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -291,7 +291,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
||||||
|
|
||||||
Setting<?> modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE
|
Setting<?> modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE
|
||||||
.getConcreteSettingForNamespace("cluster-alias");
|
.getConcreteSettingForNamespace("cluster-alias");
|
||||||
Setting<?> addressesSetting = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES
|
Setting<?> addressesSetting = ProxyConnectionStrategy.PROXY_ADDRESS
|
||||||
.getConcreteSettingForNamespace("cluster-alias");
|
.getConcreteSettingForNamespace("cluster-alias");
|
||||||
Setting<?> socketConnections = ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
|
Setting<?> socketConnections = ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
|
||||||
.getConcreteSettingForNamespace("cluster-alias");
|
.getConcreteSettingForNamespace("cluster-alias");
|
||||||
|
@ -320,7 +320,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
|
||||||
|
|
||||||
public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
|
public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
|
||||||
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
|
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
|
||||||
new Tuple<>(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),
|
new Tuple<>(ProxyConnectionStrategy.PROXY_ADDRESS, "192.168.0.1:8080"),
|
||||||
new Tuple<>(ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3"));
|
new Tuple<>(ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, "3"));
|
||||||
|
|
||||||
RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
|
RemoteConnectionStrategy.ConnectionStrategy sniff = RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
|
||||||
|
|
|
@ -620,7 +620,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
||||||
|
|
||||||
private static Settings buildProxySettings(String clusterAlias, List<String> addresses) {
|
private static Settings buildProxySettings(String clusterAlias, List<String> addresses) {
|
||||||
Settings.Builder builder = Settings.builder();
|
Settings.Builder builder = Settings.builder();
|
||||||
builder.put(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(),
|
builder.put(ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).getKey(),
|
||||||
addresses.get(0));
|
addresses.get(0));
|
||||||
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "proxy");
|
builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "proxy");
|
||||||
return builder.build();
|
return builder.build();
|
||||||
|
|
|
@ -85,7 +85,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
||||||
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER));
|
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER));
|
||||||
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS));
|
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS));
|
||||||
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS));
|
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS));
|
||||||
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES));
|
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(ProxyConnectionStrategy.PROXY_ADDRESS));
|
||||||
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS));
|
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -671,40 +671,72 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
||||||
public void testRemoteClusterSkipIfDisconnectedSetting() {
|
public void testRemoteClusterSkipIfDisconnectedSetting() {
|
||||||
{
|
{
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put("cluster.remote.foo.skip_unavailable", true)
|
.put("cluster.remote.foo.seeds", "127.0.0.1:9300")
|
||||||
.put("cluster.remote.bar.skip_unavailable", false).build();
|
.put("cluster.remote.foo.skip_unavailable", true).build();
|
||||||
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
|
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
Settings brokenSettings = Settings.builder()
|
Settings brokenSettingsDependency = Settings.builder()
|
||||||
|
.put("cluster.remote.foo.skip_unavailable", true).build();
|
||||||
|
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
|
||||||
|
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(brokenSettingsDependency)
|
||||||
|
.forEach(setting -> setting.get(brokenSettingsDependency)));
|
||||||
|
assertEquals("Cannot configure setting [cluster.remote.foo.skip_unavailable] if remote cluster is not enabled.",
|
||||||
|
iae.getMessage());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
Settings brokenSettingsType = Settings.builder()
|
||||||
.put("cluster.remote.foo.skip_unavailable", "broken").build();
|
.put("cluster.remote.foo.skip_unavailable", "broken").build();
|
||||||
expectThrows(IllegalArgumentException.class, () ->
|
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
|
||||||
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(brokenSettings)
|
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(brokenSettingsType)
|
||||||
.forEach(setting -> setting.get(brokenSettings)));
|
.forEach(setting -> setting.get(brokenSettingsType)));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put("cluster.remote.foo.mode", "proxy")
|
||||||
|
.put("cluster.remote.foo.proxy_address", "127.0.0.1:9300")
|
||||||
|
.put("cluster.remote.foo.transport.ping_schedule", "5s").build();
|
||||||
|
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
Settings brokenSettingsDependency = Settings.builder()
|
||||||
|
.put("cluster.remote.foo.proxy_address", "127.0.0.1:9300")
|
||||||
|
.put("cluster.remote.foo.transport.ping_schedule", "5s").build();
|
||||||
|
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
|
||||||
|
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getAllConcreteSettings(brokenSettingsDependency)
|
||||||
|
.forEach(setting -> setting.get(brokenSettingsDependency)));
|
||||||
|
assertEquals("Cannot configure setting [cluster.remote.foo.transport.ping_schedule] if remote cluster is not enabled.",
|
||||||
|
iae.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put("cluster.remote.foo.seeds", "127.0.0.1:9300")
|
||||||
|
.put("cluster.remote.foo.transport.compress", false).build();
|
||||||
|
RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
Settings brokenSettingsDependency = Settings.builder()
|
||||||
|
.put("cluster.remote.foo.proxy_address", "127.0.0.1:9300")
|
||||||
|
.put("cluster.remote.foo.transport.compress", true).build();
|
||||||
|
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
|
||||||
|
RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getAllConcreteSettings(brokenSettingsDependency)
|
||||||
|
.forEach(setting -> setting.get(brokenSettingsDependency)));
|
||||||
|
assertEquals("Cannot configure setting [cluster.remote.foo.transport.compress] if remote cluster is not enabled.",
|
||||||
|
iae.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY,
|
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY,
|
||||||
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
|
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
|
||||||
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
|
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
|
||||||
{
|
{
|
||||||
Settings settings = Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build();
|
Settings brokenSettingsDependency = Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build();
|
||||||
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(settings, true));
|
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
|
||||||
assertEquals("missing required setting [cluster.remote.foo.seeds] for setting [cluster.remote.foo.skip_unavailable]",
|
() -> service.validate(brokenSettingsDependency, true));
|
||||||
|
assertEquals("Cannot configure setting [cluster.remote.foo.skip_unavailable] if remote cluster is not enabled.",
|
||||||
iae.getMessage());
|
iae.getMessage());
|
||||||
}
|
}
|
||||||
{
|
|
||||||
try (MockTransportService remoteSeedTransport = startTransport("seed", new CopyOnWriteArrayList<>(), Version.CURRENT)) {
|
|
||||||
String seed = remoteSeedTransport.getLocalDiscoNode().getAddress().toString();
|
|
||||||
service.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean())
|
|
||||||
.put("cluster.remote.foo.seeds", seed).build(), true);
|
|
||||||
service.validate(Settings.builder().put("cluster.remote.foo.seeds", seed).build(), true);
|
|
||||||
|
|
||||||
AbstractScopedSettings service2 = new ClusterSettings(Settings.builder().put("cluster.remote.foo.seeds", seed).build(),
|
|
||||||
new HashSet<>(Arrays.asList(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
|
|
||||||
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
|
|
||||||
service2.validate(Settings.builder().put("cluster.remote.foo.skip_unavailable", randomBoolean()).build(), false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReconnectWhenStrategySettingsUpdated() throws Exception {
|
public void testReconnectWhenStrategySettingsUpdated() throws Exception {
|
||||||
|
|
|
@ -36,6 +36,7 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
|
||||||
RemoteConnectionStrategy.ConnectionStrategy.PROXY);
|
RemoteConnectionStrategy.ConnectionStrategy.PROXY);
|
||||||
Settings newSettings = Settings.builder()
|
Settings newSettings = Settings.builder()
|
||||||
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "sniff")
|
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "sniff")
|
||||||
|
.put(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("cluster-alias").getKey(), "127.0.0.1:9300")
|
||||||
.build();
|
.build();
|
||||||
assertTrue(first.shouldRebuildConnection(newSettings));
|
assertTrue(first.shouldRebuildConnection(newSettings));
|
||||||
}
|
}
|
||||||
|
@ -47,6 +48,7 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
|
||||||
RemoteConnectionStrategy.ConnectionStrategy.PROXY);
|
RemoteConnectionStrategy.ConnectionStrategy.PROXY);
|
||||||
Settings newSettings = Settings.builder()
|
Settings newSettings = Settings.builder()
|
||||||
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "proxy")
|
.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "proxy")
|
||||||
|
.put(ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace("cluster-alias").getKey(), "127.0.0.1:9300")
|
||||||
.build();
|
.build();
|
||||||
assertFalse(first.shouldRebuildConnection(newSettings));
|
assertFalse(first.shouldRebuildConnection(newSettings));
|
||||||
}
|
}
|
||||||
|
@ -61,6 +63,7 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
|
||||||
|
|
||||||
Settings.Builder newBuilder = Settings.builder();
|
Settings.Builder newBuilder = Settings.builder();
|
||||||
newBuilder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "proxy");
|
newBuilder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "proxy");
|
||||||
|
newBuilder.put(ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace("cluster-alias").getKey(), "127.0.0.1:9300");
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace("cluster-alias").getKey(),
|
newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace("cluster-alias").getKey(),
|
||||||
TimeValue.timeValueSeconds(5));
|
TimeValue.timeValueSeconds(5));
|
||||||
|
|
Loading…
Reference in New Issue