diff --git a/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java b/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java index 4355d6d26da..b8aef15dd6a 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; /** @@ -29,7 +28,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste private volatile boolean manuallyStopped; @Inject - public WatcherLifeCycleService(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, WatcherService watcherService) { + public WatcherLifeCycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool, WatcherService watcherService) { super(settings); this.clusterService = clusterService; this.threadPool = threadPool; @@ -37,7 +36,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste clusterService.add(this); // Close if the indices service is being stopped, so we don't run into search failures (locally) that will // happen because we're shutting down and an watch is scheduled. - indicesService.addLifecycleListener(new LifecycleListener() { + clusterService.addLifecycleListener(new LifecycleListener() { @Override public void beforeStop() { stop(false); diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java index c5622262775..38b9088ae2a 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.watcher.history; +import com.google.common.collect.ImmutableSet; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -19,26 +20,28 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.settings.ClusterDynamicSettings; +import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.format.DateTimeFormat; import org.elasticsearch.common.joda.time.format.DateTimeFormatter; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -46,34 +49,94 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** */ -public class HistoryStore extends AbstractComponent { +public class HistoryStore extends AbstractComponent implements NodeSettingsService.Listener { public static final String INDEX_PREFIX = ".watch_history-"; public static final String DOC_TYPE = "watch_record"; public static final String INDEX_TEMPLATE_NAME = "watch_history"; static final DateTimeFormatter indexTimeFormat = DateTimeFormat.forPattern("YYYY.MM.dd"); + private static final ImmutableSet forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic"); private final ClientProxy client; private final TemplateUtils templateUtils; private final int scrollSize; private final TimeValue scrollTimeout; private final WatchRecord.Parser recordParser; + private final ThreadPool threadPool; private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final Lock putUpdateLock = readWriteLock.readLock(); private final Lock stopLock = readWriteLock.writeLock(); private final AtomicBoolean started = new AtomicBoolean(false); + private volatile Settings customIndexSettings = ImmutableSettings.EMPTY; + @Inject - public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, WatchRecord.Parser recordParser) { + public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, WatchRecord.Parser recordParser, + NodeSettingsService nodeSettingsService, @ClusterDynamicSettings DynamicSettings dynamicSettings, + ThreadPool threadPool) { super(settings); this.client = client; this.templateUtils = templateUtils; this.recordParser = recordParser; + this.threadPool = threadPool; this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollSize = componentSettings.getAsInt("scroll.size", 100); + + updateHistorySettings(settings, false); + nodeSettingsService.addListener(this); + dynamicSettings.addDynamicSetting("watcher.history.index.*"); } + @Override + public void onRefreshSettings(Settings settings) { + updateHistorySettings(settings, true); + } + + private void updateHistorySettings(Settings settings, boolean updateIndexTemplate) { + Settings newSettings = ImmutableSettings.builder() + .put(settings.getAsSettings("watcher.history.index")) + .build(); + if (newSettings.names().isEmpty()) { + return; + } + + boolean changed = false; + ImmutableSettings.Builder builder = ImmutableSettings.builder().put(customIndexSettings); + + for (Map.Entry entry : newSettings.getAsMap().entrySet()) { + String name = "index." + entry.getKey(); + if (forbiddenIndexSettings.contains(name)) { + logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name); + continue; + } + + String newValue = entry.getValue(); + String currentValue = customIndexSettings.get(name); + if (!newValue.equals(currentValue)) { + changed = true; + builder.put(name, newValue); + logger.info("changing setting [{}] from [{}] to [{}]", name, currentValue, newValue); + } + } + + if (changed) { + customIndexSettings = builder.build(); + if (updateIndexTemplate) { + // Need to fork to prevent dead lock. (We're on the cluster service update task, but the put index template + // needs to update the cluster state too, and because the update takes is a single threaded operation, + // we would then be stuck) + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); + } + }); + } + } + } + + public void start() { started.set(true); } @@ -243,7 +306,7 @@ public class HistoryStore extends AbstractComponent { String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*"); if (indices.length == 0) { logger.debug("no .watch_history indices found. skipping loading awaiting watch records"); - templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE_NAME); + templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); return Collections.emptySet(); } int numPrimaryShards = 0; @@ -292,7 +355,7 @@ public class HistoryStore extends AbstractComponent { } finally { client.clearScroll(response.getScrollId()); } - templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE_NAME); + templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); return records; } diff --git a/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java b/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java index c86a152e355..8e2e3f0073c 100644 --- a/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java +++ b/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java @@ -5,104 +5,58 @@ */ package org.elasticsearch.watcher.support; -import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; -import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; -import org.elasticsearch.common.base.Charsets; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.watcher.shield.ShieldIntegration; +import org.elasticsearch.watcher.WatcherException; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.watch.WatchStore; import java.io.FileNotFoundException; -import java.io.IOException; import java.io.InputStream; -import java.io.UnsupportedEncodingException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** */ public class TemplateUtils extends AbstractComponent { - private final static Pattern TEMPLATE_VERSION_PATTERN = Pattern.compile("watcher.template_version\"\\s*:\\s*\"?(\\d+)\"?"); - - private final ShieldIntegration shieldIntegration; - private final TransportPutIndexTemplateAction action; + private final ClientProxy client; @Inject - public TemplateUtils(Settings settings, TransportPutIndexTemplateAction action, ShieldIntegration shieldIntegration) { + public TemplateUtils(Settings settings, ClientProxy client) { super(settings); - this.action = action; - this.shieldIntegration = shieldIntegration; + this.client = client; } /** - * Checks if the template with the specified name exists and has the expected version. - * If that isn't the case then the template from the classpath will be uploaded to the cluster. + * Resolves the template with the specified templateName from the classpath, optionally adds extra settings and + * puts the index template into the cluster. * - * In the the template doesn't exists this method blocks until the template has been created. + * This method blocks until the template has been created. */ - public void ensureIndexTemplateIsLoaded(ClusterState state, final String templateName) { - final byte[] template; - try { - InputStream is = WatchStore.class.getResourceAsStream("/" + templateName + ".json"); + public void putTemplate(String templateName, Settings customSettings) { + try (InputStream is = WatchStore.class.getResourceAsStream("/" + templateName + ".json")) { if (is == null) { throw new FileNotFoundException("Resource [/" + templateName + ".json] not found in classpath"); } - template = Streams.copyToByteArray(is); - is.close(); - } catch (IOException e) { + final byte[] template = Streams.copyToByteArray(is); + PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template); + if (customSettings != null) { + Settings updatedSettings = ImmutableSettings.builder() + .put(request.settings()) + .put(customSettings) + .build(); + request.settings(updatedSettings); + } + PutIndexTemplateResponse response = client.putTemplate(request); + } catch (Exception e) { // throwing an exception to stop exporting process - we don't want to send data unless // we put in the template for it. - throw new RuntimeException("failed to load " + templateName + ".json", e); + throw new WatcherException("failed to load [{}.json]", e, templateName); } - - try { - int expectedVersion = parseIndexVersionFromTemplate(template); - if (expectedVersion < 0) { - throw new RuntimeException("failed to find an index version in pre-configured index template"); - } - - IndexTemplateMetaData templateMetaData = state.metaData().templates().get(templateName); - if (templateMetaData != null) { - int foundVersion = templateMetaData.getSettings().getAsInt("index.watcher.template_version", -1); - if (foundVersion < 0) { - logger.warn("found an existing index template [{}] but couldn't extract it's version. leaving it as is.", templateName); - return; - } else if (foundVersion >= expectedVersion) { - logger.debug("accepting existing index template [{}] (version [{}], needed [{}])", templateName, foundVersion, expectedVersion); - return; - } else { - logger.info("replacing existing index template [{}] (version [{}], needed [{}])", templateName, foundVersion, expectedVersion); - } - } else { - logger.info("Adding index template [{}], because none was found", templateName); - } - - PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template); - shieldIntegration.bindWatcherUser(request); - - // We're already running on the master and TransportPutIndexTemplateAction#executor() is SAME, so it is ok to wait: - ActionFuture future = action.execute(request); - PutIndexTemplateResponse response = future.actionGet(); - } catch (IOException e) { - // if we're not sure of the template, we can't send data... re-raise exception. - throw new RuntimeException("failed to load/verify index template", e); - } - } - - private static int parseIndexVersionFromTemplate(byte[] template) throws UnsupportedEncodingException { - Matcher matcher = TEMPLATE_VERSION_PATTERN.matcher(new String(template, Charsets.UTF_8)); - if (!matcher.find()) { - return -1; - } - return Integer.parseInt(matcher.group(1)); } } diff --git a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java index 0e5b34633a9..9f070a8fd0f 100644 --- a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java +++ b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ClientProxy.java @@ -9,6 +9,8 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -99,6 +101,11 @@ public class ClientProxy implements InitializingService.Initializable { return client.admin().indices().refresh(preProcess(request)).actionGet(); } + public PutIndexTemplateResponse putTemplate(PutIndexTemplateRequest request) { + preProcess(request); + return client.admin().indices().putTemplate(request).actionGet(); + } + M preProcess(M message) { if (shieldIntegration != null) { shieldIntegration.bindWatcherUser(message); diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java index 5abb2f928e5..da8e974884c 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java @@ -76,7 +76,7 @@ public class WatchStore extends AbstractComponent { try { int count = loadWatches(watchesIndexMetaData.numberOfShards()); logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX); - templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE); + templateUtils.putTemplate(INDEX_TEMPLATE, null); started.set(true); } catch (Exception e) { logger.debug("failed to load watches for watch index [{}]", e, INDEX); @@ -84,7 +84,7 @@ public class WatchStore extends AbstractComponent { throw e; } } else { - templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE); + templateUtils.putTemplate(INDEX_TEMPLATE, null); started.set(true); } } diff --git a/src/main/resources/watch_history.json b/src/main/resources/watch_history.json index 8b0a390e54e..de507351456 100644 --- a/src/main/resources/watch_history.json +++ b/src/main/resources/watch_history.json @@ -3,8 +3,6 @@ "order": 2147483647, "settings": { "index.number_of_shards": 1, - "index.number_of_replicas": 1, - "index.watcher.template_version": 1, "index.mapper.dynamic" : false }, "mappings": { diff --git a/src/main/resources/watches.json b/src/main/resources/watches.json index e6721ce8a49..2e633fb9bfe 100644 --- a/src/main/resources/watches.json +++ b/src/main/resources/watches.json @@ -3,8 +3,6 @@ "order": 2147483647, "settings": { "index.number_of_shards": 1, - "index.number_of_replicas": 1, - "index.watcher.template_version": 1, "index.mapper.dynamic" : false }, "mappings": { diff --git a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java index 16593a19586..be9471b2027 100644 --- a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java +++ b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java @@ -14,7 +14,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.util.concurrent.MoreExecutors; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; @@ -27,20 +26,16 @@ import static org.mockito.Mockito.*; */ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase { - private ThreadPool threadPool; private WatcherService watcherService; - private ClusterService clusterService; - private IndicesService indicesService; private WatcherLifeCycleService lifeCycleService; @Before public void prepareServices() { - threadPool = mock(ThreadPool.class); + ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService()); watcherService = mock(WatcherService.class); - clusterService = mock(ClusterService.class); - indicesService = mock(IndicesService.class); - lifeCycleService = new WatcherLifeCycleService(ImmutableSettings.EMPTY, clusterService, indicesService, threadPool, watcherService); + ClusterService clusterService = mock(ClusterService.class); + lifeCycleService = new WatcherLifeCycleService(ImmutableSettings.EMPTY, clusterService, threadPool, watcherService); } @Test diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java index 4e01cde31b3..f5bc335d411 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.joda.time.DateTime; @@ -25,11 +26,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition; import org.elasticsearch.watcher.execution.Wid; import org.elasticsearch.watcher.input.none.ExecutableNoneInput; @@ -48,6 +51,7 @@ import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest; import static org.hamcrest.Matchers.*; import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.*; /** @@ -58,13 +62,19 @@ public class HistoryStoreTests extends ElasticsearchTestCase { private ClientProxy clientProxy; private TemplateUtils templateUtils; private WatchRecord.Parser parser; + private NodeSettingsService nodeSettingsService; + private DynamicSettings dynamicSettings; + private ThreadPool threadPool; @Before public void init() { clientProxy = mock(ClientProxy.class); templateUtils = mock(TemplateUtils.class); parser = mock(WatchRecord.Parser.class); - historyStore = new HistoryStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser); + nodeSettingsService = mock(NodeSettingsService.class); + dynamicSettings = mock(DynamicSettings.class); + threadPool = mock(ThreadPool.class); + historyStore = new HistoryStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser, nodeSettingsService, dynamicSettings, threadPool); historyStore.start(); } @@ -160,7 +170,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { Collection records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); assertThat(records, notNullValue()); assertThat(records, hasSize(0)); - verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watch_history"); + verify(templateUtils, times(1)).putTemplate(same("watch_history"), any(Settings.class)); verifyZeroInteractions(clientProxy); } @@ -325,7 +335,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { assertThat(records, IsNull.notNullValue()); assertThat(records, hasSize(0)); - verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watch_history"); + verify(templateUtils, times(1)).putTemplate(same("watch_history"), any(Settings.class)); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); } @@ -383,7 +393,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { assertThat(records, notNullValue()); assertThat(records, hasSize(0)); - verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watch_history"); + verify(templateUtils, times(1)).putTemplate(same("watch_history"), any(Settings.class)); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); } diff --git a/src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java b/src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java new file mode 100644 index 00000000000..74d848b33be --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.watcher.history.HistoryStore; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; +import org.junit.Test; + +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.Is.is; + +/** + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) +public class TemplateUtilsTests extends ElasticsearchIntegrationTest { + + @Test + public void testPutTemplate() throws Exception { + TemplateUtils templateUtils = new TemplateUtils(ImmutableSettings.EMPTY, ClientProxy.of(client())); + + ImmutableSettings.Builder options = ImmutableSettings.builder(); + options.put("key", "value"); + templateUtils.putTemplate(HistoryStore.INDEX_TEMPLATE_NAME, options.build()); + + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + // setting in the file on the classpath: + assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); + // additional setting: + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key"), equalTo("value")); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/HistoryStoreSettingsTest.java b/src/test/java/org/elasticsearch/watcher/test/integration/HistoryStoreSettingsTest.java new file mode 100644 index 00000000000..a9d434b0633 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/test/integration/HistoryStoreSettingsTest.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.test.integration; + +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.watcher.history.HistoryStore; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.junit.Test; + +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; + +/** + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) +public class HistoryStoreSettingsTest extends AbstractWatcherIntegrationTests { + + @Test + public void testChangeSettings() throws Exception { + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1")); + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is zero + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is 1s + + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(ImmutableSettings.builder() + .put("watcher.history.index.number_of_shards", "2") + .put("watcher.history.index.number_of_replicas", "2") + .put("watcher.history.index.refresh_interval", "5m")) + .get(); + + // use assertBusy(...) because we update the index template in an async manner + assertBusy(new Runnable() { + @Override + public void run() { + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2")); + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), equalTo("2")); + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), equalTo("5m")); + } + }); + } + + @Test + public void testChangeSettings_ignoringForbiddenSetting() throws Exception { + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1")); + assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); + + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(ImmutableSettings.builder() + .put("watcher.history.index.number_of_shards", "2") + .put("watcher.history.index.mapper.dynamic", true)) // forbidden setting, should not get updated + .get(); + + // use assertBusy(...) because we update the index template in an async manner + assertBusy(new Runnable() { + @Override + public void run() { + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2")); + assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); + } + }); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java index 2426fa85700..80ce3443cca 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java @@ -68,7 +68,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(0)); - verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches"); + verify(templateUtils, times(1)).putTemplate("watches", null); verifyZeroInteractions(clientProxy); watchStore.start(cs); @@ -209,7 +209,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(0)); - verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches"); + verify(templateUtils, times(1)).putTemplate("watches", null); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class)); verify(clientProxy, times(1)).clearScroll(anyString()); @@ -264,7 +264,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(2)); - verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches"); + verify(templateUtils, times(1)).putTemplate("watches", null); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class)); verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class));