From 6406c9816abf23117dbc2190cc55216d752d0194 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Mon, 27 Nov 2017 13:27:56 +0100 Subject: [PATCH] Watcher: Add transform input for chained input (elastic/x-pack-elasticsearch#2861) The chained input in watcher is a useful feature to call several endpoints before execution a condition. However it was pretty hard to modify data from a previous input in order to be able to execute it in another input. This commit adds a another input, called a `transform` input, which allows you to do a transform as another input in a chained input. See this example ``` "input" : { "chain" : { "inputs" : [ <1> { "first" : { "simple" : { "path" : "/_search" } } }, { "second" : { "transform" : { "script" : "return [ 'path' : 'ctx.payload.first.path' + '/' ]" } } }, { "third" : { "http" : { "request" : { "host" : "localhost", "port" : 9200, "path" : "{{ctx.payload.second.path}}" <2> } } } } ] } } ``` This allows for far more flexibility before executing the next input in a chained one. Original commit: elastic/x-pack-elasticsearch@3af9ba6e9b4369b3d6425c00bebfb51f867155c9 --- docs/en/watcher/input/chain.asciidoc | 45 ++++++- .../elasticsearch/xpack/watcher/Watcher.java | 3 + .../transform/ExecutableTransformInput.java | 29 +++++ .../input/transform/TransformInput.java | 75 ++++++++++++ .../transform/TransformInputFactory.java | 58 +++++++++ .../input/simple/SimpleInputTests.java | 1 + .../input/transform/TransformInputTests.java | 113 ++++++++++++++++++ .../60_chain_input_with_transform.yml | 54 +++++++++ 8 files changed, 377 insertions(+), 1 deletion(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/ExecutableTransformInput.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/TransformInput.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/TransformInputFactory.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/watcher/input/transform/TransformInputTests.java create mode 100644 qa/smoke-test-watcher-with-painless/src/test/resources/rest-api-spec/test/watcher_painless/60_chain_input_with_transform.yml diff --git a/docs/en/watcher/input/chain.asciidoc b/docs/en/watcher/input/chain.asciidoc index 0342e5d8e5b..1984b60d45e 100644 --- a/docs/en/watcher/input/chain.asciidoc +++ b/docs/en/watcher/input/chain.asciidoc @@ -46,4 +46,47 @@ path set by a `simple` input: ==== Accessing Chained Input Data To reference data loaded by a particular input, you use the input's name, -`ctx.payload..`. \ No newline at end of file +`ctx.payload..`. + +==== Transforming Chained Input Data + +In certain use-cases the output of the first input should be used as input +in a subsequent input. This requires you to do a transform, before you pass +the data on to the next input. + +In order to achieve this you can use a transform input between the two +specified inputs, see the following example. Note, that the first input will +still be available in its original form in `ctx.payload.first`. + +[source,js] +-------------------------------------------------- +"input" : { + "chain" : { + "inputs" : [ <1> + { + "first" : { + "simple" : { "path" : "/_search" } + } + }, + { + "second" : { + "transform" : { + "script" : "return [ 'path' : 'ctx.payload.first.path' + '/' ]" + } + } + }, + { + "third" : { + "http" : { + "request" : { + "host" : "localhost", + "port" : 9200, + "path" : "{{ctx.payload.second.path}}" <2> + } + } + } + } + ] + } +} +-------------------------------------------------- diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index c66640409d8..b3c5460759e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -105,6 +105,8 @@ import org.elasticsearch.xpack.watcher.input.search.SearchInput; import org.elasticsearch.xpack.watcher.input.search.SearchInputFactory; import org.elasticsearch.xpack.watcher.input.simple.SimpleInput; import org.elasticsearch.xpack.watcher.input.simple.SimpleInputFactory; +import org.elasticsearch.xpack.watcher.input.transform.TransformInput; +import org.elasticsearch.xpack.watcher.input.transform.TransformInputFactory; import org.elasticsearch.xpack.watcher.notification.email.Account; import org.elasticsearch.xpack.watcher.notification.email.EmailService; import org.elasticsearch.xpack.watcher.notification.email.attachment.DataAttachmentParser; @@ -321,6 +323,7 @@ public class Watcher implements ActionPlugin { inputFactories.put(SimpleInput.TYPE, new SimpleInputFactory(settings)); inputFactories.put(HttpInput.TYPE, new HttpInputFactory(settings, httpClient, templateEngine, httpTemplateParser)); inputFactories.put(NoneInput.TYPE, new NoneInputFactory(settings)); + inputFactories.put(TransformInput.TYPE, new TransformInputFactory(settings, transformRegistry)); final InputRegistry inputRegistry = new InputRegistry(settings, inputFactories); inputFactories.put(ChainInput.TYPE, new ChainInputFactory(settings, inputRegistry)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/ExecutableTransformInput.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/ExecutableTransformInput.java new file mode 100644 index 00000000000..c737c5b97f3 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/ExecutableTransformInput.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.input.transform; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; +import org.elasticsearch.xpack.watcher.input.ExecutableInput; +import org.elasticsearch.xpack.watcher.transform.ExecutableTransform; +import org.elasticsearch.xpack.watcher.transform.Transform; +import org.elasticsearch.xpack.watcher.watch.Payload; + +public final class ExecutableTransformInput extends ExecutableInput { + + private final ExecutableTransform executableTransform; + + ExecutableTransformInput(TransformInput input, Logger logger, ExecutableTransform executableTransform) { + super(input, logger); + this.executableTransform = executableTransform; + } + + @Override + public TransformInput.Result execute(WatchExecutionContext ctx, Payload payload) { + Transform.Result transformResult = executableTransform.execute(ctx, payload); + return new TransformInput.Result(transformResult.payload()); + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/TransformInput.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/TransformInput.java new file mode 100644 index 00000000000..c3b7632b8d5 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/TransformInput.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.input.transform; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.watcher.input.Input; +import org.elasticsearch.xpack.watcher.transform.Transform; +import org.elasticsearch.xpack.watcher.watch.Payload; + +import java.io.IOException; +import java.util.Objects; + +/** + * The Transform Input allows to configure a transformation, that should be + * put between two other inputs in a chained input in order to support easy + * data transformations + * + * This class does not have a builder, as it just consists of a single + * transform + */ +public class TransformInput implements Input { + + public static final String TYPE = "transform"; + + private final Transform transform; + + public TransformInput(Transform transform) { + this.transform = transform; + } + + public Transform getTransform() { + return transform; + } + + @Override + public String type() { + return TYPE; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + transform.toXContent(builder, params); + return builder; + } + + @Override + public int hashCode() { + return transform.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TransformInput that = (TransformInput) o; + + return Objects.equals(transform, that.transform); + } + + static class Result extends Input.Result { + + Result(Payload payload) { + super(TYPE, payload); + } + + @Override + protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/TransformInputFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/TransformInputFactory.java new file mode 100644 index 00000000000..7aa587b3900 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/input/transform/TransformInputFactory.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.input.transform; + +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; +import org.elasticsearch.xpack.watcher.input.InputFactory; +import org.elasticsearch.xpack.watcher.transform.ExecutableTransform; +import org.elasticsearch.xpack.watcher.transform.Transform; +import org.elasticsearch.xpack.watcher.transform.TransformFactory; +import org.elasticsearch.xpack.watcher.transform.TransformRegistry; + +import java.io.IOException; + +/** + * + * Transform inputs should be used between two other inputs in a chained input, + * so that you can do a transformation of your data, before sending it off to + * another input + * + * The transform input factory is pretty lightweight, as all the infra structure + * for transform can be reused for this + * + */ +public final class TransformInputFactory extends InputFactory { + + private final TransformRegistry transformRegistry; + + public TransformInputFactory(Settings settings, TransformRegistry transformRegistry) { + super(Loggers.getLogger(ExecutableTransformInput.class, settings)); + this.transformRegistry = transformRegistry; + } + + @Override + public String type() { + return TransformInput.TYPE; + } + + @Override + public TransformInput parseInput(String watchId, XContentParser parser) throws IOException { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + Transform transform = transformRegistry.parse(watchId, parser).transform(); + return new TransformInput(transform); + } + + @Override + public ExecutableTransformInput createExecutable(TransformInput input) { + Transform transform = input.getTransform(); + TransformFactory factory = transformRegistry.factory(transform.type()); + ExecutableTransform executableTransform = factory.createExecutable(transform); + return new ExecutableTransformInput(input, inputLogger, executableTransform); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/simple/SimpleInputTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/simple/SimpleInputTests.java index 844584c47a6..319cceef5ee 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/simple/SimpleInputTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/simple/SimpleInputTests.java @@ -24,6 +24,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; public class SimpleInputTests extends ESTestCase { + public void testExecute() throws Exception { Map data = new HashMap<>(); data.put("foo", "bar"); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/transform/TransformInputTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/transform/TransformInputTests.java new file mode 100644 index 00000000000..5ac802a4530 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/transform/TransformInputTests.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.input.transform; + +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.script.ScriptEngine; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.watcher.Watcher; +import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; +import org.elasticsearch.xpack.watcher.input.ExecutableInput; +import org.elasticsearch.xpack.watcher.input.Input; +import org.elasticsearch.xpack.watcher.test.WatcherTestUtils; +import org.elasticsearch.xpack.watcher.transform.ExecutableTransform; +import org.elasticsearch.xpack.watcher.transform.TransformFactory; +import org.elasticsearch.xpack.watcher.transform.TransformRegistry; +import org.elasticsearch.xpack.watcher.transform.script.ExecutableScriptTransform; +import org.elasticsearch.xpack.watcher.transform.script.ScriptTransform; +import org.elasticsearch.xpack.watcher.transform.script.ScriptTransformFactory; +import org.elasticsearch.xpack.watcher.watch.Payload; +import org.junit.Before; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; + +public class TransformInputTests extends ESTestCase { + + private ScriptService scriptService; + + @Before + public void setupScriptService() throws Exception { + Map engines = new HashMap<>(); + engines.put(MockScriptEngine.NAME, new MockScriptEngine(MockScriptEngine.NAME, Collections.singletonMap("1", s -> "2"))); + Map> contexts = new HashMap<>(); + contexts.put(Watcher.SCRIPT_TEMPLATE_CONTEXT.name, Watcher.SCRIPT_TEMPLATE_CONTEXT); + contexts.put(Watcher.SCRIPT_SEARCH_CONTEXT.name, Watcher.SCRIPT_SEARCH_CONTEXT); + contexts.put(Watcher.SCRIPT_EXECUTABLE_CONTEXT.name, Watcher.SCRIPT_EXECUTABLE_CONTEXT); + scriptService = new ScriptService(Settings.EMPTY, engines, contexts); + } + + public void testExecute() throws Exception { + Script script = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "1", Collections.emptyMap(), Collections.emptyMap()); + ScriptTransform scriptTransform = ScriptTransform.builder(script).build(); + TransformInput transformInput = new TransformInput(scriptTransform); + + ExecutableTransform executableTransform = new ExecutableScriptTransform(scriptTransform, logger, scriptService); + ExecutableInput input = new ExecutableTransformInput(transformInput, logger, executableTransform); + + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", Payload.EMPTY); + Input.Result result = input.execute(ctx, new Payload.Simple()); + assertThat(result.payload().data().size(), is(1)); + assertThat(result.payload().data(), hasEntry("_value", "2")); + } + + public void testParserValid() throws Exception { + Map transformFactories = Collections.singletonMap("script", + new ScriptTransformFactory(Settings.EMPTY, scriptService)); + TransformRegistry registry = new TransformRegistry(Settings.EMPTY, transformFactories); + TransformInputFactory factory = new TransformInputFactory(Settings.EMPTY, registry); + + // { "script" : { "lang" : "mockscript", "source" : "1" } } + XContentBuilder builder = jsonBuilder().startObject().startObject("script") + .field("lang", MockScriptEngine.NAME) + .field("source", "1") + .endObject().endObject(); + + XContentParser parser = createParser(builder); + parser.nextToken(); + ExecutableTransformInput executableTransformInput = factory.parseExecutable("_id", parser); + + WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", Payload.EMPTY); + TransformInput.Result result = executableTransformInput.execute(ctx, Payload.EMPTY); + assertThat(result.payload().data().size(), is(1)); + assertThat(result.payload().data(), hasEntry("_value", "2")); + } + + public void testParserInvalid() throws Exception { + XContentBuilder jsonBuilder = jsonBuilder().value("just a string"); + + Map transformFactories = Collections.singletonMap("script", + new ScriptTransformFactory(Settings.EMPTY, scriptService)); + TransformRegistry registry = new TransformRegistry(Settings.EMPTY, transformFactories); + TransformInputFactory factory = new TransformInputFactory(Settings.EMPTY, registry); + XContentParser parser = createParser(jsonBuilder); + + parser.nextToken(); + expectThrows(ParsingException.class, () -> factory.parseInput("_id", parser)); + } + + public void testTransformResultToXContent() throws Exception { + Map data = Collections.singletonMap("foo", "bar"); + TransformInput.Result result = new TransformInput.Result(new Payload.Simple(data)); + try (XContentBuilder builder = jsonBuilder()) { + result.toXContent(builder, ToXContent.EMPTY_PARAMS); + } + } +} diff --git a/qa/smoke-test-watcher-with-painless/src/test/resources/rest-api-spec/test/watcher_painless/60_chain_input_with_transform.yml b/qa/smoke-test-watcher-with-painless/src/test/resources/rest-api-spec/test/watcher_painless/60_chain_input_with_transform.yml new file mode 100644 index 00000000000..08097e0cace --- /dev/null +++ b/qa/smoke-test-watcher-with-painless/src/test/resources/rest-api-spec/test/watcher_painless/60_chain_input_with_transform.yml @@ -0,0 +1,54 @@ +--- +"Test chained input with transform": + - do: + cluster.health: + wait_for_status: yellow + + - do: + xpack.watcher.execute_watch: + body: > + { + "watch" : { + "trigger" : { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input" : { + "chain" : { + "inputs" : [ + { + "first" : { + "simple" : { "foo" : "bar" } + } + }, + { + "second" : { + "transform": { + "script" : "def value = ctx.payload.first.foo + ' baz' ; return [ 'foo' : value ]" + } + } + } + ] + } + }, + "actions" : { + "index" : { + "index" : { + "index" : "my-index", + "doc_type" : "my-type", + "doc_id" : "my-id" + } + } + } + } + } + - match: { "watch_record.state": "executed" } + - match: { "watch_record.result.input.status": "success" } + + - do: + get: + index: my-index + type: my-type + id: my-id + + - match: { _source.first.foo: "bar" } + - match: { _source.second.foo: "bar baz" }