Alert History : Make alert history index time based.

This commit makes the alert history index a time based index.
The alert history now is a timebased index prefixed with .alert_history_
with the time fomat YYYY-MM-dd.

This commit makes the alert history index a time based index.

Original commit: elastic/x-pack-elasticsearch@df6d6dee29
This commit is contained in:
Brian Murphy 2014-11-26 13:32:47 +00:00
parent 8512dfcb36
commit f6027e9a6b
15 changed files with 123 additions and 80 deletions

View File

@ -24,7 +24,7 @@ public class Alert implements ToXContent {
private AlertTrigger trigger; private AlertTrigger trigger;
private List<AlertAction> actions; private List<AlertAction> actions;
private String schedule; private String schedule;
private DateTime lastActionFire; private DateTime lastExecuteTime;
private long version; private long version;
private TimeValue throttlePeriod = new TimeValue(0); private TimeValue throttlePeriod = new TimeValue(0);
private DateTime timeLastActionExecuted = null; private DateTime timeLastActionExecuted = null;
@ -34,13 +34,14 @@ public class Alert implements ToXContent {
actions = new ArrayList<>(); actions = new ArrayList<>();
} }
public Alert(String alertName, SearchRequest searchRequest, AlertTrigger trigger, List<AlertAction> actions, String schedule, DateTime lastActionFire, long version, TimeValue throttlePeriod, AlertAckState ackState) {
public Alert(String alertName, SearchRequest searchRequest, AlertTrigger trigger, List<AlertAction> actions, String schedule, DateTime lastExecuteTime, long version, TimeValue throttlePeriod, AlertAckState ackState) {
this.alertName = alertName; this.alertName = alertName;
this.searchRequest = searchRequest; this.searchRequest = searchRequest;
this.trigger = trigger; this.trigger = trigger;
this.actions = actions; this.actions = actions;
this.schedule = schedule; this.schedule = schedule;
this.lastActionFire = lastActionFire; this.lastExecuteTime = lastExecuteTime;
this.version = version; this.version = version;
this.throttlePeriod = throttlePeriod; this.throttlePeriod = throttlePeriod;
this.ackState = ackState; this.ackState = ackState;
@ -59,8 +60,8 @@ public class Alert implements ToXContent {
builder.field(AlertsStore.LAST_ACTION_EXECUTED_FIELD.getPreferredName(), timeLastActionExecuted); builder.field(AlertsStore.LAST_ACTION_EXECUTED_FIELD.getPreferredName(), timeLastActionExecuted);
} }
if (lastActionFire != null) { if (lastExecuteTime != null) {
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire); builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastExecuteTime);
} }
if (actions != null && !actions.isEmpty()) { if (actions != null && !actions.isEmpty()) {
@ -85,12 +86,12 @@ public class Alert implements ToXContent {
/** /**
* @return The last time this alert ran. * @return The last time this alert ran.
*/ */
public DateTime lastActionFire() { public DateTime lastExecuteTime() {
return lastActionFire; return lastExecuteTime;
} }
public void lastActionFire(DateTime lastActionFire) { public void lastExecuteTime(DateTime lastActionFire) {
this.lastActionFire = lastActionFire; this.lastExecuteTime = lastActionFire;
} }
/** /**

View File

@ -142,24 +142,23 @@ public class AlertManager extends AbstractComponent {
} }
TriggerResult triggerResult = triggerManager.isTriggered(alert, entry.getScheduledTime(), entry.getFireTime()); TriggerResult triggerResult = triggerManager.isTriggered(alert, entry.getScheduledTime(), entry.getFireTime());
entry.setSearchResponse(triggerResult.getResponse()); entry.setSearchResponse(triggerResult.getResponse());
if (triggerResult.isTriggered()) { if (triggerResult.isTriggered()) {
entry.setTriggered(true); entry.setTriggered(true);
if (!isActionThrottled(alert)) { if (!isActionThrottled(alert)) {
actionRegistry.doAction(alert, triggerResult); actionRegistry.doAction(alert, triggerResult);
alert.setTimeLastActionExecuted(entry.getFireTime()); alert.setTimeLastActionExecuted(entry.getScheduledTime());
if (alert.getAckState() == AlertAckState.NOT_TRIGGERED) { if (alert.getAckState() == AlertAckState.NOT_TRIGGERED) {
alert.setAckState(AlertAckState.NEEDS_ACK); alert.setAckState(AlertAckState.NEEDS_ACK);
} }
} else { } else {
entry.setState(AlertActionState.THROTTLED); entry.setState(AlertActionState.THROTTLED);
} }
alert.lastActionFire(entry.getFireTime());
alertsStore.updateAlert(alert);
} else if (alert.getAckState() == AlertAckState.ACKED) { } else if (alert.getAckState() == AlertAckState.ACKED) {
alert.setAckState(AlertAckState.NOT_TRIGGERED); alert.setAckState(AlertAckState.NOT_TRIGGERED);
alertsStore.updateAlert(alert);
} }
alert.lastExecuteTime(entry.getFireTime());
alertsStore.updateAlert(alert);
return triggerResult; return triggerResult;
} finally { } finally {
alertLock.release(entry.getAlertName()); alertLock.release(entry.getAlertName());

View File

@ -51,7 +51,7 @@ public class AlertsStore extends AbstractComponent {
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger"); public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField ACTION_FIELD = new ParseField("actions"); public static final ParseField ACTION_FIELD = new ParseField("actions");
public static final ParseField LAST_ACTION_FIRE = new ParseField("last_action_fire"); public static final ParseField LAST_ACTION_FIRE = new ParseField("last_alert_executed");
public static final ParseField REQUEST_FIELD = new ParseField("request"); public static final ParseField REQUEST_FIELD = new ParseField("request");
public static final ParseField THROTTLE_PERIOD_FIELD = new ParseField("throttle_period"); public static final ParseField THROTTLE_PERIOD_FIELD = new ParseField("throttle_period");
public static final ParseField LAST_ACTION_EXECUTED_FIELD = new ParseField("last_action_executed"); public static final ParseField LAST_ACTION_EXECUTED_FIELD = new ParseField("last_action_executed");
@ -250,7 +250,7 @@ public class AlertsStore extends AbstractComponent {
if (SCHEDULE_FIELD.match(currentFieldName)) { if (SCHEDULE_FIELD.match(currentFieldName)) {
alert.schedule(parser.textOrNull()); alert.schedule(parser.textOrNull());
} else if (LAST_ACTION_FIRE.match(currentFieldName)) { } else if (LAST_ACTION_FIRE.match(currentFieldName)) {
alert.lastActionFire(DateTime.parse(parser.textOrNull())); alert.lastExecuteTime(DateTime.parse(parser.textOrNull()));
} else if (LAST_ACTION_EXECUTED_FIELD.match(currentFieldName)) { } else if (LAST_ACTION_EXECUTED_FIELD.match(currentFieldName)) {
alert.setTimeLastActionExecuted(DateTime.parse(parser.textOrNull())); alert.setTimeLastActionExecuted(DateTime.parse(parser.textOrNull()));
} else if (THROTTLE_PERIOD_FIELD.match(currentFieldName)) { } else if (THROTTLE_PERIOD_FIELD.match(currentFieldName)) {
@ -268,8 +268,8 @@ public class AlertsStore extends AbstractComponent {
throw new ElasticsearchException("Error during parsing alert", e); throw new ElasticsearchException("Error during parsing alert", e);
} }
if (alert.lastActionFire() == null) { if (alert.lastExecuteTime() == null) {
alert.lastActionFire(new DateTime(0)); alert.lastExecuteTime(new DateTime(0));
} }
if (alert.schedule() == null) { if (alert.schedule() == null) {

View File

@ -13,6 +13,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.*; import org.elasticsearch.alerts.*;
import org.elasticsearch.alerts.plugin.AlertsPlugin; import org.elasticsearch.alerts.plugin.AlertsPlugin;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
@ -21,9 +22,13 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.hppc.cursors.ObjectCursor;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
@ -54,7 +59,8 @@ public class AlertActionManager extends AbstractComponent {
public static final String ACTIONS_FIELD = "actions"; public static final String ACTIONS_FIELD = "actions";
public static final String STATE = "state"; public static final String STATE = "state";
public static final String ALERT_HISTORY_INDEX = ".alert_history"; public static final String ALERT_HISTORY_INDEX_PREFIX = ".alert_history_";
public static final DateTimeFormatter alertHistoryIndexTimeFormat = DateTimeFormat.forPattern("YYYY-MM-dd");
public static final String ALERT_HISTORY_TYPE = "alerthistory"; public static final String ALERT_HISTORY_TYPE = "alerthistory";
private static AlertActionEntry END_ENTRY = new AlertActionEntry(); private static AlertActionEntry END_ENTRY = new AlertActionEntry();
@ -98,29 +104,34 @@ public class AlertActionManager extends AbstractComponent {
if (started.get()) { if (started.get()) {
return true; return true;
} }
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
IndexMetaData indexMetaData = state.getMetaData().index(ALERT_HISTORY_INDEX); if (indices.length == 0) {
if (indexMetaData != null) {
if (state.routingTable().index(ALERT_HISTORY_INDEX).allPrimaryShardsActive()) {
try {
loadQueue();
} catch (Exception e) {
logger.error("Unable to load unfinished jobs into the job queue", e);
actionsToBeProcessed.clear();
}
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
doStart();
return true;
} else {
logger.info("Not all primary shards of the .alertshistory index are started");
return false;
}
} else {
logger.info("No previous .alerthistory index, skip loading of alert actions"); logger.info("No previous .alerthistory index, skip loading of alert actions");
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory"); templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
doStart(); doStart();
return true; return true;
} }
for (String index : indices) {
IndexMetaData indexMetaData = state.getMetaData().index(index);
if (indexMetaData != null) {
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
logger.info("Not all primary shards of the [{}] index are started", index);
return false;
}
}
}
try {
loadQueue();
} catch (Exception e) {
logger.error("Unable to load unfinished jobs into the job queue", e);
actionsToBeProcessed.clear();
}
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
doStart();
return true;
} }
public void stop() { public void stop() {
@ -135,6 +146,20 @@ public class AlertActionManager extends AbstractComponent {
return started.get(); return started.get();
} }
/**
* Calculates the correct alert history index name for a given time using alertHistoryIndexTimeFormat
<<<<<<< HEAD
=======
* @param time
* @return
>>>>>>> 7462f14f6d6a8c1529fc4f4203184f30c83057d7
*/
public static String getAlertHistoryIndexNameForTime(DateTime time) {
StringBuffer sb = new StringBuffer(ALERT_HISTORY_INDEX_PREFIX);
alertHistoryIndexTimeFormat.printTo(sb, time);
return sb.toString();
}
private void doStart() { private void doStart() {
logger.info("Starting job queue"); logger.info("Starting job queue");
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
@ -143,9 +168,9 @@ public class AlertActionManager extends AbstractComponent {
} }
public void loadQueue() { public void loadQueue() {
client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX)).actionGet(); client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet();
SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX) SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.termQuery(STATE, AlertActionState.SEARCH_NEEDED.toString())) .setQuery(QueryBuilders.termQuery(STATE, AlertActionState.SEARCH_NEEDED.toString()))
.setSearchType(SearchType.SCAN) .setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout) .setScroll(scrollTimeout)
@ -241,7 +266,7 @@ public class AlertActionManager extends AbstractComponent {
public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException { public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
ensureStarted(); ensureStarted();
AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED); AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED);
IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId()) IndexResponse response = client.prepareIndex(getAlertHistoryIndexNameForTime(scheduledFireTime), ALERT_HISTORY_TYPE, entry.getId())
.setSource(XContentFactory.jsonBuilder().value(entry)) .setSource(XContentFactory.jsonBuilder().value(entry))
.setOpType(IndexRequest.OpType.CREATE) .setOpType(IndexRequest.OpType.CREATE)
.get(); .get();
@ -264,7 +289,7 @@ public class AlertActionManager extends AbstractComponent {
private void updateHistoryEntry(AlertActionEntry entry) throws IOException { private void updateHistoryEntry(AlertActionEntry entry) throws IOException {
ensureStarted(); ensureStarted();
IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId()) IndexResponse response = client.prepareIndex(getAlertHistoryIndexNameForTime(entry.getScheduledTime()), ALERT_HISTORY_TYPE, entry.getId())
.setSource(XContentFactory.jsonBuilder().value(entry)) .setSource(XContentFactory.jsonBuilder().value(entry))
.get(); .get();
entry.setVersion(response.getVersion()); entry.setVersion(response.getVersion());

View File

@ -71,7 +71,7 @@ public class IndexAlertActionFactory implements AlertActionFactory {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint(); XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject(); resultBuilder.startObject();
resultBuilder.field("response", result.getResponse()); resultBuilder.field("response", result.getResponse());
resultBuilder.field("timestamp", alert.lastActionFire()); ///@TODO FIXME the firetime should be in the result ? resultBuilder.field("timestamp", alert.lastExecuteTime()); ///@TODO FIXME the firetime should be in the result ?
resultBuilder.endObject(); resultBuilder.endObject();
indexRequest.source(resultBuilder); indexRequest.source(resultBuilder);
} catch (IOException ie) { } catch (IOException ie) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.alerts.transport.actions.ack;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.alerts.AlertManager; import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertsStore; import org.elasticsearch.alerts.AlertsStore;
@ -21,6 +22,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/** /**
* Performs the delete operation. * Performs the delete operation.
*/ */
@ -62,7 +67,10 @@ public class TransportAckAlertAction extends TransportMasterNodeOperationAction<
@Override @Override
protected ClusterBlockException checkBlock(AckAlertRequest request, ClusterState state) { protected ClusterBlockException checkBlock(AckAlertRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX}); String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*");
List<String> indicesToCheck = new ArrayList<>(Arrays.asList(indices));
indicesToCheck.add(AlertsStore.ALERT_INDEX);
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, indicesToCheck.toArray(new String[indicesToCheck.size()]));
} }

