mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-27 23:49:13 +00:00
Merge pull request elastic/elasticsearch#49 from elasticsearch/timebased_histroy
Timebased history index Original commit: elastic/x-pack-elasticsearch@0be1188599
This commit is contained in:
commit
c55ebc29e2
@ -24,7 +24,7 @@ public class Alert implements ToXContent {
|
||||
private AlertTrigger trigger;
|
||||
private List<AlertAction> actions;
|
||||
private String schedule;
|
||||
private DateTime lastActionFire;
|
||||
private DateTime lastExecuteTime;
|
||||
private long version;
|
||||
private TimeValue throttlePeriod = new TimeValue(0);
|
||||
private DateTime timeLastActionExecuted = null;
|
||||
@ -34,13 +34,14 @@ public class Alert implements ToXContent {
|
||||
actions = new ArrayList<>();
|
||||
}
|
||||
|
||||
public Alert(String alertName, SearchRequest searchRequest, AlertTrigger trigger, List<AlertAction> actions, String schedule, DateTime lastActionFire, long version, TimeValue throttlePeriod, AlertAckState ackState) {
|
||||
|
||||
public Alert(String alertName, SearchRequest searchRequest, AlertTrigger trigger, List<AlertAction> actions, String schedule, DateTime lastExecuteTime, long version, TimeValue throttlePeriod, AlertAckState ackState) {
|
||||
this.alertName = alertName;
|
||||
this.searchRequest = searchRequest;
|
||||
this.trigger = trigger;
|
||||
this.actions = actions;
|
||||
this.schedule = schedule;
|
||||
this.lastActionFire = lastActionFire;
|
||||
this.lastExecuteTime = lastExecuteTime;
|
||||
this.version = version;
|
||||
this.throttlePeriod = throttlePeriod;
|
||||
this.ackState = ackState;
|
||||
@ -59,8 +60,8 @@ public class Alert implements ToXContent {
|
||||
builder.field(AlertsStore.LAST_ACTION_EXECUTED_FIELD.getPreferredName(), timeLastActionExecuted);
|
||||
}
|
||||
|
||||
if (lastActionFire != null) {
|
||||
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
|
||||
if (lastExecuteTime != null) {
|
||||
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastExecuteTime);
|
||||
}
|
||||
|
||||
if (actions != null && !actions.isEmpty()) {
|
||||
@ -85,12 +86,12 @@ public class Alert implements ToXContent {
|
||||
/**
|
||||
* @return The last time this alert ran.
|
||||
*/
|
||||
public DateTime lastActionFire() {
|
||||
return lastActionFire;
|
||||
public DateTime lastExecuteTime() {
|
||||
return lastExecuteTime;
|
||||
}
|
||||
|
||||
public void lastActionFire(DateTime lastActionFire) {
|
||||
this.lastActionFire = lastActionFire;
|
||||
public void lastExecuteTime(DateTime lastActionFire) {
|
||||
this.lastExecuteTime = lastActionFire;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -142,24 +142,23 @@ public class AlertManager extends AbstractComponent {
|
||||
}
|
||||
TriggerResult triggerResult = triggerManager.isTriggered(alert, entry.getScheduledTime(), entry.getFireTime());
|
||||
entry.setSearchResponse(triggerResult.getResponse());
|
||||
|
||||
if (triggerResult.isTriggered()) {
|
||||
entry.setTriggered(true);
|
||||
if (!isActionThrottled(alert)) {
|
||||
actionRegistry.doAction(alert, triggerResult);
|
||||
alert.setTimeLastActionExecuted(entry.getFireTime());
|
||||
alert.setTimeLastActionExecuted(entry.getScheduledTime());
|
||||
if (alert.getAckState() == AlertAckState.NOT_TRIGGERED) {
|
||||
alert.setAckState(AlertAckState.NEEDS_ACK);
|
||||
}
|
||||
} else {
|
||||
entry.setState(AlertActionState.THROTTLED);
|
||||
}
|
||||
alert.lastActionFire(entry.getFireTime());
|
||||
alertsStore.updateAlert(alert);
|
||||
|
||||
} else if (alert.getAckState() == AlertAckState.ACKED) {
|
||||
alert.setAckState(AlertAckState.NOT_TRIGGERED);
|
||||
alertsStore.updateAlert(alert);
|
||||
}
|
||||
alert.lastExecuteTime(entry.getFireTime());
|
||||
alertsStore.updateAlert(alert);
|
||||
return triggerResult;
|
||||
} finally {
|
||||
alertLock.release(entry.getAlertName());
|
||||
|
@ -51,7 +51,7 @@ public class AlertsStore extends AbstractComponent {
|
||||
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
|
||||
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
|
||||
public static final ParseField ACTION_FIELD = new ParseField("actions");
|
||||
public static final ParseField LAST_ACTION_FIRE = new ParseField("last_action_fire");
|
||||
public static final ParseField LAST_ACTION_FIRE = new ParseField("last_alert_executed");
|
||||
public static final ParseField REQUEST_FIELD = new ParseField("request");
|
||||
public static final ParseField THROTTLE_PERIOD_FIELD = new ParseField("throttle_period");
|
||||
public static final ParseField LAST_ACTION_EXECUTED_FIELD = new ParseField("last_action_executed");
|
||||
@ -250,7 +250,7 @@ public class AlertsStore extends AbstractComponent {
|
||||
if (SCHEDULE_FIELD.match(currentFieldName)) {
|
||||
alert.schedule(parser.textOrNull());
|
||||
} else if (LAST_ACTION_FIRE.match(currentFieldName)) {
|
||||
alert.lastActionFire(DateTime.parse(parser.textOrNull()));
|
||||
alert.lastExecuteTime(DateTime.parse(parser.textOrNull()));
|
||||
} else if (LAST_ACTION_EXECUTED_FIELD.match(currentFieldName)) {
|
||||
alert.setTimeLastActionExecuted(DateTime.parse(parser.textOrNull()));
|
||||
} else if (THROTTLE_PERIOD_FIELD.match(currentFieldName)) {
|
||||
@ -268,8 +268,8 @@ public class AlertsStore extends AbstractComponent {
|
||||
throw new ElasticsearchException("Error during parsing alert", e);
|
||||
}
|
||||
|
||||
if (alert.lastActionFire() == null) {
|
||||
alert.lastActionFire(new DateTime(0));
|
||||
if (alert.lastExecuteTime() == null) {
|
||||
alert.lastExecuteTime(new DateTime(0));
|
||||
}
|
||||
|
||||
if (alert.schedule() == null) {
|
||||
|
@ -13,6 +13,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.alerts.*;
|
||||
import org.elasticsearch.alerts.plugin.AlertsPlugin;
|
||||
import org.elasticsearch.alerts.triggers.TriggerManager;
|
||||
@ -24,6 +25,8 @@ import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
|
||||
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
@ -54,7 +57,8 @@ public class AlertActionManager extends AbstractComponent {
|
||||
public static final String ACTIONS_FIELD = "actions";
|
||||
public static final String STATE = "state";
|
||||
|
||||
public static final String ALERT_HISTORY_INDEX = ".alert_history";
|
||||
public static final String ALERT_HISTORY_INDEX_PREFIX = ".alert_history_";
|
||||
public static final DateTimeFormatter alertHistoryIndexTimeFormat = DateTimeFormat.forPattern("YYYY-MM-dd");
|
||||
public static final String ALERT_HISTORY_TYPE = "alerthistory";
|
||||
|
||||
private static AlertActionEntry END_ENTRY = new AlertActionEntry();
|
||||
@ -98,29 +102,34 @@ public class AlertActionManager extends AbstractComponent {
|
||||
if (started.get()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
IndexMetaData indexMetaData = state.getMetaData().index(ALERT_HISTORY_INDEX);
|
||||
if (indexMetaData != null) {
|
||||
if (state.routingTable().index(ALERT_HISTORY_INDEX).allPrimaryShardsActive()) {
|
||||
try {
|
||||
loadQueue();
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to load unfinished jobs into the job queue", e);
|
||||
actionsToBeProcessed.clear();
|
||||
}
|
||||
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
|
||||
doStart();
|
||||
return true;
|
||||
} else {
|
||||
logger.info("Not all primary shards of the .alertshistory index are started");
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
|
||||
if (indices.length == 0) {
|
||||
logger.info("No previous .alerthistory index, skip loading of alert actions");
|
||||
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
|
||||
doStart();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = state.getMetaData().index(index);
|
||||
if (indexMetaData != null) {
|
||||
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
|
||||
logger.info("Not all primary shards of the [{}] index are started", index);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
loadQueue();
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to load unfinished jobs into the job queue", e);
|
||||
actionsToBeProcessed.clear();
|
||||
}
|
||||
templateHelper.checkAndUploadIndexTemplate(state, "alerthistory");
|
||||
doStart();
|
||||
return true;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
@ -135,6 +144,15 @@ public class AlertActionManager extends AbstractComponent {
|
||||
return started.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the correct alert history index name for a given time using alertHistoryIndexTimeFormat
|
||||
*/
|
||||
public static String getAlertHistoryIndexNameForTime(DateTime time) {
|
||||
StringBuffer sb = new StringBuffer(ALERT_HISTORY_INDEX_PREFIX);
|
||||
alertHistoryIndexTimeFormat.printTo(sb, time);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private void doStart() {
|
||||
logger.info("Starting job queue");
|
||||
if (started.compareAndSet(false, true)) {
|
||||
@ -143,9 +161,9 @@ public class AlertActionManager extends AbstractComponent {
|
||||
}
|
||||
|
||||
public void loadQueue() {
|
||||
client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX)).actionGet();
|
||||
client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet();
|
||||
|
||||
SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX)
|
||||
SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setQuery(QueryBuilders.termQuery(STATE, AlertActionState.SEARCH_NEEDED.toString()))
|
||||
.setSearchType(SearchType.SCAN)
|
||||
.setScroll(scrollTimeout)
|
||||
@ -241,7 +259,7 @@ public class AlertActionManager extends AbstractComponent {
|
||||
public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
|
||||
ensureStarted();
|
||||
AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED);
|
||||
IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId())
|
||||
IndexResponse response = client.prepareIndex(getAlertHistoryIndexNameForTime(scheduledFireTime), ALERT_HISTORY_TYPE, entry.getId())
|
||||
.setSource(XContentFactory.jsonBuilder().value(entry))
|
||||
.setOpType(IndexRequest.OpType.CREATE)
|
||||
.get();
|
||||
@ -264,7 +282,7 @@ public class AlertActionManager extends AbstractComponent {
|
||||
|
||||
private void updateHistoryEntry(AlertActionEntry entry) throws IOException {
|
||||
ensureStarted();
|
||||
IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId())
|
||||
IndexResponse response = client.prepareIndex(getAlertHistoryIndexNameForTime(entry.getScheduledTime()), ALERT_HISTORY_TYPE, entry.getId())
|
||||
.setSource(XContentFactory.jsonBuilder().value(entry))
|
||||
.get();
|
||||
entry.setVersion(response.getVersion());
|
||||
|
@ -71,7 +71,7 @@ public class IndexAlertActionFactory implements AlertActionFactory {
|
||||
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
|
||||
resultBuilder.startObject();
|
||||
resultBuilder.field("response", result.getResponse());
|
||||
resultBuilder.field("timestamp", alert.lastActionFire()); ///@TODO FIXME the firetime should be in the result ?
|
||||
resultBuilder.field("timestamp", alert.lastExecuteTime()); ///@TODO FIXME the firetime should be in the result ?
|
||||
resultBuilder.endObject();
|
||||
indexRequest.source(resultBuilder);
|
||||
} catch (IOException ie) {
|
||||
|
@ -11,7 +11,6 @@ import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||
import org.elasticsearch.alerts.AlertManager;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
import org.elasticsearch.alerts.actions.AlertActionManager;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
@ -62,7 +61,7 @@ public class TransportAckAlertAction extends TransportMasterNodeOperationAction<
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(AckAlertRequest request, ClusterState state) {
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX});
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX});
|
||||
}
|
||||
|
||||
|
||||
|
@ -11,7 +11,6 @@ import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||
import org.elasticsearch.alerts.AlertManager;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
import org.elasticsearch.alerts.actions.AlertActionManager;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
@ -62,7 +61,7 @@ public class TransportDeleteAlertAction extends TransportMasterNodeOperationActi
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(DeleteAlertRequest request, ClusterState state) {
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX});
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX});
|
||||
}
|
||||
|
||||
|
||||
|
@ -12,7 +12,6 @@ import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||
import org.elasticsearch.alerts.AlertManager;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
import org.elasticsearch.alerts.actions.AlertActionManager;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
@ -63,8 +62,7 @@ public class TransportPutAlertAction extends TransportMasterNodeOperationAction<
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(PutAlertRequest request, ClusterState state) {
|
||||
request.beforeLocalFork(); // This is the best place to make the alert source safe
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX});
|
||||
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX});
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
{
|
||||
"template": ".alert_history",
|
||||
"template": ".alert_history*",
|
||||
"order": 0,
|
||||
"settings": {
|
||||
"number_of_shards": 1,
|
||||
|
@ -16,7 +16,7 @@
|
||||
"last_fire_time": {
|
||||
"type": "date"
|
||||
},
|
||||
"last_action_fire": {
|
||||
"last_alert_executed": {
|
||||
"type": "date"
|
||||
},
|
||||
"last_action_executed": {
|
||||
|
@ -5,7 +5,6 @@
|
||||
*/
|
||||
package org.elasticsearch.alerts;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
@ -17,7 +16,6 @@ import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
@ -38,8 +36,10 @@ import java.util.*;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.IsNot.not;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ -83,6 +83,10 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
||||
// Clear all internal alerting state for the next test method:
|
||||
logger.info("[{}#{}]: clearing alerts", getTestClass().getSimpleName(), getTestName());
|
||||
stopAlerting();
|
||||
client().admin().indices().prepareDelete(AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||
.get();
|
||||
startAlerting();
|
||||
}
|
||||
|
||||
protected BytesReference createAlertSource(String cron, SearchRequest request, String scriptTrigger) throws IOException {
|
||||
@ -118,6 +122,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
||||
return internalTestCluster().getInstance(AlertsClient.class);
|
||||
}
|
||||
|
||||
|
||||
protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception {
|
||||
assertAlertTriggered(alertName, minimumExpectedAlertActionsWithActionPerformed, true);
|
||||
}
|
||||
@ -126,15 +131,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// The alerthistory index gets created in the background when the first alert fires, so we to check first is this index is created and shards are started
|
||||
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get();
|
||||
assertThat(indicesExistsResponse.isExists(), is(true));
|
||||
ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||
IndexRoutingTable routingTable = state.getRoutingTable().index(AlertActionManager.ALERT_HISTORY_INDEX);
|
||||
assertThat(routingTable, notNullValue());
|
||||
assertThat(routingTable.allPrimaryShardsActive(), is(true));
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
|
||||
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString())))
|
||||
.get();
|
||||
@ -147,7 +144,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
||||
}
|
||||
|
||||
protected long findNumberOfPerformedActions(String alertName) {
|
||||
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
|
||||
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString())))
|
||||
.get();
|
||||
@ -158,15 +155,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// The alerthistory index gets created in the background when the first alert fires, so we to check first is this index is created and shards are started
|
||||
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get();
|
||||
assertThat(indicesExistsResponse.isExists(), is(true));
|
||||
ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||
IndexRoutingTable routingTable = state.getRoutingTable().index(AlertActionManager.ALERT_HISTORY_INDEX);
|
||||
assertThat(routingTable, notNullValue());
|
||||
assertThat(routingTable.allPrimaryShardsActive(), is(true));
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
|
||||
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
|
||||
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.NO_ACTION_NEEDED.toString())))
|
||||
.get();
|
||||
|
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.alerts;
|
||||
|
||||
import org.elasticsearch.alerts.actions.AlertActionManager;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.joda.time.DateTimeZone;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.core.IsEqual.equalTo;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ActionHistoryIndexNameTest extends ElasticsearchTestCase {
|
||||
|
||||
@Test
|
||||
public void testActionHistoryNameTest() {
|
||||
assertThat(AlertActionManager.getAlertHistoryIndexNameForTime(new DateTime(0, DateTimeZone.UTC)), equalTo(".alert_history_1970-01-01"));
|
||||
assertThat(AlertActionManager.getAlertHistoryIndexNameForTime(new DateTime(100000000000L, DateTimeZone.UTC)), equalTo(".alert_history_1973-03-03"));
|
||||
assertThat(AlertActionManager.getAlertHistoryIndexNameForTime(new DateTime(1416582852000L, DateTimeZone.UTC)), equalTo(".alert_history_2014-11-21"));
|
||||
assertThat(AlertActionManager.getAlertHistoryIndexNameForTime(new DateTime(2833165811000L, DateTimeZone.UTC)), equalTo(".alert_history_2059-10-12"));
|
||||
}
|
||||
|
||||
}
|
@ -50,7 +50,7 @@ public class AlertSerializationTest extends ElasticsearchIntegrationTest {
|
||||
Alert parsedAlert = alertsStore.parseAlert("test-serialization", jsonBuilder.bytes());
|
||||
assertEquals(parsedAlert.version(), alert.version());
|
||||
assertEquals(parsedAlert.actions(), alert.actions());
|
||||
assertEquals(parsedAlert.lastActionFire().getMillis(), alert.lastActionFire().getMillis());
|
||||
assertEquals(parsedAlert.lastExecuteTime().getMillis(), alert.lastExecuteTime().getMillis());
|
||||
assertEquals(parsedAlert.schedule(), alert.schedule());
|
||||
assertEquals(parsedAlert.getSearchRequest().source(), alert.getSearchRequest().source());
|
||||
assertEquals(parsedAlert.trigger(), alert.trigger());
|
||||
|
@ -56,8 +56,8 @@ public class AlertThrottleTests extends AbstractAlertingTests {
|
||||
alert.trigger(new ScriptedTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy"));
|
||||
alert.actions().add(new IndexAlertAction("action-index", "action-type"));
|
||||
alert.schedule( "0/5 * * * * ? *");
|
||||
alert.lastActionFire(new DateTime());
|
||||
|
||||
alert.lastExecuteTime(new DateTime());
|
||||
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
|
||||
alert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
|
||||
|
||||
@ -105,7 +105,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
|
||||
assertThat(parsedAlert.getAckState(), equalTo(AlertAckState.NOT_TRIGGERED));
|
||||
|
||||
CountResponse countOfThrottledActions = client()
|
||||
.prepareCount(AlertActionManager.ALERT_HISTORY_INDEX)
|
||||
.prepareCount(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setQuery(QueryBuilders.matchQuery(AlertActionManager.STATE, AlertActionState.THROTTLED.toString()))
|
||||
.get();
|
||||
assertThat(countOfThrottledActions.getCount(), greaterThan(0L));
|
||||
@ -128,7 +128,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
|
||||
alert.trigger(new ScriptedTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy"));
|
||||
alert.actions().add(new IndexAlertAction("action-index", "action-type"));
|
||||
alert.schedule("0/5 * * * * ? *");
|
||||
alert.lastActionFire(new DateTime());
|
||||
alert.lastExecuteTime(new DateTime());
|
||||
alert.setThrottlePeriod(new TimeValue(10, TimeUnit.SECONDS));
|
||||
|
||||
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
|
||||
@ -149,7 +149,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
|
||||
.get();
|
||||
|
||||
if (countResponse.getCount() != 1){
|
||||
SearchResponse actionResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
|
||||
SearchResponse actionResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setQuery(matchAllQuery())
|
||||
.get();
|
||||
for (SearchHit hit : actionResponse.getHits()) {
|
||||
@ -176,7 +176,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
|
||||
});
|
||||
|
||||
CountResponse countOfThrottledActions = client()
|
||||
.prepareCount(AlertActionManager.ALERT_HISTORY_INDEX)
|
||||
.prepareCount(AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*")
|
||||
.setQuery(QueryBuilders.matchQuery(AlertActionManager.STATE, AlertActionState.THROTTLED.toString()))
|
||||
.get();
|
||||
assertThat(countOfThrottledActions.getCount(), greaterThan(0L));
|
||||
|
@ -22,6 +22,7 @@ import org.elasticsearch.script.ScriptService;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
|
||||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
|
||||
@ -73,12 +74,19 @@ public class BootStrapTest extends AbstractAlertingTests {
|
||||
new TimeValue(0),
|
||||
AlertAckState.NOT_ACKABLE);
|
||||
|
||||
AlertActionEntry entry = new AlertActionEntry(alert, new DateTime(), new DateTime(), AlertActionState.SEARCH_NEEDED);
|
||||
IndexResponse indexResponse = client().prepareIndex(AlertActionManager.ALERT_HISTORY_INDEX, AlertActionManager.ALERT_HISTORY_TYPE, entry.getId())
|
||||
DateTime scheduledFireTime = new DateTime();
|
||||
AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, scheduledFireTime, AlertActionState.SEARCH_NEEDED);
|
||||
String actionHistoryIndex = AlertActionManager.getAlertHistoryIndexNameForTime(scheduledFireTime);
|
||||
|
||||
createIndex(actionHistoryIndex);
|
||||
ensureGreen(actionHistoryIndex);
|
||||
|
||||
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, AlertActionManager.ALERT_HISTORY_TYPE, entry.getId())
|
||||
.setConsistencyLevel(WriteConsistencyLevel.ALL)
|
||||
.setSource(XContentFactory.jsonBuilder().value(entry))
|
||||
.get();
|
||||
assertTrue(indexResponse.isCreated());
|
||||
client().admin().indices().prepareRefresh(actionHistoryIndex).get();
|
||||
|
||||
stopAlerting();
|
||||
startAlerting();
|
||||
@ -90,4 +98,50 @@ public class BootStrapTest extends AbstractAlertingTests {
|
||||
assertThat(response.getAlertActionManagerLargestQueueSize(), equalTo(1L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBootStrapManyHistoryIndices() throws Exception {
|
||||
int numberOfAlertHistoryEntriesPerIndex = randomIntBetween(5,10);
|
||||
int numberOfAlertHistoryIndices = randomIntBetween(2,8);
|
||||
DateTime now = new DateTime();
|
||||
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
|
||||
|
||||
for (int i=0; i<numberOfAlertHistoryIndices; ++i) {
|
||||
DateTime historyIndexDate = now.minus((new TimeValue(i, TimeUnit.DAYS)).getMillis());
|
||||
String actionHistoryIndex = AlertActionManager.getAlertHistoryIndexNameForTime(historyIndexDate);
|
||||
createIndex(actionHistoryIndex);
|
||||
ensureGreen(actionHistoryIndex);
|
||||
for (int j=0; j<numberOfAlertHistoryEntriesPerIndex; ++j){
|
||||
Alert alert = new Alert("entryTestAlert" + i + "-" + j,
|
||||
searchRequest,
|
||||
new ScriptedTrigger("hits.total == 1", ScriptService.ScriptType.INLINE, "groovy"),
|
||||
new ArrayList< AlertAction>(),
|
||||
"0 0/5 * * * ? *",
|
||||
new DateTime(),
|
||||
0,
|
||||
true,
|
||||
new TimeValue(0),
|
||||
AlertAckState.NOT_ACKABLE);
|
||||
AlertActionEntry entry = new AlertActionEntry(alert, historyIndexDate, historyIndexDate, AlertActionState.SEARCH_NEEDED);
|
||||
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, AlertActionManager.ALERT_HISTORY_TYPE, entry.getId())
|
||||
.setConsistencyLevel(WriteConsistencyLevel.ALL)
|
||||
.setSource(XContentFactory.jsonBuilder().value(entry))
|
||||
.get();
|
||||
assertTrue(indexResponse.isCreated());
|
||||
}
|
||||
client().admin().indices().prepareRefresh(actionHistoryIndex).get();
|
||||
}
|
||||
|
||||
stopAlerting();
|
||||
startAlerting();
|
||||
AlertsStatsResponse response = alertClient().prepareAlertsStats().get();
|
||||
|
||||
assertTrue(response.isAlertActionManagerStarted());
|
||||
assertThat(response.getAlertManagerStarted(), equalTo(State.STARTED));
|
||||
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L));
|
||||
assertThat(response.getAlertActionManagerLargestQueueSize(),
|
||||
equalTo((long)(numberOfAlertHistoryEntriesPerIndex*numberOfAlertHistoryIndices)));
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -102,10 +102,8 @@ public class AlertActionsTest extends AbstractAlertingTests {
|
||||
@Test
|
||||
public void testAlertActions() throws Exception {
|
||||
createIndex("my-index");
|
||||
createIndex(AlertsStore.ALERT_INDEX);
|
||||
createIndex(AlertActionManager.ALERT_HISTORY_INDEX);
|
||||
|
||||
ensureGreen("my-index", AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX);
|
||||
ensureGreen("my-index");
|
||||
|
||||
client().preparePutIndexedScript()
|
||||
.setScriptLang("mustache")
|
||||
|
Loading…
x
Reference in New Issue
Block a user