Alerting: Scripted triggers and support for aggregations in searches.

This commit adds support for triggers that are scripts:

Query :
````
POST /_search/template/testFilteredAgg
{
  "query" : {
    "filtered" : {
      "query" : {
        "match_all" : {}
     },
     "filter": {
       "range" : {
         "@timestamp" : {
             "gte" : "{{from}}",
             "lt" : "{{to}}"
         }
       }
     }
    }
  },
    "aggs" : {
      "response" : {
        "terms" : {
          "field" : "response",
          "size" : 100
        }
      }
}, "size" : 0  }
````

Trigger Script:
````
POST /_scripts/groovy/testScript
{
  "script" : "ok_count = 0.0;error_count = 0.0;for(bucket in aggregations.response.buckets) {if (bucket.key < 400){ok_count += bucket.doc_count;} else {error_count += bucket.doc_count;}}; return error_count/(ok_count+1) > 0.1;"
}
````

Alert:
````
POST /_alerting/_create/myScriptedAlert
{
    "query" : "testFilteredAgg",
    "schedule" : "05 * * * * ?",
    "trigger" : {
         "script" : {
           "script" : "testScript",
           "script_lang" : "groovy",
           "script_type" : "INDEXED"
         }
     },
    "timeperiod" : "300s",
     "action" : {
         "index" : {
           "index" : "weberrorhistory",
           "type" : "weberrorresult"
         }
     },
    "indices" : [ "logstash*" ],
    "enabled" : true,
    "simple" : false
}
````

If you want to use aggs with your alert you must create a search that contains the timefilter with the params ````{{from}}```` and ````{{to}}```` and set the ````simple```` flag to ````true````.

Original commit: elastic/x-pack-elasticsearch@0430a1bf40
This commit is contained in:
Brian Murphy 2014-08-18 16:59:49 +01:00
parent 4216491824
commit 0eea73dd72
8 changed files with 121 additions and 67 deletions

View File

@ -26,7 +26,26 @@ public class Alert implements ToXContent{
private long version;
private DateTime running;
private boolean enabled;
private boolean simpleQuery;
private String timestampString = "@timestamp";
public String timestampString() {
return timestampString;
}
public void timestampString(String timestampString) {
this.timestampString = timestampString;
}
public boolean simpleQuery() {
return simpleQuery;
}
public void simpleQuery(boolean simpleQuery) {
this.simpleQuery = simpleQuery;
}
public boolean enabled() {
return enabled;
@ -116,7 +135,7 @@ public class Alert implements ToXContent{
public Alert(String alertName, String queryName, AlertTrigger trigger,
TimeValue timePeriod, List<AlertAction> actions, String schedule, DateTime lastRan,
List<String> indices, DateTime running, long version, boolean enabled){
List<String> indices, DateTime running, long version, boolean enabled, boolean simpleQuery){
this.alertName = alertName;
this.queryName = queryName;
this.trigger = trigger;
@ -128,6 +147,7 @@ public class Alert implements ToXContent{
this.version = version;
this.running = running;
this.enabled = enabled;
this.simpleQuery = simpleQuery;
}
@Override
@ -141,6 +161,7 @@ public class Alert implements ToXContent{
builder.field(AlertManager.LASTRAN_FIELD.getPreferredName(), lastRan);
builder.field(AlertManager.CURRENTLY_RUNNING.getPreferredName(), running);
builder.field(AlertManager.ENABLED.getPreferredName(), enabled);
builder.field(AlertManager.SIMPLE_QUERY.getPreferredName(), simpleQuery);
builder.field(AlertManager.TRIGGER_FIELD.getPreferredName());
trigger.toXContent(builder, params);

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
@ -52,6 +53,8 @@ public class AlertManager extends AbstractLifecycleComponent {
public static final ParseField INDICES = new ParseField("indices");
public static final ParseField CURRENTLY_RUNNING = new ParseField("running");
public static final ParseField ENABLED = new ParseField("enabled");
public static final ParseField SIMPLE_QUERY = new ParseField("simple");
public static final ParseField TIMESTAMP_FIELD = new ParseField("timefield");
private final Client client;
private AlertScheduler scheduler;
@ -269,7 +272,7 @@ public class AlertManager extends AbstractLifecycleComponent {
}
public boolean addHistory(String alertName, boolean triggered,
DateTime fireTime, XContentBuilder triggeringQuery,
DateTime fireTime, SearchRequestBuilder triggeringQuery,
AlertTrigger trigger, long numberOfResults,
@Nullable List<String> indices) throws Exception {
XContentBuilder historyEntry = XContentFactory.jsonBuilder();
@ -279,7 +282,7 @@ public class AlertManager extends AbstractLifecycleComponent {
historyEntry.field("fireTime", fireTime.toDateTimeISO());
historyEntry.field("trigger");
trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
historyEntry.field("queryRan", XContentHelper.convertToJson(triggeringQuery.bytes(),false,true));
historyEntry.field("queryRan", triggeringQuery.toString());
historyEntry.field("numberOfResults", numberOfResults);
if (indices != null) {
historyEntry.field("indices");
@ -426,7 +429,29 @@ public class AlertManager extends AbstractLifecycleComponent {
boolean enabled = true;
if (fields.get(ENABLED.getPreferredName()) != null ) {
logger.error(ENABLED.getPreferredName() + " " + fields.get(ENABLED.getPreferredName()));
Object enabledObj = fields.get(ENABLED.getPreferredName());
enabled = parseAsBoolean(enabledObj);
}
boolean simpleQuery = true;
if (fields.get(SIMPLE_QUERY.getPreferredName()) != null ) {
logger.error(SIMPLE_QUERY.getPreferredName() + " " + fields.get(SIMPLE_QUERY.getPreferredName()));
Object enabledObj = fields.get(SIMPLE_QUERY.getPreferredName());
simpleQuery = parseAsBoolean(enabledObj);
}
Alert alert = new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices, running, version, enabled, simpleQuery);
if (fields.get(TIMESTAMP_FIELD.getPreferredName()) != null) {
alert.timestampString(fields.get(TIMESTAMP_FIELD.getPreferredName()).toString());
}
return alert;
}
private boolean parseAsBoolean(Object enabledObj) {
boolean enabled;
if (enabledObj instanceof Boolean){
enabled = (Boolean)enabledObj;
} else {
@ -440,10 +465,7 @@ public class AlertManager extends AbstractLifecycleComponent {
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + enabledObj + "] as a boolean");
}
}
}
return new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices, running, version, enabled);
return enabled;
}
public Map<String,Alert> getSafeAlertMap() {

View File

@ -86,7 +86,6 @@ public class AlertRestHandler implements RestHandler {
failed.endObject();
restChannel.sendResponse(new BytesRestResponse(BAD_REQUEST,failed));
}
return true;
} else if (request.method() == DELETE) {
String alertName = request.param("name");
@ -113,5 +112,4 @@ public class AlertRestHandler implements RestHandler {
return builder;
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.alerting;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -16,8 +17,11 @@ public class AlertResult {
public AlertTrigger trigger;
public String alertName;
public DateTime fireTime;
public boolean isTriggered;
public SearchRequestBuilder query;
public String[] indices;
public AlertResult(String alertName, SearchResponse searchResponse, AlertTrigger trigger, boolean isTriggered, XContentBuilder query, String[] indices, DateTime fireTime) {
public AlertResult(String alertName, SearchResponse searchResponse, AlertTrigger trigger, boolean isTriggered, SearchRequestBuilder query, String[] indices, DateTime fireTime) {
this.searchResponse = searchResponse;
this.trigger = trigger;
this.isTriggered = isTriggered;
@ -27,10 +31,6 @@ public class AlertResult {
this.fireTime = fireTime;
}
public boolean isTriggered;
public XContentBuilder query;
public String[] indices;
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -9,17 +9,23 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.*;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class AlertScheduler extends AbstractLifecycleComponent {
@ -28,15 +34,19 @@ public class AlertScheduler extends AbstractLifecycleComponent {
private final Client client;
private final TriggerManager triggerManager;
private final AlertActionManager actionManager;
private final ScriptService scriptService;
@Inject
public AlertScheduler(Settings settings, AlertManager alertManager, Client client,
TriggerManager triggerManager, AlertActionManager actionManager) {
TriggerManager triggerManager, AlertActionManager actionManager,
ScriptService scriptService) {
super(settings);
this.alertManager = alertManager;
this.client = client;
this.triggerManager = triggerManager;
this.actionManager = actionManager;
this.scriptService = scriptService;
try {
SchedulerFactory schFactory = new StdSchedulerFactory();
scheduler = schFactory.getScheduler();
@ -76,19 +86,23 @@ public class AlertScheduler extends AbstractLifecycleComponent {
logger.warn("Another process has already run this alert.");
return;
}
XContentBuilder builder = createClampedQuery(jobExecutionContext, alert);
logger.warn("Running the following query : [{}]", builder.string());
SearchRequestBuilder srb = client.prepareSearch().setSource(builder);
SearchRequestBuilder srb = createClampedRequest(client, jobExecutionContext, alert);
String[] indices = alert.indices().toArray(new String[0]);
if (alert.indices() != null ){
logger.warn("Setting indices to : " + alert.indices());
srb.setIndices(indices);
}
//if (logger.isDebugEnabled()) {
logger.warn("Running query [{}]", XContentHelper.convertToJson(srb.request().source(),false,true));
//}
SearchResponse sr = srb.execute().get();
logger.warn("Got search response hits : [{}]", sr.getHits().getTotalHits() );
AlertResult result = new AlertResult(alertName, sr, alert.trigger(),
triggerManager.isTriggered(alertName,sr), builder, indices,
triggerManager.isTriggered(alertName,sr), srb, indices,
new DateTime(jobExecutionContext.getScheduledFireTime()));
if (result.isTriggered) {
@ -110,40 +124,25 @@ public class AlertScheduler extends AbstractLifecycleComponent {
}
}
private XContentBuilder createClampedQuery(JobExecutionContext jobExecutionContext, Alert alert) throws IOException {
private SearchRequestBuilder createClampedRequest(Client client, JobExecutionContext jobExecutionContext, Alert alert){
Date scheduledFireTime = jobExecutionContext.getScheduledFireTime();
DateTime clampEnd = new DateTime(scheduledFireTime);
DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds());
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.startObject();
builder.field("query");
builder.startObject();
builder.field("filtered");
builder.startObject();
builder.field("query");
builder.startObject();
builder.field("template");
builder.startObject();
builder.field("id");
builder.value(alert.queryName());
builder.endObject();
builder.endObject();
builder.field("filter");
builder.startObject();
builder.field("range");
builder.startObject();
builder.field("@timestamp");
builder.startObject();
builder.field("gte");
builder.value(clampStart);
builder.field("lt");
builder.value(clampEnd);
builder.endObject();
builder.endObject();
builder.endObject();
builder.endObject();
builder.endObject();
return builder;
if (alert.simpleQuery()) {
TemplateQueryBuilder queryBuilder = new TemplateQueryBuilder(alert.queryName(), ScriptService.ScriptType.INDEXED, new HashMap<String, Object>());
RangeFilterBuilder filterBuilder = new RangeFilterBuilder(alert.timestampString());
filterBuilder.gte(clampStart);
filterBuilder.lt(clampEnd);
return client.prepareSearch().setQuery(new FilteredQueryBuilder(queryBuilder, filterBuilder));
} else {
Map<String,Object> fromToMap = new HashMap<>();
fromToMap.put("from", clampStart);
fromToMap.put("to", clampEnd);
//Go and get the search template from the script service :(
ExecutableScript script = scriptService.executable("mustache", alert.queryName(), ScriptService.ScriptType.INDEXED, fromToMap);
BytesReference requestBytes = (BytesReference)(script.run());
return client.prepareSearch().setSource(requestBytes);
}
}
public void addAlert(String alertName, Alert alert) {

View File

@ -128,10 +128,13 @@ public class AlertTrigger implements ToXContent {
builder.startObject();
builder.field(triggerType.toString(), trigger.toString() + value);
builder.endObject();
return builder;
} else {
return scriptedTrigger.toXContent(builder, params);
builder.startObject();
builder.field(triggerType.toString());
scriptedTrigger.toXContent(builder, params);
builder.endObject();
}
return builder;
}
public static enum TriggerType {

View File

@ -94,7 +94,7 @@ public class EmailAlertAction implements AlertAction {
StringBuffer output = new StringBuffer();
output.append("The following query triggered because " + result.trigger.toString() + "\n");
output.append("The total number of hits returned : " + result.searchResponse.getHits().getTotalHits() + "\n");
output.append("For query : " + XContentHelper.convertToJson(result.query.bytes(),true,true) + "\n");
output.append("For query : " + result.query.toString());
output.append("\n");
output.append("Indices : ");
for (String index : result.indices) {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -17,7 +18,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
@ -50,13 +51,13 @@ public class TriggerManager extends AbstractComponent {
Map<String,Object> valueMap = (Map<String,Object>)value;
try {
return new ScriptedAlertTrigger(valueMap.get("script").toString(),
ScriptService.ScriptType.valueOf(valueMap.get("script_type").toString()),
ScriptService.ScriptType.valueOf(valueMap.get("script_type").toString().toUpperCase(Locale.ROOT)), ///TODO : Fix ScriptType to parse strings properly, currently only accepts uppercase versions of the enum names
valueMap.get("script_lang").toString());
} catch (Exception e){
throw new ElasticsearchIllegalArgumentException("Unable to parse " + value + " as a ScriptedAlertTrigger");
throw new ElasticsearchIllegalArgumentException("Unable to parse " + value + " as a ScriptedAlertTrigger", e);
}
} else {
throw new ElasticsearchIllegalArgumentException("Unable to parse " + value + " as a ScriptedAlertTrigger");
throw new ElasticsearchIllegalArgumentException("Unable to parse " + value + " as a ScriptedAlertTrigger, not a Map");
}
}
@ -70,12 +71,22 @@ public class TriggerManager extends AbstractComponent {
public boolean doScriptTrigger(ScriptedAlertTrigger scriptTrigger, SearchResponse response) {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
Map<String, Object> responseMap = XContentHelper.convertToMap(builder.bytes(), false).v2();
ExecutableScript executable = scriptService.executable(scriptTrigger.scriptLang, scriptTrigger.script,
scriptTrigger.scriptType, responseMap);
Object returnValue = executable.run();
logger.warn("Returned [{}] from script", returnValue);
if (returnValue instanceof Boolean) {
return (Boolean) returnValue;
} else {
throw new ElasticsearchIllegalStateException("Trigger script [" + scriptTrigger.script + "] " +
"did not return a Boolean");
}
} catch (Exception e ){
logger.error("Failed to execute script trigger", e);
}