diff --git a/src/main/java/org/elasticsearch/watcher/WatcherModule.java b/src/main/java/org/elasticsearch/watcher/WatcherModule.java index 7735837eff8..5084e2e6dd4 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherModule.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherModule.java @@ -9,6 +9,7 @@ package org.elasticsearch.watcher; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; +import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.watcher.actions.ActionModule; import org.elasticsearch.watcher.client.WatcherClientModule; @@ -19,8 +20,8 @@ import org.elasticsearch.watcher.input.InputModule; import org.elasticsearch.watcher.license.LicenseModule; import org.elasticsearch.watcher.rest.WatcherRestModule; import org.elasticsearch.watcher.shield.WatcherShieldModule; -import org.elasticsearch.watcher.support.DynamicIndexName; -import org.elasticsearch.watcher.support.TemplateUtils; +import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry; +import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry.TemplateConfig; import org.elasticsearch.watcher.support.clock.ClockModule; import org.elasticsearch.watcher.support.http.HttpClientModule; import org.elasticsearch.watcher.support.init.InitializingModule; @@ -37,6 +38,10 @@ import java.util.Arrays; public class WatcherModule extends AbstractModule implements SpawnModules { + public static final String HISTORY_TEMPLATE_NAME = "watch_history"; + public static final String TRIGGERED_TEMPLATE_NAME = "triggered_watches"; + public static final String WATCHES_TEMPLATE_NAME = "watches"; + protected final Settings settings; public WatcherModule(Settings settings) { @@ -69,8 +74,14 @@ public class WatcherModule extends AbstractModule implements SpawnModules { @Override protected void configure() { bind(WatcherLifeCycleService.class).asEagerSingleton(); - bind(TemplateUtils.class).asEagerSingleton(); bind(WatcherSettingsValidation.class).asEagerSingleton(); + + bind(WatcherIndexTemplateRegistry.class).asEagerSingleton(); + Multibinder multibinder + = Multibinder.newSetBinder(binder(), TemplateConfig.class); + multibinder.addBinding().toInstance(new TemplateConfig(TRIGGERED_TEMPLATE_NAME, "watcher.triggered_watches.index")); + multibinder.addBinding().toInstance(new TemplateConfig(HISTORY_TEMPLATE_NAME, "watcher.history.index")); + multibinder.addBinding().toInstance(new TemplateConfig(WATCHES_TEMPLATE_NAME, "watcher.watches.index")); } } diff --git a/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java b/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java index a153f3b7ef5..9bd2821d580 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.watcher.execution; -import com.google.common.collect.ImmutableSet; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -29,11 +28,13 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.watcher.history.HistoryException; import org.elasticsearch.watcher.history.TriggeredWatchException; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -43,14 +44,10 @@ public class TriggeredWatchStore extends AbstractComponent { public static final String INDEX_NAME = ".triggered_watches"; public static final String DOC_TYPE = "triggered_watch"; - public static final String INDEX_TEMPLATE_NAME = "triggered_watches"; - private static final ImmutableSet forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic"); private final int scrollSize; private final ClientProxy client; private final TimeValue scrollTimeout; - private final TemplateUtils templateUtils; - private final Settings customIndexSettings; private final TriggeredWatch.Parser triggeredWatchParser; private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -59,47 +56,16 @@ public class TriggeredWatchStore extends AbstractComponent { private final AtomicBoolean started = new AtomicBoolean(false); @Inject - public TriggeredWatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, TriggeredWatch.Parser triggeredWatchParser) { + public TriggeredWatchStore(Settings settings, ClientProxy client, TriggeredWatch.Parser triggeredWatchParser) { super(settings); this.scrollSize = settings.getAsInt("watcher.execution.scroll.size", 100); this.client = client; this.scrollTimeout = settings.getAsTime("watcher.execution.scroll.timeout", TimeValue.timeValueSeconds(30)); - this.templateUtils = templateUtils; - this.customIndexSettings = updateTriggerWatchesSettings(settings); this.triggeredWatchParser = triggeredWatchParser; } - private Settings updateTriggerWatchesSettings(Settings nodeSettings) { - Settings newSettings = Settings.builder() - .put(nodeSettings.getAsSettings("watcher.triggered_watches.index")) - .build(); - if (newSettings.names().isEmpty()) { - return Settings.EMPTY; - } - - // Filter out forbidden settings: - Settings.Builder builder = Settings.builder(); - for (Map.Entry entry : newSettings.getAsMap().entrySet()) { - String name = "index." + entry.getKey(); - if (forbiddenIndexSettings.contains(name)) { - logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name); - continue; - } - builder.put(name, entry.getValue()); - } - return builder.build(); - } - - public void start() { - if (started.compareAndSet(false, true)) { - try { - templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); - } catch (Exception e) { - started.set(false); - throw e; - } - } + started.set(true); } public boolean validate(ClusterState state) { diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java b/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java index a7e8227bf7a..73c3726f8be 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java @@ -13,11 +13,9 @@ import org.elasticsearch.watcher.execution.InternalWatchExecutor; */ public class HistoryModule extends AbstractModule { - public HistoryModule() { } - @Override protected void configure() { bind(HistoryStore.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java index 70b78e526ea..4553338cc2d 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java @@ -10,23 +10,17 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.settings.ClusterDynamicSettings; -import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.node.settings.NodeSettingsService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.watcher.support.TemplateUtils; -import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; -import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -34,97 +28,29 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** */ -public class HistoryStore extends AbstractComponent implements NodeSettingsService.Listener { +public class HistoryStore extends AbstractComponent { public static final String INDEX_PREFIX = ".watch_history-"; public static final String DOC_TYPE = "watch_record"; - public static final String INDEX_TEMPLATE_NAME = "watch_history"; static final DateTimeFormatter indexTimeFormat = DateTimeFormat.forPattern("YYYY.MM.dd"); private static final ImmutableSet forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic"); private final ClientProxy client; - private final TemplateUtils templateUtils; - private final ThreadPool threadPool; private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final Lock putUpdateLock = readWriteLock.readLock(); private final Lock stopLock = readWriteLock.writeLock(); private final AtomicBoolean started = new AtomicBoolean(false); - private volatile Settings customIndexSettings = Settings.EMPTY; - @Inject - public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, NodeSettingsService nodeSettingsService, - @ClusterDynamicSettings DynamicSettings dynamicSettings, ThreadPool threadPool) { + public HistoryStore(Settings settings, ClientProxy client) { super(settings); this.client = client; - this.templateUtils = templateUtils; - this.threadPool = threadPool; - - updateHistorySettings(settings, false); - nodeSettingsService.addListener(this); - dynamicSettings.addDynamicSetting("watcher.history.index.*"); } - @Override - public void onRefreshSettings(Settings settings) { - updateHistorySettings(settings, true); - } - - private void updateHistorySettings(Settings settings, boolean updateIndexTemplate) { - Settings newSettings = Settings.builder() - .put(settings.getAsSettings("watcher.history.index")) - .build(); - if (newSettings.names().isEmpty()) { - return; - } - - boolean changed = false; - Settings.Builder builder = Settings.builder().put(customIndexSettings); - - for (Map.Entry entry : newSettings.getAsMap().entrySet()) { - String name = "index." + entry.getKey(); - if (forbiddenIndexSettings.contains(name)) { - logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name); - continue; - } - - String newValue = entry.getValue(); - String currentValue = customIndexSettings.get(name); - if (!newValue.equals(currentValue)) { - changed = true; - builder.put(name, newValue); - logger.info("changing setting [{}] from [{}] to [{}]", name, currentValue, newValue); - } - } - - if (changed) { - customIndexSettings = builder.build(); - if (updateIndexTemplate) { - // Need to fork to prevent dead lock. (We're on the cluster service update task, but the put index template - // needs to update the cluster state too, and because the update takes is a single threaded operation, - // we would then be stuck) - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); - } - }); - } - } - } - - public void start() { - if (started.compareAndSet(false, true)) { - try { - templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); - } catch (Exception e) { - started.set(false); - throw e; - } - } + started.set(true); } public boolean validate(ClusterState state) { diff --git a/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java b/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java deleted file mode 100644 index 784b050d8b4..00000000000 --- a/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.support; - -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.watcher.WatcherException; -import org.elasticsearch.watcher.support.init.proxy.ClientProxy; -import org.elasticsearch.watcher.watch.WatchStore; - -import java.io.FileNotFoundException; -import java.io.InputStream; - -/** - */ -public class TemplateUtils extends AbstractComponent { - - private final ClientProxy client; - - @Inject - public TemplateUtils(Settings settings, ClientProxy client) { - super(settings); - this.client = client; - } - - /** - * Resolves the template with the specified templateName from the classpath, optionally adds extra settings and - * puts the index template into the cluster. - * - * This method blocks until the template has been created. - */ - public void putTemplate(String templateName, Settings customSettings) { - try (InputStream is = WatchStore.class.getResourceAsStream("/" + templateName + ".json")) { - if (is == null) { - throw new FileNotFoundException("Resource [/" + templateName + ".json] not found in classpath"); - } - final byte[] template; - try (BytesStreamOutput out = new BytesStreamOutput()) { - Streams.copy(is, out); - template = out.bytes().toBytes(); - } - PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template); - if (customSettings != null && customSettings.names().size() > 0) { - Settings updatedSettings = Settings.builder() - .put(request.settings()) - .put(customSettings) - .build(); - request.settings(updatedSettings); - } - PutIndexTemplateResponse response = client.putTemplate(request); - } catch (Exception e) { - // throwing an exception to stop exporting process - we don't want to send data unless - // we put in the template for it. - throw new WatcherException("failed to load [{}.json]", e, templateName); - } - } - -} diff --git a/src/main/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistry.java b/src/main/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistry.java new file mode 100644 index 00000000000..d9e114a68ad --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistry.java @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.settings.ClusterDynamicSettings; +import org.elasticsearch.cluster.settings.DynamicSettings; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; +import org.elasticsearch.watcher.watch.WatchStore; + +import java.io.InputStream; +import java.util.Map; +import java.util.Set; + +/** + */ +public class WatcherIndexTemplateRegistry extends AbstractComponent implements ClusterStateListener, NodeSettingsService.Listener { + + private static final ImmutableSet forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic"); + + private final ClientProxy client; + private final ThreadPool threadPool; + private final ClusterService clusterService; + private final ImmutableSet indexTemplates; + + private volatile ImmutableMap customIndexSettings; + + @Inject + public WatcherIndexTemplateRegistry(Settings settings, NodeSettingsService nodeSettingsService, ClusterService clusterService, + ThreadPool threadPool, ClientProxy client, Set configs, + @ClusterDynamicSettings DynamicSettings dynamicSettings) { + super(settings); + this.client = client; + this.threadPool = threadPool; + this.clusterService = clusterService; + this.indexTemplates = ImmutableSet.copyOf(configs); + clusterService.add(this); + nodeSettingsService.addListener(this); + + ImmutableMap.Builder customIndexSettingsBuilder = ImmutableMap.builder(); + for (TemplateConfig indexTemplate : indexTemplates) { + Settings customSettings = this.settings.getAsSettings(indexTemplate.getSettingsPrefix()); + customIndexSettings = customIndexSettingsBuilder.put(indexTemplate.getSettingsPrefix(), customSettings).build(); + dynamicSettings.addDynamicSetting(indexTemplate.getDynamicSettingsPrefix()); + } + customIndexSettings = customIndexSettingsBuilder.build(); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + ClusterState state = event.state(); + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // wait until the gateway has recovered from disk, otherwise we think may not have the index templates, + // while they actually do exist + return; + } + + if (event.localNodeMaster() == false) { + // Only the node that runs or will run Watcher should update the templates. Otherwise unnecessary put template + // calls would happen + return; + } + + for (TemplateConfig template : indexTemplates) { + if (!state.metaData().getTemplates().containsKey(template.getTemplateName())) { + putTemplate(template); + } + } + } + + @Override + public void onRefreshSettings(Settings settings) { + if (clusterService.localNode().masterNode() == false) { + // Only the node that runs or will run Watcher should update the templates. Otherwise unnecessary put template + // calls would happen + return; + } + + for (TemplateConfig config : indexTemplates) { + Settings newSettings = Settings.builder() + .put(settings.getAsSettings(config.getSettingsPrefix())) + .build(); + if (newSettings.names().isEmpty()) { + continue; + } + + Settings existingSettings = customIndexSettings.get(config.getSettingsPrefix()); + if (existingSettings == null) { + existingSettings = Settings.EMPTY; + } + + boolean changed = false; + Settings.Builder builder = Settings.builder().put(existingSettings); + for (Map.Entry newSettingsEntry : newSettings.getAsMap().entrySet()) { + String name = "index." + newSettingsEntry.getKey(); + if (forbiddenIndexSettings.contains(name)) { + logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name); + continue; + } + + String newValue = newSettingsEntry.getValue(); + String currentValue = existingSettings.get(name); + if (!newValue.equals(currentValue)) { + changed = true; + builder.put(name, newValue); + logger.info("changing setting [{}] from [{}] to [{}]", name, currentValue, newValue); + } + } + + if (changed) { + customIndexSettings = MapBuilder.newMapBuilder(customIndexSettings) + .put(config.getSettingsPrefix(), builder.build()) + .immutableMap(); + putTemplate(config); + } + } + } + + private void putTemplate(final TemplateConfig config) { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + try (InputStream is = WatchStore.class.getResourceAsStream("/" + config.getTemplateName()+ ".json")) { + if (is == null) { + logger.error("Resource [/" + config.getTemplateName() + ".json] not found in classpath"); + return; + } + final byte[] template; + try (BytesStreamOutput out = new BytesStreamOutput()) { + Streams.copy(is, out); + template = out.bytes().toBytes(); + } + + PutIndexTemplateRequest request = new PutIndexTemplateRequest(config.getTemplateName()).source(template); + Settings customSettings = customIndexSettings.get(config.getSettingsPrefix()); + if (customSettings != null && customSettings.names().size() > 0) { + Settings updatedSettings = Settings.builder() + .put(request.settings()) + .put(customSettings) + .build(); + request.settings(updatedSettings); + } + PutIndexTemplateResponse response = client.putTemplate(request); + } catch (Exception e) { + logger.error("failed to load [{}.json]", e, config.getTemplateName()); + } + } + }); + } + + public static class TemplateConfig { + + private final String templateName; + private final String settingsPrefix; + + public TemplateConfig(String templateName, String settingsPrefix) { + this.templateName = templateName; + this.settingsPrefix = settingsPrefix; + } + + public String getTemplateName() { + return templateName; + } + + public String getSettingsPrefix() { + return settingsPrefix; + } + + public String getDynamicSettingsPrefix() { + return settingsPrefix + ".*"; + } + } +} diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java index 8007805a8e1..9463808c6c2 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java @@ -33,7 +33,6 @@ import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.watcher.WatcherException; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; @@ -45,11 +44,9 @@ import java.util.concurrent.atomic.AtomicBoolean; public class WatchStore extends AbstractComponent { public static final String INDEX = ".watches"; - public static final String INDEX_TEMPLATE = "watches"; public static final String DOC_TYPE = "watch"; private final ClientProxy client; - private final TemplateUtils templateUtils; private final Watch.Parser watchParser; private final ConcurrentMap watches; @@ -59,10 +56,9 @@ public class WatchStore extends AbstractComponent { private final TimeValue scrollTimeout; @Inject - public WatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Watch.Parser watchParser) { + public WatchStore(Settings settings, ClientProxy client, Watch.Parser watchParser) { super(settings); this.client = client; - this.templateUtils = templateUtils; this.watchParser = watchParser; this.watches = ConcurrentCollections.newConcurrentMap(); @@ -81,7 +77,6 @@ public class WatchStore extends AbstractComponent { try { int count = loadWatches(watchesIndexMetaData.numberOfShards()); logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX); - templateUtils.putTemplate(INDEX_TEMPLATE, null); started.set(true); } catch (Exception e) { logger.debug("failed to load watches for watch index [{}]", e, INDEX); @@ -89,7 +84,6 @@ public class WatchStore extends AbstractComponent { throw e; } } else { - templateUtils.putTemplate(INDEX_TEMPLATE, null); started.set(true); } } diff --git a/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchStoreTests.java b/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchStoreTests.java index d53dc87cea1..d4123ee3f19 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchStoreTests.java @@ -28,7 +28,6 @@ import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.watcher.history.TriggeredWatchException; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.hamcrest.core.IsNull; import org.junit.Before; @@ -41,7 +40,6 @@ import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.same; import static org.mockito.Mockito.*; public class TriggeredWatchStoreTests extends ElasticsearchTestCase { @@ -53,11 +51,9 @@ public class TriggeredWatchStoreTests extends ElasticsearchTestCase { @Before public void init() { clientProxy = mock(ClientProxy.class); - TemplateUtils templateUtils = mock(TemplateUtils.class); parser = mock(TriggeredWatch.Parser.class); - triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, clientProxy, templateUtils, parser); + triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, clientProxy, parser); triggeredWatchStore.start(); - verify(templateUtils, times(1)).putTemplate(same(TriggeredWatchStore.INDEX_TEMPLATE_NAME), any(Settings.class)); } @Test diff --git a/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchTests.java b/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchTests.java index ec88470cd82..3c96dce3536 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchTests.java @@ -5,15 +5,14 @@ */ package org.elasticsearch.watcher.execution; -import org.joda.time.DateTime; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.watcher.execution.*; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.test.WatcherTestUtils; import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.watcher.watch.Watch; +import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -40,7 +39,7 @@ public class TriggeredWatchTests extends AbstractWatcherIntegrationTests { } private TriggeredWatch.Parser triggeredWatchParser() { - return internalTestCluster().getInstance(TriggeredWatch.Parser.class); + return internalCluster().getInstance(TriggeredWatch.Parser.class); } diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java index cf3fe906a78..ba4a372f913 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResp import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.watcher.WatcherModule; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.junit.Test; @@ -26,7 +27,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests { @Test public void testChangeSettings() throws Exception { - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1")); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is zero assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is 1s @@ -43,7 +44,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests { assertBusy(new Runnable() { @Override public void run() { - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2")); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), equalTo("2")); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), equalTo("5m")); @@ -53,7 +54,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests { @Test public void testChangeSettings_ignoringForbiddenSetting() throws Exception { - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1")); assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); @@ -69,7 +70,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests { assertBusy(new Runnable() { @Override public void run() { - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2")); assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); } diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java index bccae81b0b7..3ad1857e6c1 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java @@ -7,25 +7,21 @@ package org.elasticsearch.watcher.history; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.cluster.settings.DynamicSettings; -import org.elasticsearch.common.unit.TimeValue; -import org.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ElasticsearchTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.execution.ExecutionState; import org.elasticsearch.watcher.execution.Wid; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.joda.time.DateTime; import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; -import static org.joda.time.DateTimeZone.UTC; import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest; import static org.hamcrest.core.IsEqual.equalTo; +import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.*; /** @@ -38,11 +34,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { @Before public void init() { clientProxy = mock(ClientProxy.class); - TemplateUtils templateUtils = mock(TemplateUtils.class); - NodeSettingsService nodeSettingsService = mock(NodeSettingsService.class); - DynamicSettings dynamicSettings = mock(DynamicSettings.class); - ThreadPool threadPool = mock(ThreadPool.class); - historyStore = new HistoryStore(Settings.EMPTY, clientProxy, templateUtils, nodeSettingsService, dynamicSettings, threadPool); + historyStore = new HistoryStore(Settings.EMPTY, clientProxy); historyStore.start(); } diff --git a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputIntegrationTests.java index e2710a6436d..a0cab9c7884 100644 --- a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputIntegrationTests.java @@ -53,7 +53,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTests { createIndex("index"); client().prepareIndex("index", "type", "id").setSource("{}").setRefresh(true).get(); - InetSocketAddress address = internalTestCluster().httpAddresses()[0]; + InetSocketAddress address = internalCluster().httpAddresses()[0]; watcherClient().preparePutWatch("_name") .setSource(watchBuilder() .trigger(schedule(interval("5s"))) @@ -74,7 +74,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTests { @Test public void testHttpInput_clusterStats() throws Exception { - InetSocketAddress address = internalTestCluster().httpAddresses()[0]; + InetSocketAddress address = internalCluster().httpAddresses()[0]; PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_name") .setSource(watchBuilder() .trigger(schedule(interval("1s"))) @@ -102,7 +102,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTests { client().prepareIndex("idx", "type").setSource("field", "value").get(); refresh(); - InetSocketAddress address = internalTestCluster().httpAddresses()[0]; + InetSocketAddress address = internalCluster().httpAddresses()[0]; XContentBuilder body = jsonBuilder().prettyPrint().startObject() .field("query").value(termQuery("field", "value")) .endObject(); diff --git a/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java index b208bf1fff2..466bba692cc 100644 --- a/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java @@ -268,13 +268,13 @@ public class LicenseIntegrationTests extends AbstractWatcherIntegrationTests { } public static void disableLicensing() { - for (MockLicenseService service : internalTestCluster().getInstances(MockLicenseService.class)) { + for (MockLicenseService service : internalCluster().getInstances(MockLicenseService.class)) { service.disable(); } } public static void enableLicensing() { - for (MockLicenseService service : internalTestCluster().getInstances(MockLicenseService.class)) { + for (MockLicenseService service : internalCluster().getInstances(MockLicenseService.class)) { service.enable(); } } diff --git a/src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java b/src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java deleted file mode 100644 index 77b7a78b0c6..00000000000 --- a/src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.support; - -import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.watcher.history.HistoryStore; -import org.elasticsearch.watcher.support.init.proxy.ClientProxy; -import org.junit.Test; - -import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.core.Is.is; - -/** - */ -@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) -public class TemplateUtilsTests extends ElasticsearchIntegrationTest { - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false) - .build(); - } - - @Override - protected Settings transportClientSettings() { - return Settings.builder() - .put(super.transportClientSettings()) - .put(PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false) - .build(); - } - - @Test - public void testPutTemplate() throws Exception { - TemplateUtils templateUtils = new TemplateUtils(Settings.EMPTY, ClientProxy.of(client())); - - Settings.Builder options = Settings.builder(); - options.put("key", "value"); - templateUtils.putTemplate(HistoryStore.INDEX_TEMPLATE_NAME, options.build()); - - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); - // setting in the file on the classpath: - assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); - // additional setting: - assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key"), equalTo("value")); - } - -} diff --git a/src/test/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistryTests.java b/src/test/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistryTests.java new file mode 100644 index 00000000000..044f07ad346 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistryTests.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.watcher.WatcherModule; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.junit.Test; + +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.Is.is; + +/** + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) +public class WatcherIndexTemplateRegistryTests extends AbstractWatcherIntegrationTests { + + @Test + public void testTemplates() throws Exception { + assertAcked( + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put("watcher.history.index.key1", "value")) + .get() + ); + + assertBusy(new Runnable() { + @Override + public void run() { + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); + assertThat(response.getIndexTemplates().size(), equalTo(1)); + // setting from the file on the classpath: + assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); + // additional setting defined in the node settings: + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key1"), equalTo("value")); + } + }); + + // Now delete the index template and verify the index template gets added back: + assertAcked(client().admin().indices().prepareDeleteTemplate(WatcherModule.HISTORY_TEMPLATE_NAME).get()); + + assertBusy(new Runnable() { + @Override + public void run() { + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); + assertThat(response.getIndexTemplates().size(), equalTo(1)); + // setting from the file on the classpath: + assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); + // additional setting defined in the node settings: + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key1"), equalTo("value")); + } + }); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index 84c24338b20..a0cb5b0e8f7 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -5,15 +5,12 @@ */ package org.elasticsearch.watcher.test; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.Streams; @@ -32,7 +29,6 @@ import org.elasticsearch.shield.authc.support.SecuredString; import org.elasticsearch.shield.crypto.InternalCryptoService; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.TestCluster; import org.elasticsearch.watcher.WatcherLifeCycleService; import org.elasticsearch.watcher.WatcherPlugin; @@ -46,7 +42,6 @@ import org.elasticsearch.watcher.client.WatcherClient; import org.elasticsearch.watcher.execution.ExecutionService; import org.elasticsearch.watcher.execution.ExecutionState; import org.elasticsearch.watcher.history.HistoryStore; -import org.elasticsearch.watcher.execution.TriggeredWatchStore; import org.elasticsearch.watcher.license.LicenseService; import org.elasticsearch.watcher.support.clock.ClockMock; import org.elasticsearch.watcher.support.http.HttpClient; @@ -56,7 +51,6 @@ import org.elasticsearch.watcher.trigger.ScheduleTriggerEngineMock; import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.watcher.watch.Watch; -import org.elasticsearch.watcher.watch.WatchStore; import org.hamcrest.Matcher; import org.jboss.netty.util.internal.SystemPropertyUtil; import org.junit.After; @@ -66,11 +60,13 @@ import org.junit.Before; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; @@ -92,6 +88,15 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg private static ScheduleModule.Engine scheduleEngine; + @Override + protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException { + if (shieldEnabled == null) { + shieldEnabled = enableShield(); + scheduleEngine = randomFrom(ScheduleModule.Engine.values()); + } + return super.buildTestCluster(scope, seed); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { String scheduleImplName = scheduleEngine().name().toLowerCase(Locale.ROOT); @@ -208,7 +213,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg } private void startWatcherIfNodesExist() throws Exception { - if (internalTestCluster().size() > 0) { + if (internalCluster().size() > 0) { ensureLicenseEnabled(); WatcherState state = getInstanceFromMaster(WatcherService.class).state(); if (state == WatcherState.STOPPED) { @@ -234,14 +239,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg return false; } - @Override - protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException { - // This overwrites the wipe logic of the test cluster to not remove the watches and watch_history templates. By default all templates are removed - // TODO: We should have the notion of a hidden template (like hidden index / type) that only gets removed when specifically mentioned. - final TestCluster testCluster = super.buildTestCluster(scope, seed); - return new WatcherWrappingCluster(seed, testCluster); - } - protected long docCount(String index, String type, QueryBuilder query) { refresh(); return docCount(index, type, SearchSourceBuilder.searchSource().query(query)); @@ -266,7 +263,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg } protected T getInstanceFromMaster(Class type) { - return internalTestCluster().getInstance(type, internalTestCluster().getMasterName()); + return internalCluster().getInstance(type, internalCluster().getMasterName()); } protected Watch.Parser watchParser() { @@ -291,16 +288,16 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg protected WatcherClient watcherClient() { return shieldEnabled ? - new WatcherClient(internalTestCluster().transportClient()) : + new WatcherClient(internalCluster().transportClient()) : new WatcherClient(client()); } protected ScriptServiceProxy scriptService() { - return internalTestCluster().getInstance(ScriptServiceProxy.class); + return internalCluster().getInstance(ScriptServiceProxy.class); } protected HttpClient watcherHttpClient() { - return internalTestCluster().getInstance(HttpClient.class); + return internalCluster().getInstance(HttpClient.class); } protected EmailService noopEmailService() { @@ -442,7 +439,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg assertBusy(new Runnable() { @Override public void run() { - for (LicenseService service : internalTestCluster().getInstances(LicenseService.class)) { + for (LicenseService service : internalCluster().getInstances(LicenseService.class)) { assertThat(service.enabled(), is(true)); } } @@ -494,7 +491,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg protected void ensureWatcherOnlyRunningOnce() { int running = 0; - for (WatcherService watcherService : internalTestCluster().getInstances(WatcherService.class)) { + for (WatcherService watcherService : internalCluster().getInstances(WatcherService.class)) { if (watcherService.state() == WatcherState.STARTED) { running++; } @@ -502,103 +499,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg assertThat("watcher should only run on the elected master node, but it is running on [" + running + "] nodes", running, equalTo(1)); } - protected static InternalTestCluster internalTestCluster() { - return (InternalTestCluster) ((WatcherWrappingCluster) cluster()).testCluster; - } - - // We need this custom impl, because we have custom wipe logic. We don't want the watcher index templates to get deleted between tests - private final class WatcherWrappingCluster extends TestCluster { - - private final TestCluster testCluster; - - private WatcherWrappingCluster(long seed, TestCluster testCluster) { - super(seed); - this.testCluster = testCluster; - } - - @Override - public void beforeTest(Random random, double transportClientRatio) throws IOException { - if (scheduleEngine == null) { - scheduleEngine = randomFrom(ScheduleModule.Engine.values()); - } - if (shieldEnabled == null) { - shieldEnabled = enableShield(); - } - testCluster.beforeTest(random, transportClientRatio); - } - - @Override - public void wipe() { - wipeIndices("_all"); - wipeRepositories(); - - if (size() > 0) { - List templatesToWipe = new ArrayList<>(); - ClusterState state = client().admin().cluster().prepareState().get().getState(); - for (ObjectObjectCursor cursor : state.getMetaData().templates()) { - if (cursor.key.equals(WatchStore.INDEX_TEMPLATE) || cursor.key.equals(HistoryStore.INDEX_TEMPLATE_NAME) || cursor.key.equals(TriggeredWatchStore.INDEX_TEMPLATE_NAME)) { - continue; - } - templatesToWipe.add(cursor.key); - } - if (!templatesToWipe.isEmpty()) { - wipeTemplates(templatesToWipe.toArray(new String[templatesToWipe.size()])); - } - } - } - - @Override - public void afterTest() throws IOException { - testCluster.afterTest(); - } - - @Override - public Client client() { - return testCluster.client(); - } - - @Override - public int size() { - return testCluster.size(); - } - - @Override - public int numDataNodes() { - return testCluster.numDataNodes(); - } - - @Override - public int numDataAndMasterNodes() { - return testCluster.numDataAndMasterNodes(); - } - - @Override - public InetSocketAddress[] httpAddresses() { - return testCluster.httpAddresses(); - } - - @Override - public void close() throws IOException { - testCluster.close(); - } - - @Override - public void ensureEstimatedStats() { - testCluster.ensureEstimatedStats(); - } - - @Override - public String getClusterName() { - return testCluster.getClusterName(); - } - - @Override - public Iterator iterator() { - return testCluster.iterator(); - } - - } - private static class NoopEmailService implements EmailService { @Override diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java index 6119293559c..30a14bde7eb 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java @@ -84,7 +84,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { @Test public void testSimpleFailure() throws Exception { config = new ClusterDiscoveryConfiguration.UnicastZen(2); - internalTestCluster().startNodesAsync(2).get(); + internalCluster().startNodesAsync(2).get(); createIndex("my-index"); ensureWatcherStarted(false); @@ -141,12 +141,12 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { .put("node.data", false) .put("node.master", true) .build(); - internalTestCluster().startNodesAsync(3, settings).get(); + internalCluster().startNodesAsync(3, settings).get(); settings = Settings.builder() .put("node.data", true) .put("node.master", false) .build(); - internalTestCluster().startNodesAsync(7, settings).get(); + internalCluster().startNodesAsync(7, settings).get(); ensureWatcherStarted(false); ensureLicenseEnabled(); @@ -162,7 +162,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { assertWatchWithMinimumPerformedActionsCount("_watch_id", 1, false); // We still have 2 master node, we should recover from this failure: - internalTestCluster().stopCurrentMasterNode(); + internalCluster().stopCurrentMasterNode(); ensureWatcherStarted(false); ensureWatcherOnlyRunningOnce(); assertWatchWithMinimumPerformedActionsCount("_watch_id", 2, false); @@ -192,7 +192,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { int numberOfWatches = scaledRandomIntBetween(numberOfFailures, 12); logger.info("number of failures [{}], number of watches [{}]", numberOfFailures, numberOfWatches); config = new ClusterDiscoveryConfiguration.UnicastZen(2 + numberOfFailures); - internalTestCluster().startNodesAsync(2).get(); + internalCluster().startNodesAsync(2).get(); createIndex("my-index"); client().prepareIndex("my-index", "my-type").setSource("field", "value").get(); @@ -233,14 +233,14 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { // will elect itself as master. This is bad and should be fixed in core. What I think that should happen is that // if a node detects that is has lost a node, a node should clear its unicast temporal responses or at least // remove the node that has been removed. This is a workaround: - for (ZenPingService pingService : internalTestCluster().getInstances(ZenPingService.class)) { + for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { for (ZenPing zenPing : pingService.zenPings()) { if (zenPing instanceof UnicastZenPing) { ((UnicastZenPing) zenPing).clearTemporalResponses(); } } } - internalTestCluster().stopCurrentMasterNode(); + internalCluster().stopCurrentMasterNode(); // Can't use ensureWatcherStopped, b/c that relies on the watcher stats api which requires an elected master node assertBusy(new Runnable() { public void run () { @@ -251,16 +251,16 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { } }, 30, TimeUnit.SECONDS); // Ensure that the watch manager doesn't run elsewhere - for (WatcherService watcherService : internalTestCluster().getInstances(WatcherService.class)) { + for (WatcherService watcherService : internalCluster().getInstances(WatcherService.class)) { assertThat(watcherService.state(), is(WatcherState.STOPPED)); } - for (ExecutionService executionService : internalTestCluster().getInstances(ExecutionService.class)) { + for (ExecutionService executionService : internalCluster().getInstances(ExecutionService.class)) { assertThat(executionService.executionThreadPoolQueueSize(), equalTo(0l)); } } private void startElectedMasterNodeAndWait() throws Exception { - internalTestCluster().startNode(); + internalCluster().startNode(); ensureWatcherStarted(false); ensureWatcherOnlyRunningOnce(); } diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java index 943929662e8..8cedb6c7b9c 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java @@ -28,7 +28,6 @@ import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.watcher.WatcherException; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.junit.Before; import org.junit.Test; @@ -46,15 +45,13 @@ public class WatchStoreTests extends ElasticsearchTestCase { private WatchStore watchStore; private ClientProxy clientProxy; - private TemplateUtils templateUtils; private Watch.Parser parser; @Before public void init() { clientProxy = mock(ClientProxy.class); - templateUtils = mock(TemplateUtils.class); parser = mock(Watch.Parser.class); - watchStore = new WatchStore(Settings.EMPTY, clientProxy, templateUtils, parser); + watchStore = new WatchStore(Settings.EMPTY, clientProxy, parser); } @Test @@ -68,11 +65,9 @@ public class WatchStoreTests extends ElasticsearchTestCase { watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(0)); - verify(templateUtils, times(1)).putTemplate("watches", null); verifyZeroInteractions(clientProxy); watchStore.start(cs); - verifyNoMoreInteractions(templateUtils); verifyZeroInteractions(clientProxy); } @@ -97,7 +92,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { ClusterState cs = csBuilder.build(); assertThat(watchStore.validate(cs), is(false)); - verifyZeroInteractions(templateUtils); verifyZeroInteractions(clientProxy); } @@ -131,7 +125,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { } catch (WatcherException e) { assertThat(e.getMessage(), equalTo("not all required shards have been refreshed")); } - verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, never()).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, never()).clearScroll(anyString()); @@ -171,7 +164,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { } catch (ElasticsearchException e) { assertThat(e.getMessage(), equalTo("Partial response while loading watches")); } - verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, times(1)).clearScroll(anyString()); @@ -209,7 +201,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(0)); - verify(templateUtils, times(1)).putTemplate("watches", null); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, times(1)).clearScroll(anyString()); @@ -264,7 +255,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(2)); - verify(templateUtils, times(1)).putTemplate("watches", null); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class));