Watcher: Fix check for currently executed watches (#31137)

The ack watch action has a check for currently executed watches, to make
sure that currently running watches cannot be acknowledged. This check
only checked on the coordinating node for watches being executed, but should
have checked the whole cluster using a WatcherStatsRequest, which is
being switched to in this commit.
This commit is contained in:
Alexander Reelsen 2018-07-04 17:55:53 +02:00 committed by GitHub
parent 4328470dd8
commit 0a2ef59c41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 85 additions and 61 deletions

View File

@ -25,13 +25,13 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest;
import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.watcher.watch.WatchField; import org.elasticsearch.xpack.core.watcher.watch.WatchField;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction; import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -49,83 +49,86 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
private final Clock clock; private final Clock clock;
private final WatchParser parser; private final WatchParser parser;
private ExecutionService executionService;
private final Client client; private final Client client;
@Inject @Inject
public TransportAckWatchAction(Settings settings, TransportService transportService, ActionFilters actionFilters, public TransportAckWatchAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
Clock clock, XPackLicenseState licenseState, WatchParser parser, ExecutionService executionService, Clock clock, XPackLicenseState licenseState, WatchParser parser,
Client client) { Client client) {
super(settings, AckWatchAction.NAME, transportService, actionFilters, licenseState, AckWatchRequest::new); super(settings, AckWatchAction.NAME, transportService, actionFilters, licenseState, AckWatchRequest::new);
this.clock = clock; this.clock = clock;
this.parser = parser; this.parser = parser;
this.executionService = executionService;
this.client = client; this.client = client;
} }
@Override @Override
protected void doExecute(AckWatchRequest request, ActionListener<AckWatchResponse> listener) { protected void doExecute(AckWatchRequest request, ActionListener<AckWatchResponse> listener) {
// if the watch to be acked is running currently, reject this request WatcherStatsRequest watcherStatsRequest = new WatcherStatsRequest();
List<WatchExecutionSnapshot> snapshots = executionService.currentExecutions(); watcherStatsRequest.includeCurrentWatches(true);
boolean isWatchRunning = snapshots.stream().anyMatch(s -> s.watchId().equals(request.getWatchId()));
if (isWatchRunning) { executeAsyncWithOrigin(client, WATCHER_ORIGIN, WatcherStatsAction.INSTANCE, watcherStatsRequest, ActionListener.wrap(response -> {
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished", boolean isWatchRunning = response.getNodes().stream()
.anyMatch(node -> node.getSnapshots().stream().anyMatch(snapshot -> snapshot.watchId().equals(request.getWatchId())));
if (isWatchRunning) {
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished",
RestStatus.CONFLICT, request.getWatchId())); RestStatus.CONFLICT, request.getWatchId()));
return; } else {
} GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()) executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
.preference(Preference.LOCAL.type()).realtime(true); ActionListener.<GetResponse>wrap(getResponse -> {
if (getResponse.isExists() == false) {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest, listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId()));
ActionListener.<GetResponse>wrap((response) -> { } else {
if (response.isExists() == false) { DateTime now = new DateTime(clock.millis(), UTC);
listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId())); Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(),
} else {
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, response.getSourceAsBytesRef(),
now, XContentType.JSON); now, XContentType.JSON);
watch.version(response.getVersion()); watch.version(getResponse.getVersion());
watch.status().version(response.getVersion()); watch.status().version(getResponse.getVersion());
String[] actionIds = request.getActionIds(); String[] actionIds = request.getActionIds();
if (actionIds == null || actionIds.length == 0) { if (actionIds == null || actionIds.length == 0) {
actionIds = new String[]{WatchField.ALL_ACTIONS_ID}; actionIds = new String[]{WatchField.ALL_ACTIONS_ID};
} }
// exit early in case nothing changes // exit early in case nothing changes
boolean isChanged = watch.ack(now, actionIds); boolean isChanged = watch.ack(now, actionIds);
if (isChanged == false) { if (isChanged == false) {
listener.onResponse(new AckWatchResponse(watch.status())); listener.onResponse(new AckWatchResponse(watch.status()));
return; return;
} }
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()); UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
// this may reject this action, but prevents concurrent updates from a watch execution // this may reject this action, but prevents concurrent updates from a watch execution
updateRequest.version(response.getVersion()); updateRequest.version(getResponse.getVersion());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = jsonBuilder(); XContentBuilder builder = jsonBuilder();
builder.startObject() builder.startObject()
.startObject(WatchField.STATUS.getPreferredName()) .startObject(WatchField.STATUS.getPreferredName())
.startObject("actions"); .startObject("actions");
List<String> actionIdsAsList = Arrays.asList(actionIds); List<String> actionIdsAsList = Arrays.asList(actionIds);
boolean updateAll = actionIdsAsList.contains("_all"); boolean updateAll = actionIdsAsList.contains("_all");
for (ActionWrapper actionWrapper : watch.actions()) { for (ActionWrapper actionWrapper : watch.actions()) {
if (updateAll || actionIdsAsList.contains(actionWrapper.id())) { if (updateAll || actionIdsAsList.contains(actionWrapper.id())) {
builder.startObject(actionWrapper.id()) builder.startObject(actionWrapper.id())
.field("ack", watch.status().actionStatus(actionWrapper.id()).ackStatus(), ToXContent.EMPTY_PARAMS) .field("ack", watch.status().actionStatus(actionWrapper.id()).ackStatus(), ToXContent.EMPTY_PARAMS)
.endObject(); .endObject();
}
} }
}
builder.endObject().endObject().endObject(); builder.endObject().endObject().endObject();
updateRequest.doc(builder); updateRequest.doc(builder);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap( ActionListener.<UpdateResponse>wrap(
(updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())), (updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())),
listener::onFailure), client::update); listener::onFailure), client::update);
} }
}, listener::onFailure), client::get); }, listener::onFailure), client::get);
}
}, listener::onFailure));
} }
} }

View File

@ -6,11 +6,15 @@
package org.elasticsearch.xpack.watcher.transport.actions.ack; package org.elasticsearch.xpack.watcher.transport.actions.ack;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -20,11 +24,13 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.junit.Before; import org.junit.Before;
@ -34,6 +40,7 @@ import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -41,7 +48,6 @@ import static org.mockito.Mockito.when;
public class TransportAckWatchActionTests extends ESTestCase { public class TransportAckWatchActionTests extends ESTestCase {
private TransportAckWatchAction action; private TransportAckWatchAction action;
private ExecutionService executionService;
private Client client; private Client client;
@Before @Before
@ -51,11 +57,10 @@ public class TransportAckWatchActionTests extends ESTestCase {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY); ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext); when(threadPool.getThreadContext()).thenReturn(threadContext);
WatchParser watchParser = mock(WatchParser.class); WatchParser watchParser = mock(WatchParser.class);
executionService = mock(ExecutionService.class);
client = mock(Client.class); client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool); when(client.threadPool()).thenReturn(threadPool);
action = new TransportAckWatchAction(Settings.EMPTY, transportService, new ActionFilters(Collections.emptySet()), action = new TransportAckWatchAction(Settings.EMPTY, transportService, new ActionFilters(Collections.emptySet()),
Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, executionService, client); Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, client);
} }
public void testWatchNotFound() { public void testWatchNotFound() {
@ -67,6 +72,13 @@ public class TransportAckWatchActionTests extends ESTestCase {
return null; return null;
}).when(client).get(anyObject(), anyObject()); }).when(client).get(anyObject(), anyObject());
doAnswer(invocation -> {
ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2];
listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"), new WatcherMetaData(false),
Collections.emptyList(), Collections.emptyList()));
return null;
}).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject());
AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId); AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId);
PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture();
action.doExecute(ackWatchRequest, listener); action.doExecute(ackWatchRequest, listener);
@ -78,9 +90,18 @@ public class TransportAckWatchActionTests extends ESTestCase {
public void testThatWatchCannotBeAckedWhileRunning() { public void testThatWatchCannotBeAckedWhileRunning() {
String watchId = "my_watch_id"; String watchId = "my_watch_id";
WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class);
when(snapshot.watchId()).thenReturn(watchId); doAnswer(invocation -> {
when(executionService.currentExecutions()).thenReturn(Collections.singletonList(snapshot)); ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2];
DiscoveryNode discoveryNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
WatcherStatsResponse.Node node = new WatcherStatsResponse.Node(discoveryNode);
WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class);
when(snapshot.watchId()).thenReturn(watchId);
node.setSnapshots(Collections.singletonList(snapshot));
listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"),
new WatcherMetaData(false), Collections.singletonList(node), Collections.emptyList()));
return null;
}).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject());
AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId); AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId);
PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture();
@ -91,4 +112,4 @@ public class TransportAckWatchActionTests extends ESTestCase {
assertThat(e.getMessage(), is("watch[my_watch_id] is running currently, cannot ack until finished")); assertThat(e.getMessage(), is("watch[my_watch_id] is running currently, cannot ack until finished"));
assertThat(e.status(), is(RestStatus.CONFLICT)); assertThat(e.status(), is(RestStatus.CONFLICT));
} }
} }