Add force parameter to delete to force a delete

This change adds a force paramter to delete to allow the forcing of a delete to happen ignoring locking.
This means that watch executions may fail when they go to update the watch. Watches executing from the scheduler will fail fast if the watch they are supposed to execute has been removed.
Also move the history store updates outside of the watch locks.

Fixes: elastic/elasticsearch#405

Original commit: elastic/x-pack-elasticsearch@57561b6f85
This commit is contained in:
Brian Murphy 2015-05-08 17:19:24 -04:00
parent 87d4f67b10
commit eef3a1b1e8
13 changed files with 289 additions and 54 deletions

View File

@ -16,6 +16,10 @@
"master_timeout": { "master_timeout": {
"type": "duration", "type": "duration",
"description": "Specify timeout for watch write operation" "description": "Specify timeout for watch write operation"
},
"force": {
"type": "boolean",
"description": "Specify if this request should be forced and ignore locks"
} }
} }
}, },

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.PeriodType; import org.elasticsearch.common.joda.time.PeriodType;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.watcher.execution.ExecutionService; import org.elasticsearch.watcher.execution.ExecutionService;
import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.trigger.TriggerService;
import org.elasticsearch.watcher.watch.Watch; import org.elasticsearch.watcher.watch.Watch;
@ -78,20 +79,25 @@ public class WatcherService extends AbstractComponent {
} }
} }
public WatchStore.WatchDelete deleteWatch(String id, TimeValue timeout) { public WatchStore.WatchDelete deleteWatch(String id, TimeValue timeout, final boolean force) {
ensureStarted(); ensureStarted();
WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); WatchLockService.Lock lock = null;
if (lock == null) { if (!force) {
throw new TimeoutException("could not delete watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); lock = watchLockService.tryAcquire(id, timeout);
if (lock == null) {
throw new TimeoutException("could not delete watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds()));
}
} }
try { try {
WatchStore.WatchDelete delete = watchStore.delete(id); WatchStore.WatchDelete delete = watchStore.delete(id, force);
if (delete.deleteResponse().isFound()) { if (delete.deleteResponse().isFound()) {
triggerService.remove(id); triggerService.remove(id);
} }
return delete; return delete;
} finally { } finally {
lock.release(); if (lock != null) {
lock.release();
}
} }
} }
@ -145,7 +151,9 @@ public class WatcherService extends AbstractComponent {
try { try {
watchStore.updateStatus(watch); watchStore.updateStatus(watch);
} catch (IOException ioe) { } catch (IOException ioe) {
throw new WatcherException("failed to update the watch on ack", ioe); throw new WatcherException("failed to update the watch [{}] on ack", ioe, watch.id());
} catch (VersionConflictEngineException vcee) {
throw new WatcherException("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id());
} }
} }
// we need to create a safe copy of the status // we need to create a safe copy of the status

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.actions.ActionWrapper; import org.elasticsearch.watcher.actions.ActionWrapper;
import org.elasticsearch.watcher.condition.Condition; import org.elasticsearch.watcher.condition.Condition;
@ -209,18 +210,21 @@ public class ExecutionService extends AbstractComponent {
public WatchRecord execute(WatchExecutionContext ctx) throws IOException { public WatchRecord execute(WatchExecutionContext ctx) throws IOException {
WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent()); WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent());
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id());
try { try {
WatchExecutionResult result = executeInner(ctx); WatchExecutionResult result = executeInner(ctx);
watchRecord.seal(result); watchRecord.seal(result);
if (ctx.recordExecution()) {
watchStore.updateStatus(ctx.watch());
}
} catch (VersionConflictEngineException vcee) {
throw new WatcherException("Failed to update the watch [{}] on execute perhaps it was force deleted", vcee, ctx.watch().id());
} finally { } finally {
lock.release(); lock.release();
} }
if (ctx.recordExecution()) { if (ctx.recordExecution()) {
historyStore.put(watchRecord); historyStore.put(watchRecord);
} }
watchStore.updateStatus(ctx.watch());
return watchRecord; return watchRecord;
} }
@ -316,28 +320,27 @@ public class ExecutionService extends AbstractComponent {
return; return;
} }
logger.trace("executing [{}] [{}]", ctx.watch().id(), ctx.id()); logger.trace("executing [{}] [{}]", ctx.watch().id(), ctx.id());
WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id());
try { try {
watchRecord.update(WatchRecord.State.CHECKING, null); if (watchStore.get(ctx.watch().id()) == null) {
logger.debug("checking watch [{}]", watchRecord.watchId()); //Fail fast if we are trying to execute a deleted watch
WatchExecutionResult result = executeInner(ctx); String message = "unable to find watch for record [" + watchRecord.id() + "], perhaps it has been deleted, ignoring...";
watchRecord.seal(result); watchRecord.update(WatchRecord.State.DELETED_WHILE_QUEUED, message);
if (ctx.recordExecution()) { } else {
historyStore.update(watchRecord); watchRecord.update(WatchRecord.State.CHECKING, null);
logger.debug("checking watch [{}]", watchRecord.watchId());
WatchExecutionResult result = executeInner(ctx);
watchRecord.seal(result);
if (ctx.recordExecution()) {
watchStore.updateStatus(ctx.watch());
}
} }
watchStore.updateStatus(ctx.watch());
} catch (Exception e) { } catch (Exception e) {
if (started()) { if (started()) {
String detailedMessage = ExceptionsHelper.detailedMessage(e); String detailedMessage = ExceptionsHelper.detailedMessage(e);
logger.warn("failed to execute watch [{}]/[{}], failure [{}]", watchRecord.watchId(), ctx.id(), detailedMessage); logger.warn("failed to execute watch [{}]/[{}], failure [{}]", watchRecord.watchId(), ctx.id(), detailedMessage);
try { watchRecord.update(WatchRecord.State.FAILED, detailedMessage);
watchRecord.update(WatchRecord.State.FAILED, detailedMessage);
if (ctx.recordExecution()) {
historyStore.update(watchRecord);
}
} catch (Exception e2) {
logger.error("failed to update watch record [{}]/[{}], failure [{}], original failure [{}]", watchRecord.watchId(), ctx.id(), ExceptionsHelper.detailedMessage(e2), detailedMessage);
}
} else { } else {
logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord); logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord);
} }
@ -345,6 +348,14 @@ public class ExecutionService extends AbstractComponent {
lock.release(); lock.release();
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
} }
if (ctx.recordExecution() && started()) {
try {
historyStore.update(watchRecord);
} catch (Exception e) {
logger.error("failed to update watch record [{}]/[{}], failure [{}], record failure if any [{}]", watchRecord.watchId(), ctx.id(), ExceptionsHelper.detailedMessage(e), watchRecord.message());
}
}
} }
} }
} }

View File

@ -32,9 +32,10 @@ public class RestDeleteWatchAction extends WatcherRestHandler {
@Override @Override
protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception { protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception {
DeleteWatchRequest indexWatchRequest = new DeleteWatchRequest(request.param("id")); DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest(request.param("id"));
indexWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indexWatchRequest.masterNodeTimeout())); deleteWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWatchRequest.masterNodeTimeout()));
client.deleteWatch(indexWatchRequest, new RestBuilderListener<DeleteWatchResponse>(channel) { deleteWatchRequest.setForce(request.paramAsBoolean("force", deleteWatchRequest.isForce()));
client.deleteWatch(deleteWatchRequest, new RestBuilderListener<DeleteWatchResponse>(channel) {
@Override @Override
public RestResponse buildResponse(DeleteWatchResponse response, XContentBuilder builder) throws Exception { public RestResponse buildResponse(DeleteWatchResponse response, XContentBuilder builder) throws Exception {
builder.startObject() builder.startObject()

View File

@ -24,6 +24,7 @@ public class DeleteWatchRequest extends MasterNodeOperationRequest<DeleteWatchRe
private String id; private String id;
private long version = Versions.MATCH_ANY; private long version = Versions.MATCH_ANY;
private boolean force = false;
public DeleteWatchRequest() { public DeleteWatchRequest() {
this(null); this(null);
@ -48,6 +49,21 @@ public class DeleteWatchRequest extends MasterNodeOperationRequest<DeleteWatchRe
this.id = id; this.id = id;
} }
/**
* @return true if this request should ignore locking false if not
*/
public boolean isForce() {
return force;
}
/**
* @param force Sets weither this request should ignore locking and force the delete even if lock is unavailable.
*/
public void setForce(boolean force) {
this.force = force;
}
/** /**
* Sets the version, which will cause the delete operation to only be performed if a matching * Sets the version, which will cause the delete operation to only be performed if a matching
* version exists and no changes happened on the doc since then. * version exists and no changes happened on the doc since then.
@ -74,6 +90,7 @@ public class DeleteWatchRequest extends MasterNodeOperationRequest<DeleteWatchRe
super.readFrom(in); super.readFrom(in);
id = in.readString(); id = in.readString();
version = Versions.readVersion(in); version = Versions.readVersion(in);
force = in.readBoolean();
} }
@Override @Override
@ -81,6 +98,7 @@ public class DeleteWatchRequest extends MasterNodeOperationRequest<DeleteWatchRe
super.writeTo(out); super.writeTo(out);
out.writeString(id); out.writeString(id);
Versions.writeVersion(version, out); Versions.writeVersion(version, out);
out.writeBoolean(force);
} }
@Override @Override

View File

@ -31,6 +31,14 @@ public class DeleteWatchRequestBuilder extends MasterNodeOperationRequestBuilder
return this; return this;
} }
/**
* Sets wiether this request is forced (ie ignores locks)
*/
public DeleteWatchRequestBuilder setForce(boolean force) {
this.request().setForce(force);
return this;
}
@Override @Override
protected void doExecute(final ActionListener<DeleteWatchResponse> listener) { protected void doExecute(final ActionListener<DeleteWatchResponse> listener) {
new WatcherClient(client).deleteWatch(request, listener); new WatcherClient(client).deleteWatch(request, listener);

View File

@ -54,7 +54,7 @@ public class TransportDeleteWatchAction extends WatcherTransportAction<DeleteWat
@Override @Override
protected void masterOperation(DeleteWatchRequest request, ClusterState state, ActionListener<DeleteWatchResponse> listener) throws ElasticsearchException { protected void masterOperation(DeleteWatchRequest request, ClusterState state, ActionListener<DeleteWatchResponse> listener) throws ElasticsearchException {
try { try {
DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId(), request.masterNodeTimeout()).deleteResponse(); DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId(), request.masterNodeTimeout(), request.isForce()).deleteResponse();
DeleteWatchResponse response = new DeleteWatchResponse(deleteResponse.getId(), deleteResponse.getVersion(), deleteResponse.isFound()); DeleteWatchResponse response = new DeleteWatchResponse(deleteResponse.getId(), deleteResponse.getVersion(), deleteResponse.isFound());
listener.onResponse(response); listener.onResponse(response);
} catch (Exception e) { } catch (Exception e) {

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -66,6 +67,8 @@ public class Watch implements TriggerEngine.Job, ToXContent {
private final Status status; private final Status status;
private final TimeValue throttlePeriod; private final TimeValue throttlePeriod;
private transient long version = Versions.NOT_SET;
@Nullable @Nullable
private final Map<String, Object> metadata; private final Map<String, Object> metadata;
@ -126,6 +129,13 @@ public class Watch implements TriggerEngine.Job, ToXContent {
return status; return status;
} }
public long version() {
return version;
}
public void version(long version) {
this.version = version;
}
/** /**
* Acks this watch. * Acks this watch.
* *

View File

@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -114,22 +115,22 @@ public class WatchStore extends AbstractComponent {
} }
/** /**
* Returns the watch with the specified name otherwise <code>null</code> is returned. * Returns the watch with the specified id otherwise <code>null</code> is returned.
*/ */
public Watch get(String name) { public Watch get(String id) {
ensureStarted(); ensureStarted();
return watches.get(name); return watches.get(id);
} }
/** /**
* Creates an watch with the specified name and source. If an watch with the specified name already exists it will * Creates an watch if this watch already exists it will be overwritten
* get overwritten.
*/ */
public WatchPut put(Watch watch) { public WatchPut put(Watch watch) {
ensureStarted(); ensureStarted();
IndexRequest indexRequest = createIndexRequest(watch.id(), watch.getAsBytes()); IndexRequest indexRequest = createIndexRequest(watch.id(), watch.getAsBytes(), Versions.MATCH_ANY);
IndexResponse response = client.index(indexRequest); IndexResponse response = client.index(indexRequest);
watch.status().version(response.getVersion()); watch.status().version(response.getVersion());
watch.version(response.getVersion());
Watch previous = watches.put(watch.id(), watch); Watch previous = watches.put(watch.id(), watch);
return new WatchPut(previous, watch, response); return new WatchPut(previous, watch, response);
} }
@ -151,25 +152,25 @@ public class WatchStore extends AbstractComponent {
*/ */
void update(Watch watch) throws IOException { void update(Watch watch) throws IOException {
ensureStarted(); ensureStarted();
assert watch == watches.get(watch.id()) : "update watch can only be applied to an already loaded watch";
BytesReference source = JsonXContent.contentBuilder().value(watch).bytes(); BytesReference source = JsonXContent.contentBuilder().value(watch).bytes();
IndexResponse response = client.index(createIndexRequest(watch.id(), source)); IndexResponse response = client.index(createIndexRequest(watch.id(), source, watch.version()));
watch.status().version(response.getVersion()); watch.status().version(response.getVersion());
watch.version(response.getVersion());
watch.status().dirty(false); watch.status().dirty(false);
// Don't need to update the watches, since we are working on an instance from it. // Don't need to update the watches, since we are working on an instance from it.
} }
/** /**
* Deletes the watch with the specified name if exists * Deletes the watch with the specified id if exists
*/ */
public WatchDelete delete(String name) { public WatchDelete delete(String id, boolean force) {
ensureStarted(); ensureStarted();
Watch watch = watches.remove(name); Watch watch = watches.remove(id);
// even if the watch was not found in the watch map, we should still try to delete it // even if the watch was not found in the watch map, we should still try to delete it
// from the index, just to make sure we don't leave traces of it // from the index, just to make sure we don't leave traces of it
DeleteRequest request = new DeleteRequest(INDEX, DOC_TYPE, name); DeleteRequest request = new DeleteRequest(INDEX, DOC_TYPE, id);
if (watch != null) { if (watch != null && !force) {
request.version(watch.status().version()); request.version(watch.version());
} }
DeleteResponse response = client.delete(request).actionGet(); DeleteResponse response = client.delete(request).actionGet();
return new WatchDelete(response); return new WatchDelete(response);
@ -179,10 +180,11 @@ public class WatchStore extends AbstractComponent {
return watches; return watches;
} }
IndexRequest createIndexRequest(String name, BytesReference source) { IndexRequest createIndexRequest(String id, BytesReference source, long version) {
IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, name); IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, id);
indexRequest.listenerThreaded(false); indexRequest.listenerThreaded(false);
indexRequest.source(source, false); indexRequest.source(source, false);
indexRequest.version(version);
return indexRequest; return indexRequest;
} }
@ -216,14 +218,15 @@ public class WatchStore extends AbstractComponent {
response = client.searchScroll(response.getScrollId(), scrollTimeout); response = client.searchScroll(response.getScrollId(), scrollTimeout);
while (response.getHits().hits().length != 0) { while (response.getHits().hits().length != 0) {
for (SearchHit hit : response.getHits()) { for (SearchHit hit : response.getHits()) {
String name = hit.getId(); String id = hit.getId();
try { try {
Watch watch = watchParser.parse(name, true, hit.getSourceRef()); Watch watch = watchParser.parse(id, true, hit.getSourceRef());
watch.status().version(hit.version()); watch.status().version(hit.version());
watches.put(name, watch); watch.version(hit.version());
watches.put(id, watch);
count++; count++;
} catch (Exception e) { } catch (Exception e) {
logger.error("couldn't load watch [{}], ignoring it...", e, name); logger.error("couldn't load watch [{}], ignoring it...", e, id);
} }
} }
response = client.searchScroll(response.getScrollId(), scrollTimeout); response = client.searchScroll(response.getScrollId(), scrollTimeout);

View File

@ -5,17 +5,25 @@
*/ */
package org.elasticsearch.watcher.execution; package org.elasticsearch.watcher.execution;
import org.apache.lucene.util.LuceneTestCase.Slow;
import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.actions.logging.LoggingAction; import org.elasticsearch.watcher.actions.logging.LoggingAction;
import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.condition.always.AlwaysCondition; import org.elasticsearch.watcher.condition.always.AlwaysCondition;
import org.elasticsearch.watcher.condition.script.ScriptCondition;
import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.simple.SimpleInput; import org.elasticsearch.watcher.input.simple.SimpleInput;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse; import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
import org.elasticsearch.watcher.transport.actions.put.PutWatchRequest; import org.elasticsearch.watcher.transport.actions.put.PutWatchRequest;
@ -27,8 +35,11 @@ import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch; import org.elasticsearch.watcher.watch.Watch;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
@ -222,4 +233,83 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
watchRecord = watchRecordParser.parse(wid.value(), 1, executeWatchResponse.getSource().getBytes()); watchRecord = watchRecordParser.parse(wid.value(), 1, executeWatchResponse.getSource().getBytes());
assertThat(watchRecord.state(), equalTo(WatchRecord.State.THROTTLED)); assertThat(watchRecord.state(), equalTo(WatchRecord.State.THROTTLED));
} }
@Test
@Slow
public void testForceDeletionOfLongRunningWatch() throws Exception {
WatchSourceBuilder watchBuilder = watchBuilder()
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
.input(simpleInput("foo", "bar"))
.condition(new ScriptCondition((new Script.Builder.Inline("sleep 10000; return true")).build()))
.throttlePeriod(new TimeValue(1, TimeUnit.HOURS))
.addAction("log", loggingAction("foobar"));
int numberOfThreads = scaledRandomIntBetween(1, 5);
PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet();
assertThat(putWatchResponse.getVersion(), greaterThan(0L));
refresh();
assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true));
CountDownLatch startLatch = new CountDownLatch(1);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numberOfThreads; ++i) {
threads.add(new Thread(new ExecutionRunner(watchService(), executionService(), "_id", startLatch)));
}
for (Thread thread : threads) {
thread.start();
}
DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").setForce(true).get();
assertThat(deleteWatchResponse.isFound(), is(true));
deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").get();
assertThat(deleteWatchResponse.isFound(), is(false));
startLatch.countDown();
long startJoin = System.currentTimeMillis();
for (Thread thread : threads) {
thread.join();
}
long endJoin = System.currentTimeMillis();
TimeValue tv = new TimeValue(10 * (numberOfThreads+1), TimeUnit.SECONDS);
assertThat("Shouldn't take longer than [" + tv.getSeconds() + "] seconds for all the threads to stop", (endJoin - startJoin), lessThan(tv.getMillis()));
}
private static class ExecutionRunner implements Runnable {
final WatcherService watcherService;
final ExecutionService executionService;
final String watchId;
final CountDownLatch startLatch;
final ManualExecutionContext.Builder ctxBuilder;
private ExecutionRunner(WatcherService watcherService, ExecutionService executionService, String watchId, CountDownLatch startLatch) {
this.watcherService = watcherService;
this.executionService = executionService;
this.watchId = watchId;
this.startLatch = startLatch;
ManualTriggerEvent triggerEvent = new ManualTriggerEvent(watchId, new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC)));
ctxBuilder = ManualExecutionContext.builder(watcherService.getWatch(watchId), triggerEvent);
ctxBuilder.recordExecution(true);
ctxBuilder.withThrottle(Throttler.Result.NO);
}
@Override
public void run() {
try {
startLatch.await();
executionService.execute(ctxBuilder.build());
fail("Execution of a deleted watch should fail but didn't");
} catch (WatcherException we) {
assertThat(we.getCause(), instanceOf(VersionConflictEngineException.class));
} catch (Throwable t) {
throw new WatcherException("Failure mode execution of [{}] failed in an unexpected way", t, watchId);
}
}
}
} }

View File

@ -7,13 +7,13 @@ package org.elasticsearch.watcher.test.integration;
import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.client.WatchSourceBuilder;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequest; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequest;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.support.xcontent.XContentSource;
import org.junit.Test; import org.junit.Test;
import java.util.Map; import java.util.Map;
@ -134,4 +134,5 @@ public class WatchCrudTests extends AbstractWatcherIntegrationTests {
assertThat(response.getVersion(), is(1L)); assertThat(response.getVersion(), is(1L));
assertThat(response.isFound(), is(false)); assertThat(response.isFound(), is(false));
} }
} }

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test.integration;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.transport.actions.service.WatcherServiceResponse;
import org.junit.Test;
import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
*/
public class WatchForceDeleteTests extends AbstractWatcherIntegrationTests {
protected boolean timeWarped() {
return false; //Disable time warping for the force delete long running watch test
}
@Test
@Slow
public void testForceDelete_LongRunningWatch() throws Exception {
PutWatchResponse putResponse = watcherClient().preparePutWatch("_name").setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(simpleInput())
.condition(scriptCondition(Script.inline("sleep 5000; return true")))
.addAction("_action1", loggingAction("{{ctx.watch_id}}")))
.get();
assertThat(putResponse.getId(), equalTo("_name"));
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
}
DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").setForce(true).get();
assertThat(deleteWatchResponse.isFound(), is(true));
deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get();
assertThat(deleteWatchResponse.isFound(), is(false));
WatcherServiceResponse stopResponse = watcherClient().prepareWatchService().stop().get();
assertThat(stopResponse.isAcknowledged(), is(true));
ensureWatcherStopped();
WatcherServiceResponse startResponse = watcherClient().prepareWatchService().start().get();
assertThat(startResponse.isAcknowledged(), is(true));
ensureWatcherStarted();
deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get();
assertThat(deleteWatchResponse.isFound(), is(false));
}
}

