index templates: Change the TemplateUtils to be a TemplateRegistry that is responsible for maintaining the Watcher index templates.

The TemplateRegistry adds templates based on if these index templates exist in the cluster state. Components that rely on index templates register their template config with the TemplateRegistry. The TemplateRegistry adds these templates in the background when a cluster state update occurs and add component index settings to the index template. Also when component index settings change, the index template will be updated by the TemplateRegistry.

If a registered index template gets deleted, it will be added back by the TemplateRegistry in background if in a cluster state the index template is missing.

Original commit: elastic/x-pack-elasticsearch@97f4f42160
This commit is contained in:
Martijn van Groningen 2015-06-26 11:52:53 +02:00
parent 521b6e8cf3
commit 0a07d6dee5
18 changed files with 329 additions and 424 deletions

View File

@ -9,6 +9,7 @@ package org.elasticsearch.watcher;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.actions.ActionModule;
import org.elasticsearch.watcher.client.WatcherClientModule;
@ -19,8 +20,8 @@ import org.elasticsearch.watcher.input.InputModule;
import org.elasticsearch.watcher.license.LicenseModule;
import org.elasticsearch.watcher.rest.WatcherRestModule;
import org.elasticsearch.watcher.shield.WatcherShieldModule;
import org.elasticsearch.watcher.support.DynamicIndexName;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry.TemplateConfig;
import org.elasticsearch.watcher.support.clock.ClockModule;
import org.elasticsearch.watcher.support.http.HttpClientModule;
import org.elasticsearch.watcher.support.init.InitializingModule;
@ -37,6 +38,10 @@ import java.util.Arrays;
public class WatcherModule extends AbstractModule implements SpawnModules {
public static final String HISTORY_TEMPLATE_NAME = "watch_history";
public static final String TRIGGERED_TEMPLATE_NAME = "triggered_watches";
public static final String WATCHES_TEMPLATE_NAME = "watches";
protected final Settings settings;
public WatcherModule(Settings settings) {
@ -69,8 +74,14 @@ public class WatcherModule extends AbstractModule implements SpawnModules {
@Override
protected void configure() {
bind(WatcherLifeCycleService.class).asEagerSingleton();
bind(TemplateUtils.class).asEagerSingleton();
bind(WatcherSettingsValidation.class).asEagerSingleton();
bind(WatcherIndexTemplateRegistry.class).asEagerSingleton();
Multibinder<TemplateConfig> multibinder
= Multibinder.newSetBinder(binder(), TemplateConfig.class);
multibinder.addBinding().toInstance(new TemplateConfig(TRIGGERED_TEMPLATE_NAME, "watcher.triggered_watches.index"));
multibinder.addBinding().toInstance(new TemplateConfig(HISTORY_TEMPLATE_NAME, "watcher.history.index"));
multibinder.addBinding().toInstance(new TemplateConfig(WATCHES_TEMPLATE_NAME, "watcher.watches.index"));
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.watcher.execution;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@ -29,11 +28,13 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.history.HistoryException;
import org.elasticsearch.watcher.history.TriggeredWatchException;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -43,14 +44,10 @@ public class TriggeredWatchStore extends AbstractComponent {
public static final String INDEX_NAME = ".triggered_watches";
public static final String DOC_TYPE = "triggered_watch";
public static final String INDEX_TEMPLATE_NAME = "triggered_watches";
private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic");
private final int scrollSize;
private final ClientProxy client;
private final TimeValue scrollTimeout;
private final TemplateUtils templateUtils;
private final Settings customIndexSettings;
private final TriggeredWatch.Parser triggeredWatchParser;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@ -59,47 +56,16 @@ public class TriggeredWatchStore extends AbstractComponent {
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public TriggeredWatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, TriggeredWatch.Parser triggeredWatchParser) {
public TriggeredWatchStore(Settings settings, ClientProxy client, TriggeredWatch.Parser triggeredWatchParser) {
super(settings);
this.scrollSize = settings.getAsInt("watcher.execution.scroll.size", 100);
this.client = client;
this.scrollTimeout = settings.getAsTime("watcher.execution.scroll.timeout", TimeValue.timeValueSeconds(30));
this.templateUtils = templateUtils;
this.customIndexSettings = updateTriggerWatchesSettings(settings);
this.triggeredWatchParser = triggeredWatchParser;
}
private Settings updateTriggerWatchesSettings(Settings nodeSettings) {
Settings newSettings = Settings.builder()
.put(nodeSettings.getAsSettings("watcher.triggered_watches.index"))
.build();
if (newSettings.names().isEmpty()) {
return Settings.EMPTY;
}
// Filter out forbidden settings:
Settings.Builder builder = Settings.builder();
for (Map.Entry<String, String> entry : newSettings.getAsMap().entrySet()) {
String name = "index." + entry.getKey();
if (forbiddenIndexSettings.contains(name)) {
logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name);
continue;
}
builder.put(name, entry.getValue());
}
return builder.build();
}
public void start() {
if (started.compareAndSet(false, true)) {
try {
templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings);
} catch (Exception e) {
started.set(false);
throw e;
}
}
started.set(true);
}
public boolean validate(ClusterState state) {

View File

@ -13,11 +13,9 @@ import org.elasticsearch.watcher.execution.InternalWatchExecutor;
*/
public class HistoryModule extends AbstractModule {
public HistoryModule() {
}
@Override
protected void configure() {
bind(HistoryStore.class).asEagerSingleton();

View File

@ -10,23 +10,17 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -34,97 +28,29 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*/
public class HistoryStore extends AbstractComponent implements NodeSettingsService.Listener {
public class HistoryStore extends AbstractComponent {
public static final String INDEX_PREFIX = ".watch_history-";
public static final String DOC_TYPE = "watch_record";
public static final String INDEX_TEMPLATE_NAME = "watch_history";
static final DateTimeFormatter indexTimeFormat = DateTimeFormat.forPattern("YYYY.MM.dd");
private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic");
private final ClientProxy client;
private final TemplateUtils templateUtils;
private final ThreadPool threadPool;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock putUpdateLock = readWriteLock.readLock();
private final Lock stopLock = readWriteLock.writeLock();
private final AtomicBoolean started = new AtomicBoolean(false);
private volatile Settings customIndexSettings = Settings.EMPTY;
@Inject
public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, NodeSettingsService nodeSettingsService,
@ClusterDynamicSettings DynamicSettings dynamicSettings, ThreadPool threadPool) {
public HistoryStore(Settings settings, ClientProxy client) {
super(settings);
this.client = client;
this.templateUtils = templateUtils;
this.threadPool = threadPool;
updateHistorySettings(settings, false);
nodeSettingsService.addListener(this);
dynamicSettings.addDynamicSetting("watcher.history.index.*");
}
@Override
public void onRefreshSettings(Settings settings) {
updateHistorySettings(settings, true);
}
private void updateHistorySettings(Settings settings, boolean updateIndexTemplate) {
Settings newSettings = Settings.builder()
.put(settings.getAsSettings("watcher.history.index"))
.build();
if (newSettings.names().isEmpty()) {
return;
}
boolean changed = false;
Settings.Builder builder = Settings.builder().put(customIndexSettings);
for (Map.Entry<String, String> entry : newSettings.getAsMap().entrySet()) {
String name = "index." + entry.getKey();
if (forbiddenIndexSettings.contains(name)) {
logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name);
continue;
}
String newValue = entry.getValue();
String currentValue = customIndexSettings.get(name);
if (!newValue.equals(currentValue)) {
changed = true;
builder.put(name, newValue);
logger.info("changing setting [{}] from [{}] to [{}]", name, currentValue, newValue);
}
}
if (changed) {
customIndexSettings = builder.build();
if (updateIndexTemplate) {
// Need to fork to prevent dead lock. (We're on the cluster service update task, but the put index template
// needs to update the cluster state too, and because the update takes is a single threaded operation,
// we would then be stuck)
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings);
}
});
}
}
}
public void start() {
if (started.compareAndSet(false, true)) {
try {
templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings);
} catch (Exception e) {
started.set(false);
throw e;
}
}
started.set(true);
}
public boolean validate(ClusterState state) {

View File

@ -1,66 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.watch.WatchStore;
import java.io.FileNotFoundException;
import java.io.InputStream;
/**
*/
public class TemplateUtils extends AbstractComponent {
private final ClientProxy client;
@Inject
public TemplateUtils(Settings settings, ClientProxy client) {
super(settings);
this.client = client;
}
/**
* Resolves the template with the specified templateName from the classpath, optionally adds extra settings and
* puts the index template into the cluster.
*
* This method blocks until the template has been created.
*/
public void putTemplate(String templateName, Settings customSettings) {
try (InputStream is = WatchStore.class.getResourceAsStream("/" + templateName + ".json")) {
if (is == null) {
throw new FileNotFoundException("Resource [/" + templateName + ".json] not found in classpath");
}
final byte[] template;
try (BytesStreamOutput out = new BytesStreamOutput()) {
Streams.copy(is, out);
template = out.bytes().toBytes();
}
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template);
if (customSettings != null && customSettings.names().size() > 0) {
Settings updatedSettings = Settings.builder()
.put(request.settings())
.put(customSettings)
.build();
request.settings(updatedSettings);
}
PutIndexTemplateResponse response = client.putTemplate(request);
} catch (Exception e) {
// throwing an exception to stop exporting process - we don't want to send data unless
// we put in the template for it.
throw new WatcherException("failed to load [{}.json]", e, templateName);
}
}
}

View File

@ -0,0 +1,192 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.watch.WatchStore;
import java.io.InputStream;
import java.util.Map;
import java.util.Set;
/**
*/
public class WatcherIndexTemplateRegistry extends AbstractComponent implements ClusterStateListener, NodeSettingsService.Listener {
private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic");
private final ClientProxy client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final ImmutableSet<TemplateConfig> indexTemplates;
private volatile ImmutableMap<String, Settings> customIndexSettings;
@Inject
public WatcherIndexTemplateRegistry(Settings settings, NodeSettingsService nodeSettingsService, ClusterService clusterService,
ThreadPool threadPool, ClientProxy client, Set<TemplateConfig> configs,
@ClusterDynamicSettings DynamicSettings dynamicSettings) {
super(settings);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indexTemplates = ImmutableSet.copyOf(configs);
clusterService.add(this);
nodeSettingsService.addListener(this);
ImmutableMap.Builder<String, Settings> customIndexSettingsBuilder = ImmutableMap.builder();
for (TemplateConfig indexTemplate : indexTemplates) {
Settings customSettings = this.settings.getAsSettings(indexTemplate.getSettingsPrefix());
customIndexSettings = customIndexSettingsBuilder.put(indexTemplate.getSettingsPrefix(), customSettings).build();
dynamicSettings.addDynamicSetting(indexTemplate.getDynamicSettingsPrefix());
}
customIndexSettings = customIndexSettingsBuilder.build();
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
// while they actually do exist
return;
}
if (event.localNodeMaster() == false) {
// Only the node that runs or will run Watcher should update the templates. Otherwise unnecessary put template
// calls would happen
return;
}
for (TemplateConfig template : indexTemplates) {
if (!state.metaData().getTemplates().containsKey(template.getTemplateName())) {
putTemplate(template);
}
}
}
@Override
public void onRefreshSettings(Settings settings) {
if (clusterService.localNode().masterNode() == false) {
// Only the node that runs or will run Watcher should update the templates. Otherwise unnecessary put template
// calls would happen
return;
}
for (TemplateConfig config : indexTemplates) {
Settings newSettings = Settings.builder()
.put(settings.getAsSettings(config.getSettingsPrefix()))
.build();
if (newSettings.names().isEmpty()) {
continue;
}
Settings existingSettings = customIndexSettings.get(config.getSettingsPrefix());
if (existingSettings == null) {
existingSettings = Settings.EMPTY;
}
boolean changed = false;
Settings.Builder builder = Settings.builder().put(existingSettings);
for (Map.Entry<String, String> newSettingsEntry : newSettings.getAsMap().entrySet()) {
String name = "index." + newSettingsEntry.getKey();
if (forbiddenIndexSettings.contains(name)) {
logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name);
continue;
}
String newValue = newSettingsEntry.getValue();
String currentValue = existingSettings.get(name);
if (!newValue.equals(currentValue)) {
changed = true;
builder.put(name, newValue);
logger.info("changing setting [{}] from [{}] to [{}]", name, currentValue, newValue);
}
}
if (changed) {
customIndexSettings = MapBuilder.newMapBuilder(customIndexSettings)
.put(config.getSettingsPrefix(), builder.build())
.immutableMap();
putTemplate(config);
}
}
}
private void putTemplate(final TemplateConfig config) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try (InputStream is = WatchStore.class.getResourceAsStream("/" + config.getTemplateName()+ ".json")) {
if (is == null) {
logger.error("Resource [/" + config.getTemplateName() + ".json] not found in classpath");
return;
}
final byte[] template;
try (BytesStreamOutput out = new BytesStreamOutput()) {
Streams.copy(is, out);
template = out.bytes().toBytes();
}
PutIndexTemplateRequest request = new PutIndexTemplateRequest(config.getTemplateName()).source(template);
Settings customSettings = customIndexSettings.get(config.getSettingsPrefix());
if (customSettings != null && customSettings.names().size() > 0) {
Settings updatedSettings = Settings.builder()
.put(request.settings())
.put(customSettings)
.build();
request.settings(updatedSettings);
}
PutIndexTemplateResponse response = client.putTemplate(request);
} catch (Exception e) {
logger.error("failed to load [{}.json]", e, config.getTemplateName());
}
}
});
}
public static class TemplateConfig {
private final String templateName;
private final String settingsPrefix;
public TemplateConfig(String templateName, String settingsPrefix) {
this.templateName = templateName;
this.settingsPrefix = settingsPrefix;
}
public String getTemplateName() {
return templateName;
}
public String getSettingsPrefix() {
return settingsPrefix;
}
public String getDynamicSettingsPrefix() {
return settingsPrefix + ".*";
}
}
}

