[ML][Data Frame] responding with 409 status code when failing _stop (#44231) (#44276)

* [ML][Data Frame] responding with appropriate status code when failing _stop

* adding null checks for persistent task data

* addressing PR comments
This commit is contained in:
Benjamin Trent 2019-07-12 10:10:24 -05:00 committed by GitHub
parent e23ecc5838
commit 68cd675892
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 139 additions and 5 deletions

View File

@ -32,6 +32,9 @@ public class DataFrameMessages {
public static final String DATA_FRAME_FAILED_TO_PERSIST_STATS = "Failed to persist data frame statistics for transform [{0}]";
public static final String DATA_FRAME_UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM =
"Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." +
" Use force stop to stop the data frame transform.";
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION =
"Failed to load data frame transform configuration for transform [{0}]";

View File

@ -161,7 +161,8 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
//Set frequency high for testing
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\", \"frequency\": \"1s\"}},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"

View File

@ -21,23 +21,29 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM;
public class TransportStopDataFrameTransformAction extends
TransportTasksAction<DataFrameTransformTask, StopDataFrameTransformAction.Request,
StopDataFrameTransformAction.Response, StopDataFrameTransformAction.Response> {
@ -63,6 +69,32 @@ public class TransportStopDataFrameTransformAction extends
this.client = client;
}
static void validateTaskState(ClusterState state, List<String> transformIds, boolean isForce) {
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (isForce == false && tasks != null) {
List<String> failedTasks = new ArrayList<>();
List<String> failedReasons = new ArrayList<>();
for (String transformId : transformIds) {
PersistentTasksCustomMetaData.PersistentTask<?> dfTask = tasks.getTask(transformId);
if (dfTask != null
&& dfTask.getState() instanceof DataFrameTransformState
&& ((DataFrameTransformState) dfTask.getState()).getTaskState() == DataFrameTransformTaskState.FAILED) {
failedTasks.add(transformId);
failedReasons.add(((DataFrameTransformState) dfTask.getState()).getReason());
}
}
if (failedTasks.isEmpty() == false) {
String msg = failedTasks.size() == 1 ?
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
failedTasks.get(0),
failedReasons.get(0)) :
"Unable to stop data frame transforms. The following transforms are in a failed state " +
failedTasks + " with reasons " + failedReasons + ". Use force stop to stop the data frame transforms.";
throw new ElasticsearchStatusException(msg, RestStatus.CONFLICT);
}
}
}
@Override
protected void doExecute(Task task, StopDataFrameTransformAction.Request request,
ActionListener<StopDataFrameTransformAction.Response> listener) {
@ -88,8 +120,9 @@ public class TransportStopDataFrameTransformAction extends
new PageParams(0, 10_000),
request.isAllowNoMatch(),
ActionListener.wrap(hitsAndIds -> {
validateTaskState(state, hitsAndIds.v2(), request.isForce());
request.setExpandedIds(new HashSet<>(hitsAndIds.v2()));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state()));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), state));
super.doExecute(task, request, finalListener);
},
listener::onFailure
@ -108,11 +141,14 @@ public class TransportStopDataFrameTransformAction extends
}
if (ids.contains(transformTask.getTransformId())) {
// This should not occur as we validate that none of the tasks are in a failed state earlier
// Keep this check in here for insurance.
if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure(
new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId()
+ "] as it is in a failed state with reason: [" + transformTask.getState().getReason() +
"]. Use force stop to stop the data frame transform.",
new ElasticsearchStatusException(
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
request.getId(),
transformTask.getState().getReason()),
RestStatus.CONFLICT));
return;
}

View File

@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.util.Arrays;
import java.util.Collections;
import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.hamcrest.Matchers.equalTo;
public class TransportStopDataFrameTransformActionTests extends ESTestCase {
private MetaData.Builder buildMetadata(PersistentTasksCustomMetaData ptasks) {
return MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, ptasks);
}
public void testTaskStateValidationWithNoTasks() {
MetaData.Builder metaData = MetaData.builder();
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(metaData);
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder();
csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
}
public void testTaskStateValidationWithDataFrameTasks() {
// Test with the task state being null
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
.addTask("non-failed-task",
DataFrameTransform.NAME,
new DataFrameTransform("data-frame-task-1", Version.CURRENT, null),
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""));
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
// test again with a non failed task but this time it has internal state
pTasksBuilder.updateTaskState("non-failed-task", new DataFrameTransformState(DataFrameTransformTaskState.STOPPED,
IndexerState.STOPPED,
null,
0L,
null,
null));
csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
pTasksBuilder.addTask("failed-task",
DataFrameTransform.NAME,
new DataFrameTransform("data-frame-task-1", Version.CURRENT, null),
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""))
.updateTaskState("failed-task", new DataFrameTransformState(DataFrameTransformTaskState.FAILED,
IndexerState.STOPPED,
null,
0L,
"task has failed",
null));
csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Arrays.asList("non-failed-task", "failed-task"), true);
TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
ClusterState.Builder csBuilderFinal = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class,
() -> TransportStopDataFrameTransformAction.validateTaskState(csBuilderFinal.build(),
Collections.singletonList("failed-task"),
false));
assertThat(ex.status(), equalTo(CONFLICT));
assertThat(ex.getMessage(),
equalTo(DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
"failed-task",
"task has failed")));
}
}