Upgrade remote cluster settings (#33537)
This commit adds settings upgraders for the search.remote.* settings that can be in the cluster state to automatically upgrade these settings to cluster.remote.*. Because of the infrastructure that we have here, these settings can be upgraded when recovering the cluster state, but also when a user tries to make a dynamic update for these settings.
This commit is contained in:
parent
94cdf0ceba
commit
c74c46edc3
|
@ -997,15 +997,9 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
|
|||
Request clusterSettingsRequest = new Request("GET", "/_cluster/settings");
|
||||
clusterSettingsRequest.addParameter("flat_settings", "true");
|
||||
Map<String, Object> clusterSettingsResponse = entityAsMap(client().performRequest(clusterSettingsRequest));
|
||||
Map<String, Object> expectedClusterSettings = new HashMap<>();
|
||||
expectedClusterSettings.put("transient", emptyMap());
|
||||
expectedClusterSettings.put("persistent",
|
||||
singletonMap("cluster.routing.allocation.exclude.test_attr", getOldClusterVersion().toString()));
|
||||
if (expectedClusterSettings.equals(clusterSettingsResponse) == false) {
|
||||
NotEqualMessageBuilder builder = new NotEqualMessageBuilder();
|
||||
builder.compareMaps(clusterSettingsResponse, expectedClusterSettings);
|
||||
fail("settings don't match:\n" + builder.toString());
|
||||
}
|
||||
@SuppressWarnings("unchecked") final Map<String, Object> persistentSettings =
|
||||
(Map<String, Object>)clusterSettingsResponse.get("persistent");
|
||||
assertThat(persistentSettings.get("cluster.routing.allocation.exclude.test_attr"), equalTo(getOldClusterVersion().toString()));
|
||||
|
||||
// Check that the template was restored successfully
|
||||
Map<String, Object> getTemplateResponse = entityAsMap(client().performRequest(new Request("GET", "/_template/test_template")));
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.upgrades;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.transport.RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS;
|
||||
import static org.elasticsearch.transport.RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class FullClusterRestartSettingsUpgradeIT extends AbstractFullClusterRestartTestCase {
|
||||
|
||||
public void testRemoteClusterSettingsUpgraded() throws IOException {
|
||||
assumeTrue("settings automatically upgraded since 7.0.0", getOldClusterVersion().before(Version.V_7_0_0_alpha1));
|
||||
if (isRunningAgainstOldCluster()) {
|
||||
final Request putSettingsRequest = new Request("PUT", "/_cluster/settings");
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.startObject("persistent");
|
||||
{
|
||||
builder.field("search.remote.foo.skip_unavailable", true);
|
||||
builder.field("search.remote.foo.seeds", Collections.singletonList("localhost:9200"));
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
putSettingsRequest.setJsonEntity(Strings.toString(builder));
|
||||
}
|
||||
client().performRequest(putSettingsRequest);
|
||||
|
||||
final Request getSettingsRequest = new Request("GET", "/_cluster/settings");
|
||||
final Response response = client().performRequest(getSettingsRequest);
|
||||
try (XContentParser parser = createParser(JsonXContent.jsonXContent, response.getEntity().getContent())) {
|
||||
final ClusterGetSettingsResponse clusterGetSettingsResponse = ClusterGetSettingsResponse.fromXContent(parser);
|
||||
final Settings settings = clusterGetSettingsResponse.getPersistentSettings();
|
||||
|
||||
assertTrue(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertTrue(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").get(settings));
|
||||
assertTrue(SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertThat(
|
||||
SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").get(settings),
|
||||
equalTo(Collections.singletonList("localhost:9200")));
|
||||
}
|
||||
|
||||
assertSettingDeprecationsAndWarnings(new Setting<?>[]{
|
||||
SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo"),
|
||||
SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo")});
|
||||
} else {
|
||||
final Request getSettingsRequest = new Request("GET", "/_cluster/settings");
|
||||
final Response getSettingsResponse = client().performRequest(getSettingsRequest);
|
||||
try (XContentParser parser = createParser(JsonXContent.jsonXContent, getSettingsResponse.getEntity().getContent())) {
|
||||
final ClusterGetSettingsResponse clusterGetSettingsResponse = ClusterGetSettingsResponse.fromXContent(parser);
|
||||
final Settings settings = clusterGetSettingsResponse.getPersistentSettings();
|
||||
|
||||
assertFalse(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertTrue(
|
||||
settings.toString(),
|
||||
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertTrue(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").get(settings));
|
||||
assertFalse(SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertTrue(RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertThat(
|
||||
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").get(settings),
|
||||
equalTo(Collections.singletonList("localhost:9200")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -788,7 +788,8 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
|||
} else {
|
||||
// the setting has an upgrader, so mark that we have changed a setting and apply the upgrade logic
|
||||
changed = true;
|
||||
if (setting.isListSetting()) {
|
||||
// noinspection ConstantConditions
|
||||
if (setting.getConcreteSetting(key).isListSetting()) {
|
||||
final List<String> value = settings.getAsList(key);
|
||||
final String upgradedKey = upgrader.getKey(key);
|
||||
final List<String> upgradedValue = upgrader.getListValue(value);
|
||||
|
|
|
@ -443,6 +443,9 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING
|
||||
)));
|
||||
|
||||
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();
|
||||
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
|
||||
RemoteClusterAware.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
|
||||
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER,
|
||||
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER));
|
||||
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.collect.Tuple;
|
|||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.SettingUpgrader;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
|
||||
|
@ -66,6 +67,20 @@ public abstract class RemoteClusterAware extends AbstractComponent {
|
|||
Setting.Property.Dynamic,
|
||||
Setting.Property.NodeScope));
|
||||
|
||||
public static final SettingUpgrader<List<String>> SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER = new SettingUpgrader<List<String>>() {
|
||||
|
||||
@Override
|
||||
public Setting<List<String>> getSetting() {
|
||||
return SEARCH_REMOTE_CLUSTERS_SEEDS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey(final String key) {
|
||||
return key.replaceFirst("^search", "cluster");
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* A list of initial seed nodes to discover eligible nodes from the remote cluster
|
||||
*/
|
||||
|
@ -105,6 +120,20 @@ public abstract class RemoteClusterAware extends AbstractComponent {
|
|||
Setting.Property.NodeScope),
|
||||
REMOTE_CLUSTERS_SEEDS);
|
||||
|
||||
public static final SettingUpgrader<String> SEARCH_REMOTE_CLUSTERS_PROXY_UPGRADER = new SettingUpgrader<String>() {
|
||||
|
||||
@Override
|
||||
public Setting<String> getSetting() {
|
||||
return SEARCH_REMOTE_CLUSTERS_PROXY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey(final String key) {
|
||||
return key.replaceFirst("^search", "cluster");
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* A proxy address for the remote cluster.
|
||||
* NOTE: this settings is undocumented until we have at last one transport that supports passing
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.function.Supplier;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.OriginalIndices;
|
||||
|
@ -35,6 +33,7 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.SettingUpgrader;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
|
@ -43,6 +42,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -55,6 +55,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -132,6 +133,20 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
key -> boolSetting(key, false, Setting.Property.Deprecated, Setting.Property.Dynamic, Setting.Property.NodeScope),
|
||||
REMOTE_CLUSTERS_SEEDS);
|
||||
|
||||
public static final SettingUpgrader<Boolean> SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE_UPGRADER = new SettingUpgrader<Boolean>() {
|
||||
|
||||
@Override
|
||||
public Setting<Boolean> getSetting() {
|
||||
return SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey(final String key) {
|
||||
return key.replaceFirst("^search", "cluster");
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_SKIP_UNAVAILABLE =
|
||||
Setting.affixKeySetting(
|
||||
"cluster.remote.",
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.junit.After;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -122,4 +123,37 @@ public class UpgradeSettingsIT extends ESSingleNodeTestCase {
|
|||
assertThat(UpgradeSettingsPlugin.newSetting.get(settingsFunction.apply(response.getState().metaData())), equalTo("new." + value));
|
||||
}
|
||||
|
||||
public void testUpgradeRemoteClusterSettings() {
|
||||
final boolean skipUnavailable = randomBoolean();
|
||||
client()
|
||||
.admin()
|
||||
.cluster()
|
||||
.prepareUpdateSettings()
|
||||
.setPersistentSettings(
|
||||
Settings.builder()
|
||||
.put("search.remote.foo.skip_unavailable", skipUnavailable)
|
||||
.putList("search.remote.foo.seeds", Collections.singletonList("localhost:9200"))
|
||||
.put("search.remote.foo.proxy", "localhost:9200")
|
||||
.build())
|
||||
.get();
|
||||
|
||||
final ClusterStateResponse response = client().admin().cluster().prepareState().clear().setMetaData(true).get();
|
||||
|
||||
final Settings settings = response.getState().metaData().persistentSettings();
|
||||
assertFalse(RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertTrue(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertThat(
|
||||
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("foo").get(settings),
|
||||
equalTo(skipUnavailable));
|
||||
assertFalse(RemoteClusterService.SEARCH_REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertTrue(RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertThat(
|
||||
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("foo").get(settings),
|
||||
equalTo(Collections.singletonList("localhost:9200")));
|
||||
assertFalse(RemoteClusterService.SEARCH_REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertTrue(RemoteClusterService.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("foo").exists(settings));
|
||||
assertThat(
|
||||
RemoteClusterService.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("foo").get(settings), equalTo("localhost:9200"));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -182,6 +182,7 @@ subprojects {
|
|||
systemProperty 'tests.old_cluster_version', version.toString().minus("-SNAPSHOT")
|
||||
systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo")
|
||||
exclude 'org/elasticsearch/upgrades/FullClusterRestartIT.class'
|
||||
exclude 'org/elasticsearch/upgrades/FullClusterRestartSettingsUpgradeIT.class'
|
||||
exclude 'org/elasticsearch/upgrades/QueryBuilderBWCIT.class'
|
||||
}
|
||||
|
||||
|
@ -218,6 +219,7 @@ subprojects {
|
|||
systemProperty 'tests.old_cluster_version', version.toString().minus("-SNAPSHOT")
|
||||
systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo")
|
||||
exclude 'org/elasticsearch/upgrades/FullClusterRestartIT.class'
|
||||
exclude 'org/elasticsearch/upgrades/FullClusterRestartSettingsUpgradeIT.class'
|
||||
exclude 'org/elasticsearch/upgrades/QueryBuilderBWCIT.class'
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.restart;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Base64;
|
||||
|
||||
public class FullClusterRestartSettingsUpgradeIT extends org.elasticsearch.upgrades.FullClusterRestartSettingsUpgradeIT {
|
||||
|
||||
@Override
|
||||
protected Settings restClientSettings() {
|
||||
final String token =
|
||||
"Basic " + Base64.getEncoder().encodeToString("test_user:x-pack-test-password".getBytes(StandardCharsets.UTF_8));
|
||||
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue