Core: Changed the way index templates are installed

Before the index templates were installed via json files in the config directory, this commit installs templates from the classpath via the put index template api.
Also template versioning is enforced.

Original commit: elastic/x-pack-elasticsearch@22f6596015
This commit is contained in:
Martijn van Groningen 2014-11-09 20:50:03 +01:00
parent b62da0691b
commit 15c1b4c56d
11 changed files with 441 additions and 21 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import java.io.IOException; import java.io.IOException;
@ -169,6 +170,10 @@ public class AlertManager extends AbstractComponent {
// We're not the master // We're not the master
stop(); stop();
} else { } else {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
return; // wait until the gateway has recovered from disk
}
if (started.get()) { if (started.get()) {
return; // We're already started return; // We're already started
} }

View File

@ -20,6 +20,7 @@ public class AlertingModule extends AbstractModule {
@Override @Override
protected void configure() { protected void configure() {
bind(TemplateHelper.class).asEagerSingleton();
bind(AlertsStore.class).asEagerSingleton(); bind(AlertsStore.class).asEagerSingleton();
bind(AlertManager.class).asEagerSingleton(); bind(AlertManager.class).asEagerSingleton();
bind(AlertActionManager.class).asEagerSingleton(); bind(AlertActionManager.class).asEagerSingleton();

View File

@ -66,17 +66,19 @@ public class AlertsStore extends AbstractComponent {
private final AlertActionRegistry alertActionRegistry; private final AlertActionRegistry alertActionRegistry;
private final TriggerManager triggerManager; private final TriggerManager triggerManager;
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED); private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private final TemplateHelper templateHelper;
private final int scrollSize; private final int scrollSize;
private final TimeValue scrollTimeout; private final TimeValue scrollTimeout;
@Inject @Inject
public AlertsStore(Settings settings, Client client, ThreadPool threadPool, AlertActionRegistry alertActionRegistry, public AlertsStore(Settings settings, Client client, ThreadPool threadPool, AlertActionRegistry alertActionRegistry,
TriggerManager triggerManager) { TriggerManager triggerManager, TemplateHelper templateHelper) {
super(settings); super(settings);
this.client = client; this.client = client;
this.threadPool = threadPool; this.threadPool = threadPool;
this.alertActionRegistry = alertActionRegistry; this.alertActionRegistry = alertActionRegistry;
this.templateHelper = templateHelper;
this.alertMap = ConcurrentCollections.newConcurrentMap(); this.alertMap = ConcurrentCollections.newConcurrentMap();
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings // Not using component settings, to let AlertsStore and AlertActionManager share the same settings
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
@ -162,7 +164,7 @@ public class AlertsStore extends AbstractComponent {
return alertMap; return alertMap;
} }
public void start(ClusterState state, final LoadingListener listener) { public void start(final ClusterState state, final LoadingListener listener) {
IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData != null) { if (alertIndexMetaData != null) {
logger.info("Previous alerting index"); logger.info("Previous alerting index");
@ -175,6 +177,7 @@ public class AlertsStore extends AbstractComponent {
public void run() { public void run() {
boolean success = false; boolean success = false;
try { try {
templateHelper.checkAndUploadIndexTemplate(state, "alerts");
loadAlerts(); loadAlerts();
success = true; success = true;
} catch (Exception e) { } catch (Exception e) {
@ -195,8 +198,9 @@ public class AlertsStore extends AbstractComponent {
} }
} }
} else { } else {
logger.info("No previous .alert index");
if (AlertsStore.this.state.compareAndSet(State.STOPPED, State.STARTED)) { if (AlertsStore.this.state.compareAndSet(State.STOPPED, State.STARTED)) {
logger.info("No previous .alert index");
templateHelper.checkAndUploadIndexTemplate(state, "alerts");
listener.onSuccess(); listener.onSuccess();
} }
} }

View File

@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
*/
public class TemplateHelper extends AbstractComponent {
private final MetaDataIndexTemplateService indexTemplateService;
private final TransportPutIndexTemplateAction transportPutIndexTemplateAction;
@Inject
public TemplateHelper(Settings settings, MetaDataIndexTemplateService indexTemplateService, TransportPutIndexTemplateAction transportPutIndexTemplateAction) {
super(settings);
this.indexTemplateService = indexTemplateService;
this.transportPutIndexTemplateAction = transportPutIndexTemplateAction;
}
/**
* Checks if the template with the specified name exists and has the expected version.
* If that isn't the case then the template from the classpath will be uploaded to the cluster
*/
public void checkAndUploadIndexTemplate(ClusterState state, final String templateName) {
final byte[] template;
try {
InputStream is = AlertsStore.class.getResourceAsStream("/" + templateName + ".json");
if (is == null) {
throw new FileNotFoundException("Resource [/" + templateName + ".json] not found in classpath");
}
template = Streams.copyToByteArray(is);
is.close();
} catch (IOException e) {
// throwing an exception to stop exporting process - we don't want to send data unless
// we put in the template for it.
throw new RuntimeException("failed to load " + templateName + ".json", e);
}
try {
int expectedVersion = parseIndexVersionFromTemplate(template);
if (expectedVersion < 0) {
throw new RuntimeException("failed to find an index version in pre-configured index template");
}
IndexTemplateMetaData templateMetaData = state.metaData().templates().get(templateName);
if (templateMetaData != null) {
int foundVersion = templateMetaData.getSettings().getAsInt("alerts.template_version", -1);
if (foundVersion < 0) {
logger.warn("found an existing index template [{}] but couldn't extract it's version. leaving it as is.", templateName);
return;
} else if (foundVersion >= expectedVersion) {
logger.info("accepting existing index template [{}] (version [{}], needed [{}])", templateName, foundVersion, expectedVersion);
return;
} else {
logger.info("replacing existing index template [{}] (version [{}], needed [{}])", templateName, foundVersion, expectedVersion);
}
} else {
logger.info("Adding index template [{}], because none was found", templateName);
}
MetaDataIndexTemplateService.PutRequest putRequest = new MetaDataIndexTemplateService.PutRequest("alerts-template", templateName);
XContent xContent = XContentFactory.xContent(template, 0, template.length);
try (XContentParser parser = xContent.createParser(template, 0, template.length)) {
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case START_OBJECT:
switch (currentFieldName) {
case "settings":
XContentBuilder settingsBuilder = jsonBuilder();
settingsBuilder.copyCurrentStructure(parser);
String source = settingsBuilder.string();
putRequest.settings(ImmutableSettings.settingsBuilder().loadFromSource(source).build());
break;
case "mappings":
Map<String, String> mappingSource = new HashMap<>();
String currentMappingFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
switch (token) {
case FIELD_NAME:
currentMappingFieldName = parser.currentName();
break;
case START_OBJECT:
XContentBuilder mappingsBuilder = jsonBuilder();
mappingsBuilder.copyCurrentStructure(parser);
mappingSource.put(currentMappingFieldName, mappingsBuilder.string());
break;
}
}
putRequest.mappings(mappingSource);
break;
default:
throw new ElasticsearchIllegalArgumentException("Unsupported token [" + token + "]");
}
break;
case VALUE_STRING:
if ("template".equals(currentFieldName)) {
putRequest.template(parser.textOrNull());
} else {
throw new ElasticsearchIllegalArgumentException("Unsupported field [" + currentFieldName + "]");
}
break;
case VALUE_NUMBER:
if ("order".equals(currentFieldName)) {
putRequest.order(parser.intValue());
} else {
throw new ElasticsearchIllegalArgumentException("Unsupported field [" + currentFieldName + "]");
}
break;
default:
throw new ElasticsearchIllegalArgumentException("Unsupported token [" + token + "]");
}
}
}
indexTemplateService.putTemplate(putRequest, new MetaDataIndexTemplateService.PutListener() {
@Override
public void onResponse(MetaDataIndexTemplateService.PutResponse response) {
logger.info("Adding template [{}] was successful", templateName);
}
@Override
public void onFailure(Throwable t) {
logger.debug("failed to add template [{}]", t, templateName);
}
});
} catch (IOException e) {
// if we're not sure of the template, we can't send data... re-raise exception.
throw new RuntimeException("failed to load/verify index template", e);
}
}
private static int parseIndexVersionFromTemplate(byte[] template) throws UnsupportedEncodingException {
Pattern versionRegex = Pattern.compile("alerts.template_version\"\\s*:\\s*\"?(\\d+)\"?");
Matcher matcher = versionRegex.matcher(new String(template, "UTF-8"));
if (matcher.find()) {
return Integer.parseInt(matcher.group(1));
} else {
return -1;
}
}
}

View File

@ -12,10 +12,7 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.*;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.alerts.LoadingListener;
import org.elasticsearch.alerts.plugin.AlertsPlugin; import org.elasticsearch.alerts.plugin.AlertsPlugin;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.alerts.triggers.TriggerResult;
@ -63,6 +60,7 @@ public class AlertActionManager extends AbstractComponent {
private final AlertsStore alertsStore; private final AlertsStore alertsStore;
private final AlertActionRegistry actionRegistry; private final AlertActionRegistry actionRegistry;
private final TriggerManager triggerManager; private final TriggerManager triggerManager;
private final TemplateHelper templateHelper;
private AlertManager alertManager; private AlertManager alertManager;
private final BlockingQueue<AlertActionEntry> actionsToBeProcessed = new LinkedBlockingQueue<>(); private final BlockingQueue<AlertActionEntry> actionsToBeProcessed = new LinkedBlockingQueue<>();
@ -75,13 +73,15 @@ public class AlertActionManager extends AbstractComponent {
@Inject @Inject
public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry, public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry,
ThreadPool threadPool, AlertsStore alertsStore, TriggerManager triggerManager) { ThreadPool threadPool, AlertsStore alertsStore, TriggerManager triggerManager,
TemplateHelper templateHelper) {
super(settings); super(settings);
this.client = client; this.client = client;
this.actionRegistry = actionRegistry; this.actionRegistry = actionRegistry;
this.threadPool = threadPool; this.threadPool = threadPool;
this.alertsStore = alertsStore; this.alertsStore = alertsStore;
this.triggerManager = triggerManager; this.triggerManager = triggerManager;
this.templateHelper = templateHelper;
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings // Not using component settings, to let AlertsStore and AlertActionManager share the same settings
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
@ -91,7 +91,7 @@ public class AlertActionManager extends AbstractComponent {
this.alertManager = alertManager; this.alertManager = alertManager;
} }
public void start(ClusterState state, final LoadingListener listener) { public void start(final ClusterState state, final LoadingListener listener) {
IndexMetaData indexMetaData = state.getMetaData().index(ALERT_HISTORY_INDEX); IndexMetaData indexMetaData = state.getMetaData().index(ALERT_HISTORY_INDEX);
if (indexMetaData != null) { if (indexMetaData != null) {
if (state.routingTable().index(ALERT_HISTORY_INDEX).allPrimaryShardsActive()) { if (state.routingTable().index(ALERT_HISTORY_INDEX).allPrimaryShardsActive()) {
@ -107,6 +107,7 @@ public class AlertActionManager extends AbstractComponent {
logger.error("Unable to load unfinished jobs into the job queue", e); logger.error("Unable to load unfinished jobs into the job queue", e);
} finally { } finally {
if (success) { if (success) {
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
if (AlertActionManager.this.state.compareAndSet(State.LOADING, State.STARTED)) { if (AlertActionManager.this.state.compareAndSet(State.LOADING, State.STARTED)) {
doStart(); doStart();
listener.onSuccess(); listener.onSuccess();
@ -123,6 +124,7 @@ public class AlertActionManager extends AbstractComponent {
} }
} else { } else {
if (this.state.compareAndSet(State.STOPPED, State.STARTED)) { if (this.state.compareAndSet(State.STOPPED, State.STARTED)) {
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
doStart(); doStart();
listener.onSuccess(); listener.onSuccess();
} }
@ -313,7 +315,10 @@ public class AlertActionManager extends AbstractComponent {
threadPool.executor(AlertsPlugin.ALERT_THREAD_POOL_NAME).execute(new AlertHistoryRunnable(entry)); threadPool.executor(AlertsPlugin.ALERT_THREAD_POOL_NAME).execute(new AlertHistoryRunnable(entry));
} }
} catch (Exception e) { } catch (Exception e) {
if (started() && !(e instanceof InterruptedException)) { if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (started()) {
logger.error("Error during reader thread, restarting queue reader thread...", e); logger.error("Error during reader thread, restarting queue reader thread...", e);
threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread()); threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread());
} else { } else {

View File

@ -11,10 +11,6 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
@ -36,11 +32,9 @@ public class ScriptedTriggerFactory implements TriggerFactory {
String scriptLang = null; String scriptLang = null;
String script = null; String script = null;
ScriptService.ScriptType scriptType = null; ScriptService.ScriptType scriptType = null;
ESLogger logger = Loggers.getLogger(this.getClass());
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) { if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName(); currentFieldName = parser.currentName();
logger.error("FOOOBAR : [{}]", currentFieldName);
} else if (token.isValue()) { } else if (token.isValue()) {
switch (currentFieldName) { switch (currentFieldName) {
case "script_id" : case "script_id" :

View File

@ -0,0 +1,56 @@
{
"template": "alerthistory",
"order": 0,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"alerts.template_version": 1
},
"mappings": {
"alerthistory": {
"dynamic" : "strict",
"properties": {
"alert_name": {
"type": "string",
"index": "not_analyzed"
},
"triggered": {
"type": "boolean"
},
"fire_time": {
"type": "date"
},
"scheduled_fire_time": {
"type": "date"
},
"state": {
"type": "string",
"index": "not_analyzed"
},
"errorMsg": {
"type": "string"
},
"request_binary" : {
"type" : "binary"
},
"response_binary" : {
"type" : "binary"
},
"response" : {
"type" : "object",
"dynamic" : true
},
"trigger": {
"type": "object",
"enabled" : false,
"dynamic" : true
},
"actions": {
"type" : "object",
"enabled" : false,
"dynamic" : true
}
}
}
}
}

View File

@ -0,0 +1,46 @@
{
"template": ".alerts",
"order": 0,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"alerts.template_version": 1
},
"mappings": {
"alert": {
"dynamic" : "strict",
"properties": {
"enable": {
"type": "boolean"
},
"schedule": {
"type": "string"
},
"last_fire_time": {
"type": "date"
},
"last_action_fire": {
"type": "date"
},
"trigger": {
"type": "object",
"enabled" : false,
"dynamic" : true
},
"actions": {
"type" : "object",
"enabled" : false,
"dynamic" : true
},
"request_binary": {
"type" : "binary"
},
"request": {
"type" : "object",
"enabled" : false,
"dynamic" : true
}
}
}
}
}

View File

@ -13,18 +13,28 @@ import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionState; import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.plugin.AlertsPlugin; import org.elasticsearch.alerts.plugin.AlertsPlugin;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
import org.junit.After; import org.junit.After;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@ -47,10 +57,18 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
.build(); .build();
} }
@Override
protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
// This overwrites the wipe logic of the test cluster to not remove the alerts and alerthistory templates. By default all templates are removed
// TODO: We should have the notion of a hidden template (like hidden index / type) that only gets removed when specifically mentioned.
final TestCluster testCluster = super.buildTestCluster(scope, seed);
return new AlertingWrappingCluster(seed, testCluster);
}
@After @After
public void clearAlerts() { public void clearAlerts() {
// Clear all in-memory alerts on all nodes, perhaps there isn't an elected master at this point // Clear all in-memory alerts on all nodes, perhaps there isn't an elected master at this point
for (AlertManager manager : internalCluster().getInstances(AlertManager.class)) { for (AlertManager manager : internalTestCluster().getInstances(AlertManager.class)) {
manager.clear(); manager.clear();
} }
} }
@ -86,7 +104,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
} }
protected AlertsClient alertClient() { protected AlertsClient alertClient() {
return internalCluster().getInstance(AlertsClient.class); return internalTestCluster().getInstance(AlertsClient.class);
} }
protected void assertAlertTriggered(final String alertName) throws Exception { protected void assertAlertTriggered(final String alertName) throws Exception {
@ -135,4 +153,103 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
}); });
} }
protected static InternalTestCluster internalTestCluster() {
return (InternalTestCluster) ((AlertingWrappingCluster) cluster()).testCluster;
}
private static class AlertingWrappingCluster extends TestCluster {
private final TestCluster testCluster;
private AlertingWrappingCluster(long seed, TestCluster testCluster) {
super(seed);
this.testCluster = testCluster;
}
@Override
public void beforeTest(Random random, double transportClientRatio) throws IOException {
testCluster.beforeTest(random, transportClientRatio);
}
@Override
public void wipe() {
wipeIndices("_all");
wipeRepositories();
if (size() > 0) {
List<String> templatesToWipe = new ArrayList<>();
ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ObjectObjectCursor<String, IndexTemplateMetaData> cursor : state.getMetaData().templates()) {
if (cursor.key.equals("alerts") || cursor.key.equals("alerthistory")) {
continue;
}
templatesToWipe.add(cursor.key);
}
if (!templatesToWipe.isEmpty()) {
wipeTemplates(templatesToWipe.toArray(new String[templatesToWipe.size()]));
}
}
}
@Override
public void afterTest() throws IOException {
testCluster.afterTest();
}
@Override
public Client client() {
return testCluster.client();
}
@Override
public int size() {
return testCluster.size();
}
@Override
public int numDataNodes() {
return testCluster.numDataNodes();
}
@Override
public int numDataAndMasterNodes() {
return testCluster.numDataAndMasterNodes();
}
@Override
public int numBenchNodes() {
return testCluster.numBenchNodes();
}
@Override
public InetSocketAddress[] httpAddresses() {
return testCluster.httpAddresses();
}
@Override
public void close() throws IOException {
testCluster.close();
}
@Override
public void ensureEstimatedStats() {
testCluster.ensureEstimatedStats();
}
@Override
public boolean hasFilterCache() {
return testCluster.hasFilterCache();
}
@Override
public String getClusterName() {
return testCluster.getClusterName();
}
@Override
public Iterator<Client> iterator() {
return testCluster.iterator();
}
}
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.alerts; package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.client.AlertsClientInterface; import org.elasticsearch.alerts.client.AlertsClientInterface;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
@ -14,6 +15,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
@ -84,4 +86,20 @@ public class BasicAlertingTest extends AbstractAlertingTests {
assertFalse(deleteAlertResponse.deleteResponse().isFound()); assertFalse(deleteAlertResponse.deleteResponse().isFound());
} }
@Test(expected = ElasticsearchIllegalArgumentException.class)
public void testMalformedAlert() throws Exception {
AlertsClientInterface alertsClient = alertClient();
createIndex("my-index");
// Have a sample document in the index, the alert is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
BytesReference alertSource = jsonBuilder().startObject()
.field("schedule", "0/5 * * * * ? *")
.startObject("trigger").startObject("script").field("script", "return trie").endObject().endObject()
.field("enable", true)
.field("malformed_field", "x")
.endObject().bytes();
alertsClient.prepareIndexAlert("my-first-alert")
.setAlertSource(alertSource)
.get();
}
} }

View File

@ -43,7 +43,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
@Test @Test
public void testSimpleFailure() throws Exception { public void testSimpleFailure() throws Exception {
internalCluster().startNodesAsync(2).get(); internalTestCluster().startNodesAsync(2).get();
AlertsClientInterface alertsClient = alertClient(); AlertsClientInterface alertsClient = alertClient();
createIndex("my-index"); createIndex("my-index");
// Have a sample document in the index, the alert is going to evaluate // Have a sample document in the index, the alert is going to evaluate
@ -56,7 +56,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
assertAlertTriggered("my-first-alert"); assertAlertTriggered("my-first-alert");
// Stop the elected master, no new master will be elected b/c of m_m_n is set to 2 // Stop the elected master, no new master will be elected b/c of m_m_n is set to 2
internalCluster().stopCurrentMasterNode(); internalTestCluster().stopCurrentMasterNode();
assertThat(awaitBusy(new Predicate<Object>() { assertThat(awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) { public boolean apply(Object obj) {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
@ -75,7 +75,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
} }
// Bring back the 2nd node and wait for elected master node to come back and alerting to work as expected. // Bring back the 2nd node and wait for elected master node to come back and alerting to work as expected.
internalCluster().startNode(); internalTestCluster().startNode();
ensureGreen(); ensureGreen();
// Delete an existing alert // Delete an existing alert