[ML Data Frame] Set executing nodes in task actions (#41798)

Direct the task request to the node executing the task and also refactor the task responses
so all errors are returned and set the HTTP status code based on presence of errors.
This commit is contained in:
David Kyle 2019-05-08 12:23:49 +01:00
parent 60f84a2eb2
commit ba9d2ccc1f
10 changed files with 106 additions and 68 deletions

View File

@ -5,8 +5,10 @@
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.Nullable;
@ -24,6 +26,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -167,6 +170,13 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
this.stopped = stopped;
}
public Response(List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures,
boolean stopped) {
super(taskFailures, nodeFailures);
this.stopped = stopped;
}
public boolean isStopped() {
return stopped;
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public final class DataFrameNodes {
private DataFrameNodes() {
}
/**
* Get the list of nodes the data frames are executing on
*
* @param dataFrameIds The data frames.
* @param clusterState State
* @return The executor nodes
*/
public static String[] dataFrameTaskNodes(List<String> dataFrameIds, ClusterState clusterState) {
Set<String> executorNodes = new HashSet<>();
PersistentTasksCustomMetaData tasksMetaData =
PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(clusterState);
if (tasksMetaData != null) {
Set<String> dataFrameIdsSet = new HashSet<>(dataFrameIds);
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> tasks =
tasksMetaData.findTasks(DataFrameField.TASK_NAME, t -> dataFrameIdsSet.contains(t.getId()));
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks) {
executorNodes.add(task.getExecutorNode());
}
}
return executorNodes.toArray(new String[0]);
}
}

View File

@ -124,6 +124,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap(
ids -> {
request.setExpandedIds(ids);
request.setNodes(DataFrameNodes.dataFrameTaskNodes(ids, clusterService.state()));
super.doExecute(task, request, ActionListener.wrap(
response -> collectStatsForTransformsWithoutTasks(request, response, finalListener),
finalListener::onFailure

View File

@ -13,29 +13,24 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.ExceptionsHelper.convertToElastic;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
public class TransportStopDataFrameTransformAction extends
@ -63,7 +58,7 @@ public class TransportStopDataFrameTransformAction extends
dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
expandedIds -> {
request.setExpandedIds(new HashSet<>(expandedIds));
request.setNodes(dataframeNodes(expandedIds, clusterService.state()));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state()));
super.doExecute(task, request, listener);
},
listener::onFailure
@ -136,48 +131,12 @@ public class TransportStopDataFrameTransformAction extends
List<StopDataFrameTransformAction.Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
if (taskOperationFailures.isEmpty() == false) {
throw convertToElastic(taskOperationFailures.get(0).getCause());
} else if (failedNodeExceptions.isEmpty() == false) {
throw convertToElastic(failedNodeExceptions.get(0));
}
// Either the transform doesn't exist (the user didn't create it yet) or was deleted
// after the Stop API executed.
// In either case, let the user know
if (tasks.size() == 0) {
if (taskOperationFailures.isEmpty() == false) {
throw convertToElastic(taskOperationFailures.get(0).getCause());
} else if (failedNodeExceptions.isEmpty() == false) {
throw convertToElastic(failedNodeExceptions.get(0));
} else {
// This can happen we the actual task in the node no longer exists, or was never started
return new StopDataFrameTransformAction.Response(true);
}
if (taskOperationFailures.isEmpty() == false || failedNodeExceptions.isEmpty() == false) {
return new StopDataFrameTransformAction.Response(taskOperationFailures, failedNodeExceptions, false);
}
// if tasks is empty allMatch is 'vacuously satisfied'
boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped);
return new StopDataFrameTransformAction.Response(allStopped);
}
static String[] dataframeNodes(List<String> dataFrameIds, ClusterState clusterState) {
Set<String> executorNodes = new HashSet<>();
PersistentTasksCustomMetaData tasksMetaData =
PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(clusterState);
if (tasksMetaData != null) {
Set<String> dataFrameIdsSet = new HashSet<>(dataFrameIds);
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> tasks =
tasksMetaData.findTasks(DataFrameField.TASK_NAME, t -> dataFrameIdsSet.contains(t.getId()));
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks) {
executorNodes.add(task.getExecutorNode());
}
}
return executorNodes.toArray(new String[0]);
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.rest.action;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
class BaseTasksResponseToXContentListener<T extends BaseTasksResponse & ToXContentObject> extends RestToXContentListener<T> {
BaseTasksResponseToXContentListener(RestChannel channel) {
super(channel);
}
@Override
protected RestStatus getStatus(T response) {
if (response.getNodeFailures().size() > 0 || response.getTaskFailures().size() > 0) {
return RestStatus.INTERNAL_SERVER_ERROR;
}
return RestStatus.OK;
}
}

View File

@ -11,8 +11,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
@ -35,15 +33,7 @@ public class RestDeleteDataFrameTransformAction extends BaseRestHandler {
DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id);
return channel -> client.execute(DeleteDataFrameTransformAction.INSTANCE, request,
new RestToXContentListener<DeleteDataFrameTransformAction.Response>(channel) {
@Override
protected RestStatus getStatus(DeleteDataFrameTransformAction.Response response) {
if (response.getNodeFailures().size() > 0 || response.getTaskFailures().size() > 0) {
return RestStatus.INTERNAL_SERVER_ERROR;
}
return RestStatus.OK;
}
});
new BaseTasksResponseToXContentListener<>(channel));
}
@Override

