Don't wait for completion of list tasks tasks when wait_for_completion flag is set

Waiting for completion of list tasks tasks can cause an infinite loop of a list tasks task waiting for its own completion or completion of its children. To reproduce run:

```
curl "localhost:9200/_tasks?wait_for_completion"
```
This commit is contained in:
Igor Motov 2016-03-21 16:02:08 -04:00
parent 201fc06f8d
commit 8202bf212c
3 changed files with 23 additions and 1 deletions

View File

@ -84,7 +84,13 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
long timeoutTime = System.nanoTime() + timeout.nanos(); long timeoutTime = System.nanoTime() + timeout.nanos();
super.processTasks(request, operation.andThen((Task t) -> { super.processTasks(request, operation.andThen((Task t) -> {
while (System.nanoTime() - timeoutTime < 0) { while (System.nanoTime() - timeoutTime < 0) {
if (taskManager.getTask(t.getId()) == null) { Task task = taskManager.getTask(t.getId());
if (task == null) {
return;
}
if (task.getAction().startsWith(ListTasksAction.NAME)) {
// It doesn't make sense to wait for List Tasks and it can cause an infinite loop of the task waiting
// for itself of one of its child tasks
return; return;
} }
try { try {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
@ -51,6 +52,7 @@ public class RestListTasksAction extends BaseRestHandler {
String[] actions = Strings.splitStringByCommaToArray(request.param("actions")); String[] actions = Strings.splitStringByCommaToArray(request.param("actions"));
TaskId parentTaskId = new TaskId(request.param("parent_task_id")); TaskId parentTaskId = new TaskId(request.param("parent_task_id"));
boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", false); boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", false);
TimeValue timeout = request.paramAsTime("timeout", null);
ListTasksRequest listTasksRequest = new ListTasksRequest(); ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setTaskId(taskId); listTasksRequest.setTaskId(taskId);
@ -59,6 +61,7 @@ public class RestListTasksAction extends BaseRestHandler {
listTasksRequest.setActions(actions); listTasksRequest.setActions(actions);
listTasksRequest.setParentTaskId(parentTaskId); listTasksRequest.setParentTaskId(parentTaskId);
listTasksRequest.setWaitForCompletion(waitForCompletion); listTasksRequest.setWaitForCompletion(waitForCompletion);
listTasksRequest.setTimeout(timeout);
client.admin().cluster().listTasks(listTasksRequest, new RestToXContentListener<>(channel)); client.admin().cluster().listTasks(listTasksRequest, new RestToXContentListener<>(channel));
} }
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
@ -59,6 +60,7 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.emptyCollectionOf; import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -406,6 +408,17 @@ public class TasksIT extends ESIntegTestCase {
assertThat(waitResponseFuture.get().getTasks(), emptyCollectionOf(TaskInfo.class)); assertThat(waitResponseFuture.get().getTasks(), emptyCollectionOf(TaskInfo.class));
} }
public void testTasksWaitForAllTask() throws Exception {
// Spin up a request to wait for all tasks in the cluster to make sure it doesn't cause an infinite loop
ListTasksResponse response = client().admin().cluster().prepareListTasks().setWaitForCompletion(true)
.setTimeout(timeValueSeconds(10)).get();
// It should finish quickly and without complaint and list the list tasks themselves
assertThat(response.getNodeFailures(), emptyCollectionOf(FailedNodeException.class));
assertThat(response.getTaskFailures(), emptyCollectionOf(TaskOperationFailure.class));
assertThat(response.getTasks().size(), greaterThanOrEqualTo(1));
}
@Override @Override
public void tearDown() throws Exception { public void tearDown() throws Exception {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) { for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {