From 04a8fb32020f0bb6d889bd3c1eacec792fb74e3b Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 30 Oct 2015 09:55:00 +0100 Subject: [PATCH] Watcher: Add chained input ``` Chained input for now works like this { "chain" : [ { "first" : { "simple" : { "foo" : "bar" } } }, { "second" : { "simple" : { "spam" : "eggs" } } } ] ``` This allows to access the payload via ctx.payload.first.foo for example The array notation is needed to guarantee order, as JSON itself does not guarantee order of objects. Closes elastic/elasticsearch#353 Original commit: elastic/x-pack-elasticsearch@7ab32c43a8a37a2593adcce366ade8091c6a0176 --- watcher/docs/reference/input.asciidoc | 9 +- watcher/docs/reference/input/chain.asciidoc | 39 ++++ watcher/docs/release-notes.asciidoc | 7 + .../watcher/execution/ExecutionService.java | 2 +- .../watcher/input/ExecutableInput.java | 4 +- .../watcher/input/InputBuilders.java | 5 + .../watcher/input/InputModule.java | 5 + .../watcher/input/InputRegistry.java | 6 +- .../watcher/input/chain/ChainInput.java | 128 +++++++++++++ .../input/chain/ChainInputFactory.java | 58 ++++++ .../input/chain/ExecutableChainInput.java | 44 +++++ .../input/http/ExecutableHttpInput.java | 4 +- .../input/none/ExecutableNoneInput.java | 3 +- .../input/search/ExecutableSearchInput.java | 4 +- .../input/simple/ExecutableSimpleInput.java | 3 +- .../support/init/InitializingModule.java | 7 +- .../support/init/InitializingService.java | 2 +- .../execution/ExecutionServiceTests.java | 13 +- .../watcher/input/InputRegistryTests.java | 1 + .../watcher/input/chain/ChainInputTests.java | 173 ++++++++++++++++++ .../input/chain/ChainIntegrationTests.java | 73 ++++++++ .../watcher/input/http/HttpInputTests.java | 4 +- .../input/search/SearchInputTests.java | 7 +- .../input/simple/SimpleInputTests.java | 4 +- 24 files changed, 575 insertions(+), 30 deletions(-) create mode 100644 watcher/docs/reference/input/chain.asciidoc create mode 100644 watcher/src/main/java/org/elasticsearch/watcher/input/chain/ChainInput.java create mode 100644 watcher/src/main/java/org/elasticsearch/watcher/input/chain/ChainInputFactory.java create mode 100644 watcher/src/main/java/org/elasticsearch/watcher/input/chain/ExecutableChainInput.java create mode 100644 watcher/src/test/java/org/elasticsearch/watcher/input/chain/ChainInputTests.java create mode 100644 watcher/src/test/java/org/elasticsearch/watcher/input/chain/ChainIntegrationTests.java diff --git a/watcher/docs/reference/input.asciidoc b/watcher/docs/reference/input.asciidoc index d8e17f63e3a..28052bb5f55 100644 --- a/watcher/docs/reference/input.asciidoc +++ b/watcher/docs/reference/input.asciidoc @@ -2,8 +2,8 @@ === Input A watch _input_ loads data into a watch's execution context as the initial payload. -Watcher supports three input types: <> , <>, -and <> +Watcher supports four input types: <> , <>, +<> and <> NOTE: If you don't define an input for a watch, an empty payload is loaded into the execution context. @@ -12,4 +12,7 @@ include::input/simple.asciidoc[] include::input/search.asciidoc[] -include::input/http.asciidoc[] \ No newline at end of file +include::input/http.asciidoc[] + +include::input/chain.asciidoc[] + diff --git a/watcher/docs/reference/input/chain.asciidoc b/watcher/docs/reference/input/chain.asciidoc new file mode 100644 index 00000000000..f68035a4252 --- /dev/null +++ b/watcher/docs/reference/input/chain.asciidoc @@ -0,0 +1,39 @@ +[[input-chain]] +==== Chain Input + +An <> that enables you to chain several inputs one after the other and have each input populating the execution context. +The `chain` input is useful when the output of one input should change the input of another or you need several inputs to gather all the +information needed for an action.. + +You can define the chained input as following + +[source,json] +-------------------------------------------------- +{ + "input" : { + "chain" : { + "inputs" : [ + "first" : { + "simple" : { "path" : "/_search" } + }, + "second" : { + "http" : { + "request" : { + "host" : "localhost", + "port" : 9200, + "path" : "{{ctx.payload.first.path}}" + } + } + } + ] + } + } + ... +} +-------------------------------------------------- + +As you can see, the name of the input (`first` and `second` in this example) can be used to access values from the context in consecutive inputs. + +In case you are wondering about the structure of this input. The `inputs` must be an array, because JSON does not guarantee the order of arbitrary +objects, one has to use a list. + diff --git a/watcher/docs/release-notes.asciidoc b/watcher/docs/release-notes.asciidoc index 4270211934a..fa091b3c821 100644 --- a/watcher/docs/release-notes.asciidoc +++ b/watcher/docs/release-notes.asciidoc @@ -39,12 +39,19 @@ bin/plugin remove watcher [float] ==== 2.1.0 +.New Features +* Added support for <> + .Enhancement * Support for configuring a proxy in the webhook action, http input and configuring a default proxy (which is also used by the slack action), using the `watcher.http.proxy.host` and `watcher.http.proxy.port` settings. +======= [float] ==== 2.0.0 +.Bug fixes +* Fixed an issue where the scheduler may get stuck during Watcher startup. This caused no watches to ever fire. + .Breaking Changes * The dynamic index names support has been removed and Elasticsearch's date math index names support should be used instead. The only difference between Watcher's dynamic index names support and Elasticsearch's date math index names support is diff --git a/watcher/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/watcher/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index c4e07a04d18..dbb08f6a0e9 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -335,7 +335,7 @@ public class ExecutionService extends AbstractComponent { ctx.beforeInput(); Input.Result inputResult = ctx.inputResult(); if (inputResult == null) { - inputResult = watch.input().execute(ctx); + inputResult = watch.input().execute(ctx, ctx.payload()); ctx.onInputResult(inputResult); } if (inputResult.status() == Input.Result.Status.FAILURE) { diff --git a/watcher/src/main/java/org/elasticsearch/watcher/input/ExecutableInput.java b/watcher/src/main/java/org/elasticsearch/watcher/input/ExecutableInput.java index cc2c056d154..4cdcc3d8314 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/input/ExecutableInput.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/input/ExecutableInput.java @@ -5,10 +5,12 @@ */ package org.elasticsearch.watcher.input; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.watch.Payload; import java.io.IOException; @@ -39,7 +41,7 @@ public abstract class ExecutableInput i /** * Executes this input */ - public abstract R execute(WatchExecutionContext ctx); + public abstract R execute(WatchExecutionContext ctx, @Nullable Payload payload); @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { diff --git a/watcher/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java b/watcher/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java index 5fde3b9dc48..e8ee893e4c6 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/input/InputBuilders.java @@ -8,6 +8,7 @@ package org.elasticsearch.watcher.input; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.watcher.input.chain.ChainInput; import org.elasticsearch.watcher.input.http.HttpInput; import org.elasticsearch.watcher.input.none.NoneInput; import org.elasticsearch.watcher.input.search.SearchInput; @@ -61,4 +62,8 @@ public final class InputBuilders { public static HttpInput.Builder httpInput(HttpRequestTemplate request) { return HttpInput.builder(request); } + + public static ChainInput.Builder chainInput() { + return ChainInput.builder(); + } } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/input/InputModule.java b/watcher/src/main/java/org/elasticsearch/watcher/input/InputModule.java index 95d728e96b6..03ed1e60461 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/input/InputModule.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/input/InputModule.java @@ -7,6 +7,8 @@ package org.elasticsearch.watcher.input; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; +import org.elasticsearch.watcher.input.chain.ChainInput; +import org.elasticsearch.watcher.input.chain.ChainInputFactory; import org.elasticsearch.watcher.input.http.HttpInput; import org.elasticsearch.watcher.input.http.HttpInputFactory; import org.elasticsearch.watcher.input.none.NoneInput; @@ -46,6 +48,9 @@ public class InputModule extends AbstractModule { bind(NoneInputFactory.class).asEagerSingleton(); parsersBinder.addBinding(NoneInput.TYPE).to(NoneInputFactory.class); + // no bind() needed, done in InitializingModule + parsersBinder.addBinding(ChainInput.TYPE).to(ChainInputFactory.class); + for (Map.Entry> entry : parsers.entrySet()) { bind(entry.getValue()).asEagerSingleton(); parsersBinder.addBinding(entry.getKey()).to(entry.getValue()); diff --git a/watcher/src/main/java/org/elasticsearch/watcher/input/InputRegistry.java b/watcher/src/main/java/org/elasticsearch/watcher/input/InputRegistry.java index d1aa5b93725..b029a15aaf9 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/input/InputRegistry.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/input/InputRegistry.java @@ -12,9 +12,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.Map; -/** - * - */ public class InputRegistry { private final Map factories; @@ -62,4 +59,7 @@ public class InputRegistry { return input; } + public Map factories() { + return factories; + } } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/input/chain/ChainInput.java b/watcher/src/main/java/org/elasticsearch/watcher/input/chain/ChainInput.java new file mode 100644 index 00000000000..8c888dfa27f --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/input/chain/ChainInput.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.input.chain; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.input.Input; +import org.elasticsearch.watcher.input.InputRegistry; +import org.elasticsearch.watcher.watch.Payload; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class ChainInput implements Input { + + public static final String TYPE = "chain"; + private List> inputs; + + public ChainInput(List> inputs) { + this.inputs = inputs; + } + + @Override + public String type() { + return TYPE; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("inputs"); + for (Tuple tuple : inputs) { + builder.startObject().startObject(tuple.v1()); + builder.field(tuple.v2().type(), tuple.v2()); + builder.endObject().endObject(); + } + builder.endArray(); + builder.endObject(); + + return builder; + } + + public List> getInputs() { + return inputs; + } + + public static ChainInput parse(String watchId, XContentParser parser, InputRegistry inputRegistry) throws IOException { + List> inputs = new ArrayList<>(); + String currentFieldName; + XContentParser.Token token; + + ParseField inputsField = new ParseField("inputs"); + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + token = parser.nextToken(); + if (token == XContentParser.Token.START_ARRAY && inputsField.getPreferredName().equals(currentFieldName)) { + String currentInputFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.FIELD_NAME) { + currentInputFieldName = parser.currentName(); + } else if (currentInputFieldName != null && token == XContentParser.Token.START_OBJECT) { + inputs.add(new Tuple<>(currentInputFieldName, inputRegistry.parse(watchId, parser).input())); + currentInputFieldName = null; + } + } + } + } + } + + return new ChainInput(inputs); + } + + public static ChainInput.Builder builder() { + return new Builder(); + } + + public static class Builder implements Input.Builder { + + private List> inputs; + + private Builder() { + inputs = new ArrayList<>(); + } + + public Builder add(String name, Input.Builder input) { + inputs.add(new Tuple<>(name, input.build())); + return this; + } + + @Override + public ChainInput build() { + return new ChainInput(inputs); + } + } + + public static class Result extends Input.Result { + + private List> results; + + protected Result(List> results, Payload payload) { + super(TYPE, payload); + this.results = results; + } + + protected Result(Exception e) { + super(TYPE, e); + } + + @Override + protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(type); + for (Tuple tuple : results) { + builder.field(tuple.v1(), tuple.v2()); + } + builder.endObject(); + + return builder; + } + } +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/input/chain/ChainInputFactory.java b/watcher/src/main/java/org/elasticsearch/watcher/input/chain/ChainInputFactory.java new file mode 100644 index 00000000000..82539851b4c --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/input/chain/ChainInputFactory.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.input.chain; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.Injector; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.input.*; +import org.elasticsearch.watcher.support.init.InitializingService; + +import java.io.IOException; +import java.util.*; + +public class ChainInputFactory extends InputFactory implements InitializingService.Initializable { + + private InputRegistry inputRegistry; + + @Inject + public ChainInputFactory(Settings settings) { + super(Loggers.getLogger(ExecutableChainInput.class, settings)); + } + + @Override + public String type() { + return ChainInput.TYPE; + } + + @Override + public ChainInput parseInput(String watchId, XContentParser parser) throws IOException { + return ChainInput.parse(watchId, parser, inputRegistry); + } + + @Override + public ExecutableChainInput createExecutable(ChainInput input) { + List> executableInputs = new ArrayList<>(); + for (Tuple tuple : input.getInputs()) { + ExecutableInput executableInput = inputRegistry.factories().get(tuple.v2().type()).createExecutable(tuple.v2()); + executableInputs.add(new Tuple<>(tuple.v1(), executableInput)); + } + + return new ExecutableChainInput(input, executableInputs, inputLogger); + } + + @Override + public void init(Injector injector) { + init(injector.getInstance(InputRegistry.class)); + } + + void init(InputRegistry inputRegistry) { + this.inputRegistry = inputRegistry; + } +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/input/chain/ExecutableChainInput.java b/watcher/src/main/java/org/elasticsearch/watcher/input/chain/ExecutableChainInput.java new file mode 100644 index 00000000000..3ec740e2b52 --- /dev/null +++ b/watcher/src/main/java/org/elasticsearch/watcher/input/chain/ExecutableChainInput.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.input.chain; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.input.ExecutableInput; +import org.elasticsearch.watcher.input.Input; +import org.elasticsearch.watcher.watch.Payload; + +import java.util.*; + +public class ExecutableChainInput extends ExecutableInput { + + private List> inputs; + + public ExecutableChainInput(ChainInput input, List> inputs, ESLogger logger) { + super(input, logger); + this.inputs = inputs; + } + + @Override + public ChainInput.Result execute(WatchExecutionContext ctx, Payload payload) { + List> results = new ArrayList<>(); + Map payloads = new HashMap<>(); + + try { + for (Tuple tuple : inputs) { + Input.Result result = tuple.v2().execute(ctx, new Payload.Simple(payloads)); + results.add(new Tuple<>(tuple.v1(), result)); + payloads.put(tuple.v1(), result.payload().data()); + } + + return new ChainInput.Result(results, new Payload.Simple(payloads)); + } catch (Exception e) { + logger.error("failed to execute [{}] input for [{}]", e, ChainInput.TYPE, ctx.watch()); + return new ChainInput.Result(e); + } + } +} diff --git a/watcher/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java b/watcher/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java index 77e06d5f773..98897f4f778 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/input/http/ExecutableHttpInput.java @@ -36,10 +36,10 @@ public class ExecutableHttpInput extends ExecutableInput model = Variables.createCtxModel(ctx, null); + Map model = Variables.createCtxModel(ctx, payload); request = input.getRequest().render(templateEngine, model); return doExecute(ctx, request); } catch (Exception e) { diff --git a/watcher/src/main/java/org/elasticsearch/watcher/input/none/ExecutableNoneInput.java b/watcher/src/main/java/org/elasticsearch/watcher/input/none/ExecutableNoneInput.java index 1a632a2b965..cd63a11064b 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/input/none/ExecutableNoneInput.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/input/none/ExecutableNoneInput.java @@ -9,6 +9,7 @@ package org.elasticsearch.watcher.input.none; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.input.ExecutableInput; +import org.elasticsearch.watcher.watch.Payload; /** * @@ -20,7 +21,7 @@ public class ExecutableNoneInput extends ExecutableInput mbinder = Multibinder.newSetBinder(binder(), InitializingService.Initializable.class); mbinder.addBinding().to(ClientProxy.class); mbinder.addBinding().to(ScriptServiceProxy.class); mbinder.addBinding().to(ChainTransformFactory.class); + mbinder.addBinding().to(ChainInputFactory.class); bind(InitializingService.class).asEagerSingleton(); } } diff --git a/watcher/src/main/java/org/elasticsearch/watcher/support/init/InitializingService.java b/watcher/src/main/java/org/elasticsearch/watcher/support/init/InitializingService.java index f4a298c2d52..cab7be9cd7a 100644 --- a/watcher/src/main/java/org/elasticsearch/watcher/support/init/InitializingService.java +++ b/watcher/src/main/java/org/elasticsearch/watcher/support/init/InitializingService.java @@ -43,7 +43,7 @@ public class InitializingService extends AbstractLifecycleComponent { protected void doClose() throws ElasticsearchException { } - public static interface Initializable { + public interface Initializable { void init(Injector injector); } diff --git a/watcher/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java b/watcher/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java index 22af250bc98..4f9117d783a 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java @@ -43,6 +43,7 @@ import static org.mockito.Mockito.*; /** */ public class ExecutionServiceTests extends ESTestCase { + private Payload payload; private ExecutableInput input; private Input.Result inputResult; @@ -61,7 +62,7 @@ public class ExecutionServiceTests extends ESTestCase { inputResult = mock(Input.Result.class); when(inputResult.status()).thenReturn(Input.Result.Status.SUCCESS); when(inputResult.payload()).thenReturn(payload); - when(input.execute(any(WatchExecutionContext.class))).thenReturn(inputResult); + when(input.execute(any(WatchExecutionContext.class), any(Payload.class))).thenReturn(inputResult); watchStore = mock(WatchStore.class); triggeredWatchStore = mock(TriggeredWatchStore.class); @@ -163,7 +164,7 @@ public class ExecutionServiceTests extends ESTestCase { Input.Result inputResult = mock(Input.Result.class); when(inputResult.status()).thenReturn(Input.Result.Status.FAILURE); when(inputResult.reason()).thenReturn("_reason"); - when(input.execute(context)).thenReturn(inputResult); + when(input.execute(eq(context), any(Payload.class))).thenReturn(inputResult); Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE; ExecutableCondition condition = mock(ExecutableCondition.class); @@ -214,7 +215,7 @@ public class ExecutionServiceTests extends ESTestCase { verify(historyStore, times(1)).put(watchRecord); verify(lock, times(1)).release(); - verify(input, times(1)).execute(context); + verify(input, times(1)).execute(context, null); verify(condition, never()).execute(context); verify(watchTransform, never()).execute(context, payload); verify(action, never()).execute("_action", context, payload); @@ -282,7 +283,7 @@ public class ExecutionServiceTests extends ESTestCase { verify(historyStore, times(1)).put(watchRecord); verify(lock, times(1)).release(); - verify(input, times(1)).execute(context); + verify(input, times(1)).execute(context, null); verify(condition, times(1)).execute(context); verify(watchTransform, never()).execute(context, payload); verify(action, never()).execute("_action", context, payload); @@ -349,7 +350,7 @@ public class ExecutionServiceTests extends ESTestCase { verify(historyStore, times(1)).put(watchRecord); verify(lock, times(1)).release(); - verify(input, times(1)).execute(context); + verify(input, times(1)).execute(context, null); verify(condition, times(1)).execute(context); verify(watchTransform, times(1)).execute(context, payload); verify(action, never()).execute("_action", context, payload); @@ -420,7 +421,7 @@ public class ExecutionServiceTests extends ESTestCase { verify(historyStore, times(1)).put(watchRecord); verify(lock, times(1)).release(); - verify(input, times(1)).execute(context); + verify(input, times(1)).execute(context, null); verify(condition, times(1)).execute(context); verify(watchTransform, times(1)).execute(context, payload); // the action level transform is executed before the action itself diff --git a/watcher/src/test/java/org/elasticsearch/watcher/input/InputRegistryTests.java b/watcher/src/test/java/org/elasticsearch/watcher/input/InputRegistryTests.java index 0f2520ecf25..007453058ab 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/input/InputRegistryTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/input/InputRegistryTests.java @@ -18,6 +18,7 @@ import static org.hamcrest.Matchers.containsString; * */ public class InputRegistryTests extends ESTestCase { + public void testParseEmptyInput() throws Exception { InputRegistry registry = new InputRegistry(emptyMap()); XContentParser parser = JsonXContent.jsonXContent.createParser( diff --git a/watcher/src/test/java/org/elasticsearch/watcher/input/chain/ChainInputTests.java b/watcher/src/test/java/org/elasticsearch/watcher/input/chain/ChainInputTests.java new file mode 100644 index 00000000000..11a1f328bd8 --- /dev/null +++ b/watcher/src/test/java/org/elasticsearch/watcher/input/chain/ChainInputTests.java @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.input.chain; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.watcher.actions.ActionWrapper; +import org.elasticsearch.watcher.actions.ExecutableActions; +import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition; +import org.elasticsearch.watcher.execution.TriggeredExecutionContext; +import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.input.Input; +import org.elasticsearch.watcher.input.InputFactory; +import org.elasticsearch.watcher.input.InputRegistry; +import org.elasticsearch.watcher.input.http.HttpInput; +import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput; +import org.elasticsearch.watcher.input.simple.SimpleInput; +import org.elasticsearch.watcher.input.simple.SimpleInputFactory; +import org.elasticsearch.watcher.support.http.HttpRequestTemplate; +import org.elasticsearch.watcher.support.http.auth.basic.BasicAuth; +import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule; +import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; +import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.elasticsearch.watcher.watch.Payload; +import org.elasticsearch.watcher.watch.Watch; +import org.elasticsearch.watcher.watch.WatchStatus; +import org.joda.time.DateTime; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition; +import static org.elasticsearch.watcher.input.InputBuilders.*; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.*; +import static org.joda.time.DateTimeZone.UTC; + +public class ChainInputTests extends ESTestCase { + + /* note, first line does not need to be parsed + "chain" : { + "inputs" : [ + { "first" : { "simple" : { "foo" : "bar" } } }, + { "second" : { "simple" : { "spam" : "eggs" } } } + ] + } + */ + public void testThatExecutionWorks() throws Exception { + Map factories = new HashMap<>(); + factories.put("simple", new SimpleInputFactory(Settings.EMPTY)); + + // hackedy hack... + InputRegistry inputRegistry = new InputRegistry(factories); + ChainInputFactory chainInputFactory = new ChainInputFactory(Settings.EMPTY); + chainInputFactory.init(inputRegistry); + factories.put("chain", chainInputFactory); + + XContentBuilder builder = jsonBuilder().startObject().startArray("inputs") + .startObject().startObject("first").startObject("simple").field("foo", "bar").endObject().endObject().endObject() + .startObject().startObject("second").startObject("simple").field("spam", "eggs").endObject().endObject().endObject() + .endArray().endObject(); + + // first pass JSON and check for correct inputs + XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes()); + parser.nextToken(); + ChainInput chainInput = chainInputFactory.parseInput("test", parser); + + assertThat(chainInput.getInputs(), hasSize(2)); + assertThat(chainInput.getInputs().get(0).v1(), is("first")); + assertThat(chainInput.getInputs().get(0).v2(), instanceOf(SimpleInput.class)); + assertThat(chainInput.getInputs().get(1).v1(), is("second")); + assertThat(chainInput.getInputs().get(1).v2(), instanceOf(SimpleInput.class)); + + // now execute + ExecutableChainInput executableChainInput = chainInputFactory.createExecutable(chainInput); + ChainInput.Result result = executableChainInput.execute(createContext(), new Payload.Simple()); + Payload payload = result.payload(); + assertThat(payload.data(), hasKey("first")); + assertThat(payload.data(), hasKey("second")); + assertThat(payload.data().get("first"), instanceOf(Map.class)); + assertThat(payload.data().get("second"), instanceOf(Map.class)); + + // final payload check + Map firstPayload = (Map) payload.data().get("first"); + Map secondPayload = (Map) payload.data().get("second"); + assertThat(firstPayload, hasEntry("foo", "bar")); + assertThat(secondPayload, hasEntry("spam", "eggs")); + } + + public void testToXContent() throws Exception { + ChainInput chainedInput = chainInput() + .add("first", simpleInput("foo", "bar")) + .add("second", simpleInput("spam", "eggs")) + .build(); + + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + chainedInput.toXContent(builder, ToXContent.EMPTY_PARAMS); + + assertThat(builder.bytes().toUtf8(), is("{\"inputs\":[{\"first\":{\"simple\":{\"foo\":\"bar\"}}},{\"second\":{\"simple\":{\"spam\":\"eggs\"}}}]}")); + + // parsing it back as well! + Map factories = new HashMap<>(); + factories.put("simple", new SimpleInputFactory(Settings.EMPTY)); + + InputRegistry inputRegistry = new InputRegistry(factories); + ChainInputFactory chainInputFactory = new ChainInputFactory(Settings.EMPTY); + chainInputFactory.init(inputRegistry); + factories.put("chain", chainInputFactory); + + XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes()); + parser.nextToken(); + ChainInput parsedChainInput = ChainInput.parse("testWatchId", parser, inputRegistry); + assertThat(parsedChainInput.getInputs(), hasSize(2)); + assertThat(parsedChainInput.getInputs().get(0).v1(), is("first")); + assertThat(parsedChainInput.getInputs().get(0).v2(), is(instanceOf(SimpleInput.class))); + assertThat(parsedChainInput.getInputs().get(1).v1(), is("second")); + assertThat(parsedChainInput.getInputs().get(1).v2(), is(instanceOf(SimpleInput.class))); + } + + public void testThatWatchSourceBuilderWorksWithChainInput() throws Exception { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + + HttpInput.Builder httpInputBuilder = httpInput(HttpRequestTemplate.builder("theHost", 1234) + .path("/index/_search") + .body(jsonBuilder().startObject().field("size", 1).endObject()) + .auth(new BasicAuth("test", "changeme".toCharArray()))); + + ChainInput.Builder chainedInputBuilder = chainInput() + .add("foo", httpInputBuilder) + .add("bar", simpleInput("spam", "eggs")); + + watchBuilder() + .trigger(schedule(interval("5s"))) + .input(chainedInputBuilder) + .condition(scriptCondition("ctx.payload.hits.total == 1")) + .addAction("_id", loggingAction("watch [{{ctx.watch_id}}] matched")) + .toXContent(builder, ToXContent.EMPTY_PARAMS); + + // no exception means all good + } + + private WatchExecutionContext createContext() { + Watch watch = new Watch("test-watch", + new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), + new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger), + new ExecutableAlwaysCondition(logger), + null, + null, + new ExecutableActions(new ArrayList()), + null, + new WatchStatus(new DateTime(0, UTC), emptyMap())); + WatchExecutionContext ctx = new TriggeredExecutionContext(watch, + new DateTime(0, UTC), + new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), + TimeValue.timeValueSeconds(5)); + + return ctx; + } + +} diff --git a/watcher/src/test/java/org/elasticsearch/watcher/input/chain/ChainIntegrationTests.java b/watcher/src/test/java/org/elasticsearch/watcher/input/chain/ChainIntegrationTests.java new file mode 100644 index 00000000000..404a79916a6 --- /dev/null +++ b/watcher/src/test/java/org/elasticsearch/watcher/input/chain/ChainIntegrationTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.input.chain; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.Node; +import org.elasticsearch.watcher.input.http.HttpInput; +import org.elasticsearch.watcher.support.http.HttpRequestTemplate; +import org.elasticsearch.watcher.support.http.auth.basic.BasicAuth; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTestCase; +import org.junit.Test; + +import java.net.InetSocketAddress; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.input.InputBuilders.*; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.containsString; + +public class ChainIntegrationTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(Node.HTTP_ENABLED, true) + .build(); + } + + public void testChainedInputsAreWorking() throws Exception { + String index = "the-most-awesome-index-ever"; + createIndex(index); + client().prepareIndex(index, "type", "id").setSource("{}").setRefresh(true).get(); + + InetSocketAddress address = internalCluster().httpAddresses()[0]; + HttpInput.Builder httpInputBuilder = httpInput(HttpRequestTemplate.builder(address.getHostString(), address.getPort()) + .path("{{ctx.payload.first.url}}") + .body(jsonBuilder().startObject().field("size", 1).endObject()) + .auth(shieldEnabled() ? new BasicAuth("test", "changeme".toCharArray()) : null)); + + ChainInput.Builder chainedInputBuilder = chainInput() + .add("first", simpleInput("url", "/" + index + "/_search")) + .add("second", httpInputBuilder); + + watcherClient().preparePutWatch("_name") + .setSource(watchBuilder() + .trigger(schedule(interval("5s"))) + .input(chainedInputBuilder) + .addAction("indexAction", indexAction("my-index", "my-type"))) + .get(); + + if (timeWarped()) { + timeWarp().scheduler().trigger("_name"); + refresh(); + } + + refresh(); + SearchResponse searchResponse = client().prepareSearch("my-index").setTypes("my-type").get(); + assertHitCount(searchResponse, 1); + assertThat(searchResponse.getHits().getAt(0).sourceAsString(), containsString(index)); + assertWatchWithMinimumPerformedActionsCount("_name", 1, false); + } + + +} diff --git a/watcher/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java b/watcher/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java index df7b5c0b05a..dda4d18fcef 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/input/http/HttpInputTests.java @@ -135,7 +135,7 @@ public class HttpInputTests extends ESTestCase { new DateTime(0, UTC), new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), TimeValue.timeValueSeconds(5)); - HttpInput.Result result = input.execute(ctx); + HttpInput.Result result = input.execute(ctx, new Payload.Simple()); assertThat(result.type(), equalTo(HttpInput.TYPE)); assertThat(result.payload().data(), equalTo(MapBuilder.newMapBuilder().put("key", "value").map())); } @@ -165,7 +165,7 @@ public class HttpInputTests extends ESTestCase { new DateTime(0, UTC), new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), TimeValue.timeValueSeconds(5)); - HttpInput.Result result = input.execute(ctx); + HttpInput.Result result = input.execute(ctx, new Payload.Simple()); assertThat(result.type(), equalTo(HttpInput.TYPE)); assertThat(result.payload().data().get("_value").toString(), equalTo(notJson)); } diff --git a/watcher/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java b/watcher/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java index 54993ac2fea..62ed94689d5 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/input/search/SearchInputTests.java @@ -127,7 +127,7 @@ public class SearchInputTests extends ESIntegTestCase { new DateTime(0, UTC), new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)), timeValueSeconds(5)); - SearchInput.Result result = searchInput.execute(ctx); + SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple()); assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0)); assertNotNull(result.executedRequest()); @@ -153,6 +153,7 @@ public class SearchInputTests extends ESIntegTestCase { ctxParams.put("vars", new HashMap()); ctxParams.put("watch_id", "test-watch"); ctxParams.put("trigger", triggerParams); + ctxParams.put("payload", new Payload.Simple().data()); ctxParams.put("execution_time", new DateTime(1970, 01, 01, 00, 01, 00, 000, ISOChronology.getInstanceUTC())); Map expectedParams = new HashMap(); expectedParams.put("seconds_param", "30s"); @@ -235,7 +236,7 @@ public class SearchInputTests extends ESIntegTestCase { new DateTime(0, UTC), new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)), timeValueSeconds(5)); - SearchInput.Result result = searchInput.execute(ctx); + SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple()); assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0)); assertNotNull(result.executedRequest()); @@ -288,7 +289,7 @@ public class SearchInputTests extends ESIntegTestCase { SearchInput si = siBuilder.build(); ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()), null); - return searchInput.execute(ctx); + return searchInput.execute(ctx, new Payload.Simple()); } } diff --git a/watcher/src/test/java/org/elasticsearch/watcher/input/simple/SimpleInputTests.java b/watcher/src/test/java/org/elasticsearch/watcher/input/simple/SimpleInputTests.java index d79ec97d568..9a90b3db89e 100644 --- a/watcher/src/test/java/org/elasticsearch/watcher/input/simple/SimpleInputTests.java +++ b/watcher/src/test/java/org/elasticsearch/watcher/input/simple/SimpleInputTests.java @@ -33,7 +33,7 @@ public class SimpleInputTests extends ESTestCase { data.put("baz", new ArrayList() ); ExecutableInput staticInput = new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(data)), logger); - Input.Result staticResult = staticInput.execute(null); + Input.Result staticResult = staticInput.execute(null, new Payload.Simple()); assertEquals(staticResult.payload().data().get("foo"), "bar"); List baz = (List)staticResult.payload().data().get("baz"); assertTrue(baz.isEmpty()); @@ -52,7 +52,7 @@ public class SimpleInputTests extends ESTestCase { assertEquals(input.type(), SimpleInput.TYPE); - Input.Result staticResult = input.execute(null); + Input.Result staticResult = input.execute(null, new Payload.Simple()); assertEquals(staticResult.payload().data().get("foo"), "bar"); List baz = (List)staticResult.payload().data().get("baz"); assertTrue(baz.isEmpty());