diff --git a/elasticsearch/src/test/java/org/elasticsearch/script/LatchScriptEngine.java b/elasticsearch/src/test/java/org/elasticsearch/script/LatchScriptEngine.java new file mode 100644 index 00000000000..714ae16ae14 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/script/LatchScriptEngine.java @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.script; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.search.lookup.SearchLookup; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class LatchScriptEngine implements ScriptEngineService { + + private static final String NAME = "latch"; + private static final LatchScriptEngine INSTANCE = new LatchScriptEngine(); + + private CountDownLatch scriptStartedLatch = new CountDownLatch(1); + private CountDownLatch scriptCompletionLatch = new CountDownLatch(1); + private Logger logger = ESLoggerFactory.getLogger(getClass()); + + @Override + public String getType() { + return NAME; + } + + @Override + public String getExtension() { + return NAME; + } + + @Override + public Object compile(String scriptName, String scriptSource, Map params) { + return scriptSource; + } + + @Override + public ExecutableScript executable(CompiledScript compiledScript, @Nullable Map vars) { + return new AbstractSearchScript() { + @Override + public Object run() { + scriptStartedLatch.countDown(); + try { + if (scriptCompletionLatch.await(10, TimeUnit.SECONDS) == false) { + logger.error("Script completion latch was not counted down after 10 seconds"); + } + } catch (InterruptedException e) {} + return true; + } + }; + } + + @Override + public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map vars) { + return null; + } + + @Override + public void close() throws IOException { + } + + public void awaitScriptStartedExecution() throws InterruptedException { + if (scriptStartedLatch.await(10, TimeUnit.SECONDS) == false) { + throw new ElasticsearchException("Expected script to be called within 10 seconds, did not happen"); + } + } + + public void finishScriptExecution() throws InterruptedException { + scriptCompletionLatch.countDown(); + scriptCompletionLatch.await(1, TimeUnit.SECONDS); + } + + public void reset() { + scriptStartedLatch = new CountDownLatch(1); + scriptCompletionLatch = new CountDownLatch(1); + } + + public static Script latchScript() { + return new Script("", ScriptType.INLINE, NAME, Collections.emptyMap()); + } + + @Override + public boolean isInlineScriptEnabled() { + return true; + } + + public static class LatchScriptPlugin extends Plugin implements ScriptPlugin { + @Override + public ScriptEngineService getScriptEngineService(Settings settings) { + return INSTANCE; + } + + public LatchScriptEngine getScriptEngineService() { + return INSTANCE; + } + } + +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/script/SleepScriptEngine.java b/elasticsearch/src/test/java/org/elasticsearch/script/SleepScriptEngine.java deleted file mode 100644 index 8961cc3e440..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/script/SleepScriptEngine.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.script; - -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.ScriptPlugin; -import org.elasticsearch.search.lookup.SearchLookup; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; - -/** - * A dummy script engine used for testing. Scripts must be a number. Running the script - */ -public class SleepScriptEngine implements ScriptEngineService { - - public static final String NAME = "sleep"; - - public static class TestPlugin extends Plugin implements ScriptPlugin { - @Override - public ScriptEngineService getScriptEngineService(Settings settings) { - return new SleepScriptEngine(); - } - } - - @Override - public String getType() { - return NAME; - } - - @Override - public String getExtension() { - return NAME; - } - - @Override - public Object compile(String scriptName, String scriptSource, Map params) { - return scriptSource; - } - - @Override - public ExecutableScript executable(CompiledScript compiledScript, @Nullable Map vars) { - return new AbstractSearchScript() { - @Override - public Object run() { - try { - Thread.sleep(((Number) vars.get("millis")).longValue()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return true; - } - }; - } - - @Override - public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map vars) { - return null; - } - - @Override - public void close() throws IOException { - } - - public static Script sleepScript(long millis) { - return new Script("", ScriptType.INLINE, "sleep", Collections.singletonMap("millis", millis)); - } - - @Override - public boolean isInlineScriptEnabled() { - return true; - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/stats/SlowWatchStatsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/stats/SlowWatchStatsTests.java deleted file mode 100644 index 242ac19328f..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/stats/SlowWatchStatsTests.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.transport.action.stats; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.script.SleepScriptEngine; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.xpack.watcher.WatcherState; -import org.elasticsearch.xpack.watcher.actions.ActionBuilders; -import org.elasticsearch.xpack.watcher.condition.ScriptCondition; -import org.elasticsearch.xpack.watcher.execution.ExecutionPhase; -import org.elasticsearch.xpack.watcher.execution.QueuedWatch; -import org.elasticsearch.xpack.watcher.input.InputBuilders; -import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; -import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsResponse; -import org.joda.time.DateTime; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST; -import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; -import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; -import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; - -@ESIntegTestCase.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 2) -public class SlowWatchStatsTests extends AbstractWatcherIntegrationTestCase { - @Override - protected boolean timeWarped() { - return false; - } - - @Override - protected Collection> nodePlugins() { - List> plugins = new ArrayList<>(super.nodePlugins()); - plugins.add(SleepScriptEngine.TestPlugin.class); - return plugins; - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - // So it is predictable how many slow watches we need to add to accumulate pending watches - .put(EsExecutors.PROCESSORS_SETTING.getKey(), "1") - .build(); - } - - public void testCurrentWatches() throws Exception { - watcherClient().preparePutWatch("_id").setSource(watchBuilder() - .trigger(schedule(interval("1s"))) - .input(InputBuilders.simpleInput("key", "value")) - .condition(new ScriptCondition(SleepScriptEngine.sleepScript(10000))) - .addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!")) - ).get(); - - assertBusy(new Runnable() { - @Override - public void run() { - WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get(); - assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); - assertThat(response.getWatchesCount(), equalTo(1L)); - assertThat(response.getQueuedWatches(), nullValue()); - assertThat(response.getSnapshots(), notNullValue()); - assertThat(response.getSnapshots().size(), equalTo(1)); - assertThat(response.getSnapshots().get(0).watchId(), equalTo("_id")); - assertThat(response.getSnapshots().get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION)); - } - }); - } - - public void testPendingWatches() throws Exception { - // Add 5 slow watches and we should almost immediately see pending watches in the stats api - for (int i = 0; i < 5; i++) { - watcherClient().preparePutWatch("_id" + i).setSource(watchBuilder() - .trigger(schedule(interval("1s"))) - .input(InputBuilders.simpleInput("key", "value")) - .condition(new ScriptCondition(SleepScriptEngine.sleepScript(10000))) - .addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!")) - ).get(); - } - - assertBusy(new Runnable() { - @Override - public void run() { - WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeQueuedWatches(true).get(); - assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); - assertThat(response.getWatchesCount(), equalTo(5L)); - assertThat(response.getSnapshots(), nullValue()); - assertThat(response.getQueuedWatches(), notNullValue()); - assertThat(response.getQueuedWatches().size(), greaterThanOrEqualTo(5)); - DateTime previous = null; - for (QueuedWatch queuedWatch : response.getQueuedWatches()) { - assertThat(queuedWatch.watchId(), - anyOf(equalTo("_id0"), equalTo("_id1"), equalTo("_id2"), equalTo("_id3"), equalTo("_id4"))); - if (previous != null) { - // older pending watch should be on top: - assertThat(previous.getMillis(), lessThanOrEqualTo(queuedWatch.executionTime().getMillis())); - } - previous = queuedWatch.executionTime(); - } - } - }, 60, TimeUnit.SECONDS); - } - -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/stats/WatchStatsTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/stats/WatchStatsTests.java new file mode 100644 index 00000000000..7a964a13532 --- /dev/null +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/transport/action/stats/WatchStatsTests.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.transport.action.stats; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.LatchScriptEngine; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.watcher.WatcherState; +import org.elasticsearch.xpack.watcher.actions.ActionBuilders; +import org.elasticsearch.xpack.watcher.condition.ScriptCondition; +import org.elasticsearch.xpack.watcher.execution.ExecutionPhase; +import org.elasticsearch.xpack.watcher.execution.QueuedWatch; +import org.elasticsearch.xpack.watcher.input.InputBuilders; +import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; +import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsResponse; +import org.joda.time.DateTime; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; +import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput; +import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +@ESIntegTestCase.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, + randomDynamicTemplates = false, numDataNodes = 1, supportsDedicatedMasters = false) +public class WatchStatsTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected boolean timeWarped() { + return false; + } + + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(LatchScriptEngine.LatchScriptPlugin.class); + return plugins; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + // So it is predictable how many slow watches we need to add to accumulate pending watches + .put(EsExecutors.PROCESSORS_SETTING.getKey(), "1") + .build(); + } + + @Before + public void createLatches() { + getLatchScriptEngine().reset(); + } + + @After + public void clearLatches() throws InterruptedException { + getLatchScriptEngine().finishScriptExecution(); + } + + public void testCurrentWatches() throws Exception { + watcherClient().preparePutWatch("_id").setSource(watchBuilder() + .trigger(schedule(interval("1s"))) + .input(InputBuilders.simpleInput("key", "value")) + .condition(new ScriptCondition(LatchScriptEngine.latchScript())) + .addAction("_action", ActionBuilders.loggingAction("some logging")) + ).get(); + + getLatchScriptEngine().awaitScriptStartedExecution(); + + WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get(); + assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); + assertThat(response.getWatchesCount(), equalTo(1L)); + assertThat(response.getQueuedWatches(), nullValue()); + assertThat(response.getSnapshots(), notNullValue()); + assertThat(response.getSnapshots().size(), equalTo(1)); + assertThat(response.getSnapshots().get(0).watchId(), equalTo("_id")); + assertThat(response.getSnapshots().get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION)); + } + + public void testPendingWatches() throws Exception { + // Add 5 slow watches and we should almost immediately see pending watches in the stats api + for (int i = 0; i < 5; i++) { + watcherClient().preparePutWatch("_id" + i).setSource(watchBuilder() + .trigger(schedule(interval("1s"))) + .input(noneInput()) + .condition(new ScriptCondition(LatchScriptEngine.latchScript())) + .addAction("_action", ActionBuilders.loggingAction("some logging")) + ).get(); + } + + logger.info("Waiting for first script invocation"); + getLatchScriptEngine().awaitScriptStartedExecution(); + // I know this still sucks, but it is still way faster than the older implementation + logger.info("Sleeping 2.5 seconds to make sure a new round of watches is queued"); + Thread.sleep(2500); + logger.info("Sleeping done, checking stats response"); + + WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeQueuedWatches(true).get(); + assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); + assertThat(response.getWatchesCount(), equalTo(5L)); + assertThat(response.getSnapshots(), nullValue()); + assertThat(response.getQueuedWatches(), notNullValue()); + assertThat(response.getQueuedWatches().size(), greaterThanOrEqualTo(5)); + DateTime previous = null; + for (QueuedWatch queuedWatch : response.getQueuedWatches()) { + assertThat(queuedWatch.watchId(), + anyOf(equalTo("_id0"), equalTo("_id1"), equalTo("_id2"), equalTo("_id3"), equalTo("_id4"))); + if (previous != null) { + // older pending watch should be on top: + assertThat(previous.getMillis(), lessThanOrEqualTo(queuedWatch.executionTime().getMillis())); + } + previous = queuedWatch.executionTime(); + } + logger.info("Pending watches test finished, now counting down latches"); + } + + private LatchScriptEngine getLatchScriptEngine() { + return internalCluster().getInstance(LatchScriptEngine.LatchScriptPlugin.class).getScriptEngineService(); + } +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java index 424aa09ce91..166e0a5f7bf 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java @@ -85,7 +85,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { logger.debug("firing [{}] at [{}]", jobName, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now); for (Listener listener : listeners) { - listener.triggered(Arrays.asList(event)); + listener.triggered(Arrays.asList(event)); } if (interval != null) { if (clock instanceof ClockMock) {