Task Management: Add framework for registering and communicating with tasks

Adds task manager class and enables all activities to register with the task manager. Currently, the immutable Transport*Activity class represents activity itself shared across all requests. This PR adds and an additional structure Task that keeps track of currently running requests and can be used to communicate with these requests using TransportTaskAction.

Related to #15117
This commit is contained in:
Igor Motov 2016-01-05 10:07:34 -05:00
parent dff30ece05
commit a89dba27c2
71 changed files with 3042 additions and 72 deletions

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction; import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction;
@ -255,6 +257,7 @@ public class ActionModule extends AbstractModule {
registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class); registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class); registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class); registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);

View File

@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import static org.elasticsearch.ExceptionsHelper.detailedMessage;
/**
* Information about task operation failures
*
* The class is final due to serialization limitations
*/
public final class TaskOperationFailure implements Writeable<TaskOperationFailure>, ToXContent {
private final String nodeId;
private final long taskId;
private final Throwable reason;
private final RestStatus status;
public TaskOperationFailure(StreamInput in) throws IOException {
nodeId = in.readString();
taskId = in.readLong();
reason = in.readThrowable();
status = RestStatus.readFrom(in);
}
public TaskOperationFailure(String nodeId, long taskId, Throwable t) {
this.nodeId = nodeId;
this.taskId = taskId;
this.reason = t;
status = ExceptionsHelper.status(t);
}
public String getNodeId() {
return this.nodeId;
}
public long getTaskId() {
return this.taskId;
}
public String getReason() {
return detailedMessage(reason);
}
public RestStatus getStatus() {
return status;
}
public Throwable getCause() {
return reason;
}
@Override
public TaskOperationFailure readFrom(StreamInput in) throws IOException {
return new TaskOperationFailure(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
out.writeLong(taskId);
out.writeThrowable(reason);
RestStatus.writeTo(out, status);
}
@Override
public String toString() {
return "[" + nodeId + "][" + taskId + "] failed, reason [" + getReason() + "]";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("task_id", getTaskId());
builder.field("node_id", getNodeId());
builder.field("status", status.name());
if (reason != null) {
builder.field("reason");
builder.startObject();
ElasticsearchException.toXContent(builder, params, reason);
builder.endObject();
}
return builder;
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -75,7 +76,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
} }
@Override @Override
protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) { protected final void masterOperation(ClusterHealthRequest request, ClusterState state, ActionListener<ClusterHealthResponse> listener) throws Exception {
logger.warn("attempt to execute a cluster health operation without a task");
throw new UnsupportedOperationException("task parameter is required for this operation");
}
@Override
protected void masterOperation(Task task, final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) {
if (request.waitForEvents() != null) { if (request.waitForEvents() != null) {
final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis(); final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis();
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) { clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) {
@ -95,7 +102,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
@Override @Override
public void onNoLongerMaster(String source) { public void onNoLongerMaster(String source) {
logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", request.waitForEvents()); logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", request.waitForEvents());
doExecute(request, listener); doExecute(task, request, listener);
} }
@Override @Override

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Action for retrieving a list of currently running tasks
*/
public class ListTasksAction extends Action<ListTasksRequest, ListTasksResponse, ListTasksRequestBuilder> {
public static final ListTasksAction INSTANCE = new ListTasksAction();
public static final String NAME = "cluster:monitor/tasks/lists";
private ListTasksAction() {
super(NAME);
}
@Override
public ListTasksResponse newResponse() {
return new ListTasksResponse();
}
@Override
public ListTasksRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new ListTasksRequestBuilder(client, this);
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* A request to get node tasks
*/
public class ListTasksRequest extends BaseTasksRequest<ListTasksRequest> {
private boolean detailed = false;
/**
* Get information from nodes based on the nodes ids specified. If none are passed, information
* for all nodes will be returned.
*/
public ListTasksRequest(String... nodesIds) {
super(nodesIds);
}
/**
* Should the detailed task information be returned.
*/
public boolean detailed() {
return this.detailed;
}
/**
* Should the node settings be returned.
*/
public ListTasksRequest detailed(boolean detailed) {
this.detailed = detailed;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
detailed = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(detailed);
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.action.support.tasks.TasksRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Builder for the request to retrieve the list of tasks running on the specified nodes
*/
public class ListTasksRequestBuilder extends TasksRequestBuilder<ListTasksRequest, ListTasksResponse, ListTasksRequestBuilder> {
public ListTasksRequestBuilder(ElasticsearchClient client, ListTasksAction action) {
super(client, action, new ListTasksRequest());
}
/**
* Should detailed task information be returned.
*/
public ListTasksRequestBuilder setDetailed(boolean detailed) {
request.detailed(detailed);
return this;
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Returns the list of tasks currently running on the nodes
*/
public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
private List<TaskInfo> tasks;
private Map<DiscoveryNode, List<TaskInfo>> nodes;
public ListTasksResponse() {
}
public ListTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
tasks = Collections.unmodifiableList(in.readList(TaskInfo::new));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(tasks);
}
/**
* Returns the list of tasks by node
*/
public Map<DiscoveryNode, List<TaskInfo>> getPerNodeTasks() {
if (nodes != null) {
return nodes;
}
Map<DiscoveryNode, List<TaskInfo>> nodeTasks = new HashMap<>();
Set<DiscoveryNode> nodes = new HashSet<>();
for (TaskInfo shard : tasks) {
nodes.add(shard.getNode());
}
for (DiscoveryNode node : nodes) {
List<TaskInfo> tasks = new ArrayList<>();
for (TaskInfo taskInfo : this.tasks) {
if (taskInfo.getNode().equals(node)) {
tasks.add(taskInfo);
}
}
nodeTasks.put(node, tasks);
}
this.nodes = nodeTasks;
return nodeTasks;
}
public List<TaskInfo> getTasks() {
return tasks;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
builder.startArray("task_failures");
for (TaskOperationFailure ex : getTaskFailures()){
builder.value(ex);
}
builder.endArray();
}
if (getNodeFailures() != null && getNodeFailures().size() > 0) {
builder.startArray("node_failures");
for (FailedNodeException ex : getNodeFailures()){
builder.value(ex);
}
builder.endArray();
}
builder.startObject("nodes");
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : getPerNodeTasks().entrySet()) {
DiscoveryNode node = entry.getKey();
builder.startObject(node.getId(), XContentBuilder.FieldCaseConversion.NONE);
builder.field("name", node.name());
builder.field("transport_address", node.address().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());
if (!node.attributes().isEmpty()) {
builder.startObject("attributes");
for (ObjectObjectCursor<String, String> attr : node.attributes()) {
builder.field(attr.key, attr.value, XContentBuilder.FieldCaseConversion.NONE);
}
builder.endObject();
}
builder.startArray("tasks");
for(TaskInfo task : entry.getValue()) {
task.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
}
builder.endObject();
return builder;
}
@Override
public String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return builder.string();
} catch (IOException e) {
return "{ \"error\" : \"" + e.getMessage() + "\"}";
}
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
/**
* Information about a currently running task.
* <p>
* Tasks are used for communication with transport actions. As a result, they can contain callback
* references as well as mutable state. That makes it impractical to send tasks over transport channels
* and use in APIs. Instead, immutable and streamable TaskInfo objects are used to represent
* snapshot information about currently running tasks.
*/
public class TaskInfo implements Writeable<TaskInfo>, ToXContent {
private final DiscoveryNode node;
private final long id;
private final String type;
private final String action;
private final String description;
private final String parentNode;
private final long parentId;
public TaskInfo(DiscoveryNode node, long id, String type, String action, String description) {
this(node, id, type, action, description, null, -1L);
}
public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, String parentNode, long parentId) {
this.node = node;
this.id = id;
this.type = type;
this.action = action;
this.description = description;
this.parentNode = parentNode;
this.parentId = parentId;
}
public TaskInfo(StreamInput in) throws IOException {
node = DiscoveryNode.readNode(in);
id = in.readLong();
type = in.readString();
action = in.readString();
description = in.readOptionalString();
parentNode = in.readOptionalString();
parentId = in.readLong();
}
public DiscoveryNode getNode() {
return node;
}
public long getId() {
return id;
}
public String getType() {
return type;
}
public String getAction() {
return action;
}
public String getDescription() {
return description;
}
public String getParentNode() {
return parentNode;
}
public long getParentId() {
return parentId;
}
@Override
public TaskInfo readFrom(StreamInput in) throws IOException {
return new TaskInfo(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeLong(id);
out.writeString(type);
out.writeString(action);
out.writeOptionalString(description);
out.writeOptionalString(parentNode);
out.writeLong(parentId);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("node", node.getId());
builder.field("id", id);
builder.field("type", type);
builder.field("action", action);
if (description != null) {
builder.field("description", description);
}
if (parentNode != null) {
builder.field("parent_node", parentNode);
builder.field("parent_id", parentId);
}
builder.endObject();
return builder;
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
/**
*
*/
public class TransportListTasksAction extends TransportTasksAction<ListTasksRequest, ListTasksResponse, TaskInfo> {
@Inject
public TransportListTasksAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ListTasksAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, ListTasksRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override
protected ListTasksResponse newResponse(ListTasksRequest request, List<TaskInfo> tasks, List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
return new TaskInfo(in);
}
@Override
protected TaskInfo taskOperation(ListTasksRequest request, Task task) {
return task.taskInfo(clusterService.localNode(), request.detailed());
}
@Override
protected boolean accumulateExceptions() {
return true;
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -75,12 +76,12 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
} }
@Override @Override
protected void doExecute(CloseIndexRequest request, ActionListener<CloseIndexResponse> listener) { protected void doExecute(Task task, CloseIndexRequest request, ActionListener<CloseIndexResponse> listener) {
destructiveOperations.failDestructive(request.indices()); destructiveOperations.failDestructive(request.indices());
if (closeIndexEnabled == false) { if (closeIndexEnabled == false) {
throw new IllegalStateException("closing indices is disabled - set [" + CLUSTER_INDICES_CLOSE_ENABLE_SETTING.getKey() + ": true] to enable it. NOTE: closed indices still consume a significant amount of diskspace"); throw new IllegalStateException("closing indices is disabled - set [" + CLUSTER_INDICES_CLOSE_ENABLE_SETTING.getKey() + ": true] to enable it. NOTE: closed indices still consume a significant amount of diskspace");
} }
super.doExecute(request, listener); super.doExecute(task, request, listener);
} }
@Override @Override

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -62,9 +63,9 @@ public class TransportDeleteIndexAction extends TransportMasterNodeAction<Delete
} }
@Override @Override
protected void doExecute(DeleteIndexRequest request, ActionListener<DeleteIndexResponse> listener) { protected void doExecute(Task task, DeleteIndexRequest request, ActionListener<DeleteIndexResponse> listener) {
destructiveOperations.failDestructive(request.indices()); destructiveOperations.failDestructive(request.indices());
super.doExecute(request, listener); super.doExecute(task, request, listener);
} }
@Override @Override

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -65,9 +66,9 @@ public class TransportOpenIndexAction extends TransportMasterNodeAction<OpenInde
} }
@Override @Override
protected void doExecute(OpenIndexRequest request, ActionListener<OpenIndexResponse> listener) { protected void doExecute(Task task, OpenIndexRequest request, ActionListener<OpenIndexResponse> listener) {
destructiveOperations.failDestructive(request.indices()); destructiveOperations.failDestructive(request.indices());
super.doExecute(request, listener); super.doExecute(task, request, listener);
} }
@Override @Override

View File

@ -54,7 +54,9 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -77,7 +79,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
public TransportSearchTypeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, public TransportSearchTypeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, SearchAction.NAME, threadPool, actionFilters, indexNameExpressionResolver); super(settings, SearchAction.NAME, threadPool, actionFilters, indexNameExpressionResolver, clusterService.getTaskManager());
this.clusterService = clusterService; this.clusterService = clusterService;
this.searchService = searchService; this.searchService = searchService;
this.searchPhaseController = searchPhaseController; this.searchPhaseController = searchPhaseController;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
/** /**
* A filter allowing to filter transport actions * A filter allowing to filter transport actions
@ -39,7 +40,7 @@ public interface ActionFilter {
* Enables filtering the execution of an action on the request side, either by sending a response through the * Enables filtering the execution of an action on the request side, either by sending a response through the
* {@link ActionListener} or by continuing the execution through the given {@link ActionFilterChain chain} * {@link ActionListener} or by continuing the execution through the given {@link ActionFilterChain chain}
*/ */
void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain); void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain);
/** /**
* Enables filtering the execution of an action on the response side, either by sending a response through the * Enables filtering the execution of an action on the response side, either by sending a response through the
@ -59,9 +60,9 @@ public interface ActionFilter {
} }
@Override @Override
public final void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { public final void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
if (apply(action, request, listener)) { if (apply(action, request, listener)) {
chain.proceed(action, request, listener); chain.proceed(task, action, request, listener);
} }
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.support;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.tasks.Task;
/** /**
* A filter chain allowing to continue and process the transport action request * A filter chain allowing to continue and process the transport action request
@ -32,7 +33,7 @@ public interface ActionFilterChain {
* Continue processing the request. Should only be called if a response has not been sent through * Continue processing the request. Should only be called if a response has not been sent through
* the given {@link ActionListener listener} * the given {@link ActionListener listener}
*/ */
void proceed(final String action, final ActionRequest request, final ActionListener listener); void proceed(Task task, final String action, final ActionRequest request, final ActionListener listener);
/** /**
* Continue processing the response. Should only be called if a response has not been sent through * Continue processing the response. Should only be called if a response has not been sent through

View File

@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.ChildTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
/**
* Base class for requests that can have associated child tasks
*/
public class ChildTaskRequest extends TransportRequest {
private String parentTaskNode;
private long parentTaskId;
protected ChildTaskRequest() {
}
protected ChildTaskRequest(TransportRequest parentTaskRequest) {
super(parentTaskRequest);
}
public void setParentTask(String parentTaskNode, long parentTaskId) {
this.parentTaskNode = parentTaskNode;
this.parentTaskId = parentTaskId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
parentTaskNode = in.readOptionalString();
parentTaskId = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(parentTaskNode);
out.writeLong(parentTaskId);
}
@Override
public Task createTask(long id, String type, String action) {
return new ChildTask(id, type, action, this::getDescription, parentTaskNode, parentTaskId);
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestHandler;
@ -36,14 +37,19 @@ import java.util.function.Supplier;
public abstract class HandledTransportAction<Request extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request,Response>{ public abstract class HandledTransportAction<Request extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request,Response>{
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) { protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver); super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler()); transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
} }
class TransportHandler implements TransportRequestHandler<Request> { class TransportHandler implements TransportRequestHandler<Request> {
@Override @Override
public final void messageReceived(final Request request, final TransportChannel channel) throws Exception { public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
messageReceived(request, channel);
}
@Override
public final void messageReceived(Request request, TransportChannel channel) throws Exception {
execute(request, new ActionListener<Response>() { execute(request, new ActionListener<Response>() {
@Override @Override
public void onResponse(Response response) { public void onResponse(Response response) {

View File

@ -29,6 +29,8 @@ import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -45,15 +47,17 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
private final ActionFilter[] filters; private final ActionFilter[] filters;
protected final ParseFieldMatcher parseFieldMatcher; protected final ParseFieldMatcher parseFieldMatcher;
protected final IndexNameExpressionResolver indexNameExpressionResolver; protected final IndexNameExpressionResolver indexNameExpressionResolver;
protected final TaskManager taskManager;
protected TransportAction(Settings settings, String actionName, ThreadPool threadPool, ActionFilters actionFilters, protected TransportAction(Settings settings, String actionName, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) { IndexNameExpressionResolver indexNameExpressionResolver, TaskManager taskManager) {
super(settings); super(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.actionName = actionName; this.actionName = actionName;
this.filters = actionFilters.filters(); this.filters = actionFilters.filters();
this.parseFieldMatcher = new ParseFieldMatcher(settings); this.parseFieldMatcher = new ParseFieldMatcher(settings);
this.indexNameExpressionResolver = indexNameExpressionResolver; this.indexNameExpressionResolver = indexNameExpressionResolver;
this.taskManager = taskManager;
} }
public final ActionFuture<Response> execute(Request request) { public final ActionFuture<Response> execute(Request request) {
@ -63,6 +67,28 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
} }
public final void execute(Request request, ActionListener<Response> listener) { public final void execute(Request request, ActionListener<Response> listener) {
Task task = taskManager.register("transport", actionName, request);
if (task == null) {
execute(null, request, listener);
} else {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}
@Override
public void onFailure(Throwable e) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
}
}
private final void execute(Task task, Request request, ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate(); ActionRequestValidationException validationException = request.validate();
if (validationException != null) { if (validationException != null) {
listener.onFailure(validationException); listener.onFailure(validationException);
@ -71,17 +97,21 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
if (filters.length == 0) { if (filters.length == 0) {
try { try {
doExecute(request, listener); doExecute(task, request, listener);
} catch(Throwable t) { } catch(Throwable t) {
logger.trace("Error during transport action execution.", t); logger.trace("Error during transport action execution.", t);
listener.onFailure(t); listener.onFailure(t);
} }
} else { } else {
RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger); RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
requestFilterChain.proceed(actionName, request, listener); requestFilterChain.proceed(task, actionName, request, listener);
} }
} }
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
doExecute(request, listener);
}
protected abstract void doExecute(Request request, ActionListener<Response> listener); protected abstract void doExecute(Request request, ActionListener<Response> listener);
private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse> implements ActionFilterChain { private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse> implements ActionFilterChain {
@ -96,13 +126,13 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
} }
@Override @SuppressWarnings("unchecked") @Override @SuppressWarnings("unchecked")
public void proceed(String actionName, ActionRequest request, ActionListener listener) { public void proceed(Task task, String actionName, ActionRequest request, ActionListener listener) {
int i = index.getAndIncrement(); int i = index.getAndIncrement();
try { try {
if (i < this.action.filters.length) { if (i < this.action.filters.length) {
this.action.filters[i].apply(actionName, request, listener, this); this.action.filters[i].apply(task, actionName, request, listener, this);
} else if (i == this.action.filters.length) { } else if (i == this.action.filters.length) {
this.action.doExecute((Request) request, new FilteredActionListener<Response>(actionName, listener, new ResponseFilterChain(this.action.filters, logger))); this.action.doExecute(task, (Request) request, new FilteredActionListener<Response>(actionName, listener, new ResponseFilterChain(this.action.filters, logger)));
} else { } else {
listener.onFailure(new IllegalStateException("proceed was called too many times")); listener.onFailure(new IllegalStateException("proceed was called too many times"));
} }
@ -131,7 +161,7 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
} }
@Override @Override
public void proceed(String action, ActionRequest request, ActionListener listener) { public void proceed(Task task, String action, ActionRequest request, ActionListener listener) {
assert false : "response filter chain should never be called on the request side"; assert false : "response filter chain should never be called on the request side";
} }

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportException;
@ -84,6 +85,13 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
protected abstract void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception; protected abstract void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception;
/**
* Override this operation if access to the task parameter is needed
*/
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
masterOperation(request, state, listener);
}
protected boolean localExecute(Request request) { protected boolean localExecute(Request request) {
return false; return false;
} }
@ -91,8 +99,14 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
protected abstract ClusterBlockException checkBlock(Request request, ClusterState state); protected abstract ClusterBlockException checkBlock(Request request, ClusterState state);
@Override @Override
protected void doExecute(final Request request, ActionListener<Response> listener) { protected final void doExecute(final Request request, ActionListener<Response> listener) {
new AsyncSingleAction(request, listener).start(); logger.warn("attempt to execute a master node operation without task");
throw new UnsupportedOperationException("task parameter is required for this operation");
}
@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
new AsyncSingleAction(task, request, listener).start();
} }
class AsyncSingleAction { class AsyncSingleAction {
@ -100,6 +114,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
private final ActionListener<Response> listener; private final ActionListener<Response> listener;
private final Request request; private final Request request;
private volatile ClusterStateObserver observer; private volatile ClusterStateObserver observer;
private final Task task;
private final ClusterStateObserver.ChangePredicate retryableOrNoBlockPredicate = new ClusterStateObserver.ValidationPredicate() { private final ClusterStateObserver.ChangePredicate retryableOrNoBlockPredicate = new ClusterStateObserver.ValidationPredicate() {
@Override @Override
@ -109,7 +124,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
} }
}; };
AsyncSingleAction(Request request, ActionListener<Response> listener) { AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
this.task = task;
this.request = request; this.request = request;
// TODO do we really need to wrap it in a listener? the handlers should be cheap // TODO do we really need to wrap it in a listener? the handlers should be cheap
if ((listener instanceof ThreadedActionListener) == false) { if ((listener instanceof ThreadedActionListener) == false) {
@ -157,7 +173,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
threadPool.executor(executor).execute(new ActionRunnable(delegate) { threadPool.executor(executor).execute(new ActionRunnable(delegate) {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
masterOperation(request, clusterService.state(), delegate); masterOperation(task, request, clusterService.state(), delegate);
} }
}); });
} }

View File

@ -19,16 +19,16 @@
package org.elasticsearch.action.support.nodes; package org.elasticsearch.action.support.nodes;
import org.elasticsearch.action.support.ChildTaskRequest;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException; import java.io.IOException;
/** /**
* *
*/ */
public abstract class BaseNodeRequest extends TransportRequest { public abstract class BaseNodeRequest extends ChildTaskRequest {
private String nodeId; private String nodeId;

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.NoSuchNodeException; import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ChildTaskRequest;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -32,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.NodeShouldNotConnectException; import org.elasticsearch.transport.NodeShouldNotConnectException;
@ -71,8 +73,14 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
} }
@Override @Override
protected void doExecute(NodesRequest request, ActionListener<NodesResponse> listener) { protected final void doExecute(NodesRequest request, ActionListener<NodesResponse> listener) {
new AsyncAction(request, listener).start(); logger.warn("attempt to execute a transport nodes operation without a task");
throw new UnsupportedOperationException("task parameter is required for this operation");
}
@Override
protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
new AsyncAction(task, request, listener).start();
} }
protected boolean transportCompress() { protected boolean transportCompress() {
@ -106,8 +114,10 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
private final ActionListener<NodesResponse> listener; private final ActionListener<NodesResponse> listener;
private final AtomicReferenceArray<Object> responses; private final AtomicReferenceArray<Object> responses;
private final AtomicInteger counter = new AtomicInteger(); private final AtomicInteger counter = new AtomicInteger();
private final Task task;
private AsyncAction(NodesRequest request, ActionListener<NodesResponse> listener) { private AsyncAction(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
this.task = task;
this.request = request; this.request = request;
this.listener = listener; this.listener = listener;
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
@ -150,7 +160,11 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
// those (and they randomize the client node usage, so tricky to find when) // those (and they randomize the client node usage, so tricky to find when)
onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node)); onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
} else { } else {
NodeRequest nodeRequest = newNodeRequest(nodeId, request); ChildTaskRequest nodeRequest = newNodeRequest(nodeId, request);
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().id(), task.getId());
}
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler<NodeResponse>() { transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler<NodeResponse>() {
@Override @Override
public NodeResponse newInstance() { public NodeResponse newInstance() {

View File

@ -114,7 +114,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters, MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) { Supplier<ReplicaRequest> replicaRequest, String executor) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver); super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
this.transportService = transportService; this.transportService = transportService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.indicesService = indicesService; this.indicesService = indicesService;

View File

@ -66,7 +66,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
protected TransportSingleShardAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, protected TransportSingleShardAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request, String executor) { Supplier<Request> request, String executor) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver); super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
this.clusterService = clusterService; this.clusterService = clusterService;
this.transportService = transportService; this.transportService = transportService;

View File

@ -0,0 +1,195 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.tasks;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.ChildTask;
import org.elasticsearch.tasks.Task;
import java.io.IOException;
/**
* A base class for task requests
*/
public class BaseTasksRequest<T extends BaseTasksRequest> extends ActionRequest<T> {
public static final String[] ALL_ACTIONS = Strings.EMPTY_ARRAY;
public static final String[] ALL_NODES = Strings.EMPTY_ARRAY;
public static final long ALL_TASKS = -1L;
private String[] nodesIds = ALL_NODES;
private TimeValue timeout;
private String[] actions = ALL_ACTIONS;
private String parentNode;
private long parentTaskId = ALL_TASKS;
public BaseTasksRequest() {
}
@Override
public ActionRequestValidationException validate() {
return null;
}
/**
* Get information about tasks from nodes based on the nodes ids specified.
* If none are passed, information for all nodes will be returned.
*/
public BaseTasksRequest(ActionRequest request, String... nodesIds) {
super(request);
this.nodesIds = nodesIds;
}
/**
* Get information about tasks from nodes based on the nodes ids specified.
* If none are passed, information for all nodes will be returned.
*/
public BaseTasksRequest(String... nodesIds) {
this.nodesIds = nodesIds;
}
/**
* Sets the list of action masks for the actions that should be returned
*/
@SuppressWarnings("unchecked")
public final T actions(String... actions) {
this.actions = actions;
return (T) this;
}
/**
* Return the list of action masks for the actions that should be returned
*/
public String[] actions() {
return actions;
}
public final String[] nodesIds() {
return nodesIds;
}
@SuppressWarnings("unchecked")
public final T nodesIds(String... nodesIds) {
this.nodesIds = nodesIds;
return (T) this;
}
/**
* Returns the parent node id that tasks should be filtered by
*/
public String parentNode() {
return parentNode;
}
@SuppressWarnings("unchecked")
public T parentNode(String parentNode) {
this.parentNode = parentNode;
return (T) this;
}
/**
* Returns the parent task id that tasks should be filtered by
*/
public long parentTaskId() {
return parentTaskId;
}
@SuppressWarnings("unchecked")
public T parentTaskId(long parentTaskId) {
this.parentTaskId = parentTaskId;
return (T) this;
}
public TimeValue timeout() {
return this.timeout;
}
@SuppressWarnings("unchecked")
public final T timeout(TimeValue timeout) {
this.timeout = timeout;
return (T) this;
}
@SuppressWarnings("unchecked")
public final T timeout(String timeout) {
this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout");
return (T) this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodesIds = in.readStringArray();
actions = in.readStringArray();
parentNode = in.readOptionalString();
parentTaskId = in.readLong();
if (in.readBoolean()) {
timeout = TimeValue.readTimeValue(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArrayNullable(nodesIds);
out.writeStringArrayNullable(actions);
out.writeOptionalString(parentNode);
out.writeLong(parentTaskId);
out.writeOptionalStreamable(timeout);
}
public boolean match(Task task) {
if (actions() != null && actions().length > 0 && Regex.simpleMatch(actions(), task.getAction()) == false) {
return false;
}
if (parentNode() != null || parentTaskId() != BaseTasksRequest.ALL_TASKS) {
if (task instanceof ChildTask) {
if (parentNode() != null) {
if (parentNode().equals(((ChildTask) task).getParentNode()) == false) {
return false;
}
}
if (parentTaskId() != BaseTasksRequest.ALL_TASKS) {
if (parentTaskId() != ((ChildTask) task).getParentId()) {
return false;
}
}
} else {
// This is not a child task and we need to match parent node or id
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.tasks;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Base class for responses of task-related operations
*/
public class BaseTasksResponse extends ActionResponse {
private List<TaskOperationFailure> taskFailures;
private List<FailedNodeException> nodeFailures;
public BaseTasksResponse() {
}
public BaseTasksResponse(List<TaskOperationFailure> taskFailures, List<? extends FailedNodeException> nodeFailures) {
this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(taskFailures));
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures));
}
/**
* The list of task failures exception.
*/
public List<TaskOperationFailure> getTaskFailures() {
return taskFailures;
}
/**
* The list of node failures exception.
*/
public List<FailedNodeException> getNodeFailures() {
return nodeFailures;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
List<TaskOperationFailure> taskFailures = new ArrayList<>();
for (int i = 0; i < size; i++) {
taskFailures.add(new TaskOperationFailure(in));
}
size = in.readVInt();
this.taskFailures = Collections.unmodifiableList(taskFailures);
List<FailedNodeException> nodeFailures = new ArrayList<>();
for (int i = 0; i < size; i++) {
nodeFailures.add(new FailedNodeException(in));
}
this.nodeFailures = Collections.unmodifiableList(nodeFailures);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(taskFailures.size());
for (TaskOperationFailure exp : taskFailures) {
exp.writeTo(out);
}
out.writeVInt(nodeFailures.size());
for (FailedNodeException exp : nodeFailures) {
exp.writeTo(out);
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.tasks;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
/**
* Builder for task-based requests
*/
public class TasksRequestBuilder <Request extends BaseTasksRequest<Request>, Response extends BaseTasksResponse, RequestBuilder extends TasksRequestBuilder<Request, Response, RequestBuilder>>
extends ActionRequestBuilder<Request, Response, RequestBuilder> {
protected TasksRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
super(client, action, request);
}
@SuppressWarnings("unchecked")
public final RequestBuilder setNodesIds(String... nodesIds) {
request.nodesIds(nodesIds);
return (RequestBuilder) this;
}
@SuppressWarnings("unchecked")
public final RequestBuilder setActions(String... actions) {
request.actions(actions);
return (RequestBuilder) this;
}
@SuppressWarnings("unchecked")
public final RequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return (RequestBuilder) this;
}
}

View File

@ -0,0 +1,380 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.tasks;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ChildTaskRequest;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.NodeShouldNotConnectException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
/**
* The base class for transport actions that are interacting with currently running tasks.
*/
public abstract class TransportTasksAction<
TasksRequest extends BaseTasksRequest<TasksRequest>,
TasksResponse extends BaseTasksResponse,
TaskResponse extends Writeable<TaskResponse>
> extends HandledTransportAction<TasksRequest, TasksResponse> {
protected final ClusterName clusterName;
protected final ClusterService clusterService;
protected final TransportService transportService;
protected final Supplier<TasksRequest> requestSupplier;
protected final Supplier<TasksResponse> responseSupplier;
protected final String transportNodeAction;
protected TransportTasksAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<TasksRequest> requestSupplier,
Supplier<TasksResponse> responseSupplier,
String nodeExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestSupplier);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.transportNodeAction = actionName + "[n]";
this.requestSupplier = requestSupplier;
this.responseSupplier = responseSupplier;
transportService.registerRequestHandler(transportNodeAction, NodeTaskRequest::new, nodeExecutor, new NodeTransportHandler());
}
@Override
protected final void doExecute(TasksRequest request, ActionListener<TasksResponse> listener) {
logger.warn("attempt to execute a transport tasks operation without a task");
throw new UnsupportedOperationException("task parameter is required for this operation");
}
@Override
protected void doExecute(Task task, TasksRequest request, ActionListener<TasksResponse> listener) {
new AsyncAction(task, request, listener).start();
}
private NodeTasksResponse nodeOperation(NodeTaskRequest nodeTaskRequest) {
TasksRequest request = nodeTaskRequest.tasksRequest;
List<TaskResponse> results = new ArrayList<>();
List<TaskOperationFailure> exceptions = new ArrayList<>();
for (Task task : taskManager.getTasks().values()) {
// First check action and node filters
if (request.match(task)) {
try {
results.add(taskOperation(request, task));
} catch (Exception ex) {
exceptions.add(new TaskOperationFailure(clusterService.localNode().id(), task.getId(), ex));
}
}
}
return new NodeTasksResponse(clusterService.localNode().id(), results, exceptions);
}
protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
return nodesIds;
}
protected String[] resolveNodes(TasksRequest request, ClusterState clusterState) {
return clusterState.nodes().resolveNodesIds(request.nodesIds());
}
protected abstract TasksResponse newResponse(TasksRequest request, List<TaskResponse> tasks, List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions);
@SuppressWarnings("unchecked")
protected TasksResponse newResponse(TasksRequest request, AtomicReferenceArray responses) {
List<TaskResponse> tasks = new ArrayList<>();
List<FailedNodeException> failedNodeExceptions = new ArrayList<>();
List<TaskOperationFailure> taskOperationFailures = new ArrayList<>();
for (int i = 0; i < responses.length(); i++) {
Object response = responses.get(i);
if (response instanceof FailedNodeException) {
failedNodeExceptions.add((FailedNodeException) response);
} else {
NodeTasksResponse tasksResponse = (NodeTasksResponse) response;
if (tasksResponse.results != null) {
tasks.addAll(tasksResponse.results);
}
if (tasksResponse.exceptions != null) {
taskOperationFailures.addAll(tasksResponse.exceptions);
}
}
}
return newResponse(request, tasks, taskOperationFailures, failedNodeExceptions);
}
protected abstract TaskResponse readTaskResponse(StreamInput in) throws IOException;
protected abstract TaskResponse taskOperation(TasksRequest request, Task task);
protected boolean transportCompress() {
return false;
}
protected abstract boolean accumulateExceptions();
private class AsyncAction {
private final TasksRequest request;
private final String[] nodesIds;
private final DiscoveryNode[] nodes;
private final ActionListener<TasksResponse> listener;
private final AtomicReferenceArray<Object> responses;
private final AtomicInteger counter = new AtomicInteger();
private final Task task;
private AsyncAction(Task task, TasksRequest request, ActionListener<TasksResponse> listener) {
this.task = task;
this.request = request;
this.listener = listener;
ClusterState clusterState = clusterService.state();
String[] nodesIds = resolveNodes(request, clusterState);
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
ImmutableOpenMap<String, DiscoveryNode> nodes = clusterState.nodes().nodes();
this.nodes = new DiscoveryNode[nodesIds.length];
for (int i = 0; i < nodesIds.length; i++) {
this.nodes[i] = nodes.get(nodesIds[i]);
}
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
}
private void start() {
if (nodesIds.length == 0) {
// nothing to do
try {
listener.onResponse(newResponse(request, responses));
} catch (Throwable t) {
logger.debug("failed to generate empty response", t);
listener.onFailure(t);
}
} else {
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
if (request.timeout() != null) {
builder.withTimeout(request.timeout());
}
builder.withCompress(transportCompress());
for (int i = 0; i < nodesIds.length; i++) {
final String nodeId = nodesIds[i];
final int idx = i;
final DiscoveryNode node = nodes[i];
try {
if (node == null) {
onFailure(idx, nodeId, new NoSuchNodeException(nodeId));
} else if (!clusterService.localNode().shouldConnectTo(node) && !clusterService.localNode().equals(node)) {
// the check "!clusterService.localNode().equals(node)" is to maintain backward comp. where before
// we allowed to connect from "local" client node to itself, certain tests rely on it, if we remove it, we need to fix
// those (and they randomize the client node usage, so tricky to find when)
onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
} else {
NodeTaskRequest nodeRequest = new NodeTaskRequest(request);
nodeRequest.setParentTask(clusterService.localNode().id(), task.getId());
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler<NodeTasksResponse>() {
@Override
public NodeTasksResponse newInstance() {
return new NodeTasksResponse();
}
@Override
public void handleResponse(NodeTasksResponse response) {
onOperation(idx, response);
}
@Override
public void handleException(TransportException exp) {
onFailure(idx, node.id(), exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
} catch (Throwable t) {
onFailure(idx, nodeId, t);
}
}
}
}
private void onOperation(int idx, NodeTasksResponse nodeResponse) {
responses.set(idx, nodeResponse);
if (counter.incrementAndGet() == responses.length()) {
finishHim();
}
}
private void onFailure(int idx, String nodeId, Throwable t) {
if (logger.isDebugEnabled() && !(t instanceof NodeShouldNotConnectException)) {
logger.debug("failed to execute on node [{}]", t, nodeId);
}
if (accumulateExceptions()) {
responses.set(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t));
}
if (counter.incrementAndGet() == responses.length()) {
finishHim();
}
}
private void finishHim() {
TasksResponse finalResponse;
try {
finalResponse = newResponse(request, responses);
} catch (Throwable t) {
logger.debug("failed to combine responses from nodes", t);
listener.onFailure(t);
return;
}
listener.onResponse(finalResponse);
}
}
class NodeTransportHandler implements TransportRequestHandler<NodeTaskRequest> {
@Override
public void messageReceived(final NodeTaskRequest request, final TransportChannel channel) throws Exception {
channel.sendResponse(nodeOperation(request));
}
}
private class NodeTaskRequest extends ChildTaskRequest {
private TasksRequest tasksRequest;
protected NodeTaskRequest() {
super();
}
protected NodeTaskRequest(TasksRequest tasksRequest) {
super(tasksRequest);
this.tasksRequest = tasksRequest;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
tasksRequest = requestSupplier.get();
tasksRequest.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
tasksRequest.writeTo(out);
}
}
private class NodeTasksResponse extends TransportResponse {
protected String nodeId;
protected List<TaskOperationFailure> exceptions;
protected List<TaskResponse> results;
public NodeTasksResponse() {
}
public NodeTasksResponse(String nodeId,
List<TaskResponse> results,
List<TaskOperationFailure> exceptions) {
this.nodeId = nodeId;
this.results = results;
this.exceptions = exceptions;
}
public String getNodeId() {
return nodeId;
}
public List<TaskOperationFailure> getExceptions() {
return exceptions;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
int resultsSize = in.readVInt();
results = new ArrayList<>(resultsSize);
for (; resultsSize > 0; resultsSize--) {
final TaskResponse result = in.readBoolean() ? readTaskResponse(in) : null;
results.add(result);
}
if (in.readBoolean()) {
int taskFailures = in.readVInt();
exceptions = new ArrayList<>(taskFailures);
for (int i = 0; i < taskFailures; i++) {
exceptions.add(new TaskOperationFailure(in));
}
} else {
exceptions = null;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
out.writeVInt(results.size());
for (TaskResponse result : results) {
if (result != null) {
out.writeBoolean(true);
result.writeTo(out);
} else {
out.writeBoolean(false);
}
}
out.writeBoolean(exceptions != null);
if (exceptions != null) {
int taskFailures = exceptions.size();
out.writeVInt(taskFailures);
for (TaskOperationFailure exception : exceptions) {
exception.writeTo(out);
}
}
}
}
}

View File

@ -33,6 +33,9 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryResponse;
@ -249,6 +252,29 @@ public interface ClusterAdminClient extends ElasticsearchClient {
*/ */
NodesHotThreadsRequestBuilder prepareNodesHotThreads(String... nodesIds); NodesHotThreadsRequestBuilder prepareNodesHotThreads(String... nodesIds);
/**
* List tasks
*
* @param request The nodes tasks request
* @return The result future
* @see org.elasticsearch.client.Requests#listTasksRequest(String...)
*/
ActionFuture<ListTasksResponse> listTasks(ListTasksRequest request);
/**
* List active tasks
*
* @param request The nodes tasks request
* @param listener A listener to be notified with a result
* @see org.elasticsearch.client.Requests#listTasksRequest(String...)
*/
void listTasks(ListTasksRequest request, ActionListener<ListTasksResponse> listener);
/**
* List active tasks
*/
ListTasksRequestBuilder prepareListTasks(String... nodesIds);
/** /**
* Returns list of shards the given search would be executed on. * Returns list of shards the given search would be executed on.
*/ */

View File

@ -22,6 +22,7 @@ package org.elasticsearch.client;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
@ -404,6 +405,27 @@ public class Requests {
return new ClusterStatsRequest(); return new ClusterStatsRequest();
} }
/**
* Creates a nodes tasks request against all the nodes.
*
* @return The nodes tasks request
* @see org.elasticsearch.client.ClusterAdminClient#listTasks(ListTasksRequest)
*/
public static ListTasksRequest listTasksRequest() {
return new ListTasksRequest();
}
/**
* Creates a nodes tasks request against one or more nodes. Pass <tt>null</tt> or an empty array for all nodes.
*
* @param nodesIds The nodes ids to get the tasks for
* @return The nodes tasks request
* @see org.elasticsearch.client.ClusterAdminClient#nodesStats(org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest)
*/
public static ListTasksRequest listTasksRequest(String... nodesIds) {
return new ListTasksRequest(nodesIds);
}
/** /**
* Registers snapshot repository * Registers snapshot repository
* *

View File

@ -41,6 +41,10 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder;
@ -968,6 +972,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client
return new NodesHotThreadsRequestBuilder(this, NodesHotThreadsAction.INSTANCE).setNodesIds(nodesIds); return new NodesHotThreadsRequestBuilder(this, NodesHotThreadsAction.INSTANCE).setNodesIds(nodesIds);
} }
@Override
public ActionFuture<ListTasksResponse> listTasks(final ListTasksRequest request) {
return execute(ListTasksAction.INSTANCE, request);
}
@Override
public void listTasks(final ListTasksRequest request, final ActionListener<ListTasksResponse> listener) {
execute(ListTasksAction.INSTANCE, request, listener);
}
@Override
public ListTasksRequestBuilder prepareListTasks(String... nodesIds) {
return new ListTasksRequestBuilder(this, ListTasksAction.INSTANCE).setNodesIds(nodesIds);
}
@Override @Override
public ActionFuture<ClusterSearchShardsResponse> searchShards(final ClusterSearchShardsRequest request) { public ActionFuture<ClusterSearchShardsResponse> searchShards(final ClusterSearchShardsRequest request) {
return execute(ClusterSearchShardsAction.INSTANCE, request); return execute(ClusterSearchShardsAction.INSTANCE, request);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskManager;
import java.util.List; import java.util.List;
@ -148,4 +149,9 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
* @return A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue * @return A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue
*/ */
TimeValue getMaxTaskWaitTime(); TimeValue getMaxTaskWaitTime();
/**
* Returns task manager created in the cluster service
*/
TaskManager getTaskManager();
} }

View File

@ -65,6 +65,7 @@ import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -133,6 +134,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final ClusterBlocks.Builder initialBlocks; private final ClusterBlocks.Builder initialBlocks;
private final TaskManager taskManager;
private volatile ScheduledFuture reconnectToNodes; private volatile ScheduledFuture reconnectToNodes;
@Inject @Inject
@ -159,6 +162,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
localNodeMasterListeners = new LocalNodeMasterListeners(threadPool); localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
initialBlocks = ClusterBlocks.builder().addGlobalBlock(discoveryService.getNoMasterBlock()); initialBlocks = ClusterBlocks.builder().addGlobalBlock(discoveryService.getNoMasterBlock());
taskManager = transportService.getTaskManager();
} }
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) { private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
@ -372,6 +377,10 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return updateTasksExecutor.getMaxTaskWaitTime(); return updateTasksExecutor.getMaxTaskWaitTime();
} }
@Override
public TaskManager getTaskManager() {
return taskManager;
}
/** asserts that the current thread is the cluster state update thread */ /** asserts that the current thread is the cluster state update thread */
public boolean assertClusterStateThread() { public boolean assertClusterStateThread() {

View File

@ -60,6 +60,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.elasticsearch.ElasticsearchException.readException; import static org.elasticsearch.ElasticsearchException.readException;
@ -682,6 +683,18 @@ public abstract class StreamInput extends InputStream {
return readNamedWriteable(ScoreFunctionBuilder.class); return readNamedWriteable(ScoreFunctionBuilder.class);
} }
/**
* Reads a list of objects
*/
public <T> List<T> readList(StreamInputReader<T> reader) throws IOException {
int count = readVInt();
List<T> builder = new ArrayList<>(count);
for (int i=0; i<count; i++) {
builder.add(reader.read(this));
}
return builder;
}
public static StreamInput wrap(BytesReference reference) { public static StreamInput wrap(BytesReference reference) {
if (reference.hasArray() == false) { if (reference.hasArray() == false) {
reference = reference.toBytesArray(); reference = reference.toBytesArray();

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.IOException;
/**
* Defines a method for reading a list of objects from StreamInput.
*
* It can be used in {@link StreamInput#readList(StreamInputReader)} for reading
* lists of immutable objects that implement StreamInput accepting constructors.
*/
@FunctionalInterface
public interface StreamInputReader<T> {
T read(StreamInput t) throws IOException;
}

View File

@ -667,4 +667,14 @@ public abstract class StreamOutput extends OutputStream {
writeDouble(geoPoint.lat()); writeDouble(geoPoint.lat());
writeDouble(geoPoint.lon()); writeDouble(geoPoint.lon());
} }
/**
* Writes a list of {@link Writeable} objects
*/
public <T extends Writeable<T>> void writeList(List<T> list) throws IOException {
writeVInt(list.size());
for (T obj: list) {
obj.writeTo(this);
}
}
} }

View File

@ -36,6 +36,7 @@ import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthActio
import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction; import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction; import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsAction; import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsAction;
import org.elasticsearch.rest.action.admin.cluster.node.tasks.RestListTasksAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.delete.RestDeleteRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.repositories.delete.RestDeleteRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.get.RestGetRepositoriesAction; import org.elasticsearch.rest.action.admin.cluster.repositories.get.RestGetRepositoriesAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.put.RestPutRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.repositories.put.RestPutRepositoryAction;
@ -259,7 +260,10 @@ public class NetworkModule extends AbstractModule {
RestFieldStatsAction.class, RestFieldStatsAction.class,
// no abstract cat action // no abstract cat action
RestCatAction.class RestCatAction.class,
// Tasks API
RestListTasksAction.class
); );
private static final List<Class<? extends AbstractCatAction>> builtinCatHandlers = Arrays.asList( private static final List<Class<? extends AbstractCatAction>> builtinCatHandlers = Arrays.asList(

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.node.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestToXContentListener;
import static org.elasticsearch.rest.RestRequest.Method.GET;
public class RestListTasksAction extends BaseRestHandler {
@Inject
public RestListTasksAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(GET, "/_tasks", this);
controller.registerHandler(GET, "/_tasks/{nodeId}", this);
controller.registerHandler(GET, "/_tasks/{nodeId}/{actions}", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
boolean detailed = request.paramAsBoolean("detailed", false);
String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
String[] actions = Strings.splitStringByCommaToArray(request.param("actions"));
String parentNode = request.param("parent_node");
long parentTaskId = request.paramAsLong("parent_task", ListTasksRequest.ALL_TASKS);
ListTasksRequest listTasksRequest = new ListTasksRequest(nodesIds);
listTasksRequest.detailed(detailed);
listTasksRequest.actions(actions);
listTasksRequest.parentNode(parentNode);
listTasksRequest.parentTaskId(parentTaskId);
client.admin().cluster().listTasks(listTasksRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Provider;
/**
* Child task
*/
public class ChildTask extends Task {
private final String parentNode;
private final long parentId;
public ChildTask(long id, String type, String action, Provider<String> description, String parentNode, long parentId) {
super(id, type, action, description);
this.parentNode = parentNode;
this.parentId = parentId;
}
/**
* Returns parent node of the task or null if task doesn't have any parent tasks
*/
public String getParentNode() {
return parentNode;
}
/**
* Returns id of the parent task or -1L if task doesn't have any parent tasks
*/
public long getParentId() {
return parentId;
}
public TaskInfo taskInfo(DiscoveryNode node, boolean detailed) {
return new TaskInfo(node, getId(), getType(), getAction(), detailed ? getDescription() : null, parentNode, parentId);
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Provider;
/**
* Current task information
*/
public class Task {
private final long id;
private final String type;
private final String action;
private final Provider<String> description;
public Task(long id, String type, String action, Provider<String> description) {
this.id = id;
this.type = type;
this.action = action;
this.description = description;
}
public TaskInfo taskInfo(DiscoveryNode node, boolean detailed) {
return new TaskInfo(node, id, type, action, detailed ? getDescription() : null);
}
/**
* Returns task id
*/
public long getId() {
return id;
}
/**
* Returns task channel type (netty, transport, direct)
*/
public String getType() {
return type;
}
/**
* Returns task action
*/
public String getAction() {
return action;
}
/**
* Generates task description
*/
public String getDescription() {
return description.get();
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.transport.TransportRequest;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* Task Manager service for keeping track of currently running tasks on the nodes
*/
public class TaskManager extends AbstractComponent {
private final ConcurrentMapLong<Task> tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
private final AtomicLong taskIdGenerator = new AtomicLong();
public TaskManager(Settings settings) {
super(settings);
}
/**
* Registers a task without parent task
*/
public Task register(String type, String action, TransportRequest request) {
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action);
if (task != null) {
if (logger.isDebugEnabled()) {
logger.debug("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription());
}
Task previousTask = tasks.put(task.getId(), task);
assert previousTask == null;
}
return task;
}
/**
* Unregister the task
*/
public void unregister(Task task) {
logger.debug("unregister task for id: {}", task.getId());
tasks.remove(task.getId());
}
/**
* Returns the list of currently running tasks on the node
*/
public Map<Long, Task> getTasks() {
return Collections.unmodifiableMap(new HashMap<>(tasks));
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import java.io.IOException;
/**
* Wrapper around transport channel that delegates all requests to the
* underlying channel
*/
public class DelegatingTransportChannel implements TransportChannel {
private final TransportChannel channel;
protected DelegatingTransportChannel(TransportChannel channel) {
this.channel = channel;
}
@Override
public String action() {
return channel.action();
}
@Override
public String getProfileName() {
return channel.getProfileName();
}
@Override
public long getRequestId() {
return channel.getRequestId();
}
@Override
public String getChannelType() {
return channel.getChannelType();
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
channel.sendResponse(response);
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
channel.sendResponse(response, options);
}
@Override
public void sendResponse(Throwable error) throws IOException {
channel.sendResponse(error);
}
public TransportChannel getChannel() {
return channel;
}
}

View File

@ -19,7 +19,10 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import java.io.IOException;
import java.util.function.Supplier; import java.util.function.Supplier;
/** /**
@ -32,14 +35,16 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
private final boolean forceExecution; private final boolean forceExecution;
private final String executor; private final String executor;
private final Supplier<Request> requestFactory; private final Supplier<Request> requestFactory;
private final TaskManager taskManager;
public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TransportRequestHandler<Request> handler, String executor, boolean forceExecution) { public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TaskManager taskManager, TransportRequestHandler<Request> handler, String executor, boolean forceExecution) {
this.action = action; this.action = action;
this.requestFactory = requestFactory; this.requestFactory = requestFactory;
assert newRequest() != null; assert newRequest() != null;
this.handler = handler; this.handler = handler;
this.forceExecution = forceExecution; this.forceExecution = forceExecution;
this.executor = executor; this.executor = executor;
this.taskManager = taskManager;
} }
public String getAction() { public String getAction() {
@ -50,8 +55,21 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
return requestFactory.get(); return requestFactory.get();
} }
public TransportRequestHandler<Request> getHandler() { public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
return handler; final Task task = taskManager.register(channel.getChannelType(), action, request);
if (task == null) {
handler.messageReceived(request, channel);
} else {
boolean success = false;
try {
handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
taskManager.unregister(task);
}
}
}
} }
public boolean isForceExecution() { public boolean isForceExecution() {
@ -61,4 +79,44 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
public String getExecutor() { public String getExecutor() {
return executor; return executor;
} }
@Override
public String toString() {
return handler.toString();
}
private static class TransportChannelWrapper extends DelegatingTransportChannel {
private final Task task;
private final TaskManager taskManager;
public TransportChannelWrapper(TaskManager taskManager, Task task, TransportChannel channel) {
super(channel);
this.task = task;
this.taskManager = taskManager;
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
endTask();
super.sendResponse(response);
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
endTask();
super.sendResponse(response, options);
}
@Override
public void sendResponse(Throwable error) throws IOException {
endTask();
super.sendResponse(error);
}
private void endTask() {
taskManager.unregister(task);
}
}
} }

View File

@ -30,6 +30,10 @@ public interface TransportChannel {
String getProfileName(); String getProfileName();
long getRequestId();
String getChannelType();
void sendResponse(TransportResponse response) throws IOException; void sendResponse(TransportResponse response) throws IOException;
void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException; void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException;

View File

@ -19,6 +19,8 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.elasticsearch.tasks.Task;
/** /**
*/ */
public abstract class TransportRequest extends TransportMessage<TransportRequest> { public abstract class TransportRequest extends TransportMessage<TransportRequest> {
@ -43,4 +45,14 @@ public abstract class TransportRequest extends TransportMessage<TransportRequest
super(request); super(request);
} }
public Task createTask(long id, String type, String action) {
return new Task(id, type, action, this::getDescription);
}
/**
* Returns optional description of the request to be displayed by the task manager
*/
public String getDescription() {
return this.toString();
}
} }

View File

@ -19,10 +19,19 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.elasticsearch.tasks.Task;
/** /**
* *
*/ */
public interface TransportRequestHandler<T extends TransportRequest> { public interface TransportRequestHandler<T extends TransportRequest> {
/**
* Override this method if access to the Task parameter is needed
*/
default void messageReceived(final T request, final TransportChannel channel, Task task) throws Exception {
messageReceived(request, channel);
}
void messageReceived(T request, TransportChannel channel) throws Exception; void messageReceived(T request, TransportChannel channel) throws Exception;
} }

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
@ -66,6 +67,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
protected final Transport transport; protected final Transport transport;
protected final ThreadPool threadPool; protected final ThreadPool threadPool;
protected final TaskManager taskManager;
volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap(); volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
final Object requestHandlerMutex = new Object(); final Object requestHandlerMutex = new Object();
@ -114,6 +116,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings)); setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings));
tracerLog = Loggers.getLogger(logger, ".tracer"); tracerLog = Loggers.getLogger(logger, ".tracer");
adapter = createAdapter(); adapter = createAdapter();
taskManager = new TaskManager(settings);
} }
/** /**
@ -129,6 +132,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return localNode; return localNode;
} }
public TaskManager getTaskManager() {
return taskManager;
}
protected Adapter createAdapter() { protected Adapter createAdapter() {
return new Adapter(); return new Adapter();
} }
@ -326,13 +333,13 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final String executor = reg.getExecutor(); final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) { if (ThreadPool.Names.SAME.equals(executor)) {
//noinspection unchecked //noinspection unchecked
reg.getHandler().messageReceived(request, channel); reg.processMessageReceived(request, channel);
} else { } else {
threadPool.executor(executor).execute(new AbstractRunnable() { threadPool.executor(executor).execute(new AbstractRunnable() {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
//noinspection unchecked //noinspection unchecked
reg.getHandler().messageReceived(request, channel); reg.processMessageReceived(request, channel);
} }
@Override @Override
@ -391,7 +398,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
* @param handler The handler itself that implements the request handling * @param handler The handler itself that implements the request handling
*/ */
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String executor, TransportRequestHandler<Request> handler) { public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String executor, TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, requestFactory, handler, executor, false); RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false);
registerRequestHandler(reg); registerRequestHandler(reg);
} }
@ -404,7 +411,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
* @param handler The handler itself that implements the request handling * @param handler The handler itself that implements the request handling
*/ */
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor, boolean forceExecution, TransportRequestHandler<Request> handler) { public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor, boolean forceExecution, TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, request, handler, executor, forceExecution); RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution);
registerRequestHandler(reg); registerRequestHandler(reg);
} }
@ -413,7 +420,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction()); RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction());
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
if (replaced != null) { if (replaced != null) {
logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg.getHandler(), replaced.getHandler()); logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg, replaced);
} }
} }
} }
@ -797,6 +804,17 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
logger.error("failed to handle exception for action [{}], handler [{}]", e, action, handler); logger.error("failed to handle exception for action [{}], handler [{}]", e, action, handler);
} }
} }
@Override
public long getRequestId() {
return requestId;
}
@Override
public String getChannelType() {
return "direct";
}
} }
} }

