Watcher: Prevent watch acknowledgement while watch is executing (elastic/x-pack-elasticsearch#4224)

When a watch is acknowledged, while it is also being executed, the
acknowledgment information can get lost. The reason for this is the
fact, that the execution writes the watch status inside of the watch
regardless, if other writes happened inbetween to make sure the
execution state is caught.

This commit checks the current executions in the execution service and
aborts the API call, if the specified watch ID can be found in those.

Note, this does not prevent this issue fully, as a watch could be
triggered, while the acknowledgement update is running, but it does
reduce the surface area of this problem. In order to properly solve
this, indexing the watch status as part of a watch would need to be
changed.

relates elastic/x-pack-elasticsearch#4003

Original commit: elastic/x-pack-elasticsearch@d7e218b2ac
This commit is contained in:
Alexander Reelsen 2018-04-04 10:15:20 +02:00 committed by GitHub
parent 681a5ba624
commit 2da6d14859
3 changed files with 116 additions and 1 deletions

View File

@ -7,6 +7,10 @@ to manually throttle execution of the watch's actions. An action's
_acknowledgement state_ is stored in the `status.actions.<id>.ack.state` _acknowledgement state_ is stored in the `status.actions.<id>.ack.state`
structure. structure.
IMPORTANT: If the specified watch is currently being executed, this API will return
an error. The reason for this is to prevent overwriting of the watch status from a watch
execution.
[float] [float]
==== Request ==== Request

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.watcher.transport.actions.ack; package org.elasticsearch.xpack.watcher.transport.actions.ack;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
@ -22,14 +23,17 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse;
import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.watcher.watch.WatchField; import org.elasticsearch.xpack.core.watcher.watch.WatchField;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction; import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -47,21 +51,32 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
private final Clock clock; private final Clock clock;
private final WatchParser parser; private final WatchParser parser;
private ExecutionService executionService;
private final Client client; private final Client client;
@Inject @Inject
public TransportAckWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, public TransportAckWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Clock clock, XPackLicenseState licenseState, IndexNameExpressionResolver indexNameExpressionResolver, Clock clock, XPackLicenseState licenseState,
WatchParser parser, Client client) { WatchParser parser, ExecutionService executionService, Client client) {
super(settings, AckWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver, super(settings, AckWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, AckWatchRequest::new); licenseState, AckWatchRequest::new);
this.clock = clock; this.clock = clock;
this.parser = parser; this.parser = parser;
this.executionService = executionService;
this.client = client; this.client = client;
} }
@Override @Override
protected void doExecute(AckWatchRequest request, ActionListener<AckWatchResponse> listener) { protected void doExecute(AckWatchRequest request, ActionListener<AckWatchResponse> listener) {
// if the watch to be acked is running currently, reject this request
List<WatchExecutionSnapshot> snapshots = executionService.currentExecutions();
boolean isWatchRunning = snapshots.stream().anyMatch(s -> s.watchId().equals(request.getWatchId()));
if (isWatchRunning) {
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished",
RestStatus.CONFLICT, request.getWatchId()));
return;
}
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()) GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true); .preference(Preference.LOCAL.type()).realtime(true);

View File

@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.transport.actions.ack;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.junit.Before;
import java.time.Clock;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TransportAckWatchActionTests extends ESTestCase {
private TransportAckWatchAction action;
private ExecutionService executionService;
private Client client;
@Before
public void setupAction() {
TransportService transportService = mock(TransportService.class);
ThreadPool threadPool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
WatchParser watchParser = mock(WatchParser.class);
executionService = mock(ExecutionService.class);
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
action = new TransportAckWatchAction(Settings.EMPTY, transportService, threadPool,
new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY),
Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, executionService, client);
}
public void testWatchNotFound() {
String watchId = "my_watch_id";
doAnswer(invocation -> {
ActionListener<GetResponse> listener = (ActionListener<GetResponse>) invocation.getArguments()[1];
listener.onResponse(new GetResponse(new GetResult(Watch.INDEX, Watch.DOC_TYPE, watchId, -1, false,
BytesArray.EMPTY, Collections.emptyMap())));
return null;
}).when(client).get(anyObject(), anyObject());
AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId);
PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture();
action.doExecute(ackWatchRequest, listener);
ExecutionException exception = expectThrows(ExecutionException.class, listener::get);
ElasticsearchException e = (ElasticsearchException) exception.getCause();
assertThat(e.getMessage(), is("Watch with id [" + watchId + "] does not exist"));
}
public void testThatWatchCannotBeAckedWhileRunning() {
String watchId = "my_watch_id";
WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class);
when(snapshot.watchId()).thenReturn(watchId);
when(executionService.currentExecutions()).thenReturn(Collections.singletonList(snapshot));
AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId);
PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture();
action.doExecute(ackWatchRequest, listener);
ExecutionException exception = expectThrows(ExecutionException.class, listener::get);
ElasticsearchException e = (ElasticsearchException) exception.getCause();
assertThat(e.getMessage(), is("watch[my_watch_id] is running currently, cannot ack until finished"));
assertThat(e.status(), is(RestStatus.CONFLICT));
}
}