Watcher: Remove WatchLockService ()

The watch lock service is not really needed, as there is already
a data structure that has information about the currently executing
watches, that can be consulted before executed.

This change will now check, if there is already a watch running with
the current id. If there is not, execution will happen as usual. If
there is however, than a watch record will be created, stating that
the watch is currently being executed - which means that it is either
being executed or in the list of planned executions.

This way users can check in the watch history, if a watch has been executed
more often than it should.

In order to easily search for this, a new execution state called
`NOT_EXECUTED_ALREADY_QUEUED` has been added.

Original commit: elastic/x-pack-elasticsearch@867acec3c3
This commit is contained in:
Alexander Reelsen 2016-12-19 17:33:48 +01:00 committed by GitHub
parent 7a652fa090
commit 79a8f27569
28 changed files with 150 additions and 341 deletions

@ -28,7 +28,6 @@ import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptSettings;
import org.elasticsearch.search.SearchRequestParsers; import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder;
@ -141,7 +140,6 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.YearlySchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine; import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchLockService;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
@ -281,12 +279,11 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
final WatcherSearchTemplateService watcherSearchTemplateService = final WatcherSearchTemplateService watcherSearchTemplateService =
new WatcherSearchTemplateService(settings, scriptService, searchRequestParsers); new WatcherSearchTemplateService(settings, scriptService, searchRequestParsers);
final WatchLockService watchLockService = new WatchLockService(settings);
final WatchExecutor watchExecutor = getWatchExecutor(threadPool); final WatchExecutor watchExecutor = getWatchExecutor(threadPool);
final Watch.Parser watchParser = new Watch.Parser(settings, triggerService, registry, inputRegistry, cryptoService, clock); final Watch.Parser watchParser = new Watch.Parser(settings, triggerService, registry, inputRegistry, cryptoService, clock);
final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor, final ExecutionService executionService = new ExecutionService(settings, historyStore, triggeredWatchStore, watchExecutor,
watchLockService, clock, threadPool, watchParser, watcherClientProxy); clock, threadPool, watchParser, watcherClientProxy);
final TriggerEngine.Listener triggerEngineListener = getTriggerEngineListener(executionService); final TriggerEngine.Listener triggerEngineListener = getTriggerEngineListener(executionService);
triggerService.register(triggerEngineListener); triggerService.register(triggerEngineListener);
@ -294,7 +291,7 @@ public class Watcher implements ActionPlugin, ScriptPlugin {
final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = new WatcherIndexTemplateRegistry(settings, final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = new WatcherIndexTemplateRegistry(settings,
clusterService.getClusterSettings(), clusterService, threadPool, internalClient); clusterService.getClusterSettings(), clusterService, threadPool, internalClient);
final WatcherService watcherService = new WatcherService(settings, triggerService, executionService, watchLockService, final WatcherService watcherService = new WatcherService(settings, triggerService, executionService,
watcherIndexTemplateRegistry, watchParser, watcherClientProxy); watcherIndexTemplateRegistry, watchParser, watcherClientProxy);
final WatcherLifeCycleService watcherLifeCycleService = final WatcherLifeCycleService watcherLifeCycleService =

@ -9,7 +9,6 @@ package org.elasticsearch.xpack.watcher;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
@ -27,7 +26,6 @@ import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy; import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchLockService;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import java.util.ArrayList; import java.util.ArrayList;
@ -45,7 +43,6 @@ import static org.elasticsearch.xpack.watcher.watch.Watch.INDEX;
public class WatcherService extends AbstractComponent { public class WatcherService extends AbstractComponent {
private final TriggerService triggerService; private final TriggerService triggerService;
private final WatchLockService watchLockService;
private final ExecutionService executionService; private final ExecutionService executionService;
private final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry; private final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry;
// package-private for testing // package-private for testing
@ -55,12 +52,10 @@ public class WatcherService extends AbstractComponent {
private final Watch.Parser parser; private final Watch.Parser parser;
private final WatcherClientProxy client; private final WatcherClientProxy client;
public WatcherService(Settings settings, TriggerService triggerService, public WatcherService(Settings settings, TriggerService triggerService, ExecutionService executionService,
ExecutionService executionService, WatchLockService watchLockService,
WatcherIndexTemplateRegistry watcherIndexTemplateRegistry, Watch.Parser parser, WatcherClientProxy client) { WatcherIndexTemplateRegistry watcherIndexTemplateRegistry, Watch.Parser parser, WatcherClientProxy client) {
super(settings); super(settings);
this.triggerService = triggerService; this.triggerService = triggerService;
this.watchLockService = watchLockService;
this.executionService = executionService; this.executionService = executionService;
this.watcherIndexTemplateRegistry = watcherIndexTemplateRegistry; this.watcherIndexTemplateRegistry = watcherIndexTemplateRegistry;
this.scrollTimeout = settings.getAsTime("xpack.watcher.watch.scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollTimeout = settings.getAsTime("xpack.watcher.watch.scroll.timeout", TimeValue.timeValueSeconds(30));
@ -74,7 +69,6 @@ public class WatcherService extends AbstractComponent {
try { try {
logger.debug("starting watch service..."); logger.debug("starting watch service...");
watcherIndexTemplateRegistry.addTemplatesIfMissing(); watcherIndexTemplateRegistry.addTemplatesIfMissing();
watchLockService.start();
executionService.start(clusterState); executionService.start(clusterState);
triggerService.start(loadWatches(clusterState)); triggerService.start(loadWatches(clusterState));
@ -98,11 +92,6 @@ public class WatcherService extends AbstractComponent {
logger.debug("stopping watch service..."); logger.debug("stopping watch service...");
triggerService.stop(); triggerService.stop();
executionService.stop(); executionService.stop();
try {
watchLockService.stop();
} catch (ElasticsearchTimeoutException te) {
logger.warn("error stopping WatchLockService", te);
}
state.set(WatcherState.STOPPED); state.set(WatcherState.STOPPED);
logger.debug("watch service has stopped"); logger.debug("watch service has stopped");
} else { } else {

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.watcher.execution; package org.elasticsearch.xpack.watcher.execution;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import java.util.Iterator; import java.util.Iterator;
@ -16,22 +17,31 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState; import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> { public final class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> {
private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = new ConcurrentHashMap<>();
// the condition of the lock is used to wait and signal the finishing of all executions on shutdown
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
private final Condition empty = lock.newCondition(); private final Condition empty = lock.newCondition();
private boolean seal = false; // a marker to not accept new executions, used when the watch service is powered down
private SetOnce<Boolean> seal = new SetOnce<>();
public void put(String id, ExecutionService.WatchExecution execution) { /**
* Tries to put an watch execution class for a watch in the current executions
*
* @param id The id of the watch
* @param execution The watch execution class
* @return Returns true if watch with id already is in the current executions class, false otherwise
*/
public boolean put(String id, ExecutionService.WatchExecution execution) {
lock.lock(); lock.lock();
try { try {
if (seal) { if (seal.get() != null) {
// We shouldn't get here, because, ExecutionService#started should have been set to false // We shouldn't get here, because, ExecutionService#started should have been set to false
throw illegalState("could not register execution [{}]. current executions are sealed and forbid registrations of " + throw illegalState("could not register execution [{}]. current executions are sealed and forbid registrations of " +
"additional executions.", id); "additional executions.", id);
} }
currentExecutions.put(id, execution); return currentExecutions.putIfAbsent(id, execution) != null;
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -49,16 +59,22 @@ public class CurrentExecutions implements Iterable<ExecutionService.WatchExecuti
} }
} }
public void sealAndAwaitEmpty(TimeValue maxStopTimeout) { /**
* Calling this method makes the class stop accepting new executions and throws and exception instead.
* In addition it waits for a certain amount of time for current executions to finish before returning
*
* @param maxStopTimeout The maximum wait time to wait to current executions to finish
*/
void sealAndAwaitEmpty(TimeValue maxStopTimeout) {
lock.lock();
// We may have current executions still going on. // We may have current executions still going on.
// We should try to wait for the current executions to have completed. // We should try to wait for the current executions to have completed.
// Otherwise we can run into a situation where we didn't delete the watch from the .triggered_watches index, // Otherwise we can run into a situation where we didn't delete the watch from the .triggered_watches index,
// but did insert into the history index. Upon start this can lead to DocumentAlreadyExistsException, // but did insert into the history index. Upon start this can lead to DocumentAlreadyExistsException,
// because we already stored the history record during shutdown... // because we already stored the history record during shutdown...
// (we always first store the watch record and then remove the triggered watch) // (we always first store the watch record and then remove the triggered watch)
lock.lock();
try { try {
seal = true; seal.set(true);
while (currentExecutions.size() > 0) { while (currentExecutions.size() > 0) {
empty.await(maxStopTimeout.millis(), TimeUnit.MILLISECONDS); empty.await(maxStopTimeout.millis(), TimeUnit.MILLISECONDS);
} }

@ -12,7 +12,6 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -30,7 +29,6 @@ import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.transform.Transform; import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchLockService;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -48,7 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.DateTimeZone.UTC;
public class ExecutionService extends AbstractComponent { public final class ExecutionService extends AbstractComponent {
public static final Setting<TimeValue> DEFAULT_THROTTLE_PERIOD_SETTING = public static final Setting<TimeValue> DEFAULT_THROTTLE_PERIOD_SETTING =
Setting.positiveTimeSetting("xpack.watcher.execution.default_throttle_period", Setting.positiveTimeSetting("xpack.watcher.execution.default_throttle_period",
@ -60,7 +58,6 @@ public class ExecutionService extends AbstractComponent {
private final HistoryStore historyStore; private final HistoryStore historyStore;
private final TriggeredWatchStore triggeredWatchStore; private final TriggeredWatchStore triggeredWatchStore;
private final WatchExecutor executor; private final WatchExecutor executor;
private final WatchLockService watchLockService;
private final Clock clock; private final Clock clock;
private final TimeValue defaultThrottlePeriod; private final TimeValue defaultThrottlePeriod;
private final TimeValue maxStopTimeout; private final TimeValue maxStopTimeout;
@ -72,13 +69,12 @@ public class ExecutionService extends AbstractComponent {
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor,
WatchLockService watchLockService, Clock clock, ThreadPool threadPool, Watch.Parser parser, Clock clock, ThreadPool threadPool, Watch.Parser parser,
WatcherClientProxy client) { WatcherClientProxy client) {
super(settings); super(settings);
this.historyStore = historyStore; this.historyStore = historyStore;
this.triggeredWatchStore = triggeredWatchStore; this.triggeredWatchStore = triggeredWatchStore;
this.executor = executor; this.executor = executor;
this.watchLockService = watchLockService;
this.clock = clock; this.clock = clock;
this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings); this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings);
this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings); this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings);
@ -153,6 +149,11 @@ public class ExecutionService extends AbstractComponent {
return executor.largestPoolSize(); return executor.largestPoolSize();
} }
// for testing only
CurrentExecutions getCurrentExecutions() {
return currentExecutions;
}
public List<WatchExecutionSnapshot> currentExecutions() { public List<WatchExecutionSnapshot> currentExecutions() {
List<WatchExecutionSnapshot> currentExecutions = new ArrayList<>(); List<WatchExecutionSnapshot> currentExecutions = new ArrayList<>();
for (WatchExecution watchExecution : this.currentExecutions) { for (WatchExecution watchExecution : this.currentExecutions) {
@ -175,8 +176,8 @@ public class ExecutionService extends AbstractComponent {
WatchExecutionTask executionTask = (WatchExecutionTask) task; WatchExecutionTask executionTask = (WatchExecutionTask) task;
queuedWatches.add(new QueuedWatch(executionTask.ctx)); queuedWatches.add(new QueuedWatch(executionTask.ctx));
} }
// Lets show the execution that pending the longest first:
// Lets show the execution that pending the longest first:
Collections.sort(queuedWatches, Comparator.comparing(QueuedWatch::executionTime)); Collections.sort(queuedWatches, Comparator.comparing(QueuedWatch::executionTime));
return queuedWatches; return queuedWatches;
} }
@ -255,27 +256,27 @@ public class ExecutionService extends AbstractComponent {
public WatchRecord execute(WatchExecutionContext ctx) { public WatchRecord execute(WatchExecutionContext ctx) {
WatchRecord record = null; WatchRecord record = null;
Releasable releasable = watchLockService.acquire(ctx.watch().id());
if (logger.isTraceEnabled()) {
logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(releasable));
}
try { try {
currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread())); boolean executionAlreadyExists = currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
final AtomicBoolean watchExists = new AtomicBoolean(true); if (executionAlreadyExists) {
client.getWatch(ctx.watch().id(), ActionListener.wrap((r) -> watchExists.set(r.isExists()), (e) -> watchExists.set(false))); logger.trace("not executing watch [{}] because it is already queued", ctx.watch().id());
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
if (ctx.knownWatch() && watchExists.get() == false) {
// fail fast if we are trying to execute a deleted watch
String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring...";
logger.warn("{}", message);
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_WATCH_MISSING, message);
} else { } else {
logger.debug("executing watch [{}]", ctx.id().watchId()); final AtomicBoolean watchExists = new AtomicBoolean(true);
client.getWatch(ctx.watch().id(), ActionListener.wrap((r) -> watchExists.set(r.isExists()), (e) -> watchExists.set(false)));
record = executeInner(ctx); if (ctx.knownWatch() && watchExists.get() == false) {
if (ctx.recordExecution()) { // fail fast if we are trying to execute a deleted watch
client.updateWatchStatus(ctx.watch()); String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring...";
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_WATCH_MISSING, message);
} else {
logger.debug("executing watch [{}]", ctx.id().watchId());
record = executeInner(ctx);
if (ctx.recordExecution()) {
client.updateWatchStatus(ctx.watch());
}
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -300,10 +301,6 @@ public class ExecutionService extends AbstractComponent {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e); logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e);
} }
currentExecutions.remove(ctx.watch().id()); currentExecutions.remove(ctx.watch().id());
if (logger.isTraceEnabled()) {
logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(releasable));
}
releasable.close();
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
} }
return record; return record;
@ -443,7 +440,7 @@ public class ExecutionService extends AbstractComponent {
counter++; counter++;
} }
} }
logger.debug("executed [{}] watches from the watch history", counter); logger.debug("triggered execution of [{}] watches", counter);
} }
public Map<String, Object> usageStats() { public Map<String, Object> usageStats() {
@ -490,12 +487,7 @@ public class ExecutionService extends AbstractComponent {
@Override @Override
public void run() { public void run() {
try { execute(ctx);
execute(ctx);
} catch (Exception e) {
logger.error(
(Supplier<?>) () -> new ParameterizedMessage("could not execute watch [{}]/[{}]", ctx.watch().id(), ctx.id()), e);
}
} }
} }

@ -9,11 +9,27 @@ import java.util.Locale;
public enum ExecutionState { public enum ExecutionState {
// the condition of the watch was not met
EXECUTION_NOT_NEEDED, EXECUTION_NOT_NEEDED,
// Execution has been throttled due to ack/time-based throttling
THROTTLED, THROTTLED,
// regular execution
EXECUTED, EXECUTED,
// an error in the condition or the execution of the input
FAILED, FAILED,
// the execution was scheduled, but in between the watch was deleted
NOT_EXECUTED_WATCH_MISSING, NOT_EXECUTED_WATCH_MISSING,
// even though the execution was scheduled, it was not executed, because the watch was already queued in the thread pool
NOT_EXECUTED_ALREADY_QUEUED,
// this can happen when a watch was executed, but not completely finished (the triggered watch entry was not deleted), and then
// watcher is restarted (manually or due to host switch) - the triggered watch will be executed but the history entry already
// exists
EXECUTED_MULTIPLE_TIMES; EXECUTED_MULTIPLE_TIMES;
public String id() { public String id() {

@ -42,7 +42,7 @@ public abstract class WatchExecutionContext {
private ConcurrentMap<String, ActionWrapper.Result> actionsResults = ConcurrentCollections.newConcurrentMap(); private ConcurrentMap<String, ActionWrapper.Result> actionsResults = ConcurrentCollections.newConcurrentMap();
public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
this.id = new Wid(watch.id(), watch.nonce(), executionTime); this.id = new Wid(watch.id(), executionTime);
this.watch = watch; this.watch = watch;
this.executionTime = executionTime; this.executionTime = executionTime;
this.triggerEvent = triggerEvent; this.triggerEvent = triggerEvent;

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.watcher.execution; package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.common.UUIDs;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat; import org.joda.time.format.ISODateTimeFormat;
@ -19,9 +20,9 @@ public class Wid {
private final String value; private final String value;
public Wid(String watchId, long nonce, DateTime executionTime) { public Wid(String watchId, DateTime executionTime) {
this.watchId = watchId; this.watchId = watchId;
this.value = watchId + "_" + String.valueOf(nonce) + "-" + formatter.print(executionTime); this.value = watchId + "_" + UUIDs.base64UUID().replaceAll("_", "-") + "-" + formatter.print(executionTime);
} }
public Wid(String value) { public Wid(String value) {

@ -104,9 +104,8 @@ public class HistoryStore extends AbstractComponent {
.opType(IndexRequest.OpType.CREATE); .opType(IndexRequest.OpType.CREATE);
client.index(request, (TimeValue) null); client.index(request, (TimeValue) null);
} catch (VersionConflictEngineException vcee) { } catch (VersionConflictEngineException vcee) {
logger.warn("watch record [{}] has executed multiple times, this can happen during watcher restarts", watchRecord);
watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES, watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES,
"watch record has been stored before, previous state [" + watchRecord.state() + "]"); "watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]");
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()) IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(XContentFactory.jsonBuilder().value(watchRecord)); .source(XContentFactory.jsonBuilder().value(watchRecord));
client.index(request, (TimeValue) null); client.index(request, (TimeValue) null);

@ -72,8 +72,6 @@ public class Watch implements TriggerEngine.Job, ToXContent {
@Nullable private final Map<String, Object> metadata; @Nullable private final Map<String, Object> metadata;
private final WatchStatus status; private final WatchStatus status;
private final transient AtomicLong nonceCounter = new AtomicLong();
private transient long version = Versions.MATCH_ANY; private transient long version = Versions.MATCH_ANY;
public Watch(String id, Trigger trigger, ExecutableInput input, Condition condition, @Nullable ExecutableTransform transform, public Watch(String id, Trigger trigger, ExecutableInput input, Condition condition, @Nullable ExecutableTransform transform,
@ -157,10 +155,6 @@ public class Watch implements TriggerEngine.Job, ToXContent {
return actionStatus.ackStatus().state() == ActionStatus.AckStatus.State.ACKED; return actionStatus.ackStatus().state() == ActionStatus.AckStatus.State.ACKED;
} }
public long nonce() {
return nonceCounter.getAndIncrement();
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;

@ -1,80 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.watch;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
public class WatchLockService extends AbstractComponent {
public static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "xpack.watcher.stop.timeout";
private final KeyedLock<String> watchLocks = new KeyedLock<>(true);
private final AtomicBoolean running = new AtomicBoolean(false);
private static final TimeValue DEFAULT_MAX_STOP_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
private final TimeValue maxStopTimeout;
public WatchLockService(Settings settings){
super(settings);
maxStopTimeout = settings.getAsTime(DEFAULT_MAX_STOP_TIMEOUT_SETTING, DEFAULT_MAX_STOP_TIMEOUT);
}
public Releasable acquire(String name) {
if (!running.get()) {
throw illegalState("cannot acquire lock for watch [{}]. lock service is not running", name);
}
return watchLocks.acquire(name);
}
public void start() {
if (running.compareAndSet(false, true)) {
// init
}
}
/**
* @throws ElasticsearchTimeoutException if we have waited longer than maxStopTimeout
*/
public void stop() throws ElasticsearchTimeoutException {
if (running.compareAndSet(true, false)) {
// It can happen we have still ongoing operations and we wait those operations to finish to avoid
// that watch service or any of its components end up in a illegal state after the state as been set to stopped.
//
// For example: A watch action entry may be added while we stopping watcher if we don't wait for
// ongoing operations to complete. Resulting in once the watch service starts again that more than
// expected watch records are processed.
//
// Note: new operations will fail now because the running has been set to false
long startWait = System.currentTimeMillis();
while (watchLocks.hasLockedKeys()) {
TimeValue timeWaiting = new TimeValue(System.currentTimeMillis() - startWait);
if (timeWaiting.getSeconds() > maxStopTimeout.getSeconds()) {
throw new ElasticsearchTimeoutException("timed out waiting for watches to complete, after waiting for [{}]",
timeWaiting);
}
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
}
}
}
}
KeyedLock<String> getWatchLocks() {
return watchLocks;
}
}

@ -144,7 +144,7 @@ public class HttpEmailAttachementParserTests extends ESTestCase {
private WatchExecutionContext createWatchExecutionContext() { private WatchExecutionContext createWatchExecutionContext() {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map(); Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map();
return mockExecutionContextBuilder("watch1") return mockExecutionContextBuilder("watch1")
.wid(wid) .wid(wid)

@ -383,7 +383,7 @@ public class ReportingAttachmentParserTests extends ESTestCase {
private WatchExecutionContext createWatchExecutionContext() { private WatchExecutionContext createWatchExecutionContext() {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
return mockExecutionContextBuilder("watch1") return mockExecutionContextBuilder("watch1")
.wid(new Wid(randomAsciiOfLength(5), randomLong(), now)) .wid(new Wid(randomAsciiOfLength(5), now))
.payload(new Payload.Simple()) .payload(new Payload.Simple())
.time("watch1", now) .time("watch1", now)
.metadata(Collections.emptyMap()) .metadata(Collections.emptyMap())

@ -138,7 +138,7 @@ public class EmailActionTests extends ESTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
WatchExecutionContext ctx = mockExecutionContextBuilder("watch1") WatchExecutionContext ctx = mockExecutionContextBuilder("watch1")
.wid(wid) .wid(wid)
.payload(payload) .payload(payload)
@ -543,7 +543,7 @@ public class EmailActionTests extends ESTestCase {
emailAttachmentsParser).parseExecutable(randomAsciiOfLength(3), randomAsciiOfLength(7), parser); emailAttachmentsParser).parseExecutable(randomAsciiOfLength(3), randomAsciiOfLength(7), parser);
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map(); Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map();
WatchExecutionContext ctx = mockExecutionContextBuilder("watch1") WatchExecutionContext ctx = mockExecutionContextBuilder("watch1")
.wid(wid) .wid(wid)
@ -569,7 +569,7 @@ public class EmailActionTests extends ESTestCase {
private WatchExecutionContext createWatchExecutionContext() { private WatchExecutionContext createWatchExecutionContext() {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map(); Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map();
return mockExecutionContextBuilder("watch1") return mockExecutionContextBuilder("watch1")
.wid(wid) .wid(wid)

@ -74,7 +74,7 @@ public class HipChatActionTests extends ESTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId())
.wid(wid) .wid(wid)
.payload(payload) .payload(payload)

@ -75,7 +75,7 @@ public class ExecutableJiraActionTests extends ESTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId())
.wid(wid) .wid(wid)
.payload(new Payload.Simple()) .payload(new Payload.Simple())
@ -286,7 +286,7 @@ public class ExecutableJiraActionTests extends ESTestCase {
private WatchExecutionContext createWatchExecutionContext() { private WatchExecutionContext createWatchExecutionContext() {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map(); Map<String, Object> metadata = MapBuilder.<String, Object>newMapBuilder().put("_key", "_val").map();
return mockExecutionContextBuilder("watch1") return mockExecutionContextBuilder("watch1")
.wid(wid) .wid(wid)

@ -226,7 +226,7 @@ public class JiraActionTests extends ESTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
WatchExecutionContext context = mockExecutionContextBuilder(wid.watchId()) WatchExecutionContext context = mockExecutionContextBuilder(wid.watchId())
.wid(wid) .wid(wid)
.payload(payload) .payload(payload)

@ -79,7 +79,7 @@ public class PagerDutyActionTests extends ESTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId())
.wid(wid) .wid(wid)
.payload(payload) .payload(payload)

@ -48,7 +48,7 @@ public class ExecutableSlackActionTests extends ESTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId())
.wid(wid) .wid(wid)
.payload(new Payload.Simple()) .payload(new Payload.Simple())

@ -76,7 +76,7 @@ public class SlackActionTests extends ESTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId()) WatchExecutionContext ctx = mockExecutionContextBuilder(wid.watchId())
.wid(wid) .wid(wid)
.payload(payload) .payload(payload)

@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@ -36,7 +35,6 @@ import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Payload; import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchLockService;
import org.elasticsearch.xpack.watcher.watch.WatchStatus; import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
@ -76,7 +74,6 @@ public class ExecutionServiceTests extends ESTestCase {
private TriggeredWatchStore triggeredWatchStore; private TriggeredWatchStore triggeredWatchStore;
private WatchExecutor executor; private WatchExecutor executor;
private HistoryStore historyStore; private HistoryStore historyStore;
private WatchLockService watchLockService;
private ExecutionService executionService; private ExecutionService executionService;
private Clock clock; private Clock clock;
private ThreadPool threadPool; private ThreadPool threadPool;
@ -98,14 +95,13 @@ public class ExecutionServiceTests extends ESTestCase {
executor = mock(WatchExecutor.class); executor = mock(WatchExecutor.class);
when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1)); when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1));
watchLockService = mock(WatchLockService.class);
clock = ClockMock.frozen(); clock = ClockMock.frozen();
threadPool = mock(ThreadPool.class); threadPool = mock(ThreadPool.class);
client = mock(WatcherClientProxy.class); client = mock(WatcherClientProxy.class);
parser = mock(Watch.Parser.class); parser = mock(Watch.Parser.class);
executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, watchLockService, clock, executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, clock, threadPool,
threadPool, parser, client); parser, client);
ClusterState clusterState = mock(ClusterState.class); ClusterState clusterState = mock(ClusterState.class);
when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>()); when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>());
@ -113,9 +109,6 @@ public class ExecutionServiceTests extends ESTestCase {
} }
public void testExecute() throws Exception { public void testExecute() throws Exception {
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id"); when(watch.id()).thenReturn("_id");
GetResponse getResponse = mock(GetResponse.class); GetResponse getResponse = mock(GetResponse.class);
@ -194,7 +187,6 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(result.action(), sameInstance(actionResult)); assertThat(result.action(), sameInstance(actionResult));
verify(historyStore, times(1)).put(watchRecord); verify(historyStore, times(1)).put(watchRecord);
verify(releasable, times(1)).close();
verify(condition, times(1)).execute(context); verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload); verify(watchTransform, times(1)).execute(context, payload);
verify(action, times(1)).execute("_action", context, payload); verify(action, times(1)).execute("_action", context, payload);
@ -208,9 +200,6 @@ public class ExecutionServiceTests extends ESTestCase {
} }
public void testExecuteFailedInput() throws Exception { public void testExecuteFailedInput() throws Exception {
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
GetResponse getResponse = mock(GetResponse.class); GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true); when(getResponse.isExists()).thenReturn(true);
when(client.getWatch("_id")).thenReturn(getResponse); when(client.getWatch("_id")).thenReturn(getResponse);
@ -273,7 +262,6 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().actionsResults().size(), is(0)); assertThat(watchRecord.result().actionsResults().size(), is(0));
verify(historyStore, times(1)).put(watchRecord); verify(historyStore, times(1)).put(watchRecord);
verify(releasable, times(1)).close();
verify(input, times(1)).execute(context, null); verify(input, times(1)).execute(context, null);
verify(condition, never()).execute(context); verify(condition, never()).execute(context);
verify(watchTransform, never()).execute(context, payload); verify(watchTransform, never()).execute(context, payload);
@ -281,9 +269,6 @@ public class ExecutionServiceTests extends ESTestCase {
} }
public void testExecuteFailedCondition() throws Exception { public void testExecuteFailedCondition() throws Exception {
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id"); when(watch.id()).thenReturn("_id");
GetResponse getResponse = mock(GetResponse.class); GetResponse getResponse = mock(GetResponse.class);
@ -341,7 +326,6 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().actionsResults().size(), is(0)); assertThat(watchRecord.result().actionsResults().size(), is(0));
verify(historyStore, times(1)).put(watchRecord); verify(historyStore, times(1)).put(watchRecord);
verify(releasable, times(1)).close();
verify(input, times(1)).execute(context, null); verify(input, times(1)).execute(context, null);
verify(condition, times(1)).execute(context); verify(condition, times(1)).execute(context);
verify(watchTransform, never()).execute(context, payload); verify(watchTransform, never()).execute(context, payload);
@ -349,9 +333,6 @@ public class ExecutionServiceTests extends ESTestCase {
} }
public void testExecuteFailedWatchTransform() throws Exception { public void testExecuteFailedWatchTransform() throws Exception {
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id"); when(watch.id()).thenReturn("_id");
GetResponse getResponse = mock(GetResponse.class); GetResponse getResponse = mock(GetResponse.class);
@ -408,7 +389,6 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().actionsResults().size(), is(0)); assertThat(watchRecord.result().actionsResults().size(), is(0));
verify(historyStore, times(1)).put(watchRecord); verify(historyStore, times(1)).put(watchRecord);
verify(releasable, times(1)).close();
verify(input, times(1)).execute(context, null); verify(input, times(1)).execute(context, null);
verify(condition, times(1)).execute(context); verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload); verify(watchTransform, times(1)).execute(context, payload);
@ -416,9 +396,6 @@ public class ExecutionServiceTests extends ESTestCase {
} }
public void testExecuteFailedActionTransform() throws Exception { public void testExecuteFailedActionTransform() throws Exception {
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id"); when(watch.id()).thenReturn("_id");
GetResponse getResponse = mock(GetResponse.class); GetResponse getResponse = mock(GetResponse.class);
@ -493,7 +470,6 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().actionsResults().get("_action").action().status(), is(Action.Result.Status.FAILURE)); assertThat(watchRecord.result().actionsResults().get("_action").action().status(), is(Action.Result.Status.FAILURE));
verify(historyStore, times(1)).put(watchRecord); verify(historyStore, times(1)).put(watchRecord);
verify(releasable, times(1)).close();
verify(input, times(1)).execute(context, null); verify(input, times(1)).execute(context, null);
verify(condition, times(1)).execute(context); verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload); verify(watchTransform, times(1)).execute(context, payload);
@ -787,7 +763,6 @@ public class ExecutionServiceTests extends ESTestCase {
public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exception { public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exception {
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("foo"); when(watch.id()).thenReturn("foo");
when(watch.nonce()).thenReturn(1L);
GetResponse getResponse = mock(GetResponse.class); GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true); when(getResponse.isExists()).thenReturn(true);
when(getResponse.getId()).thenReturn("foo"); when(getResponse.getId()).thenReturn("foo");
@ -798,7 +773,7 @@ public class ExecutionServiceTests extends ESTestCase {
doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); doThrow(new EsRejectedExecutionException()).when(executor).execute(any());
doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any()); doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any());
Wid wid = new Wid(watch.id(), watch.nonce(), now()); Wid wid = new Wid(watch.id(), now());
final ExecutorService currentThreadExecutor = EsExecutors.newDirectExecutorService(); final ExecutorService currentThreadExecutor = EsExecutors.newDirectExecutorService();
when(threadPool.generic()).thenReturn(currentThreadExecutor); when(threadPool.generic()).thenReturn(currentThreadExecutor);
@ -810,6 +785,19 @@ public class ExecutionServiceTests extends ESTestCase {
verify(historyStore, times(1)).forcePut(any(WatchRecord.class)); verify(historyStore, times(1)).forcePut(any(WatchRecord.class));
} }
public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception {
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
when(ctx.watch()).thenReturn(watch);
executionService.getCurrentExecutions().put("_id", new ExecutionService.WatchExecution(ctx, Thread.currentThread()));
executionService.execute(ctx);
verify(ctx).abortBeforeExecution(eq(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED), eq("Watch is already queued in thread pool"));
}
private Tuple<Condition, Condition.Result> whenCondition(final WatchExecutionContext context) { private Tuple<Condition, Condition.Result> whenCondition(final WatchExecutionContext context) {
Condition.Result conditionResult = mock(Condition.Result.class); Condition.Result conditionResult = mock(Condition.Result.class);
when(conditionResult.met()).thenReturn(true); when(conditionResult.met()).thenReturn(true);

@ -34,7 +34,7 @@ public class TriggeredWatchStoreLifeCycleTests extends AbstractWatcherIntegratio
for (int i = 0; i < triggeredWatches.length; i++) { for (int i = 0; i < triggeredWatches.length; i++) {
DateTime dateTime = new DateTime(i, DateTimeZone.UTC); DateTime dateTime = new DateTime(i, DateTimeZone.UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), dateTime, dateTime); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), dateTime, dateTime);
Wid wid = new Wid("record_" + i, randomLong(), DateTime.now(DateTimeZone.UTC)); Wid wid = new Wid("record_" + i, DateTime.now(DateTimeZone.UTC));
triggeredWatches[i] = new TriggeredWatch(wid, event); triggeredWatches[i] = new TriggeredWatch(wid, event);
triggeredWatchStore.put(triggeredWatches[i]); triggeredWatchStore.put(triggeredWatches[i]);
GetResponse getResponse = client().prepareGet(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, GetResponse getResponse = client().prepareGet(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE,

@ -23,7 +23,7 @@ public class TriggeredWatchTests extends AbstractWatcherIntegrationTestCase {
Watch watch = WatcherTestUtils.createTestWatch("fired_test", watcherHttpClient(), noopEmailService(), Watch watch = WatcherTestUtils.createTestWatch("fired_test", watcherHttpClient(), noopEmailService(),
watcherSearchTemplateService(), logger); watcherSearchTemplateService(), logger);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), DateTime.now(DateTimeZone.UTC), DateTime.now(DateTimeZone.UTC)); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), DateTime.now(DateTimeZone.UTC), DateTime.now(DateTimeZone.UTC));
Wid wid = new Wid("_record", randomLong(), DateTime.now(DateTimeZone.UTC)); Wid wid = new Wid("_record", DateTime.now(DateTimeZone.UTC));
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
triggeredWatch.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); triggeredWatch.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);

@ -61,7 +61,7 @@ public class HistoryStoreTests extends ESTestCase {
} }
public void testPut() throws Exception { public void testPut() throws Exception {
Wid wid = new Wid("_name", 0, new DateTime(0, UTC)); Wid wid = new Wid("_name", new DateTime(0, UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC)); ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC));
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null); WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null);
@ -74,7 +74,7 @@ public class HistoryStoreTests extends ESTestCase {
} }
public void testPutStopped() throws Exception { public void testPutStopped() throws Exception {
Wid wid = new Wid("_name", 0, new DateTime(0, UTC)); Wid wid = new Wid("_name", new DateTime(0, UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC)); ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC));
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null); WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null);
@ -116,7 +116,7 @@ public class HistoryStoreTests extends ESTestCase {
ActionWrapper.Result result = new ActionWrapper.Result(JiraAction.TYPE, new JiraAction.Executed(jiraIssue)); ActionWrapper.Result result = new ActionWrapper.Result(JiraAction.TYPE, new JiraAction.Executed(jiraIssue));
DateTime now = new DateTime(0, UTC); DateTime now = new DateTime(0, UTC);
Wid wid = new Wid("_name", 0, now); Wid wid = new Wid("_name", now);
Watch watch = mock(Watch.class); Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id"); when(watch.id()).thenReturn("_id");

@ -63,7 +63,7 @@ public class ExecutableChainInputTests extends ESTestCase {
private WatchExecutionContext createWatchExecutionContext() { private WatchExecutionContext createWatchExecutionContext() {
DateTime now = DateTime.now(DateTimeZone.UTC); DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAsciiOfLength(5), randomLong(), now); Wid wid = new Wid(randomAsciiOfLength(5), now);
return mockExecutionContextBuilder(wid.watchId()) return mockExecutionContextBuilder(wid.watchId())
.wid(wid) .wid(wid)
.payload(new Payload.Simple()) .payload(new Payload.Simple())

@ -31,7 +31,7 @@ public class VariablesTests extends ESTestCase {
Payload payload = new Payload.Simple(singletonMap("payload_key", "payload_value")); Payload payload = new Payload.Simple(singletonMap("payload_key", "payload_value"));
Map<String, Object> metatdata = singletonMap("metadata_key", "metadata_value"); Map<String, Object> metatdata = singletonMap("metadata_key", "metadata_value");
TriggerEvent event = new ScheduleTriggerEvent("_watch_id", triggeredTime, scheduledTime); TriggerEvent event = new ScheduleTriggerEvent("_watch_id", triggeredTime, scheduledTime);
Wid wid = new Wid("_watch_id", 0, executionTime); Wid wid = new Wid("_watch_id", executionTime);
WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id") WatchExecutionContext ctx = WatcherTestUtils.mockExecutionContextBuilder("_watch_id")
.wid(wid) .wid(wid)
.executionTime(executionTime) .executionTime(executionTime)

@ -139,19 +139,19 @@ public final class WatcherTestUtils {
public static WatchExecutionContextMockBuilder mockExecutionContextBuilder(String watchId) { public static WatchExecutionContextMockBuilder mockExecutionContextBuilder(String watchId) {
return new WatchExecutionContextMockBuilder(watchId) return new WatchExecutionContextMockBuilder(watchId)
.wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))); .wid(new Wid(watchId, DateTime.now(UTC)));
} }
public static WatchExecutionContext mockExecutionContext(String watchId, Payload payload) { public static WatchExecutionContext mockExecutionContext(String watchId, Payload payload) {
return mockExecutionContextBuilder(watchId) return mockExecutionContextBuilder(watchId)
.wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))) .wid(new Wid(watchId, DateTime.now(UTC)))
.payload(payload) .payload(payload)
.buildMock(); .buildMock();
} }
public static WatchExecutionContext mockExecutionContext(String watchId, DateTime time, Payload payload) { public static WatchExecutionContext mockExecutionContext(String watchId, DateTime time, Payload payload) {
return mockExecutionContextBuilder(watchId) return mockExecutionContextBuilder(watchId)
.wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))) .wid(new Wid(watchId, DateTime.now(UTC)))
.payload(payload) .payload(payload)
.time(watchId, time) .time(watchId, time)
.buildMock(); .buildMock();
@ -159,7 +159,7 @@ public final class WatcherTestUtils {
public static WatchExecutionContext mockExecutionContext(String watchId, DateTime executionTime, TriggerEvent event, Payload payload) { public static WatchExecutionContext mockExecutionContext(String watchId, DateTime executionTime, TriggerEvent event, Payload payload) {
return mockExecutionContextBuilder(watchId) return mockExecutionContextBuilder(watchId)
.wid(new Wid(watchId, randomInt(10), DateTime.now(UTC))) .wid(new Wid(watchId, DateTime.now(UTC)))
.payload(payload) .payload(payload)
.executionTime(executionTime) .executionTime(executionTime)
.triggerEvent(event) .triggerEvent(event)

@ -29,6 +29,7 @@ import org.hamcrest.Matchers;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
@ -43,6 +44,7 @@ import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest; import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.cron;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsEqual.equalTo;
import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.DateTimeZone.UTC;
@ -69,7 +71,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
// valid watch record: // valid watch record:
DateTime now = DateTime.now(UTC); DateTime now = DateTime.now(UTC);
Wid wid = new Wid("_id", 1, now); Wid wid = new Wid("_id", now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
Condition condition = AlwaysCondition.INSTANCE; Condition condition = AlwaysCondition.INSTANCE;
String index = HistoryStore.getHistoryIndexNameForTime(now); String index = HistoryStore.getHistoryIndexNameForTime(now);
@ -90,7 +92,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
.get(); .get();
// unknown condition: // unknown condition:
wid = new Wid("_id", 2, now); wid = new Wid("_id", now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value()) client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject() .setSource(jsonBuilder().startObject()
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName()) .startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
@ -108,7 +110,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
.get(); .get();
// unknown trigger: // unknown trigger:
wid = new Wid("_id", 2, now); wid = new Wid("_id", now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value()) client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject() .setSource(jsonBuilder().startObject()
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName()) .startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
@ -139,7 +141,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
assertAcked(client().admin().indices().prepareCreate(Watch.INDEX)); assertAcked(client().admin().indices().prepareCreate(Watch.INDEX));
} }
DateTime now = DateTime.now(UTC); DateTime now = DateTime.now(UTC);
Wid wid = new Wid("_id", 1, now); Wid wid = new Wid("_id", now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, wid.value()) client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, wid.value())
@ -188,7 +190,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
assertThat(response.getWatchesCount(), equalTo((long) numWatches)); assertThat(response.getWatchesCount(), equalTo((long) numWatches));
} }
public void testTriggeredWatchLoading() throws Exception { public void testMixedTriggeredWatchLoading() throws Exception {
createIndex("output"); createIndex("output");
client().prepareIndex("my-index", "foo", "bar") client().prepareIndex("my-index", "foo", "bar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
@ -198,8 +200,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED)); assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getWatchesCount(), equalTo(0L)); assertThat(response.getWatchesCount(), equalTo(0L));
WatcherSearchTemplateRequest request = WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "my-index");
templateRequest(searchSource().query(termQuery("field", "value")), "my-index");
int numWatches = 8; int numWatches = 8;
for (int i = 0; i < numWatches; i++) { for (int i = 0; i < numWatches; i++) {
@ -219,7 +220,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
String watchId = "_id" + (i % numWatches); String watchId = "_id" + (i % numWatches);
now = now.plusMinutes(1); now = now.plusMinutes(1);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
Wid wid = new Wid(watchId, randomLong(), now); Wid wid = new Wid(watchId, now);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value()) client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch)) .setSource(jsonBuilder().value(triggeredWatch))
@ -230,22 +231,10 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
stopWatcher(); stopWatcher();
startWatcher(); startWatcher();
assertBusy(() -> { assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords);
// We need to wait until all the records are processed from the internal execution queue, only then we can assert
// that numRecords watch records have been processed as part of starting up.
WatcherStatsResponse response1 = watcherClient().prepareWatcherStats().get();
assertThat(response1.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response1.getThreadPoolQueueSize(), equalTo(0L));
// but even then since the execution of the watch record is async it may take a little bit before
// the actual documents are in the output index
refresh();
SearchResponse searchResponse = client().prepareSearch("output").get();
assertHitCount(searchResponse, numRecords);
}, 30, TimeUnit.SECONDS);
} }
public void testMixedTriggeredWatchLoading() throws Exception { public void testTriggeredWatchLoading() throws Exception {
createIndex("output"); createIndex("output");
client().prepareIndex("my-index", "foo", "bar") client().prepareIndex("my-index", "foo", "bar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
@ -270,7 +259,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
for (int i = 0; i < numRecords; i++) { for (int i = 0; i < numRecords; i++) {
now = now.plusMinutes(1); now = now.plusMinutes(1);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
Wid wid = new Wid(watchId, randomLong(), now); Wid wid = new Wid(watchId, now);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value()) client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch)) .setSource(jsonBuilder().value(triggeredWatch))
@ -281,19 +270,34 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
stopWatcher(); stopWatcher();
startWatcher(); startWatcher();
assertSingleExecutionAndCompleteWatchHistory(1, numRecords);
}
private void assertSingleExecutionAndCompleteWatchHistory(long numberOfWatches, int expectedWatchHistoryCount) throws Exception {
assertBusy(() -> { assertBusy(() -> {
// We need to wait until all the records are processed from the internal execution queue, only then we can assert // We need to wait until all the records are processed from the internal execution queue, only then we can assert
// that numRecords watch records have been processed as part of starting up. // that numRecords watch records have been processed as part of starting up.
WatcherStatsResponse response1 = watcherClient().prepareWatcherStats().get(); WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response1.getWatcherState(), equalTo(WatcherState.STARTED)); assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response1.getThreadPoolQueueSize(), equalTo(0L)); assertThat(response.getThreadPoolQueueSize(), equalTo(0L));
// but even then since the execution of the watch record is async it may take a little bit before // because we try to execute a single watch in parallel, only one execution should happen
// the actual documents are in the output index
refresh(); refresh();
SearchResponse searchResponse = client().prepareSearch("output").get(); SearchResponse searchResponse = client().prepareSearch("output").get();
assertHitCount(searchResponse, numRecords); assertThat(searchResponse.getHits().totalHits(), is(greaterThanOrEqualTo(numberOfWatches)));
}); long successfulWatchExecutions = searchResponse.getHits().totalHits();
// the watch history should contain entries for each triggered watch, which a few have been marked as not executed
SearchResponse historySearchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*")
.setSize(expectedWatchHistoryCount).get();
assertHitCount(historySearchResponse, expectedWatchHistoryCount);
long notExecutedCount = Arrays.asList(historySearchResponse.getHits().getHits()).stream()
.filter(hit -> hit.getSource().get("state").equals(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED.id()))
.count();
logger.info("Watches not executed: [{}]: expected watch history count [{}] - [{}] successful watch exections",
notExecutedCount, expectedWatchHistoryCount, successfulWatchExecutions);
assertThat(notExecutedCount, is(expectedWatchHistoryCount - successfulWatchExecutions));
}, 20, TimeUnit.SECONDS);
} }
public void testManuallyStopped() throws Exception { public void testManuallyStopped() throws Exception {
@ -327,7 +331,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
for (int i = 0; i < numRecords; i++) { for (int i = 0; i < numRecords; i++) {
String watchId = Integer.toString(i); String watchId = Integer.toString(i);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, triggeredTime, triggeredTime); ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, triggeredTime, triggeredTime);
Wid wid = new Wid(watchId, 0, triggeredTime); Wid wid = new Wid(watchId, triggeredTime);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value()) client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch)) .setSource(jsonBuilder().value(triggeredWatch))

