index template: Added support to add additional settings to the index templates.

Also removed the version checking from the index template support, so that if the settings are changed these settings always get stored in the index template. This means we always put two index templates once watcher started.

And also shutdown watcher after the cluster service has been stopped.

This was done caused deadlock in the integration test framework when we are stopping the test cluster.
If multiple nodes are stopped, a node may briefly try to or run Watcher during the test cluster shutdown.
As part if starting we always put an index template, which will block any other start or stop calls for the WatcherService.
A node was running a put index template call as part of the start procedure while the start framework would stop the node.
The stop call would wait because it doesn't own the lock yet.
The put index template call didn't return, which caused the stop call the keep waiting.

The reason why put index template was hanging is that it failed to return anything while it the node was stopped.
Likely the put index template request got queued up waiting for a change in the cluster state, the cluster service
got stopped and the request got lost, but the watcher start procedure was still waiting.

By stopping Watcher before we stop the cluster service we avoid the scenario described above.

Original commit: elastic/x-pack-elasticsearch@db94b2279e
This commit is contained in:
Martijn van Groningen 2015-05-04 16:31:25 +02:00
parent 1a9db6140a
commit 3f2070cc0a
12 changed files with 238 additions and 101 deletions

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
/** /**
@ -29,7 +28,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
private volatile boolean manuallyStopped; private volatile boolean manuallyStopped;
@Inject @Inject
public WatcherLifeCycleService(Settings settings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, WatcherService watcherService) { public WatcherLifeCycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool, WatcherService watcherService) {
super(settings); super(settings);
this.clusterService = clusterService; this.clusterService = clusterService;
this.threadPool = threadPool; this.threadPool = threadPool;
@ -37,7 +36,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
clusterService.add(this); clusterService.add(this);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will // Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an watch is scheduled. // happen because we're shutting down and an watch is scheduled.
indicesService.addLifecycleListener(new LifecycleListener() { clusterService.addLifecycleListener(new LifecycleListener() {
@Override @Override
public void beforeStop() { public void beforeStop() {
stop(false); stop(false);

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.watcher.history; package org.elasticsearch.watcher.history;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@ -19,26 +20,28 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.format.DateTimeFormat; import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter; import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
@ -46,34 +49,94 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
*/ */
public class HistoryStore extends AbstractComponent { public class HistoryStore extends AbstractComponent implements NodeSettingsService.Listener {
public static final String INDEX_PREFIX = ".watch_history-"; public static final String INDEX_PREFIX = ".watch_history-";
public static final String DOC_TYPE = "watch_record"; public static final String DOC_TYPE = "watch_record";
public static final String INDEX_TEMPLATE_NAME = "watch_history"; public static final String INDEX_TEMPLATE_NAME = "watch_history";
static final DateTimeFormatter indexTimeFormat = DateTimeFormat.forPattern("YYYY.MM.dd"); static final DateTimeFormatter indexTimeFormat = DateTimeFormat.forPattern("YYYY.MM.dd");
private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic");
private final ClientProxy client; private final ClientProxy client;
private final TemplateUtils templateUtils; private final TemplateUtils templateUtils;
private final int scrollSize; private final int scrollSize;
private final TimeValue scrollTimeout; private final TimeValue scrollTimeout;
private final WatchRecord.Parser recordParser; private final WatchRecord.Parser recordParser;
private final ThreadPool threadPool;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock putUpdateLock = readWriteLock.readLock(); private final Lock putUpdateLock = readWriteLock.readLock();
private final Lock stopLock = readWriteLock.writeLock(); private final Lock stopLock = readWriteLock.writeLock();
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private volatile Settings customIndexSettings = ImmutableSettings.EMPTY;
@Inject @Inject
public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, WatchRecord.Parser recordParser) { public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, WatchRecord.Parser recordParser,
NodeSettingsService nodeSettingsService, @ClusterDynamicSettings DynamicSettings dynamicSettings,
ThreadPool threadPool) {
super(settings); super(settings);
this.client = client; this.client = client;
this.templateUtils = templateUtils; this.templateUtils = templateUtils;
this.recordParser = recordParser; this.recordParser = recordParser;
this.threadPool = threadPool;
this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30));
this.scrollSize = componentSettings.getAsInt("scroll.size", 100); this.scrollSize = componentSettings.getAsInt("scroll.size", 100);
updateHistorySettings(settings, false);
nodeSettingsService.addListener(this);
dynamicSettings.addDynamicSetting("watcher.history.index.*");
} }
@Override
public void onRefreshSettings(Settings settings) {
updateHistorySettings(settings, true);
}
private void updateHistorySettings(Settings settings, boolean updateIndexTemplate) {
Settings newSettings = ImmutableSettings.builder()
.put(settings.getAsSettings("watcher.history.index"))
.build();
if (newSettings.names().isEmpty()) {
return;
}
boolean changed = false;
ImmutableSettings.Builder builder = ImmutableSettings.builder().put(customIndexSettings);
for (Map.Entry<String, String> entry : newSettings.getAsMap().entrySet()) {
String name = "index." + entry.getKey();
if (forbiddenIndexSettings.contains(name)) {
logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name);
continue;
}
String newValue = entry.getValue();
String currentValue = customIndexSettings.get(name);
if (!newValue.equals(currentValue)) {
changed = true;
builder.put(name, newValue);
logger.info("changing setting [{}] from [{}] to [{}]", name, currentValue, newValue);
}
}
if (changed) {
customIndexSettings = builder.build();
if (updateIndexTemplate) {
// Need to fork to prevent dead lock. (We're on the cluster service update task, but the put index template
// needs to update the cluster state too, and because the update takes is a single threaded operation,
// we would then be stuck)
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings);
}
});
}
}
}
public void start() { public void start() {
started.set(true); started.set(true);
} }
@ -243,7 +306,7 @@ public class HistoryStore extends AbstractComponent {
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*"); String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*");
if (indices.length == 0) { if (indices.length == 0) {
logger.debug("no .watch_history indices found. skipping loading awaiting watch records"); logger.debug("no .watch_history indices found. skipping loading awaiting watch records");
templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE_NAME); templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings);
return Collections.emptySet(); return Collections.emptySet();
} }
int numPrimaryShards = 0; int numPrimaryShards = 0;
@ -292,7 +355,7 @@ public class HistoryStore extends AbstractComponent {
} finally { } finally {
client.clearScroll(response.getScrollId()); client.clearScroll(response.getScrollId());
} }
templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE_NAME); templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings);
return records; return records;
} }

