Watcher: Remove locking of watches for write operations elastic/elasticsearch#3481 (elastic/elasticsearch#3481)

Whenever a watch is updated (put, delete, set state), until now we
happened to reject those operations when a watch was executed at the
same time. However with long running reporting this might mean, that a
watch can never be changed, because it always gets executed.

* Removes the ability of write requests to obtain a lock at all (executing watches is still protected by a lock)
* Replaced the FairKeyedLock in watcher with the KeyedLock in Elasticsearch, which also takes a fair option, removed the FairKeyedLock
* Removed all the timeout parameters that are no longer needed, because there is no lock anymore
* Removed also the force parameter for watch deletion. Just do it[tm]
* Added a test that deleting a watch while it is being executed does not leave any leftovers

In case of a deletion of a watch during an execution, so that updating the status of the watch fails,
a warning is logged.

Closes elastic/elasticsearch#3417

Original commit: elastic/x-pack-elasticsearch@22fad1b797
This commit is contained in:
Alexander Reelsen 2016-09-19 09:44:32 +02:00 committed by GitHub
parent e069c1f090
commit 5b265ea569
22 changed files with 235 additions and 762 deletions

View File

@ -26,7 +26,6 @@ import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.PeriodType;
import java.io.IOException;
import java.util.Map;
@ -108,71 +107,46 @@ public class WatcherService extends AbstractComponent {
}
}
public WatchStore.WatchDelete deleteWatch(String id, TimeValue timeout, final boolean force) {
public WatchStore.WatchDelete deleteWatch(String id) {
ensureStarted();
WatchLockService.Lock lock = null;
if (!force) {
lock = watchLockService.tryAcquire(id, timeout);
if (lock == null) {
throw new ElasticsearchTimeoutException("could not delete watch [{}] within [{}]... wait and try again. If this error " +
"continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external" +
" system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds()));
}
}
try {
WatchStore.WatchDelete delete = watchStore.delete(id, force);
if (delete.deleteResponse().getResult() == DocWriteResponse.Result.DELETED) {
triggerService.remove(id);
}
return delete;
} finally {
if (lock != null) {
lock.release();
}
WatchStore.WatchDelete delete = watchStore.delete(id);
if (delete.deleteResponse().getResult() == DocWriteResponse.Result.DELETED) {
triggerService.remove(id);
}
return delete;
}
public IndexResponse putWatch(String id, BytesReference watchSource, TimeValue timeout, boolean active) throws IOException {
public IndexResponse putWatch(String id, BytesReference watchSource, boolean active) throws IOException {
ensureStarted();
WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout);
if (lock == null) {
throw new ElasticsearchTimeoutException("could not put watch [{}] within [{}]... wait and try again. If this error continues " +
"to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such " +
"as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds()));
}
try {
DateTime now = clock.nowUTC();
Watch watch = watchParser.parseWithSecrets(id, false, watchSource, now);
watch.setState(active, now);
WatchStore.WatchPut result = watchStore.put(watch);
DateTime now = clock.nowUTC();
Watch watch = watchParser.parseWithSecrets(id, false, watchSource, now);
watch.setState(active, now);
WatchStore.WatchPut result = watchStore.put(watch);
if (result.previous() == null) {
// this is a newly created watch, so we only need to schedule it if it's active
if (result.current().status().state().isActive()) {
triggerService.add(result.current());
}
} else if (result.current().status().state().isActive()) {
if (!result.previous().status().state().isActive()) {
// the replaced watch was inactive, which means it wasn't scheduled. The new watch is active
// so we need to schedule it
triggerService.add(result.current());
} else if (!result.previous().trigger().equals(result.current().trigger())) {
// the previous watch was active and its schedule is different than the schedule of the
// new watch, so we need to
triggerService.add(result.current());
}
} else {
// if the current is inactive, we'll just remove it from the trigger service
// just to be safe
triggerService.remove(result.current().id());
if (result.previous() == null) {
// this is a newly created watch, so we only need to schedule it if it's active
if (result.current().status().state().isActive()) {
triggerService.add(result.current());
}
return result.indexResponse();
} finally {
lock.release();
} else if (result.current().status().state().isActive()) {
if (!result.previous().status().state().isActive()) {
// the replaced watch was inactive, which means it wasn't scheduled. The new watch is active
// so we need to schedule it
triggerService.add(result.current());
} else if (!result.previous().trigger().equals(result.current().trigger())) {
// the previous watch was active and its schedule is different than the schedule of the
// new watch, so we need to
triggerService.add(result.current());
}
} else {
// if the current is inactive, we'll just remove it from the trigger service
// just to be safe
triggerService.remove(result.current().id());
}
return result.indexResponse();
}
/**
@ -189,55 +163,38 @@ public class WatcherService extends AbstractComponent {
/**
* Acks the watch if needed
*/
public WatchStatus ackWatch(String id, String[] actionIds, TimeValue timeout) throws IOException {
public WatchStatus ackWatch(String id, String[] actionIds) throws IOException {
ensureStarted();
WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout);
if (lock == null) {
throw new ElasticsearchTimeoutException("could not ack watch [{}] within [{}]... wait and try again. If this error continues " +
"to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such " +
"as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds()));
}
if (actionIds == null || actionIds.length == 0) {
actionIds = new String[] { Watch.ALL_ACTIONS_ID };
}
try {
Watch watch = watchStore.get(id);
if (watch == null) {
throw illegalArgument("watch [{}] does not exist", id);
}
// we need to create a safe copy of the status
if (watch.ack(clock.now(DateTimeZone.UTC), actionIds)) {
try {
watchStore.updateStatus(watch);
} catch (IOException ioe) {
throw ioException("failed to update the watch [{}] on ack", ioe, watch.id());
} catch (VersionConflictEngineException vcee) {
throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id());
}
}
return new WatchStatus(watch.status());
} finally {
lock.release();
Watch watch = watchStore.get(id);
if (watch == null) {
throw illegalArgument("watch [{}] does not exist", id);
}
// we need to create a safe copy of the status
if (watch.ack(clock.now(DateTimeZone.UTC), actionIds)) {
try {
watchStore.updateStatus(watch);
} catch (IOException ioe) {
throw ioException("failed to update the watch [{}] on ack", ioe, watch.id());
} catch (VersionConflictEngineException vcee) {
throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id());
}
}
return new WatchStatus(watch.status());
}
public WatchStatus activateWatch(String id, TimeValue timeout) throws IOException {
return setWatchState(id, true, timeout);
public WatchStatus activateWatch(String id) throws IOException {
return setWatchState(id, true);
}
public WatchStatus deactivateWatch(String id, TimeValue timeout) throws IOException {
return setWatchState(id, false, timeout);
public WatchStatus deactivateWatch(String id) throws IOException {
return setWatchState(id, false);
}
WatchStatus setWatchState(String id, boolean active, TimeValue timeout) throws IOException {
WatchStatus setWatchState(String id, boolean active) throws IOException {
ensureStarted();
WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout);
if (lock == null) {
throw new ElasticsearchTimeoutException("could not ack watch [{}] within [{}]... wait and try again. If this error continues " +
"to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such " +
"as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds()));
}
// for now, when a watch is deactivated we don't remove its runtime representation
// that is, the store will still keep the watch in memory. We only mark the watch
// as inactive (both in runtime and also update the watch in the watches index)
@ -251,30 +208,26 @@ public class WatcherService extends AbstractComponent {
// to run this exercise anyway... and make sure that nothing in watcher relies on the
// fact that the watch store holds all watches in memory.
try {
Watch watch = watchStore.get(id);
if (watch == null) {
throw illegalArgument("watch [{}] does not exist", id);
}
if (watch.setState(active, clock.nowUTC())) {
try {
watchStore.updateStatus(watch);
if (active) {
triggerService.add(watch);
} else {
triggerService.remove(watch.id());
}
} catch (IOException ioe) {
throw ioException("failed to update the watch [{}] on ack", ioe, watch.id());
} catch (VersionConflictEngineException vcee) {
throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id());
}
}
// we need to create a safe copy of the status
return new WatchStatus(watch.status());
} finally {
lock.release();
Watch watch = watchStore.get(id);
if (watch == null) {
throw illegalArgument("watch [{}] does not exist", id);
}
if (watch.setState(active, clock.nowUTC())) {
try {
watchStore.updateStatus(watch);
if (active) {
triggerService.add(watch);
} else {
triggerService.remove(watch.id());
}
} catch (IOException ioe) {
throw ioException("failed to update the watch [{}] on ack", ioe, watch.id());
} catch (VersionConflictEngineException vcee) {
throw illegalState("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id());
}
}
// we need to create a safe copy of the status
return new WatchStatus(watch.status());
}
public long watchesCount() {

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -244,9 +245,9 @@ public class ExecutionService extends AbstractComponent {
public WatchRecord execute(WatchExecutionContext ctx) {
WatchRecord record = null;
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id());
Releasable releasable = watchLockService.acquire(ctx.watch().id());
if (logger.isTraceEnabled()) {
logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock));
logger.trace("acquired lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(releasable));
}
try {
currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
@ -287,9 +288,9 @@ public class ExecutionService extends AbstractComponent {
}
currentExecutions.remove(ctx.watch().id());
if (logger.isTraceEnabled()) {
logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(lock));
logger.trace("releasing lock for [{}] -- [{}]", ctx.id(), System.identityHashCode(releasable));
}
lock.release();
releasable.close();
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
}
return record;

View File

@ -40,7 +40,6 @@ public class RestDeleteWatchAction extends WatcherRestHandler {
protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception {
DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest(request.param("id"));
deleteWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWatchRequest.masterNodeTimeout()));
deleteWatchRequest.setForce(request.paramAsBoolean("force", deleteWatchRequest.isForce()));
client.deleteWatch(deleteWatchRequest, new RestBuilderListener<DeleteWatchResponse>(channel) {
@Override
public RestResponse buildResponse(DeleteWatchResponse response, XContentBuilder builder) throws Exception {

View File

@ -1,162 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.support.concurrent;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*/
public class FairKeyedLock<T> {
private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMap();
protected final ThreadLocal<KeyLock> threadLocal = new ThreadLocal<>();
public void acquire(T key) {
while (true) {
if (threadLocal.get() != null) {
// if we are here, the thread already has the lock
throw new IllegalArgumentException("Lock already acquired in Thread" + Thread.currentThread().getId()
+ " for key " + key);
}
KeyLock perNodeLock = map.get(key);
if (perNodeLock == null) {
KeyLock newLock = new KeyLock(true);
perNodeLock = map.putIfAbsent(key, newLock);
if (perNodeLock == null) {
newLock.lock();
threadLocal.set(newLock);
return;
}
}
assert perNodeLock != null;
int i = perNodeLock.count.get();
if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
perNodeLock.lock();
threadLocal.set(perNodeLock);
return;
}
}
}
public boolean tryAcquire(T key, long timeout, TimeUnit timeUnit) throws InterruptedException {
while (true) {
if (threadLocal.get() != null) {
// if we are here, the thread already has the lock
throw new IllegalArgumentException("Lock already acquired in Thread" + Thread.currentThread().getId()
+ " for key " + key);
}
KeyLock perNodeLock = map.get(key);
if (perNodeLock == null) {
KeyLock newLock = new KeyLock(true);
perNodeLock = map.putIfAbsent(key, newLock);
if (perNodeLock == null) {
if (newLock.tryLock(timeout, timeUnit)) {
threadLocal.set(newLock);
return true;
}
return false;
}
}
assert perNodeLock != null;
int i = perNodeLock.count.get();
if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
if (perNodeLock.tryLock(timeout, timeUnit)) {
threadLocal.set(perNodeLock);
return true;
}
return false;
}
}
}
public void release(T key) {
KeyLock lock = threadLocal.get();
if (lock == null) {
throw new IllegalArgumentException("Lock not acquired");
}
release(key, lock);
}
void release(T key, KeyLock lock) {
assert lock.isHeldByCurrentThread();
assert lock == map.get(key);
lock.unlock();
threadLocal.set(null);
int decrementAndGet = lock.count.decrementAndGet();
if (decrementAndGet == 0) {
map.remove(key, lock);
}
}
@SuppressWarnings("serial")
private static final class KeyLock extends ReentrantLock {
private final AtomicInteger count = new AtomicInteger(1);
public KeyLock(boolean fair) {
super(fair);
}
}
public boolean hasLockedKeys() {
return !map.isEmpty();
}
/**
* A {@link FairKeyedLock} that allows to acquire a global lock that guarantees
* exclusive access to the resource the KeyedLock is guarding.
*/
public static final class GlobalLockable<T> extends FairKeyedLock<T> {
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@Override
public void acquire(T key) {
boolean success = false;
lock.readLock().lock();
try {
super.acquire(key);
success = true;
} finally {
if (!success) {
lock.readLock().unlock();
}
}
}
@Override
public void release(T key) {
KeyLock keyLock = threadLocal.get();
if (keyLock == null) {
throw new IllegalArgumentException("Lock not acquired");
}
try {
release(key, keyLock);
} finally {
lock.readLock().unlock();
}
}
/**
* Returns a global lock guaranteeing exclusive access to the resource
* this KeyedLock is guarding.
*/
public Lock globalLock() {
return lock.writeLock();
}
}
}

View File

@ -8,11 +8,11 @@ package org.elasticsearch.xpack.watcher.transport.actions.ack;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
@ -54,7 +54,7 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
protected void masterOperation(AckWatchRequest request, ClusterState state, ActionListener<AckWatchResponse> listener) throws
ElasticsearchException {
try {
WatchStatus watchStatus = watcherService.ackWatch(request.getWatchId(), request.getActionIds(), request.masterNodeTimeout());
WatchStatus watchStatus = watcherService.ackWatch(request.getWatchId(), request.getActionIds());
AckWatchResponse response = new AckWatchResponse(watchStatus);
listener.onResponse(response);
} catch (Exception e) {

View File

@ -8,11 +8,11 @@ package org.elasticsearch.xpack.watcher.transport.actions.activate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
@ -55,8 +55,8 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
throws ElasticsearchException {
try {
WatchStatus watchStatus = request.isActivate() ?
watcherService.activateWatch(request.getWatchId(), request.masterNodeTimeout()) :
watcherService.deactivateWatch(request.getWatchId(), request.masterNodeTimeout());
watcherService.activateWatch(request.getWatchId()) :
watcherService.deactivateWatch(request.getWatchId());
ActivateWatchResponse response = new ActivateWatchResponse(watchStatus);
listener.onResponse(response);
} catch (Exception e) {

View File

@ -25,7 +25,6 @@ public class DeleteWatchRequest extends MasterNodeRequest<DeleteWatchRequest> {
private String id;
private long version = Versions.MATCH_ANY;
private boolean force = false;
public DeleteWatchRequest() {
this(null);
@ -50,21 +49,6 @@ public class DeleteWatchRequest extends MasterNodeRequest<DeleteWatchRequest> {
this.id = id;
}
/**
* @return true if this request should ignore locking false if not
*/
public boolean isForce() {
return force;
}
/**
* @param force Sets weither this request should ignore locking and force the delete even if lock is unavailable.
*/
public void setForce(boolean force) {
this.force = force;
}
/**
* Sets the version, which will cause the delete operation to only be performed if a matching
* version exists and no changes happened on the doc since then.
@ -95,7 +79,6 @@ public class DeleteWatchRequest extends MasterNodeRequest<DeleteWatchRequest> {
super.readFrom(in);
id = in.readString();
version = in.readLong();
force = in.readBoolean();
}
@Override
@ -103,7 +86,6 @@ public class DeleteWatchRequest extends MasterNodeRequest<DeleteWatchRequest> {
super.writeTo(out);
out.writeString(id);
out.writeLong(version);
out.writeBoolean(force);
}
@Override

View File

@ -29,13 +29,4 @@ public class DeleteWatchRequestBuilder extends MasterNodeOperationRequestBuilder
this.request().setId(id);
return this;
}
/**
* Sets wiether this request is forced (ie ignores locks)
*/
public DeleteWatchRequestBuilder setForce(boolean force) {
this.request().setForce(force);
return this;
}
}

View File

@ -10,11 +10,11 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
@ -55,8 +55,7 @@ public class TransportDeleteWatchAction extends WatcherTransportAction<DeleteWat
protected void masterOperation(DeleteWatchRequest request, ClusterState state, ActionListener<DeleteWatchResponse> listener) throws
ElasticsearchException {
try {
DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId(), request.masterNodeTimeout(), request.isForce())
.deleteResponse();
DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId()).deleteResponse();
boolean deleted = deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
DeleteWatchResponse response = new DeleteWatchResponse(deleteResponse.getId(), deleteResponse.getVersion(), deleted);
listener.onResponse(response);

View File

@ -61,8 +61,7 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
}
try {
IndexResponse indexResponse = watcherService.putWatch(request.getId(), request.getSource(), request.masterNodeTimeout(),
request.isActive());
IndexResponse indexResponse = watcherService.putWatch(request.getId(), request.getSource(), request.isActive());
boolean created = indexResponse.getResult() == DocWriteResponse.Result.CREATED;
listener.onResponse(new PutWatchResponse(indexResponse.getId(), indexResponse.getVersion(), created));
} catch (Exception e) {

View File

@ -257,7 +257,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
* This method is only called once - when the user adds a new watch. From that moment on, all representations
* of the watch in the system will be use secrets for sensitive data.
*
* @see org.elasticsearch.xpack.watcher.WatcherService#putWatch(String, BytesReference, TimeValue, boolean)
* @see org.elasticsearch.xpack.watcher.WatcherService#putWatch(String, BytesReference, boolean)
*/
public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source, DateTime now) throws IOException {
return parse(id, includeStatus, true, source, now, false);

View File

@ -5,15 +5,13 @@
*/
package org.elasticsearch.xpack.watcher.watch;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.support.concurrent.FairKeyedLock;
import org.joda.time.PeriodType;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -25,7 +23,7 @@ import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
*/
public class WatchLockService extends AbstractComponent {
private final FairKeyedLock<String> watchLocks = new FairKeyedLock<>();
private final KeyedLock<String> watchLocks = new KeyedLock<>(true);
private final AtomicBoolean running = new AtomicBoolean(false);
private static final TimeValue DEFAULT_MAX_STOP_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
private static final String DEFAULT_MAX_STOP_TIMEOUT_SETTING = "xpack.watcher.stop.timeout";
@ -43,32 +41,12 @@ public class WatchLockService extends AbstractComponent {
this.maxStopTimeout = maxStopTimeout;
}
public Lock acquire(String name) {
public Releasable acquire(String name) {
if (!running.get()) {
throw illegalState("cannot acquire lock for watch [{}]. lock service is not running", name);
}
watchLocks.acquire(name);
return new Lock(name, watchLocks);
}
public Lock tryAcquire(String name, TimeValue timeout) {
if (!running.get()) {
throw illegalState("cannot acquire lock for watch [{}]. lock service is not running", name);
}
try {
if (!watchLocks.tryAcquire(name, timeout.millis(), TimeUnit.MILLISECONDS)) {
logger.warn("failed to acquire lock on watch [{}] (waited for [{}]). It is possible that for some reason this watch " +
"execution is stuck", name, timeout.format(PeriodType.seconds()));
return null;
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
//todo figure out a better std exception for this
logger.error((Supplier<?>) () -> new ParameterizedMessage("could not acquire lock for watch [{}]", name), ie);
return null;
}
return new Lock(name, watchLocks);
return watchLocks.acquire(name);
}
public void start() {
@ -105,24 +83,7 @@ public class WatchLockService extends AbstractComponent {
}
}
FairKeyedLock<String> getWatchLocks() {
KeyedLock<String> getWatchLocks() {
return watchLocks;
}
public static class Lock {
private final String name;
private final FairKeyedLock<String> watchLocks;
private Lock(String name, FairKeyedLock<String> watchLocks) {
this.name = name;
this.watchLocks = watchLocks;
}
public void release() {
watchLocks.release(name);
}
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
@ -159,32 +160,36 @@ public class WatchStore extends AbstractComponent {
// at the moment we store the status together with the watch,
// so we just need to update the watch itself
// TODO: consider storing the status in a different documment (watch_status doc) (must smaller docs... faster for frequent updates)
XContentBuilder source = JsonXContent.contentBuilder().
startObject()
.field(Watch.Field.STATUS.getPreferredName(), watch.status(), ToXContent.EMPTY_PARAMS)
.endObject();
UpdateRequest updateRequest = new UpdateRequest(INDEX, DOC_TYPE, watch.id());
updateRequest.doc(source);
updateRequest.version(watch.version());
UpdateResponse response = client.update(updateRequest);
watch.status().version(response.getVersion());
watch.version(response.getVersion());
watch.status().resetDirty();
// Don't need to update the watches, since we are working on an instance from it.
try {
UpdateResponse response = client.update(updateRequest);
watch.status().version(response.getVersion());
watch.version(response.getVersion());
watch.status().resetDirty();
} catch (DocumentMissingException e) {
// do not rethrow an exception, otherwise the watch history will contain an exception
// even though the execution might has been fine
logger.warn("Watch [{}] was deleted during watch execution, not updating watch status", watch.id());
}
}
/**
* Deletes the watch with the specified id if exists
*/
public WatchDelete delete(String id, boolean force) {
public WatchDelete delete(String id) {
ensureStarted();
Watch watch = watches.remove(id);
// even if the watch was not found in the watch map, we should still try to delete it
// from the index, just to make sure we don't leave traces of it
DeleteRequest request = new DeleteRequest(INDEX, DOC_TYPE, id);
if (watch != null && !force) {
if (watch != null) {
request.version(watch.version());
}
DeleteResponse response = client.delete(request);

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
@ -13,12 +12,11 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.trigger.Trigger;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
@ -33,7 +31,6 @@ import org.junit.Before;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
@ -55,17 +52,15 @@ public class WatcherServiceTests extends ESTestCase {
private WatchStore watchStore;
private Watch.Parser watchParser;
private WatcherService watcherService;
private WatchLockService watchLockService;
private ClockMock clock;
private ExecutionService executionService;
@Before
public void init() throws Exception {
triggerService = mock(TriggerService.class);
watchStore = mock(WatchStore.class);
watchParser = mock(Watch.Parser.class);
executionService = mock(ExecutionService.class);
watchLockService = mock(WatchLockService.class);
ExecutionService executionService = mock(ExecutionService.class);
WatchLockService watchLockService = mock(WatchLockService.class);
clock = new ClockMock();
WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = mock(WatcherIndexTemplateRegistry.class);
watcherService = new WatcherService(Settings.EMPTY, clock, triggerService, watchStore, watchParser, executionService,
@ -87,13 +82,10 @@ public class WatcherServiceTests extends ESTestCase {
when(watchPut.indexResponse()).thenReturn(indexResponse);
when(watchPut.current()).thenReturn(newWatch);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock);
when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), any(DateTime.class)))
.thenReturn(newWatch);
when(watchStore.put(newWatch)).thenReturn(watchPut);
IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout, activeByDefault);
IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), activeByDefault);
assertThat(response, sameInstance(indexResponse));
verify(newWatch, times(1)).setState(activeByDefault, clock.nowUTC());
@ -104,19 +96,6 @@ public class WatcherServiceTests extends ESTestCase {
}
}
public void testPutWatchTimeout() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null);
try {
watcherService.putWatch("_id", new BytesArray("{}"), timeout, randomBoolean());
fail("Expected ElasticsearchTimeoutException");
} catch (ElasticsearchTimeoutException e) {
assertThat(e.getMessage(), containsString("could not put watch"));
assertThat(e.getMessage(), containsString("wait and try again"));
assertThat(e.getMessage(), containsString("there is a high chance that the watch execution is stuck"));
}
}
public void testPutWatchDifferentActiveStates() throws Exception {
Trigger trigger = mock(Trigger.class);
@ -141,13 +120,10 @@ public class WatcherServiceTests extends ESTestCase {
when(previousWatch.trigger()).thenReturn(trigger);
when(watchPut.previous()).thenReturn(previousWatch);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire(any(String.class), eq(timeout))).thenReturn(lock);
when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), eq(clock.nowUTC()))).thenReturn(watch);
when(watchStore.put(watch)).thenReturn(watchPut);
IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), timeout, active);
IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), active);
assertThat(response, sameInstance(indexResponse));
if (!active) {
@ -163,63 +139,24 @@ public class WatcherServiceTests extends ESTestCase {
}
public void testDeleteWatch() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
boolean force = randomBoolean();
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class);
DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED);
when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse);
when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force);
assertThat(watchDelete, sameInstance(expectedWatchDelete));
verify(triggerService, times(1)).remove("_id");
}
public void testDeleteWatchTimeout() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null);
try {
watcherService.deleteWatch("_id", timeout, false);
fail("Expected ElasticsearchTimeoutException");
} catch (ElasticsearchTimeoutException e) {
assertThat(e.getMessage(), containsString("could not delete watch"));
assertThat(e.getMessage(), containsString("wait and try again"));
assertThat(e.getMessage(), containsString("there is a high chance that the watch execution is stuck"));
}
}
public void testDeleteWatchForce() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null);
WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class);
DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED);
when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse);
when(watchStore.delete("_id", true)).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, true);
when(watchStore.delete("_id")).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id");
assertThat(watchDelete, sameInstance(expectedWatchDelete));
verify(triggerService, times(1)).remove("_id");
}
public void testDeleteWatchNotFound() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5);
boolean force = randomBoolean();
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class);
DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP);
when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse);
when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force);
when(watchStore.delete("_id")).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id");
assertThat(watchDelete, sameInstance(expectedWatchDelete));
verifyZeroInteractions(triggerService);
@ -228,39 +165,34 @@ public class WatcherServiceTests extends ESTestCase {
public void testAckWatch() throws Exception {
DateTime now = new DateTime(DateTimeZone.UTC);
clock.setTime(now);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.ack(now, "_all")).thenReturn(true);
WatchStatus status = new WatchStatus(now, emptyMap());
when(watch.status()).thenReturn(status);
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout);
WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY);
assertThat(result, not(sameInstance(status)));
verify(watchStore, times(1)).updateStatus(watch);
}
public void testActivate() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 30));
WatcherService service = spy(watcherService);
WatchStatus expectedStatus = mock(WatchStatus.class);
doReturn(expectedStatus).when(service).setWatchState("_id", true, timeout);
WatchStatus actualStatus = service.activateWatch("_id", timeout);
doReturn(expectedStatus).when(service).setWatchState("_id", true);
WatchStatus actualStatus = service.activateWatch("_id");
assertThat(actualStatus, sameInstance(expectedStatus));
verify(service, times(1)).setWatchState("_id", true, timeout);
verify(service, times(1)).setWatchState("_id", true);
}
public void testDeactivate() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 30));
WatcherService service = spy(watcherService);
WatchStatus expectedStatus = mock(WatchStatus.class);
doReturn(expectedStatus).when(service).setWatchState("_id", false, timeout);
WatchStatus actualStatus = service.deactivateWatch("_id", timeout);
doReturn(expectedStatus).when(service).setWatchState("_id", false);
WatchStatus actualStatus = service.deactivateWatch("_id");
assertThat(actualStatus, sameInstance(expectedStatus));
verify(service, times(1)).setWatchState("_id", false, timeout);
verify(service, times(1)).setWatchState("_id", false);
}
public void testSetWatchStateSetActiveOnCurrentlyActive() throws Exception {
@ -268,12 +200,8 @@ public class WatcherServiceTests extends ESTestCase {
// - the watch status should not change
// - the watch doesn't need to be updated in the store
// - the watch should not be removed or re-added to the trigger service
DateTime now = new DateTime(DateTimeZone.UTC);
clock.setTime(now);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
WatchStatus status = new WatchStatus(now, emptyMap());
@ -283,7 +211,7 @@ public class WatcherServiceTests extends ESTestCase {
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.setWatchState("_id", true, timeout);
WatchStatus result = watcherService.setWatchState("_id", true);
assertThat(result, not(sameInstance(status)));
verifyZeroInteractions(triggerService);
@ -299,10 +227,6 @@ public class WatcherServiceTests extends ESTestCase {
DateTime now = new DateTime(DateTimeZone.UTC);
clock.setTime(now);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
WatchStatus status = new WatchStatus(now, emptyMap());
when(watch.status()).thenReturn(status);
@ -310,7 +234,7 @@ public class WatcherServiceTests extends ESTestCase {
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.setWatchState("_id", true, timeout);
WatchStatus result = watcherService.setWatchState("_id", true);
assertThat(result, not(sameInstance(status)));
verify(triggerService, times(1)).add(watch);
@ -322,12 +246,8 @@ public class WatcherServiceTests extends ESTestCase {
// - the watch status should change
// - the watch needs to be updated in the store
// - the watch should be removed from the trigger service
DateTime now = new DateTime(DateTimeZone.UTC);
clock.setTime(now);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
@ -337,7 +257,7 @@ public class WatcherServiceTests extends ESTestCase {
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.setWatchState("_id", false, timeout);
WatchStatus result = watcherService.setWatchState("_id", false);
assertThat(result, not(sameInstance(status)));
verify(triggerService, times(1)).remove("_id");
@ -349,14 +269,9 @@ public class WatcherServiceTests extends ESTestCase {
// - the watch status should not be updated
// - the watch should not be updated in the store
// - the watch should be re-added or removed to/from the trigger service
DateTime now = new DateTime(DateTimeZone.UTC);
clock.setTime(now);
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
WatchStatus status = new WatchStatus(now, emptyMap());
@ -365,51 +280,32 @@ public class WatcherServiceTests extends ESTestCase {
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.setWatchState("_id", false, timeout);
WatchStatus result = watcherService.setWatchState("_id", false);
assertThat(result, not(sameInstance(status)));
verifyZeroInteractions(triggerService);
verify(watchStore, never()).updateStatus(watch);
}
public void testAckWatchTimeout() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null);
try {
watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout);
fail("Expected ElasticsearchTimeoutException");
} catch (ElasticsearchTimeoutException e) {
assertThat(e.getMessage(), containsString("could not ack watch"));
assertThat(e.getMessage(), containsString("wait and try again"));
assertThat(e.getMessage(), containsString("there is a high chance that the watch execution is stuck"));
}
}
public void testAckWatchNotAck() throws Exception {
DateTime now = SystemClock.INSTANCE.nowUTC();
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.ack(now)).thenReturn(false);
WatchStatus status = new WatchStatus(now, emptyMap());
when(watch.status()).thenReturn(status);
when(watchStore.get("_id")).thenReturn(watch);
WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout);
WatchStatus result = watcherService.ackWatch("_id", Strings.EMPTY_ARRAY);
assertThat(result, not(sameInstance(status)));
verify(watchStore, never()).updateStatus(watch);
}
public void testAckWatchNoWatch() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
when(watchStore.get("_id")).thenReturn(null);
try {
watcherService.ackWatch("_id", Strings.EMPTY_ARRAY, timeout);
watcherService.ackWatch("_id", Strings.EMPTY_ARRAY);
fail();
} catch (IllegalArgumentException iae) {
// expected

View File

@ -7,6 +7,10 @@ package org.elasticsearch.xpack.watcher.actions.throttler;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.notification.email.EmailTemplate;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.watcher.actions.email.EmailAction;
@ -18,9 +22,6 @@ import org.elasticsearch.xpack.watcher.execution.ActionExecutionMode;
import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.execution.ManualExecutionContext;
import org.elasticsearch.xpack.watcher.history.WatchRecord;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse;
@ -31,7 +32,6 @@ import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.notification.email.EmailTemplate;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -77,7 +77,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase {
if (useClientForAcking) {
watcherClient().prepareAckWatch("_id").setActionIds("test_id").get();
} else {
watchService().ackWatch("_id", new String[] { "test_id" }, new TimeValue(5, TimeUnit.SECONDS));
watchService().ackWatch("_id", new String[] { "test_id" });
}
}
ctx = getManualExecutionContext(new TimeValue(0, TimeUnit.SECONDS));
@ -117,7 +117,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase {
if (useClientForAcking) {
watcherClient().prepareAckWatch("_id").setActionIds(actionId).get();
} else {
watchService().ackWatch("_id", new String[]{actionId}, new TimeValue(5, TimeUnit.SECONDS));
watchService().ackWatch("_id", new String[]{actionId});
}
}
@ -192,7 +192,6 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase {
.trigger(schedule(interval("60m")));
AvailableAction availableAction = randomFrom(AvailableAction.values());
final String actionType = availableAction.type();
watchSourceBuilder.addAction("default_global_throttle", availableAction.action());
PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchSourceBuilder)).actionGet();

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase;
@ -111,8 +112,8 @@ public class ExecutionServiceTests extends ESTestCase {
}
public void testExecute() throws Exception {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire("_id")).thenReturn(lock);
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
@ -190,7 +191,7 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(result.action(), sameInstance(actionResult));
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(releasable, times(1)).close();
verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload);
verify(action, times(1)).execute("_action", context, payload);
@ -204,8 +205,8 @@ public class ExecutionServiceTests extends ESTestCase {
}
public void testExecuteFailedInput() throws Exception {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire("_id")).thenReturn(lock);
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
@ -267,7 +268,7 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().actionsResults().count(), is(0));
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(releasable, times(1)).close();
verify(input, times(1)).execute(context, null);
verify(condition, never()).execute(context);
verify(watchTransform, never()).execute(context, payload);
@ -275,8 +276,8 @@ public class ExecutionServiceTests extends ESTestCase {
}
public void testExecuteFailedCondition() throws Exception {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire("_id")).thenReturn(lock);
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
@ -334,7 +335,7 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().actionsResults().count(), is(0));
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(releasable, times(1)).close();
verify(input, times(1)).execute(context, null);
verify(condition, times(1)).execute(context);
verify(watchTransform, never()).execute(context, payload);
@ -342,8 +343,8 @@ public class ExecutionServiceTests extends ESTestCase {
}
public void testExecuteFailedWatchTransform() throws Exception {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire("_id")).thenReturn(lock);
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
@ -400,7 +401,7 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().actionsResults().count(), is(0));
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(releasable, times(1)).close();
verify(input, times(1)).execute(context, null);
verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload);
@ -408,8 +409,8 @@ public class ExecutionServiceTests extends ESTestCase {
}
public void testExecuteFailedActionTransform() throws Exception {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire("_id")).thenReturn(lock);
Releasable releasable = mock(Releasable.class);
when(watchLockService.acquire("_id")).thenReturn(releasable);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
@ -483,7 +484,7 @@ public class ExecutionServiceTests extends ESTestCase {
assertThat(watchRecord.result().actionsResults().get("_action").action().status(), is(Action.Result.Status.FAILURE));
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(releasable, times(1)).close();
verify(input, times(1)).execute(context, null);
verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload);

