Refactor actions to match trigger refactoring.

This change refactors the old AlertActions code into Actions to mirror the triggers code.
This work also includes the configuration changes and webhook changes from master.
TemplateUtils has been renamed to StringTemplateUtils. References to the old AlertActions code have been removed and updated
to reference the new code.
Action.Result now implements ToXContent
This allows the FiredAlerts to track the history of the actions.

Original commit: elastic/x-pack-elasticsearch@a3d5d3bd4d
This commit is contained in:
Brian Murphy 2015-02-05 13:20:53 -05:00
parent 01375a320d
commit 3147927e20
24 changed files with 1122 additions and 791 deletions

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.actions.ActionRegistry;
import org.elasticsearch.alerts.actions.AlertActions;
import org.elasticsearch.alerts.payload.Payload;
import org.elasticsearch.alerts.payload.PayloadRegistry;
@ -160,11 +160,11 @@ public class Alert implements ToXContent {
private final TriggerRegistry triggerRegistry;
private final ScheduleRegistry scheduleRegistry;
private final PayloadRegistry payloadRegistry;
private final AlertActionRegistry actionRegistry;
private final ActionRegistry actionRegistry;
@Inject
public Parser(Settings settings, TriggerRegistry triggerRegistry, ScheduleRegistry scheduleRegistry,
PayloadRegistry payloadRegistry, AlertActionRegistry actionRegistry) {
PayloadRegistry payloadRegistry, ActionRegistry actionRegistry) {
super(settings);
this.triggerRegistry = triggerRegistry;
@ -201,7 +201,7 @@ public class Alert implements ToXContent {
} else if (TRIGGER_FIELD.match(currentFieldName)) {
trigger = triggerRegistry.parse(parser);
} else if (ACTIONS_FIELD.match(currentFieldName)) {
actions = actionRegistry.parse(parser);
actions = actionRegistry.parseActions(parser);
} else if (PAYLOAD_FIELD.match(currentFieldName)) {
payload = payloadRegistry.parse(parser);
} else if (META_FIELD.match(currentFieldName)) {

View File

@ -6,7 +6,7 @@
package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.actions.ActionModule;
import org.elasticsearch.alerts.client.AlertsClientModule;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.payload.PayloadModule;
@ -33,7 +33,8 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
new AlertsRestModule(),
new SchedulerModule(),
new AlertsTransportModule(),
new TriggerModule());
new TriggerModule(),
new ActionModule());
}
@Override
@ -44,7 +45,6 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
bind(AlertsStore.class).asEagerSingleton();
bind(TemplateUtils.class).asEagerSingleton();
bind(HistoryService.class).asEagerSingleton();
bind(AlertActionRegistry.class).asEagerSingleton();
bind(ConfigurationService.class).asEagerSingleton();
}

View File