View File

@ -5,104 +5,58 @@
*/ */
package org.elasticsearch.watcher.support; package org.elasticsearch.watcher.support;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.shield.ShieldIntegration; import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.watch.WatchStore; import org.elasticsearch.watcher.watch.WatchStore;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
*/ */
public class TemplateUtils extends AbstractComponent { public class TemplateUtils extends AbstractComponent {
private final static Pattern TEMPLATE_VERSION_PATTERN = Pattern.compile("watcher.template_version\"\\s*:\\s*\"?(\\d+)\"?"); private final ClientProxy client;
private final ShieldIntegration shieldIntegration;
private final TransportPutIndexTemplateAction action;
@Inject @Inject
public TemplateUtils(Settings settings, TransportPutIndexTemplateAction action, ShieldIntegration shieldIntegration) { public TemplateUtils(Settings settings, ClientProxy client) {
super(settings); super(settings);
this.action = action; this.client = client;
this.shieldIntegration = shieldIntegration;
} }
/** /**
* Checks if the template with the specified name exists and has the expected version. * Resolves the template with the specified templateName from the classpath, optionally adds extra settings and
* If that isn't the case then the template from the classpath will be uploaded to the cluster. * puts the index template into the cluster.
* *
* In the the template doesn't exists this method blocks until the template has been created. * This method blocks until the template has been created.
*/ */
public void ensureIndexTemplateIsLoaded(ClusterState state, final String templateName) { public void putTemplate(String templateName, Settings customSettings) {
final byte[] template; try (InputStream is = WatchStore.class.getResourceAsStream("/" + templateName + ".json")) {
try {
InputStream is = WatchStore.class.getResourceAsStream("/" + templateName + ".json");
if (is == null) { if (is == null) {
throw new FileNotFoundException("Resource [/" + templateName + ".json] not found in classpath"); throw new FileNotFoundException("Resource [/" + templateName + ".json] not found in classpath");
} }
template = Streams.copyToByteArray(is); final byte[] template = Streams.copyToByteArray(is);
is.close(); PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template);
} catch (IOException e) { if (customSettings != null) {
Settings updatedSettings = ImmutableSettings.builder()
.put(request.settings())
.put(customSettings)
.build();
request.settings(updatedSettings);
}
PutIndexTemplateResponse response = client.putTemplate(request);
} catch (Exception e) {
// throwing an exception to stop exporting process - we don't want to send data unless // throwing an exception to stop exporting process - we don't want to send data unless
// we put in the template for it. // we put in the template for it.
throw new RuntimeException("failed to load " + templateName + ".json", e); throw new WatcherException("failed to load [{}.json]", e, templateName);
} }
try {
int expectedVersion = parseIndexVersionFromTemplate(template);
if (expectedVersion < 0) {
throw new RuntimeException("failed to find an index version in pre-configured index template");
}
IndexTemplateMetaData templateMetaData = state.metaData().templates().get(templateName);
if (templateMetaData != null) {
int foundVersion = templateMetaData.getSettings().getAsInt("index.watcher.template_version", -1);
if (foundVersion < 0) {
logger.warn("found an existing index template [{}] but couldn't extract it's version. leaving it as is.", templateName);
return;
} else if (foundVersion >= expectedVersion) {
logger.debug("accepting existing index template [{}] (version [{}], needed [{}])", templateName, foundVersion, expectedVersion);
return;
} else {
logger.info("replacing existing index template [{}] (version [{}], needed [{}])", templateName, foundVersion, expectedVersion);
}
} else {
logger.info("Adding index template [{}], because none was found", templateName);
}
PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template);
shieldIntegration.bindWatcherUser(request);
// We're already running on the master and TransportPutIndexTemplateAction#executor() is SAME, so it is ok to wait:
ActionFuture<PutIndexTemplateResponse> future = action.execute(request);
PutIndexTemplateResponse response = future.actionGet();
} catch (IOException e) {
// if we're not sure of the template, we can't send data... re-raise exception.
throw new RuntimeException("failed to load/verify index template", e);
}
}
private static int parseIndexVersionFromTemplate(byte[] template) throws UnsupportedEncodingException {
Matcher matcher = TEMPLATE_VERSION_PATTERN.matcher(new String(template, Charsets.UTF_8));
if (!matcher.find()) {
return -1;
}
return Integer.parseInt(matcher.group(1));
} }
} }