View File

@ -8,6 +8,7 @@ package org.elasticsearch.alerts.transport.actions.delete;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.alerts.AlertManager; import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertsStore; import org.elasticsearch.alerts.AlertsStore;
@ -21,6 +22,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/** /**
* Performs the delete operation. * Performs the delete operation.
*/ */
@ -62,7 +67,10 @@ public class TransportDeleteAlertAction extends TransportMasterNodeOperationActi
@Override @Override
protected ClusterBlockException checkBlock(DeleteAlertRequest request, ClusterState state) { protected ClusterBlockException checkBlock(DeleteAlertRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX}); String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*");
List<String> indicesToCheck = new ArrayList<>(Arrays.asList(indices));
indicesToCheck.add(AlertsStore.ALERT_INDEX);
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, indicesToCheck.toArray(new String[indicesToCheck.size()]));
} }

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.alerts.AlertManager; import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertsStore; import org.elasticsearch.alerts.AlertsStore;
@ -22,6 +23,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/** /**
*/ */
public class TransportPutAlertAction extends TransportMasterNodeOperationAction<PutAlertRequest, PutAlertResponse> { public class TransportPutAlertAction extends TransportMasterNodeOperationAction<PutAlertRequest, PutAlertResponse> {
@ -63,7 +68,10 @@ public class TransportPutAlertAction extends TransportMasterNodeOperationAction<
@Override @Override
protected ClusterBlockException checkBlock(PutAlertRequest request, ClusterState state) { protected ClusterBlockException checkBlock(PutAlertRequest request, ClusterState state) {
request.beforeLocalFork(); // This is the best place to make the alert source safe request.beforeLocalFork(); // This is the best place to make the alert source safe
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX}); String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*");
List<String> indicesToCheck = new ArrayList<>(Arrays.asList(indices));
indicesToCheck.add(AlertsStore.ALERT_INDEX);
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, indicesToCheck.toArray(new String[indicesToCheck.size()]));
} }

