Add ability to group tasks by common parent

By default, tasks are grouped by node. However, task execution in elasticsearch can be quite complex and an individual task that runs on a coordinating node can have many subtasks running on other nodes in the cluster. This commit makes it possible to list task grouped by common parents instead of by node. When this option is enabled all subtask are grouped under the coordinating node task that started all subtasks in the group. To group tasks by common parents, use the following syntax:

 GET /tasks?group_by=parents
This commit is contained in:
Igor Motov 2016-03-24 22:59:52 -04:00
parent 78ab6c5b7f
commit e073b0c75d
7 changed files with 252 additions and 25 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.ArrayList;
@ -37,6 +38,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Returns the list of tasks currently running on the nodes
@ -47,6 +49,8 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
private Map<DiscoveryNode, List<TaskInfo>> nodes;
private List<TaskGroup> groups;
public ListTasksResponse() {
}
@ -94,6 +98,41 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
return nodeTasks;
}
public List<TaskGroup> getTaskGroups() {
if (groups == null) {
buildTaskGroups();
}
return groups;
}
private void buildTaskGroups() {
Map<TaskId, TaskGroup.Builder> taskGroups = new HashMap<>();
List<TaskGroup.Builder> topLevelTasks = new ArrayList<>();
// First populate all tasks
for (TaskInfo taskInfo : this.tasks) {
taskGroups.put(taskInfo.getTaskId(), TaskGroup.builder(taskInfo));
}
// Now go through all task group builders and add children to their parents
for (TaskGroup.Builder taskGroup : taskGroups.values()) {
TaskId parentTaskId = taskGroup.getTaskInfo().getParentTaskId();
if (parentTaskId.isSet()) {
TaskGroup.Builder parentTask = taskGroups.get(parentTaskId);
if (parentTask != null) {
// we found parent in the list of tasks - add it to the parent list
parentTask.addGroup(taskGroup);
} else {
// we got zombie or the parent was filtered out - add it to the the top task list
topLevelTasks.add(taskGroup);
}
} else {
// top level task - add it to the top task list
topLevelTasks.add(taskGroup);
}
}
this.groups = Collections.unmodifiableList(topLevelTasks.stream().map(TaskGroup.Builder::build).collect(Collectors.toList()));
}
public List<TaskInfo> getTasks() {
return tasks;
}
@ -119,39 +158,48 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
}
builder.endArray();
}
String groupBy = params.param("group_by", "nodes");
if ("nodes".equals(groupBy)) {
builder.startObject("nodes");
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : getPerNodeTasks().entrySet()) {
DiscoveryNode node = entry.getKey();
builder.startObject(node.getId(), XContentBuilder.FieldCaseConversion.NONE);
builder.field("name", node.name());
builder.field("transport_address", node.address().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());
builder.startObject("nodes");
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : getPerNodeTasks().entrySet()) {
DiscoveryNode node = entry.getKey();
builder.startObject(node.getId(), XContentBuilder.FieldCaseConversion.NONE);
builder.field("name", node.name());
builder.field("transport_address", node.address().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());
builder.startArray("roles");
for (DiscoveryNode.Role role : node.getRoles()) {
builder.value(role.getRoleName());
}
builder.endArray();
builder.startArray("roles");
for (DiscoveryNode.Role role : node.getRoles()) {
builder.value(role.getRoleName());
}
builder.endArray();
if (!node.getAttributes().isEmpty()) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue(), XContentBuilder.FieldCaseConversion.NONE);
if (!node.getAttributes().isEmpty()) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue(), XContentBuilder.FieldCaseConversion.NONE);
}
builder.endObject();
}
builder.startObject("tasks");
for(TaskInfo task : entry.getValue()) {
builder.startObject(task.getTaskId().toString(), XContentBuilder.FieldCaseConversion.NONE);
task.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
builder.endObject();
}
} else if ("parents".equals(groupBy)) {
builder.startObject("tasks");
for(TaskInfo task : entry.getValue()) {
builder.startObject(task.getTaskId().toString(), XContentBuilder.FieldCaseConversion.NONE);
task.toXContent(builder, params);
for (TaskGroup group : getTaskGroups()) {
builder.startObject(group.getTaskInfo().getTaskId().toString(), XContentBuilder.FieldCaseConversion.NONE);
group.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
builder.endObject();
}
builder.endObject();
return builder;
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* Information about a currently running task and all its subtasks.
*/
public class TaskGroup implements ToXContent {
private final TaskInfo task;
private final List<TaskGroup> childTasks;
public TaskGroup(TaskInfo task, List<TaskGroup> childTasks) {
this.task = task;
this.childTasks = Collections.unmodifiableList(new ArrayList<>(childTasks));
}
public static Builder builder(TaskInfo taskInfo) {
return new Builder(taskInfo);
}
public static class Builder {
private TaskInfo taskInfo;
private List<Builder> childTasks;
private Builder(TaskInfo taskInfo) {
this.taskInfo = taskInfo;
childTasks = new ArrayList<>();
}
public void addGroup(Builder builder) {
childTasks.add(builder);
}
public TaskInfo getTaskInfo() {
return taskInfo;
}
public TaskGroup build() {
return new TaskGroup(taskInfo, childTasks.stream().map(Builder::build).collect(Collectors.toList()));
}
}
public TaskInfo getTaskInfo() {
return task;
}
public List<TaskGroup> getChildTasks() {
return childTasks;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
task.toXContent(builder, params);
if (childTasks.isEmpty() == false) {
builder.startArray("children");
for (TaskGroup taskGroup : childTasks) {
builder.startObject();
taskGroup.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
return builder;
}
}

View File

@ -42,8 +42,6 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
public static final String[] ALL_NODES = Strings.EMPTY_ARRAY;
public static final long ALL_TASKS = -1L;
private String[] nodesIds = ALL_NODES;
private TimeValue timeout;

View File

@ -24,8 +24,10 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
@ -43,6 +45,11 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.tasks.MockTaskManager;
@ -368,6 +375,10 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
for (int i = 1; i < testNodes.length; i++) {
assertEquals(1, response.getPerNodeTasks().get(testNodes[i].discoveryNode).size());
}
// There should be a single main task when grouped by tasks
assertEquals(1, response.getTaskGroups().size());
// And as many child tasks as we have nodes
assertEquals(testNodes.length, response.getTaskGroups().get(0).getChildTasks().size());
// Check task counts using transport with filtering
testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
@ -379,6 +390,11 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
assertEquals(1, entry.getValue().size());
assertNull(entry.getValue().get(0).getDescription());
}
// Since the main task is not in the list - all tasks should be by themselves
assertEquals(testNodes.length, response.getTaskGroups().size());
for (TaskGroup taskGroup : response.getTaskGroups()) {
assertEquals(0, taskGroup.getChildTasks().size());
}
// Check task counts using transport with detailed description
listTasksRequest.setDetailed(true); // same request only with detailed description
@ -703,4 +719,53 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}
@SuppressWarnings("unchecked")
public void testTasksToXContentGrouping() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(ListTasksAction.NAME + "*");
ListTasksResponse response = testNodes[0].transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length + 1, response.getTasks().size());
// First group by node
Map<String, Object> byNodes = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "nodes")));
byNodes = (Map<String, Object>) byNodes.get("nodes");
// One element on the top level
assertEquals(testNodes.length, byNodes.size());
Map<String, Object> firstNode = (Map<String, Object>) byNodes.get(testNodes[0].discoveryNode.getId());
firstNode = (Map<String, Object>) firstNode.get("tasks");
assertEquals(2, firstNode.size()); // two tasks for the first node
for (int i = 1; i < testNodes.length; i++) {
Map<String, Object> otherNode = (Map<String, Object>) byNodes.get(testNodes[i].discoveryNode.getId());
otherNode = (Map<String, Object>) otherNode.get("tasks");
assertEquals(1, otherNode.size()); // one tasks for the all other nodes
}
// Group by parents
Map<String, Object> byParent = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "parents")));
byParent = (Map<String, Object>) byParent.get("tasks");
// One element on the top level
assertEquals(1, byParent.size()); // Only one top level task
Map<String, Object> topTask = (Map<String, Object>) byParent.values().iterator().next();
List<Object> children = (List<Object>) topTask.get("children");
assertEquals(testNodes.length, children.size()); // two tasks for the first node
for (int i = 0; i < testNodes.length; i++) {
Map<String, Object> child = (Map<String, Object>) children.get(i);
assertNull(child.get("children"));
}
}
private Map<String, Object> serialize(ToXContent response, ToXContent.Params params) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.startObject();
response.toXContent(builder, params);
builder.endObject();
builder.flush();
logger.info(builder.string());
return XContentHelper.convertToMap(builder.bytes(), false).v2();
}
}

View File

@ -99,5 +99,14 @@ POST /_tasks/_cancel?node_id=nodeId1,nodeId2&actions=*reindex
// AUTOSENSE
[float]
=== Task Grouping
The task lists returned by task API commands can be grouped either by nodes (default) or by parent tasks using the `group_by` parameter.
The following command will change the grouping to parent tasks:
[source,js]
--------------------------------------------------
GET /_tasks?group_by=parents
--------------------------------------------------
// AUTOSENSE

View File

@ -35,7 +35,14 @@
"wait_for_completion": {
"type": "boolean",
"description": "Wait for the matching tasks to complete (default: false)"
},
"group_by": {
"type" : "enum",
"description": "Group tasks by nodes or parent/child relationships",
"options" : ["nodes", "parents"],
"default" : "nodes"
}
}
},
"body": null

View File

@ -11,3 +11,9 @@
- is_true: nodes
- is_true: nodes.$master.roles
- do:
tasks.list:
group_by: parents
- is_true: tasks