[7.x][ML] Allow force stopping failed and stopping DF analytics (#54650) (#54712)

Force stopping a failed job used to work but it
now puts the job in `stopping` state and hangs.
In addition, force stopping a `stopping` job is
not handled.

This commit addresses those issues with force
stopping data frame analytics. It inlines the
approach with that followed for anomaly detection
jobs.

Backport of #54650
This commit is contained in:
Dimitris Athanasiou 2020-04-03 16:08:06 +03:00 committed by GitHub
parent 7cef89e084
commit e8c0351fd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 201 additions and 116 deletions

View File

@ -61,10 +61,6 @@ public final class MlTasks {
return DATA_FRAME_ANALYTICS_TASK_ID_PREFIX + id;
}
public static String dataFrameAnalyticsIdFromTaskId(String taskId) {
return taskId.replaceFirst(DATA_FRAME_ANALYTICS_TASK_ID_PREFIX, "");
}
@Nullable
public static PersistentTasksCustomMetadata.PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetadata tasks) {
return tasks == null ? null : tasks.getTask(jobTaskId(jobId));

View File

@ -169,12 +169,6 @@ public class MlTasksTests extends ESTestCase {
containsInAnyOrder("datafeed_without_assignment", "datafeed_without_node"));
}
public void testDataFrameAnalyticsTaskIds() {
String taskId = MlTasks.dataFrameAnalyticsTaskId("foo");
assertThat(taskId, equalTo("data_frame_analytics-foo"));
assertThat(MlTasks.dataFrameAnalyticsIdFromTaskId(taskId), equalTo("foo"));
}
public void testGetDataFrameAnalyticsState_GivenNullTask() {
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(null);
assertThat(state, equalTo(DataFrameAnalyticsState.STOPPED));

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
@ -21,6 +22,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
@ -40,13 +42,14 @@ import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfig
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Stops the persistent task for running data frame analytics.
@ -93,22 +96,20 @@ public class TransportStopDataFrameAnalyticsAction
ActionListener<Set<String>> expandedIdsListener = ActionListener.wrap(
expandedIds -> {
logger.debug("Resolved data frame analytics to stop: {}", expandedIds);
if (expandedIds.isEmpty()) {
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(expandedIds, tasks);
if (analyticsByTaskState.isEmpty()) {
listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
return;
}
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
Set<String> analyticsToStop = findAnalyticsToStop(tasks, expandedIds, request.isForce());
request.setExpandedIds(analyticsToStop);
request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(analyticsToStop, tasks));
ActionListener<StopDataFrameAnalyticsAction.Response> finalListener = ActionListener.wrap(
r -> waitForTaskRemoved(expandedIds, request, r, listener),
listener::onFailure
);
super.doExecute(task, request, finalListener);
if (request.isForce()) {
forceStop(request, listener, tasks, analyticsByTaskState.getNonStopped());
} else {
normalStop(task, request, listener, tasks, analyticsByTaskState);
}
},
listener::onFailure
);
@ -116,50 +117,6 @@ public class TransportStopDataFrameAnalyticsAction
expandIds(state, request, expandedIdsListener);
}
/** Visible for testing */
static Set<String> findAnalyticsToStop(PersistentTasksCustomMetadata tasks, Set<String> ids, boolean force) {
Set<String> startedAnalytics = new HashSet<>();
Set<String> stoppingAnalytics = new HashSet<>();
Set<String> failedAnalytics = new HashSet<>();
sortAnalyticsByTaskState(ids, tasks, startedAnalytics, stoppingAnalytics, failedAnalytics);
if (force == false && failedAnalytics.isEmpty() == false) {
ElasticsearchStatusException e = failedAnalytics.size() == 1 ? ExceptionsHelper.conflictStatusException(
"cannot close data frame analytics [{}] because it failed, use force stop instead", failedAnalytics.iterator().next()) :
ExceptionsHelper.conflictStatusException("one or more data frame analytics are in failed state, " +
"use force stop instead");
throw e;
}
startedAnalytics.addAll(failedAnalytics);
return startedAnalytics;
}
private static void sortAnalyticsByTaskState(Set<String> analyticsIds, PersistentTasksCustomMetadata tasks,
Set<String> startedAnalytics, Set<String> stoppingAnalytics,
Set<String> failedAnalytics) {
for (String analyticsId : analyticsIds) {
switch (MlTasks.getDataFrameAnalyticsState(analyticsId, tasks)) {
case STARTING:
case STARTED:
case REINDEXING:
case ANALYZING:
startedAnalytics.add(analyticsId);
break;
case STOPPING:
stoppingAnalytics.add(analyticsId);
break;
case STOPPED:
break;
case FAILED:
failedAnalytics.add(analyticsId);
break;
default:
break;
}
}
}
private void expandIds(ClusterState clusterState, StopDataFrameAnalyticsAction.Request request,
ActionListener<Set<String>> expandedIdsListener) {
ActionListener<List<DataFrameAnalyticsConfig>> configsListener = ActionListener.wrap(
@ -179,7 +136,107 @@ public class TransportStopDataFrameAnalyticsAction
configProvider.getMultiple(request.getId(), request.allowNoMatch(), configsListener);
}
private String[] findAllocatedNodesAndRemoveUnassignedTasks(Set<String> analyticsIds, PersistentTasksCustomMetadata tasks) {
private void normalStop(Task task, StopDataFrameAnalyticsAction.Request request,
ActionListener<StopDataFrameAnalyticsAction.Response> listener,
PersistentTasksCustomMetadata tasks, AnalyticsByTaskState analyticsByTaskState) {
if (analyticsByTaskState.failed.isEmpty() == false) {
ElasticsearchStatusException e = analyticsByTaskState.failed.size() == 1 ? ExceptionsHelper.conflictStatusException(
"cannot close data frame analytics [{}] because it failed, use force stop instead",
analyticsByTaskState.failed.iterator().next()) :
ExceptionsHelper.conflictStatusException("one or more data frame analytics are in failed state, use force stop instead");
listener.onFailure(e);
return;
}
request.setExpandedIds(new HashSet<>(analyticsByTaskState.started));
request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(analyticsByTaskState.started, tasks));
// Wait for started and stopping analytics
Set<String> allAnalyticsToWaitFor = Stream.concat(
analyticsByTaskState.started.stream().map(MlTasks::dataFrameAnalyticsTaskId),
analyticsByTaskState.stopping.stream().map(MlTasks::dataFrameAnalyticsTaskId)
).collect(Collectors.toSet());
ActionListener<StopDataFrameAnalyticsAction.Response> finalListener = ActionListener.wrap(
r -> waitForTaskRemoved(allAnalyticsToWaitFor, request, r, listener),
e -> {
if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) {
// A node has dropped out of the cluster since we started executing the requests.
// Since stopping an already stopped analytics is not an error we can try again.
// The analytics that were running on the node that dropped out of the cluster
// will just have their persistent tasks cancelled. Analytics that were stopped
// by the previous attempt will be noops in the subsequent attempt.
doExecute(task, request, listener);
} else {
listener.onFailure(e);
}
}
);
super.doExecute(task, request, finalListener);
}
private void forceStop(StopDataFrameAnalyticsAction.Request request, ActionListener<StopDataFrameAnalyticsAction.Response> listener,
PersistentTasksCustomMetadata tasks, List<String> nonStoppedAnalytics) {
final AtomicInteger counter = new AtomicInteger();
final AtomicArray<Exception> failures = new AtomicArray<>(nonStoppedAnalytics.size());
for (String analyticsId : nonStoppedAnalytics) {
PersistentTasksCustomMetadata.PersistentTask<?> analyticsTask = MlTasks.getDataFrameAnalyticsTask(analyticsId, tasks);
if (analyticsTask != null) {
persistentTasksService.sendRemoveRequest(analyticsTask.getId(), ActionListener.wrap(
removedTask -> {
if (counter.incrementAndGet() == nonStoppedAnalytics.size()) {
sendResponseOrFailure(request.getId(), listener, failures);
}
},
e -> {
final int slot = counter.incrementAndGet();
// We validated that the analytics ids supplied in the request existed when we started processing the action.
// If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request.
// This is not an error.
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
failures.set(slot - 1, e);
}
if (slot == nonStoppedAnalytics.size()) {
sendResponseOrFailure(request.getId(), listener, failures);
}
}
));
} else {
// This should not happen, because nonStoppedAnalytics
// were derived from the same tasks that were passed to this method
String msg = "Requested data frame analytics [" + analyticsId + "] be force-stopped, but no task could be found.";
assert analyticsTask != null : msg;
logger.error(msg);
final int slot = counter.incrementAndGet();
failures.set(slot - 1, new RuntimeException(msg));
if (slot == nonStoppedAnalytics.size()) {
sendResponseOrFailure(request.getId(), listener, failures);
}
}
}
}
private void sendResponseOrFailure(String analyticsId, ActionListener<StopDataFrameAnalyticsAction.Response> listener,
AtomicArray<Exception> failures) {
List<Exception> caughtExceptions = failures.asList();
if (caughtExceptions.size() == 0) {
listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
return;
}
String msg = "Failed to stop data frame analytics [" + analyticsId + "] with [" + caughtExceptions.size()
+ "] failures, rethrowing last, all Exceptions: ["
+ caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+ "]";
ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
listener.onFailure(e);
}
private String[] findAllocatedNodesAndRemoveUnassignedTasks(List<String> analyticsIds, PersistentTasksCustomMetadata tasks) {
List<String> nodes = new ArrayList<>();
for (String analyticsId : analyticsIds) {
PersistentTasksCustomMetadata.PersistentTask<?> task = MlTasks.getDataFrameAnalyticsTask(analyticsId, tasks);
@ -259,11 +316,11 @@ public class TransportStopDataFrameAnalyticsAction
}));
}
void waitForTaskRemoved(Set<String> analyticsIds, StopDataFrameAnalyticsAction.Request request,
StopDataFrameAnalyticsAction.Response response,
ActionListener<StopDataFrameAnalyticsAction.Response> listener) {
void waitForTaskRemoved(Set<String> taskIds, StopDataFrameAnalyticsAction.Request request,
StopDataFrameAnalyticsAction.Response response,
ActionListener<StopDataFrameAnalyticsAction.Response> listener) {
persistentTasksService.waitForPersistentTasksCondition(persistentTasks ->
filterPersistentTasks(persistentTasks, analyticsIds).isEmpty(),
persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, t -> taskIds.contains(t.getId())).isEmpty(),
request.getTimeout(), ActionListener.wrap(
booleanResponse -> {
auditor.info(request.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STOPPED);
@ -273,9 +330,58 @@ public class TransportStopDataFrameAnalyticsAction
));
}
private static Collection<PersistentTasksCustomMetadata.PersistentTask<?>> filterPersistentTasks(
PersistentTasksCustomMetadata persistentTasks, Set<String> analyticsIds) {
return persistentTasks.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
t -> analyticsIds.contains(MlTasks.dataFrameAnalyticsIdFromTaskId(t.getId())));
// Visible for testing
static class AnalyticsByTaskState {
final List<String> started;
final List<String> stopping;
final List<String> failed;
private AnalyticsByTaskState(List<String> started, List<String> stopping, List<String> failed) {
this.started = Collections.unmodifiableList(started);
this.stopping = Collections.unmodifiableList(stopping);
this.failed = Collections.unmodifiableList(failed);
}
boolean isEmpty() {
return started.isEmpty() && stopping.isEmpty() && failed.isEmpty();
}
List<String> getNonStopped() {
List<String> nonStopped = new ArrayList<>();
nonStopped.addAll(started);
nonStopped.addAll(stopping);
nonStopped.addAll(failed);
return nonStopped;
}
static AnalyticsByTaskState build(Set<String> analyticsIds, PersistentTasksCustomMetadata tasks) {
List<String> started = new ArrayList<>();
List<String> stopping = new ArrayList<>();
List<String> failed = new ArrayList<>();
for (String analyticsId : analyticsIds) {
DataFrameAnalyticsState taskState = MlTasks.getDataFrameAnalyticsState(analyticsId, tasks);
switch (taskState) {
case STARTING:
case STARTED:
case REINDEXING:
case ANALYZING:
started.add(analyticsId);
break;
case STOPPING:
stopping.add(analyticsId);
break;
case STOPPED:
break;
case FAILED:
failed.add(analyticsId);
break;
default:
assert false : "unknown task state " + taskState;
}
}
return new AnalyticsByTaskState(started, stopping, failed);
}
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.dataframe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -29,6 +30,7 @@ import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@ -217,7 +219,14 @@ public class DataFrameAnalyticsManager {
reindexResponse.getTook()));
startAnalytics(task, config);
},
error -> task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage())
error -> {
if (error instanceof TaskCancelledException && task.isStopping()) {
LOGGER.debug(new ParameterizedMessage("[{}] Caught task cancelled exception while task is stopping",
config.getId()), error);
} else {
task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage());
}
}
);
// Reindex