View File

@ -1,5 +1,5 @@
{ {
"template": ".alert_history", "template": ".alert_history*",
"order": 0, "order": 0,
"settings": { "settings": {
"number_of_shards": 1, "number_of_shards": 1,

View File

@ -16,7 +16,7 @@
"last_fire_time": { "last_fire_time": {
"type": "date" "type": "date"
}, },
"last_action_fire": { "last_alert_executed": {
"type": "date" "type": "date"
}, },
"last_action_executed": { "last_action_executed": {

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.alerts; package org.elasticsearch.alerts;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
@ -17,7 +16,6 @@ import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
@ -38,8 +36,10 @@ import java.util.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
/** /**
*/ */
@ -83,6 +83,10 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
// Clear all internal alerting state for the next test method: // Clear all internal alerting state for the next test method:
logger.info("[{}#{}]: clearing alerts", getTestClass().getSimpleName(), getTestName()); logger.info("[{}#{}]: clearing alerts", getTestClass().getSimpleName(), getTestName());
stopAlerting(); stopAlerting();
client().admin().indices().prepareDelete(AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.get();
startAlerting();
} }
protected BytesReference createAlertSource(String cron, SearchRequest request, String scriptTrigger) throws IOException { protected BytesReference createAlertSource(String cron, SearchRequest request, String scriptTrigger) throws IOException {
@ -118,6 +122,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
return internalTestCluster().getInstance(AlertsClient.class); return internalTestCluster().getInstance(AlertsClient.class);
} }
protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception { protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception {
assertAlertTriggered(alertName, minimumExpectedAlertActionsWithActionPerformed, true); assertAlertTriggered(alertName, minimumExpectedAlertActionsWithActionPerformed, true);
} }
@ -126,15 +131,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override
public void run() { public void run() {
// The alerthistory index gets created in the background when the first alert fires, so we to check first is this index is created and shards are started SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get();
assertThat(indicesExistsResponse.isExists(), is(true));
ClusterState state = client().admin().cluster().prepareState().get().getState();
IndexRoutingTable routingTable = state.getRoutingTable().index(AlertActionManager.ALERT_HISTORY_INDEX);
assertThat(routingTable, notNullValue());
assertThat(routingTable.allPrimaryShardsActive(), is(true));
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
.setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString()))) .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString())))
.get(); .get();
@ -147,7 +144,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
} }
protected long findNumberOfPerformedActions(String alertName) { protected long findNumberOfPerformedActions(String alertName) {
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX) SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString()))) .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString())))
.get(); .get();
@ -158,15 +155,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override
public void run() { public void run() {
// The alerthistory index gets created in the background when the first alert fires, so we to check first is this index is created and shards are started SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get();
assertThat(indicesExistsResponse.isExists(), is(true));
ClusterState state = client().admin().cluster().prepareState().get().getState();
IndexRoutingTable routingTable = state.getRoutingTable().index(AlertActionManager.ALERT_HISTORY_INDEX);
assertThat(routingTable, notNullValue());
assertThat(routingTable.allPrimaryShardsActive(), is(true));
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
.setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.NO_ACTION_NEEDED.toString()))) .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.NO_ACTION_NEEDED.toString())))
.get(); .get();