View File

@ -390,7 +390,7 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTestCase {
for (Thread thread : threads) {
thread.start();
}
DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").setForce(true).get();
DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").get();
assertThat(deleteWatchResponse.isFound(), is(true));
deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").get();

View File

@ -1,209 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.support.concurrent;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
/**
*/
public class FairKeyedLockTests extends ESTestCase {
public void testIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException {
ConcurrentHashMap<String, Integer> counter = new ConcurrentHashMap<>();
ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<>();
FairKeyedLock<String> connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<>() : new FairKeyedLock<>();
String[] names = new String[randomIntBetween(1, 40)];
for (int i = 0; i < names.length; i++) {
names[i] = randomRealisticUnicodeOfLengthBetween(10, 20);
}
CountDownLatch startLatch = new CountDownLatch(1);
int numThreads = randomIntBetween(3, 10);
AcquireAndReleaseThread[] threads = new AcquireAndReleaseThread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new AcquireAndReleaseThread(startLatch, connectionLock, names, counter, safeCounter);
}
for (int i = 0; i < numThreads; i++) {
threads[i].start();
}
startLatch.countDown();
for (int i = 0; i < numThreads; i++) {
if (randomBoolean()) {
threads[i].incWithGlobal();
}
}
for (int i = 0; i < numThreads; i++) {
threads[i].join();
}
assertThat(connectionLock.hasLockedKeys(), equalTo(false));
Set<Map.Entry<String, Integer>> entrySet = counter.entrySet();
assertThat(counter.size(), equalTo(safeCounter.size()));
for (Map.Entry<String, Integer> entry : entrySet) {
AtomicInteger atomicInteger = safeCounter.get(entry.getKey());
assertThat(atomicInteger, not(Matchers.nullValue()));
assertThat(atomicInteger.get(), equalTo(entry.getValue()));
}
}
public void testCannotAcquireTwoLocksGlobal() throws InterruptedException {
FairKeyedLock.GlobalLockable<String> connectionLock = new FairKeyedLock.GlobalLockable<>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name);
try {
connectionLock.acquire(name);
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Lock already acquired"));
} finally {
connectionLock.release(name);
connectionLock.globalLock().lock();
connectionLock.globalLock().unlock();
}
}
public void testCannotAcquireTwoLocks() throws InterruptedException {
FairKeyedLock<String> connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<>() : new FairKeyedLock<>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name);
try {
connectionLock.acquire(name);
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Lock already acquired"));
}
}
public void testTryAquire() throws InterruptedException {
final FairKeyedLock<String> connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<>() : new FairKeyedLock<>();
final String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<String> failure = new AtomicReference<>();
Thread other = new Thread() {
@Override
public void run() {
try {
if (connectionLock.tryAcquire(name, 2, TimeUnit.SECONDS)) {
failure.set("expected to fail acquiring of the lock due to timeout");
}
latch.countDown();
} catch (InterruptedException e) {
latch.countDown();
}
}
};
other.start();
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("waiting too long for lock acquire");
}
String failureMessage = failure.get();
if(failureMessage != null) {
fail(failureMessage);
}
}
public void testCannotReleaseUnacquiredLock() throws InterruptedException {
FairKeyedLock<String> connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<>() : new FairKeyedLock<>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
try {
connectionLock.release(name);
fail("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("Lock not acquired"));
}
}
public static class AcquireAndReleaseThread extends Thread {
private CountDownLatch startLatch;
FairKeyedLock<String> connectionLock;
String[] names;
ConcurrentHashMap<String, Integer> counter;
ConcurrentHashMap<String, AtomicInteger> safeCounter;
public AcquireAndReleaseThread(CountDownLatch startLatch, FairKeyedLock<String> connectionLock, String[] names,
ConcurrentHashMap<String, Integer> counter, ConcurrentHashMap<String, AtomicInteger> safeCounter) {
this.startLatch = startLatch;
this.connectionLock = connectionLock;
this.names = names;
this.counter = counter;
this.safeCounter = safeCounter;
}
@Override
public void run() {
try {
startLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException();
}
int numRuns = scaledRandomIntBetween(5000, 50000);
for (int i = 0; i < numRuns; i++) {
String curName = names[randomInt(names.length - 1)];
connectionLock.acquire(curName);
try {
Integer integer = counter.get(curName);
if (integer == null) {
counter.put(curName, 1);
} else {
counter.put(curName, integer.intValue() + 1);
}
} finally {
connectionLock.release(curName);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger);
if (value == null) {
atomicInteger.incrementAndGet();
} else {
value.incrementAndGet();
}
}
}
public void incWithGlobal() {
if (connectionLock instanceof FairKeyedLock.GlobalLockable) {
final int iters = randomIntBetween(10, 200);
for (int i = 0; i < iters; i++) {
((FairKeyedLock.GlobalLockable) connectionLock).globalLock().lock();
try {
String curName = names[randomInt(names.length - 1)];
Integer integer = counter.get(curName);
if (integer == null) {
counter.put(curName, 1);
} else {
counter.put(curName, integer.intValue() + 1);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger);
if (value == null) {
atomicInteger.incrementAndGet();
} else {
value.incrementAndGet();
}
} finally {
((FairKeyedLock.GlobalLockable) connectionLock).globalLock().unlock();
}
}
}
}
}
}

