[Test] Re enable some messy Watcher tests
This commit reenable some Watcher tests that were muted in elastic/elasticsearch#724. It removes the Groovy dependency for tests that don't really use Groovy features and replace scripts with mocked scripts. It converts a GroovyScriptConditionIT test into a REST test and moves it in the smoke-test-watcher-with-groovy project. closes elastic/elasticsearch#724 Original commit: elastic/x-pack-elasticsearch@f4c8161946
This commit is contained in:
parent
4fdc183e00
commit
b07394090f
|
@ -1,12 +0,0 @@
|
|||
|
||||
/*
|
||||
* Messy tests that depend on groovy directly. Fix these!
|
||||
* https://github.com/elastic/x-plugins/issues/724
|
||||
*/
|
||||
|
||||
apply plugin: 'elasticsearch.messy-test'
|
||||
|
||||
dependencies {
|
||||
testCompile project(path: ':x-plugins:elasticsearch:x-pack', configuration: 'testArtifacts')
|
||||
testCompile project(path: ':modules:lang-groovy', configuration: 'runtime')
|
||||
}
|
|
@ -1,181 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.messy.tests;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.util.Callback;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.groovy.GroovyPlugin;
|
||||
import org.elasticsearch.xpack.watcher.client.WatcherClient;
|
||||
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
|
||||
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
|
||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.scriptCondition;
|
||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
||||
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.scriptTransform;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ExecutionVarsIT extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(GroovyPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean timeWarped() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public void testVars() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
|
||||
.trigger(schedule(cron("0/1 * * * * ?")))
|
||||
.input(simpleInput("value", 5))
|
||||
.condition(scriptCondition("ctx.vars.condition_value = ctx.payload.value + 5; return ctx.vars.condition_value > 5;"))
|
||||
.transform(scriptTransform("ctx.vars.watch_transform_value = ctx.vars.condition_value + 5; return ctx.payload;"))
|
||||
.addAction(
|
||||
"a1",
|
||||
scriptTransform("ctx.vars.a1_transform_value = ctx.vars.watch_transform_value + 10; ctx.payload" +
|
||||
".a1_transformed_value = ctx.vars.a1_transform_value; return ctx.payload;"),
|
||||
loggingAction("_text"))
|
||||
.addAction(
|
||||
"a2",
|
||||
scriptTransform("ctx.vars.a2_transform_value = ctx.vars.watch_transform_value + 20; ctx.payload" +
|
||||
".a2_transformed_value = ctx.vars.a2_transform_value; return ctx.payload;"),
|
||||
loggingAction("_text")))
|
||||
.get();
|
||||
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
timeWarp().scheduler().trigger("_id");
|
||||
|
||||
flush();
|
||||
refresh();
|
||||
|
||||
SearchResponse searchResponse = searchWatchRecords(new Callback<SearchRequestBuilder>() {
|
||||
@Override
|
||||
public void handle(SearchRequestBuilder builder) {
|
||||
// defaults to match all;
|
||||
}
|
||||
});
|
||||
|
||||
assertThat(searchResponse.getHits().getTotalHits(), is(1L));
|
||||
|
||||
Map<String, Object> source = searchResponse.getHits().getAt(0).getSource();
|
||||
|
||||
assertValue(source, "watch_id", is("_id"));
|
||||
assertValue(source, "state", is("executed"));
|
||||
|
||||
// we don't store the computed vars in history
|
||||
assertValue(source, "vars", nullValue());
|
||||
|
||||
assertValue(source, "result.condition.status", is("success"));
|
||||
assertValue(source, "result.transform.status", is("success"));
|
||||
|
||||
List<Map<String, Object>> actions = ObjectPath.eval("result.actions", source);
|
||||
for (Map<String, Object> action : actions) {
|
||||
String id = (String) action.get("id");
|
||||
switch (id) {
|
||||
case "a1":
|
||||
assertValue(action, "status", is("success"));
|
||||
assertValue(action, "transform.status", is("success"));
|
||||
assertValue(action, "transform.payload.a1_transformed_value", equalTo(25));
|
||||
break;
|
||||
case "a2":
|
||||
assertValue(action, "status", is("success"));
|
||||
assertValue(action, "transform.status", is("success"));
|
||||
assertValue(action, "transform.payload.a2_transformed_value", equalTo(35));
|
||||
break;
|
||||
default:
|
||||
fail("there should not be an action result for action with an id other than a1 or a2");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testVarsManual() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
|
||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
||||
.input(simpleInput("value", 5))
|
||||
.condition(scriptCondition("ctx.vars.condition_value = ctx.payload.value + 5; return ctx.vars.condition_value > 5;"))
|
||||
.transform(scriptTransform("ctx.vars.watch_transform_value = ctx.vars.condition_value + 5; return ctx.payload;"))
|
||||
.addAction(
|
||||
"a1",
|
||||
scriptTransform("ctx.vars.a1_transform_value = ctx.vars.watch_transform_value + 10; ctx.payload" +
|
||||
".a1_transformed_value = ctx.vars.a1_transform_value; return ctx.payload;"),
|
||||
loggingAction("_text"))
|
||||
.addAction(
|
||||
"a2",
|
||||
scriptTransform("ctx.vars.a2_transform_value = ctx.vars.watch_transform_value + 20; ctx.payload" +
|
||||
".a2_transformed_value = ctx.vars.a2_transform_value; return ctx.payload;"),
|
||||
loggingAction("_text")))
|
||||
.get();
|
||||
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
boolean debug = randomBoolean();
|
||||
|
||||
ExecuteWatchResponse executeWatchResponse = watcherClient
|
||||
.prepareExecuteWatch("_id")
|
||||
.setDebug(debug)
|
||||
.get();
|
||||
assertThat(executeWatchResponse.getRecordId(), notNullValue());
|
||||
XContentSource source = executeWatchResponse.getRecordSource();
|
||||
|
||||
assertValue(source, "watch_id", is("_id"));
|
||||
assertValue(source, "state", is("executed"));
|
||||
|
||||
if (debug) {
|
||||
assertValue(source, "vars.condition_value", is(10));
|
||||
assertValue(source, "vars.watch_transform_value", is(15));
|
||||
assertValue(source, "vars.a1_transform_value", is(25));
|
||||
assertValue(source, "vars.a2_transform_value", is(35));
|
||||
}
|
||||
|
||||
assertValue(source, "result.condition.status", is("success"));
|
||||
assertValue(source, "result.transform.status", is("success"));
|
||||
|
||||
List<Map<String, Object>> actions = source.getValue("result.actions");
|
||||
for (Map<String, Object> action : actions) {
|
||||
String id = (String) action.get("id");
|
||||
switch (id) {
|
||||
case "a1":
|
||||
assertValue(action, "status", is("success"));
|
||||
assertValue(action, "transform.status", is("success"));
|
||||
assertValue(action, "transform.payload.a1_transformed_value", equalTo(25));
|
||||
break;
|
||||
case "a2":
|
||||
assertValue(action, "status", is("success"));
|
||||
assertValue(action, "transform.status", is("success"));
|
||||
assertValue(action, "transform.payload.a2_transformed_value", equalTo(35));
|
||||
break;
|
||||
default:
|
||||
fail("there should not be an action result for action with an id other than a1 or a2");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,121 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.messy.tests;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.groovy.GroovyPlugin;
|
||||
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
|
||||
import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition;
|
||||
import org.elasticsearch.xpack.watcher.execution.ManualExecutionContext;
|
||||
import org.elasticsearch.xpack.watcher.execution.ManualExecutionTests.ExecutionRunner;
|
||||
import org.elasticsearch.xpack.watcher.history.WatchRecord;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.delete.DeleteWatchResponse;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.get.GetWatchRequest;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchRequest;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
|
||||
import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
|
||||
import org.elasticsearch.xpack.watcher.watch.Watch;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
|
||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
|
||||
|
||||
/**
|
||||
* Two groovy-using methods from ManualExecutionTests.
|
||||
* They appear to be using groovy as a way to sleep.
|
||||
*/
|
||||
public class GroovyManualExecutionIT extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(GroovyPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean enableSecurity() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public void testWatchExecutionDuration() throws Exception {
|
||||
WatchSourceBuilder watchBuilder = watchBuilder()
|
||||
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
|
||||
.input(simpleInput("foo", "bar"))
|
||||
.condition(new ScriptCondition((new WatcherScript.Builder.Inline("sleep 100; return true")).build()))
|
||||
.addAction("log", loggingAction("foobar"));
|
||||
|
||||
Watch watch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON));
|
||||
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, false, new ManualTriggerEvent("_id",
|
||||
new ScheduleTriggerEvent(new DateTime(DateTimeZone.UTC), new DateTime(DateTimeZone.UTC))),
|
||||
new TimeValue(1, TimeUnit.HOURS));
|
||||
WatchRecord record = executionService().execute(ctxBuilder.build());
|
||||
assertThat(record.result().executionDurationMs(), greaterThanOrEqualTo(100L));
|
||||
}
|
||||
|
||||
public void testForceDeletionOfLongRunningWatch() throws Exception {
|
||||
WatchSourceBuilder watchBuilder = watchBuilder()
|
||||
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
|
||||
.input(simpleInput("foo", "bar"))
|
||||
.condition(new ScriptCondition((new WatcherScript.Builder.Inline("sleep 10000; return true")).build()))
|
||||
.defaultThrottlePeriod(new TimeValue(1, TimeUnit.HOURS))
|
||||
.addAction("log", loggingAction("foobar"));
|
||||
|
||||
int numberOfThreads = scaledRandomIntBetween(1, 5);
|
||||
PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet();
|
||||
assertThat(putWatchResponse.getVersion(), greaterThan(0L));
|
||||
refresh();
|
||||
assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true));
|
||||
|
||||
CountDownLatch startLatch = new CountDownLatch(1);
|
||||
|
||||
List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < numberOfThreads; ++i) {
|
||||
threads.add(new Thread(new ExecutionRunner(watchService(), executionService(), "_id", startLatch)));
|
||||
}
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
}
|
||||
DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").setForce(true).get();
|
||||
assertThat(deleteWatchResponse.isFound(), is(true));
|
||||
|
||||
deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").get();
|
||||
assertThat(deleteWatchResponse.isFound(), is(false));
|
||||
|
||||
startLatch.countDown();
|
||||
|
||||
long startJoin = System.currentTimeMillis();
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
long endJoin = System.currentTimeMillis();
|
||||
TimeValue tv = new TimeValue(10 * (numberOfThreads+1), TimeUnit.SECONDS);
|
||||
assertThat("Shouldn't take longer than [" + tv.getSeconds() + "] seconds for all the threads to stop", (endJoin - startJoin),
|
||||
lessThan(tv.getMillis()));
|
||||
}
|
||||
|
||||
}
|
|
@ -1,113 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.messy.tests;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.script.groovy.GroovyPlugin;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition;
|
||||
import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition;
|
||||
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.watch.Payload;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.messy.tests.MessyTestUtils.createScriptService;
|
||||
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
|
||||
|
||||
public class GroovyScriptConditionIT extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(GroovyPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean enableSecurity() {
|
||||
return false;
|
||||
}
|
||||
|
||||
private static ThreadPool THREAD_POOL;
|
||||
private ScriptService scriptService;
|
||||
|
||||
@BeforeClass
|
||||
public static void startThreadPool() {
|
||||
THREAD_POOL = new TestThreadPool(GroovyScriptConditionIT.class.getSimpleName());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
scriptService = createScriptService(THREAD_POOL);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopThreadPool() throws InterruptedException {
|
||||
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
|
||||
// since static must set to null to be eligible for collection
|
||||
THREAD_POOL = null;
|
||||
}
|
||||
|
||||
public void testGroovyClosureWithAggregations() throws Exception {
|
||||
for (int seconds = 0; seconds < 60; seconds += 5) {
|
||||
String timestamp = "2005-01-01T00:00:" + String.format(Locale.ROOT, "%02d", seconds);
|
||||
client().prepareIndex(".monitoring", "cluster_stats")
|
||||
.setSource("status", randomFrom("green", "yellow"), "@timestamp", timestamp).get();
|
||||
}
|
||||
|
||||
refresh();
|
||||
|
||||
SearchRequestBuilder builder = client().prepareSearch(".monitoring")
|
||||
.addAggregation(
|
||||
AggregationBuilders
|
||||
.dateHistogram("minutes").field("@timestamp").interval(TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS))
|
||||
.order(Histogram.Order.COUNT_DESC)
|
||||
.subAggregation(AggregationBuilders.terms("status").field("status.keyword").size(3)));
|
||||
SearchResponse unmetResponse = builder.get();
|
||||
|
||||
ExecutableScriptCondition condition =
|
||||
new ExecutableScriptCondition(new ScriptCondition(WatcherScript.inline(
|
||||
String.join(
|
||||
" ",
|
||||
"if (ctx.payload.hits.total < 1) return false;",
|
||||
"def rows = ctx.payload.hits.hits;",
|
||||
"if (ctx.payload.aggregations.minutes.buckets.size() < 12) return false;",
|
||||
"def last60Seconds = ctx.payload.aggregations.minutes.buckets[-12..-1];",
|
||||
"return last60Seconds.every { it.status.buckets.every { s -> s.key == 'red' } }"
|
||||
)
|
||||
).lang("groovy").build()), logger, scriptService);
|
||||
|
||||
WatchExecutionContext unmetContext = mockExecutionContext("_name", new Payload.XContent(unmetResponse));
|
||||
assertFalse(condition.execute(unmetContext).met());
|
||||
|
||||
for (int seconds = 0; seconds < 60; seconds += 5) {
|
||||
String timestamp = "2005-01-01T00:01:" + String.format(Locale.ROOT, "%02d", seconds);
|
||||
client().prepareIndex(".monitoring", "cluster_stats").setSource("status", randomFrom("red"), "@timestamp", timestamp).get();
|
||||
}
|
||||
|
||||
refresh();
|
||||
|
||||
SearchResponse metResponse = builder.get();
|
||||
|
||||
WatchExecutionContext metContext = mockExecutionContext("_name", new Payload.XContent(metResponse));
|
||||
assertTrue(condition.execute(metContext).met());
|
||||
}
|
||||
|
||||
}
|
|
@ -1,99 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.messy.tests;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.groovy.GroovyPlugin;
|
||||
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
|
||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.alwaysCondition;
|
||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
||||
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.scriptTransform;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class HistoryTemplateTransformMappingsIT extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(GroovyPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean timeWarped() {
|
||||
return true; // just to have better control over the triggers
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean enableSecurity() {
|
||||
return false; // remove security noise from this test
|
||||
}
|
||||
|
||||
public void testTransformFields() throws Exception {
|
||||
String index = "the-index";
|
||||
String type = "the-type";
|
||||
createIndex(index);
|
||||
index(index, type, "{}");
|
||||
flush();
|
||||
refresh();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id1").setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.input(simpleInput())
|
||||
.condition(alwaysCondition())
|
||||
.transform(scriptTransform("return [ 'key' : 'value1' ];"))
|
||||
.addAction("logger", scriptTransform("return [ 'key' : 'value2' ];"), loggingAction("indexed")))
|
||||
.get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
timeWarp().scheduler().trigger("_id1");
|
||||
|
||||
// adding another watch which with a transform that should conflict with the preview watch. Since the
|
||||
// mapping for the transform construct is disabled, there should be nor problems.
|
||||
putWatchResponse = watcherClient().preparePutWatch("_id2").setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.input(simpleInput())
|
||||
.condition(alwaysCondition())
|
||||
.transform(scriptTransform("return [ 'key' : [ 'key1' : 'value1' ] ];"))
|
||||
.addAction("logger", scriptTransform("return [ 'key' : [ 'key1' : 'value2' ] ];"), loggingAction("indexed")))
|
||||
.get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
timeWarp().scheduler().trigger("_id2");
|
||||
|
||||
flush();
|
||||
refresh();
|
||||
|
||||
assertWatchWithMinimumActionsCount("_id1", ExecutionState.EXECUTED, 1);
|
||||
assertWatchWithMinimumActionsCount("_id2", ExecutionState.EXECUTED, 1);
|
||||
|
||||
refresh();
|
||||
|
||||
assertBusy(() -> {
|
||||
GetFieldMappingsResponse getFieldMappingsResponse = client().admin().indices()
|
||||
.prepareGetFieldMappings(".watcher-history*").setFields("result.actions.transform.payload")
|
||||
.setTypes("watch_record").includeDefaults(true).get();
|
||||
|
||||
for (Map<String, Map<String, FieldMappingMetaData>> map : getFieldMappingsResponse.mappings().values()) {
|
||||
Map<String, FieldMappingMetaData> watchRecord = map.get("watch_record");
|
||||
assertThat(watchRecord, hasKey("result.actions.transform.payload"));
|
||||
FieldMappingMetaData fieldMappingMetaData = watchRecord.get("result.actions.transform.payload");
|
||||
assertThat(fieldMappingMetaData.isNull(), is(true));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,42 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.messy.tests;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.script.ScriptContextRegistry;
|
||||
import org.elasticsearch.script.ScriptEngineRegistry;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.script.ScriptSettings;
|
||||
import org.elasticsearch.script.groovy.GroovyScriptEngineService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.junit.Ignore;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
@Ignore // not a test.
|
||||
@SuppressForbidden(reason = "gradle is broken and tries to run me as a test")
|
||||
public final class MessyTestUtils {
|
||||
public static ScriptService createScriptService(ThreadPool tp) throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put("script.inline", "true")
|
||||
.put("script.indexed", "true")
|
||||
.put("path.home", LuceneTestCase.createTempDir())
|
||||
.build();
|
||||
GroovyScriptEngineService groovyScriptEngineService = new GroovyScriptEngineService(settings);
|
||||
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singleton(groovyScriptEngineService));
|
||||
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Arrays.asList(WatcherScript.CTX_PLUGIN));
|
||||
|
||||
ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
|
||||
return new ScriptService(settings, new Environment(settings), new ResourceWatcherService(settings, tp),
|
||||
scriptEngineRegistry, scriptContextRegistry, scriptSettings);
|
||||
}
|
||||
}
|
|
@ -1,231 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.messy.tests;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.groovy.GroovyPlugin;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
|
||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.alwaysCondition;
|
||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
|
||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
||||
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.chainTransform;
|
||||
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.scriptTransform;
|
||||
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.searchTransform;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TransformIT extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(GroovyPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
Path configDir = createTempDir();
|
||||
Path scripts = configDir.resolve("scripts");
|
||||
try {
|
||||
Files.createDirectories(scripts);
|
||||
try (InputStream stream = TransformIT.class.getResourceAsStream("/config/scripts/my-script.groovy");
|
||||
OutputStream output = Files.newOutputStream(scripts.resolve("my-script.groovy"))) {
|
||||
Streams.copy(stream, output);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
//Set path so ScriptService will pick up the test scripts
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("path.conf", configDir.toString()).build();
|
||||
}
|
||||
|
||||
public void testScriptTransform() throws Exception {
|
||||
final WatcherScript script;
|
||||
if (randomBoolean()) {
|
||||
logger.info("testing script transform with an inline script");
|
||||
script = WatcherScript.inline("return [key3 : ctx.payload.key1 + ctx.payload.key2]").lang("groovy").build();
|
||||
} else if (randomBoolean()) {
|
||||
logger.info("testing script transform with an indexed script");
|
||||
client().admin().cluster().preparePutStoredScript()
|
||||
.setId("_id")
|
||||
.setScriptLang("groovy")
|
||||
.setSource(new BytesArray("{\"script\" : \"return [key3 : ctx.payload.key1 + ctx.payload.key2]\"}"))
|
||||
.get();
|
||||
script = WatcherScript.indexed("_id").lang("groovy").build();
|
||||
} else {
|
||||
logger.info("testing script transform with a file script");
|
||||
script = WatcherScript.file("my-script").lang("groovy").build();
|
||||
}
|
||||
|
||||
// put a watch that has watch level transform:
|
||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id1")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.input(simpleInput(MapBuilder.<String, Object>newMapBuilder().put("key1", 10).put("key2", 10)))
|
||||
.condition(alwaysCondition())
|
||||
.transform(scriptTransform(script))
|
||||
.addAction("_id", indexAction("output1", "type")))
|
||||
.get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
// put a watch that has a action level transform:
|
||||
putWatchResponse = watcherClient().preparePutWatch("_id2")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.input(simpleInput(MapBuilder.<String, Object>newMapBuilder().put("key1", 10).put("key2", 10)))
|
||||
.condition(alwaysCondition())
|
||||
.addAction("_id", scriptTransform(script), indexAction("output2", "type")))
|
||||
.get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_id1");
|
||||
timeWarp().scheduler().trigger("_id2");
|
||||
refresh();
|
||||
}
|
||||
|
||||
assertWatchWithMinimumPerformedActionsCount("_id1", 1, false);
|
||||
assertWatchWithMinimumPerformedActionsCount("_id2", 1, false);
|
||||
refresh();
|
||||
|
||||
SearchResponse response = client().prepareSearch("output1").get();
|
||||
assertNoFailures(response);
|
||||
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1L));
|
||||
assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1));
|
||||
assertThat(response.getHits().getAt(0).sourceAsMap().get("key3").toString(), equalTo("20"));
|
||||
|
||||
response = client().prepareSearch("output2").get();
|
||||
assertNoFailures(response);
|
||||
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1L));
|
||||
assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1));
|
||||
assertThat(response.getHits().getAt(0).sourceAsMap().get("key3").toString(), equalTo("20"));
|
||||
}
|
||||
|
||||
public void testSearchTransform() throws Exception {
|
||||
createIndex("my-condition-index", "my-payload-index");
|
||||
ensureGreen("my-condition-index", "my-payload-index");
|
||||
|
||||
index("my-payload-index", "payload", "mytestresult");
|
||||
refresh();
|
||||
|
||||
SearchRequest inputRequest = WatcherTestUtils.newInputSearchRequest("my-condition-index")
|
||||
.source(searchSource().query(matchAllQuery()));
|
||||
SearchRequest transformRequest = WatcherTestUtils.newInputSearchRequest("my-payload-index")
|
||||
.source(searchSource().query(matchAllQuery()));
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id1")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.input(searchInput(inputRequest))
|
||||
.transform(searchTransform(transformRequest))
|
||||
.addAction("_id", indexAction("output1", "result"))
|
||||
).get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
putWatchResponse = watcherClient().preparePutWatch("_id2")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.input(searchInput(inputRequest))
|
||||
.addAction("_id", searchTransform(transformRequest), indexAction("output2", "result"))
|
||||
).get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_id1");
|
||||
timeWarp().scheduler().trigger("_id2");
|
||||
refresh();
|
||||
}
|
||||
|
||||
assertWatchWithMinimumPerformedActionsCount("_id1", 1, false);
|
||||
assertWatchWithMinimumPerformedActionsCount("_id2", 1, false);
|
||||
refresh();
|
||||
|
||||
SearchResponse response = client().prepareSearch("output1").get();
|
||||
assertNoFailures(response);
|
||||
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1L));
|
||||
assertThat(response.getHits().getAt(0).sourceAsString(), containsString("mytestresult"));
|
||||
|
||||
response = client().prepareSearch("output2").get();
|
||||
assertNoFailures(response);
|
||||
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1L));
|
||||
assertThat(response.getHits().getAt(0).sourceAsString(), containsString("mytestresult"));
|
||||
}
|
||||
|
||||
public void testChainTransform() throws Exception {
|
||||
final WatcherScript script1 = WatcherScript.inline("return [key3 : ctx.payload.key1 + ctx.payload.key2]").lang("groovy").build();
|
||||
final WatcherScript script2 = WatcherScript.inline("return [key4 : ctx.payload.key3 + 10]").lang("groovy").build();
|
||||
// put a watch that has watch level transform:
|
||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id1")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.input(simpleInput(MapBuilder.<String, Object>newMapBuilder().put("key1", 10).put("key2", 10)))
|
||||
.condition(alwaysCondition())
|
||||
.transform(chainTransform(scriptTransform(script1), scriptTransform(script2)))
|
||||
.addAction("_id", indexAction("output1", "type")))
|
||||
.get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
// put a watch that has a action level transform:
|
||||
putWatchResponse = watcherClient().preparePutWatch("_id2")
|
||||
.setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5s")))
|
||||
.input(simpleInput(MapBuilder.<String, Object>newMapBuilder().put("key1", 10).put("key2", 10)))
|
||||
.condition(alwaysCondition())
|
||||
.addAction("_id", chainTransform(scriptTransform(script1), scriptTransform(script2)),
|
||||
indexAction("output2", "type")))
|
||||
.get();
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_id1");
|
||||
timeWarp().scheduler().trigger("_id2");
|
||||
refresh();
|
||||
}
|
||||
|
||||
assertWatchWithMinimumPerformedActionsCount("_id1", 1, false);
|
||||
assertWatchWithMinimumPerformedActionsCount("_id2", 1, false);
|
||||
refresh();
|
||||
|
||||
SearchResponse response = client().prepareSearch("output1").get();
|
||||
assertNoFailures(response);
|
||||
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1L));
|
||||
assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1));
|
||||
assertThat(response.getHits().getAt(0).sourceAsMap().get("key4").toString(), equalTo("30"));
|
||||
|
||||
response = client().prepareSearch("output2").get();
|
||||
assertNoFailures(response);
|
||||
assertThat(response.getHits().getTotalHits(), greaterThanOrEqualTo(1L));
|
||||
assertThat(response.getHits().getAt(0).sourceAsMap().size(), equalTo(1));
|
||||
assertThat(response.getHits().getAt(0).sourceAsMap().get("key4").toString(), equalTo("30"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,137 @@
|
|||
---
|
||||
"Test the execution of a Groovy closure in script condition":
|
||||
|
||||
- do:
|
||||
bulk:
|
||||
refresh: true
|
||||
body: |
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "green", "@timestamp": "2005-01-01T00:00:00" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "yellow", "@timestamp": "2005-01-01T00:00:05" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "green", "@timestamp": "2005-01-01T00:00:55" }
|
||||
|
||||
- do:
|
||||
xpack.watcher.put_watch:
|
||||
id: "watch_with_groovy_closure"
|
||||
body: >
|
||||
{
|
||||
"trigger" : {
|
||||
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
|
||||
},
|
||||
"input" : {
|
||||
"search" : {
|
||||
"request" : {
|
||||
"indices" : [ ".monitoring" ],
|
||||
"body" : {
|
||||
"query" : {
|
||||
"match_all" : {
|
||||
}
|
||||
},
|
||||
"aggregations" : {
|
||||
"minutes" : {
|
||||
"date_histogram" : {
|
||||
"field" : "@timestamp",
|
||||
"interval" : "5s",
|
||||
"order" : {
|
||||
"_count" : "desc"
|
||||
}
|
||||
},
|
||||
"aggregations": {
|
||||
"status" : {
|
||||
"terms" : {
|
||||
"field" : "status.keyword",
|
||||
"size" : 3
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"condition" : {
|
||||
"script" : {
|
||||
"inline" : "if (ctx.payload.hits.total < 1) return false; def rows = ctx.payload.hits.hits; if (ctx.payload.aggregations.minutes.buckets.size() < 12) return false; def last60Seconds = ctx.payload.aggregations.minutes.buckets[-12..-1]; return last60Seconds.every { it.status.buckets.every { s -> s.key == 'red' } }",
|
||||
"lang": "groovy"
|
||||
}
|
||||
},
|
||||
"actions" : {
|
||||
"log" : {
|
||||
"logging" : {
|
||||
"text" : "executed at {{ctx.execution_time}}"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- match: { _id: "watch_with_groovy_closure" }
|
||||
|
||||
- do:
|
||||
xpack.watcher.execute_watch:
|
||||
id: "watch_with_groovy_closure"
|
||||
body: >
|
||||
{
|
||||
"trigger_data" : {
|
||||
"scheduled_time" : "2015-05-05T20:58:02.443Z",
|
||||
"triggered_time" : "2015-05-05T20:58:02.443Z"
|
||||
},
|
||||
"ignore_condition" : false,
|
||||
"action_modes" : {
|
||||
"_all" : "force_simulate"
|
||||
},
|
||||
"record_execution" : false
|
||||
}
|
||||
|
||||
- match: { "watch_record.state": "execution_not_needed" }
|
||||
- match: { "watch_record.result.condition.met": false }
|
||||
|
||||
- do:
|
||||
bulk:
|
||||
refresh: true
|
||||
body: |
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:00" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:05" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:10" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:15" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:20" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:25" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:30" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:35" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:40" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:45" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:50" }
|
||||
{"index": {"_index": ".monitoring", "_type": "cluster_stats"}}
|
||||
{ "status": "red", "@timestamp": "2005-01-01T00:01:55" }
|
||||
|
||||
- do:
|
||||
xpack.watcher.execute_watch:
|
||||
id: "watch_with_groovy_closure"
|
||||
body: >
|
||||
{
|
||||
"trigger_data" : {
|
||||
"scheduled_time" : "2015-05-05T20:58:02.443Z",
|
||||
"triggered_time" : "2015-05-05T20:58:02.443Z"
|
||||
},
|
||||
"ignore_condition" : false,
|
||||
"action_modes" : {
|
||||
"_all" : "force_simulate"
|
||||
},
|
||||
"record_execution" : false
|
||||
}
|
||||
|
||||
- match: { "watch_record.state": "executed" }
|
||||
- match: { "watch_record.result.condition.met": true }
|
|
@ -3,15 +3,17 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.messy.tests;
|
||||
package org.elasticsearch.xpack.watcher.condition.script;
|
||||
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.MockScriptPlugin;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.script.groovy.GroovyPlugin;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
||||
|
@ -19,45 +21,55 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
|||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
import org.elasticsearch.search.internal.InternalSearchHits;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition;
|
||||
import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition;
|
||||
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.watch.Payload;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ScriptConditionSearchIT extends AbstractWatcherIntegrationTestCase {
|
||||
private ThreadPool tp = null;
|
||||
private ScriptService scriptService;
|
||||
public class ScriptConditionSearchTests extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(GroovyPlugin.class);
|
||||
types.add(CustomScriptPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
tp = new TestThreadPool(ThreadPool.Names.SAME);
|
||||
scriptService = MessyTestUtils.createScriptService(tp);
|
||||
}
|
||||
public static class CustomScriptPlugin extends MockScriptPlugin {
|
||||
|
||||
@After
|
||||
public void cleanup() {
|
||||
tp.shutdownNow();
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
||||
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
|
||||
|
||||
scripts.put("ctx.payload.aggregations.rate.buckets[0]?.doc_count >= 5", vars -> {
|
||||
List<?> buckets = (List<?>) XContentMapValues.extractValue("ctx.payload.aggregations.rate.buckets", vars);
|
||||
int docCount = (int) XContentMapValues.extractValue("doc_count", (Map<String, Object>) buckets.get(0));
|
||||
return docCount >= 5;
|
||||
});
|
||||
|
||||
scripts.put("ctx.payload.hits?.hits[0]?._score == 1.0", vars -> {
|
||||
List<SearchHit> searchHits = (List<SearchHit>) XContentMapValues.extractValue("ctx.payload.hits.hits", vars);
|
||||
double score = (double) XContentMapValues.extractValue("_score", (Map<String, Object>) searchHits.get(0));
|
||||
return score == 1.0;
|
||||
});
|
||||
|
||||
return scripts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String pluginScriptLang() {
|
||||
return WatcherScript.DEFAULT_LANG;
|
||||
}
|
||||
}
|
||||
|
||||
public void testExecuteWithAggs() throws Exception {
|
||||
|
@ -72,6 +84,7 @@ public class ScriptConditionSearchIT extends AbstractWatcherIntegrationTestCase
|
|||
.dateHistogramInterval(DateHistogramInterval.HOUR).order(Histogram.Order.COUNT_DESC))
|
||||
.get();
|
||||
|
||||
ScriptService scriptService = internalCluster().getInstance(ScriptService.class);
|
||||
ExecutableScriptCondition condition = new ExecutableScriptCondition(
|
||||
new ScriptCondition(WatcherScript.inline("ctx.payload.aggregations.rate.buckets[0]?.doc_count >= 5").build()),
|
||||
logger, scriptService);
|
||||
|
@ -91,6 +104,7 @@ public class ScriptConditionSearchIT extends AbstractWatcherIntegrationTestCase
|
|||
}
|
||||
|
||||
public void testExecuteAccessHits() throws Exception {
|
||||
ScriptService scriptService = internalCluster().getInstance(ScriptService.class);
|
||||
ExecutableScriptCondition condition = new ExecutableScriptCondition(new ScriptCondition(
|
||||
WatcherScript.inline("ctx.payload.hits?.hits[0]?._score == 1.0").build()), logger, scriptService);
|
||||
InternalSearchHit hit = new InternalSearchHit(0, "1", new Text("type"), null);
|
|
@ -3,7 +3,7 @@
|
|||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.messy.tests;
|
||||
package org.elasticsearch.xpack.watcher.condition.script;
|
||||
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
|
@ -13,30 +13,37 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.script.GeneralScriptException;
|
||||
import org.elasticsearch.script.MockScriptEngine;
|
||||
import org.elasticsearch.script.ScriptContextRegistry;
|
||||
import org.elasticsearch.script.ScriptEngineRegistry;
|
||||
import org.elasticsearch.script.ScriptEngineService;
|
||||
import org.elasticsearch.script.ScriptException;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.script.ScriptService.ScriptType;
|
||||
import org.elasticsearch.script.ScriptSettings;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition;
|
||||
import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition;
|
||||
import org.elasticsearch.xpack.watcher.condition.script.ScriptConditionFactory;
|
||||
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.elasticsearch.xpack.watcher.watch.Payload;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singleton;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.messy.tests.MessyTestUtils.createScriptService;
|
||||
import static org.elasticsearch.script.ScriptService.SCRIPT_AUTO_RELOAD_ENABLED_SETTING;
|
||||
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument;
|
||||
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -44,20 +51,50 @@ import static org.hamcrest.Matchers.is;
|
|||
|
||||
public class ScriptConditionTests extends ESTestCase {
|
||||
|
||||
private ThreadPool tp = null;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
tp = new TestThreadPool(ThreadPool.Names.SAME);
|
||||
}
|
||||
private ScriptService scriptService;
|
||||
|
||||
@After
|
||||
public void cleanup() throws InterruptedException {
|
||||
terminate(tp);
|
||||
@Before
|
||||
public void init() throws IOException {
|
||||
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
|
||||
scripts.put("return true", s -> true);
|
||||
scripts.put("return new Object()", s -> new Object());
|
||||
|
||||
scripts.put("ctx.trigger.scheduled_time.getMillis() < new Date().time", vars -> {
|
||||
DateTime scheduledTime = (DateTime) XContentMapValues.extractValue("ctx.trigger.scheduled_time", vars);
|
||||
return scheduledTime.getMillis() < new Date().getTime();
|
||||
});
|
||||
|
||||
scripts.put("null.foo", s -> {
|
||||
throw new ScriptException("Error evaluating null.foo", new IllegalArgumentException(), emptyList(),
|
||||
"null.foo", WatcherScript.DEFAULT_LANG);
|
||||
});
|
||||
|
||||
scripts.put("ctx.payload.hits.total > 1", vars -> {
|
||||
int total = (int) XContentMapValues.extractValue("ctx.payload.hits.total", vars);
|
||||
return total > 1;
|
||||
});
|
||||
|
||||
scripts.put("ctx.payload.hits.total > threshold", vars -> {
|
||||
int total = (int) XContentMapValues.extractValue("ctx.payload.hits.total", vars);
|
||||
int threshold = (int) XContentMapValues.extractValue("threshold", vars);
|
||||
return total > threshold;
|
||||
});
|
||||
|
||||
ScriptEngineService engine = new MockScriptEngine(WatcherScript.DEFAULT_LANG, scripts);
|
||||
|
||||
ScriptEngineRegistry registry = new ScriptEngineRegistry(singleton(engine));
|
||||
ScriptContextRegistry contextRegistry = new ScriptContextRegistry(singleton(WatcherScript.CTX_PLUGIN));
|
||||
ScriptSettings scriptSettings = new ScriptSettings(registry, contextRegistry);
|
||||
|
||||
Settings settings = Settings.builder()
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
|
||||
.put(SCRIPT_AUTO_RELOAD_ENABLED_SETTING.getKey(), false)
|
||||
.build();
|
||||
|
||||
scriptService = new ScriptService(settings, new Environment(settings), null, registry, contextRegistry, scriptSettings);
|
||||
}
|
||||
|
||||
public void testExecute() throws Exception {
|
||||
ScriptService scriptService = createScriptService(tp);
|
||||
ExecutableScriptCondition condition = new ExecutableScriptCondition(
|
||||
new ScriptCondition(WatcherScript.inline("ctx.payload.hits.total > 1").build()), logger, scriptService);
|
||||
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
|
||||
|
@ -66,7 +103,6 @@ public class ScriptConditionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testExecuteMergedParams() throws Exception {
|
||||
ScriptService scriptService = createScriptService(tp);
|
||||
WatcherScript script = WatcherScript.inline("ctx.payload.hits.total > threshold")
|
||||
.lang(WatcherScript.DEFAULT_LANG).params(singletonMap("threshold", 1)).build();
|
||||
ExecutableScriptCondition executable = new ExecutableScriptCondition(new ScriptCondition(script), logger, scriptService);
|
||||
|
@ -76,7 +112,7 @@ public class ScriptConditionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testParserValid() throws Exception {
|
||||
ScriptConditionFactory factory = new ScriptConditionFactory(Settings.builder().build(), createScriptService(tp));
|
||||
ScriptConditionFactory factory = new ScriptConditionFactory(Settings.builder().build(), scriptService);
|
||||
|
||||
XContentBuilder builder = createConditionContent("ctx.payload.hits.total > 1", null, ScriptType.INLINE);
|
||||
|
||||
|
@ -103,7 +139,7 @@ public class ScriptConditionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testParserInvalid() throws Exception {
|
||||
ScriptConditionFactory factory = new ScriptConditionFactory(Settings.builder().build(), createScriptService(tp));
|
||||
ScriptConditionFactory factory = new ScriptConditionFactory(Settings.builder().build(), scriptService);
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject().endObject();
|
||||
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
|
||||
|
@ -118,7 +154,7 @@ public class ScriptConditionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testScriptConditionParserBadScript() throws Exception {
|
||||
ScriptConditionFactory conditionParser = new ScriptConditionFactory(Settings.builder().build(), createScriptService(tp));
|
||||
ScriptConditionFactory conditionParser = new ScriptConditionFactory(Settings.builder().build(), scriptService);
|
||||
ScriptType scriptType = randomFrom(ScriptType.values());
|
||||
String script;
|
||||
switch (scriptType) {
|
||||
|
@ -134,15 +170,14 @@ public class ScriptConditionTests extends ESTestCase {
|
|||
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
|
||||
parser.nextToken();
|
||||
ScriptCondition scriptCondition = conditionParser.parseCondition("_watch", parser);
|
||||
GeneralScriptException exception = expectThrows(GeneralScriptException.class,
|
||||
expectThrows(GeneralScriptException.class,
|
||||
() -> conditionParser.createExecutable(scriptCondition));
|
||||
}
|
||||
|
||||
public void testScriptConditionParser_badLang() throws Exception {
|
||||
ScriptConditionFactory conditionParser = new ScriptConditionFactory(Settings.builder().build(), createScriptService(tp));
|
||||
ScriptType scriptType = ScriptType.INLINE;
|
||||
ScriptConditionFactory conditionParser = new ScriptConditionFactory(Settings.builder().build(), scriptService);
|
||||
String script = "return true";
|
||||
XContentBuilder builder = createConditionContent(script, "not_a_valid_lang", scriptType);
|
||||
XContentBuilder builder = createConditionContent(script, "not_a_valid_lang", ScriptType.INLINE);
|
||||
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
|
||||
parser.nextToken();
|
||||
ScriptCondition scriptCondition = conditionParser.parseCondition("_watch", parser);
|
||||
|
@ -152,7 +187,6 @@ public class ScriptConditionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testScriptConditionThrowException() throws Exception {
|
||||
ScriptService scriptService = createScriptService(tp);
|
||||
ExecutableScriptCondition condition = new ExecutableScriptCondition(
|
||||
new ScriptCondition(WatcherScript.inline("null.foo").build()), logger, scriptService);
|
||||
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
|
||||
|
@ -162,7 +196,6 @@ public class ScriptConditionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testScriptConditionReturnObjectThrowsException() throws Exception {
|
||||
ScriptService scriptService = createScriptService(tp);
|
||||
ExecutableScriptCondition condition = new ExecutableScriptCondition(
|
||||
new ScriptCondition(WatcherScript.inline("return new Object()").build()), logger, scriptService);
|
||||
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
|
||||
|
@ -173,9 +206,8 @@ public class ScriptConditionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testScriptConditionAccessCtx() throws Exception {
|
||||
ScriptService scriptService = createScriptService(tp);
|
||||
ExecutableScriptCondition condition = new ExecutableScriptCondition(
|
||||
new ScriptCondition(WatcherScript.inline("ctx.trigger.scheduled_time.getMillis() < new Date().time ").build()),
|
||||
new ScriptCondition(WatcherScript.inline("ctx.trigger.scheduled_time.getMillis() < new Date().time").build()),
|
||||
logger, scriptService);
|
||||
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
|
||||
WatchExecutionContext ctx = mockExecutionContext("_name", new DateTime(DateTimeZone.UTC), new Payload.XContent(response));
|
|
@ -9,17 +9,23 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.MockScriptPlugin;
|
||||
import org.elasticsearch.xpack.support.clock.SystemClock;
|
||||
import org.elasticsearch.xpack.watcher.WatcherService;
|
||||
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
|
||||
import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction;
|
||||
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
|
||||
import org.elasticsearch.xpack.watcher.condition.always.AlwaysCondition;
|
||||
import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition;
|
||||
import org.elasticsearch.xpack.watcher.history.HistoryStore;
|
||||
import org.elasticsearch.xpack.watcher.history.WatchRecord;
|
||||
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
|
||||
import org.elasticsearch.xpack.support.clock.SystemClock;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.delete.DeleteWatchResponse;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchRequest;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchRequestBuilder;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse;
|
||||
|
@ -35,11 +41,15 @@ import org.elasticsearch.xpack.watcher.watch.Watch;
|
|||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
|
||||
|
@ -52,19 +62,58 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
|
|||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
|
||||
public class ManualExecutionTests extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected boolean enableSecurity() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(CustomScriptPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
public static class CustomScriptPlugin extends MockScriptPlugin {
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
||||
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
|
||||
|
||||
scripts.put("sleep", vars -> {
|
||||
Number millis = (Number) XContentMapValues.extractValue("millis", vars);
|
||||
if (millis != null) {
|
||||
try {
|
||||
Thread.sleep(millis.longValue());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
throw new RuntimeException("Unable to sleep, [millis] parameter is missing!");
|
||||
}
|
||||
return true;
|
||||
});
|
||||
return scripts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String pluginScriptLang() {
|
||||
return WatcherScript.DEFAULT_LANG;
|
||||
}
|
||||
}
|
||||
|
||||
public void testExecuteWatch() throws Exception {
|
||||
boolean ignoreCondition = randomBoolean();
|
||||
boolean recordExecution = randomBoolean();
|
||||
|
@ -267,9 +316,6 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTestCase {
|
|||
|
||||
TriggerEvent triggerEvent = new ScheduleTriggerEvent(SystemClock.INSTANCE.nowUTC(), SystemClock.INSTANCE.nowUTC());
|
||||
|
||||
Wid wid = new Wid("_watchId",1, SystemClock.INSTANCE.nowUTC());
|
||||
|
||||
|
||||
Map<String, Object> executeWatchResult = watcherClient().prepareExecuteWatch()
|
||||
.setId("_id")
|
||||
.setTriggerEvent(triggerEvent)
|
||||
|
@ -287,7 +333,6 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTestCase {
|
|||
.addAction("log", loggingAction("foobar"));
|
||||
watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet();
|
||||
|
||||
|
||||
executeWatchResult = watcherClient().prepareExecuteWatch()
|
||||
.setId("_id").setTriggerEvent(triggerEvent).setRecordExecution(true)
|
||||
.get().getRecordSource().getAsMap();
|
||||
|
@ -296,7 +341,6 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTestCase {
|
|||
assertThat(ObjectPath.<String>eval("result.input.payload.foo", executeWatchResult), equalTo("bar"));
|
||||
assertThat(ObjectPath.<String>eval("result.actions.0.id", executeWatchResult), equalTo("log"));
|
||||
|
||||
|
||||
executeWatchResult = watcherClient().prepareExecuteWatch()
|
||||
.setId("_id").setTriggerEvent(triggerEvent)
|
||||
.get().getRecordSource().getAsMap();
|
||||
|
@ -304,6 +348,66 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTestCase {
|
|||
assertThat(ObjectPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.THROTTLED.toString()));
|
||||
}
|
||||
|
||||
public void testWatchExecutionDuration() throws Exception {
|
||||
WatcherScript script = new WatcherScript.Builder.Inline("sleep").params(singletonMap("millis", 100L)).build();
|
||||
WatchSourceBuilder watchBuilder = watchBuilder()
|
||||
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
|
||||
.input(simpleInput("foo", "bar"))
|
||||
.condition(new ScriptCondition(script))
|
||||
.addAction("log", loggingAction("foobar"));
|
||||
|
||||
Watch watch = watchParser().parse("_id", false, watchBuilder.buildAsBytes(XContentType.JSON));
|
||||
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, false, new ManualTriggerEvent("_id",
|
||||
new ScheduleTriggerEvent(new DateTime(DateTimeZone.UTC), new DateTime(DateTimeZone.UTC))),
|
||||
new TimeValue(1, TimeUnit.HOURS));
|
||||
WatchRecord record = executionService().execute(ctxBuilder.build());
|
||||
assertThat(record.result().executionDurationMs(), greaterThanOrEqualTo(100L));
|
||||
}
|
||||
|
||||
public void testForceDeletionOfLongRunningWatch() throws Exception {
|
||||
WatcherScript script = new WatcherScript.Builder.Inline("sleep").params(singletonMap("millis", 10000L)).build();
|
||||
WatchSourceBuilder watchBuilder = watchBuilder()
|
||||
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
|
||||
.input(simpleInput("foo", "bar"))
|
||||
.condition(new ScriptCondition(script))
|
||||
.defaultThrottlePeriod(new TimeValue(1, TimeUnit.HOURS))
|
||||
.addAction("log", loggingAction("foobar"));
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet();
|
||||
assertThat(putWatchResponse.getVersion(), greaterThan(0L));
|
||||
refresh();
|
||||
assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true));
|
||||
|
||||
int numberOfThreads = scaledRandomIntBetween(1, 5);
|
||||
CountDownLatch startLatch = new CountDownLatch(1);
|
||||
|
||||
List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < numberOfThreads; ++i) {
|
||||
threads.add(new Thread(new ExecutionRunner(watchService(), executionService(), "_id", startLatch)));
|
||||
}
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
}
|
||||
DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").setForce(true).get();
|
||||
assertThat(deleteWatchResponse.isFound(), is(true));
|
||||
|
||||
deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").get();
|
||||
assertThat(deleteWatchResponse.isFound(), is(false));
|
||||
|
||||
startLatch.countDown();
|
||||
|
||||
long startJoin = System.currentTimeMillis();
|
||||
for (Thread thread : threads) {
|
||||
thread.join(30_000L);
|
||||
assertFalse(thread.isAlive());
|
||||
}
|
||||
long endJoin = System.currentTimeMillis();
|
||||
TimeValue tv = new TimeValue(10 * (numberOfThreads+1), TimeUnit.SECONDS);
|
||||
assertThat("Shouldn't take longer than [" + tv.getSeconds() + "] seconds for all the threads to stop", (endJoin - startJoin),
|
||||
lessThan(tv.getMillis()));
|
||||
}
|
||||
|
||||
public static class ExecutionRunner implements Runnable {
|
||||
|
||||
final WatcherService watcherService;
|
||||
|
|
|
@ -5,19 +5,20 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.watcher.history;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.MockScriptPlugin;
|
||||
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
|
||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.alwaysCondition;
|
||||
|
@ -25,15 +26,43 @@ import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
|||
import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.scriptTransform;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
/**
|
||||
* This test makes sure that the http host and path fields in the watch_record action result are
|
||||
* not analyzed so they can be used in aggregations
|
||||
*/
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/724")
|
||||
public class HistoryTemplateTransformMappingsTests extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(CustomScriptPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
public static class CustomScriptPlugin extends MockScriptPlugin {
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
||||
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
|
||||
|
||||
scripts.put("return [ 'key' : 'value1' ];", vars -> singletonMap("key", "value1"));
|
||||
scripts.put("return [ 'key' : 'value2' ];", vars -> singletonMap("key", "value2"));
|
||||
scripts.put("return [ 'key' : [ 'key1' : 'value1' ] ];", vars -> singletonMap("key", singletonMap("key1", "value1")));
|
||||
scripts.put("return [ 'key' : [ 'key1' : 'value2' ] ];", vars -> singletonMap("key", singletonMap("key1", "value2")));
|
||||
|
||||
return scripts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String pluginScriptLang() {
|
||||
return WatcherScript.DEFAULT_LANG;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean timeWarped() {
|
||||
return true; // just to have better control over the triggers
|
||||
|
@ -82,30 +111,19 @@ public class HistoryTemplateTransformMappingsTests extends AbstractWatcherIntegr
|
|||
|
||||
refresh();
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
|
||||
assertThat(mappingsResponse, notNullValue());
|
||||
assertThat(mappingsResponse.getMappings().isEmpty(), is(false));
|
||||
for (ObjectObjectCursor<String, ImmutableOpenMap<String, MappingMetaData>> metadatas : mappingsResponse.getMappings()) {
|
||||
if (!metadatas.key.startsWith(".watcher-history")) {
|
||||
continue;
|
||||
}
|
||||
MappingMetaData metadata = metadatas.value.get("watch_record");
|
||||
assertThat(metadata, notNullValue());
|
||||
try {
|
||||
Map<String, Object> source = metadata.getSourceAsMap();
|
||||
logger.info("checking index [{}] with metadata:\n[{}]", metadatas.key, metadata.source().toString());
|
||||
assertThat(extractValue("properties.result.properties.transform.properties.payload.enabled", source),
|
||||
is((Object) false));
|
||||
assertBusy(() -> {
|
||||
GetFieldMappingsResponse response = client().admin().indices()
|
||||
.prepareGetFieldMappings(".watcher-history*")
|
||||
.setFields("result.actions.transform.payload")
|
||||
.setTypes("watch_record")
|
||||
.includeDefaults(true)
|
||||
.get();
|
||||
|
||||
String path = "properties.result.properties.actions.properties.transform.properties.payload.enabled";
|
||||
assertThat(extractValue(path, source), is((Object) false));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
for (Map<String, Map<String, GetFieldMappingsResponse.FieldMappingMetaData>> map : response.mappings().values()) {
|
||||
Map<String, GetFieldMappingsResponse.FieldMappingMetaData> watchRecord = map.get("watch_record");
|
||||
assertThat(watchRecord, hasKey("result.actions.transform.payload"));
|
||||
GetFieldMappingsResponse.FieldMappingMetaData fieldMappingMetaData = watchRecord.get("result.actions.transform.payload");
|
||||
assertThat(fieldMappingMetaData.isNull(), is(true));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -5,19 +5,22 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.watcher.test.integration;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.util.Callback;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.MockScriptPlugin;
|
||||
import org.elasticsearch.xpack.watcher.client.WatcherClient;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
|
||||
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
|
||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
|
@ -27,18 +30,90 @@ import static org.elasticsearch.xpack.watcher.transform.TransformBuilders.script
|
|||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
/**
|
||||
*/
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/724")
|
||||
public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected boolean timeWarped() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(CustomScriptPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
public static class CustomScriptPlugin extends MockScriptPlugin {
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
||||
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
|
||||
|
||||
scripts.put("ctx.vars.condition_value = ctx.payload.value + 5; return ctx.vars.condition_value > 5;", vars -> {
|
||||
int value = (int) XContentMapValues.extractValue("ctx.payload.value", vars);
|
||||
|
||||
Map<String, Object> ctxVars = (Map<String, Object>) XContentMapValues.extractValue("ctx.vars", vars);
|
||||
ctxVars.put("condition_value", value + 5);
|
||||
|
||||
return (int) XContentMapValues.extractValue("condition_value", ctxVars) > 5;
|
||||
});
|
||||
|
||||
scripts.put("ctx.vars.watch_transform_value = ctx.vars.condition_value + 5; return ctx.payload;", vars -> {
|
||||
Map<String, Object> ctxVars = (Map<String, Object>) XContentMapValues.extractValue("ctx.vars", vars);
|
||||
ctxVars.put("watch_transform_value", (int) XContentMapValues.extractValue("condition_value", ctxVars) + 5);
|
||||
|
||||
return XContentMapValues.extractValue("ctx.payload", vars);
|
||||
});
|
||||
|
||||
// Transforms the value of a1, equivalent to:
|
||||
// ctx.vars.a1_transform_value = ctx.vars.watch_transform_value + 10;
|
||||
// ctx.payload.a1_transformed_value = ctx.vars.a1_transform_value;
|
||||
// return ctx.payload;
|
||||
scripts.put("transform a1", vars -> {
|
||||
Map<String, Object> ctxVars = (Map<String, Object>) XContentMapValues.extractValue("ctx.vars", vars);
|
||||
Map<String, Object> ctxPayload = (Map<String, Object>) XContentMapValues.extractValue("ctx.payload", vars);
|
||||
|
||||
int value = (int) XContentMapValues.extractValue("watch_transform_value", ctxVars);
|
||||
ctxVars.put("a1_transform_value", value + 10);
|
||||
|
||||
value = (int) XContentMapValues.extractValue("a1_transform_value", ctxVars);
|
||||
ctxPayload.put("a1_transformed_value", value);
|
||||
|
||||
return XContentMapValues.extractValue("ctx.payload", vars);
|
||||
});
|
||||
|
||||
// Transforms the value of a2, equivalent to:
|
||||
// ctx.vars.a2_transform_value = ctx.vars.watch_transform_value + 20;
|
||||
// ctx.payload.a2_transformed_value = ctx.vars.a2_transform_value;
|
||||
// return ctx.payload;
|
||||
scripts.put("transform a2", vars -> {
|
||||
Map<String, Object> ctxVars = (Map<String, Object>) XContentMapValues.extractValue("ctx.vars", vars);
|
||||
Map<String, Object> ctxPayload = (Map<String, Object>) XContentMapValues.extractValue("ctx.payload", vars);
|
||||
|
||||
int value = (int) XContentMapValues.extractValue("watch_transform_value", ctxVars);
|
||||
ctxVars.put("a2_transform_value", value + 20);
|
||||
|
||||
value = (int) XContentMapValues.extractValue("a2_transform_value", ctxVars);
|
||||
ctxPayload.put("a2_transformed_value", value);
|
||||
|
||||
return XContentMapValues.extractValue("ctx.payload", vars);
|
||||
});
|
||||
|
||||
return scripts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String pluginScriptLang() {
|
||||
return WatcherScript.DEFAULT_LANG;
|
||||
}
|
||||
}
|
||||
|
||||
public void testVars() throws Exception {
|
||||
WatcherClient watcherClient = watcherClient();
|
||||
|
||||
|
@ -49,14 +124,12 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
|||
.transform(scriptTransform("ctx.vars.watch_transform_value = ctx.vars.condition_value + 5; return ctx.payload;"))
|
||||
.addAction(
|
||||
"a1",
|
||||
scriptTransform("ctx.vars.a1_transform_value = ctx.vars.watch_transform_value + 10; return ctx.payload;"),
|
||||
loggingAction("condition_value={{ctx.vars.condition_value}}, watch_transform_value={{ctx.vars" +
|
||||
".watch_transform_value}}, a1_transform_value={{ctx.vars.a1_transform_value}}"))
|
||||
scriptTransform("transform a1"),
|
||||
loggingAction("_text"))
|
||||
.addAction(
|
||||
"a2",
|
||||
scriptTransform("ctx.vars.a2_transform_value = ctx.vars.watch_transform_value + 20; return ctx.payload;"),
|
||||
loggingAction("condition_value={{ctx.vars.condition_value}}, watch_transform_value={{ctx.vars" +
|
||||
".watch_transform_value}}, a2_transform_value={{ctx.vars.a2_transform_value}}")))
|
||||
scriptTransform("transform a2"),
|
||||
loggingAction("_text")))
|
||||
.get();
|
||||
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
@ -66,11 +139,8 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
|||
flush();
|
||||
refresh();
|
||||
|
||||
SearchResponse searchResponse = searchWatchRecords(new Callback<SearchRequestBuilder>() {
|
||||
@Override
|
||||
public void handle(SearchRequestBuilder builder) {
|
||||
// defaults to match all;
|
||||
}
|
||||
SearchResponse searchResponse = searchWatchRecords(builder -> {
|
||||
// defaults to match all;
|
||||
});
|
||||
|
||||
assertThat(searchResponse.getHits().getTotalHits(), is(1L));
|
||||
|
@ -93,12 +163,12 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
|||
case "a1":
|
||||
assertValue(action, "status", is("success"));
|
||||
assertValue(action, "transform.status", is("success"));
|
||||
assertValue(action, "logging.logged_text", is("condition_value=10, watch_transform_value=15, a1_transform_value=25"));
|
||||
assertValue(action, "transform.payload.a1_transformed_value", equalTo(25));
|
||||
break;
|
||||
case "a2":
|
||||
assertValue(action, "status", is("success"));
|
||||
assertValue(action, "transform.status", is("success"));
|
||||
assertValue(action, "logging.logged_text", is("condition_value=10, watch_transform_value=15, a2_transform_value=35"));
|
||||
assertValue(action, "transform.payload.a2_transformed_value", equalTo(35));
|
||||
break;
|
||||
default:
|
||||
fail("there should not be an action result for action with an id other than a1 or a2");
|
||||
|
@ -116,14 +186,12 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
|||
.transform(scriptTransform("ctx.vars.watch_transform_value = ctx.vars.condition_value + 5; return ctx.payload;"))
|
||||
.addAction(
|
||||
"a1",
|
||||
scriptTransform("ctx.vars.a1_transform_value = ctx.vars.watch_transform_value + 10; return ctx.payload;"),
|
||||
loggingAction("condition_value={{ctx.vars.condition_value}}, watch_transform_value={{ctx.vars" +
|
||||
".watch_transform_value}}, a1_transform_value={{ctx.vars.a1_transform_value}}"))
|
||||
scriptTransform("transform a1"),
|
||||
loggingAction("_text"))
|
||||
.addAction(
|
||||
"a2",
|
||||
scriptTransform("ctx.vars.a2_transform_value = ctx.vars.watch_transform_value + 20; return ctx.payload;"),
|
||||
loggingAction("condition_value={{ctx.vars.condition_value}}, watch_transform_value={{ctx.vars" +
|
||||
".watch_transform_value}}, a2_transform_value={{ctx.vars.a2_transform_value}}")))
|
||||
scriptTransform("transform a2"),
|
||||
loggingAction("_text")))
|
||||
.get();
|
||||
|
||||
assertThat(putWatchResponse.isCreated(), is(true));
|
||||
|
@ -157,12 +225,12 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
|||
case "a1":
|
||||
assertValue(action, "status", is("success"));
|
||||
assertValue(action, "transform.status", is("success"));
|
||||
assertValue(action, "logging.logged_text", is("condition_value=10, watch_transform_value=15, a1_transform_value=25"));
|
||||
assertValue(action, "transform.payload.a1_transformed_value", equalTo(25));
|
||||
break;
|
||||
case "a2":
|
||||
assertValue(action, "status", is("success"));
|
||||
assertValue(action, "transform.status", is("success"));
|
||||
assertValue(action, "logging.logged_text", is("condition_value=10, watch_transform_value=15, a2_transform_value=35"));
|
||||
assertValue(action, "transform.payload.a2_transformed_value", equalTo(35));
|
||||
break;
|
||||
default:
|
||||
fail("there should not be an action result for action with an id other than a1 or a2");
|
||||
|
|
|
@ -5,24 +5,31 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.watcher.transform;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.MockScriptPlugin;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherScript;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
|
@ -41,25 +48,62 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
*/
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/724")
|
||||
public class TransformIntegrationTests extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||
types.add(CustomScriptPlugin.class);
|
||||
return types;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
Path configDir = createTempDir();
|
||||
Path scripts = configDir.resolve("scripts");
|
||||
Path config = createTempDir().resolve("config");
|
||||
Path scripts = config.resolve("scripts");
|
||||
|
||||
try {
|
||||
Files.createDirectories(scripts);
|
||||
try (InputStream stream = TransformIntegrationTests.class.getResourceAsStream("/config/scripts/my-script.groovy");
|
||||
OutputStream output = Files.newOutputStream(scripts.resolve("my-script.groovy"))) {
|
||||
Streams.copy(stream, output);
|
||||
}
|
||||
|
||||
// When using the MockScriptPlugin we can map File scripts to inline scripts:
|
||||
// the name of the file script is used in test method while the source of the file script
|
||||
// must match a predefined script from CustomScriptPlugin.pluginScripts() method
|
||||
Files.write(scripts.resolve("my-script.groovy"), "return [key3 : ctx.payload.key1 + ctx.payload.key2]".getBytes("UTF-8"));
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
throw new RuntimeException("Failed to create scripts", ex);
|
||||
}
|
||||
|
||||
// Set the config path so that the ScriptService will pick up the test scripts
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(Environment.PATH_CONF_SETTING.getKey(), config)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static class CustomScriptPlugin extends MockScriptPlugin {
|
||||
|
||||
@Override
|
||||
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
||||
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
|
||||
|
||||
scripts.put("return [key3 : ctx.payload.key1 + ctx.payload.key2]", vars -> {
|
||||
int key1 = (int) XContentMapValues.extractValue("ctx.payload.key1", vars);
|
||||
int key2 = (int) XContentMapValues.extractValue("ctx.payload.key2", vars);
|
||||
return singletonMap("key3", key1 + key2);
|
||||
});
|
||||
|
||||
scripts.put("return [key4 : ctx.payload.key3 + 10]", vars -> {
|
||||
int key3 = (int) XContentMapValues.extractValue("ctx.payload.key3", vars);
|
||||
return singletonMap("key4", key3 + 10);
|
||||
});
|
||||
|
||||
return scripts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String pluginScriptLang() {
|
||||
return WatcherScript.DEFAULT_LANG;
|
||||
}
|
||||
//Set path so ScriptService will pick up the test scripts
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("path.conf", configDir.toString()).build();
|
||||
}
|
||||
|
||||
public void testScriptTransform() throws Exception {
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
return [key3 : ctx.payload.key1 + ctx.payload.key2]
|
Loading…
Reference in New Issue