From eef3a1b1e83d4bcb9b7b407c18ae06c5230ff115 Mon Sep 17 00:00:00 2001 From: Brian Murphy Date: Fri, 8 May 2015 17:19:24 -0400 Subject: [PATCH] Add force parameter to delete to force a delete This change adds a force paramter to delete to allow the forcing of a delete to happen ignoring locking. This means that watch executions may fail when they go to update the watch. Watches executing from the scheduler will fail fast if the watch they are supposed to execute has been removed. Also move the history store updates outside of the watch locks. Fixes: elastic/elasticsearch#405 Original commit: elastic/x-pack-elasticsearch@57561b6f85c32bb31e342adddf94882a692b2b6b --- rest-api-spec/api/watcher.delete_watch.json | 4 + .../elasticsearch/watcher/WatcherService.java | 22 +++-- .../watcher/execution/ExecutionService.java | 45 ++++++---- .../rest/action/RestDeleteWatchAction.java | 7 +- .../actions/delete/DeleteWatchRequest.java | 18 ++++ .../delete/DeleteWatchRequestBuilder.java | 8 ++ .../delete/TransportDeleteWatchAction.java | 2 +- .../elasticsearch/watcher/watch/Watch.java | 10 +++ .../watcher/watch/WatchStore.java | 43 ++++----- .../execution/ManualExecutionTests.java | 90 +++++++++++++++++++ .../test/integration/WatchCrudTests.java | 3 +- .../integration/WatchForceDeleteTests.java | 61 +++++++++++++ .../watcher/watch/WatchServiceTests.java | 30 +++++-- 13 files changed, 289 insertions(+), 54 deletions(-) create mode 100644 src/test/java/org/elasticsearch/watcher/test/integration/WatchForceDeleteTests.java diff --git a/rest-api-spec/api/watcher.delete_watch.json b/rest-api-spec/api/watcher.delete_watch.json index 214008893f7..70180c40e6e 100644 --- a/rest-api-spec/api/watcher.delete_watch.json +++ b/rest-api-spec/api/watcher.delete_watch.json @@ -16,6 +16,10 @@ "master_timeout": { "type": "duration", "description": "Specify timeout for watch write operation" + }, + "force": { + "type": "boolean", + "description": "Specify if this request should be forced and ignore locks" } } }, diff --git a/src/main/java/org/elasticsearch/watcher/WatcherService.java b/src/main/java/org/elasticsearch/watcher/WatcherService.java index 45eafc46540..d6433cdb2b2 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherService.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherService.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.PeriodType; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.watcher.execution.ExecutionService; import org.elasticsearch.watcher.trigger.TriggerService; import org.elasticsearch.watcher.watch.Watch; @@ -78,20 +79,25 @@ public class WatcherService extends AbstractComponent { } } - public WatchStore.WatchDelete deleteWatch(String id, TimeValue timeout) { + public WatchStore.WatchDelete deleteWatch(String id, TimeValue timeout, final boolean force) { ensureStarted(); - WatchLockService.Lock lock = watchLockService.tryAcquire(id, timeout); - if (lock == null) { - throw new TimeoutException("could not delete watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); + WatchLockService.Lock lock = null; + if (!force) { + lock = watchLockService.tryAcquire(id, timeout); + if (lock == null) { + throw new TimeoutException("could not delete watch [{}] within [{}]... wait and try again. If this error continues to occur there is a high chance that the watch execution is stuck (either due to unresponsive external system such as an email service, or due to a bad script", id, timeout.format(PeriodType.seconds())); + } } try { - WatchStore.WatchDelete delete = watchStore.delete(id); + WatchStore.WatchDelete delete = watchStore.delete(id, force); if (delete.deleteResponse().isFound()) { triggerService.remove(id); } return delete; } finally { - lock.release(); + if (lock != null) { + lock.release(); + } } } @@ -145,7 +151,9 @@ public class WatcherService extends AbstractComponent { try { watchStore.updateStatus(watch); } catch (IOException ioe) { - throw new WatcherException("failed to update the watch on ack", ioe); + throw new WatcherException("failed to update the watch [{}] on ack", ioe, watch.id()); + } catch (VersionConflictEngineException vcee) { + throw new WatcherException("failed to update the watch [{}] on ack, perhaps it was force deleted", vcee, watch.id()); } } // we need to create a safe copy of the status diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index af715379e01..251c1e1b0e8 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.actions.ActionWrapper; import org.elasticsearch.watcher.condition.Condition; @@ -209,18 +210,21 @@ public class ExecutionService extends AbstractComponent { public WatchRecord execute(WatchExecutionContext ctx) throws IOException { WatchRecord watchRecord = new WatchRecord(ctx.id(), ctx.watch(), ctx.triggerEvent()); - WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); try { WatchExecutionResult result = executeInner(ctx); watchRecord.seal(result); + if (ctx.recordExecution()) { + watchStore.updateStatus(ctx.watch()); + } + } catch (VersionConflictEngineException vcee) { + throw new WatcherException("Failed to update the watch [{}] on execute perhaps it was force deleted", vcee, ctx.watch().id()); } finally { lock.release(); } if (ctx.recordExecution()) { historyStore.put(watchRecord); } - watchStore.updateStatus(ctx.watch()); return watchRecord; } @@ -316,28 +320,27 @@ public class ExecutionService extends AbstractComponent { return; } logger.trace("executing [{}] [{}]", ctx.watch().id(), ctx.id()); + WatchLockService.Lock lock = watchLockService.acquire(ctx.watch().id()); try { - watchRecord.update(WatchRecord.State.CHECKING, null); - logger.debug("checking watch [{}]", watchRecord.watchId()); - WatchExecutionResult result = executeInner(ctx); - watchRecord.seal(result); - if (ctx.recordExecution()) { - historyStore.update(watchRecord); + if (watchStore.get(ctx.watch().id()) == null) { + //Fail fast if we are trying to execute a deleted watch + String message = "unable to find watch for record [" + watchRecord.id() + "], perhaps it has been deleted, ignoring..."; + watchRecord.update(WatchRecord.State.DELETED_WHILE_QUEUED, message); + } else { + watchRecord.update(WatchRecord.State.CHECKING, null); + logger.debug("checking watch [{}]", watchRecord.watchId()); + WatchExecutionResult result = executeInner(ctx); + watchRecord.seal(result); + if (ctx.recordExecution()) { + watchStore.updateStatus(ctx.watch()); + } } - watchStore.updateStatus(ctx.watch()); } catch (Exception e) { if (started()) { String detailedMessage = ExceptionsHelper.detailedMessage(e); logger.warn("failed to execute watch [{}]/[{}], failure [{}]", watchRecord.watchId(), ctx.id(), detailedMessage); - try { - watchRecord.update(WatchRecord.State.FAILED, detailedMessage); - if (ctx.recordExecution()) { - historyStore.update(watchRecord); - } - } catch (Exception e2) { - logger.error("failed to update watch record [{}]/[{}], failure [{}], original failure [{}]", watchRecord.watchId(), ctx.id(), ExceptionsHelper.detailedMessage(e2), detailedMessage); - } + watchRecord.update(WatchRecord.State.FAILED, detailedMessage); } else { logger.debug("failed to execute watch [{}] after shutdown", e, watchRecord); } @@ -345,6 +348,14 @@ public class ExecutionService extends AbstractComponent { lock.release(); logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); } + + if (ctx.recordExecution() && started()) { + try { + historyStore.update(watchRecord); + } catch (Exception e) { + logger.error("failed to update watch record [{}]/[{}], failure [{}], record failure if any [{}]", watchRecord.watchId(), ctx.id(), ExceptionsHelper.detailedMessage(e), watchRecord.message()); + } + } } } } diff --git a/src/main/java/org/elasticsearch/watcher/rest/action/RestDeleteWatchAction.java b/src/main/java/org/elasticsearch/watcher/rest/action/RestDeleteWatchAction.java index 0f223506877..d04e678ee2a 100644 --- a/src/main/java/org/elasticsearch/watcher/rest/action/RestDeleteWatchAction.java +++ b/src/main/java/org/elasticsearch/watcher/rest/action/RestDeleteWatchAction.java @@ -32,9 +32,10 @@ public class RestDeleteWatchAction extends WatcherRestHandler { @Override protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception { - DeleteWatchRequest indexWatchRequest = new DeleteWatchRequest(request.param("id")); - indexWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indexWatchRequest.masterNodeTimeout())); - client.deleteWatch(indexWatchRequest, new RestBuilderListener(channel) { + DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest(request.param("id")); + deleteWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWatchRequest.masterNodeTimeout())); + deleteWatchRequest.setForce(request.paramAsBoolean("force", deleteWatchRequest.isForce())); + client.deleteWatch(deleteWatchRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(DeleteWatchResponse response, XContentBuilder builder) throws Exception { builder.startObject() diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java index 7e2d500a12b..f9336dddc16 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/DeleteWatchRequest.java @@ -24,6 +24,7 @@ public class DeleteWatchRequest extends MasterNodeOperationRequest listener) { new WatcherClient(client).deleteWatch(request, listener); diff --git a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/TransportDeleteWatchAction.java b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/TransportDeleteWatchAction.java index c96af276c91..784f717fb92 100644 --- a/src/main/java/org/elasticsearch/watcher/transport/actions/delete/TransportDeleteWatchAction.java +++ b/src/main/java/org/elasticsearch/watcher/transport/actions/delete/TransportDeleteWatchAction.java @@ -54,7 +54,7 @@ public class TransportDeleteWatchAction extends WatcherTransportAction listener) throws ElasticsearchException { try { - DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId(), request.masterNodeTimeout()).deleteResponse(); + DeleteResponse deleteResponse = watcherService.deleteWatch(request.getId(), request.masterNodeTimeout(), request.isForce()).deleteResponse(); DeleteWatchResponse response = new DeleteWatchResponse(deleteResponse.getId(), deleteResponse.getVersion(), deleteResponse.isFound()); listener.onResponse(response); } catch (Exception e) { diff --git a/src/main/java/org/elasticsearch/watcher/watch/Watch.java b/src/main/java/org/elasticsearch/watcher/watch/Watch.java index ed68f44575a..503e3937dbb 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/Watch.java +++ b/src/main/java/org/elasticsearch/watcher/watch/Watch.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; @@ -66,6 +67,8 @@ public class Watch implements TriggerEngine.Job, ToXContent { private final Status status; private final TimeValue throttlePeriod; + private transient long version = Versions.NOT_SET; + @Nullable private final Map metadata; @@ -126,6 +129,13 @@ public class Watch implements TriggerEngine.Job, ToXContent { return status; } + public long version() { + return version; + } + + public void version(long version) { + this.version = version; + } /** * Acks this watch. * diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java index da8e974884c..7e5674245a2 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -114,22 +115,22 @@ public class WatchStore extends AbstractComponent { } /** - * Returns the watch with the specified name otherwise null is returned. + * Returns the watch with the specified id otherwise null is returned. */ - public Watch get(String name) { + public Watch get(String id) { ensureStarted(); - return watches.get(name); + return watches.get(id); } /** - * Creates an watch with the specified name and source. If an watch with the specified name already exists it will - * get overwritten. + * Creates an watch if this watch already exists it will be overwritten */ public WatchPut put(Watch watch) { ensureStarted(); - IndexRequest indexRequest = createIndexRequest(watch.id(), watch.getAsBytes()); + IndexRequest indexRequest = createIndexRequest(watch.id(), watch.getAsBytes(), Versions.MATCH_ANY); IndexResponse response = client.index(indexRequest); watch.status().version(response.getVersion()); + watch.version(response.getVersion()); Watch previous = watches.put(watch.id(), watch); return new WatchPut(previous, watch, response); } @@ -151,25 +152,25 @@ public class WatchStore extends AbstractComponent { */ void update(Watch watch) throws IOException { ensureStarted(); - assert watch == watches.get(watch.id()) : "update watch can only be applied to an already loaded watch"; BytesReference source = JsonXContent.contentBuilder().value(watch).bytes(); - IndexResponse response = client.index(createIndexRequest(watch.id(), source)); + IndexResponse response = client.index(createIndexRequest(watch.id(), source, watch.version())); watch.status().version(response.getVersion()); + watch.version(response.getVersion()); watch.status().dirty(false); // Don't need to update the watches, since we are working on an instance from it. } /** - * Deletes the watch with the specified name if exists + * Deletes the watch with the specified id if exists */ - public WatchDelete delete(String name) { + public WatchDelete delete(String id, boolean force) { ensureStarted(); - Watch watch = watches.remove(name); + Watch watch = watches.remove(id); // even if the watch was not found in the watch map, we should still try to delete it // from the index, just to make sure we don't leave traces of it - DeleteRequest request = new DeleteRequest(INDEX, DOC_TYPE, name); - if (watch != null) { - request.version(watch.status().version()); + DeleteRequest request = new DeleteRequest(INDEX, DOC_TYPE, id); + if (watch != null && !force) { + request.version(watch.version()); } DeleteResponse response = client.delete(request).actionGet(); return new WatchDelete(response); @@ -179,10 +180,11 @@ public class WatchStore extends AbstractComponent { return watches; } - IndexRequest createIndexRequest(String name, BytesReference source) { - IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, name); + IndexRequest createIndexRequest(String id, BytesReference source, long version) { + IndexRequest indexRequest = new IndexRequest(INDEX, DOC_TYPE, id); indexRequest.listenerThreaded(false); indexRequest.source(source, false); + indexRequest.version(version); return indexRequest; } @@ -216,14 +218,15 @@ public class WatchStore extends AbstractComponent { response = client.searchScroll(response.getScrollId(), scrollTimeout); while (response.getHits().hits().length != 0) { for (SearchHit hit : response.getHits()) { - String name = hit.getId(); + String id = hit.getId(); try { - Watch watch = watchParser.parse(name, true, hit.getSourceRef()); + Watch watch = watchParser.parse(id, true, hit.getSourceRef()); watch.status().version(hit.version()); - watches.put(name, watch); + watch.version(hit.version()); + watches.put(id, watch); count++; } catch (Exception e) { - logger.error("couldn't load watch [{}], ignoring it...", e, name); + logger.error("couldn't load watch [{}], ignoring it...", e, id); } } response = client.searchScroll(response.getScrollId(), scrollTimeout); diff --git a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java index 0f8fe947833..962030b9f72 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/ManualExecutionTests.java @@ -5,17 +5,25 @@ */ package org.elasticsearch.watcher.execution; +import org.apache.lucene.util.LuceneTestCase.Slow; import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.watcher.WatcherException; +import org.elasticsearch.watcher.WatcherService; import org.elasticsearch.watcher.actions.logging.LoggingAction; import org.elasticsearch.watcher.client.WatchSourceBuilder; import org.elasticsearch.watcher.condition.always.AlwaysCondition; +import org.elasticsearch.watcher.condition.script.ScriptCondition; import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.input.simple.SimpleInput; +import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.throttle.Throttler; +import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; import org.elasticsearch.watcher.transport.actions.execute.ExecuteWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.watcher.transport.actions.put.PutWatchRequest; @@ -27,8 +35,11 @@ import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.watch.Watch; import org.junit.Test; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; @@ -222,4 +233,83 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests { watchRecord = watchRecordParser.parse(wid.value(), 1, executeWatchResponse.getSource().getBytes()); assertThat(watchRecord.state(), equalTo(WatchRecord.State.THROTTLED)); } + + + @Test + @Slow + public void testForceDeletionOfLongRunningWatch() throws Exception { + WatchSourceBuilder watchBuilder = watchBuilder() + .trigger(schedule(cron("0 0 0 1 * ? 2099"))) + .input(simpleInput("foo", "bar")) + .condition(new ScriptCondition((new Script.Builder.Inline("sleep 10000; return true")).build())) + .throttlePeriod(new TimeValue(1, TimeUnit.HOURS)) + .addAction("log", loggingAction("foobar")); + + int numberOfThreads = scaledRandomIntBetween(1, 5); + PutWatchResponse putWatchResponse = watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet(); + assertThat(putWatchResponse.getVersion(), greaterThan(0L)); + refresh(); + assertThat(watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().isFound(), equalTo(true)); + + + CountDownLatch startLatch = new CountDownLatch(1); + + List threads = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; ++i) { + threads.add(new Thread(new ExecutionRunner(watchService(), executionService(), "_id", startLatch))); + } + + for (Thread thread : threads) { + thread.start(); + } + DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").setForce(true).get(); + assertThat(deleteWatchResponse.isFound(), is(true)); + + deleteWatchResponse = watcherClient().prepareDeleteWatch("_id").get(); + assertThat(deleteWatchResponse.isFound(), is(false)); + + startLatch.countDown(); + + long startJoin = System.currentTimeMillis(); + for (Thread thread : threads) { + thread.join(); + } + long endJoin = System.currentTimeMillis(); + TimeValue tv = new TimeValue(10 * (numberOfThreads+1), TimeUnit.SECONDS); + assertThat("Shouldn't take longer than [" + tv.getSeconds() + "] seconds for all the threads to stop", (endJoin - startJoin), lessThan(tv.getMillis())); + } + + private static class ExecutionRunner implements Runnable { + + final WatcherService watcherService; + final ExecutionService executionService; + final String watchId; + final CountDownLatch startLatch; + final ManualExecutionContext.Builder ctxBuilder; + + private ExecutionRunner(WatcherService watcherService, ExecutionService executionService, String watchId, CountDownLatch startLatch) { + this.watcherService = watcherService; + this.executionService = executionService; + this.watchId = watchId; + this.startLatch = startLatch; + ManualTriggerEvent triggerEvent = new ManualTriggerEvent(watchId, new ScheduleTriggerEvent(new DateTime(UTC), new DateTime(UTC))); + ctxBuilder = ManualExecutionContext.builder(watcherService.getWatch(watchId), triggerEvent); + ctxBuilder.recordExecution(true); + ctxBuilder.withThrottle(Throttler.Result.NO); + } + + @Override + public void run() { + try { + startLatch.await(); + executionService.execute(ctxBuilder.build()); + fail("Execution of a deleted watch should fail but didn't"); + } catch (WatcherException we) { + assertThat(we.getCause(), instanceOf(VersionConflictEngineException.class)); + } catch (Throwable t) { + throw new WatcherException("Failure mode execution of [{}] failed in an unexpected way", t, watchId); + } + } + } + } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java index 30b0ce14b18..d58690969d5 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchCrudTests.java @@ -7,13 +7,13 @@ package org.elasticsearch.watcher.test.integration; import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.watcher.client.WatchSourceBuilder; +import org.elasticsearch.watcher.support.xcontent.XContentSource; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchRequest; import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest; import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; -import org.elasticsearch.watcher.support.xcontent.XContentSource; import org.junit.Test; import java.util.Map; @@ -134,4 +134,5 @@ public class WatchCrudTests extends AbstractWatcherIntegrationTests { assertThat(response.getVersion(), is(1L)); assertThat(response.isFound(), is(false)); } + } diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/WatchForceDeleteTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/WatchForceDeleteTests.java new file mode 100644 index 00000000000..39038c7db86 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/test/integration/WatchForceDeleteTests.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.test.integration; + +import org.apache.lucene.util.LuceneTestCase.Slow; +import org.elasticsearch.watcher.support.Script; +import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.transport.actions.delete.DeleteWatchResponse; +import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; +import org.elasticsearch.watcher.transport.actions.service.WatcherServiceResponse; +import org.junit.Test; + +import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.watcher.condition.ConditionBuilders.scriptCondition; +import static org.elasticsearch.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + */ +public class WatchForceDeleteTests extends AbstractWatcherIntegrationTests { + + protected boolean timeWarped() { + return false; //Disable time warping for the force delete long running watch test + } + + @Test + @Slow + public void testForceDelete_LongRunningWatch() throws Exception { + PutWatchResponse putResponse = watcherClient().preparePutWatch("_name").setSource(watchBuilder() + .trigger(schedule(interval("1s"))) + .input(simpleInput()) + .condition(scriptCondition(Script.inline("sleep 5000; return true"))) + .addAction("_action1", loggingAction("{{ctx.watch_id}}"))) + .get(); + assertThat(putResponse.getId(), equalTo("_name")); + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + } + DeleteWatchResponse deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").setForce(true).get(); + assertThat(deleteWatchResponse.isFound(), is(true)); + deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get(); + assertThat(deleteWatchResponse.isFound(), is(false)); + WatcherServiceResponse stopResponse = watcherClient().prepareWatchService().stop().get(); + assertThat(stopResponse.isAcknowledged(), is(true)); + ensureWatcherStopped(); + WatcherServiceResponse startResponse = watcherClient().prepareWatchService().start().get(); + assertThat(startResponse.isAcknowledged(), is(true)); + ensureWatcherStarted(); + deleteWatchResponse = watcherClient().prepareDeleteWatch("_name").get(); + assertThat(deleteWatchResponse.isFound(), is(false)); + } + +} diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java index e16d37e80d7..aaa582fe1e7 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchServiceTests.java @@ -110,14 +110,15 @@ public class WatchServiceTests extends ElasticsearchTestCase { public void testDeleteWatch() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + boolean force = randomBoolean(); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class); DeleteResponse deleteResponse = mock(DeleteResponse.class); when(deleteResponse.isFound()).thenReturn(true); when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); - when(watchStore.delete("_id")).thenReturn(expectedWatchDelete); - WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout); + when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete); + WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force); assertThat(watchDelete, sameInstance(expectedWatchDelete)); verify(triggerService, times(1)).remove("_id"); @@ -127,12 +128,31 @@ public class WatchServiceTests extends ElasticsearchTestCase { public void testDeleteWatch_Timeout() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); - watcherService.deleteWatch("_id", timeout); + watcherService.deleteWatch("_id", timeout, false); } + @Test + public void testDeleteWatch_Force() throws Exception { + TimeValue timeout = TimeValue.timeValueSeconds(5); + WatchLockService.Lock lock = mock(WatchLockService.Lock.class); + when(watchLockService.tryAcquire("_id", timeout)).thenReturn(null); + + WatchStore.WatchDelete expectedWatchDelete = mock(WatchStore.WatchDelete.class); + DeleteResponse deleteResponse = mock(DeleteResponse.class); + when(deleteResponse.isFound()).thenReturn(true); + when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); + when(watchStore.delete("_id", true)).thenReturn(expectedWatchDelete); + WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, true); + + assertThat(watchDelete, sameInstance(expectedWatchDelete)); + verify(triggerService, times(1)).remove("_id"); + } + + @Test public void testDeleteWatch_NotFound() throws Exception { TimeValue timeout = TimeValue.timeValueSeconds(5); + boolean force = randomBoolean(); WatchLockService.Lock lock = mock(WatchLockService.Lock.class); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); @@ -140,8 +160,8 @@ public class WatchServiceTests extends ElasticsearchTestCase { DeleteResponse deleteResponse = mock(DeleteResponse.class); when(deleteResponse.isFound()).thenReturn(false); when(expectedWatchDelete.deleteResponse()).thenReturn(deleteResponse); - when(watchStore.delete("_id")).thenReturn(expectedWatchDelete); - WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout); + when(watchStore.delete("_id", force)).thenReturn(expectedWatchDelete); + WatchStore.WatchDelete watchDelete = watcherService.deleteWatch("_id", timeout, force); assertThat(watchDelete, sameInstance(expectedWatchDelete)); verifyZeroInteractions(triggerService);