Cleared up the alert / alert action entry classes and how the search request in the trigger manager gets prepared.

Original commit: elastic/x-pack-elasticsearch@52b16abb53
This commit is contained in:
Martijn van Groningen 2014-11-04 11:27:58 +01:00
parent 4373330a28
commit 8ba9c1c609
9 changed files with 296 additions and 403 deletions

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.alerts;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -18,153 +19,41 @@ import java.util.List;
public class Alert implements ToXContent {
private String alertName;
private String queryName;
private SearchRequest searchRequest;
private AlertTrigger trigger;
private TimeValue timePeriod;
private List<AlertAction> actions;
private String schedule;
private DateTime lastActionFire;
private long version;
private boolean enabled;
private boolean simpleQuery;
private String timestampString = "@timestamp";
public String timestampString() {
return timestampString;
}
public void timestampString(String timestampString) {
this.timestampString = timestampString;
}
public DateTime lastActionFire() {
return lastActionFire;
}
public void lastActionFire(DateTime lastActionFire) {
this.lastActionFire = lastActionFire;
}
public boolean simpleQuery() {
return simpleQuery;
}
public void simpleQuery(boolean simpleQuery) {
this.simpleQuery = simpleQuery;
}
public boolean enabled() {
return enabled;
}
public void enabled(boolean enabled) {
this.enabled = enabled;
}
public long version() {
return version;
}
public void version(long version) {
this.version = version;
}
public List<String> indices() {
return indices;
}
public void indices(List<String> indices) {
this.indices = indices;
}
private List<String> indices;
public String alertName() {
return alertName;
}
public void alertName(String alertName) {
this.alertName = alertName;
}
public String queryName() {
return queryName;
}
public void queryName(String queryName) {
this.queryName = queryName;
}
public AlertTrigger trigger() {
return trigger;
}
public void trigger(AlertTrigger trigger) {
this.trigger = trigger;
}
public TimeValue timePeriod() {
return timePeriod;
}
public void timePeriod(TimeValue timePeriod) {
this.timePeriod = timePeriod;
}
public List<AlertAction> actions() {
return actions;
}
public void actions(List<AlertAction> action) {
this.actions = action;
}
public String schedule() {
return schedule;
}
public void schedule(String schedule) {
this.schedule = schedule;
}
public Alert() {
}
public Alert(String alertName, String queryName, AlertTrigger trigger,
TimeValue timePeriod, List<AlertAction> actions, String schedule, DateTime lastRan,
List<String> indices, DateTime running, long version, boolean enabled, boolean simpleQuery){
public Alert(String alertName, SearchRequest searchRequest, AlertTrigger trigger, List<AlertAction> actions, String schedule, DateTime lastActionFire, long version, boolean enabled) {
this.alertName = alertName;
this.queryName = queryName;
this.searchRequest = searchRequest;
this.trigger = trigger;
this.timePeriod = timePeriod;
this.actions = actions;
this.schedule = schedule;
this.indices = indices;
this.lastActionFire = lastActionFire;
this.version = version;
this.enabled = enabled;
this.simpleQuery = simpleQuery;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
//Note we deliberately don't serialize the version here
builder.startObject();
if (queryName != null) {
builder.field(AlertsStore.QUERY_NAME_FIELD.getPreferredName(), queryName);
}
BytesStreamOutput out = new BytesStreamOutput();
searchRequest.writeTo(out);
builder.field(AlertsStore.SEARCH_REQUEST_FIELD.getPreferredName(), out.bytes());
if (schedule != null) {
builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule);
}
if (timePeriod != null) {
builder.field(AlertsStore.TIMEPERIOD_FIELD.getPreferredName(), timePeriod);
}
builder.field(AlertsStore.ENABLED.getPreferredName(), enabled);
builder.field(AlertsStore.SIMPLE_QUERY.getPreferredName(), simpleQuery);
if (lastActionFire != null) {
builder.field(AlertsStore.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
}
if (actions != null && !actions.isEmpty()) {
builder.startObject(AlertsStore.ACTION_FIELD.getPreferredName());
for (AlertAction action : actions){
@ -173,40 +62,99 @@ public class Alert implements ToXContent {
}
builder.endObject();
}
if (indices != null && !indices.isEmpty()) {
builder.startArray(AlertsStore.INDICES.getPreferredName());
for (String index : indices){
builder.value(index);
}
builder.endArray();
}
if (trigger != null) {
builder.field(AlertsStore.TRIGGER_FIELD.getPreferredName());
trigger.toXContent(builder, params);
}
builder.endObject();
return builder;
}
public boolean isSameAlert(Alert otherAlert) {
/**
* @return The last time this alert ran.
*/
public DateTime lastActionFire() {
return lastActionFire;
}
if (this == otherAlert) return true;
public void lastActionFire(DateTime lastActionFire) {
this.lastActionFire = lastActionFire;
}
if (enabled != otherAlert.enabled) return false;
if (simpleQuery != otherAlert.simpleQuery) return false;
if (actions != null ? !actions.equals(otherAlert.actions) : otherAlert.actions != null) return false;
if (alertName != null ? !alertName.equals(otherAlert.alertName) : otherAlert.alertName != null) return false;
if (indices != null ? !indices.equals(otherAlert.indices) : otherAlert.indices != null) return false;
if (queryName != null ? !queryName.equals(otherAlert.queryName) : otherAlert.queryName != null) return false;
if (schedule != null ? !schedule.equals(otherAlert.schedule) : otherAlert.schedule != null) return false;
if (timePeriod != null ? !timePeriod.equals(otherAlert.timePeriod) : otherAlert.timePeriod != null) return false;
if (timestampString != null ? !timestampString.equals(otherAlert.timestampString) : otherAlert.timestampString != null)
return false;
if (trigger != null ? !trigger.equals(otherAlert.trigger) : otherAlert.trigger != null) return false;
/**
* @return Whether this alert has been enabled.
*/
public boolean enabled() {
return enabled;
}
return true;
public void enabled(boolean enabled) {
this.enabled = enabled;
}
/**
* @return The current version of the alert. (es document version)
*/
public long version() {
return version;
}
public void version(long version) {
this.version = version;
}
/**
* @return The unique name of this alert.
*/
public String alertName() {
return alertName;
}
public void alertName(String alertName) {
this.alertName = alertName;
}
/**
* @return The search request that runs when the alert runs by the sc
*/
public SearchRequest getSearchRequest() {
return searchRequest;
}
public void setSearchRequest(SearchRequest searchRequest) {
this.searchRequest = searchRequest;
}
/**
* @return The trigger that is going to evaluate if the alert is going to execute the alert actions.
*/
public AlertTrigger trigger() {
return trigger;
}
public void trigger(AlertTrigger trigger) {
this.trigger = trigger;
}
/**
* @return the actions to be executed if the alert matches the trigger
*/
public List<AlertAction> actions() {
return actions;
}
public void actions(List<AlertAction> action) {
this.actions = action;
}
/**
* @return The cron schedule expression that expresses when to run the alert.
*/
public String schedule() {
return schedule;
}
public void schedule(String schedule) {
this.schedule = schedule;
}
}

View File

@ -109,8 +109,8 @@ public class AlertManager extends AbstractComponent {
return;
}
try {
TriggerResult result = triggerManager.isTriggered(alert, scheduledFireTime);
actionManager.addAlertAction(alert, result, fireTime, scheduledFireTime);
TriggerResult result = triggerManager.isTriggered(alert, scheduledFireTime, fireTime);
actionManager.addAlertAction(alert, result, scheduledFireTime, fireTime);
} catch (Exception e) {
logger.error("Failed execute alert [{}]", e, alertName);
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.actions.AlertAction;
@ -28,6 +29,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -37,7 +39,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
@ -49,18 +50,12 @@ public class AlertsStore extends AbstractComponent {
public static final String ALERT_INDEX = ".alerts";
public static final String ALERT_TYPE = "alert";
public static final ParseField QUERY_NAME_FIELD = new ParseField("query");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod");
public static final ParseField ACTION_FIELD = new ParseField("action");
public static final ParseField INDICES = new ParseField("indices");
public static final ParseField LAST_ACTION_FIRE = new ParseField("last_action_fire");
public static final ParseField ENABLED = new ParseField("enabled");
public static final ParseField SIMPLE_QUERY = new ParseField("simple");
public static final ParseField TIMESTAMP_FIELD = new ParseField("timefield");
public static final ParseField LAST_ACTION_FIRE = new ParseField("lastactionfire");
private final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config
public static final ParseField SEARCH_REQUEST_FIELD = new ParseField("request");
private final Client client;
private final ThreadPool threadPool;
@ -276,33 +271,17 @@ public class AlertsStore extends AbstractComponent {
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (INDICES.match(currentFieldName)) {
List<String> indices = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
indices.add(parser.text());
}
alert.indices(indices);
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else if (token.isValue()) {
if (QUERY_NAME_FIELD.match(currentFieldName)) {
alert.queryName(parser.textOrNull());
} else if (SCHEDULE_FIELD.match(currentFieldName)) {
if (SCHEDULE_FIELD.match(currentFieldName)) {
alert.schedule(parser.textOrNull());
} else if (TIMEPERIOD_FIELD.match(currentFieldName)) {
alert.timestampString(parser.textOrNull());
} else if (ENABLED.match(currentFieldName)) {
alert.enabled(parser.booleanValue());
} else if (SIMPLE_QUERY.match(currentFieldName)) {
alert.simpleQuery(parser.booleanValue());
} else if (TIMEPERIOD_FIELD.match(currentFieldName)) {
alert.timePeriod(TimeValue.parseTimeValue(parser.textOrNull(), defaultTimePeriod));
} else if (LAST_ACTION_FIRE.match(currentFieldName)) {
alert.lastActionFire(DateTime.parse(parser.textOrNull()));
} else if (TIMESTAMP_FIELD.match(currentFieldName)) {
alert.timestampString(parser.textOrNull());
} else if (SEARCH_REQUEST_FIELD.match(currentFieldName)) {
SearchRequest searchRequest = new SearchRequest();
searchRequest.readFrom(new BytesStreamInput(parser.binaryValue(), false));
alert.setSearchRequest(searchRequest);
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
@ -314,9 +293,6 @@ public class AlertsStore extends AbstractComponent {
throw new ElasticsearchException("Error during parsing alert", e);
}
if (alert.timePeriod() == null) {
alert.timePeriod(defaultTimePeriod);
}
if (alert.lastActionFire() == null) {
alert.lastActionFire(new DateTime(0));
}

View File

@ -5,33 +5,56 @@
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import java.io.IOException;
import java.util.List;
/**
* An alert action entry is an event of an alert that fired on particular moment in time.
*/
public class AlertActionEntry implements ToXContent{
private String id;
private long version;
private String alertName;
private boolean triggered;
private DateTime fireTime;
private AlertTrigger trigger;
private String triggeringSearchRequest;
private long numberOfResults;
private List<AlertAction> actions;
private List<String> indices;
private AlertActionState entryState;
private DateTime scheduledTime;
private AlertTrigger trigger;
private SearchRequest searchRequest;
private SearchResponse searchResponse;
private List<AlertAction> actions;
private AlertActionState entryState;
AlertActionEntry() {
}
public AlertActionEntry(Alert alert, TriggerResult result, DateTime scheduledTime, DateTime fireTime, AlertActionState state) throws IOException {
this.id = alert.alertName() + "#" + scheduledTime.toDateTimeISO();
this.version = 1;
this.alertName = alert.alertName();
this.triggered = result.isTriggered();
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
this.trigger = alert.trigger();
this.searchRequest = result.getRequest();
this.searchResponse = result.getResponse();
this.actions = alert.actions();
this.entryState = state;
}
/**
* @return The unique id of the alert action entry
*/
public String getId() {
return id;
}
@ -40,8 +63,9 @@ public class AlertActionEntry implements ToXContent{
this.id = id;
}
private String id;
/**
* @return The time the alert was scheduled to be triggered
*/
public DateTime getScheduledTime() {
return scheduledTime;
}
@ -50,6 +74,9 @@ public class AlertActionEntry implements ToXContent{
this.scheduledTime = scheduledTime;
}
/**
* @return The name of the alert that triggered
*/
public String getAlertName() {
return alertName;
}
@ -58,6 +85,9 @@ public class AlertActionEntry implements ToXContent{
this.alertName = alertName;
}
/**
* @return Whether the search request that run as part of the alert on a fire time matched with the defined trigger.
*/
public boolean isTriggered() {
return triggered;
}
@ -66,6 +96,9 @@ public class AlertActionEntry implements ToXContent{
this.triggered = triggered;
}
/**
* @return The time the alert actually ran.
*/
public DateTime getFireTime() {
return fireTime;
}
@ -74,6 +107,9 @@ public class AlertActionEntry implements ToXContent{
this.fireTime = fireTime;
}
/**
* @return The trigger that evaluated the search response
*/
public AlertTrigger getTrigger() {
return trigger;
}
@ -82,22 +118,31 @@ public class AlertActionEntry implements ToXContent{
this.trigger = trigger;
}
public String getTriggeringSearchRequest() {
return triggeringSearchRequest;
/**
* @return The query that ran at fire time
*/
public SearchRequest getSearchRequest() {
return searchRequest;
}
public void setTriggeringSearchRequest(String triggeringSearchRequest) {
this.triggeringSearchRequest = triggeringSearchRequest;
public void setSearchRequest(SearchRequest searchRequest) {
this.searchRequest = searchRequest;
}
public long getNumberOfResults() {
return numberOfResults;
/**
* @return The search response that resulted at out the search request that ran.
*/
public SearchResponse getSearchResponse() {
return searchResponse;
}
public void setNumberOfResults(long numberOfResults) {
this.numberOfResults = numberOfResults;
public void setSearchResponse(SearchResponse searchResponse) {
this.searchResponse = searchResponse;
}
/**
* @return The list of actions that ran if the search response matched with the trigger
*/
public List<AlertAction> getActions() {
return actions;
}
@ -106,14 +151,9 @@ public class AlertActionEntry implements ToXContent{
this.actions = actions;
}
public List<String> getIndices() {
return indices;
}
public void setIndices(List<String> indices) {
this.indices = indices;
}
/**
* @return The current state of the alert event.
*/
public AlertActionState getEntryState() {
return entryState;
}
@ -130,24 +170,6 @@ public class AlertActionEntry implements ToXContent{
this.version = version;
}
protected AlertActionEntry() {
}
public AlertActionEntry(Alert alert, TriggerResult result, DateTime fireTime, DateTime scheduledTime, AlertActionState state) throws IOException {
this.id = alert.alertName() + "#" + scheduledTime.toDateTimeISO();
this.version = 1;
this.alertName = alert.alertName();
this.triggered = result.isTriggered();
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
this.trigger = alert.trigger();
this.triggeringSearchRequest = XContentHelper.convertToJson(result.getRequest().source(), false, true);
this.numberOfResults = result.getResponse().getHits().totalHits();
this.actions = alert.actions();
this.indices = alert.indices();
this.entryState = state;
}
@Override
public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException {
historyEntry.startObject();
@ -155,98 +177,41 @@ public class AlertActionEntry implements ToXContent{
historyEntry.field("triggered", triggered);
historyEntry.field("fireTime", fireTime.toDateTimeISO());
historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO());
historyEntry.field("trigger");
trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
historyEntry.field("queryRan", triggeringSearchRequest);
historyEntry.field("numberOfResults", numberOfResults);
historyEntry.field("actions");
historyEntry.startObject();
historyEntry.field("trigger", trigger, params);
BytesStreamOutput out = new BytesStreamOutput();
searchRequest.writeTo(out);
historyEntry.field("request_binary", out.bytes());
out = new BytesStreamOutput();
searchResponse.writeTo(out);
historyEntry.field("response_binary", out.bytes());
// Serializing it as xcontent allows the search response to be encapsulated in a doc as a json object
historyEntry.startObject("response");
searchResponse.toXContent(historyEntry, params);
historyEntry.endObject();
historyEntry.startObject("actions");
for (AlertAction action : actions) {
historyEntry.field(action.getActionName());
action.toXContent(historyEntry, params);
}
historyEntry.endObject();
if (indices != null) {
historyEntry.field("indices");
historyEntry.startArray();
for (String index : indices) {
historyEntry.value(index);
}
historyEntry.endArray();
}
historyEntry.field(AlertActionState.FIELD_NAME, entryState.toString());
historyEntry.endObject();
return historyEntry;
}
@Override
public String toString() {
return "AlertHistoryEntry{" +
"version=" + version +
", alertName='" + alertName + '\'' +
", triggered=" + triggered +
", fireTime=" + fireTime +
", trigger=" + trigger +
", triggeringSearchRequest='" + triggeringSearchRequest + '\'' +
", numberOfResults=" + numberOfResults +
", actions=" + actions +
", indices=" + indices +
", entryState=" + entryState +
", scheduledTime=" + scheduledTime +
", id='" + id + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AlertActionEntry that = (AlertActionEntry) o;
if (numberOfResults != that.numberOfResults) return false;
if (triggered != that.triggered) return false;
if (version != that.version) return false;
if (actions != null ? !actions.equals(that.actions) : that.actions != null) return false;
if (alertName != null ? !alertName.equals(that.alertName) : that.alertName != null) return false;
if (entryState != that.entryState) return false;
if (fireTime != null ? !fireTime.equals(that.fireTime) : that.fireTime != null) return false;
if (id != null ? !id.equals(that.id) : that.id != null) return false;
if (indices != null ? !indices.equals(that.indices) : that.indices != null) return false;
if (scheduledTime != null ? !scheduledTime.equals(that.scheduledTime) : that.scheduledTime != null)
return false;
if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
if (triggeringSearchRequest != null ? !triggeringSearchRequest.equals(that.triggeringSearchRequest) : that.triggeringSearchRequest != null)
return false;
AlertActionEntry entry = (AlertActionEntry) o;
if (!id.equals(entry.id)) return false;
return true;
}
@Override
public int hashCode() {
int result = (int) (version ^ (version >>> 32));
result = 31 * result + (alertName != null ? alertName.hashCode() : 0);
result = 31 * result + (triggered ? 1 : 0);
result = 31 * result + (fireTime != null ? fireTime.hashCode() : 0);
result = 31 * result + (trigger != null ? trigger.hashCode() : 0);
result = 31 * result + (triggeringSearchRequest != null ? triggeringSearchRequest.hashCode() : 0);
result = 31 * result + (int) (numberOfResults ^ (numberOfResults >>> 32));
result = 31 * result + (actions != null ? actions.hashCode() : 0);
result = 31 * result + (indices != null ? indices.hashCode() : 0);
result = 31 * result + (entryState != null ? entryState.hashCode() : 0);
result = 31 * result + (scheduledTime != null ? scheduledTime.hashCode() : 0);
result = 31 * result + (id != null ? id.hashCode() : 0);
return result;
return id.hashCode();
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.alerts.Alert;
@ -28,6 +29,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
@ -35,8 +37,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
@ -50,10 +50,10 @@ public class AlertActionManager extends AbstractComponent {
public static final String FIRE_TIME_FIELD = "fireTime";
public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduledFireTime";
public static final String TRIGGER_FIELD = "trigger";
public static final String QUERY_RAN_FIELD = "queryRan";
public static final String NUMBER_OF_RESULTS_FIELD = "numberOfResults";
public static final String REQUEST = "request_binary";
public static final String RESPONSE = "response_binary";
public static final String ACTIONS_FIELD = "actions";
public static final String INDICES_FIELD = "indices";
public static final String ALERT_HISTORY_INDEX = "alerthistory";
public static final String ALERT_HISTORY_TYPE = "alerthistory";
@ -86,15 +86,11 @@ public class AlertActionManager extends AbstractComponent {
logger.trace("last action fire [{}]", lastActionFire);
logger.trace("msSinceLastAction [{}]", msSinceLastAction);
if (alert.timePeriod().getMillis() > msSinceLastAction) {
logger.debug("Not firing action because it was fired in the timePeriod");
} else {
actionRegistry.doAction(alert, entry);
logger.debug("Did action !");
actionRegistry.doAction(alert, entry);
logger.debug("Did action !");
alert.lastActionFire(scheduledTime);
alertsStore.updateAlert(alert);
}
alert.lastActionFire(scheduledTime);
alertsStore.updateAlert(alert);
updateHistoryEntry(entry, AlertActionState.ACTION_PERFORMED);
} else {
logger.warn("Unable to claim alert history entry" + entry);
@ -251,17 +247,9 @@ public class AlertActionManager extends AbstractComponent {
case TRIGGER_FIELD:
entry.setTrigger(TriggerManager.parseTrigger(parser));
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
switch (currentFieldName) {
case INDICES_FIELD:
List<String> indices = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
indices.add(parser.text());
}
entry.setIndices(indices);
case "response":
// Ignore this, the binary form is already read
parser.skipChildren();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
@ -280,11 +268,15 @@ public class AlertActionManager extends AbstractComponent {
case SCHEDULED_FIRE_TIME_FIELD:
entry.setScheduledTime(DateTime.parse(parser.text()));
break;
case QUERY_RAN_FIELD:
entry.setTriggeringSearchRequest(parser.text());
case REQUEST:
SearchRequest request = new SearchRequest();
request.readFrom(new BytesStreamInput(parser.binaryValue(), false));
entry.setSearchRequest(request);
break;
case NUMBER_OF_RESULTS_FIELD:
entry.setNumberOfResults(parser.longValue());
case RESPONSE:
SearchResponse response = new SearchResponse();
response.readFrom(new BytesStreamInput(parser.binaryValue(), false));
entry.setSearchResponse(response);
break;
case AlertActionState.FIELD_NAME:
entry.setEntryState(AlertActionState.fromString(parser.text()));
@ -303,7 +295,7 @@ public class AlertActionManager extends AbstractComponent {
}
public void addAlertAction(Alert alert, TriggerResult result, DateTime fireTime, DateTime scheduledFireTime) throws IOException {
public void addAlertAction(Alert alert, TriggerResult result, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).get().isExists()) {
createAlertHistoryIndex();
}
@ -313,7 +305,7 @@ public class AlertActionManager extends AbstractComponent {
state = AlertActionState.ACTION_NEEDED;
}
AlertActionEntry entry = new AlertActionEntry(alert, result, fireTime, scheduledFireTime, state);
AlertActionEntry entry = new AlertActionEntry(alert, result, scheduledFireTime, fireTime, state);
XContentBuilder historyEntry = XContentFactory.jsonBuilder();
entry.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);

View File

@ -89,11 +89,11 @@ public class EmailAlertAction implements AlertAction {
message.setSubject("Elasticsearch Alert " + alert.alertName() + " triggered");
StringBuffer output = new StringBuffer();
output.append("The following query triggered because " + result.getTrigger().toString() + "\n");
output.append("The total number of hits returned : " + result.getNumberOfResults() + "\n");
output.append("For query : " + result.getTriggeringSearchRequest());
output.append("The total number of hits returned : " + result.getSearchResponse().getHits().getTotalHits() + "\n");
output.append("For query : " + result.getSearchRequest());
output.append("\n");
output.append("Indices : ");
for (String index : result.getIndices()) {
for (String index : result.getSearchRequest().indices()) {
output.append(index);
output.append("/");
}

View File

@ -9,24 +9,22 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.FilteredQueryBuilder;
import org.elasticsearch.index.query.RangeFilterBuilder;
import org.elasticsearch.index.query.TemplateQueryBuilder;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -35,8 +33,39 @@ import java.util.Map;
*/
public class TriggerManager extends AbstractComponent {
private static final FormatDateTimeFormatter dateTimeFormatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER;
private final Client client;
private final ScriptService scriptService;
private final String fireTimePlaceHolder;
private final String scheduledFireTimePlaceHolder;
@Inject
public TriggerManager(Settings settings, Client client, ScriptService scriptService) {
super(settings);
this.client = client;
this.scriptService = scriptService;
this.fireTimePlaceHolder = settings.get("prefix", "<<<FIRE_TIME>>>");
this.scheduledFireTimePlaceHolder = settings.get("postfix", "<<<SCHEDULED_FIRE_TIME>>>");
}
public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws Exception {
SearchRequest request = prepareTriggerSearch(alert, scheduledFireTime, fireTime);
if (logger.isTraceEnabled()) {
logger.trace("For alert [{}] running query for [{}]", alert.alertName(), XContentHelper.convertToJson(request.source(), false, true));
}
SearchResponse response = client.search(request).get();
logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits());
switch (alert.trigger().triggerType()) {
case NUMBER_OF_EVENTS:
return doSimpleTrigger(alert, request, response);
case SCRIPT:
return doScriptTrigger(alert, request, response);
default:
throw new ElasticsearchIllegalArgumentException("Bad value for trigger.triggerType [" + alert.trigger().triggerType() + "]");
}
}
public static AlertTrigger parseTrigger(XContentParser parser) throws IOException {
AlertTrigger trigger = null;
@ -86,31 +115,6 @@ public class TriggerManager extends AbstractComponent {
return trigger;
}
@Inject
public TriggerManager(Settings settings, Client client, ScriptService scriptService) {
super(settings);
this.client = client;
this.scriptService = scriptService;
}
public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime) throws Exception {
SearchRequest request = createClampedRequest(scheduledFireTime, alert);
if (logger.isTraceEnabled()) {
logger.trace("For alert [{}] running query for [{}]", alert.alertName(), XContentHelper.convertToJson(request.source(), false, true));
}
SearchResponse response = client.search(request).get();
logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits());
switch (alert.trigger().triggerType()) {
case NUMBER_OF_EVENTS:
return doSimpleTrigger(alert, request, response);
case SCRIPT:
return doScriptTrigger(alert, request, response);
default:
throw new ElasticsearchIllegalArgumentException("Bad value for trigger.triggerType [" + alert.trigger().triggerType() + "]");
}
}
private TriggerResult doSimpleTrigger(Alert alert, SearchRequest request, SearchResponse response) {
boolean triggered = false;
long testValue = response.getHits().getTotalHits();
@ -164,27 +168,38 @@ public class TriggerManager extends AbstractComponent {
return new TriggerResult(triggered, request, response);
}
private SearchRequest createClampedRequest(DateTime scheduledFireTime, Alert alert){
DateTime clampEnd = new DateTime(scheduledFireTime);
DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds());
SearchRequest request = new SearchRequest(alert.indices().toArray(new String[0]));
if (alert.simpleQuery()) {
TemplateQueryBuilder queryBuilder = new TemplateQueryBuilder(alert.queryName(), ScriptService.ScriptType.INDEXED, new HashMap<String, Object>());
RangeFilterBuilder filterBuilder = new RangeFilterBuilder(alert.timestampString());
filterBuilder.gte(clampStart);
filterBuilder.lt(clampEnd);
request.source(new SearchSourceBuilder().query(new FilteredQueryBuilder(queryBuilder, filterBuilder)));
private SearchRequest prepareTriggerSearch(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
SearchRequest request = alert.getSearchRequest();
if (Strings.hasLength(request.source())) {
String requestSource = request.source().toUtf8();
if (requestSource.contains(fireTimePlaceHolder)) {
requestSource = requestSource.replace(fireTimePlaceHolder, dateTimeFormatter.printer().print(fireTime));
}
if (requestSource.contains(scheduledFireTimePlaceHolder)) {
requestSource = requestSource.replace(scheduledFireTimePlaceHolder, dateTimeFormatter.printer().print(scheduledFireTime));
}
request.source(requestSource);
} else if (Strings.hasLength(request.templateSource())) {
Tuple<XContentType, Map<String, Object>> tuple = XContentHelper.convertToMap(request.templateSource(), false);
Map<String, Object> templateSourceAsMap = tuple.v2();
Map<String, Object> templateObject = (Map<String, Object>) templateSourceAsMap.get("template");
if (templateObject != null) {
Map<String, Object> params = (Map<String, Object>) templateObject.get("params");
params.put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime));
params.put("fire_time", dateTimeFormatter.printer().print(fireTime));
XContentBuilder builder = XContentFactory.contentBuilder(tuple.v1());
builder.map(templateSourceAsMap);
request.templateSource(builder.bytes(), false);
}
} else if (request.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(request.templateParams())
.put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime))
.put("fire_time", dateTimeFormatter.printer().print(fireTime));
request.templateParams(templateParams.map());
} else {
//We can't just wrap the template here since it probably contains aggs or something else that doesn't play nice with FilteredQuery
Map<String,Object> fromToMap = new HashMap<>();
fromToMap.put("from", clampStart); //@TODO : make these parameters configurable ? Don't want to bloat the API too much tho
fromToMap.put("to", clampEnd);
//Go and get the search template from the script service :(
ExecutableScript script = scriptService.executable("mustache", alert.queryName(), ScriptService.ScriptType.INDEXED, fromToMap);
BytesReference requestBytes = (BytesReference)(script.run());
request.source(requestBytes, false);
throw new ElasticsearchIllegalStateException("Search requests needs either source, template source or template name");
}
request.indicesOptions(IndicesOptions.lenientExpandOpen());
return request;
}
}

View File

@ -12,9 +12,9 @@ import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -40,6 +40,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
.put("plugin.mandatory", "alerts")
.put("plugin.types", AlertsPlugin.class.getName())
.put("node.mode", "network")
.put("http.enabled", true)
.put("plugins.load_classpath_plugins", false)
.build();
}
@ -47,27 +48,11 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
@Test
// TODO: add request, response & request builder etc.
public void testAlerSchedulerStartsProperly() throws Exception {
createIndex("my-index");
createIndex(AlertsStore.ALERT_INDEX);
createIndex(AlertActionManager.ALERT_HISTORY_INDEX);
ensureGreen("my-index", AlertsStore.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX);
client().preparePutIndexedScript()
.setScriptLang("mustache")
.setId("query")
.setSource(jsonBuilder().startObject().startObject("template").startObject("match_all").endObject().endObject().endObject())
.get();
/*client().admin().indices().preparePutTemplate("query")
.setTemplate("*")
.setSource(jsonBuilder().startObject().startObject("query").startObject("match_all").endObject().endObject().endObject())
.get();
GetIndexTemplatesResponse templatesResponse = client().admin().indices().prepareGetTemplates("query").get();
assertThat(templatesResponse.getIndexTemplates().size(), equalTo(1));
assertThat(templatesResponse.getIndexTemplates().get(0).getName(), equalTo("query"));*/
final AlertManager alertManager = internalCluster().getInstance(AlertManager.class, internalCluster().getMasterName());
assertBusy(new Runnable() {
@Override
@ -107,16 +92,12 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
AlertTrigger alertTrigger = new AlertTrigger(new ScriptedAlertTrigger("return true", ScriptService.ScriptType.INLINE, "groovy"));
Alert alert = new Alert(
"my-first-alert",
"/mustache/query",
client().prepareSearch("my-index").setQuery(QueryBuilders.matchAllQuery()).request(),
alertTrigger,
TimeValue.timeValueSeconds(1),
Arrays.asList(alertAction),
"0/5 * * * * ? *",
null,
Arrays.asList("my-index"),
null,
1,
true,
true
);
alertManager.addAlert("my-first-alert", jsonBuilder().value(alert).bytes());

View File

@ -5,12 +5,19 @@
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -48,8 +55,17 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
builder.field(AlertActionManager.FIRE_TIME_FIELD, formatter.printer().print(fireTime));
builder.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, formatter.printer().print(scheduledFireTime));
builder.field(AlertActionManager.TRIGGER_FIELD, triggerMap);
builder.field(AlertActionManager.QUERY_RAN_FIELD, "foobar");
builder.field(AlertActionManager.NUMBER_OF_RESULTS_FIELD, 10);
BytesStreamOutput out = new BytesStreamOutput();
SearchRequest searchRequest = new SearchRequest("test123");
searchRequest.writeTo(out);
builder.field(AlertActionManager.REQUEST, out.bytes());
SearchResponse searchResponse = new SearchResponse(
new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[0], 10, 0), null, null, null, false, false),
null, 1, 1, 0, new ShardSearchFailure[0]
);
out = new BytesStreamOutput();
searchResponse.writeTo(out);
builder.field(AlertActionManager.RESPONSE, out.bytes());
builder.field(AlertActionManager.ACTIONS_FIELD, actionMap);
builder.field(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString());
builder.endObject();
@ -62,7 +78,7 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
assertEquals(actionEntry.getScheduledTime(), scheduledFireTime);
assertEquals(actionEntry.getFireTime(), fireTime);
assertEquals(actionEntry.getEntryState(), AlertActionState.ACTION_NEEDED);
assertEquals(actionEntry.getNumberOfResults(), 10);
assertEquals(actionEntry.getSearchResponse().getHits().getTotalHits(), 10);
assertEquals(actionEntry.getTrigger(),
new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1));