Aligned Transform.Result with all other result constructs

- the transform parser and registry can now parse transform results
- every transform result may have its own format
- the chain transform result outputs the "transformation trail" of all the chained transforms (great tool for debugging)
- removed `Transform.NOOP` - was redundant, when no transform is defined the alert's transform is `null`
- removed `payload` from the `AlertExecution`. Instead, the execution holds the input result, transform result and potentially the transform results of the actions
- changed the xcontent representing a transofrm result to `{ "transform_type" : { "payload" : {...}, ... } }`
- with this change, the `Action` interface is cleaned up (`execute` only accepts the execution context)

Original commit: elastic/x-pack-elasticsearch@6ecf7f2c92
This commit is contained in:
uboness 2015-03-04 03:39:18 +01:00
parent 167f6814ab
commit 01145b8025
19 changed files with 367 additions and 167 deletions

View File

@ -69,7 +69,7 @@ public class Alert implements Scheduler.Job, ToXContent {
this.status = status != null ? status : new Status(); this.status = status != null ? status : new Status();
this.throttlePeriod = throttlePeriod; this.throttlePeriod = throttlePeriod;
this.metadata = metadata; this.metadata = metadata;
this.transform = transform != null ? transform : Transform.NOOP; this.transform = transform;
throttler = new AlertThrottler(throttlePeriod); throttler = new AlertThrottler(throttlePeriod);
} }
@ -144,7 +144,7 @@ public class Alert implements Scheduler.Job, ToXContent {
builder.field(Parser.SCHEDULE_FIELD.getPreferredName()).startObject().field(schedule.type(), schedule).endObject(); builder.field(Parser.SCHEDULE_FIELD.getPreferredName()).startObject().field(schedule.type(), schedule).endObject();
builder.field(Parser.INPUT_FIELD.getPreferredName()).startObject().field(input.type(), input).endObject(); builder.field(Parser.INPUT_FIELD.getPreferredName()).startObject().field(input.type(), input).endObject();
builder.field(Parser.CONDITION_FIELD.getPreferredName()).startObject().field(condition.type(), condition).endObject(); builder.field(Parser.CONDITION_FIELD.getPreferredName()).startObject().field(condition.type(), condition).endObject();
if (transform != Transform.NOOP) { if (transform != null) {
builder.field(Parser.TRANSFORM_FIELD.getPreferredName()).startObject().field(transform.type(), transform).endObject(); builder.field(Parser.TRANSFORM_FIELD.getPreferredName()).startObject().field(transform.type(), transform).endObject();
} }
if (throttlePeriod != null) { if (throttlePeriod != null) {

View File

@ -12,6 +12,9 @@ import org.elasticsearch.alerts.condition.ConditionRegistry;
import org.elasticsearch.alerts.input.Input; import org.elasticsearch.alerts.input.Input;
import org.elasticsearch.alerts.input.InputRegistry; import org.elasticsearch.alerts.input.InputRegistry;
import org.elasticsearch.alerts.throttle.Throttler; import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.transform.TransformRegistry;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -29,19 +32,23 @@ public class AlertExecution implements ToXContent {
private final Input.Result inputResult; private final Input.Result inputResult;
private final Condition.Result conditionResult; private final Condition.Result conditionResult;
private final Throttler.Result throttleResult; private final Throttler.Result throttleResult;
private final @Nullable Transform.Result transformResult;
private final Map<String, Action.Result> actionsResults; private final Map<String, Action.Result> actionsResults;
private final Payload payload;
public AlertExecution(ExecutionContext context) { public AlertExecution(ExecutionContext context) {
this(context.inputResult(), context.conditionResult(), context.throttleResult(), context.actionsResults(), context.payload()); this(context.inputResult(), context.conditionResult(), context.throttleResult(), context.transformResult(), context.actionsResults());
} }
AlertExecution(Input.Result inputResult, Condition.Result conditionResult, Throttler.Result throttleResult, Map<String, Action.Result> actionsResults, Payload payload) { AlertExecution(Input.Result inputResult, Condition.Result conditionResult, Throttler.Result throttleResult, @Nullable Transform.Result transformResult, Map<String, Action.Result> actionsResults) {
this.inputResult = inputResult; this.inputResult = inputResult;
this.conditionResult = conditionResult; this.conditionResult = conditionResult;
this.throttleResult = throttleResult; this.throttleResult = throttleResult;
this.transformResult = transformResult;
this.actionsResults = actionsResults; this.actionsResults = actionsResults;
this.payload = payload; }
public Input.Result inputResult() {
return inputResult;
} }
public Condition.Result conditionResult() { public Condition.Result conditionResult() {
@ -52,22 +59,22 @@ public class AlertExecution implements ToXContent {
return throttleResult; return throttleResult;
} }
public Map<String, Action.Result> actionsResults() { public Transform.Result transformResult() {
return actionsResults; return transformResult;
} }
public Payload payload() { public Map<String, Action.Result> actionsResults() {
return payload; return actionsResults;
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
if (inputResult != null) { if (inputResult != null) {
builder.startObject(Parser.INPUT_RESULT.getPreferredName()).field(inputResult.type(), inputResult).endObject(); builder.startObject(Parser.INPUT_RESULT_FIELD.getPreferredName()).field(inputResult.type(), inputResult).endObject();
} }
if (conditionResult != null) { if (conditionResult != null) {
builder.startObject(Parser.CONDITION_RESULT.getPreferredName()).field(conditionResult.type(), conditionResult).endObject(); builder.startObject(Parser.CONDITION_RESULT_FIELD.getPreferredName()).field(conditionResult.type(), conditionResult).endObject();
} }
if (throttleResult != null && throttleResult.throttle()) { if (throttleResult != null && throttleResult.throttle()) {
builder.field(Parser.THROTTLED.getPreferredName(), throttleResult.throttle()); builder.field(Parser.THROTTLED.getPreferredName(), throttleResult.throttle());
@ -75,7 +82,9 @@ public class AlertExecution implements ToXContent {
builder.field(Parser.THROTTLE_REASON.getPreferredName(), throttleResult.reason()); builder.field(Parser.THROTTLE_REASON.getPreferredName(), throttleResult.reason());
} }
} }
builder.field(Parser.PAYLOAD.getPreferredName(), payload()); if (transformResult != null) {
builder.startObject(Transform.Parser.TRANSFORM_RESULT_FIELD.getPreferredName()).field(transformResult.type(), transformResult).endObject();
}
builder.startArray(Parser.ACTIONS_RESULTS.getPreferredName()); builder.startArray(Parser.ACTIONS_RESULTS.getPreferredName());
for (Map.Entry<String, Action.Result> actionResult : actionsResults.entrySet()) { for (Map.Entry<String, Action.Result> actionResult : actionsResults.entrySet()) {
builder.startObject(); builder.startObject();
@ -89,21 +98,20 @@ public class AlertExecution implements ToXContent {
public static class Parser { public static class Parser {
public static final ParseField INPUT_RESULT = new ParseField("input_result"); public static final ParseField INPUT_RESULT_FIELD = new ParseField("input_result");
public static final ParseField CONDITION_RESULT = new ParseField("condition_result"); public static final ParseField CONDITION_RESULT_FIELD = new ParseField("condition_result");
public static final ParseField PAYLOAD = new ParseField("payload");
public static final ParseField ACTIONS_RESULTS = new ParseField("actions_results"); public static final ParseField ACTIONS_RESULTS = new ParseField("actions_results");
public static final ParseField THROTTLED = new ParseField("throttled"); public static final ParseField THROTTLED = new ParseField("throttled");
public static final ParseField THROTTLE_REASON = new ParseField("throttle_reason"); public static final ParseField THROTTLE_REASON = new ParseField("throttle_reason");
public static AlertExecution parse(XContentParser parser, ConditionRegistry conditionRegistry, ActionRegistry actionRegistry, public static AlertExecution parse(XContentParser parser, ConditionRegistry conditionRegistry, ActionRegistry actionRegistry,
InputRegistry inputRegistry) throws IOException { InputRegistry inputRegistry, TransformRegistry transformRegistry) throws IOException {
boolean throttled = false; boolean throttled = false;
String throttleReason = null; String throttleReason = null;
Map<String, Action.Result> actionResults = new HashMap<>(); Map<String, Action.Result> actionResults = new HashMap<>();
Input.Result inputResult = null; Input.Result inputResult = null;
Condition.Result conditionResult = null; Condition.Result conditionResult = null;
Payload payload = null; Transform.Result transformResult = null;
String currentFieldName = null; String currentFieldName = null;
XContentParser.Token token; XContentParser.Token token;
@ -119,12 +127,12 @@ public class AlertExecution implements ToXContent {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]"); throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
} }
} else if (token == XContentParser.Token.START_OBJECT) { } else if (token == XContentParser.Token.START_OBJECT) {
if (INPUT_RESULT.match(currentFieldName)) { if (INPUT_RESULT_FIELD.match(currentFieldName)) {
inputResult = inputRegistry.parseResult(parser); inputResult = inputRegistry.parseResult(parser);
} else if (CONDITION_RESULT.match(currentFieldName)) { } else if (CONDITION_RESULT_FIELD.match(currentFieldName)) {
conditionResult = conditionRegistry.parseResult(parser); conditionResult = conditionRegistry.parseResult(parser);
} else if (PAYLOAD.match(currentFieldName)) { } else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) {
payload = new Payload.XContent(parser); transformResult = transformRegistry.parseResult(parser);
} else { } else {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]"); throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
} }
@ -140,7 +148,7 @@ public class AlertExecution implements ToXContent {
} }
Throttler.Result throttleResult = throttled ? Throttler.Result.throttle(throttleReason) : Throttler.Result.NO; Throttler.Result throttleResult = throttled ? Throttler.Result.throttle(throttleReason) : Throttler.Result.NO;
return new AlertExecution(inputResult, conditionResult, throttleResult, actionResults, payload ); return new AlertExecution(inputResult, conditionResult, throttleResult, transformResult, actionResults);
} }

View File

@ -43,20 +43,21 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
/** /**
* Executes this action * Executes this action
*/ */
public final R execute(ExecutionContext context, Payload payload) throws IOException { public R execute(ExecutionContext context) throws IOException {
Payload payload = context.payload();
Transform.Result transformResult = null; Transform.Result transformResult = null;
if (transform != null) { if (transform != null) {
transformResult = transform.apply(context, payload); transformResult = transform.apply(context, payload);
payload = transformResult.payload(); payload = transformResult.payload();
} }
R result = doExecute(context, payload); R result = execute(context, payload);
if (transformResult != null) { if (transformResult != null) {
result.transformResult = transformResult; result.transformResult = transformResult;
} }
return result; return result;
} }
protected abstract R doExecute(ExecutionContext context, Payload payload) throws IOException; protected abstract R execute(ExecutionContext context, Payload payload) throws IOException;
/** /**
* Parses xcontent to a concrete action of the same type. * Parses xcontent to a concrete action of the same type.
@ -107,7 +108,9 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
builder.startObject(); builder.startObject();
builder.field(SUCCESS_FIELD.getPreferredName(), success); builder.field(SUCCESS_FIELD.getPreferredName(), success);
if (transformResult != null) { if (transformResult != null) {
builder.field(Transform.Parser.TRANSFORM_RESULT_FIELD.getPreferredName(), transformResult); builder.startObject(Transform.Parser.TRANSFORM_RESULT_FIELD.getPreferredName())
.field(transformResult.type(), transformResult)
.endObject();
} }
xContentBody(builder, params); xContentBody(builder, params);
return builder.endObject(); return builder.endObject();

View File

@ -66,7 +66,7 @@ public class EmailAction extends Action<EmailAction.Result> {
} }
@Override @Override
public Result doExecute(ExecutionContext ctx, Payload payload) throws IOException { protected Result execute(ExecutionContext ctx, Payload payload) throws IOException {
Map<String, Object> model = Variables.createCtxModel(ctx, payload); Map<String, Object> model = Variables.createCtxModel(ctx, payload);
Email.Builder email = Email.builder() Email.Builder email = Email.builder()
@ -311,7 +311,7 @@ public class EmailAction extends Action<EmailAction.Result> {
if (EMAIL_FIELD.match(currentFieldName)) { if (EMAIL_FIELD.match(currentFieldName)) {
email = Email.parse(parser); email = Email.parse(parser);
} else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) { } else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) {
transformResult = Transform.Result.parse(parser); transformResult = transformRegistry.parseResult(parser);
} else { } else {
throw new EmailException("could not parse email result. unexpected field [" + currentFieldName + "]"); throw new EmailException("could not parse email result. unexpected field [" + currentFieldName + "]");
} }

View File

@ -54,7 +54,7 @@ public class IndexAction extends Action<IndexAction.Result> {
} }
@Override @Override
public Result doExecute(ExecutionContext ctx, Payload payload) throws IOException { protected Result execute(ExecutionContext ctx, Payload payload) throws IOException {
IndexRequest indexRequest = new IndexRequest(); IndexRequest indexRequest = new IndexRequest();
indexRequest.index(index); indexRequest.index(index);
indexRequest.type(type); indexRequest.type(type);
@ -212,7 +212,7 @@ public class IndexAction extends Action<IndexAction.Result> {
if (RESPONSE_FIELD.match(currentFieldName)) { if (RESPONSE_FIELD.match(currentFieldName)) {
payload = new Payload.Simple(parser.map()); payload = new Payload.Simple(parser.map());
} else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) { } else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) {
transformResult = Transform.Result.parse(parser); transformResult = transformRegistry.parseResult(parser);
} else { } else {
throw new ActionException("could not parse index result. unexpected object field [" + currentFieldName + "]"); throw new ActionException("could not parse index result. unexpected object field [" + currentFieldName + "]");
} }

View File

@ -57,7 +57,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
} }
@Override @Override
public Result doExecute(ExecutionContext ctx, Payload payload) throws IOException { protected Result execute(ExecutionContext ctx, Payload payload) throws IOException {
Map<String, Object> model = Variables.createCtxModel(ctx, payload); Map<String, Object> model = Variables.createCtxModel(ctx, payload);
String urlText = url.render(model); String urlText = url.render(model);
String bodyText = body != null ? body.render(model) : XContentTemplate.YAML.render(model); String bodyText = body != null ? body.render(model) : XContentTemplate.YAML.render(model);
@ -286,7 +286,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
} }
} else if (token == XContentParser.Token.START_OBJECT) { } else if (token == XContentParser.Token.START_OBJECT) {
if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) { if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) {
transformResult = transformRegistry.parseResult(parser);
} }
} else { } else {
throw new ActionException("unable to parse webhook action result. unexpected field [" + currentFieldName + "]" ); throw new ActionException("unable to parse webhook action result. unexpected field [" + currentFieldName + "]" );

View File

@ -15,6 +15,7 @@ import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.alerts.condition.ConditionRegistry; import org.elasticsearch.alerts.condition.ConditionRegistry;
import org.elasticsearch.alerts.input.Input; import org.elasticsearch.alerts.input.Input;
import org.elasticsearch.alerts.input.InputRegistry; import org.elasticsearch.alerts.input.InputRegistry;
import org.elasticsearch.alerts.transform.TransformRegistry;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -212,14 +213,16 @@ public class FiredAlert implements ToXContent {
private final ConditionRegistry conditionRegistry; private final ConditionRegistry conditionRegistry;
private final ActionRegistry actionRegistry; private final ActionRegistry actionRegistry;
private final InputRegistry inputRegistry; private final InputRegistry inputRegistry;
private final TransformRegistry transformRegistry;
@Inject @Inject
public Parser(Settings settings, ConditionRegistry conditionRegistry, ActionRegistry actionRegistry, public Parser(Settings settings, ConditionRegistry conditionRegistry, ActionRegistry actionRegistry,
InputRegistry inputRegistry) { InputRegistry inputRegistry, TransformRegistry transformRegistry) {
super(settings); super(settings);
this.conditionRegistry = conditionRegistry; this.conditionRegistry = conditionRegistry;
this.actionRegistry = actionRegistry; this.actionRegistry = actionRegistry;
this.inputRegistry = inputRegistry; this.inputRegistry = inputRegistry;
this.transformRegistry = transformRegistry;
} }
public FiredAlert parse(BytesReference source, String historyId, long version) { public FiredAlert parse(BytesReference source, String historyId, long version) {
@ -249,8 +252,7 @@ public class FiredAlert implements ToXContent {
} else if (METADATA_FIELD.match(currentFieldName)) { } else if (METADATA_FIELD.match(currentFieldName)) {
alert.metadata = parser.map(); alert.metadata = parser.map();
} else if (ALERT_EXECUTION_FIELD.match(currentFieldName)) { } else if (ALERT_EXECUTION_FIELD.match(currentFieldName)) {
alert.execution = AlertExecution.Parser.parse(parser, conditionRegistry, actionRegistry, alert.execution = AlertExecution.Parser.parse(parser, conditionRegistry, actionRegistry, inputRegistry, transformRegistry);
inputRegistry);
} else { } else {
throw new AlertsException("unable to parse fired alert. unexpected field [" + currentFieldName + "]"); throw new AlertsException("unable to parse fired alert. unexpected field [" + currentFieldName + "]");
} }

View File

@ -147,10 +147,13 @@ public class HistoryService extends AbstractComponent {
ctx.onThrottleResult(throttleResult); ctx.onThrottleResult(throttleResult);
if (!throttleResult.throttle()) { if (!throttleResult.throttle()) {
Transform.Result result = alert.transform().apply(ctx, inputResult.payload()); Transform transform = alert.transform();
ctx.onTransformResult(result); if (transform != null) {
Transform.Result result = alert.transform().apply(ctx, inputResult.payload());
ctx.onTransformResult(result);
}
for (Action action : alert.actions()) { for (Action action : alert.actions()) {
Action.Result actionResult = action.execute(ctx, result.payload()); Action.Result actionResult = action.execute(ctx);
ctx.onActionResult(actionResult); ctx.onActionResult(actionResult);
} }
} }

View File

@ -9,17 +9,19 @@ import org.elasticsearch.alerts.AlertsSettingsException;
import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.init.InitializingService; import org.elasticsearch.alerts.support.init.InitializingService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
import java.util.List;
/** /**
* *
*/ */
public class ChainTransform extends Transform { public class ChainTransform extends Transform<ChainTransform.Result> {
public static final String TYPE = "chain"; public static final String TYPE = "chain";
@ -40,10 +42,13 @@ public class ChainTransform extends Transform {
@Override @Override
public Result apply(ExecutionContext ctx, Payload payload) throws IOException { public Result apply(ExecutionContext ctx, Payload payload) throws IOException {
ImmutableList.Builder<Transform.Result> results = ImmutableList.builder();
for (Transform transform : transforms) { for (Transform transform : transforms) {
payload = transform.apply(ctx, payload).payload(); Transform.Result result = transform.apply(ctx, payload);
results.add(result);
payload = result.payload();
} }
return new Result(TYPE, payload); return new Result(TYPE, payload, results.build());
} }
@Override @Override
@ -74,7 +79,30 @@ public class ChainTransform extends Transform {
return transforms.hashCode(); return transforms.hashCode();
} }
public static class Parser implements Transform.Parser<ChainTransform>, InitializingService.Initializable { public static class Result extends Transform.Result {
private final List<Transform.Result> results;
public Result(String type, Payload payload, List<Transform.Result> results) {
super(type, payload);
this.results = results;
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(Parser.RESULTS_FIELD.getPreferredName());
for (Transform.Result result : results) {
builder.startObject()
.field(result.type(), result)
.endObject();
}
return builder.endArray();
}
}
public static class Parser implements Transform.Parser<Result, ChainTransform>, InitializingService.Initializable {
public static final ParseField RESULTS_FIELD = new ParseField("results");
private TransformRegistry registry; private TransformRegistry registry;
@ -127,6 +155,47 @@ public class ChainTransform extends Transform {
} }
return new ChainTransform(builder.build()); return new ChainTransform(builder.build());
} }
@Override
public Result parseResult(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [chain] transform result. expected an object, but found [" + token + "]");
}
Payload payload = null;
ImmutableList.Builder<Transform.Result> results = ImmutableList.builder();
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else {
if (token == XContentParser.Token.START_OBJECT) {
if (PAYLOAD_FIELD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else {
throw new TransformException("could not parse [chain] transform result. unexpected object field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (RESULTS_FIELD.match(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.START_OBJECT) {
results.add(registry.parseResult(parser));
} else {
throw new TransformException("could not parse [chain] transform result. expected an object representing a transform result, but found [" + token + "]");
}
}
} else {
throw new TransformException("could not parse [chain] transform result. unexpected array field [" + currentFieldName + "]");
}
} else {
throw new TransformException("could not parse [chain] transform result. unexpected token [" + token+ "]");
}
}
}
return new Result(TYPE, payload, results.build());
}
} }
public static class SourceBuilder implements Transform.SourceBuilder { public static class SourceBuilder implements Transform.SourceBuilder {

View File

@ -24,7 +24,7 @@ import static org.elasticsearch.alerts.support.Variables.createCtxModel;
/** /**
* *
*/ */
public class ScriptTransform extends Transform { public class ScriptTransform extends Transform<ScriptTransform.Result> {
public static final String TYPE = "script"; public static final String TYPE = "script";
@ -80,7 +80,19 @@ public class ScriptTransform extends Transform {
return script.hashCode(); return script.hashCode();
} }
public static class Parser implements Transform.Parser<ScriptTransform> { public static class Result extends Transform.Result {
public Result(String type, Payload payload) {
super(type, payload);
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
public static class Parser implements Transform.Parser<Result, ScriptTransform> {
private final ScriptServiceProxy scriptService; private final ScriptServiceProxy scriptService;
@ -104,6 +116,23 @@ public class ScriptTransform extends Transform {
} }
return new ScriptTransform(scriptService, script); return new ScriptTransform(scriptService, script);
} }
@Override
public Result parseResult(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [script] transform result. expected an object, but found [" + token + "]");
}
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME || !PAYLOAD_FIELD.match(parser.currentName())) {
throw new TransformException("could not parse [script] transform result. expected a payload field, but found [" + token + "]");
}
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [script] transform result. expected a payload object, but found [" + token + "]");
}
return new Result(TYPE, new Payload.XContent(parser));
}
} }
public static class SourceBuilder implements Transform.SourceBuilder { public static class SourceBuilder implements Transform.SourceBuilder {

View File

@ -36,7 +36,7 @@ import static org.elasticsearch.alerts.support.Variables.createCtxModel;
/** /**
* *
*/ */
public class SearchTransform extends Transform { public class SearchTransform extends Transform<SearchTransform.Result> {
public static final String TYPE = "search"; public static final String TYPE = "search";
@ -61,10 +61,10 @@ public class SearchTransform extends Transform {
} }
@Override @Override
public Transform.Result apply(ExecutionContext ctx, Payload payload) throws IOException { public Result apply(ExecutionContext ctx, Payload payload) throws IOException {
SearchRequest req = createRequest(request, ctx, payload); SearchRequest req = createRequest(request, ctx, payload);
SearchResponse resp = client.search(req).actionGet(); SearchResponse resp = client.search(req).actionGet();
return new Transform.Result(TYPE, new Payload.ActionResponse(resp)); return new Result(TYPE, new Payload.ActionResponse(resp));
} }
@Override @Override
@ -109,7 +109,19 @@ public class SearchTransform extends Transform {
return request; return request;
} }
public static class Parser extends AbstractComponent implements Transform.Parser<SearchTransform> { public static class Result extends Transform.Result {
public Result(String type, Payload payload) {
super(type, payload);
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
public static class Parser extends AbstractComponent implements Transform.Parser<Result, SearchTransform> {
protected final ScriptServiceProxy scriptService; protected final ScriptServiceProxy scriptService;
protected final ClientProxy client; protected final ClientProxy client;
@ -131,6 +143,23 @@ public class SearchTransform extends Transform {
SearchRequest request = AlertUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE); SearchRequest request = AlertUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE);
return new SearchTransform(logger, scriptService, client, request); return new SearchTransform(logger, scriptService, client, request);
} }
@Override
public Result parseResult(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [search] transform result. expected an object, but found [" + token + "]");
}
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME || !PAYLOAD_FIELD.match(parser.currentName())) {
throw new TransformException("could not parse [search] transform result. expected a payload field, but found [" + token + "]");
}
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [search] transform result. expected a payload object, but found [" + token + "]");
}
return new Result(TYPE, new Payload.XContent(parser));
}
} }
public static class SourceBuilder implements Transform.SourceBuilder { public static class SourceBuilder implements Transform.SourceBuilder {

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.alerts.transform; package org.elasticsearch.alerts.transform;
import org.elasticsearch.alerts.AlertsSettingsException;
import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.Payload;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
@ -18,36 +17,16 @@ import java.io.IOException;
/** /**
* *
*/ */
public abstract class Transform implements ToXContent { public abstract class Transform<R extends Transform.Result> implements ToXContent {
public static final Transform NOOP = new Transform() {
@Override
public String type() {
return "noop";
}
@Override
public Result apply(ExecutionContext context, Payload payload) throws IOException {
return new Result("noop", payload);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
};
public abstract String type(); public abstract String type();
public abstract Result apply(ExecutionContext ctx, Payload payload) throws IOException; public abstract Result apply(ExecutionContext ctx, Payload payload) throws IOException;
public static class Result implements ToXContent { public static abstract class Result implements ToXContent {
public static final ParseField TYPE_FIELD = new ParseField("type"); protected final String type;
public static final ParseField PAYLOAD_FIELD = new ParseField("payload"); protected final Payload payload;
private final String type;
private final Payload payload;
public Result(String type, Payload payload) { public Result(String type, Payload payload) {
this.type = type; this.type = type;
@ -64,55 +43,19 @@ public abstract class Transform implements ToXContent {
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject() builder.startObject();
.field(TYPE_FIELD.getPreferredName(), type) builder.field(Parser.PAYLOAD_FIELD.getPreferredName(), payload);
.field(PAYLOAD_FIELD.getPreferredName(), payload) xContentBody(builder, params);
.endObject(); return builder.endObject();
} }
public static Result parse(XContentParser parser) throws IOException { protected abstract XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException;
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new AlertsSettingsException("could not parse transform result. expected an object, but found [" + token + "]");
}
String type = null;
Payload payload = null;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else {
if (token == XContentParser.Token.VALUE_STRING) {
if (TYPE_FIELD.match(currentFieldName)) {
type = parser.text();
} else {
throw new AlertsSettingsException("could not parse transform result. expected a string value for field [" + currentFieldName + "], found [" + token + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (PAYLOAD_FIELD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else {
throw new AlertsSettingsException("could not parse transform result. expected an object for field [" + currentFieldName + "], found [" + token + "]");
}
} else {
throw new AlertsSettingsException("could not parse transform result. unexpected token [" + token + "]");
}
}
}
if (type == null) {
throw new AlertsSettingsException("could not parse transform result. missing [type] field");
}
if (payload == null) {
throw new AlertsSettingsException("could not parse transform result. missing [payload] field");
}
return new Result(type, payload);
}
} }
public static interface Parser<T extends Transform> { public static interface Parser<R extends Transform.Result, T extends Transform<R>> {
public static final ParseField PAYLOAD_FIELD = new ParseField("payload");
public static final ParseField TRANSFORM_FIELD = new ParseField("transform"); public static final ParseField TRANSFORM_FIELD = new ParseField("transform");
public static final ParseField TRANSFORM_RESULT_FIELD = new ParseField("transform_result"); public static final ParseField TRANSFORM_RESULT_FIELD = new ParseField("transform_result");
@ -120,6 +63,8 @@ public abstract class Transform implements ToXContent {
T parse(XContentParser parser) throws IOException; T parse(XContentParser parser) throws IOException;
R parseResult(XContentParser parser) throws IOException;
} }
public static interface SourceBuilder extends ToXContent { public static interface SourceBuilder extends ToXContent {

View File

@ -46,4 +46,26 @@ public class TransformRegistry {
} }
return transformParser.parse(parser); return transformParser.parse(parser);
} }
public Transform.Result parseResult(XContentParser parser) throws IOException {
String type = null;
XContentParser.Token token;
Transform.Result result = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (type != null) {
result = parseResult(type, parser);
}
}
return result;
}
public Transform.Result parseResult(String type, XContentParser parser) throws IOException {
Transform.Parser transformParser = parsers.get(type);
if (transformParser == null) {
throw new TransformException("unknown transform type [" + type + "]");
}
return transformParser.parseResult(parser);
}
} }

View File

@ -222,7 +222,7 @@ public class AlertTests extends ElasticsearchTestCase {
case SearchTransform.TYPE: case SearchTransform.TYPE:
return new SearchTransform(logger, scriptService, client, matchAllRequest(AlertUtils.DEFAULT_INDICES_OPTIONS)); return new SearchTransform(logger, scriptService, client, matchAllRequest(AlertUtils.DEFAULT_INDICES_OPTIONS));
default: // chain default: // chain
return new ChainTransform(ImmutableList.of( return new ChainTransform(ImmutableList.<Transform>of(
new SearchTransform(logger, scriptService, client, matchAllRequest(AlertUtils.DEFAULT_INDICES_OPTIONS)), new SearchTransform(logger, scriptService, client, matchAllRequest(AlertUtils.DEFAULT_INDICES_OPTIONS)),
new ScriptTransform(scriptService, new Script("_script")))); new ScriptTransform(scriptService, new Script("_script"))));
} }

View File

@ -89,8 +89,12 @@ public class EmailActionTests extends ElasticsearchTestCase {
when(ctx.alert()).thenReturn(alert); when(ctx.alert()).thenReturn(alert);
when(ctx.fireTime()).thenReturn(now); when(ctx.fireTime()).thenReturn(now);
when(ctx.scheduledTime()).thenReturn(now); when(ctx.scheduledTime()).thenReturn(now);
when(ctx.payload()).thenReturn(payload);
if (transform != null) { if (transform != null) {
when(transform.apply(ctx, payload)).thenReturn(new Transform.Result("_transform_type", new Payload.Simple("_key", "_value"))); Transform.Result transformResult = mock(Transform.Result.class);
when(transformResult.type()).thenReturn("_transform_type");
when(transformResult.payload()).thenReturn(new Payload.Simple("_key", "_value"));
when(transform.apply(ctx, payload)).thenReturn(transformResult);
} }
Map<String, Object> expectedModel = ImmutableMap.<String, Object>builder() Map<String, Object> expectedModel = ImmutableMap.<String, Object>builder()
.put("ctx", ImmutableMap.<String, Object>builder() .put("ctx", ImmutableMap.<String, Object>builder()
@ -106,7 +110,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
when (htmlBody.render(expectedModel)).thenReturn("_html_body"); when (htmlBody.render(expectedModel)).thenReturn("_html_body");
} }
EmailAction.Result result = action.execute(ctx, payload); EmailAction.Result result = action.execute(ctx);
assertThat(result, notNullValue()); assertThat(result, notNullValue());
assertThat(result, instanceOf(EmailAction.Result.Success.class)); assertThat(result, instanceOf(EmailAction.Result.Success.class));
@ -316,18 +320,24 @@ public class EmailActionTests extends ElasticsearchTestCase {
.textBody("_text_body") .textBody("_text_body")
.build(); .build();
boolean withTransform = randomBoolean(); Transform.Result transformResult = randomBoolean() ? null : mock(Transform.Result.class);
if (transformResult != null) {
when(transformResult.type()).thenReturn("_transform_type");
when(transformResult.payload()).thenReturn(new Payload.Simple("_key", "_value"));
}
TransformRegistry transformRegistry = transformResult != null ? new TransformRegistryMock(transformResult) : mock(TransformRegistry.class);
XContentBuilder builder = jsonBuilder().startObject() XContentBuilder builder = jsonBuilder().startObject()
.field("success", success); .field("success", success);
if (success) { if (success) {
builder.field("email", email); builder.field("email", email);
builder.field("account", "_account"); builder.field("account", "_account");
if (withTransform) { if (transformResult != null) {
builder.startObject("transform_result") builder.startObject("transform_result")
.field("type", "_transform_type") .startObject("_transform_type")
.field("payload", new Payload.Simple("_key", "_value").data()) .field("payload", new Payload.Simple("_key", "_value").data())
.endObject(); .endObject()
.endObject();
} }
} else { } else {
builder.field("reason", "_reason"); builder.field("reason", "_reason");
@ -336,14 +346,14 @@ public class EmailActionTests extends ElasticsearchTestCase {
BytesReference bytes = builder.bytes(); BytesReference bytes = builder.bytes();
XContentParser parser = JsonXContent.jsonXContent.createParser(bytes); XContentParser parser = JsonXContent.jsonXContent.createParser(bytes);
parser.nextToken(); parser.nextToken();
EmailAction.Result result = new EmailAction.Parser(ImmutableSettings.EMPTY, mock(EmailService.class), new TemplateMock.Parser(), mock(TransformRegistry.class)) EmailAction.Result result = new EmailAction.Parser(ImmutableSettings.EMPTY, mock(EmailService.class), new TemplateMock.Parser(), transformRegistry)
.parseResult(parser); .parseResult(parser);
assertThat(result.success(), is(success)); assertThat(result.success(), is(success));
if (success) { if (success) {
assertThat(result, instanceOf(EmailAction.Result.Success.class)); assertThat(result, instanceOf(EmailAction.Result.Success.class));
assertThat(((EmailAction.Result.Success) result).email(), equalTo(email)); assertThat(((EmailAction.Result.Success) result).email(), equalTo(email));
assertThat(((EmailAction.Result.Success) result).account(), is("_account")); assertThat(((EmailAction.Result.Success) result).account(), is("_account"));
if (withTransform) { if (transformResult != null) {
assertThat(result.transformResult(), notNullValue()); assertThat(result.transformResult(), notNullValue());
assertThat(result.transformResult().type(), equalTo("_transform_type")); assertThat(result.transformResult().type(), equalTo("_transform_type"));
assertThat(result.transformResult().payload().data(), equalTo(new Payload.Simple("_key", "_value").data())); assertThat(result.transformResult().payload().data(), equalTo(new Payload.Simple("_key", "_value").data()));
@ -412,7 +422,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
} }
} }
static class TransformMock extends Transform { static class TransformMock extends Transform<TransformMock.Result> {
@Override @Override
public String type() { public String type() {
@ -421,13 +431,25 @@ public class EmailActionTests extends ElasticsearchTestCase {
@Override @Override
public Result apply(ExecutionContext ctx, Payload payload) throws IOException { public Result apply(ExecutionContext ctx, Payload payload) throws IOException {
return new Transform.Result("_transform", new Payload.Simple("_key", "_value")); return new Result("_transform", new Payload.Simple("_key", "_value"));
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject(); return builder.startObject().endObject();
} }
public static class Result extends Transform.Result {
public Result(String type, Payload payload) {
super(type, payload);
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
} }
static class TransformRegistryMock extends TransformRegistry { static class TransformRegistryMock extends TransformRegistry {
@ -445,6 +467,40 @@ public class EmailActionTests extends ElasticsearchTestCase {
assertThat(parser.currentToken(), is(XContentParser.Token.END_OBJECT)); assertThat(parser.currentToken(), is(XContentParser.Token.END_OBJECT));
return transform; return transform;
} }
@Override
public Transform.Result parseResult(XContentParser parser) throws IOException {
return null; // should not be called when this ctor is used
}
}));
}
public TransformRegistryMock(final Transform.Result result) {
super(ImmutableMap.<String, Transform.Parser>of("_transform_type", new Transform.Parser() {
@Override
public String type() {
return result.type();
}
@Override
public Transform parse(XContentParser parser) throws IOException {
return null; // should not be called when this ctor is used.
}
@Override
public Transform.Result parseResult(XContentParser parser) throws IOException {
assertThat(parser.currentToken(), is(XContentParser.Token.START_OBJECT));
parser.nextToken();
assertThat(parser.currentToken(), is(XContentParser.Token.FIELD_NAME));
assertThat(parser.currentName(), is("payload"));
parser.nextToken();
assertThat(parser.currentToken(), is(XContentParser.Token.START_OBJECT));
Map<String, Object> data = parser.map();
assertThat(data, equalTo(result.payload().data()));
parser.nextToken();
assertThat(parser.currentToken(), is(XContentParser.Token.END_OBJECT));
return result;
}
})); }));
} }
} }

View File

@ -7,13 +7,11 @@ package org.elasticsearch.alerts.condition.script;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.Script; import org.elasticsearch.alerts.support.Script;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.test.AbstractAlertsSingleNodeTests; import org.elasticsearch.alerts.test.AbstractAlertsSingleNodeTests;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.text.StringText;
@ -37,7 +35,7 @@ import org.junit.Test;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import static org.mockito.Mockito.mock; import static org.elasticsearch.alerts.test.AlertsTestUtils.mockExecutionContext;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
/** /**
@ -78,13 +76,8 @@ public class ScriptConditionSearchTests extends AbstractAlertsSingleNodeTests {
.get(); .get();
ScriptCondition condition = new ScriptCondition(logger, scriptService, new Script("ctx.payload.aggregations.rate.buckets[0]?.doc_count >= 5")); ScriptCondition condition = new ScriptCondition(logger, scriptService, new Script("ctx.payload.aggregations.rate.buckets[0]?.doc_count >= 5"));
ExecutionContext ctx = mock(ExecutionContext.class);
Alert alert = mock(Alert.class); ExecutionContext ctx = mockExecutionContext("_name", new Payload.ActionResponse(response));
when(alert.name()).thenReturn("_name");
when(ctx.alert()).thenReturn(alert);
when(ctx.scheduledTime()).thenReturn(new DateTime());
when(ctx.fireTime()).thenReturn(new DateTime());
when(ctx.payload()).thenReturn(new Payload.ActionResponse(response));
assertFalse(condition.execute(ctx).met()); assertFalse(condition.execute(ctx).met());
client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:40").setSource("{}").get(); client().prepareIndex("my-index", "my-type").setTimestamp("2005-01-01T00:40").setSource("{}").get();
@ -93,11 +86,8 @@ public class ScriptConditionSearchTests extends AbstractAlertsSingleNodeTests {
response = client().prepareSearch("my-index") response = client().prepareSearch("my-index")
.addAggregation(AggregationBuilders.dateHistogram("rate").field("_timestamp").interval(DateHistogram.Interval.HOUR).order(Histogram.Order.COUNT_DESC)) .addAggregation(AggregationBuilders.dateHistogram("rate").field("_timestamp").interval(DateHistogram.Interval.HOUR).order(Histogram.Order.COUNT_DESC))
.get(); .get();
when(alert.name()).thenReturn("_name");
when(ctx.alert()).thenReturn(alert); ctx = mockExecutionContext("_name", new Payload.ActionResponse(response));
when(ctx.scheduledTime()).thenReturn(new DateTime());
when(ctx.fireTime()).thenReturn(new DateTime());
when(ctx.payload()).thenReturn(new Payload.ActionResponse(response));
assertTrue(condition.execute(ctx).met()); assertTrue(condition.execute(ctx).met());
} }
@ -110,13 +100,8 @@ public class ScriptConditionSearchTests extends AbstractAlertsSingleNodeTests {
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[]{hit}, 1l, 1f), null, null, null, false, null); InternalSearchResponse internalSearchResponse = new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[]{hit}, 1l, 1f), null, null, null, false, null);
SearchResponse response = new SearchResponse(internalSearchResponse, "", 3, 3, 500l, new ShardSearchFailure[0]); SearchResponse response = new SearchResponse(internalSearchResponse, "", 3, 3, 500l, new ShardSearchFailure[0]);
ExecutionContext ctx = mock(ExecutionContext.class);
Alert alert = mock(Alert.class); ExecutionContext ctx = mockExecutionContext("_alert_name", new Payload.ActionResponse(response));
when(alert.name()).thenReturn("_name");
when(ctx.alert()).thenReturn(alert);
when(ctx.scheduledTime()).thenReturn(new DateTime());
when(ctx.fireTime()).thenReturn(new DateTime());
when(ctx.payload()).thenReturn(new Payload.ActionResponse(response));
assertTrue(condition.execute(ctx).met()); assertTrue(condition.execute(ctx).met());
hit.score(2f); hit.score(2f);
when(ctx.payload()).thenReturn(new Payload.ActionResponse(response)); when(ctx.payload()).thenReturn(new Payload.ActionResponse(response));

View File

@ -38,6 +38,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
private Payload payload; private Payload payload;
private Input input; private Input input;
private Input.Result inputResult;
private HistoryService historyService; private HistoryService historyService;
@ -45,7 +46,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
public void init() throws Exception { public void init() throws Exception {
payload = mock(Payload.class); payload = mock(Payload.class);
input = mock(Input.class); input = mock(Input.class);
Input.Result inputResult = mock(Input.Result.class); inputResult = mock(Input.Result.class);
when(inputResult.payload()).thenReturn(payload); when(inputResult.payload()).thenReturn(payload);
when(input.execute(any(ExecutionContext.class))).thenReturn(inputResult); when(input.execute(any(ExecutionContext.class))).thenReturn(inputResult);
@ -65,7 +66,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
Transform.Result transformResult = mock(Transform.Result.class); Transform.Result transformResult = mock(Transform.Result.class);
when(transformResult.payload()).thenReturn(payload); when(transformResult.payload()).thenReturn(payload);
Action.Result actionResult = mock(Action.Result.class); Action.Result actionResult = mock(Action.Result.class);
when(actionResult.type()).thenReturn("actionResult"); when(actionResult.type()).thenReturn("_action_type");
Condition condition = mock(Condition.class); Condition condition = mock(Condition.class);
when(condition.execute(any(ExecutionContext.class))).thenReturn(conditionResult); when(condition.execute(any(ExecutionContext.class))).thenReturn(conditionResult);
@ -74,7 +75,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
Transform transform = mock(Transform.class); Transform transform = mock(Transform.class);
when(transform.apply(any(ExecutionContext.class), same(payload))).thenReturn(transformResult); when(transform.apply(any(ExecutionContext.class), same(payload))).thenReturn(transformResult);
Action action = mock(Action.class); Action action = mock(Action.class);
when(action.execute(any(ExecutionContext.class), same(payload))).thenReturn(actionResult); when(action.execute(any(ExecutionContext.class))).thenReturn(actionResult);
Actions actions = new Actions(Arrays.asList(action)); Actions actions = new Actions(Arrays.asList(action));
Alert.Status alertStatus = new Alert.Status(); Alert.Status alertStatus = new Alert.Status();
@ -89,14 +90,14 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now()); ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now());
AlertExecution alertExecution = historyService.execute(context); AlertExecution alertExecution = historyService.execute(context);
assertThat(alertExecution.conditionResult(), sameInstance(conditionResult)); assertThat(alertExecution.conditionResult(), sameInstance(conditionResult));
assertThat(alertExecution.payload(), sameInstance(payload)); assertThat(alertExecution.transformResult(), sameInstance(transformResult));
assertThat(alertExecution.throttleResult(), sameInstance(throttleResult)); assertThat(alertExecution.throttleResult(), sameInstance(throttleResult));
assertThat(alertExecution.actionsResults().get("actionResult"), sameInstance(actionResult)); assertThat(alertExecution.actionsResults().get("_action_type"), sameInstance(actionResult));
verify(condition, times(1)).execute(any(ExecutionContext.class)); verify(condition, times(1)).execute(any(ExecutionContext.class));
verify(throttler, times(1)).throttle(any(ExecutionContext.class)); verify(throttler, times(1)).throttle(any(ExecutionContext.class));
verify(transform, times(1)).apply(any(ExecutionContext.class), same(payload)); verify(transform, times(1)).apply(any(ExecutionContext.class), same(payload));
verify(action, times(1)).execute(any(ExecutionContext.class), same(payload)); verify(action, times(1)).execute(any(ExecutionContext.class));
} }
@Test @Test
@ -108,7 +109,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
Transform.Result transformResult = mock(Transform.Result.class); Transform.Result transformResult = mock(Transform.Result.class);
when(transformResult.payload()).thenReturn(payload); when(transformResult.payload()).thenReturn(payload);
Action.Result actionResult = mock(Action.Result.class); Action.Result actionResult = mock(Action.Result.class);
when(actionResult.type()).thenReturn("actionResult"); when(actionResult.type()).thenReturn("_action_type");
Condition condition = mock(Condition.class); Condition condition = mock(Condition.class);
when(condition.execute(any(ExecutionContext.class))).thenReturn(conditionResult); when(condition.execute(any(ExecutionContext.class))).thenReturn(conditionResult);
@ -117,7 +118,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
Transform transform = mock(Transform.class); Transform transform = mock(Transform.class);
when(transform.apply(any(ExecutionContext.class), same(payload))).thenReturn(transformResult); when(transform.apply(any(ExecutionContext.class), same(payload))).thenReturn(transformResult);
Action action = mock(Action.class); Action action = mock(Action.class);
when(action.execute(any(ExecutionContext.class), same(payload))).thenReturn(actionResult); when(action.execute(any(ExecutionContext.class))).thenReturn(actionResult);
Actions actions = new Actions(Arrays.asList(action)); Actions actions = new Actions(Arrays.asList(action));
Alert.Status alertStatus = new Alert.Status(); Alert.Status alertStatus = new Alert.Status();
@ -131,15 +132,16 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now()); ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now());
AlertExecution alertExecution = historyService.execute(context); AlertExecution alertExecution = historyService.execute(context);
assertThat(alertExecution.inputResult(), sameInstance(inputResult));
assertThat(alertExecution.conditionResult(), sameInstance(conditionResult)); assertThat(alertExecution.conditionResult(), sameInstance(conditionResult));
assertThat(alertExecution.payload(), sameInstance(payload));
assertThat(alertExecution.throttleResult(), sameInstance(throttleResult)); assertThat(alertExecution.throttleResult(), sameInstance(throttleResult));
assertThat(alertExecution.actionsResults().isEmpty(), is(true)); assertThat(alertExecution.actionsResults().isEmpty(), is(true));
assertThat(alertExecution.transformResult(), nullValue());
verify(condition, times(1)).execute(any(ExecutionContext.class)); verify(condition, times(1)).execute(any(ExecutionContext.class));
verify(throttler, times(1)).throttle(any(ExecutionContext.class)); verify(throttler, times(1)).throttle(any(ExecutionContext.class));
verify(transform, never()).apply(any(ExecutionContext.class), same(payload)); verify(transform, never()).apply(any(ExecutionContext.class), same(payload));
verify(action, never()).execute(any(ExecutionContext.class), same(payload)); verify(action, never()).execute(any(ExecutionContext.class));
} }
@Test @Test
@ -150,7 +152,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
Transform.Result transformResult = mock(Transform.Result.class); Transform.Result transformResult = mock(Transform.Result.class);
Action.Result actionResult = mock(Action.Result.class); Action.Result actionResult = mock(Action.Result.class);
when(actionResult.type()).thenReturn("actionResult"); when(actionResult.type()).thenReturn("_action_type");
Condition condition = mock(Condition.class); Condition condition = mock(Condition.class);
when(condition.execute(any(ExecutionContext.class))).thenReturn(conditionResult); when(condition.execute(any(ExecutionContext.class))).thenReturn(conditionResult);
@ -159,7 +161,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
Transform transform = mock(Transform.class); Transform transform = mock(Transform.class);
when(transform.apply(any(ExecutionContext.class), same(payload))).thenReturn(transformResult); when(transform.apply(any(ExecutionContext.class), same(payload))).thenReturn(transformResult);
Action action = mock(Action.class); Action action = mock(Action.class);
when(action.execute(any(ExecutionContext.class), same(payload))).thenReturn(actionResult); when(action.execute(any(ExecutionContext.class))).thenReturn(actionResult);
Actions actions = new Actions(Arrays.asList(action)); Actions actions = new Actions(Arrays.asList(action));
Alert.Status alertStatus = new Alert.Status(); Alert.Status alertStatus = new Alert.Status();
@ -173,15 +175,16 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now()); ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now());
AlertExecution alertExecution = historyService.execute(context); AlertExecution alertExecution = historyService.execute(context);
assertThat(alertExecution.inputResult(), sameInstance(inputResult));
assertThat(alertExecution.conditionResult(), sameInstance(conditionResult)); assertThat(alertExecution.conditionResult(), sameInstance(conditionResult));
assertThat(alertExecution.payload(), sameInstance(payload));
assertThat(alertExecution.throttleResult(), nullValue()); assertThat(alertExecution.throttleResult(), nullValue());
assertThat(alertExecution.transformResult(), nullValue());
assertThat(alertExecution.actionsResults().isEmpty(), is(true)); assertThat(alertExecution.actionsResults().isEmpty(), is(true));
verify(condition, times(1)).execute(any(ExecutionContext.class)); verify(condition, times(1)).execute(any(ExecutionContext.class));
verify(throttler, never()).throttle(any(ExecutionContext.class)); verify(throttler, never()).throttle(any(ExecutionContext.class));
verify(transform, never()).apply(any(ExecutionContext.class), same(payload)); verify(transform, never()).apply(any(ExecutionContext.class), same(payload));
verify(action, never()).execute(any(ExecutionContext.class), same(payload)); verify(action, never()).execute(any(ExecutionContext.class));
} }
} }

View File

@ -8,6 +8,8 @@ package org.elasticsearch.alerts.test;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.actions.Action; import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.Actions; import org.elasticsearch.alerts.actions.Actions;
import org.elasticsearch.alerts.actions.email.EmailAction; import org.elasticsearch.alerts.actions.email.EmailAction;
@ -28,6 +30,7 @@ import org.elasticsearch.alerts.support.template.ScriptTemplate;
import org.elasticsearch.alerts.support.template.Template; import org.elasticsearch.alerts.support.template.Template;
import org.elasticsearch.alerts.transform.SearchTransform; import org.elasticsearch.alerts.transform.SearchTransform;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.handler.codec.http.HttpMethod; import org.elasticsearch.common.netty.handler.codec.http.HttpMethod;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -43,6 +46,8 @@ import java.util.Map;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** /**
* *
@ -72,6 +77,22 @@ public final class AlertsTestUtils {
return request; return request;
} }
public static ExecutionContext mockExecutionContext(String alertName, Payload payload) {
DateTime now = DateTime.now();
return mockExecutionContext(now, now, alertName, payload);
}
public static ExecutionContext mockExecutionContext(DateTime firedTime, DateTime scheduledTime, String alertName, Payload payload) {
ExecutionContext ctx = mock(ExecutionContext.class);
when(ctx.scheduledTime()).thenReturn(scheduledTime);
when(ctx.fireTime()).thenReturn(firedTime);
Alert alert = mock(Alert.class);
when(alert.name()).thenReturn(alertName);
when(ctx.alert()).thenReturn(alert);
when(ctx.payload()).thenReturn(payload);
return ctx;
}
public static Alert createTestAlert(String alertName, ScriptServiceProxy scriptService, HttpClient httpClient, EmailService emailService, ESLogger logger) throws AddressException { public static Alert createTestAlert(String alertName, ScriptServiceProxy scriptService, HttpClient httpClient, EmailService emailService, ESLogger logger) throws AddressException {
SearchRequest conditionRequest = newInputSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery())); SearchRequest conditionRequest = newInputSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = newInputSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery())); SearchRequest transformRequest = newInputSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));

View File

@ -78,7 +78,7 @@ public class ChainTransformTests extends ElasticsearchTestCase {
} }
} }
private static class NamedTransform extends Transform { private static class NamedTransform extends Transform<NamedTransform.Result> {
private final String name; private final String name;
@ -109,7 +109,19 @@ public class ChainTransformTests extends ElasticsearchTestCase {
return builder.startObject().field("name", name).endObject(); return builder.startObject().field("name", name).endObject();
} }
public static class Parser implements Transform.Parser<NamedTransform> { public static class Result extends Transform.Result {
public Result(String type, Payload payload) {
super(type, payload);
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
public static class Parser implements Transform.Parser<Result, NamedTransform> {
@Override @Override
public String type() { public String type() {
@ -128,6 +140,19 @@ public class ChainTransformTests extends ElasticsearchTestCase {
assert token == XContentParser.Token.END_OBJECT; assert token == XContentParser.Token.END_OBJECT;
return new NamedTransform(name); return new NamedTransform(name);
} }
@Override
public Result parseResult(XContentParser parser) throws IOException {
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.FIELD_NAME; // the "payload" field
token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
Payload payload = new Payload.XContent(parser);
token = parser.nextToken();
assert token == XContentParser.Token.END_OBJECT;
return new Result("named", payload);
}
} }
} }