View File

@ -11,7 +11,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
@ -33,7 +32,8 @@ public class RestGetDataFrameTransformsStatsAction extends BaseRestHandler {
new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
}
return channel -> client.execute(GetDataFrameTransformsStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(GetDataFrameTransformsStatsAction.INSTANCE, request,
new BaseTasksResponseToXContentListener<>(channel));
}
@Override

View File

@ -12,7 +12,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
@ -31,7 +30,8 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler {
boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false);
StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id, force);
request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request,
new BaseTasksResponseToXContentListener<>(channel));
}
@Override

View File

@ -11,7 +11,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
@ -34,7 +33,8 @@ public class RestStopDataFrameTransformAction extends BaseRestHandler {
StopDataFrameTransformAction.Request request = new StopDataFrameTransformAction.Request(id, waitForCompletion, force, timeout);
return channel -> client.execute(StopDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(StopDataFrameTransformAction.INSTANCE, request,
new BaseTasksResponseToXContentListener<>(channel));
}
@Override

View File

@ -18,13 +18,12 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import static org.hamcrest.Matchers.hasItemInArray;
public class TransportStopDataFrameTransformActionTests extends ESTestCase {
public class DataFrameNodesTests extends ESTestCase {
public void testDataframeNodes() {
String dataFrameIdFoo = "df-id-foo";
@ -49,12 +48,12 @@ public class TransportStopDataFrameTransformActionTests extends ESTestCase {
}
@Override
public void writeTo(StreamOutput out) throws IOException {
public void writeTo(StreamOutput out) {
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
return null;
}
},
@ -64,7 +63,7 @@ public class TransportStopDataFrameTransformActionTests extends ESTestCase {
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
.build();
String[] nodes = TransportStopDataFrameTransformAction.dataframeNodes(Arrays.asList(dataFrameIdFoo, dataFrameIdBar), cs);
String[] nodes = DataFrameNodes.dataFrameTaskNodes(Arrays.asList(dataFrameIdFoo, dataFrameIdBar), cs);
assertEquals(2, nodes.length);
assertThat(nodes, hasItemInArray("node-1"));
assertThat(nodes, hasItemInArray("node-2"));
@ -72,7 +71,7 @@ public class TransportStopDataFrameTransformActionTests extends ESTestCase {
public void testDataframeNodes_NoTasks() {
ClusterState emptyState = ClusterState.builder(new ClusterName("_name")).build();
String[] nodes = TransportStopDataFrameTransformAction.dataframeNodes(Collections.singletonList("df-id"), emptyState);
String[] nodes = DataFrameNodes.dataFrameTaskNodes(Collections.singletonList("df-id"), emptyState);
assertEquals(0, nodes.length);
}
}