Add task cancellation mechanism

Only tasks that extend CancellableTask can be cancelled using this mechanism. If a cancellable task has children, it can elect to cancel all of its child tasks as well. In this case a special ban parent request is sent to the nodes where its child tasks are running. This request does two things: 1) it prevents any new tasks with the banned parent from being started, and 2) it cancels all currently running tasks that have the banned task as a parent. The ban is lifted as soon as the coordinating node notifies all other nodes that the cancelled task has finished executing. If the coordinating node leaves the cluster before it has a chance to lift its bans, all bans set by this node are automatically removed.

Optionally, a task can elect to be cancelled automatically if its parent task was running on a node that has just left the cluster. This option makes sense for cancellable, heavy tasks that have no side effects and only return results to the coordinating node. With the coordinating node gone, there is no point in running such tasks any longer, since their results will most likely be discarded.
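As an illustration only (not part of this commit's diff), a minimal sketch of how a caller might cancel a running task through the new Java client API introduced below; the node id, the task id and the Client instance are placeholder assumptions:

import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.client.Client;

public class CancelTaskExample {
    // Asks node_1 to cancel task 123; both ids are made-up placeholders.
    public static CancelTasksResponse cancelExample(Client client) {
        CancelTasksRequest request = new CancelTasksRequest("node_1"); // no ids = all nodes
        request.taskId(123L);                        // omit to match every cancellable task
        request.reason("cancelled by the operator"); // recorded on the task and sent with child bans
        // Blocks until every matching cancellable task has been asked to cancel itself
        return client.admin().cluster().cancelTasks(request).actionGet();
    }
}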
Igor Motov 2016-02-09 14:35:22 -05:00
parent 5c80dd5048
commit 99a7d8e41f
39 changed files with 2331 additions and 235 deletions

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
@ -268,6 +270,7 @@ public class ActionModule extends AbstractModule {
registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Action for cancelling running tasks
*/
public class CancelTasksAction extends Action<CancelTasksRequest, CancelTasksResponse, CancelTasksRequestBuilder> {
public static final CancelTasksAction INSTANCE = new CancelTasksAction();
public static final String NAME = "cluster:admin/tasks/cancel";
private CancelTasksAction() {
super(NAME);
}
@Override
public CancelTasksResponse newResponse() {
return new CancelTasksResponse();
}
@Override
public CancelTasksRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new CancelTasksRequestBuilder(client, this);
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import java.io.IOException;
/**
* A request to cancel tasks
*/
public class CancelTasksRequest extends BaseTasksRequest<CancelTasksRequest> {
public static final String DEFAULT_REASON = "by user request";
private String reason = DEFAULT_REASON;
/**
* Cancel tasks on the specified nodes. If none are passed, all cancellable tasks on
* all nodes will be cancelled.
*/
public CancelTasksRequest(String... nodesIds) {
super(nodesIds);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
reason = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(reason);
}
@Override
public boolean match(Task task) {
return super.match(task) && task instanceof CancellableTask;
}
public CancelTasksRequest reason(String reason) {
this.reason = reason;
return this;
}
public String reason() {
return reason;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.action.support.tasks.TasksRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Builder for the request to cancel tasks running on the specified nodes
*/
public class CancelTasksRequestBuilder extends TasksRequestBuilder<CancelTasksRequest, CancelTasksResponse, CancelTasksRequestBuilder> {
public CancelTasksRequestBuilder(ElasticsearchClient client, CancelTasksAction action) {
super(client, action, new CancelTasksRequest());
}
}
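Equivalently (illustration only), the same cancellation can be issued fluently through this builder via the prepareCancelTasks method added to the cluster admin client later in the diff; the node id is again a placeholder:

// Assumes an org.elasticsearch.client.Client instance named client is available.
CancelTasksResponse response = client.admin().cluster()
        .prepareCancelTasks("node_1")   // returns a CancelTasksRequestBuilder
        .get();                         // shorthand for execute().actionGet()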

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import java.util.List;
/**
* Returns the list of tasks that were cancelled
*/
public class CancelTasksResponse extends ListTasksResponse {
public CancelTasksResponse() {
}
public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends FailedNodeException>
nodeFailures) {
super(tasks, taskFailures, nodeFailures);
}
}

View File

@ -0,0 +1,285 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* Transport action that can be used to cancel currently running cancellable tasks.
* <p>
* For a task to be cancellable it has to return an instance of
* {@link CancellableTask} from {@link TransportRequest#createTask(long, String, String)}
*/
public class TransportCancelTasksAction extends TransportTasksAction<CancellableTask, CancelTasksRequest, CancelTasksResponse, TaskInfo> {
public static final String BAN_PARENT_ACTION_NAME = "internal:admin/tasks/ban";
@Inject
public TransportCancelTasksAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver
indexNameExpressionResolver) {
super(settings, CancelTasksAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, CancelTasksRequest::new, CancelTasksResponse::new, ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, BanParentTaskRequest::new, ThreadPool.Names.SAME, new
BanParentRequestHandler());
}
@Override
protected CancelTasksResponse newResponse(CancelTasksRequest request, List<TaskInfo> tasks, List<TaskOperationFailure>
taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
return new CancelTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
return new TaskInfo(in);
}
protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask> operation) {
if (request.taskId() != BaseTasksRequest.ALL_TASKS) {
// we are only checking one task, we can optimize it
CancellableTask task = taskManager.getCancellableTask(request.taskId());
if (task != null) {
if (request.match(task)) {
operation.accept(task);
} else {
throw new IllegalArgumentException("task [" + request.taskId() + "] doesn't support this operation");
}
} else {
if (taskManager.getTask(request.taskId()) != null) {
// The task exists, but doesn't support cancellation
throw new IllegalArgumentException("task [" + request.taskId() + "] doesn't support cancellation");
} else {
throw new ResourceNotFoundException("task [{}] doesn't support cancellation", request.taskId());
}
}
} else {
for (CancellableTask task : taskManager.getCancellableTasks().values()) {
if (request.match(task)) {
operation.accept(task);
}
}
}
}
@Override
protected synchronized TaskInfo taskOperation(CancelTasksRequest request, CancellableTask cancellableTask) {
final BanLock banLock = new BanLock(nodes -> removeBanOnNodes(cancellableTask, nodes));
Set<String> childNodes = taskManager.cancel(cancellableTask, request.reason(), banLock::onTaskFinished);
if (childNodes != null) {
if (childNodes.isEmpty()) {
logger.trace("cancelling task {} with no children", cancellableTask.getId());
return cancellableTask.taskInfo(clusterService.localNode(), false);
} else {
logger.trace("cancelling task {} with children on nodes [{}]", cancellableTask.getId(), childNodes);
setBanOnNodes(request.reason(), cancellableTask, childNodes, banLock);
return cancellableTask.taskInfo(clusterService.localNode(), false);
}
} else {
logger.trace("task {} is already cancelled", cancellableTask.getId());
throw new IllegalStateException("task with id " + cancellableTask.getId() + " is already cancelled");
}
}
@Override
protected boolean accumulateExceptions() {
return true;
}
private void setBanOnNodes(String reason, CancellableTask task, Set<String> nodes, BanLock banLock) {
sendSetBanRequest(nodes, new BanParentTaskRequest(clusterService.localNode().getId(), task.getId(), reason), banLock);
}
private void removeBanOnNodes(CancellableTask task, Set<String> nodes) {
sendRemoveBanRequest(nodes, new BanParentTaskRequest(clusterService.localNode().getId(), task.getId()));
}
private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request, BanLock banLock) {
ClusterState clusterState = clusterService.state();
for (String node : nodes) {
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
if (discoveryNode != null) {
// Check if node still in the cluster
logger.debug("Sending ban for tasks with the parent [{}:{}] to the node [{}], ban [{}]", request.parentNodeId, request
.parentTaskId, node, request.ban);
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
banLock.onBanSet();
}
@Override
public void handleException(TransportException exp) {
banLock.onBanSet();
}
});
} else {
banLock.onBanSet();
logger.debug("Cannot send ban for tasks with the parent [{}:{}] to the node [{}] - the node no longer in the cluster",
request.parentNodeId, request.parentTaskId, node);
}
}
}
private void sendRemoveBanRequest(Set<String> nodes, BanParentTaskRequest request) {
ClusterState clusterState = clusterService.state();
for (String node : nodes) {
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
if (discoveryNode != null) {
// Check if node still in the cluster
logger.debug("Sending remove ban for tasks with the parent [{}:{}] to the node [{}]", request.parentNodeId,
request.parentTaskId, node);
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler
.INSTANCE_SAME);
} else {
logger.debug("Cannot send remove ban request for tasks with the parent [{}:{}] to the node [{}] - the node no longer in " +
"the cluster", request.parentNodeId, request.parentTaskId, node);
}
}
}
private static class BanLock {
private final Consumer<Set<String>> finish;
private final AtomicInteger counter;
private final AtomicReference<Set<String>> nodes = new AtomicReference<>();
public BanLock(Consumer<Set<String>> finish) {
counter = new AtomicInteger(0);
this.finish = finish;
}
public void onBanSet() {
if (counter.decrementAndGet() == 0) {
finish();
}
}
public void onTaskFinished(Set<String> nodes) {
this.nodes.set(nodes);
if (counter.addAndGet(nodes.size()) == 0) {
finish();
}
}
public void finish() {
finish.accept(nodes.get());
}
}
private static class BanParentTaskRequest extends TransportRequest {
private String parentNodeId;
private long parentTaskId;
private boolean ban;
private String reason;
BanParentTaskRequest(String parentNodeId, long parentTaskId, String reason) {
this.parentNodeId = parentNodeId;
this.parentTaskId = parentTaskId;
this.ban = true;
this.reason = reason;
}
BanParentTaskRequest(String parentNodeId, long parentTaskId) {
this.parentNodeId = parentNodeId;
this.parentTaskId = parentTaskId;
this.ban = false;
}
public BanParentTaskRequest() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
parentNodeId = in.readString();
parentTaskId = in.readLong();
ban = in.readBoolean();
if (ban) {
reason = in.readString();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(parentNodeId);
out.writeLong(parentTaskId);
out.writeBoolean(ban);
if (ban) {
out.writeString(reason);
}
}
}
class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRequest> {
@Override
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel) throws Exception {
if (request.ban) {
logger.debug("Received ban for the parent [{}:{}] on the node [{}], reason: [{}]", request.parentNodeId, request
.parentTaskId, clusterService.localNode().getId(), request.reason);
taskManager.setBan(request.parentNodeId, request.parentTaskId, request.reason);
} else {
logger.debug("Removing ban for the parent [{}:{}] on the node [{}]", request.parentNodeId, request.parentTaskId,
clusterService.localNode().getId());
taskManager.removeBan(request.parentNodeId, request.parentTaskId);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}
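Note that cancellation as implemented above is cooperative: TransportCancelTasksAction only marks the CancellableTask as cancelled and bans/cancels its children, it does not interrupt any threads. A long-running operation therefore has to poll the task it was registered with, roughly like this sketch (illustration only; the work loop is made up):

import org.elasticsearch.tasks.CancellableTask;

public class CancellableLoopExample {
    // Performs a stand-in unit of work repeatedly, stopping as soon as the task is cancelled.
    static long runUntilCancelled(CancellableTask task, long iterations) {
        long completed = 0;
        for (long i = 0; i < iterations; i++) {
            if (task.isCancelled()) { // flipped once TaskManager cancels the task or applies a ban
                break;                // cooperative cancellation: the task stops itself
            }
            completed++;              // stand-in for real per-iteration work
        }
        return completed;
    }
}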

View File

@ -105,7 +105,9 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
builder.startArray("task_failures");
for (TaskOperationFailure ex : getTaskFailures()){
builder.startObject();
builder.value(ex);
builder.endObject();
}
builder.endArray();
}
@ -113,7 +115,9 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
if (getNodeFailures() != null && getNodeFailures().size() > 0) {
builder.startArray("node_failures");
for (FailedNodeException ex : getNodeFailures()) {
builder.value(ex);
builder.startObject();
ex.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}

View File

@ -30,17 +30,17 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
/**
*
*/
public class TransportListTasksAction extends TransportTasksAction<ListTasksRequest, ListTasksResponse, TaskInfo> {
public class TransportListTasksAction extends TransportTasksAction<Task, ListTasksRequest, ListTasksResponse, TaskInfo> {
@Inject
public TransportListTasksAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {

View File

@ -44,20 +44,6 @@ public abstract class ChildTaskActionRequest<Request extends ActionRequest<Reque
this.parentTaskId = parentTaskId;
}
/**
* The node that owns the parent task.
*/
public String getParentTaskNode() {
return parentTaskNode;
}
/**
* The task id of the parent task on the parent node.
*/
public long getParentTaskId() {
return parentTaskId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -73,8 +59,12 @@ public abstract class ChildTaskActionRequest<Request extends ActionRequest<Reque
}
@Override
public Task createTask(long id, String type, String action) {
return new Task(id, type, action, this::getDescription, parentTaskNode, parentTaskId);
public final Task createTask(long id, String type, String action) {
return createTask(id, type, action, parentTaskNode, parentTaskId);
}
public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) {
return new Task(id, type, action, getDescription(), parentTaskNode, parentTaskId);
}
}

View File

@ -58,7 +58,11 @@ public class ChildTaskRequest extends TransportRequest {
}
@Override
public Task createTask(long id, String type, String action) {
return new Task(id, type, action, this::getDescription, parentTaskNode, parentTaskId);
public final Task createTask(long id, String type, String action) {
return createTask(id, type, action, parentTaskNode, parentTaskId);
}
public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) {
return new Task(id, type, action, getDescription(), parentTaskNode, parentTaskId);
}
}

View File

@ -87,6 +87,10 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
protected abstract ShardResponse shardOperation(ShardRequest request);
protected ShardResponse shardOperation(ShardRequest request, Task task) {
return shardOperation(request);
}
/**
* Determines the shards this operation will be executed on. The operation is executed once per shard iterator, typically
* on the first shard in it. If the operation fails, it will be retried on the next shard in the iterator.
@ -172,6 +176,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
// no node connected, act as failure
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
taskManager.registerChildTask(task, node.getId());
transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
@Override
public ShardResponse newInstance() {
@ -278,8 +283,13 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {
@Override
public void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(shardOperation(request));
}
@Override
public final void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required");
}
}
}

View File

@ -301,6 +301,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
NodeRequest nodeRequest = new NodeRequest(node.getId(), request, shards);
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().id(), task.getId());
taskManager.registerChildTask(task, node.getId());
}
transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new BaseTransportResponseHandler<NodeResponse>() {
@Override

View File

@ -159,6 +159,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
}
}
};
taskManager.registerChildTask(task, nodes.getLocalNodeId());
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
@Override
protected void doRun() throws Exception {
@ -171,6 +172,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
logger.debug("no known master node, scheduling a retry");
retry(null, MasterNodeChangePredicate.INSTANCE);
} else {
taskManager.registerChildTask(task, nodes.masterNode().getId());
transportService.sendRequest(nodes.masterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener) {
@Override
public Response newInstance() {

View File

@ -95,6 +95,10 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
protected abstract NodeResponse nodeOperation(NodeRequest request);
protected NodeResponse nodeOperation(NodeRequest request, Task task) {
return nodeOperation(request);
}
protected abstract boolean accumulateExceptions();
protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
@ -163,6 +167,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
ChildTaskRequest nodeRequest = newNodeRequest(nodeId, request);
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().id(), task.getId());
taskManager.registerChildTask(task, node.getId());
}
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler<NodeResponse>() {
@ -228,8 +233,14 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
@Override
public void messageReceived(final NodeRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(nodeOperation(request, task));
}
@Override
public void messageReceived(NodeRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(nodeOperation(request));
}
}
}

View File

@ -196,8 +196,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
}
@Override
public Task createTask(long id, String type, String action) {
return new ReplicationTask(id, type, action, this::getDescription, getParentTaskNode(), getParentTaskId());
public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) {
return new ReplicationTask(id, type, action, getDescription(), parentTaskNode, parentTaskId);
}
/**
@ -218,4 +218,9 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
return index;
}
}
@Override
public String getDescription() {
return toString();
}
}

View File

@ -35,7 +35,7 @@ import static java.util.Objects.requireNonNull;
public class ReplicationTask extends Task {
private volatile String phase = "starting";
public ReplicationTask(long id, String type, String action, Provider<String> description, String parentNode, long parentId) {
public ReplicationTask(long id, String type, String action, String description, String parentNode, long parentId) {
super(id, type, action, description, parentNode, parentId);
}

View File

@ -121,6 +121,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
ShardRequest shardRequest = newShardRequest(request, shardId);
shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
taskManager.registerChildTask(task, clusterService.localNode().getId());
replicatedBroadcastShardAction.execute(shardRequest, shardActionListener);
}

View File

@ -486,6 +486,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
taskManager.registerChildTask(task, node.getId());
if (primary.currentNodeId().equals(state.nodes().localNodeId())) {
setPhase(task, "waiting_on_primary");
if (logger.isTraceEnabled()) {

View File

@ -35,7 +35,6 @@ import java.io.IOException;
*/
public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends ActionRequest<Request> {
public static final String[] ALL_ACTIONS = Strings.EMPTY_ARRAY;
public static final String[] ALL_NODES = Strings.EMPTY_ARRAY;
@ -52,6 +51,8 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
private long parentTaskId = ALL_TASKS;
private long taskId = ALL_TASKS;
public BaseTasksRequest() {
}
@ -94,6 +95,22 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
return (Request) this;
}
/**
* Returns the id of the task that should be processed.
*
* By default tasks with any ids are returned.
*/
public long taskId() {
return taskId;
}
@SuppressWarnings("unchecked")
public final Request taskId(long taskId) {
this.taskId = taskId;
return (Request) this;
}
/**
* Returns the parent node id that tasks should be filtered by
*/
@ -141,6 +158,7 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodesIds = in.readStringArray();
taskId = in.readLong();
actions = in.readStringArray();
parentNode = in.readOptionalString();
parentTaskId = in.readLong();
@ -153,6 +171,7 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArrayNullable(nodesIds);
out.writeLong(taskId);
out.writeStringArrayNullable(actions);
out.writeOptionalString(parentNode);
out.writeLong(parentTaskId);
@ -163,12 +182,17 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
if (actions() != null && actions().length > 0 && Regex.simpleMatch(actions(), task.getAction()) == false) {
return false;
}
if (taskId() != ALL_TASKS) {
if(taskId() != task.getId()) {
return false;
}
}
if (parentNode() != null) {
if (parentNode().equals(task.getParentNode()) == false) {
return false;
}
}
if (parentTaskId() != BaseTasksRequest.ALL_TASKS) {
if (parentTaskId() != ALL_TASKS) {
if (parentTaskId() != task.getParentId()) {
return false;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.support.tasks;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.NoSuchNodeException;
@ -53,12 +54,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* The base class for transport actions that are interacting with currently running tasks.
*/
public abstract class TransportTasksAction<
OperationTask extends Task,
TasksRequest extends BaseTasksRequest<TasksRequest>,
TasksResponse extends BaseTasksResponse,
TaskResponse extends Writeable<TaskResponse>
@ -103,16 +106,16 @@ public abstract class TransportTasksAction<
TasksRequest request = nodeTaskRequest.tasksRequest;
List<TaskResponse> results = new ArrayList<>();
List<TaskOperationFailure> exceptions = new ArrayList<>();
for (Task task : taskManager.getTasks().values()) {
// First check action and node filters
if (request.match(task)) {
processTasks(request, task -> {
try {
results.add(taskOperation(request, task));
TaskResponse response = taskOperation(request, task);
if (response != null) {
results.add(response);
}
} catch (Exception ex) {
exceptions.add(new TaskOperationFailure(clusterService.localNode().id(), task.getId(), ex));
}
}
}
});
return new NodeTasksResponse(clusterService.localNode().id(), results, exceptions);
}
@ -124,6 +127,28 @@ public abstract class TransportTasksAction<
return clusterState.nodes().resolveNodesIds(request.nodesIds());
}
protected void processTasks(TasksRequest request, Consumer<OperationTask> operation) {
if (request.taskId() != BaseTasksRequest.ALL_TASKS) {
// we are only checking one task, we can optimize it
Task task = taskManager.getTask(request.taskId());
if (task != null) {
if (request.match(task)) {
operation.accept((OperationTask) task);
} else {
throw new ResourceNotFoundException("task [{}] doesn't support this operation", request.taskId());
}
} else {
throw new ResourceNotFoundException("task [{}] is missing", request.taskId());
}
} else {
for (Task task : taskManager.getTasks().values()) {
if (request.match(task)) {
operation.accept((OperationTask)task);
}
}
}
}
protected abstract TasksResponse newResponse(TasksRequest request, List<TaskResponse> tasks, List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions);
@SuppressWarnings("unchecked")
@ -150,7 +175,7 @@ public abstract class TransportTasksAction<
protected abstract TaskResponse readTaskResponse(StreamInput in) throws IOException;
protected abstract TaskResponse taskOperation(TasksRequest request, Task task);
protected abstract TaskResponse taskOperation(TasksRequest request, OperationTask task);
protected boolean transportCompress() {
return false;
@ -213,6 +238,7 @@ public abstract class TransportTasksAction<
} else {
NodeTaskRequest nodeRequest = new NodeTaskRequest(request);
nodeRequest.setParentTask(clusterService.localNode().id(), task.getId());
taskManager.registerChildTask(task, node.getId());
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler<NodeTasksResponse>() {
@Override
public NodeTasksResponse newInstance() {

View File

@ -33,6 +33,9 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@ -287,6 +290,29 @@ public interface ClusterAdminClient extends ElasticsearchClient {
*/
ListTasksRequestBuilder prepareListTasks(String... nodesIds);
/**
* Cancel tasks
*
* @param request The nodes tasks request
* @return The result future
* @see org.elasticsearch.client.Requests#cancelTasksRequest(String...)
*/
ActionFuture<CancelTasksResponse> cancelTasks(CancelTasksRequest request);
/**
* Cancel active tasks
*
* @param request The nodes tasks request
* @param listener A listener to be notified with a result
* @see org.elasticsearch.client.Requests#cancelTasksRequest(String...)
*/
void cancelTasks(CancelTasksRequest request, ActionListener<CancelTasksResponse> listener);
/**
* Cancel active tasks
*/
CancelTasksRequestBuilder prepareCancelTasks(String... nodesIds);
/**
* Returns list of shards the given search would be executed on.
*/

View File

@ -22,6 +22,7 @@ package org.elasticsearch.client;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
@ -420,12 +421,23 @@ public class Requests {
*
* @param nodesIds The nodes ids to get the tasks for
* @return The nodes tasks request
* @see org.elasticsearch.client.ClusterAdminClient#nodesStats(org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest)
* @see org.elasticsearch.client.ClusterAdminClient#listTasks(ListTasksRequest)
*/
public static ListTasksRequest listTasksRequest(String... nodesIds) {
return new ListTasksRequest(nodesIds);
}
/**
* Creates a nodes tasks request against one or more nodes. Pass <tt>null</tt> or an empty array for all nodes.
*
* @param nodesIds The nodes ids to cancel the tasks on
* @return The nodes tasks request
* @see org.elasticsearch.client.ClusterAdminClient#cancelTasks(CancelTasksRequest)
*/
public static CancelTasksRequest cancelTasksRequest(String... nodesIds) {
return new CancelTasksRequest(nodesIds);
}
/**
* Registers snapshot repository
*

View File

@ -41,6 +41,10 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequestBuilder;
@ -992,6 +996,22 @@ public abstract class AbstractClient extends AbstractComponent implements Client
return new ListTasksRequestBuilder(this, ListTasksAction.INSTANCE).setNodesIds(nodesIds);
}
@Override
public ActionFuture<CancelTasksResponse> cancelTasks(CancelTasksRequest request) {
return execute(CancelTasksAction.INSTANCE, request);
}
@Override
public void cancelTasks(CancelTasksRequest request, ActionListener<CancelTasksResponse> listener) {
execute(CancelTasksAction.INSTANCE, request, listener);
}
@Override
public CancelTasksRequestBuilder prepareCancelTasks(String... nodesIds) {
return new CancelTasksRequestBuilder(this, CancelTasksAction.INSTANCE).setNodesIds(nodesIds);
}
@Override
public ActionFuture<ClusterSearchShardsResponse> searchShards(final ClusterSearchShardsRequest request) {
return execute(ClusterSearchShardsAction.INSTANCE, request);

View File

@ -189,6 +189,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override
protected void doStart() {
add(localNodeMasterListeners);
add(taskManager);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext());
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());

View File

@ -40,6 +40,7 @@ import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthActio
import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsAction;
import org.elasticsearch.rest.action.admin.cluster.node.tasks.RestCancelTasksAction;
import org.elasticsearch.rest.action.admin.cluster.node.tasks.RestListTasksAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.delete.RestDeleteRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.repositories.get.RestGetRepositoriesAction;
@ -265,6 +266,7 @@ public class NetworkModule extends AbstractModule {
// Tasks API
RestListTasksAction.class,
RestCancelTasksAction.class,
// Ingest API
RestPutPipelineAction.class,

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.node.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestToXContentListener;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestCancelTasksAction extends BaseRestHandler {
@Inject
public RestCancelTasksAction(Settings settings, RestController controller, Client client) {
super(settings, client);
controller.registerHandler(POST, "/_tasks/_cancel", this);
controller.registerHandler(POST, "/_tasks/{nodeId}/_cancel", this);
controller.registerHandler(POST, "/_tasks/{nodeId}/{taskId}/_cancel", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
long taskId = request.paramAsLong("taskId", ListTasksRequest.ALL_TASKS);
String[] actions = Strings.splitStringByCommaToArray(request.param("actions"));
String parentNode = request.param("parent_node");
long parentTaskId = request.paramAsLong("parent_task", ListTasksRequest.ALL_TASKS);
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(nodesIds);
cancelTasksRequest.taskId(taskId);
cancelTasksRequest.actions(actions);
cancelTasksRequest.parentNode(parentNode);
cancelTasksRequest.parentTaskId(parentTaskId);
client.admin().cluster().cancelTasks(cancelTasksRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -40,18 +40,20 @@ public class RestListTasksAction extends BaseRestHandler {
super(settings, client);
controller.registerHandler(GET, "/_tasks", this);
controller.registerHandler(GET, "/_tasks/{nodeId}", this);
controller.registerHandler(GET, "/_tasks/{nodeId}/{actions}", this);
controller.registerHandler(GET, "/_tasks/{nodeId}/{taskId}", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
boolean detailed = request.paramAsBoolean("detailed", false);
String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
long taskId = request.paramAsLong("taskId", ListTasksRequest.ALL_TASKS);
String[] actions = Strings.splitStringByCommaToArray(request.param("actions"));
String parentNode = request.param("parent_node");
long parentTaskId = request.paramAsLong("parent_task", ListTasksRequest.ALL_TASKS);
ListTasksRequest listTasksRequest = new ListTasksRequest(nodesIds);
listTasksRequest.taskId(taskId);
listTasksRequest.detailed(detailed);
listTasksRequest.actions(actions);
listTasksRequest.parentNode(parentNode);

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tasks;
import java.util.concurrent.atomic.AtomicReference;
/**
* A task that can be canceled
*/
public class CancellableTask extends Task {
private final AtomicReference<String> reason = new AtomicReference<>();
public CancellableTask(long id, String type, String action, String description) {
super(id, type, action, description);
}
public CancellableTask(long id, String type, String action, String description, String parentNode, long parentId) {
super(id, type, action, description, parentNode, parentId);
}
/**
* This method is called by the task manager when this task is cancelled.
*/
final void cancel(String reason) {
assert reason != null;
this.reason.compareAndSet(null, reason);
}
/**
* Returns true if this task should be automatically cancelled if the coordinating node that
* requested this task left the cluster.
*/
public boolean cancelOnParentLeaving() {
return true;
}
public boolean isCancelled() {
return reason.get() != null;
}
}
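To make a transport request cancellable, it is enough for its createTask method to return one of these instead of a plain Task. A hypothetical request (illustration only; the class name and the overridden behaviour are assumptions) could look like this, here also opting out of cancel-on-parent-leaving:

import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportRequest;

public class MyLongRunningRequest extends TransportRequest {
    @Override
    public Task createTask(long id, String type, String action) {
        return new CancellableTask(id, type, action, getDescription()) {
            @Override
            public boolean cancelOnParentLeaving() {
                return false; // keep running even if the coordinating node goes away
            }
        };
    }
}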

View File

@ -22,7 +22,6 @@ package org.elasticsearch.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
@ -39,18 +38,18 @@ public class Task {
private final String action;
private final Provider<String> description;
private final String description;
private final String parentNode;
private final long parentId;
public Task(long id, String type, String action, Provider<String> description) {
public Task(long id, String type, String action, String description) {
this(id, type, action, description, null, NO_PARENT_ID);
}
public Task(long id, String type, String action, Provider<String> description, String parentNode, long parentId) {
public Task(long id, String type, String action, String description, String parentNode, long parentId) {
this.id = id;
this.type = type;
this.action = action;
@ -104,7 +103,7 @@ public class Task {
* Generates task description
*/
public String getDescription() {
return description.get();
return description;
}
/**

View File

@ -19,34 +19,50 @@
package org.elasticsearch.tasks;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.transport.TransportRequest;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
/**
* Task Manager service for keeping track of currently running tasks on the nodes
*/
public class TaskManager extends AbstractComponent {
public class TaskManager extends AbstractComponent implements ClusterStateListener {
private final ConcurrentMapLong<Task> tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
private final ConcurrentMapLong<CancellableTaskHolder> cancellableTasks = ConcurrentCollections
.newConcurrentMapLongWithAggressiveConcurrency();
private final AtomicLong taskIdGenerator = new AtomicLong();
private final Map<Tuple<String, Long>, String> banedParents = new ConcurrentHashMap<>();
public TaskManager(Settings settings) {
super(settings);
}
private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;
/**
* Registers a task without parent task
* <p>
* Returns the task manager tracked task or null if the task doesn't support the task manager
*/
public Task register(String type, String action, TransportRequest request) {
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action);
@ -54,24 +70,291 @@ public class TaskManager extends AbstractComponent {
if (logger.isTraceEnabled()) {
logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription());
}
if (task instanceof CancellableTask) {
CancellableTask cancellableTask = (CancellableTask) task;
CancellableTaskHolder holder = new CancellableTaskHolder(cancellableTask);
CancellableTaskHolder oldHolder = cancellableTasks.put(task.getId(), holder);
assert oldHolder == null;
// Check if this task was banned before we start it
if (task.getParentNode() != null && banedParents.isEmpty() == false) {
String reason = banedParents.get(new Tuple<>(task.getParentNode(), task.getParentId()));
if (reason != null) {
try {
holder.cancel(reason);
throw new IllegalStateException("Task cancelled before it started: " + reason);
} finally {
// let's clean up the registration
unregister(task);
}
}
}
} else {
Task previousTask = tasks.put(task.getId(), task);
assert previousTask == null;
}
}
return task;
}
/**
* Cancels a task
* <p>
* Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise.
*/
public Set<String> cancel(CancellableTask task, String reason, Consumer<Set<String>> listener) {
CancellableTaskHolder holder = cancellableTasks.get(task.getId());
if (holder != null) {
logger.trace("cancelling task with id {}", task.getId());
return holder.cancel(reason, listener);
}
return null;
}
/**
* Unregister the task
*/
public Task unregister(Task task) {
logger.trace("unregister task for id: {}", task.getId());
if (task instanceof CancellableTask) {
CancellableTaskHolder holder = cancellableTasks.remove(task.getId());
if (holder != null) {
holder.finish();
return holder.getTask();
} else {
return null;
}
} else {
return tasks.remove(task.getId());
}
}
/**
* Returns the list of currently running tasks on the node
*/
public Map<Long, Task> getTasks() {
return Collections.unmodifiableMap(new HashMap<>(tasks));
HashMap<Long, Task> taskHashMap = new HashMap<>(this.tasks);
for (CancellableTaskHolder holder : cancellableTasks.values()) {
taskHashMap.put(holder.getTask().getId(), holder.getTask());
}
return Collections.unmodifiableMap(taskHashMap);
}
/**
* Returns the list of currently running tasks on the node that can be cancelled
*/
public Map<Long, CancellableTask> getCancellableTasks() {
HashMap<Long, CancellableTask> taskHashMap = new HashMap<>();
for (CancellableTaskHolder holder : cancellableTasks.values()) {
taskHashMap.put(holder.getTask().getId(), holder.getTask());
}
return Collections.unmodifiableMap(taskHashMap);
}
/**
* Returns a task with given id, or null if the task is not found.
*/
public Task getTask(long id) {
Task task = tasks.get(id);
if (task != null) {
return task;
} else {
return getCancellableTask(id);
}
}
/**
* Returns a cancellable task with given id, or null if the task is not found.
*/
public CancellableTask getCancellableTask(long id) {
CancellableTaskHolder holder = cancellableTasks.get(id);
if (holder != null) {
return holder.getTask();
} else {
return null;
}
}
/**
* Returns the number of currently banned tasks.
* <p>
* Will be used in task manager stats and for debugging.
*/
public int getBanCount() {
return banedParents.size();
}
/**
* Bans all tasks with the specified parent task from execution, cancels all tasks that are currently executing.
* <p>
* This method is called when a parent task that has children is cancelled.
*/
public void setBan(String parentNode, long parentId, String reason) {
logger.trace("setting ban for the parent task {}:{} {}", parentNode, parentId, reason);
// Set the ban first, so the newly created tasks cannot be registered
Tuple<String, Long> ban = new Tuple<>(parentNode, parentId);
synchronized (banedParents) {
if (lastDiscoveryNodes.nodeExists(parentNode)) {
// Only set the ban if the node is the part of the cluster
banedParents.put(ban, reason);
}
}
// Now go through already running tasks and cancel them
for (Map.Entry<Long, CancellableTaskHolder> taskEntry : cancellableTasks.entrySet()) {
CancellableTaskHolder holder = taskEntry.getValue();
if (holder.hasParent(parentNode, parentId)) {
holder.cancel(reason);
}
}
}
/**
* Removes the ban for the specified parent task.
* <p>
* This method is called when a previously banned task is finally cancelled
*/
public void removeBan(String parentNode, long parentId) {
logger.trace("removing ban for the parent task {}:{} {}", parentNode, parentId);
banedParents.remove(new Tuple<>(parentNode, parentId));
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesRemoved()) {
synchronized (banedParents) {
lastDiscoveryNodes = event.state().getNodes();
// Remove all bans that were registered by nodes that are no longer in the cluster state
Iterator<Tuple<String, Long>> banIterator = banedParents.keySet().iterator();
while (banIterator.hasNext()) {
Tuple<String, Long> nodeAndTaskId = banIterator.next();
String nodeId = nodeAndTaskId.v1();
Long taskId = nodeAndTaskId.v2();
if (lastDiscoveryNodes.nodeExists(nodeId) == false) {
logger.debug("Removing ban for the parent [{}:{}] on the node [{}], reason: the parent node is gone", nodeId,
taskId, event.state().getNodes().localNode());
banIterator.remove();
}
}
}
// Cancel cancellable tasks for the nodes that are gone
for (Map.Entry<Long, CancellableTaskHolder> taskEntry : cancellableTasks.entrySet()) {
CancellableTaskHolder holder = taskEntry.getValue();
CancellableTask task = holder.getTask();
String parent = task.getParentNode();
if (parent != null && lastDiscoveryNodes.nodeExists(parent) == false) {
if (task.cancelOnParentLeaving()) {
holder.cancel("Coordinating node [" + parent + "] left the cluster");
}
}
}
}
}
public void registerChildTask(Task task, String node) {
if (task == null || task instanceof CancellableTask == false) {
// We don't have a cancellable task - not much we can do here
return;
}
CancellableTaskHolder holder = cancellableTasks.get(task.getId());
if (holder != null) {
holder.registerChildTaskNode(node);
}
}
private static class CancellableTaskHolder {
private static final String TASK_FINISHED_MARKER = "task finished";
private final CancellableTask task;
private final Set<String> nodesWithChildTasks = new HashSet<>();
private volatile String cancellationReason = null;
private volatile Consumer<Set<String>> cancellationListener = null;
public CancellableTaskHolder(CancellableTask task) {
this.task = task;
}
/**
* Marks task as cancelled.
* <p>
* Returns a set of nodes with child tasks where this task should be cancelled if cancellation was successful, null otherwise.
*/
public Set<String> cancel(String reason, Consumer<Set<String>> listener) {
Set<String> nodes;
synchronized (this) {
assert reason != null;
if (cancellationReason == null) {
cancellationReason = reason;
cancellationListener = listener;
nodes = Collections.unmodifiableSet(nodesWithChildTasks);
} else {
// Already cancelled by somebody else
nodes = null;
}
}
if (nodes != null) {
task.cancel(reason);
}
return nodes;
}
/**
* Marks the task as cancelled.
* <p>
* Returns the set of nodes with child tasks on which this task should be cancelled if the cancellation was successful, or null otherwise.
*/
public Set<String> cancel(String reason) {
return cancel(reason, null);
}
/**
* Marks task as finished.
*/
public void finish() {
Consumer<Set<String>> listener = null;
Set<String> nodes = null;
synchronized (this) {
if (cancellationReason != null) {
// The task was cancelled, we need to notify the listener
if (cancellationListener != null) {
listener = cancellationListener;
nodes = Collections.unmodifiableSet(nodesWithChildTasks);
cancellationListener = null;
}
} else {
cancellationReason = TASK_FINISHED_MARKER;
}
}
// We need to call the listener outside of the synchronized section to avoid potential bottlenecks
// in the listener synchronization
if (listener != null) {
listener.accept(nodes);
}
}
public boolean hasParent(String parentNode, long parentId) {
return parentId == task.getParentId() && parentNode.equals(task.getParentNode());
}
public CancellableTask getTask() {
return task;
}
public synchronized void registerChildTaskNode(String nodeId) {
if (cancellationReason == null) {
nodesWithChildTasks.add(nodeId);
} else {
throw new IllegalStateException("cannot register child task request, the task is already cancelled");
}
}
}
}
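To make the ban lifecycle above concrete, here is a minimal sketch of node-side glue a receiving node might use; the BanHandlerSketch class and its handler method names are hypothetical, and only setBan and removeBan come from this change.

import org.elasticsearch.tasks.TaskManager;

// Illustration only: everything except setBan/removeBan is assumed.
class BanHandlerSketch {
    private final TaskManager taskManager;

    BanHandlerSketch(TaskManager taskManager) {
        this.taskManager = taskManager;
    }

    // A ban request arrived: reject new children of this parent and cancel running ones.
    void onBanParent(String parentNode, long parentId, String reason) {
        taskManager.setBan(parentNode, parentId, reason);
    }

    // The cancelled parent has finished, so the ban can be lifted again.
    void onLiftBan(String parentNode, long parentId) {
        taskManager.removeBan(parentNode, parentId);
    }
}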

View File

@ -33,14 +33,20 @@ public abstract class TransportRequest extends TransportMessage<TransportRequest
}
/**
* Returns the task object that should be used to keep track of the processing of the request.
*
* A request can override this method and return null to avoid being tracked by the task manager.
*/
public Task createTask(long id, String type, String action) {
return new Task(id, type, action, this::getDescription);
return new Task(id, type, action, getDescription());
}
/**
* Returns optional description of the request to be displayed by the task manager
*/
public String getDescription() {
return this.toString();
return "";
}
}
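As a short illustration of the javadoc above, a request can either supply a meaningful description for the task manager or return null from createTask to opt out of tracking entirely; the two classes below are hypothetical sketches, not part of this change.

import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportRequest;

// Illustration only: DescribedRequest and UntrackedRequest are made-up names.
class DescribedRequest extends TransportRequest {
    @Override
    public String getDescription() {
        return "a human-readable summary shown by the task manager";
    }
}

class UntrackedRequest extends TransportRequest {
    @Override
    public Task createTask(long id, String type, String action) {
        return null; // opt out: this request is not tracked by the task manager
    }
}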

View File

@ -0,0 +1,384 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class CancellableTasksTests extends TaskManagerTestCase {
public static class CancellableNodeRequest extends BaseNodeRequest {
protected String requestName;
protected String nodeId;
public CancellableNodeRequest() {
super();
}
public CancellableNodeRequest(CancellableNodesRequest request, String nodeId) {
super(nodeId);
requestName = request.requestName;
this.nodeId = nodeId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
nodeId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeString(nodeId);
}
@Override
public String getDescription() {
return "CancellableNodeRequest[" + requestName + ", " + nodeId + "]";
}
@Override
public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) {
return new CancellableTask(id, type, action, getDescription(), parentTaskNode, parentTaskId);
}
}
public static class CancellableNodesRequest extends BaseNodesRequest<CancellableNodesRequest> {
private String requestName;
private CancellableNodesRequest() {
super();
}
public CancellableNodesRequest(String requestName, String... nodesIds) {
super(nodesIds);
this.requestName = requestName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
}
@Override
public String getDescription() {
return "CancellableNodesRequest[" + requestName + "]";
}
@Override
public Task createTask(long id, String type, String action) {
return new CancellableTask(id, type, action, getDescription());
}
}
/**
* Simulates a cancellable node-based action whose node tasks can be blocked so they are guaranteed to be registered by the task manager
*/
class CancellableTestNodesAction extends AbstractTestNodesAction<CancellableNodesRequest, CancellableNodeRequest> {
// True if the node operation should get stuck until it is cancelled
final boolean shouldBlock;
final CountDownLatch actionStartedLatch;
CancellableTestNodesAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, boolean shouldBlock, CountDownLatch
actionStartedLatch) {
super(settings, actionName, clusterName, threadPool, clusterService, transportService, CancellableNodesRequest::new,
CancellableNodeRequest::new);
this.shouldBlock = shouldBlock;
this.actionStartedLatch = actionStartedLatch;
}
@Override
protected CancellableNodeRequest newNodeRequest(String nodeId, CancellableNodesRequest request) {
return new CancellableNodeRequest(request, nodeId);
}
@Override
protected NodeResponse nodeOperation(CancellableNodeRequest request, Task task) {
assert task instanceof CancellableTask;
debugDelay(request.nodeId, "op1");
if (actionStartedLatch != null) {
actionStartedLatch.countDown();
}
debugDelay(request.nodeId, "op2");
if (shouldBlock) {
// Simulate a job that takes forever to finish
// Use periodic checks to identify that the task was cancelled
try {
awaitBusy(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new RuntimeException("Cancelled");
}
return false;
});
fail("It should have thrown an exception");
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
debugDelay(request.nodeId, "op4");
return new NodeResponse(clusterService.localNode());
}
@Override
protected NodeResponse nodeOperation(CancellableNodeRequest request) {
throw new UnsupportedOperationException("the task parameter is required");
}
}
private Task startCancellableTestNodesAction(boolean waitForActionToStart, int blockedNodesCount, ActionListener<NodesResponse>
listener) throws InterruptedException {
return startCancellableTestNodesAction(waitForActionToStart, randomSubsetOf(blockedNodesCount, testNodes), new
CancellableNodesRequest("Test Request"), listener);
}
private Task startCancellableTestNodesAction(boolean waitForActionToStart, Collection<TestNode> blockOnNodes, CancellableNodesRequest
request, ActionListener<NodesResponse> listener) throws InterruptedException {
CountDownLatch actionLatch = waitForActionToStart ? new CountDownLatch(nodesCount) : null;
CancellableTestNodesAction[] actions = new CancellableTestNodesAction[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
boolean shouldBlock = blockOnNodes.contains(testNodes[i]);
logger.info("The action in the node [{}] should block: [{}]", testNodes[i].discoveryNode.getId(), shouldBlock);
actions[i] = new CancellableTestNodesAction(Settings.EMPTY, "testAction", clusterName, threadPool, testNodes[i]
.clusterService, testNodes[i].transportService, shouldBlock, actionLatch);
}
Task task = actions[0].execute(request, listener);
if (waitForActionToStart) {
logger.info("Waiting for all actions to start");
actionLatch.await();
logger.info("Done waiting for all actions to start");
}
return task;
}
public void testBasicTaskCancellation() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch responseLatch = new CountDownLatch(1);
boolean waitForActionToStart = randomBoolean();
logger.info("waitForActionToStart is set to {}", waitForActionToStart);
final AtomicReference<NodesResponse> responseReference = new AtomicReference<>();
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
int blockedNodesCount = randomIntBetween(0, nodesCount);
Task mainTask = startCancellableTestNodesAction(waitForActionToStart, blockedNodesCount, new ActionListener<NodesResponse>() {
@Override
public void onResponse(NodesResponse listTasksResponse) {
responseReference.set(listTasksResponse);
responseLatch.countDown();
}
@Override
public void onFailure(Throwable e) {
throwableReference.set(e);
responseLatch.countDown();
}
});
// Cancel main task
CancelTasksRequest request = new CancelTasksRequest(testNodes[0].discoveryNode.getId());
request.reason("Testing Cancellation");
request.taskId(mainTask.getId());
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
// Wait for the main task to finish
responseLatch.await();
if (response.getTasks().size() == 0) {
// We didn't cancel the request and it finished successfully
// That should be rare and can only happen if we didn't block on any node
assertEquals(0, blockedNodesCount);
// Make sure that the request was successful
assertNull(throwableReference.get());
assertNotNull(responseReference.get());
assertEquals(nodesCount, responseReference.get().getNodes().length);
assertEquals(0, responseReference.get().failureCount());
} else {
// We cancelled the request; in this case it should have failed, but we should still get a partial response
assertNull(throwableReference.get());
assertEquals(nodesCount, responseReference.get().failureCount() + responseReference.get().getNodes().length);
// and we should have at least as many failures as the number of blocked operations
// (we might have cancelled some non-blocked operations before they even started and that's ok)
assertThat(responseReference.get().failureCount(), greaterThanOrEqualTo(blockedNodesCount));
// We should have the information about the cancelled task in the cancel operation response
assertEquals(1, response.getTasks().size());
assertEquals(mainTask.getId(), response.getTasks().get(0).getId());
}
// Make sure that tasks are no longer running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest(testNodes[0].discoveryNode.getId()).taskId(mainTask.getId())).get();
assertEquals(0, listTasksResponse.getTasks().size());
// Make sure that there are no leftover bans, the ban removal is async, so we might return from the cancellation
// while the ban is still there, but it should disappear shortly
assertBusy(() -> {
for (int i = 0; i < testNodes.length; i++) {
assertEquals("No bans on the node " + i, 0, testNodes[i].transportService.getTaskManager().getBanCount());
}
});
}
public void testTaskCancellationOnCoordinatingNodeLeavingTheCluster() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch responseLatch = new CountDownLatch(1);
boolean simulateBanBeforeLeaving = randomBoolean();
final AtomicReference<NodesResponse> responseReference = new AtomicReference<>();
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
int blockedNodesCount = randomIntBetween(0, nodesCount - 1);
// We shouldn't block on the first node since it's leaving the cluster anyway so it doesn't matter
List<TestNode> blockOnNodes = randomSubsetOf(blockedNodesCount, Arrays.copyOfRange(testNodes, 1, nodesCount));
Task mainTask = startCancellableTestNodesAction(true, blockOnNodes, new CancellableNodesRequest("Test Request"), new
ActionListener<NodesResponse>() {
@Override
public void onResponse(NodesResponse listTasksResponse) {
responseReference.set(listTasksResponse);
responseLatch.countDown();
}
@Override
public void onFailure(Throwable e) {
throwableReference.set(e);
responseLatch.countDown();
}
});
String mainNode = testNodes[0].discoveryNode.getId();
// Make sure that tasks are running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().parentNode(mainNode).taskId(mainTask.getId())).get();
assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size()));
// Simulate the coordinating node leaving the cluster
DiscoveryNode[] discoveryNodes = new DiscoveryNode[testNodes.length - 1];
for (int i = 1; i < testNodes.length; i++) {
discoveryNodes[i - 1] = testNodes[i].discoveryNode;
}
DiscoveryNode master = discoveryNodes[0];
for (int i = 1; i < testNodes.length; i++) {
// Notify only nodes that should remain in the cluster
testNodes[i].clusterService.setState(ClusterStateCreationUtils.state(testNodes[i].discoveryNode, master, discoveryNodes));
}
if (simulateBanBeforeLeaving) {
logger.info("--> Simulate issuing cancel request on the node that is about to leave the cluster");
// Simulate issuing cancel request on the node that is about to leave the cluster
CancelTasksRequest request = new CancelTasksRequest(testNodes[0].discoveryNode.getId());
request.reason("Testing Cancellation");
request.taskId(mainTask.getId());
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[0].transportCancelTasksAction.execute(request).get();
logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster");
// This node still thinks it is part of the cluster, so cancelling should look successful
if (response.getTasks().size() == 0) {
    logger.error("cancel request on the leaving node didn't match the main task");
}
assertThat(response.getTasks().size(), lessThanOrEqualTo(1));
assertThat(response.getTaskFailures().size(), lessThanOrEqualTo(1));
assertThat(response.getTaskFailures().size() + response.getTasks().size(), lessThanOrEqualTo(1));
}
for (int i = 1; i < testNodes.length; i++) {
assertEquals("No bans on the node " + i, 0, testNodes[i].transportService.getTaskManager().getBanCount());
}
// Close the first node
testNodes[0].close();
assertBusy(() -> {
// Make sure that tasks are no longer running
try {
ListTasksResponse listTasksResponse1 = testNodes[randomIntBetween(1, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().parentNode(mainNode).taskId(mainTask.getId())).get();
assertEquals(0, listTasksResponse1.getTasks().size());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException ex2) {
fail("shouldn't be here");
}
});
// Wait for clean up
responseLatch.await();
}
private static void debugDelay(String nodeId, String name) {
// Introduce additional pseudo-random, repeatable race conditions
String delayName = RandomizedContext.current().getRunnerSeedAsString() + ":" + nodeId + ":" + name;
Random random = new Random(delayName.hashCode());
if (RandomInts.randomIntBetween(random, 0, 10) < 1) {
try {
Thread.sleep(RandomInts.randomIntBetween(random, 20, 50));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}
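The blocked node operations in this test all rely on the same cooperative pattern; sketched in isolation it looks roughly like the snippet below (the work-loop helpers are placeholders, and only CancellableTask.isCancelled() comes from this change).

import org.elasticsearch.tasks.CancellableTask;

// Illustration only: a long-running operation that periodically checks for
// cancellation instead of relying on thread interruption.
final class CooperativeLoopSketch {
    void run(CancellableTask task) {
        while (hasMoreWork()) {
            if (task.isCancelled()) {
                throw new RuntimeException("task was cancelled");
            }
            doSomeWork();
        }
    }

    private boolean hasMoreWork() { return false; } // placeholder work predicate
    private void doSomeWork() {}                    // placeholder unit of work
}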

View File

@ -0,0 +1,245 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.tasks.MockTaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
/**
* The test case for unit testing task manager and related transport actions
*/
public abstract class TaskManagerTestCase extends ESTestCase {
protected static ThreadPool threadPool;
public static final ClusterName clusterName = new ClusterName("test-cluster");
protected TestNode[] testNodes;
protected int nodesCount;
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool(TransportTasksActionTests.class.getSimpleName());
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
public void setupTestNodes(Settings settings) {
nodesCount = randomIntBetween(2, 10);
testNodes = new TestNode[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
testNodes[i] = new TestNode("node" + i, threadPool, settings);
}
}
@After
public final void shutdownTestNodes() throws Exception {
for (TestNode testNode : testNodes) {
testNode.close();
}
}
static class NodeResponse extends BaseNodeResponse {
protected NodeResponse() {
super();
}
protected NodeResponse(DiscoveryNode node) {
super(node);
}
}
static class NodesResponse extends BaseNodesResponse<NodeResponse> {
private int failureCount;
protected NodesResponse(ClusterName clusterName, NodeResponse[] nodes, int failureCount) {
super(clusterName, nodes);
this.failureCount = failureCount;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureCount = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(failureCount);
}
public int failureCount() {
return failureCount;
}
}
/**
* Simulates a node-based action whose node tasks can be blocked so they are guaranteed to be registered by the task manager
*/
abstract class AbstractTestNodesAction<NodesRequest extends BaseNodesRequest<NodesRequest>, NodeRequest extends BaseNodeRequest>
extends TransportNodesAction<NodesRequest, NodesResponse, NodeRequest, NodeResponse> {
AbstractTestNodesAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, Supplier<NodesRequest> request,
Supplier<NodeRequest> nodeRequest) {
super(settings, actionName, clusterName, threadPool, clusterService, transportService,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
request, nodeRequest, ThreadPool.Names.GENERIC);
}
@Override
protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray responses) {
final List<NodeResponse> nodesList = new ArrayList<>();
int failureCount = 0;
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeResponse) { // will also filter out null response for unallocated ones
nodesList.add((NodeResponse) resp);
} else if (resp instanceof FailedNodeException) {
failureCount++;
} else {
logger.warn("unknown response type [{}], expected NodeResponse or FailedNodeException", resp);
}
}
return new NodesResponse(clusterName, nodesList.toArray(new NodeResponse[nodesList.size()]), failureCount);
}
@Override
protected NodeResponse newNodeResponse() {
return new NodeResponse();
}
@Override
protected abstract NodeResponse nodeOperation(NodeRequest request);
@Override
protected boolean accumulateExceptions() {
return true;
}
}
public static class TestNode implements Releasable {
public TestNode(String name, ThreadPool threadPool, Settings settings) {
transportService = new TransportService(settings,
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
threadPool, new NamedWriteableRegistry()) {
@Override
protected TaskManager createTaskManager() {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
return new MockTaskManager(settings);
} else {
return super.createTaskManager();
}
}
};
transportService.start();
clusterService = new TestClusterService(threadPool, transportService);
clusterService.add(transportService.getTaskManager());
discoveryNode = new DiscoveryNode(name, transportService.boundAddress().publishAddress(), Version.CURRENT);
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
ActionFilters actionFilters = new ActionFilters(Collections.emptySet());
transportListTasksAction = new TransportListTasksAction(settings, clusterName, threadPool, clusterService, transportService,
actionFilters, indexNameExpressionResolver);
transportCancelTasksAction = new TransportCancelTasksAction(settings, clusterName, threadPool, clusterService, transportService,
actionFilters, indexNameExpressionResolver);
}
public final TestClusterService clusterService;
public final TransportService transportService;
public final DiscoveryNode discoveryNode;
public final TransportListTasksAction transportListTasksAction;
public final TransportCancelTasksAction transportCancelTasksAction;
@Override
public void close() {
transportService.close();
}
}
public static void connectNodes(TestNode... nodes) {
DiscoveryNode[] discoveryNodes = new DiscoveryNode[nodes.length];
for (int i = 0; i < nodes.length; i++) {
discoveryNodes[i] = nodes[i].discoveryNode;
}
DiscoveryNode master = discoveryNodes[0];
for (TestNode node : nodes) {
node.clusterService.setState(ClusterStateCreationUtils.state(node.discoveryNode, master, discoveryNodes));
}
for (TestNode nodeA : nodes) {
for (TestNode nodeB : nodes) {
nodeA.transportService.connectToNode(nodeB.discoveryNode);
}
}
}
public static RecordingTaskManagerListener[] setupListeners(TestNode[] nodes, String... actionMasks) {
RecordingTaskManagerListener[] listeners = new RecordingTaskManagerListener[nodes.length];
for (int i = 0; i < nodes.length; i++) {
listeners[i] = new RecordingTaskManagerListener(nodes[i].discoveryNode, actionMasks);
((MockTaskManager) (nodes[i].clusterService.getTaskManager())).addListener(listeners[i]);
}
return listeners;
}
}
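A concrete test built on this base class would typically start along these lines; the test class itself is a hypothetical sketch, while setupTestNodes, connectNodes and transportListTasksAction come from the code above.

import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.settings.Settings;

// Illustrative subclass, not part of the diff.
public class ExampleTasksTests extends TaskManagerTestCase {
    public void testNodesComeUp() throws Exception {
        setupTestNodes(Settings.EMPTY);   // creates 2-10 in-process test nodes
        connectNodes(testNodes);          // wires their local transports together
        ListTasksResponse response = testNodes[0].transportListTasksAction
                .execute(new ListTasksRequest()).get();
        assertEquals(0, response.getNodeFailures().size());
    }
}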

View File

@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
@ -68,7 +69,12 @@ public class TasksIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(MockTransportService.TestPlugin.class);
return pluginList(MockTransportService.TestPlugin.class, TestTaskPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
@ -279,6 +285,39 @@ public class TasksIT extends ESIntegTestCase {
}
}
public void testTasksCancellation() throws Exception {
// Start blocking test task
// Get real client (the plugin is not registered on transport nodes)
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute();
logger.info("--> started test tasks");
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
logger.info("--> cancelling the main test task");
CancelTasksResponse cancelTasksResponse = client().admin().cluster().prepareCancelTasks().setActions(TestTaskPlugin.TestTaskAction.NAME).get();
assertEquals(1, cancelTasksResponse.getTasks().size());
future.get();
logger.info("--> checking that test tasks are not running");
assertEquals(0, client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "*").get().getTasks().size());
}
public void testTasksUnblocking() throws Exception {
// Start blocking test task
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute();
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get();
future.get();
assertEquals(0, client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size());
}
@Override
public void tearDown() throws Exception {

View File

@ -0,0 +1,454 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
/**
* A plugin that adds a cancellable blocking test task for integration testing of the task manager.
*/
public class TestTaskPlugin extends Plugin {
@Override
public String name() {
return "test-task-plugin";
}
@Override
public String description() {
return "Test plugin for testing task management";
}
public void onModule(ActionModule module) {
module.registerAction(TestTaskAction.INSTANCE, TransportTestTaskAction.class);
module.registerAction(UnblockTestTasksAction.INSTANCE, TransportUnblockTestTasksAction.class);
}
static class TestTask extends CancellableTask {
private volatile boolean blocked = true;
public TestTask(long id, String type, String action, String description, String parentNode, long parentId) {
super(id, type, action, description, parentNode, parentId);
}
public boolean isBlocked() {
return blocked;
}
public void unblock() {
blocked = false;
}
}
public static class NodeResponse extends BaseNodeResponse {
protected NodeResponse() {
super();
}
public NodeResponse(DiscoveryNode node) {
super(node);
}
}
public static class NodesResponse extends BaseNodesResponse<NodeResponse> {
private int failureCount;
NodesResponse() {
}
public NodesResponse(ClusterName clusterName, NodeResponse[] nodes, int failureCount) {
super(clusterName, nodes);
this.failureCount = failureCount;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureCount = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(failureCount);
}
public int failureCount() {
return failureCount;
}
}
public static class NodeRequest extends BaseNodeRequest {
protected String requestName;
protected String nodeId;
public NodeRequest() {
super();
}
public NodeRequest(NodesRequest request, String nodeId) {
super(nodeId);
requestName = request.requestName;
this.nodeId = nodeId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
nodeId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeString(nodeId);
}
@Override
public String getDescription() {
return "NodeRequest[" + requestName + ", " + nodeId + "]";
}
@Override
public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) {
return new TestTask(id, type, action, this.getDescription(), parentTaskNode, parentTaskId);
}
}
public static class NodesRequest extends BaseNodesRequest<NodesRequest> {
private String requestName;
NodesRequest() {
super();
}
public NodesRequest(String requestName, String... nodesIds) {
super(nodesIds);
this.requestName = requestName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
}
@Override
public String getDescription() {
return "NodesRequest[" + requestName + "]";
}
@Override
public Task createTask(long id, String type, String action) {
return new CancellableTask(id, type, action, getDescription());
}
}
public static class TransportTestTaskAction extends TransportNodesAction<NodesRequest, NodesResponse, NodeRequest, NodeResponse> {
@Inject
public TransportTestTaskAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService) {
super(settings, TestTaskAction.NAME, clusterName, threadPool, clusterService, transportService,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
NodesRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC);
}
@Override
protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray responses) {
final List<NodeResponse> nodesList = new ArrayList<>();
int failureCount = 0;
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeResponse) { // will also filter out null response for unallocated ones
nodesList.add((NodeResponse) resp);
} else if (resp instanceof FailedNodeException) {
failureCount++;
} else {
logger.warn("unknown response type [{}], expected NodeResponse or FailedNodeException", resp);
}
}
return new NodesResponse(clusterName, nodesList.toArray(new NodeResponse[nodesList.size()]), failureCount);
}
@Override
protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
List<String> list = new ArrayList<>();
for (String node : nodesIds) {
if (nodes.getDataNodes().containsKey(node)) {
list.add(node);
}
}
return list.toArray(new String[list.size()]);
}
@Override
protected NodeRequest newNodeRequest(String nodeId, NodesRequest request) {
return new NodeRequest(request, nodeId);
}
@Override
protected NodeResponse newNodeResponse() {
return new NodeResponse();
}
@Override
protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
super.doExecute(task, request, listener);
}
@Override
protected NodeResponse nodeOperation(NodeRequest request, Task task) {
logger.info("Test task started on the node {}", clusterService.localNode());
try {
awaitBusy(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new RuntimeException("Cancelled!");
}
return ((TestTask) task).isBlocked() == false;
});
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
logger.info("Test task finished on the node {}", clusterService.localNode());
return new NodeResponse(clusterService.localNode());
}
@Override
protected NodeResponse nodeOperation(NodeRequest request) {
throw new UnsupportedOperationException("the task parameter is required");
}
@Override
protected boolean accumulateExceptions() {
return true;
}
}
public static class TestTaskAction extends Action<NodesRequest, NodesResponse, NodesRequestBuilder> {
public static final TestTaskAction INSTANCE = new TestTaskAction();
public static final String NAME = "cluster:admin/tasks/test";
private TestTaskAction() {
super(NAME);
}
@Override
public NodesResponse newResponse() {
return new NodesResponse();
}
@Override
public NodesRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new NodesRequestBuilder(client, this);
}
}
public static class NodesRequestBuilder extends ActionRequestBuilder<NodesRequest, NodesResponse, NodesRequestBuilder> {
protected NodesRequestBuilder(ElasticsearchClient client, Action<NodesRequest, NodesResponse, NodesRequestBuilder> action) {
super(client, action, new NodesRequest("test"));
}
}
public static class UnblockTestTaskResponse implements Writeable<UnblockTestTaskResponse> {
public UnblockTestTaskResponse() {
}
public UnblockTestTaskResponse(StreamInput in) {
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
@Override
public UnblockTestTaskResponse readFrom(StreamInput in) throws IOException {
return new UnblockTestTaskResponse(in);
}
}
public static class UnblockTestTasksRequest extends BaseTasksRequest<UnblockTestTasksRequest> {
}
public static class UnblockTestTasksResponse extends BaseTasksResponse {
private List<UnblockTestTaskResponse> tasks;
public UnblockTestTasksResponse() {
}
public UnblockTestTasksResponse(List<UnblockTestTaskResponse> tasks, List<TaskOperationFailure> taskFailures, List<? extends
FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int taskCount = in.readVInt();
List<UnblockTestTaskResponse> builder = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
builder.add(new UnblockTestTaskResponse(in));
}
tasks = Collections.unmodifiableList(builder);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(tasks.size());
for (UnblockTestTaskResponse task : tasks) {
task.writeTo(out);
}
}
}
/**
* Test class for testing task operations
*/
public static class TransportUnblockTestTasksAction extends TransportTasksAction<Task, UnblockTestTasksRequest,
UnblockTestTasksResponse, UnblockTestTaskResponse> {
@Inject
public TransportUnblockTestTasksAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService
clusterService,
TransportService transportService) {
super(settings, UnblockTestTasksAction.NAME, clusterName, threadPool, clusterService, transportService, new ActionFilters(new
HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
UnblockTestTasksRequest::new, UnblockTestTasksResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override
protected UnblockTestTasksResponse newResponse(UnblockTestTasksRequest request, List<UnblockTestTaskResponse> tasks,
List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException>
failedNodeExceptions) {
return new UnblockTestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected UnblockTestTaskResponse readTaskResponse(StreamInput in) throws IOException {
return new UnblockTestTaskResponse(in);
}
@Override
protected UnblockTestTaskResponse taskOperation(UnblockTestTasksRequest request, Task task) {
((TestTask) task).unblock();
return new UnblockTestTaskResponse();
}
@Override
protected boolean accumulateExceptions() {
return true;
}
}
public static class UnblockTestTasksAction extends Action<UnblockTestTasksRequest, UnblockTestTasksResponse,
UnblockTestTasksRequestBuilder> {
public static final UnblockTestTasksAction INSTANCE = new UnblockTestTasksAction();
public static final String NAME = "cluster:admin/tasks/testunblock";
private UnblockTestTasksAction() {
super(NAME);
}
@Override
public UnblockTestTasksResponse newResponse() {
return new UnblockTestTasksResponse();
}
@Override
public UnblockTestTasksRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new UnblockTestTasksRequestBuilder(client, this);
}
}
public static class UnblockTestTasksRequestBuilder extends ActionRequestBuilder<UnblockTestTasksRequest, UnblockTestTasksResponse,
UnblockTestTasksRequestBuilder> {
protected UnblockTestTasksRequestBuilder(ElasticsearchClient client, Action<UnblockTestTasksRequest, UnblockTestTasksResponse,
UnblockTestTasksRequestBuilder> action) {
super(client, action, new UnblockTestTasksRequest());
}
}
}

View File

@ -18,23 +18,19 @@
*/
package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
@ -42,16 +38,11 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.tasks.MockTaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -70,102 +61,13 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.not;
public class TransportTasksActionTests extends ESTestCase {
private static ThreadPool threadPool;
private static final ClusterName clusterName = new ClusterName("test-cluster");
private TestNode[] testNodes;
private int nodesCount;
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool(TransportTasksActionTests.class.getSimpleName());
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
public void setupTestNodes(Settings settings) {
nodesCount = randomIntBetween(2, 10);
testNodes = new TestNode[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
testNodes[i] = new TestNode("node" + i, threadPool, settings);
}
}
@After
public final void shutdownTestNodes() throws Exception {
for (TestNode testNode : testNodes) {
testNode.close();
}
}
private static class TestNode implements Releasable {
public TestNode(String name, ThreadPool threadPool, Settings settings) {
transportService = new TransportService(settings,
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
threadPool, new NamedWriteableRegistry()) {
@Override
protected TaskManager createTaskManager() {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
return new MockTaskManager(settings);
} else {
return super.createTaskManager();
}
}
};
transportService.start();
clusterService = new TestClusterService(threadPool, transportService);
discoveryNode = new DiscoveryNode(name, transportService.boundAddress().publishAddress(), Version.CURRENT);
transportListTasksAction = new TransportListTasksAction(settings, clusterName, threadPool, clusterService, transportService,
new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(settings));
}
public final TestClusterService clusterService;
public final TransportService transportService;
public final DiscoveryNode discoveryNode;
public final TransportListTasksAction transportListTasksAction;
@Override
public void close() {
transportService.close();
}
}
public static void connectNodes(TestNode... nodes) {
DiscoveryNode[] discoveryNodes = new DiscoveryNode[nodes.length];
for (int i = 0; i < nodes.length; i++) {
discoveryNodes[i] = nodes[i].discoveryNode;
}
DiscoveryNode master = discoveryNodes[0];
for (TestNode node : nodes) {
node.clusterService.setState(ClusterStateCreationUtils.state(node.discoveryNode, master, discoveryNodes));
}
for (TestNode nodeA : nodes) {
for (TestNode nodeB : nodes) {
nodeA.transportService.connectToNode(nodeB.discoveryNode);
}
}
}
public static RecordingTaskManagerListener[] setupListeners(TestNode[] nodes, String... actionMasks) {
RecordingTaskManagerListener[] listeners = new RecordingTaskManagerListener[nodes.length];
for (int i = 0; i < nodes.length; i++) {
listeners[i] = new RecordingTaskManagerListener(nodes[i].discoveryNode, actionMasks);
((MockTaskManager)(nodes[i].clusterService.getTaskManager())).addListener(listeners[i]);
}
return listeners;
}
public class TransportTasksActionTests extends TaskManagerTestCase {
public static class NodeRequest extends BaseNodeRequest {
protected String requestName;
@ -197,13 +99,13 @@ public class TransportTasksActionTests extends ESTestCase {
@Override
public String getDescription() {
return "NodeRequest[" + requestName + ", " + enableTaskManager + "]";
return "CancellableNodeRequest[" + requestName + ", " + enableTaskManager + "]";
}
@Override
public Task createTask(long id, String type, String action) {
public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) {
if (enableTaskManager) {
return super.createTask(id, type, action);
return super.createTask(id, type, action, parentTaskNode, parentTaskId);
} else {
return null;
}
@ -214,7 +116,7 @@ public class TransportTasksActionTests extends ESTestCase {
private String requestName;
private boolean enableTaskManager;
private NodesRequest() {
NodesRequest() {
super();
}
@ -244,7 +146,7 @@ public class TransportTasksActionTests extends ESTestCase {
@Override
public String getDescription() {
return "NodesRequest[" + requestName + ", " + enableTaskManager + "]";
return "CancellableNodesRequest[" + requestName + ", " + enableTaskManager + "]";
}
@Override
@ -257,70 +159,14 @@ public class TransportTasksActionTests extends ESTestCase {
}
}
static class NodeResponse extends BaseNodeResponse {
protected NodeResponse() {
super();
}
protected NodeResponse(DiscoveryNode node) {
super(node);
}
}
static class NodesResponse extends BaseNodesResponse<NodeResponse> {
private int failureCount;
protected NodesResponse(ClusterName clusterName, NodeResponse[] nodes, int failureCount) {
super(clusterName, nodes);
this.failureCount = failureCount;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
failureCount = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(failureCount);
}
public int failureCount() {
return failureCount;
}
}
/**
* Simulates node-based task that can be used to block node tasks so they are guaranteed to be registered by task manager
*/
abstract class TestNodesAction extends TransportNodesAction<NodesRequest, NodesResponse, NodeRequest, NodeResponse> {
abstract class TestNodesAction extends AbstractTestNodesAction<NodesRequest, NodeRequest> {
TestNodesAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService) {
super(settings, actionName, clusterName, threadPool, clusterService, transportService,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
NodesRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC);
}
@Override
protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray responses) {
final List<NodeResponse> nodesList = new ArrayList<>();
int failureCount = 0;
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeResponse) { // will also filter out null response for unallocated ones
nodesList.add((NodeResponse) resp);
} else if (resp instanceof FailedNodeException) {
failureCount++;
} else {
logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp);
}
}
return new NodesResponse(clusterName, nodesList.toArray(new NodeResponse[nodesList.size()]), failureCount);
super(settings, actionName, clusterName, threadPool, clusterService, transportService, NodesRequest::new, NodeRequest::new);
}
@Override
@ -332,14 +178,6 @@ public class TransportTasksActionTests extends ESTestCase {
protected NodeResponse newNodeResponse() {
return new NodeResponse();
}
@Override
protected abstract NodeResponse nodeOperation(NodeRequest request);
@Override
protected boolean accumulateExceptions() {
return true;
}
}
static class TestTaskResponse implements Writeable<TestTaskResponse> {
@ -411,7 +249,7 @@ public class TransportTasksActionTests extends ESTestCase {
/**
* Test class for testing task operations
*/
static abstract class TestTasksAction extends TransportTasksAction<TestTasksRequest, TestTasksResponse, TestTaskResponse> {
static abstract class TestTasksAction extends TransportTasksAction<Task, TestTasksRequest, TestTasksResponse, TestTaskResponse> {
protected TestTasksAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService) {
@ -548,7 +386,7 @@ public class TransportTasksActionTests extends ESTestCase {
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("NodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
}
// Make sure that the main task on coordinating node is the task that was returned to us by execute()
@ -648,7 +486,7 @@ public class TransportTasksActionTests extends ESTestCase {
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("NodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
}
// Release all tasks and wait for response
@ -657,6 +495,61 @@ public class TransportTasksActionTests extends ESTestCase {
assertEquals(0, responses.failureCount());
}
public void testCancellingTasksThatDontSupportCancellation() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
CountDownLatch responseLatch = new CountDownLatch(1);
Task task = startBlockingTestNodesAction(checkLatch, new ActionListener<NodesResponse>() {
@Override
public void onResponse(NodesResponse nodeResponses) {
responseLatch.countDown();
}
@Override
public void onFailure(Throwable e) {
responseLatch.countDown();
}
});
String actionName = "testAction"; // only pick the main action
// Try to cancel main task using action name
CancelTasksRequest request = new CancelTasksRequest(testNodes[0].discoveryNode.getId());
request.reason("Testing Cancellation");
request.actions(actionName);
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
// Shouldn't match any tasks since testAction doesn't support cancellation
assertEquals(0, response.getTasks().size());
assertEquals(0, response.getTaskFailures().size());
assertEquals(0, response.getNodeFailures().size());
// Try to cancel main task using id
request = new CancelTasksRequest(testNodes[0].discoveryNode.getId());
request.reason("Testing Cancellation");
request.taskId(task.getId());
response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request).get();
// Shouldn't match any tasks since testAction doesn't support cancellation
assertEquals(0, response.getTasks().size());
assertEquals(0, response.getTaskFailures().size());
assertEquals(1, response.getNodeFailures().size());
assertThat(response.getNodeFailures().get(0).getDetailedMessage(), containsString("doesn't support cancellation"));
// Make sure that task is still running
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions(actionName);
ListTasksResponse listResponse = testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction.execute
(listTasksRequest).get();
assertEquals(1, listResponse.getPerNodeTasks().size());
// Release all tasks and wait for response
checkLatch.countDown();
responseLatch.await(10, TimeUnit.SECONDS);
}
public void testFailedTasksCount() throws ExecutionException, InterruptedException, IOException {
Settings settings = Settings.builder().put(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.getKey(), true).build();
setupTestNodes(settings);

View File

@ -0,0 +1,35 @@
{
"tasks.cancel": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/tasks-cancel.html",
"methods": ["POST"],
"url": {
"path": "/_tasks",
"paths": ["/_tasks/_cancel", "/_tasks/{node_id}/_cancel", "/_tasks/{node_id}/{task_id}/_cancel"],
"parts": {
"node_id": {
"type": "list",
"description": "A comma-separated list of node IDs or names to limit the request; use `_local` to cancel only tasks on the node you're connecting to, leave empty to cancel tasks on all nodes"
},
"task_id": {
"type": "number",
"description": "Cancel the task with specified id"
}
},
"params": {
"actions": {
"type": "list",
"description": "A comma-separated list of actions that should be cancelled. Leave empty to cancel all."
},
"parent_node": {
"type": "string",
"description": "Cancel tasks with specified parent node."
},
"parent_task": {
"type" : "number",
"description" : "Cancel tasks with specified parent task id. Set to -1 to cancel all."
}
}
},
"body": null
}
}
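Roughly, the spec above corresponds to the Java admin client call already used in TasksIT; the snippet below is a sketch with a placeholder client reference and an example action mask.

import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.client.Client;

// Sketch of the client-side equivalent of POST /_tasks/_cancel?actions=...
void cancelTestTasks(Client client) {
    CancelTasksResponse cancelled = client.admin().cluster()
            .prepareCancelTasks()
            .setActions("cluster:admin/tasks/test")   // example action mask
            .get();
    System.out.println("cancelled " + cancelled.getTasks().size() + " tasks");
}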

View File

@ -4,18 +4,22 @@
"methods": ["GET"],
"url": {
"path": "/_tasks",
"paths": ["/_tasks", "/_tasks/{node_id}", "/_tasks/{node_id}/{actions}"],
"paths": ["/_tasks", "/_tasks/{node_id}", "/_tasks/{node_id}/{task_id}"],
"parts": {
"node_id": {
"type": "list",
"description": "A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes"
},
"actions": {
"type": "list",
"description": "A comma-separated list of actions that should be returned. Leave empty to return all."
"task_id": {
"type": "number",
"description": "Return the task with specified id"
}
},
"params": {
"actions": {
"type": "list",
"description": "A comma-separated list of actions that should be returned. Leave empty to return all."
},
"detailed": {
"type": "boolean",
"description": "Return detailed task information (default: false)"

View File

@ -0,0 +1,8 @@
---
"tasks_cancel test":
- do:
tasks.cancel:
node_id: _local
task_id: 1
- length: { nodes: 0 }