From e3250c03660cc124793a025d01f281756d73031e Mon Sep 17 00:00:00 2001 From: Brian Murphy Date: Wed, 13 Aug 2014 15:21:55 +0100 Subject: [PATCH] Alerting : Add time clamping. This commit adds timestamp clamping on the timeperiod to alerting. Also adds index setting on alerts. Original commit: elastic/x-pack-elasticsearch@222cd6eaef1ec8589c66db0ad669d3b3b9fa732d --- .../alerting/AlertExecutorJob.java | 3 +- .../elasticsearch/alerting/AlertManager.java | 5 +- .../alerting/AlertScheduler.java | 103 +++++++++++------- 3 files changed, 71 insertions(+), 40 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerting/AlertExecutorJob.java b/src/main/java/org/elasticsearch/alerting/AlertExecutorJob.java index 54fff9ba365..8bbe85ec401 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertExecutorJob.java +++ b/src/main/java/org/elasticsearch/alerting/AlertExecutorJob.java @@ -17,7 +17,8 @@ public class AlertExecutorJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { String alertName = jobExecutionContext.getJobDetail().getKey().getName(); - ((AlertScheduler)jobExecutionContext.getJobDetail().getJobDataMap().get("manager")).executeAlert(alertName); + ((AlertScheduler)jobExecutionContext.getJobDetail().getJobDataMap().get("manager")).executeAlert(alertName, + jobExecutionContext); } } diff --git a/src/main/java/org/elasticsearch/alerting/AlertManager.java b/src/main/java/org/elasticsearch/alerting/AlertManager.java index d7ca11d360f..38f434208ca 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerting/AlertManager.java @@ -53,6 +53,7 @@ public class AlertManager extends AbstractLifecycleComponent { private final Thread starter; private AlertActionManager actionManager; + final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config class StarterThread implements Runnable { @Override @@ -170,7 +171,9 @@ public class AlertManager extends AbstractLifecycleComponent { } else { throw new ElasticsearchException("Unable to parse trigger [" + triggerObj + "]"); } - TimeValue timePeriod = new TimeValue(Long.valueOf(fields.get(TIMEPERIOD_FIELD.getPreferredName()).toString())); + + String timeString = fields.get(TIMEPERIOD_FIELD.getPreferredName()).toString(); + TimeValue timePeriod = TimeValue.parseTimeValue(timeString, defaultTimePeriod); Object actionObj = fields.get(ACTION_FIELD.getPreferredName()); List actions = null; diff --git a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java index 0959b2e9116..3a6e50b2498 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java +++ b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java @@ -5,15 +5,19 @@ */ package org.elasticsearch.alerting; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.*; import org.elasticsearch.script.CompiledScript; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ScriptService; @@ -21,7 +25,10 @@ import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.quartz.simpl.SimpleJobFactory; +import java.io.IOException; +import java.util.Date; import java.util.HashMap; +import java.util.Map; public class AlertScheduler extends AbstractLifecycleComponent { @@ -50,44 +57,32 @@ public class AlertScheduler extends AbstractLifecycleComponent { } } - public void executeAlert(String alertName){ + public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){ logger.warn("Running [{}]",alertName); Alert alert = alertManager.getAlertForName(alertName); //@TODO : claim alert - String queryTemplate = null; try { - logger.warn("Getting the query"); - GetIndexedScriptResponse scriptResponse = client.prepareGetIndexedScript().setScriptLang("mustache").setId(alert.queryName()).execute().get(); - logger.warn("Got the query"); - if (scriptResponse.isExists()) { - queryTemplate = scriptResponse.getScript(); + XContentBuilder builder = createClampedQuery(jobExecutionContext, alert); + logger.warn("Running the following query : [{}]", builder.string()); + + SearchRequestBuilder srb = client.prepareSearch().setSource(builder); + if (alert.indices() != null ){ + srb.setIndices(alert.indices().toArray(new String[0])); } - logger.warn("Found : [{}]", queryTemplate); - if (queryTemplate != null) { - CompiledScript compiledTemplate = scriptService.compile("mustache",queryTemplate); - ExecutableScript executable = scriptService.executable(compiledTemplate, new HashMap()); - BytesReference processedQuery = (BytesReference) executable.run(); - logger.warn("Compiled to [{}]", processedQuery); - SearchRequestBuilder srb = client.prepareSearch().setSource(processedQuery); - if (alert.indices() != null ){ - srb.setIndices(alert.indices().toArray(new String[0])); - } - SearchResponse sr = srb.execute().get(); - logger.warn("Got search response"); - AlertResult result = new AlertResult(); - result.isTriggered = triggerManager.isTriggered(alertName,sr); - result.searchResponse = sr; - if (result.isTriggered) { - logger.warn("We have triggered"); - actionManager.doAction(alertName,result); - logger.warn("Did action !"); - }else{ - logger.warn("We didn't trigger"); - } - //@TODO write this back to the alert manager - } else { - logger.error("Failed to find query named [{}]",alert.queryName()); + SearchResponse sr = srb.execute().get(); + logger.warn("Got search response"); + AlertResult result = new AlertResult(); + result.isTriggered = triggerManager.isTriggered(alertName,sr); + + result.searchResponse = sr; + if (result.isTriggered) { + logger.warn("We have triggered"); + //actionManager.doAction(alertName,result); + logger.warn("Did action !"); + }else{ + logger.warn("We didn't trigger"); } + //@TODO write this back to the alert manager } catch (Exception e) { logger.error("Fail", e); logger.error("Failed execute alert [{}]",alert.queryName(), e); @@ -95,18 +90,50 @@ public class AlertScheduler extends AbstractLifecycleComponent { } + private XContentBuilder createClampedQuery(JobExecutionContext jobExecutionContext, Alert alert) throws IOException { + Date scheduledFireTime = jobExecutionContext.getScheduledFireTime(); + DateTime clampEnd = new DateTime(scheduledFireTime); + DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds()); + logger.error("Subtracting : [{}] seconds from [{}] = [{}]", (int)alert.timePeriod().seconds(), clampEnd, clampStart ); + + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + builder.startObject(); + builder.field("query"); + builder.startObject(); + builder.field("filtered"); + builder.startObject(); + builder.field("query"); + builder.startObject(); + builder.field("template"); + builder.startObject(); + builder.field("id"); + builder.value(alert.queryName()); + builder.endObject(); + builder.endObject(); + builder.field("filter"); + builder.startObject(); + builder.field("range"); + builder.startObject(); + builder.field("@timestamp"); + builder.startObject(); + builder.field("gte"); + builder.value(clampStart); + builder.field("lt"); + builder.value(clampEnd); + builder.endObject(); + builder.endObject(); + builder.endObject(); + builder.endObject(); + builder.endObject(); + return builder; + } + public void addAlert(String alertName, Alert alert) { - try { - org.elasticsearch.alerting.AlertExecutorJob.class.newInstance(); - } catch (Exception e){ - logger.error("NOOOOO",e); - } JobDetail job = JobBuilder.newJob(org.elasticsearch.alerting.AlertExecutorJob.class).withIdentity(alertName).build(); job.getJobDataMap().put("manager",this); CronTrigger cronTrigger = TriggerBuilder.newTrigger() .withSchedule(CronScheduleBuilder.cronSchedule(alert.schedule())) .build(); - try { logger.warn("Scheduling [{}] with schedule [{}]", alertName, alert.schedule()); scheduler.scheduleJob(job, cronTrigger);