View File

@ -287,13 +287,13 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
request.readFrom(stream); request.readFrom(stream);
if (ThreadPool.Names.SAME.equals(reg.getExecutor())) { if (ThreadPool.Names.SAME.equals(reg.getExecutor())) {
//noinspection unchecked //noinspection unchecked
reg.getHandler().messageReceived(request, transportChannel); reg.processMessageReceived(request, transportChannel);
} else { } else {
threadPool.executor(reg.getExecutor()).execute(new AbstractRunnable() { threadPool.executor(reg.getExecutor()).execute(new AbstractRunnable() {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
//noinspection unchecked //noinspection unchecked
reg.getHandler().messageReceived(request, transportChannel); reg.processMessageReceived(request, transportChannel);
} }
@Override @Override

View File

@ -106,6 +106,16 @@ public class LocalTransportChannel implements TransportChannel {
sourceTransportServiceAdapter.onResponseSent(requestId, action, error); sourceTransportServiceAdapter.onResponseSent(requestId, action, error);
} }
@Override
public long getRequestId() {
return requestId;
}
@Override
public String getChannelType() {
return "local";
}
private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException { private void writeResponseExceptionHeader(BytesStreamOutput stream) throws IOException {
stream.writeLong(requestId); stream.writeLong(requestId);
byte status = 0; byte status = 0;

View File

@ -255,7 +255,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
request.readFrom(buffer); request.readFrom(buffer);
if (ThreadPool.Names.SAME.equals(reg.getExecutor())) { if (ThreadPool.Names.SAME.equals(reg.getExecutor())) {
//noinspection unchecked //noinspection unchecked
reg.getHandler().messageReceived(request, transportChannel); reg.processMessageReceived(request, transportChannel);
} else { } else {
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
} }
@ -310,7 +310,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
reg.getHandler().messageReceived(request, transportChannel); reg.processMessageReceived(request, transportChannel);
} }
@Override @Override

View File

@ -132,6 +132,16 @@ public class NettyTransportChannel implements TransportChannel {
transportServiceAdapter.onResponseSent(requestId, action, error); transportServiceAdapter.onResponseSent(requestId, action, error);
} }
@Override
public long getRequestId() {
return requestId;
}
@Override
public String getChannelType() {
return "netty";
}
/** /**
* Returns the underlying netty channel. This method is intended be used for access to netty to get additional * Returns the underlying netty channel. This method is intended be used for access to netty to get additional
* details when processing the request and may be used by plugins. Responses should be sent using the methods * details when processing the request and may be used by plugins. Responses should be sent using the methods

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.test.ESIntegTestCase;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
* Integration tests for task management API
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE)
public class TasksIT extends ESIntegTestCase {
public void testTaskCounts() {
// Run only on data nodes
ListTasksResponse response = client().admin().cluster().prepareListTasks("data:true").setActions(ListTasksAction.NAME + "[n]").get();
assertThat(response.getTasks().size(), greaterThanOrEqualTo(cluster().numDataNodes()));
}
}

View File

@ -0,0 +1,664 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.ChildTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.not;
public class TransportTasksActionTests extends ESTestCase {
private static ThreadPool threadPool;
private static final ClusterName clusterName = new ClusterName("test-cluster");
private TestNode[] testNodes;
private int nodesCount;
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool(TransportTasksActionTests.class.getSimpleName());
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
@Before
public final void setupTestNodes() throws Exception {
nodesCount = randomIntBetween(2, 10);
testNodes = new TestNode[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
testNodes[i] = new TestNode("node" + i, threadPool, Settings.EMPTY);
}
}
@After
public final void shutdownTestNodes() throws Exception {
for (TestNode testNode : testNodes) {
testNode.close();
}
}
private static class TestNode implements Releasable {
public TestNode(String name, ThreadPool threadPool, Settings settings) {
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(Settings.EMPTY,
new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
threadPool);
transportService.start();
discoveryNode = new DiscoveryNode(name, transportService.boundAddress().publishAddress(), Version.CURRENT);
transportListTasksAction = new TransportListTasksAction(settings, clusterName, threadPool, clusterService, transportService,
new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(settings));
}
public final TestClusterService clusterService;
public final TransportService transportService;
public final DiscoveryNode discoveryNode;
public final TransportListTasksAction transportListTasksAction;
@Override
public void close() {
transportService.close();
}
}
public static void connectNodes(TestNode... nodes) {
DiscoveryNode[] discoveryNodes = new DiscoveryNode[nodes.length];
for (int i = 0; i < nodes.length; i++) {
discoveryNodes[i] = nodes[i].discoveryNode;
}
DiscoveryNode master = discoveryNodes[0];
for (TestNode node : nodes) {
node.clusterService.setState(ClusterStateCreationUtils.state(node.discoveryNode, master, discoveryNodes));
}
for (TestNode nodeA : nodes) {
for (TestNode nodeB : nodes) {
nodeA.transportService.connectToNode(nodeB.discoveryNode);
}
}
}
public static class NodeRequest extends BaseNodeRequest {
protected String requestName;
private boolean enableTaskManager;
public NodeRequest() {
super();
}
public NodeRequest(NodesRequest request, String nodeId) {
super(request, nodeId);
requestName = request.requestName;
enableTaskManager = request.enableTaskManager;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
enableTaskManager = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeBoolean(enableTaskManager);
}
@Override
public String getDescription() {
return "NodeRequest[" + requestName + ", " + enableTaskManager + "]";
}
@Override
public Task createTask(long id, String type, String action) {
if (enableTaskManager) {
return super.createTask(id, type, action);
} else {
return null;
}
}
}
public static class NodesRequest extends BaseNodesRequest<NodesRequest> {
private String requestName;
private boolean enableTaskManager;
private NodesRequest() {
super();
}
public NodesRequest(String requestName, String... nodesIds) {
this(requestName, true, nodesIds);
}
public NodesRequest(String requestName, boolean enableTaskManager, String... nodesIds) {
super(nodesIds);
this.requestName = requestName;
this.enableTaskManager = enableTaskManager;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
enableTaskManager = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeBoolean(enableTaskManager);
}
@Override
public String getDescription() {
return "NodesRequest[" + requestName + ", " + enableTaskManager + "]";
}
@Override
public Task createTask(long id, String type, String action) {
if (enableTaskManager) {
return super.createTask(id, type, action);
} else {
return null;
}
}
}
static class NodeResponse extends BaseNodeResponse {
protected NodeResponse() {
super();
}
protected NodeResponse(DiscoveryNode node) {
super(node);
}
}
static class NodesResponse extends BaseNodesResponse<NodeResponse> {
private int failureCount;
protected NodesResponse(ClusterName clusterName, NodeResponse[] nodes, int failureCount) {
super(clusterName, nodes);
this.failureCount = failureCount;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureCount = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(failureCount);
}
public int failureCount() {
return failureCount;
}
}
/**
* Simulates node-based task that can be used to block node tasks so they are guaranteed to be registered by task manager
*/
abstract class TestNodesAction extends TransportNodesAction<NodesRequest, NodesResponse, NodeRequest, NodeResponse> {
TestNodesAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService) {
super(settings, actionName, clusterName, threadPool, clusterService, transportService,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
NodesRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC);
}
@Override
protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray responses) {
final List<NodeResponse> nodesList = new ArrayList<>();
int failureCount = 0;
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeResponse) { // will also filter out null response for unallocated ones
nodesList.add((NodeResponse) resp);
} else if (resp instanceof FailedNodeException) {
failureCount++;
} else {
logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp);
}
}
return new NodesResponse(clusterName, nodesList.toArray(new NodeResponse[nodesList.size()]), failureCount);
}
@Override
protected NodeRequest newNodeRequest(String nodeId, NodesRequest request) {
return new NodeRequest(request, nodeId);
}
@Override
protected NodeResponse newNodeResponse() {
return new NodeResponse();
}
@Override
protected abstract NodeResponse nodeOperation(NodeRequest request);
@Override
protected boolean accumulateExceptions() {
return true;
}
}
static class TestTaskResponse implements Writeable<TestTaskResponse> {
private final String status;
public TestTaskResponse(StreamInput in) throws IOException {
status = in.readString();
}
public TestTaskResponse(String status) {
this.status = status;
}
public String getStatus() {
return status;
}
@Override
public TestTaskResponse readFrom(StreamInput in) throws IOException {
return new TestTaskResponse(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(status);
}
}
static class TestTasksRequest extends BaseTasksRequest<TestTasksRequest> {
}
static class TestTasksResponse extends BaseTasksResponse {
private List<TestTaskResponse> tasks;
public TestTasksResponse() {
}
public TestTasksResponse(List<TestTaskResponse> tasks, List<TaskOperationFailure> taskFailures, List<? extends FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int taskCount = in.readVInt();
List<TestTaskResponse> builder = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
builder.add(new TestTaskResponse(in));
}
tasks = Collections.unmodifiableList(builder);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(tasks.size());
for (TestTaskResponse task : tasks) {
task.writeTo(out);
}
}
}
/**
* Test class for testing task operations
*/
static abstract class TestTasksAction extends TransportTasksAction<TestTasksRequest, TestTasksResponse, TestTaskResponse> {
protected TestTasksAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService) {
super(settings, actionName, clusterName, threadPool, clusterService, transportService, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
TestTasksRequest::new, TestTasksResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override
protected TestTasksResponse newResponse(TestTasksRequest request, List<TestTaskResponse> tasks, List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
return new TestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected TestTaskResponse readTaskResponse(StreamInput in) throws IOException {
return new TestTaskResponse(in);
}
@Override
protected boolean accumulateExceptions() {
return true;
}
}
private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch) throws InterruptedException {
return startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request"));
}
private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch, NodesRequest request) throws InterruptedException {
CountDownLatch actionLatch = new CountDownLatch(nodesCount);
TestNodesAction[] actions = new TestNodesAction[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
final int node = i;
actions[i] = new TestNodesAction(Settings.EMPTY, "testAction", clusterName, threadPool, testNodes[i].clusterService, testNodes[i].transportService) {
@Override
protected NodeResponse nodeOperation(NodeRequest request) {
logger.info("Action on node " + node);
actionLatch.countDown();
try {
checkLatch.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
logger.info("Action on node " + node + " finished");
return new NodeResponse(testNodes[node].discoveryNode);
}
};
}
// Make sure no tasks are running
for (TestNode node : testNodes) {
assertEquals(0, node.transportService.getTaskManager().getTasks().size());
}
ActionFuture<NodesResponse> future = actions[0].execute(request);
logger.info("Awaiting for all actions to start");
actionLatch.await();
logger.info("Done waiting for all actions to start");
return future;
}
public void testRunningTasksCount() throws Exception {
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch);
// Check task counts using taskManager
Map<Long, Task> localTasks = testNodes[0].transportService.getTaskManager().getTasks();
assertEquals(2, localTasks.size()); // all node tasks + 1 coordinating task
Task coordinatingTask = localTasks.get(Collections.min(localTasks.keySet()));
Task subTask = localTasks.get(Collections.max(localTasks.keySet()));
assertThat(subTask.getAction(), endsWith("[n]"));
assertThat(coordinatingTask.getAction(), not(endsWith("[n]")));
for (int i = 1; i < testNodes.length; i++) {
Map<Long, Task> remoteTasks = testNodes[i].transportService.getTaskManager().getTasks();
assertEquals(1, remoteTasks.size());
Task remoteTask = remoteTasks.values().iterator().next();
assertThat(remoteTask.getAction(), endsWith("[n]"));
}
// Check task counts using transport
int testNodeNum = randomIntBetween(0, testNodes.length - 1);
TestNode testNode = testNodes[testNodeNum];
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction*"); // pick all test actions
logger.info("Listing currently running tasks using node [{}]", testNodeNum);
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
logger.info("Checking currently running tasks");
assertEquals(testNodes.length, response.getPerNodeTasks().size());
// Coordinating node
assertEquals(2, response.getPerNodeTasks().get(testNodes[0].discoveryNode).size());
// Other nodes node
for (int i = 1; i < testNodes.length; i++) {
assertEquals(1, response.getPerNodeTasks().get(testNodes[i].discoveryNode).size());
}
// Check task counts using transport with filtering
testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction[n]"); // only pick node actions
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertNull(entry.getValue().get(0).getDescription());
}
// Check task counts using transport with detailed description
listTasksRequest.detailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("NodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
}
// Release all tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
// Make sure that we don't have any lingering tasks
for (TestNode node : testNodes) {
assertEquals(0, node.transportService.getTaskManager().getTasks().size());
}
}
public void testFindChildTasks() throws Exception {
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch);
TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(1, response.getTasks().size());
String parentNode = response.getTasks().get(0).getNode().getId();
long parentTaskId = response.getTasks().get(0).getId();
// Find tasks with common parent
listTasksRequest = new ListTasksRequest();
listTasksRequest.parentNode(parentNode);
listTasksRequest.parentTaskId(parentTaskId);
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getTasks().size());
for (TaskInfo task : response.getTasks()) {
assertEquals("testAction[n]", task.getAction());
assertEquals(parentNode, task.getParentNode());
assertEquals(parentTaskId, task.getParentId());
}
// Release all tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}
public void testTaskManagementOptOut() throws Exception {
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
// Starting actions that disable task manager
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request", false));
TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction*");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(0, response.getTasks().size());
// Release all tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}
public void testTasksDescriptions() throws Exception {
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch);
// Check task counts using transport with filtering
TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction[n]"); // only pick node actions
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertNull(entry.getValue().get(0).getDescription());
}
// Check task counts using transport with detailed description
listTasksRequest.detailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("NodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
}
// Release all tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}
public void testFailedTasksCount() throws ExecutionException, InterruptedException, IOException {
connectNodes(testNodes);
TestNodesAction[] actions = new TestNodesAction[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
final int node = i;
actions[i] = new TestNodesAction(Settings.EMPTY, "testAction", clusterName, threadPool, testNodes[i].clusterService, testNodes[i].transportService) {
@Override
protected NodeResponse nodeOperation(NodeRequest request) {
logger.info("Action on node " + node);
throw new RuntimeException("Test exception");
}
};
}
for (TestNode testNode : testNodes) {
assertEquals(0, testNode.transportService.getTaskManager().getTasks().size());
}
NodesRequest request = new NodesRequest("Test Request");
NodesResponse responses = actions[0].execute(request).get();
assertEquals(nodesCount, responses.failureCount());
}
public void testTaskLevelActionFailures() throws ExecutionException, InterruptedException, IOException {
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch);
TestTasksAction[] tasksActions = new TestTasksAction[nodesCount];
final int failTaskOnNode = randomIntBetween(1, nodesCount - 1);
for (int i = 0; i < testNodes.length; i++) {
final int node = i;
// Simulate task action that fails on one of the tasks on one of the nodes
tasksActions[i] = new TestTasksAction(Settings.EMPTY, "testTasksAction", clusterName, threadPool, testNodes[i].clusterService, testNodes[i].transportService) {
@Override
protected TestTaskResponse taskOperation(TestTasksRequest request, Task task) {
logger.info("Task action on node " + node);
if (failTaskOnNode == node && ((ChildTask) task).getParentNode() != null) {
logger.info("Failing on node " + node);
throw new RuntimeException("Task level failure");
}
return new TestTaskResponse("Success on node " + node);
}
};
}
// Run task action on node tasks that are currently running
// should be successful on all nodes except one
TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.actions("testAction[n]"); // pick all test actions
TestTasksResponse response = tasksActions[0].execute(testTasksRequest).get();
// Get successful responses from all nodes except one
assertEquals(testNodes.length - 1, response.tasks.size());
assertEquals(1, response.getTaskFailures().size()); // one task failed
assertThat(response.getTaskFailures().get(0).getReason(), containsString("Task level failure"));
assertEquals(0, response.getNodeFailures().size()); // no nodes failed
// Release all node tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}
}