View File

@ -33,7 +33,6 @@ import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import java.io.IOException;
@ -45,11 +44,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class WatchStore extends AbstractComponent {
public static final String INDEX = ".watches";
public static final String INDEX_TEMPLATE = "watches";
public static final String DOC_TYPE = "watch";
private final ClientProxy client;
private final TemplateUtils templateUtils;
private final Watch.Parser watchParser;
private final ConcurrentMap<String, Watch> watches;
@ -59,10 +56,9 @@ public class WatchStore extends AbstractComponent {
private final TimeValue scrollTimeout;
@Inject
public WatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Watch.Parser watchParser) {
public WatchStore(Settings settings, ClientProxy client, Watch.Parser watchParser) {
super(settings);
this.client = client;
this.templateUtils = templateUtils;
this.watchParser = watchParser;
this.watches = ConcurrentCollections.newConcurrentMap();
@ -81,7 +77,6 @@ public class WatchStore extends AbstractComponent {
try {
int count = loadWatches(watchesIndexMetaData.numberOfShards());
logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX);
templateUtils.putTemplate(INDEX_TEMPLATE, null);
started.set(true);
} catch (Exception e) {
logger.debug("failed to load watches for watch index [{}]", e, INDEX);
@ -89,7 +84,6 @@ public class WatchStore extends AbstractComponent {
throw e;
}
} else {
templateUtils.putTemplate(INDEX_TEMPLATE, null);
started.set(true);
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.history.TriggeredWatchException;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.hamcrest.core.IsNull;
import org.junit.Before;
@ -41,7 +40,6 @@ import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.*;
public class TriggeredWatchStoreTests extends ElasticsearchTestCase {
@ -53,11 +51,9 @@ public class TriggeredWatchStoreTests extends ElasticsearchTestCase {
@Before
public void init() {
clientProxy = mock(ClientProxy.class);
TemplateUtils templateUtils = mock(TemplateUtils.class);
parser = mock(TriggeredWatch.Parser.class);
triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, clientProxy, templateUtils, parser);
triggeredWatchStore = new TriggeredWatchStore(Settings.EMPTY, clientProxy, parser);
triggeredWatchStore.start();
verify(templateUtils, times(1)).putTemplate(same(TriggeredWatchStore.INDEX_TEMPLATE_NAME), any(Settings.class));
}
@Test

View File

@ -5,15 +5,14 @@
*/
package org.elasticsearch.watcher.execution;
import org.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.watcher.execution.*;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Watch;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Test;
@ -40,7 +39,7 @@ public class TriggeredWatchTests extends AbstractWatcherIntegrationTests {
}
private TriggeredWatch.Parser triggeredWatchParser() {
return internalTestCluster().getInstance(TriggeredWatch.Parser.class);
return internalCluster().getInstance(TriggeredWatch.Parser.class);
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResp
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.watcher.WatcherModule;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.junit.Test;
@ -26,7 +27,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests {
@Test
public void testChangeSettings() throws Exception {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1"));
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is zero
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is 1s
@ -43,7 +44,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests {
assertBusy(new Runnable() {
@Override
public void run() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2"));
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), equalTo("2"));
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), equalTo("5m"));
@ -53,7 +54,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests {
@Test
public void testChangeSettings_ignoringForbiddenSetting() throws Exception {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1"));
assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
@ -69,7 +70,7 @@ public class HistoryStoreSettingsTests extends AbstractWatcherIntegrationTests {
assertBusy(new Runnable() {
@Override
public void run() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2"));
assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
}

View File

@ -7,25 +7,21 @@ package org.elasticsearch.watcher.history;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.execution.ExecutionState;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import static org.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Mockito.*;
/**
@ -38,11 +34,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
@Before
public void init() {
clientProxy = mock(ClientProxy.class);
TemplateUtils templateUtils = mock(TemplateUtils.class);
NodeSettingsService nodeSettingsService = mock(NodeSettingsService.class);
DynamicSettings dynamicSettings = mock(DynamicSettings.class);
ThreadPool threadPool = mock(ThreadPool.class);
historyStore = new HistoryStore(Settings.EMPTY, clientProxy, templateUtils, nodeSettingsService, dynamicSettings, threadPool);
historyStore = new HistoryStore(Settings.EMPTY, clientProxy);
historyStore.start();
}

View File

@ -53,7 +53,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTests {
createIndex("index");
client().prepareIndex("index", "type", "id").setSource("{}").setRefresh(true).get();
InetSocketAddress address = internalTestCluster().httpAddresses()[0];
InetSocketAddress address = internalCluster().httpAddresses()[0];
watcherClient().preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(interval("5s")))
@ -74,7 +74,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTests {
@Test
public void testHttpInput_clusterStats() throws Exception {
InetSocketAddress address = internalTestCluster().httpAddresses()[0];
InetSocketAddress address = internalCluster().httpAddresses()[0];
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(interval("1s")))
@ -102,7 +102,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTests {
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
InetSocketAddress address = internalTestCluster().httpAddresses()[0];
InetSocketAddress address = internalCluster().httpAddresses()[0];
XContentBuilder body = jsonBuilder().prettyPrint().startObject()
.field("query").value(termQuery("field", "value"))
.endObject();

View File

@ -268,13 +268,13 @@ public class LicenseIntegrationTests extends AbstractWatcherIntegrationTests {
}
public static void disableLicensing() {
for (MockLicenseService service : internalTestCluster().getInstances(MockLicenseService.class)) {
for (MockLicenseService service : internalCluster().getInstances(MockLicenseService.class)) {
service.disable();
}
}
public static void enableLicensing() {
for (MockLicenseService service : internalTestCluster().getInstances(MockLicenseService.class)) {
for (MockLicenseService service : internalCluster().getInstances(MockLicenseService.class)) {
service.enable();
}
}

View File

@ -1,56 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.junit.Test;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class TemplateUtilsTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false)
.build();
}
@Override
protected Settings transportClientSettings() {
return Settings.builder()
.put(super.transportClientSettings())
.put(PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false)
.build();
}
@Test
public void testPutTemplate() throws Exception {
TemplateUtils templateUtils = new TemplateUtils(Settings.EMPTY, ClientProxy.of(client()));
Settings.Builder options = Settings.builder();
options.put("key", "value");
templateUtils.putTemplate(HistoryStore.INDEX_TEMPLATE_NAME, options.build());
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
// setting in the file on the classpath:
assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
// additional setting:
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key"), equalTo("value"));
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.WatcherModule;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.junit.Test;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class WatcherIndexTemplateRegistryTests extends AbstractWatcherIntegrationTests {
@Test
public void testTemplates() throws Exception {
assertAcked(
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put("watcher.history.index.key1", "value"))
.get()
);
assertBusy(new Runnable() {
@Override
public void run() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().size(), equalTo(1));
// setting from the file on the classpath:
assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
// additional setting defined in the node settings:
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key1"), equalTo("value"));
}
});
// Now delete the index template and verify the index template gets added back:
assertAcked(client().admin().indices().prepareDeleteTemplate(WatcherModule.HISTORY_TEMPLATE_NAME).get());
assertBusy(new Runnable() {
@Override
public void run() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(WatcherModule.HISTORY_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().size(), equalTo(1));
// setting from the file on the classpath:
assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
// additional setting defined in the node settings:
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key1"), equalTo("value"));
}
});
}
}

View File

@ -5,15 +5,12 @@
*/
package org.elasticsearch.watcher.test;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
@ -32,7 +29,6 @@ import org.elasticsearch.shield.authc.support.SecuredString;
import org.elasticsearch.shield.crypto.InternalCryptoService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.watcher.WatcherLifeCycleService;
import org.elasticsearch.watcher.WatcherPlugin;
@ -46,7 +42,6 @@ import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.execution.ExecutionService;
import org.elasticsearch.watcher.execution.ExecutionState;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.support.http.HttpClient;
@ -56,7 +51,6 @@ import org.elasticsearch.watcher.trigger.ScheduleTriggerEngineMock;
import org.elasticsearch.watcher.trigger.TriggerService;
import org.elasticsearch.watcher.trigger.schedule.ScheduleModule;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchStore;
import org.hamcrest.Matcher;
import org.jboss.netty.util.internal.SystemPropertyUtil;
import org.junit.After;
@ -66,11 +60,13 @@ import org.junit.Before;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
@ -92,6 +88,15 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
private static ScheduleModule.Engine scheduleEngine;
@Override
protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
if (shieldEnabled == null) {
shieldEnabled = enableShield();
scheduleEngine = randomFrom(ScheduleModule.Engine.values());
}
return super.buildTestCluster(scope, seed);
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
String scheduleImplName = scheduleEngine().name().toLowerCase(Locale.ROOT);
@ -208,7 +213,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
}
private void startWatcherIfNodesExist() throws Exception {
if (internalTestCluster().size() > 0) {
if (internalCluster().size() > 0) {
ensureLicenseEnabled();
WatcherState state = getInstanceFromMaster(WatcherService.class).state();
if (state == WatcherState.STOPPED) {
@ -234,14 +239,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return false;
}
@Override
protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
// This overwrites the wipe logic of the test cluster to not remove the watches and watch_history templates. By default all templates are removed
// TODO: We should have the notion of a hidden template (like hidden index / type) that only gets removed when specifically mentioned.
final TestCluster testCluster = super.buildTestCluster(scope, seed);
return new WatcherWrappingCluster(seed, testCluster);
}
protected long docCount(String index, String type, QueryBuilder query) {
refresh();
return docCount(index, type, SearchSourceBuilder.searchSource().query(query));
@ -266,7 +263,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
}
protected <T> T getInstanceFromMaster(Class<T> type) {
return internalTestCluster().getInstance(type, internalTestCluster().getMasterName());
return internalCluster().getInstance(type, internalCluster().getMasterName());
}
protected Watch.Parser watchParser() {
@ -291,16 +288,16 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
protected WatcherClient watcherClient() {
return shieldEnabled ?
new WatcherClient(internalTestCluster().transportClient()) :
new WatcherClient(internalCluster().transportClient()) :
new WatcherClient(client());
}
protected ScriptServiceProxy scriptService() {
return internalTestCluster().getInstance(ScriptServiceProxy.class);
return internalCluster().getInstance(ScriptServiceProxy.class);
}
protected HttpClient watcherHttpClient() {
return internalTestCluster().getInstance(HttpClient.class);
return internalCluster().getInstance(HttpClient.class);
}
protected EmailService noopEmailService() {
@ -442,7 +439,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
assertBusy(new Runnable() {
@Override
public void run() {
for (LicenseService service : internalTestCluster().getInstances(LicenseService.class)) {
for (LicenseService service : internalCluster().getInstances(LicenseService.class)) {
assertThat(service.enabled(), is(true));
}
}
@ -494,7 +491,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
protected void ensureWatcherOnlyRunningOnce() {
int running = 0;
for (WatcherService watcherService : internalTestCluster().getInstances(WatcherService.class)) {
for (WatcherService watcherService : internalCluster().getInstances(WatcherService.class)) {
if (watcherService.state() == WatcherState.STARTED) {
running++;
}
@ -502,103 +499,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
assertThat("watcher should only run on the elected master node, but it is running on [" + running + "] nodes", running, equalTo(1));
}
protected static InternalTestCluster internalTestCluster() {
return (InternalTestCluster) ((WatcherWrappingCluster) cluster()).testCluster;
}
// We need this custom impl, because we have custom wipe logic. We don't want the watcher index templates to get deleted between tests
private final class WatcherWrappingCluster extends TestCluster {
private final TestCluster testCluster;
private WatcherWrappingCluster(long seed, TestCluster testCluster) {
super(seed);
this.testCluster = testCluster;
}
@Override
public void beforeTest(Random random, double transportClientRatio) throws IOException {
if (scheduleEngine == null) {
scheduleEngine = randomFrom(ScheduleModule.Engine.values());
}
if (shieldEnabled == null) {
shieldEnabled = enableShield();
}
testCluster.beforeTest(random, transportClientRatio);
}
@Override
public void wipe() {
wipeIndices("_all");
wipeRepositories();
if (size() > 0) {
List<String> templatesToWipe = new ArrayList<>();
ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ObjectObjectCursor<String, IndexTemplateMetaData> cursor : state.getMetaData().templates()) {
if (cursor.key.equals(WatchStore.INDEX_TEMPLATE) || cursor.key.equals(HistoryStore.INDEX_TEMPLATE_NAME) || cursor.key.equals(TriggeredWatchStore.INDEX_TEMPLATE_NAME)) {
continue;
}
templatesToWipe.add(cursor.key);
}
if (!templatesToWipe.isEmpty()) {
wipeTemplates(templatesToWipe.toArray(new String[templatesToWipe.size()]));
}
}
}
@Override
public void afterTest() throws IOException {
testCluster.afterTest();
}
@Override
public Client client() {
return testCluster.client();
}
@Override
public int size() {
return testCluster.size();
}
@Override
public int numDataNodes() {
return testCluster.numDataNodes();
}
@Override
public int numDataAndMasterNodes() {
return testCluster.numDataAndMasterNodes();
}
@Override
public InetSocketAddress[] httpAddresses() {
return testCluster.httpAddresses();
}
@Override
public void close() throws IOException {
testCluster.close();
}
@Override
public void ensureEstimatedStats() {
testCluster.ensureEstimatedStats();
}
@Override
public String getClusterName() {
return testCluster.getClusterName();
}
@Override
public Iterator<Client> iterator() {
return testCluster.iterator();
}
}
private static class NoopEmailService implements EmailService {
@Override

View File

@ -84,7 +84,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
@Test
public void testSimpleFailure() throws Exception {
config = new ClusterDiscoveryConfiguration.UnicastZen(2);
internalTestCluster().startNodesAsync(2).get();
internalCluster().startNodesAsync(2).get();
createIndex("my-index");
ensureWatcherStarted(false);
@ -141,12 +141,12 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
.put("node.data", false)
.put("node.master", true)
.build();
internalTestCluster().startNodesAsync(3, settings).get();
internalCluster().startNodesAsync(3, settings).get();
settings = Settings.builder()
.put("node.data", true)
.put("node.master", false)
.build();
internalTestCluster().startNodesAsync(7, settings).get();
internalCluster().startNodesAsync(7, settings).get();
ensureWatcherStarted(false);
ensureLicenseEnabled();
@ -162,7 +162,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
assertWatchWithMinimumPerformedActionsCount("_watch_id", 1, false);
// We still have 2 master node, we should recover from this failure:
internalTestCluster().stopCurrentMasterNode();
internalCluster().stopCurrentMasterNode();
ensureWatcherStarted(false);
ensureWatcherOnlyRunningOnce();
assertWatchWithMinimumPerformedActionsCount("_watch_id", 2, false);
@ -192,7 +192,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
int numberOfWatches = scaledRandomIntBetween(numberOfFailures, 12);
logger.info("number of failures [{}], number of watches [{}]", numberOfFailures, numberOfWatches);
config = new ClusterDiscoveryConfiguration.UnicastZen(2 + numberOfFailures);
internalTestCluster().startNodesAsync(2).get();
internalCluster().startNodesAsync(2).get();
createIndex("my-index");
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
@ -233,14 +233,14 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
// will elect itself as master. This is bad and should be fixed in core. What I think that should happen is that
// if a node detects that is has lost a node, a node should clear its unicast temporal responses or at least
// remove the node that has been removed. This is a workaround:
for (ZenPingService pingService : internalTestCluster().getInstances(ZenPingService.class)) {
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
for (ZenPing zenPing : pingService.zenPings()) {
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}
}
}
internalTestCluster().stopCurrentMasterNode();
internalCluster().stopCurrentMasterNode();
// Can't use ensureWatcherStopped, b/c that relies on the watcher stats api which requires an elected master node
assertBusy(new Runnable() {
public void run () {
@ -251,16 +251,16 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
}
}, 30, TimeUnit.SECONDS);
// Ensure that the watch manager doesn't run elsewhere
for (WatcherService watcherService : internalTestCluster().getInstances(WatcherService.class)) {
for (WatcherService watcherService : internalCluster().getInstances(WatcherService.class)) {
assertThat(watcherService.state(), is(WatcherState.STOPPED));
}
for (ExecutionService executionService : internalTestCluster().getInstances(ExecutionService.class)) {
for (ExecutionService executionService : internalCluster().getInstances(ExecutionService.class)) {
assertThat(executionService.executionThreadPoolQueueSize(), equalTo(0l));
}
}
private void startElectedMasterNodeAndWait() throws Exception {
internalTestCluster().startNode();
internalCluster().startNode();
ensureWatcherStarted(false);
ensureWatcherOnlyRunningOnce();
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.junit.Before;
import org.junit.Test;
@ -46,15 +45,13 @@ public class WatchStoreTests extends ElasticsearchTestCase {
private WatchStore watchStore;
private ClientProxy clientProxy;
private TemplateUtils templateUtils;
private Watch.Parser parser;
@Before
public void init() {
clientProxy = mock(ClientProxy.class);
templateUtils = mock(TemplateUtils.class);
parser = mock(Watch.Parser.class);
watchStore = new WatchStore(Settings.EMPTY, clientProxy, templateUtils, parser);
watchStore = new WatchStore(Settings.EMPTY, clientProxy, parser);
}
@Test
@ -68,11 +65,9 @@ public class WatchStoreTests extends ElasticsearchTestCase {
watchStore.start(cs);
assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(0));
verify(templateUtils, times(1)).putTemplate("watches", null);
verifyZeroInteractions(clientProxy);
watchStore.start(cs);
verifyNoMoreInteractions(templateUtils);
verifyZeroInteractions(clientProxy);
}
@ -97,7 +92,6 @@ public class WatchStoreTests extends ElasticsearchTestCase {
ClusterState cs = csBuilder.build();
assertThat(watchStore.validate(cs), is(false));
verifyZeroInteractions(templateUtils);
verifyZeroInteractions(clientProxy);
}
@ -131,7 +125,6 @@ public class WatchStoreTests extends ElasticsearchTestCase {
} catch (WatcherException e) {
assertThat(e.getMessage(), equalTo("not all required shards have been refreshed"));
}
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, never()).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, never()).clearScroll(anyString());
@ -171,7 +164,6 @@ public class WatchStoreTests extends ElasticsearchTestCase {
} catch (ElasticsearchException e) {
assertThat(e.getMessage(), equalTo("Partial response while loading watches"));
}
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
@ -209,7 +201,6 @@ public class WatchStoreTests extends ElasticsearchTestCase {
watchStore.start(cs);
assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(0));
verify(templateUtils, times(1)).putTemplate("watches", null);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
@ -264,7 +255,6 @@ public class WatchStoreTests extends ElasticsearchTestCase {
watchStore.start(cs);
assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(2));
verify(templateUtils, times(1)).putTemplate("watches", null);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class));