View File

@ -110,14 +110,15 @@ public class WatchServiceTests extends ElasticsearchTestCase {
public void testDeleteWatch() throws Exception { public void testDeleteWatch() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5); TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class); WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
boolean force = randomBoolean();
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class); WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class);
DeleteResponse deleteResponse = mock(DeleteResponse.class); DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponse.isFound()).thenReturn(true); when(deleteResponse.isFound()).thenReturn(true);
when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse);
when(watchStore.delete("_id")).thenReturn(expectedWatchDelete); when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout); WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force);
assertThat(watchDelete, sameInstance(expectedWatchDelete)); assertThat(watchDelete, sameInstance(expectedWatchDelete));
verify(triggerService, times(1)).remove("_id"); verify(triggerService, times(1)).remove("_id");
@ -127,12 +128,31 @@ public class WatchServiceTests extends ElasticsearchTestCase {
public void testDeleteWatch_Timeout() throws Exception { public void testDeleteWatch_Timeout() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5); TimeValue timeout = TimeValue.timeValueSeconds(5);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null);
watcherService.deleteWatch("_id", timeout); watcherService.deleteWatch("_id", timeout, false);
} }
@Test
public void testDeleteWatch_Force() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null);
WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class);
DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponse.isFound()).thenReturn(true);
when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse);
when(watchStore.delete("_id", true)).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, true);
assertThat(watchDelete, sameInstance(expectedWatchDelete));
verify(triggerService, times(1)).remove("_id");
}
@Test @Test
public void testDeleteWatch_NotFound() throws Exception { public void testDeleteWatch_NotFound() throws Exception {
TimeValue timeout = TimeValue.timeValueSeconds(5); TimeValue timeout = TimeValue.timeValueSeconds(5);
boolean force = randomBoolean();
WatchLockService.Lock lock = mock(WatchLockService.Lock.class); WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);
@ -140,8 +160,8 @@ public class WatchServiceTests extends ElasticsearchTestCase {
DeleteResponse deleteResponse = mock(DeleteResponse.class); DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponse.isFound()).thenReturn(false); when(deleteResponse.isFound()).thenReturn(false);
when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse);
when(watchStore.delete("_id")).thenReturn(expectedWatchDelete); when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete);
WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout); WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force);
assertThat(watchDelete, sameInstance(expectedWatchDelete)); assertThat(watchDelete, sameInstance(expectedWatchDelete));
verifyZeroInteractions(triggerService); verifyZeroInteractions(triggerService);