diff --git a/src/main/java/org/elasticsearch/alerts/Alert.java b/src/main/java/org/elasticsearch/alerts/Alert.java index 6d52cf9fa79..9d8b9514ab5 100644 --- a/src/main/java/org/elasticsearch/alerts/Alert.java +++ b/src/main/java/org/elasticsearch/alerts/Alert.java @@ -23,10 +23,8 @@ public class Alert implements ToXContent { private TimeValue timePeriod; private List actions; private String schedule; - private DateTime lastRan; private DateTime lastActionFire; private long version; - private DateTime running; private boolean enabled; private boolean simpleQuery; private String timestampString = "@timestamp"; @@ -63,14 +61,6 @@ public class Alert implements ToXContent { this.enabled = enabled; } - public DateTime running() { - return running; - } - - public void running(DateTime running) { - this.running = running; - } - public long version() { return version; } @@ -137,14 +127,6 @@ public class Alert implements ToXContent { this.schedule = schedule; } - public DateTime lastRan() { - return lastRan; - } - - public void lastRan(DateTime lastRan) { - this.lastRan = lastRan; - } - public Alert() { } @@ -156,11 +138,9 @@ public class Alert implements ToXContent { this.trigger = trigger; this.timePeriod = timePeriod; this.actions = actions; - this.lastRan = lastRan; this.schedule = schedule; this.indices = indices; this.version = version; - this.running = running; this.enabled = enabled; this.simpleQuery = simpleQuery; } @@ -179,12 +159,6 @@ public class Alert implements ToXContent { if (timePeriod != null) { builder.field(AlertsStore.TIMEPERIOD_FIELD.getPreferredName(), timePeriod); } - if (lastRan != null) { - builder.field(AlertsStore.LASTRAN_FIELD.getPreferredName(), lastRan); - } - if (running != null) { - builder.field(AlertsStore.CURRENTLY_RUNNING.getPreferredName(), running); - } builder.field(AlertsStore.ENABLED.getPreferredName(), enabled); builder.field(AlertsStore.SIMPLE_QUERY.getPreferredName(), simpleQuery); if (lastActionFire != null) { diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index e3c50fa9f98..d933de96345 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -8,27 +8,21 @@ package org.elasticsearch.alerts; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.alerts.actions.AlertAction; -import org.elasticsearch.alerts.actions.AlertActionEntry; import org.elasticsearch.alerts.actions.AlertActionManager; -import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.scheduler.AlertScheduler; -import org.elasticsearch.alerts.triggers.AlertTrigger; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.alerts.triggers.TriggerManager; +import org.elasticsearch.alerts.triggers.TriggerResult; +import org.elasticsearch.cluster.*; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.indices.IndicesService; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -39,69 +33,36 @@ import java.util.concurrent.atomic.AtomicBoolean; // The KeyedLock make sure that we only lock on the same alert, but not on different alerts. public class AlertManager extends AbstractComponent { - public static final String ALERT_INDEX = ".alerts"; - public static final String ALERT_TYPE = "alert"; - - private final ThreadPool threadPool; - private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean startActions = new AtomicBoolean(false); - private AlertScheduler scheduler; private final AlertsStore alertsStore; - private final AlertActionRegistry actionRegistry; + private final TriggerManager triggerManager; + private final ClusterService clusterService; private final AlertActionManager actionManager; + private final AtomicBoolean started = new AtomicBoolean(false); @Inject - public AlertManager(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, - AlertActionRegistry actionRegistry, AlertsStore alertsStore) { + public AlertManager(Settings settings, ClusterService clusterService, AlertsStore alertsStore, + IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager) { super(settings); - this.threadPool = threadPool; - this.actionRegistry = actionRegistry; - this.actionManager = new AlertActionManager(client, this, actionRegistry, threadPool); this.alertsStore = alertsStore; + this.clusterService = clusterService; + this.triggerManager = triggerManager; + this.actionManager = actionManager; clusterService.add(new AlertsClusterStateListener()); + // Close if the indices service is being stopped, so we don't run into search failures (locally) that will + // happen because we're shutting down and an alert is scheduled. + indicesService.addLifecycleListener(new LifecycleListener() { + @Override + public void beforeStop() { + stop(); + } + }); } public void setAlertScheduler(AlertScheduler scheduler){ this.scheduler = scheduler; } - public void doAction(Alert alert, AlertActionEntry result, DateTime scheduledTime) { - ensureStarted(); - logger.warn("We have triggered"); - DateTime lastActionFire = timeActionLastTriggered(alert.alertName()); - long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis(); - logger.error("last action fire [{}]", lastActionFire); - logger.error("msSinceLastAction [{}]", msSinceLastAction); - - if (alert.timePeriod().getMillis() > msSinceLastAction) { - logger.warn("Not firing action because it was fired in the timePeriod"); - } else { - actionRegistry.doAction(alert, result); - logger.warn("Did action !"); - - alert.lastActionFire(scheduledTime); - alertsStore.updateAlert(alert); - } - } - - public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) { - ensureStarted(); - Alert alert; - try { - alert = alertsStore.getAlert(alertName); - if (!alert.enabled()) { - return false; - } - } catch (Throwable t) { - throw new ElasticsearchException("Unable to load alert from index",t); - } - - alert.running(scheduleRunTime); - alertsStore.updateAlert(alert); - return true; - } - public void clearAndReload() { ensureStarted(); try { @@ -113,18 +74,10 @@ public class AlertManager extends AbstractComponent { } } - public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime) throws Exception { - ensureStarted(); - Alert alert = alertsStore.getAlert(alertName); - alert.lastRan(fireTime); - alertsStore.updateAlert(alert); - return true; - } - public boolean deleteAlert(String name) throws InterruptedException, ExecutionException { ensureStarted(); if (alertsStore.hasAlert(name)) { - assert scheduler.deleteAlertFromSchedule(name); + assert scheduler.remove(name); alertsStore.deleteAlert(name); return true; } else { @@ -135,15 +88,10 @@ public class AlertManager extends AbstractComponent { public Alert addAlert(String alertName, BytesReference alertSource) { ensureStarted(); Alert alert = alertsStore.createAlert(alertName, alertSource); - scheduler.addAlert(alertName, alert); + scheduler.add(alertName, alert); return alert; } - public Alert getAlertForName(String alertName) { - ensureStarted(); - return alertsStore.getAlert(alertName); - } - public List getAllAlerts() { ensureStarted(); return ImmutableList.copyOf(alertsStore.getAlerts().values()); @@ -153,30 +101,38 @@ public class AlertManager extends AbstractComponent { return started.get(); } - public boolean addHistory(String alertName, boolean isTriggered, DateTime dateTime, DateTime scheduledTime, - SearchRequestBuilder srb, AlertTrigger trigger, long totalHits, List actions, - List indices) throws IOException{ - return actionManager.addHistory(alertName, isTriggered, dateTime, scheduledTime, srb, trigger, totalHits, actions, indices); + public void executeAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){ + ensureStarted(); + Alert alert = alertsStore.getAlert(alertName); + if (!alert.enabled()) { + logger.debug("Alert [{}] is not enabled", alert.alertName()); + return; + } + try { + TriggerResult result = triggerManager.isTriggered(alert, scheduledFireTime); + actionManager.addAlertAction(alert, result, fireTime, scheduledFireTime); + } catch (Exception e) { + logger.error("Failed execute alert [{}]", e, alertName); + } + } + + public void stop() { + if (started.compareAndSet(false, true)) { + scheduler.stop(); + alertsStore.stop(); + actionManager.stop(); + } } private void ensureStarted() { - if (!started.get() || !startActions.get()) { + if (!started.get()) { throw new ElasticsearchIllegalStateException("not started"); } } private void sendAlertsToScheduler() { for (Map.Entry entry : alertsStore.getAlerts().entrySet()) { - scheduler.addAlert(entry.getKey(), entry.getValue()); - } - } - - private DateTime timeActionLastTriggered(String alertName) { - Alert alert = alertsStore.getAlert(alertName); - if (alert != null) { - return alert.lastActionFire(); - } else { - return null; + scheduler.add(entry.getKey(), entry.getValue()); } } @@ -184,58 +140,63 @@ public class AlertManager extends AbstractComponent { @Override public void clusterChanged(ClusterChangedEvent event) { - if (!event.localNodeMaster()) { //We are not the master + if (!event.localNodeMaster()) { + // We're not the master + stop(); + } else { + if (started.get()) { + return; // We're already started + } + + alertsStore.start(event.state(), new LoadingListener() { + @Override + public void onSuccess() { + startIfReady(); + } + + @Override + public void onFailure() { + retry(); + } + }); + actionManager.start(event.state(), new LoadingListener() { + @Override + public void onSuccess() { + startIfReady(); + } + + @Override + public void onFailure() { + retry(); + } + }); + } + } + + private void startIfReady() { + if (alertsStore.started() && actionManager.started()) { if (started.compareAndSet(false, true)) { - scheduler.clearAlerts(); - alertsStore.clear(); - } - - if (startActions.compareAndSet(false, true)) { - //If actionManager was running and we aren't the master stop - actionManager.doStop(); //Safe to call this multiple times, it's a noop if we are already stopped - } - return; - } - - if (!started.get()) { - IndexMetaData alertIndexMetaData = event.state().getMetaData().index(ALERT_INDEX); - if (alertIndexMetaData != null) { - if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { - started.set(true); - // TODO: the starter flag should only be set to true once the alert loader has completed. - // Right now there is a window of time between when started=true and loading has completed where - // alerts can get lost - threadPool.executor(ThreadPool.Names.GENERIC).execute(new AlertLoader()); - } - } - } - - if (!startActions.get()) { - IndexMetaData indexMetaData = event.state().getMetaData().index(AlertActionManager.ALERT_HISTORY_INDEX); - if (indexMetaData != null) { - if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { - startActions.set(true); - actionManager.doStart(); - } + scheduler.start(); + sendAlertsToScheduler(); } } } - } - - private class AlertLoader implements Runnable { - @Override - public void run() { - // TODO: have some kind of retry mechanism? - try { - alertsStore.reload(); - sendAlertsToScheduler(); - } catch (Exception e) { - logger.warn("Error during loading of alerts from an existing .alerts index... refresh the alerts manually"); - } - started.set(true); + private void retry() { + clusterService.submitStateUpdateTask("alerts-retry", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // Force a new cluster state to trigger that alerts cluster state listener gets invoked again. + return ClusterState.builder(currentState).build(); + } + @Override + public void onFailure(String source, @Nullable Throwable t) { + logger.error("Error during {} ", t, source); + } + }); } + } } diff --git a/src/main/java/org/elasticsearch/alerts/AlertingModule.java b/src/main/java/org/elasticsearch/alerts/AlertingModule.java index 96079cba1ac..5602832b0fb 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertingModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertingModule.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.alerts; +import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.rest.AlertRestHandler; import org.elasticsearch.alerts.scheduler.AlertScheduler; @@ -17,6 +18,7 @@ public class AlertingModule extends AbstractModule { protected void configure() { bind(AlertsStore.class).asEagerSingleton(); bind(AlertManager.class).asEagerSingleton(); + bind(AlertActionManager.class).asEagerSingleton(); bind(TriggerManager.class).asEagerSingleton(); bind(AlertScheduler.class).asEagerSingleton(); bind(AlertActionRegistry.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 16aa08b9f81..0ad9ddca4b3 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -21,6 +21,8 @@ import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; @@ -32,24 +34,27 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; /** */ public class AlertsStore extends AbstractComponent { + public static final String ALERT_INDEX = ".alerts"; + public static final String ALERT_TYPE = "alert"; + public static final ParseField QUERY_NAME_FIELD = new ParseField("query"); public static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); public static final ParseField TRIGGER_FIELD = new ParseField("trigger"); public static final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod"); public static final ParseField ACTION_FIELD = new ParseField("action"); - public static final ParseField LASTRAN_FIELD = new ParseField("lastRan"); public static final ParseField INDICES = new ParseField("indices"); - public static final ParseField CURRENTLY_RUNNING = new ParseField("running"); public static final ParseField ENABLED = new ParseField("enabled"); public static final ParseField SIMPLE_QUERY = new ParseField("simple"); public static final ParseField TIMESTAMP_FIELD = new ParseField("timefield"); @@ -58,16 +63,19 @@ public class AlertsStore extends AbstractComponent { private final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config private final Client client; - private final AlertActionRegistry alertActionRegistry; + private final ThreadPool threadPool; private final ConcurrentMap alertMap; + private final AlertActionRegistry alertActionRegistry; + private final AtomicReference state = new AtomicReference<>(State.STOPPED); private final int scrollSize; private final TimeValue scrollTimeout; @Inject - public AlertsStore(Settings settings, Client client, AlertActionRegistry alertActionRegistry) { + public AlertsStore(Settings settings, Client client, ThreadPool threadPool, AlertActionRegistry alertActionRegistry) { super(settings); this.client = client; + this.threadPool = threadPool; this.alertActionRegistry = alertActionRegistry; this.alertMap = ConcurrentCollections.newConcurrentMap(); this.scrollSize = componentSettings.getAsInt("scroll.size", 100); @@ -92,7 +100,7 @@ public class AlertsStore extends AbstractComponent { * Creates an alert with the specified and fails if an alert with the name already exists. */ public Alert createAlert(String name, BytesReference alertSource) { - if (!client.admin().indices().prepareExists(AlertManager.ALERT_INDEX).execute().actionGet().isExists()) { + if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { createAlertsIndex(); } @@ -110,8 +118,8 @@ public class AlertsStore extends AbstractComponent { */ public void updateAlert(Alert alert) { IndexRequest updateRequest = new IndexRequest(); - updateRequest.index(AlertManager.ALERT_INDEX); - updateRequest.type(AlertManager.ALERT_TYPE); + updateRequest.index(ALERT_INDEX); + updateRequest.type(ALERT_TYPE); updateRequest.id(alert.alertName()); updateRequest.version(alert.version()); XContentBuilder alertBuilder; @@ -138,8 +146,8 @@ public class AlertsStore extends AbstractComponent { if (alert != null) { DeleteRequest deleteRequest = new DeleteRequest(); deleteRequest.id(name); - deleteRequest.index(AlertManager.ALERT_INDEX); - deleteRequest.type(AlertManager.ALERT_TYPE); + deleteRequest.index(ALERT_INDEX); + deleteRequest.type(ALERT_TYPE); deleteRequest.version(alert.version()); DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet(); assert deleteResponse.isFound(); @@ -165,8 +173,53 @@ public class AlertsStore extends AbstractComponent { return alertMap; } + public void start(ClusterState state, final LoadingListener listener) { + IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); + if (alertIndexMetaData != null) { + if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { + if (this.state.compareAndSet(State.STOPPED, State.LOADING)) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + boolean success = false; + try { + loadAlerts(); + success = true; + } catch (Exception e) { + logger.warn("Failed to load alerts", e); + } finally { + if (success) { + if (AlertsStore.this.state.compareAndSet(State.LOADING, State.STARTED)) { + listener.onSuccess(); + } + } else { + if (AlertsStore.this.state.compareAndSet(State.LOADING, State.STOPPED)) { + listener.onFailure(); + } + } + } + } + }); + } + } + } else { + if (AlertsStore.this.state.compareAndSet(State.STOPPED, State.STARTED)) { + listener.onSuccess(); + } + } + } + + public boolean started() { + return state.get() == State.STARTED; + } + + public void stop() { + state.set(State.STOPPED); + clear(); + } + private void persistAlert(String alertName, BytesReference alertSource, IndexRequest.OpType opType) { - IndexRequest indexRequest = new IndexRequest(AlertManager.ALERT_INDEX, AlertManager.ALERT_TYPE, alertName); + IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName); indexRequest.listenerThreaded(false); indexRequest.source(alertSource, false); indexRequest.opType(opType); @@ -174,7 +227,7 @@ public class AlertsStore extends AbstractComponent { } private void loadAlerts() { - if (!client.admin().indices().prepareExists(AlertManager.ALERT_INDEX).execute().actionGet().isExists()) { + if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { createAlertsIndex(); } @@ -182,8 +235,8 @@ public class AlertsStore extends AbstractComponent { .setSearchType(SearchType.SCAN) .setScroll(scrollTimeout) .setSize(scrollSize) - .setTypes(AlertManager.ALERT_TYPE) - .setIndices(AlertManager.ALERT_INDEX).get(); + .setTypes(ALERT_TYPE) + .setIndices(ALERT_INDEX).get(); try { while (response.getHits().hits().length != 0) { for (SearchHit sh : response.getHits()) { @@ -240,10 +293,6 @@ public class AlertsStore extends AbstractComponent { alert.schedule(parser.textOrNull()); } else if (TIMEPERIOD_FIELD.match(currentFieldName)) { alert.timestampString(parser.textOrNull()); - } else if (LASTRAN_FIELD.match(currentFieldName)) { - alert.lastRan(DateTime.parse(parser.textOrNull())); - } else if (CURRENTLY_RUNNING.match(currentFieldName)) { - alert.running(DateTime.parse(parser.textOrNull())); } else if (ENABLED.match(currentFieldName)) { alert.enabled(parser.booleanValue()); } else if (SIMPLE_QUERY.match(currentFieldName)) { @@ -275,10 +324,18 @@ public class AlertsStore extends AbstractComponent { } private ClusterHealthStatus createAlertsIndex() { - CreateIndexResponse cir = client.admin().indices().prepareCreate(AlertManager.ALERT_INDEX).addMapping(AlertManager.ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS + CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS ClusterHealthResponse actionGet = client.admin().cluster() - .health(Requests.clusterHealthRequest(AlertManager.ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); + .health(Requests.clusterHealthRequest(ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); return actionGet.getStatus(); } + private enum State { + + STOPPED, + LOADING, + STARTED + + } + } diff --git a/src/main/java/org/elasticsearch/alerts/LoadingListener.java b/src/main/java/org/elasticsearch/alerts/LoadingListener.java new file mode 100644 index 00000000000..7d6ebbab018 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/LoadingListener.java @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +/** + */ +public interface LoadingListener { + + void onSuccess(); + + void onFailure(); + +} diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java index 8a062b7c9fc..fb4eac5f98c 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java @@ -5,10 +5,13 @@ */ package org.elasticsearch.alerts.actions; +import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.triggers.AlertTrigger; +import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; import java.io.IOException; import java.util.List; @@ -22,7 +25,7 @@ public class AlertActionEntry implements ToXContent{ private boolean triggered; private DateTime fireTime; private AlertTrigger trigger; - private String triggeringQuery; + private String triggeringSearchRequest; private long numberOfResults; private List actions; private List indices; @@ -79,12 +82,12 @@ public class AlertActionEntry implements ToXContent{ this.trigger = trigger; } - public String getTriggeringQuery() { - return triggeringQuery; + public String getTriggeringSearchRequest() { + return triggeringSearchRequest; } - public void setTriggeringQuery(String triggeringQuery) { - this.triggeringQuery = triggeringQuery; + public void setTriggeringSearchRequest(String triggeringSearchRequest) { + this.triggeringSearchRequest = triggeringSearchRequest; } public long getNumberOfResults() { @@ -130,20 +133,18 @@ public class AlertActionEntry implements ToXContent{ protected AlertActionEntry() { } - public AlertActionEntry(String id, long version, String alertName, boolean triggered, DateTime fireTime, DateTime scheduledTime, AlertTrigger trigger, - String queryRan, long numberOfResults, List actions, - List indices, AlertActionState state) { - this.id = id; - this.version = version; - this.alertName = alertName; - this.triggered = triggered; + public AlertActionEntry(Alert alert, TriggerResult result, DateTime fireTime, DateTime scheduledTime, AlertActionState state) throws IOException { + this.id = alert.alertName() + "#" + scheduledTime.toDateTimeISO(); + this.version = 1; + this.alertName = alert.alertName(); + this.triggered = result.isTriggered(); this.fireTime = fireTime; this.scheduledTime = scheduledTime; - this.trigger = trigger; - this.triggeringQuery = queryRan; - this.numberOfResults = numberOfResults; - this.actions = actions; - this.indices = indices; + this.trigger = alert.trigger(); + this.triggeringSearchRequest = XContentHelper.convertToJson(result.getRequest().source(), false, true); + this.numberOfResults = result.getResponse().getHits().totalHits(); + this.actions = alert.actions(); + this.indices = alert.indices(); this.entryState = state; } @@ -158,7 +159,7 @@ public class AlertActionEntry implements ToXContent{ historyEntry.field("trigger"); trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); - historyEntry.field("queryRan", triggeringQuery); + historyEntry.field("queryRan", triggeringSearchRequest); historyEntry.field("numberOfResults", numberOfResults); @@ -196,7 +197,7 @@ public class AlertActionEntry implements ToXContent{ ", triggered=" + triggered + ", fireTime=" + fireTime + ", trigger=" + trigger + - ", triggeringQuery='" + triggeringQuery + '\'' + + ", triggeringSearchRequest='" + triggeringSearchRequest + '\'' + ", numberOfResults=" + numberOfResults + ", actions=" + actions + ", indices=" + indices + @@ -225,7 +226,7 @@ public class AlertActionEntry implements ToXContent{ if (scheduledTime != null ? !scheduledTime.equals(that.scheduledTime) : that.scheduledTime != null) return false; if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false; - if (triggeringQuery != null ? !triggeringQuery.equals(that.triggeringQuery) : that.triggeringQuery != null) + if (triggeringSearchRequest != null ? !triggeringSearchRequest.equals(that.triggeringSearchRequest) : that.triggeringSearchRequest != null) return false; return true; @@ -238,7 +239,7 @@ public class AlertActionEntry implements ToXContent{ result = 31 * result + (triggered ? 1 : 0); result = 31 * result + (fireTime != null ? fireTime.hashCode() : 0); result = 31 * result + (trigger != null ? trigger.hashCode() : 0); - result = 31 * result + (triggeringQuery != null ? triggeringQuery.hashCode() : 0); + result = 31 * result + (triggeringSearchRequest != null ? triggeringSearchRequest.hashCode() : 0); result = 31 * result + (int) (numberOfResults ^ (numberOfResults >>> 32)); result = 31 * result + (actions != null ? actions.hashCode() : 0); result = 31 * result + (indices != null ? indices.hashCode() : 0); diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 0c9fa6763b1..36df6434ef2 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -13,22 +13,24 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.alerts.AlertManager; -import org.elasticsearch.alerts.triggers.AlertTrigger; +import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.AlertsStore; +import org.elasticsearch.alerts.LoadingListener; import org.elasticsearch.alerts.triggers.TriggerManager; +import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; -import org.elasticsearch.common.Nullable; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.*; -import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; @@ -36,13 +38,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** */ -public class AlertActionManager { +public class AlertActionManager extends AbstractComponent { public static final String ALERT_NAME_FIELD = "alertName"; public static final String TRIGGERED_FIELD = "triggered"; @@ -57,23 +58,20 @@ public class AlertActionManager { public static final String ALERT_HISTORY_TYPE = "alerthistory"; private final Client client; - private final AlertManager alertManager; - private final AlertActionRegistry actionRegistry; private final ThreadPool threadPool; + private final AlertsStore alertsStore; + private final AlertActionRegistry actionRegistry; - private final ESLogger logger = Loggers.getLogger(AlertActionManager.class); - - private BlockingQueue jobsToBeProcessed = new LinkedBlockingQueue<>(); - - public final AtomicBoolean running = new AtomicBoolean(false); - private Executor readerExecutor; + private final BlockingQueue jobsToBeProcessed = new LinkedBlockingQueue<>(); + private final AtomicReference state = new AtomicReference<>(State.STOPPED); private static AlertActionEntry END_ENTRY = new AlertActionEntry(); - class AlertHistoryRunnable implements Runnable { - AlertActionEntry entry; + private class AlertHistoryRunnable implements Runnable { - AlertHistoryRunnable(AlertActionEntry entry) { + private final AlertActionEntry entry; + + private AlertHistoryRunnable(AlertActionEntry entry) { this.entry = entry; } @@ -81,7 +79,22 @@ public class AlertActionManager { public void run() { try { if (claimAlertHistoryEntry(entry)) { - alertManager.doAction(alertManager.getAlertForName(entry.getAlertName()), entry, entry.getScheduledTime()); + Alert alert = alertsStore.getAlert(entry.getAlertName()); + DateTime lastActionFire = alert.lastActionFire(); + DateTime scheduledTime = entry.getScheduledTime(); + long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis(); + logger.trace("last action fire [{}]", lastActionFire); + logger.trace("msSinceLastAction [{}]", msSinceLastAction); + + if (alert.timePeriod().getMillis() > msSinceLastAction) { + logger.debug("Not firing action because it was fired in the timePeriod"); + } else { + actionRegistry.doAction(alert, entry); + logger.debug("Did action !"); + + alert.lastActionFire(scheduledTime); + alertsStore.updateAlert(alert); + } updateHistoryEntry(entry, AlertActionState.ACTION_PERFORMED); } else { logger.warn("Unable to claim alert history entry" + entry); @@ -89,53 +102,32 @@ public class AlertActionManager { } catch (Throwable t) { logger.error("Failed to execute alert action", t); } - - } } - class QueueLoaderThread implements Runnable { - @Override - public void run() { - boolean success = false; - do { - try { - success = loadQueue(); - } catch (Exception e) { - logger.error("Unable to load the job queue", e); - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { + private class QueueReaderThread implements Runnable { - } - } - } while (!success); - } - } - - class QueueReaderThread implements Runnable { @Override public void run() { try { logger.debug("Starting thread to read from the job queue"); - while (running.get()) { + while (started()) { AlertActionEntry entry = null; do { try { entry = jobsToBeProcessed.take(); } catch (InterruptedException ie) { - if (!running.get()) { + if (!started()) { break; } } } while (entry == null); - if (!running.get() || entry == END_ENTRY) { + if (!started() || entry == END_ENTRY) { logger.debug("Stopping thread to read from the job queue"); + return; } - - threadPool.executor(ThreadPool.Names.MANAGEMENT) - .execute(new AlertHistoryRunnable(entry)); + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AlertHistoryRunnable(entry)); } } catch (Throwable t) { logger.error("Error during reader thread", t); @@ -143,26 +135,66 @@ public class AlertActionManager { } } - public AlertActionManager(Client client, AlertManager alertManager, - AlertActionRegistry actionRegistry, - ThreadPool threadPool) { + @Inject + public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry, ThreadPool threadPool, AlertsStore alertsStore) { + super(settings); this.client = client; - this.alertManager = alertManager; this.actionRegistry = actionRegistry; this.threadPool = threadPool; + this.alertsStore = alertsStore; } - public void doStart() { - if (running.compareAndSet(false, true)) { - logger.info("Starting job queue"); - readerExecutor = threadPool.executor(ThreadPool.Names.GENERIC); - readerExecutor.execute(new QueueReaderThread()); - threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueLoaderThread()); + public void start(ClusterState state, final LoadingListener listener) { + IndexMetaData indexMetaData = state.getMetaData().index(ALERT_HISTORY_INDEX); + if (indexMetaData != null) { + if (state.routingTable().index(ALERT_HISTORY_INDEX).allPrimaryShardsActive()) { + if (this.state.compareAndSet(State.STOPPED, State.LOADING)) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + boolean success = false; + try { + success = loadQueue(); + } catch (Exception e) { + logger.error("Unable to load unfinished jobs into the job queue", e); + } finally { + if (success) { + if (AlertActionManager.this.state.compareAndSet(State.LOADING, State.STARTED)) { + doStart(); + listener.onSuccess(); + } + } else { + if (AlertActionManager.this.state.compareAndSet(State.LOADING, State.STOPPED)) { + listener.onFailure(); + } + } + } + } + }); + } + } + } else { + if (this.state.compareAndSet(State.STOPPED, State.STARTED)) { + doStart(); + listener.onSuccess(); + } } } - public void doStop() { - stopIfRunning(); + public void stop() { + if (state.compareAndSet(State.STARTED, State.STOPPED)) { + logger.info("Stopping job queue"); + jobsToBeProcessed.add(END_ENTRY); + } + } + + public boolean started() { + return state.get() == State.STARTED; + } + + private void doStart() { + logger.info("Starting job queue"); + threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread()); } public boolean loadQueue() { @@ -196,11 +228,11 @@ public class AlertActionManager { } protected AlertActionEntry parseHistory(String historyId, BytesReference source, long version) { - return parseHistory(historyId, source, version, actionRegistry, logger); + return parseHistory(historyId, source, version, actionRegistry); } protected static AlertActionEntry parseHistory(String historyId, BytesReference source, long version, - AlertActionRegistry actionRegistry, ESLogger logger) { + AlertActionRegistry actionRegistry) { AlertActionEntry entry = new AlertActionEntry(); entry.setId(historyId); entry.setVersion(version); @@ -249,7 +281,7 @@ public class AlertActionManager { entry.setScheduledTime(DateTime.parse(parser.text())); break; case QUERY_RAN_FIELD: - entry.setTriggeringQuery(parser.text()); + entry.setTriggeringSearchRequest(parser.text()); break; case NUMBER_OF_RESULTS_FIELD: entry.setNumberOfResults(parser.longValue()); @@ -261,7 +293,7 @@ public class AlertActionManager { throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } } else { - + throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]"); } } } catch (IOException e) { @@ -271,24 +303,17 @@ public class AlertActionManager { } - public boolean addHistory(String alertName, boolean triggered, - DateTime fireTime, DateTime scheduledFireTime, SearchRequestBuilder triggeringQuery, - AlertTrigger trigger, long numberOfResults, - List actions, - @Nullable List indices) throws IOException { - - if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).execute().actionGet().isExists()) { - ClusterHealthStatus chs = createAlertHistoryIndex(); + public void addAlertAction(Alert alert, TriggerResult result, DateTime fireTime, DateTime scheduledFireTime) throws IOException { + if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).get().isExists()) { + createAlertHistoryIndex(); } AlertActionState state = AlertActionState.NO_ACTION_NEEDED; - if (triggered && !actions.isEmpty()) { + if (result.isTriggered() && !alert.actions().isEmpty()) { state = AlertActionState.ACTION_NEEDED; } - AlertActionEntry entry = new AlertActionEntry(alertName + " " + scheduledFireTime.toDateTimeISO(), 1, alertName, triggered, fireTime, scheduledFireTime, trigger, - triggeringQuery.toString(), numberOfResults, actions, indices, state); - + AlertActionEntry entry = new AlertActionEntry(alert, result, fireTime, scheduledFireTime, state); XContentBuilder historyEntry = XContentFactory.jsonBuilder(); entry.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); @@ -298,30 +323,13 @@ public class AlertActionManager { indexRequest.id(entry.getId()); indexRequest.source(historyEntry); indexRequest.listenerThreaded(false); - indexRequest.operationThreaded(false); - indexRequest.refresh(true); //Always refresh after indexing an alert indexRequest.opType(IndexRequest.OpType.CREATE); - try { - if (client.index(indexRequest).actionGet().isCreated()) { - jobsToBeProcessed.add(entry); - return true; - } else { - return false; - } - } catch (DocumentAlreadyExistsException daee){ - logger.warn("Someone has already created a history entry for this alert run"); - return false; + client.index(indexRequest).actionGet(); + if (state != AlertActionState.NO_ACTION_NEEDED) { + jobsToBeProcessed.add(entry); } } - private void stopIfRunning() { - if (running.compareAndSet(true, false)) { - logger.info("Stopping job queue"); - jobsToBeProcessed.add(END_ENTRY); - } - } - - private ClusterHealthStatus createAlertHistoryIndex() { CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_HISTORY_INDEX).addMapping(ALERT_HISTORY_TYPE).execute().actionGet(); //TODO FIX MAPPINGS if (!cir.isAcknowledged()) { @@ -412,6 +420,12 @@ public class AlertActionManager { return true; } + private enum State { + STOPPED, + LOADING, + STARTED + + } } diff --git a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java index 65505120944..b40672a6f1f 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java +++ b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java @@ -90,7 +90,7 @@ public class EmailAlertAction implements AlertAction { StringBuffer output = new StringBuffer(); output.append("The following query triggered because " + result.getTrigger().toString() + "\n"); output.append("The total number of hits returned : " + result.getNumberOfResults() + "\n"); - output.append("For query : " + result.getTriggeringQuery()); + output.append("For query : " + result.getTriggeringSearchRequest()); output.append("\n"); output.append("Indices : "); for (String index : result.getIndices()) { diff --git a/src/main/java/org/elasticsearch/alerts/plugin/AlertsPlugin.java b/src/main/java/org/elasticsearch/alerts/plugin/AlertsPlugin.java index e95e57b5172..c228c09a304 100644 --- a/src/main/java/org/elasticsearch/alerts/plugin/AlertsPlugin.java +++ b/src/main/java/org/elasticsearch/alerts/plugin/AlertsPlugin.java @@ -6,9 +6,7 @@ package org.elasticsearch.alerts.plugin; import org.elasticsearch.alerts.AlertingModule; -import org.elasticsearch.alerts.scheduler.AlertScheduler; import org.elasticsearch.common.collect.Lists; -import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.AbstractPlugin; @@ -29,13 +27,6 @@ public class AlertsPlugin extends AbstractPlugin { return "Elasticsearch Alerts"; } - @Override - public Collection> services() { - Collection> services = Lists.newArrayList(); - services.add(AlertScheduler.class); - return services; - } - @Override public Collection> modules() { Collection> modules = Lists.newArrayList(); diff --git a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java index 3d9f2127918..32d46ea4ce7 100644 --- a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java +++ b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java @@ -6,58 +6,25 @@ package org.elasticsearch.alerts.scheduler; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.alerts.Alert; -import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.AlertManager; -import org.elasticsearch.alerts.triggers.TriggerManager; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.index.query.FilteredQueryBuilder; -import org.elasticsearch.index.query.RangeFilterBuilder; -import org.elasticsearch.index.query.TemplateQueryBuilder; -import org.elasticsearch.script.ExecutableScript; -import org.elasticsearch.script.ScriptService; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.quartz.simpl.SimpleJobFactory; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; +public class AlertScheduler extends AbstractComponent { -public class AlertScheduler extends AbstractLifecycleComponent implements ClusterStateListener { - - private final Client client; private final Scheduler scheduler; private final AlertManager alertManager; - private final TriggerManager triggerManager; - private final ScriptService scriptService; - - private AlertActionManager actionManager; - - - private final AtomicBoolean run = new AtomicBoolean(false); @Inject - public AlertScheduler(Settings settings, AlertManager alertManager, Client client, - TriggerManager triggerManager, ScriptService scriptService, - ClusterService clusterService) { + public AlertScheduler(Settings settings, AlertManager alertManager) { super(settings); this.alertManager = alertManager; - this.client = client; - this.triggerManager = triggerManager; - this.scriptService = scriptService; try { SchedulerFactory schFactory = new StdSchedulerFactory(); scheduler = schFactory.getScheduler(); @@ -65,54 +32,37 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste } catch (SchedulerException e) { throw new ElasticsearchException("Failed to instantiate scheduler", e); } - clusterService.add(this); alertManager.setAlertScheduler(this); } - @Override - public void clusterChanged(ClusterChangedEvent event) { - if (event.state().nodes().localNodeMaster()) { - if (run.compareAndSet(false, true)) { - try { - logger.info("Starting scheduler"); - scheduler.start(); - } catch (SchedulerException se){ - logger.error("Failed to start quartz scheduler", se); - } - } - } else { - stopIfRunning(); + public void start() { + try { + logger.info("Starting scheduler"); + scheduler.start(); + } catch (SchedulerException se){ + logger.error("Failed to start quartz scheduler", se); } } - private void stopIfRunning() { - if (run.compareAndSet(true, false)) { - try { - logger.info("Stopping scheduler"); - if (!scheduler.isShutdown()) { - scheduler.clear(); - scheduler.shutdown(false); - } - } catch (SchedulerException se){ - logger.error("Failed to stop quartz scheduler", se); + public void stop() { + try { + logger.info("Stopping scheduler"); + if (!scheduler.isShutdown()) { + scheduler.clear(); + scheduler.shutdown(false); } + } catch (SchedulerException se){ + logger.error("Failed to stop quartz scheduler", se); } } - @Override - protected void doStart() throws ElasticsearchException { + public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){ + DateTime scheduledFireTime = new DateTime(jobExecutionContext.getScheduledFireTime()); + DateTime fireTime = new DateTime(jobExecutionContext.getFireTime()); + alertManager.executeAlert(alertName, scheduledFireTime, fireTime); } - @Override - protected void doStop() throws ElasticsearchException { - stopIfRunning(); - } - - @Override - protected void doClose() throws ElasticsearchException { - } - - public boolean deleteAlertFromSchedule(String alertName) { + public boolean remove(String alertName) { try { return scheduler.deleteJob(new JobKey(alertName)); } catch (SchedulerException se){ @@ -128,89 +78,18 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste } } - public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){ - logger.warn("Running [{}]",alertName); - Alert alert = alertManager.getAlertForName(alertName); - DateTime scheduledTime = new DateTime(jobExecutionContext.getScheduledFireTime()); - if (!alert.enabled()) { - logger.warn("Alert [{}] is not enabled", alertName); - return; - } - try { - if (!alertManager.claimAlertRun(alertName, scheduledTime) ){ - logger.warn("Another process has already run this alert."); - return; - } - alert = alertManager.getAlertForName(alertName); //The claim may have triggered a refresh - - SearchRequestBuilder srb = createClampedRequest(client, jobExecutionContext, alert); - String[] indices = alert.indices().toArray(new String[0]); - - if (alert.indices() != null ){ - logger.warn("Setting indices to : " + alert.indices()); - srb.setIndices(indices); - } - - //if (logger.isDebugEnabled()) { - logger.warn("Running query [{}]", XContentHelper.convertToJson(srb.request().source(), false, true)); - //} - - SearchResponse sr = srb.execute().get(); - logger.warn("Got search response hits : [{}]", sr.getHits().getTotalHits() ); - - boolean isTriggered = triggerManager.isTriggered(alertName,sr); - - alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime); - if (!alertManager.addHistory(alertName, isTriggered, - new DateTime(jobExecutionContext.getScheduledFireTime()), scheduledTime, srb, - alert.trigger(), sr.getHits().getTotalHits(), alert.actions(), alert.indices())) - { - logger.warn("Failed to store history for alert [{}]", alertName); - } - - } catch (Exception e) { - logger.error("Failed execute alert [{}]", e, alertName); - } - } - - private SearchRequestBuilder createClampedRequest(Client client, JobExecutionContext jobExecutionContext, Alert alert){ - Date scheduledFireTime = jobExecutionContext.getScheduledFireTime(); - DateTime clampEnd = new DateTime(scheduledFireTime); - DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds()); - if (alert.simpleQuery()) { - TemplateQueryBuilder queryBuilder = new TemplateQueryBuilder(alert.queryName(), ScriptService.ScriptType.INDEXED, new HashMap()); - RangeFilterBuilder filterBuilder = new RangeFilterBuilder(alert.timestampString()); - filterBuilder.gte(clampStart); - filterBuilder.lt(clampEnd); - return client.prepareSearch().setQuery(new FilteredQueryBuilder(queryBuilder, filterBuilder)); - } else { - //We can't just wrap the template here since it probably contains aggs or something else that doesn't play nice with FilteredQuery - Map fromToMap = new HashMap<>(); - fromToMap.put("from", clampStart); //@TODO : make these parameters configurable ? Don't want to bloat the API too much tho - fromToMap.put("to", clampEnd); - //Go and get the search template from the script service :( - ExecutableScript script = scriptService.executable("mustache", alert.queryName(), ScriptService.ScriptType.INDEXED, fromToMap); - BytesReference requestBytes = (BytesReference)(script.run()); - return client.prepareSearch().setSource(requestBytes); - } - } - - public void addAlert(String alertName, Alert alert) { + public void add(String alertName, Alert alert) { JobDetail job = JobBuilder.newJob(AlertExecutorJob.class).withIdentity(alertName).build(); - job.getJobDataMap().put("manager",this); + job.getJobDataMap().put("manager", this); CronTrigger cronTrigger = TriggerBuilder.newTrigger() .withSchedule(CronScheduleBuilder.cronSchedule(alert.schedule())) .build(); try { - logger.warn("Scheduling [{}] with schedule [{}]", alertName, alert.schedule()); + logger.trace("Scheduling [{}] with schedule [{}]", alertName, alert.schedule()); scheduler.scheduleJob(job, cronTrigger); } catch (SchedulerException se) { logger.error("Failed to schedule job",se); } } - public boolean isRunning() { - return true; - } - } diff --git a/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java b/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java index 420427b902f..e2cdb49d696 100644 --- a/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java +++ b/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java @@ -7,17 +7,26 @@ package org.elasticsearch.alerts.triggers; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.alerts.Alert; -import org.elasticsearch.alerts.AlertManager; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.index.query.FilteredQueryBuilder; +import org.elasticsearch.index.query.RangeFilterBuilder; +import org.elasticsearch.index.query.TemplateQueryBuilder; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; +import java.util.HashMap; import java.util.Map; @@ -26,7 +35,7 @@ import java.util.Map; */ public class TriggerManager extends AbstractComponent { - private final AlertManager alertManager; + private final Client client; private final ScriptService scriptService; public static AlertTrigger parseTrigger(XContentParser parser) throws IOException { @@ -78,13 +87,58 @@ public class TriggerManager extends AbstractComponent { } @Inject - public TriggerManager(Settings settings, AlertManager alertManager, ScriptService scriptService) { + public TriggerManager(Settings settings, Client client, ScriptService scriptService) { super(settings); - this.alertManager = alertManager; + this.client = client; this.scriptService = scriptService; } - public boolean doScriptTrigger(ScriptedAlertTrigger scriptTrigger, SearchResponse response) { + public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime) throws Exception { + SearchRequest request = createClampedRequest(scheduledFireTime, alert); + if (logger.isTraceEnabled()) { + logger.trace("For alert [{}] running query for [{}]", alert.alertName(), XContentHelper.convertToJson(request.source(), false, true)); + } + + SearchResponse response = client.search(request).get(); + logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits()); + switch (alert.trigger().triggerType()) { + case NUMBER_OF_EVENTS: + return doSimpleTrigger(alert, request, response); + case SCRIPT: + return doScriptTrigger(alert, request, response); + default: + throw new ElasticsearchIllegalArgumentException("Bad value for trigger.triggerType [" + alert.trigger().triggerType() + "]"); + } + } + + private TriggerResult doSimpleTrigger(Alert alert, SearchRequest request, SearchResponse response) { + boolean triggered = false; + long testValue = response.getHits().getTotalHits(); + int triggerValue = alert.trigger().value(); + //Move this to SimpleTrigger + switch (alert.trigger().trigger()) { + case GREATER_THAN: + triggered = testValue > triggerValue; + break; + case LESS_THAN: + triggered = testValue < triggerValue; + break; + case EQUAL: + triggered = testValue == triggerValue; + break; + case NOT_EQUAL: + triggered = testValue != triggerValue; + break; + case RISES_BY: + case FALLS_BY: + triggered = false; //TODO FIX THESE + break; + } + return new TriggerResult(triggered, request, response); + } + + private TriggerResult doScriptTrigger(Alert alert, SearchRequest request, SearchResponse response) { + boolean triggered = false; try { XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); @@ -92,54 +146,45 @@ public class TriggerManager extends AbstractComponent { builder.endObject(); Map responseMap = XContentHelper.convertToMap(builder.bytes(), false).v2(); - ExecutableScript executable = scriptService.executable(scriptTrigger.scriptLang, scriptTrigger.script, - scriptTrigger.scriptType, responseMap); + ScriptedAlertTrigger scriptTrigger = alert.trigger().scriptedTrigger(); + ExecutableScript executable = scriptService.executable( + scriptTrigger.scriptLang, scriptTrigger.script, scriptTrigger.scriptType, responseMap + ); Object returnValue = executable.run(); - logger.warn("Returned [{}] from script", returnValue); + logger.trace("Returned [{}] from script", returnValue); if (returnValue instanceof Boolean) { - return (Boolean) returnValue; + triggered = (Boolean) returnValue; } else { - throw new ElasticsearchIllegalStateException("Trigger script [" + scriptTrigger.script + "] " + - "did not return a Boolean"); + throw new ElasticsearchIllegalStateException("Trigger script [" + scriptTrigger.script + "] did not return a Boolean"); } } catch (Exception e ){ logger.error("Failed to execute script trigger", e); } - return false; + return new TriggerResult(triggered, request, response); } - public boolean isTriggered(String alertName, SearchResponse response) { - Alert alert = this.alertManager.getAlertForName(alertName); - if (alert == null){ - logger.warn("Could not find alert named [{}] in alert manager perhaps it has been deleted.", alertName); - return false; + private SearchRequest createClampedRequest(DateTime scheduledFireTime, Alert alert){ + DateTime clampEnd = new DateTime(scheduledFireTime); + DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds()); + SearchRequest request = new SearchRequest(alert.indices().toArray(new String[0])); + if (alert.simpleQuery()) { + TemplateQueryBuilder queryBuilder = new TemplateQueryBuilder(alert.queryName(), ScriptService.ScriptType.INDEXED, new HashMap()); + RangeFilterBuilder filterBuilder = new RangeFilterBuilder(alert.timestampString()); + filterBuilder.gte(clampStart); + filterBuilder.lt(clampEnd); + request.source(new SearchSourceBuilder().query(new FilteredQueryBuilder(queryBuilder, filterBuilder))); + } else { + //We can't just wrap the template here since it probably contains aggs or something else that doesn't play nice with FilteredQuery + Map fromToMap = new HashMap<>(); + fromToMap.put("from", clampStart); //@TODO : make these parameters configurable ? Don't want to bloat the API too much tho + fromToMap.put("to", clampEnd); + //Go and get the search template from the script service :( + ExecutableScript script = scriptService.executable("mustache", alert.queryName(), ScriptService.ScriptType.INDEXED, fromToMap); + BytesReference requestBytes = (BytesReference)(script.run()); + request.source(requestBytes, false); } - long testValue; - switch (alert.trigger().triggerType()) { - case NUMBER_OF_EVENTS: - testValue = response.getHits().getTotalHits(); - break; - case SCRIPT: - return doScriptTrigger(alert.trigger().scriptedTrigger(), response); - default: - throw new ElasticsearchIllegalArgumentException("Bad value for trigger.triggerType [" + alert.trigger().triggerType() + "]"); - } - int triggerValue = alert.trigger().value(); - //Move this to SimpleTrigger - switch (alert.trigger().trigger()) { - case GREATER_THAN: - return testValue > triggerValue; - case LESS_THAN: - return testValue < triggerValue; - case EQUAL: - return testValue == triggerValue; - case NOT_EQUAL: - return testValue != triggerValue; - case RISES_BY: - case FALLS_BY: - return false; //TODO FIX THESE - } - return false; + request.indicesOptions(IndicesOptions.lenientExpandOpen()); + return request; } } diff --git a/src/main/java/org/elasticsearch/alerts/triggers/TriggerResult.java b/src/main/java/org/elasticsearch/alerts/triggers/TriggerResult.java new file mode 100644 index 00000000000..568ef448e7d --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/triggers/TriggerResult.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.triggers; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; + +/** + */ +public class TriggerResult { + + private final boolean triggered; + private final SearchRequest request; + private final SearchResponse response; + + public TriggerResult(boolean triggered, SearchRequest request, SearchResponse response) { + this.triggered = triggered; + this.request = request; + this.response = response; + } + + public boolean isTriggered() { + return triggered; + } + + public SearchRequest getRequest() { + return request; + } + + public SearchResponse getResponse() { + return response; + } +} diff --git a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java index ba82aa119e7..850b4d779ba 100644 --- a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java +++ b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java @@ -8,7 +8,6 @@ package org.elasticsearch.alerts; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.alerts.actions.*; import org.elasticsearch.alerts.plugin.AlertsPlugin; -import org.elasticsearch.alerts.scheduler.AlertScheduler; import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger; import org.elasticsearch.common.settings.ImmutableSettings; @@ -50,9 +49,9 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { public void testAlerSchedulerStartsProperly() throws Exception { createIndex("my-index"); - createIndex(AlertManager.ALERT_INDEX); + createIndex(AlertsStore.ALERT_INDEX); createIndex(AlertActionManager.ALERT_HISTORY_INDEX); - ensureGreen("my-index", AlertManager.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX); + ensureGreen("my-index", AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX); client().preparePutIndexedScript() .setScriptLang("mustache") @@ -69,9 +68,6 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { assertThat(templatesResponse.getIndexTemplates().size(), equalTo(1)); assertThat(templatesResponse.getIndexTemplates().get(0).getName(), equalTo("query"));*/ - AlertScheduler alertScheduler = internalCluster().getInstance(AlertScheduler.class, internalCluster().getMasterName()); - assertThat(alertScheduler.isRunning(), is(true)); - final AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName()); assertBusy(new Runnable() { @Override @@ -79,8 +75,6 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { assertThat(alertManager.isStarted(), is(true)); } }); - - final AtomicBoolean alertActionInvoked = new AtomicBoolean(false); final AlertAction alertAction = new AlertAction() { @Override diff --git a/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java index f5d939464bb..50eaca0cee0 100644 --- a/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java +++ b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java @@ -54,7 +54,7 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest { builder.field(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString()); builder.endObject(); AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName()); - AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry, logger); + AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry); assertEquals(actionEntry.getVersion(), 0); assertEquals(actionEntry.getAlertName(), "testName");