[ML] Fix simultaneous stop and force stop datafeed (#49367)

If a datafeed is stopped normally and force stopped at the same
time then it is possible that the force stop removes the
persistent task while the normal stop is performing actions.
Currently this causes the normal stop to error, but since
stopping a stopped datafeed is not an error this doesn't make
sense. Instead the force stop should just take precedence.

This is a followup to #49191 and should really have been
included in the changes in that PR.
This commit is contained in:
David Roberts 2019-11-20 12:36:15 +00:00
parent e3da60c23d
commit 20558cf61c
1 changed files with 6 additions and 6 deletions

View File

@ -16,7 +16,6 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
@ -259,9 +258,10 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if ((e instanceof ResourceNotFoundException &&
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) {
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
// We validated that the datafeed names supplied in the request existed when we started processing the action.
// If the related task for one of them doesn't exist at this point then it must have been removed by a
// simultaneous force stop request. This is not an error.
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
listener.onResponse(new StopDatafeedAction.Response(true));
} else {
listener.onFailure(e);
@ -269,7 +269,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
}
@Override
protected void doRun() throws Exception {
protected void doRun() {
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
listener.onResponse(new StopDatafeedAction.Response(true));
}
@ -343,7 +343,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
throw org.elasticsearch.ExceptionsHelper
.convertToElastic(failedNodeExceptions.get(0));
} else {
// This can happen we the actual task in the node no longer exists,
// This can happen when the actual task in the node no longer exists,
// which means the datafeed(s) have already been stopped.
return new StopDatafeedAction.Response(true);
}