From 87edc4bfdd354efe872102f45f1ea231cb11f099 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 7 Jun 2017 09:19:24 +0200 Subject: [PATCH] Watcher: Only delete triggered watch if watch was known (elastic/x-pack-elasticsearch#1562) When a user executes a watch and specifies it as part of the execute watch API, no triggered watch is created, as the watch cannot be picked up anymore (it only leaves for the duration of the request). However until now the TriggeredWatchStore was invoked and tried to delete this non-existing triggered watch, resulting in some log cluttering. This commit removes this try to delete a non-existing triggered watch. Original commit: elastic/x-pack-elasticsearch@3db125cea2f11d82ce9107632506a7373b2b5a14 --- .../watcher/execution/ExecutionService.java | 30 +++++++------- .../execution/ExecutionServiceTests.java | 40 +++++++++++++++++++ 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 83325f65063..507a67e4378 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -299,22 +299,24 @@ public class ExecutionService extends AbstractComponent { record = createWatchRecord(record, ctx, e); logWatchRecord(ctx, e); } finally { - if (ctx.knownWatch() && record != null && ctx.recordExecution()) { - try { - if (ctx.overrideRecordOnConflict()) { - historyStore.forcePut(record); - } else { - historyStore.put(record); + if (ctx.knownWatch()) { + if (record != null && ctx.recordExecution()) { + try { + if (ctx.overrideRecordOnConflict()) { + historyStore.forcePut(record); + } else { + historyStore.put(record); + } + } catch (Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("failed to update watch record [{}]", ctx.id()), e); + // TODO log watch record in logger, when saving in history store failed, otherwise the info is gone! } - } catch (Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("failed to update watch record [{}]", ctx.id()), e); - // TODO log watch record in logger, when saving in history store failed, otherwise the info is gone! } - } - try { - triggeredWatchStore.delete(ctx.id()); - } catch (Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e); + try { + triggeredWatchStore.delete(ctx.id()); + } catch (Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e); + } } currentExecutions.remove(ctx.watch().id()); logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index c66c1bfc4c3..05e31ff687a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.watcher.input.Input; import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.watcher.transform.ExecutableTransform; import org.elasticsearch.xpack.watcher.transform.Transform; +import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.xpack.watcher.watch.Payload; import org.elasticsearch.xpack.watcher.watch.Watch; @@ -801,6 +802,45 @@ public class ExecutionServiceTests extends ESTestCase { verify(historyStore, times(1)).forcePut(any(WatchRecord.class)); } + public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + mockGetWatchResponse(client, "_id", getResponse); + + DateTime now = new DateTime(clock.millis()); + ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); + WatchExecutionContext context = ManualExecutionContext.builder(watch, false, new ManualTriggerEvent("foo", event), + timeValueSeconds(5)).build(); + + // action throttler, no throttling + Throttler.Result throttleResult = mock(Throttler.Result.class); + when(throttleResult.throttle()).thenReturn(false); + ActionThrottler throttler = mock(ActionThrottler.class); + when(throttler.throttle("_action", context)).thenReturn(throttleResult); + + // the action + Action.Result actionResult = mock(Action.Result.class); + when(actionResult.type()).thenReturn("_action_type"); + when(actionResult.status()).thenReturn(Action.Result.Status.SUCCESS); + ExecutableAction action = mock(ExecutableAction.class); + when(action.type()).thenReturn("MY_AWESOME_TYPE"); + when(action.execute("_action", context, payload)).thenReturn(actionResult); + + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action); + + WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); + + when(watch.input()).thenReturn(input); + when(watch.condition()).thenReturn(AlwaysCondition.INSTANCE); + when(watch.actions()).thenReturn(Arrays.asList(actionWrapper)); + when(watch.status()).thenReturn(watchStatus); + + executionService.execute(context); + verify(triggeredWatchStore, never()).delete(any()); + } + public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception { WatchExecutionContext ctx = mock(WatchExecutionContext.class); Watch watch = mock(Watch.class);