diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 174eb77fa2f..802ddefa0b9 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -26,7 +26,6 @@ import org.elasticsearch.xpack.watcher.watch.WatchStatus; import org.elasticsearch.xpack.watcher.watch.WatchStore; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import org.joda.time.PeriodType; import java.io.IOException; import java.util.Map; @@ -108,71 +107,46 @@ public class WatcherService extends AbstractComponent { } } - public WatchStore.WatchDelete deleteWatch(String id, TimeValue timeout, final boolean force) { + public WatchStore.WatchDelete deleteWatch(String id) { ensureStarted(); - WatchLockService.Lock lock = null; - if (!force) { - lock = watchLockService.tryAcquire(id, timeout); - if (lock == null) { - throw new ElasticsearchTimeoutException("could not delete watch [{}] within [{}]... wait and try again. If this error " + - "continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external" + - " system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); - } - } - try { - WatchStore.WatchDelete delete = watchStore.delete(id, force); - if (delete.deleteResponse().getResult() == DocWriteResponse.Result.DELETED) { - triggerService.remove(id); - } - return delete; - } finally { - if (lock != null) { - lock.release(); - } + WatchStore.WatchDelete delete = watchStore.delete(id); + if (delete.deleteResponse().getResult() == DocWriteResponse.Result.DELETED) { + triggerService.remove(id); } + return delete; } - public IndexResponse putWatch(String id, BytesReference watchSource, TimeValue timeout, boolean active) throws IOException { + public IndexResponse putWatch(String id, BytesReference watchSource, boolean active) throws IOException { ensureStarted(); - WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); - if (lock == null) { - throw new ElasticsearchTimeoutException("could not put watch [{}] within [{}]... wait and try again. If this error continues " + - "to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such " + - "as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); - } - try { - DateTime now = clock.nowUTC(); - Watch watch = watchParser.parseWithSecrets(id, false, watchSource, now); - watch.setState(active, now); - WatchStore.WatchPut result = watchStore.put(watch); + DateTime now = clock.nowUTC(); + Watch watch = watchParser.parseWithSecrets(id, false, watchSource, now); + watch.setState(active, now); + WatchStore.WatchPut result = watchStore.put(watch); - if (result.previous() == null) { - // this is a newly created watch, so we only need to schedule it if it's active - if (result.current().status().state().isActive()) { - triggerService.add(result.current()); - } - - } else if (result.current().status().state().isActive()) { - - if (!result.previous().status().state().isActive()) { - // the replaced watch was inactive, which means it wasn't scheduled. The new watch is active - // so we need to schedule it - triggerService.add(result.current()); - - } else if (!result.previous().trigger().equals(result.current().trigger())) { - // the previous watch was active and its schedule is different than the schedule of the - // new watch, so we need to - triggerService.add(result.current()); - } - } else { - // if the current is inactive, we'll just remove it from the trigger service - // just to be safe - triggerService.remove(result.current().id()); + if (result.previous() == null) { + // this is a newly created watch, so we only need to schedule it if it's active + if (result.current().status().state().isActive()) { + triggerService.add(result.current()); } - return result.indexResponse(); - } finally { - lock.release(); + + } else if (result.current().status().state().isActive()) { + + if (!result.previous().status().state().isActive()) { + // the replaced watch was inactive, which means it wasn't scheduled. The new watch is active + // so we need to schedule it + triggerService.add(result.current()); + + } else if (!result.previous().trigger().equals(result.current().trigger())) { + // the previous watch was active and its schedule is different than the schedule of the + // new watch, so we need to + triggerService.add(result.current()); + } + } else { + // if the current is inactive, we'll just remove it from the trigger service + // just to be safe + triggerService.remove(result.current().id()); } + return result.indexResponse(); } /** @@ -189,55 +163,38 @@ public class WatcherService extends AbstractComponent { /** * Acks the watch if needed */ - public WatchStatus ackWatch(String id, String[] actionIds, TimeValue timeout) throws IOException { + public WatchStatus ackWatch(String id, String[] actionIds) throws IOException { ensureStarted(); - WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); - if (lock == null) { - throw new ElasticsearchTimeoutException("could not ack watch [{}] within [{}]... wait and try again. If this error continues " + - "to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such " + - "as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); - } if (actionIds == null || actionIds.length == 0) { actionIds = new String[] { Watch.ALL_ACTIONS_ID }; } - try { - Watch watch = watchStore.get(id); - if (watch == null) { - throw illegalArgument("watch [{}] does not exist", id); - } - // we need to create a safe copy of the status - if (watch.ack(clock.now(DateTimeZone.UTC), actionIds)) { - try { - watchStore.updateStatus(watch); - } catch (IOException ioe) { - throw ioException("failed to update the watch [{}] on ack", ioe, watch.id()); - } catch (VersionConflictEngineException vcee) { - throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id()); - } - } - return new WatchStatus(watch.status()); - } finally { - lock.release(); + Watch watch = watchStore.get(id); + if (watch == null) { + throw illegalArgument("watch [{}] does not exist", id); } + // we need to create a safe copy of the status + if (watch.ack(clock.now(DateTimeZone.UTC), actionIds)) { + try { + watchStore.updateStatus(watch); + } catch (IOException ioe) { + throw ioException("failed to update the watch [{}] on ack", ioe, watch.id()); + } catch (VersionConflictEngineException vcee) { + throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id()); + } + } + return new WatchStatus(watch.status()); } - public WatchStatus activateWatch(String id, TimeValue timeout) throws IOException { - return setWatchState(id, true, timeout); + public WatchStatus activateWatch(String id) throws IOException { + return setWatchState(id, true); } - public WatchStatus deactivateWatch(String id, TimeValue timeout) throws IOException { - return setWatchState(id, false, timeout); + public WatchStatus deactivateWatch(String id) throws IOException { + return setWatchState(id, false); } - WatchStatus setWatchState(String id, boolean active, TimeValue timeout) throws IOException { + WatchStatus setWatchState(String id, boolean active) throws IOException { ensureStarted(); - WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); - if (lock == null) { - throw new ElasticsearchTimeoutException("could not ack watch [{}] within [{}]... wait and try again. If this error continues " + - "to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such " + - "as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); - } - // for now, when a watch is deactivated we don't remove its runtime representation // that is, the store will still keep the watch in memory. We only mark the watch // as inactive (both in runtime and also update the watch in the watches index) @@ -251,30 +208,26 @@ public class WatcherService extends AbstractComponent { // to run this exercise anyway... and make sure that nothing in watcher relies on the // fact that the watch store holds all watches in memory. - try { - Watch watch = watchStore.get(id); - if (watch == null) { - throw illegalArgument("watch [{}] does not exist", id); - } - if (watch.setState(active, clock.nowUTC())) { - try { - watchStore.updateStatus(watch); - if (active) { - triggerService.add(watch); - } else { - triggerService.remove(watch.id()); - } - } catch (IOException ioe) { - throw ioException("failed to update the watch [{}] on ack", ioe, watch.id()); - } catch (VersionConflictEngineException vcee) { - throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id()); - } - } - // we need to create a safe copy of the status - return new WatchStatus(watch.status()); - } finally { - lock.release(); + Watch watch = watchStore.get(id); + if (watch == null) { + throw illegalArgument("watch [{}] does not exist", id); } + if (watch.setState(active, clock.nowUTC())) { + try { + watchStore.updateStatus(watch); + if (active) { + triggerService.add(watch); + } else { + triggerService.remove(watch.id()); + } + } catch (IOException ioe) { + throw ioException("failed to update the watch [{}] on ack", ioe, watch.id()); + } catch (VersionConflictEngineException vcee) { + throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id()); + } + } + // we need to create a safe copy of the status + return new WatchStatus(watch.status()); } public long watchesCount() { diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 4091f291652..9e44915ddd5 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -244,9 +245,9 @@ public class ExecutionService extends AbstractComponent { public WatchRecord execute(WatchExecutionContext ctx) { WatchRecord record = null; - WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); + Releasable releasable = watchLockService.acquire(ctx.watch().id()); if (logger.isTraceEnabled()) { - logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock)); + logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(releasable)); } try { currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread())); @@ -287,9 +288,9 @@ public class ExecutionService extends AbstractComponent { } currentExecutions.remove(ctx.watch().id()); if (logger.isTraceEnabled()) { - logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock)); + logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(releasable)); } - lock.release(); + releasable.close(); logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); } return record; diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestDeleteWatchAction.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestDeleteWatchAction.java index 39410323028..89ac8ff9846 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestDeleteWatchAction.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestDeleteWatchAction.java @@ -40,7 +40,6 @@ public class RestDeleteWatchAction extends WatcherRestHandler { protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception { DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest(request.param("id")); deleteWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWatchRequest.masterNodeTimeout())); - deleteWatchRequest.setForce(request.paramAsBoolean("force", deleteWatchRequest.isForce())); client.deleteWatch(deleteWatchRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(DeleteWatchResponse response, XContentBuilder builder) throws Exception { diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/concurrent/FairKeyedLock.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/concurrent/FairKeyedLock.java deleted file mode 100644 index a661f8d59f7..00000000000 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/concurrent/FairKeyedLock.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.support.concurrent; - -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; - -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - */ -public class FairKeyedLock { - - private final ConcurrentMap map = ConcurrentCollections.newConcurrentMap(); - - protected final ThreadLocal threadLocal = new ThreadLocal<>(); - - public void acquire(T key) { - while (true) { - if (threadLocal.get() != null) { - // if we are here, the thread already has the lock - throw new IllegalArgumentException("Lock already acquired in Thread" + Thread.currentThread().getId() - + " for key " + key); - } - KeyLock perNodeLock = map.get(key); - if (perNodeLock == null) { - KeyLock newLock = new KeyLock(true); - perNodeLock = map.putIfAbsent(key, newLock); - if (perNodeLock == null) { - newLock.lock(); - threadLocal.set(newLock); - return; - } - } - assert perNodeLock != null; - int i = perNodeLock.count.get(); - if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) { - perNodeLock.lock(); - threadLocal.set(perNodeLock); - return; - } - } - } - - public boolean tryAcquire(T key, long timeout, TimeUnit timeUnit) throws InterruptedException { - while (true) { - if (threadLocal.get() != null) { - // if we are here, the thread already has the lock - throw new IllegalArgumentException("Lock already acquired in Thread" + Thread.currentThread().getId() - + " for key " + key); - } - KeyLock perNodeLock = map.get(key); - if (perNodeLock == null) { - KeyLock newLock = new KeyLock(true); - perNodeLock = map.putIfAbsent(key, newLock); - if (perNodeLock == null) { - if (newLock.tryLock(timeout, timeUnit)) { - threadLocal.set(newLock); - return true; - } - return false; - } - } - assert perNodeLock != null; - int i = perNodeLock.count.get(); - if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) { - if (perNodeLock.tryLock(timeout, timeUnit)) { - threadLocal.set(perNodeLock); - return true; - } - return false; - } - } - } - - public void release(T key) { - KeyLock lock = threadLocal.get(); - if (lock == null) { - throw new IllegalArgumentException("Lock not acquired"); - } - release(key, lock); - } - - void release(T key, KeyLock lock) { - assert lock.isHeldByCurrentThread(); - assert lock == map.get(key); - lock.unlock(); - threadLocal.set(null); - int decrementAndGet = lock.count.decrementAndGet(); - if (decrementAndGet == 0) { - map.remove(key, lock); - } - } - - - @SuppressWarnings("serial") - private static final class KeyLock extends ReentrantLock { - private final AtomicInteger count = new AtomicInteger(1); - - public KeyLock(boolean fair) { - super(fair); - } - } - - public boolean hasLockedKeys() { - return !map.isEmpty(); - } - - /** - * A {@link FairKeyedLock} that allows to acquire a global lock that guarantees - * exclusive access to the resource the KeyedLock is guarding. - */ - public static final class GlobalLockable extends FairKeyedLock { - - private final ReadWriteLock lock = new ReentrantReadWriteLock(true); - - @Override - public void acquire(T key) { - boolean success = false; - lock.readLock().lock(); - try { - super.acquire(key); - success = true; - } finally { - if (!success) { - lock.readLock().unlock(); - } - } - } - - @Override - public void release(T key) { - KeyLock keyLock = threadLocal.get(); - if (keyLock == null) { - throw new IllegalArgumentException("Lock not acquired"); - } - try { - release(key, keyLock); - } finally { - lock.readLock().unlock(); - } - } - - /** - * Returns a global lock guaranteeing exclusive access to the resource - * this KeyedLock is guarding. - */ - public Lock globalLock() { - return lock.writeLock(); - } - } - - -} diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java index c7bf383fcdf..af14ac8fe74 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java @@ -8,11 +8,11 @@ package org.elasticsearch.xpack.watcher.transport.actions.ack; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; @@ -54,7 +54,7 @@ public class TransportAckWatchAction extends WatcherTransportAction listener) throws ElasticsearchException { try { - WatchStatus watchStatus = watcherService.ackWatch(request.getWatchId(), request.getActionIds(), request.masterNodeTimeout()); + WatchStatus watchStatus = watcherService.ackWatch(request.getWatchId(), request.getActionIds()); AckWatchResponse response = new AckWatchResponse(watchStatus); listener.onResponse(response); } catch (Exception e) { diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java index bb055963ea2..44c55331e62 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java @@ -8,11 +8,11 @@ package org.elasticsearch.xpack.watcher.transport.actions.activate; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; @@ -55,8 +55,8 @@ public class TransportActivateWatchAction extends WatcherTransportAction { private String id; private long version = Versions.MATCH_ANY; - private boolean force = false; public DeleteWatchRequest() { this(null); @@ -50,21 +49,6 @@ public class DeleteWatchRequest extends MasterNodeRequest { this.id = id; } - /** - * @return true if this request should ignore locking false if not - */ - public boolean isForce() { - return force; - } - - /** - * @param force Sets weither this request should ignore locking and force the delete even if lock is unavailable. - */ - public void setForce(boolean force) { - this.force = force; - } - - /** * Sets the version, which will cause the delete operation to only be performed if a matching * version exists and no changes happened on the doc since then. @@ -95,7 +79,6 @@ public class DeleteWatchRequest extends MasterNodeRequest { super.readFrom(in); id = in.readString(); version = in.readLong(); - force = in.readBoolean(); } @Override @@ -103,7 +86,6 @@ public class DeleteWatchRequest extends MasterNodeRequest { super.writeTo(out); out.writeString(id); out.writeLong(version); - out.writeBoolean(force); } @Override diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/delete/DeleteWatchRequestBuilder.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/delete/DeleteWatchRequestBuilder.java index 15a524178a1..d5f4467316a 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/delete/DeleteWatchRequestBuilder.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/delete/DeleteWatchRequestBuilder.java @@ -29,13 +29,4 @@ public class DeleteWatchRequestBuilder extends MasterNodeOperationRequestBuilder this.request().setId(id); return this; } - - /** - * Sets wiether this request is forced (ie ignores locks) - */ - public DeleteWatchRequestBuilder setForce(boolean force) { - this.request().setForce(force); - return this; - } - } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/delete/TransportDeleteWatchAction.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/delete/TransportDeleteWatchAction.java index cc12b25c90b..f0d43b1e8a9 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/delete/TransportDeleteWatchAction.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/delete/TransportDeleteWatchAction.java @@ -10,11 +10,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; @@ -55,8 +55,7 @@ public class TransportDeleteWatchAction extends WatcherTransportAction listener) throws ElasticsearchException { try { - DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId(), request.masterNodeTimeout(), request.isForce()) - .deleteResponse(); + DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId()).deleteResponse(); boolean deleted = deleteResponse.getResult() == DocWriteResponse.Result.DELETED; DeleteWatchResponse response = new DeleteWatchResponse(deleteResponse.getId(), deleteResponse.getVersion(), deleted); listener.onResponse(response); diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java index be1ba08391b..d79a0ca7b1b 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java @@ -61,8 +61,7 @@ public class TransportPutWatchAction extends WatcherTransportAction watchLocks = new FairKeyedLock<>(); + private final KeyedLock watchLocks = new KeyedLock<>(true); private final AtomicBoolean running = new AtomicBoolean(false); private static final TimeValue DEFAULT_MAX_STOP_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); private static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "xpack.watcher.stop.timeout"; @@ -43,32 +41,12 @@ public class WatchLockService extends AbstractComponent { this.maxStopTimeout = maxStopTimeout; } - public Lock acquire(String name) { + public Releasable acquire(String name) { if (!running.get()) { throw illegalState("cannot acquire lock for watch [{}]. lock service is not running", name); } - watchLocks.acquire(name); - return new Lock(name, watchLocks); - } - - public Lock tryAcquire(String name, TimeValue timeout) { - if (!running.get()) { - throw illegalState("cannot acquire lock for watch [{}]. lock service is not running", name); - } - try { - if (!watchLocks.tryAcquire(name, timeout.millis(), TimeUnit.MILLISECONDS)) { - logger.warn("failed to acquire lock on watch [{}] (waited for [{}]). It is possible that for some reason this watch " + - "execution is stuck", name, timeout.format(PeriodType.seconds())); - return null; - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - //todo figure out a better std exception for this - logger.error((Supplier) () -> new ParameterizedMessage("could not acquire lock for watch [{}]", name), ie); - return null; - } - return new Lock(name, watchLocks); + return watchLocks.acquire(name); } public void start() { @@ -105,24 +83,7 @@ public class WatchLockService extends AbstractComponent { } } - FairKeyedLock getWatchLocks() { + KeyedLock getWatchLocks() { return watchLocks; } - - public static class Lock { - - private final String name; - private final FairKeyedLock watchLocks; - - private Lock(String name, FairKeyedLock watchLocks) { - this.name = name; - this.watchLocks = watchLocks; - - } - - public void release() { - watchLocks.release(name); - } - } - } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java index e4b7bdc40fb..98a3e712ea7 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; @@ -159,32 +160,36 @@ public class WatchStore extends AbstractComponent { // at the moment we store the status together with the watch, // so we just need to update the watch itself - // TODO: consider storing the status in a different documment (watch_status doc) (must smaller docs... faster for frequent updates) XContentBuilder source = JsonXContent.contentBuilder(). startObject() .field(Watch.Field.STATUS.getPreferredName(), watch.status(), ToXContent.EMPTY_PARAMS) .endObject(); + UpdateRequest updateRequest = new UpdateRequest(INDEX, DOC_TYPE, watch.id()); updateRequest.doc(source); updateRequest.version(watch.version()); - - UpdateResponse response = client.update(updateRequest); - watch.status().version(response.getVersion()); - watch.version(response.getVersion()); - watch.status().resetDirty(); - // Don't need to update the watches, since we are working on an instance from it. + try { + UpdateResponse response = client.update(updateRequest); + watch.status().version(response.getVersion()); + watch.version(response.getVersion()); + watch.status().resetDirty(); + } catch (DocumentMissingException e) { + // do not rethrow an exception, otherwise the watch history will contain an exception + // even though the execution might has been fine + logger.warn("Watch [{}] was deleted during watch execution, not updating watch status", watch.id()); + } } /** * Deletes the watch with the specified id if exists */ - public WatchDelete delete(String id, boolean force) { + public WatchDelete delete(String id) { ensureStarted(); Watch watch = watches.remove(id); // even if the watch was not found in the watch map, we should still try to delete it // from the index, just to make sure we don't leave traces of it DeleteRequest request = new DeleteRequest(INDEX, DOC_TYPE, id); - if (watch != null && !force) { + if (watch != null) { request.version(watch.version()); } DeleteResponse response = client.delete(request); diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index ae14b8fbb6f..6dfcab4b93e 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.watcher; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; @@ -13,12 +12,11 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.watcher.execution.ExecutionService; -import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.xpack.support.clock.ClockMock; import org.elasticsearch.xpack.support.clock.SystemClock; +import org.elasticsearch.xpack.watcher.execution.ExecutionService; +import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.xpack.watcher.trigger.Trigger; import org.elasticsearch.xpack.watcher.trigger.TriggerEngine; import org.elasticsearch.xpack.watcher.trigger.TriggerService; @@ -33,7 +31,6 @@ import org.junit.Before; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; @@ -55,17 +52,15 @@ public class WatcherServiceTests extends ESTestCase { private WatchStore watchStore; private Watch.Parser watchParser; private WatcherService watcherService; - private WatchLockService watchLockService; private ClockMock clock; - private ExecutionService executionService; @Before public void init() throws Exception { triggerService = mock(TriggerService.class); watchStore = mock(WatchStore.class); watchParser = mock(Watch.Parser.class); - executionService = mock(ExecutionService.class); - watchLockService = mock(WatchLockService.class); + ExecutionService executionService = mock(ExecutionService.class); + WatchLockService watchLockService = mock(WatchLockService.class); clock = new ClockMock(); WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = mock(WatcherIndexTemplateRegistry.class); watcherService = new WatcherService(Settings.EMPTY, clock, triggerService, watchStore, watchParser, executionService, @@ -87,13 +82,10 @@ public class WatcherServiceTests extends ESTestCase { when(watchPut.indexResponse()).thenReturn(indexResponse); when(watchPut.current()).thenReturn(newWatch); - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock); when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), any(DateTime.class))) .thenReturn(newWatch); when(watchStore.put(newWatch)).thenReturn(watchPut); - IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout, activeByDefault); + IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), activeByDefault); assertThat(response, sameInstance(indexResponse)); verify(newWatch, times(1)).setState(activeByDefault, clock.nowUTC()); @@ -104,19 +96,6 @@ public class WatcherServiceTests extends ESTestCase { } } - public void testPutWatchTimeout() throws Exception { - TimeValue timeout = TimeValue.timeValueSeconds(5); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); - try { - watcherService.putWatch("_id", new BytesArray("{}"), timeout, randomBoolean()); - fail("Expected ElasticsearchTimeoutException"); - } catch (ElasticsearchTimeoutException e) { - assertThat(e.getMessage(), containsString("could not put watch")); - assertThat(e.getMessage(), containsString("wait and try again")); - assertThat(e.getMessage(), containsString("there is a high chance that the watch execution is stuck")); - } - } - public void testPutWatchDifferentActiveStates() throws Exception { Trigger trigger = mock(Trigger.class); @@ -141,13 +120,10 @@ public class WatcherServiceTests extends ESTestCase { when(previousWatch.trigger()).thenReturn(trigger); when(watchPut.previous()).thenReturn(previousWatch); - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock); when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), eq(clock.nowUTC()))).thenReturn(watch); when(watchStore.put(watch)).thenReturn(watchPut); - IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout, active); + IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), active); assertThat(response, sameInstance(indexResponse)); if (!active) { @@ -163,63 +139,24 @@ public class WatcherServiceTests extends ESTestCase { } public void testDeleteWatch() throws Exception { - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - boolean force = randomBoolean(); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); - WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class); DeleteResponse deleteResponse = mock(DeleteResponse.class); when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED); when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); - when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete); - WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force); - - assertThat(watchDelete, sameInstance(expectedWatchDelete)); - verify(triggerService, times(1)).remove("_id"); - } - - public void testDeleteWatchTimeout() throws Exception { - TimeValue timeout = TimeValue.timeValueSeconds(5); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); - try { - watcherService.deleteWatch("_id", timeout, false); - fail("Expected ElasticsearchTimeoutException"); - } catch (ElasticsearchTimeoutException e) { - assertThat(e.getMessage(), containsString("could not delete watch")); - assertThat(e.getMessage(), containsString("wait and try again")); - assertThat(e.getMessage(), containsString("there is a high chance that the watch execution is stuck")); - } - } - - public void testDeleteWatchForce() throws Exception { - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); - - WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class); - DeleteResponse deleteResponse = mock(DeleteResponse.class); - when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED); - when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); - when(watchStore.delete("_id", true)).thenReturn(expectedWatchDelete); - WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, true); + when(watchStore.delete("_id")).thenReturn(expectedWatchDelete); + WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id"); assertThat(watchDelete, sameInstance(expectedWatchDelete)); verify(triggerService, times(1)).remove("_id"); } public void testDeleteWatchNotFound() throws Exception { - TimeValue timeout = TimeValue.timeValueSeconds(5); - boolean force = randomBoolean(); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); - WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class); DeleteResponse deleteResponse = mock(DeleteResponse.class); when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); - when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete); - WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force); + when(watchStore.delete("_id")).thenReturn(expectedWatchDelete); + WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id"); assertThat(watchDelete, sameInstance(expectedWatchDelete)); verifyZeroInteractions(triggerService); @@ -228,39 +165,34 @@ public class WatcherServiceTests extends ESTestCase { public void testAckWatch() throws Exception { DateTime now = new DateTime(DateTimeZone.UTC); clock.setTime(now); - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); Watch watch = mock(Watch.class); when(watch.ack(now, "_all")).thenReturn(true); WatchStatus status = new WatchStatus(now, emptyMap()); when(watch.status()).thenReturn(status); when(watchStore.get("_id")).thenReturn(watch); - WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout); + WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY); assertThat(result, not(sameInstance(status))); verify(watchStore, times(1)).updateStatus(watch); } public void testActivate() throws Exception { - TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 30)); WatcherService service = spy(watcherService); WatchStatus expectedStatus = mock(WatchStatus.class); - doReturn(expectedStatus).when(service).setWatchState("_id", true, timeout); - WatchStatus actualStatus = service.activateWatch("_id", timeout); + doReturn(expectedStatus).when(service).setWatchState("_id", true); + WatchStatus actualStatus = service.activateWatch("_id"); assertThat(actualStatus, sameInstance(expectedStatus)); - verify(service, times(1)).setWatchState("_id", true, timeout); + verify(service, times(1)).setWatchState("_id", true); } public void testDeactivate() throws Exception { - TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 30)); WatcherService service = spy(watcherService); WatchStatus expectedStatus = mock(WatchStatus.class); - doReturn(expectedStatus).when(service).setWatchState("_id", false, timeout); - WatchStatus actualStatus = service.deactivateWatch("_id", timeout); + doReturn(expectedStatus).when(service).setWatchState("_id", false); + WatchStatus actualStatus = service.deactivateWatch("_id"); assertThat(actualStatus, sameInstance(expectedStatus)); - verify(service, times(1)).setWatchState("_id", false, timeout); + verify(service, times(1)).setWatchState("_id", false); } public void testSetWatchStateSetActiveOnCurrentlyActive() throws Exception { @@ -268,12 +200,8 @@ public class WatcherServiceTests extends ESTestCase { // - the watch status should not change // - the watch doesn't need to be updated in the store // - the watch should not be removed or re-added to the trigger service - DateTime now = new DateTime(DateTimeZone.UTC); clock.setTime(now); - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); Watch watch = mock(Watch.class); WatchStatus status = new WatchStatus(now, emptyMap()); @@ -283,7 +211,7 @@ public class WatcherServiceTests extends ESTestCase { when(watchStore.get("_id")).thenReturn(watch); - WatchStatus result = watcherService.setWatchState("_id", true, timeout); + WatchStatus result = watcherService.setWatchState("_id", true); assertThat(result, not(sameInstance(status))); verifyZeroInteractions(triggerService); @@ -299,10 +227,6 @@ public class WatcherServiceTests extends ESTestCase { DateTime now = new DateTime(DateTimeZone.UTC); clock.setTime(now); - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); - Watch watch = mock(Watch.class); WatchStatus status = new WatchStatus(now, emptyMap()); when(watch.status()).thenReturn(status); @@ -310,7 +234,7 @@ public class WatcherServiceTests extends ESTestCase { when(watchStore.get("_id")).thenReturn(watch); - WatchStatus result = watcherService.setWatchState("_id", true, timeout); + WatchStatus result = watcherService.setWatchState("_id", true); assertThat(result, not(sameInstance(status))); verify(triggerService, times(1)).add(watch); @@ -322,12 +246,8 @@ public class WatcherServiceTests extends ESTestCase { // - the watch status should change // - the watch needs to be updated in the store // - the watch should be removed from the trigger service - DateTime now = new DateTime(DateTimeZone.UTC); clock.setTime(now); - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); @@ -337,7 +257,7 @@ public class WatcherServiceTests extends ESTestCase { when(watchStore.get("_id")).thenReturn(watch); - WatchStatus result = watcherService.setWatchState("_id", false, timeout); + WatchStatus result = watcherService.setWatchState("_id", false); assertThat(result, not(sameInstance(status))); verify(triggerService, times(1)).remove("_id"); @@ -349,14 +269,9 @@ public class WatcherServiceTests extends ESTestCase { // - the watch status should not be updated // - the watch should not be updated in the store // - the watch should be re-added or removed to/from the trigger service - DateTime now = new DateTime(DateTimeZone.UTC); clock.setTime(now); - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); - Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); WatchStatus status = new WatchStatus(now, emptyMap()); @@ -365,51 +280,32 @@ public class WatcherServiceTests extends ESTestCase { when(watchStore.get("_id")).thenReturn(watch); - WatchStatus result = watcherService.setWatchState("_id", false, timeout); + WatchStatus result = watcherService.setWatchState("_id", false); assertThat(result, not(sameInstance(status))); verifyZeroInteractions(triggerService); verify(watchStore, never()).updateStatus(watch); } - public void testAckWatchTimeout() throws Exception { - TimeValue timeout = TimeValue.timeValueSeconds(5); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); - try { - watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout); - fail("Expected ElasticsearchTimeoutException"); - } catch (ElasticsearchTimeoutException e) { - assertThat(e.getMessage(), containsString("could not ack watch")); - assertThat(e.getMessage(), containsString("wait and try again")); - assertThat(e.getMessage(), containsString("there is a high chance that the watch execution is stuck")); - } - } - public void testAckWatchNotAck() throws Exception { DateTime now = SystemClock.INSTANCE.nowUTC(); - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); Watch watch = mock(Watch.class); when(watch.ack(now)).thenReturn(false); WatchStatus status = new WatchStatus(now, emptyMap()); when(watch.status()).thenReturn(status); when(watchStore.get("_id")).thenReturn(watch); - WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout); + WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY); assertThat(result, not(sameInstance(status))); verify(watchStore, never()).updateStatus(watch); } public void testAckWatchNoWatch() throws Exception { - TimeValue timeout = TimeValue.timeValueSeconds(5); - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); when(watchStore.get("_id")).thenReturn(null); try { - watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout); + watcherService.ackWatch("_id", Strings.EMPTY_ARRAY); fail(); } catch (IllegalArgumentException iae) { // expected diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/throttler/ActionThrottleTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/throttler/ActionThrottleTests.java index 742172be679..6f49045c88c 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/throttler/ActionThrottleTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/throttler/ActionThrottleTests.java @@ -7,6 +7,10 @@ package org.elasticsearch.xpack.watcher.actions.throttler; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.common.http.HttpRequestTemplate; +import org.elasticsearch.xpack.common.text.TextTemplate; +import org.elasticsearch.xpack.notification.email.EmailTemplate; +import org.elasticsearch.xpack.support.clock.SystemClock; import org.elasticsearch.xpack.watcher.actions.Action; import org.elasticsearch.xpack.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.watcher.actions.email.EmailAction; @@ -18,9 +22,6 @@ import org.elasticsearch.xpack.watcher.execution.ActionExecutionMode; import org.elasticsearch.xpack.watcher.execution.ExecutionState; import org.elasticsearch.xpack.watcher.execution.ManualExecutionContext; import org.elasticsearch.xpack.watcher.history.WatchRecord; -import org.elasticsearch.xpack.support.clock.SystemClock; -import org.elasticsearch.xpack.common.http.HttpRequestTemplate; -import org.elasticsearch.xpack.common.text.TextTemplate; import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath; import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse; @@ -31,7 +32,6 @@ import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent; import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; -import org.elasticsearch.xpack.notification.email.EmailTemplate; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -77,7 +77,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase { if (useClientForAcking) { watcherClient().prepareAckWatch("_id").setActionIds("test_id").get(); } else { - watchService().ackWatch("_id", new String[] { "test_id" }, new TimeValue(5, TimeUnit.SECONDS)); + watchService().ackWatch("_id", new String[] { "test_id" }); } } ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS)); @@ -117,7 +117,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase { if (useClientForAcking) { watcherClient().prepareAckWatch("_id").setActionIds(actionId).get(); } else { - watchService().ackWatch("_id", new String[]{actionId}, new TimeValue(5, TimeUnit.SECONDS)); + watchService().ackWatch("_id", new String[]{actionId}); } } @@ -192,7 +192,6 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase { .trigger(schedule(interval("60m"))); AvailableAction availableAction = randomFrom(AvailableAction.values()); - final String actionType = availableAction.type(); watchSourceBuilder.addAction("default_global_throttle", availableAction.action()); PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchSourceBuilder)).actionGet(); diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 4ff96fe3597..4c1c080018d 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.execution; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.test.ESTestCase; @@ -111,8 +112,8 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecute() throws Exception { - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.acquire("_id")).thenReturn(lock); + Releasable releasable = mock(Releasable.class); + when(watchLockService.acquire("_id")).thenReturn(releasable); Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); @@ -190,7 +191,7 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(result.action(), sameInstance(actionResult)); verify(historyStore, times(1)).put(watchRecord); - verify(lock, times(1)).release(); + verify(releasable, times(1)).close(); verify(condition, times(1)).execute(context); verify(watchTransform, times(1)).execute(context, payload); verify(action, times(1)).execute("_action", context, payload); @@ -204,8 +205,8 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteFailedInput() throws Exception { - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.acquire("_id")).thenReturn(lock); + Releasable releasable = mock(Releasable.class); + when(watchLockService.acquire("_id")).thenReturn(releasable); Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); @@ -267,7 +268,7 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(watchRecord.result().actionsResults().count(), is(0)); verify(historyStore, times(1)).put(watchRecord); - verify(lock, times(1)).release(); + verify(releasable, times(1)).close(); verify(input, times(1)).execute(context, null); verify(condition, never()).execute(context); verify(watchTransform, never()).execute(context, payload); @@ -275,8 +276,8 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteFailedCondition() throws Exception { - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.acquire("_id")).thenReturn(lock); + Releasable releasable = mock(Releasable.class); + when(watchLockService.acquire("_id")).thenReturn(releasable); Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); @@ -334,7 +335,7 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(watchRecord.result().actionsResults().count(), is(0)); verify(historyStore, times(1)).put(watchRecord); - verify(lock, times(1)).release(); + verify(releasable, times(1)).close(); verify(input, times(1)).execute(context, null); verify(condition, times(1)).execute(context); verify(watchTransform, never()).execute(context, payload); @@ -342,8 +343,8 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteFailedWatchTransform() throws Exception { - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.acquire("_id")).thenReturn(lock); + Releasable releasable = mock(Releasable.class); + when(watchLockService.acquire("_id")).thenReturn(releasable); Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); @@ -400,7 +401,7 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(watchRecord.result().actionsResults().count(), is(0)); verify(historyStore, times(1)).put(watchRecord); - verify(lock, times(1)).release(); + verify(releasable, times(1)).close(); verify(input, times(1)).execute(context, null); verify(condition, times(1)).execute(context); verify(watchTransform, times(1)).execute(context, payload); @@ -408,8 +409,8 @@ public class ExecutionServiceTests extends ESTestCase { } public void testExecuteFailedActionTransform() throws Exception { - WatchLockService.Lock lock = mock(WatchLockService.Lock.class); - when(watchLockService.acquire("_id")).thenReturn(lock); + Releasable releasable = mock(Releasable.class); + when(watchLockService.acquire("_id")).thenReturn(releasable); Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); @@ -483,7 +484,7 @@ public class ExecutionServiceTests extends ESTestCase { assertThat(watchRecord.result().actionsResults().get("_action").action().status(), is(Action.Result.Status.FAILURE)); verify(historyStore, times(1)).put(watchRecord); - verify(lock, times(1)).release(); + verify(releasable, times(1)).close(); verify(input, times(1)).execute(context, null); verify(condition, times(1)).execute(context); verify(watchTransform, times(1)).execute(context, payload); diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ManualExecutionTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ManualExecutionTests.java index 2328c9c69fe..4a91ba36dd5 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ManualExecutionTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ManualExecutionTests.java @@ -390,7 +390,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTestCase { for (Thread thread : threads) { thread.start(); } - DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").setForce(true).get(); + DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").get(); assertThat(deleteWatchResponse.isFound(), is(true)); deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").get(); diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/concurrent/FairKeyedLockTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/concurrent/FairKeyedLockTests.java deleted file mode 100644 index bbeb3ff18b7..00000000000 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/concurrent/FairKeyedLockTests.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.support.concurrent; - -import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matchers; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; - -/** - */ -public class FairKeyedLockTests extends ESTestCase { - public void testIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException { - ConcurrentHashMap counter = new ConcurrentHashMap<>(); - ConcurrentHashMap safeCounter = new ConcurrentHashMap<>(); - FairKeyedLock connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<>() : new FairKeyedLock<>(); - String[] names = new String[randomIntBetween(1, 40)]; - for (int i = 0; i < names.length; i++) { - names[i] = randomRealisticUnicodeOfLengthBetween(10, 20); - } - CountDownLatch startLatch = new CountDownLatch(1); - int numThreads = randomIntBetween(3, 10); - AcquireAndReleaseThread[] threads = new AcquireAndReleaseThread[numThreads]; - for (int i = 0; i < numThreads; i++) { - threads[i] = new AcquireAndReleaseThread(startLatch, connectionLock, names, counter, safeCounter); - } - for (int i = 0; i < numThreads; i++) { - threads[i].start(); - } - startLatch.countDown(); - for (int i = 0; i < numThreads; i++) { - if (randomBoolean()) { - threads[i].incWithGlobal(); - } - } - - for (int i = 0; i < numThreads; i++) { - threads[i].join(); - } - assertThat(connectionLock.hasLockedKeys(), equalTo(false)); - - Set> entrySet = counter.entrySet(); - assertThat(counter.size(), equalTo(safeCounter.size())); - for (Map.Entry entry : entrySet) { - AtomicInteger atomicInteger = safeCounter.get(entry.getKey()); - assertThat(atomicInteger, not(Matchers.nullValue())); - assertThat(atomicInteger.get(), equalTo(entry.getValue())); - } - } - - public void testCannotAcquireTwoLocksGlobal() throws InterruptedException { - FairKeyedLock.GlobalLockable connectionLock = new FairKeyedLock.GlobalLockable<>(); - String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); - connectionLock.acquire(name); - try { - connectionLock.acquire(name); - fail("Expected IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("Lock already acquired")); - } finally { - connectionLock.release(name); - connectionLock.globalLock().lock(); - connectionLock.globalLock().unlock(); - } - } - - public void testCannotAcquireTwoLocks() throws InterruptedException { - FairKeyedLock connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<>() : new FairKeyedLock<>(); - String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); - connectionLock.acquire(name); - try { - connectionLock.acquire(name); - fail("Expected IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("Lock already acquired")); - } - } - - public void testTryAquire() throws InterruptedException { - final FairKeyedLock connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<>() : new FairKeyedLock<>(); - final String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); - connectionLock.acquire(name); - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference failure = new AtomicReference<>(); - Thread other = new Thread() { - @Override - public void run() { - try { - if (connectionLock.tryAcquire(name, 2, TimeUnit.SECONDS)) { - failure.set("expected to fail acquiring of the lock due to timeout"); - } - latch.countDown(); - } catch (InterruptedException e) { - latch.countDown(); - } - } - }; - other.start(); - if (!latch.await(5, TimeUnit.SECONDS)) { - fail("waiting too long for lock acquire"); - } - String failureMessage = failure.get(); - if(failureMessage != null) { - fail(failureMessage); - } - } - - - public void testCannotReleaseUnacquiredLock() throws InterruptedException { - FairKeyedLock connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<>() : new FairKeyedLock<>(); - String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); - try { - connectionLock.release(name); - fail("Expected IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), is("Lock not acquired")); - } - } - - public static class AcquireAndReleaseThread extends Thread { - private CountDownLatch startLatch; - FairKeyedLock connectionLock; - String[] names; - ConcurrentHashMap counter; - ConcurrentHashMap safeCounter; - - public AcquireAndReleaseThread(CountDownLatch startLatch, FairKeyedLock connectionLock, String[] names, - ConcurrentHashMap counter, ConcurrentHashMap safeCounter) { - this.startLatch = startLatch; - this.connectionLock = connectionLock; - this.names = names; - this.counter = counter; - this.safeCounter = safeCounter; - } - - @Override - public void run() { - try { - startLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(); - } - int numRuns = scaledRandomIntBetween(5000, 50000); - for (int i = 0; i < numRuns; i++) { - String curName = names[randomInt(names.length - 1)]; - connectionLock.acquire(curName); - try { - Integer integer = counter.get(curName); - if (integer == null) { - counter.put(curName, 1); - } else { - counter.put(curName, integer.intValue() + 1); - } - } finally { - connectionLock.release(curName); - } - AtomicInteger atomicInteger = new AtomicInteger(0); - AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger); - if (value == null) { - atomicInteger.incrementAndGet(); - } else { - value.incrementAndGet(); - } - } - } - - public void incWithGlobal() { - if (connectionLock instanceof FairKeyedLock.GlobalLockable) { - final int iters = randomIntBetween(10, 200); - for (int i = 0; i < iters; i++) { - ((FairKeyedLock.GlobalLockable) connectionLock).globalLock().lock(); - try { - String curName = names[randomInt(names.length - 1)]; - Integer integer = counter.get(curName); - if (integer == null) { - counter.put(curName, 1); - } else { - counter.put(curName, integer.intValue() + 1); - } - AtomicInteger atomicInteger = new AtomicInteger(0); - AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger); - if (value == null) { - atomicInteger.incrementAndGet(); - } else { - value.incrementAndGet(); - } - } finally { - ((FairKeyedLock.GlobalLockable) connectionLock).globalLock().unlock(); - } - } - } - } - } - -} diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/delete/DeleteWatchTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/delete/DeleteWatchTests.java index cf6d61ebbc1..b861424bf30 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/delete/DeleteWatchTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/delete/DeleteWatchTests.java @@ -5,20 +5,38 @@ */ package org.elasticsearch.xpack.watcher.transport.action.delete; +import com.squareup.okhttp.mockwebserver.MockResponse; +import com.squareup.okhttp.mockwebserver.MockWebServer; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.xpack.common.http.HttpRequestTemplate; +import org.elasticsearch.xpack.watcher.history.HistoryStore; +import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath; import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; import org.elasticsearch.xpack.watcher.transport.actions.delete.DeleteWatchRequest; import org.elasticsearch.xpack.watcher.transport.actions.delete.DeleteWatchResponse; +import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse; +import org.elasticsearch.xpack.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static com.carrotsearch.randomizedtesting.RandomizedTest.sleep; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.alwaysCondition; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.httpInput; import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; /** @@ -31,7 +49,7 @@ public class DeleteWatchTests extends AbstractWatcherIntegrationTestCase { .trigger(schedule(interval("5m"))) .input(simpleInput()) .condition(alwaysCondition()) - .addAction("_action1", loggingAction("{{ctx.watch_id}}"))) + .addAction("_action1", loggingAction("anything"))) .get(); assertThat(putResponse, notNullValue()); @@ -60,4 +78,63 @@ public class DeleteWatchTests extends AbstractWatcherIntegrationTestCase { assertThat(e.getMessage(), containsString("Watch id cannot have white spaces")); } } + + // This is a special case, since locking is removed + // Deleting a watch while it is being executed is possible now + // This test ensures that there are no leftovers, like a watch status without a watch in the watch store + // Also the watch history is checked, that the error has been marked as deleted + // The mock webserver does not support count down latches, so we have to use sleep - sorry! + public void testWatchDeletionDuringExecutionWorks() throws Exception { + ensureWatcherStarted(); + + MockResponse response = new MockResponse(); + response.setBody("foo"); + response.setResponseCode(200); + response.setBodyDelay(5, TimeUnit.SECONDS); + + MockWebServer server = new MockWebServer(); + server.enqueue(response); + + try { + server.start(); + HttpRequestTemplate template = HttpRequestTemplate.builder(server.getHostName(), server.getPort()).path("/").build(); + + PutWatchResponse responseFuture = watcherClient().preparePutWatch("_name").setSource(watchBuilder() + .trigger(schedule(interval("6h"))) + .input(httpInput(template)) + .addAction("_action1", loggingAction("anything"))) + .get(); + assertThat(responseFuture.isCreated(), is(true)); + + ListenableActionFuture executeWatchFuture = + watcherClient().prepareExecuteWatch("_name").setRecordExecution(true).execute(); + + // without this sleep the delete operation might overtake the watch execution + sleep(1000); + DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get(); + assertThat(deleteWatchResponse.isFound(), is(true)); + + executeWatchFuture.get(); + + // the watch is gone, no leftovers + GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_name").get(); + assertThat(getWatchResponse.isFound(), is(false)); + + // the watch history shows a successful execution, even though the watch was deleted + // during execution + refresh(HistoryStore.INDEX_PREFIX + "*"); + + SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").setQuery(matchAllQuery()).get(); + assertHitCount(searchResponse, 1); + + Map source = searchResponse.getHits().getAt(0).sourceAsMap(); + // watch has been executed successfully + String state = ObjectPath.eval("state", source); + assertThat(state, is("executed")); + // no exception occured + assertThat(source, not(hasKey("exception"))); + } finally { + server.shutdown(); + } + } } diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/delete/ForceDeleteWatchTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/delete/ForceDeleteWatchTests.java index 4f235e15d71..ba5e97b22a9 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/delete/ForceDeleteWatchTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/delete/ForceDeleteWatchTests.java @@ -55,7 +55,7 @@ public class ForceDeleteWatchTests extends AbstractWatcherIntegrationTestCase { .get(); assertThat(putResponse.getId(), equalTo("_name")); Thread.sleep(5000); - DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").setForce(true).get(); + DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get(); assertThat(deleteWatchResponse.isFound(), is(true)); deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get(); assertThat(deleteWatchResponse.isFound(), is(false)); diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchLockServiceTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchLockServiceTests.java index 6ff4d47c57f..bd8e0cbabe8 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchLockServiceTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchLockServiceTests.java @@ -5,7 +5,9 @@ */ package org.elasticsearch.xpack.watcher.watch; +import junit.framework.AssertionFailedError; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -36,27 +38,13 @@ public class WatchLockServiceTests extends ESTestCase { public void testLocking() { WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS)); lockService.start(); - WatchLockService.Lock lock = lockService.acquire("_name"); + Releasable releasable = lockService.acquire("_name"); assertThat(lockService.getWatchLocks().hasLockedKeys(), is(true)); - lock.release(); + releasable.close(); assertThat(lockService.getWatchLocks().hasLockedKeys(), is(false)); lockService.stop(); } - public void testLockingAlreadyHeld() { - WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS)); - lockService.start(); - WatchLockService.Lock lock1 = lockService.acquire("_name"); - try { - lockService.acquire("_name"); - fail("exception expected"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("Lock already acquired")); - } - lock1.release(); - lockService.stop(); - } - public void testLockingStopTimeout(){ final WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS)); lockService.start(); @@ -87,14 +75,11 @@ public class WatchLockServiceTests extends ESTestCase { @Override public void run() { startLatch.countDown(); - WatchLockService.Lock lock = lockService.acquire("_name"); - try { + try (Releasable ignored = lockService.acquire("_name")) { int actualValue = value.getAndIncrement(); assertThat(actualValue, equalTo(expectedValue)); Thread.sleep(50); } catch(InterruptedException ie) { - } finally { - lock.release(); } } } diff --git a/elasticsearch/x-pack/watcher/src/test/resources/rest-api-spec/api/xpack.watcher.delete_watch.json b/elasticsearch/x-pack/watcher/src/test/resources/rest-api-spec/api/xpack.watcher.delete_watch.json index c3dbbdb7f57..1baf670473d 100644 --- a/elasticsearch/x-pack/watcher/src/test/resources/rest-api-spec/api/xpack.watcher.delete_watch.json +++ b/elasticsearch/x-pack/watcher/src/test/resources/rest-api-spec/api/xpack.watcher.delete_watch.json @@ -16,10 +16,6 @@ "master_timeout": { "type": "duration", "description": "Specify timeout for watch write operation" - }, - "force": { - "type": "boolean", - "description": "Specify if this request should be forced and ignore locks" } } },