View File

@ -50,7 +50,7 @@ public class AlertSerializationTest extends ElasticsearchIntegrationTest {
Alert parsedAlert = alertsStore.parseAlert("test-serialization", jsonBuilder.bytes()); Alert parsedAlert = alertsStore.parseAlert("test-serialization", jsonBuilder.bytes());
assertEquals(parsedAlert.version(), alert.version()); assertEquals(parsedAlert.version(), alert.version());
assertEquals(parsedAlert.actions(), alert.actions()); assertEquals(parsedAlert.actions(), alert.actions());
assertEquals(parsedAlert.lastActionFire().getMillis(), alert.lastActionFire().getMillis()); assertEquals(parsedAlert.lastExecuteTime().getMillis(), alert.lastExecuteTime().getMillis());
assertEquals(parsedAlert.schedule(), alert.schedule()); assertEquals(parsedAlert.schedule(), alert.schedule());
assertEquals(parsedAlert.getSearchRequest().source(), alert.getSearchRequest().source()); assertEquals(parsedAlert.getSearchRequest().source(), alert.getSearchRequest().source());
assertEquals(parsedAlert.trigger(), alert.trigger()); assertEquals(parsedAlert.trigger(), alert.trigger());

View File

@ -56,8 +56,8 @@ public class AlertThrottleTests extends AbstractAlertingTests {
alert.trigger(new ScriptedTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy")); alert.trigger(new ScriptedTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy"));
alert.actions().add(new IndexAlertAction("action-index", "action-type")); alert.actions().add(new IndexAlertAction("action-index", "action-type"));
alert.schedule( "0/5 * * * * ? *"); alert.schedule( "0/5 * * * * ? *");
alert.lastActionFire(new DateTime());
alert.lastExecuteTime(new DateTime());
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
alert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); alert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
@ -105,7 +105,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
assertThat(parsedAlert.getAckState(), equalTo(AlertAckState.NOT_TRIGGERED)); assertThat(parsedAlert.getAckState(), equalTo(AlertAckState.NOT_TRIGGERED));
CountResponse countOfThrottledActions = client() CountResponse countOfThrottledActions = client()
.prepareCount(AlertActionManager.ALERT_HISTORY_INDEX) .prepareCount(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.matchQuery(AlertActionManager.STATE, AlertActionState.THROTTLED.toString())) .setQuery(QueryBuilders.matchQuery(AlertActionManager.STATE, AlertActionState.THROTTLED.toString()))
.get(); .get();
assertThat(countOfThrottledActions.getCount(), greaterThan(0L)); assertThat(countOfThrottledActions.getCount(), greaterThan(0L));
@ -128,7 +128,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
alert.trigger(new ScriptedTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy")); alert.trigger(new ScriptedTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy"));
alert.actions().add(new IndexAlertAction("action-index", "action-type")); alert.actions().add(new IndexAlertAction("action-index", "action-type"));
alert.schedule("0/5 * * * * ? *"); alert.schedule("0/5 * * * * ? *");
alert.lastActionFire(new DateTime()); alert.lastExecuteTime(new DateTime());
alert.setThrottlePeriod(new TimeValue(10, TimeUnit.SECONDS)); alert.setThrottlePeriod(new TimeValue(10, TimeUnit.SECONDS));
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
@ -149,7 +149,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
.get(); .get();
if (countResponse.getCount() != 1){ if (countResponse.getCount() != 1){
SearchResponse actionResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX) SearchResponse actionResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.get(); .get();
for (SearchHit hit : actionResponse.getHits()) { for (SearchHit hit : actionResponse.getHits()) {
@ -176,7 +176,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
}); });
CountResponse countOfThrottledActions = client() CountResponse countOfThrottledActions = client()
.prepareCount(AlertActionManager.ALERT_HISTORY_INDEX) .prepareCount(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.matchQuery(AlertActionManager.STATE, AlertActionState.THROTTLED.toString())) .setQuery(QueryBuilders.matchQuery(AlertActionManager.STATE, AlertActionState.THROTTLED.toString()))
.get(); .get();
assertThat(countOfThrottledActions.getCount(), greaterThan(0L)); assertThat(countOfThrottledActions.getCount(), greaterThan(0L));

