diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index 5d6cf54c44d..358ce863ad3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -32,6 +32,9 @@ public class DataFrameMessages { public static final String DATA_FRAME_FAILED_TO_PERSIST_STATS = "Failed to persist data frame statistics for transform [{0}]"; public static final String DATA_FRAME_UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found"; + public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM = + "Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." + + " Use force stop to stop the data frame transform."; public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]"; public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION = "Failed to load data frame transform configuration for transform [{0}]"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 010652de002..1b068c93364 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -161,7 +161,8 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"}," - + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}}," + //Set frequency high for testing + + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\", \"frequency\": \"1s\"}}," + " \"pivot\": {" + " \"group_by\": {" + " \"reviewer\": {" diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index df2ff3914cc..dc3c34f1bd0 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -21,23 +21,29 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; + public class TransportStopDataFrameTransformAction extends TransportTasksAction { @@ -63,6 +69,32 @@ public class TransportStopDataFrameTransformAction extends this.client = client; } + static void validateTaskState(ClusterState state, List transformIds, boolean isForce) { + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + if (isForce == false && tasks != null) { + List failedTasks = new ArrayList<>(); + List failedReasons = new ArrayList<>(); + for (String transformId : transformIds) { + PersistentTasksCustomMetaData.PersistentTask dfTask = tasks.getTask(transformId); + if (dfTask != null + && dfTask.getState() instanceof DataFrameTransformState + && ((DataFrameTransformState) dfTask.getState()).getTaskState() == DataFrameTransformTaskState.FAILED) { + failedTasks.add(transformId); + failedReasons.add(((DataFrameTransformState) dfTask.getState()).getReason()); + } + } + if (failedTasks.isEmpty() == false) { + String msg = failedTasks.size() == 1 ? + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + failedTasks.get(0), + failedReasons.get(0)) : + "Unable to stop data frame transforms. The following transforms are in a failed state " + + failedTasks + " with reasons " + failedReasons + ". Use force stop to stop the data frame transforms."; + throw new ElasticsearchStatusException(msg, RestStatus.CONFLICT); + } + } + } + @Override protected void doExecute(Task task, StopDataFrameTransformAction.Request request, ActionListener listener) { @@ -88,8 +120,9 @@ public class TransportStopDataFrameTransformAction extends new PageParams(0, 10_000), request.isAllowNoMatch(), ActionListener.wrap(hitsAndIds -> { + validateTaskState(state, hitsAndIds.v2(), request.isForce()); request.setExpandedIds(new HashSet<>(hitsAndIds.v2())); - request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), clusterService.state())); + request.setNodes(DataFrameNodes.dataFrameTaskNodes(hitsAndIds.v2(), state)); super.doExecute(task, request, finalListener); }, listener::onFailure @@ -108,11 +141,14 @@ public class TransportStopDataFrameTransformAction extends } if (ids.contains(transformTask.getTransformId())) { + // This should not occur as we validate that none of the tasks are in a failed state earlier + // Keep this check in here for insurance. if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { listener.onFailure( - new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId() - + "] as it is in a failed state with reason: [" + transformTask.getState().getReason() + - "]. Use force stop to stop the data frame transform.", + new ElasticsearchStatusException( + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + request.getId(), + transformTask.getState().getReason()), RestStatus.CONFLICT)); return; } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java new file mode 100644 index 00000000000..61fad63c832 --- /dev/null +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.dataframe.action; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.core.indexing.IndexerState; + +import java.util.Arrays; +import java.util.Collections; + +import static org.elasticsearch.rest.RestStatus.CONFLICT; +import static org.hamcrest.Matchers.equalTo; + +public class TransportStopDataFrameTransformActionTests extends ESTestCase { + + private MetaData.Builder buildMetadata(PersistentTasksCustomMetaData ptasks) { + return MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, ptasks); + } + + public void testTaskStateValidationWithNoTasks() { + MetaData.Builder metaData = MetaData.builder(); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(metaData); + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + + PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder(); + csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + } + + public void testTaskStateValidationWithDataFrameTasks() { + // Test with the task state being null + PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder() + .addTask("non-failed-task", + DataFrameTransform.NAME, + new DataFrameTransform("data-frame-task-1", Version.CURRENT, null), + new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + + // test again with a non failed task but this time it has internal state + pTasksBuilder.updateTaskState("non-failed-task", new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, + IndexerState.STOPPED, + null, + 0L, + null, + null)); + csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + + pTasksBuilder.addTask("failed-task", + DataFrameTransform.NAME, + new DataFrameTransform("data-frame-task-1", Version.CURRENT, null), + new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")) + .updateTaskState("failed-task", new DataFrameTransformState(DataFrameTransformTaskState.FAILED, + IndexerState.STOPPED, + null, + 0L, + "task has failed", + null)); + csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Arrays.asList("non-failed-task", "failed-task"), true); + + TransportStopDataFrameTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); + + ClusterState.Builder csBuilderFinal = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, + () -> TransportStopDataFrameTransformAction.validateTaskState(csBuilderFinal.build(), + Collections.singletonList("failed-task"), + false)); + + assertThat(ex.status(), equalTo(CONFLICT)); + assertThat(ex.getMessage(), + equalTo(DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + "failed-task", + "task has failed"))); + } + +}