Watcher: Add chained input

```
Chained input for now works like this
{
"chain" : [
  { "first" : { "simple" : { "foo" : "bar" } } },
  { "second" : { "simple" : { "spam" : "eggs" } } }
]
```

This allows to access the payload via ctx.payload.first.foo for example

The array notation is needed to guarantee order, as JSON itself does not guarantee
order of objects.

Closes elastic/elasticsearch#353

Original commit: elastic/x-pack-elasticsearch@7ab32c43a8
This commit is contained in:
Alexander Reelsen 2015-10-30 09:55:00 +01:00
parent 52cfa2b6ed
commit 04a8fb3202
24 changed files with 575 additions and 30 deletions

View File

@ -2,8 +2,8 @@
=== Input
A watch _input_ loads data into a watch's execution context as the initial payload.
Watcher supports three input types: <<input-simple, `simple`>> , <<input-search, `search`>>,
and <<input-http, `http`>>
Watcher supports four input types: <<input-simple, `simple`>> , <<input-search, `search`>>,
<<input-http, `http`>> and <<input-chain, `chain`>>
NOTE: If you don't define an input for a watch, an empty payload is loaded into the
execution context.
@ -13,3 +13,6 @@ include::input/simple.asciidoc[]
include::input/search.asciidoc[]
include::input/http.asciidoc[]
include::input/chain.asciidoc[]

View File

@ -0,0 +1,39 @@
[[input-chain]]
==== Chain Input
An <<input, input>> that enables you to chain several inputs one after the other and have each input populating the execution context.
The `chain` input is useful when the output of one input should change the input of another or you need several inputs to gather all the
information needed for an action..
You can define the chained input as following
[source,json]
--------------------------------------------------
{
"input" : {
"chain" : {
"inputs" : [
"first" : {
"simple" : { "path" : "/_search" }
},
"second" : {
"http" : {
"request" : {
"host" : "localhost",
"port" : 9200,
"path" : "{{ctx.payload.first.path}}"
}
}
}
]
}
}
...
}
--------------------------------------------------
As you can see, the name of the input (`first` and `second` in this example) can be used to access values from the context in consecutive inputs.
In case you are wondering about the structure of this input. The `inputs` must be an array, because JSON does not guarantee the order of arbitrary
objects, one has to use a list.

View File

@ -39,12 +39,19 @@ bin/plugin remove watcher
[float]
==== 2.1.0
.New Features
* Added support for <<input-chain, chaining several inputs>>
.Enhancement
* Support for configuring a proxy in the webhook action, http input and configuring a default proxy (which is also used by the slack action), using the `watcher.http.proxy.host` and `watcher.http.proxy.port` settings.
=======
[float]
==== 2.0.0
.Bug fixes
* Fixed an issue where the scheduler may get stuck during Watcher startup. This caused no watches to ever fire.
.Breaking Changes
* The dynamic index names support has been removed and Elasticsearch's date math index names support should be used instead.
The only difference between Watcher's dynamic index names support and Elasticsearch's date math index names support is

View File

