Tests: Improve execution speed of WatchStatsTests

Those tests ran 35 seconds on my local notebook. By not using a sleep based
scripting engine and some other tweaks the time is down to 13 seconds.

Also renamed the class to remove the `Slow` prefix.

Original commit: elastic/x-pack-elasticsearch@5289fe8dab
This commit is contained in:
Alexander Reelsen 2016-11-03 09:37:58 +01:00
parent 714b891b03
commit c0a1ec89f5
5 changed files with 245 additions and 199 deletions

View File

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.script;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.search.lookup.SearchLookup;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class LatchScriptEngine implements ScriptEngineService {
private static final String NAME = "latch";
private static final LatchScriptEngine INSTANCE = new LatchScriptEngine();
private CountDownLatch scriptStartedLatch = new CountDownLatch(1);
private CountDownLatch scriptCompletionLatch = new CountDownLatch(1);
private Logger logger = ESLoggerFactory.getLogger(getClass());
@Override
public String getType() {
return NAME;
}
@Override
public String getExtension() {
return NAME;
}
@Override
public Object compile(String scriptName, String scriptSource, Map<String, String> params) {
return scriptSource;
}
@Override
public ExecutableScript executable(CompiledScript compiledScript, @Nullable Map<String, Object> vars) {
return new AbstractSearchScript() {
@Override
public Object run() {
scriptStartedLatch.countDown();
try {
if (scriptCompletionLatch.await(10, TimeUnit.SECONDS) == false) {
logger.error("Script completion latch was not counted down after 10 seconds");
}
} catch (InterruptedException e) {}
return true;
}
};
}
@Override
public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map<String, Object> vars) {
return null;
}
@Override
public void close() throws IOException {
}
public void awaitScriptStartedExecution() throws InterruptedException {
if (scriptStartedLatch.await(10, TimeUnit.SECONDS) == false) {
throw new ElasticsearchException("Expected script to be called within 10 seconds, did not happen");
}
}
public void finishScriptExecution() throws InterruptedException {
scriptCompletionLatch.countDown();
scriptCompletionLatch.await(1, TimeUnit.SECONDS);
}
public void reset() {
scriptStartedLatch = new CountDownLatch(1);
scriptCompletionLatch = new CountDownLatch(1);
}
public static Script latchScript() {
return new Script("", ScriptType.INLINE, NAME, Collections.emptyMap());
}
@Override
public boolean isInlineScriptEnabled() {
return true;
}
public static class LatchScriptPlugin extends Plugin implements ScriptPlugin {
@Override
public ScriptEngineService getScriptEngineService(Settings settings) {
return INSTANCE;
}
public LatchScriptEngine getScriptEngineService() {
return INSTANCE;
}
}
}

View File

@ -1,79 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.script;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.search.lookup.SearchLookup;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
* A dummy script engine used for testing. Scripts must be a number. Running the script
*/
public class SleepScriptEngine implements ScriptEngineService {
public static final String NAME = "sleep";
public static class TestPlugin extends Plugin implements ScriptPlugin {
@Override
public ScriptEngineService getScriptEngineService(Settings settings) {
return new SleepScriptEngine();
}
}
@Override
public String getType() {
return NAME;
}
@Override
public String getExtension() {
return NAME;
}
@Override
public Object compile(String scriptName, String scriptSource, Map<String, String> params) {
return scriptSource;
}
@Override
public ExecutableScript executable(CompiledScript compiledScript, @Nullable Map<String, Object> vars) {
return new AbstractSearchScript() {
@Override
public Object run() {
try {
Thread.sleep(((Number) vars.get("millis")).longValue());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}
};
}
@Override
public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map<String, Object> vars) {
return null;
}
@Override
public void close() throws IOException {
}
public static Script sleepScript(long millis) {
return new Script("", ScriptType.INLINE, "sleep", Collections.singletonMap("millis", millis));
}
@Override
public boolean isInlineScriptEnabled() {
return true;
}
}

View File