View File

@ -25,6 +25,8 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
@ -67,7 +69,7 @@ public class TransportActionFilterChainTests extends ESTestCase {
String actionName = randomAsciiOfLength(randomInt(30)); String actionName = randomAsciiOfLength(randomInt(30));
ActionFilters actionFilters = new ActionFilters(filters); ActionFilters actionFilters = new ActionFilters(filters);
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null) { TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
@Override @Override
protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) { protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
listener.onResponse(new TestResponse()); listener.onResponse(new TestResponse());
@ -147,7 +149,7 @@ public class TransportActionFilterChainTests extends ESTestCase {
String actionName = randomAsciiOfLength(randomInt(30)); String actionName = randomAsciiOfLength(randomInt(30));
ActionFilters actionFilters = new ActionFilters(filters); ActionFilters actionFilters = new ActionFilters(filters);
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null) { TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
@Override @Override
protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) { protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
listener.onResponse(new TestResponse()); listener.onResponse(new TestResponse());
@ -218,9 +220,9 @@ public class TransportActionFilterChainTests extends ESTestCase {
RequestTestFilter testFilter = new RequestTestFilter(randomInt(), new RequestCallback() { RequestTestFilter testFilter = new RequestTestFilter(randomInt(), new RequestCallback() {
@Override @Override
public void execute(final String action, final ActionRequest actionRequest, final ActionListener actionListener, final ActionFilterChain actionFilterChain) { public void execute(Task task, final String action, final ActionRequest actionRequest, final ActionListener actionListener, final ActionFilterChain actionFilterChain) {
for (int i = 0; i <= additionalContinueCount; i++) { for (int i = 0; i <= additionalContinueCount; i++) {
actionFilterChain.proceed(action, actionRequest, actionListener); actionFilterChain.proceed(task, action, actionRequest, actionListener);
} }
} }
}); });
@ -230,7 +232,7 @@ public class TransportActionFilterChainTests extends ESTestCase {
String actionName = randomAsciiOfLength(randomInt(30)); String actionName = randomAsciiOfLength(randomInt(30));
ActionFilters actionFilters = new ActionFilters(filters); ActionFilters actionFilters = new ActionFilters(filters);
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null) { TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
@Override @Override
protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) { protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
listener.onResponse(new TestResponse()); listener.onResponse(new TestResponse());
@ -286,7 +288,7 @@ public class TransportActionFilterChainTests extends ESTestCase {
String actionName = randomAsciiOfLength(randomInt(30)); String actionName = randomAsciiOfLength(randomInt(30));
ActionFilters actionFilters = new ActionFilters(filters); ActionFilters actionFilters = new ActionFilters(filters);
TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null) { TransportAction<TestRequest, TestResponse> transportAction = new TransportAction<TestRequest, TestResponse>(Settings.EMPTY, actionName, null, actionFilters, null, new TaskManager(Settings.EMPTY)) {
@Override @Override
protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) { protected void doExecute(TestRequest request, ActionListener<TestResponse> listener) {
listener.onResponse(new TestResponse()); listener.onResponse(new TestResponse());
@ -344,11 +346,11 @@ public class TransportActionFilterChainTests extends ESTestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void apply(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { public void apply(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) {
this.runs.incrementAndGet(); this.runs.incrementAndGet();
this.lastActionName = action; this.lastActionName = action;
this.executionToken = counter.incrementAndGet(); this.executionToken = counter.incrementAndGet();
this.callback.execute(action, actionRequest, actionListener, actionFilterChain); this.callback.execute(task, action, actionRequest, actionListener, actionFilterChain);
} }
@Override @Override
@ -375,8 +377,8 @@ public class TransportActionFilterChainTests extends ESTestCase {
} }
@Override @Override
public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
chain.proceed(action, request, listener); chain.proceed(task, action, request, listener);
} }
@Override @Override
@ -391,20 +393,20 @@ public class TransportActionFilterChainTests extends ESTestCase {
private static enum RequestOperation implements RequestCallback { private static enum RequestOperation implements RequestCallback {
CONTINUE_PROCESSING { CONTINUE_PROCESSING {
@Override @Override
public void execute(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { public void execute(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) {
actionFilterChain.proceed(action, actionRequest, actionListener); actionFilterChain.proceed(task, action, actionRequest, actionListener);
} }
}, },
LISTENER_RESPONSE { LISTENER_RESPONSE {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void execute(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { public void execute(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) {
actionListener.onResponse(new TestResponse()); actionListener.onResponse(new TestResponse());
} }
}, },
LISTENER_FAILURE { LISTENER_FAILURE {
@Override @Override
public void execute(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) { public void execute(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain) {
actionListener.onFailure(new ElasticsearchTimeoutException("")); actionListener.onFailure(new ElasticsearchTimeoutException(""));
} }
} }
@ -433,7 +435,7 @@ public class TransportActionFilterChainTests extends ESTestCase {
} }
private static interface RequestCallback { private static interface RequestCallback {
void execute(String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain); void execute(Task task, String action, ActionRequest actionRequest, ActionListener actionListener, ActionFilterChain actionFilterChain);
} }
private static interface ResponseCallback { private static interface ResponseCallback {

View File

@ -470,5 +470,16 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
@Override @Override
public void sendResponse(Throwable error) throws IOException { public void sendResponse(Throwable error) throws IOException {
} }
@Override
public long getRequestId() {
return 0;
}
@Override
public String getChannelType() {
return "test";
}
} }
} }

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport;
@ -119,9 +120,9 @@ public class TransportMasterNodeActionTests extends ESTestCase {
} }
@Override @Override
protected void doExecute(final Request request, ActionListener<Response> listener) { protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
// remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER // remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER
super.doExecute(request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener)); super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener));
} }
@Override @Override
@ -159,7 +160,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override @Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception { protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
if (masterOperationFailure) { if (masterOperationFailure) {
listener.onFailure(exception); listener.onFailure(exception);
} else { } else {

View File

@ -862,6 +862,16 @@ public class TransportReplicationActionTests extends ESTestCase {
public void sendResponse(Throwable error) throws IOException { public void sendResponse(Throwable error) throws IOException {
listener.onFailure(error); listener.onFailure(error);
} }
@Override
public long getRequestId() {
return 0;
}
@Override
public String getChannelType() {
return "replica_test";
}
}; };
} }

View File

@ -29,6 +29,8 @@ import org.elasticsearch.client.AbstractClientHeadersTestCase;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.Headers; import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections; import java.util.Collections;
@ -61,7 +63,7 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTestCase {
private static class InternalTransportAction extends TransportAction { private static class InternalTransportAction extends TransportAction {
private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) { private InternalTransportAction(Settings settings, String actionName, ThreadPool threadPool) {
super(settings, actionName, threadPool, EMPTY_FILTERS, null); super(settings, actionName, threadPool, EMPTY_FILTERS, null, new TaskManager(settings));
} }
@Override @Override

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.BaseTransportResponseHandler;

View File

@ -886,5 +886,15 @@ public class PublishClusterStateActionTests extends ESTestCase {
this.error.set(error); this.error.set(error);
assertThat(response.get(), nullValue()); assertThat(response.get(), nullValue());
} }
@Override
public long getRequestId() {
return 0;
}
@Override
public String getChannelType() {
return "capturing";
}
} }
} }

View File

@ -33,4 +33,4 @@ public class SimpleLocalTransportTests extends AbstractSimpleTransportTestCase {
transportService.start(); transportService.start();
return transportService; return transportService;
} }
} }

View File

@ -146,7 +146,7 @@ public class NettyTransportIT extends ESIntegTestCase {
} }
if (reg.getExecutor() == ThreadPool.Names.SAME) { if (reg.getExecutor() == ThreadPool.Names.SAME) {
//noinspection unchecked //noinspection unchecked
reg.getHandler().messageReceived(request, transportChannel); reg.processMessageReceived(request, transportChannel);
} else { } else {
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
} }
@ -176,7 +176,7 @@ public class NettyTransportIT extends ESIntegTestCase {
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
reg.getHandler().messageReceived(request, transportChannel); reg.processMessageReceived(request, transportChannel);
} }
@Override @Override

View File

@ -36,6 +36,7 @@ import java.net.UnknownHostException;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase { public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase {
@Override @Override
protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) { protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) {
settings = Settings.builder().put(settings).put("transport.tcp.port", "0").build(); settings = Settings.builder().put(settings).put("transport.tcp.port", "0").build();
@ -53,4 +54,4 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase {
assertThat(e.getMessage(), containsString("[localhost/127.0.0.1:9876]")); assertThat(e.getMessage(), containsString("[localhost/127.0.0.1:9876]"));
} }
} }
} }

View File

@ -0,0 +1,46 @@
[[tasks-list]]
== Tasks List
The task management API allows to retrieve information about currently running tasks.
[source,js]
--------------------------------------------------
curl -XGET 'http://localhost:9200/_tasks'
curl -XGET 'http://localhost:9200/_tasks/nodeId1,nodeId2'
curl -XGET 'http://localhost:9200/_tasks/nodeId1,nodeId2/cluster:*'
--------------------------------------------------
The first command retrieves all tasks currently running on all nodes.
The second command selectively retrieves tasks from nodes
`nodeId1` and `nodeId2`. All the nodes selective options are explained
<<cluster-nodes,here>>.
The third command retrieves all cluster-related tasks running on nodes `nodeId1` and `nodeId2`.
The result will look similar to:
[source,js]
--------------------------------------------------
{
"nodes" : {
"fDlEl7PrQi6F-awHZ3aaDw" : {
"name" : "Gazer",
"transport_address" : "127.0.0.1:9300",
"host" : "127.0.0.1",
"ip" : "127.0.0.1:9300",
"tasks" : [ {
"node" : "fDlEl7PrQi6F-awHZ3aaDw",
"id" : 105,
"type" : "transport",
"action" : "cluster:monitor/nodes/tasks"
}, {
"node" : "fDlEl7PrQi6F-awHZ3aaDw",
"id" : 106,
"type" : "direct",
"action" : "cluster:monitor/nodes/tasks[n]",
"parent_node" : "fDlEl7PrQi6F-awHZ3aaDw",
"parent_id" : 105
} ]
}
}
}
--------------------------------------------------

View File

@ -92,6 +92,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.groovy.GroovyPlugin; import org.elasticsearch.script.groovy.GroovyPlugin;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;

View File

@ -239,6 +239,6 @@ public class DeleteByQueryRequest extends ActionRequest<DeleteByQueryRequest> im
", size:" + size + ", size:" + size +
", timeout:" + timeout + ", timeout:" + timeout +
", routing:" + routing + ", routing:" + routing +
", query:" + query.toString(); ", query:" + query;
} }
} }

View File

@ -0,0 +1,35 @@
{
"tasks.list": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/tasks-list.html",
"methods": ["GET"],
"url": {
"path": "/_tasks",
"paths": ["/_tasks", "/_tasks/{node_id}", "/_tasks/{node_id}/{actions}"],
"parts": {
"node_id": {
"type": "list",
"description": "A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes"
},
"actions": {
"type": "list",
"description": "A comma-separated list of actions that should be returned. Leave empty to return all."
}
},
"params": {
"detailed": {
"type": "boolean",
"description": "Return detailed task information (default: false)"
},
"parent_node": {
"type": "string",
"description": "Return tasks with specified parent node."
},
"parent_task": {
"type" : "number",
"description" : "Return tasks with specified parent task id. Set to -1 to return all."
}
}
},
"body": null
}
}

View File

@ -0,0 +1,6 @@
---
"tasks_list test":
- do:
tasks.list: {}
- is_true: nodes

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskManager;
import java.util.List; import java.util.List;
@ -147,6 +148,11 @@ public class NoopClusterService implements ClusterService {
return TimeValue.timeValueMillis(0); return TimeValue.timeValueMillis(0);
} }
@Override
public TaskManager getTaskManager() {
return null;
}
@Override @Override
public Lifecycle.State lifecycleState() { public Lifecycle.State lifecycleState() {
return null; return null;

View File

@ -47,6 +47,7 @@ import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays; import java.util.Arrays;
@ -60,6 +61,7 @@ import java.util.concurrent.ScheduledFuture;
public class TestClusterService implements ClusterService { public class TestClusterService implements ClusterService {
volatile ClusterState state; volatile ClusterState state;
private volatile TaskManager taskManager;
private final List<ClusterStateListener> listeners = new CopyOnWriteArrayList<>(); private final List<ClusterStateListener> listeners = new CopyOnWriteArrayList<>();
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue(); private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
private final ThreadPool threadPool; private final ThreadPool threadPool;
@ -72,6 +74,7 @@ public class TestClusterService implements ClusterService {
public TestClusterService(ThreadPool threadPool) { public TestClusterService(ThreadPool threadPool) {
this(ClusterState.builder(new ClusterName("test")).build(), threadPool); this(ClusterState.builder(new ClusterName("test")).build(), threadPool);
taskManager = new TaskManager(Settings.EMPTY);
} }
public TestClusterService(ClusterState state) { public TestClusterService(ClusterState state) {
@ -230,6 +233,11 @@ public class TestClusterService implements ClusterService {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public TaskManager getTaskManager() {
return taskManager;
}
@Override @Override
public List<PendingClusterTask> pendingTasks() { public List<PendingClusterTask> pendingTasks() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.RequestHandlerRegistry;