diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 83325f65063..507a67e4378 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -299,22 +299,24 @@ public class ExecutionService extends AbstractComponent { record = createWatchRecord(record, ctx, e); logWatchRecord(ctx, e); } finally { - if (ctx.knownWatch() && record != null && ctx.recordExecution()) { - try { - if (ctx.overrideRecordOnConflict()) { - historyStore.forcePut(record); - } else { - historyStore.put(record); + if (ctx.knownWatch()) { + if (record != null && ctx.recordExecution()) { + try { + if (ctx.overrideRecordOnConflict()) { + historyStore.forcePut(record); + } else { + historyStore.put(record); + } + } catch (Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("failed to update watch record [{}]", ctx.id()), e); + // TODO log watch record in logger, when saving in history store failed, otherwise the info is gone! } - } catch (Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("failed to update watch record [{}]", ctx.id()), e); - // TODO log watch record in logger, when saving in history store failed, otherwise the info is gone! } - } - try { - triggeredWatchStore.delete(ctx.id()); - } catch (Exception e) { - logger.error((Supplier) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e); + try { + triggeredWatchStore.delete(ctx.id()); + } catch (Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e); + } } currentExecutions.remove(ctx.watch().id()); logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index c66c1bfc4c3..05e31ff687a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.watcher.input.Input; import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.watcher.transform.ExecutableTransform; import org.elasticsearch.xpack.watcher.transform.Transform; +import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.xpack.watcher.watch.Payload; import org.elasticsearch.xpack.watcher.watch.Watch; @@ -801,6 +802,45 @@ public class ExecutionServiceTests extends ESTestCase { verify(historyStore, times(1)).forcePut(any(WatchRecord.class)); } + public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + mockGetWatchResponse(client, "_id", getResponse); + + DateTime now = new DateTime(clock.millis()); + ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); + WatchExecutionContext context = ManualExecutionContext.builder(watch, false, new ManualTriggerEvent("foo", event), + timeValueSeconds(5)).build(); + + // action throttler, no throttling + Throttler.Result throttleResult = mock(Throttler.Result.class); + when(throttleResult.throttle()).thenReturn(false); + ActionThrottler throttler = mock(ActionThrottler.class); + when(throttler.throttle("_action", context)).thenReturn(throttleResult); + + // the action + Action.Result actionResult = mock(Action.Result.class); + when(actionResult.type()).thenReturn("_action_type"); + when(actionResult.status()).thenReturn(Action.Result.Status.SUCCESS); + ExecutableAction action = mock(ExecutableAction.class); + when(action.type()).thenReturn("MY_AWESOME_TYPE"); + when(action.execute("_action", context, payload)).thenReturn(actionResult); + + ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action); + + WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now))); + + when(watch.input()).thenReturn(input); + when(watch.condition()).thenReturn(AlwaysCondition.INSTANCE); + when(watch.actions()).thenReturn(Arrays.asList(actionWrapper)); + when(watch.status()).thenReturn(watchStatus); + + executionService.execute(context); + verify(triggeredWatchStore, never()).delete(any()); + } + public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception { WatchExecutionContext ctx = mock(WatchExecutionContext.class); Watch watch = mock(Watch.class);