Watcher: Remove index template configuration via cluster settings (elastic/x-pack-elasticsearch#1680)
Watcher had an undocumented feature to configure the settings of any index template via updating the cluster settings. Instead of changing the template one could add a setting during runtime that overwrote what is written in the index template. As index template are created once and not overwritten, users should just change the index template - or the concrete index settings like number of replicas. This feature was not exposed in our documentation at all. Original commit: elastic/x-pack-elasticsearch@32e1769925
This commit is contained in:
parent
c1a3f50e19
commit
7f48337bf6
|
@ -112,7 +112,6 @@ import org.elasticsearch.xpack.watcher.rest.action.RestPutWatchAction;
|
|||
import org.elasticsearch.xpack.watcher.rest.action.RestWatchServiceAction;
|
||||
import org.elasticsearch.xpack.watcher.rest.action.RestWatcherStatsAction;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.TemplateConfig;
|
||||
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
|
||||
import org.elasticsearch.xpack.watcher.transform.TransformFactory;
|
||||
import org.elasticsearch.xpack.watcher.transform.TransformRegistry;
|
||||
|
@ -304,8 +303,8 @@ public class Watcher implements ActionPlugin {
|
|||
final Consumer<Iterable<TriggerEvent>> triggerEngineListener = getTriggerEngineListener(executionService);
|
||||
triggerService.register(triggerEngineListener);
|
||||
|
||||
final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = new WatcherIndexTemplateRegistry(settings,
|
||||
clusterService.getClusterSettings(), clusterService, threadPool, internalClient);
|
||||
final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = new WatcherIndexTemplateRegistry(settings, clusterService,
|
||||
threadPool, internalClient);
|
||||
|
||||
WatcherService watcherService = new WatcherService(settings, triggerService, triggeredWatchStore, executionService,
|
||||
watchParser, internalClient);
|
||||
|
@ -361,9 +360,6 @@ public class Watcher implements ActionPlugin {
|
|||
|
||||
public List<Setting<?>> getSettings() {
|
||||
List<Setting<?>> settings = new ArrayList<>();
|
||||
for (TemplateConfig templateConfig : WatcherIndexTemplateRegistry.TEMPLATE_CONFIGS) {
|
||||
settings.add(templateConfig.getSetting());
|
||||
}
|
||||
settings.add(INDEX_WATCHER_TEMPLATE_VERSION_SETTING);
|
||||
settings.add(MAX_STOP_TIMEOUT_SETTING);
|
||||
settings.add(ExecutionService.DEFAULT_THROTTLE_PERIOD_SETTING);
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package org.elasticsearch.xpack.watcher.support;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
|
||||
|
@ -15,8 +14,6 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
@ -26,62 +23,37 @@ import org.elasticsearch.xpack.security.InternalClient;
|
|||
import org.elasticsearch.xpack.template.TemplateUtils;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
||||
public class WatcherIndexTemplateRegistry extends AbstractComponent implements ClusterStateListener {
|
||||
|
||||
private static final String FORBIDDEN_INDEX_SETTING = "index.mapper.dynamic";
|
||||
public static final String INDEX_TEMPLATE_VERSION = "6";
|
||||
|
||||
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
|
||||
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
|
||||
public static final String WATCHES_TEMPLATE_NAME = ".watches";
|
||||
|
||||
public static final Setting<Settings> HISTORY_TEMPLATE_SETTING = Setting.groupSetting("xpack.watcher.history.index.",
|
||||
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||
public static final Setting<Settings> TRIGGERED_TEMPLATE_SETTING = Setting.groupSetting("xpack.watcher.triggered_watches.index.",
|
||||
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||
public static final Setting<Settings> WATCHES_TEMPLATE_SETTING = Setting.groupSetting("xpack.watcher.watches.index.",
|
||||
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||
public static final TemplateConfig[] TEMPLATE_CONFIGS = new TemplateConfig[]{
|
||||
new TemplateConfig(TRIGGERED_TEMPLATE_NAME, "triggered-watches", TRIGGERED_TEMPLATE_SETTING),
|
||||
new TemplateConfig(HISTORY_TEMPLATE_NAME, "watch-history", HISTORY_TEMPLATE_SETTING),
|
||||
new TemplateConfig(WATCHES_TEMPLATE_NAME, "watches", WATCHES_TEMPLATE_SETTING)
|
||||
new TemplateConfig(TRIGGERED_TEMPLATE_NAME, "triggered-watches"),
|
||||
new TemplateConfig(HISTORY_TEMPLATE_NAME, "watch-history"),
|
||||
new TemplateConfig(WATCHES_TEMPLATE_NAME, "watches")
|
||||
};
|
||||
|
||||
private final InternalClient client;
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
private final TemplateConfig[] indexTemplates;
|
||||
|
||||
private final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>();
|
||||
|
||||
private volatile Map<String, Settings> customIndexSettings;
|
||||
|
||||
public WatcherIndexTemplateRegistry(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService,
|
||||
ThreadPool threadPool, InternalClient client) {
|
||||
public WatcherIndexTemplateRegistry(Settings settings, ClusterService clusterService, ThreadPool threadPool, InternalClient client) {
|
||||
super(settings);
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.indexTemplates = TEMPLATE_CONFIGS;
|
||||
clusterService.addListener(this);
|
||||
|
||||
Map<String, Settings> customIndexSettings = new HashMap<>();
|
||||
for (TemplateConfig indexTemplate : indexTemplates) {
|
||||
clusterSettings.addSettingsUpdateConsumer(indexTemplate.getSetting(), (s) -> updateConfig(indexTemplate, s));
|
||||
Settings customSettings = this.settings.getAsSettings(indexTemplate.getSetting().getKey());
|
||||
customIndexSettings.put(indexTemplate.getSetting().getKey(), customSettings);
|
||||
}
|
||||
this.customIndexSettings = unmodifiableMap(customIndexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -102,7 +74,7 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
|
|||
addTemplatesIfMissing(state);
|
||||
}
|
||||
|
||||
void addTemplatesIfMissing(ClusterState state) {
|
||||
private void addTemplatesIfMissing(ClusterState state) {
|
||||
for (TemplateConfig template : indexTemplates) {
|
||||
final String templateName = template.getTemplateName();
|
||||
final AtomicBoolean creationCheck = templateCreationsInProgress.computeIfAbsent(templateName, key -> new AtomicBoolean(false));
|
||||
|
@ -119,47 +91,6 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
|
|||
}
|
||||
}
|
||||
|
||||
private void updateConfig(TemplateConfig config, Settings settings) {
|
||||
if (clusterService.localNode().isMasterNode() == false) {
|
||||
// Only the node that runs or will run Watcher should update the templates. Otherwise unnecessary put template
|
||||
// calls would happen
|
||||
return;
|
||||
}
|
||||
if (settings.names().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Settings existingSettings = customIndexSettings.get(config.getSetting().getKey());
|
||||
if (existingSettings == null) {
|
||||
existingSettings = Settings.EMPTY;
|
||||
}
|
||||
|
||||
boolean changed = false;
|
||||
Settings.Builder builder = Settings.builder().put(existingSettings);
|
||||
for (Map.Entry<String, String> newSettingsEntry : settings.getAsMap().entrySet()) {
|
||||
String name = "index." + newSettingsEntry.getKey();
|
||||
if (FORBIDDEN_INDEX_SETTING.equals(name)) {
|
||||
logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name);
|
||||
continue;
|
||||
}
|
||||
|
||||
String newValue = newSettingsEntry.getValue();
|
||||
String currentValue = existingSettings.get(name);
|
||||
if (!newValue.equals(currentValue)) {
|
||||
changed = true;
|
||||
builder.put(name, newValue);
|
||||
logger.info("changing setting [{}] from [{}] to [{}]", name, currentValue, newValue);
|
||||
}
|
||||
}
|
||||
|
||||
if (changed) {
|
||||
Map<String, Settings> customIndexSettings = new HashMap<String, Settings>(this.customIndexSettings);
|
||||
customIndexSettings.put(config.getSetting().getKey(), builder.build());
|
||||
this.customIndexSettings = customIndexSettings;
|
||||
putTemplate(config, templateCreationsInProgress.computeIfAbsent(config.getTemplateName(), key -> new AtomicBoolean(true)));
|
||||
}
|
||||
}
|
||||
|
||||
private void putTemplate(final TemplateConfig config, final AtomicBoolean creationCheck) {
|
||||
final Executor executor = threadPool.generic();
|
||||
executor.execute(() -> {
|
||||
|
@ -169,14 +100,6 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
|
|||
|
||||
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template, XContentType.JSON);
|
||||
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
|
||||
Settings customSettings = customIndexSettings.get(config.getSetting().getKey());
|
||||
if (customSettings != null && customSettings.names().size() > 0) {
|
||||
Settings updatedSettings = Settings.builder()
|
||||
.put(request.settings())
|
||||
.put(customSettings)
|
||||
.build();
|
||||
request.settings(updatedSettings);
|
||||
}
|
||||
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
|
||||
@Override
|
||||
public void onResponse(PutIndexTemplateResponse response) {
|
||||
|
@ -199,12 +122,10 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
|
|||
|
||||
private final String templateName;
|
||||
private String fileName;
|
||||
private final Setting<Settings> setting;
|
||||
|
||||
public TemplateConfig(String templateName, String fileName, Setting<Settings> setting) {
|
||||
public TemplateConfig(String templateName, String fileName) {
|
||||
this.templateName = templateName;
|
||||
this.fileName = fileName;
|
||||
this.setting = setting;
|
||||
}
|
||||
|
||||
public String getFileName() {
|
||||
|
@ -214,9 +135,5 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
|
|||
public String getTemplateName() {
|
||||
return templateName;
|
||||
}
|
||||
|
||||
public Setting<Settings> getSetting() {
|
||||
return setting;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,75 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.watcher.history;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
|
||||
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
@TestLogging("org.elasticsearch.cluster:DEBUG,org.elasticsearch.action.admin.cluster.settings:DEBUG,org.elasticsearch.xpack.watcher:DEBUG")
|
||||
@ESIntegTestCase.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false,
|
||||
supportsDedicatedMasters = false, numDataNodes = 1)
|
||||
public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
public void testChangeSettings() throws Exception {
|
||||
GetIndexTemplatesResponse response = client().admin().indices()
|
||||
.prepareGetTemplates(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME).get();
|
||||
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1"));
|
||||
// this isn't defined in the template, so we rely on ES's default, which is zero
|
||||
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), nullValue());
|
||||
// this isn't defined in the template, so we rely on ES's default, which is 1s
|
||||
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), nullValue());
|
||||
assertAcked(
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder()
|
||||
.put("xpack.watcher.history.index.number_of_shards", "2")
|
||||
.put("xpack.watcher.history.index.number_of_replicas", "2")
|
||||
.put("xpack.watcher.history.index.refresh_interval", "5m"))
|
||||
.get()
|
||||
);
|
||||
|
||||
// use assertBusy(...) because we update the index template in an async manner
|
||||
assertBusy(() -> {
|
||||
GetIndexTemplatesResponse response1 = client().admin().indices()
|
||||
.prepareGetTemplates(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME).get();
|
||||
assertThat(response1.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2"));
|
||||
assertThat(response1.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), equalTo("2"));
|
||||
assertThat(response1.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), equalTo("5m"));
|
||||
});
|
||||
}
|
||||
|
||||
public void testChangeSettingsIgnoringForbiddenSetting() throws Exception {
|
||||
GetIndexTemplatesResponse response = client().admin().indices()
|
||||
.prepareGetTemplates(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME).get();
|
||||
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1"));
|
||||
assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
|
||||
|
||||
assertAcked(
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder()
|
||||
.put("xpack.watcher.history.index.number_of_shards", "2")
|
||||
.put("xpack.watcher.history.index.mapper.dynamic", true)) // forbidden setting, should not get updated
|
||||
.get()
|
||||
);
|
||||
|
||||
// use assertBusy(...) because we update the index template in an async manner
|
||||
assertBusy(() -> {
|
||||
GetIndexTemplatesResponse response1 = client().admin().indices()
|
||||
.prepareGetTemplates(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME).get();
|
||||
assertThat(response1.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2"));
|
||||
assertThat(response1.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
|
||||
});
|
||||
}
|
||||
}
|
|
@ -39,7 +39,6 @@ import org.junit.Before;
|
|||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.xpack.watcher.history.HistoryStore.getHistoryIndexNameForTime;
|
||||
import static org.elasticsearch.xpack.watcher.test.WatcherMatchers.indexRequest;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
|
|
@ -20,7 +20,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
|
@ -30,14 +29,10 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.mock.orig.Mockito.verify;
|
||||
import static org.elasticsearch.mock.orig.Mockito.when;
|
||||
|
@ -56,11 +51,7 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
|
|||
|
||||
@Before
|
||||
public void createRegistryAndClient() {
|
||||
Set<Setting<?>> registeredSettings = new HashSet<>();
|
||||
registeredSettings.add(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_SETTING);
|
||||
registeredSettings.add(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_SETTING);
|
||||
registeredSettings.add(WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_SETTING);
|
||||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings);
|
||||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, Collections.emptySet());
|
||||
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||
|
@ -72,19 +63,15 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
|
|||
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
|
||||
when(adminClient.indices()).thenReturn(indicesAdminClient);
|
||||
when(client.admin()).thenReturn(adminClient);
|
||||
doAnswer(new Answer<Void>() {
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocationOnMock) {
|
||||
ActionListener<PutIndexTemplateResponse> listener =
|
||||
(ActionListener<PutIndexTemplateResponse>) invocationOnMock.getArguments()[2];
|
||||
listener.onResponse(new TestPutIndexTemplateResponse(true));
|
||||
return null;
|
||||
}
|
||||
doAnswer(invocationOnMock -> {
|
||||
ActionListener<PutIndexTemplateResponse> listener =
|
||||
(ActionListener<PutIndexTemplateResponse>) invocationOnMock.getArguments()[2];
|
||||
listener.onResponse(new TestPutIndexTemplateResponse(true));
|
||||
return null;
|
||||
}).when(client).execute(same(PutIndexTemplateAction.INSTANCE), any(), any());
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
registry = new WatcherIndexTemplateRegistry(Settings.EMPTY, clusterSettings, clusterService, threadPool, internalClient);
|
||||
registry = new WatcherIndexTemplateRegistry(Settings.EMPTY, clusterService, threadPool, internalClient);
|
||||
}
|
||||
|
||||
private ClusterChangedEvent createClusterChangedEvent(List<String> existingTemplateNames) {
|
||||
|
|
Loading…
Reference in New Issue