View File

@ -9,6 +9,8 @@ import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
@ -99,6 +101,11 @@ public class ClientProxy implements InitializingService.Initializable {
return client.admin().indices().refresh(preProcess(request)).actionGet(); return client.admin().indices().refresh(preProcess(request)).actionGet();
} }
public PutIndexTemplateResponse putTemplate(PutIndexTemplateRequest request) {
preProcess(request);
return client.admin().indices().putTemplate(request).actionGet();
}
<M extends TransportMessage> M preProcess(M message) { <M extends TransportMessage> M preProcess(M message) {
if (shieldIntegration != null) { if (shieldIntegration != null) {
shieldIntegration.bindWatcherUser(message); shieldIntegration.bindWatcherUser(message);

View File

@ -76,7 +76,7 @@ public class WatchStore extends AbstractComponent {
try { try {
int count = loadWatches(watchesIndexMetaData.numberOfShards()); int count = loadWatches(watchesIndexMetaData.numberOfShards());
logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX); logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX);
templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE); templateUtils.putTemplate(INDEX_TEMPLATE, null);
started.set(true); started.set(true);
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to load watches for watch index [{}]", e, INDEX); logger.debug("failed to load watches for watch index [{}]", e, INDEX);
@ -84,7 +84,7 @@ public class WatchStore extends AbstractComponent {
throw e; throw e;
} }
} else { } else {
templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE); templateUtils.putTemplate(INDEX_TEMPLATE, null);
started.set(true); started.set(true);
} }
} }

