diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java index e7a43f252d6..99699c3a48c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.core.dataframe.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.common.Nullable; @@ -24,6 +26,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -167,6 +170,13 @@ public class StopDataFrameTransformAction extends Action taskFailures, + List nodeFailures, + boolean stopped) { + super(taskFailures, nodeFailures); + this.stopped = stopped; + } + public boolean isStopped() { return stopped; } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodes.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodes.java new file mode 100644 index 00000000000..1b2c54b331f --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodes.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.action; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public final class DataFrameNodes { + + private DataFrameNodes() { + } + + /** + * Get the list of nodes the data frames are executing on + * + * @param dataFrameIds The data frames. + * @param clusterState State + * @return The executor nodes + */ + public static String[] dataFrameTaskNodes(List dataFrameIds, ClusterState clusterState) { + + Set executorNodes = new HashSet<>(); + + PersistentTasksCustomMetaData tasksMetaData = + PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(clusterState); + + if (tasksMetaData != null) { + Set dataFrameIdsSet = new HashSet<>(dataFrameIds); + + Collection> tasks = + tasksMetaData.findTasks(DataFrameField.TASK_NAME, t -> dataFrameIdsSet.contains(t.getId())); + + for (PersistentTasksCustomMetaData.PersistentTask task : tasks) { + executorNodes.add(task.getExecutorNode()); + } + } + + return executorNodes.toArray(new String[0]); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java index 7ec5beb1131..7ab5f280014 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java @@ -124,6 +124,7 @@ public class TransportGetDataFrameTransformsStatsAction extends dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap( ids -> { request.setExpandedIds(ids); + request.setNodes(DataFrameNodes.dataFrameTaskNodes(ids, clusterService.state())); super.doExecute(task, request, ActionListener.wrap( response -> collectStatsForTransformsWithoutTasks(request, response, finalListener), finalListener::onFailure diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 2092493caaf..120f1ef7759 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -13,29 +13,24 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; -import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; -import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.elasticsearch.ExceptionsHelper.convertToElastic; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; public class TransportStopDataFrameTransformAction extends @@ -63,7 +58,7 @@ public class TransportStopDataFrameTransformAction extends dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap( expandedIds -> { request.setExpandedIds(new HashSet<>(expandedIds)); - request.setNodes(dataframeNodes(expandedIds, clusterService.state())); + request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state())); super.doExecute(task, request, listener); }, listener::onFailure @@ -136,48 +131,12 @@ public class TransportStopDataFrameTransformAction extends List tasks, List taskOperationFailures, List failedNodeExceptions) { - if (taskOperationFailures.isEmpty() == false) { - throw convertToElastic(taskOperationFailures.get(0).getCause()); - } else if (failedNodeExceptions.isEmpty() == false) { - throw convertToElastic(failedNodeExceptions.get(0)); - } - - // Either the transform doesn't exist (the user didn't create it yet) or was deleted - // after the Stop API executed. - // In either case, let the user know - if (tasks.size() == 0) { - if (taskOperationFailures.isEmpty() == false) { - throw convertToElastic(taskOperationFailures.get(0).getCause()); - } else if (failedNodeExceptions.isEmpty() == false) { - throw convertToElastic(failedNodeExceptions.get(0)); - } else { - // This can happen we the actual task in the node no longer exists, or was never started - return new StopDataFrameTransformAction.Response(true); - } + if (taskOperationFailures.isEmpty() == false || failedNodeExceptions.isEmpty() == false) { + return new StopDataFrameTransformAction.Response(taskOperationFailures, failedNodeExceptions, false); } + // if tasks is empty allMatch is 'vacuously satisfied' boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped); return new StopDataFrameTransformAction.Response(allStopped); } - - static String[] dataframeNodes(List dataFrameIds, ClusterState clusterState) { - - Set executorNodes = new HashSet<>(); - - PersistentTasksCustomMetaData tasksMetaData = - PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(clusterState); - - if (tasksMetaData != null) { - Set dataFrameIdsSet = new HashSet<>(dataFrameIds); - - Collection> tasks = - tasksMetaData.findTasks(DataFrameField.TASK_NAME, t -> dataFrameIdsSet.contains(t.getId())); - - for (PersistentTasksCustomMetaData.PersistentTask task : tasks) { - executorNodes.add(task.getExecutorNode()); - } - } - - return executorNodes.toArray(new String[0]); - } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/BaseTasksResponseToXContentListener.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/BaseTasksResponseToXContentListener.java new file mode 100644 index 00000000000..def26a52efb --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/BaseTasksResponseToXContentListener.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.rest.action; + +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; + + +class BaseTasksResponseToXContentListener extends RestToXContentListener { + + BaseTasksResponseToXContentListener(RestChannel channel) { + super(channel); + } + + @Override + protected RestStatus getStatus(T response) { + if (response.getNodeFailures().size() > 0 || response.getTaskFailures().size() > 0) { + return RestStatus.INTERNAL_SERVER_ERROR; + } + return RestStatus.OK; + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java index 0efa3ffa2c5..183952e0603 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java @@ -11,8 +11,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction; @@ -35,15 +33,7 @@ public class RestDeleteDataFrameTransformAction extends BaseRestHandler { DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id); return channel -> client.execute(DeleteDataFrameTransformAction.INSTANCE, request, - new RestToXContentListener(channel) { - @Override - protected RestStatus getStatus(DeleteDataFrameTransformAction.Response response) { - if (response.getNodeFailures().size() > 0 || response.getTaskFailures().size() > 0) { - return RestStatus.INTERNAL_SERVER_ERROR; - } - return RestStatus.OK; - } - }); + new BaseTasksResponseToXContentListener<>(channel)); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsStatsAction.java index 87cc13edbc3..f2d14f81069 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsStatsAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; @@ -33,7 +32,8 @@ public class RestGetDataFrameTransformsStatsAction extends BaseRestHandler { new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM), restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE))); } - return channel -> client.execute(GetDataFrameTransformsStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> client.execute(GetDataFrameTransformsStatsAction.INSTANCE, request, + new BaseTasksResponseToXContentListener<>(channel)); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java index 1d9b3f29a61..764aeca4a64 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; @@ -31,7 +30,8 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler { boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false); StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id, force); request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT)); - return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, + new BaseTasksResponseToXContentListener<>(channel)); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java index e93898b905b..d34478b9ba9 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; @@ -34,7 +33,8 @@ public class RestStopDataFrameTransformAction extends BaseRestHandler { StopDataFrameTransformAction.Request request = new StopDataFrameTransformAction.Request(id, waitForCompletion, force, timeout); - return channel -> client.execute(StopDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> client.execute(StopDataFrameTransformAction.INSTANCE, request, + new BaseTasksResponseToXContentListener<>(channel)); } @Override diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java similarity index 85% rename from x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java rename to x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java index ddc7ddd4f1b..ba549aa7e8b 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java @@ -18,13 +18,12 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import static org.hamcrest.Matchers.hasItemInArray; -public class TransportStopDataFrameTransformActionTests extends ESTestCase { +public class DataFrameNodesTests extends ESTestCase { public void testDataframeNodes() { String dataFrameIdFoo = "df-id-foo"; @@ -49,12 +48,12 @@ public class TransportStopDataFrameTransformActionTests extends ESTestCase { } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(StreamOutput out) { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + public XContentBuilder toXContent(XContentBuilder builder, Params params) { return null; } }, @@ -64,7 +63,7 @@ public class TransportStopDataFrameTransformActionTests extends ESTestCase { .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) .build(); - String[] nodes = TransportStopDataFrameTransformAction.dataframeNodes(Arrays.asList(dataFrameIdFoo, dataFrameIdBar), cs); + String[] nodes = DataFrameNodes.dataFrameTaskNodes(Arrays.asList(dataFrameIdFoo, dataFrameIdBar), cs); assertEquals(2, nodes.length); assertThat(nodes, hasItemInArray("node-1")); assertThat(nodes, hasItemInArray("node-2")); @@ -72,7 +71,7 @@ public class TransportStopDataFrameTransformActionTests extends ESTestCase { public void testDataframeNodes_NoTasks() { ClusterState emptyState = ClusterState.builder(new ClusterName("_name")).build(); - String[] nodes = TransportStopDataFrameTransformAction.dataframeNodes(Collections.singletonList("df-id"), emptyState); + String[] nodes = DataFrameNodes.dataFrameTaskNodes(Collections.singletonList("df-id"), emptyState); assertEquals(0, nodes.length); } }