diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java index 9713a1455a4..c1f877c7f4a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.ArrayList; @@ -37,6 +38,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Returns the list of tasks currently running on the nodes @@ -47,6 +49,8 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent { private Map> nodes; + private List groups; + public ListTasksResponse() { } @@ -94,6 +98,41 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent { return nodeTasks; } + public List getTaskGroups() { + if (groups == null) { + buildTaskGroups(); + } + return groups; + } + + private void buildTaskGroups() { + Map taskGroups = new HashMap<>(); + List topLevelTasks = new ArrayList<>(); + // First populate all tasks + for (TaskInfo taskInfo : this.tasks) { + taskGroups.put(taskInfo.getTaskId(), TaskGroup.builder(taskInfo)); + } + + // Now go through all task group builders and add children to their parents + for (TaskGroup.Builder taskGroup : taskGroups.values()) { + TaskId parentTaskId = taskGroup.getTaskInfo().getParentTaskId(); + if (parentTaskId.isSet()) { + TaskGroup.Builder parentTask = taskGroups.get(parentTaskId); + if (parentTask != null) { + // we found parent in the list of tasks - add it to the parent list + parentTask.addGroup(taskGroup); + } else { + // we got zombie or the parent was filtered out - add it to the the top task list + topLevelTasks.add(taskGroup); + } + } else { + // top level task - add it to the top task list + topLevelTasks.add(taskGroup); + } + } + this.groups = Collections.unmodifiableList(topLevelTasks.stream().map(TaskGroup.Builder::build).collect(Collectors.toList())); + } + public List getTasks() { return tasks; } @@ -119,39 +158,48 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent { } builder.endArray(); } + String groupBy = params.param("group_by", "nodes"); + if ("nodes".equals(groupBy)) { + builder.startObject("nodes"); + for (Map.Entry> entry : getPerNodeTasks().entrySet()) { + DiscoveryNode node = entry.getKey(); + builder.startObject(node.getId(), XContentBuilder.FieldCaseConversion.NONE); + builder.field("name", node.name()); + builder.field("transport_address", node.address().toString()); + builder.field("host", node.getHostName()); + builder.field("ip", node.getAddress()); - builder.startObject("nodes"); - for (Map.Entry> entry : getPerNodeTasks().entrySet()) { - DiscoveryNode node = entry.getKey(); - builder.startObject(node.getId(), XContentBuilder.FieldCaseConversion.NONE); - builder.field("name", node.name()); - builder.field("transport_address", node.address().toString()); - builder.field("host", node.getHostName()); - builder.field("ip", node.getAddress()); + builder.startArray("roles"); + for (DiscoveryNode.Role role : node.getRoles()) { + builder.value(role.getRoleName()); + } + builder.endArray(); - builder.startArray("roles"); - for (DiscoveryNode.Role role : node.getRoles()) { - builder.value(role.getRoleName()); - } - builder.endArray(); - - if (!node.getAttributes().isEmpty()) { - builder.startObject("attributes"); - for (Map.Entry attrEntry : node.getAttributes().entrySet()) { - builder.field(attrEntry.getKey(), attrEntry.getValue(), XContentBuilder.FieldCaseConversion.NONE); + if (!node.getAttributes().isEmpty()) { + builder.startObject("attributes"); + for (Map.Entry attrEntry : node.getAttributes().entrySet()) { + builder.field(attrEntry.getKey(), attrEntry.getValue(), XContentBuilder.FieldCaseConversion.NONE); + } + builder.endObject(); + } + builder.startObject("tasks"); + for(TaskInfo task : entry.getValue()) { + builder.startObject(task.getTaskId().toString(), XContentBuilder.FieldCaseConversion.NONE); + task.toXContent(builder, params); + builder.endObject(); } builder.endObject(); + builder.endObject(); } + } else if ("parents".equals(groupBy)) { builder.startObject("tasks"); - for(TaskInfo task : entry.getValue()) { - builder.startObject(task.getTaskId().toString(), XContentBuilder.FieldCaseConversion.NONE); - task.toXContent(builder, params); + for (TaskGroup group : getTaskGroups()) { + builder.startObject(group.getTaskInfo().getTaskId().toString(), XContentBuilder.FieldCaseConversion.NONE); + group.toXContent(builder, params); builder.endObject(); } builder.endObject(); - builder.endObject(); } - builder.endObject(); return builder; } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskGroup.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskGroup.java new file mode 100644 index 00000000000..aa9bfd6b720 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskGroup.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.node.tasks.list; + +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Information about a currently running task and all its subtasks. + */ +public class TaskGroup implements ToXContent { + + private final TaskInfo task; + + private final List childTasks; + + + public TaskGroup(TaskInfo task, List childTasks) { + this.task = task; + this.childTasks = Collections.unmodifiableList(new ArrayList<>(childTasks)); + } + + public static Builder builder(TaskInfo taskInfo) { + return new Builder(taskInfo); + } + + public static class Builder { + private TaskInfo taskInfo; + private List childTasks; + + private Builder(TaskInfo taskInfo) { + this.taskInfo = taskInfo; + childTasks = new ArrayList<>(); + } + + public void addGroup(Builder builder) { + childTasks.add(builder); + } + + public TaskInfo getTaskInfo() { + return taskInfo; + } + + public TaskGroup build() { + return new TaskGroup(taskInfo, childTasks.stream().map(Builder::build).collect(Collectors.toList())); + } + } + + public TaskInfo getTaskInfo() { + return task; + } + + public List getChildTasks() { + return childTasks; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + task.toXContent(builder, params); + if (childTasks.isEmpty() == false) { + builder.startArray("children"); + for (TaskGroup taskGroup : childTasks) { + builder.startObject(); + taskGroup.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + } + return builder; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java index dc296a84720..c912fb11be9 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java @@ -42,8 +42,6 @@ public class BaseTasksRequest> extends public static final String[] ALL_NODES = Strings.EMPTY_ARRAY; - public static final long ALL_TASKS = -1L; - private String[] nodesIds = ALL_NODES; private TimeValue timeout; diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index 972d9735efb..34218a99432 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -24,8 +24,10 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; @@ -43,6 +45,11 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.tasks.MockTaskManager; @@ -368,6 +375,10 @@ public class TransportTasksActionTests extends TaskManagerTestCase { for (int i = 1; i < testNodes.length; i++) { assertEquals(1, response.getPerNodeTasks().get(testNodes[i].discoveryNode).size()); } + // There should be a single main task when grouped by tasks + assertEquals(1, response.getTaskGroups().size()); + // And as many child tasks as we have nodes + assertEquals(testNodes.length, response.getTaskGroups().get(0).getChildTasks().size()); // Check task counts using transport with filtering testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; @@ -379,6 +390,11 @@ public class TransportTasksActionTests extends TaskManagerTestCase { assertEquals(1, entry.getValue().size()); assertNull(entry.getValue().get(0).getDescription()); } + // Since the main task is not in the list - all tasks should be by themselves + assertEquals(testNodes.length, response.getTaskGroups().size()); + for (TaskGroup taskGroup : response.getTaskGroups()) { + assertEquals(0, taskGroup.getChildTasks().size()); + } // Check task counts using transport with detailed description listTasksRequest.setDetailed(true); // same request only with detailed description @@ -703,4 +719,53 @@ public class TransportTasksActionTests extends TaskManagerTestCase { NodesResponse responses = future.get(); assertEquals(0, responses.failureCount()); } + + @SuppressWarnings("unchecked") + public void testTasksToXContentGrouping() throws Exception { + setupTestNodes(Settings.EMPTY); + connectNodes(testNodes); + + // Get the parent task + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setActions(ListTasksAction.NAME + "*"); + ListTasksResponse response = testNodes[0].transportListTasksAction.execute(listTasksRequest).get(); + assertEquals(testNodes.length + 1, response.getTasks().size()); + + // First group by node + Map byNodes = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "nodes"))); + byNodes = (Map) byNodes.get("nodes"); + // One element on the top level + assertEquals(testNodes.length, byNodes.size()); + Map firstNode = (Map) byNodes.get(testNodes[0].discoveryNode.getId()); + firstNode = (Map) firstNode.get("tasks"); + assertEquals(2, firstNode.size()); // two tasks for the first node + for (int i = 1; i < testNodes.length; i++) { + Map otherNode = (Map) byNodes.get(testNodes[i].discoveryNode.getId()); + otherNode = (Map) otherNode.get("tasks"); + assertEquals(1, otherNode.size()); // one tasks for the all other nodes + } + + // Group by parents + Map byParent = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "parents"))); + byParent = (Map) byParent.get("tasks"); + // One element on the top level + assertEquals(1, byParent.size()); // Only one top level task + Map topTask = (Map) byParent.values().iterator().next(); + List children = (List) topTask.get("children"); + assertEquals(testNodes.length, children.size()); // two tasks for the first node + for (int i = 0; i < testNodes.length; i++) { + Map child = (Map) children.get(i); + assertNull(child.get("children")); + } + } + + private Map serialize(ToXContent response, ToXContent.Params params) throws IOException { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + builder.startObject(); + response.toXContent(builder, params); + builder.endObject(); + builder.flush(); + logger.info(builder.string()); + return XContentHelper.convertToMap(builder.bytes(), false).v2(); + } } diff --git a/docs/reference/cluster/tasks.asciidoc b/docs/reference/cluster/tasks.asciidoc index bb03231fc83..4ed2a7b899a 100644 --- a/docs/reference/cluster/tasks.asciidoc +++ b/docs/reference/cluster/tasks.asciidoc @@ -99,5 +99,14 @@ POST /_tasks/_cancel?node_id=nodeId1,nodeId2&actions=*reindex // AUTOSENSE +[float] +=== Task Grouping +The task lists returned by task API commands can be grouped either by nodes (default) or by parent tasks using the `group_by` parameter. +The following command will change the grouping to parent tasks: +[source,js] +-------------------------------------------------- +GET /_tasks?group_by=parents +-------------------------------------------------- +// AUTOSENSE diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json index 5cdeed1b142..2fff6972150 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json @@ -35,7 +35,14 @@ "wait_for_completion": { "type": "boolean", "description": "Wait for the matching tasks to complete (default: false)" + }, + "group_by": { + "type" : "enum", + "description": "Group tasks by nodes or parent/child relationships", + "options" : ["nodes", "parents"], + "default" : "nodes" } + } }, "body": null diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yaml index 8b876261cf7..dd1c415876f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yaml @@ -11,3 +11,9 @@ - is_true: nodes - is_true: nodes.$master.roles + + - do: + tasks.list: + group_by: parents + + - is_true: tasks