Alerting : Add time clamping.
This commit adds timestamp clamping on the timeperiod to alerting. Also adds index setting on alerts. Original commit: elastic/x-pack-elasticsearch@222cd6eaef
This commit is contained in:
parent
88ed7a5624
commit
e3250c0366
|
@ -17,7 +17,8 @@ public class AlertExecutorJob implements Job {
|
||||||
@Override
|
@Override
|
||||||
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
||||||
String alertName = jobExecutionContext.getJobDetail().getKey().getName();
|
String alertName = jobExecutionContext.getJobDetail().getKey().getName();
|
||||||
((AlertScheduler)jobExecutionContext.getJobDetail().getJobDataMap().get("manager")).executeAlert(alertName);
|
((AlertScheduler)jobExecutionContext.getJobDetail().getJobDataMap().get("manager")).executeAlert(alertName,
|
||||||
|
jobExecutionContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,7 @@ public class AlertManager extends AbstractLifecycleComponent {
|
||||||
private final Thread starter;
|
private final Thread starter;
|
||||||
|
|
||||||
private AlertActionManager actionManager;
|
private AlertActionManager actionManager;
|
||||||
|
final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config
|
||||||
|
|
||||||
class StarterThread implements Runnable {
|
class StarterThread implements Runnable {
|
||||||
@Override
|
@Override
|
||||||
|
@ -170,7 +171,9 @@ public class AlertManager extends AbstractLifecycleComponent {
|
||||||
} else {
|
} else {
|
||||||
throw new ElasticsearchException("Unable to parse trigger [" + triggerObj + "]");
|
throw new ElasticsearchException("Unable to parse trigger [" + triggerObj + "]");
|
||||||
}
|
}
|
||||||
TimeValue timePeriod = new TimeValue(Long.valueOf(fields.get(TIMEPERIOD_FIELD.getPreferredName()).toString()));
|
|
||||||
|
String timeString = fields.get(TIMEPERIOD_FIELD.getPreferredName()).toString();
|
||||||
|
TimeValue timePeriod = TimeValue.parseTimeValue(timeString, defaultTimePeriod);
|
||||||
|
|
||||||
Object actionObj = fields.get(ACTION_FIELD.getPreferredName());
|
Object actionObj = fields.get(ACTION_FIELD.getPreferredName());
|
||||||
List<AlertAction> actions = null;
|
List<AlertAction> actions = null;
|
||||||
|
|
|
@ -5,15 +5,19 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.alerting;
|
package org.elasticsearch.alerting;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.BytesRef;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptResponse;
|
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptResponse;
|
||||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.common.ParseField;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.joda.time.DateTime;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.xcontent.*;
|
||||||
import org.elasticsearch.script.CompiledScript;
|
import org.elasticsearch.script.CompiledScript;
|
||||||
import org.elasticsearch.script.ExecutableScript;
|
import org.elasticsearch.script.ExecutableScript;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
|
@ -21,7 +25,10 @@ import org.quartz.*;
|
||||||
import org.quartz.impl.StdSchedulerFactory;
|
import org.quartz.impl.StdSchedulerFactory;
|
||||||
import org.quartz.simpl.SimpleJobFactory;
|
import org.quartz.simpl.SimpleJobFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public class AlertScheduler extends AbstractLifecycleComponent {
|
public class AlertScheduler extends AbstractLifecycleComponent {
|
||||||
|
|
||||||
|
@ -50,44 +57,32 @@ public class AlertScheduler extends AbstractLifecycleComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void executeAlert(String alertName){
|
public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){
|
||||||
logger.warn("Running [{}]",alertName);
|
logger.warn("Running [{}]",alertName);
|
||||||
Alert alert = alertManager.getAlertForName(alertName);
|
Alert alert = alertManager.getAlertForName(alertName);
|
||||||
//@TODO : claim alert
|
//@TODO : claim alert
|
||||||
String queryTemplate = null;
|
|
||||||
try {
|
try {
|
||||||
logger.warn("Getting the query");
|
XContentBuilder builder = createClampedQuery(jobExecutionContext, alert);
|
||||||
GetIndexedScriptResponse scriptResponse = client.prepareGetIndexedScript().setScriptLang("mustache").setId(alert.queryName()).execute().get();
|
logger.warn("Running the following query : [{}]", builder.string());
|
||||||
logger.warn("Got the query");
|
|
||||||
if (scriptResponse.isExists()) {
|
SearchRequestBuilder srb = client.prepareSearch().setSource(builder);
|
||||||
queryTemplate = scriptResponse.getScript();
|
if (alert.indices() != null ){
|
||||||
|
srb.setIndices(alert.indices().toArray(new String[0]));
|
||||||
}
|
}
|
||||||
logger.warn("Found : [{}]", queryTemplate);
|
SearchResponse sr = srb.execute().get();
|
||||||
if (queryTemplate != null) {
|
logger.warn("Got search response");
|
||||||
CompiledScript compiledTemplate = scriptService.compile("mustache",queryTemplate);
|
AlertResult result = new AlertResult();
|
||||||
ExecutableScript executable = scriptService.executable(compiledTemplate, new HashMap());
|
result.isTriggered = triggerManager.isTriggered(alertName,sr);
|
||||||
BytesReference processedQuery = (BytesReference) executable.run();
|
|
||||||
logger.warn("Compiled to [{}]", processedQuery);
|
result.searchResponse = sr;
|
||||||
SearchRequestBuilder srb = client.prepareSearch().setSource(processedQuery);
|
if (result.isTriggered) {
|
||||||
if (alert.indices() != null ){
|
logger.warn("We have triggered");
|
||||||
srb.setIndices(alert.indices().toArray(new String[0]));
|
//actionManager.doAction(alertName,result);
|
||||||
}
|
logger.warn("Did action !");
|
||||||
SearchResponse sr = srb.execute().get();
|
}else{
|
||||||
logger.warn("Got search response");
|
logger.warn("We didn't trigger");
|
||||||
AlertResult result = new AlertResult();
|
|
||||||
result.isTriggered = triggerManager.isTriggered(alertName,sr);
|
|
||||||
result.searchResponse = sr;
|
|
||||||
if (result.isTriggered) {
|
|
||||||
logger.warn("We have triggered");
|
|
||||||
actionManager.doAction(alertName,result);
|
|
||||||
logger.warn("Did action !");
|
|
||||||
}else{
|
|
||||||
logger.warn("We didn't trigger");
|
|
||||||
}
|
|
||||||
//@TODO write this back to the alert manager
|
|
||||||
} else {
|
|
||||||
logger.error("Failed to find query named [{}]",alert.queryName());
|
|
||||||
}
|
}
|
||||||
|
//@TODO write this back to the alert manager
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Fail", e);
|
logger.error("Fail", e);
|
||||||
logger.error("Failed execute alert [{}]",alert.queryName(), e);
|
logger.error("Failed execute alert [{}]",alert.queryName(), e);
|
||||||
|
@ -95,18 +90,50 @@ public class AlertScheduler extends AbstractLifecycleComponent {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private XContentBuilder createClampedQuery(JobExecutionContext jobExecutionContext, Alert alert) throws IOException {
|
||||||
|
Date scheduledFireTime = jobExecutionContext.getScheduledFireTime();
|
||||||
|
DateTime clampEnd = new DateTime(scheduledFireTime);
|
||||||
|
DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds());
|
||||||
|
logger.error("Subtracting : [{}] seconds from [{}] = [{}]", (int)alert.timePeriod().seconds(), clampEnd, clampStart );
|
||||||
|
|
||||||
|
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("query");
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("filtered");
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("query");
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("template");
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("id");
|
||||||
|
builder.value(alert.queryName());
|
||||||
|
builder.endObject();
|
||||||
|
builder.endObject();
|
||||||
|
builder.field("filter");
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("range");
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("@timestamp");
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("gte");
|
||||||
|
builder.value(clampStart);
|
||||||
|
builder.field("lt");
|
||||||
|
builder.value(clampEnd);
|
||||||
|
builder.endObject();
|
||||||
|
builder.endObject();
|
||||||
|
builder.endObject();
|
||||||
|
builder.endObject();
|
||||||
|
builder.endObject();
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
public void addAlert(String alertName, Alert alert) {
|
public void addAlert(String alertName, Alert alert) {
|
||||||
try {
|
|
||||||
org.elasticsearch.alerting.AlertExecutorJob.class.newInstance();
|
|
||||||
} catch (Exception e){
|
|
||||||
logger.error("NOOOOO",e);
|
|
||||||
}
|
|
||||||
JobDetail job = JobBuilder.newJob(org.elasticsearch.alerting.AlertExecutorJob.class).withIdentity(alertName).build();
|
JobDetail job = JobBuilder.newJob(org.elasticsearch.alerting.AlertExecutorJob.class).withIdentity(alertName).build();
|
||||||
job.getJobDataMap().put("manager",this);
|
job.getJobDataMap().put("manager",this);
|
||||||
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
|
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
|
||||||
.withSchedule(CronScheduleBuilder.cronSchedule(alert.schedule()))
|
.withSchedule(CronScheduleBuilder.cronSchedule(alert.schedule()))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
logger.warn("Scheduling [{}] with schedule [{}]", alertName, alert.schedule());
|
logger.warn("Scheduling [{}] with schedule [{}]", alertName, alert.schedule());
|
||||||
scheduler.scheduleJob(job, cronTrigger);
|
scheduler.scheduleJob(job, cronTrigger);
|
||||||
|
|
Loading…
Reference in New Issue