introduced AlertContext

Represents the context of an alert run. It's passed as an argument to all the different constructs that execute during an alert run - trigger, throttler, transform and action. This will provide each execution phase access to to all the results of the previous phase. It also holds the current payload in the execution.

Action results representing failures now hold the `reason` for the failure. This will provide insight on failed action execution as these messages will end up in the fired alert history.

Original commit: elastic/x-pack-elasticsearch@6846a49247
This commit is contained in:
uboness 2015-02-07 14:07:59 +01:00
parent 157c7b6fd6
commit ec42ec8fdc
17 changed files with 336 additions and 161 deletions

View File

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.joda.time.DateTime;
import java.util.HashMap;
import java.util.Map;
/**
*
*/
public class AlertContext {
private final Alert alert;
private final DateTime fireTime;
private final DateTime scheduledTime;
private Payload payload;
private Trigger.Result triggerResult;
private Throttler.Result throttleResult;
private Transform.Result transformResult;
private Map<String, Action.Result> actionsResults = new HashMap<>();
public AlertContext(Alert alert, DateTime fireTime, DateTime scheduledTime) {
this.alert = alert;
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
}
public Alert alert() {
return alert;
}
public DateTime fireTime() {
return fireTime;
}
public DateTime scheduledTime() {
return scheduledTime;
}
public Payload payload() {
return payload;
}
public void triggerResult(Trigger.Result triggerResult) {
this.triggerResult = triggerResult;
this.payload = triggerResult.payload();
}
public Trigger.Result triggerResult() {
return triggerResult;
}
public void throttleResult(Throttler.Result throttleResult) {
this.throttleResult = throttleResult;
}
public Throttler.Result throttleResult() {
return throttleResult;
}
public void transformResult(Transform.Result transformResult) {
this.transformResult = transformResult;
this.payload = transformResult.payload();
}
public Transform.Result transformResult() {
return transformResult;
}
public void addActionResult(Action.Result result) {
actionsResults.put(result.type(), result);
}
public Map<String, Action.Result> actionsResults() {
return actionsResults;
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
@ -31,6 +32,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
@ -166,18 +168,25 @@ public class AlertsService extends AbstractComponent {
if (alert == null) {
throw new ElasticsearchException("Alert is not available");
}
Trigger trigger = alert.trigger();
Trigger.Result triggerResult = trigger.execute(alert, entry.scheduledTime(), entry.fireTime());
AlertRun alertRun = null;
AlertContext ctx = new AlertContext(alert, entry.fireTime(), entry.scheduledTime());
Trigger.Result triggerResult = alert.trigger().execute(ctx);
ctx.triggerResult(triggerResult);
if (triggerResult.triggered()) {
alert.status().onTrigger(true, entry.fireTime());
Throttler.Result throttleResult = alert.throttler().throttle(alert, triggerResult);
Throttler.Result throttleResult = alert.throttler().throttle(ctx, triggerResult);
ctx.throttleResult(throttleResult);
if (!throttleResult.throttle()) {
Payload payload = alert.transform().apply(alert, triggerResult, triggerResult.payload(), entry.scheduledTime(), entry.fireTime());
alertRun = new AlertRun(triggerResult, throttleResult, payload);
Transform.Result result = alert.transform().apply(ctx, triggerResult.payload());
ctx.transformResult(result);
for (Action action : alert.actions()){
Action.Result actionResult = action.execute(alert, payload);
//TODO : process action result, what to do if just one action fails or throws exception ?
Action.Result actionResult = action.execute(ctx, result.payload());
ctx.addActionResult(actionResult);
}
alert.status().onExecution(entry.scheduledTime());
} else {
@ -186,12 +195,9 @@ public class AlertsService extends AbstractComponent {
} else {
alert.status().onTrigger(false, entry.fireTime());
}
if (alertRun == null) {
alertRun = new AlertRun(triggerResult, null, null);
}
alert.status().onRun(entry.fireTime());
alertsStore.updateAlert(alert);
return alertRun;
return new AlertRun(ctx);
} finally {
alertLock.release(entry.name());
}
@ -357,12 +363,14 @@ public class AlertsService extends AbstractComponent {
private final Trigger.Result triggerResult;
private final Throttler.Result throttleResult;
private final Map<String, Action.Result> actionsResults;
private final Payload payload;
public AlertRun(Trigger.Result triggerResult, Throttler.Result throttleResult, Payload payload) {
this.triggerResult = triggerResult;
this.throttleResult = throttleResult;
this.payload = payload;
public AlertRun(AlertContext context) {
triggerResult = context.triggerResult();
throttleResult = context.throttleResult();
actionsResults = context.actionsResults();
payload = context.payload();
}
public Trigger.Result triggerResult() {
@ -373,6 +381,10 @@ public class AlertsService extends AbstractComponent {
return throttleResult;
}
public Map<String, Action.Result> actionsResults() {
return actionsResults;
}
public Payload payload() {
return payload;
}

View File

@ -67,5 +67,4 @@ public interface Payload extends ToXContent {
}
}
}
}

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
@ -34,7 +35,7 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
/**
* Executes this action
*/
public abstract R execute(Alert alert, Payload payload) throws IOException;
public abstract R execute(AlertContext context, Payload payload) throws IOException;
/**
@ -57,11 +58,10 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
public static abstract class Result implements ToXContent {
private final boolean success;
protected final String type;
protected final boolean success;
private final String type;
public Result(String type, boolean success) {
protected Result(String type, boolean success) {
this.type = type;
this.success = success;
}
@ -73,5 +73,15 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
public boolean success() {
return success;
}
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("success", success);
xContentBody(builder, params);
return builder.endObject();
}
protected abstract XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException;
}
}

View File

@ -22,7 +22,7 @@ public class ActionModule extends AbstractModule {
private final Map<String, Class<? extends Action.Parser>> parsers = new HashMap<>();
public void registerTrigger(String type, Class<? extends Action.Parser> parserType) {
public void registerAction(String type, Class<? extends Action.Parser> parserType) {
parsers.put(type, parserType);
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.actions.email;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionException;
@ -33,7 +33,7 @@ public class EmailAction extends Action<EmailAction.Result> {
public static final String TYPE = "email";
private final List<Address> emailAddresses;
private final List<InternetAddress> emailAddresses;
//Optional, can be null, will use defaults from emailSettings (EmailServiceConfig)
private final String fromAddress;
@ -53,7 +53,7 @@ public class EmailAction extends Action<EmailAction.Result> {
protected EmailAction(ESLogger logger, EmailSettingsService emailSettingsService,
StringTemplateUtils templateUtils, @Nullable StringTemplateUtils.Template subjectTemplate,
@Nullable StringTemplateUtils.Template messageTemplate, @Nullable String fromAddress,
List<Address> emailAddresses) {
List<InternetAddress> emailAddresses) {
super(logger);
this.templateUtils = templateUtils;
@ -72,7 +72,7 @@ public class EmailAction extends Action<EmailAction.Result> {
}
@Override
public Result execute(Alert alert, Payload payload) throws IOException {
public Result execute(AlertContext ctx, Payload payload) throws IOException {
final EmailSettingsService.EmailServiceConfig emailSettings = emailSettingsService.emailServiceConfig();
@ -92,7 +92,8 @@ public class EmailAction extends Action<EmailAction.Result> {
}
if (username == null) {
throw new ActionException("unable to send email for alert [" + alert.name() + "]. username or the default from address is not set");
return new Result.Failure("unable to send email for alert [" +
ctx.alert().name() + "]. username or the default [from] address is not set");
}
session = Session.getInstance(props,
@ -105,6 +106,9 @@ public class EmailAction extends Action<EmailAction.Result> {
session = Session.getDefaultInstance(props);
}
String subject = null;
String body = null;
try {
Message email = new MimeMessage(session);
@ -118,25 +122,27 @@ public class EmailAction extends Action<EmailAction.Result> {
email.setRecipients(Message.RecipientType.TO, emailAddresses.toArray(new Address[1]));
Map<String, Object> alertParams = new HashMap<>();
alertParams.put(Action.ALERT_NAME_VARIABLE_NAME, alert.name());
alertParams.put(Action.ALERT_NAME_VARIABLE_NAME, ctx.alert().name());
alertParams.put(RESPONSE_VARIABLE_NAME, payload.data());
String subject = templateUtils.executeTemplate(
subject = templateUtils.executeTemplate(
subjectTemplate != null ? subjectTemplate : DEFAULT_SUBJECT_TEMPLATE,
alertParams);
email.setSubject(subject);
String message = templateUtils.executeTemplate(
body = templateUtils.executeTemplate(
messageTemplate != null ? messageTemplate : DEFAULT_MESSAGE_TEMPLATE,
alertParams);
email.setText(message);
email.setText(body);
Transport.send(email);
return new Result(true, fromAddressToUse, emailAddresses, subject, message );
} catch (Throwable e) {
throw new ActionException("failed to send mail for alert [" + alert.name() + "]", e);
return new Result.Success(fromAddressToUse, emailAddresses, subject, body);
} catch (MessagingException me) {
logger.error("failed to send mail for alert [{}]", me, ctx.alert().name());
return new Result.Failure(me.getMessage());
}
}
@ -196,7 +202,7 @@ public class EmailAction extends Action<EmailAction.Result> {
StringTemplateUtils.Template messageTemplate = null;
String fromAddress = null;
List<Address> addresses = new ArrayList<>();
List<InternetAddress> addresses = new ArrayList<>();
String currentFieldName = null;
XContentParser.Token token;
@ -234,54 +240,71 @@ public class EmailAction extends Action<EmailAction.Result> {
throw new ActionException("could not parse email action. [addresses] was not found or was empty");
}
return new EmailAction(logger, emailSettingsService, templateUtils, subjectTemplate,
messageTemplate, fromAddress, addresses);
return new EmailAction(logger, emailSettingsService, templateUtils, subjectTemplate, messageTemplate, fromAddress, addresses);
}
}
public static abstract class Result extends Action.Result {
public static class Result extends Action.Result {
private final String from;
private final List<Address> recipients;
private final String subject;
private final String message;
public Result(boolean success, String from, List<Address> recipients, String subject, String message) {
super(TYPE, success);
this.from = from;
this.recipients = recipients;
this.subject = subject;
this.message = message;
public Result(String type, boolean success) {
super(type, success);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("fromAddress", from());
builder.field("subject", subject());
builder.array("to", recipients());
builder.field("message", message());
builder.field("success", success());
builder.endObject();
return builder;
public static class Success extends Result {
private final String from;
private final List<InternetAddress> recipients;
private final String subject;
private final String body;
private Success(String from, List<InternetAddress> recipients, String subject, String body) {
super(TYPE, true);
this.from = from;
this.recipients = recipients;
this.subject = subject;
this.body = body;
}
@Override
public XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field("fromAddress", from);
builder.field("subject", subject);
builder.array("to", recipients);
builder.field("body", body);
return builder;
}
public String from() {
return from;
}
public String subject() {
return subject;
}
public String body() {
return body;
}
public List<InternetAddress> recipients() {
return recipients;
}
}
public String from() {
return from;
}
public static class Failure extends Result {
public String subject() {
return subject;
}
private final String reason;
public String message() {
return message;
}
public Failure(String reason) {
super(TYPE, false);
this.reason = reason;
}
public List<Address> recipients() {
return recipients;
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder.field("reason", reason);
}
}
}
}

View File

@ -5,9 +5,10 @@
*/
package org.elasticsearch.alerts.actions.index;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionException;
@ -49,7 +50,7 @@ public class IndexAction extends Action<IndexAction.Result> {
}
@Override
public Result execute(Alert alert, Payload payload) throws IOException {
public Result execute(AlertContext ctx, Payload payload) throws IOException {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(index);
indexRequest.type(type);
@ -57,14 +58,20 @@ public class IndexAction extends Action<IndexAction.Result> {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject();
resultBuilder.field("data", payload.data());
resultBuilder.field("timestamp", alert.status().lastExecuted());
resultBuilder.field("timestamp", ctx.alert().status().lastExecuted());
resultBuilder.endObject();
indexRequest.source(resultBuilder);
} catch (IOException ie) {
throw new ActionException("failed to index result for alert [" + alert.name() + " ]", ie);
} catch (IOException ioe) {
logger.error("failed to index result for alert [{}]", ctx.alert().name());
return new Result(null, "failed ot build index request. " + ioe.getMessage());
}
return new Result(client.index(indexRequest).actionGet());
try {
return new Result(client.index(indexRequest).actionGet(), null);
} catch (ElasticsearchException e) {
logger.error("failed to index result for alert [{}]", ctx.alert().name());
return new Result(null, "failed ot build index request. " + e.getMessage());
}
}
@Override
@ -132,10 +139,12 @@ public class IndexAction extends Action<IndexAction.Result> {
public static class Result extends Action.Result {
private final IndexResponse response;
private final String reason;
public Result(IndexResponse response) {
public Result(IndexResponse response, String reason) {
super(TYPE, response.isCreated());
this.response = response;
this.reason = reason;
}
public IndexResponse response() {
@ -143,13 +152,13 @@ public class IndexAction extends Action<IndexAction.Result> {
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("success", success());
builder.field("index_response", responseToData(response()));
builder.endObject();
return builder;
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
if (reason != null) {
builder.field("reason", reason);
}
return builder.field("index_response", responseToData(response()));
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.alerts.actions.webhook;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionException;
@ -61,12 +62,12 @@ public class WebhookAction extends Action<WebhookAction.Result> {
}
@Override
public Result execute(Alert alert, Payload payload) throws IOException {
public Result execute(AlertContext ctx, Payload payload) throws IOException {
Map<String, Object> data = payload.data();
String renderedUrl = applyTemplate(urlTemplate, alert, data);
String renderedUrl = applyTemplate(urlTemplate, ctx.alert(), data);
String body = applyTemplate(bodyTemplate != null ? bodyTemplate : DEFAULT_BODY_TEMPLATE, ctx.alert(), data);
try {
String body = applyTemplate(bodyTemplate != null ? bodyTemplate : DEFAULT_BODY_TEMPLATE, alert, data);
int status = httpClient.execute(method, renderedUrl, body);
if (status >= 400) {
logger.warn("got status [" + status + "] when connecting to [" + renderedUrl + "]");
@ -75,9 +76,10 @@ public class WebhookAction extends Action<WebhookAction.Result> {
logger.warn("a 200 range return code was expected, but got [" + status + "]");
}
}
return new Result(status < 400, status, renderedUrl, body);
return new Result.Executed(status, renderedUrl, body);
} catch (IOException ioe) {
throw new ActionException("failed to connect to [" + renderedUrl + "] for alert [" + alert.name() + "]", ioe);
logger.error("failed to connect to [{}] for alert [{}]", ioe, renderedUrl, ctx.alert().name());
return new Result.Failure("failed to send http request. " + ioe.getMessage());
}
}
@ -99,40 +101,59 @@ public class WebhookAction extends Action<WebhookAction.Result> {
return builder;
}
public static class Result extends Action.Result {
public abstract static class Result extends Action.Result {
private final int httpStatusCode;
private final String url;
private final String body;
public Result(boolean success, int httpStatusCode, String url, String body) {
super(TYPE, success);
this.httpStatusCode = httpStatusCode;
this.url = url;
this.body = body;
public Result(String type, boolean success) {
super(type, success);
}
public int httpStatusCode() {
return httpStatusCode;
public static class Executed extends Result {
private final int httpStatusCode;
private final String url;
private final String body;
public Executed(int httpStatusCode, String url, String body) {
super(TYPE, httpStatusCode < 400);
this.httpStatusCode = httpStatusCode;
this.url = url;
this.body = body;
}
public int httpStatusCode() {
return httpStatusCode;
}
public String url() {
return url;
}
public String body() {
return body;
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder.field("success", success())
.field("http_status", httpStatusCode)
.field("url", url)
.field("body", body);
}
}
public String url() {
return url;
}
public static class Failure extends Result {
public String body() {
return body;
}
private final String reason;
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("success", success());
builder.field("http_status", httpStatusCode());
builder.field("url", url());
builder.field("body", body());
builder.endObject();
return builder;
public Failure(String reason) {
super(TYPE, false);
this.reason = reason;
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder.field("reason", reason);
}
}
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.trigger.Trigger;
import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
@ -15,12 +15,10 @@ import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
*/
public class AckThrottler implements Throttler {
@Override
public Result throttle(Alert alert, Trigger.Result result) {
if (alert.status().acked()) {
return Result.throttle("alert [" + alert.name() + "] was acked at [" + formatDate(alert.status().ack().timestamp()) + "]");
public Result throttle(AlertContext ctx, Trigger.Result result) {
if (ctx.alert().status().acked()) {
return Result.throttle("alert [" + ctx.alert().name() + "] was acked at [" + formatDate(ctx.alert().status().ack().timestamp()) + "]");
}
return Result.NO;
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
@ -24,13 +24,13 @@ public class AlertThrottler implements Throttler {
}
@Override
public Result throttle(Alert alert, Trigger.Result result) {
public Result throttle(AlertContext ctx, Trigger.Result result) {
if (periodThrottler != null) {
Result throttleResult = periodThrottler.throttle(alert, result);
Result throttleResult = periodThrottler.throttle(ctx, result);
if (throttleResult.throttle()) {
return throttleResult;
}
}
return ACK_THROTTLER.throttle(alert, result);
return ACK_THROTTLER.throttle(ctx, result);
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.joda.time.PeriodType;
import org.elasticsearch.common.unit.TimeValue;
@ -32,8 +33,8 @@ public class PeriodThrottler implements Throttler {
}
@Override
public Result throttle(Alert alert, Trigger.Result result) {
Alert.Status status = alert.status();
public Result throttle(AlertContext ctx, Trigger.Result result) {
Alert.Status status = ctx.alert().status();
TimeValue timeElapsed = new TimeValue(System.currentTimeMillis() - status.lastRan().getMillis());
if (timeElapsed.getMillis() <= period.getMillis()) {
return Result.throttle("throttling interval is set to [" + period.format(periodType) +

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.trigger.Trigger;
/**
@ -15,12 +15,12 @@ public interface Throttler {
public static final Throttler NO_THROTTLE = new Throttler() {
@Override
public Result throttle(Alert Alert, Trigger.Result result) {
public Result throttle(AlertContext ctx, Trigger.Result result) {
return Result.NO;
}
};
Result throttle(Alert alert, Trigger.Result result);
Result throttle(AlertContext ctx, Trigger.Result result);
static class Result {

View File

@ -7,12 +7,11 @@ package org.elasticsearch.alerts.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
@ -20,6 +19,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
@ -57,14 +57,14 @@ public class SearchTransform implements Transform {
}
@Override
public Payload apply(Alert alert, Trigger.Result result, Payload payload, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
SearchRequest req = createRequest(request, scheduledFireTime, fireTime, payload.data());
public Transform.Result apply(AlertContext ctx, Payload payload) throws IOException {
SearchRequest req = createRequest(request, ctx.scheduledTime(), ctx.fireTime(), payload.data());
SearchResponse resp = client.search(req).actionGet();
return new Payload.ActionResponse(resp);
return new Transform.Result(TYPE, new Payload.ActionResponse(resp));
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
AlertUtils.writeSearchRequest(request, builder, params);
return builder;
}

View File

@ -5,10 +5,8 @@
*/
package org.elasticsearch.alerts.transform;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -27,8 +25,8 @@ public interface Transform extends ToXContent {
}
@Override
public Payload apply(Alert alert, Trigger.Result result, Payload payload, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
return payload;
public Result apply(AlertContext context, Payload payload) throws IOException {
return new Result("noop", payload);
}
@Override
@ -39,13 +37,32 @@ public interface Transform extends ToXContent {
String type();
Payload apply(Alert alert, Trigger.Result result, Payload payload, DateTime scheduledFireTime, DateTime fireTime) throws IOException;
Result apply(AlertContext context, Payload payload) throws IOException;
static interface Parser<P extends Transform> {
static class Result {
private final String type;
private final Payload payload;
public Result(String type, Payload payload) {
this.type = type;
this.payload = payload;
}
public String type() {
return type;
}
public Payload payload() {
return payload;
}
}
static interface Parser<T extends Transform> {
String type();
P parse(XContentParser parser) throws IOException;
T parse(XContentParser parser) throws IOException;
}

View File

@ -5,9 +5,8 @@
*/
package org.elasticsearch.alerts.trigger;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
@ -33,7 +32,7 @@ public abstract class Trigger<R extends Trigger.Result> implements ToXContent {
/**
* Executes this trigger
*/
public abstract R execute(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException;
public abstract R execute(AlertContext ctx) throws IOException;
/**

View File

@ -7,13 +7,12 @@ package org.elasticsearch.alerts.trigger.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.SearchHit;
@ -34,19 +33,19 @@ public abstract class SearchTrigger extends Trigger<SearchTrigger.Result> {
}
@Override
public Result execute(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
SearchRequest request = AlertUtils.createSearchRequestWithTimes(this.request, scheduledFireTime, fireTime, scriptService);
public Result execute(AlertContext ctx) throws IOException {
SearchRequest request = AlertUtils.createSearchRequestWithTimes(this.request, ctx.scheduledTime(), ctx.fireTime(), scriptService);
if (logger.isTraceEnabled()) {
logger.trace("For alert [{}] running query for [{}]", alert.name(), XContentHelper.convertToJson(request.source(), false, true));
logger.trace("running query for [{}]", ctx.alert().name(), XContentHelper.convertToJson(request.source(), false, true));
}
// actionGet deals properly with InterruptedException
SearchResponse response = client.search(request).actionGet();
if (logger.isDebugEnabled()) {
logger.debug("Ran alert [{}] and got hits : [{}]", alert.name(), response.getHits().getTotalHits());
logger.debug("got [{}] hits", ctx.alert().name(), response.getHits().getTotalHits());
for (SearchHit hit : response.getHits()) {
logger.debug("Hit: {}", XContentHelper.toString(hit));
logger.debug("hit [{}]", XContentHelper.toString(hit));
}
}

View File

@ -5,12 +5,11 @@
*/
package org.elasticsearch.alerts.trigger.simple;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -38,7 +37,7 @@ public class SimpleTrigger extends Trigger<SimpleTrigger.Result> {
}
@Override
public Result execute(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
public Result execute(AlertContext ctx) throws IOException {
return new Result(payload);
}