@ -9,7 +9,7 @@ package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.history.AlertRecord;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.scheduler.Scheduler;
@ -40,7 +40,6 @@ public class AlertsService extends AbstractComponent {
private final Scheduler scheduler;
private final AlertsStore alertsStore;
private final HistoryService historyService;
private final AlertActionRegistry actionRegistry;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final KeyedLock<String> alertLock = new KeyedLock<>();
@ -51,14 +50,13 @@ public class AlertsService extends AbstractComponent {
@Inject
public AlertsService(Settings settings, ClusterService clusterService, Scheduler scheduler, AlertsStore alertsStore,
IndicesService indicesService, HistoryService historyService,
AlertActionRegistry actionRegistry, ThreadPool threadPool) {
ThreadPool threadPool) {
super(settings);
this.scheduler = scheduler;
this.threadPool = threadPool;
this.alertsStore = alertsStore;
this.historyService = historyService;
this.historyService.setAlertsService(this);
this.actionRegistry = actionRegistry;
this.clusterService = clusterService;
scheduler.addListener(new SchedulerListener());
@ -178,7 +176,10 @@ public class AlertsService extends AbstractComponent {
if (!throttleResult.throttle()) {
Map<String, Object> data = alert.payload().execute(alert, triggerResult, entry.getScheduledTime(), entry.getFireTime());
alertRun = new AlertRun(triggerResult, data);
actionRegistry.doAction(alert, alertRun);
for (Action action : alert.actions()){
Action.Result actionResult = action.execute(alert, data);
//TODO : process action result, what to do if just one action fails or throws exception ?
}
alert.status().executed(entry.getScheduledTime());
} else {
alert.status().throttled(entry.getFireTime(), throttleResult.reason());

View File

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Map;
/**
*/
public abstract class Action<R extends Action.Result> implements ToXContent {
public static final String ALERT_NAME_VARIABLE_NAME = "alert_name";
public static final String RESPONSE_VARIABLE_NAME = "response";
protected final ESLogger logger;
protected Action(ESLogger logger) {
this.logger = logger;
}
/**
* @return the type of this action
*/
public abstract String type();
/**
* Executes this action
*/
public abstract R execute(Alert alert, Map<String,Object> data) throws IOException;
/**
* Parses xcontent to a concrete action of the same type.
*/
protected static interface Parser<T extends Action> {
/**
* @return The type of the action
*/
String type();
/**
* Parses the given xcontent and creates a concrete action
*/
T parse(XContentParser parser) throws IOException;
}
protected static abstract class Result implements ToXContent {
private final boolean success;
private final String type;
public Result(String type, boolean success) {
this.type = type;
this.success = success;
}
public String type() {
return type;
}
public boolean success() {
return success;
}
}
}

View File

@ -5,18 +5,18 @@
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.alerts.AlertsException;
/**
* Classes that implement this interface should be a POJO
* containing the data needed to do this action
*
*/
public interface AlertAction extends ToXContent {
public class ActionException extends AlertsException {
/**
*
* @return
*/
public String getActionName();
public ActionException(String msg) {
super(msg);
}
public ActionException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.actions.email.EmailAction;
import org.elasticsearch.alerts.actions.index.IndexAction;
import org.elasticsearch.alerts.actions.webhook.WebhookAction;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class ActionModule extends AbstractModule {
private final Map<String, Class<? extends Action.Parser>> parsers = new HashMap<>();
public void registerTrigger(String type, Class<? extends Action.Parser> parserType) {
parsers.put(type, parserType);
}
@Override
protected void configure() {
MapBinder<String, Action.Parser> parsersBinder = MapBinder.newMapBinder(binder(), String.class, Action.Parser.class);
bind(EmailAction.Parser.class).asEagerSingleton();
parsersBinder.addBinding(EmailAction.TYPE).to(EmailAction.Parser.class);
bind(WebhookAction.Parser.class).asEagerSingleton();
parsersBinder.addBinding(WebhookAction.TYPE).to(WebhookAction.Parser.class);
bind(IndexAction.Parser.class).asEagerSingleton();
parsersBinder.addBinding(IndexAction.TYPE).to(IndexAction.Parser.class);
for (Map.Entry<String, Class<? extends Action.Parser>> entry : parsers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
parsersBinder.addBinding(entry.getKey()).to(entry.getValue());
}
bind(ActionRegistry.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
*/
public class ActionRegistry {
private final ImmutableMap<String, Action.Parser> parsers;
@Inject
public ActionRegistry(Map<String, Action.Parser> parsers) {
this.parsers = ImmutableMap.copyOf(parsers);
}
/**
* Reads the contents of parser to create the correct Action
* @param parser The parser containing the trigger definition
* @return a new Action instance from the parser
* @throws IOException
*/
public Action parse(XContentParser parser) throws IOException {
String type = null;
XContentParser.Token token;
Action action = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && type != null) {
Action.Parser triggerParser = parsers.get(type);
if (triggerParser == null) {
throw new ActionException("unknown action type [" + type + "]");
}
action = triggerParser.parse(parser);
}
}
return action;
}
public AlertActions parseActions(XContentParser parser) throws IOException {
List<Action> actions = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
actions.add(parse(parser));
}
return new AlertActions(actions);
}
}

View File

@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
public interface AlertActionFactory {
AlertAction createAction(XContentParser parser) throws IOException;
public boolean doAction(AlertAction action, Alert alert, AlertsService.AlertRun alertRun);
}

View File

@ -1,77 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.ConfigurationService;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class AlertActionRegistry extends AbstractComponent {
private volatile ImmutableOpenMap<String, AlertActionFactory> actionImplemented;
@Inject
public AlertActionRegistry(Settings settings, ClientProxy client, ConfigurationService configurationService, ScriptServiceProxy scriptService) {
super(settings);
actionImplemented = ImmutableOpenMap.<String, AlertActionFactory>builder()
.fPut("email", new SmtpAlertActionFactory(configurationService, scriptService))
.fPut("index", new IndexAlertActionFactory(client, configurationService))
.fPut("webhook", new WebhookAlertActionFactory(scriptService))
.build();
}
public void registerAction(String name, AlertActionFactory actionFactory){
actionImplemented = ImmutableOpenMap.builder(actionImplemented)
.fPut(name, actionFactory)
.build();
}
public AlertActions parse(XContentParser parser) throws IOException {
List<AlertAction> actions = new ArrayList<>();
ImmutableOpenMap<String, AlertActionFactory> actionImplemented = this.actionImplemented;
String actionFactoryName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
actionFactoryName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
AlertActionFactory factory = actionImplemented.get(actionFactoryName);
if (factory != null) {
actions.add(factory.createAction(parser));
} else {
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionFactoryName + "]");
}
}
}
return new AlertActions(actions);
}
public void doAction(Alert alert, AlertsService.AlertRun alertRun){
for (AlertAction action : alert.actions()) {
AlertActionFactory factory = actionImplemented.get(action.getActionName());
if (factory != null) {
factory.doAction(action, alert, alertRun);
} else {
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + action.getActionName() + "]");
}
}
}
}

View File

@ -15,24 +15,24 @@ import java.util.List;
/**
*
*/
public class AlertActions implements Iterable<AlertAction>, ToXContent {
public class AlertActions implements Iterable<Action>, ToXContent {
private final List<AlertAction> actions;
private final List<Action> actions;
public AlertActions(List<AlertAction> actions) {
public AlertActions(List<Action> actions) {
this.actions = actions;
}
@Override
public Iterator<AlertAction> iterator() {
public Iterator<Action> iterator() {
return actions.iterator();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
for (AlertAction action : actions){
builder.field(action.getActionName());
for (Action action : actions){
builder.field(action.type());
action.toXContent(builder, params);
}
return builder.endObject();

View File

@ -1,45 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public class IndexAlertAction implements AlertAction, ToXContent {
private final String index;
private final String type;
public IndexAlertAction(String index, String type){
this.index = index;
this.type = type;
}
@Override
public String getActionName() {
return "index";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("index", index);
builder.field("type", type);
builder.endObject();
return builder;
}
public String getType() {
return type;
}
public String getIndex() {
return index;
}
}

View File

@ -1,87 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.ConfigurationService;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
/**
*/
public class IndexAlertActionFactory implements AlertActionFactory {
private final ClientProxy client;
private final ConfigurationService configurationService;
public IndexAlertActionFactory(ClientProxy client, ConfigurationService configurationService){
this.client = client;
this.configurationService = configurationService;
}
@Override
public AlertAction createAction(XContentParser parser) throws IOException {
String index = null;
String type = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "index":
index = parser.text();
break;
case "type":
type = parser.text();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
}
}
return new IndexAlertAction(index, type);
}
@Override
public boolean doAction(AlertAction action, Alert alert, AlertsService.AlertRun alertRun) {
if (!(action instanceof IndexAlertAction)) {
throw new ElasticsearchIllegalStateException("Bad action [" + action.getClass() + "] passed to IndexAlertActionFactory expected [" + IndexAlertAction.class + "]");
}
IndexAlertAction indexAlertAction = (IndexAlertAction) action;
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(indexAlertAction.getIndex());
indexRequest.type(indexAlertAction.getType());
try {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject();
resultBuilder.field("response", alertRun.data());
resultBuilder.field("timestamp", alert.getLastExecuteTime()); ///@TODO FIXME the firetime should be in the result ?
resultBuilder.endObject();
indexRequest.source(resultBuilder);
} catch (IOException ie) {
throw new ElasticsearchException("Unable to create XContentBuilder",ie);
}
return client.index(indexRequest).actionGet().isCreated();
}
}

View File

@ -1,105 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import javax.mail.*;
import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SmtpAlertAction implements AlertAction {
private final List<Address> emailAddresses = new ArrayList<>();
private final String subjectTemplate;
private final String messageTemplate;
public SmtpAlertAction(String subjectTemplate, String messageTemplate, String... addresses){
for (String address : addresses) {
addEmailAddress(address);
}
this.subjectTemplate = subjectTemplate;
this.messageTemplate = messageTemplate;
}
public void addEmailAddress(String address) {
try {
emailAddresses.add(InternetAddress.parse(address)[0]);
} catch (AddressException addressException) {
throw new ElasticsearchException("Unable to parse address : [" + address + "]");
}
}
public List<Address> getEmailAddresses() {
return new ArrayList<>(emailAddresses);
}
public String getMessageTemplate() {
return messageTemplate;
}
public String getSubjectTemplate() {
return subjectTemplate;
}
@Override
public String getActionName() {
return "email";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("addresses");
builder.startArray();
for (Address emailAddress : emailAddresses){
builder.value(emailAddress.toString());
}
builder.endArray();
if (subjectTemplate != null) {
builder.field("subject", subjectTemplate);
}
if (messageTemplate != null) {
builder.field("message", messageTemplate);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SmtpAlertAction that = (SmtpAlertAction) o;
if (emailAddresses != null ? !emailAddresses.equals(that.emailAddresses) : that.emailAddresses != null)
return false;
if (!messageTemplate.equals(that.messageTemplate)) return false;
if (!subjectTemplate.equals(that.subjectTemplate)) return false;
return true;
}
@Override
public int hashCode() {
int result = emailAddresses != null ? emailAddresses.hashCode() : 0;
result = 31 * result + subjectTemplate.hashCode();
result = 31 * result + messageTemplate.hashCode();
return result;
}
}

View File

@ -1,166 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ConfigurableComponentListener;
import org.elasticsearch.alerts.ConfigurationService;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import javax.mail.*;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.*;
public class SmtpAlertActionFactory implements AlertActionFactory, ConfigurableComponentListener {
private static final String PORT_SETTING = "alerts.action.email.server.port";
private static final String SERVER_SETTING = "alerts.action.email.server.name";
private static final String FROM_SETTING = "alerts.action.email.from.address";
private static final String PASSWD_SETTING = "alerts.action.email.from.passwd";
private static final String USERNAME_SETTING = "alerts.action.email.from.username";
private static final String ALERT_NAME_VARIABLE_NAME = "alert_name";
private static final String RESPONSE_VARIABLE_NAME = "response";
private static final String DEFAULT_SUBJECT = "Elasticsearch Alert {{alert_name}} triggered";
private static final String DEFAULT_MESSAGE = "{{alert_name}} triggered with {{response.hits.total}} results";
private final ConfigurationService configurationService;
private final ScriptServiceProxy scriptService;
private volatile Settings settings;
public SmtpAlertActionFactory(ConfigurationService configurationService, ScriptServiceProxy scriptService) {
this.configurationService = configurationService;
this.scriptService = scriptService;
}
@Override
public AlertAction createAction(XContentParser parser) throws IOException {
if (settings == null) {
settings = configurationService.getConfig();
configurationService.registerListener(this);
}
String messageTemplate = DEFAULT_MESSAGE;
String subjectTemplate = DEFAULT_SUBJECT;
List<String> addresses = new ArrayList<>();
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "subject":
subjectTemplate = parser.text();
break;
case "message":
messageTemplate = parser.text();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
switch (currentFieldName) {
case "addresses":
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
addresses.add(parser.text());
}
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
}
}
return new SmtpAlertAction(subjectTemplate, messageTemplate, addresses.toArray(new String[addresses.size()]));
}
@Override
public boolean doAction(AlertAction action, Alert alert, TriggerResult result) {
if (!(action instanceof SmtpAlertAction)) {
throw new ElasticsearchIllegalStateException("Bad action [" + action.getClass() + "] passed to EmailAlertActionFactory expected [" + SmtpAlertAction.class + "]");
}
if (settings == null) {
throw new ElasticsearchException("No settings loaded for Smtp (email)");
}
SmtpAlertAction smtpAlertAction = (SmtpAlertAction)action;
Properties props = new Properties();
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.smtp.host", settings.get(SERVER_SETTING, "smtp.gmail.com"));
props.put("mail.smtp.port", settings.getAsInt(PORT_SETTING, 587));
final Session session;
if (settings.get(PASSWD_SETTING) != null) {
session = Session.getInstance(props,
new javax.mail.Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(
settings.get(USERNAME_SETTING) == null ? settings.get(FROM_SETTING) : settings.get(USERNAME_SETTING),
settings.get(PASSWD_SETTING));
}
});
} else {
session = Session.getDefaultInstance(props);
}
Message message = new MimeMessage(session);
try {
message.setFrom(new InternetAddress(settings.get(FROM_SETTING)));
message.setRecipients(Message.RecipientType.TO,
smtpAlertAction.getEmailAddresses().toArray(new Address[1]));
if (smtpAlertAction.getSubjectTemplate() != null) {
message.setSubject(renderTemplate(smtpAlertAction.getSubjectTemplate(), alert, result, scriptService));
} else {
throw new ElasticsearchException("Subject Template not found");
}
if (smtpAlertAction.getMessageTemplate() != null) {
message.setText(renderTemplate(smtpAlertAction.getMessageTemplate(), alert, result, scriptService));
} else {
throw new ElasticsearchException("Email Message Template not found");
}
Transport.send(message);
} catch (Exception e){
throw new ElasticsearchException("Failed to send mail", e);
}
return true;
}
@Override
public void receiveConfigurationUpdate(Settings settings) {
this.settings = settings;
}
public static String renderTemplate(String template, Alert alert, TriggerResult result, ScriptServiceProxy scriptService) {
Map<String, Object> templateParams = new HashMap<>();
templateParams.put(ALERT_NAME_VARIABLE_NAME, alert.getName());
templateParams.put(RESPONSE_VARIABLE_NAME, result.getActionResponse());
ExecutableScript script = scriptService.executable("mustache", template, ScriptService.ScriptType.INLINE, templateParams);
return ((BytesReference) script.run()).toUtf8();
}
}

View File

@ -1,87 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
/**
*/
public class WebhookAlertAction implements AlertAction {
private final String url;
private final HttpMethod method;
private final String parameterString;
public WebhookAlertAction(String url, HttpMethod method, String parameterString) {
this.url = url;
this.method = method;
this.parameterString = parameterString;
}
public String getUrl() {
return url;
}
public HttpMethod getMethod() {
return method;
}
public String getParameterString() {
return parameterString;
}
/**
*/
@Override
public String getActionName() {
return "webhook";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("url", url);
builder.field("method", method.getName());
builder.field("parameter_string", parameterString);
builder.endObject();
return builder;
}
@Override
public String toString() {
return "WebhookAlertAction{" +
"url='" + url + '\'' +
", method=" + method +
", parameterString='" + parameterString + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WebhookAlertAction that = (WebhookAlertAction) o;
if (method != null ? !method.equals(that.method) : that.method != null) return false;
if (parameterString != null ? !parameterString.equals(that.parameterString) : that.parameterString != null)
return false;
if (url != null ? !url.equals(that.url) : that.url != null) return false;
return true;
}
@Override
public int hashCode() {
int result = url != null ? url.hashCode() : 0;
result = 31 * result + (method != null ? method.hashCode() : 0);
result = 31 * result + (parameterString != null ? parameterString.hashCode() : 0);
return result;
}
}

View File

@ -1,161 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.apache.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* This action factory will call back a web hook using a templatized url
* If the method is PUT or POST the request,response and alert name will be sent as well
*/
public class WebhookAlertActionFactory implements AlertActionFactory {
private static String ALERT_NAME = "alert_name";
private static String RESPONSE = "response";
private static String REQUEST = "request";
static String DEFAULT_PARAMETER_STRING = "alertname={{alert_name}}&request=%{{request}}&response=%{{response}}";
private final ScriptServiceProxy scriptService;
private final Logger logger = Logger.getLogger(WebhookAlertActionFactory.class);
public WebhookAlertActionFactory(ScriptServiceProxy scriptService) {
this.scriptService = scriptService;
}
@Override
public AlertAction createAction(XContentParser parser) throws IOException {
String url = null;
HttpMethod method = HttpMethod.POST;
String parameterTemplate = DEFAULT_PARAMETER_STRING;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "url":
url = parser.text();
break;
case "method":
method = HttpMethod.valueOf(parser.text());
if (method != HttpMethod.POST && method != HttpMethod.GET && method != HttpMethod.PUT) {
throw new ElasticsearchIllegalArgumentException("Unsupported http method ["
+ method.getName() + "]");
}
break;
case "parameter_string":
parameterTemplate = parser.text();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
}
}
return new WebhookAlertAction(url, method, parameterTemplate);
}
@Override
public boolean doAction(AlertAction action, Alert alert, TriggerResult result) {
if (!(action instanceof WebhookAlertAction)) {
throw new ElasticsearchIllegalStateException("Bad action [" + action.getClass() + "] passed to WebhookAlertActionFactory expected [" + WebhookAlertAction.class + "]");
}
WebhookAlertAction webhookAlertAction = (WebhookAlertAction)action;
String url = webhookAlertAction.getUrl();
String renderedUrl = renderUrl(url, alert, result, scriptService);
HttpMethod method = webhookAlertAction.getMethod();
try {
URL urlToOpen = new URL(renderedUrl);
HttpURLConnection httpConnection = (HttpURLConnection) urlToOpen.openConnection();
httpConnection.setRequestMethod(method.getName());
httpConnection.setRequestProperty("Accept-Charset", StandardCharsets.UTF_8.name());
if (method == HttpMethod.POST || method == HttpMethod.PUT) {
httpConnection.setDoOutput(true);
httpConnection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset="
+ StandardCharsets.UTF_8.name());
String parameters = encodeParameterString(webhookAlertAction.getParameterString(), alert, result, scriptService);
httpConnection.getOutputStream().write(parameters.getBytes(StandardCharsets.UTF_8.name()));
}
int status = httpConnection.getResponseCode();
if (status >= 400) {
throw new ElasticsearchException("Got status [" + status + "] when connecting to [" + renderedUrl + "]");
} else {
if (status >= 300) {
logger.warn("A 200 range return code was expected, but got [" + status + "]");
}
return true;
}
} catch (IOException ioe) {
throw new ElasticsearchException("Unable to connect to [" + renderedUrl + "]", ioe);
}
}
static String encodeParameterString(String parameterString, Alert alert, TriggerResult result, ScriptServiceProxy scriptService) throws IOException {
XContentBuilder responseBuilder = XContentFactory.jsonBuilder();
responseBuilder.startObject();
responseBuilder.field("response",result.getTriggerResponse());
responseBuilder.endObject();
String requestJSON = XContentHelper.convertToJson(result.getTriggerRequest().source(), true);
String responseJSON = XContentHelper.convertToJson(responseBuilder.bytes(), true);
Map<String, Object> templateParams = new HashMap<>();
templateParams.put(ALERT_NAME, URLEncoder.encode(alert.getName(), StandardCharsets.UTF_8.name()));
templateParams.put(RESPONSE, URLEncoder.encode(responseJSON, StandardCharsets.UTF_8.name()));
templateParams.put(REQUEST, URLEncoder.encode(requestJSON, StandardCharsets.UTF_8.name()));
ExecutableScript script = scriptService.executable("mustache", parameterString, ScriptService.ScriptType.INLINE, templateParams);
return ((BytesReference) script.run()).toUtf8();
}
public String renderUrl(String url, Alert alert, TriggerResult result, ScriptServiceProxy scriptService) {
Map<String, Object> templateParams = new HashMap<>();
templateParams.put(ALERT_NAME, alert.getName());
templateParams.put(RESPONSE, result.getActionResponse());
ExecutableScript script = scriptService.executable("mustache", url, ScriptService.ScriptType.INLINE, templateParams);
return ((BytesReference) script.run()).toUtf8();
}
}

View File

@ -0,0 +1,372 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions.email;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionException;
import org.elasticsearch.alerts.support.StringTemplateUtils;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.script.ScriptService;
import javax.mail.*;
import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.*;
/**
*/
public class EmailAction extends Action<EmailAction.Result> implements NodeSettingsService.Listener {
public static final String TYPE = "email";
static final String PORT_SETTING = "alerts.action.email.server.port";
static final String SERVER_SETTING = "alerts.action.email.server.name";
static final String FROM_SETTING = "alerts.action.email.from.address";
static final String USERNAME_SETTING = "alerts.action.email.from.username";
static final String PASSWORD_SETTING = "alerts.action.email.from.password";
private final List<Address> emailAddresses;
//Optional, can be null, will use defaults from emailSettings (EmailServiceConfig)
private final String fromAddress;
private final StringTemplateUtils.Template subjectTemplate;
private final StringTemplateUtils.Template messageTemplate;
private static final String DEFAULT_SERVER = "smtp.gmail.com";
private static final int DEFAULT_PORT = 578;
private static final StringTemplateUtils.Template DEFAULT_SUBJECT_TEMPLATE = new StringTemplateUtils.Template(
"Elasticsearch Alert {{alert_name}} triggered", null, "mustache", ScriptService.ScriptType.INLINE);
private static final StringTemplateUtils.Template DEFAULT_MESSAGE_TEMPLATE = new StringTemplateUtils.Template(
"{{alert_name}} triggered with {{response.hits.total}} results", null, "mustache", ScriptService.ScriptType.INLINE);
private final StringTemplateUtils templateUtils;
private volatile EmailServiceConfig emailSettings = new EmailServiceConfig(DEFAULT_SERVER, DEFAULT_PORT, null, null, null);
protected EmailAction(ESLogger logger, Settings settings, NodeSettingsService nodeSettingsService,
StringTemplateUtils templateUtils, @Nullable StringTemplateUtils.Template subjectTemplate,
@Nullable StringTemplateUtils.Template messageTemplate, @Nullable String fromAddress,
List<Address> emailAddresses) {
super(logger);
this.templateUtils = templateUtils;
this.emailAddresses = new ArrayList<>();
this.emailAddresses.addAll(emailAddresses);
this.subjectTemplate = subjectTemplate;
this.messageTemplate = messageTemplate;
this.fromAddress = fromAddress;
nodeSettingsService.addListener(this);
updateSettings(settings);
}
@Override
public String type() {
return TYPE;
}
@Override
public Result execute(Alert alert, Map<String, Object> data) throws IOException {
Properties props = new Properties();
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
props.put("mail.smtp.host", emailSettings.host);
props.put("mail.smtp.port", emailSettings.port);
final Session session;
if (emailSettings.password != null) {
final String username;
if (emailSettings.username != null) {
username = emailSettings.username;
} else {
username = emailSettings.defaultFromAddress;
}
if (username == null) {
throw new ActionException("unable to send email for alert [" + alert.name() + "]. username or the default from address is not set");
}
session = Session.getInstance(props,
new javax.mail.Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(username, emailSettings.password);
}
});
} else {
session = Session.getDefaultInstance(props);
}
try {
Message email = new MimeMessage(session);
String fromAddressToUse = emailSettings.defaultFromAddress;
if (fromAddress != null) {
fromAddressToUse = fromAddress;
}
email.setFrom(new InternetAddress(fromAddressToUse));
email.setRecipients(Message.RecipientType.TO, emailAddresses.toArray(new Address[1]));
Map<String, Object> alertParams = new HashMap<>();
alertParams.put(Action.ALERT_NAME_VARIABLE_NAME, alert.name());
alertParams.put(RESPONSE_VARIABLE_NAME, data);
String subject = templateUtils.executeTemplate(
subjectTemplate != null ? subjectTemplate : DEFAULT_SUBJECT_TEMPLATE,
alertParams);
email.setSubject(subject);
String message = templateUtils.executeTemplate(
messageTemplate != null ? messageTemplate : DEFAULT_MESSAGE_TEMPLATE,
alertParams);
email.setText(message);
Transport.send(email);
return new Result(true, fromAddressToUse, emailAddresses, subject, message );
} catch (Throwable e){
throw new ActionException("failed to send mail for alert [" + alert.name() + "]", e);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("addresses");
builder.startArray();
for (Address emailAddress : emailAddresses){
builder.value(emailAddress.toString());
}
builder.endArray();
if (subjectTemplate != null) {
StringTemplateUtils.writeTemplate("subject_template", subjectTemplate, builder, params);
}
if (messageTemplate != null) {
StringTemplateUtils.writeTemplate("message_template", messageTemplate, builder, params);
}
if (fromAddress != null) {
builder.field("from", fromAddress);
}
builder.endObject();
return builder;
}
@Override
public void onRefreshSettings(Settings settings) {
updateSettings(settings);
}
public static class Parser extends AbstractComponent implements Action.Parser<EmailAction> {
private final NodeSettingsService nodeSettingsService;
private final StringTemplateUtils templateUtils;
@Inject
public Parser(Settings settings, DynamicSettings dynamicSettings, NodeSettingsService nodeSettingsService, StringTemplateUtils templateUtils) {
super(settings);
this.nodeSettingsService = nodeSettingsService;
this.templateUtils = templateUtils;
dynamicSettings.addDynamicSetting(PORT_SETTING, Validator.POSITIVE_INTEGER);
dynamicSettings.addDynamicSetting(SERVER_SETTING);
dynamicSettings.addDynamicSetting(FROM_SETTING);
dynamicSettings.addDynamicSetting(USERNAME_SETTING);
dynamicSettings.addDynamicSetting(PASSWORD_SETTING);
}
@Override
public String type() {
return TYPE;
}
@Override
public EmailAction parse(XContentParser parser) throws IOException {
StringTemplateUtils.Template subjectTemplate = null;
StringTemplateUtils.Template messageTemplate = null;
String fromAddress = null;
List<Address> addresses = new ArrayList<>();
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "subject_template":
subjectTemplate = StringTemplateUtils.readTemplate(parser);
break;
case "message_template":
messageTemplate = StringTemplateUtils.readTemplate(parser);
break;
case "from":
fromAddress = parser.text();
break;
default:
throw new ActionException("could not parse email action. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
switch (currentFieldName) {
case "addresses":
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
try {
addresses.add(InternetAddress.parse(parser.text())[0]);
} catch (AddressException ae) {
throw new ActionException("could not parse email action. unable to parse [" + parser.text() + "] as an email address", ae);
}
}
break;
default:
throw new ActionException("could not parse email action. unexpected field [" + currentFieldName + "]");
}
} else {
throw new ActionException("could not parse email action. unexpected token [" + token + "]");
}
}
if (addresses.isEmpty()) {
throw new ActionException("could not parse email action. [addresses] was not found or was empty");
}
return new EmailAction(logger, settings, nodeSettingsService,
templateUtils, subjectTemplate, messageTemplate, fromAddress, addresses);
}
}
public static class Result extends Action.Result {
private final String from;
private final List<Address> recipients;
private final String subject;
private final String message;
public Result(boolean success, String from, List<Address> recipients, String subject, String message) {
super(TYPE, success);
this.from = from;
this.recipients = recipients;
this.subject = subject;
this.message = message;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("fromAddress", from());
builder.field("subject", subject());
builder.array("to", recipients());
builder.field("message", message());
builder.field("success", success());
builder.endObject();
return builder;
}
public String from() {
return from;
}
public String subject() {
return subject;
}
public String message() {
return message;
}
public List<Address> recipients() {
return recipients;
}
}
// This is useful to change all settings at the same time. Otherwise we may change the username then email gets send
// and then change the password and then the email sending fails.
//
// Also this reduces the number of volatile writes
private class EmailServiceConfig {
private String host;
private int port;
private String username;
private String password;
private String defaultFromAddress;
private EmailServiceConfig(String host, int port, String userName, String password, String defaultFromAddress) {
this.host = host;
this.port = port;
this.username = userName;
this.password = password;
this.defaultFromAddress = defaultFromAddress;
}
}
private void updateSettings(Settings settings) {
boolean changed = false;
String host = emailSettings.host;
String newHost = settings.get(SERVER_SETTING);
if (newHost != null && !newHost.equals(host)) {
host = newHost;
changed = true;
}
int port = emailSettings.port;
int newPort = settings.getAsInt(PORT_SETTING, -1);
if (newPort != -1) {
port = newPort;
changed = true;
}
String fromAddress = emailSettings.defaultFromAddress;
String newFromAddress = settings.get(FROM_SETTING);
if (newFromAddress != null && !newFromAddress.equals(fromAddress)) {
fromAddress = newFromAddress;
changed = true;
}
String userName = emailSettings.username;
String newUserName = settings.get(USERNAME_SETTING);
if (newUserName != null && !newUserName.equals(userName)) {
userName = newFromAddress;
changed = true;
}
String password = emailSettings.password;
String newPassword = settings.get(PASSWORD_SETTING);
if (newPassword != null && !newPassword.equals(password)) {
password = newPassword;
changed = true;
}
if (changed) {
emailSettings = new EmailServiceConfig(host, port, fromAddress, userName, password);
}
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions.index;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionException;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.alerts.support.AlertUtils.responseToData;
/**
*/
public class IndexAction extends Action<IndexAction.Result> {
public static final String TYPE = "index";
private final ClientProxy client;
private final String index;
private final String type;
protected IndexAction(ESLogger logger, ClientProxy client, String index, String type) {
super(logger);
this.client = client;
this.index = index;
this.type = type;
}
@Override
public String type() {
return TYPE;
}
@Override
public Result execute(Alert alert, Map<String, Object> data) throws IOException {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(index);
indexRequest.type(type);
try {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject();
resultBuilder.field("response", data);
resultBuilder.field("timestamp", alert.status().lastExecuted());
resultBuilder.endObject();
indexRequest.source(resultBuilder);
} catch (IOException ie) {
throw new ActionException("failed to index result for alert [" + alert.name() + " ]", ie);
}
return new Result(client.index(indexRequest).actionGet());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("index", index);
builder.field("type", type);
builder.endObject();
return builder;
}
public static class Parser extends AbstractComponent implements Action.Parser<IndexAction> {
private final ClientProxy client;
@Inject
public Parser(Settings settings, ClientProxy client) {
super(settings);
this.client = client;
}
@Override
public String type() {
return TYPE;
}
@Override
public IndexAction parse(XContentParser parser) throws IOException {
String index = null;
String type = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "index":
index = parser.text();
break;
case "type":
type = parser.text();
break;
default:
throw new ActionException("could not parse index action. unexpected field [" + currentFieldName + "]");
}
} else {
throw new ActionException("could not parse index action. unexpected token [" + token + "]");
}
}
if (index == null) {
throw new ActionException("could not parse index action [index] is required");
}
if (type == null) {
throw new ActionException("could not parse index action [type] is required");
}
return new IndexAction(logger, client, index, type);
}
}
public static class Result extends Action.Result {
private final IndexResponse response;
public Result(IndexResponse response) {
super(TYPE, response.isCreated());
this.response = response;
}
public IndexResponse response() {
return response;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("success", success());
builder.field("index_response", responseToData(response()));
builder.endObject();
return builder;
}
}
}