@ -335,7 +335,7 @@ public class ExecutionService extends AbstractComponent {
ctx.beforeInput();
Input.Result inputResult = ctx.inputResult();
if (inputResult == null) {
inputResult = watch.input().execute(ctx);
inputResult = watch.input().execute(ctx, ctx.payload());
ctx.onInputResult(inputResult);
}
if (inputResult.status() == Input.Result.Status.FAILURE) {

View File

@ -5,10 +5,12 @@
*/
package org.elasticsearch.watcher.input;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
@ -39,7 +41,7 @@ public abstract class ExecutableInput<I extends Input, R extends Input.Result> i
/**
* Executes this input
*/
public abstract R execute(WatchExecutionContext ctx);
public abstract R execute(WatchExecutionContext ctx, @Nullable Payload payload);
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.input;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.watcher.input.chain.ChainInput;
import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.input.none.NoneInput;
import org.elasticsearch.watcher.input.search.SearchInput;
@ -61,4 +62,8 @@ public final class InputBuilders {
public static HttpInput.Builder httpInput(HttpRequestTemplate request) {
return HttpInput.builder(request);
}
public static ChainInput.Builder chainInput() {
return ChainInput.builder();
}
}

View File

@ -7,6 +7,8 @@ package org.elasticsearch.watcher.input;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.watcher.input.chain.ChainInput;
import org.elasticsearch.watcher.input.chain.ChainInputFactory;
import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.input.http.HttpInputFactory;
import org.elasticsearch.watcher.input.none.NoneInput;
@ -46,6 +48,9 @@ public class InputModule extends AbstractModule {
bind(NoneInputFactory.class).asEagerSingleton();
parsersBinder.addBinding(NoneInput.TYPE).to(NoneInputFactory.class);
// no bind() needed, done in InitializingModule
parsersBinder.addBinding(ChainInput.TYPE).to(ChainInputFactory.class);
for (Map.Entry<String, Class<? extends InputFactory>> entry : parsers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
parsersBinder.addBinding(entry.getKey()).to(entry.getValue());

View File

@ -12,9 +12,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Map;
/**
*
*/
public class InputRegistry {
private final Map<String, InputFactory> factories;
@ -62,4 +59,7 @@ public class InputRegistry {
return input;
}
public Map<String, InputFactory> factories() {
return factories;
}
}

View File

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.chain;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputRegistry;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ChainInput implements Input {
public static final String TYPE = "chain";
private List<Tuple<String, Input>> inputs;
public ChainInput(List<Tuple<String, Input>> inputs) {
this.inputs = inputs;
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("inputs");
for (Tuple<String, Input> tuple : inputs) {
builder.startObject().startObject(tuple.v1());
builder.field(tuple.v2().type(), tuple.v2());
builder.endObject().endObject();
}
builder.endArray();
builder.endObject();
return builder;
}
public List<Tuple<String, Input>> getInputs() {
return inputs;
}
public static ChainInput parse(String watchId, XContentParser parser, InputRegistry inputRegistry) throws IOException {
List<Tuple<String, Input>> inputs = new ArrayList<>();
String currentFieldName;
XContentParser.Token token;
ParseField inputsField = new ParseField("inputs");
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
token = parser.nextToken();
if (token == XContentParser.Token.START_ARRAY && inputsField.getPreferredName().equals(currentFieldName)) {
String currentInputFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.FIELD_NAME) {
currentInputFieldName = parser.currentName();
} else if (currentInputFieldName != null && token == XContentParser.Token.START_OBJECT) {
inputs.add(new Tuple<>(currentInputFieldName, inputRegistry.parse(watchId, parser).input()));
currentInputFieldName = null;
}
}
}
}
}
return new ChainInput(inputs);
}
public static ChainInput.Builder builder() {
return new Builder();
}
public static class Builder implements Input.Builder<ChainInput> {
private List<Tuple<String, Input>> inputs;
private Builder() {
inputs = new ArrayList<>();
}
public Builder add(String name, Input.Builder input) {
inputs.add(new Tuple<>(name, input.build()));
return this;
}
@Override
public ChainInput build() {
return new ChainInput(inputs);
}
}
public static class Result extends Input.Result {
private List<Tuple<String, Input.Result>> results;
protected Result(List<Tuple<String, Input.Result>> results, Payload payload) {
super(TYPE, payload);
this.results = results;
}
protected Result(Exception e) {
super(TYPE, e);
}
@Override
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(type);
for (Tuple<String, Input.Result> tuple : results) {
builder.field(tuple.v1(), tuple.v2());
}
builder.endObject();
return builder;
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.chain;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.input.*;
import org.elasticsearch.watcher.support.init.InitializingService;
import java.io.IOException;
import java.util.*;
public class ChainInputFactory extends InputFactory<ChainInput, ChainInput.Result, ExecutableChainInput> implements InitializingService.Initializable {
private InputRegistry inputRegistry;
@Inject
public ChainInputFactory(Settings settings) {
super(Loggers.getLogger(ExecutableChainInput.class, settings));
}
@Override
public String type() {
return ChainInput.TYPE;
}
@Override
public ChainInput parseInput(String watchId, XContentParser parser) throws IOException {
return ChainInput.parse(watchId, parser, inputRegistry);
}
@Override
public ExecutableChainInput createExecutable(ChainInput input) {
List<Tuple<String, ExecutableInput>> executableInputs = new ArrayList<>();
for (Tuple<String, Input> tuple : input.getInputs()) {
ExecutableInput executableInput = inputRegistry.factories().get(tuple.v2().type()).createExecutable(tuple.v2());
executableInputs.add(new Tuple<>(tuple.v1(), executableInput));
}
return new ExecutableChainInput(input, executableInputs, inputLogger);
}
@Override
public void init(Injector injector) {
init(injector.getInstance(InputRegistry.class));
}
void init(InputRegistry inputRegistry) {
this.inputRegistry = inputRegistry;
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.chain;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.watch.Payload;
import java.util.*;
public class ExecutableChainInput extends ExecutableInput<ChainInput,ChainInput.Result> {
private List<Tuple<String, ExecutableInput>> inputs;
public ExecutableChainInput(ChainInput input, List<Tuple<String, ExecutableInput>> inputs, ESLogger logger) {
super(input, logger);
this.inputs = inputs;
}
@Override
public ChainInput.Result execute(WatchExecutionContext ctx, Payload payload) {
List<Tuple<String, Input.Result>> results = new ArrayList<>();
Map<String, Object> payloads = new HashMap<>();
try {
for (Tuple<String, ExecutableInput> tuple : inputs) {
Input.Result result = tuple.v2().execute(ctx, new Payload.Simple(payloads));
results.add(new Tuple<>(tuple.v1(), result));
payloads.put(tuple.v1(), result.payload().data());
}
return new ChainInput.Result(results, new Payload.Simple(payloads));
} catch (Exception e) {
logger.error("failed to execute [{}] input for [{}]", e, ChainInput.TYPE, ctx.watch());
return new ChainInput.Result(e);
}
}
}

View File

@ -36,10 +36,10 @@ public class ExecutableHttpInput extends ExecutableInput<HttpInput, HttpInput.Re
this.templateEngine = templateEngine;
}
public HttpInput.Result execute(WatchExecutionContext ctx) {
public HttpInput.Result execute(WatchExecutionContext ctx, Payload payload) {
HttpRequest request = null;
try {
Map<String, Object> model = Variables.createCtxModel(ctx, null);
Map<String, Object> model = Variables.createCtxModel(ctx, payload);
request = input.getRequest().render(templateEngine, model);
return doExecute(ctx, request);
} catch (Exception e) {

View File

@ -9,6 +9,7 @@ package org.elasticsearch.watcher.input.none;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.watch.Payload;
/**
*
@ -20,7 +21,7 @@ public class ExecutableNoneInput extends ExecutableInput<NoneInput, NoneInput.Re
}
@Override
public NoneInput.Result execute(WatchExecutionContext ctx) {
public NoneInput.Result execute(WatchExecutionContext ctx, Payload payload) {
return NoneInput.Result.INSTANCE;
}

View File

@ -44,10 +44,10 @@ public class ExecutableSearchInput extends ExecutableInput<SearchInput, SearchIn
}
@Override
public SearchInput.Result execute(WatchExecutionContext ctx) {
public SearchInput.Result execute(WatchExecutionContext ctx, Payload payload) {
SearchRequest request = null;
try {
request = WatcherUtils.createSearchRequestFromPrototype(input.getSearchRequest(), ctx, null);
request = WatcherUtils.createSearchRequestFromPrototype(input.getSearchRequest(), ctx, payload);
return doExecute(ctx, request);
} catch (Exception e) {
logger.error("failed to execute [{}] input for [{}]", e, SearchInput.TYPE, ctx.watch());

View File

@ -8,6 +8,7 @@ package org.elasticsearch.watcher.input.simple;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.ExecutableInput;
import org.elasticsearch.watcher.watch.Payload;
/**
* This class just defines a simple xcontent map as an input
@ -19,7 +20,7 @@ public class ExecutableSimpleInput extends ExecutableInput<SimpleInput, SimpleIn
}
@Override
public SimpleInput.Result execute(WatchExecutionContext ctx) {
public SimpleInput.Result execute(WatchExecutionContext ctx, Payload payload) {
return new SimpleInput.Result(input.getPayload());
}
}

View File

@ -5,11 +5,12 @@
*/
package org.elasticsearch.watcher.support.init;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.watcher.input.chain.ChainInputFactory;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.transform.chain.ChainTransformFactory;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
/**
*
@ -21,11 +22,13 @@ public class InitializingModule extends AbstractModule {
bind(ClientProxy.class).asEagerSingleton();
bind(ScriptServiceProxy.class).asEagerSingleton();
bind(ChainInputFactory.class).asEagerSingleton();
Multibinder<InitializingService.Initializable> mbinder = Multibinder.newSetBinder(binder(), InitializingService.Initializable.class);
mbinder.addBinding().to(ClientProxy.class);
mbinder.addBinding().to(ScriptServiceProxy.class);
mbinder.addBinding().to(ChainTransformFactory.class);
mbinder.addBinding().to(ChainInputFactory.class);
bind(InitializingService.class).asEagerSingleton();
}
}

View File

@ -43,7 +43,7 @@ public class InitializingService extends AbstractLifecycleComponent {
protected void doClose() throws ElasticsearchException {
}
public static interface Initializable {
public interface Initializable {
void init(Injector injector);
}

View File

@ -43,6 +43,7 @@ import static org.mockito.Mockito.*;
/**
*/
public class ExecutionServiceTests extends ESTestCase {
private Payload payload;
private ExecutableInput input;
private Input.Result inputResult;
@ -61,7 +62,7 @@ public class ExecutionServiceTests extends ESTestCase {
inputResult = mock(Input.Result.class);
when(inputResult.status()).thenReturn(Input.Result.Status.SUCCESS);
when(inputResult.payload()).thenReturn(payload);
when(input.execute(any(WatchExecutionContext.class))).thenReturn(inputResult);
when(input.execute(any(WatchExecutionContext.class), any(Payload.class))).thenReturn(inputResult);
watchStore = mock(WatchStore.class);
triggeredWatchStore = mock(TriggeredWatchStore.class);
@ -163,7 +164,7 @@ public class ExecutionServiceTests extends ESTestCase {
Input.Result inputResult = mock(Input.Result.class);
when(inputResult.status()).thenReturn(Input.Result.Status.FAILURE);
when(inputResult.reason()).thenReturn("_reason");
when(input.execute(context)).thenReturn(inputResult);
when(input.execute(eq(context), any(Payload.class))).thenReturn(inputResult);
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
@ -214,7 +215,7 @@ public class ExecutionServiceTests extends ESTestCase {
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(input, times(1)).execute(context);
verify(input, times(1)).execute(context, null);
verify(condition, never()).execute(context);
verify(watchTransform, never()).execute(context, payload);
verify(action, never()).execute("_action", context, payload);
@ -282,7 +283,7 @@ public class ExecutionServiceTests extends ESTestCase {
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(input, times(1)).execute(context);
verify(input, times(1)).execute(context, null);
verify(condition, times(1)).execute(context);
verify(watchTransform, never()).execute(context, payload);
verify(action, never()).execute("_action", context, payload);
@ -349,7 +350,7 @@ public class ExecutionServiceTests extends ESTestCase {
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(input, times(1)).execute(context);
verify(input, times(1)).execute(context, null);
verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload);
verify(action, never()).execute("_action", context, payload);
@ -420,7 +421,7 @@ public class ExecutionServiceTests extends ESTestCase {
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(input, times(1)).execute(context);
verify(input, times(1)).execute(context, null);
verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload);
// the action level transform is executed before the action itself

View File

@ -18,6 +18,7 @@ import static org.hamcrest.Matchers.containsString;
*
*/
public class InputRegistryTests extends ESTestCase {
public void testParseEmptyInput() throws Exception {
InputRegistry registry = new InputRegistry(emptyMap());
XContentParser parser = JsonXContent.jsonXContent.createParser(

View File

@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.chain;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.actions.ExecutableActions;
import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.input.InputFactory;
import org.elasticsearch.watcher.input.InputRegistry;
import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.input.simple.SimpleInputFactory;
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
import org.elasticsearch.watcher.support.http.auth.basic.BasicAuth;
import org.elasticsearch.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.watcher.input.InputBuilders.*;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.*;
import static org.joda.time.DateTimeZone.UTC;
public class ChainInputTests extends ESTestCase {
/* note, first line does not need to be parsed
"chain" : {
"inputs" : [
{ "first" : { "simple" : { "foo" : "bar" } } },
{ "second" : { "simple" : { "spam" : "eggs" } } }
]
}
*/
public void testThatExecutionWorks() throws Exception {
Map<String, InputFactory> factories = new HashMap<>();
factories.put("simple", new SimpleInputFactory(Settings.EMPTY));
// hackedy hack...
InputRegistry inputRegistry = new InputRegistry(factories);
ChainInputFactory chainInputFactory = new ChainInputFactory(Settings.EMPTY);
chainInputFactory.init(inputRegistry);
factories.put("chain", chainInputFactory);
XContentBuilder builder = jsonBuilder().startObject().startArray("inputs")
.startObject().startObject("first").startObject("simple").field("foo", "bar").endObject().endObject().endObject()
.startObject().startObject("second").startObject("simple").field("spam", "eggs").endObject().endObject().endObject()
.endArray().endObject();
// first pass JSON and check for correct inputs
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
parser.nextToken();
ChainInput chainInput = chainInputFactory.parseInput("test", parser);
assertThat(chainInput.getInputs(), hasSize(2));
assertThat(chainInput.getInputs().get(0).v1(), is("first"));
assertThat(chainInput.getInputs().get(0).v2(), instanceOf(SimpleInput.class));
assertThat(chainInput.getInputs().get(1).v1(), is("second"));
assertThat(chainInput.getInputs().get(1).v2(), instanceOf(SimpleInput.class));
// now execute
ExecutableChainInput executableChainInput = chainInputFactory.createExecutable(chainInput);
ChainInput.Result result = executableChainInput.execute(createContext(), new Payload.Simple());
Payload payload = result.payload();
assertThat(payload.data(), hasKey("first"));
assertThat(payload.data(), hasKey("second"));
assertThat(payload.data().get("first"), instanceOf(Map.class));
assertThat(payload.data().get("second"), instanceOf(Map.class));
// final payload check
Map<String, Object> firstPayload = (Map<String,Object>) payload.data().get("first");
Map<String, Object> secondPayload = (Map<String,Object>) payload.data().get("second");
assertThat(firstPayload, hasEntry("foo", "bar"));
assertThat(secondPayload, hasEntry("spam", "eggs"));
}
public void testToXContent() throws Exception {
ChainInput chainedInput = chainInput()
.add("first", simpleInput("foo", "bar"))
.add("second", simpleInput("spam", "eggs"))
.build();
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
chainedInput.toXContent(builder, ToXContent.EMPTY_PARAMS);
assertThat(builder.bytes().toUtf8(), is("{\"inputs\":[{\"first\":{\"simple\":{\"foo\":\"bar\"}}},{\"second\":{\"simple\":{\"spam\":\"eggs\"}}}]}"));
// parsing it back as well!
Map<String, InputFactory> factories = new HashMap<>();
factories.put("simple", new SimpleInputFactory(Settings.EMPTY));
InputRegistry inputRegistry = new InputRegistry(factories);
ChainInputFactory chainInputFactory = new ChainInputFactory(Settings.EMPTY);
chainInputFactory.init(inputRegistry);
factories.put("chain", chainInputFactory);
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
parser.nextToken();
ChainInput parsedChainInput = ChainInput.parse("testWatchId", parser, inputRegistry);
assertThat(parsedChainInput.getInputs(), hasSize(2));
assertThat(parsedChainInput.getInputs().get(0).v1(), is("first"));
assertThat(parsedChainInput.getInputs().get(0).v2(), is(instanceOf(SimpleInput.class)));
assertThat(parsedChainInput.getInputs().get(1).v1(), is("second"));
assertThat(parsedChainInput.getInputs().get(1).v2(), is(instanceOf(SimpleInput.class)));
}
public void testThatWatchSourceBuilderWorksWithChainInput() throws Exception {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
HttpInput.Builder httpInputBuilder = httpInput(HttpRequestTemplate.builder("theHost", 1234)
.path("/index/_search")
.body(jsonBuilder().startObject().field("size", 1).endObject())
.auth(new BasicAuth("test", "changeme".toCharArray())));
ChainInput.Builder chainedInputBuilder = chainInput()
.add("foo", httpInputBuilder)
.add("bar", simpleInput("spam", "eggs"));
watchBuilder()
.trigger(schedule(interval("5s")))
.input(chainedInputBuilder)
.condition(scriptCondition("ctx.payload.hits.total == 1"))
.addAction("_id", loggingAction("watch [{{ctx.watch_id}}] matched"))
.toXContent(builder, ToXContent.EMPTY_PARAMS);
// no exception means all good
}
private WatchExecutionContext createContext() {
Watch watch = new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
new ExecutableAlwaysCondition(logger),
null,
null,
new ExecutableActions(new ArrayList<ActionWrapper>()),
null,
new WatchStatus(new DateTime(0, UTC), emptyMap()));
WatchExecutionContext ctx = new TriggeredExecutionContext(watch,
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
TimeValue.timeValueSeconds(5));
return ctx;
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.input.chain;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.watcher.input.http.HttpInput;
import org.elasticsearch.watcher.support.http.HttpRequestTemplate;
import org.elasticsearch.watcher.support.http.auth.basic.BasicAuth;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTestCase;
import org.junit.Test;
import java.net.InetSocketAddress;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.input.InputBuilders.*;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.containsString;
public class ChainIntegrationTests extends AbstractWatcherIntegrationTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(Node.HTTP_ENABLED, true)
.build();
}
public void testChainedInputsAreWorking() throws Exception {
String index = "the-most-awesome-index-ever";
createIndex(index);
client().prepareIndex(index, "type", "id").setSource("{}").setRefresh(true).get();
InetSocketAddress address = internalCluster().httpAddresses()[0];
HttpInput.Builder httpInputBuilder = httpInput(HttpRequestTemplate.builder(address.getHostString(), address.getPort())
.path("{{ctx.payload.first.url}}")
.body(jsonBuilder().startObject().field("size", 1).endObject())
.auth(shieldEnabled() ? new BasicAuth("test", "changeme".toCharArray()) : null));
ChainInput.Builder chainedInputBuilder = chainInput()
.add("first", simpleInput("url", "/" + index + "/_search"))
.add("second", httpInputBuilder);
watcherClient().preparePutWatch("_name")
.setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(chainedInputBuilder)
.addAction("indexAction", indexAction("my-index", "my-type")))
.get();
if (timeWarped()) {
timeWarp().scheduler().trigger("_name");
refresh();
}
refresh();
SearchResponse searchResponse = client().prepareSearch("my-index").setTypes("my-type").get();
assertHitCount(searchResponse, 1);
assertThat(searchResponse.getHits().getAt(0).sourceAsString(), containsString(index));
assertWatchWithMinimumPerformedActionsCount("_name", 1, false);
}
}

View File

@ -135,7 +135,7 @@ public class HttpInputTests extends ESTestCase {
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
TimeValue.timeValueSeconds(5));
HttpInput.Result result = input.execute(ctx);
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.payload().data(), equalTo(MapBuilder.<String, Object>newMapBuilder().put("key", "value").map()));
}
@ -165,7 +165,7 @@ public class HttpInputTests extends ESTestCase {
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
TimeValue.timeValueSeconds(5));
HttpInput.Result result = input.execute(ctx);
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.payload().data().get("_value").toString(), equalTo(notJson));
}

View File

@ -127,7 +127,7 @@ public class SearchInputTests extends ESIntegTestCase {
new DateTime(0, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)),
timeValueSeconds(5));
SearchInput.Result result = searchInput.execute(ctx);
SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple());
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertNotNull(result.executedRequest());
@ -153,6 +153,7 @@ public class SearchInputTests extends ESIntegTestCase {
ctxParams.put("vars", new HashMap<String, Object>());
ctxParams.put("watch_id", "test-watch");
ctxParams.put("trigger", triggerParams);
ctxParams.put("payload", new Payload.Simple().data());
ctxParams.put("execution_time", new DateTime(1970, 01, 01, 00, 01, 00, 000, ISOChronology.getInstanceUTC()));
Map<String, Object> expectedParams = new HashMap<String, Object>();
expectedParams.put("seconds_param", "30s");
@ -235,7 +236,7 @@ public class SearchInputTests extends ESIntegTestCase {
new DateTime(0, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)),
timeValueSeconds(5));
SearchInput.Result result = searchInput.execute(ctx);
SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple());
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
assertNotNull(result.executedRequest());
@ -288,7 +289,7 @@ public class SearchInputTests extends ESIntegTestCase {
SearchInput si = siBuilder.build();
ExecutableSearchInput searchInput = new ExecutableSearchInput(si, logger, ClientProxy.of(client()), null);
return searchInput.execute(ctx);
return searchInput.execute(ctx, new Payload.Simple());
}
}

View File

@ -33,7 +33,7 @@ public class SimpleInputTests extends ESTestCase {
data.put("baz", new ArrayList<String>() );
ExecutableInput staticInput = new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(data)), logger);
Input.Result staticResult = staticInput.execute(null);
Input.Result staticResult = staticInput.execute(null, new Payload.Simple());
assertEquals(staticResult.payload().data().get("foo"), "bar");
List baz = (List)staticResult.payload().data().get("baz");
assertTrue(baz.isEmpty());
@ -52,7 +52,7 @@ public class SimpleInputTests extends ESTestCase {
assertEquals(input.type(), SimpleInput.TYPE);
Input.Result staticResult = input.execute(null);
Input.Result staticResult = input.execute(null, new Payload.Simple());
assertEquals(staticResult.payload().data().get("foo"), "bar");
List baz = (List)staticResult.payload().data().get("baz");
assertTrue(baz.isEmpty());