From 03b704f79bb9cc4707fef1a32022810b4b622bde Mon Sep 17 00:00:00 2001 From: uboness Date: Mon, 15 Jun 2015 14:59:11 +0200 Subject: [PATCH] Moved transform errors to the tranform result Until now, if the transform failed (either on the watch or action level), an exception would be thrown and it would be captured globally on the watch execution and in the watch record message This commit bring the error to the transform result - A new `status` field was added to the transform result. Can either have `success` or `failure` values. When set to `failure` a `reason` field will hold the failure message. - The `ExecutionService` changed to enable this functionality. Mainly, instead of relying on exception, during the execution the transform result status is checked and if failed the watch execution is aborted. Original commit: elastic/x-pack-elasticsearch@65b7f51f00222b6c03c56430855c80cb3faf768e --- .../watcher/actions/ActionWrapper.java | 8 +- .../watcher/actions/ExecutableAction.java | 7 + .../execution/WatchExecutionContext.java | 3 + .../execution/WatchExecutionException.java | 22 +++ .../transform/ExecutableTransform.java | 2 +- .../watcher/transform/Transform.java | 51 ++++++- .../transform/chain/ChainTransform.java | 20 ++- .../chain/ExecutableChainTransform.java | 15 +- .../script/ExecutableScriptTransform.java | 12 +- .../transform/script/ScriptTransform.java | 4 + .../search/ExecutableSearchTransform.java | 16 +- .../transform/search/SearchTransform.java | 19 ++- .../execution/ExecutionServiceTests.java | 142 ++++++++++++++++++ .../transform/chain/ChainTransformTests.java | 125 ++++++++++++++- .../script/ScriptTransformTests.java | 32 +++- .../search/SearchTransformTests.java | 31 +++- 16 files changed, 473 insertions(+), 36 deletions(-) create mode 100644 src/main/java/org/elasticsearch/watcher/execution/WatchExecutionException.java diff --git a/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java b/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java index 2ada3adfe32..a1f247c3a76 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java +++ b/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java @@ -78,9 +78,13 @@ public class ActionWrapper implements ToXContent { if (transform != null) { try { transformResult = transform.execute(ctx, payload); + if (transformResult.status() == Transform.Result.Status.FAILURE) { + action.logger().error("failed to execute action [{}/{}]. failed to transform payload. {}", ctx.watch().id(), id, transformResult.reason()); + return new ActionWrapper.Result(id, transformResult, new Action.Result.Failure(action.type(), "Failed to transform payload")); + } payload = transformResult.payload(); } catch (Exception e) { - action.logger.error("failed to execute action [{}/{}]. failed to transform payload.", e, ctx.watch().id(), id); + action.logger().error("failed to execute action [{}/{}]. failed to transform payload.", e, ctx.watch().id(), id); return new ActionWrapper.Result(id, new Action.Result.Failure(action.type(), "Failed to transform payload. error: " + ExceptionsHelper.detailedMessage(e))); } } @@ -88,7 +92,7 @@ public class ActionWrapper implements ToXContent { Action.Result actionResult = action.execute(id, ctx, payload); return new ActionWrapper.Result(id, transformResult, actionResult); } catch (Exception e) { - action.logger.error("failed to execute action [{}/{}]", e, ctx.watch().id(), id); + action.logger().error("failed to execute action [{}/{}]", e, ctx.watch().id(), id); return new ActionWrapper.Result(id, new Action.Result.Failure(action.type(), ExceptionsHelper.detailedMessage(e))); } } diff --git a/src/main/java/org/elasticsearch/watcher/actions/ExecutableAction.java b/src/main/java/org/elasticsearch/watcher/actions/ExecutableAction.java index d4a6bf7c056..9312a540187 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/ExecutableAction.java +++ b/src/main/java/org/elasticsearch/watcher/actions/ExecutableAction.java @@ -36,6 +36,13 @@ public abstract class ExecutableAction implements ToXContent { return action; } + /** + * yack... needed to expose that for testing purposes + */ + public ESLogger logger() { + return logger; + } + public abstract Action.Result execute(String actionId, WatchExecutionContext context, Payload payload) throws Exception; @Override diff --git a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java index 64adee1aeb5..59fe767d58e 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java +++ b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java @@ -121,6 +121,9 @@ public abstract class WatchExecutionContext { } beforeWatchTransform(); this.transformResult = watch.transform().execute(this, payload); + if (this.transformResult.status() == Transform.Result.Status.FAILURE) { + throw new WatchExecutionException("failed to execute watch level transform for [{}]", id); + } this.payload = transformResult.payload(); this.transformedPayload = this.payload; return transformedPayload; diff --git a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionException.java b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionException.java new file mode 100644 index 00000000000..6c3bc582012 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionException.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.execution; + +import org.elasticsearch.watcher.WatcherException; + +/** + * + */ +public class WatchExecutionException extends WatcherException { + + public WatchExecutionException(String msg, Object... args) { + super(msg, args); + } + + public WatchExecutionException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java b/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java index a77bbf0efad..2f77080e328 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java @@ -34,7 +34,7 @@ public abstract class ExecutableTransform results) { + super(TYPE, e); + this.results = results; + } + public ImmutableList results() { return results; } @Override protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(type); - builder.startArray(Field.RESULTS.getPreferredName()); - for (Transform.Result result : results) { - result.toXContent(builder, params); + if (!results.isEmpty()) { + builder.startObject(type); + builder.startArray(Field.RESULTS.getPreferredName()); + for (Transform.Result result : results) { + result.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); } - builder.endArray(); - return builder.endObject(); + return builder; } } diff --git a/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java b/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java index 5eca75538df..94d7244c180 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java @@ -31,11 +31,24 @@ public class ExecutableChainTransform extends ExecutableTransform results = ImmutableList.builder(); + try { + return doExecute(ctx, payload, results); + } catch (Exception e) { + logger.error("failed to execute [{}] transform for [{}]", e, ChainTransform.TYPE, ctx.id()); + return new ChainTransform.Result(e, results.build()); + } + } + + + ChainTransform.Result doExecute(WatchExecutionContext ctx, Payload payload, ImmutableList.Builder results) throws IOException { for (ExecutableTransform transform : transforms) { Transform.Result result = transform.execute(ctx, payload); results.add(result); + if (result.status() == Transform.Result.Status.FAILURE) { + throw new ChainTransformException("failed to execute [{}] transform. failed to execute sub-transform [{}]", ChainTransform.TYPE, transform.type()); + } payload = result.payload(); } return new ChainTransform.Result(payload, results.build()); diff --git a/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java b/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java index 2ee71202a58..2dd19d16141 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java @@ -40,7 +40,17 @@ public class ExecutableScriptTransform extends ExecutableTransform model = new HashMap<>(); model.putAll(script.params()); diff --git a/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java index 1696a3ea31e..0d8374d9627 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java @@ -74,6 +74,10 @@ public class ScriptTransform implements Transform { super(TYPE, payload); } + public Result(Exception e) { + super(TYPE, e); + } + @Override protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { return builder; diff --git a/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java index e4a2477e730..ac4391afe8a 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java @@ -15,8 +15,6 @@ import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.watch.Payload; -import java.io.IOException; - /** * */ @@ -32,10 +30,16 @@ public class ExecutableSearchTransform extends ExecutableTransformof( new NamedExecutableTransform.Transform("name1"), new NamedExecutableTransform.Transform("name2"), @@ -51,7 +51,21 @@ public class ChainTransformTests extends ElasticsearchTestCase { WatchExecutionContext ctx = mock(WatchExecutionContext.class); Payload payload = new Payload.Simple(new HashMap()); - Transform.Result result = executable.execute(ctx, payload); + ChainTransform.Result result = executable.execute(ctx, payload); + assertThat(result.status(), is(Transform.Result.Status.SUCCESS)); + assertThat(result.results(), hasSize(3)); + assertThat(result.results().get(0), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(0).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(0).payload().data().get("names"), hasSize(1)); + assertThat((List) result.results().get(0).payload().data().get("names"), contains("name1")); + assertThat(result.results().get(1), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(1).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(1).payload().data().get("names"), hasSize(2)); + assertThat((List) result.results().get(1).payload().data().get("names"), contains("name1", "name2")); + assertThat(result.results().get(2), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(2).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(2).payload().data().get("names"), hasSize(3)); + assertThat((List) result.results().get(2).payload().data().get("names"), contains("name1", "name2", "name3")); Map data = result.payload().data(); assertThat(data, notNullValue()); @@ -62,6 +76,39 @@ public class ChainTransformTests extends ElasticsearchTestCase { assertThat(names, contains("name1", "name2", "name3")); } + @Test + public void testExecute_Failure() throws Exception { + ChainTransform transform = new ChainTransform(ImmutableList.of( + new NamedExecutableTransform.Transform("name1"), + new NamedExecutableTransform.Transform("name2"), + new FailingExecutableTransform.Transform() + )); + ExecutableChainTransform executable = new ExecutableChainTransform(transform, logger, ImmutableList.of( + new NamedExecutableTransform("name1"), + new NamedExecutableTransform("name2"), + new FailingExecutableTransform(logger))); + + WatchExecutionContext ctx = mock(WatchExecutionContext.class); + Payload payload = new Payload.Simple(new HashMap()); + + ChainTransform.Result result = executable.execute(ctx, payload); + assertThat(result.status(), is(Transform.Result.Status.FAILURE)); + assertThat(result.reason(), notNullValue()); + assertThat(result.results(), hasSize(3)); + assertThat(result.results().get(0), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(0).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(0).payload().data().get("names"), hasSize(1)); + assertThat((List) result.results().get(0).payload().data().get("names"), contains("name1")); + assertThat(result.results().get(1), instanceOf(NamedExecutableTransform.Result.class)); + assertThat(result.results().get(1).status(), is(Transform.Result.Status.SUCCESS)); + assertThat((List) result.results().get(1).payload().data().get("names"), hasSize(2)); + assertThat((List) result.results().get(1).payload().data().get("names"), contains("name1", "name2")); + assertThat(result.results().get(2), instanceOf(FailingExecutableTransform.Result.class)); + assertThat(result.results().get(2).status(), is(Transform.Result.Status.FAILURE)); + assertThat(result.results().get(2).reason(), containsString("_error")); + + } + @Test public void testParser() throws Exception { Map factories = ImmutableMap.builder() @@ -103,14 +150,16 @@ public class ChainTransformTests extends ElasticsearchTestCase { } @Override - public Result execute(WatchExecutionContext ctx, Payload payload) throws IOException { - Map data = new HashMap<>(payload.data()); - List names = (List) data.get("names"); + public Result execute(WatchExecutionContext ctx, Payload payload) { + List names = (List) payload.data().get("names"); if (names == null) { names = new ArrayList<>(); - data.put("names", names); + } else { + names = new ArrayList<>(names); } names.add(transform.name); + Map data = new HashMap<>(); + data.put("names", names); return new Result("named", new Payload.Simple(data)); } @@ -178,4 +227,68 @@ public class ChainTransformTests extends ElasticsearchTestCase { } } } + + private static class FailingExecutableTransform extends ExecutableTransform { + + private static final String TYPE = "throwing"; + + public FailingExecutableTransform(ESLogger logger) { + super(new Transform(), logger); + } + + @Override + public Result execute(WatchExecutionContext ctx, Payload payload) { + return new Result(TYPE); + } + + public static class Transform implements org.elasticsearch.watcher.transform.Transform { + + @Override + public String type() { + return TYPE; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().endArray(); + } + } + + public static class Result extends Transform.Result { + + public Result(String type) { + super(type, new Exception("_error")); + } + + @Override + protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + } + + public static class Factory extends TransformFactory { + + public Factory(ESLogger transformLogger) { + super(transformLogger); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Transform parseTransform(String watchId, XContentParser parser) throws IOException { + assert parser.currentToken() == XContentParser.Token.START_OBJECT; + XContentParser.Token token = parser.nextToken(); + assert token == XContentParser.Token.END_OBJECT; + return new Transform(); + } + + @Override + public FailingExecutableTransform createExecutable(Transform transform) { + return new FailingExecutableTransform(transformLogger); + } + } + } } diff --git a/src/test/java/org/elasticsearch/watcher/transform/script/ScriptTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/script/ScriptTransformTests.java index d773b22cce4..bc1fc5544c6 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/script/ScriptTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/script/ScriptTransformTests.java @@ -58,7 +58,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase { @Test - public void testApply_MapValue() throws Exception { + public void testExecute_MapValue() throws Exception { ScriptServiceProxy service = mock(ScriptServiceProxy.class); ScriptType type = randomFrom(ScriptType.values()); Map params = Collections.emptyMap(); @@ -84,11 +84,39 @@ public class ScriptTransformTests extends ElasticsearchTestCase { Transform.Result result = transform.execute(ctx, payload); assertThat(result, notNullValue()); assertThat(result.type(), is(ScriptTransform.TYPE)); + assertThat(result.status(), is(Transform.Result.Status.SUCCESS)); assertThat(result.payload().data(), equalTo(transformed)); } @Test - public void testApply_NonMapValue() throws Exception { + public void testExecute_MapValue_Failure() throws Exception { + ScriptServiceProxy service = mock(ScriptServiceProxy.class); + ScriptType type = randomFrom(ScriptType.values()); + Map params = Collections.emptyMap(); + Script script = scriptBuilder(type, "_script").lang("_lang").params(params).build(); + CompiledScript compiledScript = mock(CompiledScript.class); + when(service.compile(script)).thenReturn(compiledScript); + ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service); + + WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); + + Payload payload = simplePayload("key", "value"); + + Map model = Variables.createCtxModel(ctx, payload); + + ExecutableScript executable = mock(ExecutableScript.class); + when(executable.run()).thenThrow(new RuntimeException("_error")); + when(service.executable(compiledScript, model)).thenReturn(executable); + + Transform.Result result = transform.execute(ctx, payload); + assertThat(result, notNullValue()); + assertThat(result.type(), is(ScriptTransform.TYPE)); + assertThat(result.status(), is(Transform.Result.Status.FAILURE)); + assertThat(result.reason(), containsString("_error")); + } + + @Test + public void testExecute_NonMapValue() throws Exception { ScriptServiceProxy service = mock(ScriptServiceProxy.class); ScriptType type = randomFrom(ScriptType.values()); diff --git a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java index 60a4e0c2658..6c4d7354187 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/search/SearchTransformTests.java @@ -85,7 +85,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { } @Test - public void testApply() throws Exception { + public void testExecute() throws Exception { index("idx", "type", "1"); ensureGreen("idx"); @@ -104,6 +104,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { Transform.Result result = transform.execute(ctx, EMPTY_PAYLOAD); assertThat(result, notNullValue()); assertThat(result.type(), is(SearchTransform.TYPE)); + assertThat(result.status(), is(Transform.Result.Status.SUCCESS)); SearchResponse response = client().search(request).get(); Payload expectedPayload = new Payload.XContent(response); @@ -120,7 +121,33 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest { } @Test - public void testApply_MustacheTemplate() throws Exception { + public void testExecute_Failure() throws Exception { + + index("idx", "type", "1"); + ensureGreen("idx"); + refresh(); + + // create a bad request + SearchRequest request = Requests.searchRequest("idx").source(jsonBuilder().startObject() + .startObject("query") + .startObject("_unknown_query_").endObject() + .endObject() + .endObject()); + SearchTransform searchTransform = TransformBuilders.searchTransform(request).build(); + ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client())); + + WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); + + SearchTransform.Result result = transform.execute(ctx, EMPTY_PAYLOAD); + assertThat(result, notNullValue()); + assertThat(result.type(), is(SearchTransform.TYPE)); + assertThat(result.status(), is(Transform.Result.Status.FAILURE)); + assertThat(result.reason(), notNullValue()); + assertThat(result.executedRequest().templateSource().toUtf8(), containsString("_unknown_query_")); + } + + @Test + public void testExecute_MustacheTemplate() throws Exception { // The rational behind this test: //