From 8ba9c1c60997d4c7f14ff83a69ffe51d5c54bcbf Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 4 Nov 2014 11:27:58 +0100 Subject: [PATCH] Cleared up the alert / alert action entry classes and how the search request in the trigger manager gets prepared. Original commit: elastic/x-pack-elasticsearch@52b16abb53211bbf04159136e11abf4f688a9bd3 --- .../java/org/elasticsearch/alerts/Alert.java | 236 +++++++----------- .../elasticsearch/alerts/AlertManager.java | 4 +- .../org/elasticsearch/alerts/AlertsStore.java | 42 +--- .../alerts/actions/AlertActionEntry.java | 195 ++++++--------- .../alerts/actions/AlertActionManager.java | 52 ++-- .../alerts/actions/EmailAlertAction.java | 6 +- .../alerts/triggers/TriggerManager.java | 117 +++++---- .../alerts/BasicAlertingTest.java | 25 +- .../alerts/actions/AlertActionsTest.java | 22 +- 9 files changed, 296 insertions(+), 403 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerts/Alert.java b/src/main/java/org/elasticsearch/alerts/Alert.java index 9d8b9514ab5..b4f4e82facc 100644 --- a/src/main/java/org/elasticsearch/alerts/Alert.java +++ b/src/main/java/org/elasticsearch/alerts/Alert.java @@ -5,10 +5,11 @@ */ package org.elasticsearch.alerts; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.triggers.AlertTrigger; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.joda.time.DateTime; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -18,153 +19,41 @@ import java.util.List; public class Alert implements ToXContent { private String alertName; - private String queryName; + private SearchRequest searchRequest; private AlertTrigger trigger; - private TimeValue timePeriod; private List actions; private String schedule; private DateTime lastActionFire; private long version; private boolean enabled; - private boolean simpleQuery; - private String timestampString = "@timestamp"; - - public String timestampString() { - return timestampString; - } - - public void timestampString(String timestampString) { - this.timestampString = timestampString; - } - - public DateTime lastActionFire() { - return lastActionFire; - } - - public void lastActionFire(DateTime lastActionFire) { - this.lastActionFire = lastActionFire; - } - - public boolean simpleQuery() { - return simpleQuery; - } - - public void simpleQuery(boolean simpleQuery) { - this.simpleQuery = simpleQuery; - } - - public boolean enabled() { - return enabled; - } - - public void enabled(boolean enabled) { - this.enabled = enabled; - } - - public long version() { - return version; - } - - public void version(long version) { - this.version = version; - } - - public List indices() { - return indices; - } - - public void indices(List indices) { - this.indices = indices; - } - - private List indices; - - public String alertName() { - return alertName; - } - - public void alertName(String alertName) { - this.alertName = alertName; - } - - public String queryName() { - return queryName; - } - - public void queryName(String queryName) { - this.queryName = queryName; - } - - public AlertTrigger trigger() { - return trigger; - } - - public void trigger(AlertTrigger trigger) { - this.trigger = trigger; - } - - public TimeValue timePeriod() { - return timePeriod; - } - - public void timePeriod(TimeValue timePeriod) { - this.timePeriod = timePeriod; - } - - public List actions() { - return actions; - } - - public void actions(List action) { - this.actions = action; - } - - public String schedule() { - return schedule; - } - - public void schedule(String schedule) { - this.schedule = schedule; - } public Alert() { } - public Alert(String alertName, String queryName, AlertTrigger trigger, - TimeValue timePeriod, List actions, String schedule, DateTime lastRan, - List indices, DateTime running, long version, boolean enabled, boolean simpleQuery){ + public Alert(String alertName, SearchRequest searchRequest, AlertTrigger trigger, List actions, String schedule, DateTime lastActionFire, long version, boolean enabled) { this.alertName = alertName; - this.queryName = queryName; + this.searchRequest = searchRequest; this.trigger = trigger; - this.timePeriod = timePeriod; this.actions = actions; this.schedule = schedule; - this.indices = indices; + this.lastActionFire = lastActionFire; this.version = version; this.enabled = enabled; - this.simpleQuery = simpleQuery; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - - //Note we deliberately don't serialize the version here builder.startObject(); - if (queryName != null) { - builder.field(AlertsStore.QUERY_NAME_FIELD.getPreferredName(), queryName); - } + BytesStreamOutput out = new BytesStreamOutput(); + searchRequest.writeTo(out); + builder.field(AlertsStore.SEARCH_REQUEST_FIELD.getPreferredName(), out.bytes()); if (schedule != null) { builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule); } - if (timePeriod != null) { - builder.field(AlertsStore.TIMEPERIOD_FIELD.getPreferredName(), timePeriod); - } builder.field(AlertsStore.ENABLED.getPreferredName(), enabled); - builder.field(AlertsStore.SIMPLE_QUERY.getPreferredName(), simpleQuery); if (lastActionFire != null) { builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire); } - if (actions != null && !actions.isEmpty()) { builder.startObject(AlertsStore.ACTION_FIELD.getPreferredName()); for (AlertAction action : actions){ @@ -173,40 +62,99 @@ public class Alert implements ToXContent { } builder.endObject(); } - - if (indices != null && !indices.isEmpty()) { - builder.startArray(AlertsStore.INDICES.getPreferredName()); - for (String index : indices){ - builder.value(index); - } - builder.endArray(); - } - if (trigger != null) { builder.field(AlertsStore.TRIGGER_FIELD.getPreferredName()); trigger.toXContent(builder, params); } - builder.endObject(); return builder; } - public boolean isSameAlert(Alert otherAlert) { + /** + * @return The last time this alert ran. + */ + public DateTime lastActionFire() { + return lastActionFire; + } - if (this == otherAlert) return true; + public void lastActionFire(DateTime lastActionFire) { + this.lastActionFire = lastActionFire; + } - if (enabled != otherAlert.enabled) return false; - if (simpleQuery != otherAlert.simpleQuery) return false; - if (actions != null ? !actions.equals(otherAlert.actions) : otherAlert.actions != null) return false; - if (alertName != null ? !alertName.equals(otherAlert.alertName) : otherAlert.alertName != null) return false; - if (indices != null ? !indices.equals(otherAlert.indices) : otherAlert.indices != null) return false; - if (queryName != null ? !queryName.equals(otherAlert.queryName) : otherAlert.queryName != null) return false; - if (schedule != null ? !schedule.equals(otherAlert.schedule) : otherAlert.schedule != null) return false; - if (timePeriod != null ? !timePeriod.equals(otherAlert.timePeriod) : otherAlert.timePeriod != null) return false; - if (timestampString != null ? !timestampString.equals(otherAlert.timestampString) : otherAlert.timestampString != null) - return false; - if (trigger != null ? !trigger.equals(otherAlert.trigger) : otherAlert.trigger != null) return false; + /** + * @return Whether this alert has been enabled. + */ + public boolean enabled() { + return enabled; + } - return true; + public void enabled(boolean enabled) { + this.enabled = enabled; + } + + /** + * @return The current version of the alert. (es document version) + */ + public long version() { + return version; + } + + public void version(long version) { + this.version = version; + } + + /** + * @return The unique name of this alert. + */ + public String alertName() { + return alertName; + } + + public void alertName(String alertName) { + this.alertName = alertName; + } + + /** + * @return The search request that runs when the alert runs by the sc + */ + public SearchRequest getSearchRequest() { + return searchRequest; + } + + public void setSearchRequest(SearchRequest searchRequest) { + this.searchRequest = searchRequest; + } + + /** + * @return The trigger that is going to evaluate if the alert is going to execute the alert actions. + */ + public AlertTrigger trigger() { + return trigger; + } + + public void trigger(AlertTrigger trigger) { + this.trigger = trigger; + } + + /** + * @return the actions to be executed if the alert matches the trigger + */ + public List actions() { + return actions; + } + + public void actions(List action) { + this.actions = action; + } + + /** + * @return The cron schedule expression that expresses when to run the alert. + */ + public String schedule() { + return schedule; + } + + public void schedule(String schedule) { + this.schedule = schedule; } } diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index d933de96345..3277b540e8f 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -109,8 +109,8 @@ public class AlertManager extends AbstractComponent { return; } try { - TriggerResult result = triggerManager.isTriggered(alert, scheduledFireTime); - actionManager.addAlertAction(alert, result, fireTime, scheduledFireTime); + TriggerResult result = triggerManager.isTriggered(alert, scheduledFireTime, fireTime); + actionManager.addAlertAction(alert, result, scheduledFireTime, fireTime); } catch (Exception e) { logger.error("Failed execute alert [{}]", e, alertName); } diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 0ad9ddca4b3..bac6df28b77 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.alerts.actions.AlertAction; @@ -28,6 +29,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -37,7 +39,6 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; @@ -49,18 +50,12 @@ public class AlertsStore extends AbstractComponent { public static final String ALERT_INDEX = ".alerts"; public static final String ALERT_TYPE = "alert"; - public static final ParseField QUERY_NAME_FIELD = new ParseField("query"); public static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); public static final ParseField TRIGGER_FIELD = new ParseField("trigger"); - public static final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod"); public static final ParseField ACTION_FIELD = new ParseField("action"); - public static final ParseField INDICES = new ParseField("indices"); + public static final ParseField LAST_ACTION_FIRE = new ParseField("last_action_fire"); public static final ParseField ENABLED = new ParseField("enabled"); - public static final ParseField SIMPLE_QUERY = new ParseField("simple"); - public static final ParseField TIMESTAMP_FIELD = new ParseField("timefield"); - public static final ParseField LAST_ACTION_FIRE = new ParseField("lastactionfire"); - - private final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config + public static final ParseField SEARCH_REQUEST_FIELD = new ParseField("request"); private final Client client; private final ThreadPool threadPool; @@ -276,33 +271,17 @@ public class AlertsStore extends AbstractComponent { } else { throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } - } else if (token == XContentParser.Token.START_ARRAY) { - if (INDICES.match(currentFieldName)) { - List indices = new ArrayList<>(); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - indices.add(parser.text()); - } - alert.indices(indices); - } else { - throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); - } } else if (token.isValue()) { - if (QUERY_NAME_FIELD.match(currentFieldName)) { - alert.queryName(parser.textOrNull()); - } else if (SCHEDULE_FIELD.match(currentFieldName)) { + if (SCHEDULE_FIELD.match(currentFieldName)) { alert.schedule(parser.textOrNull()); - } else if (TIMEPERIOD_FIELD.match(currentFieldName)) { - alert.timestampString(parser.textOrNull()); } else if (ENABLED.match(currentFieldName)) { alert.enabled(parser.booleanValue()); - } else if (SIMPLE_QUERY.match(currentFieldName)) { - alert.simpleQuery(parser.booleanValue()); - } else if (TIMEPERIOD_FIELD.match(currentFieldName)) { - alert.timePeriod(TimeValue.parseTimeValue(parser.textOrNull(), defaultTimePeriod)); } else if (LAST_ACTION_FIRE.match(currentFieldName)) { alert.lastActionFire(DateTime.parse(parser.textOrNull())); - } else if (TIMESTAMP_FIELD.match(currentFieldName)) { - alert.timestampString(parser.textOrNull()); + } else if (SEARCH_REQUEST_FIELD.match(currentFieldName)) { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.readFrom(new BytesStreamInput(parser.binaryValue(), false)); + alert.setSearchRequest(searchRequest); } else { throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } @@ -314,9 +293,6 @@ public class AlertsStore extends AbstractComponent { throw new ElasticsearchException("Error during parsing alert", e); } - if (alert.timePeriod() == null) { - alert.timePeriod(defaultTimePeriod); - } if (alert.lastActionFire() == null) { alert.lastActionFire(new DateTime(0)); } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java index fb4eac5f98c..49c9f43c4e6 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionEntry.java @@ -5,33 +5,56 @@ */ package org.elasticsearch.alerts.actions; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.TriggerResult; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; import java.io.IOException; import java.util.List; /** + * An alert action entry is an event of an alert that fired on particular moment in time. */ public class AlertActionEntry implements ToXContent{ + private String id; private long version; private String alertName; private boolean triggered; private DateTime fireTime; - private AlertTrigger trigger; - private String triggeringSearchRequest; - private long numberOfResults; - private List actions; - private List indices; - private AlertActionState entryState; private DateTime scheduledTime; + private AlertTrigger trigger; + private SearchRequest searchRequest; + private SearchResponse searchResponse; + private List actions; + private AlertActionState entryState; + AlertActionEntry() { + } + + public AlertActionEntry(Alert alert, TriggerResult result, DateTime scheduledTime, DateTime fireTime, AlertActionState state) throws IOException { + this.id = alert.alertName() + "#" + scheduledTime.toDateTimeISO(); + this.version = 1; + this.alertName = alert.alertName(); + this.triggered = result.isTriggered(); + this.fireTime = fireTime; + this.scheduledTime = scheduledTime; + this.trigger = alert.trigger(); + this.searchRequest = result.getRequest(); + this.searchResponse = result.getResponse(); + this.actions = alert.actions(); + this.entryState = state; + } + + /** + * @return The unique id of the alert action entry + */ public String getId() { return id; } @@ -40,8 +63,9 @@ public class AlertActionEntry implements ToXContent{ this.id = id; } - private String id; - + /** + * @return The time the alert was scheduled to be triggered + */ public DateTime getScheduledTime() { return scheduledTime; } @@ -50,6 +74,9 @@ public class AlertActionEntry implements ToXContent{ this.scheduledTime = scheduledTime; } + /** + * @return The name of the alert that triggered + */ public String getAlertName() { return alertName; } @@ -58,6 +85,9 @@ public class AlertActionEntry implements ToXContent{ this.alertName = alertName; } + /** + * @return Whether the search request that run as part of the alert on a fire time matched with the defined trigger. + */ public boolean isTriggered() { return triggered; } @@ -66,6 +96,9 @@ public class AlertActionEntry implements ToXContent{ this.triggered = triggered; } + /** + * @return The time the alert actually ran. + */ public DateTime getFireTime() { return fireTime; } @@ -74,6 +107,9 @@ public class AlertActionEntry implements ToXContent{ this.fireTime = fireTime; } + /** + * @return The trigger that evaluated the search response + */ public AlertTrigger getTrigger() { return trigger; } @@ -82,22 +118,31 @@ public class AlertActionEntry implements ToXContent{ this.trigger = trigger; } - public String getTriggeringSearchRequest() { - return triggeringSearchRequest; + /** + * @return The query that ran at fire time + */ + public SearchRequest getSearchRequest() { + return searchRequest; } - public void setTriggeringSearchRequest(String triggeringSearchRequest) { - this.triggeringSearchRequest = triggeringSearchRequest; + public void setSearchRequest(SearchRequest searchRequest) { + this.searchRequest = searchRequest; } - public long getNumberOfResults() { - return numberOfResults; + /** + * @return The search response that resulted at out the search request that ran. + */ + public SearchResponse getSearchResponse() { + return searchResponse; } - public void setNumberOfResults(long numberOfResults) { - this.numberOfResults = numberOfResults; + public void setSearchResponse(SearchResponse searchResponse) { + this.searchResponse = searchResponse; } + /** + * @return The list of actions that ran if the search response matched with the trigger + */ public List getActions() { return actions; } @@ -106,14 +151,9 @@ public class AlertActionEntry implements ToXContent{ this.actions = actions; } - public List getIndices() { - return indices; - } - - public void setIndices(List indices) { - this.indices = indices; - } - + /** + * @return The current state of the alert event. + */ public AlertActionState getEntryState() { return entryState; } @@ -130,24 +170,6 @@ public class AlertActionEntry implements ToXContent{ this.version = version; } - protected AlertActionEntry() { - } - - public AlertActionEntry(Alert alert, TriggerResult result, DateTime fireTime, DateTime scheduledTime, AlertActionState state) throws IOException { - this.id = alert.alertName() + "#" + scheduledTime.toDateTimeISO(); - this.version = 1; - this.alertName = alert.alertName(); - this.triggered = result.isTriggered(); - this.fireTime = fireTime; - this.scheduledTime = scheduledTime; - this.trigger = alert.trigger(); - this.triggeringSearchRequest = XContentHelper.convertToJson(result.getRequest().source(), false, true); - this.numberOfResults = result.getResponse().getHits().totalHits(); - this.actions = alert.actions(); - this.indices = alert.indices(); - this.entryState = state; - } - @Override public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException { historyEntry.startObject(); @@ -155,98 +177,41 @@ public class AlertActionEntry implements ToXContent{ historyEntry.field("triggered", triggered); historyEntry.field("fireTime", fireTime.toDateTimeISO()); historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO()); - - historyEntry.field("trigger"); - trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); - - historyEntry.field("queryRan", triggeringSearchRequest); - - historyEntry.field("numberOfResults", numberOfResults); - - historyEntry.field("actions"); - historyEntry.startObject(); + historyEntry.field("trigger", trigger, params); + BytesStreamOutput out = new BytesStreamOutput(); + searchRequest.writeTo(out); + historyEntry.field("request_binary", out.bytes()); + out = new BytesStreamOutput(); + searchResponse.writeTo(out); + historyEntry.field("response_binary", out.bytes()); + // Serializing it as xcontent allows the search response to be encapsulated in a doc as a json object + historyEntry.startObject("response"); + searchResponse.toXContent(historyEntry, params); + historyEntry.endObject(); + historyEntry.startObject("actions"); for (AlertAction action : actions) { historyEntry.field(action.getActionName()); action.toXContent(historyEntry, params); } historyEntry.endObject(); - - - if (indices != null) { - historyEntry.field("indices"); - historyEntry.startArray(); - for (String index : indices) { - historyEntry.value(index); - } - historyEntry.endArray(); - } - historyEntry.field(AlertActionState.FIELD_NAME, entryState.toString()); - historyEntry.endObject(); - return historyEntry; } - - @Override - public String toString() { - return "AlertHistoryEntry{" + - "version=" + version + - ", alertName='" + alertName + '\'' + - ", triggered=" + triggered + - ", fireTime=" + fireTime + - ", trigger=" + trigger + - ", triggeringSearchRequest='" + triggeringSearchRequest + '\'' + - ", numberOfResults=" + numberOfResults + - ", actions=" + actions + - ", indices=" + indices + - ", entryState=" + entryState + - ", scheduledTime=" + scheduledTime + - ", id='" + id + '\'' + - '}'; - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - AlertActionEntry that = (AlertActionEntry) o; - - if (numberOfResults != that.numberOfResults) return false; - if (triggered != that.triggered) return false; - if (version != that.version) return false; - if (actions != null ? !actions.equals(that.actions) : that.actions != null) return false; - if (alertName != null ? !alertName.equals(that.alertName) : that.alertName != null) return false; - if (entryState != that.entryState) return false; - if (fireTime != null ? !fireTime.equals(that.fireTime) : that.fireTime != null) return false; - if (id != null ? !id.equals(that.id) : that.id != null) return false; - if (indices != null ? !indices.equals(that.indices) : that.indices != null) return false; - if (scheduledTime != null ? !scheduledTime.equals(that.scheduledTime) : that.scheduledTime != null) - return false; - if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false; - if (triggeringSearchRequest != null ? !triggeringSearchRequest.equals(that.triggeringSearchRequest) : that.triggeringSearchRequest != null) - return false; + AlertActionEntry entry = (AlertActionEntry) o; + if (!id.equals(entry.id)) return false; return true; } @Override public int hashCode() { - int result = (int) (version ^ (version >>> 32)); - result = 31 * result + (alertName != null ? alertName.hashCode() : 0); - result = 31 * result + (triggered ? 1 : 0); - result = 31 * result + (fireTime != null ? fireTime.hashCode() : 0); - result = 31 * result + (trigger != null ? trigger.hashCode() : 0); - result = 31 * result + (triggeringSearchRequest != null ? triggeringSearchRequest.hashCode() : 0); - result = 31 * result + (int) (numberOfResults ^ (numberOfResults >>> 32)); - result = 31 * result + (actions != null ? actions.hashCode() : 0); - result = 31 * result + (indices != null ? indices.hashCode() : 0); - result = 31 * result + (entryState != null ? entryState.hashCode() : 0); - result = 31 * result + (scheduledTime != null ? scheduledTime.hashCode() : 0); - result = 31 * result + (id != null ? id.hashCode() : 0); - return result; + return id.hashCode(); } - } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 36df6434ef2..5c19df1f78a 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.alerts.Alert; @@ -28,6 +29,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.*; @@ -35,8 +37,6 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; @@ -50,10 +50,10 @@ public class AlertActionManager extends AbstractComponent { public static final String FIRE_TIME_FIELD = "fireTime"; public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduledFireTime"; public static final String TRIGGER_FIELD = "trigger"; - public static final String QUERY_RAN_FIELD = "queryRan"; - public static final String NUMBER_OF_RESULTS_FIELD = "numberOfResults"; + public static final String REQUEST = "request_binary"; + public static final String RESPONSE = "response_binary"; public static final String ACTIONS_FIELD = "actions"; - public static final String INDICES_FIELD = "indices"; + public static final String ALERT_HISTORY_INDEX = "alerthistory"; public static final String ALERT_HISTORY_TYPE = "alerthistory"; @@ -86,15 +86,11 @@ public class AlertActionManager extends AbstractComponent { logger.trace("last action fire [{}]", lastActionFire); logger.trace("msSinceLastAction [{}]", msSinceLastAction); - if (alert.timePeriod().getMillis() > msSinceLastAction) { - logger.debug("Not firing action because it was fired in the timePeriod"); - } else { - actionRegistry.doAction(alert, entry); - logger.debug("Did action !"); + actionRegistry.doAction(alert, entry); + logger.debug("Did action !"); - alert.lastActionFire(scheduledTime); - alertsStore.updateAlert(alert); - } + alert.lastActionFire(scheduledTime); + alertsStore.updateAlert(alert); updateHistoryEntry(entry, AlertActionState.ACTION_PERFORMED); } else { logger.warn("Unable to claim alert history entry" + entry); @@ -251,17 +247,9 @@ public class AlertActionManager extends AbstractComponent { case TRIGGER_FIELD: entry.setTrigger(TriggerManager.parseTrigger(parser)); break; - default: - throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); - } - } else if (token == XContentParser.Token.START_ARRAY) { - switch (currentFieldName) { - case INDICES_FIELD: - List indices = new ArrayList<>(); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - indices.add(parser.text()); - } - entry.setIndices(indices); + case "response": + // Ignore this, the binary form is already read + parser.skipChildren(); break; default: throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); @@ -280,11 +268,15 @@ public class AlertActionManager extends AbstractComponent { case SCHEDULED_FIRE_TIME_FIELD: entry.setScheduledTime(DateTime.parse(parser.text())); break; - case QUERY_RAN_FIELD: - entry.setTriggeringSearchRequest(parser.text()); + case REQUEST: + SearchRequest request = new SearchRequest(); + request.readFrom(new BytesStreamInput(parser.binaryValue(), false)); + entry.setSearchRequest(request); break; - case NUMBER_OF_RESULTS_FIELD: - entry.setNumberOfResults(parser.longValue()); + case RESPONSE: + SearchResponse response = new SearchResponse(); + response.readFrom(new BytesStreamInput(parser.binaryValue(), false)); + entry.setSearchResponse(response); break; case AlertActionState.FIELD_NAME: entry.setEntryState(AlertActionState.fromString(parser.text())); @@ -303,7 +295,7 @@ public class AlertActionManager extends AbstractComponent { } - public void addAlertAction(Alert alert, TriggerResult result, DateTime fireTime, DateTime scheduledFireTime) throws IOException { + public void addAlertAction(Alert alert, TriggerResult result, DateTime scheduledFireTime, DateTime fireTime) throws IOException { if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).get().isExists()) { createAlertHistoryIndex(); } @@ -313,7 +305,7 @@ public class AlertActionManager extends AbstractComponent { state = AlertActionState.ACTION_NEEDED; } - AlertActionEntry entry = new AlertActionEntry(alert, result, fireTime, scheduledFireTime, state); + AlertActionEntry entry = new AlertActionEntry(alert, result, scheduledFireTime, fireTime, state); XContentBuilder historyEntry = XContentFactory.jsonBuilder(); entry.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); diff --git a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java index b40672a6f1f..472be8e3cf6 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java +++ b/src/main/java/org/elasticsearch/alerts/actions/EmailAlertAction.java @@ -89,11 +89,11 @@ public class EmailAlertAction implements AlertAction { message.setSubject("Elasticsearch Alert " + alert.alertName() + " triggered"); StringBuffer output = new StringBuffer(); output.append("The following query triggered because " + result.getTrigger().toString() + "\n"); - output.append("The total number of hits returned : " + result.getNumberOfResults() + "\n"); - output.append("For query : " + result.getTriggeringSearchRequest()); + output.append("The total number of hits returned : " + result.getSearchResponse().getHits().getTotalHits() + "\n"); + output.append("For query : " + result.getSearchRequest()); output.append("\n"); output.append("Indices : "); - for (String index : result.getIndices()) { + for (String index : result.getSearchRequest().indices()) { output.append(index); output.append("/"); } diff --git a/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java b/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java index e2cdb49d696..d4a85590520 100644 --- a/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java +++ b/src/main/java/org/elasticsearch/alerts/triggers/TriggerManager.java @@ -9,24 +9,22 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.alerts.Alert; import org.elasticsearch.client.Client; -import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.FormatDateTimeFormatter; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.*; -import org.elasticsearch.index.query.FilteredQueryBuilder; -import org.elasticsearch.index.query.RangeFilterBuilder; -import org.elasticsearch.index.query.TemplateQueryBuilder; +import org.elasticsearch.index.mapper.core.DateFieldMapper; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; -import java.util.HashMap; import java.util.Map; @@ -35,8 +33,39 @@ import java.util.Map; */ public class TriggerManager extends AbstractComponent { + private static final FormatDateTimeFormatter dateTimeFormatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER; + private final Client client; private final ScriptService scriptService; + private final String fireTimePlaceHolder; + private final String scheduledFireTimePlaceHolder; + + @Inject + public TriggerManager(Settings settings, Client client, ScriptService scriptService) { + super(settings); + this.client = client; + this.scriptService = scriptService; + this.fireTimePlaceHolder = settings.get("prefix", "<<>>"); + this.scheduledFireTimePlaceHolder = settings.get("postfix", "<<>>"); + } + + public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws Exception { + SearchRequest request = prepareTriggerSearch(alert, scheduledFireTime, fireTime); + if (logger.isTraceEnabled()) { + logger.trace("For alert [{}] running query for [{}]", alert.alertName(), XContentHelper.convertToJson(request.source(), false, true)); + } + + SearchResponse response = client.search(request).get(); + logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits()); + switch (alert.trigger().triggerType()) { + case NUMBER_OF_EVENTS: + return doSimpleTrigger(alert, request, response); + case SCRIPT: + return doScriptTrigger(alert, request, response); + default: + throw new ElasticsearchIllegalArgumentException("Bad value for trigger.triggerType [" + alert.trigger().triggerType() + "]"); + } + } public static AlertTrigger parseTrigger(XContentParser parser) throws IOException { AlertTrigger trigger = null; @@ -86,31 +115,6 @@ public class TriggerManager extends AbstractComponent { return trigger; } - @Inject - public TriggerManager(Settings settings, Client client, ScriptService scriptService) { - super(settings); - this.client = client; - this.scriptService = scriptService; - } - - public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime) throws Exception { - SearchRequest request = createClampedRequest(scheduledFireTime, alert); - if (logger.isTraceEnabled()) { - logger.trace("For alert [{}] running query for [{}]", alert.alertName(), XContentHelper.convertToJson(request.source(), false, true)); - } - - SearchResponse response = client.search(request).get(); - logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits()); - switch (alert.trigger().triggerType()) { - case NUMBER_OF_EVENTS: - return doSimpleTrigger(alert, request, response); - case SCRIPT: - return doScriptTrigger(alert, request, response); - default: - throw new ElasticsearchIllegalArgumentException("Bad value for trigger.triggerType [" + alert.trigger().triggerType() + "]"); - } - } - private TriggerResult doSimpleTrigger(Alert alert, SearchRequest request, SearchResponse response) { boolean triggered = false; long testValue = response.getHits().getTotalHits(); @@ -164,27 +168,38 @@ public class TriggerManager extends AbstractComponent { return new TriggerResult(triggered, request, response); } - private SearchRequest createClampedRequest(DateTime scheduledFireTime, Alert alert){ - DateTime clampEnd = new DateTime(scheduledFireTime); - DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds()); - SearchRequest request = new SearchRequest(alert.indices().toArray(new String[0])); - if (alert.simpleQuery()) { - TemplateQueryBuilder queryBuilder = new TemplateQueryBuilder(alert.queryName(), ScriptService.ScriptType.INDEXED, new HashMap()); - RangeFilterBuilder filterBuilder = new RangeFilterBuilder(alert.timestampString()); - filterBuilder.gte(clampStart); - filterBuilder.lt(clampEnd); - request.source(new SearchSourceBuilder().query(new FilteredQueryBuilder(queryBuilder, filterBuilder))); + private SearchRequest prepareTriggerSearch(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException { + SearchRequest request = alert.getSearchRequest(); + if (Strings.hasLength(request.source())) { + String requestSource = request.source().toUtf8(); + if (requestSource.contains(fireTimePlaceHolder)) { + requestSource = requestSource.replace(fireTimePlaceHolder, dateTimeFormatter.printer().print(fireTime)); + } + if (requestSource.contains(scheduledFireTimePlaceHolder)) { + requestSource = requestSource.replace(scheduledFireTimePlaceHolder, dateTimeFormatter.printer().print(scheduledFireTime)); + } + request.source(requestSource); + } else if (Strings.hasLength(request.templateSource())) { + Tuple> tuple = XContentHelper.convertToMap(request.templateSource(), false); + Map templateSourceAsMap = tuple.v2(); + Map templateObject = (Map) templateSourceAsMap.get("template"); + if (templateObject != null) { + Map params = (Map) templateObject.get("params"); + params.put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime)); + params.put("fire_time", dateTimeFormatter.printer().print(fireTime)); + + XContentBuilder builder = XContentFactory.contentBuilder(tuple.v1()); + builder.map(templateSourceAsMap); + request.templateSource(builder.bytes(), false); + } + } else if (request.templateName() != null) { + MapBuilder templateParams = MapBuilder.newMapBuilder(request.templateParams()) + .put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime)) + .put("fire_time", dateTimeFormatter.printer().print(fireTime)); + request.templateParams(templateParams.map()); } else { - //We can't just wrap the template here since it probably contains aggs or something else that doesn't play nice with FilteredQuery - Map fromToMap = new HashMap<>(); - fromToMap.put("from", clampStart); //@TODO : make these parameters configurable ? Don't want to bloat the API too much tho - fromToMap.put("to", clampEnd); - //Go and get the search template from the script service :( - ExecutableScript script = scriptService.executable("mustache", alert.queryName(), ScriptService.ScriptType.INDEXED, fromToMap); - BytesReference requestBytes = (BytesReference)(script.run()); - request.source(requestBytes, false); + throw new ElasticsearchIllegalStateException("Search requests needs either source, template source or template name"); } - request.indicesOptions(IndicesOptions.lenientExpandOpen()); return request; } } diff --git a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java index 850b4d779ba..baa2656a56f 100644 --- a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java +++ b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java @@ -12,9 +12,9 @@ import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; @@ -40,6 +40,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { .put("plugin.mandatory", "alerts") .put("plugin.types", AlertsPlugin.class.getName()) .put("node.mode", "network") + .put("http.enabled", true) .put("plugins.load_classpath_plugins", false) .build(); } @@ -47,27 +48,11 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { @Test // TODO: add request, response & request builder etc. public void testAlerSchedulerStartsProperly() throws Exception { - createIndex("my-index"); createIndex(AlertsStore.ALERT_INDEX); createIndex(AlertActionManager.ALERT_HISTORY_INDEX); ensureGreen("my-index", AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX); - client().preparePutIndexedScript() - .setScriptLang("mustache") - .setId("query") - .setSource(jsonBuilder().startObject().startObject("template").startObject("match_all").endObject().endObject().endObject()) - .get(); - - /*client().admin().indices().preparePutTemplate("query") - .setTemplate("*") - .setSource(jsonBuilder().startObject().startObject("query").startObject("match_all").endObject().endObject().endObject()) - .get(); - - GetIndexTemplatesResponse templatesResponse = client().admin().indices().prepareGetTemplates("query").get(); - assertThat(templatesResponse.getIndexTemplates().size(), equalTo(1)); - assertThat(templatesResponse.getIndexTemplates().get(0).getName(), equalTo("query"));*/ - final AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName()); assertBusy(new Runnable() { @Override @@ -107,16 +92,12 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest { AlertTrigger alertTrigger = new AlertTrigger(new ScriptedAlertTrigger("return true", ScriptService.ScriptType.INLINE, "groovy")); Alert alert = new Alert( "my-first-alert", - "/mustache/query", + client().prepareSearch("my-index").setQuery(QueryBuilders.matchAllQuery()).request(), alertTrigger, - TimeValue.timeValueSeconds(1), Arrays.asList(alertAction), "0/5 * * * * ? *", null, - Arrays.asList("my-index"), - null, 1, - true, true ); alertManager.addAlert("my-first-alert", jsonBuilder().value(alert).bytes()); diff --git a/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java index 50eaca0cee0..9cffe597666 100644 --- a/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java +++ b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java @@ -5,12 +5,19 @@ */ package org.elasticsearch.alerts.actions; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.alerts.triggers.AlertTrigger; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.joda.FormatDateTimeFormatter; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTimeZone; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.mapper.core.DateFieldMapper; +import org.elasticsearch.search.internal.InternalSearchHit; +import org.elasticsearch.search.internal.InternalSearchHits; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; @@ -48,8 +55,17 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest { builder.field(AlertActionManager.FIRE_TIME_FIELD, formatter.printer().print(fireTime)); builder.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, formatter.printer().print(scheduledFireTime)); builder.field(AlertActionManager.TRIGGER_FIELD, triggerMap); - builder.field(AlertActionManager.QUERY_RAN_FIELD, "foobar"); - builder.field(AlertActionManager.NUMBER_OF_RESULTS_FIELD, 10); + BytesStreamOutput out = new BytesStreamOutput(); + SearchRequest searchRequest = new SearchRequest("test123"); + searchRequest.writeTo(out); + builder.field(AlertActionManager.REQUEST, out.bytes()); + SearchResponse searchResponse = new SearchResponse( + new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[0], 10, 0), null, null, null, false, false), + null, 1, 1, 0, new ShardSearchFailure[0] + ); + out = new BytesStreamOutput(); + searchResponse.writeTo(out); + builder.field(AlertActionManager.RESPONSE, out.bytes()); builder.field(AlertActionManager.ACTIONS_FIELD, actionMap); builder.field(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString()); builder.endObject(); @@ -62,7 +78,7 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest { assertEquals(actionEntry.getScheduledTime(), scheduledFireTime); assertEquals(actionEntry.getFireTime(), fireTime); assertEquals(actionEntry.getEntryState(), AlertActionState.ACTION_NEEDED); - assertEquals(actionEntry.getNumberOfResults(), 10); + assertEquals(actionEntry.getSearchResponse().getHits().getTotalHits(), 10); assertEquals(actionEntry.getTrigger(), new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1));