View File

@ -73,12 +73,19 @@ public class BootStrapTest extends AbstractAlertingTests {
new TimeValue(0), new TimeValue(0),
AlertAckState.NOT_ACKABLE); AlertAckState.NOT_ACKABLE);
AlertActionEntry entry = new AlertActionEntry(alert, new DateTime(), new DateTime(), AlertActionState.SEARCH_NEEDED); DateTime scheduledFireTime = new DateTime();
IndexResponse indexResponse = client().prepareIndex(AlertActionManager.ALERT_HISTORY_INDEX, AlertActionManager.ALERT_HISTORY_TYPE, entry.getId()) AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, scheduledFireTime, AlertActionState.SEARCH_NEEDED);
String actionHistoryIndex = AlertActionManager.getAlertHistoryIndexNameForTime(scheduledFireTime);
createIndex(actionHistoryIndex);
ensureGreen(actionHistoryIndex);
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, AlertActionManager.ALERT_HISTORY_TYPE, entry.getId())
.setConsistencyLevel(WriteConsistencyLevel.ALL) .setConsistencyLevel(WriteConsistencyLevel.ALL)
.setSource(XContentFactory.jsonBuilder().value(entry)) .setSource(XContentFactory.jsonBuilder().value(entry))
.get(); .get();
assertTrue(indexResponse.isCreated()); assertTrue(indexResponse.isCreated());
client().admin().indices().prepareRefresh(actionHistoryIndex).get();
stopAlerting(); stopAlerting();
startAlerting(); startAlerting();

View File

@ -102,10 +102,8 @@ public class AlertActionsTest extends AbstractAlertingTests {
@Test @Test
public void testAlertActions() throws Exception { public void testAlertActions() throws Exception {
createIndex("my-index"); createIndex("my-index");
createIndex(AlertsStore.ALERT_INDEX);
createIndex(AlertActionManager.ALERT_HISTORY_INDEX);
ensureGreen("my-index", AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX); ensureGreen("my-index");
client().preparePutIndexedScript() client().preparePutIndexedScript()
.setScriptLang("mustache") .setScriptLang("mustache")