@ -1,119 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.action.stats;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.SleepScriptEngine;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.watcher.WatcherState;
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
import org.elasticsearch.xpack.watcher.condition.ScriptCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionPhase;
import org.elasticsearch.xpack.watcher.execution.QueuedWatch;
import org.elasticsearch.xpack.watcher.input.InputBuilders;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsResponse;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 2)
public class SlowWatchStatsTests extends AbstractWatcherIntegrationTestCase {
@Override
protected boolean timeWarped() {
return false;
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(SleepScriptEngine.TestPlugin.class);
return plugins;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// So it is predictable how many slow watches we need to add to accumulate pending watches
.put(EsExecutors.PROCESSORS_SETTING.getKey(), "1")
.build();
}
public void testCurrentWatches() throws Exception {
watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(InputBuilders.simpleInput("key", "value"))
.condition(new ScriptCondition(SleepScriptEngine.sleepScript(10000)))
.addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!"))
).get();
assertBusy(new Runnable() {
@Override
public void run() {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get();
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getWatchesCount(), equalTo(1L));
assertThat(response.getQueuedWatches(), nullValue());
assertThat(response.getSnapshots(), notNullValue());
assertThat(response.getSnapshots().size(), equalTo(1));
assertThat(response.getSnapshots().get(0).watchId(), equalTo("_id"));
assertThat(response.getSnapshots().get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION));
}
});
}
public void testPendingWatches() throws Exception {
// Add 5 slow watches and we should almost immediately see pending watches in the stats api
for (int i = 0; i < 5; i++) {
watcherClient().preparePutWatch("_id" + i).setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(InputBuilders.simpleInput("key", "value"))
.condition(new ScriptCondition(SleepScriptEngine.sleepScript(10000)))
.addAction("_action", ActionBuilders.loggingAction("hello {{ctx.watch_id}}!"))
).get();
}
assertBusy(new Runnable() {
@Override
public void run() {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeQueuedWatches(true).get();
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getWatchesCount(), equalTo(5L));
assertThat(response.getSnapshots(), nullValue());
assertThat(response.getQueuedWatches(), notNullValue());
assertThat(response.getQueuedWatches().size(), greaterThanOrEqualTo(5));
DateTime previous = null;
for (QueuedWatch queuedWatch : response.getQueuedWatches()) {
assertThat(queuedWatch.watchId(),
anyOf(equalTo("_id0"), equalTo("_id1"), equalTo("_id2"), equalTo("_id3"), equalTo("_id4")));
if (previous != null) {
// older pending watch should be on top:
assertThat(previous.getMillis(), lessThanOrEqualTo(queuedWatch.executionTime().getMillis()));
}
previous = queuedWatch.executionTime();
}
}
}, 60, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.action.stats;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.LatchScriptEngine;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.watcher.WatcherState;
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
import org.elasticsearch.xpack.watcher.condition.ScriptCondition;
import org.elasticsearch.xpack.watcher.execution.ExecutionPhase;
import org.elasticsearch.xpack.watcher.execution.QueuedWatch;
import org.elasticsearch.xpack.watcher.input.InputBuilders;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsResponse;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0,
randomDynamicTemplates = false, numDataNodes = 1, supportsDedicatedMasters = false)
public class WatchStatsTests extends AbstractWatcherIntegrationTestCase {
@Override
protected boolean timeWarped() {
return false;
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(LatchScriptEngine.LatchScriptPlugin.class);
return plugins;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// So it is predictable how many slow watches we need to add to accumulate pending watches
.put(EsExecutors.PROCESSORS_SETTING.getKey(), "1")
.build();
}
@Before
public void createLatches() {
getLatchScriptEngine().reset();
}
@After
public void clearLatches() throws InterruptedException {
getLatchScriptEngine().finishScriptExecution();
}
public void testCurrentWatches() throws Exception {
watcherClient().preparePutWatch("_id").setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(InputBuilders.simpleInput("key", "value"))
.condition(new ScriptCondition(LatchScriptEngine.latchScript()))
.addAction("_action", ActionBuilders.loggingAction("some logging"))
).get();
getLatchScriptEngine().awaitScriptStartedExecution();
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get();
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getWatchesCount(), equalTo(1L));
assertThat(response.getQueuedWatches(), nullValue());
assertThat(response.getSnapshots(), notNullValue());
assertThat(response.getSnapshots().size(), equalTo(1));
assertThat(response.getSnapshots().get(0).watchId(), equalTo("_id"));
assertThat(response.getSnapshots().get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION));
}
public void testPendingWatches() throws Exception {
// Add 5 slow watches and we should almost immediately see pending watches in the stats api
for (int i = 0; i < 5; i++) {
watcherClient().preparePutWatch("_id" + i).setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(noneInput())
.condition(new ScriptCondition(LatchScriptEngine.latchScript()))
.addAction("_action", ActionBuilders.loggingAction("some logging"))
).get();
}
logger.info("Waiting for first script invocation");
getLatchScriptEngine().awaitScriptStartedExecution();
// I know this still sucks, but it is still way faster than the older implementation
logger.info("Sleeping 2.5 seconds to make sure a new round of watches is queued");
Thread.sleep(2500);
logger.info("Sleeping done, checking stats response");
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeQueuedWatches(true).get();
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getWatchesCount(), equalTo(5L));
assertThat(response.getSnapshots(), nullValue());
assertThat(response.getQueuedWatches(), notNullValue());
assertThat(response.getQueuedWatches().size(), greaterThanOrEqualTo(5));
DateTime previous = null;
for (QueuedWatch queuedWatch : response.getQueuedWatches()) {
assertThat(queuedWatch.watchId(),
anyOf(equalTo("_id0"), equalTo("_id1"), equalTo("_id2"), equalTo("_id3"), equalTo("_id4")));
if (previous != null) {
// older pending watch should be on top:
assertThat(previous.getMillis(), lessThanOrEqualTo(queuedWatch.executionTime().getMillis()));
}
previous = queuedWatch.executionTime();
}
logger.info("Pending watches test finished, now counting down latches");
}
private LatchScriptEngine getLatchScriptEngine() {
return internalCluster().getInstance(LatchScriptEngine.LatchScriptPlugin.class).getScriptEngineService();
}
}

View File

@ -85,7 +85,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
logger.debug("firing [{}] at [{}]", jobName, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now);
for (Listener listener : listeners) {
listener.triggered(Arrays.<TriggerEvent>asList(event));
listener.triggered(Arrays.asList(event));
}
if (interval != null) {
if (clock instanceof ClockMock) {