Watcher: Replace flaky WatchStatsTests with unit tests (elastic/x-pack-elasticsearch#2118)
This flaky tests was using sleep, latches and a custom script plugin, causing issues with stopping/starting tests. This removes the integration tests and replaces it with a unit test. Also removed a couple of unused ctor/method parameters as cleanup. relates elastic/x-pack-elasticsearch#1639 Original commit: elastic/x-pack-elasticsearch@2a42faf2db
This commit is contained in:
parent
9ea032998b
commit
681ce6e2aa
|
@ -304,7 +304,7 @@ public class Watcher implements ActionPlugin {
|
|||
final Watch.Parser watchParser = new Watch.Parser(settings, triggerService, registry, inputRegistry, cryptoService, clock);
|
||||
|
||||
final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor,
|
||||
clock, threadPool, watchParser, clusterService, internalClient);
|
||||
clock, watchParser, clusterService, internalClient);
|
||||
|
||||
final Consumer<Iterable<TriggerEvent>> triggerEngineListener = getTriggerEngineListener(executionService);
|
||||
triggerService.register(triggerEngineListener);
|
||||
|
|
|
@ -122,7 +122,7 @@ public class WatcherService extends AbstractComponent {
|
|||
try {
|
||||
logger.debug("starting watch service...");
|
||||
|
||||
executionService.start(clusterState);
|
||||
executionService.start();
|
||||
Collection<Watch> watches = loadWatches(clusterState);
|
||||
triggerService.start(watches);
|
||||
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.common.stats.Counters;
|
||||
import org.elasticsearch.xpack.watcher.Watcher;
|
||||
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
|
||||
|
@ -74,7 +73,6 @@ public class ExecutionService extends AbstractComponent {
|
|||
private final Clock clock;
|
||||
private final TimeValue defaultThrottlePeriod;
|
||||
private final TimeValue maxStopTimeout;
|
||||
private final ThreadPool threadPool;
|
||||
private final Watch.Parser parser;
|
||||
private final ClusterService clusterService;
|
||||
private final Client client;
|
||||
|
@ -84,8 +82,7 @@ public class ExecutionService extends AbstractComponent {
|
|||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor,
|
||||
Clock clock, ThreadPool threadPool, Watch.Parser parser, ClusterService clusterService,
|
||||
Client client) {
|
||||
Clock clock, Watch.Parser parser, ClusterService clusterService, Client client) {
|
||||
super(settings);
|
||||
this.historyStore = historyStore;
|
||||
this.triggeredWatchStore = triggeredWatchStore;
|
||||
|
@ -93,14 +90,13 @@ public class ExecutionService extends AbstractComponent {
|
|||
this.clock = clock;
|
||||
this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings);
|
||||
this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.parser = parser;
|
||||
this.clusterService = clusterService;
|
||||
this.client = client;
|
||||
this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(30));
|
||||
}
|
||||
|
||||
public void start(ClusterState state) throws Exception {
|
||||
public void start() throws Exception {
|
||||
if (started.get()) {
|
||||
return;
|
||||
}
|
||||
|
@ -171,13 +167,13 @@ public class ExecutionService extends AbstractComponent {
|
|||
currentExecutions.add(watchExecution.createSnapshot());
|
||||
}
|
||||
// Lets show the longest running watch first:
|
||||
Collections.sort(currentExecutions, Comparator.comparing(WatchExecutionSnapshot::executionTime));
|
||||
currentExecutions.sort(Comparator.comparing(WatchExecutionSnapshot::executionTime));
|
||||
return currentExecutions;
|
||||
}
|
||||
|
||||
public List<QueuedWatch> queuedWatches() {
|
||||
List<Runnable> snapshot = new ArrayList<>();
|
||||
executor.tasks().forEach(t -> snapshot.add(t));
|
||||
executor.tasks().forEach(snapshot::add);
|
||||
if (snapshot.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
@ -189,8 +185,7 @@ public class ExecutionService extends AbstractComponent {
|
|||
}
|
||||
|
||||
// Lets show the execution that pending the longest first:
|
||||
// Note that the type parameters on comparing are required to make the comparing method work
|
||||
Collections.sort(queuedWatches, Comparator.<QueuedWatch, DateTime>comparing(QueuedWatch::executionTime));
|
||||
queuedWatches.sort(Comparator.comparing(QueuedWatch::executionTime));
|
||||
return queuedWatches;
|
||||
}
|
||||
|
||||
|
@ -389,7 +384,7 @@ public class ExecutionService extends AbstractComponent {
|
|||
*/
|
||||
private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch triggeredWatch) {
|
||||
try {
|
||||
executor.execute(new WatchExecutionTask(ctx));
|
||||
executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx)));
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
|
||||
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message);
|
||||
|
@ -543,33 +538,37 @@ public class ExecutionService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private final class WatchExecutionTask implements Runnable {
|
||||
// the watch execution task takes another runnable as parameter
|
||||
// the best solution would be to move the whole execute() method, which is handed over as ctor parameter
|
||||
// over into this class, this is the quicker way though
|
||||
static final class WatchExecutionTask implements Runnable {
|
||||
|
||||
private final WatchExecutionContext ctx;
|
||||
private final Runnable runnable;
|
||||
|
||||
private WatchExecutionTask(WatchExecutionContext ctx) {
|
||||
WatchExecutionTask(WatchExecutionContext ctx, Runnable runnable) {
|
||||
this.ctx = ctx;
|
||||
this.runnable = runnable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
execute(ctx);
|
||||
runnable.run();
|
||||
}
|
||||
}
|
||||
|
||||
public static class WatchExecution {
|
||||
static class WatchExecution {
|
||||
|
||||
private final WatchExecutionContext context;
|
||||
private final Thread executionThread;
|
||||
|
||||
public WatchExecution(WatchExecutionContext context, Thread executionThread) {
|
||||
WatchExecution(WatchExecutionContext context, Thread executionThread) {
|
||||
this.context = context;
|
||||
this.executionThread = executionThread;
|
||||
}
|
||||
|
||||
public WatchExecutionSnapshot createSnapshot() {
|
||||
WatchExecutionSnapshot createSnapshot() {
|
||||
return context.createSnapshot(executionThread);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,17 +11,14 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.support.clock.ClockMock;
|
||||
import org.elasticsearch.xpack.watcher.actions.Action;
|
||||
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
|
||||
|
@ -39,34 +36,39 @@ import org.elasticsearch.xpack.watcher.input.Input;
|
|||
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
|
||||
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
|
||||
import org.elasticsearch.xpack.watcher.transform.Transform;
|
||||
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
|
||||
import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
|
||||
import org.elasticsearch.xpack.watcher.watch.Payload;
|
||||
import org.elasticsearch.xpack.watcher.watch.Watch;
|
||||
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.joda.time.DateTime.now;
|
||||
import static org.joda.time.DateTimeZone.UTC;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.eq;
|
||||
|
@ -87,7 +89,6 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
private HistoryStore historyStore;
|
||||
private ExecutionService executionService;
|
||||
private Clock clock;
|
||||
private ThreadPool threadPool;
|
||||
private Client client;
|
||||
private Watch.Parser parser;
|
||||
|
||||
|
@ -107,8 +108,6 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1));
|
||||
|
||||
clock = ClockMock.frozen();
|
||||
threadPool = mock(ThreadPool.class);
|
||||
|
||||
client = mock(Client.class);
|
||||
parser = mock(Watch.Parser.class);
|
||||
|
||||
|
@ -117,11 +116,10 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
ClusterService clusterService = mock(ClusterService.class);
|
||||
when(clusterService.localNode()).thenReturn(discoveryNode);
|
||||
|
||||
executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, clock, threadPool,
|
||||
parser, clusterService, client);
|
||||
executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, clock, parser,
|
||||
clusterService, client);
|
||||
|
||||
ClusterState clusterState = mock(ClusterState.class);
|
||||
executionService.start(clusterState);
|
||||
executionService.start();
|
||||
}
|
||||
|
||||
public void testExecute() throws Exception {
|
||||
|
@ -495,7 +493,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testExecuteInner() throws Exception {
|
||||
DateTime now = now(DateTimeZone.UTC);
|
||||
DateTime now = now(UTC);
|
||||
Watch watch = mock(Watch.class);
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
|
||||
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
|
||||
|
@ -570,7 +568,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testExecuteInnerThrottled() throws Exception {
|
||||
DateTime now = now(DateTimeZone.UTC);
|
||||
DateTime now = now(UTC);
|
||||
Watch watch = mock(Watch.class);
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
|
||||
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
|
||||
|
@ -621,7 +619,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testExecuteInnerConditionNotMet() throws Exception {
|
||||
DateTime now = now(DateTimeZone.UTC);
|
||||
DateTime now = now(UTC);
|
||||
Watch watch = mock(Watch.class);
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
|
||||
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
|
||||
|
@ -682,7 +680,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testExecuteInnerConditionNotMetDueToException() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DateTime now = DateTime.now(UTC);
|
||||
Watch watch = mock(Watch.class);
|
||||
when(watch.id()).thenReturn(getTestName());
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
|
||||
|
@ -737,7 +735,7 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testExecuteConditionNotMet() throws Exception {
|
||||
DateTime now = DateTime.now(DateTimeZone.UTC);
|
||||
DateTime now = DateTime.now(UTC);
|
||||
Watch watch = mock(Watch.class);
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
|
||||
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
|
||||
|
@ -792,9 +790,6 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
|
||||
Wid wid = new Wid(watch.id(), now());
|
||||
|
||||
final ExecutorService currentThreadExecutor = EsExecutors.newDirectExecutorService();
|
||||
when(threadPool.generic()).thenReturn(currentThreadExecutor);
|
||||
|
||||
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, new ScheduleTriggerEvent(now() ,now()));
|
||||
executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
|
||||
|
||||
|
@ -912,6 +907,59 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
assertThat(watchRecord.state(), is(ExecutionState.EXECUTION_NOT_NEEDED));
|
||||
}
|
||||
|
||||
public void testCurrentExecutionSnapshots() throws Exception {
|
||||
DateTime time = DateTime.now(UTC);
|
||||
int snapshotCount = randomIntBetween(2, 8);
|
||||
for (int i = 0; i < snapshotCount; i++) {
|
||||
time = time.minusSeconds(10);
|
||||
WatchExecutionContext ctx = createMockWatchExecutionContext("_id" + i, time);
|
||||
executionService.getCurrentExecutions().put("_id" + i, new ExecutionService.WatchExecution(ctx, Thread.currentThread()));
|
||||
}
|
||||
|
||||
List<WatchExecutionSnapshot> snapshots = executionService.currentExecutions();
|
||||
assertThat(snapshots, hasSize(snapshotCount));
|
||||
assertThat(snapshots.get(0).watchId(), is("_id" + (snapshotCount-1)));
|
||||
assertThat(snapshots.get(snapshots.size() - 1).watchId(), is("_id0"));
|
||||
}
|
||||
|
||||
public void testQueuedWatches() throws Exception {
|
||||
Collection<Runnable> tasks = new ArrayList<>();
|
||||
DateTime time = DateTime.now(UTC);
|
||||
int queuedWatchCount = randomIntBetween(2, 8);
|
||||
for (int i = 0; i < queuedWatchCount; i++) {
|
||||
time = time.minusSeconds(10);
|
||||
WatchExecutionContext ctx = createMockWatchExecutionContext("_id" + i, time);
|
||||
tasks.add(new ExecutionService.WatchExecutionTask(ctx, () -> logger.info("this will never be called")));
|
||||
}
|
||||
|
||||
when(executor.tasks()).thenReturn(tasks.stream());
|
||||
|
||||
List<QueuedWatch> queuedWatches = executionService.queuedWatches();
|
||||
assertThat(queuedWatches, hasSize(queuedWatchCount));
|
||||
assertThat(queuedWatches.get(0).watchId(), is("_id" + (queuedWatchCount-1)));
|
||||
assertThat(queuedWatches.get(queuedWatches.size() - 1).watchId(), is("_id0"));
|
||||
}
|
||||
|
||||
private WatchExecutionContext createMockWatchExecutionContext(String watchId, DateTime executionTime) {
|
||||
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
|
||||
when(ctx.id()).thenReturn(new Wid(watchId, executionTime));
|
||||
when(ctx.executionTime()).thenReturn(executionTime);
|
||||
when(ctx.executionPhase()).thenReturn(ExecutionPhase.INPUT);
|
||||
|
||||
TriggerEvent triggerEvent = mock(TriggerEvent.class);
|
||||
when(triggerEvent.triggeredTime()).thenReturn(executionTime.minusSeconds(1));
|
||||
when(ctx.triggerEvent()).thenReturn(triggerEvent);
|
||||
|
||||
Watch watch = mock(Watch.class);
|
||||
when(watch.id()).thenReturn(watchId);
|
||||
when(ctx.watch()).thenReturn(watch);
|
||||
|
||||
WatchExecutionSnapshot snapshot = new WatchExecutionSnapshot(ctx, new StackTraceElement[]{});
|
||||
when(ctx.createSnapshot(anyObject())).thenReturn(snapshot);
|
||||
|
||||
return ctx;
|
||||
}
|
||||
|
||||
private Tuple<Condition, Condition.Result> whenCondition(final WatchExecutionContext context) {
|
||||
Condition.Result conditionResult = mock(Condition.Result.class);
|
||||
when(conditionResult.met()).thenReturn(true);
|
||||
|
|
|
@ -1,213 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.watcher.transport.action.stats;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.MockScriptPlugin;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.xpack.watcher.WatcherState;
|
||||
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
|
||||
import org.elasticsearch.xpack.watcher.condition.ScriptCondition;
|
||||
import org.elasticsearch.xpack.watcher.execution.ExecutionPhase;
|
||||
import org.elasticsearch.xpack.watcher.execution.QueuedWatch;
|
||||
import org.elasticsearch.xpack.watcher.execution.WatchExecutionSnapshot;
|
||||
import org.elasticsearch.xpack.watcher.input.InputBuilders;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsResponse;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
|
||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0,
|
||||
numDataNodes = 1, supportsDedicatedMasters = false)
|
||||
public class WatchStatsTests extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
private static CountDownLatch scriptStartedLatch = new CountDownLatch(1);
|
||||
private static CountDownLatch scriptCompletionLatch = new CountDownLatch(1);
|
||||
private static Logger logger = ESLoggerFactory.getLogger(WatchStatsTests.class);
|
||||
|
||||
public static class LatchScriptPlugin extends MockScriptPlugin {
|
||||
|
||||
private static final String NAME = "latch";
|
||||
|
||||
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
||||
return Collections.singletonMap(NAME, p -> {
|
||||
scriptStartedLatch.countDown();
|
||||
try {
|
||||
if (scriptCompletionLatch.await(10, TimeUnit.SECONDS) == false) {
|
||||
logger.error("Script completion latch was not counted down after 10 seconds");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
static Script latchScript() {
|
||||
return new Script(ScriptType.INLINE, "mockscript", LatchScriptPlugin.NAME, Collections.emptyMap());
|
||||
}
|
||||
|
||||
public void awaitScriptStartedExecution() throws InterruptedException {
|
||||
if (scriptStartedLatch.await(10, TimeUnit.SECONDS) == false) {
|
||||
throw new ElasticsearchException("Expected script to be called within 10 seconds, did not happen");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean timeWarped() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean enableSecurity() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
|
||||
plugins.add(LatchScriptPlugin.class);
|
||||
return plugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
// So it is predictable how many slow watches we need to add to accumulate pending watches
|
||||
.put(EsExecutors.PROCESSORS_SETTING.getKey(), "1")
|
||||
.build();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void createLatches() {
|
||||
scriptStartedLatch = new CountDownLatch(1);
|
||||
scriptCompletionLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
@After
|
||||
public void clearLatches() throws InterruptedException {
|
||||
scriptCompletionLatch.countDown();
|
||||
boolean countedDown = scriptCompletionLatch.await(10, TimeUnit.SECONDS);
|
||||
String msg = String.format(Locale.ROOT, "Script completion latch value is [%s], but should be 0", scriptCompletionLatch.getCount());
|
||||
assertThat(msg, countedDown, Matchers.is(true));
|
||||
}
|
||||
|
||||
@TestLogging("org.elasticsearch.xpack.watcher.trigger.schedule.engine:TRACE,org.elasticsearch.xpack.scheduler:TRACE,org.elasticsearch" +
|
||||
".xpack.watcher.execution:TRACE,org.elasticsearch.xpack.watcher.test:TRACE")
|
||||
public void testCurrentWatches() throws Exception {
|
||||
watcherClient().preparePutWatch("_id").setSource(watchBuilder()
|
||||
.trigger(schedule(interval("1s")))
|
||||
.input(InputBuilders.simpleInput("key", "value"))
|
||||
.condition(new ScriptCondition(latchScript()))
|
||||
.addAction("_action", ActionBuilders.loggingAction("some logging"))
|
||||
).get();
|
||||
|
||||
logger.info("Waiting for first script invocation");
|
||||
awaitScriptStartedExecution();
|
||||
logger.info("First script got executed, checking currently running watches");
|
||||
|
||||
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeCurrentWatches(true).get();
|
||||
boolean watcherStarted = response.getNodes().stream().anyMatch(node -> node.getWatcherState() == WatcherState.STARTED);
|
||||
assertThat(watcherStarted, is(true));
|
||||
|
||||
assertThat(response.getWatchesCount(), equalTo(1L));
|
||||
assertThat(getQueuedWatches(response), hasSize(0));
|
||||
List<WatchExecutionSnapshot> snapshots = getSnapshots(response);
|
||||
assertThat(snapshots, notNullValue());
|
||||
assertThat(snapshots, hasSize(1));
|
||||
assertThat(snapshots.get(0).watchId(), equalTo("_id"));
|
||||
assertThat(snapshots.get(0).executionPhase(), equalTo(ExecutionPhase.CONDITION));
|
||||
}
|
||||
|
||||
@TestLogging("org.elasticsearch.xpack.watcher.trigger.schedule.engine:TRACE,org.elasticsearch.xpack.scheduler:TRACE,org.elasticsearch" +
|
||||
".xpack.watcher.execution:TRACE,org.elasticsearch.xpack.watcher.test:TRACE")
|
||||
public void testPendingWatches() throws Exception {
|
||||
// Add 5 slow watches and we should almost immediately see pending watches in the stats api
|
||||
for (int i = 0; i < 5; i++) {
|
||||
watcherClient().preparePutWatch("_id" + i).setSource(watchBuilder()
|
||||
.trigger(schedule(interval("1s")))
|
||||
.input(noneInput())
|
||||
.condition(new ScriptCondition(latchScript()))
|
||||
.addAction("_action", ActionBuilders.loggingAction("some logging"))
|
||||
).get();
|
||||
}
|
||||
|
||||
logger.info("Waiting for first script invocation");
|
||||
awaitScriptStartedExecution();
|
||||
// I know this still sucks, but it is still way faster than the older implementation
|
||||
logger.info("Sleeping 2.5 seconds to make sure a new round of watches is queued");
|
||||
Thread.sleep(2500);
|
||||
logger.info("Sleeping done, checking stats response");
|
||||
|
||||
WatcherStatsResponse response = watcherClient().prepareWatcherStats().setIncludeQueuedWatches(true).get();
|
||||
boolean watcherStarted = response.getNodes().stream().allMatch(node -> node.getWatcherState() == WatcherState.STARTED);
|
||||
assertThat(watcherStarted, is(true));
|
||||
assertThat(response.getWatchesCount(), equalTo(5L));
|
||||
assertThat(getSnapshots(response), hasSize(0));
|
||||
assertThat(getQueuedWatches(response), hasSize(greaterThanOrEqualTo(5)));
|
||||
DateTime previous = null;
|
||||
for (QueuedWatch queuedWatch : getQueuedWatches(response)) {
|
||||
assertThat(queuedWatch.watchId(),
|
||||
anyOf(equalTo("_id0"), equalTo("_id1"), equalTo("_id2"), equalTo("_id3"), equalTo("_id4")));
|
||||
if (previous != null) {
|
||||
// older pending watch should be on top:
|
||||
assertThat(previous.getMillis(), lessThanOrEqualTo(queuedWatch.executionTime().getMillis()));
|
||||
}
|
||||
previous = queuedWatch.executionTime();
|
||||
}
|
||||
logger.info("Pending watches test finished, now counting down latches");
|
||||
}
|
||||
|
||||
private List<WatchExecutionSnapshot> getSnapshots(WatcherStatsResponse response) {
|
||||
List<WatchExecutionSnapshot> snapshots = new ArrayList<>();
|
||||
response.getNodes().stream()
|
||||
.filter(node -> node.getSnapshots() != null)
|
||||
.forEach(node -> snapshots.addAll(node.getSnapshots()));
|
||||
return snapshots;
|
||||
}
|
||||
|
||||
private List<QueuedWatch> getQueuedWatches(WatcherStatsResponse response) {
|
||||
final List<QueuedWatch> queuedWatches = new ArrayList<>();
|
||||
response.getNodes().stream()
|
||||
.filter(node -> node.getQueuedWatches() != null)
|
||||
.forEach(node -> queuedWatches.addAll(node.getQueuedWatches()));
|
||||
return queuedWatches;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue