Alerting : Add support for quartz scheduler.

This commit enables loading of alerts from the .alerts index and adds the
Quartz scheduler.
You can add the following alert :
````
curl -XPOST http://localhost:9200/.alerts/alert/myTestAlert -d '{
    "query" : "myAlertQuery",
    "schedule" : "00 * * * * ?",
    "trigger" : {
         "numberOfEvents" : ">1"
     },
    "timeperiod" : 300,
     "action" : {
         "email" : [ "brian.murphy@elasticsearch.com" ]
     },
    "version" : 1,
    "lastRan" : "2014-05-05T12:12:12.123Z"
}
````
With the following search template:
````
curl -XPOST localhost:9200/_search/template/myAlertQuery -d '{ "template" : { "query" : { "match_all" : {} } } }'
````
This will execute the search every minute and trigger if there is more than one match (which there will be).

Original commit: elastic/x-pack-elasticsearch@708f927914
This commit is contained in:
Brian Murphy 2014-08-12 18:57:28 +01:00
parent 1e6d6b58c9
commit 5f84596c92
8 changed files with 329 additions and 30 deletions

39
pom.xml
View File

@ -23,8 +23,45 @@
<version>${elasticsearch.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.4</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>maven2-repository.dev.java.net</id>
<name>Java.net Repository for Maven</name>
<url>http://download.java.net/maven/2/</url>
<layout>default</layout>
</repository>
</repositories>
<build>
<plugins>
<plugin>

View File

@ -5,13 +5,28 @@
*/
package org.elasticsearch.alerting;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchHitField;
/**
* Created by brian on 8/12/14.
*/
public class AlertActionManager {
public class AlertActionManager extends AbstractComponent {
private final AlertManager alertManager;
@Inject
public AlertActionManager(Settings settings, AlertManager alertManager) {
super(settings);
this.alertManager = alertManager;
}
public static AlertAction parseActionFromSearchField(SearchHitField hitField) {
return null;
}
public void doAction(String alertName, AlertResult alertResult){
Alert alert = alertManager.getAlertForName(alertName);
alert.action().doAction(alertResult);
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerting;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
public class AlertExecutorJob implements Job {
public AlertExecutorJob () {
}
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
String alertName = jobExecutionContext.getJobDetail().getKey().getName();
((AlertScheduler)jobExecutionContext.getJobDetail().getJobDataMap().get("manager")).executeAlert(alertName);
}
}

View File

@ -10,23 +10,23 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
public class AlertManager extends AbstractLifecycleComponent {
@ -42,23 +42,65 @@ public class AlertManager extends AbstractLifecycleComponent {
public final ParseField ACTION_FIELD = new ParseField("action");
public final ParseField LASTRAN_FIELD = new ParseField("lastRan");
public final Client client;
private final Client client;
private AlertScheduler scheduler;
private final Map<String,Alert> alertMap;
private AtomicBoolean started = new AtomicBoolean(false);
private final Thread starter;
class StarterThread implements Runnable {
@Override
public void run() {
logger.warn("Starting thread to get alerts");
int attempts = 0;
while (attempts < 2) {
try {
logger.warn("Sleeping [{}]", attempts);
Thread.sleep(10000);
logger.warn("Slept");
break;
} catch (InterruptedException ie) {
++attempts;
}
}
logger.warn("Loading alerts");
try {
loadAlerts();
started.set(true);
} catch (Throwable t) {
logger.error("Failed to load alerts", t);
}
//Build the mapping for the scheduler
Map<String,String> alertNameToSchedule = new HashMap();
synchronized (alertMap) {
for (Map.Entry<String, Alert> entry : alertMap.entrySet()) {
scheduler.addAlert(entry.getKey(), entry.getValue());
}
}
}
}
@Override
protected void doStart() throws ElasticsearchException {
logger.warn("STARTING");
try {
loadAlerts();
} catch (Throwable t){
logger.error("Failed to load alerts", t);
}
starter.start();
}
@Override
protected void doStop() throws ElasticsearchException {
logger.warn("STOPPING");
/*
try {
starter.join();
} catch (InterruptedException ie) {
logger.warn("Interrupted on joining start thread.", ie);
}
*/
}
@Override
@ -70,20 +112,21 @@ public class AlertManager extends AbstractLifecycleComponent {
@Inject
public AlertManager(Settings settings, Client client) {
super(settings);
logger.warn("Starting AlertManager");
logger.warn("Initing AlertManager");
this.client = client;
alertMap = new HashMap();
starter = new Thread(new StarterThread());
//scheduleAlerts();
}
private ClusterHealthStatus clusterHealth() throws InterruptedException, ExecutionException {
ClusterHealthResponse actionGet = client.admin().cluster()
.health(Requests.clusterHealthRequest(ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
return actionGet.getStatus();
@Inject
public void setAlertScheduler(AlertScheduler scheduler){
this.scheduler = scheduler;
}
private ClusterHealthStatus createAlertsIndex() throws InterruptedException, ExecutionException {
client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().get(); //TODO FIX MAPPINGS
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().get(); //TODO FIX MAPPINGS
logger.warn(cir.toString());
ClusterHealthResponse actionGet = client.admin().cluster()
.health(Requests.clusterHealthRequest(ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
return actionGet.getStatus();
@ -96,20 +139,29 @@ public class AlertManager extends AbstractLifecycleComponent {
synchronized (alertMap) {
SearchResponse searchResponse = client.prepareSearch().setSource(
"{ 'query' : " +
"{ 'match_all' : {}}" +
"'size' : 100" +
"{ \"query\" : " +
"{ \"match_all\" : {}}," +
"\"size\" : \"100\"" +
"}"
).setTypes(ALERT_TYPE).setIndices(ALERT_INDEX).execute().get();
for (SearchHit sh : searchResponse.getHits()) {
String alertId = sh.getId();
Map<String,SearchHitField> fields = sh.getFields();
String query = fields.get(QUERY_FIELD.toString()).toString();
String schedule = fields.get(SCHEDULE_FIELD.toString()).toString();
AlertTrigger trigger = TriggerManager.parseTriggerFromSearchField(fields.get(TRIGGER_FIELD.toString()));
TimeValue timePeriod = new TimeValue(Long.valueOf(fields.get(TIMEPERIOD_FIELD).toString()));
AlertAction action = AlertActionManager.parseActionFromSearchField(fields.get(ACTION_FIELD.toString()));
DateTime lastRan = new DateTime(fields.get(LASTRAN_FIELD.toString().toString()));
logger.warn("Found : [{}]", alertId);
Map<String,Object> fields = sh.sourceAsMap();
//Map<String,SearchHitField> fields = sh.getFields();
for (String field : fields.keySet() ) {
logger.warn("Field : [{}]", field);
}
String query = fields.get(QUERY_FIELD.getPreferredName()).toString();
String schedule = fields.get(SCHEDULE_FIELD.getPreferredName()).toString();
//AlertTrigger trigger = TriggerManager.parseTriggerFromSearchField(fields.get(TRIGGER_FIELD.toString()));
AlertTrigger trigger = new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1);
TimeValue timePeriod = new TimeValue(Long.valueOf(fields.get(TIMEPERIOD_FIELD.getPreferredName()).toString()));
//AlertAction action = AlertActionManager.parseActionFromSearchField(fields.get(ACTION_FIELD.toString()));
AlertAction action = new EmailAlertAction("brian.murphy@elasticsearch.com");
DateTime lastRan = new DateTime(fields.get("lastRan").toString());
Alert alert = new Alert(alertId, query, trigger, timePeriod, action, schedule, lastRan);
alertMap.put(alertId, alert);

View File

@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory;
import java.util.HashMap;
public class AlertScheduler extends AbstractLifecycleComponent {
Scheduler scheduler = null;
private final AlertManager alertManager;
private final Client client;
private final ScriptService scriptService;
private final TriggerManager triggerManager;
private final AlertActionManager actionManager;
@Inject
public AlertScheduler(Settings settings, AlertManager alertManager, ScriptService scriptService, Client client,
TriggerManager triggerManager, AlertActionManager actionManager) {
super(settings);
this.alertManager = alertManager;
this.client = client;
this.scriptService = scriptService;
this.triggerManager = triggerManager;
this.actionManager = actionManager;
try {
SchedulerFactory schFactory = new StdSchedulerFactory();
scheduler = schFactory.getScheduler();
scheduler.setJobFactory(new SimpleJobFactory());
} catch (Throwable t) {
logger.error("Failed to instantiate scheduler", t);
}
}
public void executeAlert(String alertName){
logger.warn("Running [{}]",alertName);
Alert alert = alertManager.getAlertForName(alertName);
//@TODO : claim alert
String queryTemplate = null;
try {
logger.warn("Getting the query");
GetIndexedScriptResponse scriptResponse = client.prepareGetIndexedScript().setScriptLang("mustache").setId(alert.queryName()).execute().get();
logger.warn("Got the query");
if (scriptResponse.isExists()) {
queryTemplate = scriptResponse.getScript();
}
logger.warn("Found : [{}]", queryTemplate);
if (queryTemplate != null) {
CompiledScript compiledTemplate = scriptService.compile("mustache",queryTemplate);
ExecutableScript executable = scriptService.executable(compiledTemplate, new HashMap());
BytesReference processedQuery = (BytesReference) executable.run();
logger.warn("Compiled to [{}]", processedQuery);
SearchResponse sr = client.prepareSearch().setSource(processedQuery).execute().get();
logger.warn("Got search response");
AlertResult result = new AlertResult();
result.isTriggered = triggerManager.isTriggered(alertName,sr);
result.searchResponse = sr;
if (result.isTriggered) {
logger.warn("We have triggered");
actionManager.doAction(alertName,result);
logger.warn("Did action !");
}else{
logger.warn("We didn't trigger");
}
//@TODO write this back to the alert manager
} else {
logger.error("Failed to find query named [{}]",alert.queryName());
}
} catch (Exception e) {
logger.error("Fail", e);
logger.error("Failed execute alert [{}]",alert.queryName(), e);
}
}
public void addAlert(String alertName, Alert alert) {
try {
org.elasticsearch.alerting.AlertExecutorJob.class.newInstance();
} catch (Exception e){
logger.error("NOOOOO",e);
}
JobDetail job = JobBuilder.newJob(org.elasticsearch.alerting.AlertExecutorJob.class).withIdentity(alertName).build();
job.getJobDataMap().put("manager",this);
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule(alert.schedule()))
.build();
try {
logger.warn("Scheduling [{}] with schedule [{}]", alertName, alert.schedule());
scheduler.scheduleJob(job, cronTrigger);
} catch (SchedulerException se) {
logger.error("Failed to schedule job",se);
}
}
@Override
protected void doStart() throws ElasticsearchException {
logger.warn("Starting Scheduler");
try {
scheduler.start();
} catch (SchedulerException se){
logger.error("Failed to start quartz scheduler",se);
}
}
@Override
protected void doStop() throws ElasticsearchException {
try {
scheduler.shutdown(true);
} catch (SchedulerException se){
logger.error("Failed to stop quartz scheduler",se);
}
}
@Override
protected void doClose() throws ElasticsearchException {
}
}

View File

@ -13,6 +13,8 @@ public class AlertingModule extends AbstractModule {
protected void configure() {
bind(AlertManager.class).asEagerSingleton();
bind(TriggerManager.class).asEagerSingleton();
bind(AlertScheduler.class).asEagerSingleton();
bind(AlertActionManager.class).asEagerSingleton();
}
}

View File

@ -5,10 +5,15 @@
*/
package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.search.SearchHitField;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.mail.*;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
public class EmailAlertAction implements AlertAction {
List<String> emailAddresses = new ArrayList<>();
@ -24,8 +29,36 @@ public class EmailAlertAction implements AlertAction {
emailAddresses.add("brian.murphy@elasticsearch.com");
}
public EmailAlertAction(String ... addresses){
for (String address : addresses) {
emailAddresses.add(address);
}
}
@Override
public boolean doAction(AlertResult alert) {
public boolean doAction(AlertResult result) {
Properties props = new Properties();
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.smtp.host", "smtp.gmail.com");
props.put("mail.smtp.port", "587");
Session session = Session.getInstance(props,
new javax.mail.Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(from, passwd);
}
});
Message message = new MimeMessage(session);
try {
message.setFrom(new InternetAddress(from));
message.setRecipients(Message.RecipientType.TO,
InternetAddress.parse(emailAddresses.get(0)));
message.setSubject("Elasticsearch Alert!");
message.setText(result.searchResponse.toString());
} catch (Exception e){
throw new ElasticsearchException("Failed to send mail", e);
}
//Email here
return true;
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.plugin.alerting;
import org.elasticsearch.alerting.AlertManager;
import org.elasticsearch.alerting.AlertScheduler;
import org.elasticsearch.alerting.AlertingModule;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.LifecycleComponent;
@ -27,6 +28,7 @@ public class AlertingPlugin extends AbstractPlugin {
public Collection<java.lang.Class<? extends LifecycleComponent>> services() {
Collection<java.lang.Class<? extends LifecycleComponent>> services = Lists.newArrayList();
services.add(AlertManager.class);
services.add(AlertScheduler.class);
return services;
}