on going work for improving test coverage
Original commit: elastic/x-pack-elasticsearch@1ccb9c9d9b
This commit is contained in:
parent
89dd5e2599
commit
5be1c5964c
|
@ -7,7 +7,6 @@ package org.elasticsearch.alerts;
|
|||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.alerts.actions.AlertAction;
|
||||
import org.elasticsearch.alerts.actions.AlertActionFactory;
|
||||
import org.elasticsearch.alerts.actions.AlertActionRegistry;
|
||||
import org.elasticsearch.alerts.triggers.AlertTrigger;
|
||||
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
|
||||
|
@ -17,7 +16,6 @@ import org.elasticsearch.common.joda.time.DateTime;
|
|||
import org.elasticsearch.common.joda.time.DateTimeZone;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.omg.CORBA.portable.Streamable;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
|
@ -55,11 +53,9 @@ public class Alert implements ToXContent {
|
|||
builder.startObject();
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
searchRequest.writeTo(new DataOutputStreamOutput(new DataOutputStream(out)));
|
||||
builder.field(AlertsStore.SEARCH_REQUEST_FIELD.getPreferredName(), out.toByteArray());
|
||||
if (schedule != null) {
|
||||
builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule);
|
||||
}
|
||||
builder.field(AlertsStore.ENABLED.getPreferredName(), enabled);
|
||||
builder.field(AlertsStore.REQUEST_BINARY_FIELD.getPreferredName(), out.toByteArray());
|
||||
builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule);
|
||||
builder.field(AlertsStore.ENABLE.getPreferredName(), enabled);
|
||||
if (lastActionFire != null) {
|
||||
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
|
||||
}
|
||||
|
|
|
@ -72,18 +72,6 @@ public class AlertManager extends AbstractComponent {
|
|||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
public void clearAndReload() {
|
||||
ensureStarted();
|
||||
try {
|
||||
scheduler.clearAlerts();
|
||||
alertsStore.reload();
|
||||
sendAlertsToScheduler();
|
||||
} catch (Exception e){
|
||||
throw new ElasticsearchException("Failed to refresh alerts",e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public DeleteResponse deleteAlert(String name) throws InterruptedException, ExecutionException {
|
||||
ensureStarted();
|
||||
if (alertsStore.hasAlert(name)) {
|
||||
|
@ -129,34 +117,38 @@ public class AlertManager extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
public TriggerResult executeAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime) {
|
||||
public TriggerResult executeAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
|
||||
ensureStarted();
|
||||
Alert alert = alertsStore.getAlert(alertName);
|
||||
if (alert == null) {
|
||||
throw new ElasticsearchException("Alert is not available");
|
||||
}
|
||||
try {
|
||||
TriggerResult triggerResult = triggerManager.isTriggered(alert, scheduledFireTime, fireTime);
|
||||
if (triggerResult.isTriggered()) {
|
||||
actionRegistry.doAction(alert, triggerResult);
|
||||
alert.lastActionFire(fireTime);
|
||||
alertsStore.updateAlert(alert);
|
||||
}
|
||||
return triggerResult;
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("Failed to execute alert [" + alert + "]", e);
|
||||
TriggerResult triggerResult = triggerManager.isTriggered(alert, scheduledFireTime, fireTime);
|
||||
if (triggerResult.isTriggered()) {
|
||||
actionRegistry.doAction(alert, triggerResult);
|
||||
alert.lastActionFire(fireTime);
|
||||
alertsStore.updateAlert(alert);
|
||||
}
|
||||
return triggerResult;
|
||||
}
|
||||
|
||||
|
||||
public void stop() {
|
||||
if (started.compareAndSet(true, false)) {
|
||||
scheduler.stop();
|
||||
alertsStore.stop();
|
||||
actionManager.stop();
|
||||
alertsStore.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For testing only to clear the alerts between tests.
|
||||
*/
|
||||
public void clear() {
|
||||
scheduler.clearAlerts();
|
||||
alertsStore.clear();
|
||||
}
|
||||
|
||||
private void ensureStarted() {
|
||||
if (!started.get()) {
|
||||
throw new ElasticsearchIllegalStateException("not started");
|
||||
|
|
|
@ -13,8 +13,6 @@ import org.elasticsearch.action.index.IndexResponse;
|
|||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.alerts.actions.AlertAction;
|
||||
import org.elasticsearch.alerts.actions.AlertActionRegistry;
|
||||
import org.elasticsearch.alerts.triggers.TriggerManager;
|
||||
|
@ -31,11 +29,14 @@ import org.elasticsearch.common.joda.time.DateTime;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -49,10 +50,11 @@ public class AlertsStore extends AbstractComponent {
|
|||
|
||||
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
|
||||
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
|
||||
public static final ParseField ACTION_FIELD = new ParseField("action");
|
||||
public static final ParseField ACTION_FIELD = new ParseField("actions");
|
||||
public static final ParseField LAST_ACTION_FIRE = new ParseField("last_action_fire");
|
||||
public static final ParseField ENABLED = new ParseField("enabled");
|
||||
public static final ParseField SEARCH_REQUEST_FIELD = new ParseField("request");
|
||||
public static final ParseField ENABLE = new ParseField("enable");
|
||||
public static final ParseField REQUEST_BINARY_FIELD = new ParseField("request_binary");
|
||||
public static final ParseField REQUEST_FIELD = new ParseField("request");
|
||||
|
||||
private final Client client;
|
||||
private final ThreadPool threadPool;
|
||||
|
@ -104,8 +106,7 @@ public class AlertsStore extends AbstractComponent {
|
|||
/**
|
||||
* Updates the specified alert by making sure that the made changes are persisted.
|
||||
*/
|
||||
public IndexResponse updateAlert(Alert alert) throws IOException {
|
||||
|
||||
public IndexResponse updateAlert(Alert alert) {
|
||||
IndexResponse response = client.prepareIndex(ALERT_INDEX, ALERT_TYPE, alert.alertName())
|
||||
.setSource()
|
||||
.setVersion(alert.version())
|
||||
|
@ -251,17 +252,54 @@ public class AlertsStore extends AbstractComponent {
|
|||
} else if (ACTION_FIELD.match(currentFieldName)) {
|
||||
List<AlertAction> actions = alertActionRegistry.instantiateAlertActions(parser);
|
||||
alert.actions(actions);
|
||||
} else if (REQUEST_FIELD.match(currentFieldName)) {
|
||||
String searchRequestFieldName = null;
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
searchRequestFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.START_ARRAY) {
|
||||
switch (searchRequestFieldName) {
|
||||
case "indices":
|
||||
List<String> indices = new ArrayList<>();
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
indices.add(parser.textOrNull());
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
|
||||
}
|
||||
}
|
||||
searchRequest.indices(indices.toArray(new String[indices.size()]));
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
switch (searchRequestFieldName) {
|
||||
case "body":
|
||||
XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent());
|
||||
builder.copyCurrentStructure(parser);
|
||||
searchRequest.source(builder);
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + searchRequestFieldName + "]");
|
||||
}
|
||||
}
|
||||
alert.setSearchRequest(searchRequest);
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
|
||||
}
|
||||
} else if (token.isValue()) {
|
||||
if (SCHEDULE_FIELD.match(currentFieldName)) {
|
||||
alert.schedule(parser.textOrNull());
|
||||
} else if (ENABLED.match(currentFieldName)) {
|
||||
} else if (ENABLE.match(currentFieldName)) {
|
||||
alert.enabled(parser.booleanValue());
|
||||
} else if (LAST_ACTION_FIRE.match(currentFieldName)) {
|
||||
alert.lastActionFire(DateTime.parse(parser.textOrNull()));
|
||||
} else if (SEARCH_REQUEST_FIELD.match(currentFieldName)) {
|
||||
} else if (REQUEST_BINARY_FIELD.match(currentFieldName)) {
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.readFrom(new BytesStreamInput(parser.binaryValue(), false));
|
||||
alert.setSearchRequest(searchRequest);
|
||||
|
|
|
@ -12,7 +12,6 @@ import org.elasticsearch.action.index.IndexResponse;
|
|||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.alerts.Alert;
|
||||
import org.elasticsearch.alerts.AlertManager;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
|
@ -30,7 +29,9 @@ import org.elasticsearch.common.io.stream.BytesStreamInput;
|
|||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -48,6 +49,7 @@ public class AlertActionManager extends AbstractComponent {
|
|||
public static final String TRIGGERED_FIELD = "triggered";
|
||||
public static final String FIRE_TIME_FIELD = "fire_time";
|
||||
public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduled_fire_time";
|
||||
public static final String ERROR_MESSAGE = "errorMsg";
|
||||
public static final String TRIGGER_FIELD = "trigger";
|
||||
public static final String REQUEST = "request_binary";
|
||||
public static final String RESPONSE = "response_binary";
|
||||
|
@ -70,8 +72,6 @@ public class AlertActionManager extends AbstractComponent {
|
|||
|
||||
private static AlertActionEntry END_ENTRY = new AlertActionEntry();
|
||||
|
||||
|
||||
|
||||
@Inject
|
||||
public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry, ThreadPool threadPool, AlertsStore alertsStore) {
|
||||
super(settings);
|
||||
|
@ -128,7 +128,8 @@ public class AlertActionManager extends AbstractComponent {
|
|||
|
||||
public void stop() {
|
||||
if (state.compareAndSet(State.STARTED, State.STOPPED)) {
|
||||
logger.info("Stopping job queue");
|
||||
logger.info("Stopping job queue...");
|
||||
actionsToBeProcessed.clear();
|
||||
actionsToBeProcessed.add(END_ENTRY);
|
||||
}
|
||||
}
|
||||
|
@ -216,6 +217,9 @@ public class AlertActionManager extends AbstractComponent {
|
|||
response.readFrom(new BytesStreamInput(parser.binaryValue(), false));
|
||||
entry.setSearchResponse(response);
|
||||
break;
|
||||
case ERROR_MESSAGE:
|
||||
entry.setErrorMsg(parser.textOrNull());
|
||||
break;
|
||||
case AlertActionState.FIELD_NAME:
|
||||
entry.setEntryState(AlertActionState.fromString(parser.text()));
|
||||
break;
|
||||
|
@ -233,8 +237,6 @@ public class AlertActionManager extends AbstractComponent {
|
|||
}
|
||||
|
||||
public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
|
||||
|
||||
|
||||
AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED);
|
||||
IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId())
|
||||
.setSource(XContentFactory.jsonBuilder().value(entry))
|
||||
|
@ -276,12 +278,16 @@ public class AlertActionManager extends AbstractComponent {
|
|||
entry.setSearchResponse(trigger.getResponse());
|
||||
updateHistoryEntry(entry, trigger.isTriggered() ? AlertActionState.ACTION_PERFORMED : AlertActionState.NO_ACTION_NEEDED);
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to execute alert action", e);
|
||||
try {
|
||||
entry.setErrorMsg(e.getMessage());
|
||||
updateHistoryEntry(entry, AlertActionState.ERROR);
|
||||
} catch (IOException ioe) {
|
||||
logger.error("Failed to update action history entry", ioe);
|
||||
if (started()) {
|
||||
logger.error("Failed to execute alert action", e);
|
||||
try {
|
||||
entry.setErrorMsg(e.getMessage());
|
||||
updateHistoryEntry(entry, AlertActionState.ERROR);
|
||||
} catch (IOException ioe) {
|
||||
logger.error("Failed to update action history entry", ioe);
|
||||
}
|
||||
} else {
|
||||
logger.debug("Failed to execute alert action after shutdown", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,7 +63,6 @@ public class AlertRestHandler implements RestHandler {
|
|||
private boolean dispatchRequest(RestRequest request, RestChannel restChannel) throws IOException, InterruptedException, ExecutionException {
|
||||
//@TODO : change these direct calls to actions/request/response/listener once we create the java client API
|
||||
if (request.path().contains("/_refresh")) {
|
||||
alertManager.clearAndReload();
|
||||
XContentBuilder builder = getListOfAlerts();
|
||||
restChannel.sendResponse(new BytesRestResponse(OK,builder));
|
||||
return true;
|
||||
|
|
|
@ -18,26 +18,23 @@ import org.quartz.simpl.SimpleJobFactory;
|
|||
|
||||
public class AlertScheduler extends AbstractComponent {
|
||||
|
||||
private final Scheduler scheduler;
|
||||
private volatile Scheduler scheduler;
|
||||
private final AlertManager alertManager;
|
||||
|
||||
@Inject
|
||||
public AlertScheduler(Settings settings, AlertManager alertManager) {
|
||||
super(settings);
|
||||
this.alertManager = alertManager;
|
||||
try {
|
||||
SchedulerFactory schFactory = new StdSchedulerFactory();
|
||||
scheduler = schFactory.getScheduler();
|
||||
scheduler.setJobFactory(new SimpleJobFactory());
|
||||
} catch (SchedulerException e) {
|
||||
throw new ElasticsearchException("Failed to instantiate scheduler", e);
|
||||
}
|
||||
alertManager.setAlertScheduler(this);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
try {
|
||||
logger.info("Starting scheduler");
|
||||
// Can't start a scheduler that has been shutdown, so we need to re-create each time start() is invoked
|
||||
SchedulerFactory schFactory = new StdSchedulerFactory();
|
||||
scheduler = schFactory.getScheduler();
|
||||
scheduler.setJobFactory(new SimpleJobFactory());
|
||||
scheduler.start();
|
||||
} catch (SchedulerException se){
|
||||
logger.error("Failed to start quartz scheduler", se);
|
||||
|
|
|
@ -7,12 +7,7 @@ package org.elasticsearch.alerts.transport.actions.delete;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptAction;
|
||||
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.DelegatingActionListener;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||
import org.elasticsearch.alerts.AlertManager;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
|
@ -23,7 +18,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -68,9 +62,6 @@ public class TransportDeleteAlertAction extends TransportMasterNodeOperationActi
|
|||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(DeleteAlertRequest request, ClusterState state) {
|
||||
if (!alertManager.isStarted()) {
|
||||
return new ClusterBlockException(null);
|
||||
}
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX});
|
||||
}
|
||||
|
||||
|
|
|
@ -5,31 +5,18 @@
|
|||
*/
|
||||
package org.elasticsearch.alerts.transport.actions.get;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
|
||||
import org.elasticsearch.alerts.Alert;
|
||||
import org.elasticsearch.alerts.AlertManager;
|
||||
import org.elasticsearch.alerts.AlertsStore;
|
||||
import org.elasticsearch.alerts.actions.AlertActionManager;
|
||||
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
|
||||
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
/**
|
||||
* Performs the delete operation.
|
||||
* Performs the get operation.
|
||||
*/
|
||||
public class TransportGetAlertAction extends TransportAction<GetAlertRequest, GetAlertResponse> {
|
||||
|
||||
|
|
|
@ -63,9 +63,6 @@ public class TransportIndexAlertAction extends TransportMasterNodeOperationActio
|
|||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(IndexAlertRequest request, ClusterState state) {
|
||||
if (!alertManager.isStarted()) {
|
||||
return new ClusterBlockException(null);
|
||||
}
|
||||
return state.blocks().indicesBlockedException(ClusterBlockLevel.WRITE, new String[]{AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX});
|
||||
|
||||
}
|
||||
|
|
|
@ -49,13 +49,13 @@ public class TriggerManager extends AbstractComponent {
|
|||
this.scheduledFireTimePlaceHolder = settings.get("postfix", "<<<SCHEDULED_FIRE_TIME>>>");
|
||||
}
|
||||
|
||||
public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws Exception {
|
||||
public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
|
||||
SearchRequest request = prepareTriggerSearch(alert, scheduledFireTime, fireTime);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("For alert [{}] running query for [{}]", alert.alertName(), XContentHelper.convertToJson(request.source(), false, true));
|
||||
}
|
||||
|
||||
SearchResponse response = client.search(request).get();
|
||||
SearchResponse response = client.search(request).actionGet(); // actionGet deals properly with InterruptedException
|
||||
logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits());
|
||||
switch (alert.trigger().triggerType()) {
|
||||
case NUMBER_OF_EVENTS:
|
||||
|
|
|
@ -6,40 +6,39 @@
|
|||
package org.elasticsearch.alerts;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
||||
import org.elasticsearch.alerts.actions.*;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.alerts.actions.AlertActionManager;
|
||||
import org.elasticsearch.alerts.actions.AlertActionState;
|
||||
import org.elasticsearch.alerts.client.AlertsClient;
|
||||
import org.elasticsearch.alerts.client.AlertsClientInterface;
|
||||
import org.elasticsearch.alerts.plugin.AlertsPlugin;
|
||||
import org.elasticsearch.alerts.transport.actions.index.IndexAlertRequest;
|
||||
import org.elasticsearch.alerts.transport.actions.index.IndexAlertResponse;
|
||||
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
|
||||
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
|
||||
import org.elasticsearch.alerts.triggers.AlertTrigger;
|
||||
import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger;
|
||||
import org.elasticsearch.alerts.triggers.TriggerResult;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
|
||||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 1, minNumDataNodes = 1, numDataNodes = 1)
|
||||
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 1)
|
||||
public class BasicAlertingTest extends ElasticsearchIntegrationTest {
|
||||
|
||||
@Override
|
||||
|
@ -56,99 +55,94 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testAlerSchedulerStartsProperly() throws Exception {
|
||||
public void testIndexAlert() throws Exception {
|
||||
AlertsClientInterface alertsClient = alertClient();
|
||||
createIndex("my-index");
|
||||
createIndex(AlertsStore.ALERT_INDEX);
|
||||
createIndex(AlertActionManager.ALERT_HISTORY_INDEX);
|
||||
ensureGreen("my-index", AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX);
|
||||
|
||||
final AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName());
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(alertManager.isStarted(), is(true));
|
||||
}
|
||||
});
|
||||
final AtomicBoolean alertActionInvoked = new AtomicBoolean(false);
|
||||
final AlertAction alertAction = new AlertAction() {
|
||||
@Override
|
||||
public String getActionName() {
|
||||
return "test";
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean doAction(Alert alert, TriggerResult result) {
|
||||
logger.info("Alert {} invoked: {}", alert.alertName(), result);
|
||||
alertActionInvoked.set(true);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
|
||||
alertActionRegistry.registerAction("test", new AlertActionFactory() {
|
||||
@Override
|
||||
public AlertAction createAction(XContentParser parser) throws IOException {
|
||||
parser.nextToken();
|
||||
return alertAction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AlertAction readFrom(StreamInput in) throws IOException {
|
||||
return alertAction;
|
||||
}
|
||||
});
|
||||
AlertTrigger alertTrigger = new AlertTrigger(new ScriptedAlertTrigger("return true", ScriptService.ScriptType.INLINE, "groovy"));
|
||||
Alert alert = new Alert(
|
||||
"my-first-alert",
|
||||
client().prepareSearch("my-index").setQuery(QueryBuilders.matchAllQuery()).request(),
|
||||
alertTrigger,
|
||||
Arrays.asList(alertAction),
|
||||
"0/5 * * * * ? *",
|
||||
null,
|
||||
1,
|
||||
true
|
||||
);
|
||||
|
||||
AlertsClientInterface alertsClient = internalCluster().getInstance(AlertsClient.class, internalCluster().getMasterName());
|
||||
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
|
||||
alert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
|
||||
|
||||
IndexAlertRequest alertRequest = alertsClient.prepareCreateAlert().setAlertName("my-first-alert").setAlertSource(jsonBuilder.bytes()).request();
|
||||
IndexAlertResponse alertsResponse = alertsClient.createAlert(alertRequest).actionGet();
|
||||
assertNotNull(alertsResponse.indexResponse());
|
||||
assertTrue(alertsResponse.indexResponse().isCreated());
|
||||
|
||||
// Have a sample document in the index, the alert is going to evaluate
|
||||
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
|
||||
SearchRequest searchRequest = new SearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
|
||||
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1");
|
||||
alertsClient.prepareCreateAlert("my-first-alert")
|
||||
.setAlertSource(alertSource)
|
||||
.get();
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertThat(alertActionInvoked.get(), is(true));
|
||||
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get();
|
||||
assertThat(indicesExistsResponse.isExists(), is(true));
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
|
||||
.setQuery(termQuery("state", AlertActionState.ACTION_PERFORMED.toString()))
|
||||
.addField("response.hits.total")
|
||||
.setSize(1)
|
||||
.get();
|
||||
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
|
||||
assertThat((Integer) searchResponse.getHits().getAt(0).field("response.hits.total").getValue(), equalTo(1));
|
||||
}
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
DeleteAlertRequest deleteAlertRequest = new DeleteAlertRequest(alert.alertName());
|
||||
@Test
|
||||
public void testDeleteAlert() throws Exception {
|
||||
AlertsClientInterface alertsClient = alertClient();
|
||||
createIndex("my-index");
|
||||
// Have a sample document in the index, the alert is going to evaluate
|
||||
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
|
||||
SearchRequest searchRequest = new SearchRequest("my-index").source(searchSource().query(matchAllQuery()));
|
||||
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1");
|
||||
alertsClient.prepareCreateAlert("my-first-alert")
|
||||
.setAlertSource(alertSource)
|
||||
.get();
|
||||
|
||||
DeleteAlertRequest deleteAlertRequest = new DeleteAlertRequest("my-first-alert");
|
||||
DeleteAlertResponse deleteAlertResponse = alertsClient.deleteAlert(deleteAlertRequest).actionGet();
|
||||
assertNotNull(deleteAlertResponse.deleteResponse());
|
||||
assertTrue(deleteAlertResponse.deleteResponse().isFound());
|
||||
|
||||
assertHitCount(client().prepareCount(AlertsStore.ALERT_INDEX).get(), 0l);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void clearAlerts() {
|
||||
// Clear all in-memory alerts on all nodes, perhaps there isn't an elected master at this point
|
||||
for (AlertManager manager : internalCluster().getInstances(AlertManager.class)) {
|
||||
manager.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private BytesReference createAlertSource(String cron, SearchRequest request, String scriptTrigger) throws IOException {
|
||||
XContentBuilder builder = jsonBuilder().startObject();
|
||||
builder.field("schedule", cron);
|
||||
builder.field("enable", true);
|
||||
|
||||
builder.startObject("request");
|
||||
XContentHelper.writeRawField("body", request.source(), builder, ToXContent.EMPTY_PARAMS);
|
||||
builder.startArray("indices");
|
||||
for (String index : request.indices()) {
|
||||
builder.value(index);
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
|
||||
builder.startObject("trigger");
|
||||
builder.startObject("script");
|
||||
builder.field("script", scriptTrigger);
|
||||
builder.endObject();
|
||||
builder.endObject();
|
||||
|
||||
builder.startObject("actions");
|
||||
builder.startObject("index");
|
||||
builder.field("index", "my-index");
|
||||
builder.field("type", "trail");
|
||||
builder.endObject();
|
||||
builder.endObject();
|
||||
|
||||
return builder.endObject().bytes();
|
||||
}
|
||||
|
||||
private AlertsClient alertClient() {
|
||||
return internalCluster().getInstance(AlertsClient.class, internalCluster().getMasterName());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue