Watcher: Only delete triggered watch if watch was known (elastic/x-pack-elasticsearch#1562)
When a user executes a watch and specifies it as part of the execute watch API, no triggered watch is created, as the watch cannot be picked up anymore (it only leaves for the duration of the request). However until now the TriggeredWatchStore was invoked and tried to delete this non-existing triggered watch, resulting in some log cluttering. This commit removes this try to delete a non-existing triggered watch. Original commit: elastic/x-pack-elasticsearch@3db125cea2
This commit is contained in:
parent
4b2d4a1e3b
commit
87edc4bfdd
|
@ -299,22 +299,24 @@ public class ExecutionService extends AbstractComponent {
|
|||
record = createWatchRecord(record, ctx, e);
|
||||
logWatchRecord(ctx, e);
|
||||
} finally {
|
||||
if (ctx.knownWatch() && record != null && ctx.recordExecution()) {
|
||||
try {
|
||||
if (ctx.overrideRecordOnConflict()) {
|
||||
historyStore.forcePut(record);
|
||||
} else {
|
||||
historyStore.put(record);
|
||||
if (ctx.knownWatch()) {
|
||||
if (record != null && ctx.recordExecution()) {
|
||||
try {
|
||||
if (ctx.overrideRecordOnConflict()) {
|
||||
historyStore.forcePut(record);
|
||||
} else {
|
||||
historyStore.put(record);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to update watch record [{}]", ctx.id()), e);
|
||||
// TODO log watch record in logger, when saving in history store failed, otherwise the info is gone!
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to update watch record [{}]", ctx.id()), e);
|
||||
// TODO log watch record in logger, when saving in history store failed, otherwise the info is gone!
|
||||
}
|
||||
}
|
||||
try {
|
||||
triggeredWatchStore.delete(ctx.id());
|
||||
} catch (Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e);
|
||||
try {
|
||||
triggeredWatchStore.delete(ctx.id());
|
||||
} catch (Exception e) {
|
||||
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e);
|
||||
}
|
||||
}
|
||||
currentExecutions.remove(ctx.watch().id());
|
||||
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.xpack.watcher.input.Input;
|
|||
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
|
||||
import org.elasticsearch.xpack.watcher.transform.ExecutableTransform;
|
||||
import org.elasticsearch.xpack.watcher.transform.Transform;
|
||||
import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent;
|
||||
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
|
||||
import org.elasticsearch.xpack.watcher.watch.Payload;
|
||||
import org.elasticsearch.xpack.watcher.watch.Watch;
|
||||
|
@ -801,6 +802,45 @@ public class ExecutionServiceTests extends ESTestCase {
|
|||
verify(historyStore, times(1)).forcePut(any(WatchRecord.class));
|
||||
}
|
||||
|
||||
public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception {
|
||||
Watch watch = mock(Watch.class);
|
||||
when(watch.id()).thenReturn("_id");
|
||||
GetResponse getResponse = mock(GetResponse.class);
|
||||
when(getResponse.isExists()).thenReturn(true);
|
||||
mockGetWatchResponse(client, "_id", getResponse);
|
||||
|
||||
DateTime now = new DateTime(clock.millis());
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
|
||||
WatchExecutionContext context = ManualExecutionContext.builder(watch, false, new ManualTriggerEvent("foo", event),
|
||||
timeValueSeconds(5)).build();
|
||||
|
||||
// action throttler, no throttling
|
||||
Throttler.Result throttleResult = mock(Throttler.Result.class);
|
||||
when(throttleResult.throttle()).thenReturn(false);
|
||||
ActionThrottler throttler = mock(ActionThrottler.class);
|
||||
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
|
||||
|
||||
// the action
|
||||
Action.Result actionResult = mock(Action.Result.class);
|
||||
when(actionResult.type()).thenReturn("_action_type");
|
||||
when(actionResult.status()).thenReturn(Action.Result.Status.SUCCESS);
|
||||
ExecutableAction action = mock(ExecutableAction.class);
|
||||
when(action.type()).thenReturn("MY_AWESOME_TYPE");
|
||||
when(action.execute("_action", context, payload)).thenReturn(actionResult);
|
||||
|
||||
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, null, null, action);
|
||||
|
||||
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
|
||||
|
||||
when(watch.input()).thenReturn(input);
|
||||
when(watch.condition()).thenReturn(AlwaysCondition.INSTANCE);
|
||||
when(watch.actions()).thenReturn(Arrays.asList(actionWrapper));
|
||||
when(watch.status()).thenReturn(watchStatus);
|
||||
|
||||
executionService.execute(context);
|
||||
verify(triggeredWatchStore, never()).delete(any());
|
||||
}
|
||||
|
||||
public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception {
|
||||
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
|
||||
Watch watch = mock(Watch.class);
|
||||
|
|
Loading…
Reference in New Issue