@ -1,107 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.watch;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
public class WatchLockServiceTests extends ESTestCase {
private final Settings settings =
Settings.builder().put(WatchLockService.DEFAULT_MAX_STOP_TIMEOUT_SETTING, TimeValue.timeValueSeconds(1)).build();
public void testLockingNotStarted() {
WatchLockService lockService = new WatchLockService(settings);
try {
lockService.acquire("_name");
fail("exception expected");
} catch (Exception e) {
assertThat(e.getMessage(), containsString("not running"));
}
}
public void testLocking() {
WatchLockService lockService = new WatchLockService(settings);
lockService.start();
Releasable releasable = lockService.acquire("_name");
assertThat(lockService.getWatchLocks().hasLockedKeys(), is(true));
releasable.close();
assertThat(lockService.getWatchLocks().hasLockedKeys(), is(false));
lockService.stop();
}
public void testLockingStopTimeout(){
final WatchLockService lockService = new WatchLockService(settings);
lockService.start();
lockService.acquire("_name");
try {
lockService.stop();
fail("Expected ElasticsearchTimeoutException");
} catch (ElasticsearchTimeoutException e) {
assertThat(e.getMessage(), startsWith("timed out waiting for watches to complete, after waiting for"));
}
}
public void testLockingFair() throws Exception {
final WatchLockService lockService = new WatchLockService(settings);
lockService.start();
final AtomicInteger value = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>();
class FairRunner implements Runnable {
final int expectedValue;
final CountDownLatch startLatch = new CountDownLatch(1);
FairRunner(int expectedValue) {
this.expectedValue = expectedValue;
}
@Override
public void run() {
startLatch.countDown();
try (Releasable ignored = lockService.acquire("_name")) {
int actualValue = value.getAndIncrement();
assertThat(actualValue, equalTo(expectedValue));
Thread.sleep(50);
} catch(InterruptedException ie) {
}
}
}
List<FairRunner> runners = new ArrayList<>();
for(int i = 0; i < 50; ++i) {
FairRunner f = new FairRunner(i);
runners.add(f);
threads.add(new Thread(f));
}
for(int i = 0; i < threads.size(); ++i) {
threads.get(i).start();
runners.get(i).startLatch.await();
Thread.sleep(25);
}
for(Thread t : threads) {
t.join();
}
}
}