Create get task API that falls back to the .tasks index

This adds a get task API that supports GET /_tasks/${taskId} and
removes that responsibility from the list tasks API. The get task
API supports wait_for_complation just as the list tasks API does
but doesn't support any of the list task API's filters. In exchange,
it supports falling back to the .results index when the task isn't
running any more. Like any good GET API it 404s when it doesn't
find the task.

Then we change reindex, update-by-query, and delete-by-query to
persist the task result when wait_for_completion=false. The leads
to the neat behavior that, once you start a reindex with
wait_for_completion=false, you can fetch the result of the task by
using the get task API and see the result when it has finished.

Also rename the .results index to .tasks.
This commit is contained in:
Nik Everett 2016-05-31 19:44:01 -04:00
parent e4dc469e58
commit e392e0b1df
60 changed files with 1863 additions and 378 deletions

View File

@ -32,6 +32,8 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
@ -264,6 +266,7 @@ public class ActionModule extends AbstractModule {
registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
registerAction(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
registerAction(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);

View File

@ -39,6 +39,9 @@ public abstract class ActionRequest<Request extends ActionRequest<Request>> exte
public abstract ActionRequestValidationException validate();
/**
* Should this task persist its result after it has finished?
*/
public boolean getShouldPersistResult() {
return false;
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.tasks.TaskInfo;
import java.util.List;

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterName;
@ -36,6 +35,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.get;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Action for retrieving a list of currently running tasks
*/
public class GetTaskAction extends Action<GetTaskRequest, GetTaskResponse, GetTaskRequestBuilder> {
public static final GetTaskAction INSTANCE = new GetTaskAction();
public static final String NAME = "cluster:monitor/task/get";
private GetTaskAction() {
super(NAME);
}
@Override
public GetTaskResponse newResponse() {
return new GetTaskResponse();
}
@Override
public GetTaskRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new GetTaskRequestBuilder(client, this);
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.get;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* A request to get node tasks
*/
public class GetTaskRequest extends ActionRequest<GetTaskRequest> {
private TaskId taskId = TaskId.EMPTY_TASK_ID;
private boolean waitForCompletion = false;
private TimeValue timeout = null;
/**
* Get the TaskId to look up.
*/
public TaskId getTaskId() {
return taskId;
}
/**
* Set the TaskId to look up. Required.
*/
public GetTaskRequest setTaskId(TaskId taskId) {
this.taskId = taskId;
return this;
}
/**
* Should this request wait for all found tasks to complete?
*/
public boolean getWaitForCompletion() {
return waitForCompletion;
}
/**
* Should this request wait for all found tasks to complete?
*/
public GetTaskRequest setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}
/**
* Timeout to wait for any async actions this request must take. It must take anywhere from 0 to 2.
*/
public TimeValue getTimeout() {
return timeout;
}
/**
* Timeout to wait for any async actions this request must take. It must take anywhere from 0 to 2.
*/
public GetTaskRequest setTimeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
GetTaskRequest nodeRequest(String thisNodeId, long thisTaskId) {
GetTaskRequest copy = new GetTaskRequest();
copy.setParentTask(thisNodeId, thisTaskId);
copy.setTaskId(taskId);
copy.setTimeout(timeout);
copy.setWaitForCompletion(waitForCompletion);
return copy;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (false == getTaskId().isSet()) {
validationException = addValidationError("task id is required", validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = TaskId.readFromStream(in);
timeout = in.readOptionalWriteable(TimeValue::new);
waitForCompletion = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
taskId.writeTo(out);
out.writeOptionalWriteable(timeout);
out.writeBoolean(waitForCompletion);
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.get;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskId;
/**
* Builder for the request to retrieve the list of tasks running on the specified nodes
*/
public class GetTaskRequestBuilder extends ActionRequestBuilder<GetTaskRequest, GetTaskResponse, GetTaskRequestBuilder> {
public GetTaskRequestBuilder(ElasticsearchClient client, GetTaskAction action) {
super(client, action, new GetTaskRequest());
}
/**
* Set the TaskId to look up. Required.
*/
public final GetTaskRequestBuilder setTaskId(TaskId taskId) {
request.setTaskId(taskId);
return this;
}
/**
* Should this request wait for all found tasks to complete?
*/
public final GetTaskRequestBuilder setWaitForCompletion(boolean waitForCompletion) {
request.setWaitForCompletion(waitForCompletion);
return this;
}
/**
* Timeout to wait for any async actions this request must take. It must take anywhere from 0 to 2.
*/
public final GetTaskRequestBuilder setTimeout(TimeValue timeout) {
request.setTimeout(timeout);
return this;
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.get;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.PersistedTaskInfo;
import java.io.IOException;
import static java.util.Objects.requireNonNull;
/**
* Returns the list of tasks currently running on the nodes
*/
public class GetTaskResponse extends ActionResponse implements ToXContent {
private PersistedTaskInfo task;
public GetTaskResponse() {
}
public GetTaskResponse(PersistedTaskInfo task) {
this.task = requireNonNull(task, "task is required");
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
task = in.readOptionalWriteable(PersistedTaskInfo::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(task);
}
/**
* Get the actual result of the fetch.
*/
public PersistedTaskInfo getTask() {
return task;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return task.innerToXContent(builder, params);
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.get;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.PersistedTaskInfo;
import org.elasticsearch.tasks.TaskPersistenceService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout;
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForTaskCompletion;
/**
* Action to get a single task. If the task isn't running then it'll try to request the status from request index.
*
* The general flow is:
* <ul>
* <li>If this isn't being executed on the node to which the requested TaskId belongs then move to that node.
* <li>Look up the task and return it if it exists
* <li>If it doesn't then look up the task from the results index
* </ul>
*/
public class TransportGetTaskAction extends HandledTransportAction<GetTaskRequest, GetTaskResponse> {
private final ClusterService clusterService;
private final TransportService transportService;
private final Client client;
@Inject
public TransportGetTaskAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, Client client) {
super(settings, GetTaskAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetTaskRequest::new);
this.clusterService = clusterService;
this.transportService = transportService;
this.client = client;
}
@Override
protected void doExecute(GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
throw new UnsupportedOperationException("Task is required");
}
@Override
protected void doExecute(Task thisTask, GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
if (clusterService.localNode().getId().equals(request.getTaskId().getNodeId())) {
getRunningTaskFromNode(thisTask, request, listener);
} else {
runOnNodeWithTaskIfPossible(thisTask, request, listener);
}
}
/**
* Executed on the coordinating node to forward execution of the remaining work to the node that matches that requested
* {@link TaskId#getNodeId()}. If the node isn't in the cluster then this will just proceed to
* {@link #getFinishedTaskFromIndex(Task, GetTaskRequest, ActionListener)} on this node.
*/
private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
if (request.getTimeout() != null) {
builder.withTimeout(request.getTimeout());
}
builder.withCompress(false);
DiscoveryNode node = clusterService.state().nodes().get(request.getTaskId().getNodeId());
if (node == null) {
// Node is no longer part of the cluster! Try and look the task up from the results index.
getFinishedTaskFromIndex(thisTask, request, listener);
return;
}
GetTaskRequest nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId());
taskManager.registerChildTask(thisTask, node.getId());
transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(),
new BaseTransportResponseHandler<GetTaskResponse>() {
@Override
public GetTaskResponse newInstance() {
return new GetTaskResponse();
}
@Override
public void handleResponse(GetTaskResponse response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
/**
* Executed on the node that should be running the task to find and return the running task. Falls back to
* {@link #getFinishedTaskFromIndex(Task, GetTaskRequest, ActionListener)} if the task isn't still running.
*/
void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
Task runningTask = taskManager.getTask(request.getTaskId().getId());
if (runningTask == null) {
getFinishedTaskFromIndex(thisTask, request, listener);
} else {
if (request.getWaitForCompletion()) {
// Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
waitForTaskCompletion(taskManager, runningTask, waitForCompletionTimeout(request.getTimeout()));
// TODO look up the task's result from the .tasks index now that it is done
listener.onResponse(
new GetTaskResponse(new PersistedTaskInfo(runningTask.taskInfo(clusterService.localNode(), true))));
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
});
} else {
listener.onResponse(new GetTaskResponse(new PersistedTaskInfo(runningTask.taskInfo(clusterService.localNode(), true))));
}
}
}
/**
* Send a {@link GetRequest} to the results index looking for the results of the task. It'll only be found only if the task's result was
* persisted. Called on the node that once had the task if that node is part of the cluster or on the coordinating node if the node
* wasn't part of the cluster.
*/
void getFinishedTaskFromIndex(Task thisTask, GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
GetRequest get = new GetRequest(TaskPersistenceService.TASK_INDEX, TaskPersistenceService.TASK_TYPE,
request.getTaskId().toString());
get.setParentTask(clusterService.localNode().getId(), thisTask.getId());
client.get(get, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
try {
onGetFinishedTaskFromIndex(getResponse, listener);
} catch (Throwable e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
// We haven't yet created the index for the task results so it can't be found.
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running or persisted", e, request.getTaskId()));
} else {
listener.onFailure(e);
}
}
});
}
/**
* Called with the {@linkplain GetResponse} from loading the task from the results index. Called on the node that once had the task if
* that node is part of the cluster or on the coordinating node if the node wasn't part of the cluster.
*/
void onGetFinishedTaskFromIndex(GetResponse response, ActionListener<GetTaskResponse> listener) throws IOException {
if (false == response.isExists()) {
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running or persisted", response.getId()));
}
if (response.isSourceEmpty()) {
listener.onFailure(new ElasticsearchException("Stored task status for [{}] didn't contain any source!", response.getId()));
return;
}
try (XContentParser parser = XContentHelper.createParser(response.getSourceAsBytesRef())) {
PersistedTaskInfo result = PersistedTaskInfo.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT);
listener.onResponse(new GetTaskResponse(result));
}
}
}

View File

@ -23,21 +23,21 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -47,10 +47,12 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
private List<TaskInfo> tasks;
private Map<DiscoveryNode, List<TaskInfo>> nodes;
private Map<String, List<TaskInfo>> perNodeTasks;
private List<TaskGroup> groups;
private DiscoveryNodes discoveryNodes;
public ListTasksResponse() {
}
@ -75,28 +77,11 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
/**
* Returns the list of tasks by node
*/
public Map<DiscoveryNode, List<TaskInfo>> getPerNodeTasks() {
if (nodes != null) {
return nodes;
public Map<String, List<TaskInfo>> getPerNodeTasks() {
if (perNodeTasks == null) {
perNodeTasks = tasks.stream().collect(Collectors.groupingBy(t -> t.getTaskId().getNodeId()));
}
Map<DiscoveryNode, List<TaskInfo>> nodeTasks = new HashMap<>();
Set<DiscoveryNode> nodes = new HashSet<>();
for (TaskInfo shard : tasks) {
nodes.add(shard.getNode());
}
for (DiscoveryNode node : nodes) {
List<TaskInfo> tasks = new ArrayList<>();
for (TaskInfo taskInfo : this.tasks) {
if (taskInfo.getNode().equals(node)) {
tasks.add(taskInfo);
}
}
nodeTasks.put(node, tasks);
}
this.nodes = nodeTasks;
return nodeTasks;
return perNodeTasks;
}
public List<TaskGroup> getTaskGroups() {
@ -138,6 +123,14 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
return tasks;
}
/**
* Set a reference to the {@linkplain DiscoveryNodes}. Used for calling {@link #toXContent(XContentBuilder, ToXContent.Params)} with
* {@code group_by=nodes}.
*/
public void setDiscoveryNodes(DiscoveryNodes discoveryNodes) {
this.discoveryNodes = discoveryNodes;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
@ -161,33 +154,38 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
}
String groupBy = params.param("group_by", "nodes");
if ("nodes".equals(groupBy)) {
if (discoveryNodes == null) {
throw new IllegalStateException("discoveryNodes must be set before calling toXContent with group_by=nodes");
}
builder.startObject("nodes");
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : getPerNodeTasks().entrySet()) {
DiscoveryNode node = entry.getKey();
builder.startObject(node.getId());
builder.field("name", node.getName());
builder.field("transport_address", node.getAddress().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());
for (Map.Entry<String, List<TaskInfo>> entry : getPerNodeTasks().entrySet()) {
DiscoveryNode node = discoveryNodes.get(entry.getKey());
builder.startObject(entry.getKey());
if (node != null) {
// If the node is no longer part of the cluster, oh well, we'll just skip it's useful information.
builder.field("name", node.getName());
builder.field("transport_address", node.getAddress().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());
builder.startArray("roles");
for (DiscoveryNode.Role role : node.getRoles()) {
builder.value(role.getRoleName());
}
builder.endArray();
if (!node.getAttributes().isEmpty()) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
builder.startArray("roles");
for (DiscoveryNode.Role role : node.getRoles()) {
builder.value(role.getRoleName());
}
builder.endArray();
if (!node.getAttributes().isEmpty()) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}
builder.endObject();
}
builder.startObject("tasks");
for(TaskInfo task : entry.getValue()) {
builder.startObject(task.getTaskId().toString());
builder.field(task.getTaskId().toString());
task.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
builder.endObject();
@ -196,9 +194,8 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
} else if ("parents".equals(groupBy)) {
builder.startObject("tasks");
for (TaskGroup group : getTaskGroups()) {
builder.startObject(group.getTaskInfo().getTaskId().toString());
builder.field(group.getTaskInfo().getTaskId().toString());
group.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.TaskInfo;
import java.io.IOException;
import java.util.ArrayList;
@ -79,16 +80,15 @@ public class TaskGroup implements ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
task.toXContent(builder, params);
builder.startObject();
task.innerToXContent(builder, params);
if (childTasks.isEmpty() == false) {
builder.startArray("children");
for (TaskGroup taskGroup : childTasks) {
builder.startObject();
taskGroup.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
return builder;
return builder.endObject();
}
}

View File

@ -33,6 +33,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -47,6 +49,26 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
*
*/
public class TransportListTasksAction extends TransportTasksAction<Task, ListTasksRequest, ListTasksResponse, TaskInfo> {
public static void waitForTaskCompletion(TaskManager taskManager, Task task, long untilInNanos) {
while (System.nanoTime() - untilInNanos < 0) {
if (taskManager.getTask(task.getId()) == null) {
return;
}
try {
Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
} catch (InterruptedException e) {
throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task);
}
}
throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task);
}
public static long waitForCompletionTimeout(TimeValue timeout) {
if (timeout == null) {
timeout = DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT;
}
return System.nanoTime() + timeout.nanos();
}
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);
@ -75,35 +97,18 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
@Override
protected void processTasks(ListTasksRequest request, Consumer<Task> operation) {
if (false == request.getWaitForCompletion()) {
super.processTasks(request, operation);
return;
}
// If we should wait for completion then we have to intercept every found task and wait for it to leave the manager.
TimeValue timeout = request.getTimeout();
if (timeout == null) {
timeout = DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT;
}
long timeoutTime = System.nanoTime() + timeout.nanos();
super.processTasks(request, operation.andThen((Task t) -> {
while (System.nanoTime() - timeoutTime < 0) {
Task task = taskManager.getTask(t.getId());
if (task == null) {
return;
}
if (request.getWaitForCompletion()) {
long timeoutNanos = waitForCompletionTimeout(request.getTimeout());
operation = operation.andThen(task -> {
if (task.getAction().startsWith(ListTasksAction.NAME)) {
// It doesn't make sense to wait for List Tasks and it can cause an infinite loop of the task waiting
// for itself of one of its child tasks
// for itself or one of its child tasks
return;
}
try {
Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
} catch (InterruptedException e) {
throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, t);
}
}
throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", t);
}));
waitForTaskCompletion(taskManager, task, timeoutNanos);
});
}
super.processTasks(request, operation);
}
@Override

View File

@ -88,5 +88,20 @@ public class ReplicationTask extends Task {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(phase);
}
// Implements equals and hashcode for testing
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != ReplicationTask.Status.class) {
return false;
}
ReplicationTask.Status other = (Status) obj;
return phase.equals(other.phase);
}
@Override
public int hashCode() {
return phase.hashCode();
}
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskId;
/**
* Builder for task-based requests
@ -36,6 +37,15 @@ public class TasksRequestBuilder<
super(client, action, request);
}
/**
* Set the task to lookup.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setTaskId(TaskId taskId) {
request.setTaskId(taskId);
return (RequestBuilder) this;
}
@SuppressWarnings("unchecked")
public final RequestBuilder setNodesIds(String... nodesIds) {
request.setNodesIds(nodesIds);

View File

@ -39,6 +39,9 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@ -112,6 +115,7 @@ import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.tasks.TaskId;
/**
* Administrative actions/operations against indices.
@ -303,6 +307,34 @@ public interface ClusterAdminClient extends ElasticsearchClient {
*/
ListTasksRequestBuilder prepareListTasks(String... nodesIds);
/**
* Get a task.
*
* @param request the request
* @return the result future
* @see org.elasticsearch.client.Requests#getTaskRequest()
*/
ActionFuture<GetTaskResponse> getTask(GetTaskRequest request);
/**
* Get a task.
*
* @param request the request
* @param listener A listener to be notified with the result
* @see org.elasticsearch.client.Requests#getTaskRequest()
*/
void getTask(GetTaskRequest request, ActionListener<GetTaskResponse> listener);
/**
* Fetch a task by id.
*/
GetTaskRequestBuilder prepareGetTask(String taskId);
/**
* Fetch a task by id.
*/
GetTaskRequestBuilder prepareGetTask(TaskId taskId);
/**
* Cancel tasks
*

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
@ -406,6 +407,16 @@ public class Requests {
return new ListTasksRequest();
}
/**
* Creates a get task request.
*
* @return The nodes tasks request
* @see org.elasticsearch.client.ClusterAdminClient#getTask(GetTaskRequest)
*/
public static GetTaskRequest getTaskRequest() {
return new GetTaskRequest();
}
/**
* Creates a nodes tasks request against one or more nodes. Pass <tt>null</tt> or an empty array for all nodes.
*

View File

@ -49,6 +49,10 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksActio
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequestBuilder;
@ -109,6 +113,18 @@ import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequestBuilder;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequestBuilder;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptAction;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequestBuilder;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptAction;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequestBuilder;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksAction;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
@ -272,18 +288,6 @@ import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequestBuilder;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptAction;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequestBuilder;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptAction;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequestBuilder;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptResponse;
import org.elasticsearch.action.ingest.DeletePipelineAction;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequestBuilder;
@ -339,6 +343,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
@ -851,6 +856,25 @@ public abstract class AbstractClient extends AbstractComponent implements Client
return new ListTasksRequestBuilder(this, ListTasksAction.INSTANCE).setNodesIds(nodesIds);
}
@Override
public ActionFuture<GetTaskResponse> getTask(final GetTaskRequest request) {
return execute(GetTaskAction.INSTANCE, request);
}
@Override
public void getTask(final GetTaskRequest request, final ActionListener<GetTaskResponse> listener) {
execute(GetTaskAction.INSTANCE, request, listener);
}
@Override
public GetTaskRequestBuilder prepareGetTask(String taskId) {
return prepareGetTask(new TaskId(taskId));
}
@Override
public GetTaskRequestBuilder prepareGetTask(TaskId taskId) {
return new GetTaskRequestBuilder(this, GetTaskAction.INSTANCE).setTaskId(taskId);
}
@Override
public ActionFuture<CancelTasksResponse> cancelTasks(CancelTasksRequest request) {

View File

@ -63,7 +63,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.tasks.TaskPersistenceService;
import java.util.Arrays;
import java.util.Collections;
@ -156,6 +156,6 @@ public class ClusterModule extends AbstractModule {
bind(ShardStateAction.class).asEagerSingleton();
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(TaskPersistenceService.class).asEagerSingleton();
}
}

View File

@ -49,6 +49,7 @@ import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotT
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsAction;
import org.elasticsearch.rest.action.admin.cluster.node.tasks.RestCancelTasksAction;
import org.elasticsearch.rest.action.admin.cluster.node.tasks.RestGetTaskAction;
import org.elasticsearch.rest.action.admin.cluster.node.tasks.RestListTasksAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.delete.RestDeleteRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.get.RestGetRepositoriesAction;
@ -146,6 +147,7 @@ import org.elasticsearch.rest.action.suggest.RestSuggestAction;
import org.elasticsearch.rest.action.termvectors.RestMultiTermVectorsAction;
import org.elasticsearch.rest.action.termvectors.RestTermVectorsAction;
import org.elasticsearch.rest.action.update.RestUpdateAction;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
@ -277,6 +279,7 @@ public class NetworkModule extends AbstractModule {
// Tasks API
RestListTasksAction.class,
RestGetTaskAction.class,
RestCancelTasksAction.class,
// Ingest API
@ -339,6 +342,7 @@ public class NetworkModule extends AbstractModule {
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
registerTransport(NETTY_TRANSPORT, NettyTransport.class);
registerTaskStatus(ReplicationTask.Status.NAME, ReplicationTask.Status::new);
registerTaskStatus(RawTaskStatus.NAME, RawTaskStatus::new);
registerBuiltinAllocationCommands();
if (transportClient == false) {

View File

@ -21,7 +21,9 @@ package org.elasticsearch.common.xcontent;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import java.io.IOException;
import java.util.ArrayList;
@ -123,6 +125,17 @@ public abstract class AbstractObjectParser<Value, Context extends ParseFieldMatc
declareField(consumer, (p, c) -> parseArray(p, p::intValue), field, ValueType.INT_ARRAY);
}
public void declareRawObject(BiConsumer<Value, BytesReference> consumer, ParseField field) {
NoContextParser<BytesReference> bytesParser = p -> {
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.prettyPrint();
builder.copyCurrentStructure(p);
return builder.bytes();
}
};
declareField(consumer, bytesParser, field, ValueType.OBJECT);
}
private interface IOSupplier<T> {
T get() throws IOException;
}

View File

@ -328,6 +328,10 @@ public class JsonXContentGenerator implements XContentGenerator {
if (mayWriteRawData(contentType) == false) {
copyRawValue(content, contentType.xContent());
} else {
if (generator.getOutputContext().getCurrentName() != null) {
// If we've just started a field we'll need to add the separator
generator.writeRaw(':');
}
flush();
content.writeTo(os);
writeEndRaw();

View File

@ -97,7 +97,7 @@ import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.tasks.TaskPersistenceService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
@ -340,7 +340,7 @@ public class Node implements Closeable {
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskPersistenceService.class));
transportService.start();
validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress());

View File

@ -19,9 +19,11 @@
package org.elasticsearch.rest.action.admin.cluster.node.tasks;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -33,13 +35,16 @@ import org.elasticsearch.rest.action.support.RestToXContentListener;
import org.elasticsearch.tasks.TaskId;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.action.admin.cluster.node.tasks.RestListTasksAction.nodeSettingListener;
public class RestCancelTasksAction extends BaseRestHandler {
private final ClusterService clusterService;
@Inject
public RestCancelTasksAction(Settings settings, RestController controller, Client client) {
public RestCancelTasksAction(Settings settings, RestController controller, Client client, ClusterService clusterService) {
super(settings, client);
this.clusterService = clusterService;
controller.registerHandler(POST, "/_tasks/_cancel", this);
controller.registerHandler(POST, "/_tasks/{taskId}/_cancel", this);
}
@ -56,6 +61,7 @@ public class RestCancelTasksAction extends BaseRestHandler {
cancelTasksRequest.setNodesIds(nodesIds);
cancelTasksRequest.setActions(actions);
cancelTasksRequest.setParentTaskId(parentTaskId);
client.admin().cluster().cancelTasks(cancelTasksRequest, new RestToXContentListener<>(channel));
ActionListener<CancelTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.node.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestToXContentListener;
import org.elasticsearch.tasks.TaskId;
import static org.elasticsearch.rest.RestRequest.Method.GET;
public class RestGetTaskAction extends BaseRestHandler {
@Inject
public RestGetTaskAction(Settings settings, RestController controller, Client client) {
super(settings, client);
controller.registerHandler(GET, "/_tasks/{taskId}", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
TaskId taskId = new TaskId(request.param("taskId"));
boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", false);
TimeValue timeout = request.paramAsTime("timeout", null);
GetTaskRequest getTaskRequest = new GetTaskRequest();
getTaskRequest.setTaskId(taskId);
getTaskRequest.setWaitForCompletion(waitForCompletion);
getTaskRequest.setTimeout(timeout);
client.admin().cluster().getTask(getTaskRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -19,8 +19,11 @@
package org.elasticsearch.rest.action.admin.cluster.node.tasks;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -36,25 +39,24 @@ import static org.elasticsearch.rest.RestRequest.Method.GET;
public class RestListTasksAction extends BaseRestHandler {
private final ClusterService clusterService;
@Inject
public RestListTasksAction(Settings settings, RestController controller, Client client) {
public RestListTasksAction(Settings settings, RestController controller, Client client, ClusterService clusterService) {
super(settings, client);
this.clusterService = clusterService;
controller.registerHandler(GET, "/_tasks", this);
controller.registerHandler(GET, "/_tasks/{taskId}", this);
}
public static ListTasksRequest generateListTasksRequest(RestRequest request) {
boolean detailed = request.paramAsBoolean("detailed", false);
String[] nodesIds = Strings.splitStringByCommaToArray(request.param("node_id"));
TaskId taskId = new TaskId(request.param("taskId", request.param("task_id")));
String[] actions = Strings.splitStringByCommaToArray(request.param("actions"));
TaskId parentTaskId = new TaskId(request.param("parent_task_id"));
boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", false);
TimeValue timeout = request.paramAsTime("timeout", null);
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setTaskId(taskId);
listTasksRequest.setNodesIds(nodesIds);
listTasksRequest.setDetailed(detailed);
listTasksRequest.setActions(actions);
@ -66,6 +68,27 @@ public class RestListTasksAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
client.admin().cluster().listTasks(generateListTasksRequest(request), new RestToXContentListener<>(channel));
ActionListener<ListTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
client.admin().cluster().listTasks(generateListTasksRequest(request), listener);
}
/**
* Wrap the normal channel listener in one that sets the discovery nodes on the response so we can support all of it's toXContent
* formats.
*/
public static <T extends ListTasksResponse> ActionListener<T> nodeSettingListener(ClusterService clusterService,
ActionListener<T> channelListener) {
return new ActionListener<T>() {
@Override
public void onResponse(T response) {
response.setDiscoveryNodes(clusterService.state().nodes());
channelListener.onResponse(response);
}
@Override
public void onFailure(Throwable e) {
channelListener.onFailure(e);
}
};
}
}

View File

@ -19,12 +19,12 @@
package org.elasticsearch.rest.action.cat;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.inject.Inject;
@ -37,7 +37,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.support.RestResponseListener;
import org.elasticsearch.rest.action.support.RestTable;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@ -48,11 +48,13 @@ import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.action.admin.cluster.node.tasks.RestListTasksAction.generateListTasksRequest;
public class RestTasksAction extends AbstractCatAction {
private final ClusterService clusterService;
@Inject
public RestTasksAction(Settings settings, RestController controller, Client client) {
public RestTasksAction(Settings settings, RestController controller, Client client, ClusterService clusterService) {
super(settings, controller, client);
controller.registerHandler(GET, "/_cat/tasks", this);
this.clusterService = clusterService;
}
@Override
@ -104,9 +106,10 @@ public class RestTasksAction extends AbstractCatAction {
private DateTimeFormatter dateFormat = DateTimeFormat.forPattern("HH:mm:ss");
private void buildRow(Table table, boolean fullId, boolean detailed, TaskInfo taskInfo) {
private void buildRow(Table table, boolean fullId, boolean detailed, DiscoveryNodes discoveryNodes, TaskInfo taskInfo) {
table.startRow();
DiscoveryNode node = taskInfo.getNode();
String nodeId = taskInfo.getTaskId().getNodeId();
DiscoveryNode node = discoveryNodes.get(nodeId);
table.addCell(taskInfo.getId());
table.addCell(taskInfo.getAction());
@ -122,15 +125,16 @@ public class RestTasksAction extends AbstractCatAction {
table.addCell(taskInfo.getRunningTimeNanos());
table.addCell(TimeValue.timeValueNanos(taskInfo.getRunningTimeNanos()).toString());
table.addCell(fullId ? node.getId() : Strings.substring(node.getId(), 0, 4));
table.addCell(node.getHostAddress());
if (node.getAddress() instanceof InetSocketTransportAddress) {
// Node information. Note that the node may be null because it has left the cluster between when we got this response and now.
table.addCell(fullId ? nodeId : Strings.substring(nodeId, 0, 4));
table.addCell(node == null ? "-" : node.getHostAddress());
if (node != null && node.getAddress() instanceof InetSocketTransportAddress) {
table.addCell(((InetSocketTransportAddress) node.getAddress()).address().getPort());
} else {
table.addCell("-");
}
table.addCell(node.getName());
table.addCell(node.getVersion().toString());
table.addCell(node == null ? "-" : node.getName());
table.addCell(node == null ? "-" : node.getVersion().toString());
if (detailed) {
table.addCell(taskInfo.getDescription());
@ -139,10 +143,11 @@ public class RestTasksAction extends AbstractCatAction {
}
private void buildGroups(Table table, boolean detailed, boolean fullId, List<TaskGroup> taskGroups) {
DiscoveryNodes discoveryNodes = clusterService.state().nodes();
List<TaskGroup> sortedGroups = new ArrayList<>(taskGroups);
sortedGroups.sort((o1, o2) -> Long.compare(o1.getTaskInfo().getStartTime(), o2.getTaskInfo().getStartTime()));
for (TaskGroup taskGroup : sortedGroups) {
buildRow(table, fullId, detailed, taskGroup.getTaskInfo());
buildRow(table, fullId, detailed, discoveryNodes, taskGroup.getTaskInfo());
buildGroups(table, fullId, detailed, taskGroup.getChildTasks());
}
}

View File

@ -0,0 +1,224 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap;
/**
* Information about a persisted or running task. Running tasks just have a {@link #getTask()} while persisted tasks will have either a
* {@link #getError()} or {@link #getResult()}.
*/
public final class PersistedTaskInfo implements Writeable, ToXContent {
private final TaskInfo task;
@Nullable
private final BytesReference error;
@Nullable
private final BytesReference result;
/**
* Construct a {@linkplain PersistedTaskInfo} for a running task.
*/
public PersistedTaskInfo(TaskInfo task) {
this(task, null, null);
}
/**
* Construct a {@linkplain PersistedTaskInfo} for a task that completed with an error.
*/
public PersistedTaskInfo(TaskInfo task, Throwable error) throws IOException {
this(task, toXContent(error), null);
}
/**
* Construct a {@linkplain PersistedTaskInfo} for a task that completed successfully.
*/
public PersistedTaskInfo(TaskInfo task, ToXContent result) throws IOException {
this(task, null, toXContent(result));
}
private PersistedTaskInfo(TaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) {
this.task = requireNonNull(task, "task is required");
this.error = error;
this.result = result;
}
/**
* Read from a stream.
*/
public PersistedTaskInfo(StreamInput in) throws IOException {
task = new TaskInfo(in);
error = in.readOptionalBytesReference();
result = in.readOptionalBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
task.writeTo(out);
out.writeOptionalBytesReference(error);
out.writeOptionalBytesReference(result);
}
/**
* Get the task that this wraps.
*/
public TaskInfo getTask() {
return task;
}
/**
* Get the error that finished this task. Will return null if the task didn't finish with an error or it hasn't yet finished.
*/
public BytesReference getError() {
return error;
}
/**
* Convert {@link #getError()} from XContent to a Map for easy processing. Will return null if the task didn't finish with an error or
* hasn't yet finished.
*/
public Map<String, Object> getErrorAsMap() {
if (error == null) {
return null;
}
return convertToMap(error, false).v2();
}
/**
* Get the result that this task finished with. Will return null if the task was finished by an error or it hasn't yet finished.
*/
public BytesReference getResult() {
return result;
}
/**
* Convert {@link #getResult()} from XContent to a Map for easy processing. Will return null if the task was finished with an error or
* hasn't yet finished.
*/
public Map<String, Object> getResultAsMap() {
if (result == null) {
return null;
}
return convertToMap(result, false).v2();
}
/**
* Was the task completed before returned?
*/
public boolean isCompleted() {
return error != null || result != null;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder, params);
return builder.endObject();
}
public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("task", task);
if (error != null) {
XContentHelper.writeRawField("error", error, builder, params);
}
if (result != null) {
XContentHelper.writeRawField("result", result, builder, params);
}
return builder;
}
public static final ConstructingObjectParser<PersistedTaskInfo, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
"persisted_task_info", a -> new PersistedTaskInfo((TaskInfo) a[0], (BytesReference) a[1], (BytesReference) a[2]));
static {
PARSER.declareObject(constructorArg(), TaskInfo.PARSER, new ParseField("task"));
PARSER.declareRawObject(optionalConstructorArg(), new ParseField("error"));
PARSER.declareRawObject(optionalConstructorArg(), new ParseField("result"));
}
@Override
public String toString() {
return Strings.toString(this);
}
// Implements equals and hashcode for testing
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != PersistedTaskInfo.class) {
return false;
}
PersistedTaskInfo other = (PersistedTaskInfo) obj;
/*
* Equality of error and result is done by converting them to a map first. Not efficient but ignores field order and spacing
* differences so perfect for testing.
*/
return Objects.equals(task, other.task)
&& Objects.equals(getErrorAsMap(), other.getErrorAsMap())
&& Objects.equals(getResultAsMap(), other.getResultAsMap());
}
@Override
public int hashCode() {
/*
* Hashing of error and result is done by converting them to a map first. Not efficient but ignores field order and spacing
* differences so perfect for testing.
*/
return Objects.hash(task, getErrorAsMap(), getResultAsMap());
}
private static BytesReference toXContent(ToXContent result) throws IOException {
try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) {
// Elasticsearch's Response object never emit starting or ending objects. Most other implementers of ToXContent do....
builder.startObject();
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
return builder.bytes();
}
}
private static BytesReference toXContent(Throwable error) throws IOException {
try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) {
builder.startObject();
ElasticsearchException.toXContent(builder, ToXContent.EMPTY_PARAMS, error);
builder.endObject();
return builder.bytes();
}
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap;
/**
* Raw, unparsed status from the task results index.
*/
public class RawTaskStatus implements Task.Status {
public static final String NAME = "raw";
private final BytesReference status;
public RawTaskStatus(BytesReference status) {
this.status = requireNonNull(status, "status may not be null");
}
/**
* Read from a stream.
*/
public RawTaskStatus(StreamInput in) throws IOException {
status = in.readOptionalBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBytesReference(status);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.rawValue(status);
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public String toString() {
return Strings.toString(this);
}
/**
* Convert the from XContent to a Map for easy reading.
*/
public Map<String, Object> toMap() {
return convertToMap(status, false).v2();
}
// Implements equals and hashcode for testing
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != RawTaskStatus.class) {
return false;
}
RawTaskStatus other = (RawTaskStatus) obj;
// Totally not efficient, but ok for testing because it ignores order and spacing differences
return toMap().equals(other.toMap());
}
@Override
public int hashCode() {
// Totally not efficient, but ok for testing because consistent with equals
return toMap().hashCode();
}
}

View File

@ -21,7 +21,6 @@
package org.elasticsearch.tasks;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
@ -78,8 +77,8 @@ public class Task {
description = getDescription();
status = getStatus();
}
return new TaskInfo(node, getId(), getType(), getAction(), description, status, startTime, System.nanoTime() - startTimeNanos,
this instanceof CancellableTask, parentTask);
return new TaskInfo(new TaskId(node.getId(), getId()), getType(), getAction(), description, status, startTime,
System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask);
}
/**
@ -136,13 +135,13 @@ public class Task {
public interface Status extends ToXContent, NamedWriteable {}
public TaskResult result(DiscoveryNode node, Throwable error) throws IOException {
return new TaskResult(taskInfo(node, true), error);
public PersistedTaskInfo result(DiscoveryNode node, Throwable error) throws IOException {
return new PersistedTaskInfo(taskInfo(node, true), error);
}
public TaskResult result(DiscoveryNode node, ActionResponse response) throws IOException {
public PersistedTaskInfo result(DiscoveryNode node, ActionResponse response) throws IOException {
if (response instanceof ToXContent) {
return new TaskResult(taskInfo(node, true), (ToXContent) response);
return new PersistedTaskInfo(taskInfo(node, true), (ToXContent) response);
} else {
throw new IllegalStateException("response has to implement ToXContent for persistence");
}

View File

@ -17,20 +17,27 @@
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.list;
package org.elasticsearch.tasks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
* Information about a currently running task.
* <p>
@ -39,10 +46,7 @@ import java.util.concurrent.TimeUnit;
* and use in APIs. Instead, immutable and streamable TaskInfo objects are used to represent
* snapshot information about currently running tasks.
*/
public class TaskInfo implements Writeable, ToXContent {
private final DiscoveryNode node;
public final class TaskInfo implements Writeable, ToXContent {
private final TaskId taskId;
private final String type;
@ -61,10 +65,9 @@ public class TaskInfo implements Writeable, ToXContent {
private final TaskId parentTaskId;
public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, Task.Status status, long startTime,
public TaskInfo(TaskId taskId, String type, String action, String description, Task.Status status, long startTime,
long runningTimeNanos, boolean cancellable, TaskId parentTaskId) {
this.node = node;
this.taskId = new TaskId(node.getId(), id);
this.taskId = taskId;
this.type = type;
this.action = action;
this.description = description;
@ -79,8 +82,7 @@ public class TaskInfo implements Writeable, ToXContent {
* Read from a stream.
*/
public TaskInfo(StreamInput in) throws IOException {
node = new DiscoveryNode(in);
taskId = new TaskId(node.getId(), in.readLong());
taskId = TaskId.readFromStream(in);
type = in.readString();
action = in.readString();
description = in.readOptionalString();
@ -93,8 +95,7 @@ public class TaskInfo implements Writeable, ToXContent {
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeLong(taskId.getId());
taskId.writeTo(out);
out.writeString(type);
out.writeString(action);
out.writeOptionalString(description);
@ -109,10 +110,6 @@ public class TaskInfo implements Writeable, ToXContent {
return taskId;
}
public DiscoveryNode getNode() {
return node;
}
public long getId() {
return taskId.getId();
}
@ -167,7 +164,13 @@ public class TaskInfo implements Writeable, ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node", node.getId());
builder.startObject();
innerToXContent(builder, params);
return builder.endObject();
}
public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node", taskId.getNodeId());
builder.field("id", taskId.getId());
builder.field("type", type);
builder.field("action", action);
@ -185,4 +188,63 @@ public class TaskInfo implements Writeable, ToXContent {
}
return builder;
}
public static final ConstructingObjectParser<TaskInfo, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
"task_info", a -> {
int i = 0;
TaskId id = new TaskId((String) a[i++], (Long) a[i++]);
String type = (String) a[i++];
String action = (String) a[i++];
String description = (String) a[i++];
BytesReference statusBytes = (BytesReference) a[i++];
long startTime = (Long) a[i++];
long runningTimeNanos = (Long) a[i++];
boolean cancellable = (Boolean) a[i++];
String parentTaskIdString = (String) a[i++];
RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes);
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId((String) parentTaskIdString);
return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId);
});
static {
// Note for the future: this has to be backwards compatible with all changes to the task persistence format
PARSER.declareString(constructorArg(), new ParseField("node"));
PARSER.declareLong(constructorArg(), new ParseField("id"));
PARSER.declareString(constructorArg(), new ParseField("type"));
PARSER.declareString(constructorArg(), new ParseField("action"));
PARSER.declareString(optionalConstructorArg(), new ParseField("description"));
PARSER.declareRawObject(optionalConstructorArg(), new ParseField("status"));
PARSER.declareLong(constructorArg(), new ParseField("start_time_in_millis"));
PARSER.declareLong(constructorArg(), new ParseField("running_time_in_nanos"));
PARSER.declareBoolean(constructorArg(), new ParseField("cancellable"));
PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id"));
}
@Override
public String toString() {
return Strings.toString(this);
}
// Implements equals and hashCode for testing
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != TaskInfo.class) {
return false;
}
TaskInfo other = (TaskInfo) obj;
return Objects.equals(taskId, other.taskId)
&& Objects.equals(type, other.type)
&& Objects.equals(action, other.action)
&& Objects.equals(description, other.description)
&& Objects.equals(startTime, other.startTime)
&& Objects.equals(runningTimeNanos, other.runningTimeNanos)
&& Objects.equals(parentTaskId, other.parentTaskId)
&& Objects.equals(cancellable, other.cancellable)
&& Objects.equals(status, other.status);
}
@Override
public int hashCode() {
return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status);
}
}

View File

@ -57,7 +57,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
private final Map<TaskId, String> banedParents = new ConcurrentHashMap<>();
private TaskResultsService taskResultsService;
private TaskPersistenceService taskResultsService;
private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;
@ -65,7 +65,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
super(settings);
}
public void setTaskResultsService(TaskResultsService taskResultsService) {
public void setTaskResultsService(TaskPersistenceService taskResultsService) {
assert this.taskResultsService == null;
this.taskResultsService = taskResultsService;
}
@ -145,14 +145,14 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
/**
* Stores the task failure
*/
public <Response extends ActionResponse> void persistResult(Task task, Throwable error, ActionListener<Response> listener) {
public <Response extends ActionResponse> void persistResult(Task task, Throwable error, ActionListener<Response> listener) {
DiscoveryNode localNode = lastDiscoveryNodes.getLocalNode();
if (localNode == null) {
// too early to persist anything, shouldn't really be here - just pass the error along
listener.onFailure(error);
return;
}
final TaskResult taskResult;
final PersistedTaskInfo taskResult;
try {
taskResult = task.result(localNode, error);
} catch (IOException ex) {
@ -177,7 +177,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
/**
* Stores the task result
*/
public <Response extends ActionResponse> void persistResult(Task task, Response response, ActionListener<Response> listener) {
public <Response extends ActionResponse> void persistResult(Task task, Response response, ActionListener<Response> listener) {
DiscoveryNode localNode = lastDiscoveryNodes.getLocalNode();
if (localNode == null) {
// too early to persist anything, shouldn't really be here - just pass the response along
@ -185,7 +185,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
listener.onResponse(response);
return;
}
final TaskResult taskResult;
final PersistedTaskInfo taskResult;
try {
taskResult = task.result(localNode, response);
} catch (IOException ex) {

View File

@ -19,14 +19,17 @@
package org.elasticsearch.tasks;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
@ -34,21 +37,25 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* Service that can persist task results
* Service that can persist tasks and their results.
*/
public class TaskResultsService extends AbstractComponent {
public class TaskPersistenceService extends AbstractComponent {
public static final String TASK_RESULT_INDEX = ".results";
public static final String TASK_INDEX = ".tasks";
public static final String TASK_RESULT_TYPE = "result";
public static final String TASK_TYPE = "task";
public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-results-index-mapping.json";
public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json";
private final Client client;
@ -57,7 +64,7 @@ public class TaskResultsService extends AbstractComponent {
private final TransportCreateIndexAction createIndexAction;
@Inject
public TaskResultsService(Settings settings, Client client, ClusterService clusterService,
public TaskPersistenceService(Settings settings, Client client, ClusterService clusterService,
TransportCreateIndexAction createIndexAction) {
super(settings);
this.client = client;
@ -65,15 +72,15 @@ public class TaskResultsService extends AbstractComponent {
this.createIndexAction = createIndexAction;
}
public void persist(TaskResult taskResult, ActionListener<Void> listener) {
public void persist(PersistedTaskInfo taskResult, ActionListener<Void> listener) {
ClusterState state = clusterService.state();
if (state.routingTable().hasIndex(TASK_RESULT_INDEX) == false) {
if (state.routingTable().hasIndex(TASK_INDEX) == false) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.settings(taskResultIndexSettings());
createIndexRequest.index(TASK_RESULT_INDEX);
createIndexRequest.mapping(TASK_RESULT_TYPE, taskResultIndexMapping());
createIndexRequest.index(TASK_INDEX);
createIndexRequest.mapping(TASK_TYPE, taskResultIndexMapping());
createIndexRequest.cause("auto(task api)");
createIndexAction.execute(null, createIndexRequest, new ActionListener<CreateIndexResponse>() {
@ -97,10 +104,10 @@ public class TaskResultsService extends AbstractComponent {
}
});
} else {
IndexMetaData metaData = state.getMetaData().index(TASK_RESULT_INDEX);
if (metaData.getMappings().containsKey(TASK_RESULT_TYPE) == false) {
IndexMetaData metaData = state.getMetaData().index(TASK_INDEX);
if (metaData.getMappings().containsKey(TASK_TYPE) == false) {
// The index already exists but doesn't have our mapping
client.admin().indices().preparePutMapping(TASK_RESULT_INDEX).setType(TASK_RESULT_TYPE).setSource(taskResultIndexMapping())
client.admin().indices().preparePutMapping(TASK_INDEX).setType(TASK_TYPE).setSource(taskResultIndexMapping())
.execute(new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse putMappingResponse) {
@ -120,20 +127,25 @@ public class TaskResultsService extends AbstractComponent {
}
private void doPersist(TaskResult taskResult, ActionListener<Void> listener) {
client.prepareIndex(TASK_RESULT_INDEX, TASK_RESULT_TYPE, taskResult.getTaskId().toString()).setSource(taskResult.getResult())
.execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
listener.onResponse(null);
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
private void doPersist(PersistedTaskInfo taskResult, ActionListener<Void> listener) {
IndexRequestBuilder index = client.prepareIndex(TASK_INDEX, TASK_TYPE, taskResult.getTask().getTaskId().toString());
try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) {
taskResult.toXContent(builder, ToXContent.EMPTY_PARAMS);
index.setSource(builder);
} catch (IOException e) {
throw new ElasticsearchException("Couldn't convert task result to XContent for [{}]", e, taskResult.getTask());
}
index.execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
listener.onResponse(null);
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
}
private Settings taskResultIndexSettings() {

View File

@ -1,90 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
/**
* Represents the result or failure of a running task
*/
public class TaskResult {
private final BytesReference result;
private final TaskId taskId;
public TaskResult(TaskInfo taskInfo, Throwable e) throws IOException {
ToXContent.Params params = ToXContent.EMPTY_PARAMS;
XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE);
builder.startObject();
{
builder.startObject("task");
{
taskInfo.toXContent(builder, params);
}
builder.endObject();
builder.startObject("error");
{
ElasticsearchException.toXContent(builder, params, e);
}
builder.endObject();
}
builder.endObject();
result = builder.bytes();
taskId = taskInfo.getTaskId();
}
public TaskResult(TaskInfo taskInfo, ToXContent toXContent) throws IOException {
ToXContent.Params params = ToXContent.EMPTY_PARAMS;
XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE);
builder.startObject();
{
builder.startObject("task");
{
taskInfo.toXContent(builder, params);
}
builder.endObject();
builder.startObject("result");
{
toXContent.toXContent(builder, params);
}
builder.endObject();
}
builder.endObject();
result = builder.bytes();
taskId = taskInfo.getTaskId();
}
public TaskId getTaskId() {
return taskId;
}
public BytesReference getResult() {
return result;
}
}

View File

@ -1,5 +1,5 @@
{
"result" : {
"task" : {
"dynamic" : "strict",
"properties" : {
"task" : {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksReque
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
@ -38,6 +37,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -19,11 +19,11 @@
package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.tasks.MockTaskManagerListener;
import java.util.ArrayList;

View File

@ -233,5 +233,4 @@ public abstract class TaskManagerTestCase extends ESTestCase {
}
return listeners;
}
}

View File

@ -19,14 +19,17 @@
package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
@ -43,8 +46,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.PersistedTaskInfo;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskPersistenceService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.tasks.MockTaskManager;
import org.elasticsearch.test.tasks.MockTaskManagerListener;
@ -59,22 +65,26 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import static java.util.Collections.singleton;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -199,7 +209,7 @@ public class TasksIT extends ESIntegTestCase {
logger.debug("number of shards, total: [{}], primaries: [{}] ", numberOfShards.totalNumShards, numberOfShards.numPrimaries);
logger.debug("main events {}", numberOfEvents(RefreshAction.NAME, Tuple::v1));
logger.debug("main event node {}", findEvents(RefreshAction.NAME, Tuple::v1).get(0).getNode().getName());
logger.debug("main event node {}", findEvents(RefreshAction.NAME, Tuple::v1).get(0).getTaskId().getNodeId());
logger.debug("[s] events {}", numberOfEvents(RefreshAction.NAME + "[s]", Tuple::v1));
logger.debug("[s][*] events {}", numberOfEvents(RefreshAction.NAME + "[s][*]", Tuple::v1));
logger.debug("nodes with the index {}", internalCluster().nodesInclude("test"));
@ -219,15 +229,16 @@ public class TasksIT extends ESIntegTestCase {
TaskInfo mainTask = findEvents(RefreshAction.NAME, Tuple::v1).get(0);
List<TaskInfo> sTasks = findEvents(RefreshAction.NAME + "[s]", Tuple::v1);
for (TaskInfo taskInfo : sTasks) {
if (mainTask.getNode().equals(taskInfo.getNode())) {
if (mainTask.getTaskId().getNodeId().equals(taskInfo.getTaskId().getNodeId())) {
// This shard level task runs on the same node as a parent task - it should have the main task as a direct parent
assertParentTask(Collections.singletonList(taskInfo), mainTask);
} else {
String description = taskInfo.getDescription();
// This shard level task runs on another node - it should have a corresponding shard level task on the node where main task
// is running
List<TaskInfo> sTasksOnRequestingNode = findEvents(RefreshAction.NAME + "[s]", event -> event.v1()
&& mainTask.getNode().equals(event.v2().getNode()) && description.equals(event.v2().getDescription()));
List<TaskInfo> sTasksOnRequestingNode = findEvents(RefreshAction.NAME + "[s]",
event -> event.v1() && mainTask.getTaskId().getNodeId().equals(event.v2().getTaskId().getNodeId())
&& description.equals(event.v2().getDescription()));
// There should be only one parent task
assertEquals(1, sTasksOnRequestingNode.size());
assertParentTask(Collections.singletonList(taskInfo), sTasksOnRequestingNode.get(0));
@ -243,12 +254,13 @@ public class TasksIT extends ESIntegTestCase {
List<TaskInfo> sTask;
if (taskInfo.getAction().endsWith("[s][p]")) {
// A [s][p] level task should have a corresponding [s] level task on the same node
sTask = findEvents(RefreshAction.NAME + "[s]", event -> event.v1() && taskInfo.getNode().equals(event.v2().getNode())
&& taskInfo.getDescription().equals(event.v2().getDescription()));
sTask = findEvents(RefreshAction.NAME + "[s]",
event -> event.v1() && taskInfo.getTaskId().getNodeId().equals(event.v2().getTaskId().getNodeId())
&& taskInfo.getDescription().equals(event.v2().getDescription()));
} else {
// A [s][r] level task should have a corresponding [s] level task on the a different node (where primary is located)
sTask = findEvents(RefreshAction.NAME + "[s]",
event -> event.v1() && taskInfo.getParentTaskId().getNodeId().equals(event.v2().getNode().getId()) && taskInfo
event -> event.v1() && taskInfo.getParentTaskId().getNodeId().equals(event.v2().getTaskId().getNodeId()) && taskInfo
.getDescription()
.equals(event.v2().getDescription()));
}
@ -305,7 +317,7 @@ public class TasksIT extends ESIntegTestCase {
// they all should have the same shard task as a parent
assertEquals(getNumShards("test").numReplicas, numberOfEvents(BulkAction.NAME + "[s][r]", Tuple::v1));
assertParentTask(findEvents(BulkAction.NAME + "[s][r]", Tuple::v1), shardTask);
}
}
/**
* Very basic "is it plugged in" style test that indexes a document and
@ -353,14 +365,28 @@ public class TasksIT extends ESIntegTestCase {
}
ListenableActionFuture<?> indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute();
taskRegistered.await(10, TimeUnit.SECONDS); // waiting for at least one task to be registered
ListTasksResponse tasks = client().admin().cluster().prepareListTasks().setActions("indices:data/write/index*").setDetailed(true)
.get();
ListTasksResponse listResponse = client().admin().cluster().prepareListTasks().setActions("indices:data/write/index*")
.setDetailed(true).get();
assertThat(listResponse.getTasks(), not(empty()));
for (TaskInfo task : listResponse.getTasks()) {
assertNotNull(task.getStatus());
GetTaskResponse getResponse = client().admin().cluster().prepareGetTask(task.getTaskId()).get();
assertFalse("task should still be running", getResponse.getTask().isCompleted());
TaskInfo fetchedWithGet = getResponse.getTask().getTask();
assertEquals(task.getId(), fetchedWithGet.getId());
assertEquals(task.getType(), fetchedWithGet.getType());
assertEquals(task.getAction(), fetchedWithGet.getAction());
assertEquals(task.getDescription(), fetchedWithGet.getDescription());
assertEquals(task.getStatus(), fetchedWithGet.getStatus());
assertEquals(task.getStartTime(), fetchedWithGet.getStartTime());
assertThat(fetchedWithGet.getRunningTimeNanos(), greaterThanOrEqualTo(task.getRunningTimeNanos()));
assertEquals(task.isCancellable(), fetchedWithGet.isCancellable());
assertEquals(task.getParentTaskId(), fetchedWithGet.getParentTaskId());
}
taskFinishLock.unlock();
indexFuture.get();
assertThat(tasks.getTasks(), not(emptyCollectionOf(TaskInfo.class)));
for (TaskInfo task : tasks.getTasks()) {
assertNotNull(task.getStatus());
}
}
public void testTasksCancellation() throws Exception {
@ -401,60 +427,91 @@ public class TasksIT extends ESIntegTestCase {
.getTasks().size());
}
public void testTasksListWaitForCompletion() throws Exception {
public void testListTasksWaitForCompletion() throws Exception {
waitForCompletionTestCase(id -> {
return client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]")
.setWaitForCompletion(true).execute();
}, response -> {
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
});
}
public void testGetTaskWaitForCompletion() throws Exception {
waitForCompletionTestCase(id -> {
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
}, response -> {
// Really we're just happy we didn't get any exceptions
assertNotNull(response.getTask().getTask());
});
}
/**
* Test wait for completion.
* @param wait start waiting for a task. Accepts that id of the task to wait for and returns a future waiting for it.
* @param validator validate the response and return the task ids that were found
*/
private <T> void waitForCompletionTestCase(Function<TaskId, ListenableActionFuture<T>> wait, Consumer<T> validator)
throws Exception {
// Start blocking test task
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.execute();
ListenableActionFuture<ListTasksResponse> waitResponseFuture;
ListenableActionFuture<T> waitResponseFuture;
TaskId taskId;
try {
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().size(), client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
taskId = waitForTestTaskStartOnAllNodes();
// Spin up a request to wait for that task to finish
waitResponseFuture = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).execute();
waitResponseFuture = wait.apply(taskId);
} finally {
// Unblock the request so the wait for completion request can finish
TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get();
}
// Now that the task is unblocked the list response will come back
ListTasksResponse waitResponse = waitResponseFuture.get();
// If any tasks come back then they are the tasks we asked for - it'd be super weird if this wasn't true
for (TaskInfo task: waitResponse.getTasks()) {
assertEquals(task.getAction(), TestTaskPlugin.TestTaskAction.NAME + "[n]");
}
// See the next test to cover the timeout case
T waitResponse = waitResponseFuture.get();
validator.accept(waitResponse);
future.get();
}
public void testTasksListWaitForTimeout() throws Exception {
public void testListTasksWaitForTimeout() throws Exception {
waitForTimeoutTestCase(id -> {
ListTasksResponse response = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(100))
.get();
assertThat(response.getNodeFailures(), not(empty()));
return response.getNodeFailures();
});
}
public void testGetTaskWaitForTimeout() throws Exception {
waitForTimeoutTestCase(id -> {
Exception e = expectThrows(Exception.class,
() -> client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).setTimeout(timeValueMillis(100)).get());
return singleton(e);
});
}
/**
* Test waiting for a task that times out.
* @param wait wait for the running task and return all the failures you accumulated waiting for it
*/
private void waitForTimeoutTestCase(Function<TaskId, ? extends Iterable<? extends Throwable>> wait) throws Exception {
// Start blocking test task
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.execute();
try {
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().size(), client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
TaskId taskId = waitForTestTaskStartOnAllNodes();
// Spin up a request that should wait for those tasks to finish
// It will timeout because we haven't unblocked the tasks
ListTasksResponse waitResponse = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(100))
.get();
Iterable<? extends Throwable> failures = wait.apply(taskId);
assertFalse(waitResponse.getNodeFailures().isEmpty());
for (FailedNodeException failure : waitResponse.getNodeFailures()) {
Throwable timeoutException = failure.getCause();
// The exception sometimes comes back wrapped depending on the client
if (timeoutException.getCause() != null) {
timeoutException = timeoutException.getCause();
}
assertThat(timeoutException,
either(instanceOf(ElasticsearchTimeoutException.class)).or(instanceOf(ReceiveTimeoutTransportException.class)));
for (Throwable failure : failures) {
assertNotNull(
ExceptionsHelper.unwrap(failure, ElasticsearchTimeoutException.class, ReceiveTimeoutTransportException.class));
}
} finally {
// Now we can unblock those requests
@ -463,6 +520,17 @@ public class TasksIT extends ESIntegTestCase {
future.get();
}
private TaskId waitForTestTaskStartOnAllNodes() throws Exception {
AtomicReference<TaskId> result = new AtomicReference<>();
assertBusy(() -> {
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]")
.get().getTasks();
assertEquals(internalCluster().size(), tasks.size());
result.set(tasks.get(0).getTaskId());
});
return result.get();
}
public void testTasksListWaitForNoTask() throws Exception {
// Spin up a request to wait for no matching tasks
ListenableActionFuture<ListTasksResponse> waitResponseFuture = client().admin().cluster().prepareListTasks()
@ -470,7 +538,17 @@ public class TasksIT extends ESIntegTestCase {
.execute();
// It should finish quickly and without complaint
assertThat(waitResponseFuture.get().getTasks(), emptyCollectionOf(TaskInfo.class));
assertThat(waitResponseFuture.get().getTasks(), empty());
}
public void testTasksGetWaitForNoTask() throws Exception {
// Spin up a request to wait for no matching tasks
ListenableActionFuture<GetTaskResponse> waitResponseFuture = client().admin().cluster().prepareGetTask("notfound:1")
.setWaitForCompletion(true).setTimeout(timeValueMillis(10))
.execute();
// It should finish quickly and without complaint
expectNotFound(() -> waitResponseFuture.get());
}
public void testTasksWaitForAllTask() throws Exception {
@ -488,7 +566,7 @@ public class TasksIT extends ESIntegTestCase {
// Randomly create an empty index to make sure the type is created automatically
if (randomBoolean()) {
logger.info("creating an empty results index with custom settings");
assertAcked(client().admin().indices().prepareCreate(TaskResultsService.TASK_RESULT_INDEX));
assertAcked(client().admin().indices().prepareCreate(TaskPersistenceService.TASK_INDEX));
}
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
@ -500,34 +578,43 @@ public class TasksIT extends ESIntegTestCase {
assertEquals(1, events.size());
TaskInfo taskInfo = events.get(0);
String taskId = taskInfo.getTaskId().toString();
TaskId taskId = taskInfo.getTaskId();
GetResponse resultDoc = client().prepareGet(TaskResultsService.TASK_RESULT_INDEX, TaskResultsService.TASK_RESULT_TYPE, taskId)
.get();
GetResponse resultDoc = client()
.prepareGet(TaskPersistenceService.TASK_INDEX, TaskPersistenceService.TASK_TYPE, taskId.toString()).get();
assertTrue(resultDoc.isExists());
Map<String, Object> source = resultDoc.getSource();
@SuppressWarnings("unchecked")
Map<String, Object> task = (Map<String, Object>) source.get("task");
assertEquals(taskInfo.getNode().getId(), task.get("node"));
assertEquals(taskInfo.getTaskId().getNodeId(), task.get("node"));
assertEquals(taskInfo.getAction(), task.get("action"));
assertEquals(Long.toString(taskInfo.getId()), task.get("id").toString());
@SuppressWarnings("unchecked")
Map<String, Object> result = (Map<String, Object>) source.get("result");
assertEquals("0", result.get("failure_count").toString());
assertNoFailures(client().admin().indices().prepareRefresh(TaskResultsService.TASK_RESULT_INDEX).get());
assertNull(source.get("failure"));
SearchResponse searchResponse = client().prepareSearch(TaskResultsService.TASK_RESULT_INDEX)
.setTypes(TaskResultsService.TASK_RESULT_TYPE)
assertNoFailures(client().admin().indices().prepareRefresh(TaskPersistenceService.TASK_INDEX).get());
SearchResponse searchResponse = client().prepareSearch(TaskPersistenceService.TASK_INDEX)
.setTypes(TaskPersistenceService.TASK_TYPE)
.setSource(SearchSourceBuilder.searchSource().query(QueryBuilders.termQuery("task.action", taskInfo.getAction())))
.get();
assertEquals(1L, searchResponse.getHits().totalHits());
searchResponse = client().prepareSearch(TaskResultsService.TASK_RESULT_INDEX).setTypes(TaskResultsService.TASK_RESULT_TYPE)
.setSource(SearchSourceBuilder.searchSource().query(QueryBuilders.termQuery("task.node", taskInfo.getNode().getId()))).get();
searchResponse = client().prepareSearch(TaskPersistenceService.TASK_INDEX).setTypes(TaskPersistenceService.TASK_TYPE)
.setSource(SearchSourceBuilder.searchSource().query(QueryBuilders.termQuery("task.node", taskInfo.getTaskId().getNodeId())))
.get();
assertEquals(1L, searchResponse.getHits().totalHits());
GetTaskResponse getResponse = expectFinishedTask(taskId);
assertEquals(result, getResponse.getTask().getResultAsMap());
assertNull(getResponse.getTask().getError());
}
public void testTaskFailurePersistence() throws Exception {
@ -545,22 +632,69 @@ public class TasksIT extends ESIntegTestCase {
List<TaskInfo> events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);
assertEquals(1, events.size());
TaskInfo failedTaskInfo = events.get(0);
String failedTaskId = failedTaskInfo.getTaskId().toString();
TaskId failedTaskId = failedTaskInfo.getTaskId();
GetResponse failedResultDoc = client()
.prepareGet(TaskResultsService.TASK_RESULT_INDEX, TaskResultsService.TASK_RESULT_TYPE, failedTaskId)
.prepareGet(TaskPersistenceService.TASK_INDEX, TaskPersistenceService.TASK_TYPE, failedTaskId.toString())
.get();
assertTrue(failedResultDoc.isExists());
Map<String, Object> source = failedResultDoc.getSource();
@SuppressWarnings("unchecked")
Map<String, Object> task = (Map<String, Object>) source.get("task");
assertEquals(failedTaskInfo.getNode().getId(), task.get("node"));
assertEquals(failedTaskInfo.getTaskId().getNodeId(), task.get("node"));
assertEquals(failedTaskInfo.getAction(), task.get("action"));
assertEquals(Long.toString(failedTaskInfo.getId()), task.get("id").toString());
@SuppressWarnings("unchecked")
Map<String, Object> error = (Map<String, Object>) source.get("error");
assertEquals("Simulating operation failure", error.get("reason"));
assertEquals("illegal_state_exception", error.get("type"));
assertNull(source.get("result"));
GetTaskResponse getResponse = expectFinishedTask(failedTaskId);
assertNull(getResponse.getTask().getResult());
assertEquals(error, getResponse.getTask().getErrorAsMap());
}
public void testGetTaskNotFound() throws Exception {
// Node isn't found, tasks index doesn't even exist
expectNotFound(() -> client().admin().cluster().prepareGetTask("not_a_node:1").get());
// Node exists but the task still isn't found
expectNotFound(() -> client().admin().cluster().prepareGetTask(new TaskId(internalCluster().getNodeNames()[0], 1)).get());
}
public void testNodeNotFoundButTaskFound() throws Exception {
// Save a fake task that looks like it is from a node that isn't part of the cluster
CyclicBarrier b = new CyclicBarrier(2);
TaskPersistenceService resultsService = internalCluster().getInstance(TaskPersistenceService.class);
resultsService.persist(
new PersistedTaskInfo(new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID),
new RuntimeException("test")),
new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
try {
b.await();
} catch (InterruptedException | BrokenBarrierException e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
throw new RuntimeException(e);
}
});
b.await();
// Now we can find it!
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
assertEquals("test", response.getTask().getTask().getAction());
assertNotNull(response.getTask().getError());
assertNull(response.getTask().getResult());
}
@Override
@ -638,8 +772,28 @@ public class TasksIT extends ESIntegTestCase {
private void assertParentTask(TaskInfo task, TaskInfo parentTask) {
assertTrue(task.getParentTaskId().isSet());
assertEquals(parentTask.getNode().getId(), task.getParentTaskId().getNodeId());
assertEquals(parentTask.getTaskId().getNodeId(), task.getParentTaskId().getNodeId());
assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId()));
assertEquals(parentTask.getId(), task.getParentTaskId().getId());
}
private ResourceNotFoundException expectNotFound(ThrowingRunnable r) {
Exception e = expectThrows(Exception.class, r);
ResourceNotFoundException notFound = (ResourceNotFoundException) ExceptionsHelper.unwrap(e, ResourceNotFoundException.class);
if (notFound == null) throw new RuntimeException("Expected ResourceNotFoundException", e);
return notFound;
}
/**
* Fetch the task status from the list tasks API using it's "fallback to get from the task index" behavior. Asserts some obvious stuff
* about the fetched task and returns a map of it's status.
*/
private GetTaskResponse expectFinishedTask(TaskId taskId) throws IOException {
GetTaskResponse response = client().admin().cluster().prepareGetTask(taskId).get();
assertTrue("the task should have been completed before fetching", response.getTask().isCompleted());
TaskInfo info = response.getTask().getTask();
assertEquals(taskId, info.getTaskId());
assertNull(info.getStatus()); // The test task doesn't have any status
return response;
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
@ -38,9 +37,8 @@ import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -52,6 +50,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.tasks.MockTaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -371,10 +370,10 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
assertEquals(testNodes.length, response.getPerNodeTasks().size());
// Coordinating node
assertEquals(2, response.getPerNodeTasks().get(testNodes[0].discoveryNode).size());
assertEquals(2, response.getPerNodeTasks().get(testNodes[0].discoveryNode.getId()).size());
// Other nodes node
for (int i = 1; i < testNodes.length; i++) {
assertEquals(1, response.getPerNodeTasks().get(testNodes[i].discoveryNode).size());
assertEquals(1, response.getPerNodeTasks().get(testNodes[i].discoveryNode.getId()).size());
}
// There should be a single main task when grouped by tasks
assertEquals(1, response.getTaskGroups().size());
@ -387,7 +386,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
listTasksRequest.setActions("testAction[n]"); // only pick node actions
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertNull(entry.getValue().get(0).getDescription());
}
@ -401,7 +400,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
}
@ -438,7 +437,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
listTasksRequest.setActions("testAction");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(1, response.getTasks().size());
String parentNode = response.getTasks().get(0).getNode().getId();
String parentNode = response.getTasks().get(0).getTaskId().getNodeId();
long parentTaskId = response.getTasks().get(0).getId();
// Find tasks with common parent
@ -493,7 +492,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
listTasksRequest.setActions("testAction[n]"); // only pick node actions
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertNull(entry.getValue().get(0).getDescription());
}
@ -503,7 +502,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
assertThat(entry.getValue().get(0).getStartTime(), greaterThanOrEqualTo(minimalStartTime));
@ -739,6 +738,11 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
assertEquals(testNodes.length + 1, response.getTasks().size());
// First group by node
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
for (TestNode testNode : this.testNodes) {
discoNodes.put(testNode.discoveryNode);
}
response.setDiscoveryNodes(discoNodes.build());
Map<String, Object> byNodes = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "nodes")));
byNodes = (Map<String, Object>) byNodes.get("nodes");
// One element on the top level

View File

@ -27,8 +27,6 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
public abstract class BaseXContentTestCase extends ESTestCase {
public abstract XContentType xcontentType();
@ -136,5 +134,26 @@ public abstract class BaseXContentTestCase extends ESTestCase {
assertEquals(Token.VALUE_NULL, parser.nextToken());
assertEquals(Token.END_OBJECT, parser.nextToken());
assertNull(parser.nextToken());
os = new ByteArrayOutputStream();
try (XContentGenerator generator = xcontentType().xContent().createGenerator(os)) {
generator.writeStartObject();
generator.writeFieldName("test");
generator.writeRawValue(new BytesArray(rawData));
generator.writeEndObject();
}
parser = xcontentType().xContent().createParser(os.toByteArray());
assertEquals(Token.START_OBJECT, parser.nextToken());
assertEquals(Token.FIELD_NAME, parser.nextToken());
assertEquals("test", parser.currentName());
assertEquals(Token.START_OBJECT, parser.nextToken());
assertEquals(Token.FIELD_NAME, parser.nextToken());
assertEquals("foo", parser.currentName());
assertEquals(Token.VALUE_NULL, parser.nextToken());
assertEquals(Token.END_OBJECT, parser.nextToken());
assertEquals(Token.END_OBJECT, parser.nextToken());
assertNull(parser.nextToken());
}
}

View File

@ -0,0 +1,137 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
/**
* Round trip tests for {@link PersistedTaskInfo} and those classes that it includes like {@link TaskInfo} and {@link RawTaskStatus}.
*/
public class PersistedTaskInfoTests extends ESTestCase {
public void testBinaryRoundTrip() throws IOException {
NamedWriteableRegistry registry = new NamedWriteableRegistry();
registry.register(Task.Status.class, RawTaskStatus.NAME, RawTaskStatus::new);
PersistedTaskInfo result = randomTaskResult();
PersistedTaskInfo read;
try (BytesStreamOutput out = new BytesStreamOutput()) {
result.writeTo(out);
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes()), registry)) {
read = new PersistedTaskInfo(in);
}
} catch (IOException e) {
throw new IOException("Error processing [" + result + "]", e);
}
assertEquals(result, read);
}
public void testXContentRoundTrip() throws IOException {
/*
* Note that this round trip isn't 100% perfect - status will always be read as RawTaskStatus. Since this test uses RawTaskStatus
* as the status we randomly generate then we can assert the round trip with .equals.
*/
PersistedTaskInfo result = randomTaskResult();
PersistedTaskInfo read;
try (XContentBuilder builder = XContentBuilder.builder(randomFrom(XContentType.values()).xContent())) {
result.toXContent(builder, ToXContent.EMPTY_PARAMS);
try (XContentBuilder shuffled = shuffleXContent(builder);
XContentParser parser = XContentHelper.createParser(shuffled.bytes())) {
read = PersistedTaskInfo.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT);
}
} catch (IOException e) {
throw new IOException("Error processing [" + result + "]", e);
}
assertEquals(result, read);
}
private static PersistedTaskInfo randomTaskResult() throws IOException {
switch (between(0, 2)) {
case 0:
return new PersistedTaskInfo(randomTaskInfo());
case 1:
return new PersistedTaskInfo(randomTaskInfo(), new RuntimeException("error"));
case 2:
return new PersistedTaskInfo(randomTaskInfo(), randomTaskActionResult());
default:
throw new UnsupportedOperationException("Unsupported random TaskResult constructor");
}
}
private static TaskInfo randomTaskInfo() throws IOException {
TaskId taskId = randomTaskId();
String type = randomAsciiOfLength(5);
String action = randomAsciiOfLength(5);
Task.Status status = randomBoolean() ? randomRawTaskStatus() : null;
String description = randomBoolean() ? randomAsciiOfLength(5) : null;
long startTime = randomLong();
long runningTimeNanos = randomLong();
boolean cancellable = randomBoolean();
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId);
}
private static TaskId randomTaskId() {
return new TaskId(randomAsciiOfLength(5), randomLong());
}
private static RawTaskStatus randomRawTaskStatus() throws IOException {
try (XContentBuilder builder = XContentBuilder.builder(Requests.INDEX_CONTENT_TYPE.xContent())) {
builder.startObject();
int fields = between(0, 10);
for (int f = 0; f < fields; f++) {
builder.field(randomAsciiOfLength(5), randomAsciiOfLength(5));
}
builder.endObject();
return new RawTaskStatus(builder.bytes());
}
}
private static ToXContent randomTaskActionResult() {
Map<String, String> result = new TreeMap<>();
int fields = between(0, 10);
for (int f = 0; f < fields; f++) {
result.put(randomAsciiOfLength(5), randomAsciiOfLength(5));
}
return new ToXContent() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// Results in Elasticsearch never output a leading startObject. There isn't really a good reason, they just don't.
for (Map.Entry<String, String> entry : result.entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
return builder;
}
};
}
}

View File

@ -58,15 +58,26 @@ The result will look similar to the following:
}
--------------------------------------------------
It is also possible to retrieve information for a particular task, or all children of a particular
tasks using the following two commands:
It is also possible to retrieve information for a particular task:
[source,js]
--------------------------------------------------
GET _tasks/taskId:1
GET _tasks?parent_task_id=parentTaskId:1
GET _tasks/taskId:1 <1>
--------------------------------------------------
// CONSOLE
// TEST[catch:missing]
<1> This will return a 404 if the task isn't found.
Or to retrieve all children of a particular task:
[source,js]
--------------------------------------------------
GET _tasks?parent_task_id=parentTaskId:1 <1>
--------------------------------------------------
// CONSOLE
<1> This won't return a 404 if the parent isn't found.
The task API can be also used to wait for completion of a particular task. The following call will
block for 10 seconds or until the task with id `oTUltX4IQMOUUVeiohTt8A:12345` is completed.
@ -76,6 +87,16 @@ block for 10 seconds or until the task with id `oTUltX4IQMOUUVeiohTt8A:12345` is
GET _tasks/oTUltX4IQMOUUVeiohTt8A:12345?wait_for_completion=true&timeout=10s
--------------------------------------------------
// CONSOLE
// TEST[catch:missing]
You can also wait for all tasks for certain action types to finish. This
command will wait for all `reindex` tasks to finish:
[source,js]
--------------------------------------------------
GET _tasks?actions=*reindex&wait_for_completion=true&timeout=10s
--------------------------------------------------
// CONSOLE
Tasks can be also listed using _cat version of the list tasks command, which accepts the same arguments
as the standard list tasks command.

View File

@ -149,10 +149,11 @@ to be refreshed.
If the request contains `wait_for_completion=false` then Elasticsearch will
perform some preflight checks, launch the request, and then return a `task`
which can be used with <<docs-delete-by-query-task-api,Tasks APIs>> to cancel
or get the status of the task. For now, once the request is finished the task
is gone and the only place to look for the ultimate result of the task is in
the Elasticsearch log file. This will be fixed soon.
which can be used with <<docs-delete-by-query-task-api,Tasks APIs>>
to cancel or get the status of the task. Elasticsearch will also create a
record of this task as a document at `.tasks/task/${taskId}`. This is yours
to keep or remove as you see fit. When you are done with it, delete it so
Elasticsearch can reclaim the space it uses.
`consistency` controls how many copies of a shard must respond to each write
request. `timeout` controls how long each write request waits for unavailable
@ -222,7 +223,7 @@ from aborting the operation.
[[docs-delete-by-query-task-api]]
=== Works with the Task API
While Delete By Query is running you can fetch their status using the
You can fetch the status of any running delete-by-query requests with the
<<tasks,Task API>>:
[source,js]
@ -277,6 +278,22 @@ of operations that the reindex expects to perform. You can estimate the
progress by adding the `updated`, `created`, and `deleted` fields. The request
will finish when their sum is equal to the `total` field.
With the task id you can look up the task directly:
[source,js]
--------------------------------------------------
GET /_tasks/taskId:1
--------------------------------------------------
// CONSOLE
// TEST[catch:missing]
The advantage of this API is that it integrates with `wait_for_completion=false`
to transparently return the status of completed tasks. If the task is completed
and `wait_for_completion=false` was set on it them it'll come back with a
`results` or an `error` field. The cost of this feature is the document that
`wait_for_completion=false` creates at `.tasks/task/${taskId}`. It is up to
you to delete that document.
[float]
[[docs-delete-by-query-cancel-task-api]]

View File

@ -375,10 +375,11 @@ parameter which causes just the shard that received the new data to be indexed.
If the request contains `wait_for_completion=false` then Elasticsearch will
perform some preflight checks, launch the request, and then return a `task`
which can be used with <<docs-reindex-task-api,Tasks APIs>> to cancel or get
the status of the task. For now, once the request is finished the task is gone
and the only place to look for the ultimate result of the task is in the
Elasticsearch log file. This will be fixed soon.
which can be used with <<docs-reindex-task-api,Tasks APIs>>
to cancel or get the status of the task. Elasticsearch will also create a
record of this task as a document at `.tasks/task/${taskId}`. This is yours
to keep or remove as you see fit. When you are done with it, delete it so
Elasticsearch can reclaim the space it uses.
`consistency` controls how many copies of a shard must respond to each write
request. `timeout` controls how long each write request waits for unavailable
@ -457,7 +458,7 @@ from aborting the operation.
[[docs-reindex-task-api]]
=== Works with the Task API
While Reindex is running you can fetch their status using the
You can fetch the status of all running reindex requests with the
<<tasks,Task API>>:
[source,js]
@ -515,6 +516,22 @@ of operations that the reindex expects to perform. You can estimate the
progress by adding the `updated`, `created`, and `deleted` fields. The request
will finish when their sum is equal to the `total` field.
With the task id you can look up the task directly:
[source,js]
--------------------------------------------------
GET /_tasks/taskId:1
--------------------------------------------------
// CONSOLE
// TEST[catch:missing]
The advantage of this API is that it integrates with `wait_for_completion=false`
to transparently return the status of completed tasks. If the task is completed
and `wait_for_completion=false` was set on it them it'll come back with a
`results` or an `error` field. The cost of this feature is the document that
`wait_for_completion=false` creates at `.tasks/task/${taskId}`. It is up to
you to delete that document.
[float]
[[docs-reindex-cancel-task-api]]

View File

@ -205,10 +205,11 @@ parameter which causes just the shard that received the new data to be indexed.
If the request contains `wait_for_completion=false` then Elasticsearch will
perform some preflight checks, launch the request, and then return a `task`
which can be used with <<docs-update-by-query-task-api,Tasks APIs>> to cancel
or get the status of the task. For now, once the request is finished the task
is gone and the only place to look for the ultimate result of the task is in
the Elasticsearch log file. This will be fixed soon.
which can be used with <<docs-update-by-query-task-api,Tasks APIs>>
to cancel or get the status of the task. Elasticsearch will also create a
record of this task as a document at `.tasks/task/${taskId}`. This is yours
to keep or remove as you see fit. When you are done with it, delete it so
Elasticsearch can reclaim the space it uses.
`consistency` controls how many copies of a shard must respond to each write
request. `timeout` controls how long each write request waits for unavailable
@ -283,7 +284,7 @@ from aborting the operation.
[[docs-update-by-query-task-api]]
=== Works with the Task API
While Update By Query is running you can fetch their status using the
You can fetch the status of all running update-by-query requests with the
<<tasks,Task API>>:
[source,js]
@ -341,6 +342,22 @@ of operations that the reindex expects to perform. You can estimate the
progress by adding the `updated`, `created`, and `deleted` fields. The request
will finish when their sum is equal to the `total` field.
With the task id you can look up the task directly:
[source,js]
--------------------------------------------------
GET /_tasks/taskId:1
--------------------------------------------------
// CONSOLE
// TEST[catch:missing]
The advantage of this API is that it integrates with `wait_for_completion=false`
to transparently return the status of completed tasks. If the task is completed
and `wait_for_completion=false` was set on it them it'll come back with a
`results` or an `error` field. The cost of this feature is the document that
`wait_for_completion=false` creates at `.tasks/task/${taskId}`. It is up to
you to delete that document.
[float]
[[docs-update-by-query-cancel-task-api]]

View File

@ -77,6 +77,8 @@ public abstract class AbstractBaseReindexRestHandler<
action.execute(internal, new BulkIndexByScrollResponseContentListener<>(channel, params));
return;
} else {
internal.setShouldPersistResult(true);
}
/*

View File

@ -93,6 +93,11 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
*/
private float requestsPerSecond = Float.POSITIVE_INFINITY;
/**
* Should this task persist its result?
*/
private boolean shouldPersistResult;
public AbstractBulkByScrollRequest() {
}
@ -286,6 +291,19 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
return self();
}
/**
* Should this task persist its result after it has finished?
*/
public Self setShouldPersistResult(boolean shouldPersistResult) {
this.shouldPersistResult = shouldPersistResult;
return self();
}
@Override
public boolean getShouldPersistResult() {
return shouldPersistResult;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId, requestsPerSecond);

View File

@ -131,4 +131,12 @@ public abstract class AbstractBulkByScrollRequestBuilder<
request.setRequestsPerSecond(requestsPerSecond);
return self();
}
/**
* Should this task persist its result after it has finished?
*/
public Self setShouldPersistResult(boolean shouldPersistResult) {
request.setShouldPersistResult(shouldPersistResult);
return self();
}
}

View File

@ -19,7 +19,10 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
@ -30,14 +33,18 @@ import org.elasticsearch.rest.action.support.RestToXContentListener;
import org.elasticsearch.tasks.TaskId;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.action.admin.cluster.node.tasks.RestListTasksAction.nodeSettingListener;
public class RestRethrottleAction extends BaseRestHandler {
private final TransportRethrottleAction action;
private final ClusterService clusterService;
@Inject
public RestRethrottleAction(Settings settings, RestController controller, Client client, TransportRethrottleAction action) {
public RestRethrottleAction(Settings settings, RestController controller, Client client, TransportRethrottleAction action,
ClusterService clusterService) {
super(settings, client);
this.action = action;
this.clusterService = clusterService;
controller.registerHandler(POST, "/_update_by_query/{taskId}/_rethrottle", this);
controller.registerHandler(POST, "/_delete_by_query/{taskId}/_rethrottle", this);
controller.registerHandler(POST, "/_reindex/{taskId}/_rethrottle", this);
@ -52,6 +59,7 @@ public class RestRethrottleAction extends BaseRestHandler {
throw new IllegalArgumentException("requests_per_second is a required parameter");
}
internalRequest.setRequestsPerSecond(requestsPerSecond);
action.execute(internalRequest, new RestToXContentListener<>(channel));
ActionListener<ListTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
action.execute(internalRequest, listener);
}
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterName;
@ -31,6 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -31,6 +30,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.Engine.Operation.Origin;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.ingest.IngestTestPlugin;
import org.junit.BeforeClass;

View File

@ -69,10 +69,13 @@
- is_false: deleted
- do:
tasks.list:
task.get:
wait_for_completion: true
task_id: $task
- is_false: node_failures
# The task will be in the response even if it finished before we got here
# because of task persistence.
- is_true: task
---
"Response for version conflict":

View File

@ -134,7 +134,7 @@
task_id: $task
- do:
tasks.list:
task.get:
wait_for_completion: true
task_id: $task
@ -197,6 +197,6 @@
task_id: $task
- do:
tasks.list:
task.get:
wait_for_completion: true
task_id: $task

View File

@ -93,10 +93,13 @@
- is_false: deleted
- do:
tasks.list:
task.get:
wait_for_completion: true
task_id: $task
- is_false: node_failures
# The task will be in the response even if it finished before we got here
# because of task persistence.
- is_true: task
---
"Response format for version conflict":

View File

@ -156,7 +156,7 @@
task_id: $task
- do:
tasks.list:
task.get:
wait_for_completion: true
task_id: $task
@ -214,6 +214,6 @@
task_id: $task
- do:
tasks.list:
task.get:
wait_for_completion: true
task_id: $task

View File

@ -53,10 +53,13 @@
- is_false: deleted
- do:
tasks.list:
task.get:
wait_for_completion: true
task_id: $task
- is_false: node_failures
# The task will be in the response even if it finished before we got here
# because of task persistence.
- is_true: task
---
"Response for version conflict":

View File

@ -122,7 +122,7 @@
task_id: $task
- do:
tasks.list:
task.get:
wait_for_completion: true
task_id: $task
@ -172,6 +172,6 @@
task_id: $task
- do:
tasks.list:
task.get:
wait_for_completion: true
task_id: $task

View File

@ -0,0 +1,23 @@
{
"task.get": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/tasks.html",
"methods": ["GET"],
"url": {
"path": "/_tasks/{task_id}",
"paths": ["/_tasks/{task_id}"],
"parts": {
"task_id": {
"type": "string",
"description": "Return the task with specified id (node_id:task_number)"
}
},
"params": {
"wait_for_completion": {
"type": "boolean",
"description": "Wait for the matching tasks to complete (default: false)"
}
}
},
"body": null
}
}

View File

@ -5,12 +5,7 @@
"url": {
"path": "/_tasks",
"paths": ["/_tasks", "/_tasks/{task_id}"],
"parts": {
"task_id": {
"type": "string",
"description": "Return the task with specified id (node_id:task_number)"
}
},
"parts": {},
"params": {
"node_id": {
"type": "list",

View File

@ -0,0 +1,10 @@
---
"get task test":
# Note that this gets much better testing in reindex's tests because it actually saves the task
- do:
cluster.state: {}
- do:
catch: missing
task.get:
task_id: foo:1