View File

@ -3,8 +3,6 @@
"order": 2147483647, "order": 2147483647,
"settings": { "settings": {
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.number_of_replicas": 1,
"index.watcher.template_version": 1,
"index.mapper.dynamic" : false "index.mapper.dynamic" : false
}, },
"mappings": { "mappings": {

View File

@ -3,8 +3,6 @@
"order": 2147483647, "order": 2147483647,
"settings": { "settings": {
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.number_of_replicas": 1,
"index.watcher.template_version": 1,
"index.mapper.dynamic" : false "index.mapper.dynamic" : false
}, },
"mappings": { "mappings": {

View File

@ -14,7 +14,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.util.concurrent.MoreExecutors; import org.elasticsearch.common.util.concurrent.MoreExecutors;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before; import org.junit.Before;
@ -27,20 +26,16 @@ import static org.mockito.Mockito.*;
*/ */
public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase { public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase {
private ThreadPool threadPool;
private WatcherService watcherService; private WatcherService watcherService;
private ClusterService clusterService;
private IndicesService indicesService;
private WatcherLifeCycleService lifeCycleService; private WatcherLifeCycleService lifeCycleService;
@Before @Before
public void prepareServices() { public void prepareServices() {
threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService()); when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService());
watcherService = mock(WatcherService.class); watcherService = mock(WatcherService.class);
clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
indicesService = mock(IndicesService.class); lifeCycleService = new WatcherLifeCycleService(ImmutableSettings.EMPTY, clusterService, threadPool, watcherService);
lifeCycleService = new WatcherLifeCycleService(ImmutableSettings.EMPTY, clusterService, indicesService, threadPool, watcherService);
} }
@Test @Test

View File

@ -17,6 +17,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
@ -25,11 +26,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition; import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.watcher.execution.Wid; import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.input.none.ExecutableNoneInput; import org.elasticsearch.watcher.input.none.ExecutableNoneInput;
@ -48,6 +51,7 @@ import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
/** /**
@ -58,13 +62,19 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
private ClientProxy clientProxy; private ClientProxy clientProxy;
private TemplateUtils templateUtils; private TemplateUtils templateUtils;
private WatchRecord.Parser parser; private WatchRecord.Parser parser;
private NodeSettingsService nodeSettingsService;
private DynamicSettings dynamicSettings;
private ThreadPool threadPool;
@Before @Before
public void init() { public void init() {
clientProxy = mock(ClientProxy.class); clientProxy = mock(ClientProxy.class);
templateUtils = mock(TemplateUtils.class); templateUtils = mock(TemplateUtils.class);
parser = mock(WatchRecord.Parser.class); parser = mock(WatchRecord.Parser.class);
historyStore = new HistoryStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser); nodeSettingsService = mock(NodeSettingsService.class);
dynamicSettings = mock(DynamicSettings.class);
threadPool = mock(ThreadPool.class);
historyStore = new HistoryStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser, nodeSettingsService, dynamicSettings, threadPool);
historyStore.start(); historyStore.start();
} }
@ -160,7 +170,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
Collection<WatchRecord> records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); Collection<WatchRecord> records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
assertThat(records, notNullValue()); assertThat(records, notNullValue());
assertThat(records, hasSize(0)); assertThat(records, hasSize(0));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watch_history"); verify(templateUtils, times(1)).putTemplate(same("watch_history"), any(Settings.class));
verifyZeroInteractions(clientProxy); verifyZeroInteractions(clientProxy);
} }
@ -325,7 +335,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
assertThat(records, IsNull.notNullValue()); assertThat(records, IsNull.notNullValue());
assertThat(records, hasSize(0)); assertThat(records, hasSize(0));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watch_history"); verify(templateUtils, times(1)).putTemplate(same("watch_history"), any(Settings.class));
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
} }
@ -383,7 +393,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
assertThat(records, notNullValue()); assertThat(records, notNullValue());
assertThat(records, hasSize(0)); assertThat(records, hasSize(0));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watch_history"); verify(templateUtils, times(1)).putTemplate(same("watch_history"), any(Settings.class));
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
} }

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.junit.Test;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class TemplateUtilsTests extends ElasticsearchIntegrationTest {
@Test
public void testPutTemplate() throws Exception {
TemplateUtils templateUtils = new TemplateUtils(ImmutableSettings.EMPTY, ClientProxy.of(client()));
ImmutableSettings.Builder options = ImmutableSettings.builder();
options.put("key", "value");
templateUtils.putTemplate(HistoryStore.INDEX_TEMPLATE_NAME, options.build());
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
// setting in the file on the classpath:
assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
// additional setting:
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.key"), equalTo("value"));
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.junit.Test;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 1)
public class HistoryStoreSettingsTest extends AbstractWatcherIntegrationTests {
@Test
public void testChangeSettings() throws Exception {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1"));
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is zero
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), nullValue()); // this isn't defined in the template, so we rely on ES's default, which is 1s
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder()
.put("watcher.history.index.number_of_shards", "2")
.put("watcher.history.index.number_of_replicas", "2")
.put("watcher.history.index.refresh_interval", "5m"))
.get();
// use assertBusy(...) because we update the index template in an async manner
assertBusy(new Runnable() {
@Override
public void run() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2"));
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_replicas"), equalTo("2"));
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.refresh_interval"), equalTo("5m"));
}
});
}
@Test
public void testChangeSettings_ignoringForbiddenSetting() throws Exception {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("1"));
assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder()
.put("watcher.history.index.number_of_shards", "2")
.put("watcher.history.index.mapper.dynamic", true)) // forbidden setting, should not get updated
.get();
// use assertBusy(...) because we update the index template in an async manner
assertBusy(new Runnable() {
@Override
public void run() {
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(HistoryStore.INDEX_TEMPLATE_NAME).get();
assertThat(response.getIndexTemplates().get(0).getSettings().get("index.number_of_shards"), equalTo("2"));
assertThat(response.getIndexTemplates().get(0).getSettings().getAsBoolean("index.mapper.dynamic", null), is(false));
}
});
}
}

View File

@ -68,7 +68,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
watchStore.start(cs); watchStore.start(cs);
assertThat(watchStore.started(), is(true)); assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(0)); assertThat(watchStore.watches().size(), equalTo(0));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches"); verify(templateUtils, times(1)).putTemplate("watches", null);
verifyZeroInteractions(clientProxy); verifyZeroInteractions(clientProxy);
watchStore.start(cs); watchStore.start(cs);
@ -209,7 +209,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
watchStore.start(cs); watchStore.start(cs);
assertThat(watchStore.started(), is(true)); assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(0)); assertThat(watchStore.watches().size(), equalTo(0));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches"); verify(templateUtils, times(1)).putTemplate("watches", null);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(1)).clearScroll(anyString()); verify(clientProxy, times(1)).clearScroll(anyString());
@ -264,7 +264,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
watchStore.start(cs); watchStore.start(cs);
assertThat(watchStore.started(), is(true)); assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(2)); assertThat(watchStore.watches().size(), equalTo(2));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches"); verify(templateUtils, times(1)).putTemplate("watches", null);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class)); verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class));