Watcher: Add transform input for chained input (elastic/x-pack-elasticsearch#2861)

The chained input in watcher is a useful feature to
call several endpoints before execution a condition.
However it was pretty hard to modify data from a previous
input in order to be able to execute it in another input.

This commit adds a another input, called a `transform` input,
which allows you to do a transform as another input in a chained
input.

See this example

```
"input" : {
  "chain" : {
    "inputs" : [ <1>
      {
        "first" : {
          "simple" : { "path" : "/_search" }
        }
      },
      {
        "second" : {
          "transform" : {
            "script" : "return [ 'path' : 'ctx.payload.first.path' + '/' ]"
          }
        }
      },
      {
        "third" : {
          "http" : {
            "request" : {
              "host" : "localhost",
              "port" : 9200,
              "path" : "{{ctx.payload.second.path}}" <2>
            }
          }
        }
      }
    ]
  }
}
```

This allows for far more flexibility before executing the next input in a chained
one.

Original commit: elastic/x-pack-elasticsearch@3af9ba6e9b
This commit is contained in:
Alexander Reelsen 2017-11-27 13:27:56 +01:00 committed by GitHub
parent 3957518ab2
commit 6406c9816a
8 changed files with 377 additions and 1 deletions

View File

@ -46,4 +46,47 @@ path set by a `simple` input:
==== Accessing Chained Input Data
To reference data loaded by a particular input, you use the input's name,
`ctx.payload.<input-name>.<value>`.
`ctx.payload.<input-name>.<value>`.
==== Transforming Chained Input Data
In certain use-cases the output of the first input should be used as input
in a subsequent input. This requires you to do a transform, before you pass
the data on to the next input.
In order to achieve this you can use a transform input between the two
specified inputs, see the following example. Note, that the first input will
still be available in its original form in `ctx.payload.first`.
[source,js]
--------------------------------------------------
"input" : {
"chain" : {
"inputs" : [ <1>
{
"first" : {
"simple" : { "path" : "/_search" }
}
},
{
"second" : {
"transform" : {
"script" : "return [ 'path' : 'ctx.payload.first.path' + '/' ]"
}
}
},
{
"third" : {
"http" : {
"request" : {
"host" : "localhost",
"port" : 9200,
"path" : "{{ctx.payload.second.path}}" <2>
}
}
}
}
]
}
}
--------------------------------------------------

View File

@ -105,6 +105,8 @@ import org.elasticsearch.xpack.watcher.input.search.SearchInput;
import org.elasticsearch.xpack.watcher.input.search.SearchInputFactory;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInputFactory;
import org.elasticsearch.xpack.watcher.input.transform.TransformInput;
import org.elasticsearch.xpack.watcher.input.transform.TransformInputFactory;
import org.elasticsearch.xpack.watcher.notification.email.Account;
import org.elasticsearch.xpack.watcher.notification.email.EmailService;
import org.elasticsearch.xpack.watcher.notification.email.attachment.DataAttachmentParser;
@ -321,6 +323,7 @@ public class Watcher implements ActionPlugin {
inputFactories.put(SimpleInput.TYPE, new SimpleInputFactory(settings));
inputFactories.put(HttpInput.TYPE, new HttpInputFactory(settings, httpClient, templateEngine, httpTemplateParser));
inputFactories.put(NoneInput.TYPE, new NoneInputFactory(settings));
inputFactories.put(TransformInput.TYPE, new TransformInputFactory(settings, transformRegistry));
final InputRegistry inputRegistry = new InputRegistry(settings, inputFactories);
inputFactories.put(ChainInput.TYPE, new ChainInputFactory(settings, inputRegistry));

View File

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.input.transform;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.ExecutableInput;
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.watch.Payload;
public final class ExecutableTransformInput extends ExecutableInput<TransformInput, TransformInput.Result> {
private final ExecutableTransform executableTransform;
ExecutableTransformInput(TransformInput input, Logger logger, ExecutableTransform executableTransform) {
super(input, logger);
this.executableTransform = executableTransform;
}
@Override
public TransformInput.Result execute(WatchExecutionContext ctx, Payload payload) {
Transform.Result transformResult = executableTransform.execute(ctx, payload);
return new TransformInput.Result(transformResult.payload());
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.input.transform;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.watcher.input.Input;
import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.watch.Payload;
import java.io.IOException;
import java.util.Objects;
/**
* The Transform Input allows to configure a transformation, that should be
* put between two other inputs in a chained input in order to support easy
* data transformations
*
* This class does not have a builder, as it just consists of a single
* transform
*/
public class TransformInput implements Input {
public static final String TYPE = "transform";
private final Transform transform;
public TransformInput(Transform transform) {
this.transform = transform;
}
public Transform getTransform() {
return transform;
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
transform.toXContent(builder, params);
return builder;
}
@Override
public int hashCode() {
return transform.hashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TransformInput that = (TransformInput) o;
return Objects.equals(transform, that.transform);
}
static class Result extends Input.Result {
Result(Payload payload) {
super(TYPE, payload);
}
@Override
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.input.transform;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.xpack.watcher.input.InputFactory;
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.transform.TransformFactory;
import org.elasticsearch.xpack.watcher.transform.TransformRegistry;
import java.io.IOException;
/**
*
* Transform inputs should be used between two other inputs in a chained input,
* so that you can do a transformation of your data, before sending it off to
* another input
*
* The transform input factory is pretty lightweight, as all the infra structure
* for transform can be reused for this
*
*/
public final class TransformInputFactory extends InputFactory<TransformInput, TransformInput.Result, ExecutableTransformInput> {
private final TransformRegistry transformRegistry;
public TransformInputFactory(Settings settings, TransformRegistry transformRegistry) {
super(Loggers.getLogger(ExecutableTransformInput.class, settings));
this.transformRegistry = transformRegistry;
}
@Override
public String type() {
return TransformInput.TYPE;
}
@Override
public TransformInput parseInput(String watchId, XContentParser parser) throws IOException {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
Transform transform = transformRegistry.parse(watchId, parser).transform();
return new TransformInput(transform);
}
@Override
public ExecutableTransformInput createExecutable(TransformInput input) {
Transform transform = input.getTransform();
TransformFactory factory = transformRegistry.factory(transform.type());
ExecutableTransform executableTransform = factory.createExecutable(transform);
return new ExecutableTransformInput(input, inputLogger, executableTransform);
}
}

View File

@ -24,6 +24,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
public class SimpleInputTests extends ESTestCase {
public void testExecute() throws Exception {
Map<String, Object> data = new HashMap<>();
data.put("foo", "bar");

View File

@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.input.transform;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.ExecutableInput;
import org.elasticsearch.xpack.watcher.input.Input;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
import org.elasticsearch.xpack.watcher.transform.TransformFactory;
import org.elasticsearch.xpack.watcher.transform.TransformRegistry;
import org.elasticsearch.xpack.watcher.transform.script.ExecutableScriptTransform;
import org.elasticsearch.xpack.watcher.transform.script.ScriptTransform;
import org.elasticsearch.xpack.watcher.transform.script.ScriptTransformFactory;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.junit.Before;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
public class TransformInputTests extends ESTestCase {
private ScriptService scriptService;
@Before
public void setupScriptService() throws Exception {
Map<String, ScriptEngine> engines = new HashMap<>();
engines.put(MockScriptEngine.NAME, new MockScriptEngine(MockScriptEngine.NAME, Collections.singletonMap("1", s -> "2")));
Map<String, ScriptContext<?>> contexts = new HashMap<>();
contexts.put(Watcher.SCRIPT_TEMPLATE_CONTEXT.name, Watcher.SCRIPT_TEMPLATE_CONTEXT);
contexts.put(Watcher.SCRIPT_SEARCH_CONTEXT.name, Watcher.SCRIPT_SEARCH_CONTEXT);
contexts.put(Watcher.SCRIPT_EXECUTABLE_CONTEXT.name, Watcher.SCRIPT_EXECUTABLE_CONTEXT);
scriptService = new ScriptService(Settings.EMPTY, engines, contexts);
}
public void testExecute() throws Exception {
Script script = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "1", Collections.emptyMap(), Collections.emptyMap());
ScriptTransform scriptTransform = ScriptTransform.builder(script).build();
TransformInput transformInput = new TransformInput(scriptTransform);
ExecutableTransform executableTransform = new ExecutableScriptTransform(scriptTransform, logger, scriptService);
ExecutableInput input = new ExecutableTransformInput(transformInput, logger, executableTransform);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", Payload.EMPTY);
Input.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.payload().data().size(), is(1));
assertThat(result.payload().data(), hasEntry("_value", "2"));
}
public void testParserValid() throws Exception {
Map<String, TransformFactory> transformFactories = Collections.singletonMap("script",
new ScriptTransformFactory(Settings.EMPTY, scriptService));
TransformRegistry registry = new TransformRegistry(Settings.EMPTY, transformFactories);
TransformInputFactory factory = new TransformInputFactory(Settings.EMPTY, registry);
// { "script" : { "lang" : "mockscript", "source" : "1" } }
XContentBuilder builder = jsonBuilder().startObject().startObject("script")
.field("lang", MockScriptEngine.NAME)
.field("source", "1")
.endObject().endObject();
XContentParser parser = createParser(builder);
parser.nextToken();
ExecutableTransformInput executableTransformInput = factory.parseExecutable("_id", parser);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContext("_id", Payload.EMPTY);
TransformInput.Result result = executableTransformInput.execute(ctx, Payload.EMPTY);
assertThat(result.payload().data().size(), is(1));
assertThat(result.payload().data(), hasEntry("_value", "2"));
}
public void testParserInvalid() throws Exception {
XContentBuilder jsonBuilder = jsonBuilder().value("just a string");
Map<String, TransformFactory> transformFactories = Collections.singletonMap("script",
new ScriptTransformFactory(Settings.EMPTY, scriptService));
TransformRegistry registry = new TransformRegistry(Settings.EMPTY, transformFactories);
TransformInputFactory factory = new TransformInputFactory(Settings.EMPTY, registry);
XContentParser parser = createParser(jsonBuilder);
parser.nextToken();
expectThrows(ParsingException.class, () -> factory.parseInput("_id", parser));
}
public void testTransformResultToXContent() throws Exception {
Map<String, Object> data = Collections.singletonMap("foo", "bar");
TransformInput.Result result = new TransformInput.Result(new Payload.Simple(data));
try (XContentBuilder builder = jsonBuilder()) {
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
}
}

View File

@ -0,0 +1,54 @@
---
"Test chained input with transform":
- do:
cluster.health:
wait_for_status: yellow
- do:
xpack.watcher.execute_watch:
body: >
{
"watch" : {
"trigger" : {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input" : {
"chain" : {
"inputs" : [
{
"first" : {
"simple" : { "foo" : "bar" }
}
},
{
"second" : {
"transform": {
"script" : "def value = ctx.payload.first.foo + ' baz' ; return [ 'foo' : value ]"
}
}
}
]
}
},
"actions" : {
"index" : {
"index" : {
"index" : "my-index",
"doc_type" : "my-type",
"doc_id" : "my-id"
}
}
}
}
}
- match: { "watch_record.state": "executed" }
- match: { "watch_record.result.input.status": "success" }
- do:
get:
index: my-index
type: my-type
id: my-id
- match: { _source.first.foo: "bar" }
- match: { _source.second.foo: "bar baz" }