View File

@ -0,0 +1,204 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions.webhook;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionException;
import org.elasticsearch.alerts.support.StringTemplateUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.base.Charsets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class WebhookAction extends Action<WebhookAction.Result> {
public static final String TYPE = "webhook";
private final StringTemplateUtils templateUtils;
private final StringTemplateUtils.Template urlTemplate;
private final HttpMethod method;
//Optional, default will be used if not provided
private final StringTemplateUtils.Template bodyTemplate;
private static final StringTemplateUtils.Template DEFAULT_BODY_TEMPLATE = new StringTemplateUtils.Template(
"{ 'alertname' : '{{alert_name}}', 'request': {{request}}, 'response' : {{response}} }", null,
"mustache", ScriptService.ScriptType.INLINE );
protected WebhookAction(ESLogger logger, StringTemplateUtils templateUtils, @Nullable StringTemplateUtils.Template bodyTemplate,
StringTemplateUtils.Template urlTemplate, HttpMethod method) {
super(logger);
this.templateUtils = templateUtils;
this.bodyTemplate = bodyTemplate;
this.urlTemplate = urlTemplate;
this.method = method;
}
@Override
public String type() {
return TYPE;
}
@Override
public Result execute(Alert alert, Map<String, Object> data) throws IOException {
String renderedUrl = applyTemplate(urlTemplate, alert, data);
try {
URL urlToOpen = new URL(URLEncoder.encode(renderedUrl, Charsets.UTF_8.name()));
HttpURLConnection httpConnection = (HttpURLConnection) urlToOpen.openConnection();
httpConnection.setRequestMethod(method.getName());
httpConnection.setRequestProperty("Accept-Charset", Charsets.UTF_8.name());
httpConnection.setDoOutput(true);
String body = applyTemplate(bodyTemplate != null ? bodyTemplate : DEFAULT_BODY_TEMPLATE, alert, data);
httpConnection.setRequestProperty("Content-Length", Integer.toString(body.length()));
httpConnection.getOutputStream().write(body.getBytes(Charsets.UTF_8.name()));
int status = httpConnection.getResponseCode();
if (status >= 400) {
logger.warn("got status [" + status + "] when connecting to [" + renderedUrl + "]");
} else {
if (status >= 300) {
logger.warn("a 200 range return code was expected, but got [" + status + "]");
}
}
return new Result(status < 400, status, renderedUrl, body);
} catch (IOException ioe) {
throw new ActionException("failed to connect to [" + renderedUrl + "] for alert [" + alert.name() + "]", ioe);
}
}
String applyTemplate(StringTemplateUtils.Template template, Alert alert, Map<String, Object> data) {
Map<String, Object> webHookParams = new HashMap<>();
webHookParams.put(ALERT_NAME_VARIABLE_NAME, alert.name());
webHookParams.put(RESPONSE_VARIABLE_NAME, data);
return templateUtils.executeTemplate(template, webHookParams);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("method", method.getName());
StringTemplateUtils.writeTemplate("body_template", bodyTemplate, builder, params);
StringTemplateUtils.writeTemplate("url_template", urlTemplate, builder, params);
builder.endObject();
return builder;
}
public static class Result extends Action.Result {
private final int httpStatusCode;
private final String url;
private final String body;
public Result(boolean success, int httpStatusCode, String url, String body) {
super(TYPE, success);
this.httpStatusCode = httpStatusCode;
this.url = url;
this.body = body;
}
public int httpStatusCode() {
return httpStatusCode;
}
public String url() {
return url;
}
public String body() {
return body;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("success", success());
builder.field("http_status", httpStatusCode());
builder.field("url", url());
builder.field("body", body());
builder.endObject();
return builder;
}
}
public static class Parser extends AbstractComponent implements Action.Parser<WebhookAction> {
private final StringTemplateUtils templateUtils;
@Inject
public Parser(Settings settings, StringTemplateUtils templateUtils) {
super(settings);
this.templateUtils = templateUtils;
}
@Override
public String type() {
return TYPE;
}
@Override
public WebhookAction parse(XContentParser parser) throws IOException {
HttpMethod method = HttpMethod.POST;
StringTemplateUtils.Template urlTemplate = null;
StringTemplateUtils.Template bodyTemplate = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "method":
method = HttpMethod.valueOf(parser.text());
if (method != HttpMethod.POST && method != HttpMethod.GET && method != HttpMethod.PUT) {
throw new ActionException("could not parse webhook action. unsupported http method ["
+ method.getName() + "]");
}
break;
case "url_template":
urlTemplate = StringTemplateUtils.readTemplate(parser);
break;
case "body_template":
bodyTemplate = StringTemplateUtils.readTemplate(parser);
break;
default:
throw new ActionException("could not parse webhook action. unexpected field [" + currentFieldName + "]");
}
} else {
throw new ActionException("could not parse webhook action. unexpected token [" + token + "]");
}
}
if (urlTemplate == null) {
throw new ActionException("could not parse webhook action. [url_template] is required");
}
return new WebhookAction(logger, templateUtils, bodyTemplate, urlTemplate, method);
}
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.actions.AlertActions;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Payload;
import org.elasticsearch.alerts.trigger.Trigger;
@ -31,7 +32,7 @@ public class AlertRecord implements ToXContent {
private DateTime fireTime;
private DateTime scheduledTime;
private Trigger trigger;
private List<AlertAction> actions;
private AlertActions actions;
private AlertActionState state;
@ -145,7 +146,7 @@ public class AlertRecord implements ToXContent {
return actions;
}
void setActions(List<AlertAction> actions) {
void setActions(AlertActions actions) {
this.actions = actions;
}

View File

@ -15,9 +15,11 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.*;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsPlugin;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.alerts.actions.ActionRegistry;
import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.TemplateUtils;
@ -75,7 +77,6 @@ public class HistoryService extends AbstractComponent {
private final AlertsStore alertsStore;
private final TriggerRegistry triggerRegistry;
private final TemplateUtils templateUtils;
private final AlertActionRegistry actionRegistry;
private final int scrollSize;
private final TimeValue scrollTimeout;
@ -86,12 +87,10 @@ public class HistoryService extends AbstractComponent {
private volatile Thread queueReaderThread;
@Inject
public HistoryService(Settings settings, ClientProxy client, AlertActionRegistry actionRegistry,
ThreadPool threadPool, AlertsStore alertsStore, TriggerRegistry triggerRegistry,
TemplateUtils templateUtils) {
public HistoryService(Settings settings, ClientProxy client, ThreadPool threadPool, AlertsStore alertsStore,
TriggerRegistry triggerRegistry, TemplateUtils templateUtils) {
super(settings);
this.client = client;
this.actionRegistry = actionRegistry;
this.threadPool = threadPool;
this.alertsStore = alertsStore;
this.triggerRegistry = triggerRegistry;
@ -211,7 +210,7 @@ public class HistoryService extends AbstractComponent {
largestQueueSize.set(actionsToBeProcessed.size());
}
AlertRecord parseHistory(String historyId, BytesReference source, long version, AlertActionRegistry actionRegistry) {
AlertRecord parseHistory(String historyId, BytesReference source, long version, ActionRegistry actionRegistry) {
AlertRecord entry = new AlertRecord();
entry.setId(historyId);
entry.setVersion(version);
@ -228,7 +227,7 @@ public class HistoryService extends AbstractComponent {
} else if (token == XContentParser.Token.START_OBJECT) {
switch (currentFieldName) {
case ACTIONS_FIELD:
entry.setActions(actionRegistry.parse(parser));
entry.setActions(actionRegistry.parseActions(parser));
break;
case TRIGGER_FIELD:
entry.setTrigger(triggerRegistry.parse(parser));

View File

@ -7,8 +7,8 @@ package org.elasticsearch.alerts.support;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.AlertsException;
@ -47,7 +47,7 @@ public final class AlertUtils {
private AlertUtils() {
}
public static Map<String, Object> responseToData(SearchResponse response) {
public static Map<String, Object> responseToData(ActionResponse response) {
try {
XContentBuilder builder = jsonBuilder().startObject().value(response).endObject();
return XContentHelper.convertToMap(builder.bytes(), false).v2();

View File

@ -0,0 +1,160 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.support;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class StringTemplateUtils extends AbstractComponent {
private final ScriptServiceProxy scriptService;
@Inject
public StringTemplateUtils(Settings settings, ScriptServiceProxy scriptService) {
super(settings);
this.scriptService = scriptService;
}
public String executeTemplate(Template template) {
return executeTemplate(template, Collections.<String, Object>emptyMap());
}
public String executeTemplate(Template template, Map<String, Object> additionalParams) {
Map<String, Object> params = new HashMap<>();
params.putAll(template.getParams());
params.putAll(additionalParams);
ExecutableScript script = scriptService.executable(template.getLanguage(), template.getTemplate(), template.getScriptType(), params);
Object result = script.run();
if (result instanceof String) {
return (String) result;
} else if (result instanceof BytesReference) {
return ((BytesReference) script.run()).toUtf8();
} else {
return result.toString();
}
}
public static Template readTemplate(XContentParser parser) throws IOException {
assert parser.currentToken() == XContentParser.Token.START_OBJECT : "Expected START_OBJECT, but was " + parser.currentToken();
Map<String, String> params = null;
String script = null;
ScriptService.ScriptType type = ScriptService.ScriptType.INLINE;
String language = "mustache";
String fieldName = parser.currentName();
for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
fieldName = parser.currentName();
break;
case START_OBJECT:
switch (fieldName) {
case "params":
params = (Map) parser.map();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + fieldName + "]");
}
break;
case VALUE_STRING:
switch (fieldName) {
case "script":
script = parser.text();
break;
case "language":
language = parser.text();
break;
case "type":
type = readScriptType(parser.text());
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + fieldName + "]");
}
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected json token [" + token + "]");
}
}
return new Template(script, params, language, type);
}
public static void writeTemplate(String objectName, Template template, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(objectName);
builder.field("script", template.getTemplate());
builder.field("type", writeScriptType(template.getScriptType()));
builder.field("language", template.getLanguage());
if (template.getParams() != null && !template.getParams().isEmpty()) {
builder.field("params", template.getParams());
}
builder.endObject();
}
private static ScriptService.ScriptType readScriptType(String value) {
switch (value) {
case "indexed":
return ScriptService.ScriptType.INDEXED;
case "inline":
return ScriptService.ScriptType.INLINE;
case "file":
return ScriptService.ScriptType.FILE;
default:
throw new ElasticsearchIllegalArgumentException("Unknown script_type value [" + value + "]");
}
}
private static String writeScriptType(ScriptService.ScriptType value) {
switch (value) {
case INDEXED:
return "indexed";
case INLINE:
return "inline";
case FILE:
return "file";
default:
throw new ElasticsearchIllegalArgumentException("Illegal script_type value [" + value + "]");
}
}
public static class Template {
private final String template;
private final Map<String, String> params;
private final String language;
private final ScriptService.ScriptType scriptType;
public Template(String template) {
this.template = template;
this.params = Collections.emptyMap();
this.language = "mustache";
this.scriptType = ScriptService.ScriptType.INLINE;
}
public Template(String template, Map<String, String> params, String language, ScriptService.ScriptType scriptType) {
this.template = template;
this.params = params;
this.language = language;
this.scriptType = scriptType;
}
public ScriptService.ScriptType getScriptType() {
return scriptType;
}
public String getTemplate() {
return template;
}
public String getLanguage() {
return language;
}
public Map<String, String> getParams() {
return params;
}
}
}

View File

@ -87,7 +87,7 @@ public class AlertActionsTest extends AbstractAlertingTests {
builder.field(HistoryService.ACTIONS_FIELD, actionMap);
builder.field(HistoryService.STATE, AlertActionState.SEARCH_NEEDED.toString());
builder.endObject();
final AlertActionRegistry alertActionRegistry = internalTestCluster().getInstance(AlertActionRegistry.class, internalTestCluster().getMasterName());
final ActionRegistry alertActionRegistry = internalTestCluster().getInstance(ActionRegistry.class, internalTestCluster().getMasterName());
final HistoryService alertManager = internalTestCluster().getInstance(HistoryService.class, internalTestCluster().getMasterName());
AlertRecord actionEntry = alertManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry);