diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index 8f0148d50ae..11cafb326a0 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -28,6 +28,8 @@ import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction; import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction; @@ -255,6 +257,7 @@ public class ActionModule extends AbstractModule { registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class); registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class); registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class); + registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class); registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/TaskOperationFailure.java b/core/src/main/java/org/elasticsearch/action/TaskOperationFailure.java new file mode 100644 index 00000000000..bf5051c1a19 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/TaskOperationFailure.java @@ -0,0 +1,117 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; + +import static org.elasticsearch.ExceptionsHelper.detailedMessage; + +/** + * Information about task operation failures + * + * The class is final due to serialization limitations + */ +public final class TaskOperationFailure implements Writeable, ToXContent { + + private final String nodeId; + + private final long taskId; + + private final Throwable reason; + + private final RestStatus status; + + public TaskOperationFailure(StreamInput in) throws IOException { + nodeId = in.readString(); + taskId = in.readLong(); + reason = in.readThrowable(); + status = RestStatus.readFrom(in); + } + + public TaskOperationFailure(String nodeId, long taskId, Throwable t) { + this.nodeId = nodeId; + this.taskId = taskId; + this.reason = t; + status = ExceptionsHelper.status(t); + } + + public String getNodeId() { + return this.nodeId; + } + + public long getTaskId() { + return this.taskId; + } + + public String getReason() { + return detailedMessage(reason); + } + + public RestStatus getStatus() { + return status; + } + + public Throwable getCause() { + return reason; + } + + @Override + public TaskOperationFailure readFrom(StreamInput in) throws IOException { + return new TaskOperationFailure(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(nodeId); + out.writeLong(taskId); + out.writeThrowable(reason); + RestStatus.writeTo(out, status); + } + + @Override + public String toString() { + return "[" + nodeId + "][" + taskId + "] failed, reason [" + getReason() + "]"; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("task_id", getTaskId()); + builder.field("node_id", getNodeId()); + builder.field("status", status.name()); + if (reason != null) { + builder.field("reason"); + builder.startObject(); + ElasticsearchException.toXContent(builder, params, reason); + builder.endObject(); + } + return builder; + + } + +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 9830305662e..79adbafa9bb 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -75,7 +76,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< } @Override - protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener listener) { + protected final void masterOperation(ClusterHealthRequest request, ClusterState state, ActionListener listener) throws Exception { + logger.warn("attempt to execute a cluster health operation without a task"); + throw new UnsupportedOperationException("task parameter is required for this operation"); + } + + @Override + protected void masterOperation(Task task, final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener listener) { if (request.waitForEvents() != null) { final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis(); clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) { @@ -95,7 +102,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< @Override public void onNoLongerMaster(String source) { logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", request.waitForEvents()); - doExecute(request, listener); + doExecute(task, request, listener); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksAction.java new file mode 100644 index 00000000000..acc11861108 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksAction.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.node.tasks.list; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * Action for retrieving a list of currently running tasks + */ +public class ListTasksAction extends Action { + + public static final ListTasksAction INSTANCE = new ListTasksAction(); + public static final String NAME = "cluster:monitor/tasks/lists"; + + private ListTasksAction() { + super(NAME); + } + + @Override + public ListTasksResponse newResponse() { + return new ListTasksResponse(); + } + + @Override + public ListTasksRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new ListTasksRequestBuilder(client, this); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java new file mode 100644 index 00000000000..0b0637e0b8e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.node.tasks.list; + +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A request to get node tasks + */ +public class ListTasksRequest extends BaseTasksRequest { + + private boolean detailed = false; + + /** + * Get information from nodes based on the nodes ids specified. If none are passed, information + * for all nodes will be returned. + */ + public ListTasksRequest(String... nodesIds) { + super(nodesIds); + } + + /** + * Should the detailed task information be returned. + */ + public boolean detailed() { + return this.detailed; + } + + /** + * Should the node settings be returned. + */ + public ListTasksRequest detailed(boolean detailed) { + this.detailed = detailed; + return this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + detailed = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(detailed); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequestBuilder.java new file mode 100644 index 00000000000..2b462014f43 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequestBuilder.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.node.tasks.list; + +import org.elasticsearch.action.support.tasks.TasksRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +/** + * Builder for the request to retrieve the list of tasks running on the specified nodes + */ +public class ListTasksRequestBuilder extends TasksRequestBuilder { + + public ListTasksRequestBuilder(ElasticsearchClient client, ListTasksAction action) { + super(client, action, new ListTasksRequest()); + } + + /** + * Should detailed task information be returned. + */ + public ListTasksRequestBuilder setDetailed(boolean detailed) { + request.detailed(detailed); + return this; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java new file mode 100644 index 00000000000..2da9701fcfa --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -0,0 +1,159 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.node.tasks.list; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Returns the list of tasks currently running on the nodes + */ +public class ListTasksResponse extends BaseTasksResponse implements ToXContent { + + private List tasks; + + private Map> nodes; + + public ListTasksResponse() { + } + + public ListTasksResponse(List tasks, List taskFailures, List nodeFailures) { + super(taskFailures, nodeFailures); + this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks)); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + tasks = Collections.unmodifiableList(in.readList(TaskInfo::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(tasks); + } + + /** + * Returns the list of tasks by node + */ + public Map> getPerNodeTasks() { + if (nodes != null) { + return nodes; + } + Map> nodeTasks = new HashMap<>(); + + Set nodes = new HashSet<>(); + for (TaskInfo shard : tasks) { + nodes.add(shard.getNode()); + } + + for (DiscoveryNode node : nodes) { + List tasks = new ArrayList<>(); + for (TaskInfo taskInfo : this.tasks) { + if (taskInfo.getNode().equals(node)) { + tasks.add(taskInfo); + } + } + nodeTasks.put(node, tasks); + } + this.nodes = nodeTasks; + return nodeTasks; + } + + public List getTasks() { + return tasks; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (getTaskFailures() != null && getTaskFailures().size() > 0) { + builder.startArray("task_failures"); + for (TaskOperationFailure ex : getTaskFailures()){ + builder.value(ex); + } + builder.endArray(); + } + + if (getNodeFailures() != null && getNodeFailures().size() > 0) { + builder.startArray("node_failures"); + for (FailedNodeException ex : getNodeFailures()){ + builder.value(ex); + } + builder.endArray(); + } + + builder.startObject("nodes"); + for (Map.Entry> entry : getPerNodeTasks().entrySet()) { + DiscoveryNode node = entry.getKey(); + builder.startObject(node.getId(), XContentBuilder.FieldCaseConversion.NONE); + builder.field("name", node.name()); + builder.field("transport_address", node.address().toString()); + builder.field("host", node.getHostName()); + builder.field("ip", node.getAddress()); + + if (!node.attributes().isEmpty()) { + builder.startObject("attributes"); + for (ObjectObjectCursor attr : node.attributes()) { + builder.field(attr.key, attr.value, XContentBuilder.FieldCaseConversion.NONE); + } + builder.endObject(); + } + builder.startArray("tasks"); + for(TaskInfo task : entry.getValue()) { + task.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + } + builder.endObject(); + return builder; + } + + @Override + public String toString() { + try { + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + toXContent(builder, EMPTY_PARAMS); + builder.endObject(); + return builder.string(); + } catch (IOException e) { + return "{ \"error\" : \"" + e.getMessage() + "\"}"; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskInfo.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskInfo.java new file mode 100644 index 00000000000..ed43da2c4ed --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskInfo.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.node.tasks.list; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Information about a currently running task. + *

+ * Tasks are used for communication with transport actions. As a result, they can contain callback + * references as well as mutable state. That makes it impractical to send tasks over transport channels + * and use in APIs. Instead, immutable and streamable TaskInfo objects are used to represent + * snapshot information about currently running tasks. + */ +public class TaskInfo implements Writeable, ToXContent { + + private final DiscoveryNode node; + + private final long id; + + private final String type; + + private final String action; + + private final String description; + + private final String parentNode; + + private final long parentId; + + public TaskInfo(DiscoveryNode node, long id, String type, String action, String description) { + this(node, id, type, action, description, null, -1L); + } + + public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, String parentNode, long parentId) { + this.node = node; + this.id = id; + this.type = type; + this.action = action; + this.description = description; + this.parentNode = parentNode; + this.parentId = parentId; + } + + public TaskInfo(StreamInput in) throws IOException { + node = DiscoveryNode.readNode(in); + id = in.readLong(); + type = in.readString(); + action = in.readString(); + description = in.readOptionalString(); + parentNode = in.readOptionalString(); + parentId = in.readLong(); + } + + public DiscoveryNode getNode() { + return node; + } + + public long getId() { + return id; + } + + public String getType() { + return type; + } + + public String getAction() { + return action; + } + + public String getDescription() { + return description; + } + + public String getParentNode() { + return parentNode; + } + + public long getParentId() { + return parentId; + } + + @Override + public TaskInfo readFrom(StreamInput in) throws IOException { + return new TaskInfo(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + node.writeTo(out); + out.writeLong(id); + out.writeString(type); + out.writeString(action); + out.writeOptionalString(description); + out.writeOptionalString(parentNode); + out.writeLong(parentId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("node", node.getId()); + builder.field("id", id); + builder.field("type", type); + builder.field("action", action); + if (description != null) { + builder.field("description", description); + } + if (parentNode != null) { + builder.field("parent_node", parentNode); + builder.field("parent_id", parentId); + } + builder.endObject(); + return builder; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java new file mode 100644 index 00000000000..5475a394f34 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.node.tasks.list; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +/** + * + */ +public class TransportListTasksAction extends TransportTasksAction { + + @Inject + public TransportListTasksAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, ListTasksAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, ListTasksRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT); + } + + @Override + protected ListTasksResponse newResponse(ListTasksRequest request, List tasks, List taskOperationFailures, List failedNodeExceptions) { + return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions); + } + + @Override + protected TaskInfo readTaskResponse(StreamInput in) throws IOException { + return new TaskInfo(in); + } + + @Override + protected TaskInfo taskOperation(ListTasksRequest request, Task task) { + return task.taskInfo(clusterService.localNode(), request.detailed()); + } + + @Override + protected boolean accumulateExceptions() { + return true; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index e454fcabc7a..f8bbebf7db8 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -75,12 +76,12 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction listener) { + protected void doExecute(Task task, CloseIndexRequest request, ActionListener listener) { destructiveOperations.failDestructive(request.indices()); if (closeIndexEnabled == false) { throw new IllegalStateException("closing indices is disabled - set [" + CLUSTER_INDICES_CLOSE_ENABLE_SETTING.getKey() + ": true] to enable it. NOTE: closed indices still consume a significant amount of diskspace"); } - super.doExecute(request, listener); + super.doExecute(task, request, listener); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java index 82176da053c..28bf46f798f 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -62,9 +63,9 @@ public class TransportDeleteIndexAction extends TransportMasterNodeAction listener) { + protected void doExecute(Task task, DeleteIndexRequest request, ActionListener listener) { destructiveOperations.failDestructive(request.indices()); - super.doExecute(request, listener); + super.doExecute(task, request, listener); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java index 2717a2320ef..7ffb30b9534 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -65,9 +66,9 @@ public class TransportOpenIndexAction extends TransportMasterNodeAction listener) { + protected void doExecute(Task task, OpenIndexRequest request, ActionListener listener) { destructiveOperations.failDestructive(request.indices()); - super.doExecute(request, listener); + super.doExecute(task, request, listener); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 12c5f733643..9d1004ccd5c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -54,7 +54,9 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import java.util.List; import java.util.Map; @@ -77,7 +79,7 @@ public abstract class TransportSearchTypeAction extends TransportAction extends TransportAction{ protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { - super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver); + super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager()); transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler()); } class TransportHandler implements TransportRequestHandler { @Override - public final void messageReceived(final Request request, final TransportChannel channel) throws Exception { + public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { + messageReceived(request, channel); + } + + @Override + public final void messageReceived(Request request, TransportChannel channel) throws Exception { execute(request, new ActionListener() { @Override public void onResponse(Response response) { diff --git a/core/src/main/java/org/elasticsearch/action/support/TransportAction.java b/core/src/main/java/org/elasticsearch/action/support/TransportAction.java index 07ddff3348c..3e0454550ba 100644 --- a/core/src/main/java/org/elasticsearch/action/support/TransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/TransportAction.java @@ -29,6 +29,8 @@ import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.atomic.AtomicInteger; @@ -45,15 +47,17 @@ public abstract class TransportAction execute(Request request) { @@ -63,6 +67,28 @@ public abstract class TransportAction listener) { + Task task = taskManager.register("transport", actionName, request); + if (task == null) { + execute(null, request, listener); + } else { + execute(task, request, new ActionListener() { + @Override + public void onResponse(Response response) { + taskManager.unregister(task); + listener.onResponse(response); + } + + @Override + public void onFailure(Throwable e) { + taskManager.unregister(task); + listener.onFailure(e); + } + }); + } + } + + private final void execute(Task task, Request request, ActionListener listener) { + ActionRequestValidationException validationException = request.validate(); if (validationException != null) { listener.onFailure(validationException); @@ -71,17 +97,21 @@ public abstract class TransportAction(this, logger); - requestFilterChain.proceed(actionName, request, listener); + requestFilterChain.proceed(task, actionName, request, listener); } } + protected void doExecute(Task task, Request request, ActionListener listener) { + doExecute(request, listener); + } + protected abstract void doExecute(Request request, ActionListener listener); private static class RequestFilterChain implements ActionFilterChain { @@ -96,13 +126,13 @@ public abstract class TransportAction(actionName, listener, new ResponseFilterChain(this.action.filters, logger))); + this.action.doExecute(task, (Request) request, new FilteredActionListener(actionName, listener, new ResponseFilterChain(this.action.filters, logger))); } else { listener.onFailure(new IllegalStateException("proceed was called too many times")); } @@ -131,7 +161,7 @@ public abstract class TransportAction listener) throws Exception; + /** + * Override this operation if access to the task parameter is needed + */ + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { + masterOperation(request, state, listener); + } + protected boolean localExecute(Request request) { return false; } @@ -91,8 +99,14 @@ public abstract class TransportMasterNodeAction listener) { - new AsyncSingleAction(request, listener).start(); + protected final void doExecute(final Request request, ActionListener listener) { + logger.warn("attempt to execute a master node operation without task"); + throw new UnsupportedOperationException("task parameter is required for this operation"); + } + + @Override + protected void doExecute(Task task, final Request request, ActionListener listener) { + new AsyncSingleAction(task, request, listener).start(); } class AsyncSingleAction { @@ -100,6 +114,7 @@ public abstract class TransportMasterNodeAction listener; private final Request request; private volatile ClusterStateObserver observer; + private final Task task; private final ClusterStateObserver.ChangePredicate retryableOrNoBlockPredicate = new ClusterStateObserver.ValidationPredicate() { @Override @@ -109,7 +124,8 @@ public abstract class TransportMasterNodeAction listener) { + AsyncSingleAction(Task task, Request request, ActionListener listener) { + this.task = task; this.request = request; // TODO do we really need to wrap it in a listener? the handlers should be cheap if ((listener instanceof ThreadedActionListener) == false) { @@ -157,7 +173,7 @@ public abstract class TransportMasterNodeAction listener) { - new AsyncAction(request, listener).start(); + protected final void doExecute(NodesRequest request, ActionListener listener) { + logger.warn("attempt to execute a transport nodes operation without a task"); + throw new UnsupportedOperationException("task parameter is required for this operation"); + } + + @Override + protected void doExecute(Task task, NodesRequest request, ActionListener listener) { + new AsyncAction(task, request, listener).start(); } protected boolean transportCompress() { @@ -106,8 +114,10 @@ public abstract class TransportNodesAction listener; private final AtomicReferenceArray responses; private final AtomicInteger counter = new AtomicInteger(); + private final Task task; - private AsyncAction(NodesRequest request, ActionListener listener) { + private AsyncAction(Task task, NodesRequest request, ActionListener listener) { + this.task = task; this.request = request; this.listener = listener; ClusterState clusterState = clusterService.state(); @@ -150,7 +160,11 @@ public abstract class TransportNodesAction() { @Override public NodeResponse newInstance() { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 80ac93e981b..6fd7da91645 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -114,7 +114,7 @@ public abstract class TransportReplicationAction request, Supplier replicaRequest, String executor) { - super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver); + super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager()); this.transportService = transportService; this.clusterService = clusterService; this.indicesService = indicesService; diff --git a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 2a7e19bfade..47eebc9cfcd 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -66,7 +66,7 @@ public abstract class TransportSingleShardAction request, String executor) { - super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver); + super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager()); this.clusterService = clusterService; this.transportService = transportService; diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java new file mode 100644 index 00000000000..a1e485bb64f --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java @@ -0,0 +1,195 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.tasks; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.ChildTask; +import org.elasticsearch.tasks.Task; + +import java.io.IOException; + +/** + * A base class for task requests + */ +public class BaseTasksRequest extends ActionRequest { + + + public static final String[] ALL_ACTIONS = Strings.EMPTY_ARRAY; + + public static final String[] ALL_NODES = Strings.EMPTY_ARRAY; + + public static final long ALL_TASKS = -1L; + + private String[] nodesIds = ALL_NODES; + + private TimeValue timeout; + + private String[] actions = ALL_ACTIONS; + + private String parentNode; + + private long parentTaskId = ALL_TASKS; + + public BaseTasksRequest() { + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + /** + * Get information about tasks from nodes based on the nodes ids specified. + * If none are passed, information for all nodes will be returned. + */ + public BaseTasksRequest(ActionRequest request, String... nodesIds) { + super(request); + this.nodesIds = nodesIds; + } + + /** + * Get information about tasks from nodes based on the nodes ids specified. + * If none are passed, information for all nodes will be returned. + */ + public BaseTasksRequest(String... nodesIds) { + this.nodesIds = nodesIds; + } + + /** + * Sets the list of action masks for the actions that should be returned + */ + @SuppressWarnings("unchecked") + public final T actions(String... actions) { + this.actions = actions; + return (T) this; + } + + /** + * Return the list of action masks for the actions that should be returned + */ + public String[] actions() { + return actions; + } + + public final String[] nodesIds() { + return nodesIds; + } + + @SuppressWarnings("unchecked") + public final T nodesIds(String... nodesIds) { + this.nodesIds = nodesIds; + return (T) this; + } + + /** + * Returns the parent node id that tasks should be filtered by + */ + public String parentNode() { + return parentNode; + } + + @SuppressWarnings("unchecked") + public T parentNode(String parentNode) { + this.parentNode = parentNode; + return (T) this; + } + + /** + * Returns the parent task id that tasks should be filtered by + */ + public long parentTaskId() { + return parentTaskId; + } + + @SuppressWarnings("unchecked") + public T parentTaskId(long parentTaskId) { + this.parentTaskId = parentTaskId; + return (T) this; + } + + + public TimeValue timeout() { + return this.timeout; + } + + @SuppressWarnings("unchecked") + public final T timeout(TimeValue timeout) { + this.timeout = timeout; + return (T) this; + } + + @SuppressWarnings("unchecked") + public final T timeout(String timeout) { + this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout"); + return (T) this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodesIds = in.readStringArray(); + actions = in.readStringArray(); + parentNode = in.readOptionalString(); + parentTaskId = in.readLong(); + if (in.readBoolean()) { + timeout = TimeValue.readTimeValue(in); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArrayNullable(nodesIds); + out.writeStringArrayNullable(actions); + out.writeOptionalString(parentNode); + out.writeLong(parentTaskId); + out.writeOptionalStreamable(timeout); + } + + public boolean match(Task task) { + if (actions() != null && actions().length > 0 && Regex.simpleMatch(actions(), task.getAction()) == false) { + return false; + } + if (parentNode() != null || parentTaskId() != BaseTasksRequest.ALL_TASKS) { + if (task instanceof ChildTask) { + if (parentNode() != null) { + if (parentNode().equals(((ChildTask) task).getParentNode()) == false) { + return false; + } + } + if (parentTaskId() != BaseTasksRequest.ALL_TASKS) { + if (parentTaskId() != ((ChildTask) task).getParentId()) { + return false; + } + } + } else { + // This is not a child task and we need to match parent node or id + return false; + } + } + return true; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java new file mode 100644 index 00000000000..43be2b46db1 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java @@ -0,0 +1,92 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.tasks; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + + +/** + * Base class for responses of task-related operations + */ +public class BaseTasksResponse extends ActionResponse { + private List taskFailures; + private List nodeFailures; + + public BaseTasksResponse() { + } + + public BaseTasksResponse(List taskFailures, List nodeFailures) { + this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(taskFailures)); + this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures)); + } + + /** + * The list of task failures exception. + */ + public List getTaskFailures() { + return taskFailures; + } + + /** + * The list of node failures exception. + */ + public List getNodeFailures() { + return nodeFailures; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + List taskFailures = new ArrayList<>(); + for (int i = 0; i < size; i++) { + taskFailures.add(new TaskOperationFailure(in)); + } + size = in.readVInt(); + this.taskFailures = Collections.unmodifiableList(taskFailures); + List nodeFailures = new ArrayList<>(); + for (int i = 0; i < size; i++) { + nodeFailures.add(new FailedNodeException(in)); + } + this.nodeFailures = Collections.unmodifiableList(nodeFailures); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(taskFailures.size()); + for (TaskOperationFailure exp : taskFailures) { + exp.writeTo(out); + } + out.writeVInt(nodeFailures.size()); + for (FailedNodeException exp : nodeFailures) { + exp.writeTo(out); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TasksRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TasksRequestBuilder.java new file mode 100644 index 00000000000..a7265ce9998 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TasksRequestBuilder.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.support.tasks; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.unit.TimeValue; + +/** + * Builder for task-based requests + */ +public class TasksRequestBuilder , Response extends BaseTasksResponse, RequestBuilder extends TasksRequestBuilder> + extends ActionRequestBuilder { + + protected TasksRequestBuilder(ElasticsearchClient client, Action action, Request request) { + super(client, action, request); + } + + @SuppressWarnings("unchecked") + public final RequestBuilder setNodesIds(String... nodesIds) { + request.nodesIds(nodesIds); + return (RequestBuilder) this; + } + + @SuppressWarnings("unchecked") + public final RequestBuilder setActions(String... actions) { + request.actions(actions); + return (RequestBuilder) this; + } + + @SuppressWarnings("unchecked") + public final RequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return (RequestBuilder) this; + } +} + diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java new file mode 100644 index 00000000000..42be7e4eefc --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -0,0 +1,380 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.tasks; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.NoSuchNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ChildTaskRequest; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BaseTransportResponseHandler; +import org.elasticsearch.transport.NodeShouldNotConnectException; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Supplier; + +/** + * The base class for transport actions that are interacting with currently running tasks. + */ +public abstract class TransportTasksAction< + TasksRequest extends BaseTasksRequest, + TasksResponse extends BaseTasksResponse, + TaskResponse extends Writeable + > extends HandledTransportAction { + + protected final ClusterName clusterName; + protected final ClusterService clusterService; + protected final TransportService transportService; + protected final Supplier requestSupplier; + protected final Supplier responseSupplier; + + protected final String transportNodeAction; + + protected TransportTasksAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool, + ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier requestSupplier, + Supplier responseSupplier, + String nodeExecutor) { + super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestSupplier); + this.clusterName = clusterName; + this.clusterService = clusterService; + this.transportService = transportService; + this.transportNodeAction = actionName + "[n]"; + this.requestSupplier = requestSupplier; + this.responseSupplier = responseSupplier; + + transportService.registerRequestHandler(transportNodeAction, NodeTaskRequest::new, nodeExecutor, new NodeTransportHandler()); + } + + @Override + protected final void doExecute(TasksRequest request, ActionListener listener) { + logger.warn("attempt to execute a transport tasks operation without a task"); + throw new UnsupportedOperationException("task parameter is required for this operation"); + } + + @Override + protected void doExecute(Task task, TasksRequest request, ActionListener listener) { + new AsyncAction(task, request, listener).start(); + } + + private NodeTasksResponse nodeOperation(NodeTaskRequest nodeTaskRequest) { + TasksRequest request = nodeTaskRequest.tasksRequest; + List results = new ArrayList<>(); + List exceptions = new ArrayList<>(); + for (Task task : taskManager.getTasks().values()) { + // First check action and node filters + if (request.match(task)) { + try { + results.add(taskOperation(request, task)); + } catch (Exception ex) { + exceptions.add(new TaskOperationFailure(clusterService.localNode().id(), task.getId(), ex)); + } + } + } + return new NodeTasksResponse(clusterService.localNode().id(), results, exceptions); + } + + protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) { + return nodesIds; + } + + protected String[] resolveNodes(TasksRequest request, ClusterState clusterState) { + return clusterState.nodes().resolveNodesIds(request.nodesIds()); + } + + protected abstract TasksResponse newResponse(TasksRequest request, List tasks, List taskOperationFailures, List failedNodeExceptions); + + @SuppressWarnings("unchecked") + protected TasksResponse newResponse(TasksRequest request, AtomicReferenceArray responses) { + List tasks = new ArrayList<>(); + List failedNodeExceptions = new ArrayList<>(); + List taskOperationFailures = new ArrayList<>(); + for (int i = 0; i < responses.length(); i++) { + Object response = responses.get(i); + if (response instanceof FailedNodeException) { + failedNodeExceptions.add((FailedNodeException) response); + } else { + NodeTasksResponse tasksResponse = (NodeTasksResponse) response; + if (tasksResponse.results != null) { + tasks.addAll(tasksResponse.results); + } + if (tasksResponse.exceptions != null) { + taskOperationFailures.addAll(tasksResponse.exceptions); + } + } + } + return newResponse(request, tasks, taskOperationFailures, failedNodeExceptions); + } + + protected abstract TaskResponse readTaskResponse(StreamInput in) throws IOException; + + protected abstract TaskResponse taskOperation(TasksRequest request, Task task); + + protected boolean transportCompress() { + return false; + } + + protected abstract boolean accumulateExceptions(); + + private class AsyncAction { + + private final TasksRequest request; + private final String[] nodesIds; + private final DiscoveryNode[] nodes; + private final ActionListener listener; + private final AtomicReferenceArray responses; + private final AtomicInteger counter = new AtomicInteger(); + private final Task task; + + private AsyncAction(Task task, TasksRequest request, ActionListener listener) { + this.task = task; + this.request = request; + this.listener = listener; + ClusterState clusterState = clusterService.state(); + String[] nodesIds = resolveNodes(request, clusterState); + this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds); + ImmutableOpenMap nodes = clusterState.nodes().nodes(); + this.nodes = new DiscoveryNode[nodesIds.length]; + for (int i = 0; i < nodesIds.length; i++) { + this.nodes[i] = nodes.get(nodesIds[i]); + } + this.responses = new AtomicReferenceArray<>(this.nodesIds.length); + } + + private void start() { + if (nodesIds.length == 0) { + // nothing to do + try { + listener.onResponse(newResponse(request, responses)); + } catch (Throwable t) { + logger.debug("failed to generate empty response", t); + listener.onFailure(t); + } + } else { + TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); + if (request.timeout() != null) { + builder.withTimeout(request.timeout()); + } + builder.withCompress(transportCompress()); + for (int i = 0; i < nodesIds.length; i++) { + final String nodeId = nodesIds[i]; + final int idx = i; + final DiscoveryNode node = nodes[i]; + try { + if (node == null) { + onFailure(idx, nodeId, new NoSuchNodeException(nodeId)); + } else if (!clusterService.localNode().shouldConnectTo(node) && !clusterService.localNode().equals(node)) { + // the check "!clusterService.localNode().equals(node)" is to maintain backward comp. where before + // we allowed to connect from "local" client node to itself, certain tests rely on it, if we remove it, we need to fix + // those (and they randomize the client node usage, so tricky to find when) + onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node)); + } else { + NodeTaskRequest nodeRequest = new NodeTaskRequest(request); + nodeRequest.setParentTask(clusterService.localNode().id(), task.getId()); + transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler() { + @Override + public NodeTasksResponse newInstance() { + return new NodeTasksResponse(); + } + + @Override + public void handleResponse(NodeTasksResponse response) { + onOperation(idx, response); + } + + @Override + public void handleException(TransportException exp) { + onFailure(idx, node.id(), exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + } catch (Throwable t) { + onFailure(idx, nodeId, t); + } + } + } + } + + private void onOperation(int idx, NodeTasksResponse nodeResponse) { + responses.set(idx, nodeResponse); + if (counter.incrementAndGet() == responses.length()) { + finishHim(); + } + } + + private void onFailure(int idx, String nodeId, Throwable t) { + if (logger.isDebugEnabled() && !(t instanceof NodeShouldNotConnectException)) { + logger.debug("failed to execute on node [{}]", t, nodeId); + } + if (accumulateExceptions()) { + responses.set(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t)); + } + if (counter.incrementAndGet() == responses.length()) { + finishHim(); + } + } + + private void finishHim() { + TasksResponse finalResponse; + try { + finalResponse = newResponse(request, responses); + } catch (Throwable t) { + logger.debug("failed to combine responses from nodes", t); + listener.onFailure(t); + return; + } + listener.onResponse(finalResponse); + } + } + + class NodeTransportHandler implements TransportRequestHandler { + + @Override + public void messageReceived(final NodeTaskRequest request, final TransportChannel channel) throws Exception { + channel.sendResponse(nodeOperation(request)); + } + } + + + private class NodeTaskRequest extends ChildTaskRequest { + private TasksRequest tasksRequest; + + protected NodeTaskRequest() { + super(); + } + + protected NodeTaskRequest(TasksRequest tasksRequest) { + super(tasksRequest); + this.tasksRequest = tasksRequest; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + tasksRequest = requestSupplier.get(); + tasksRequest.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + tasksRequest.writeTo(out); + } + } + + private class NodeTasksResponse extends TransportResponse { + protected String nodeId; + protected List exceptions; + protected List results; + + public NodeTasksResponse() { + } + + public NodeTasksResponse(String nodeId, + List results, + List exceptions) { + this.nodeId = nodeId; + this.results = results; + this.exceptions = exceptions; + } + + public String getNodeId() { + return nodeId; + } + + public List getExceptions() { + return exceptions; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readString(); + int resultsSize = in.readVInt(); + results = new ArrayList<>(resultsSize); + for (; resultsSize > 0; resultsSize--) { + final TaskResponse result = in.readBoolean() ? readTaskResponse(in) : null; + results.add(result); + } + if (in.readBoolean()) { + int taskFailures = in.readVInt(); + exceptions = new ArrayList<>(taskFailures); + for (int i = 0; i < taskFailures; i++) { + exceptions.add(new TaskOperationFailure(in)); + } + } else { + exceptions = null; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(nodeId); + out.writeVInt(results.size()); + for (TaskResponse result : results) { + if (result != null) { + out.writeBoolean(true); + result.writeTo(out); + } else { + out.writeBoolean(false); + } + } + out.writeBoolean(exceptions != null); + if (exceptions != null) { + int taskFailures = exceptions.size(); + out.writeVInt(taskFailures); + for (TaskOperationFailure exception : exceptions) { + exception.writeTo(out); + } + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index 947e0f9b526..2cee4341a39 100644 --- a/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -33,6 +33,9 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequestBuilder; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryResponse; @@ -249,6 +252,29 @@ public interface ClusterAdminClient extends ElasticsearchClient { */ NodesHotThreadsRequestBuilder prepareNodesHotThreads(String... nodesIds); + /** + * List tasks + * + * @param request The nodes tasks request + * @return The result future + * @see org.elasticsearch.client.Requests#listTasksRequest(String...) + */ + ActionFuture listTasks(ListTasksRequest request); + + /** + * List active tasks + * + * @param request The nodes tasks request + * @param listener A listener to be notified with a result + * @see org.elasticsearch.client.Requests#listTasksRequest(String...) + */ + void listTasks(ListTasksRequest request, ActionListener listener); + + /** + * List active tasks + */ + ListTasksRequestBuilder prepareListTasks(String... nodesIds); + /** * Returns list of shards the given search would be executed on. */ diff --git a/core/src/main/java/org/elasticsearch/client/Requests.java b/core/src/main/java/org/elasticsearch/client/Requests.java index 2640618f1bc..7fb6c5c2de0 100644 --- a/core/src/main/java/org/elasticsearch/client/Requests.java +++ b/core/src/main/java/org/elasticsearch/client/Requests.java @@ -22,6 +22,7 @@ package org.elasticsearch.client; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; @@ -404,6 +405,27 @@ public class Requests { return new ClusterStatsRequest(); } + /** + * Creates a nodes tasks request against all the nodes. + * + * @return The nodes tasks request + * @see org.elasticsearch.client.ClusterAdminClient#listTasks(ListTasksRequest) + */ + public static ListTasksRequest listTasksRequest() { + return new ListTasksRequest(); + } + + /** + * Creates a nodes tasks request against one or more nodes. Pass null or an empty array for all nodes. + * + * @param nodesIds The nodes ids to get the tasks for + * @return The nodes tasks request + * @see org.elasticsearch.client.ClusterAdminClient#nodesStats(org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest) + */ + public static ListTasksRequest listTasksRequest(String... nodesIds) { + return new ListTasksRequest(nodesIds); + } + /** * Registers snapshot repository * diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 8c0fe125135..e085c8da075 100644 --- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -41,6 +41,10 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequestBuilder; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder; @@ -968,6 +972,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client return new NodesHotThreadsRequestBuilder(this, NodesHotThreadsAction.INSTANCE).setNodesIds(nodesIds); } + @Override + public ActionFuture listTasks(final ListTasksRequest request) { + return execute(ListTasksAction.INSTANCE, request); + } + + @Override + public void listTasks(final ListTasksRequest request, final ActionListener listener) { + execute(ListTasksAction.INSTANCE, request, listener); + } + + @Override + public ListTasksRequestBuilder prepareListTasks(String... nodesIds) { + return new ListTasksRequestBuilder(this, ListTasksAction.INSTANCE).setNodesIds(nodesIds); + } + @Override public ActionFuture searchShards(final ClusterSearchShardsRequest request) { return execute(ClusterSearchShardsAction.INSTANCE, request); diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/ClusterService.java index b682b0cc61d..12845fa3fa4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.TaskManager; import java.util.List; @@ -148,4 +149,9 @@ public interface ClusterService extends LifecycleComponent { * @return A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue */ TimeValue getMaxTaskWaitTime(); + + /** + * Returns task manager created in the cluster service + */ + TaskManager getTaskManager(); } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 5fc013b6633..5e945d372fe 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -65,6 +65,7 @@ import org.elasticsearch.common.util.concurrent.PrioritizedRunnable; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryService; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -133,6 +134,8 @@ public class InternalClusterService extends AbstractLifecycleComponent List readList(StreamInputReader reader) throws IOException { + int count = readVInt(); + List builder = new ArrayList<>(count); + for (int i=0; i { + T read(StreamInput t) throws IOException; +} diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index ad02a0fd525..b423841acd0 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -667,4 +667,14 @@ public abstract class StreamOutput extends OutputStream { writeDouble(geoPoint.lat()); writeDouble(geoPoint.lon()); } + + /** + * Writes a list of {@link Writeable} objects + */ + public > void writeList(List list) throws IOException { + writeVInt(list.size()); + for (T obj: list) { + obj.writeTo(this); + } + } } diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java index f7eab3da2ac..12e22a7693b 100644 --- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -36,6 +36,7 @@ import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthActio import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction; import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction; import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsAction; +import org.elasticsearch.rest.action.admin.cluster.node.tasks.RestListTasksAction; import org.elasticsearch.rest.action.admin.cluster.repositories.delete.RestDeleteRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.repositories.get.RestGetRepositoriesAction; import org.elasticsearch.rest.action.admin.cluster.repositories.put.RestPutRepositoryAction; @@ -259,7 +260,10 @@ public class NetworkModule extends AbstractModule { RestFieldStatsAction.class, // no abstract cat action - RestCatAction.class + RestCatAction.class, + + // Tasks API + RestListTasksAction.class ); private static final List> builtinCatHandlers = Arrays.asList( diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java new file mode 100644 index 00000000000..813c7822428 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.node.tasks; + +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.support.RestToXContentListener; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + + +public class RestListTasksAction extends BaseRestHandler { + + @Inject + public RestListTasksAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(GET, "/_tasks", this); + controller.registerHandler(GET, "/_tasks/{nodeId}", this); + controller.registerHandler(GET, "/_tasks/{nodeId}/{actions}", this); + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { + boolean detailed = request.paramAsBoolean("detailed", false); + String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + String[] actions = Strings.splitStringByCommaToArray(request.param("actions")); + String parentNode = request.param("parent_node"); + long parentTaskId = request.paramAsLong("parent_task", ListTasksRequest.ALL_TASKS); + + ListTasksRequest listTasksRequest = new ListTasksRequest(nodesIds); + listTasksRequest.detailed(detailed); + listTasksRequest.actions(actions); + listTasksRequest.parentNode(parentNode); + listTasksRequest.parentTaskId(parentTaskId); + client.admin().cluster().listTasks(listTasksRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/core/src/main/java/org/elasticsearch/tasks/ChildTask.java b/core/src/main/java/org/elasticsearch/tasks/ChildTask.java new file mode 100644 index 00000000000..14d49baf398 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/tasks/ChildTask.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.tasks; + +import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.inject.Provider; + +/** + * Child task + */ +public class ChildTask extends Task { + + private final String parentNode; + + private final long parentId; + + public ChildTask(long id, String type, String action, Provider description, String parentNode, long parentId) { + super(id, type, action, description); + this.parentNode = parentNode; + this.parentId = parentId; + } + + /** + * Returns parent node of the task or null if task doesn't have any parent tasks + */ + public String getParentNode() { + return parentNode; + } + + /** + * Returns id of the parent task or -1L if task doesn't have any parent tasks + */ + public long getParentId() { + return parentId; + } + + public TaskInfo taskInfo(DiscoveryNode node, boolean detailed) { + return new TaskInfo(node, getId(), getType(), getAction(), detailed ? getDescription() : null, parentNode, parentId); + } +} diff --git a/core/src/main/java/org/elasticsearch/tasks/Task.java b/core/src/main/java/org/elasticsearch/tasks/Task.java new file mode 100644 index 00000000000..9e925b09d1a --- /dev/null +++ b/core/src/main/java/org/elasticsearch/tasks/Task.java @@ -0,0 +1,79 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.tasks; + +import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.inject.Provider; + +/** + * Current task information + */ +public class Task { + + private final long id; + + private final String type; + + private final String action; + + private final Provider description; + + public Task(long id, String type, String action, Provider description) { + this.id = id; + this.type = type; + this.action = action; + this.description = description; + } + + public TaskInfo taskInfo(DiscoveryNode node, boolean detailed) { + return new TaskInfo(node, id, type, action, detailed ? getDescription() : null); + } + + /** + * Returns task id + */ + public long getId() { + return id; + } + + /** + * Returns task channel type (netty, transport, direct) + */ + public String getType() { + return type; + } + + /** + * Returns task action + */ + public String getAction() { + return action; + } + + /** + * Generates task description + */ + public String getDescription() { + return description.get(); + } + +} diff --git a/core/src/main/java/org/elasticsearch/tasks/TaskManager.java b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java new file mode 100644 index 00000000000..a4bf118f024 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.tasks; + +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; +import org.elasticsearch.transport.TransportRequest; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Task Manager service for keeping track of currently running tasks on the nodes + */ +public class TaskManager extends AbstractComponent { + + private final ConcurrentMapLong tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); + + private final AtomicLong taskIdGenerator = new AtomicLong(); + + public TaskManager(Settings settings) { + super(settings); + } + + /** + * Registers a task without parent task + */ + public Task register(String type, String action, TransportRequest request) { + Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action); + if (task != null) { + if (logger.isDebugEnabled()) { + logger.debug("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription()); + } + Task previousTask = tasks.put(task.getId(), task); + assert previousTask == null; + } + return task; + } + + /** + * Unregister the task + */ + public void unregister(Task task) { + logger.debug("unregister task for id: {}", task.getId()); + tasks.remove(task.getId()); + } + + /** + * Returns the list of currently running tasks on the node + */ + public Map getTasks() { + return Collections.unmodifiableMap(new HashMap<>(tasks)); + } + +} diff --git a/core/src/main/java/org/elasticsearch/transport/DelegatingTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/DelegatingTransportChannel.java new file mode 100644 index 00000000000..f6b178dba8d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/transport/DelegatingTransportChannel.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import java.io.IOException; + +/** + * Wrapper around transport channel that delegates all requests to the + * underlying channel + */ +public class DelegatingTransportChannel implements TransportChannel { + + private final TransportChannel channel; + + protected DelegatingTransportChannel(TransportChannel channel) { + this.channel = channel; + } + + @Override + public String action() { + return channel.action(); + } + + @Override + public String getProfileName() { + return channel.getProfileName(); + } + + @Override + public long getRequestId() { + return channel.getRequestId(); + } + + @Override + public String getChannelType() { + return channel.getChannelType(); + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + channel.sendResponse(response); + } + + @Override + public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + channel.sendResponse(response, options); + } + + @Override + public void sendResponse(Throwable error) throws IOException { + channel.sendResponse(error); + } + + public TransportChannel getChannel() { + return channel; + } +} diff --git a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index 79bf97b908d..e58df27644e 100644 --- a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -19,7 +19,10 @@ package org.elasticsearch.transport; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskManager; +import java.io.IOException; import java.util.function.Supplier; /** @@ -32,14 +35,16 @@ public class RequestHandlerRegistry { private final boolean forceExecution; private final String executor; private final Supplier requestFactory; + private final TaskManager taskManager; - public RequestHandlerRegistry(String action, Supplier requestFactory, TransportRequestHandler handler, String executor, boolean forceExecution) { + public RequestHandlerRegistry(String action, Supplier requestFactory, TaskManager taskManager, TransportRequestHandler handler, String executor, boolean forceExecution) { this.action = action; this.requestFactory = requestFactory; assert newRequest() != null; this.handler = handler; this.forceExecution = forceExecution; this.executor = executor; + this.taskManager = taskManager; } public String getAction() { @@ -50,8 +55,21 @@ public class RequestHandlerRegistry { return requestFactory.get(); } - public TransportRequestHandler getHandler() { - return handler; + public void processMessageReceived(Request request, TransportChannel channel) throws Exception { + final Task task = taskManager.register(channel.getChannelType(), action, request); + if (task == null) { + handler.messageReceived(request, channel); + } else { + boolean success = false; + try { + handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task); + success = true; + } finally { + if (success == false) { + taskManager.unregister(task); + } + } + } } public boolean isForceExecution() { @@ -61,4 +79,44 @@ public class RequestHandlerRegistry { public String getExecutor() { return executor; } + + @Override + public String toString() { + return handler.toString(); + } + + private static class TransportChannelWrapper extends DelegatingTransportChannel { + + private final Task task; + + private final TaskManager taskManager; + + public TransportChannelWrapper(TaskManager taskManager, Task task, TransportChannel channel) { + super(channel); + this.task = task; + this.taskManager = taskManager; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + endTask(); + super.sendResponse(response); + } + + @Override + public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + endTask(); + super.sendResponse(response, options); + } + + @Override + public void sendResponse(Throwable error) throws IOException { + endTask(); + super.sendResponse(error); + } + + private void endTask() { + taskManager.unregister(task); + } + } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportChannel.java b/core/src/main/java/org/elasticsearch/transport/TransportChannel.java index 4c7678d60f0..53fd4ebe91e 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportChannel.java @@ -30,6 +30,10 @@ public interface TransportChannel { String getProfileName(); + long getRequestId(); + + String getChannelType(); + void sendResponse(TransportResponse response) throws IOException; void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportRequest.java b/core/src/main/java/org/elasticsearch/transport/TransportRequest.java index ddf54179476..d5c1491f1a6 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportRequest.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.transport; +import org.elasticsearch.tasks.Task; + /** */ public abstract class TransportRequest extends TransportMessage { @@ -43,4 +45,14 @@ public abstract class TransportRequest extends TransportMessage { + /** + * Override this method if access to the Task parameter is needed + */ + default void messageReceived(final T request, final TransportChannel channel, Task task) throws Exception { + messageReceived(request, channel); + } + void messageReceived(T request, TransportChannel channel) throws Exception; } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 709323cb305..5d74c4a408f 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -66,6 +67,7 @@ public class TransportService extends AbstractLifecycleComponent requestHandlers = Collections.emptyMap(); final Object requestHandlerMutex = new Object(); @@ -114,6 +116,7 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Supplier requestFactory, String executor, TransportRequestHandler handler) { - RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, requestFactory, handler, executor, false); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false); registerRequestHandler(reg); } @@ -404,7 +411,7 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, TransportRequestHandler handler) { - RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, handler, executor, forceExecution); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution); registerRequestHandler(reg); } @@ -413,7 +420,7 @@ public class TransportService extends AbstractLifecycleComponent implem request.readFrom(stream); if (ThreadPool.Names.SAME.equals(reg.getExecutor())) { //noinspection unchecked - reg.getHandler().messageReceived(request, transportChannel); + reg.processMessageReceived(request, transportChannel); } else { threadPool.executor(reg.getExecutor()).execute(new AbstractRunnable() { @Override protected void doRun() throws Exception { //noinspection unchecked - reg.getHandler().messageReceived(request, transportChannel); + reg.processMessageReceived(request, transportChannel); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index e6dfa97b613..e1e85e9a12f 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -106,6 +106,16 @@ public class LocalTransportChannel implements TransportChannel { sourceTransportServiceAdapter.onResponseSent(requestId, action, error); } + @Override + public long getRequestId() { + return requestId; + } + + @Override + public String getChannelType() { + return "local"; + } + private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException { stream.writeLong(requestId); byte status = 0; diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 99ce5faa086..8df17f73233 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -255,7 +255,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { request.readFrom(buffer); if (ThreadPool.Names.SAME.equals(reg.getExecutor())) { //noinspection unchecked - reg.getHandler().messageReceived(request, transportChannel); + reg.processMessageReceived(request, transportChannel); } else { threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); } @@ -310,7 +310,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { @SuppressWarnings({"unchecked"}) @Override protected void doRun() throws Exception { - reg.getHandler().messageReceived(request, transportChannel); + reg.processMessageReceived(request, transportChannel); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index edfe9f39b48..aaf33c2fd5a 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -132,6 +132,16 @@ public class NettyTransportChannel implements TransportChannel { transportServiceAdapter.onResponseSent(requestId, action, error); } + @Override + public long getRequestId() { + return requestId; + } + + @Override + public String getChannelType() { + return "netty"; + } + /** * Returns the underlying netty channel. This method is intended be used for access to netty to get additional * details when processing the request and may be used by plugins. Responses should be sent using the methods diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java new file mode 100644 index 00000000000..4228c9fa699 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.node.tasks; + +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +/** + * Integration tests for task management API + */ +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE) +public class TasksIT extends ESIntegTestCase { + + public void testTaskCounts() { + // Run only on data nodes + ListTasksResponse response = client().admin().cluster().prepareListTasks("data:true").setActions(ListTasksAction.NAME + "[n]").get(); + assertThat(response.getTasks().size(), greaterThanOrEqualTo(cluster().numDataNodes())); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java new file mode 100644 index 00000000000..55c10aa298e --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -0,0 +1,664 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.node.tasks; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.ChildTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.cluster.TestClusterService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.local.LocalTransport; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.not; + +public class TransportTasksActionTests extends ESTestCase { + + private static ThreadPool threadPool; + private static final ClusterName clusterName = new ClusterName("test-cluster"); + private TestNode[] testNodes; + private int nodesCount; + + @BeforeClass + public static void beforeClass() { + threadPool = new ThreadPool(TransportTasksActionTests.class.getSimpleName()); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + @Before + public final void setupTestNodes() throws Exception { + nodesCount = randomIntBetween(2, 10); + testNodes = new TestNode[nodesCount]; + for (int i = 0; i < testNodes.length; i++) { + testNodes[i] = new TestNode("node" + i, threadPool, Settings.EMPTY); + } + } + + @After + public final void shutdownTestNodes() throws Exception { + for (TestNode testNode : testNodes) { + testNode.close(); + } + } + + private static class TestNode implements Releasable { + public TestNode(String name, ThreadPool threadPool, Settings settings) { + clusterService = new TestClusterService(threadPool); + transportService = new TransportService(Settings.EMPTY, + new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry()), + threadPool); + transportService.start(); + discoveryNode = new DiscoveryNode(name, transportService.boundAddress().publishAddress(), Version.CURRENT); + transportListTasksAction = new TransportListTasksAction(settings, clusterName, threadPool, clusterService, transportService, + new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(settings)); + } + + public final TestClusterService clusterService; + public final TransportService transportService; + public final DiscoveryNode discoveryNode; + public final TransportListTasksAction transportListTasksAction; + + @Override + public void close() { + transportService.close(); + } + } + + public static void connectNodes(TestNode... nodes) { + DiscoveryNode[] discoveryNodes = new DiscoveryNode[nodes.length]; + for (int i = 0; i < nodes.length; i++) { + discoveryNodes[i] = nodes[i].discoveryNode; + } + DiscoveryNode master = discoveryNodes[0]; + for (TestNode node : nodes) { + node.clusterService.setState(ClusterStateCreationUtils.state(node.discoveryNode, master, discoveryNodes)); + } + for (TestNode nodeA : nodes) { + for (TestNode nodeB : nodes) { + nodeA.transportService.connectToNode(nodeB.discoveryNode); + } + } + } + + public static class NodeRequest extends BaseNodeRequest { + protected String requestName; + private boolean enableTaskManager; + + public NodeRequest() { + super(); + } + + public NodeRequest(NodesRequest request, String nodeId) { + super(request, nodeId); + requestName = request.requestName; + enableTaskManager = request.enableTaskManager; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + requestName = in.readString(); + enableTaskManager = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(requestName); + out.writeBoolean(enableTaskManager); + } + + @Override + public String getDescription() { + return "NodeRequest[" + requestName + ", " + enableTaskManager + "]"; + } + + @Override + public Task createTask(long id, String type, String action) { + if (enableTaskManager) { + return super.createTask(id, type, action); + } else { + return null; + } + } + } + + public static class NodesRequest extends BaseNodesRequest { + private String requestName; + private boolean enableTaskManager; + + private NodesRequest() { + super(); + } + + public NodesRequest(String requestName, String... nodesIds) { + this(requestName, true, nodesIds); + } + + public NodesRequest(String requestName, boolean enableTaskManager, String... nodesIds) { + super(nodesIds); + this.requestName = requestName; + this.enableTaskManager = enableTaskManager; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + requestName = in.readString(); + enableTaskManager = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(requestName); + out.writeBoolean(enableTaskManager); + } + + @Override + public String getDescription() { + return "NodesRequest[" + requestName + ", " + enableTaskManager + "]"; + } + + @Override + public Task createTask(long id, String type, String action) { + if (enableTaskManager) { + return super.createTask(id, type, action); + } else { + return null; + } + } + } + + static class NodeResponse extends BaseNodeResponse { + + protected NodeResponse() { + super(); + } + + protected NodeResponse(DiscoveryNode node) { + super(node); + } + } + + static class NodesResponse extends BaseNodesResponse { + + private int failureCount; + + protected NodesResponse(ClusterName clusterName, NodeResponse[] nodes, int failureCount) { + super(clusterName, nodes); + this.failureCount = failureCount; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + failureCount = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(failureCount); + } + + public int failureCount() { + return failureCount; + } + } + + /** + * Simulates node-based task that can be used to block node tasks so they are guaranteed to be registered by task manager + */ + abstract class TestNodesAction extends TransportNodesAction { + + TestNodesAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool, + ClusterService clusterService, TransportService transportService) { + super(settings, actionName, clusterName, threadPool, clusterService, transportService, + new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), + NodesRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC); + } + + @Override + protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray responses) { + final List nodesList = new ArrayList<>(); + int failureCount = 0; + for (int i = 0; i < responses.length(); i++) { + Object resp = responses.get(i); + if (resp instanceof NodeResponse) { // will also filter out null response for unallocated ones + nodesList.add((NodeResponse) resp); + } else if (resp instanceof FailedNodeException) { + failureCount++; + } else { + logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp); + } + } + return new NodesResponse(clusterName, nodesList.toArray(new NodeResponse[nodesList.size()]), failureCount); + } + + @Override + protected NodeRequest newNodeRequest(String nodeId, NodesRequest request) { + return new NodeRequest(request, nodeId); + } + + @Override + protected NodeResponse newNodeResponse() { + return new NodeResponse(); + } + + @Override + protected abstract NodeResponse nodeOperation(NodeRequest request); + + @Override + protected boolean accumulateExceptions() { + return true; + } + } + + static class TestTaskResponse implements Writeable { + + private final String status; + + public TestTaskResponse(StreamInput in) throws IOException { + status = in.readString(); + } + + public TestTaskResponse(String status) { + this.status = status; + } + + public String getStatus() { + return status; + } + + @Override + public TestTaskResponse readFrom(StreamInput in) throws IOException { + return new TestTaskResponse(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(status); + } + } + + + static class TestTasksRequest extends BaseTasksRequest { + + } + + static class TestTasksResponse extends BaseTasksResponse { + + private List tasks; + + public TestTasksResponse() { + + } + + public TestTasksResponse(List tasks, List taskFailures, List nodeFailures) { + super(taskFailures, nodeFailures); + this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks)); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int taskCount = in.readVInt(); + List builder = new ArrayList<>(); + for (int i = 0; i < taskCount; i++) { + builder.add(new TestTaskResponse(in)); + } + tasks = Collections.unmodifiableList(builder); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(tasks.size()); + for (TestTaskResponse task : tasks) { + task.writeTo(out); + } + } + } + + /** + * Test class for testing task operations + */ + static abstract class TestTasksAction extends TransportTasksAction { + + protected TestTasksAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, + TransportService transportService) { + super(settings, actionName, clusterName, threadPool, clusterService, transportService, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), + TestTasksRequest::new, TestTasksResponse::new, ThreadPool.Names.MANAGEMENT); + } + + @Override + protected TestTasksResponse newResponse(TestTasksRequest request, List tasks, List taskOperationFailures, List failedNodeExceptions) { + return new TestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions); + } + + @Override + protected TestTaskResponse readTaskResponse(StreamInput in) throws IOException { + return new TestTaskResponse(in); + } + + @Override + protected boolean accumulateExceptions() { + return true; + } + } + + private ActionFuture startBlockingTestNodesAction(CountDownLatch checkLatch) throws InterruptedException { + return startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request")); + } + + private ActionFuture startBlockingTestNodesAction(CountDownLatch checkLatch, NodesRequest request) throws InterruptedException { + CountDownLatch actionLatch = new CountDownLatch(nodesCount); + TestNodesAction[] actions = new TestNodesAction[nodesCount]; + for (int i = 0; i < testNodes.length; i++) { + final int node = i; + actions[i] = new TestNodesAction(Settings.EMPTY, "testAction", clusterName, threadPool, testNodes[i].clusterService, testNodes[i].transportService) { + @Override + protected NodeResponse nodeOperation(NodeRequest request) { + logger.info("Action on node " + node); + actionLatch.countDown(); + try { + checkLatch.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + logger.info("Action on node " + node + " finished"); + return new NodeResponse(testNodes[node].discoveryNode); + } + }; + } + // Make sure no tasks are running + for (TestNode node : testNodes) { + assertEquals(0, node.transportService.getTaskManager().getTasks().size()); + } + ActionFuture future = actions[0].execute(request); + logger.info("Awaiting for all actions to start"); + actionLatch.await(); + logger.info("Done waiting for all actions to start"); + return future; + } + + public void testRunningTasksCount() throws Exception { + connectNodes(testNodes); + CountDownLatch checkLatch = new CountDownLatch(1); + ActionFuture future = startBlockingTestNodesAction(checkLatch); + + // Check task counts using taskManager + Map localTasks = testNodes[0].transportService.getTaskManager().getTasks(); + assertEquals(2, localTasks.size()); // all node tasks + 1 coordinating task + Task coordinatingTask = localTasks.get(Collections.min(localTasks.keySet())); + Task subTask = localTasks.get(Collections.max(localTasks.keySet())); + assertThat(subTask.getAction(), endsWith("[n]")); + assertThat(coordinatingTask.getAction(), not(endsWith("[n]"))); + for (int i = 1; i < testNodes.length; i++) { + Map remoteTasks = testNodes[i].transportService.getTaskManager().getTasks(); + assertEquals(1, remoteTasks.size()); + Task remoteTask = remoteTasks.values().iterator().next(); + assertThat(remoteTask.getAction(), endsWith("[n]")); + } + + // Check task counts using transport + int testNodeNum = randomIntBetween(0, testNodes.length - 1); + TestNode testNode = testNodes[testNodeNum]; + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.actions("testAction*"); // pick all test actions + logger.info("Listing currently running tasks using node [{}]", testNodeNum); + ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + logger.info("Checking currently running tasks"); + assertEquals(testNodes.length, response.getPerNodeTasks().size()); + + // Coordinating node + assertEquals(2, response.getPerNodeTasks().get(testNodes[0].discoveryNode).size()); + // Other nodes node + for (int i = 1; i < testNodes.length; i++) { + assertEquals(1, response.getPerNodeTasks().get(testNodes[i].discoveryNode).size()); + } + + // Check task counts using transport with filtering + testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; + listTasksRequest = new ListTasksRequest(); + listTasksRequest.actions("testAction[n]"); // only pick node actions + response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + assertEquals(testNodes.length, response.getPerNodeTasks().size()); + for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { + assertEquals(1, entry.getValue().size()); + assertNull(entry.getValue().get(0).getDescription()); + } + + // Check task counts using transport with detailed description + listTasksRequest.detailed(true); // same request only with detailed description + response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + assertEquals(testNodes.length, response.getPerNodeTasks().size()); + for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { + assertEquals(1, entry.getValue().size()); + assertEquals("NodeRequest[Test Request, true]", entry.getValue().get(0).getDescription()); + } + + // Release all tasks and wait for response + checkLatch.countDown(); + NodesResponse responses = future.get(); + assertEquals(0, responses.failureCount()); + + // Make sure that we don't have any lingering tasks + for (TestNode node : testNodes) { + assertEquals(0, node.transportService.getTaskManager().getTasks().size()); + } + } + + public void testFindChildTasks() throws Exception { + connectNodes(testNodes); + CountDownLatch checkLatch = new CountDownLatch(1); + ActionFuture future = startBlockingTestNodesAction(checkLatch); + + TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; + + // Get the parent task + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.actions("testAction"); + ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + assertEquals(1, response.getTasks().size()); + String parentNode = response.getTasks().get(0).getNode().getId(); + long parentTaskId = response.getTasks().get(0).getId(); + + // Find tasks with common parent + listTasksRequest = new ListTasksRequest(); + listTasksRequest.parentNode(parentNode); + listTasksRequest.parentTaskId(parentTaskId); + response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + assertEquals(testNodes.length, response.getTasks().size()); + for (TaskInfo task : response.getTasks()) { + assertEquals("testAction[n]", task.getAction()); + assertEquals(parentNode, task.getParentNode()); + assertEquals(parentTaskId, task.getParentId()); + } + + // Release all tasks and wait for response + checkLatch.countDown(); + NodesResponse responses = future.get(); + assertEquals(0, responses.failureCount()); + } + + public void testTaskManagementOptOut() throws Exception { + connectNodes(testNodes); + CountDownLatch checkLatch = new CountDownLatch(1); + // Starting actions that disable task manager + ActionFuture future = startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request", false)); + + TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; + + // Get the parent task + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.actions("testAction*"); + ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + assertEquals(0, response.getTasks().size()); + + // Release all tasks and wait for response + checkLatch.countDown(); + NodesResponse responses = future.get(); + assertEquals(0, responses.failureCount()); + } + + public void testTasksDescriptions() throws Exception { + connectNodes(testNodes); + CountDownLatch checkLatch = new CountDownLatch(1); + ActionFuture future = startBlockingTestNodesAction(checkLatch); + + // Check task counts using transport with filtering + TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.actions("testAction[n]"); // only pick node actions + ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + assertEquals(testNodes.length, response.getPerNodeTasks().size()); + for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { + assertEquals(1, entry.getValue().size()); + assertNull(entry.getValue().get(0).getDescription()); + } + + // Check task counts using transport with detailed description + listTasksRequest.detailed(true); // same request only with detailed description + response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + assertEquals(testNodes.length, response.getPerNodeTasks().size()); + for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { + assertEquals(1, entry.getValue().size()); + assertEquals("NodeRequest[Test Request, true]", entry.getValue().get(0).getDescription()); + } + + // Release all tasks and wait for response + checkLatch.countDown(); + NodesResponse responses = future.get(); + assertEquals(0, responses.failureCount()); + } + + public void testFailedTasksCount() throws ExecutionException, InterruptedException, IOException { + connectNodes(testNodes); + TestNodesAction[] actions = new TestNodesAction[nodesCount]; + for (int i = 0; i < testNodes.length; i++) { + final int node = i; + actions[i] = new TestNodesAction(Settings.EMPTY, "testAction", clusterName, threadPool, testNodes[i].clusterService, testNodes[i].transportService) { + @Override + protected NodeResponse nodeOperation(NodeRequest request) { + logger.info("Action on node " + node); + throw new RuntimeException("Test exception"); + } + }; + } + + for (TestNode testNode : testNodes) { + assertEquals(0, testNode.transportService.getTaskManager().getTasks().size()); + } + NodesRequest request = new NodesRequest("Test Request"); + NodesResponse responses = actions[0].execute(request).get(); + assertEquals(nodesCount, responses.failureCount()); + } + + public void testTaskLevelActionFailures() throws ExecutionException, InterruptedException, IOException { + connectNodes(testNodes); + CountDownLatch checkLatch = new CountDownLatch(1); + ActionFuture future = startBlockingTestNodesAction(checkLatch); + + TestTasksAction[] tasksActions = new TestTasksAction[nodesCount]; + final int failTaskOnNode = randomIntBetween(1, nodesCount - 1); + for (int i = 0; i < testNodes.length; i++) { + final int node = i; + // Simulate task action that fails on one of the tasks on one of the nodes + tasksActions[i] = new TestTasksAction(Settings.EMPTY, "testTasksAction", clusterName, threadPool, testNodes[i].clusterService, testNodes[i].transportService) { + @Override + protected TestTaskResponse taskOperation(TestTasksRequest request, Task task) { + logger.info("Task action on node " + node); + if (failTaskOnNode == node && ((ChildTask) task).getParentNode() != null) { + logger.info("Failing on node " + node); + throw new RuntimeException("Task level failure"); + } + return new TestTaskResponse("Success on node " + node); + } + }; + } + + // Run task action on node tasks that are currently running + // should be successful on all nodes except one + TestTasksRequest testTasksRequest = new TestTasksRequest(); + testTasksRequest.actions("testAction[n]"); // pick all test actions + TestTasksResponse response = tasksActions[0].execute(testTasksRequest).get(); + // Get successful responses from all nodes except one + assertEquals(testNodes.length - 1, response.tasks.size()); + assertEquals(1, response.getTaskFailures().size()); // one task failed + assertThat(response.getTaskFailures().get(0).getReason(), containsString("Task level failure")); + assertEquals(0, response.getNodeFailures().size()); // no nodes failed + + // Release all node tasks and wait for response + checkLatch.countDown(); + NodesResponse responses = future.get(); + assertEquals(0, responses.failureCount()); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java b/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java index f21013b7fbe..6a14989be1a 100644 --- a/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java @@ -25,6 +25,8 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -67,7 +69,7 @@ public class TransportActionFilterChainTests extends ESTestCase { String actionName = randomAsciiOfLength(randomInt(30)); ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null) { + TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { @Override protected void doExecute(TestRequest request, ActionListener listener) { listener.onResponse(new TestResponse()); @@ -147,7 +149,7 @@ public class TransportActionFilterChainTests extends ESTestCase { String actionName = randomAsciiOfLength(randomInt(30)); ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null) { + TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { @Override protected void doExecute(TestRequest request, ActionListener listener) { listener.onResponse(new TestResponse()); @@ -218,9 +220,9 @@ public class TransportActionFilterChainTests extends ESTestCase { RequestTestFilter testFilter = new RequestTestFilter(randomInt(), new RequestCallback() { @Override - public void execute(final String action, final ActionRequest actionRequest, final ActionListener actionListener, final ActionFilterChain actionFilterChain) { + public void execute(Task task, final String action, final ActionRequest actionRequest, final ActionListener actionListener, final ActionFilterChain actionFilterChain) { for (int i = 0; i <= additionalContinueCount; i++) { - actionFilterChain.proceed(action, actionRequest, actionListener); + actionFilterChain.proceed(task, action, actionRequest, actionListener); } } }); @@ -230,7 +232,7 @@ public class TransportActionFilterChainTests extends ESTestCase { String actionName = randomAsciiOfLength(randomInt(30)); ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null) { + TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { @Override protected void doExecute(TestRequest request, ActionListener listener) { listener.onResponse(new TestResponse()); @@ -286,7 +288,7 @@ public class TransportActionFilterChainTests extends ESTestCase { String actionName = randomAsciiOfLength(randomInt(30)); ActionFilters actionFilters = new ActionFilters(filters); - TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null) { + TransportAction transportAction = new TransportAction(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) { @Override protected void doExecute(TestRequest request, ActionListener listener) { listener.onResponse(new TestResponse()); @@ -344,11 +346,11 @@ public class TransportActionFilterChainTests extends ESTestCase { @SuppressWarnings("unchecked") @Override - public void apply(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { + public void apply(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { this.runs.incrementAndGet(); this.lastActionName = action; this.executionToken = counter.incrementAndGet(); - this.callback.execute(action, actionRequest, actionListener, actionFilterChain); + this.callback.execute(task, action, actionRequest, actionListener, actionFilterChain); } @Override @@ -375,8 +377,8 @@ public class TransportActionFilterChainTests extends ESTestCase { } @Override - public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { - chain.proceed(action, request, listener); + public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { + chain.proceed(task, action, request, listener); } @Override @@ -391,20 +393,20 @@ public class TransportActionFilterChainTests extends ESTestCase { private static enum RequestOperation implements RequestCallback { CONTINUE_PROCESSING { @Override - public void execute(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { - actionFilterChain.proceed(action, actionRequest, actionListener); + public void execute(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { + actionFilterChain.proceed(task, action, actionRequest, actionListener); } }, LISTENER_RESPONSE { @Override @SuppressWarnings("unchecked") - public void execute(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { + public void execute(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { actionListener.onResponse(new TestResponse()); } }, LISTENER_FAILURE { @Override - public void execute(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { + public void execute(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { actionListener.onFailure(new ElasticsearchTimeoutException("")); } } @@ -433,7 +435,7 @@ public class TransportActionFilterChainTests extends ESTestCase { } private static interface RequestCallback { - void execute(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain); + void execute(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain); } private static interface ResponseCallback { diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 6f5be649451..e4a1a9deed9 100644 --- a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -470,5 +470,16 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { @Override public void sendResponse(Throwable error) throws IOException { } + + @Override + public long getRequestId() { + return 0; + } + + @Override + public String getChannelType() { + return "test"; + } + } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 104c94db0c2..980558c2716 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; @@ -119,9 +120,9 @@ public class TransportMasterNodeActionTests extends ESTestCase { } @Override - protected void doExecute(final Request request, ActionListener listener) { + protected void doExecute(Task task, final Request request, ActionListener listener) { // remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER - super.doExecute(request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener)); + super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener)); } @Override @@ -159,7 +160,7 @@ public class TransportMasterNodeActionTests extends ESTestCase { new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { @Override - protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { if (masterOperationFailure) { listener.onFailure(exception); } else { diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index fc419128b7c..fdcf4b07245 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -862,6 +862,16 @@ public class TransportReplicationActionTests extends ESTestCase { public void sendResponse(Throwable error) throws IOException { listener.onFailure(error); } + + @Override + public long getRequestId() { + return 0; + } + + @Override + public String getChannelType() { + return "replica_test"; + } }; } diff --git a/core/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java b/core/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java index e93fbc8e14a..e7ba8de0f97 100644 --- a/core/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java +++ b/core/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java @@ -29,6 +29,8 @@ import org.elasticsearch.client.AbstractClientHeadersTestCase; import org.elasticsearch.client.Client; import org.elasticsearch.client.support.Headers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; @@ -61,7 +63,7 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTestCase { private static class InternalTransportAction extends TransportAction { private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) { - super(settings, actionName, threadPool, EMPTY_FILTERS, null); + super(settings, actionName, threadPool, EMPTY_FILTERS, null, new TaskManager(settings)); } @Override diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index f452bb5c36c..f127ae28378 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.Transport; diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 093e46186b3..72ace64d9ee 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportResponseHandler; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 0637ae7de8e..6faa02e16d7 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -886,5 +886,15 @@ public class PublishClusterStateActionTests extends ESTestCase { this.error.set(error); assertThat(response.get(), nullValue()); } + + @Override + public long getRequestId() { + return 0; + } + + @Override + public String getChannelType() { + return "capturing"; + } } } diff --git a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java index 1d1ad8d5ba9..d3de3ce59fb 100644 --- a/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java @@ -33,4 +33,4 @@ public class SimpleLocalTransportTests extends AbstractSimpleTransportTestCase { transportService.start(); return transportService; } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java index 78caef498d1..55f9bc49df3 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java @@ -146,7 +146,7 @@ public class NettyTransportIT extends ESIntegTestCase { } if (reg.getExecutor() == ThreadPool.Names.SAME) { //noinspection unchecked - reg.getHandler().messageReceived(request, transportChannel); + reg.processMessageReceived(request, transportChannel); } else { threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); } @@ -176,7 +176,7 @@ public class NettyTransportIT extends ESIntegTestCase { @SuppressWarnings({"unchecked"}) @Override protected void doRun() throws Exception { - reg.getHandler().messageReceived(request, transportChannel); + reg.processMessageReceived(request, transportChannel); } @Override diff --git a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index 89702118b49..bd26319f4ab 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -36,6 +36,7 @@ import java.net.UnknownHostException; import static org.hamcrest.Matchers.containsString; public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase { + @Override protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) { settings = Settings.builder().put(settings).put("transport.tcp.port", "0").build(); @@ -53,4 +54,4 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase { assertThat(e.getMessage(), containsString("[localhost/127.0.0.1:9876]")); } } -} \ No newline at end of file +} diff --git a/docs/reference/tasks/list.asciidoc b/docs/reference/tasks/list.asciidoc new file mode 100644 index 00000000000..bfd7f12c43f --- /dev/null +++ b/docs/reference/tasks/list.asciidoc @@ -0,0 +1,46 @@ +[[tasks-list]] +== Tasks List + +The task management API allows to retrieve information about currently running tasks. + +[source,js] +-------------------------------------------------- +curl -XGET 'http://localhost:9200/_tasks' +curl -XGET 'http://localhost:9200/_tasks/nodeId1,nodeId2' +curl -XGET 'http://localhost:9200/_tasks/nodeId1,nodeId2/cluster:*' +-------------------------------------------------- + +The first command retrieves all tasks currently running on all nodes. +The second command selectively retrieves tasks from nodes +`nodeId1` and `nodeId2`. All the nodes selective options are explained +<>. +The third command retrieves all cluster-related tasks running on nodes `nodeId1` and `nodeId2`. + +The result will look similar to: + +[source,js] +-------------------------------------------------- +{ + "nodes" : { + "fDlEl7PrQi6F-awHZ3aaDw" : { + "name" : "Gazer", + "transport_address" : "127.0.0.1:9300", + "host" : "127.0.0.1", + "ip" : "127.0.0.1:9300", + "tasks" : [ { + "node" : "fDlEl7PrQi6F-awHZ3aaDw", + "id" : 105, + "type" : "transport", + "action" : "cluster:monitor/nodes/tasks" + }, { + "node" : "fDlEl7PrQi6F-awHZ3aaDw", + "id" : 106, + "type" : "direct", + "action" : "cluster:monitor/nodes/tasks[n]", + "parent_node" : "fDlEl7PrQi6F-awHZ3aaDw", + "parent_id" : 105 + } ] + } + } +} +-------------------------------------------------- diff --git a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java index 516514599ae..9a3a4632c6f 100644 --- a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java +++ b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java @@ -92,6 +92,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.groovy.GroovyPlugin; import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; diff --git a/plugins/delete-by-query/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryRequest.java b/plugins/delete-by-query/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryRequest.java index e3faeb1badc..fa83fb4fd3d 100644 --- a/plugins/delete-by-query/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryRequest.java +++ b/plugins/delete-by-query/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryRequest.java @@ -239,6 +239,6 @@ public class DeleteByQueryRequest extends ActionRequest im ", size:" + size + ", timeout:" + timeout + ", routing:" + routing + - ", query:" + query.toString(); + ", query:" + query; } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json new file mode 100644 index 00000000000..02acf10d1f7 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json @@ -0,0 +1,35 @@ +{ + "tasks.list": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/tasks-list.html", + "methods": ["GET"], + "url": { + "path": "/_tasks", + "paths": ["/_tasks", "/_tasks/{node_id}", "/_tasks/{node_id}/{actions}"], + "parts": { + "node_id": { + "type": "list", + "description": "A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes" + }, + "actions": { + "type": "list", + "description": "A comma-separated list of actions that should be returned. Leave empty to return all." + } + }, + "params": { + "detailed": { + "type": "boolean", + "description": "Return detailed task information (default: false)" + }, + "parent_node": { + "type": "string", + "description": "Return tasks with specified parent node." + }, + "parent_task": { + "type" : "number", + "description" : "Return tasks with specified parent task id. Set to -1 to return all." + } + } + }, + "body": null + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yaml new file mode 100644 index 00000000000..252649abbb6 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.list/10_basic.yaml @@ -0,0 +1,6 @@ +--- +"tasks_list test": + - do: + tasks.list: {} + + - is_true: nodes diff --git a/test/framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java b/test/framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java index 06def24d6f3..a19d19dcf47 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.TaskManager; import java.util.List; @@ -147,6 +148,11 @@ public class NoopClusterService implements ClusterService { return TimeValue.timeValueMillis(0); } + @Override + public TaskManager getTaskManager() { + return null; + } + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/test/framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java b/test/framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java index 93327213bbc..92b5f9a584b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; @@ -60,6 +61,7 @@ import java.util.concurrent.ScheduledFuture; public class TestClusterService implements ClusterService { volatile ClusterState state; + private volatile TaskManager taskManager; private final List listeners = new CopyOnWriteArrayList<>(); private final Queue onGoingTimeouts = ConcurrentCollections.newQueue(); private final ThreadPool threadPool; @@ -72,6 +74,7 @@ public class TestClusterService implements ClusterService { public TestClusterService(ThreadPool threadPool) { this(ClusterState.builder(new ClusterName("test")).build(), threadPool); + taskManager = new TaskManager(Settings.EMPTY); } public TestClusterService(ClusterState state) { @@ -230,6 +233,11 @@ public class TestClusterService implements ClusterService { throw new UnsupportedOperationException(); } + @Override + public TaskManager getTaskManager() { + return taskManager; + } + @Override public List pendingTasks() { throw new UnsupportedOperationException(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index d636341e42f..0a8869b20cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RequestHandlerRegistry;