Changed the alert manager starting logic to move the loading logic down to alert store and alert action manager

Moved logic around from alert scheduler to action manager and trigger manager.

Original commit: elastic/x-pack-elasticsearch@8cba72b005
This commit is contained in:
Martijn van Groningen 2014-10-31 10:28:42 +01:00
parent 3e45310877
commit 4373330a28
14 changed files with 477 additions and 507 deletions

View File

@ -23,10 +23,8 @@ public class Alert implements ToXContent {
private TimeValue timePeriod;
private List<AlertAction> actions;
private String schedule;
private DateTime lastRan;
private DateTime lastActionFire;
private long version;
private DateTime running;
private boolean enabled;
private boolean simpleQuery;
private String timestampString = "@timestamp";
@ -63,14 +61,6 @@ public class Alert implements ToXContent {
this.enabled = enabled;
}
public DateTime running() {
return running;
}
public void running(DateTime running) {
this.running = running;
}
public long version() {
return version;
}
@ -137,14 +127,6 @@ public class Alert implements ToXContent {
this.schedule = schedule;
}
public DateTime lastRan() {
return lastRan;
}
public void lastRan(DateTime lastRan) {
this.lastRan = lastRan;
}
public Alert() {
}
@ -156,11 +138,9 @@ public class Alert implements ToXContent {
this.trigger = trigger;
this.timePeriod = timePeriod;
this.actions = actions;
this.lastRan = lastRan;
this.schedule = schedule;
this.indices = indices;
this.version = version;
this.running = running;
this.enabled = enabled;
this.simpleQuery = simpleQuery;
}
@ -179,12 +159,6 @@ public class Alert implements ToXContent {
if (timePeriod != null) {
builder.field(AlertsStore.TIMEPERIOD_FIELD.getPreferredName(), timePeriod);
}
if (lastRan != null) {
builder.field(AlertsStore.LASTRAN_FIELD.getPreferredName(), lastRan);
}
if (running != null) {
builder.field(AlertsStore.CURRENTLY_RUNNING.getPreferredName(), running);
}
builder.field(AlertsStore.ENABLED.getPreferredName(), enabled);
builder.field(AlertsStore.SIMPLE_QUERY.getPreferredName(), simpleQuery);
if (lastActionFire != null) {

View File

@ -8,27 +8,21 @@ package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.cluster.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.indices.IndicesService;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -39,69 +33,36 @@ import java.util.concurrent.atomic.AtomicBoolean;
// The KeyedLock make sure that we only lock on the same alert, but not on different alerts.
public class AlertManager extends AbstractComponent {
public static final String ALERT_INDEX = ".alerts";
public static final String ALERT_TYPE = "alert";
private final ThreadPool threadPool;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean startActions = new AtomicBoolean(false);
private AlertScheduler scheduler;
private final AlertsStore alertsStore;
private final AlertActionRegistry actionRegistry;
private final TriggerManager triggerManager;
private final ClusterService clusterService;
private final AlertActionManager actionManager;
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public AlertManager(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
AlertActionRegistry actionRegistry, AlertsStore alertsStore) {
public AlertManager(Settings settings, ClusterService clusterService, AlertsStore alertsStore,
IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager) {
super(settings);
this.threadPool = threadPool;
this.actionRegistry = actionRegistry;
this.actionManager = new AlertActionManager(client, this, actionRegistry, threadPool);
this.alertsStore = alertsStore;
this.clusterService = clusterService;
this.triggerManager = triggerManager;
this.actionManager = actionManager;
clusterService.add(new AlertsClusterStateListener());
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an alert is scheduled.
indicesService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
stop();
}
});
}
public void setAlertScheduler(AlertScheduler scheduler){
this.scheduler = scheduler;
}
public void doAction(Alert alert, AlertActionEntry result, DateTime scheduledTime) {
ensureStarted();
logger.warn("We have triggered");
DateTime lastActionFire = timeActionLastTriggered(alert.alertName());
long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis();
logger.error("last action fire [{}]", lastActionFire);
logger.error("msSinceLastAction [{}]", msSinceLastAction);
if (alert.timePeriod().getMillis() > msSinceLastAction) {
logger.warn("Not firing action because it was fired in the timePeriod");
} else {
actionRegistry.doAction(alert, result);
logger.warn("Did action !");
alert.lastActionFire(scheduledTime);
alertsStore.updateAlert(alert);
}
}
public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) {
ensureStarted();
Alert alert;
try {
alert = alertsStore.getAlert(alertName);
if (!alert.enabled()) {
return false;
}
} catch (Throwable t) {
throw new ElasticsearchException("Unable to load alert from index",t);
}
alert.running(scheduleRunTime);
alertsStore.updateAlert(alert);
return true;
}
public void clearAndReload() {
ensureStarted();
try {
@ -113,18 +74,10 @@ public class AlertManager extends AbstractComponent {
}
}
public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime) throws Exception {
ensureStarted();
Alert alert = alertsStore.getAlert(alertName);
alert.lastRan(fireTime);
alertsStore.updateAlert(alert);
return true;
}
public boolean deleteAlert(String name) throws InterruptedException, ExecutionException {
ensureStarted();
if (alertsStore.hasAlert(name)) {
assert scheduler.deleteAlertFromSchedule(name);
assert scheduler.remove(name);
alertsStore.deleteAlert(name);
return true;
} else {
@ -135,15 +88,10 @@ public class AlertManager extends AbstractComponent {
public Alert addAlert(String alertName, BytesReference alertSource) {
ensureStarted();
Alert alert = alertsStore.createAlert(alertName, alertSource);
scheduler.addAlert(alertName, alert);
scheduler.add(alertName, alert);
return alert;
}
public Alert getAlertForName(String alertName) {
ensureStarted();
return alertsStore.getAlert(alertName);
}
public List<Alert> getAllAlerts() {
ensureStarted();
return ImmutableList.copyOf(alertsStore.getAlerts().values());
@ -153,30 +101,38 @@ public class AlertManager extends AbstractComponent {
return started.get();
}
public boolean addHistory(String alertName, boolean isTriggered, DateTime dateTime, DateTime scheduledTime,
SearchRequestBuilder srb, AlertTrigger trigger, long totalHits, List<AlertAction> actions,
List<String> indices) throws IOException{
return actionManager.addHistory(alertName, isTriggered, dateTime, scheduledTime, srb, trigger, totalHits, actions, indices);
public void executeAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){
ensureStarted();
Alert alert = alertsStore.getAlert(alertName);
if (!alert.enabled()) {
logger.debug("Alert [{}] is not enabled", alert.alertName());
return;
}
try {
TriggerResult result = triggerManager.isTriggered(alert, scheduledFireTime);
actionManager.addAlertAction(alert, result, fireTime, scheduledFireTime);
} catch (Exception e) {
logger.error("Failed execute alert [{}]", e, alertName);
}
}
public void stop() {
if (started.compareAndSet(false, true)) {
scheduler.stop();
alertsStore.stop();
actionManager.stop();
}
}
private void ensureStarted() {
if (!started.get() || !startActions.get()) {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
}
private void sendAlertsToScheduler() {
for (Map.Entry<String, Alert> entry : alertsStore.getAlerts().entrySet()) {
scheduler.addAlert(entry.getKey(), entry.getValue());
}
}
private DateTime timeActionLastTriggered(String alertName) {
Alert alert = alertsStore.getAlert(alertName);
if (alert != null) {
return alert.lastActionFire();
} else {
return null;
scheduler.add(entry.getKey(), entry.getValue());
}
}
@ -184,58 +140,63 @@ public class AlertManager extends AbstractComponent {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (!event.localNodeMaster()) { //We are not the master
if (started.compareAndSet(false, true)) {
scheduler.clearAlerts();
alertsStore.clear();
if (!event.localNodeMaster()) {
// We're not the master
stop();
} else {
if (started.get()) {
return; // We're already started
}
if (startActions.compareAndSet(false, true)) {
//If actionManager was running and we aren't the master stop
actionManager.doStop(); //Safe to call this multiple times, it's a noop if we are already stopped
}
return;
}
if (!started.get()) {
IndexMetaData alertIndexMetaData = event.state().getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData != null) {
if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
started.set(true);
// TODO: the starter flag should only be set to true once the alert loader has completed.
// Right now there is a window of time between when started=true and loading has completed where
// alerts can get lost
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AlertLoader());
}
}
}
if (!startActions.get()) {
IndexMetaData indexMetaData = event.state().getMetaData().index(AlertActionManager.ALERT_HISTORY_INDEX);
if (indexMetaData != null) {
if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
startActions.set(true);
actionManager.doStart();
}
}
}
}
}
private class AlertLoader implements Runnable {
alertsStore.start(event.state(), new LoadingListener() {
@Override
public void run() {
// TODO: have some kind of retry mechanism?
try {
alertsStore.reload();
sendAlertsToScheduler();
} catch (Exception e) {
logger.warn("Error during loading of alerts from an existing .alerts index... refresh the alerts manually");
public void onSuccess() {
startIfReady();
}
started.set(true);
@Override
public void onFailure() {
retry();
}
});
actionManager.start(event.state(), new LoadingListener() {
@Override
public void onSuccess() {
startIfReady();
}
@Override
public void onFailure() {
retry();
}
});
}
}
private void startIfReady() {
if (alertsStore.started() && actionManager.started()) {
if (started.compareAndSet(false, true)) {
scheduler.start();
sendAlertsToScheduler();
}
}
}
private void retry() {
clusterService.submitStateUpdateTask("alerts-retry", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// Force a new cluster state to trigger that alerts cluster state listener gets invoked again.
return ClusterState.builder(currentState).build();
}
@Override
public void onFailure(String source, @Nullable Throwable t) {
logger.error("Error during {} ", t, source);
}
});
}
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.rest.AlertRestHandler;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
@ -17,6 +18,7 @@ public class AlertingModule extends AbstractModule {
protected void configure() {
bind(AlertsStore.class).asEagerSingleton();
bind(AlertManager.class).asEagerSingleton();
bind(AlertActionManager.class).asEagerSingleton();
bind(TriggerManager.class).asEagerSingleton();
bind(AlertScheduler.class).asEagerSingleton();
bind(AlertActionRegistry.class).asEagerSingleton();

View File

@ -21,6 +21,8 @@ import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
@ -32,24 +34,27 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class AlertsStore extends AbstractComponent {
public static final String ALERT_INDEX = ".alerts";
public static final String ALERT_TYPE = "alert";
public static final ParseField QUERY_NAME_FIELD = new ParseField("query");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod");
public static final ParseField ACTION_FIELD = new ParseField("action");
public static final ParseField LASTRAN_FIELD = new ParseField("lastRan");
public static final ParseField INDICES = new ParseField("indices");
public static final ParseField CURRENTLY_RUNNING = new ParseField("running");
public static final ParseField ENABLED = new ParseField("enabled");
public static final ParseField SIMPLE_QUERY = new ParseField("simple");
public static final ParseField TIMESTAMP_FIELD = new ParseField("timefield");
@ -58,16 +63,19 @@ public class AlertsStore extends AbstractComponent {
private final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config
private final Client client;
private final AlertActionRegistry alertActionRegistry;
private final ThreadPool threadPool;
private final ConcurrentMap<String,Alert> alertMap;
private final AlertActionRegistry alertActionRegistry;
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private final int scrollSize;
private final TimeValue scrollTimeout;
@Inject
public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry) {
public AlertsStore(Settings settings, Client client, ThreadPool threadPool, AlertActionRegistry alertActionRegistry) {
super(settings);
this.client = client;
this.threadPool = threadPool;
this.alertActionRegistry = alertActionRegistry;
this.alertMap = ConcurrentCollections.newConcurrentMap();
this.scrollSize = componentSettings.getAsInt("scroll.size", 100);
@ -92,7 +100,7 @@ public class AlertsStore extends AbstractComponent {
* Creates an alert with the specified and fails if an alert with the name already exists.
*/
public Alert createAlert(String name, BytesReference alertSource) {
if (!client.admin().indices().prepareExists(AlertManager.ALERT_INDEX).execute().actionGet().isExists()) {
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex();
}
@ -110,8 +118,8 @@ public class AlertsStore extends AbstractComponent {
*/
public void updateAlert(Alert alert) {
IndexRequest updateRequest = new IndexRequest();
updateRequest.index(AlertManager.ALERT_INDEX);
updateRequest.type(AlertManager.ALERT_TYPE);
updateRequest.index(ALERT_INDEX);
updateRequest.type(ALERT_TYPE);
updateRequest.id(alert.alertName());
updateRequest.version(alert.version());
XContentBuilder alertBuilder;
@ -138,8 +146,8 @@ public class AlertsStore extends AbstractComponent {
if (alert != null) {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.id(name);
deleteRequest.index(AlertManager.ALERT_INDEX);
deleteRequest.type(AlertManager.ALERT_TYPE);
deleteRequest.index(ALERT_INDEX);
deleteRequest.type(ALERT_TYPE);
deleteRequest.version(alert.version());
DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet();
assert deleteResponse.isFound();
@ -165,8 +173,53 @@ public class AlertsStore extends AbstractComponent {
return alertMap;
}
public void start(ClusterState state, final LoadingListener listener) {
IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData != null) {
if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
if (this.state.compareAndSet(State.STOPPED, State.LOADING)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
boolean success = false;
try {
loadAlerts();
success = true;
} catch (Exception e) {
logger.warn("Failed to load alerts", e);
} finally {
if (success) {
if (AlertsStore.this.state.compareAndSet(State.LOADING, State.STARTED)) {
listener.onSuccess();
}
} else {
if (AlertsStore.this.state.compareAndSet(State.LOADING, State.STOPPED)) {
listener.onFailure();
}
}
}
}
});
}
}
} else {
if (AlertsStore.this.state.compareAndSet(State.STOPPED, State.STARTED)) {
listener.onSuccess();
}
}
}
public boolean started() {
return state.get() == State.STARTED;
}
public void stop() {
state.set(State.STOPPED);
clear();
}
private void persistAlert(String alertName, BytesReference alertSource, IndexRequest.OpType opType) {
IndexRequest indexRequest = new IndexRequest(AlertManager.ALERT_INDEX, AlertManager.ALERT_TYPE, alertName);
IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName);
indexRequest.listenerThreaded(false);
indexRequest.source(alertSource, false);
indexRequest.opType(opType);
@ -174,7 +227,7 @@ public class AlertsStore extends AbstractComponent {
}
private void loadAlerts() {
if (!client.admin().indices().prepareExists(AlertManager.ALERT_INDEX).execute().actionGet().isExists()) {
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex();
}
@ -182,8 +235,8 @@ public class AlertsStore extends AbstractComponent {
.setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout)
.setSize(scrollSize)
.setTypes(AlertManager.ALERT_TYPE)
.setIndices(AlertManager.ALERT_INDEX).get();
.setTypes(ALERT_TYPE)
.setIndices(ALERT_INDEX).get();
try {
while (response.getHits().hits().length != 0) {
for (SearchHit sh : response.getHits()) {
@ -240,10 +293,6 @@ public class AlertsStore extends AbstractComponent {
alert.schedule(parser.textOrNull());
} else if (TIMEPERIOD_FIELD.match(currentFieldName)) {
alert.timestampString(parser.textOrNull());
} else if (LASTRAN_FIELD.match(currentFieldName)) {
alert.lastRan(DateTime.parse(parser.textOrNull()));
} else if (CURRENTLY_RUNNING.match(currentFieldName)) {
alert.running(DateTime.parse(parser.textOrNull()));
} else if (ENABLED.match(currentFieldName)) {
alert.enabled(parser.booleanValue());
} else if (SIMPLE_QUERY.match(currentFieldName)) {
@ -275,10 +324,18 @@ public class AlertsStore extends AbstractComponent {
}
private ClusterHealthStatus createAlertsIndex() {
CreateIndexResponse cir = client.admin().indices().prepareCreate(AlertManager.ALERT_INDEX).addMapping(AlertManager.ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
ClusterHealthResponse actionGet = client.admin().cluster()
.health(Requests.clusterHealthRequest(AlertManager.ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
.health(Requests.clusterHealthRequest(ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
return actionGet.getStatus();
}
private enum State {
STOPPED,
LOADING,
STARTED
}
}

View File

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
/**
*/
public interface LoadingListener {
void onSuccess();
void onFailure();
}

View File

@ -5,10 +5,13 @@
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import java.io.IOException;
import java.util.List;
@ -22,7 +25,7 @@ public class AlertActionEntry implements ToXContent{
private boolean triggered;
private DateTime fireTime;
private AlertTrigger trigger;
private String triggeringQuery;
private String triggeringSearchRequest;
private long numberOfResults;
private List<AlertAction> actions;
private List<String> indices;
@ -79,12 +82,12 @@ public class AlertActionEntry implements ToXContent{
this.trigger = trigger;
}
public String getTriggeringQuery() {
return triggeringQuery;
public String getTriggeringSearchRequest() {
return triggeringSearchRequest;
}
public void setTriggeringQuery(String triggeringQuery) {
this.triggeringQuery = triggeringQuery;
public void setTriggeringSearchRequest(String triggeringSearchRequest) {
this.triggeringSearchRequest = triggeringSearchRequest;
}
public long getNumberOfResults() {
@ -130,20 +133,18 @@ public class AlertActionEntry implements ToXContent{
protected AlertActionEntry() {
}
public AlertActionEntry(String id, long version, String alertName, boolean triggered, DateTime fireTime, DateTime scheduledTime, AlertTrigger trigger,
String queryRan, long numberOfResults, List<AlertAction> actions,
List<String> indices, AlertActionState state) {
this.id = id;
this.version = version;
this.alertName = alertName;
this.triggered = triggered;
public AlertActionEntry(Alert alert, TriggerResult result, DateTime fireTime, DateTime scheduledTime, AlertActionState state) throws IOException {
this.id = alert.alertName() + "#" + scheduledTime.toDateTimeISO();
this.version = 1;
this.alertName = alert.alertName();
this.triggered = result.isTriggered();
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
this.trigger = trigger;
this.triggeringQuery = queryRan;
this.numberOfResults = numberOfResults;
this.actions = actions;
this.indices = indices;
this.trigger = alert.trigger();
this.triggeringSearchRequest = XContentHelper.convertToJson(result.getRequest().source(), false, true);
this.numberOfResults = result.getResponse().getHits().totalHits();
this.actions = alert.actions();
this.indices = alert.indices();
this.entryState = state;
}
@ -158,7 +159,7 @@ public class AlertActionEntry implements ToXContent{
historyEntry.field("trigger");
trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
historyEntry.field("queryRan", triggeringQuery);
historyEntry.field("queryRan", triggeringSearchRequest);
historyEntry.field("numberOfResults", numberOfResults);
@ -196,7 +197,7 @@ public class AlertActionEntry implements ToXContent{
", triggered=" + triggered +
", fireTime=" + fireTime +
", trigger=" + trigger +
", triggeringQuery='" + triggeringQuery + '\'' +
", triggeringSearchRequest='" + triggeringSearchRequest + '\'' +
", numberOfResults=" + numberOfResults +
", actions=" + actions +
", indices=" + indices +
@ -225,7 +226,7 @@ public class AlertActionEntry implements ToXContent{
if (scheduledTime != null ? !scheduledTime.equals(that.scheduledTime) : that.scheduledTime != null)
return false;
if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
if (triggeringQuery != null ? !triggeringQuery.equals(that.triggeringQuery) : that.triggeringQuery != null)
if (triggeringSearchRequest != null ? !triggeringSearchRequest.equals(that.triggeringSearchRequest) : that.triggeringSearchRequest != null)
return false;
return true;
@ -238,7 +239,7 @@ public class AlertActionEntry implements ToXContent{
result = 31 * result + (triggered ? 1 : 0);
result = 31 * result + (fireTime != null ? fireTime.hashCode() : 0);
result = 31 * result + (trigger != null ? trigger.hashCode() : 0);
result = 31 * result + (triggeringQuery != null ? triggeringQuery.hashCode() : 0);
result = 31 * result + (triggeringSearchRequest != null ? triggeringSearchRequest.hashCode() : 0);
result = 31 * result + (int) (numberOfResults ^ (numberOfResults >>> 32));
result = 31 * result + (actions != null ? actions.hashCode() : 0);
result = 31 * result + (indices != null ? indices.hashCode() : 0);

View File

@ -13,22 +13,24 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.alerts.LoadingListener;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
@ -36,13 +38,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class AlertActionManager {
public class AlertActionManager extends AbstractComponent {
public static final String ALERT_NAME_FIELD = "alertName";
public static final String TRIGGERED_FIELD = "triggered";
@ -57,23 +58,20 @@ public class AlertActionManager {
public static final String ALERT_HISTORY_TYPE = "alerthistory";
private final Client client;
private final AlertManager alertManager;
private final AlertActionRegistry actionRegistry;
private final ThreadPool threadPool;
private final AlertsStore alertsStore;
private final AlertActionRegistry actionRegistry;
private final ESLogger logger = Loggers.getLogger(AlertActionManager.class);
private BlockingQueue<AlertActionEntry> jobsToBeProcessed = new LinkedBlockingQueue<>();
public final AtomicBoolean running = new AtomicBoolean(false);
private Executor readerExecutor;
private final BlockingQueue<AlertActionEntry> jobsToBeProcessed = new LinkedBlockingQueue<>();
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private static AlertActionEntry END_ENTRY = new AlertActionEntry();
class AlertHistoryRunnable implements Runnable {
AlertActionEntry entry;
private class AlertHistoryRunnable implements Runnable {
AlertHistoryRunnable(AlertActionEntry entry) {
private final AlertActionEntry entry;
private AlertHistoryRunnable(AlertActionEntry entry) {
this.entry = entry;
}
@ -81,7 +79,22 @@ public class AlertActionManager {
public void run() {
try {
if (claimAlertHistoryEntry(entry)) {
alertManager.doAction(alertManager.getAlertForName(entry.getAlertName()), entry, entry.getScheduledTime());
Alert alert = alertsStore.getAlert(entry.getAlertName());
DateTime lastActionFire = alert.lastActionFire();
DateTime scheduledTime = entry.getScheduledTime();
long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis();
logger.trace("last action fire [{}]", lastActionFire);
logger.trace("msSinceLastAction [{}]", msSinceLastAction);
if (alert.timePeriod().getMillis() > msSinceLastAction) {
logger.debug("Not firing action because it was fired in the timePeriod");
} else {
actionRegistry.doAction(alert, entry);
logger.debug("Did action !");
alert.lastActionFire(scheduledTime);
alertsStore.updateAlert(alert);
}
updateHistoryEntry(entry, AlertActionState.ACTION_PERFORMED);
} else {
logger.warn("Unable to claim alert history entry" + entry);
@ -89,53 +102,32 @@ public class AlertActionManager {
} catch (Throwable t) {
logger.error("Failed to execute alert action", t);
}
}
}
class QueueLoaderThread implements Runnable {
@Override
public void run() {
boolean success = false;
do {
try {
success = loadQueue();
} catch (Exception e) {
logger.error("Unable to load the job queue", e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
private class QueueReaderThread implements Runnable {
}
}
} while (!success);
}
}
class QueueReaderThread implements Runnable {
@Override
public void run() {
try {
logger.debug("Starting thread to read from the job queue");
while (running.get()) {
while (started()) {
AlertActionEntry entry = null;
do {
try {
entry = jobsToBeProcessed.take();
} catch (InterruptedException ie) {
if (!running.get()) {
if (!started()) {
break;
}
}
} while (entry == null);
if (!running.get() || entry == END_ENTRY) {
if (!started() || entry == END_ENTRY) {
logger.debug("Stopping thread to read from the job queue");
return;
}
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(new AlertHistoryRunnable(entry));
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AlertHistoryRunnable(entry));
}
} catch (Throwable t) {
logger.error("Error during reader thread", t);
@ -143,26 +135,66 @@ public class AlertActionManager {
}
}
public AlertActionManager(Client client, AlertManager alertManager,
AlertActionRegistry actionRegistry,
ThreadPool threadPool) {
@Inject
public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry, ThreadPool threadPool, AlertsStore alertsStore) {
super(settings);
this.client = client;
this.alertManager = alertManager;
this.actionRegistry = actionRegistry;
this.threadPool = threadPool;
this.alertsStore = alertsStore;
}
public void doStart() {
if (running.compareAndSet(false, true)) {
public void start(ClusterState state, final LoadingListener listener) {
IndexMetaData indexMetaData = state.getMetaData().index(ALERT_HISTORY_INDEX);
if (indexMetaData != null) {
if (state.routingTable().index(ALERT_HISTORY_INDEX).allPrimaryShardsActive()) {
if (this.state.compareAndSet(State.STOPPED, State.LOADING)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
boolean success = false;
try {
success = loadQueue();
} catch (Exception e) {
logger.error("Unable to load unfinished jobs into the job queue", e);
} finally {
if (success) {
if (AlertActionManager.this.state.compareAndSet(State.LOADING, State.STARTED)) {
doStart();
listener.onSuccess();
}
} else {
if (AlertActionManager.this.state.compareAndSet(State.LOADING, State.STOPPED)) {
listener.onFailure();
}
}
}
}
});
}
}
} else {
if (this.state.compareAndSet(State.STOPPED, State.STARTED)) {
doStart();
listener.onSuccess();
}
}
}
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPED)) {
logger.info("Stopping job queue");
jobsToBeProcessed.add(END_ENTRY);
}
}
public boolean started() {
return state.get() == State.STARTED;
}
private void doStart() {
logger.info("Starting job queue");
readerExecutor = threadPool.executor(ThreadPool.Names.GENERIC);
readerExecutor.execute(new QueueReaderThread());
threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueLoaderThread());
}
}
public void doStop() {
stopIfRunning();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread());
}
public boolean loadQueue() {
@ -196,11 +228,11 @@ public class AlertActionManager {
}
protected AlertActionEntry parseHistory(String historyId, BytesReference source, long version) {
return parseHistory(historyId, source, version, actionRegistry, logger);
return parseHistory(historyId, source, version, actionRegistry);
}
protected static AlertActionEntry parseHistory(String historyId, BytesReference source, long version,
AlertActionRegistry actionRegistry, ESLogger logger) {
AlertActionRegistry actionRegistry) {
AlertActionEntry entry = new AlertActionEntry();
entry.setId(historyId);
entry.setVersion(version);
@ -249,7 +281,7 @@ public class AlertActionManager {
entry.setScheduledTime(DateTime.parse(parser.text()));
break;
case QUERY_RAN_FIELD:
entry.setTriggeringQuery(parser.text());
entry.setTriggeringSearchRequest(parser.text());
break;
case NUMBER_OF_RESULTS_FIELD:
entry.setNumberOfResults(parser.longValue());
@ -261,7 +293,7 @@ public class AlertActionManager {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
}
}
} catch (IOException e) {
@ -271,24 +303,17 @@ public class AlertActionManager {
}
public boolean addHistory(String alertName, boolean triggered,
DateTime fireTime, DateTime scheduledFireTime, SearchRequestBuilder triggeringQuery,
AlertTrigger trigger, long numberOfResults,
List<AlertAction> actions,
@Nullable List<String> indices) throws IOException {
if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).execute().actionGet().isExists()) {
ClusterHealthStatus chs = createAlertHistoryIndex();
public void addAlertAction(Alert alert, TriggerResult result, DateTime fireTime, DateTime scheduledFireTime) throws IOException {
if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).get().isExists()) {
createAlertHistoryIndex();
}
AlertActionState state = AlertActionState.NO_ACTION_NEEDED;
if (triggered && !actions.isEmpty()) {
if (result.isTriggered() && !alert.actions().isEmpty()) {
state = AlertActionState.ACTION_NEEDED;
}
AlertActionEntry entry = new AlertActionEntry(alertName + " " + scheduledFireTime.toDateTimeISO(), 1, alertName, triggered, fireTime, scheduledFireTime, trigger,
triggeringQuery.toString(), numberOfResults, actions, indices, state);
AlertActionEntry entry = new AlertActionEntry(alert, result, fireTime, scheduledFireTime, state);
XContentBuilder historyEntry = XContentFactory.jsonBuilder();
entry.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
@ -298,30 +323,13 @@ public class AlertActionManager {
indexRequest.id(entry.getId());
indexRequest.source(historyEntry);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.opType(IndexRequest.OpType.CREATE);
try {
if (client.index(indexRequest).actionGet().isCreated()) {
client.index(indexRequest).actionGet();
if (state != AlertActionState.NO_ACTION_NEEDED) {
jobsToBeProcessed.add(entry);
return true;
} else {
return false;
}
} catch (DocumentAlreadyExistsException daee){
logger.warn("Someone has already created a history entry for this alert run");
return false;
}
}
private void stopIfRunning() {
if (running.compareAndSet(true, false)) {
logger.info("Stopping job queue");
jobsToBeProcessed.add(END_ENTRY);
}
}
private ClusterHealthStatus createAlertHistoryIndex() {
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_HISTORY_INDEX).addMapping(ALERT_HISTORY_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
if (!cir.isAcknowledged()) {
@ -412,6 +420,12 @@ public class AlertActionManager {
return true;
}
private enum State {
STOPPED,
LOADING,
STARTED
}
}

View File

@ -90,7 +90,7 @@ public class EmailAlertAction implements AlertAction {
StringBuffer output = new StringBuffer();
output.append("The following query triggered because " + result.getTrigger().toString() + "\n");
output.append("The total number of hits returned : " + result.getNumberOfResults() + "\n");
output.append("For query : " + result.getTriggeringQuery());
output.append("For query : " + result.getTriggeringSearchRequest());
output.append("\n");
output.append("Indices : ");
for (String index : result.getIndices()) {

View File

@ -6,9 +6,7 @@
package org.elasticsearch.alerts.plugin;
import org.elasticsearch.alerts.AlertingModule;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.AbstractPlugin;
@ -29,13 +27,6 @@ public class AlertsPlugin extends AbstractPlugin {
return "Elasticsearch Alerts";
}
@Override
public Collection<java.lang.Class<? extends LifecycleComponent>> services() {
Collection<java.lang.Class<? extends LifecycleComponent>> services = Lists.newArrayList();
services.add(AlertScheduler.class);
return services;
}
@Override
public Collection<Class<? extends Module>> modules() {
Collection<Class<? extends Module>> modules = Lists.newArrayList();

View File

@ -6,58 +6,25 @@
package org.elasticsearch.alerts.scheduler;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.query.FilteredQueryBuilder;
import org.elasticsearch.index.query.RangeFilterBuilder;
import org.elasticsearch.index.query.TemplateQueryBuilder;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class AlertScheduler extends AbstractComponent {
public class AlertScheduler extends AbstractLifecycleComponent implements ClusterStateListener {
private final Client client;
private final Scheduler scheduler;
private final AlertManager alertManager;
private final TriggerManager triggerManager;
private final ScriptService scriptService;
private AlertActionManager actionManager;
private final AtomicBoolean run = new AtomicBoolean(false);
@Inject
public AlertScheduler(Settings settings, AlertManager alertManager, Client client,
TriggerManager triggerManager, ScriptService scriptService,
ClusterService clusterService) {
public AlertScheduler(Settings settings, AlertManager alertManager) {
super(settings);
this.alertManager = alertManager;
this.client = client;
this.triggerManager = triggerManager;
this.scriptService = scriptService;
try {
SchedulerFactory schFactory = new StdSchedulerFactory();
scheduler = schFactory.getScheduler();
@ -65,14 +32,10 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
} catch (SchedulerException e) {
throw new ElasticsearchException("Failed to instantiate scheduler", e);
}
clusterService.add(this);
alertManager.setAlertScheduler(this);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().nodes().localNodeMaster()) {
if (run.compareAndSet(false, true)) {
public void start() {
try {
logger.info("Starting scheduler");
scheduler.start();
@ -80,13 +43,8 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
logger.error("Failed to start quartz scheduler", se);
}
}
} else {
stopIfRunning();
}
}
private void stopIfRunning() {
if (run.compareAndSet(true, false)) {
public void stop() {
try {
logger.info("Stopping scheduler");
if (!scheduler.isShutdown()) {
@ -97,22 +55,14 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
logger.error("Failed to stop quartz scheduler", se);
}
}
public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){
DateTime scheduledFireTime = new DateTime(jobExecutionContext.getScheduledFireTime());
DateTime fireTime = new DateTime(jobExecutionContext.getFireTime());
alertManager.executeAlert(alertName, scheduledFireTime, fireTime);
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
stopIfRunning();
}
@Override
protected void doClose() throws ElasticsearchException {
}
public boolean deleteAlertFromSchedule(String alertName) {
public boolean remove(String alertName) {
try {
return scheduler.deleteJob(new JobKey(alertName));
} catch (SchedulerException se){
@ -128,89 +78,18 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
}
}
public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){
logger.warn("Running [{}]",alertName);
Alert alert = alertManager.getAlertForName(alertName);
DateTime scheduledTime = new DateTime(jobExecutionContext.getScheduledFireTime());
if (!alert.enabled()) {
logger.warn("Alert [{}] is not enabled", alertName);
return;
}
try {
if (!alertManager.claimAlertRun(alertName, scheduledTime) ){
logger.warn("Another process has already run this alert.");
return;
}
alert = alertManager.getAlertForName(alertName); //The claim may have triggered a refresh
SearchRequestBuilder srb = createClampedRequest(client, jobExecutionContext, alert);
String[] indices = alert.indices().toArray(new String[0]);
if (alert.indices() != null ){
logger.warn("Setting indices to : " + alert.indices());
srb.setIndices(indices);
}
//if (logger.isDebugEnabled()) {
logger.warn("Running query [{}]", XContentHelper.convertToJson(srb.request().source(), false, true));
//}
SearchResponse sr = srb.execute().get();
logger.warn("Got search response hits : [{}]", sr.getHits().getTotalHits() );
boolean isTriggered = triggerManager.isTriggered(alertName,sr);
alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime);
if (!alertManager.addHistory(alertName, isTriggered,
new DateTime(jobExecutionContext.getScheduledFireTime()), scheduledTime, srb,
alert.trigger(), sr.getHits().getTotalHits(), alert.actions(), alert.indices()))
{
logger.warn("Failed to store history for alert [{}]", alertName);
}
} catch (Exception e) {
logger.error("Failed execute alert [{}]", e, alertName);
}
}
private SearchRequestBuilder createClampedRequest(Client client, JobExecutionContext jobExecutionContext, Alert alert){
Date scheduledFireTime = jobExecutionContext.getScheduledFireTime();
DateTime clampEnd = new DateTime(scheduledFireTime);
DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds());
if (alert.simpleQuery()) {
TemplateQueryBuilder queryBuilder = new TemplateQueryBuilder(alert.queryName(), ScriptService.ScriptType.INDEXED, new HashMap<String, Object>());
RangeFilterBuilder filterBuilder = new RangeFilterBuilder(alert.timestampString());
filterBuilder.gte(clampStart);
filterBuilder.lt(clampEnd);
return client.prepareSearch().setQuery(new FilteredQueryBuilder(queryBuilder, filterBuilder));
} else {
//We can't just wrap the template here since it probably contains aggs or something else that doesn't play nice with FilteredQuery
Map<String,Object> fromToMap = new HashMap<>();
fromToMap.put("from", clampStart); //@TODO : make these parameters configurable ? Don't want to bloat the API too much tho
fromToMap.put("to", clampEnd);
//Go and get the search template from the script service :(
ExecutableScript script = scriptService.executable("mustache", alert.queryName(), ScriptService.ScriptType.INDEXED, fromToMap);
BytesReference requestBytes = (BytesReference)(script.run());
return client.prepareSearch().setSource(requestBytes);
}
}
public void addAlert(String alertName, Alert alert) {
public void add(String alertName, Alert alert) {
JobDetail job = JobBuilder.newJob(AlertExecutorJob.class).withIdentity(alertName).build();
job.getJobDataMap().put("manager",this);
job.getJobDataMap().put("manager", this);
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule(alert.schedule()))
.build();
try {
logger.warn("Scheduling [{}] with schedule [{}]", alertName, alert.schedule());
logger.trace("Scheduling [{}] with schedule [{}]", alertName, alert.schedule());
scheduler.scheduleJob(job, cronTrigger);
} catch (SchedulerException se) {
logger.error("Failed to schedule job",se);
}
}
public boolean isRunning() {
return true;
}
}

View File

@ -7,17 +7,26 @@ package org.elasticsearch.alerts.triggers;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.FilteredQueryBuilder;
import org.elasticsearch.index.query.RangeFilterBuilder;
import org.elasticsearch.index.query.TemplateQueryBuilder;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -26,7 +35,7 @@ import java.util.Map;
*/
public class TriggerManager extends AbstractComponent {
private final AlertManager alertManager;
private final Client client;
private final ScriptService scriptService;
public static AlertTrigger parseTrigger(XContentParser parser) throws IOException {
@ -78,13 +87,58 @@ public class TriggerManager extends AbstractComponent {
}
@Inject
public TriggerManager(Settings settings, AlertManager alertManager, ScriptService scriptService) {
public TriggerManager(Settings settings, Client client, ScriptService scriptService) {
super(settings);
this.alertManager = alertManager;
this.client = client;
this.scriptService = scriptService;
}
public boolean doScriptTrigger(ScriptedAlertTrigger scriptTrigger, SearchResponse response) {
public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime) throws Exception {
SearchRequest request = createClampedRequest(scheduledFireTime, alert);
if (logger.isTraceEnabled()) {
logger.trace("For alert [{}] running query for [{}]", alert.alertName(), XContentHelper.convertToJson(request.source(), false, true));
}
SearchResponse response = client.search(request).get();
logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits());
switch (alert.trigger().triggerType()) {
case NUMBER_OF_EVENTS:
return doSimpleTrigger(alert, request, response);
case SCRIPT:
return doScriptTrigger(alert, request, response);
default:
throw new ElasticsearchIllegalArgumentException("Bad value for trigger.triggerType [" + alert.trigger().triggerType() + "]");
}
}
private TriggerResult doSimpleTrigger(Alert alert, SearchRequest request, SearchResponse response) {
boolean triggered = false;
long testValue = response.getHits().getTotalHits();
int triggerValue = alert.trigger().value();
//Move this to SimpleTrigger
switch (alert.trigger().trigger()) {
case GREATER_THAN:
triggered = testValue > triggerValue;
break;
case LESS_THAN:
triggered = testValue < triggerValue;
break;
case EQUAL:
triggered = testValue == triggerValue;
break;
case NOT_EQUAL:
triggered = testValue != triggerValue;
break;
case RISES_BY:
case FALLS_BY:
triggered = false; //TODO FIX THESE
break;
}
return new TriggerResult(triggered, request, response);
}
private TriggerResult doScriptTrigger(Alert alert, SearchRequest request, SearchResponse response) {
boolean triggered = false;
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
@ -92,54 +146,45 @@ public class TriggerManager extends AbstractComponent {
builder.endObject();
Map<String, Object> responseMap = XContentHelper.convertToMap(builder.bytes(), false).v2();
ExecutableScript executable = scriptService.executable(scriptTrigger.scriptLang, scriptTrigger.script,
scriptTrigger.scriptType, responseMap);
ScriptedAlertTrigger scriptTrigger = alert.trigger().scriptedTrigger();
ExecutableScript executable = scriptService.executable(
scriptTrigger.scriptLang, scriptTrigger.script, scriptTrigger.scriptType, responseMap
);
Object returnValue = executable.run();
logger.warn("Returned [{}] from script", returnValue);
logger.trace("Returned [{}] from script", returnValue);
if (returnValue instanceof Boolean) {
return (Boolean) returnValue;
triggered = (Boolean) returnValue;
} else {
throw new ElasticsearchIllegalStateException("Trigger script [" + scriptTrigger.script + "] " +
"did not return a Boolean");
throw new ElasticsearchIllegalStateException("Trigger script [" + scriptTrigger.script + "] did not return a Boolean");
}
} catch (Exception e ){
logger.error("Failed to execute script trigger", e);
}
return false;
return new TriggerResult(triggered, request, response);
}
public boolean isTriggered(String alertName, SearchResponse response) {
Alert alert = this.alertManager.getAlertForName(alertName);
if (alert == null){
logger.warn("Could not find alert named [{}] in alert manager perhaps it has been deleted.", alertName);
return false;
private SearchRequest createClampedRequest(DateTime scheduledFireTime, Alert alert){
DateTime clampEnd = new DateTime(scheduledFireTime);
DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds());
SearchRequest request = new SearchRequest(alert.indices().toArray(new String[0]));
if (alert.simpleQuery()) {
TemplateQueryBuilder queryBuilder = new TemplateQueryBuilder(alert.queryName(), ScriptService.ScriptType.INDEXED, new HashMap<String, Object>());
RangeFilterBuilder filterBuilder = new RangeFilterBuilder(alert.timestampString());
filterBuilder.gte(clampStart);
filterBuilder.lt(clampEnd);
request.source(new SearchSourceBuilder().query(new FilteredQueryBuilder(queryBuilder, filterBuilder)));
} else {
//We can't just wrap the template here since it probably contains aggs or something else that doesn't play nice with FilteredQuery
Map<String,Object> fromToMap = new HashMap<>();
fromToMap.put("from", clampStart); //@TODO : make these parameters configurable ? Don't want to bloat the API too much tho
fromToMap.put("to", clampEnd);
//Go and get the search template from the script service :(
ExecutableScript script = scriptService.executable("mustache", alert.queryName(), ScriptService.ScriptType.INDEXED, fromToMap);
BytesReference requestBytes = (BytesReference)(script.run());
request.source(requestBytes, false);
}
long testValue;
switch (alert.trigger().triggerType()) {
case NUMBER_OF_EVENTS:
testValue = response.getHits().getTotalHits();
break;
case SCRIPT:
return doScriptTrigger(alert.trigger().scriptedTrigger(), response);
default:
throw new ElasticsearchIllegalArgumentException("Bad value for trigger.triggerType [" + alert.trigger().triggerType() + "]");
}
int triggerValue = alert.trigger().value();
//Move this to SimpleTrigger
switch (alert.trigger().trigger()) {
case GREATER_THAN:
return testValue > triggerValue;
case LESS_THAN:
return testValue < triggerValue;
case EQUAL:
return testValue == triggerValue;
case NOT_EQUAL:
return testValue != triggerValue;
case RISES_BY:
case FALLS_BY:
return false; //TODO FIX THESE
}
return false;
request.indicesOptions(IndicesOptions.lenientExpandOpen());
return request;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
/**
*/
public class TriggerResult {
private final boolean triggered;
private final SearchRequest request;
private final SearchResponse response;
public TriggerResult(boolean triggered, SearchRequest request, SearchResponse response) {
this.triggered = triggered;
this.request = request;
this.response = response;
}
public boolean isTriggered() {
return triggered;
}
public SearchRequest getRequest() {
return request;
}
public SearchResponse getResponse() {
return response;
}
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.alerts;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.alerts.actions.*;
import org.elasticsearch.alerts.plugin.AlertsPlugin;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -50,9 +49,9 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
public void testAlerSchedulerStartsProperly() throws Exception {
createIndex("my-index");
createIndex(AlertManager.ALERT_INDEX);
createIndex(AlertsStore.ALERT_INDEX);
createIndex(AlertActionManager.ALERT_HISTORY_INDEX);
ensureGreen("my-index", AlertManager.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX);
ensureGreen("my-index", AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX);
client().preparePutIndexedScript()
.setScriptLang("mustache")
@ -69,9 +68,6 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
assertThat(templatesResponse.getIndexTemplates().size(), equalTo(1));
assertThat(templatesResponse.getIndexTemplates().get(0).getName(), equalTo("query"));*/
AlertScheduler alertScheduler = internalCluster().getInstance(AlertScheduler.class, internalCluster().getMasterName());
assertThat(alertScheduler.isRunning(), is(true));
final AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName());
assertBusy(new Runnable() {
@Override
@ -79,8 +75,6 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
assertThat(alertManager.isStarted(), is(true));
}
});
final AtomicBoolean alertActionInvoked = new AtomicBoolean(false);
final AlertAction alertAction = new AlertAction() {
@Override

View File

@ -54,7 +54,7 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
builder.field(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString());
builder.endObject();
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry, logger);
AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry);
assertEquals(actionEntry.getVersion(), 0);
assertEquals(actionEntry.getAlertName(), "testName");