diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 82f4722daa5..b8e93eef2ef 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -28,7 +28,6 @@ import org.elasticsearch.plugins.ScriptPlugin; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.script.ScriptSettings; import org.elasticsearch.search.SearchRequestParsers; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; @@ -141,7 +140,6 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.YearlySchedule; import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.watch.WatchLockService; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -281,12 +279,11 @@ public class Watcher implements ActionPlugin, ScriptPlugin { final WatcherSearchTemplateService watcherSearchTemplateService = new WatcherSearchTemplateService(settings, scriptService, searchRequestParsers); - final WatchLockService watchLockService = new WatchLockService(settings); final WatchExecutor watchExecutor = getWatchExecutor(threadPool); final Watch.Parser watchParser = new Watch.Parser(settings, triggerService, registry, inputRegistry, cryptoService, clock); final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor, - watchLockService, clock, threadPool, watchParser, watcherClientProxy); + clock, threadPool, watchParser, watcherClientProxy); final TriggerEngine.Listener triggerEngineListener = getTriggerEngineListener(executionService); triggerService.register(triggerEngineListener); @@ -294,7 +291,7 @@ public class Watcher implements ActionPlugin, ScriptPlugin { final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = new WatcherIndexTemplateRegistry(settings, clusterService.getClusterSettings(), clusterService, threadPool, internalClient); - final WatcherService watcherService = new WatcherService(settings, triggerService, executionService, watchLockService, + final WatcherService watcherService = new WatcherService(settings, triggerService, executionService, watcherIndexTemplateRegistry, watchParser, watcherClientProxy); final WatcherLifeCycleService watcherLifeCycleService = diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 9c3eff4c556..f1d729e22d4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -9,7 +9,6 @@ package org.elasticsearch.xpack.watcher; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.search.SearchRequest; @@ -27,7 +26,6 @@ import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.watch.WatchLockService; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import java.util.ArrayList; @@ -45,7 +43,6 @@ import static org.elasticsearch.xpack.watcher.watch.Watch.INDEX; public class WatcherService extends AbstractComponent { private final TriggerService triggerService; - private final WatchLockService watchLockService; private final ExecutionService executionService; private final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry; // package-private for testing @@ -55,12 +52,10 @@ public class WatcherService extends AbstractComponent { private final Watch.Parser parser; private final WatcherClientProxy client; - public WatcherService(Settings settings, TriggerService triggerService, - ExecutionService executionService, WatchLockService watchLockService, + public WatcherService(Settings settings, TriggerService triggerService, ExecutionService executionService, WatcherIndexTemplateRegistry watcherIndexTemplateRegistry, Watch.Parser parser, WatcherClientProxy client) { super(settings); this.triggerService = triggerService; - this.watchLockService = watchLockService; this.executionService = executionService; this.watcherIndexTemplateRegistry = watcherIndexTemplateRegistry; this.scrollTimeout = settings.getAsTime("xpack.watcher.watch.scroll.timeout", TimeValue.timeValueSeconds(30)); @@ -74,7 +69,6 @@ public class WatcherService extends AbstractComponent { try { logger.debug("starting watch service..."); watcherIndexTemplateRegistry.addTemplatesIfMissing(); - watchLockService.start(); executionService.start(clusterState); triggerService.start(loadWatches(clusterState)); @@ -98,11 +92,6 @@ public class WatcherService extends AbstractComponent { logger.debug("stopping watch service..."); triggerService.stop(); executionService.stop(); - try { - watchLockService.stop(); - } catch (ElasticsearchTimeoutException te) { - logger.warn("error stopping WatchLockService", te); - } state.set(WatcherState.STOPPED); logger.debug("watch service has stopped"); } else { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java index 84532fe9201..58abc9d05ae 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/CurrentExecutions.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.unit.TimeValue; import java.util.Iterator; @@ -16,22 +17,31 @@ import java.util.concurrent.locks.ReentrantLock; import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState; -public class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> { +public final class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> { private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = new ConcurrentHashMap<>(); + // the condition of the lock is used to wait and signal the finishing of all executions on shutdown private final ReentrantLock lock = new ReentrantLock(); private final Condition empty = lock.newCondition(); - private boolean seal = false; + // a marker to not accept new executions, used when the watch service is powered down + private SetOnce<Boolean> seal = new SetOnce<>(); - public void put(String id, ExecutionService.WatchExecution execution) { + /** + * Tries to put an watch execution class for a watch in the current executions + * + * @param id The id of the watch + * @param execution The watch execution class + * @return Returns true if watch with id already is in the current executions class, false otherwise + */ + public boolean put(String id, ExecutionService.WatchExecution execution) { lock.lock(); try { - if (seal) { + if (seal.get() != null) { // We shouldn't get here, because, ExecutionService#started should have been set to false throw illegalState("could not register execution [{}]. current executions are sealed and forbid registrations of " + "additional executions.", id); } - currentExecutions.put(id, execution); + return currentExecutions.putIfAbsent(id, execution) != null; } finally { lock.unlock(); } @@ -49,16 +59,22 @@ public class CurrentExecutions implements Iterable<ExecutionService.WatchExecuti } } - public void sealAndAwaitEmpty(TimeValue maxStopTimeout) { + /** + * Calling this method makes the class stop accepting new executions and throws and exception instead. + * In addition it waits for a certain amount of time for current executions to finish before returning + * + * @param maxStopTimeout The maximum wait time to wait to current executions to finish + */ + void sealAndAwaitEmpty(TimeValue maxStopTimeout) { + lock.lock(); // We may have current executions still going on. // We should try to wait for the current executions to have completed. // Otherwise we can run into a situation where we didn't delete the watch from the .triggered_watches index, // but did insert into the history index. Upon start this can lead to DocumentAlreadyExistsException, // because we already stored the history record during shutdown... // (we always first store the watch record and then remove the triggered watch) - lock.lock(); try { - seal = true; + seal.set(true); while (currentExecutions.size() > 0) { empty.await(maxStopTimeout.millis(), TimeUnit.MILLISECONDS); } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index b9120b1f4b6..9caf3a5aa91 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -30,7 +29,6 @@ import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy; import org.elasticsearch.xpack.watcher.transform.Transform; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.watch.WatchLockService; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import org.joda.time.DateTime; @@ -48,7 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.joda.time.DateTimeZone.UTC; -public class ExecutionService extends AbstractComponent { +public final class ExecutionService extends AbstractComponent { public static final Setting<TimeValue> DEFAULT_THROTTLE_PERIOD_SETTING = Setting.positiveTimeSetting("xpack.watcher.execution.default_throttle_period", @@ -60,7 +58,6 @@ public class ExecutionService extends AbstractComponent { private final HistoryStore historyStore; private final TriggeredWatchStore triggeredWatchStore; private final WatchExecutor executor; - private final WatchLockService watchLockService; private final Clock clock; private final TimeValue defaultThrottlePeriod; private final TimeValue maxStopTimeout; @@ -72,13 +69,12 @@ public class ExecutionService extends AbstractComponent { private final AtomicBoolean started = new AtomicBoolean(false); public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, - WatchLockService watchLockService, Clock clock, ThreadPool threadPool, Watch.Parser parser, + Clock clock, ThreadPool threadPool, Watch.Parser parser, WatcherClientProxy client) { super(settings); this.historyStore = historyStore; this.triggeredWatchStore = triggeredWatchStore; this.executor = executor; - this.watchLockService = watchLockService; this.clock = clock; this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings); this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings); @@ -153,6 +149,11 @@ public class ExecutionService extends AbstractComponent { return executor.largestPoolSize(); } + // for testing only + CurrentExecutions getCurrentExecutions() { + return currentExecutions; + } + public List<WatchExecutionSnapshot> currentExecutions() { List<WatchExecutionSnapshot> currentExecutions = new ArrayList<>(); for (WatchExecution watchExecution : this.currentExecutions) { @@ -175,8 +176,8 @@ public class ExecutionService extends AbstractComponent { WatchExecutionTask executionTask = (WatchExecutionTask) task; queuedWatches.add(new QueuedWatch(executionTask.ctx)); } - // Lets show the execution that pending the longest first: + // Lets show the execution that pending the longest first: Collections.sort(queuedWatches, Comparator.comparing(QueuedWatch::executionTime)); return queuedWatches; } @@ -255,27 +256,27 @@ public class ExecutionService extends AbstractComponent { public WatchRecord execute(WatchExecutionContext ctx) { WatchRecord record = null; - Releasable releasable = watchLockService.acquire(ctx.watch().id()); - if (logger.isTraceEnabled()) { - logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(releasable)); - } try { - currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread())); - final AtomicBoolean watchExists = new AtomicBoolean(true); - client.getWatch(ctx.watch().id(), ActionListener.wrap((r) -> watchExists.set(r.isExists()), (e) -> watchExists.set(false))); - - if (ctx.knownWatch() && watchExists.get() == false) { - // fail fast if we are trying to execute a deleted watch - String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring..."; - logger.warn("{}", message); - record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_WATCH_MISSING, message); - + boolean executionAlreadyExists = currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread())); + if (executionAlreadyExists) { + logger.trace("not executing watch [{}] because it is already queued", ctx.watch().id()); + record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool"); } else { - logger.debug("executing watch [{}]", ctx.id().watchId()); + final AtomicBoolean watchExists = new AtomicBoolean(true); + client.getWatch(ctx.watch().id(), ActionListener.wrap((r) -> watchExists.set(r.isExists()), (e) -> watchExists.set(false))); - record = executeInner(ctx); - if (ctx.recordExecution()) { - client.updateWatchStatus(ctx.watch()); + if (ctx.knownWatch() && watchExists.get() == false) { + // fail fast if we are trying to execute a deleted watch + String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring..."; + record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_WATCH_MISSING, message); + + } else { + logger.debug("executing watch [{}]", ctx.id().watchId()); + + record = executeInner(ctx); + if (ctx.recordExecution()) { + client.updateWatchStatus(ctx.watch()); + } } } } catch (Exception e) { @@ -300,10 +301,6 @@ public class ExecutionService extends AbstractComponent { logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e); } currentExecutions.remove(ctx.watch().id()); - if (logger.isTraceEnabled()) { - logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(releasable)); - } - releasable.close(); logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); } return record; @@ -443,7 +440,7 @@ public class ExecutionService extends AbstractComponent { counter++; } } - logger.debug("executed [{}] watches from the watch history", counter); + logger.debug("triggered execution of [{}] watches", counter); } public Map<String, Object> usageStats() { @@ -490,12 +487,7 @@ public class ExecutionService extends AbstractComponent { @Override public void run() { - try { - execute(ctx); - } catch (Exception e) { - logger.error( - (Supplier<?>) () -> new ParameterizedMessage("could not execute watch [{}]/[{}]", ctx.watch().id(), ctx.id()), e); - } + execute(ctx); } } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionState.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionState.java index e53863eb5d5..1956b362435 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionState.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionState.java @@ -9,11 +9,27 @@ import java.util.Locale; public enum ExecutionState { + // the condition of the watch was not met EXECUTION_NOT_NEEDED, + + // Execution has been throttled due to ack/time-based throttling THROTTLED, + + // regular execution EXECUTED, + + // an error in the condition or the execution of the input FAILED, + + // the execution was scheduled, but in between the watch was deleted NOT_EXECUTED_WATCH_MISSING, + + // even though the execution was scheduled, it was not executed, because the watch was already queued in the thread pool + NOT_EXECUTED_ALREADY_QUEUED, + + // this can happen when a watch was executed, but not completely finished (the triggered watch entry was not deleted), and then + // watcher is restarted (manually or due to host switch) - the triggered watch will be executed but the history entry already + // exists EXECUTED_MULTIPLE_TIMES; public String id() { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionContext.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionContext.java index 0f81351c714..ec0c7ff7e98 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionContext.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionContext.java @@ -42,7 +42,7 @@ public abstract class WatchExecutionContext { private ConcurrentMap<String, ActionWrapper.Result> actionsResults = ConcurrentCollections.newConcurrentMap(); public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { - this.id = new Wid(watch.id(), watch.nonce(), executionTime); + this.id = new Wid(watch.id(), executionTime); this.watch = watch; this.executionTime = executionTime; this.triggerEvent = triggerEvent; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/Wid.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/Wid.java index 36f614c782c..e0767daa9d0 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/Wid.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/Wid.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.elasticsearch.common.UUIDs; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; @@ -19,9 +20,9 @@ public class Wid { private final String value; - public Wid(String watchId, long nonce, DateTime executionTime) { + public Wid(String watchId, DateTime executionTime) { this.watchId = watchId; - this.value = watchId + "_" + String.valueOf(nonce) + "-" + formatter.print(executionTime); + this.value = watchId + "_" + UUIDs.base64UUID().replaceAll("_", "-") + "-" + formatter.print(executionTime); } public Wid(String value) { diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java index 42536b3b96f..c2872128ba6 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java @@ -104,9 +104,8 @@ public class HistoryStore extends AbstractComponent { .opType(IndexRequest.OpType.CREATE); client.index(request, (TimeValue) null); } catch (VersionConflictEngineException vcee) { - logger.warn("watch record [{}] has executed multiple times, this can happen during watcher restarts", watchRecord); watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES, - "watch record has been stored before, previous state [" + watchRecord.state() + "]"); + "watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]"); IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) .source(XContentFactory.jsonBuilder().value(watchRecord)); client.index(request, (TimeValue) null); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java index 3ef8453b099..3ce74818699 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/Watch.java @@ -72,8 +72,6 @@ public class Watch implements TriggerEngine.Job, ToXContent { @Nullable private final Map<String, Object> metadata; private final WatchStatus status; - private final transient AtomicLong nonceCounter = new AtomicLong(); - private transient long version = Versions.MATCH_ANY; public Watch(String id, Trigger trigger, ExecutableInput input, Condition condition, @Nullable ExecutableTransform transform, @@ -157,10 +155,6 @@ public class Watch implements TriggerEngine.Job, ToXContent { return actionStatus.ackStatus().state() == ActionStatus.AckStatus.State.ACKED; } - public long nonce() { - return nonceCounter.getAndIncrement(); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchLockService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchLockService.java deleted file mode 100644 index 987702d1927..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchLockService.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.watch; - -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.KeyedLock; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState; - -public class WatchLockService extends AbstractComponent { - - public static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "xpack.watcher.stop.timeout"; - - private final KeyedLock<String> watchLocks = new KeyedLock<>(true); - private final AtomicBoolean running = new AtomicBoolean(false); - private static final TimeValue DEFAULT_MAX_STOP_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); - - private final TimeValue maxStopTimeout; - - public WatchLockService(Settings settings){ - super(settings); - maxStopTimeout = settings.getAsTime(DEFAULT_MAX_STOP_TIMEOUT_SETTING, DEFAULT_MAX_STOP_TIMEOUT); - } - - public Releasable acquire(String name) { - if (!running.get()) { - throw illegalState("cannot acquire lock for watch [{}]. lock service is not running", name); - } - - return watchLocks.acquire(name); - } - - public void start() { - if (running.compareAndSet(false, true)) { - // init - } - } - - /** - * @throws ElasticsearchTimeoutException if we have waited longer than maxStopTimeout - */ - public void stop() throws ElasticsearchTimeoutException { - if (running.compareAndSet(true, false)) { - // It can happen we have still ongoing operations and we wait those operations to finish to avoid - // that watch service or any of its components end up in a illegal state after the state as been set to stopped. - // - // For example: A watch action entry may be added while we stopping watcher if we don't wait for - // ongoing operations to complete. Resulting in once the watch service starts again that more than - // expected watch records are processed. - // - // Note: new operations will fail now because the running has been set to false - long startWait = System.currentTimeMillis(); - while (watchLocks.hasLockedKeys()) { - TimeValue timeWaiting = new TimeValue(System.currentTimeMillis() - startWait); - if (timeWaiting.getSeconds() > maxStopTimeout.getSeconds()) { - throw new ElasticsearchTimeoutException("timed out waiting for watches to complete, after waiting for [{}]", - timeWaiting); - } - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - } - } - } - } - - KeyedLock<String> getWatchLocks() { - return watchLocks; - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/notification/email/attachment/HttpEmailAttachementParserTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/notification/email/attachment/HttpEmailAttachementParserTests.java index 59b352a39d7..729f1bede41 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/notification/email/attachment/HttpEmailAttachementParserTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/notification/email/attachment/HttpEmailAttachementParserTests.java @@ -144,7 +144,7 @@ public class HttpEmailAttachementParserTests extends ESTestCase { private WatchExecutionContext createWatchExecutionContext() { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map(); return mockExecutionContextBuilder("watch1") .wid(wid) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/notification/email/attachment/ReportingAttachmentParserTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/notification/email/attachment/ReportingAttachmentParserTests.java index a390ebf1a43..110e5969988 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/notification/email/attachment/ReportingAttachmentParserTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/notification/email/attachment/ReportingAttachmentParserTests.java @@ -383,7 +383,7 @@ public class ReportingAttachmentParserTests extends ESTestCase { private WatchExecutionContext createWatchExecutionContext() { DateTime now = DateTime.now(DateTimeZone.UTC); return mockExecutionContextBuilder("watch1") - .wid(new Wid(randomAsciiOfLength(5), randomLong(), now)) + .wid(new Wid(randomAsciiOfLength(5), now)) .payload(new Payload.Simple()) .time("watch1", now) .metadata(Collections.emptyMap()) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/email/EmailActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/email/EmailActionTests.java index 1a007d19857..664ab47d612 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/email/EmailActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/email/EmailActionTests.java @@ -138,7 +138,7 @@ public class EmailActionTests extends ESTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); WatchExecutionContext ctx = mockExecutionContextBuilder("watch1") .wid(wid) .payload(payload) @@ -543,7 +543,7 @@ public class EmailActionTests extends ESTestCase { emailAttachmentsParser).parseExecutable(randomAsciiOfLength(3), randomAsciiOfLength(7), parser); DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map(); WatchExecutionContext ctx = mockExecutionContextBuilder("watch1") .wid(wid) @@ -569,7 +569,7 @@ public class EmailActionTests extends ESTestCase { private WatchExecutionContext createWatchExecutionContext() { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map(); return mockExecutionContextBuilder("watch1") .wid(wid) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/hipchat/HipChatActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/hipchat/HipChatActionTests.java index 70bf3385ebb..253b071b88c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/hipchat/HipChatActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/hipchat/HipChatActionTests.java @@ -74,7 +74,7 @@ public class HipChatActionTests extends ESTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) .wid(wid) .payload(payload) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/jira/ExecutableJiraActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/jira/ExecutableJiraActionTests.java index af78bad150a..46edfc2b71c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/jira/ExecutableJiraActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/jira/ExecutableJiraActionTests.java @@ -75,7 +75,7 @@ public class ExecutableJiraActionTests extends ESTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) .wid(wid) .payload(new Payload.Simple()) @@ -286,7 +286,7 @@ public class ExecutableJiraActionTests extends ESTestCase { private WatchExecutionContext createWatchExecutionContext() { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map(); return mockExecutionContextBuilder("watch1") .wid(wid) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/jira/JiraActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/jira/JiraActionTests.java index 710b5720e1e..d5e94e297b3 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/jira/JiraActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/jira/JiraActionTests.java @@ -226,7 +226,7 @@ public class JiraActionTests extends ESTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); WatchExecutionContext context = mockExecutionContextBuilder(wid.watchId()) .wid(wid) .payload(payload) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/pagerduty/PagerDutyActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/pagerduty/PagerDutyActionTests.java index ea69b6a1ed9..3dbc6be7ac9 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/pagerduty/PagerDutyActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/pagerduty/PagerDutyActionTests.java @@ -79,7 +79,7 @@ public class PagerDutyActionTests extends ESTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) .wid(wid) .payload(payload) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/slack/ExecutableSlackActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/slack/ExecutableSlackActionTests.java index 7748bbc1a3c..5be26a94e7b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/slack/ExecutableSlackActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/slack/ExecutableSlackActionTests.java @@ -48,7 +48,7 @@ public class ExecutableSlackActionTests extends ESTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) .wid(wid) .payload(new Payload.Simple()) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/slack/SlackActionTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/slack/SlackActionTests.java index 35dcd0aad11..ffe641c69f4 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/slack/SlackActionTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/actions/slack/SlackActionTests.java @@ -76,7 +76,7 @@ public class SlackActionTests extends ESTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) .wid(wid) .payload(payload) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 6dbb6313ec0..1523c5f2435 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -36,7 +35,6 @@ import org.elasticsearch.xpack.watcher.transform.Transform; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.xpack.watcher.watch.Payload; import org.elasticsearch.xpack.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.watch.WatchLockService; import org.elasticsearch.xpack.watcher.watch.WatchStatus; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -76,7 +74,6 @@ public class ExecutionServiceTests extends ESTestCase { private TriggeredWatchStore triggeredWatchStore; private WatchExecutor executor; private HistoryStore historyStore; - private WatchLockService watchLockService; private ExecutionService executionService; private Clock clock; private ThreadPool threadPool; @@ -98,14 +95,13 @@ public class ExecutionServiceTests extends ESTestCase { executor = mock(WatchExecutor.class); when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1)); - watchLockService = mock(WatchLockService.class); clock = ClockMock.frozen(); threadPool = mock(ThreadPool.class); client = mock(WatcherClientProxy.class); parser = mock(Watch.Parser.class); - executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, watchLockService, clock, - threadPool, parser, client); + executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, clock, threadPool, + parser, client); ClusterState clusterState = mock(ClusterState.class); when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>()); @@ -113,9 +109,6 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecute() throws Exception { - Releasable releasable = mock(Releasable.class); - when(watchLockService.acquire("_id")).thenReturn(releasable); - Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); GetResponse getResponse = mock(GetResponse.class); @@ -194,7 +187,6 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(result.action(), sameInstance(actionResult)); verify(historyStore, times(1)).put(watchRecord); - verify(releasable, times(1)).close(); verify(condition, times(1)).execute(context); verify(watchTransform, times(1)).execute(context, payload); verify(action, times(1)).execute("_action", context, payload); @@ -208,9 +200,6 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteFailedInput() throws Exception { - Releasable releasable = mock(Releasable.class); - when(watchLockService.acquire("_id")).thenReturn(releasable); - GetResponse getResponse = mock(GetResponse.class); when(getResponse.isExists()).thenReturn(true); when(client.getWatch("_id")).thenReturn(getResponse); @@ -273,7 +262,6 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(watchRecord.result().actionsResults().size(), is(0)); verify(historyStore, times(1)).put(watchRecord); - verify(releasable, times(1)).close(); verify(input, times(1)).execute(context, null); verify(condition, never()).execute(context); verify(watchTransform, never()).execute(context, payload); @@ -281,9 +269,6 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteFailedCondition() throws Exception { - Releasable releasable = mock(Releasable.class); - when(watchLockService.acquire("_id")).thenReturn(releasable); - Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); GetResponse getResponse = mock(GetResponse.class); @@ -341,7 +326,6 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(watchRecord.result().actionsResults().size(), is(0)); verify(historyStore, times(1)).put(watchRecord); - verify(releasable, times(1)).close(); verify(input, times(1)).execute(context, null); verify(condition, times(1)).execute(context); verify(watchTransform, never()).execute(context, payload); @@ -349,9 +333,6 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteFailedWatchTransform() throws Exception { - Releasable releasable = mock(Releasable.class); - when(watchLockService.acquire("_id")).thenReturn(releasable); - Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); GetResponse getResponse = mock(GetResponse.class); @@ -408,7 +389,6 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(watchRecord.result().actionsResults().size(), is(0)); verify(historyStore, times(1)).put(watchRecord); - verify(releasable, times(1)).close(); verify(input, times(1)).execute(context, null); verify(condition, times(1)).execute(context); verify(watchTransform, times(1)).execute(context, payload); @@ -416,9 +396,6 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteFailedActionTransform() throws Exception { - Releasable releasable = mock(Releasable.class); - when(watchLockService.acquire("_id")).thenReturn(releasable); - Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); GetResponse getResponse = mock(GetResponse.class); @@ -493,7 +470,6 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(watchRecord.result().actionsResults().get("_action").action().status(), is(Action.Result.Status.FAILURE)); verify(historyStore, times(1)).put(watchRecord); - verify(releasable, times(1)).close(); verify(input, times(1)).execute(context, null); verify(condition, times(1)).execute(context); verify(watchTransform, times(1)).execute(context, payload); @@ -787,7 +763,6 @@ public class ExecutionServiceTests extends ESTestCase { public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exception { Watch watch = mock(Watch.class); when(watch.id()).thenReturn("foo"); - when(watch.nonce()).thenReturn(1L); GetResponse getResponse = mock(GetResponse.class); when(getResponse.isExists()).thenReturn(true); when(getResponse.getId()).thenReturn("foo"); @@ -798,7 +773,7 @@ public class ExecutionServiceTests extends ESTestCase { doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any()); - Wid wid = new Wid(watch.id(), watch.nonce(), now()); + Wid wid = new Wid(watch.id(), now()); final ExecutorService currentThreadExecutor = EsExecutors.newDirectExecutorService(); when(threadPool.generic()).thenReturn(currentThreadExecutor); @@ -810,6 +785,19 @@ public class ExecutionServiceTests extends ESTestCase { verify(historyStore, times(1)).forcePut(any(WatchRecord.class)); } + public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception { + WatchExecutionContext ctx = mock(WatchExecutionContext.class); + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + when(ctx.watch()).thenReturn(watch); + + executionService.getCurrentExecutions().put("_id", new ExecutionService.WatchExecution(ctx, Thread.currentThread())); + + executionService.execute(ctx); + + verify(ctx).abortBeforeExecution(eq(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED), eq("Watch is already queued in thread pool")); + } + private Tuple<Condition, Condition.Result> whenCondition(final WatchExecutionContext context) { Condition.Result conditionResult = mock(Condition.Result.class); when(conditionResult.met()).thenReturn(true); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreLifeCycleTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreLifeCycleTests.java index 34eca3a7fd9..3f5f6c505b7 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreLifeCycleTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreLifeCycleTests.java @@ -34,7 +34,7 @@ public class TriggeredWatchStoreLifeCycleTests extends AbstractWatcherIntegratio for (int i = 0; i < triggeredWatches.length; i++) { DateTime dateTime = new DateTime(i, DateTimeZone.UTC); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), dateTime, dateTime); - Wid wid = new Wid("record_" + i, randomLong(), DateTime.now(DateTimeZone.UTC)); + Wid wid = new Wid("record_" + i, DateTime.now(DateTimeZone.UTC)); triggeredWatches[i] = new TriggeredWatch(wid, event); triggeredWatchStore.put(triggeredWatches[i]); GetResponse getResponse = client().prepareGet(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchTests.java index 2f293ea0d63..5c2dfa670e4 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchTests.java @@ -23,7 +23,7 @@ public class TriggeredWatchTests extends AbstractWatcherIntegrationTestCase { Watch watch = WatcherTestUtils.createTestWatch("fired_test", watcherHttpClient(), noopEmailService(), watcherSearchTemplateService(), logger); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), DateTime.now(DateTimeZone.UTC), DateTime.now(DateTimeZone.UTC)); - Wid wid = new Wid("_record", randomLong(), DateTime.now(DateTimeZone.UTC)); + Wid wid = new Wid("_record", DateTime.now(DateTimeZone.UTC)); TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); triggeredWatch.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index 7ffd562270d..a1a3ce41026 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -61,7 +61,7 @@ public class HistoryStoreTests extends ESTestCase { } public void testPut() throws Exception { - Wid wid = new Wid("_name", 0, new DateTime(0, UTC)); + Wid wid = new Wid("_name", new DateTime(0, UTC)); ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC)); WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null); @@ -74,7 +74,7 @@ public class HistoryStoreTests extends ESTestCase { } public void testPutStopped() throws Exception { - Wid wid = new Wid("_name", 0, new DateTime(0, UTC)); + Wid wid = new Wid("_name", new DateTime(0, UTC)); ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC)); WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null); @@ -116,7 +116,7 @@ public class HistoryStoreTests extends ESTestCase { ActionWrapper.Result result = new ActionWrapper.Result(JiraAction.TYPE, new JiraAction.Executed(jiraIssue)); DateTime now = new DateTime(0, UTC); - Wid wid = new Wid("_name", 0, now); + Wid wid = new Wid("_name", now); Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/input/chain/ExecutableChainInputTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/input/chain/ExecutableChainInputTests.java index 09318c085ed..eca0cfea2e8 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/input/chain/ExecutableChainInputTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/input/chain/ExecutableChainInputTests.java @@ -63,7 +63,7 @@ public class ExecutableChainInputTests extends ESTestCase { private WatchExecutionContext createWatchExecutionContext() { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); + Wid wid = new Wid(randomAsciiOfLength(5), now); return mockExecutionContextBuilder(wid.watchId()) .wid(wid) .payload(new Payload.Simple()) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/support/VariablesTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/support/VariablesTests.java index fb0eb0268f0..9fddf0d3801 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/support/VariablesTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/support/VariablesTests.java @@ -31,7 +31,7 @@ public class VariablesTests extends ESTestCase { Payload payload = new Payload.Simple(singletonMap("payload_key", "payload_value")); Map<String, Object> metatdata = singletonMap("metadata_key", "metadata_value"); TriggerEvent event = new ScheduleTriggerEvent("_watch_id", triggeredTime, scheduledTime); - Wid wid = new Wid("_watch_id", 0, executionTime); + Wid wid = new Wid("_watch_id", executionTime); WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id") .wid(wid) .executionTime(executionTime) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java index 2abd6494c65..e3c33b1a9af 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java @@ -139,19 +139,19 @@ public final class WatcherTestUtils { public static WatchExecutionContextMockBuilder mockExecutionContextBuilder(String watchId) { return new WatchExecutionContextMockBuilder(watchId) - .wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))); + .wid(new Wid(watchId, DateTime.now(UTC))); } public static WatchExecutionContext mockExecutionContext(String watchId, Payload payload) { return mockExecutionContextBuilder(watchId) - .wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))) + .wid(new Wid(watchId, DateTime.now(UTC))) .payload(payload) .buildMock(); } public static WatchExecutionContext mockExecutionContext(String watchId, DateTime time, Payload payload) { return mockExecutionContextBuilder(watchId) - .wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))) + .wid(new Wid(watchId, DateTime.now(UTC))) .payload(payload) .time(watchId, time) .buildMock(); @@ -159,7 +159,7 @@ public final class WatcherTestUtils { public static WatchExecutionContext mockExecutionContext(String watchId, DateTime executionTime, TriggerEvent event, Payload payload) { return mockExecutionContextBuilder(watchId) - .wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))) + .wid(new Wid(watchId, DateTime.now(UTC))) .payload(payload) .executionTime(executionTime) .triggerEvent(event) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index 3eac5a4a99a..d129621d29e 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -29,6 +29,7 @@ import org.hamcrest.Matchers; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -43,6 +44,7 @@ import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput; import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest; import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.joda.time.DateTimeZone.UTC; @@ -69,7 +71,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { // valid watch record: DateTime now = DateTime.now(UTC); - Wid wid = new Wid("_id", 1, now); + Wid wid = new Wid("_id", now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); Condition condition = AlwaysCondition.INSTANCE; String index = HistoryStore.getHistoryIndexNameForTime(now); @@ -90,7 +92,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { .get(); // unknown condition: - wid = new Wid("_id", 2, now); + wid = new Wid("_id", now); client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value()) .setSource(jsonBuilder().startObject() .startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName()) @@ -108,7 +110,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { .get(); // unknown trigger: - wid = new Wid("_id", 2, now); + wid = new Wid("_id", now); client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value()) .setSource(jsonBuilder().startObject() .startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName()) @@ -139,7 +141,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { assertAcked(client().admin().indices().prepareCreate(Watch.INDEX)); } DateTime now = DateTime.now(UTC); - Wid wid = new Wid("_id", 1, now); + Wid wid = new Wid("_id", now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, wid.value()) @@ -188,7 +190,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { assertThat(response.getWatchesCount(), equalTo((long) numWatches)); } - public void testTriggeredWatchLoading() throws Exception { + public void testMixedTriggeredWatchLoading() throws Exception { createIndex("output"); client().prepareIndex("my-index", "foo", "bar") .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) @@ -198,8 +200,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); assertThat(response.getWatchesCount(), equalTo(0L)); - WatcherSearchTemplateRequest request = - templateRequest(searchSource().query(termQuery("field", "value")), "my-index"); + WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "my-index"); int numWatches = 8; for (int i = 0; i < numWatches; i++) { @@ -219,7 +220,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { String watchId = "_id" + (i % numWatches); now = now.plusMinutes(1); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); - Wid wid = new Wid(watchId, randomLong(), now); + Wid wid = new Wid(watchId, now); TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value()) .setSource(jsonBuilder().value(triggeredWatch)) @@ -230,22 +231,10 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { stopWatcher(); startWatcher(); - assertBusy(() -> { - // We need to wait until all the records are processed from the internal execution queue, only then we can assert - // that numRecords watch records have been processed as part of starting up. - WatcherStatsResponse response1 = watcherClient().prepareWatcherStats().get(); - assertThat(response1.getWatcherState(), equalTo(WatcherState.STARTED)); - assertThat(response1.getThreadPoolQueueSize(), equalTo(0L)); - - // but even then since the execution of the watch record is async it may take a little bit before - // the actual documents are in the output index - refresh(); - SearchResponse searchResponse = client().prepareSearch("output").get(); - assertHitCount(searchResponse, numRecords); - }, 30, TimeUnit.SECONDS); + assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords); } - public void testMixedTriggeredWatchLoading() throws Exception { + public void testTriggeredWatchLoading() throws Exception { createIndex("output"); client().prepareIndex("my-index", "foo", "bar") .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) @@ -270,7 +259,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { for (int i = 0; i < numRecords; i++) { now = now.plusMinutes(1); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); - Wid wid = new Wid(watchId, randomLong(), now); + Wid wid = new Wid(watchId, now); TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value()) .setSource(jsonBuilder().value(triggeredWatch)) @@ -281,19 +270,34 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { stopWatcher(); startWatcher(); + assertSingleExecutionAndCompleteWatchHistory(1, numRecords); + } + + private void assertSingleExecutionAndCompleteWatchHistory(long numberOfWatches, int expectedWatchHistoryCount) throws Exception { assertBusy(() -> { // We need to wait until all the records are processed from the internal execution queue, only then we can assert // that numRecords watch records have been processed as part of starting up. - WatcherStatsResponse response1 = watcherClient().prepareWatcherStats().get(); - assertThat(response1.getWatcherState(), equalTo(WatcherState.STARTED)); - assertThat(response1.getThreadPoolQueueSize(), equalTo(0L)); + WatcherStatsResponse response = watcherClient().prepareWatcherStats().get(); + assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); + assertThat(response.getThreadPoolQueueSize(), equalTo(0L)); - // but even then since the execution of the watch record is async it may take a little bit before - // the actual documents are in the output index + // because we try to execute a single watch in parallel, only one execution should happen refresh(); SearchResponse searchResponse = client().prepareSearch("output").get(); - assertHitCount(searchResponse, numRecords); - }); + assertThat(searchResponse.getHits().totalHits(), is(greaterThanOrEqualTo(numberOfWatches))); + long successfulWatchExecutions = searchResponse.getHits().totalHits(); + + // the watch history should contain entries for each triggered watch, which a few have been marked as not executed + SearchResponse historySearchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*") + .setSize(expectedWatchHistoryCount).get(); + assertHitCount(historySearchResponse, expectedWatchHistoryCount); + long notExecutedCount = Arrays.asList(historySearchResponse.getHits().getHits()).stream() + .filter(hit -> hit.getSource().get("state").equals(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED.id())) + .count(); + logger.info("Watches not executed: [{}]: expected watch history count [{}] - [{}] successful watch exections", + notExecutedCount, expectedWatchHistoryCount, successfulWatchExecutions); + assertThat(notExecutedCount, is(expectedWatchHistoryCount - successfulWatchExecutions)); + }, 20, TimeUnit.SECONDS); } public void testManuallyStopped() throws Exception { @@ -327,7 +331,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { for (int i = 0; i < numRecords; i++) { String watchId = Integer.toString(i); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, triggeredTime, triggeredTime); - Wid wid = new Wid(watchId, 0, triggeredTime); + Wid wid = new Wid(watchId, triggeredTime); TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value()) .setSource(jsonBuilder().value(triggeredWatch)) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchLockServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchLockServiceTests.java deleted file mode 100644 index 0fdfe9d3419..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchLockServiceTests.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.watch; - -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ESTestCase; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.startsWith; - -public class WatchLockServiceTests extends ESTestCase { - - private final Settings settings = - Settings.builder().put(WatchLockService.DEFAULT_MAX_STOP_TIMEOUT_SETTING, TimeValue.timeValueSeconds(1)).build(); - - public void testLockingNotStarted() { - WatchLockService lockService = new WatchLockService(settings); - try { - lockService.acquire("_name"); - fail("exception expected"); - } catch (Exception e) { - assertThat(e.getMessage(), containsString("not running")); - } - } - - public void testLocking() { - WatchLockService lockService = new WatchLockService(settings); - lockService.start(); - Releasable releasable = lockService.acquire("_name"); - assertThat(lockService.getWatchLocks().hasLockedKeys(), is(true)); - releasable.close(); - assertThat(lockService.getWatchLocks().hasLockedKeys(), is(false)); - lockService.stop(); - } - - public void testLockingStopTimeout(){ - final WatchLockService lockService = new WatchLockService(settings); - lockService.start(); - lockService.acquire("_name"); - try { - lockService.stop(); - fail("Expected ElasticsearchTimeoutException"); - } catch (ElasticsearchTimeoutException e) { - assertThat(e.getMessage(), startsWith("timed out waiting for watches to complete, after waiting for")); - } - } - - public void testLockingFair() throws Exception { - final WatchLockService lockService = new WatchLockService(settings); - lockService.start(); - final AtomicInteger value = new AtomicInteger(0); - List<Thread> threads = new ArrayList<>(); - - class FairRunner implements Runnable { - - final int expectedValue; - final CountDownLatch startLatch = new CountDownLatch(1); - - FairRunner(int expectedValue) { - this.expectedValue = expectedValue; - } - - @Override - public void run() { - startLatch.countDown(); - try (Releasable ignored = lockService.acquire("_name")) { - int actualValue = value.getAndIncrement(); - assertThat(actualValue, equalTo(expectedValue)); - Thread.sleep(50); - } catch(InterruptedException ie) { - } - } - } - - List<FairRunner> runners = new ArrayList<>(); - - for(int i = 0; i < 50; ++i) { - FairRunner f = new FairRunner(i); - runners.add(f); - threads.add(new Thread(f)); - } - - for(int i = 0; i < threads.size(); ++i) { - threads.get(i).start(); - runners.get(i).startLatch.await(); - Thread.sleep(25); - } - - for(Thread t : threads) { - t.join(); - } - } - -}