View File

@ -105,6 +105,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
@Override
protected void onCancelled() {
stop(getReasonCancelled(), TimeValue.ZERO);
markAsCompleted();
}
@Override

View File

@ -5,15 +5,14 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.ml.action.TransportStopDataFrameAnalyticsAction.AnalyticsByTaskState;
import java.util.Arrays;
import java.util.Collections;
@ -21,11 +20,19 @@ import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase {
public void testFindAnalyticsToStop_GivenOneFailedTaskAndNotForce() {
public void testAnalyticsByTaskState_GivenEmpty() {
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(Collections.emptySet(), tasksBuilder.build());
assertThat(analyticsByTaskState.isEmpty(), is(true));
}
public void testAnalyticsByTaskState_GivenAllStates() {
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
addAnalyticsTask(tasksBuilder, "starting", "foo-node", null);
addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED);
@ -37,42 +44,14 @@ public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase {
Set<String> ids = new HashSet<>(Arrays.asList("starting", "started", "reindexing", "analyzing", "stopping", "stopped", "failed"));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false));
AnalyticsByTaskState analyticsByTaskState = AnalyticsByTaskState.build(ids, tasksBuilder.build());
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
assertThat(e.getMessage(), equalTo("cannot close data frame analytics [failed] because it failed, use force stop instead"));
}
public void testFindAnalyticsToStop_GivenTwoFailedTasksAndNotForce() {
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED);
addAnalyticsTask(tasksBuilder, "another_failed", "foo-node", DataFrameAnalyticsState.FAILED);
Set<String> ids = new HashSet<>(Arrays.asList("failed", "another_failed"));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
assertThat(e.getMessage(), equalTo("one or more data frame analytics are in failed state, use force stop instead"));
}
public void testFindAnalyticsToStop_GivenFailedTaskAndForce() {
PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
addAnalyticsTask(tasksBuilder, "starting", "foo-node", null);
addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED);
addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING);
addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING);
addAnalyticsTask(tasksBuilder, "stopping", "foo-node", DataFrameAnalyticsState.STOPPING);
addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED);
addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED);
Set<String> ids = new HashSet<>(Arrays.asList("starting", "started", "reindexing", "analyzing", "stopping", "stopped", "failed"));
Set<String> analyticsToStop = TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, true);
assertThat(analyticsToStop, containsInAnyOrder("starting", "started", "reindexing", "analyzing", "failed"));
assertThat(analyticsByTaskState.isEmpty(), is(false));
assertThat(analyticsByTaskState.started, containsInAnyOrder("starting", "started", "reindexing", "analyzing"));
assertThat(analyticsByTaskState.stopping, containsInAnyOrder("stopping"));
assertThat(analyticsByTaskState.failed, containsInAnyOrder("failed"));
assertThat(analyticsByTaskState.getNonStopped(), containsInAnyOrder(
"starting", "started", "reindexing", "analyzing", "stopping", "failed"));;
}
private static void addAnalyticsTask(PersistentTasksCustomMetadata.Builder builder, String analyticsId, String nodeId,