View File

@ -5,20 +5,38 @@
*/
package org.elasticsearch.xpack.watcher.transport.action.delete;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.delete.DeleteWatchRequest;
import org.elasticsearch.xpack.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.xpack.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.xpack.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.carrotsearch.randomizedtesting.RandomizedTest.sleep;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.httpInput;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
/**
@ -31,7 +49,7 @@ public class DeleteWatchTests extends AbstractWatcherIntegrationTestCase {
.trigger(schedule(interval("5m")))
.input(simpleInput())
.condition(alwaysCondition())
.addAction("_action1", loggingAction("{{ctx.watch_id}}")))
.addAction("_action1", loggingAction("anything")))
.get();
assertThat(putResponse, notNullValue());
@ -60,4 +78,63 @@ public class DeleteWatchTests extends AbstractWatcherIntegrationTestCase {
assertThat(e.getMessage(), containsString("Watch id cannot have white spaces"));
}
}
// This is a special case, since locking is removed
// Deleting a watch while it is being executed is possible now
// This test ensures that there are no leftovers, like a watch status without a watch in the watch store
// Also the watch history is checked, that the error has been marked as deleted
// The mock webserver does not support count down latches, so we have to use sleep - sorry!
public void testWatchDeletionDuringExecutionWorks() throws Exception {
ensureWatcherStarted();
MockResponse response = new MockResponse();
response.setBody("foo");
response.setResponseCode(200);
response.setBodyDelay(5, TimeUnit.SECONDS);
MockWebServer server = new MockWebServer();
server.enqueue(response);
try {
server.start();
HttpRequestTemplate template = HttpRequestTemplate.builder(server.getHostName(), server.getPort()).path("/").build();
PutWatchResponse responseFuture = watcherClient().preparePutWatch("_name").setSource(watchBuilder()
.trigger(schedule(interval("6h")))
.input(httpInput(template))
.addAction("_action1", loggingAction("anything")))
.get();
assertThat(responseFuture.isCreated(), is(true));
ListenableActionFuture<ExecuteWatchResponse> executeWatchFuture =
watcherClient().prepareExecuteWatch("_name").setRecordExecution(true).execute();
// without this sleep the delete operation might overtake the watch execution
sleep(1000);
DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get();
assertThat(deleteWatchResponse.isFound(), is(true));
executeWatchFuture.get();
// the watch is gone, no leftovers
GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_name").get();
assertThat(getWatchResponse.isFound(), is(false));
// the watch history shows a successful execution, even though the watch was deleted
// during execution
refresh(HistoryStore.INDEX_PREFIX + "*");
SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX + "*").setQuery(matchAllQuery()).get();
assertHitCount(searchResponse, 1);
Map<String, Object> source = searchResponse.getHits().getAt(0).sourceAsMap();
// watch has been executed successfully
String state = ObjectPath.eval("state", source);
assertThat(state, is("executed"));
// no exception occured
assertThat(source, not(hasKey("exception")));
} finally {
server.shutdown();
}
}
}

View File

@ -55,7 +55,7 @@ public class ForceDeleteWatchTests extends AbstractWatcherIntegrationTestCase {
.get();
assertThat(putResponse.getId(), equalTo("_name"));
Thread.sleep(5000);
DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").setForce(true).get();
DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get();
assertThat(deleteWatchResponse.isFound(), is(true));
deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get();
assertThat(deleteWatchResponse.isFound(), is(false));

View File

@ -5,7 +5,9 @@
*/
package org.elasticsearch.xpack.watcher.watch;
import junit.framework.AssertionFailedError;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
@ -36,27 +38,13 @@ public class WatchLockServiceTests extends ESTestCase {
public void testLocking() {
WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
lockService.start();
WatchLockService.Lock lock = lockService.acquire("_name");
Releasable releasable = lockService.acquire("_name");
assertThat(lockService.getWatchLocks().hasLockedKeys(), is(true));
lock.release();
releasable.close();
assertThat(lockService.getWatchLocks().hasLockedKeys(), is(false));
lockService.stop();
}
public void testLockingAlreadyHeld() {
WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
lockService.start();
WatchLockService.Lock lock1 = lockService.acquire("_name");
try {
lockService.acquire("_name");
fail("exception expected");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Lock already acquired"));
}
lock1.release();
lockService.stop();
}
public void testLockingStopTimeout(){
final WatchLockService lockService = new WatchLockService(new TimeValue(1, TimeUnit.SECONDS));
lockService.start();
@ -87,14 +75,11 @@ public class WatchLockServiceTests extends ESTestCase {
@Override
public void run() {
startLatch.countDown();
WatchLockService.Lock lock = lockService.acquire("_name");
try {
try (Releasable ignored = lockService.acquire("_name")) {
int actualValue = value.getAndIncrement();
assertThat(actualValue, equalTo(expectedValue));
Thread.sleep(50);
} catch(InterruptedException ie) {
} finally {
lock.release();
}
}
}

View File

@ -16,10 +16,6 @@
"master_timeout": {
"type": "duration",
"description": "Specify timeout for watch write operation"
},
"force": {
"type": "boolean",
"description": "Specify if this request should be forced and ignore locks"
}
}
},