diff --git a/rest-api-spec/api/watcher.delete_watch.json b/rest-api-spec/api/watcher.delete_watch.json index 214008893f7..70180c40e6e 100644 --- a/rest-api-spec/api/watcher.delete_watch.json +++ b/rest-api-spec/api/watcher.delete_watch.json @@ -16,6 +16,10 @@ "master_timeout": { "type": "duration", "description": "Specify timeout for watch write operation" + }, + "force": { + "type": "boolean", + "description": "Specify if this request should be forced and ignore locks" } } }, diff --git a/src/main/java/org/elasticsearch/watcher/WatcherService.java b/src/main/java/org/elasticsearch/watcher/WatcherService.java index 45eafc46540..d6433cdb2b2 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherService.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherService.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.PeriodType; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.watcher.execution.ExecutionService; import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.watch.Watch; @@ -78,20 +79,25 @@ public class WatcherService extends AbstractComponent { } } - public WatchStore.WatchDelete deleteWatch(String id, TimeValue timeout) { + public WatchStore.WatchDelete deleteWatch(String id, TimeValue timeout, final boolean force) { ensureStarted(); - WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); - if (lock == null) { - throw new TimeoutException("could not delete watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); + WatchLockService.Lock lock = null; + if (!force) { + lock = watchLockService.tryAcquire(id, timeout); + if (lock == null) { + throw new TimeoutException("could not delete watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); + } } try { - WatchStore.WatchDelete delete = watchStore.delete(id); + WatchStore.WatchDelete delete = watchStore.delete(id, force); if (delete.deleteResponse().isFound()) { triggerService.remove(id); } return delete; } finally { - lock.release(); + if (lock != null) { + lock.release(); + } } } @@ -145,7 +151,9 @@ public class WatcherService extends AbstractComponent { try { watchStore.updateStatus(watch); } catch (IOException ioe) { - throw new WatcherException("failed to update the watch on ack", ioe); + throw new WatcherException("failed to update the watch [{}] on ack", ioe, watch.id()); + } catch (VersionConflictEngineException vcee) { + throw new WatcherException("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id()); } } // we need to create a safe copy of the status diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index af715379e01..251c1e1b0e8 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.actions.ActionWrapper; import org.elasticsearch.watcher.condition.Condition; @@ -209,18 +210,21 @@ public class ExecutionService extends AbstractComponent { public WatchRecord execute(WatchExecutionContext ctx) throws IOException { WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent()); - WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); try { WatchExecutionResult result = executeInner(ctx); watchRecord.seal(result); + if (ctx.recordExecution()) { + watchStore.updateStatus(ctx.watch()); + } + } catch (VersionConflictEngineException vcee) { + throw new WatcherException("Failed to update the watch [{}] on execute perhaps it was force deleted", vcee, ctx.watch().id()); } finally { lock.release(); } if (ctx.recordExecution()) { historyStore.put(watchRecord); } - watchStore.updateStatus(ctx.watch()); return watchRecord; } @@ -316,28 +320,27 @@ public class ExecutionService extends AbstractComponent { return; } logger.trace("executing [{}] [{}]", ctx.watch().id(), ctx.id()); + WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); try { - watchRecord.update(WatchRecord.State.CHECKING, null); - logger.debug("checking watch [{}]", watchRecord.watchId()); - WatchExecutionResult result = executeInner(ctx); - watchRecord.seal(result); - if (ctx.recordExecution()) { - historyStore.update(watchRecord); + if (watchStore.get(ctx.watch().id()) == null) { + //Fail fast if we are trying to execute a deleted watch + String message = "unable to find watch for record [" + watchRecord.id() + "], perhaps it has been deleted, ignoring..."; + watchRecord.update(WatchRecord.State.DELETED_WHILE_QUEUED, message); + } else { + watchRecord.update(WatchRecord.State.CHECKING, null); + logger.debug("checking watch [{}]", watchRecord.watchId()); + WatchExecutionResult result = executeInner(ctx); + watchRecord.seal(result); + if (ctx.recordExecution()) { + watchStore.updateStatus(ctx.watch()); + } } - watchStore.updateStatus(ctx.watch()); } catch (Exception e) { if (started()) { String detailedMessage = ExceptionsHelper.detailedMessage(e); logger.warn("failed to execute watch [{}]/[{}], failure [{}]", watchRecord.watchId(), ctx.id(), detailedMessage); - try { - watchRecord.update(WatchRecord.State.FAILED, detailedMessage); - if (ctx.recordExecution()) { - historyStore.update(watchRecord); - } - } catch (Exception e2) { - logger.error("failed to update watch record [{}]/[{}], failure [{}], original failure [{}]", watchRecord.watchId(), ctx.id(), ExceptionsHelper.detailedMessage(e2), detailedMessage); - } + watchRecord.update(WatchRecord.State.FAILED, detailedMessage); } else { logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord); } @@ -345,6 +348,14 @@ public class ExecutionService extends AbstractComponent { lock.release(); logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); } + + if (ctx.recordExecution() && started()) { + try { + historyStore.update(watchRecord); + } catch (Exception e) { + logger.error("failed to update watch record [{}]/[{}], failure [{}], record failure if any [{}]", watchRecord.watchId(), ctx.id(), ExceptionsHelper.detailedMessage(e), watchRecord.message()); + } + } } } } diff --git a/src/main/java/org/elasticsearch/watcher/rest/action/RestDeleteWatchAction.java b/src/main/java/org/elasticsearch/watcher/rest/action/RestDeleteWatchAction.java index 0f223506877..d04e678ee2a 100644 --- a/src/main/java/org/elasticsearch/watcher/rest/action/RestDeleteWatchAction.java +++ b/src/main/java/org/elasticsearch/watcher/rest/action/RestDeleteWatchAction.java @@ -32,9 +32,10 @@ public class RestDeleteWatchAction extends WatcherRestHandler { @Override protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception { - DeleteWatchRequest indexWatchRequest = new DeleteWatchRequest(request.param("id")); - indexWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indexWatchRequest.masterNodeTimeout())); - client.deleteWatch(indexWatchRequest, new RestBuilderListener(channel) { + DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest(request.param("id")); + deleteWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWatchRequest.masterNodeTimeout())); + deleteWatchRequest.setForce(request.paramAsBoolean("force", deleteWatchRequest.isForce())); + client.deleteWatch(deleteWatchRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(DeleteWatchResponse response, XContentBuilder builder) throws Exception { builder.startObject() diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java index 7e2d500a12b..f9336dddc16 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java @@ -24,6 +24,7 @@ public class DeleteWatchRequest extends MasterNodeOperationRequest listener) { new WatcherClient(client).deleteWatch(request, listener); diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/TransportDeleteWatchAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/TransportDeleteWatchAction.java index c96af276c91..784f717fb92 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/TransportDeleteWatchAction.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/TransportDeleteWatchAction.java @@ -54,7 +54,7 @@ public class TransportDeleteWatchAction extends WatcherTransportAction listener) throws ElasticsearchException { try { - DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId(), request.masterNodeTimeout()).deleteResponse(); + DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId(), request.masterNodeTimeout(), request.isForce()).deleteResponse(); DeleteWatchResponse response = new DeleteWatchResponse(deleteResponse.getId(), deleteResponse.getVersion(), deleteResponse.isFound()); listener.onResponse(response); } catch (Exception e) { diff --git a/src/main/java/org/elasticsearch/watcher/watch/Watch.java b/src/main/java/org/elasticsearch/watcher/watch/Watch.java index ed68f44575a..503e3937dbb 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/Watch.java +++ b/src/main/java/org/elasticsearch/watcher/watch/Watch.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; @@ -66,6 +67,8 @@ public class Watch implements TriggerEngine.Job, ToXContent { private final Status status; private final TimeValue throttlePeriod; + private transient long version = Versions.NOT_SET; + @Nullable private final Map metadata; @@ -126,6 +129,13 @@ public class Watch implements TriggerEngine.Job, ToXContent { return status; } + public long version() { + return version; + } + + public void version(long version) { + this.version = version; + } /** * Acks this watch. * diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java index da8e974884c..7e5674245a2 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -114,22 +115,22 @@ public class WatchStore extends AbstractComponent { } /** - * Returns the watch with the specified name otherwise null is returned. + * Returns the watch with the specified id otherwise null is returned. */ - public Watch get(String name) { + public Watch get(String id) { ensureStarted(); - return watches.get(name); + return watches.get(id); } /** - * Creates an watch with the specified name and source. If an watch with the specified name already exists it will - * get overwritten. + * Creates an watch if this watch already exists it will be overwritten */ public WatchPut put(Watch watch) { ensureStarted(); - IndexRequest indexRequest = createIndexRequest(watch.id(), watch.getAsBytes()); + IndexRequest indexRequest = createIndexRequest(watch.id(), watch.getAsBytes(), Versions.MATCH_ANY); IndexResponse response = client.index(indexRequest); watch.status().version(response.getVersion()); + watch.version(response.getVersion()); Watch previous = watches.put(watch.id(), watch); return new WatchPut(previous, watch, response); } @@ -151,25 +152,25 @@ public class WatchStore extends AbstractComponent { */ void update(Watch watch) throws IOException { ensureStarted(); - assert watch == watches.get(watch.id()) : "update watch can only be applied to an already loaded watch"; BytesReference source = JsonXContent.contentBuilder().value(watch).bytes(); - IndexResponse response = client.index(createIndexRequest(watch.id(), source)); + IndexResponse response = client.index(createIndexRequest(watch.id(), source, watch.version())); watch.status().version(response.getVersion()); + watch.version(response.getVersion()); watch.status().dirty(false); // Don't need to update the watches, since we are working on an instance from it. } /** - * Deletes the watch with the specified name if exists + * Deletes the watch with the specified id if exists */ - public WatchDelete delete(String name) { + public WatchDelete delete(String id, boolean force) { ensureStarted(); - Watch watch = watches.remove(name); + Watch watch = watches.remove(id); // even if the watch was not found in the watch map, we should still try to delete it // from the index, just to make sure we don't leave traces of it - DeleteRequest request = new DeleteRequest(INDEX, DOC_TYPE, name); - if (watch != null) { - request.version(watch.status().version()); + DeleteRequest request = new DeleteRequest(INDEX, DOC_TYPE, id); + if (watch != null && !force) { + request.version(watch.version()); } DeleteResponse response = client.delete(request).actionGet(); return new WatchDelete(response); @@ -179,10 +180,11 @@ public class WatchStore extends AbstractComponent { return watches; } - IndexRequest createIndexRequest(String name, BytesReference source) { - IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, name); + IndexRequest createIndexRequest(String id, BytesReference source, long version) { + IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, id); indexRequest.listenerThreaded(false); indexRequest.source(source, false); + indexRequest.version(version); return indexRequest; } @@ -216,14 +218,15 @@ public class WatchStore extends AbstractComponent { response = client.searchScroll(response.getScrollId(), scrollTimeout); while (response.getHits().hits().length != 0) { for (SearchHit hit : response.getHits()) { - String name = hit.getId(); + String id = hit.getId(); try { - Watch watch = watchParser.parse(name, true, hit.getSourceRef()); + Watch watch = watchParser.parse(id, true, hit.getSourceRef()); watch.status().version(hit.version()); - watches.put(name, watch); + watch.version(hit.version()); + watches.put(id, watch); count++; } catch (Exception e) { - logger.error("couldn't load watch [{}], ignoring it...", e, name); + logger.error("couldn't load watch [{}], ignoring it...", e, id); } } response = client.searchScroll(response.getScrollId(), scrollTimeout); diff --git a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java index 0f8fe947833..962030b9f72 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java @@ -5,17 +5,25 @@ */ package org.elasticsearch.watcher.execution; +import org.apache.lucene.util.LuceneTestCase.Slow; import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.watcher.WatcherException; +import org.elasticsearch.watcher.WatcherService; import org.elasticsearch.watcher.actions.logging.LoggingAction; import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.condition.always.AlwaysCondition; +import org.elasticsearch.watcher.condition.script.ScriptCondition; import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.input.simple.SimpleInput; +import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.throttle.Throttler; +import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.watcher.transport.actions.put.PutWatchRequest; @@ -27,8 +35,11 @@ import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Watch; import org.junit.Test; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; @@ -222,4 +233,83 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { watchRecord = watchRecordParser.parse(wid.value(), 1, executeWatchResponse.getSource().getBytes()); assertThat(watchRecord.state(), equalTo(WatchRecord.State.THROTTLED)); } + + + @Test + @Slow + public void testForceDeletionOfLongRunningWatch() throws Exception { + WatchSourceBuilder watchBuilder = watchBuilder() + .trigger(schedule(cron("0 0 0 1 * ? 2099"))) + .input(simpleInput("foo", "bar")) + .condition(new ScriptCondition((new Script.Builder.Inline("sleep 10000; return true")).build())) + .throttlePeriod(new TimeValue(1, TimeUnit.HOURS)) + .addAction("log", loggingAction("foobar")); + + int numberOfThreads = scaledRandomIntBetween(1, 5); + PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet(); + assertThat(putWatchResponse.getVersion(), greaterThan(0L)); + refresh(); + assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true)); + + + CountDownLatch startLatch = new CountDownLatch(1); + + List threads = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; ++i) { + threads.add(new Thread(new ExecutionRunner(watchService(), executionService(), "_id", startLatch))); + } + + for (Thread thread : threads) { + thread.start(); + } + DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").setForce(true).get(); + assertThat(deleteWatchResponse.isFound(), is(true)); + + deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").get(); + assertThat(deleteWatchResponse.isFound(), is(false)); + + startLatch.countDown(); + + long startJoin = System.currentTimeMillis(); + for (Thread thread : threads) { + thread.join(); + } + long endJoin = System.currentTimeMillis(); + TimeValue tv = new TimeValue(10 * (numberOfThreads+1), TimeUnit.SECONDS); + assertThat("Shouldn't take longer than [" + tv.getSeconds() + "] seconds for all the threads to stop", (endJoin - startJoin), lessThan(tv.getMillis())); + } + + private static class ExecutionRunner implements Runnable { + + final WatcherService watcherService; + final ExecutionService executionService; + final String watchId; + final CountDownLatch startLatch; + final ManualExecutionContext.Builder ctxBuilder; + + private ExecutionRunner(WatcherService watcherService, ExecutionService executionService, String watchId, CountDownLatch startLatch) { + this.watcherService = watcherService; + this.executionService = executionService; + this.watchId = watchId; + this.startLatch = startLatch; + ManualTriggerEvent triggerEvent = new ManualTriggerEvent(watchId, new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); + ctxBuilder = ManualExecutionContext.builder(watcherService.getWatch(watchId), triggerEvent); + ctxBuilder.recordExecution(true); + ctxBuilder.withThrottle(Throttler.Result.NO); + } + + @Override + public void run() { + try { + startLatch.await(); + executionService.execute(ctxBuilder.build()); + fail("Execution of a deleted watch should fail but didn't"); + } catch (WatcherException we) { + assertThat(we.getCause(), instanceOf(VersionConflictEngineException.class)); + } catch (Throwable t) { + throw new WatcherException("Failure mode execution of [{}] failed in an unexpected way", t, watchId); + } + } + } + } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java index 30b0ce14b18..d58690969d5 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java @@ -7,13 +7,13 @@ package org.elasticsearch.watcher.test.integration; import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.watcher.client.WatchSourceBuilder; +import org.elasticsearch.watcher.support.xcontent.XContentSource; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequest; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; -import org.elasticsearch.watcher.support.xcontent.XContentSource; import org.junit.Test; import java.util.Map; @@ -134,4 +134,5 @@ public class WatchCrudTests extends AbstractWatcherIntegrationTests { assertThat(response.getVersion(), is(1L)); assertThat(response.isFound(), is(false)); } + } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchForceDeleteTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchForceDeleteTests.java new file mode 100644 index 00000000000..39038c7db86 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchForceDeleteTests.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.test.integration; + +import org.apache.lucene.util.LuceneTestCase.Slow; +import org.elasticsearch.watcher.support.Script; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; +import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; +import org.elasticsearch.watcher.transport.actions.service.WatcherServiceResponse; +import org.junit.Test; + +import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition; +import static org.elasticsearch.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + */ +public class WatchForceDeleteTests extends AbstractWatcherIntegrationTests { + + protected boolean timeWarped() { + return false; //Disable time warping for the force delete long running watch test + } + + @Test + @Slow + public void testForceDelete_LongRunningWatch() throws Exception { + PutWatchResponse putResponse = watcherClient().preparePutWatch("_name").setSource(watchBuilder() + .trigger(schedule(interval("1s"))) + .input(simpleInput()) + .condition(scriptCondition(Script.inline("sleep 5000; return true"))) + .addAction("_action1", loggingAction("{{ctx.watch_id}}"))) + .get(); + assertThat(putResponse.getId(), equalTo("_name")); + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + } + DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").setForce(true).get(); + assertThat(deleteWatchResponse.isFound(), is(true)); + deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get(); + assertThat(deleteWatchResponse.isFound(), is(false)); + WatcherServiceResponse stopResponse = watcherClient().prepareWatchService().stop().get(); + assertThat(stopResponse.isAcknowledged(), is(true)); + ensureWatcherStopped(); + WatcherServiceResponse startResponse = watcherClient().prepareWatchService().start().get(); + assertThat(startResponse.isAcknowledged(), is(true)); + ensureWatcherStarted(); + deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get(); + assertThat(deleteWatchResponse.isFound(), is(false)); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java index e16d37e80d7..aaa582fe1e7 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java @@ -110,14 +110,15 @@ public class WatchServiceTests extends ElasticsearchTestCase { public void testDeleteWatch() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + boolean force = randomBoolean(); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class); DeleteResponse deleteResponse = mock(DeleteResponse.class); when(deleteResponse.isFound()).thenReturn(true); when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); - when(watchStore.delete("_id")).thenReturn(expectedWatchDelete); - WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout); + when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete); + WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force); assertThat(watchDelete, sameInstance(expectedWatchDelete)); verify(triggerService, times(1)).remove("_id"); @@ -127,12 +128,31 @@ public class WatchServiceTests extends ElasticsearchTestCase { public void testDeleteWatch_Timeout() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); - watcherService.deleteWatch("_id", timeout); + watcherService.deleteWatch("_id", timeout, false); } + @Test + public void testDeleteWatch_Force() throws Exception { + TimeValue timeout = TimeValue.timeValueSeconds(5); + WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); + + WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class); + DeleteResponse deleteResponse = mock(DeleteResponse.class); + when(deleteResponse.isFound()).thenReturn(true); + when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); + when(watchStore.delete("_id", true)).thenReturn(expectedWatchDelete); + WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, true); + + assertThat(watchDelete, sameInstance(expectedWatchDelete)); + verify(triggerService, times(1)).remove("_id"); + } + + @Test public void testDeleteWatch_NotFound() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); + boolean force = randomBoolean(); WatchLockService.Lock lock = mock(WatchLockService.Lock.class); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); @@ -140,8 +160,8 @@ public class WatchServiceTests extends ElasticsearchTestCase { DeleteResponse deleteResponse = mock(DeleteResponse.class); when(deleteResponse.isFound()).thenReturn(false); when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); - when(watchStore.delete("_id")).thenReturn(expectedWatchDelete); - WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout); + when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete); + WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force); assertThat(watchDelete, sameInstance(expectedWatchDelete)); verifyZeroInteractions(triggerService);