diff --git a/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java b/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java index 2ada3adfe32..a1f247c3a76 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java +++ b/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java @@ -78,9 +78,13 @@ public class ActionWrapper implements ToXContent { if (transform != null) { try { transformResult = transform.execute(ctx, payload); + if (transformResult.status() == Transform.Result.Status.FAILURE) { + action.logger().error("failed to execute action [{}/{}]. failed to transform payload. {}", ctx.watch().id(), id, transformResult.reason()); + return new ActionWrapper.Result(id, transformResult, new Action.Result.Failure(action.type(), "Failed to transform payload")); + } payload = transformResult.payload(); } catch (Exception e) { - action.logger.error("failed to execute action [{}/{}]. failed to transform payload.", e, ctx.watch().id(), id); + action.logger().error("failed to execute action [{}/{}]. failed to transform payload.", e, ctx.watch().id(), id); return new ActionWrapper.Result(id, new Action.Result.Failure(action.type(), "Failed to transform payload. error: " + ExceptionsHelper.detailedMessage(e))); } } @@ -88,7 +92,7 @@ public class ActionWrapper implements ToXContent { Action.Result actionResult = action.execute(id, ctx, payload); return new ActionWrapper.Result(id, transformResult, actionResult); } catch (Exception e) { - action.logger.error("failed to execute action [{}/{}]", e, ctx.watch().id(), id); + action.logger().error("failed to execute action [{}/{}]", e, ctx.watch().id(), id); return new ActionWrapper.Result(id, new Action.Result.Failure(action.type(), ExceptionsHelper.detailedMessage(e))); } } diff --git a/src/main/java/org/elasticsearch/watcher/actions/ExecutableAction.java b/src/main/java/org/elasticsearch/watcher/actions/ExecutableAction.java index d4a6bf7c056..9312a540187 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/ExecutableAction.java +++ b/src/main/java/org/elasticsearch/watcher/actions/ExecutableAction.java @@ -36,6 +36,13 @@ public abstract class ExecutableAction implements ToXContent { return action; } + /** + * yack... needed to expose that for testing purposes + */ + public ESLogger logger() { + return logger; + } + public abstract Action.Result execute(String actionId, WatchExecutionContext context, Payload payload) throws Exception; @Override diff --git a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java index 64adee1aeb5..59fe767d58e 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java +++ b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java @@ -121,6 +121,9 @@ public abstract class WatchExecutionContext { } beforeWatchTransform(); this.transformResult = watch.transform().execute(this, payload); + if (this.transformResult.status() == Transform.Result.Status.FAILURE) { + throw new WatchExecutionException("failed to execute watch level transform for [{}]", id); + } this.payload = transformResult.payload(); this.transformedPayload = this.payload; return transformedPayload; diff --git a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionException.java b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionException.java new file mode 100644 index 00000000000..6c3bc582012 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionException.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.execution; + +import org.elasticsearch.watcher.WatcherException; + +/** + * + */ +public class WatchExecutionException extends WatcherException { + + public WatchExecutionException(String msg, Object... args) { + super(msg, args); + } + + public WatchExecutionException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java b/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java index a77bbf0efad..2f77080e328 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java @@ -34,7 +34,7 @@ public abstract class ExecutableTransform results) { + super(TYPE, e); + this.results = results; + } + public ImmutableList results() { return results; } @Override protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(type); - builder.startArray(Field.RESULTS.getPreferredName()); - for (Transform.Result result : results) { - result.toXContent(builder, params); + if (!results.isEmpty()) { + builder.startObject(type); + builder.startArray(Field.RESULTS.getPreferredName()); + for (Transform.Result result : results) { + result.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); } - builder.endArray(); - return builder.endObject(); + return builder; } } diff --git a/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java b/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java index 5eca75538df..94d7244c180 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java @@ -31,11 +31,24 @@ public class ExecutableChainTransform extends ExecutableTransform results = ImmutableList.builder(); + try { + return doExecute(ctx, payload, results); + } catch (Exception e) { + logger.error("failed to execute [{}] transform for [{}]", e, ChainTransform.TYPE, ctx.id()); + return new ChainTransform.Result(e, results.build()); + } + } + + + ChainTransform.Result doExecute(WatchExecutionContext ctx, Payload payload, ImmutableList.Builder results) throws IOException { for (ExecutableTransform transform : transforms) { Transform.Result result = transform.execute(ctx, payload); results.add(result); + if (result.status() == Transform.Result.Status.FAILURE) { + throw new ChainTransformException("failed to execute [{}] transform. failed to execute sub-transform [{}]", ChainTransform.TYPE, transform.type()); + } payload = result.payload(); } return new ChainTransform.Result(payload, results.build()); diff --git a/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java b/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java index 2ee71202a58..2dd19d16141 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java @@ -40,7 +40,17 @@ public class ExecutableScriptTransform extends ExecutableTransform model = new HashMap<>(); model.putAll(script.params()); diff --git a/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java index 1696a3ea31e..0d8374d9627 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java @@ -74,6 +74,10 @@ public class ScriptTransform implements Transform { super(TYPE, payload); } + public Result(Exception e) { + super(TYPE, e); + } + @Override protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { return builder; diff --git a/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java index e4a2477e730..ac4391afe8a 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java @@ -15,8 +15,6 @@ import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.watch.Payload; -import java.io.IOException; - /** * */ @@ -32,10 +30,16 @@ public class ExecutableSearchTransform extends ExecutableTransformof( new NamedExecutableTransform.Transform("name1"), new NamedExecutableTransform.Transform("name2"), @@ -51,7 +51,21 @@ public class ChainTransformTests extends ElasticsearchTestCase { WatchExecutionContext ctx = mock(WatchExecutionContext.class); Payload payload = new Payload.Simple(new HashMap()); - Transform.Result result = executable.execute(ctx, payload); + ChainTransform.Result result = executable.execute(ctx, payload); + assertThat(result.status(), is(Transform.Result.Status.SUCCESS)); + assertThat(result.results(), hasSize(3)); + assertThat(result.results().get(0), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(0).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(0).payload().data().get("names"), hasSize(1)); + assertThat((List) result.results().get(0).payload().data().get("names"), contains("name1")); + assertThat(result.results().get(1), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(1).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(1).payload().data().get("names"), hasSize(2)); + assertThat((List) result.results().get(1).payload().data().get("names"), contains("name1", "name2")); + assertThat(result.results().get(2), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(2).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(2).payload().data().get("names"), hasSize(3)); + assertThat((List) result.results().get(2).payload().data().get("names"), contains("name1", "name2", "name3")); Map data = result.payload().data(); assertThat(data, notNullValue()); @@ -62,6 +76,39 @@ public class ChainTransformTests extends ElasticsearchTestCase { assertThat(names, contains("name1", "name2", "name3")); } + @Test + public void testExecute_Failure() throws Exception { + ChainTransform transform = new ChainTransform(ImmutableList.of( + new NamedExecutableTransform.Transform("name1"), + new NamedExecutableTransform.Transform("name2"), + new FailingExecutableTransform.Transform() + )); + ExecutableChainTransform executable = new ExecutableChainTransform(transform, logger, ImmutableList.of( + new NamedExecutableTransform("name1"), + new NamedExecutableTransform("name2"), + new FailingExecutableTransform(logger))); + + WatchExecutionContext ctx = mock(WatchExecutionContext.class); + Payload payload = new Payload.Simple(new HashMap()); + + ChainTransform.Result result = executable.execute(ctx, payload); + assertThat(result.status(), is(Transform.Result.Status.FAILURE)); + assertThat(result.reason(), notNullValue()); + assertThat(result.results(), hasSize(3)); + assertThat(result.results().get(0), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(0).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(0).payload().data().get("names"), hasSize(1)); + assertThat((List) result.results().get(0).payload().data().get("names"), contains("name1")); + assertThat(result.results().get(1), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(1).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(1).payload().data().get("names"), hasSize(2)); + assertThat((List) result.results().get(1).payload().data().get("names"), contains("name1", "name2")); + assertThat(result.results().get(2), instanceOf(FailingExecutableTransform.Result.class)); + assertThat(result.results().get(2).status(), is(Transform.Result.Status.FAILURE)); + assertThat(result.results().get(2).reason(), containsString("_error")); + + } + @Test public void testParser() throws Exception { Map factories = ImmutableMap.builder() @@ -103,14 +150,16 @@ public class ChainTransformTests extends ElasticsearchTestCase { } @Override - public Result execute(WatchExecutionContext ctx, Payload payload) throws IOException { - Map data = new HashMap<>(payload.data()); - List names = (List) data.get("names"); + public Result execute(WatchExecutionContext ctx, Payload payload) { + List names = (List) payload.data().get("names"); if (names == null) { names = new ArrayList<>(); - data.put("names", names); + } else { + names = new ArrayList<>(names); } names.add(transform.name); + Map data = new HashMap<>(); + data.put("names", names); return new Result("named", new Payload.Simple(data)); } @@ -178,4 +227,68 @@ public class ChainTransformTests extends ElasticsearchTestCase { } } } + + private static class FailingExecutableTransform extends ExecutableTransform { + + private static final String TYPE = "throwing"; + + public FailingExecutableTransform(ESLogger logger) { + super(new Transform(), logger); + } + + @Override + public Result execute(WatchExecutionContext ctx, Payload payload) { + return new Result(TYPE); + } + + public static class Transform implements org.elasticsearch.watcher.transform.Transform { + + @Override + public String type() { + return TYPE; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().endArray(); + } + } + + public static class Result extends Transform.Result { + + public Result(String type) { + super(type, new Exception("_error")); + } + + @Override + protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + } + + public static class Factory extends TransformFactory { + + public Factory(ESLogger transformLogger) { + super(transformLogger); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Transform parseTransform(String watchId, XContentParser parser) throws IOException { + assert parser.currentToken() == XContentParser.Token.START_OBJECT; + XContentParser.Token token = parser.nextToken(); + assert token == XContentParser.Token.END_OBJECT; + return new Transform(); + } + + @Override + public FailingExecutableTransform createExecutable(Transform transform) { + return new FailingExecutableTransform(transformLogger); + } + } + } } diff --git a/src/test/java/org/elasticsearch/watcher/transform/script/ScriptTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/script/ScriptTransformTests.java index d773b22cce4..bc1fc5544c6 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/script/ScriptTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/script/ScriptTransformTests.java @@ -58,7 +58,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase { @Test - public void testApply_MapValue() throws Exception { + public void testExecute_MapValue() throws Exception { ScriptServiceProxy service = mock(ScriptServiceProxy.class); ScriptType type = randomFrom(ScriptType.values()); Map params = Collections.emptyMap(); @@ -84,11 +84,39 @@ public class ScriptTransformTests extends ElasticsearchTestCase { Transform.Result result = transform.execute(ctx, payload); assertThat(result, notNullValue()); assertThat(result.type(), is(ScriptTransform.TYPE)); + assertThat(result.status(), is(Transform.Result.Status.SUCCESS)); assertThat(result.payload().data(), equalTo(transformed)); } @Test - public void testApply_NonMapValue() throws Exception { + public void testExecute_MapValue_Failure() throws Exception { + ScriptServiceProxy service = mock(ScriptServiceProxy.class); + ScriptType type = randomFrom(ScriptType.values()); + Map params = Collections.emptyMap(); + Script script = scriptBuilder(type, "_script").lang("_lang").params(params).build(); + CompiledScript compiledScript = mock(CompiledScript.class); + when(service.compile(script)).thenReturn(compiledScript); + ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service); + + WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); + + Payload payload = simplePayload("key", "value"); + + Map model = Variables.createCtxModel(ctx, payload); + + ExecutableScript executable = mock(ExecutableScript.class); + when(executable.run()).thenThrow(new RuntimeException("_error")); + when(service.executable(compiledScript, model)).thenReturn(executable); + + Transform.Result result = transform.execute(ctx, payload); + assertThat(result, notNullValue()); + assertThat(result.type(), is(ScriptTransform.TYPE)); + assertThat(result.status(), is(Transform.Result.Status.FAILURE)); + assertThat(result.reason(), containsString("_error")); + } + + @Test + public void testExecute_NonMapValue() throws Exception { ScriptServiceProxy service = mock(ScriptServiceProxy.class); ScriptType type = randomFrom(ScriptType.values()); diff --git a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java index 60a4e0c2658..6c4d7354187 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java @@ -85,7 +85,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { } @Test - public void testApply() throws Exception { + public void testExecute() throws Exception { index("idx", "type", "1"); ensureGreen("idx"); @@ -104,6 +104,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { Transform.Result result = transform.execute(ctx, EMPTY_PAYLOAD); assertThat(result, notNullValue()); assertThat(result.type(), is(SearchTransform.TYPE)); + assertThat(result.status(), is(Transform.Result.Status.SUCCESS)); SearchResponse response = client().search(request).get(); Payload expectedPayload = new Payload.XContent(response); @@ -120,7 +121,33 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { } @Test - public void testApply_MustacheTemplate() throws Exception { + public void testExecute_Failure() throws Exception { + + index("idx", "type", "1"); + ensureGreen("idx"); + refresh(); + + // create a bad request + SearchRequest request = Requests.searchRequest("idx").source(jsonBuilder().startObject() + .startObject("query") + .startObject("_unknown_query_").endObject() + .endObject() + .endObject()); + SearchTransform searchTransform = TransformBuilders.searchTransform(request).build(); + ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client())); + + WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); + + SearchTransform.Result result = transform.execute(ctx, EMPTY_PAYLOAD); + assertThat(result, notNullValue()); + assertThat(result.type(), is(SearchTransform.TYPE)); + assertThat(result.status(), is(Transform.Result.Status.FAILURE)); + assertThat(result.reason(), notNullValue()); + assertThat(result.executedRequest().templateSource().toUtf8(), containsString("_unknown_query_")); + } + + @Test + public void testExecute_MustacheTemplate() throws Exception { // The rational behind this test: //