Alerting : Add parsing and indices.

This commit adds parsing of the triggers and actions, and allows alerts to be restricted to indices.

Original commit: elastic/x-pack-elasticsearch@44a6a51b17
This commit is contained in:
Brian Murphy 2014-08-13 10:11:51 +01:00
parent 940d6402ae
commit 88ed7a5624
9 changed files with 141 additions and 50 deletions

View File

@ -5,12 +5,10 @@
*/
package org.elasticsearch.alerting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
/**
* Created by brian on 8/12/14.
@ -20,6 +18,20 @@ public class Alert {
private String queryName;
private AlertTrigger trigger;
private TimeValue timePeriod;
private List<AlertAction> actions;
private String schedule;
private DateTime lastRan;
public List<String> indices() {
return indices;
}
public void indices(List<String> indices) {
this.indices = indices;
}
private List<String> indices;
public String alertName() {
return alertName;
@ -49,12 +61,12 @@ public class Alert {
this.timePeriod = timePeriod;
}
public AlertAction action() {
return action;
public List<AlertAction> actions() {
return actions;
}
public void action(AlertAction action) {
this.action = action;
public void actions(List<AlertAction> action) {
this.actions = action;
}
public String schedule() {
@ -73,19 +85,17 @@ public class Alert {
this.lastRan = lastRan;
}
private AlertAction action;
private String schedule;
private DateTime lastRan;
public Alert(String alertName, String queryName, AlertTrigger trigger,
TimeValue timePeriod, AlertAction action, String schedule, DateTime lastRan){
TimeValue timePeriod, List<AlertAction> actions, String schedule, DateTime lastRan,
List<String> indices){
this.alertName = alertName;
this.queryName = queryName;
this.trigger = trigger;
this.timePeriod = timePeriod;
this.action = action;
this.actions = actions;
this.lastRan = lastRan;
this.schedule = schedule;
this.indices = indices;
}
public String toJSON(){

View File

@ -6,6 +6,5 @@
package org.elasticsearch.alerting;
public interface AlertAction {
public boolean doAction(AlertResult alert);
public String getActionType();
public boolean doAction(String alertName, AlertResult alert);
}

View File

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerting;
public interface AlertActionFactory {
AlertAction createAction(Object parameters);
}

View File

@ -11,22 +11,40 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchHitField;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AlertActionManager extends AbstractComponent {
private final AlertManager alertManager;
private final Map<String, AlertActionFactory> actionImplemented;
@Inject
public AlertActionManager(Settings settings, AlertManager alertManager) {
super(settings);
this.alertManager = alertManager;
this.actionImplemented = new HashMap<>();
actionImplemented.put("email", new EmailAlertActionFactory());
}
public static AlertAction parseActionFromSearchField(SearchHitField hitField) {
return null;
public void registerAction(String name, AlertActionFactory actionFactory){
actionImplemented.put(name, actionFactory);
}
public List<AlertAction> parseActionsFromMap(Map<String,Object> actionMap) {
List<AlertAction> actions = new ArrayList<>();
for (Map.Entry<String, Object> actionEntry : actionMap.entrySet() ) {
actions.add(actionImplemented.get(actionEntry.getKey()).createAction(actionEntry.getValue()));
}
return actions;
}
public void doAction(String alertName, AlertResult alertResult){
Alert alert = alertManager.getAlertForName(alertName);
alert.action().doAction(alertResult);
for (AlertAction action : alert.actions()) {
action.doAction(alertName, alertResult);
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -41,6 +42,7 @@ public class AlertManager extends AbstractLifecycleComponent {
public final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod");
public final ParseField ACTION_FIELD = new ParseField("action");
public final ParseField LASTRAN_FIELD = new ParseField("lastRan");
public final ParseField INDICES = new ParseField("indices");
private final Client client;
private AlertScheduler scheduler;
@ -50,6 +52,7 @@ public class AlertManager extends AbstractLifecycleComponent {
private AtomicBoolean started = new AtomicBoolean(false);
private final Thread starter;
private AlertActionManager actionManager;
class StarterThread implements Runnable {
@Override
@ -83,7 +86,10 @@ public class AlertManager extends AbstractLifecycleComponent {
}
}
@Inject
public void setActionManager(AlertActionManager actionManager){
this.actionManager = actionManager;
}
@Override
protected void doStart() throws ElasticsearchException {
@ -156,14 +162,33 @@ public class AlertManager extends AbstractLifecycleComponent {
String query = fields.get(QUERY_FIELD.getPreferredName()).toString();
String schedule = fields.get(SCHEDULE_FIELD.getPreferredName()).toString();
//AlertTrigger trigger = TriggerManager.parseTriggerFromSearchField(fields.get(TRIGGER_FIELD.toString()));
AlertTrigger trigger = new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1);
Object triggerObj = fields.get(TRIGGER_FIELD.getPreferredName());
AlertTrigger trigger = null;
if (triggerObj instanceof Map) {
Map<String, Object> triggerMap = (Map<String, Object>) triggerObj;
trigger = TriggerManager.parseTriggerFromMap(triggerMap);
} else {
throw new ElasticsearchException("Unable to parse trigger [" + triggerObj + "]");
}
TimeValue timePeriod = new TimeValue(Long.valueOf(fields.get(TIMEPERIOD_FIELD.getPreferredName()).toString()));
//AlertAction action = AlertActionManager.parseActionFromSearchField(fields.get(ACTION_FIELD.toString()));
AlertAction action = new EmailAlertAction("brian.murphy@elasticsearch.com");
Object actionObj = fields.get(ACTION_FIELD.getPreferredName());
List<AlertAction> actions = null;
if (actionObj instanceof Map) {
Map<String, Object> actionMap = (Map<String, Object>) actionObj;
actions = actionManager.parseActionsFromMap(actionMap);
} else {
throw new ElasticsearchException("Unable to parse actions [" + triggerObj + "]");
}
DateTime lastRan = new DateTime(fields.get("lastRan").toString());
Alert alert = new Alert(alertId, query, trigger, timePeriod, action, schedule, lastRan);
List<String> indices = null;
if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){
indices = (List<String>)fields.get(INDICES.getPreferredName());
}
Alert alert = new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices);
alertMap.put(alertId, alert);
}
logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size());

View File

@ -7,6 +7,7 @@ package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
@ -67,7 +68,11 @@ public class AlertScheduler extends AbstractLifecycleComponent {
ExecutableScript executable = scriptService.executable(compiledTemplate, new HashMap());
BytesReference processedQuery = (BytesReference) executable.run();
logger.warn("Compiled to [{}]", processedQuery);
SearchResponse sr = client.prepareSearch().setSource(processedQuery).execute().get();
SearchRequestBuilder srb = client.prepareSearch().setSource(processedQuery);
if (alert.indices() != null ){
srb.setIndices(alert.indices().toArray(new String[0]));
}
SearchResponse sr = srb.execute().get();
logger.warn("Got search response");
AlertResult result = new AlertResult();
result.isTriggered = triggerManager.isTriggered(alertName,sr);

View File

@ -12,37 +12,40 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.mail.*;
import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
public class EmailAlertAction implements AlertAction {
List<String> emailAddresses = new ArrayList<>();
List<Address> emailAddresses = new ArrayList<>();
String from = "esalertingtest@gmail.com";
String passwd = "elasticsearchforthewin";
String server = "smtp.gmail.com";
int port = 465;
int port = 587;
public EmailAlertAction(SearchHitField hitField){
emailAddresses.add("brian.murphy@elasticsearch.com");
}
public EmailAlertAction(String ... addresses){
for (String address : addresses) {
emailAddresses.add(address);
addEmailAddress(address);
}
}
public void addEmailAddress(String address) {
try {
emailAddresses.add(InternetAddress.parse(address)[0]);
} catch (AddressException addressException) {
throw new ElasticsearchException("Unable to parse address : [" + address + "]");
}
}
@Override
public boolean doAction(AlertResult result) {
public boolean doAction(String alertName, AlertResult result) {
Properties props = new Properties();
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.smtp.host", "smtp.gmail.com");
props.put("mail.smtp.port", "587");
props.put("mail.smtp.host", server);
props.put("mail.smtp.port", port);
Session session = Session.getInstance(props,
new javax.mail.Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
@ -53,8 +56,8 @@ public class EmailAlertAction implements AlertAction {
try {
message.setFrom(new InternetAddress(from));
message.setRecipients(Message.RecipientType.TO,
InternetAddress.parse(emailAddresses.get(0)));
message.setSubject("Elasticsearch Alert!");
emailAddresses.toArray(new Address[1]));
message.setSubject("Elasticsearch Alert from " + alertName);
message.setText(result.searchResponse.toString());
Transport.send(message);
} catch (Exception e){
@ -63,9 +66,4 @@ public class EmailAlertAction implements AlertAction {
return true;
}
@Override
public String getActionType() {
return "email";
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerting;
import java.util.List;
public class EmailAlertActionFactory implements AlertActionFactory{
@Override
public AlertAction createAction(Object parameters) {
EmailAlertAction action = new EmailAlertAction();
if (parameters instanceof List){
for (String emailAddress : (List<String>)parameters) {
action.addEmailAddress(emailAddress);
}
}
return action;
}
}

View File

@ -9,20 +9,24 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchHitField;
import java.util.Map;
public class TriggerManager extends AbstractComponent {
private final AlertManager alertManager;
//private ESLogger logger = Loggers.getLogger(TriggerManager.class);
public static AlertTrigger parseTriggerFromSearchField(SearchHitField hitField) {
public static AlertTrigger parseTriggerFromMap(Map<String, Object> triggerMap) {
//For now just trigger on number of events greater than 1
return new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1);
//return null;
for (Map.Entry<String,Object> entry : triggerMap.entrySet()){
AlertTrigger.TriggerType type = AlertTrigger.TriggerType.fromString(entry.getKey());
AlertTrigger.SimpleTrigger simpleTrigger = AlertTrigger.SimpleTrigger.fromString(entry.getValue().toString().substring(0, 1));
int value = Integer.valueOf(entry.getValue().toString().substring(1));
return new AlertTrigger(simpleTrigger, type, value);
}
throw new ElasticsearchIllegalArgumentException();
}
@Inject