From 0a07d6dee5baf885373ac67f9802aa9355db138c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 26 Jun 2015 11:52:53 +0200 Subject: [PATCH] index templates: Change the TemplateUtils to be a TemplateRegistry that is responsible for maintaining the Watcher index templates. The TemplateRegistry adds templates based on if these index templates exist in the cluster state. Components that rely on index templates register their template config with the TemplateRegistry. The TemplateRegistry adds these templates in the background when a cluster state update occurs and add component index settings to the index template. Also when component index settings change, the index template will be updated by the TemplateRegistry. If a registered index template gets deleted, it will be added back by the TemplateRegistry in background if in a cluster state the index template is missing. Original commit: elastic/x-pack-elasticsearch@97f4f42160be63d4598a9af5e0b852001ef983d2 --- .../elasticsearch/watcher/WatcherModule.java | 17 +- .../execution/TriggeredWatchStore.java | 46 +---- .../watcher/history/HistoryModule.java | 2 - .../watcher/history/HistoryStore.java | 86 +------- .../watcher/support/TemplateUtils.java | 66 ------ .../support/WatcherIndexTemplateRegistry.java | 192 ++++++++++++++++++ .../watcher/watch/WatchStore.java | 8 +- .../execution/TriggeredWatchStoreTests.java | 6 +- .../execution/TriggeredWatchTests.java | 5 +- .../history/HistoryStoreSettingsTests.java | 9 +- .../watcher/history/HistoryStoreTests.java | 16 +- .../input/http/HttpInputIntegrationTests.java | 6 +- .../license/LicenseIntegrationTests.java | 4 +- .../watcher/support/TemplateUtilsTests.java | 56 ----- .../WatcherIndexTemplateRegistryTests.java | 62 ++++++ .../test/AbstractWatcherIntegrationTests.java | 140 ++----------- .../test/integration/NoMasterNodeTests.java | 20 +- .../watcher/watch/WatchStoreTests.java | 12 +- 18 files changed, 329 insertions(+), 424 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java create mode 100644 src/main/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistry.java delete mode 100644 src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java create mode 100644 src/test/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistryTests.java diff --git a/src/main/java/org/elasticsearch/watcher/WatcherModule.java b/src/main/java/org/elasticsearch/watcher/WatcherModule.java index 7735837eff8..5084e2e6dd4 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherModule.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherModule.java @@ -9,6 +9,7 @@ package org.elasticsearch.watcher; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; +import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.watcher.actions.ActionModule; import org.elasticsearch.watcher.client.WatcherClientModule; @@ -19,8 +20,8 @@ import org.elasticsearch.watcher.input.InputModule; import org.elasticsearch.watcher.license.LicenseModule; import org.elasticsearch.watcher.rest.WatcherRestModule; import org.elasticsearch.watcher.shield.WatcherShieldModule; -import org.elasticsearch.watcher.support.DynamicIndexName; -import org.elasticsearch.watcher.support.TemplateUtils; +import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry; +import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry.TemplateConfig; import org.elasticsearch.watcher.support.clock.ClockModule; import org.elasticsearch.watcher.support.http.HttpClientModule; import org.elasticsearch.watcher.support.init.InitializingModule; @@ -37,6 +38,10 @@ import java.util.Arrays; public class WatcherModule extends AbstractModule implements SpawnModules { + public static final String HISTORY_TEMPLATE_NAME = "watch_history"; + public static final String TRIGGERED_TEMPLATE_NAME = "triggered_watches"; + public static final String WATCHES_TEMPLATE_NAME = "watches"; + protected final Settings settings; public WatcherModule(Settings settings) { @@ -69,8 +74,14 @@ public class WatcherModule extends AbstractModule implements SpawnModules { @Override protected void configure() { bind(WatcherLifeCycleService.class).asEagerSingleton(); - bind(TemplateUtils.class).asEagerSingleton(); bind(WatcherSettingsValidation.class).asEagerSingleton(); + + bind(WatcherIndexTemplateRegistry.class).asEagerSingleton(); + Multibinder multibinder + = Multibinder.newSetBinder(binder(), TemplateConfig.class); + multibinder.addBinding().toInstance(new TemplateConfig(TRIGGERED_TEMPLATE_NAME, "watcher.triggered_watches.index")); + multibinder.addBinding().toInstance(new TemplateConfig(HISTORY_TEMPLATE_NAME, "watcher.history.index")); + multibinder.addBinding().toInstance(new TemplateConfig(WATCHES_TEMPLATE_NAME, "watcher.watches.index")); } } diff --git a/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java b/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java index a153f3b7ef5..9bd2821d580 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/execution/TriggeredWatchStore.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.watcher.execution; -import com.google.common.collect.ImmutableSet; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -29,11 +28,13 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.watcher.history.HistoryException; import org.elasticsearch.watcher.history.TriggeredWatchException; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -43,14 +44,10 @@ public class TriggeredWatchStore extends AbstractComponent { public static final String INDEX_NAME = ".triggered_watches"; public static final String DOC_TYPE = "triggered_watch"; - public static final String INDEX_TEMPLATE_NAME = "triggered_watches"; - private static final ImmutableSet forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic"); private final int scrollSize; private final ClientProxy client; private final TimeValue scrollTimeout; - private final TemplateUtils templateUtils; - private final Settings customIndexSettings; private final TriggeredWatch.Parser triggeredWatchParser; private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -59,47 +56,16 @@ public class TriggeredWatchStore extends AbstractComponent { private final AtomicBoolean started = new AtomicBoolean(false); @Inject - public TriggeredWatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, TriggeredWatch.Parser triggeredWatchParser) { + public TriggeredWatchStore(Settings settings, ClientProxy client, TriggeredWatch.Parser triggeredWatchParser) { super(settings); this.scrollSize = settings.getAsInt("watcher.execution.scroll.size", 100); this.client = client; this.scrollTimeout = settings.getAsTime("watcher.execution.scroll.timeout", TimeValue.timeValueSeconds(30)); - this.templateUtils = templateUtils; - this.customIndexSettings = updateTriggerWatchesSettings(settings); this.triggeredWatchParser = triggeredWatchParser; } - private Settings updateTriggerWatchesSettings(Settings nodeSettings) { - Settings newSettings = Settings.builder() - .put(nodeSettings.getAsSettings("watcher.triggered_watches.index")) - .build(); - if (newSettings.names().isEmpty()) { - return Settings.EMPTY; - } - - // Filter out forbidden settings: - Settings.Builder builder = Settings.builder(); - for (Map.Entry entry : newSettings.getAsMap().entrySet()) { - String name = "index." + entry.getKey(); - if (forbiddenIndexSettings.contains(name)) { - logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name); - continue; - } - builder.put(name, entry.getValue()); - } - return builder.build(); - } - - public void start() { - if (started.compareAndSet(false, true)) { - try { - templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); - } catch (Exception e) { - started.set(false); - throw e; - } - } + started.set(true); } public boolean validate(ClusterState state) { diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java b/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java index a7e8227bf7a..73c3726f8be 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryModule.java @@ -13,11 +13,9 @@ import org.elasticsearch.watcher.execution.InternalWatchExecutor; */ public class HistoryModule extends AbstractModule { - public HistoryModule() { } - @Override protected void configure() { bind(HistoryStore.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java index 70b78e526ea..4553338cc2d 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java @@ -10,23 +10,17 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.settings.ClusterDynamicSettings; -import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.node.settings.NodeSettingsService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.watcher.support.TemplateUtils; -import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; -import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -34,97 +28,29 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** */ -public class HistoryStore extends AbstractComponent implements NodeSettingsService.Listener { +public class HistoryStore extends AbstractComponent { public static final String INDEX_PREFIX = ".watch_history-"; public static final String DOC_TYPE = "watch_record"; - public static final String INDEX_TEMPLATE_NAME = "watch_history"; static final DateTimeFormatter indexTimeFormat = DateTimeFormat.forPattern("YYYY.MM.dd"); private static final ImmutableSet forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic"); private final ClientProxy client; - private final TemplateUtils templateUtils; - private final ThreadPool threadPool; private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final Lock putUpdateLock = readWriteLock.readLock(); private final Lock stopLock = readWriteLock.writeLock(); private final AtomicBoolean started = new AtomicBoolean(false); - private volatile Settings customIndexSettings = Settings.EMPTY; - @Inject - public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, NodeSettingsService nodeSettingsService, - @ClusterDynamicSettings DynamicSettings dynamicSettings, ThreadPool threadPool) { + public HistoryStore(Settings settings, ClientProxy client) { super(settings); this.client = client; - this.templateUtils = templateUtils; - this.threadPool = threadPool; - - updateHistorySettings(settings, false); - nodeSettingsService.addListener(this); - dynamicSettings.addDynamicSetting("watcher.history.index.*"); } - @Override - public void onRefreshSettings(Settings settings) { - updateHistorySettings(settings, true); - } - - private void updateHistorySettings(Settings settings, boolean updateIndexTemplate) { - Settings newSettings = Settings.builder() - .put(settings.getAsSettings("watcher.history.index")) - .build(); - if (newSettings.names().isEmpty()) { - return; - } - - boolean changed = false; - Settings.Builder builder = Settings.builder().put(customIndexSettings); - - for (Map.Entry entry : newSettings.getAsMap().entrySet()) { - String name = "index." + entry.getKey(); - if (forbiddenIndexSettings.contains(name)) { - logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name); - continue; - } - - String newValue = entry.getValue(); - String currentValue = customIndexSettings.get(name); - if (!newValue.equals(currentValue)) { - changed = true; - builder.put(name, newValue); - logger.info("changing setting [{}] from [{}] to [{}]", name, currentValue, newValue); - } - } - - if (changed) { - customIndexSettings = builder.build(); - if (updateIndexTemplate) { - // Need to fork to prevent dead lock. (We're on the cluster service update task, but the put index template - // needs to update the cluster state too, and because the update takes is a single threaded operation, - // we would then be stuck) - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); - } - }); - } - } - } - - public void start() { - if (started.compareAndSet(false, true)) { - try { - templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); - } catch (Exception e) { - started.set(false); - throw e; - } - } + started.set(true); } public boolean validate(ClusterState state) { diff --git a/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java b/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java deleted file mode 100644 index 784b050d8b4..00000000000 --- a/src/main/java/org/elasticsearch/watcher/support/TemplateUtils.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.support; - -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.watcher.WatcherException; -import org.elasticsearch.watcher.support.init.proxy.ClientProxy; -import org.elasticsearch.watcher.watch.WatchStore; - -import java.io.FileNotFoundException; -import java.io.InputStream; - -/** - */ -public class TemplateUtils extends AbstractComponent { - - private final ClientProxy client; - - @Inject - public TemplateUtils(Settings settings, ClientProxy client) { - super(settings); - this.client = client; - } - - /** - * Resolves the template with the specified templateName from the classpath, optionally adds extra settings and - * puts the index template into the cluster. - * - * This method blocks until the template has been created. - */ - public void putTemplate(String templateName, Settings customSettings) { - try (InputStream is = WatchStore.class.getResourceAsStream("/" + templateName + ".json")) { - if (is == null) { - throw new FileNotFoundException("Resource [/" + templateName + ".json] not found in classpath"); - } - final byte[] template; - try (BytesStreamOutput out = new BytesStreamOutput()) { - Streams.copy(is, out); - template = out.bytes().toBytes(); - } - PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template); - if (customSettings != null && customSettings.names().size() > 0) { - Settings updatedSettings = Settings.builder() - .put(request.settings()) - .put(customSettings) - .build(); - request.settings(updatedSettings); - } - PutIndexTemplateResponse response = client.putTemplate(request); - } catch (Exception e) { - // throwing an exception to stop exporting process - we don't want to send data unless - // we put in the template for it. - throw new WatcherException("failed to load [{}.json]", e, templateName); - } - } - -} diff --git a/src/main/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistry.java b/src/main/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistry.java new file mode 100644 index 00000000000..d9e114a68ad --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistry.java @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.settings.ClusterDynamicSettings; +import org.elasticsearch.cluster.settings.DynamicSettings; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; +import org.elasticsearch.watcher.watch.WatchStore; + +import java.io.InputStream; +import java.util.Map; +import java.util.Set; + +/** + */ +public class WatcherIndexTemplateRegistry extends AbstractComponent implements ClusterStateListener, NodeSettingsService.Listener { + + private static final ImmutableSet forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic"); + + private final ClientProxy client; + private final ThreadPool threadPool; + private final ClusterService clusterService; + private final ImmutableSet indexTemplates; + + private volatile ImmutableMap customIndexSettings; + + @Inject + public WatcherIndexTemplateRegistry(Settings settings, NodeSettingsService nodeSettingsService, ClusterService clusterService, + ThreadPool threadPool, ClientProxy client, Set configs, + @ClusterDynamicSettings DynamicSettings dynamicSettings) { + super(settings); + this.client = client; + this.threadPool = threadPool; + this.clusterService = clusterService; + this.indexTemplates = ImmutableSet.copyOf(configs); + clusterService.add(this); + nodeSettingsService.addListener(this); + + ImmutableMap.Builder customIndexSettingsBuilder = ImmutableMap.builder(); + for (TemplateConfig indexTemplate : indexTemplates) { + Settings customSettings = this.settings.getAsSettings(indexTemplate.getSettingsPrefix()); + customIndexSettings = customIndexSettingsBuilder.put(indexTemplate.getSettingsPrefix(), customSettings).build(); + dynamicSettings.addDynamicSetting(indexTemplate.getDynamicSettingsPrefix()); + } + customIndexSettings = customIndexSettingsBuilder.build(); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + ClusterState state = event.state(); + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // wait until the gateway has recovered from disk, otherwise we think may not have the index templates, + // while they actually do exist + return; + } + + if (event.localNodeMaster() == false) { + // Only the node that runs or will run Watcher should update the templates. Otherwise unnecessary put template + // calls would happen + return; + } + + for (TemplateConfig template : indexTemplates) { + if (!state.metaData().getTemplates().containsKey(template.getTemplateName())) { + putTemplate(template); + } + } + } + + @Override + public void onRefreshSettings(Settings settings) { + if (clusterService.localNode().masterNode() == false) { + // Only the node that runs or will run Watcher should update the templates. Otherwise unnecessary put template + // calls would happen + return; + } + + for (TemplateConfig config : indexTemplates) { + Settings newSettings = Settings.builder() + .put(settings.getAsSettings(config.getSettingsPrefix())) + .build(); + if (newSettings.names().isEmpty()) { + continue; + } + + Settings existingSettings = customIndexSettings.get(config.getSettingsPrefix()); + if (existingSettings == null) { + existingSettings = Settings.EMPTY; + } + + boolean changed = false; + Settings.Builder builder = Settings.builder().put(existingSettings); + for (Map.Entry newSettingsEntry : newSettings.getAsMap().entrySet()) { + String name = "index." + newSettingsEntry.getKey(); + if (forbiddenIndexSettings.contains(name)) { + logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name); + continue; + } + + String newValue = newSettingsEntry.getValue(); + String currentValue = existingSettings.get(name); + if (!newValue.equals(currentValue)) { + changed = true; + builder.put(name, newValue); + logger.info("changing setting [{}] from [{}] to [{}]", name, currentValue, newValue); + } + } + + if (changed) { + customIndexSettings = MapBuilder.newMapBuilder(customIndexSettings) + .put(config.getSettingsPrefix(), builder.build()) + .immutableMap(); + putTemplate(config); + } + } + } + + private void putTemplate(final TemplateConfig config) { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + try (InputStream is = WatchStore.class.getResourceAsStream("/" + config.getTemplateName()+ ".json")) { + if (is == null) { + logger.error("Resource [/" + config.getTemplateName() + ".json] not found in classpath"); + return; + } + final byte[] template; + try (BytesStreamOutput out = new BytesStreamOutput()) { + Streams.copy(is, out); + template = out.bytes().toBytes(); + } + + PutIndexTemplateRequest request = new PutIndexTemplateRequest(config.getTemplateName()).source(template); + Settings customSettings = customIndexSettings.get(config.getSettingsPrefix()); + if (customSettings != null && customSettings.names().size() > 0) { + Settings updatedSettings = Settings.builder() + .put(request.settings()) + .put(customSettings) + .build(); + request.settings(updatedSettings); + } + PutIndexTemplateResponse response = client.putTemplate(request); + } catch (Exception e) { + logger.error("failed to load [{}.json]", e, config.getTemplateName()); + } + } + }); + } + + public static class TemplateConfig { + + private final String templateName; + private final String settingsPrefix; + + public TemplateConfig(String templateName, String settingsPrefix) { + this.templateName = templateName; + this.settingsPrefix = settingsPrefix; + } + + public String getTemplateName() { + return templateName; + } + + public String getSettingsPrefix() { + return settingsPrefix; + } + + public String getDynamicSettingsPrefix() { + return settingsPrefix + ".*"; + } + } +} diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java index 8007805a8e1..9463808c6c2 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java @@ -33,7 +33,6 @@ import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.watcher.WatcherException; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; @@ -45,11 +44,9 @@ import java.util.concurrent.atomic.AtomicBoolean; public class WatchStore extends AbstractComponent { public static final String INDEX = ".watches"; - public static final String INDEX_TEMPLATE = "watches"; public static final String DOC_TYPE = "watch"; private final ClientProxy client; - private final TemplateUtils templateUtils; private final Watch.Parser watchParser; private final ConcurrentMap watches; @@ -59,10 +56,9 @@ public class WatchStore extends AbstractComponent { private final TimeValue scrollTimeout; @Inject - public WatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Watch.Parser watchParser) { + public WatchStore(Settings settings, ClientProxy client, Watch.Parser watchParser) { super(settings); this.client = client; - this.templateUtils = templateUtils; this.watchParser = watchParser; this.watches = ConcurrentCollections.newConcurrentMap(); @@ -81,7 +77,6 @@ public class WatchStore extends AbstractComponent { try { int count = loadWatches(watchesIndexMetaData.numberOfShards()); logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX); - templateUtils.putTemplate(INDEX_TEMPLATE, null); started.set(true); } catch (Exception e) { logger.debug("failed to load watches for watch index [{}]", e, INDEX); @@ -89,7 +84,6 @@ public class WatchStore extends AbstractComponent { throw e; } } else { - templateUtils.putTemplate(INDEX_TEMPLATE, null); started.set(true); } } diff --git a/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchStoreTests.java b/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchStoreTests.java index d53dc87cea1..d4123ee3f19 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchStoreTests.java @@ -28,7 +28,6 @@ import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.watcher.history.TriggeredWatchException; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.hamcrest.core.IsNull; import org.junit.Before; @@ -41,7 +40,6 @@ import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.same; import static org.mockito.Mockito.*; public class TriggeredWatchStoreTests extends ElasticsearchTestCase { @@ -53,11 +51,9 @@ public class TriggeredWatchStoreTests extends ElasticsearchTestCase { @Before public void init() { clientProxy = mock(ClientProxy.class); - TemplateUtils templateUtils = mock(TemplateUtils.class); parser = mock(TriggeredWatch.Parser.class); - triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, clientProxy, templateUtils, parser); + triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, clientProxy, parser); triggeredWatchStore.start(); - verify(templateUtils, times(1)).putTemplate(same(TriggeredWatchStore.INDEX_TEMPLATE_NAME), any(Settings.class)); } @Test diff --git a/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchTests.java b/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchTests.java index ec88470cd82..3c96dce3536 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/TriggeredWatchTests.java @@ -5,15 +5,14 @@ */ package org.elasticsearch.watcher.execution; -import org.joda.time.DateTime; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.watcher.execution.*; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.test.WatcherTestUtils; import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.watcher.watch.Watch; +import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -40,7 +39,7 @@ public class TriggeredWatchTests extends AbstractWatcherIntegrationTests { } private TriggeredWatch.Parser triggeredWatchParser() { - return internalTestCluster().getInstance(TriggeredWatch.Parser.class); + return internalCluster().getInstance(TriggeredWatch.Parser.class); } diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java index cf3fe906a78..ba4a372f913 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreSettingsTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResp import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.watcher.WatcherModule; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.junit.Test; @@ -26,7 +27,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests { @Test public void testChangeSettings() throws Exception { - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1")); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is zero assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is 1s @@ -43,7 +44,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests { assertBusy(new Runnable() { @Override public void run() { - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2")); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), equalTo("2")); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), equalTo("5m")); @@ -53,7 +54,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests { @Test public void testChangeSettings_ignoringForbiddenSetting() throws Exception { - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1")); assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); @@ -69,7 +70,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests { assertBusy(new Runnable() { @Override public void run() { - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2")); assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); } diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java index bccae81b0b7..3ad1857e6c1 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java @@ -7,25 +7,21 @@ package org.elasticsearch.watcher.history; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.cluster.settings.DynamicSettings; -import org.elasticsearch.common.unit.TimeValue; -import org.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ElasticsearchTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.execution.ExecutionState; import org.elasticsearch.watcher.execution.Wid; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.joda.time.DateTime; import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; -import static org.joda.time.DateTimeZone.UTC; import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest; import static org.hamcrest.core.IsEqual.equalTo; +import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.*; /** @@ -38,11 +34,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { @Before public void init() { clientProxy = mock(ClientProxy.class); - TemplateUtils templateUtils = mock(TemplateUtils.class); - NodeSettingsService nodeSettingsService = mock(NodeSettingsService.class); - DynamicSettings dynamicSettings = mock(DynamicSettings.class); - ThreadPool threadPool = mock(ThreadPool.class); - historyStore = new HistoryStore(Settings.EMPTY, clientProxy, templateUtils, nodeSettingsService, dynamicSettings, threadPool); + historyStore = new HistoryStore(Settings.EMPTY, clientProxy); historyStore.start(); } diff --git a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputIntegrationTests.java index e2710a6436d..a0cab9c7884 100644 --- a/src/test/java/org/elasticsearch/watcher/input/http/HttpInputIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/input/http/HttpInputIntegrationTests.java @@ -53,7 +53,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTests { createIndex("index"); client().prepareIndex("index", "type", "id").setSource("{}").setRefresh(true).get(); - InetSocketAddress address = internalTestCluster().httpAddresses()[0]; + InetSocketAddress address = internalCluster().httpAddresses()[0]; watcherClient().preparePutWatch("_name") .setSource(watchBuilder() .trigger(schedule(interval("5s"))) @@ -74,7 +74,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTests { @Test public void testHttpInput_clusterStats() throws Exception { - InetSocketAddress address = internalTestCluster().httpAddresses()[0]; + InetSocketAddress address = internalCluster().httpAddresses()[0]; PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_name") .setSource(watchBuilder() .trigger(schedule(interval("1s"))) @@ -102,7 +102,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTests { client().prepareIndex("idx", "type").setSource("field", "value").get(); refresh(); - InetSocketAddress address = internalTestCluster().httpAddresses()[0]; + InetSocketAddress address = internalCluster().httpAddresses()[0]; XContentBuilder body = jsonBuilder().prettyPrint().startObject() .field("query").value(termQuery("field", "value")) .endObject(); diff --git a/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java index b208bf1fff2..466bba692cc 100644 --- a/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/license/LicenseIntegrationTests.java @@ -268,13 +268,13 @@ public class LicenseIntegrationTests extends AbstractWatcherIntegrationTests { } public static void disableLicensing() { - for (MockLicenseService service : internalTestCluster().getInstances(MockLicenseService.class)) { + for (MockLicenseService service : internalCluster().getInstances(MockLicenseService.class)) { service.disable(); } } public static void enableLicensing() { - for (MockLicenseService service : internalTestCluster().getInstances(MockLicenseService.class)) { + for (MockLicenseService service : internalCluster().getInstances(MockLicenseService.class)) { service.enable(); } } diff --git a/src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java b/src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java deleted file mode 100644 index 77b7a78b0c6..00000000000 --- a/src/test/java/org/elasticsearch/watcher/support/TemplateUtilsTests.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.support; - -import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.watcher.history.HistoryStore; -import org.elasticsearch.watcher.support.init.proxy.ClientProxy; -import org.junit.Test; - -import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.core.Is.is; - -/** - */ -@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) -public class TemplateUtilsTests extends ElasticsearchIntegrationTest { - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false) - .build(); - } - - @Override - protected Settings transportClientSettings() { - return Settings.builder() - .put(super.transportClientSettings()) - .put(PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false) - .build(); - } - - @Test - public void testPutTemplate() throws Exception { - TemplateUtils templateUtils = new TemplateUtils(Settings.EMPTY, ClientProxy.of(client())); - - Settings.Builder options = Settings.builder(); - options.put("key", "value"); - templateUtils.putTemplate(HistoryStore.INDEX_TEMPLATE_NAME, options.build()); - - GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get(); - // setting in the file on the classpath: - assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); - // additional setting: - assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key"), equalTo("value")); - } - -} diff --git a/src/test/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistryTests.java b/src/test/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistryTests.java new file mode 100644 index 00000000000..044f07ad346 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/support/WatcherIndexTemplateRegistryTests.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.support; + +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.watcher.WatcherModule; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.junit.Test; + +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.Is.is; + +/** + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1) +public class WatcherIndexTemplateRegistryTests extends AbstractWatcherIntegrationTests { + + @Test + public void testTemplates() throws Exception { + assertAcked( + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put("watcher.history.index.key1", "value")) + .get() + ); + + assertBusy(new Runnable() { + @Override + public void run() { + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); + assertThat(response.getIndexTemplates().size(), equalTo(1)); + // setting from the file on the classpath: + assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); + // additional setting defined in the node settings: + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key1"), equalTo("value")); + } + }); + + // Now delete the index template and verify the index template gets added back: + assertAcked(client().admin().indices().prepareDeleteTemplate(WatcherModule.HISTORY_TEMPLATE_NAME).get()); + + assertBusy(new Runnable() { + @Override + public void run() { + GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get(); + assertThat(response.getIndexTemplates().size(), equalTo(1)); + // setting from the file on the classpath: + assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false)); + // additional setting defined in the node settings: + assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key1"), equalTo("value")); + } + }); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index 84c24338b20..a0cb5b0e8f7 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -5,15 +5,12 @@ */ package org.elasticsearch.watcher.test; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.Streams; @@ -32,7 +29,6 @@ import org.elasticsearch.shield.authc.support.SecuredString; import org.elasticsearch.shield.crypto.InternalCryptoService; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.TestCluster; import org.elasticsearch.watcher.WatcherLifeCycleService; import org.elasticsearch.watcher.WatcherPlugin; @@ -46,7 +42,6 @@ import org.elasticsearch.watcher.client.WatcherClient; import org.elasticsearch.watcher.execution.ExecutionService; import org.elasticsearch.watcher.execution.ExecutionState; import org.elasticsearch.watcher.history.HistoryStore; -import org.elasticsearch.watcher.execution.TriggeredWatchStore; import org.elasticsearch.watcher.license.LicenseService; import org.elasticsearch.watcher.support.clock.ClockMock; import org.elasticsearch.watcher.support.http.HttpClient; @@ -56,7 +51,6 @@ import org.elasticsearch.watcher.trigger.ScheduleTriggerEngineMock; import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.watcher.watch.Watch; -import org.elasticsearch.watcher.watch.WatchStore; import org.hamcrest.Matcher; import org.jboss.netty.util.internal.SystemPropertyUtil; import org.junit.After; @@ -66,11 +60,13 @@ import org.junit.Before; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; @@ -92,6 +88,15 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg private static ScheduleModule.Engine scheduleEngine; + @Override + protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException { + if (shieldEnabled == null) { + shieldEnabled = enableShield(); + scheduleEngine = randomFrom(ScheduleModule.Engine.values()); + } + return super.buildTestCluster(scope, seed); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { String scheduleImplName = scheduleEngine().name().toLowerCase(Locale.ROOT); @@ -208,7 +213,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg } private void startWatcherIfNodesExist() throws Exception { - if (internalTestCluster().size() > 0) { + if (internalCluster().size() > 0) { ensureLicenseEnabled(); WatcherState state = getInstanceFromMaster(WatcherService.class).state(); if (state == WatcherState.STOPPED) { @@ -234,14 +239,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg return false; } - @Override - protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException { - // This overwrites the wipe logic of the test cluster to not remove the watches and watch_history templates. By default all templates are removed - // TODO: We should have the notion of a hidden template (like hidden index / type) that only gets removed when specifically mentioned. - final TestCluster testCluster = super.buildTestCluster(scope, seed); - return new WatcherWrappingCluster(seed, testCluster); - } - protected long docCount(String index, String type, QueryBuilder query) { refresh(); return docCount(index, type, SearchSourceBuilder.searchSource().query(query)); @@ -266,7 +263,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg } protected T getInstanceFromMaster(Class type) { - return internalTestCluster().getInstance(type, internalTestCluster().getMasterName()); + return internalCluster().getInstance(type, internalCluster().getMasterName()); } protected Watch.Parser watchParser() { @@ -291,16 +288,16 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg protected WatcherClient watcherClient() { return shieldEnabled ? - new WatcherClient(internalTestCluster().transportClient()) : + new WatcherClient(internalCluster().transportClient()) : new WatcherClient(client()); } protected ScriptServiceProxy scriptService() { - return internalTestCluster().getInstance(ScriptServiceProxy.class); + return internalCluster().getInstance(ScriptServiceProxy.class); } protected HttpClient watcherHttpClient() { - return internalTestCluster().getInstance(HttpClient.class); + return internalCluster().getInstance(HttpClient.class); } protected EmailService noopEmailService() { @@ -442,7 +439,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg assertBusy(new Runnable() { @Override public void run() { - for (LicenseService service : internalTestCluster().getInstances(LicenseService.class)) { + for (LicenseService service : internalCluster().getInstances(LicenseService.class)) { assertThat(service.enabled(), is(true)); } } @@ -494,7 +491,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg protected void ensureWatcherOnlyRunningOnce() { int running = 0; - for (WatcherService watcherService : internalTestCluster().getInstances(WatcherService.class)) { + for (WatcherService watcherService : internalCluster().getInstances(WatcherService.class)) { if (watcherService.state() == WatcherState.STARTED) { running++; } @@ -502,103 +499,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg assertThat("watcher should only run on the elected master node, but it is running on [" + running + "] nodes", running, equalTo(1)); } - protected static InternalTestCluster internalTestCluster() { - return (InternalTestCluster) ((WatcherWrappingCluster) cluster()).testCluster; - } - - // We need this custom impl, because we have custom wipe logic. We don't want the watcher index templates to get deleted between tests - private final class WatcherWrappingCluster extends TestCluster { - - private final TestCluster testCluster; - - private WatcherWrappingCluster(long seed, TestCluster testCluster) { - super(seed); - this.testCluster = testCluster; - } - - @Override - public void beforeTest(Random random, double transportClientRatio) throws IOException { - if (scheduleEngine == null) { - scheduleEngine = randomFrom(ScheduleModule.Engine.values()); - } - if (shieldEnabled == null) { - shieldEnabled = enableShield(); - } - testCluster.beforeTest(random, transportClientRatio); - } - - @Override - public void wipe() { - wipeIndices("_all"); - wipeRepositories(); - - if (size() > 0) { - List templatesToWipe = new ArrayList<>(); - ClusterState state = client().admin().cluster().prepareState().get().getState(); - for (ObjectObjectCursor cursor : state.getMetaData().templates()) { - if (cursor.key.equals(WatchStore.INDEX_TEMPLATE) || cursor.key.equals(HistoryStore.INDEX_TEMPLATE_NAME) || cursor.key.equals(TriggeredWatchStore.INDEX_TEMPLATE_NAME)) { - continue; - } - templatesToWipe.add(cursor.key); - } - if (!templatesToWipe.isEmpty()) { - wipeTemplates(templatesToWipe.toArray(new String[templatesToWipe.size()])); - } - } - } - - @Override - public void afterTest() throws IOException { - testCluster.afterTest(); - } - - @Override - public Client client() { - return testCluster.client(); - } - - @Override - public int size() { - return testCluster.size(); - } - - @Override - public int numDataNodes() { - return testCluster.numDataNodes(); - } - - @Override - public int numDataAndMasterNodes() { - return testCluster.numDataAndMasterNodes(); - } - - @Override - public InetSocketAddress[] httpAddresses() { - return testCluster.httpAddresses(); - } - - @Override - public void close() throws IOException { - testCluster.close(); - } - - @Override - public void ensureEstimatedStats() { - testCluster.ensureEstimatedStats(); - } - - @Override - public String getClusterName() { - return testCluster.getClusterName(); - } - - @Override - public Iterator iterator() { - return testCluster.iterator(); - } - - } - private static class NoopEmailService implements EmailService { @Override diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java index 6119293559c..30a14bde7eb 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/NoMasterNodeTests.java @@ -84,7 +84,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { @Test public void testSimpleFailure() throws Exception { config = new ClusterDiscoveryConfiguration.UnicastZen(2); - internalTestCluster().startNodesAsync(2).get(); + internalCluster().startNodesAsync(2).get(); createIndex("my-index"); ensureWatcherStarted(false); @@ -141,12 +141,12 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { .put("node.data", false) .put("node.master", true) .build(); - internalTestCluster().startNodesAsync(3, settings).get(); + internalCluster().startNodesAsync(3, settings).get(); settings = Settings.builder() .put("node.data", true) .put("node.master", false) .build(); - internalTestCluster().startNodesAsync(7, settings).get(); + internalCluster().startNodesAsync(7, settings).get(); ensureWatcherStarted(false); ensureLicenseEnabled(); @@ -162,7 +162,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { assertWatchWithMinimumPerformedActionsCount("_watch_id", 1, false); // We still have 2 master node, we should recover from this failure: - internalTestCluster().stopCurrentMasterNode(); + internalCluster().stopCurrentMasterNode(); ensureWatcherStarted(false); ensureWatcherOnlyRunningOnce(); assertWatchWithMinimumPerformedActionsCount("_watch_id", 2, false); @@ -192,7 +192,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { int numberOfWatches = scaledRandomIntBetween(numberOfFailures, 12); logger.info("number of failures [{}], number of watches [{}]", numberOfFailures, numberOfWatches); config = new ClusterDiscoveryConfiguration.UnicastZen(2 + numberOfFailures); - internalTestCluster().startNodesAsync(2).get(); + internalCluster().startNodesAsync(2).get(); createIndex("my-index"); client().prepareIndex("my-index", "my-type").setSource("field", "value").get(); @@ -233,14 +233,14 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { // will elect itself as master. This is bad and should be fixed in core. What I think that should happen is that // if a node detects that is has lost a node, a node should clear its unicast temporal responses or at least // remove the node that has been removed. This is a workaround: - for (ZenPingService pingService : internalTestCluster().getInstances(ZenPingService.class)) { + for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { for (ZenPing zenPing : pingService.zenPings()) { if (zenPing instanceof UnicastZenPing) { ((UnicastZenPing) zenPing).clearTemporalResponses(); } } } - internalTestCluster().stopCurrentMasterNode(); + internalCluster().stopCurrentMasterNode(); // Can't use ensureWatcherStopped, b/c that relies on the watcher stats api which requires an elected master node assertBusy(new Runnable() { public void run () { @@ -251,16 +251,16 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests { } }, 30, TimeUnit.SECONDS); // Ensure that the watch manager doesn't run elsewhere - for (WatcherService watcherService : internalTestCluster().getInstances(WatcherService.class)) { + for (WatcherService watcherService : internalCluster().getInstances(WatcherService.class)) { assertThat(watcherService.state(), is(WatcherState.STOPPED)); } - for (ExecutionService executionService : internalTestCluster().getInstances(ExecutionService.class)) { + for (ExecutionService executionService : internalCluster().getInstances(ExecutionService.class)) { assertThat(executionService.executionThreadPoolQueueSize(), equalTo(0l)); } } private void startElectedMasterNodeAndWait() throws Exception { - internalTestCluster().startNode(); + internalCluster().startNode(); ensureWatcherStarted(false); ensureWatcherOnlyRunningOnce(); } diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java index 943929662e8..8cedb6c7b9c 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java @@ -28,7 +28,6 @@ import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.watcher.WatcherException; -import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.junit.Before; import org.junit.Test; @@ -46,15 +45,13 @@ public class WatchStoreTests extends ElasticsearchTestCase { private WatchStore watchStore; private ClientProxy clientProxy; - private TemplateUtils templateUtils; private Watch.Parser parser; @Before public void init() { clientProxy = mock(ClientProxy.class); - templateUtils = mock(TemplateUtils.class); parser = mock(Watch.Parser.class); - watchStore = new WatchStore(Settings.EMPTY, clientProxy, templateUtils, parser); + watchStore = new WatchStore(Settings.EMPTY, clientProxy, parser); } @Test @@ -68,11 +65,9 @@ public class WatchStoreTests extends ElasticsearchTestCase { watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(0)); - verify(templateUtils, times(1)).putTemplate("watches", null); verifyZeroInteractions(clientProxy); watchStore.start(cs); - verifyNoMoreInteractions(templateUtils); verifyZeroInteractions(clientProxy); } @@ -97,7 +92,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { ClusterState cs = csBuilder.build(); assertThat(watchStore.validate(cs), is(false)); - verifyZeroInteractions(templateUtils); verifyZeroInteractions(clientProxy); } @@ -131,7 +125,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { } catch (WatcherException e) { assertThat(e.getMessage(), equalTo("not all required shards have been refreshed")); } - verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, never()).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, never()).clearScroll(anyString()); @@ -171,7 +164,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { } catch (ElasticsearchException e) { assertThat(e.getMessage(), equalTo("Partial response while loading watches")); } - verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, times(1)).clearScroll(anyString()); @@ -209,7 +201,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(0)); - verify(templateUtils, times(1)).putTemplate("watches", null); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, times(1)).clearScroll(anyString()); @@ -264,7 +255,6 @@ public class WatchStoreTests extends ElasticsearchTestCase { watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(2)); - verify(templateUtils, times(1)).putTemplate("watches", null); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class));