Added a transform construct to actions

An alert may have multiple actions associated with it, but each action may need to have different view of the data. For example, the email may need a certain model for its templates while the index or webhook actions may need a completely different data structure.

Until now, there's only an option to defina a single `transform` on the alert level that would have applied to all actions. This commit adds
 the ability to associate a transform with each action. We still keep the tranform on the alert level (in case all actions need the same transformation, in which case we'd like to avoid repetition).

Original commit: elastic/x-pack-elasticsearch@5493a2179b
This commit is contained in:
uboness 2015-03-03 14:02:22 +01:00
parent d25bf008b3
commit f6c17bd802
15 changed files with 406 additions and 90 deletions

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.alerts;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -30,6 +31,10 @@ public interface Payload extends ToXContent {
this(new HashMap<String, Object>());
}
public Simple(String key, Object value) {
this(new MapBuilder<String, Object>().put(key, value).map());
}
public Simple(Map<String, Object> data) {
this.data = data;
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.ToXContent;
@ -20,9 +21,18 @@ import java.io.IOException;
public abstract class Action<R extends Action.Result> implements ToXContent {
protected final ESLogger logger;
protected final Transform transform;
protected Action(ESLogger logger) {
protected Action(ESLogger logger, Transform transform) {
this.logger = logger;
this.transform = transform;
}
/**
* @return the transform associated with this action (may be {@code null})
*/
public Transform transform() {
return transform;
}
/**
@ -33,7 +43,20 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
/**
* Executes this action
*/
public abstract R execute(ExecutionContext context, Payload payload) throws IOException;
public final R execute(ExecutionContext context, Payload payload) throws IOException {
Transform.Result transformResult = null;
if (transform != null) {
transformResult = transform.apply(context, payload);
payload = transformResult.payload();
}
R result = doExecute(context, payload);
if (transformResult != null) {
result.transformResult = transformResult;
}
return result;
}
protected abstract R doExecute(ExecutionContext context, Payload payload) throws IOException;
/**
* Parses xcontent to a concrete action of the same type.
@ -60,6 +83,8 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
protected final String type;
protected final boolean success;
protected Transform.Result transformResult;
protected Result(String type, boolean success) {
this.type = type;
this.success = success;
@ -73,10 +98,17 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
return success;
}
public Transform.Result transformResult() {
return transformResult;
}
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SUCCESS_FIELD.getPreferredName(), success);
if (transformResult != null) {
builder.field(Transform.Parser.TRANSFORM_RESULT_FIELD.getPreferredName(), transformResult);
}
xContentBody(builder, params);
return builder.endObject();
}

View File

@ -12,6 +12,9 @@ import org.elasticsearch.alerts.actions.ActionSettingsException;
import org.elasticsearch.alerts.actions.email.service.*;
import org.elasticsearch.alerts.support.Variables;
import org.elasticsearch.alerts.support.template.Template;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.transform.TransformRegistry;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -42,10 +45,10 @@ public class EmailAction extends Action<EmailAction.Result> {
final EmailService emailService;
public EmailAction(ESLogger logger, EmailService emailService, Email emailPrototype, Authentication auth, Profile profile,
public EmailAction(ESLogger logger, @Nullable Transform transform, EmailService emailService, Email emailPrototype, Authentication auth, Profile profile,
String account, Template subject, Template textBody, Template htmlBody, boolean attachPayload) {
super(logger);
super(logger, transform);
this.emailService = emailService;
this.emailPrototype = emailPrototype;
this.auth = auth;
@ -63,7 +66,7 @@ public class EmailAction extends Action<EmailAction.Result> {
}
@Override
public Result execute(ExecutionContext ctx, Payload payload) throws IOException {
public Result doExecute(ExecutionContext ctx, Payload payload) throws IOException {
Map<String, Object> model = Variables.createCtxModel(ctx, payload);
Email.Builder email = Email.builder()
@ -94,7 +97,7 @@ public class EmailAction extends Action<EmailAction.Result> {
@Override
public int hashCode() {
return Objects.hash(emailPrototype, auth, profile, account, subject, textBody, htmlBody, attachPayload);
return Objects.hash(emailPrototype, auth, profile, account, subject, textBody, htmlBody, attachPayload, transform);
}
@Override
@ -113,12 +116,18 @@ public class EmailAction extends Action<EmailAction.Result> {
&& Objects.equals(this.subject, other.subject)
&& Objects.equals(this.textBody, other.textBody)
&& Objects.equals(this.htmlBody, other.htmlBody)
&& Objects.equals(this.attachPayload, other.attachPayload);
&& Objects.equals(this.attachPayload, other.attachPayload)
&& Objects.equals(this.transform, other.transform);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (transform != null) {
builder.startObject(Transform.Parser.TRANSFORM_FIELD.getPreferredName())
.field(transform.type(), transform)
.endObject();
}
if (account != null) {
builder.field(Parser.ACCOUNT_FIELD.getPreferredName(), account);
}
@ -172,12 +181,14 @@ public class EmailAction extends Action<EmailAction.Result> {
private final Template.Parser templateParser;
private final EmailService emailService;
private final TransformRegistry transformRegistry;
@Inject
public Parser(Settings settings, EmailService emailService, Template.Parser templateParser) {
public Parser(Settings settings, EmailService emailService, Template.Parser templateParser, TransformRegistry transformRegistry) {
super(settings);
this.emailService = emailService;
this.templateParser = templateParser;
this.transformRegistry = transformRegistry;
}
@Override
@ -187,6 +198,7 @@ public class EmailAction extends Action<EmailAction.Result> {
@Override
public EmailAction parse(XContentParser parser) throws IOException {
Transform transform = null;
String user = null;
String password = null;
String account = null;
@ -251,6 +263,12 @@ public class EmailAction extends Action<EmailAction.Result> {
} else {
throw new ActionSettingsException("could not parse email action. unrecognized boolean field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (Transform.Parser.TRANSFORM_FIELD.match(currentFieldName)) {
transform = transformRegistry.parse(parser);
} else {
throw new ActionSettingsException("could not parse email action. unexpected object field [" + currentFieldName + "]");
}
} else {
throw new ActionSettingsException("could not parse email action. unexpected token [" + token + "]");
}
@ -259,20 +277,19 @@ public class EmailAction extends Action<EmailAction.Result> {
Authentication auth = user != null ? new Authentication(user, password) : null;
return new EmailAction(logger, emailService, email.build(), auth, profile, account, subject, textBody, htmlBody, attachPayload);
return new EmailAction(logger, transform, emailService, email.build(), auth, profile, account, subject, textBody, htmlBody, attachPayload);
}
@Override
public EmailAction.Result parseResult(XContentParser parser) throws IOException {
String currentFieldName = null;
XContentParser.Token token;
Transform.Result transformResult = null;
Boolean success = null;
Email email = null;
String account = null;
String reason = null;
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@ -293,6 +310,8 @@ public class EmailAction extends Action<EmailAction.Result> {
} else if (token == XContentParser.Token.START_OBJECT) {
if (EMAIL_FIELD.match(currentFieldName)) {
email = Email.parse(parser);
} else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) {
transformResult = Transform.Result.parse(parser);
} else {
throw new EmailException("could not parse email result. unexpected field [" + currentFieldName + "]");
}
@ -305,7 +324,11 @@ public class EmailAction extends Action<EmailAction.Result> {
throw new EmailException("could not parse email result. expected field [success]");
}
return success ? new Result.Success(new EmailService.EmailSent(account, email)) : new Result.Failure(reason);
Result result = success ? new Result.Success(new EmailService.EmailSent(account, email)) : new Result.Failure(reason);
if (transformResult != null) {
result.transformResult(transformResult);
}
return result;
}
}
@ -315,6 +338,10 @@ public class EmailAction extends Action<EmailAction.Result> {
super(type, success);
}
void transformResult(Transform.Result result) {
this.transformResult = result;
}
public static class Success extends Result {
private final EmailService.EmailSent sent;

View File

@ -14,6 +14,9 @@ import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionException;
import org.elasticsearch.alerts.actions.ActionSettingsException;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.transform.TransformRegistry;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -38,8 +41,8 @@ public class IndexAction extends Action<IndexAction.Result> {
private final String index;
private final String type;
public IndexAction(ESLogger logger, ClientProxy client, String index, String type) {
super(logger);
public IndexAction(ESLogger logger, @Nullable Transform transform, ClientProxy client, String index, String type) {
super(logger, transform);
this.client = client;
this.index = index;
this.type = type;
@ -51,7 +54,7 @@ public class IndexAction extends Action<IndexAction.Result> {
}
@Override
public Result execute(ExecutionContext ctx, Payload payload) throws IOException {
public Result doExecute(ExecutionContext ctx, Payload payload) throws IOException {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(index);
indexRequest.type(type);
@ -85,6 +88,11 @@ public class IndexAction extends Action<IndexAction.Result> {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (transform != null) {
builder.startObject(Transform.Parser.TRANSFORM_FIELD.getPreferredName())
.field(transform.type(), transform)
.endObject();
}
builder.field(Parser.INDEX_FIELD.getPreferredName(), index);
builder.field(Parser.TYPE_FIELD.getPreferredName(), type);
builder.endObject();
@ -100,6 +108,7 @@ public class IndexAction extends Action<IndexAction.Result> {
if (index != null ? !index.equals(that.index) : that.index != null) return false;
if (type != null ? !type.equals(that.type) : that.type != null) return false;
if (transform != null ? !transform.equals(that.transform) : that.transform != null) return false;
return true;
}
@ -108,6 +117,7 @@ public class IndexAction extends Action<IndexAction.Result> {
public int hashCode() {
int result = index != null ? index.hashCode() : 0;
result = 31 * result + (type != null ? type.hashCode() : 0);
result = 31 * result + (transform != null ? transform.hashCode() : 0);
return result;
}
@ -118,13 +128,14 @@ public class IndexAction extends Action<IndexAction.Result> {
public static final ParseField REASON_FIELD = new ParseField("reason");
public static final ParseField RESPONSE_FIELD = new ParseField("response");
private final ClientProxy client;
private final TransformRegistry transformRegistry;
@Inject
public Parser(Settings settings, ClientProxy client) {
public Parser(Settings settings, ClientProxy client, TransformRegistry transformRegistry) {
super(settings);
this.client = client;
this.transformRegistry = transformRegistry;
}
@Override
@ -136,6 +147,7 @@ public class IndexAction extends Action<IndexAction.Result> {
public IndexAction parse(XContentParser parser) throws IOException {
String index = null;
String type = null;
Transform transform = null;
String currentFieldName = null;
XContentParser.Token token;
@ -150,6 +162,12 @@ public class IndexAction extends Action<IndexAction.Result> {
} else {
throw new ActionSettingsException("could not parse index action. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (Transform.Parser.TRANSFORM_FIELD.match(currentFieldName)) {
transform = transformRegistry.parse(parser);
} else {
throw new ActionSettingsException("could not parse index action. unexpected field [" + currentFieldName + "]");
}
} else {
throw new ActionSettingsException("could not parse index action. unexpected token [" + token + "]");
}
@ -163,17 +181,18 @@ public class IndexAction extends Action<IndexAction.Result> {
throw new ActionSettingsException("could not parse index action [type] is required");
}
return new IndexAction(logger, client, index, type);
return new IndexAction(logger, transform, client, index, type);
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
String currentFieldName = null;
XContentParser.Token token;
Transform.Result transformResult = null;
Boolean success = null;
Payload payload = null;
String reason = null;
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@ -192,6 +211,8 @@ public class IndexAction extends Action<IndexAction.Result> {
} else if (token == XContentParser.Token.START_OBJECT) {
if (RESPONSE_FIELD.match(currentFieldName)) {
payload = new Payload.Simple(parser.map());
} else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) {
transformResult = Transform.Result.parse(parser);
} else {
throw new ActionException("could not parse index result. unexpected object field [" + currentFieldName + "]");
}
@ -204,7 +225,11 @@ public class IndexAction extends Action<IndexAction.Result> {
throw new ActionException("could not parse index result. expected boolean field [success]");
}
return new Result(payload, reason, success);
Result result = new Result(payload, reason, success);
if (transformResult != null) {
result.transformResult(transformResult);
}
return result;
}
}
@ -219,6 +244,10 @@ public class IndexAction extends Action<IndexAction.Result> {
this.reason = reason;
}
void transformResult(Transform.Result result) {
this.transformResult = result;
}
public Payload response() {
return response;
}

View File

@ -15,6 +15,8 @@ import org.elasticsearch.alerts.support.Script;
import org.elasticsearch.alerts.support.Variables;
import org.elasticsearch.alerts.support.template.Template;
import org.elasticsearch.alerts.support.template.XContentTemplate;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.transform.TransformRegistry;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.component.AbstractComponent;
@ -41,8 +43,8 @@ public class WebhookAction extends Action<WebhookAction.Result> {
private final Template url;
private final @Nullable Template body;
public WebhookAction(ESLogger logger, HttpClient httpClient, HttpMethod method, Template url, Template body) {
super(logger);
public WebhookAction(ESLogger logger, @Nullable Transform transform, HttpClient httpClient, HttpMethod method, Template url, Template body) {
super(logger, transform);
this.httpClient = httpClient;
this.method = method;
this.url = url;
@ -55,7 +57,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
}
@Override
public Result execute(ExecutionContext ctx, Payload payload) throws IOException {
public Result doExecute(ExecutionContext ctx, Payload payload) throws IOException {
Map<String, Object> model = Variables.createCtxModel(ctx, payload);
String urlText = url.render(model);
String bodyText = body != null ? body.render(model) : XContentTemplate.YAML.render(model);
@ -80,6 +82,11 @@ public class WebhookAction extends Action<WebhookAction.Result> {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (transform != null) {
builder.startObject(Transform.Parser.TRANSFORM_FIELD.getPreferredName())
.field(transform.type(), transform)
.endObject();
}
builder.field(Parser.METHOD_FIELD.getPreferredName(), method.getName().toLowerCase(Locale.ROOT));
builder.field(Parser.URL_FIELD.getPreferredName(), url);
if (body != null) {
@ -98,6 +105,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
if (body != null ? !body.equals(that.body) : that.body != null) return false;
if (!method.equals(that.method)) return false;
if (!url.equals(that.url)) return false;
if (transform != null ? !transform.equals(that.transform) : that.transform != null) return false;
return true;
}
@ -107,6 +115,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
int result = method.hashCode();
result = 31 * result + url.hashCode();
result = 31 * result + (body != null ? body.hashCode() : 0);
result = 31 * result + (transform != null ? transform.hashCode() : 0);
return result;
}
@ -116,6 +125,10 @@ public class WebhookAction extends Action<WebhookAction.Result> {
super(type, success);
}
void transformResult(Transform.Result result) {
this.transformResult = result;
}
public static class Executed extends Result {
private final int httpStatus;
@ -177,12 +190,14 @@ public class WebhookAction extends Action<WebhookAction.Result> {
private final Template.Parser templateParser;
private final HttpClient httpClient;
private final TransformRegistry transformRegistry;
@Inject
public Parser(Settings settings, Template.Parser templateParser, HttpClient httpClient) {
public Parser(Settings settings, Template.Parser templateParser, HttpClient httpClient, TransformRegistry transformRegistry) {
super(settings);
this.templateParser = templateParser;
this.httpClient = httpClient;
this.transformRegistry = transformRegistry;
}
@Override
@ -192,6 +207,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
@Override
public WebhookAction parse(XContentParser parser) throws IOException {
Transform transform = null;
HttpMethod method = HttpMethod.POST;
Template urlTemplate = null;
Template bodyTemplate = null;
@ -220,6 +236,8 @@ public class WebhookAction extends Action<WebhookAction.Result> {
} catch (Template.Parser.ParseException pe) {
throw new ActionSettingsException("could not parse webhook action [body] template", pe);
}
} else if (Transform.Parser.TRANSFORM_FIELD.match(currentFieldName)) {
transform = transformRegistry.parse(parser);
} else {
throw new ActionSettingsException("could not parse webhook action. unexpected field [" + currentFieldName + "]");
}
@ -232,11 +250,12 @@ public class WebhookAction extends Action<WebhookAction.Result> {
throw new ActionSettingsException("could not parse webhook action. [url_template] is required");
}
return new WebhookAction(logger, httpClient, method, urlTemplate, bodyTemplate);
return new WebhookAction(logger, transform, httpClient, method, urlTemplate, bodyTemplate);
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
Transform.Result transformResult = null;
String currentFieldName = null;
XContentParser.Token token;
Boolean success = null;
@ -263,7 +282,11 @@ public class WebhookAction extends Action<WebhookAction.Result> {
throw new ActionException("could not parse webhook result. unexpected boolean field [" + currentFieldName + "]");
}
} else {
throw new ActionException("unable to parse webhook action result. unexpected field [" + currentFieldName + "]" );
throw new ActionException("unable to parse webhook action result. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) {
}
} else {
throw new ActionException("unable to parse webhook action result. unexpected field [" + currentFieldName + "]" );
@ -274,7 +297,11 @@ public class WebhookAction extends Action<WebhookAction.Result> {
throw new ActionException("could not parse webhook result. expected boolean field [success]");
}
return success ? new Result.Executed(httpStatus, url, body) : new Result.Failure(reason);
Result result = success ? new Result.Executed(httpStatus, url, body) : new Result.Failure(reason);
if (transformResult != null) {
result.transformResult(transformResult);
}
return result;
}
}

View File

@ -82,10 +82,10 @@ public final class AlertUtils {
searchRequest.source(builder);
break;
case "indices_options":
boolean expandOpen = indicesOptions.expandWildcardsOpen();
boolean expandClosed = indicesOptions.expandWildcardsClosed();
boolean allowNoIndices = indicesOptions.allowNoIndices();
boolean ignoreUnavailable = indicesOptions.ignoreUnavailable();
boolean expandOpen = DEFAULT_INDICES_OPTIONS.expandWildcardsOpen();
boolean expandClosed = DEFAULT_INDICES_OPTIONS.expandWildcardsClosed();
boolean allowNoIndices = DEFAULT_INDICES_OPTIONS.allowNoIndices();
boolean ignoreUnavailable = DEFAULT_INDICES_OPTIONS.ignoreUnavailable();
String indicesFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -124,7 +124,7 @@ public final class AlertUtils {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "]");
}
}
indicesOptions = IndicesOptions.fromOptions(ignoreUnavailable, allowNoIndices, expandOpen, expandClosed);
indicesOptions = IndicesOptions.fromOptions(ignoreUnavailable, allowNoIndices, expandOpen, expandClosed, DEFAULT_INDICES_OPTIONS);
break;
case "template":
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -166,6 +166,9 @@ public final class AlertUtils {
}
}
if (searchRequest.indices() == null) {
searchRequest.indices(Strings.EMPTY_ARRAY);
}
searchRequest.searchType(searchType);
searchRequest.indicesOptions(indicesOptions);
return searchRequest;

View File

@ -7,12 +7,17 @@ package org.elasticsearch.alerts.support;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.AlertsException;
import org.elasticsearch.alerts.actions.email.service.Attachment;
import org.elasticsearch.common.base.Equivalence;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* The only true way today to compare search request object (outside of core) is to
* serialize it and compare the serialized output. this is heavy obviously, but luckily we
@ -31,13 +36,13 @@ public final class SearchRequestEquivalence extends Equivalence<SearchRequest> {
try {
BytesStreamOutput output1 = new BytesStreamOutput();
r1.writeTo(output1);
BytesReference bytes1 = output1.bytes();
BytesStreamOutput output2 = new BytesStreamOutput(bytes1.length());
r2.writeTo(output2);
BytesReference bytes2 = output2.bytes();
return Arrays.equals(bytes1.toBytes(), bytes2.toBytes());
} catch (IOException ioe) {
throw new AlertsException("could not compare search requests", ioe);
byte[] bytes1 = output1.bytes().toBytes();
output1.reset();
r2.writeTo(output1);
byte[] bytes2 = output1.bytes().toBytes();
return Arrays.equals(bytes1, bytes2);
} catch (Throwable t) {
throw new AlertsException("could not compare search requests", t);
}
}

View File

@ -57,6 +57,23 @@ public class ChainTransform extends Transform {
return builder.endArray();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChainTransform transform = (ChainTransform) o;
if (!transforms.equals(transform.transforms)) return false;
return true;
}
@Override
public int hashCode() {
return transforms.hashCode();
}
public static class Parser implements Transform.Parser<ChainTransform>, InitializingService.Initializable {
private TransformRegistry registry;

View File

@ -63,6 +63,23 @@ public class ScriptTransform extends Transform {
return builder.value(script);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ScriptTransform transform = (ScriptTransform) o;
if (!script.equals(transform.script)) return false;
return true;
}
@Override
public int hashCode() {
return script.hashCode();
}
public static class Parser implements Transform.Parser<ScriptTransform> {
private final ScriptServiceProxy scriptService;

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.SearchRequestEquivalence;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.common.Strings;
@ -71,7 +72,24 @@ public class SearchTransform extends Transform {
return AlertUtils.writeSearchRequest(request, builder, params);
}
public SearchRequest createRequest(SearchRequest requestPrototype, ExecutionContext ctx, Payload payload) throws IOException {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchTransform transform = (SearchTransform) o;
if (!SearchRequestEquivalence.INSTANCE.equivalent(request, transform.request)) return false;
return true;
}
@Override
public int hashCode() {
return request.hashCode();
}
SearchRequest createRequest(SearchRequest requestPrototype, ExecutionContext ctx, Payload payload) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.indices(requestPrototype.indices());

View File

@ -5,8 +5,10 @@
*/
package org.elasticsearch.alerts.transform;
import org.elasticsearch.alerts.AlertsSettingsException;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -39,7 +41,10 @@ public abstract class Transform implements ToXContent {
public abstract Result apply(ExecutionContext ctx, Payload payload) throws IOException;
public static class Result {
public static class Result implements ToXContent {
public static final ParseField TYPE_FIELD = new ParseField("type");
public static final ParseField PAYLOAD_FIELD = new ParseField("payload");
private final String type;
private final Payload payload;
@ -56,10 +61,61 @@ public abstract class Transform implements ToXContent {
public Payload payload() {
return payload;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(TYPE_FIELD.getPreferredName(), type)
.field(PAYLOAD_FIELD.getPreferredName(), payload)
.endObject();
}
public static Result parse(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new AlertsSettingsException("could not parse transform result. expected an object, but found [" + token + "]");
}
String type = null;
Payload payload = null;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else {
if (token == XContentParser.Token.VALUE_STRING) {
if (TYPE_FIELD.match(currentFieldName)) {
type = parser.text();
} else {
throw new AlertsSettingsException("could not parse transform result. expected a string value for field [" + currentFieldName + "], found [" + token + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (PAYLOAD_FIELD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else {
throw new AlertsSettingsException("could not parse transform result. expected an object for field [" + currentFieldName + "], found [" + token + "]");
}
} else {
throw new AlertsSettingsException("could not parse transform result. unexpected token [" + token + "]");
}
}
}
if (type == null) {
throw new AlertsSettingsException("could not parse transform result. missing [type] field");
}
if (payload == null) {
throw new AlertsSettingsException("could not parse transform result. missing [payload] field");
}
return new Result(type, payload);
}
}
public static interface Parser<T extends Transform> {
public static final ParseField TRANSFORM_FIELD = new ParseField("transform");
public static final ParseField TRANSFORM_RESULT_FIELD = new ParseField("transform_result");
String type();
T parse(XContentParser parser) throws IOException;

View File

@ -33,11 +33,7 @@ public class TransformRegistry {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (type != null) {
Transform.Parser transformParser = parsers.get(type);
if (transformParser == null) {
throw new AlertsSettingsException("unknown transform type [" + type + "]");
}
transform = transformParser.parse(parser);
transform = parse(type, parser);
}
}
return transform;

View File

@ -26,6 +26,7 @@ import org.elasticsearch.alerts.input.search.SearchInput;
import org.elasticsearch.alerts.input.simple.SimpleInput;
import org.elasticsearch.alerts.scheduler.schedule.*;
import org.elasticsearch.alerts.scheduler.schedule.support.*;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Script;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
@ -76,6 +77,8 @@ public class AlertTests extends ElasticsearchTestCase {
@Test @Repeat(iterations = 20)
public void testParser_SelfGenerated() throws Exception {
TransformRegistry transformRegistry = transformRegistry();
Schedule schedule = randomSchedule();
ScheduleRegistry scheduleRegistry = registry(schedule);
@ -86,10 +89,9 @@ public class AlertTests extends ElasticsearchTestCase {
ConditionRegistry conditionRegistry = registry(condition);
Transform transform = randomTransform();
TransformRegistry transformRegistry = registry(transform);
Actions actions = randomActions();
ActionRegistry actionRegistry = registry(actions);
ActionRegistry actionRegistry = registry(actions, transformRegistry);
Map<String, Object> metadata = ImmutableMap.<String, Object>of("_key", "_val");
@ -218,60 +220,52 @@ public class AlertTests extends ElasticsearchTestCase {
case ScriptTransform.TYPE:
return new ScriptTransform(scriptService, new Script("_script"));
case SearchTransform.TYPE:
return new SearchTransform(logger, scriptService, client, matchAllRequest());
return new SearchTransform(logger, scriptService, client, matchAllRequest(AlertUtils.DEFAULT_INDICES_OPTIONS));
default: // chain
return new ChainTransform(ImmutableList.of(
new SearchTransform(logger, scriptService, client, matchAllRequest()),
new SearchTransform(logger, scriptService, client, matchAllRequest(AlertUtils.DEFAULT_INDICES_OPTIONS)),
new ScriptTransform(scriptService, new Script("_script"))));
}
}
private TransformRegistry registry(Transform transform) {
private TransformRegistry transformRegistry() {
ImmutableMap.Builder<String, Transform.Parser> parsers = ImmutableMap.builder();
switch (transform.type()) {
case ScriptTransform.TYPE:
parsers.put(ScriptTransform.TYPE, new ScriptTransform.Parser(scriptService));
return new TransformRegistry(parsers.build());
case SearchTransform.TYPE:
parsers.put(SearchTransform.TYPE, new SearchTransform.Parser(settings, scriptService, client));
return new TransformRegistry(parsers.build());
default:
ChainTransform.Parser parser = new ChainTransform.Parser();
parsers.put(ChainTransform.TYPE, parser);
parsers.put(ScriptTransform.TYPE, new ScriptTransform.Parser(scriptService));
parsers.put(SearchTransform.TYPE, new SearchTransform.Parser(settings, scriptService, client));
TransformRegistry registry = new TransformRegistry(parsers.build());
parser.init(registry);
return registry;
}
ChainTransform.Parser parser = new ChainTransform.Parser();
parsers.put(ChainTransform.TYPE, parser);
parsers.put(ScriptTransform.TYPE, new ScriptTransform.Parser(scriptService));
parsers.put(SearchTransform.TYPE, new SearchTransform.Parser(settings, scriptService, client));
TransformRegistry registry = new TransformRegistry(parsers.build());
parser.init(registry);
return registry;
}
private Actions randomActions() {
ImmutableList.Builder<Action> list = ImmutableList.builder();
if (randomBoolean()) {
list.add(new EmailAction(logger, emailService, Email.builder().id("prototype").build(), null, Profile.STANDARD, null, null, null, null, randomBoolean()));
Transform transform = randomTransform();
list.add(new EmailAction(logger, transform, emailService, Email.builder().id("prototype").build(), null, Profile.STANDARD, null, null, null, null, randomBoolean()));
}
if (randomBoolean()) {
list.add(new IndexAction(logger, client, "_index", "_type"));
list.add(new IndexAction(logger, randomTransform(), client, "_index", "_type"));
}
if (randomBoolean()) {
list.add(new WebhookAction(logger, httpClient, randomFrom(HttpMethod.GET, HttpMethod.POST, HttpMethod.PUT), new ScriptTemplate(scriptService, "_url"), null));
list.add(new WebhookAction(logger, randomTransform(), httpClient, randomFrom(HttpMethod.GET, HttpMethod.POST, HttpMethod.PUT), new ScriptTemplate(scriptService, "_url"), null));
}
return new Actions(list.build());
}
private ActionRegistry registry(Actions actions) {
private ActionRegistry registry(Actions actions, TransformRegistry transformRegistry) {
ImmutableMap.Builder<String, Action.Parser> parsers = ImmutableMap.builder();
for (Action action : actions) {
switch (action.type()) {
case EmailAction.TYPE:
parsers.put(EmailAction.TYPE, new EmailAction.Parser(settings, emailService, templateParser));
parsers.put(EmailAction.TYPE, new EmailAction.Parser(settings, emailService, templateParser, transformRegistry));
break;
case IndexAction.TYPE:
parsers.put(IndexAction.TYPE, new IndexAction.Parser(settings, client));
parsers.put(IndexAction.TYPE, new IndexAction.Parser(settings, client, transformRegistry));
break;
case WebhookAction.TYPE:
parsers.put(WebhookAction.TYPE, new WebhookAction.Parser(settings, templateParser, httpClient));
parsers.put(WebhookAction.TYPE, new WebhookAction.Parser(settings, templateParser, httpClient, transformRegistry));
break;
}
}

View File

@ -14,6 +14,8 @@ import org.elasticsearch.alerts.actions.email.service.*;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.support.template.ScriptTemplate;
import org.elasticsearch.alerts.support.template.Template;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.transform.TransformRegistry;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.joda.time.DateTime;
@ -61,7 +63,8 @@ public class EmailActionTests extends ElasticsearchTestCase {
Template textBody = mock(Template.class);
Template htmlBody = randomBoolean() ? null : mock(Template.class);
boolean attachPayload = randomBoolean();
EmailAction action = new EmailAction(logger, service, email, auth, profile, account, subject, textBody, htmlBody, attachPayload);
Transform transform = randomBoolean() ? null : mock(Transform.class);
EmailAction action = new EmailAction(logger, transform, service, email, auth, profile, account, subject, textBody, htmlBody, attachPayload);
final Map<String, Object> data = new HashMap<>();
Payload payload = new Payload() {
@ -86,11 +89,13 @@ public class EmailActionTests extends ElasticsearchTestCase {
when(ctx.alert()).thenReturn(alert);
when(ctx.fireTime()).thenReturn(now);
when(ctx.scheduledTime()).thenReturn(now);
if (transform != null) {
when(transform.apply(ctx, payload)).thenReturn(new Transform.Result("_transform_type", new Payload.Simple("_key", "_value")));
}
Map<String, Object> expectedModel = ImmutableMap.<String, Object>builder()
.put("ctx", ImmutableMap.<String, Object>builder()
.put("alert_name", "alert1")
.put("payload", data)
.put("payload", transform == null ? data : new Payload.Simple("_key", "_value").data())
.put("fire_time", now)
.put("scheduled_fire_time", now).build())
.build();
@ -117,6 +122,11 @@ public class EmailActionTests extends ElasticsearchTestCase {
if (attachPayload) {
assertThat(actualEmail.attachments(), hasKey("payload"));
}
if (transform != null) {
assertThat(result.transformResult(), notNullValue());
assertThat(result.transformResult().type(), equalTo("_transform_type"));
assertThat(result.transformResult().payload().data(), equalTo(new Payload.Simple("_key", "_value").data()));
}
}
@Test @Repeat(iterations = 20)
@ -132,6 +142,8 @@ public class EmailActionTests extends ElasticsearchTestCase {
ScriptTemplate subject = randomBoolean() ? new ScriptTemplate(scriptService, "_subject") : null;
ScriptTemplate textBody = randomBoolean() ? new ScriptTemplate(scriptService, "_text_body") : null;
ScriptTemplate htmlBody = randomBoolean() ? new ScriptTemplate(scriptService, "_text_html") : null;
final Transform transform = randomBoolean() ? null : new TransformMock();
TransformRegistry transformRegistry = transform == null ? mock(TransformRegistry.class) : new TransformRegistryMock(transform);
boolean attachPayload = randomBoolean();
XContentBuilder builder = jsonBuilder().startObject()
.field("account", "_account")
@ -190,11 +202,17 @@ public class EmailActionTests extends ElasticsearchTestCase {
builder.field("html_body", htmlBody);
}
}
if (transform != null) {
builder.startObject("transform")
.startObject("_transform").endObject()
.endObject();
}
BytesReference bytes = builder.bytes();
XContentParser parser = JsonXContent.jsonXContent.createParser(bytes);
parser.nextToken();
EmailAction action = new EmailAction.Parser(ImmutableSettings.EMPTY, emailService,
new ScriptTemplate.Parser(ImmutableSettings.EMPTY, scriptService)).parse(parser);
new ScriptTemplate.Parser(ImmutableSettings.EMPTY, scriptService), transformRegistry).parse(parser);
assertThat(action, notNullValue());
assertThat(action.account, is("_account"));
@ -226,6 +244,10 @@ public class EmailActionTests extends ElasticsearchTestCase {
} else {
assertThat(action.emailPrototype.replyTo(), nullValue());
}
if (transform != null) {
assertThat(action.transform(), notNullValue());
assertThat(action.transform(), equalTo(transform));
}
}
@Test @Repeat(iterations = 20)
@ -255,15 +277,18 @@ public class EmailActionTests extends ElasticsearchTestCase {
Template textBody = new TemplateMock("_text_body");
Template htmlBody = randomBoolean() ? null : new TemplateMock("_html_body");
boolean attachPayload = randomBoolean();
Transform transform = randomBoolean() ? null : new TransformMock();
TransformRegistry transformRegistry = transform == null ? mock(TransformRegistry.class) : new TransformRegistryMock(transform);
EmailAction action = new EmailAction(logger, service, email, auth, profile, account, subject, textBody, htmlBody, attachPayload);
EmailAction action = new EmailAction(logger, transform, service, email, auth, profile, account, subject, textBody, htmlBody, attachPayload);
XContentBuilder builder = jsonBuilder();
action.toXContent(builder, Attachment.XContent.EMPTY_PARAMS);
BytesReference bytes = builder.bytes();
System.out.println(bytes.toUtf8());
XContentParser parser = JsonXContent.jsonXContent.createParser(bytes);
parser.nextToken();
EmailAction parsed = new EmailAction.Parser(ImmutableSettings.EMPTY, service, new TemplateMock.Parser()).parse(parser);
EmailAction parsed = new EmailAction.Parser(ImmutableSettings.EMPTY,service, new TemplateMock.Parser(), transformRegistry).parse(parser);
assertThat(parsed, equalTo(action));
}
@ -277,7 +302,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
BytesReference bytes = builder.bytes();
XContentParser parser = JsonXContent.jsonXContent.createParser(bytes);
new EmailAction.Parser(ImmutableSettings.EMPTY, emailService,
new ScriptTemplate.Parser(ImmutableSettings.EMPTY, scriptService)).parse(parser);
new ScriptTemplate.Parser(ImmutableSettings.EMPTY, scriptService), mock(TransformRegistry.class)).parse(parser);
}
@Test @Repeat(iterations = 20)
@ -290,11 +315,20 @@ public class EmailActionTests extends ElasticsearchTestCase {
.subject("_subject")
.textBody("_text_body")
.build();
boolean withTransform = randomBoolean();
XContentBuilder builder = jsonBuilder().startObject()
.field("success", success);
if (success) {
builder.field("email", email);
builder.field("account", "_account");
if (withTransform) {
builder.startObject("transform_result")
.field("type", "_transform_type")
.field("payload", new Payload.Simple("_key", "_value").data())
.endObject();
}
} else {
builder.field("reason", "_reason");
}
@ -302,13 +336,20 @@ public class EmailActionTests extends ElasticsearchTestCase {
BytesReference bytes = builder.bytes();
XContentParser parser = JsonXContent.jsonXContent.createParser(bytes);
parser.nextToken();
EmailAction.Result result = new EmailAction.Parser(ImmutableSettings.EMPTY, mock(EmailService.class), new TemplateMock.Parser())
EmailAction.Result result = new EmailAction.Parser(ImmutableSettings.EMPTY, mock(EmailService.class), new TemplateMock.Parser(), mock(TransformRegistry.class))
.parseResult(parser);
assertThat(result.success(), is(success));
if (success) {
assertThat(result, instanceOf(EmailAction.Result.Success.class));
assertThat(((EmailAction.Result.Success) result).email(), equalTo(email));
assertThat(((EmailAction.Result.Success) result).account(), is("_account"));
if (withTransform) {
assertThat(result.transformResult(), notNullValue());
assertThat(result.transformResult().type(), equalTo("_transform_type"));
assertThat(result.transformResult().payload().data(), equalTo(new Payload.Simple("_key", "_value").data()));
} else {
assertThat(result.transformResult(), nullValue());
}
} else {
assertThat(result, instanceOf(EmailAction.Result.Failure.class));
assertThat(((EmailAction.Result.Failure) result).reason(), is("_reason"));
@ -323,7 +364,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
BytesReference bytes = builder.bytes();
XContentParser parser = JsonXContent.jsonXContent.createParser(bytes);
parser.nextToken();
new EmailAction.Parser(ImmutableSettings.EMPTY, mock(EmailService.class), new TemplateMock.Parser())
new EmailAction.Parser(ImmutableSettings.EMPTY, mock(EmailService.class), new TemplateMock.Parser(), mock(TransformRegistry.class))
.parseResult(parser);
}
@ -370,4 +411,41 @@ public class EmailActionTests extends ElasticsearchTestCase {
}
}
}
static class TransformMock extends Transform {
@Override
public String type() {
return "_transform";
}
@Override
public Result apply(ExecutionContext ctx, Payload payload) throws IOException {
return new Transform.Result("_transform", new Payload.Simple("_key", "_value"));
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
}
static class TransformRegistryMock extends TransformRegistry {
public TransformRegistryMock(final Transform transform) {
super(ImmutableMap.<String, Transform.Parser>of("_transform", new Transform.Parser() {
@Override
public String type() {
return transform.type();
}
@Override
public Transform parse(XContentParser parser) throws IOException {
parser.nextToken();
assertThat(parser.currentToken(), is(XContentParser.Token.END_OBJECT));
return transform;
}
}));
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.alerts.test;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.Actions;
@ -26,9 +27,11 @@ import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.support.template.ScriptTemplate;
import org.elasticsearch.alerts.support.template.Template;
import org.elasticsearch.alerts.transform.SearchTransform;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -57,7 +60,16 @@ public final class AlertsTestUtils {
}
public static SearchRequest matchAllRequest() {
return new SearchRequest().source(SearchSourceBuilder.searchSource().query(matchAllQuery()));
return matchAllRequest(null);
}
public static SearchRequest matchAllRequest(IndicesOptions indicesOptions) {
SearchRequest request = new SearchRequest(Strings.EMPTY_ARRAY)
.source(SearchSourceBuilder.searchSource().query(matchAllQuery()).buildAsBytes(XContentType.JSON), false);
if (indicesOptions != null) {
request.indicesOptions(indicesOptions);
}
return request;
}
public static Alert createTestAlert(String alertName, ScriptServiceProxy scriptService, HttpClient httpClient, EmailService emailService, ESLogger logger) throws AddressException {
@ -71,7 +83,7 @@ public final class AlertsTestUtils {
Template url = new ScriptTemplate(scriptService, "http://localhost/foobarbaz/{{alert_name}}");
Template body = new ScriptTemplate(scriptService, "{{alert_name}} executed with {{response.hits.total}} hits");
actions.add(new WebhookAction(logger, httpClient, HttpMethod.GET, url, body));
actions.add(new WebhookAction(logger, null, httpClient, HttpMethod.GET, url, body));
Email.Address from = new Email.Address("from@test.com");
List<Email.Address> emailAddressList = new ArrayList<>();
@ -84,7 +96,7 @@ public final class AlertsTestUtils {
emailBuilder.to(to);
EmailAction emailAction = new EmailAction(logger, emailService, emailBuilder.build(),
EmailAction emailAction = new EmailAction(logger, null, emailService, emailBuilder.build(),
new Authentication("testname", "testpassword"), Profile.STANDARD, "testaccount", body, body, null, true);
actions.add(emailAction);