diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index 3986de08621..d93422e9eb9 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -22,6 +22,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertActionManager; +import org.elasticsearch.alerts.actions.AlertActionRegistry; +import org.elasticsearch.alerts.actions.AlertActionEntry; import org.elasticsearch.alerts.scheduler.AlertScheduler; import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.TriggerManager; @@ -31,7 +33,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableMap; @@ -46,6 +47,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; @@ -59,11 +61,9 @@ import java.util.concurrent.atomic.AtomicBoolean; public class AlertManager extends AbstractLifecycleComponent { + public static final String ALERT_INDEX = ".alerts"; public static final String ALERT_TYPE = "alert"; - public static final String ALERT_HISTORY_INDEX = "alerthistory"; - public static final String ALERT_HISTORY_TYPE = "alertHistory"; - public static final ParseField QUERY_FIELD = new ParseField("query"); public static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); public static final ParseField TRIGGER_FIELD = new ParseField("trigger"); @@ -79,11 +79,16 @@ public class AlertManager extends AbstractLifecycleComponent { private final Client client; private AlertScheduler scheduler; + private final ThreadPool threadPool; private final ConcurrentMap alertMap; private AtomicBoolean started = new AtomicBoolean(false); + private AtomicBoolean startActions = new AtomicBoolean(false); + + private AlertActionRegistry actionRegistry; private AlertActionManager actionManager; + final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config @@ -93,10 +98,6 @@ public class AlertManager extends AbstractLifecycleComponent { } } - public void setActionManager(AlertActionManager actionManager){ - this.actionManager = actionManager; - } - @Override protected void doStart() throws ElasticsearchException { logger.warn("STARTING"); @@ -114,21 +115,25 @@ public class AlertManager extends AbstractLifecycleComponent { @Inject - public AlertManager(Settings settings, Client client, ClusterService clusterService) { + public AlertManager(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, + AlertActionRegistry actionRegistry) { super(settings); logger.warn("Initing AlertManager"); this.client = client; alertMap = ConcurrentCollections.newConcurrentMap(); clusterService.add(new AlertsClusterStateListener()); + this.threadPool = threadPool; + this.actionRegistry = actionRegistry; + this.actionManager = new AlertActionManager(client, this, actionRegistry, threadPool); } public void setAlertScheduler(AlertScheduler scheduler){ this.scheduler = scheduler; } + private ClusterHealthStatus createAlertsIndex() { CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS - logger.warn(cir.toString()); ClusterHealthResponse actionGet = client.admin().cluster() .health(Requests.clusterHealthRequest(ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); return actionGet.getStatus(); @@ -144,6 +149,25 @@ public class AlertManager extends AbstractLifecycleComponent { } } + + public void doAction(Alert alert, AlertActionEntry result, DateTime scheduledTime) { + logger.warn("We have triggered"); + DateTime lastActionFire = timeActionLastTriggered(alert.alertName()); + long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis(); + logger.error("last action fire [{}]", lastActionFire); + logger.error("msSinceLastAction [{}]", msSinceLastAction); + + if (alert.timePeriod().getMillis() > msSinceLastAction) { + logger.warn("Not firing action because it was fired in the timePeriod"); + } else { + actionRegistry.doAction(alert, result); + logger.warn("Did action !"); + + alert.lastActionFire(scheduledTime); + persistAlert(alert.alertName(), alert, IndexRequest.OpType.INDEX); + } + } + public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) { Alert indexedAlert; try { @@ -235,6 +259,7 @@ public class AlertManager extends AbstractLifecycleComponent { if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { createAlertsIndex(); } + SearchResponse searchResponse = client.prepareSearch().setSource( "{ \"query\" : " + "{ \"match_all\" : {}}," + @@ -257,15 +282,11 @@ public class AlertManager extends AbstractLifecycleComponent { return 0; } - public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime, boolean firedAction) throws Exception { + public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime) throws Exception { try { Alert alert = getAlertForName(alertName); alert.lastRan(fireTime); XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint(); - if (firedAction) { - logger.error("Fired action [{}]",firedAction); - alert.lastActionFire(scheduledTime); - } alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS); logger.error(XContentHelper.convertToJson(alertBuilder.bytes(),false,true)); UpdateRequest updateRequest = new UpdateRequest(); @@ -283,46 +304,6 @@ public class AlertManager extends AbstractLifecycleComponent { } } - public boolean addHistory(String alertName, boolean triggered, - DateTime fireTime, SearchRequestBuilder triggeringQuery, - AlertTrigger trigger, long numberOfResults, - List actions, - @Nullable List indices) throws Exception { - XContentBuilder historyEntry = XContentFactory.jsonBuilder(); - historyEntry.startObject(); - historyEntry.field("alertName", alertName); - historyEntry.field("triggered", triggered); - historyEntry.field("fireTime", fireTime.toDateTimeISO()); - historyEntry.field("trigger"); - trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); - historyEntry.field("queryRan", triggeringQuery.toString()); - historyEntry.field("numberOfResults", numberOfResults); - historyEntry.field("actions"); - historyEntry.startArray(); - for (AlertAction action : actions) { - action.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); - } - historyEntry.endArray(); - if (indices != null) { - historyEntry.field("indices"); - historyEntry.startArray(); - for (String index : indices) { - historyEntry.value(index); - } - historyEntry.endArray(); - } - historyEntry.endObject(); - IndexRequest indexRequest = new IndexRequest(); - indexRequest.index(ALERT_HISTORY_INDEX); - indexRequest.type(ALERT_HISTORY_TYPE); - indexRequest.source(historyEntry); - indexRequest.listenerThreaded(false); - indexRequest.operationThreaded(false); - indexRequest.refresh(true); //Always refresh after indexing an alert - indexRequest.opType(IndexRequest.OpType.CREATE); - client.index(indexRequest).actionGet().isCreated(); - return true; - } public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException { if (!started.get()) { @@ -433,12 +414,7 @@ public class AlertManager extends AbstractLifecycleComponent { public Alert parseAlert(String alertId, Map fields, long version ) { - //Map fields = sh.getFields(); logger.warn("Parsing : [{}]", alertId); - for (String field : fields.keySet() ) { - logger.warn("Field : [{}]", field); - } - String query = fields.get(QUERY_FIELD.getPreferredName()).toString(); String schedule = fields.get(SCHEDULE_FIELD.getPreferredName()).toString(); Object triggerObj = fields.get(TRIGGER_FIELD.getPreferredName()); @@ -457,9 +433,9 @@ public class AlertManager extends AbstractLifecycleComponent { List actions = null; if (actionObj instanceof Map) { Map actionMap = (Map) actionObj; - actions = actionManager.parseActionsFromMap(actionMap); + actions = actionRegistry.parseActionsFromMap(actionMap); } else { - throw new ElasticsearchException("Unable to parse actions [" + triggerObj + "]"); + throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]"); } DateTime lastRan = new DateTime(0); @@ -540,33 +516,65 @@ public class AlertManager extends AbstractLifecycleComponent { return started.get(); } + public boolean addHistory(String alertName, boolean isTriggered, DateTime dateTime, DateTime scheduledTime, + SearchRequestBuilder srb, AlertTrigger trigger, long totalHits, List actions, + List indices) throws IOException{ + return actionManager.addHistory(alertName, isTriggered, dateTime, scheduledTime, srb, trigger, totalHits, actions, indices); + } + private final class AlertsClusterStateListener implements ClusterStateListener { @Override public void clusterChanged(ClusterChangedEvent event) { - if (event.indicesDeleted().contains(ALERT_INDEX)) { - alertMap.clear(); + if (!event.localNodeMaster()) { //We are not the master + if (started.compareAndSet(false, true)) { + scheduler.clearAlerts(); + alertMap.clear(); + } + + if (startActions.compareAndSet(false, true)) { + //If actionManager was running and we aren't the master stop + actionManager.doStop(); //Safe to call this multiple times, it's a noop if we are already stopped + } + return; } if (!started.get()) { IndexMetaData alertIndexMetaData = event.state().getMetaData().index(ALERT_INDEX); if (alertIndexMetaData != null) { if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { - // TODO: Do on a different thread and have some kind of retry mechanism? - try { - loadAlerts(); - sendAlertsToScheduler(); - } catch (Exception e) { - logger.warn("Error during loading of alerts from an existing .alerts index... refresh the alerts manually"); - } started.set(true); + threadPool.executor(ThreadPool.Names.GENERIC).execute(new AlertLoader()); + } + } + } + + if (!startActions.get()) { + IndexMetaData indexMetaData = event.state().getMetaData().index(AlertActionManager.ALERT_HISTORY_INDEX); + if (indexMetaData != null) { + if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { + startActions.set(true); + actionManager.doStart(); } - } else { - started.set(true); } } } } + private class AlertLoader implements Runnable { + @Override + public void run() { + // TODO: have some kind of retry mechanism? + try { + loadAlerts(); + sendAlertsToScheduler(); + } catch (Exception e) { + logger.warn("Error during loading of alerts from an existing .alerts index... refresh the alerts manually"); + } + started.set(true); + + } + } + } diff --git a/src/main/java/org/elasticsearch/alerts/AlertResult.java b/src/main/java/org/elasticsearch/alerts/AlertResult.java deleted file mode 100644 index 9f22545fd1a..00000000000 --- a/src/main/java/org/elasticsearch/alerts/AlertResult.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.alerts; - -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.alerts.triggers.AlertTrigger; -import org.elasticsearch.common.joda.time.DateTime; - -import java.util.Arrays; - -public class AlertResult { - public SearchResponse searchResponse; - public AlertTrigger trigger; - public String alertName; - public DateTime fireTime; - public boolean isTriggered; - public SearchRequestBuilder query; - public String[] indices; - - public AlertResult(String alertName, SearchResponse searchResponse, AlertTrigger trigger, boolean isTriggered, SearchRequestBuilder query, String[] indices, DateTime fireTime) { - this.searchResponse = searchResponse; - this.trigger = trigger; - this.isTriggered = isTriggered; - this.query = query; - this.indices = indices; - this.alertName = alertName; - this.fireTime = fireTime; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - AlertResult that = (AlertResult) o; - - if (isTriggered != that.isTriggered) return false; - if (!Arrays.equals(indices, that.indices)) return false; - if (query != null ? !query.equals(that.query) : that.query != null) return false; - if (searchResponse != null ? !searchResponse.equals(that.searchResponse) : that.searchResponse != null) - return false; - if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false; - - return true; - } - - @Override - public int hashCode() { - int result = searchResponse != null ? searchResponse.hashCode() : 0; - result = 31 * result + (trigger != null ? trigger.hashCode() : 0); - result = 31 * result + (isTriggered ? 1 : 0); - result = 31 * result + (query != null ? query.hashCode() : 0); - result = 31 * result + (indices != null ? Arrays.hashCode(indices) : 0); - return result; - } - -} diff --git a/src/main/java/org/elasticsearch/alerts/AlertingModule.java b/src/main/java/org/elasticsearch/alerts/AlertingModule.java index 81030852fd3..700f817927d 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertingModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertingModule.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts; -import org.elasticsearch.alerts.actions.AlertActionManager; +import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.rest.AlertRestHandler; import org.elasticsearch.alerts.scheduler.AlertScheduler; import org.elasticsearch.alerts.triggers.TriggerManager; @@ -18,7 +18,7 @@ public class AlertingModule extends AbstractModule { bind(AlertManager.class).asEagerSingleton(); bind(TriggerManager.class).asEagerSingleton(); bind(AlertScheduler.class).asEagerSingleton(); - bind(AlertActionManager.class).asEagerSingleton(); + bind(AlertActionRegistry.class).asEagerSingleton(); bind(AlertRestHandler.class).asEagerSingleton(); } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertAction.java b/src/main/java/org/elasticsearch/alerts/actions/AlertAction.java index f1724204696..d437b83b6fb 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertAction.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertAction.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.actions; -import org.elasticsearch.alerts.AlertResult; +import org.elasticsearch.alerts.Alert; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -15,5 +15,5 @@ public interface AlertAction extends ToXContent { public String getActionName(); public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException; - public boolean doAction(String alertName, AlertResult alert); + public boolean doAction(Alert alert, AlertActionEntry actionEntry); } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java new file mode 100644 index 00000000000..8a062b7c9fc --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java @@ -0,0 +1,251 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.actions; + +import org.elasticsearch.alerts.triggers.AlertTrigger; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +/** + */ +public class AlertActionEntry implements ToXContent{ + + private long version; + private String alertName; + private boolean triggered; + private DateTime fireTime; + private AlertTrigger trigger; + private String triggeringQuery; + private long numberOfResults; + private List actions; + private List indices; + private AlertActionState entryState; + private DateTime scheduledTime; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + private String id; + + public DateTime getScheduledTime() { + return scheduledTime; + } + + public void setScheduledTime(DateTime scheduledTime) { + this.scheduledTime = scheduledTime; + } + + public String getAlertName() { + return alertName; + } + + public void setAlertName(String alertName) { + this.alertName = alertName; + } + + public boolean isTriggered() { + return triggered; + } + + public void setTriggered(boolean triggered) { + this.triggered = triggered; + } + + public DateTime getFireTime() { + return fireTime; + } + + public void setFireTime(DateTime fireTime) { + this.fireTime = fireTime; + } + + public AlertTrigger getTrigger() { + return trigger; + } + + public void setTrigger(AlertTrigger trigger) { + this.trigger = trigger; + } + + public String getTriggeringQuery() { + return triggeringQuery; + } + + public void setTriggeringQuery(String triggeringQuery) { + this.triggeringQuery = triggeringQuery; + } + + public long getNumberOfResults() { + return numberOfResults; + } + + public void setNumberOfResults(long numberOfResults) { + this.numberOfResults = numberOfResults; + } + + public List getActions() { + return actions; + } + + public void setActions(List actions) { + this.actions = actions; + } + + public List getIndices() { + return indices; + } + + public void setIndices(List indices) { + this.indices = indices; + } + + public AlertActionState getEntryState() { + return entryState; + } + + public void setEntryState(AlertActionState entryState) { + this.entryState = entryState; + } + + public long getVersion() { + return version; + } + + public void setVersion(long version) { + this.version = version; + } + + protected AlertActionEntry() { + } + + public AlertActionEntry(String id, long version, String alertName, boolean triggered, DateTime fireTime, DateTime scheduledTime, AlertTrigger trigger, + String queryRan, long numberOfResults, List actions, + List indices, AlertActionState state) { + this.id = id; + this.version = version; + this.alertName = alertName; + this.triggered = triggered; + this.fireTime = fireTime; + this.scheduledTime = scheduledTime; + this.trigger = trigger; + this.triggeringQuery = queryRan; + this.numberOfResults = numberOfResults; + this.actions = actions; + this.indices = indices; + this.entryState = state; + } + + @Override + public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException { + historyEntry.startObject(); + historyEntry.field("alertName", alertName); + historyEntry.field("triggered", triggered); + historyEntry.field("fireTime", fireTime.toDateTimeISO()); + historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO()); + + historyEntry.field("trigger"); + trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); + + historyEntry.field("queryRan", triggeringQuery); + + historyEntry.field("numberOfResults", numberOfResults); + + historyEntry.field("actions"); + historyEntry.startObject(); + for (AlertAction action : actions) { + historyEntry.field(action.getActionName()); + action.toXContent(historyEntry, params); + } + historyEntry.endObject(); + + + if (indices != null) { + historyEntry.field("indices"); + historyEntry.startArray(); + for (String index : indices) { + historyEntry.value(index); + } + historyEntry.endArray(); + } + + historyEntry.field(AlertActionState.FIELD_NAME, entryState.toString()); + + historyEntry.endObject(); + + return historyEntry; + } + + + @Override + public String toString() { + return "AlertHistoryEntry{" + + "version=" + version + + ", alertName='" + alertName + '\'' + + ", triggered=" + triggered + + ", fireTime=" + fireTime + + ", trigger=" + trigger + + ", triggeringQuery='" + triggeringQuery + '\'' + + ", numberOfResults=" + numberOfResults + + ", actions=" + actions + + ", indices=" + indices + + ", entryState=" + entryState + + ", scheduledTime=" + scheduledTime + + ", id='" + id + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + AlertActionEntry that = (AlertActionEntry) o; + + if (numberOfResults != that.numberOfResults) return false; + if (triggered != that.triggered) return false; + if (version != that.version) return false; + if (actions != null ? !actions.equals(that.actions) : that.actions != null) return false; + if (alertName != null ? !alertName.equals(that.alertName) : that.alertName != null) return false; + if (entryState != that.entryState) return false; + if (fireTime != null ? !fireTime.equals(that.fireTime) : that.fireTime != null) return false; + if (id != null ? !id.equals(that.id) : that.id != null) return false; + if (indices != null ? !indices.equals(that.indices) : that.indices != null) return false; + if (scheduledTime != null ? !scheduledTime.equals(that.scheduledTime) : that.scheduledTime != null) + return false; + if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false; + if (triggeringQuery != null ? !triggeringQuery.equals(that.triggeringQuery) : that.triggeringQuery != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = (int) (version ^ (version >>> 32)); + result = 31 * result + (alertName != null ? alertName.hashCode() : 0); + result = 31 * result + (triggered ? 1 : 0); + result = 31 * result + (fireTime != null ? fireTime.hashCode() : 0); + result = 31 * result + (trigger != null ? trigger.hashCode() : 0); + result = 31 * result + (triggeringQuery != null ? triggeringQuery.hashCode() : 0); + result = 31 * result + (int) (numberOfResults ^ (numberOfResults >>> 32)); + result = 31 * result + (actions != null ? actions.hashCode() : 0); + result = 31 * result + (indices != null ? indices.hashCode() : 0); + result = 31 * result + (entryState != null ? entryState.hashCode() : 0); + result = 31 * result + (scheduledTime != null ? scheduledTime.hashCode() : 0); + result = 31 * result + (id != null ? id.hashCode() : 0); + return result; + } + +} diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 56940978579..398fddc9088 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -5,61 +5,381 @@ */ package org.elasticsearch.alerts.actions; -import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.alerts.Alert; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.alerts.AlertManager; -import org.elasticsearch.alerts.AlertResult; +import org.elasticsearch.alerts.triggers.AlertTrigger; +import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.client.Client; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.engine.DocumentAlreadyExistsException; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; -public class AlertActionManager extends AbstractComponent { +/** + */ +public class AlertActionManager { + public static final String ALERT_NAME_FIELD = "alertName"; + public static final String TRIGGERED_FIELD = "triggered"; + public static final String FIRE_TIME_FIELD = "fireTime"; + public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduledFireTime"; + public static final String TRIGGER_FIELD = "trigger"; + public static final String QUERY_RAN_FIELD = "queryRan"; + public static final String NUMBER_OF_RESULTS_FIELD = "numberOfResults"; + public static final String ACTIONS_FIELD = "actions"; + public static final String INDICES_FIELD = "indices"; + public static final String ALERT_HISTORY_INDEX = "alerthistory"; + public static final String ALERT_HISTORY_TYPE = "alerthistory"; + + private final Client client; private final AlertManager alertManager; - private volatile ImmutableOpenMap actionImplemented; + private final AlertActionRegistry actionRegistry; + private final ThreadPool threadPool; - @Inject - public AlertActionManager(Settings settings, AlertManager alertManager, Client client) { - super(settings); - this.alertManager = alertManager; - this.actionImplemented = ImmutableOpenMap.builder() - .fPut("email", new EmailAlertActionFactory()) - .fPut("index", new IndexAlertActionFactory(client)) - .build(); - alertManager.setActionManager(this); + private final ESLogger logger = Loggers.getLogger(AlertActionManager.class); + + private BlockingQueue jobsToBeProcessed = new LinkedBlockingQueue<>(); + + public final AtomicBoolean running = new AtomicBoolean(false); + private Executor readerExecutor; + + private static AlertActionEntry END_ENTRY = new AlertActionEntry(); + + class AlertHistoryRunnable implements Runnable { + AlertActionEntry entry; + + AlertHistoryRunnable(AlertActionEntry entry) { + this.entry = entry; + } + + @Override + public void run() { + try { + if (claimAlertHistoryEntry(entry)) { + alertManager.doAction(alertManager.getAlertForName(entry.getAlertName()), entry, entry.getScheduledTime()); + updateHistoryEntry(entry, AlertActionState.ACTION_PERFORMED); + } else { + logger.warn("Unable to claim alert history entry" + entry); + } + } catch (Throwable t) { + logger.error("Failed to execute alert action", t); + } + + + } } - public void registerAction(String name, AlertActionFactory actionFactory){ - actionImplemented = ImmutableOpenMap.builder(actionImplemented) - .fPut(name, actionFactory) - .build(); + class QueueLoaderThread implements Runnable { + @Override + public void run() { + boolean success = false; + do { + try { + success = loadQueue(); + } catch (Exception e) { + logger.error("Unable to load the job queue", e); + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + + } + } + } while (!success); + } } - public List parseActionsFromMap(Map actionMap) { - ImmutableOpenMap actionImplemented = this.actionImplemented; - List actions = new ArrayList<>(); - for (Map.Entry actionEntry : actionMap.entrySet()) { - AlertActionFactory factory = actionImplemented.get(actionEntry.getKey()); - if (factory != null) { - actions.add(factory.createAction(actionEntry.getValue())); - } else { - throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionEntry.getKey() + "]"); + class QueueReaderThread implements Runnable { + @Override + public void run() { + try { + logger.debug("Starting thread to read from the job queue"); + while (running.get()) { + AlertActionEntry entry = null; + do { + try { + entry = jobsToBeProcessed.take(); + } catch (InterruptedException ie) { + if (!running.get()) { + break; + } + } + } while (entry == null); + + if (!running.get() || entry == END_ENTRY) { + logger.debug("Stopping thread to read from the job queue"); + } + + threadPool.executor(ThreadPool.Names.MANAGEMENT) + .execute(new AlertHistoryRunnable(entry)); + } + } catch (Throwable t) { + logger.error("Error during reader thread", t); } } - return actions; } - public void doAction(String alertName, AlertResult alertResult){ - Alert alert = alertManager.getAlertForName(alertName); - for (AlertAction action : alert.actions()) { - action.doAction(alertName, alertResult); + public AlertActionManager(Client client, AlertManager alertManager, + AlertActionRegistry actionRegistry, + ThreadPool threadPool) { + this.client = client; + this.alertManager = alertManager; + this.actionRegistry = actionRegistry; + this.threadPool = threadPool; + } + + public void doStart() { + if (running.compareAndSet(false, true)) { + logger.info("Starting job queue"); + readerExecutor = threadPool.executor(ThreadPool.Names.GENERIC); + readerExecutor.execute(new QueueReaderThread()); + threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueLoaderThread()); } } + public void doStop() { + stopIfRunning(); + } + + public boolean loadQueue() { + if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).execute().actionGet().isExists()) { + createAlertHistoryIndex(); + } + + //@TODO: change to scan/scroll if we get back over 100 + SearchResponse searchResponse = client.prepareSearch().setSource( + "{ \"query\" : " + + "{ \"term\" : {" + + "\"" + AlertActionState.FIELD_NAME + "\" : \"" + AlertActionState.ACTION_NEEDED.toString() + "\"}}," + + "\"size\" : \"100\"" + + "}" + ).setTypes(ALERT_HISTORY_TYPE).setIndices(ALERT_HISTORY_INDEX).setListenerThreaded(false).execute().actionGet(); + + for (SearchHit sh : searchResponse.getHits()) { + String historyId = sh.getId(); + AlertActionEntry historyEntry = parseHistory(historyId, sh, sh.version()); + assert historyEntry.getEntryState() == AlertActionState.ACTION_NEEDED; + jobsToBeProcessed.add(historyEntry); + } + + return true; + } + + + + protected AlertActionEntry parseHistory(String historyId, SearchHit sh, long version) { + Map fields = sh.sourceAsMap(); + return parseHistory(historyId, fields, version); + } + + protected AlertActionEntry parseHistory(String historyId, Map fields, long version) { + return parseHistory(historyId, fields, version, actionRegistry, logger); + } + + protected static AlertActionEntry parseHistory(String historyId, Map fields, long version, + AlertActionRegistry actionRegistry, ESLogger logger) { + String alertName = fields.get(ALERT_NAME_FIELD).toString(); + boolean triggered = (Boolean)fields.get(TRIGGERED_FIELD); + DateTime fireTime = new DateTime(fields.get(FIRE_TIME_FIELD).toString()); + DateTime scheduledFireTime = new DateTime(fields.get(SCHEDULED_FIRE_TIME_FIELD).toString()); + AlertTrigger trigger = TriggerManager.parseTriggerFromMap((Map)fields.get(TRIGGER_FIELD)); + String queryRan = fields.get(QUERY_RAN_FIELD).toString(); + long numberOfResults = ((Number)fields.get(NUMBER_OF_RESULTS_FIELD)).longValue(); + Object actionObj = fields.get(ACTIONS_FIELD); + List actions; + if (actionObj instanceof Map) { + Map actionMap = (Map) actionObj; + actions = actionRegistry.parseActionsFromMap(actionMap); + } else { + throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]"); + } + + List indices = new ArrayList<>(); + if (fields.get(INDICES_FIELD) != null && fields.get(INDICES_FIELD) instanceof List){ + indices = (List)fields.get(INDICES_FIELD); + } else { + logger.debug("Indices : " + fields.get(INDICES_FIELD) + " class " + + (fields.get(INDICES_FIELD) != null ? fields.get(INDICES_FIELD).getClass() : null )); + } + + String stateString = fields.get(AlertActionState.FIELD_NAME).toString(); + AlertActionState state = AlertActionState.fromString(stateString); + + return new AlertActionEntry(historyId, version, alertName, triggered, fireTime, scheduledFireTime, trigger, queryRan, + numberOfResults, actions, indices, state); + } + + + public boolean addHistory(String alertName, boolean triggered, + DateTime fireTime, DateTime scheduledFireTime, SearchRequestBuilder triggeringQuery, + AlertTrigger trigger, long numberOfResults, + List actions, + @Nullable List indices) throws IOException { + + if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).execute().actionGet().isExists()) { + ClusterHealthStatus chs = createAlertHistoryIndex(); + } + + AlertActionState state = AlertActionState.NO_ACTION_NEEDED; + if (triggered && !actions.isEmpty()) { + state = AlertActionState.ACTION_NEEDED; + } + + AlertActionEntry entry = new AlertActionEntry(alertName + " " + scheduledFireTime.toDateTimeISO(), 1, alertName, triggered, fireTime, scheduledFireTime, trigger, + triggeringQuery.toString(), numberOfResults, actions, indices, state); + + XContentBuilder historyEntry = XContentFactory.jsonBuilder(); + entry.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); + + IndexRequest indexRequest = new IndexRequest(); + indexRequest.index(ALERT_HISTORY_INDEX); + indexRequest.type(ALERT_HISTORY_TYPE); + indexRequest.id(entry.getId()); + indexRequest.source(historyEntry); + indexRequest.listenerThreaded(false); + indexRequest.operationThreaded(false); + indexRequest.refresh(true); //Always refresh after indexing an alert + indexRequest.opType(IndexRequest.OpType.CREATE); + try { + if (client.index(indexRequest).actionGet().isCreated()) { + jobsToBeProcessed.add(entry); + return true; + } else { + return false; + } + } catch (DocumentAlreadyExistsException daee){ + logger.warn("Someone has already created a history entry for this alert run"); + return false; + } + } + + private void stopIfRunning() { + if (running.compareAndSet(true, false)) { + logger.info("Stopping job queue"); + jobsToBeProcessed.add(END_ENTRY); + } + } + + + private ClusterHealthStatus createAlertHistoryIndex() { + CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_HISTORY_INDEX).addMapping(ALERT_HISTORY_TYPE).execute().actionGet(); //TODO FIX MAPPINGS + if (!cir.isAcknowledged()) { + logger.error("Create [{}] was not acknowledged", ALERT_HISTORY_INDEX); + } + ClusterHealthResponse actionGet = client.admin().cluster() + .health(Requests.clusterHealthRequest(ALERT_HISTORY_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); + + return actionGet.getStatus(); + } + + + + private AlertActionEntry getHistoryEntryFromIndex(String entryId) { + GetRequest getRequest = Requests.getRequest(ALERT_HISTORY_INDEX); + getRequest.type(ALERT_HISTORY_TYPE); + getRequest.id(entryId); + GetResponse getResponse = client.get(getRequest).actionGet(); + if (getResponse.isExists()) { + return parseHistory(entryId, getResponse.getSourceAsMap(), getResponse.getVersion()); + } else { + throw new ElasticsearchException("Unable to find [" + entryId + "] in the [" + ALERT_HISTORY_INDEX + "]" ); + } + } + + private void updateHistoryEntry(AlertActionEntry entry, AlertActionState actionPerformed) { + entry.setEntryState(AlertActionState.ACTION_PERFORMED); + + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.index(ALERT_HISTORY_INDEX); + updateRequest.type(ALERT_HISTORY_TYPE); + updateRequest.id(entry.getId()); + + entry.setEntryState(actionPerformed); + XContentBuilder historyBuilder; + try { + historyBuilder = XContentFactory.jsonBuilder(); + entry.toXContent(historyBuilder, ToXContent.EMPTY_PARAMS); + } catch (IOException ie) { + throw new ElasticsearchException("Unable to serialize alert history entry ["+ entry.getId() + "]", ie); + } + updateRequest.doc(historyBuilder); + + try { + client.update(updateRequest).actionGet(); + } catch (ElasticsearchException ee) { + logger.error("Failed to update in claim", ee); + } + } + + private boolean claimAlertHistoryEntry(AlertActionEntry entry) { + AlertActionEntry indexedHistoryEntry; + try { + indexedHistoryEntry = getHistoryEntryFromIndex(entry.getId()); + if (indexedHistoryEntry.getEntryState() != AlertActionState.ACTION_NEEDED) { + //Someone else is doing or has done this action + return false; + } + entry.setEntryState(AlertActionState.ACTION_UNDERWAY); + + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.index(ALERT_HISTORY_INDEX); + updateRequest.type(ALERT_HISTORY_TYPE); + updateRequest.id(entry.getId()); + updateRequest.version(entry.getVersion());//Since we loaded this alert directly from the index the version should be correct + + XContentBuilder historyBuilder; + try { + historyBuilder = XContentFactory.jsonBuilder(); + entry.toXContent(historyBuilder, ToXContent.EMPTY_PARAMS); + } catch (IOException ie) { + throw new ElasticsearchException("Unable to serialize alert history entry ["+ entry.getId() + "]", ie); + } + updateRequest.doc(historyBuilder); + updateRequest.retryOnConflict(0); + + try { + client.update(updateRequest).actionGet(); + } catch (ElasticsearchException ee) { + logger.error("Failed to update in claim", ee); + return false; + } + + } catch (Throwable t) { + logger.error("Failed to claim history entry " + entry, t); + return false; + } + return true; + } + + + } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java new file mode 100644 index 00000000000..6d3300ad6fe --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionRegistry.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.actions; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.AlertManager; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class AlertActionRegistry extends AbstractComponent { + + private volatile ImmutableOpenMap actionImplemented; + + @Inject + public AlertActionRegistry(Settings settings, Client client) { + super(settings); + this.actionImplemented = ImmutableOpenMap.builder() + .fPut("email", new EmailAlertActionFactory()) + .fPut("index", new IndexAlertActionFactory(client)) + .build(); + } + + public void registerAction(String name, AlertActionFactory actionFactory){ + actionImplemented = ImmutableOpenMap.builder(actionImplemented) + .fPut(name, actionFactory) + .build(); + } + + public List parseActionsFromMap(Map actionMap) { + ImmutableOpenMap actionImplemented = this.actionImplemented; + List actions = new ArrayList<>(); + for (Map.Entry actionEntry : actionMap.entrySet()) { + AlertActionFactory factory = actionImplemented.get(actionEntry.getKey()); + if (factory != null) { + actions.add(factory.createAction(actionEntry.getValue())); + } else { + throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionEntry.getKey() + "]"); + } + } + return actions; + } + + public void doAction(Alert alert, AlertActionEntry actionEntry){ + for (AlertAction action : alert.actions()) { + action.doAction(alert, actionEntry); + } + } + +} diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionState.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionState.java new file mode 100644 index 00000000000..9edbb348984 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionState.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.actions; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + */ +public enum AlertActionState implements ToXContent { + NO_ACTION_NEEDED, + ACTION_NEEDED, + ACTION_UNDERWAY, + ACTION_PERFORMED; + + public static final String FIELD_NAME = "AlertHistoryState"; + + + @Override + public String toString(){ + switch (this) { + case NO_ACTION_NEEDED: + return "NO_ACTION_NEEDED"; + case ACTION_NEEDED: + return "ACTION_NEEDED"; + case ACTION_UNDERWAY: + return "ACTION_UNDERWAY"; + case ACTION_PERFORMED: + return "ACTION_PERFORMED"; + default: + return "NO_ACTION_NEEDED"; + } + } + + public static AlertActionState fromString(String s) { + switch(s.toUpperCase()) { + case "NO_ACTION_NEEDED": + return NO_ACTION_NEEDED; + case "ACTION_NEEDED": + return ACTION_NEEDED; + case "ACTION_UNDERWAY": + return ACTION_UNDERWAY; + case "ACTION_PERFORMED": + return ACTION_PERFORMED; + default: + throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" ); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(FIELD_NAME); + builder.value(this.toString()); + builder.endObject(); + return builder; + } +} diff --git a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java index 7be6135a8e5..a952ffd624f 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java +++ b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java @@ -6,9 +6,8 @@ package org.elasticsearch.alerts.actions; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.alerts.AlertResult; +import org.elasticsearch.alerts.Alert; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.search.SearchHit; import java.io.IOException; import java.util.ArrayList; @@ -28,7 +27,6 @@ public class EmailAlertAction implements AlertAction { String server = "smtp.gmail.com"; int port = 587; - public EmailAlertAction(String ... addresses){ for (String address : addresses) { addEmailAddress(address); @@ -69,7 +67,7 @@ public class EmailAlertAction implements AlertAction { } @Override - public boolean doAction(String alertName, AlertResult result) { + public boolean doAction(Alert alert, AlertActionEntry result) { Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.starttls.enable", "true"); @@ -86,19 +84,21 @@ public class EmailAlertAction implements AlertAction { message.setFrom(new InternetAddress(from)); message.setRecipients(Message.RecipientType.TO, emailAddresses.toArray(new Address[1])); - message.setSubject("Elasticsearch Alert " + alertName + " triggered"); + message.setSubject("Elasticsearch Alert " + alert.alertName() + " triggered"); StringBuffer output = new StringBuffer(); - output.append("The following query triggered because " + result.trigger.toString() + "\n"); - output.append("The total number of hits returned : " + result.searchResponse.getHits().getTotalHits() + "\n"); - output.append("For query : " + result.query.toString()); + output.append("The following query triggered because " + result.getTrigger().toString() + "\n"); + output.append("The total number of hits returned : " + result.getNumberOfResults() + "\n"); + output.append("For query : " + result.getTriggeringQuery()); output.append("\n"); output.append("Indices : "); - for (String index : result.indices) { + for (String index : result.getIndices()) { output.append(index); output.append("/"); } output.append("\n"); output.append("\n"); + /* + ///@TODO: FIX THE SEARCH RESULT DISPLAY STUFF if (displayField != null) { for (SearchHit sh : result.searchResponse.getHits().getHits()) { if (sh.sourceAsMap().containsKey(displayField)) { @@ -111,6 +111,7 @@ public class EmailAlertAction implements AlertAction { } else { output.append(result.searchResponse.toString()); } + */ message.setText(output.toString()); Transport.send(message); } catch (Exception e){ diff --git a/src/main/java/org/elasticsearch/alerts/actions/IndexAlertAction.java b/src/main/java/org/elasticsearch/alerts/actions/IndexAlertAction.java index fa5d62387f4..157775fcce1 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/IndexAlertAction.java +++ b/src/main/java/org/elasticsearch/alerts/actions/IndexAlertAction.java @@ -7,7 +7,7 @@ package org.elasticsearch.alerts.actions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.alerts.AlertResult; +import org.elasticsearch.alerts.Alert; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -45,15 +45,15 @@ public class IndexAlertAction implements AlertAction, ToXContent { } @Override - public boolean doAction(String alertName, AlertResult alertResult) { + public boolean doAction(Alert alert, AlertActionEntry alertResult) { IndexRequest indexRequest = new IndexRequest(); indexRequest.index(index); indexRequest.type(type); try { XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint(); resultBuilder.startObject(); - resultBuilder = alertResult.searchResponse.toXContent(resultBuilder, ToXContent.EMPTY_PARAMS); - resultBuilder.field("timestamp", alertResult.fireTime); + //resultBuilder = alertResult.searchResponse.toXContent(resultBuilder, ToXContent.EMPTY_PARAMS); + resultBuilder.field("timestamp", alertResult.getFireTime()); resultBuilder.endObject(); indexRequest.source(resultBuilder); } catch (IOException ie) { diff --git a/src/main/java/org/elasticsearch/alerts/plugin/AlertsPlugin.java b/src/main/java/org/elasticsearch/alerts/plugin/AlertsPlugin.java index c4373dbf5bb..43165a0ce55 100644 --- a/src/main/java/org/elasticsearch/alerts/plugin/AlertsPlugin.java +++ b/src/main/java/org/elasticsearch/alerts/plugin/AlertsPlugin.java @@ -11,14 +11,19 @@ import org.elasticsearch.alerts.AlertingModule; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.AbstractPlugin; import java.util.Collection; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; public class AlertsPlugin extends AbstractPlugin { + public static final String NAME = "alerts"; + @Override public String name() { - return "alerts"; + return NAME; } @Override public String description() { @@ -39,4 +44,12 @@ public class AlertsPlugin extends AbstractPlugin { modules.add(AlertingModule.class); return modules; } + + @Override + public Settings additionalSettings() { + return settingsBuilder() + .put("threadpool."+ NAME + ".type","cached") + .build(); + } + } diff --git a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java index 4c8ce0898b5..04373508f5f 100644 --- a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java +++ b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java @@ -9,9 +9,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.alerts.Alert; -import org.elasticsearch.alerts.AlertManager; -import org.elasticsearch.alerts.AlertResult; import org.elasticsearch.alerts.actions.AlertActionManager; +import org.elasticsearch.alerts.AlertManager; import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -42,21 +41,22 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste private final Client client; private final Scheduler scheduler; private final AlertManager alertManager; - private final ScriptService scriptService; private final TriggerManager triggerManager; - private final AlertActionManager actionManager; + private final ScriptService scriptService; + + private AlertActionManager actionManager; + private final AtomicBoolean run = new AtomicBoolean(false); @Inject public AlertScheduler(Settings settings, AlertManager alertManager, Client client, - TriggerManager triggerManager, AlertActionManager actionManager, - ScriptService scriptService, ClusterService clusterService) { + TriggerManager triggerManager, ScriptService scriptService, + ClusterService clusterService) { super(settings); this.alertManager = alertManager; this.client = client; this.triggerManager = triggerManager; - this.actionManager = actionManager; this.scriptService = scriptService; try { SchedulerFactory schFactory = new StdSchedulerFactory(); @@ -77,7 +77,7 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste logger.info("Starting scheduler"); scheduler.start(); } catch (SchedulerException se){ - logger.error("Failed to start quartz scheduler",se); + logger.error("Failed to start quartz scheduler", se); } } } else { @@ -89,10 +89,12 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste if (run.compareAndSet(true, false)) { try { logger.info("Stopping scheduler"); - scheduler.clear(); - scheduler.shutdown(false); + if (!scheduler.isShutdown()) { + scheduler.clear(); + scheduler.shutdown(false); + } } catch (SchedulerException se){ - logger.error("Failed to stop quartz scheduler",se); + logger.error("Failed to stop quartz scheduler", se); } } } @@ -156,36 +158,17 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste SearchResponse sr = srb.execute().get(); logger.warn("Got search response hits : [{}]", sr.getHits().getTotalHits() ); - AlertResult result = new AlertResult(alertName, sr, alert.trigger(), - triggerManager.isTriggered(alertName,sr), srb, indices, - new DateTime(jobExecutionContext.getScheduledFireTime())); - boolean firedAction = false; - if (result.isTriggered) { - logger.warn("We have triggered"); - DateTime lastActionFire = alertManager.timeActionLastTriggered(alertName); - long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis(); - logger.error("last action fire [{}]", lastActionFire); - logger.error("msSinceLastAction [{}]", msSinceLastAction); + boolean isTriggered = triggerManager.isTriggered(alertName,sr); - if (alert.timePeriod().getMillis() > msSinceLastAction) { - logger.warn("Not firing action because it was fired in the timePeriod"); - } else { - actionManager.doAction(alertName, result); - logger.warn("Did action !"); - firedAction = true; - } - - } else { - logger.warn("We didn't trigger"); - } - alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime,firedAction); - if (!alertManager.addHistory(alertName, result.isTriggered, - new DateTime(jobExecutionContext.getScheduledFireTime()), result.query, - result.trigger, result.searchResponse.getHits().getTotalHits(), alert.actions(), alert.indices())) + alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime); + if (!alertManager.addHistory(alertName, isTriggered, + new DateTime(jobExecutionContext.getScheduledFireTime()), scheduledTime, srb, + alert.trigger(), sr.getHits().getTotalHits(), alert.actions(), alert.indices())) { logger.warn("Failed to store history for alert [{}]", alertName); } + } catch (Exception e) { logger.error("Failed execute alert [{}]", e, alertName); } diff --git a/src/main/java/org/elasticsearch/alerts/triggers/AlertTrigger.java b/src/main/java/org/elasticsearch/alerts/triggers/AlertTrigger.java index 768f8c5a4ec..a74f93008ff 100644 --- a/src/main/java/org/elasticsearch/alerts/triggers/AlertTrigger.java +++ b/src/main/java/org/elasticsearch/alerts/triggers/AlertTrigger.java @@ -168,4 +168,29 @@ public class AlertTrigger implements ToXContent { } } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + AlertTrigger that = (AlertTrigger) o; + + if (value != that.value) return false; + if (scriptedTrigger != null ? !scriptedTrigger.equals(that.scriptedTrigger) : that.scriptedTrigger != null) + return false; + if (trigger != that.trigger) return false; + if (triggerType != that.triggerType) return false; + + return true; + } + + @Override + public int hashCode() { + int result = trigger != null ? trigger.hashCode() : 0; + result = 31 * result + (triggerType != null ? triggerType.hashCode() : 0); + result = 31 * result + value; + result = 31 * result + (scriptedTrigger != null ? scriptedTrigger.hashCode() : 0); + return result; + } + } diff --git a/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java b/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java index abaed0eee7f..c13955103e6 100644 --- a/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java +++ b/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java @@ -33,8 +33,6 @@ public class TriggerManager extends AbstractComponent { private final ScriptService scriptService; public static AlertTrigger parseTriggerFromMap(Map triggerMap) { - - //For now just trigger on number of events greater than 1 for (Map.Entry entry : triggerMap.entrySet()){ AlertTrigger.TriggerType type = AlertTrigger.TriggerType.fromString(entry.getKey()); if (type == AlertTrigger.TriggerType.SCRIPT) { @@ -48,6 +46,7 @@ public class TriggerManager extends AbstractComponent { } throw new ElasticsearchIllegalArgumentException(); } + private static ScriptedAlertTrigger parseScriptedTrigger(Object value) { if (value instanceof Map) { Map valueMap = (Map)value; diff --git a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java index 5098b21318a..7ab0dabaccd 100644 --- a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java +++ b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java @@ -8,6 +8,8 @@ package org.elasticsearch.alerts; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertActionFactory; +import org.elasticsearch.alerts.actions.AlertActionRegistry; +import org.elasticsearch.alerts.actions.AlertActionEntry; import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.plugin.AlertsPlugin; import org.elasticsearch.alerts.scheduler.AlertScheduler; @@ -31,7 +33,7 @@ import static org.hamcrest.core.Is.is; /** */ -@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 3) +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 1, minNumDataNodes = 1, numDataNodes = 1) public class BasicAlertingTest extends ElasticsearchIntegrationTest { @Override @@ -48,11 +50,15 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { @Test // TODO: add request, response & request builder etc. public void testAlerSchedulerStartsProperly() throws Exception { - createIndex("my-index"); - createIndex(ScriptService.SCRIPT_INDEX); - client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); - client().prepareIndex(ScriptService.SCRIPT_INDEX, "mustache", "query") + createIndex("my-index"); + createIndex(AlertManager.ALERT_INDEX); + createIndex(AlertActionManager.ALERT_HISTORY_INDEX); + ensureGreen("my-index", AlertManager.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX); + + client().preparePutIndexedScript() + .setScriptLang("mustache") + .setId("query") .setSource(jsonBuilder().startObject().startObject("template").startObject("match_all").endObject().endObject().endObject()) .get(); @@ -92,14 +98,14 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { } @Override - public boolean doAction(String alertName, AlertResult alert) { - logger.info("Alert {} invoked: {}", alertName, alert); + public boolean doAction(Alert alert, AlertActionEntry actionEntry) { + logger.info("Alert {} invoked: {}", alert.alertName(), actionEntry); alertActionInvoked.set(true); return true; } }; - AlertActionManager alertActionManager = internalCluster().getInstance(AlertActionManager.class, internalCluster().getMasterName()); - alertActionManager.registerAction("test", new AlertActionFactory() { + AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName()); + alertActionRegistry.registerAction("test", new AlertActionFactory() { @Override public AlertAction createAction(Object parameters) { return alertAction; @@ -125,7 +131,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { @Override public void run() { assertThat(alertActionInvoked.get(), is(true)); - IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertManager.ALERT_HISTORY_INDEX).get(); + IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get(); assertThat(indicesExistsResponse.isExists(), is(true)); } }, 30, TimeUnit.SECONDS); diff --git a/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java new file mode 100644 index 00000000000..04e0ac235ef --- /dev/null +++ b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.actions; + +import org.elasticsearch.alerts.BasicAlertingTest; +import org.elasticsearch.alerts.triggers.AlertTrigger; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 1) +public class AlertActionsTest extends ElasticsearchIntegrationTest { + + @Test + public void testAlertActionParser(){ + DateTime fireTime = new DateTime(); + DateTime scheduledFireTime = new DateTime(); + Map triggerMap = new HashMap<>(); + triggerMap.put("numberOfEvents", ">1"); + Map actionMap = new HashMap<>(); + Map emailParamMap = new HashMap<>(); + List addresses = new ArrayList<>(); + addresses.add("foo@bar.com"); + emailParamMap.put("addresses", addresses); + actionMap.put("email", emailParamMap); + + Map fieldMap = new HashMap<>(); + fieldMap.put(AlertActionManager.ALERT_NAME_FIELD, "testName"); + fieldMap.put(AlertActionManager.TRIGGERED_FIELD, true); + fieldMap.put(AlertActionManager.FIRE_TIME_FIELD, fireTime.toDateTimeISO().toString()); + fieldMap.put(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledFireTime.toDateTimeISO().toString()); + fieldMap.put(AlertActionManager.TRIGGER_FIELD, triggerMap); + fieldMap.put(AlertActionManager.QUERY_RAN_FIELD, "foobar"); + fieldMap.put(AlertActionManager.NUMBER_OF_RESULTS_FIELD,10); + fieldMap.put(AlertActionManager.ACTIONS_FIELD, actionMap); + fieldMap.put(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString()); + AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName()); + AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", fieldMap, 0, alertActionRegistry, logger); + + assertEquals(actionEntry.getVersion(), 0); + assertEquals(actionEntry.getAlertName(), "testName"); + assertEquals(actionEntry.isTriggered(), true); + assertEquals(actionEntry.getScheduledTime(), scheduledFireTime); + assertEquals(actionEntry.getFireTime(), fireTime); + assertEquals(actionEntry.getEntryState(), AlertActionState.ACTION_NEEDED); + assertEquals(actionEntry.getNumberOfResults(), 10); + assertEquals(actionEntry.getTrigger(), + new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1)); + + } +}