From 6d0efae7139401f497e6cd4ab329fba82dd922bf Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 2 Mar 2016 11:18:44 -0500 Subject: [PATCH] Teach list tasks api to wait for tasks to finish _wait_for_completion defaults to false. If set to true then the API will wait for all the tasks that it finds to stop running before returning. You can use the timeout parameter to prevent it from waiting forever. If you don't set a timeout parameter it'll default to 30 seconds. Also adds a log message to rest tests if any tasks overrun the test. This is just a log (instead of failing the test) because lots of tasks are run by the cluster on its own and they shouldn't cause the test to fail. Things like fetching disk usage from the other nodes, for example. Switches the request to getter/setter style methods as we're going that way in the Elasticsearch code base. Reindex is all getter/setter style. Closes #16906 --- .../node/tasks/cancel/CancelTasksRequest.java | 10 ++- .../cancel/TransportCancelTasksAction.java | 16 ++-- .../node/tasks/list/ListTasksRequest.java | 22 ++++- .../tasks/list/ListTasksRequestBuilder.java | 10 ++- .../tasks/list/TransportListTasksAction.java | 39 ++++++++- .../support/tasks/BaseTasksRequest.java | 28 +++--- .../support/tasks/TasksRequestBuilder.java | 6 +- .../support/tasks/TransportTasksAction.java | 18 ++-- .../node/tasks/RestCancelTasksAction.java | 8 +- .../node/tasks/RestListTasksAction.java | 12 +-- .../node/tasks/CancellableTasksTests.java | 14 +-- .../admin/cluster/node/tasks/TasksIT.java | 75 ++++++++++++++++ .../cluster/node/tasks/TestTaskPlugin.java | 5 +- .../node/tasks/TransportTasksActionTests.java | 32 +++---- .../rest-api-spec/test/reindex/10_basic.yaml | 9 +- .../test/update-by-query/10_basic.yaml | 6 ++ .../rest-api-spec/api/tasks.list.json | 4 + .../test/rest/ESRestTestCase.java | 85 ++++++++++++++----- .../rest/client/http/HttpRequestBuilder.java | 5 +- 19 files changed, 305 insertions(+), 99 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java index 1d1249e1551..e92695d61e2 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java @@ -53,12 +53,18 @@ public class CancelTasksRequest extends BaseTasksRequest { return super.match(task) && task instanceof CancellableTask; } - public CancelTasksRequest reason(String reason) { + /** + * Set the reason for canceling the task. + */ + public CancelTasksRequest setReason(String reason) { this.reason = reason; return this; } - public String reason() { + /** + * The reason for canceling the task. + */ + public String getReason() { return reason; } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index b07e540d792..874f230587d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -84,21 +84,21 @@ public class TransportCancelTasksAction extends TransportTasksAction operation) { - if (request.taskId().isSet() == false) { + if (request.getTaskId().isSet() == false) { // we are only checking one task, we can optimize it - CancellableTask task = taskManager.getCancellableTask(request.taskId().getId()); + CancellableTask task = taskManager.getCancellableTask(request.getTaskId().getId()); if (task != null) { if (request.match(task)) { operation.accept(task); } else { - throw new IllegalArgumentException("task [" + request.taskId() + "] doesn't support this operation"); + throw new IllegalArgumentException("task [" + request.getTaskId() + "] doesn't support this operation"); } } else { - if (taskManager.getTask(request.taskId().getId()) != null) { + if (taskManager.getTask(request.getTaskId().getId()) != null) { // The task exists, but doesn't support cancellation - throw new IllegalArgumentException("task [" + request.taskId() + "] doesn't support cancellation"); + throw new IllegalArgumentException("task [" + request.getTaskId() + "] doesn't support cancellation"); } else { - throw new ResourceNotFoundException("task [{}] doesn't support cancellation", request.taskId()); + throw new ResourceNotFoundException("task [{}] doesn't support cancellation", request.getTaskId()); } } } else { @@ -113,14 +113,14 @@ public class TransportCancelTasksAction extends TransportTasksAction removeBanOnNodes(cancellableTask, nodes)); - Set childNodes = taskManager.cancel(cancellableTask, request.reason(), banLock::onTaskFinished); + Set childNodes = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished); if (childNodes != null) { if (childNodes.isEmpty()) { logger.trace("cancelling task {} with no children", cancellableTask.getId()); return cancellableTask.taskInfo(clusterService.localNode(), false); } else { logger.trace("cancelling task {} with children on nodes [{}]", cancellableTask.getId(), childNodes); - setBanOnNodes(request.reason(), cancellableTask, childNodes, banLock); + setBanOnNodes(request.getReason(), cancellableTask, childNodes, banLock); return cancellableTask.taskInfo(clusterService.localNode(), false); } } else { diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java index 6bf8ac3e1ef..3fe743fc36a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequest.java @@ -31,31 +31,49 @@ import java.io.IOException; public class ListTasksRequest extends BaseTasksRequest { private boolean detailed = false; + private boolean waitForCompletion = false; /** * Should the detailed task information be returned. */ - public boolean detailed() { + public boolean getDetailed() { return this.detailed; } /** * Should the detailed task information be returned. */ - public ListTasksRequest detailed(boolean detailed) { + public ListTasksRequest setDetailed(boolean detailed) { this.detailed = detailed; return this; } + /** + * Should this request wait for all found tasks to complete? + */ + public boolean getWaitForCompletion() { + return waitForCompletion; + } + + /** + * Should this request wait for all found tasks to complete? + */ + public ListTasksRequest setWaitForCompletion(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); detailed = in.readBoolean(); + waitForCompletion = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBoolean(detailed); + out.writeBoolean(waitForCompletion); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequestBuilder.java index 2b462014f43..1385781125a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksRequestBuilder.java @@ -35,7 +35,15 @@ public class ListTasksRequestBuilder extends TasksRequestBuilder { + private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100); + private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30); @Inject public TransportListTasksAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { @@ -59,7 +67,34 @@ public class TransportListTasksAction extends TransportTasksAction operation) { + if (false == request.getWaitForCompletion()) { + super.processTasks(request, operation); + return; + } + // If we should wait for completion then we have to intercept every found task and wait for it to leave the manager. + TimeValue timeout = request.getTimeout(); + if (timeout == null) { + timeout = DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT; + } + long timeoutTime = System.nanoTime() + timeout.nanos(); + super.processTasks(request, operation.andThen((Task t) -> { + while (System.nanoTime() - timeoutTime < 0) { + if (taskManager.getTask(t.getId()) == null) { + return; + } + try { + Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis()); + } catch (InterruptedException e) { + throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, t); + } + } + throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", t); + })); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java index f7da48a667b..f1045387259 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java @@ -71,7 +71,7 @@ public class BaseTasksRequest> extends * Sets the list of action masks for the actions that should be returned */ @SuppressWarnings("unchecked") - public final Request actions(String... actions) { + public final Request setActions(String... actions) { this.actions = actions; return (Request) this; } @@ -79,16 +79,16 @@ public class BaseTasksRequest> extends /** * Return the list of action masks for the actions that should be returned */ - public String[] actions() { + public String[] getActions() { return actions; } - public final String[] nodesIds() { + public final String[] getNodesIds() { return nodesIds; } @SuppressWarnings("unchecked") - public final Request nodesIds(String... nodesIds) { + public final Request setNodesIds(String... nodesIds) { this.nodesIds = nodesIds; return (Request) this; } @@ -98,12 +98,12 @@ public class BaseTasksRequest> extends * * By default tasks with any ids are returned. */ - public TaskId taskId() { + public TaskId getTaskId() { return taskId; } @SuppressWarnings("unchecked") - public final Request taskId(TaskId taskId) { + public final Request setTaskId(TaskId taskId) { this.taskId = taskId; return (Request) this; } @@ -112,29 +112,29 @@ public class BaseTasksRequest> extends /** * Returns the parent task id that tasks should be filtered by */ - public TaskId parentTaskId() { + public TaskId getParentTaskId() { return parentTaskId; } @SuppressWarnings("unchecked") - public Request parentTaskId(TaskId parentTaskId) { + public Request setParentTaskId(TaskId parentTaskId) { this.parentTaskId = parentTaskId; return (Request) this; } - public TimeValue timeout() { + public TimeValue getTimeout() { return this.timeout; } @SuppressWarnings("unchecked") - public final Request timeout(TimeValue timeout) { + public final Request setTimeout(TimeValue timeout) { this.timeout = timeout; return (Request) this; } @SuppressWarnings("unchecked") - public final Request timeout(String timeout) { + public final Request setTimeout(String timeout) { this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout"); return (Request) this; } @@ -162,11 +162,11 @@ public class BaseTasksRequest> extends } public boolean match(Task task) { - if (actions() != null && actions().length > 0 && Regex.simpleMatch(actions(), task.getAction()) == false) { + if (getActions() != null && getActions().length > 0 && Regex.simpleMatch(getActions(), task.getAction()) == false) { return false; } - if (taskId().isSet() == false) { - if(taskId().getId() != task.getId()) { + if (getTaskId().isSet() == false) { + if(getTaskId().getId() != task.getId()) { return false; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TasksRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TasksRequestBuilder.java index a7265ce9998..a510a847c62 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/TasksRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TasksRequestBuilder.java @@ -35,19 +35,19 @@ public class TasksRequestBuilder , Res @SuppressWarnings("unchecked") public final RequestBuilder setNodesIds(String... nodesIds) { - request.nodesIds(nodesIds); + request.setNodesIds(nodesIds); return (RequestBuilder) this; } @SuppressWarnings("unchecked") public final RequestBuilder setActions(String... actions) { - request.actions(actions); + request.setActions(actions); return (RequestBuilder) this; } @SuppressWarnings("unchecked") public final RequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); + request.setTimeout(timeout); return (RequestBuilder) this; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index f10b9f23327..53c0d851997 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -124,25 +124,25 @@ public abstract class TransportTasksAction< } protected String[] resolveNodes(TasksRequest request, ClusterState clusterState) { - if (request.taskId().isSet()) { - return clusterState.nodes().resolveNodesIds(request.nodesIds()); + if (request.getTaskId().isSet()) { + return clusterState.nodes().resolveNodesIds(request.getNodesIds()); } else { - return new String[]{request.taskId().getNodeId()}; + return new String[]{request.getTaskId().getNodeId()}; } } protected void processTasks(TasksRequest request, Consumer operation) { - if (request.taskId().isSet() == false) { + if (request.getTaskId().isSet() == false) { // we are only checking one task, we can optimize it - Task task = taskManager.getTask(request.taskId().getId()); + Task task = taskManager.getTask(request.getTaskId().getId()); if (task != null) { if (request.match(task)) { operation.accept((OperationTask) task); } else { - throw new ResourceNotFoundException("task [{}] doesn't support this operation", request.taskId()); + throw new ResourceNotFoundException("task [{}] doesn't support this operation", request.getTaskId()); } } else { - throw new ResourceNotFoundException("task [{}] is missing", request.taskId()); + throw new ResourceNotFoundException("task [{}] is missing", request.getTaskId()); } } else { for (Task task : taskManager.getTasks().values()) { @@ -224,8 +224,8 @@ public abstract class TransportTasksAction< } } else { TransportRequestOptions.Builder builder = TransportRequestOptions.builder(); - if (request.timeout() != null) { - builder.withTimeout(request.timeout()); + if (request.getTimeout() != null) { + builder.withTimeout(request.getTimeout()); } builder.withCompress(transportCompress()); for (int i = 0; i < nodesIds.length; i++) { diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestCancelTasksAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestCancelTasksAction.java index 99cdc16253a..658090bb6db 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestCancelTasksAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestCancelTasksAction.java @@ -52,10 +52,10 @@ public class RestCancelTasksAction extends BaseRestHandler { TaskId parentTaskId = new TaskId(request.param("parent_task_id")); CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); - cancelTasksRequest.taskId(taskId); - cancelTasksRequest.nodesIds(nodesIds); - cancelTasksRequest.actions(actions); - cancelTasksRequest.parentTaskId(parentTaskId); + cancelTasksRequest.setTaskId(taskId); + cancelTasksRequest.setNodesIds(nodesIds); + cancelTasksRequest.setActions(actions); + cancelTasksRequest.setParentTaskId(parentTaskId); client.admin().cluster().cancelTasks(cancelTasksRequest, new RestToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java index 992267fa8a5..9a9d1991298 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java @@ -50,13 +50,15 @@ public class RestListTasksAction extends BaseRestHandler { TaskId taskId = new TaskId(request.param("taskId")); String[] actions = Strings.splitStringByCommaToArray(request.param("actions")); TaskId parentTaskId = new TaskId(request.param("parent_task_id")); + boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", false); ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.taskId(taskId); - listTasksRequest.nodesIds(nodesIds); - listTasksRequest.detailed(detailed); - listTasksRequest.actions(actions); - listTasksRequest.parentTaskId(parentTaskId); + listTasksRequest.setTaskId(taskId); + listTasksRequest.setNodesIds(nodesIds); + listTasksRequest.setDetailed(detailed); + listTasksRequest.setActions(actions); + listTasksRequest.setParentTaskId(parentTaskId); + listTasksRequest.setWaitForCompletion(waitForCompletion); client.admin().cluster().listTasks(listTasksRequest, new RestToXContentListener<>(channel)); } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index 5109ab979cf..586f178d12d 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -237,8 +237,8 @@ public class CancellableTasksTests extends TaskManagerTestCase { // Cancel main task CancelTasksRequest request = new CancelTasksRequest(); - request.reason("Testing Cancellation"); - request.taskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId())); + request.setReason("Testing Cancellation"); + request.setTaskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId())); // And send the cancellation request to a random node CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request) .get(); @@ -270,7 +270,7 @@ public class CancellableTasksTests extends TaskManagerTestCase { // Make sure that tasks are no longer running ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest().taskId( + .transportListTasksAction.execute(new ListTasksRequest().setTaskId( new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()))).get(); assertEquals(0, listTasksResponse.getTasks().size()); @@ -313,7 +313,7 @@ public class CancellableTasksTests extends TaskManagerTestCase { // Make sure that tasks are running ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest().parentTaskId(new TaskId(mainNode, mainTask.getId()))).get(); + .transportListTasksAction.execute(new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId()))).get(); assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size())); // Simulate the coordinating node leaving the cluster @@ -331,8 +331,8 @@ public class CancellableTasksTests extends TaskManagerTestCase { logger.info("--> Simulate issuing cancel request on the node that is about to leave the cluster"); // Simulate issuing cancel request on the node that is about to leave the cluster CancelTasksRequest request = new CancelTasksRequest(); - request.reason("Testing Cancellation"); - request.taskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId())); + request.setReason("Testing Cancellation"); + request.setTaskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId())); // And send the cancellation request to a random node CancelTasksResponse response = testNodes[0].transportCancelTasksAction.execute(request).get(); logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster"); @@ -356,7 +356,7 @@ public class CancellableTasksTests extends TaskManagerTestCase { // Make sure that tasks are no longer running try { ListTasksResponse listTasksResponse1 = testNodes[randomIntBetween(1, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest().taskId(new TaskId(mainNode, mainTask.getId()))).get(); + .transportListTasksAction.execute(new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId()))).get(); assertEquals(0, listTasksResponse1.getTasks().size()); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index eaa3caf9084..36f4ce9b30e 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.action.admin.cluster.node.tasks; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; @@ -54,8 +56,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.hamcrest.Matchers.emptyCollectionOf; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -327,6 +331,77 @@ public class TasksIT extends ESIntegTestCase { assertEquals(0, client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()); } + public void testTasksListWaitForCompletion() throws Exception { + // Start blocking test task + ListenableActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) + .execute(); + + ListenableActionFuture waitResponseFuture; + try { + // Wait for the task to start on all nodes + assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(), + client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size())); + + // Spin up a request to wait for that task to finish + waitResponseFuture = client().admin().cluster().prepareListTasks() + .setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).execute(); + } finally { + // Unblock the request so the wait for completion request can finish + TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get(); + } + + // Now that the task is unblocked the list response will come back + ListTasksResponse waitResponse = waitResponseFuture.get(); + // If any tasks come back then they are the tasks we asked for - it'd be super weird if this wasn't true + for (TaskInfo task: waitResponse.getTasks()) { + assertEquals(task.getAction(), TestTaskPlugin.TestTaskAction.NAME + "[n]"); + } + // See the next test to cover the timeout case + + future.get(); + } + + public void testTasksListWaitForTimeout() throws Exception { + // Start blocking test task + ListenableActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) + .execute(); + try { + // Wait for the task to start on all nodes + assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(), + client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size())); + + // Spin up a request that should wait for those tasks to finish + // It will timeout because we haven't unblocked the tasks + ListTasksResponse waitResponse = client().admin().cluster().prepareListTasks() + .setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(100)) + .get(); + + assertFalse(waitResponse.getNodeFailures().isEmpty()); + for (FailedNodeException failure : waitResponse.getNodeFailures()) { + Throwable timeoutException = failure.getCause(); + // The exception sometimes comes back wrapped depending on the client + if (timeoutException.getCause() != null) { + timeoutException = timeoutException.getCause(); + } + assertThat(failure.getCause().getCause(), instanceOf(ElasticsearchTimeoutException.class)); + } + } finally { + // Now we can unblock those requests + TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get(); + } + future.get(); + } + + public void testTasksListWaitForNoTask() throws Exception { + // Spin up a request to wait for no matching tasks + ListenableActionFuture waitResponseFuture = client().admin().cluster().prepareListTasks() + .setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(10)) + .execute(); + + // It should finish quickly and without complaint + assertThat(waitResponseFuture.get().getTasks(), emptyCollectionOf(TaskInfo.class)); + } + @Override public void tearDown() throws Exception { for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 0d4372a51eb..e8dcd228e50 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -345,7 +345,10 @@ public class TestTaskPlugin extends Plugin { public static class UnblockTestTasksRequest extends BaseTasksRequest { - + @Override + public boolean match(Task task) { + return task instanceof TestTask && super.match(task); + } } public static class UnblockTestTasksResponse extends BaseTasksResponse { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index e1501f9b14c..556eee238fd 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -355,7 +355,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { int testNodeNum = randomIntBetween(0, testNodes.length - 1); TestNode testNode = testNodes[testNodeNum]; ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.actions("testAction*"); // pick all test actions + listTasksRequest.setActions("testAction*"); // pick all test actions logger.info("Listing currently running tasks using node [{}]", testNodeNum); ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); logger.info("Checking currently running tasks"); @@ -371,7 +371,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Check task counts using transport with filtering testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; listTasksRequest = new ListTasksRequest(); - listTasksRequest.actions("testAction[n]"); // only pick node actions + listTasksRequest.setActions("testAction[n]"); // only pick node actions response = testNode.transportListTasksAction.execute(listTasksRequest).get(); assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { @@ -380,7 +380,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { } // Check task counts using transport with detailed description - listTasksRequest.detailed(true); // same request only with detailed description + listTasksRequest.setDetailed(true); // same request only with detailed description response = testNode.transportListTasksAction.execute(listTasksRequest).get(); assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { @@ -389,7 +389,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { } // Make sure that the main task on coordinating node is the task that was returned to us by execute() - listTasksRequest.actions("testAction"); // only pick the main task + listTasksRequest.setActions("testAction"); // only pick the main task response = testNode.transportListTasksAction.execute(listTasksRequest).get(); assertEquals(1, response.getTasks().size()); assertEquals(mainTask.getId(), response.getTasks().get(0).getId()); @@ -417,7 +417,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Get the parent task ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.actions("testAction"); + listTasksRequest.setActions("testAction"); ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); assertEquals(1, response.getTasks().size()); String parentNode = response.getTasks().get(0).getNode().getId(); @@ -425,7 +425,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Find tasks with common parent listTasksRequest = new ListTasksRequest(); - listTasksRequest.parentTaskId(new TaskId(parentNode, parentTaskId)); + listTasksRequest.setParentTaskId(new TaskId(parentNode, parentTaskId)); response = testNode.transportListTasksAction.execute(listTasksRequest).get(); assertEquals(testNodes.length, response.getTasks().size()); for (TaskInfo task : response.getTasks()) { @@ -451,7 +451,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Get the parent task ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.actions("testAction*"); + listTasksRequest.setActions("testAction*"); ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); assertEquals(0, response.getTasks().size()); @@ -472,7 +472,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Check task counts using transport with filtering TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.actions("testAction[n]"); // only pick node actions + listTasksRequest.setActions("testAction[n]"); // only pick node actions ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { @@ -482,7 +482,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Check task counts using transport with detailed description long minimalDurationNanos = System.nanoTime() - maximumStartTimeNanos; - listTasksRequest.detailed(true); // same request only with detailed description + listTasksRequest.setDetailed(true); // same request only with detailed description response = testNode.transportListTasksAction.execute(listTasksRequest).get(); assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { @@ -518,9 +518,9 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Try to cancel main task using action name CancelTasksRequest request = new CancelTasksRequest(); - request.nodesIds(testNodes[0].discoveryNode.getId()); - request.reason("Testing Cancellation"); - request.actions(actionName); + request.setNodesIds(testNodes[0].discoveryNode.getId()); + request.setReason("Testing Cancellation"); + request.setActions(actionName); CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request) .get(); @@ -532,8 +532,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Try to cancel main task using id request = new CancelTasksRequest(); - request.reason("Testing Cancellation"); - request.taskId(new TaskId(testNodes[0].discoveryNode.getId(), task.getId())); + request.setReason("Testing Cancellation"); + request.setTaskId(new TaskId(testNodes[0].discoveryNode.getId(), task.getId())); response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request).get(); // Shouldn't match any tasks since testAction doesn't support cancellation @@ -544,7 +544,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Make sure that task is still running ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.actions(actionName); + listTasksRequest.setActions(actionName); ListTasksResponse listResponse = testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction.execute (listTasksRequest).get(); assertEquals(1, listResponse.getPerNodeTasks().size()); @@ -617,7 +617,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Run task action on node tasks that are currently running // should be successful on all nodes except one TestTasksRequest testTasksRequest = new TestTasksRequest(); - testTasksRequest.actions("testAction[n]"); // pick all test actions + testTasksRequest.setActions("testAction[n]"); // pick all test actions TestTasksResponse response = tasksActions[0].execute(testTasksRequest).get(); // Get successful responses from all nodes except one assertEquals(testNodes.length - 1, response.tasks.size()); diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml index c23e5da95a1..7f84c1aac8b 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml @@ -58,9 +58,6 @@ --- "wait_for_completion=false": - - skip: - version: "0.0.0 - " - reason: breaks other tests by leaving a running reindex behind - do: index: index: source @@ -79,6 +76,7 @@ dest: index: dest - match: {task: '/.+:\d+/'} + - set: {task: task} - is_false: updated - is_false: version_conflicts - is_false: batches @@ -87,6 +85,11 @@ - is_false: took - is_false: created + - do: + tasks.list: + wait_for_completion: true + task_id: $task + --- "Response format for version conflict": - do: diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml index 383e945bbf2..94ffa2349a9 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml @@ -37,6 +37,7 @@ wait_for_completion: false index: test - match: {task: '/.+:\d+/'} + - set: {task: task} - is_false: updated - is_false: version_conflicts - is_false: batches @@ -45,6 +46,11 @@ - is_false: took - is_false: created + - do: + tasks.list: + wait_for_completion: true + task_id: $task + --- "Response for version conflict": - do: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json index 7e8683b3475..5cdeed1b142 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json @@ -31,6 +31,10 @@ "parent_task": { "type" : "number", "description" : "Return tasks with specified parent task id. Set to -1 to return all." + }, + "wait_for_completion": { + "type": "boolean", + "description": "Wait for the matching tasks to complete (default: false)" } } }, diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 5684717342d..fbc518b136d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -19,14 +19,34 @@ package org.elasticsearch.test.rest; -import com.carrotsearch.randomizedtesting.RandomizedTest; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.lucene.util.IOUtils; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.client.RestException; +import org.elasticsearch.test.rest.client.RestResponse; import org.elasticsearch.test.rest.parser.RestTestParseException; import org.elasticsearch.test.rest.parser.RestTestSuiteParser; import org.elasticsearch.test.rest.section.DoSection; @@ -42,24 +62,11 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import com.carrotsearch.randomizedtesting.RandomizedTest; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.sort; /** * Runs the clients test suite against an elasticsearch cluster. @@ -261,7 +268,6 @@ public abstract class ESRestTestCase extends ESTestCase { @After public void wipeCluster() throws Exception { - // wipe indices Map deleteIndicesArgs = new HashMap<>(); deleteIndicesArgs.put("index", "*"); @@ -285,6 +291,30 @@ public abstract class ESRestTestCase extends ESTestCase { adminExecutionContext.callApi("snapshot.delete_repository", deleteSnapshotsArgs, Collections.emptyList(), Collections.emptyMap()); } + /** + * Logs a message if there are still running tasks. The reasoning is that any tasks still running are state the is trying to bleed into + * other tests. + */ + @After + public void logIfThereAreRunningTasks() throws InterruptedException, IOException, RestException { + RestResponse tasks = adminExecutionContext.callApi("tasks.list", emptyMap(), emptyList(), emptyMap()); + Set runningTasks = runningTasks(tasks); + // Ignore the task list API - it doens't count against us + runningTasks.remove(ListTasksAction.NAME); + runningTasks.remove(ListTasksAction.NAME + "[n]"); + if (runningTasks.isEmpty()) { + return; + } + List stillRunning = new ArrayList<>(runningTasks); + sort(stillRunning); + logger.info("There are still tasks running after this test that might break subsequent tests {}.", stillRunning); + /* + * This isn't a higher level log or outright failure because some of these tasks are run by the cluster in the background. If we + * could determine that some tasks are run by the user we'd fail the tests if those tasks were running and ignore any background + * tasks. + */ + } + @AfterClass public static void close() { if (restTestExecutionContext != null) { @@ -365,4 +395,19 @@ public abstract class ESRestTestCase extends ESTestCase { executableSection.execute(restTestExecutionContext); } } + + @SuppressWarnings("unchecked") + public Set runningTasks(RestResponse response) throws IOException { + Set runningTasks = new HashSet<>(); + Map nodes = (Map) response.evaluate("nodes"); + for (Map.Entry node : nodes.entrySet()) { + Map nodeInfo = (Map) node.getValue(); + Map nodeTasks = (Map) nodeInfo.get("tasks"); + for (Map.Entry taskAndName : nodeTasks.entrySet()) { + Map task = (Map) taskAndName.getValue(); + runningTasks.add(task.get("action").toString()); + } + } + return runningTasks; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/client/http/HttpRequestBuilder.java b/test/framework/src/main/java/org/elasticsearch/test/rest/client/http/HttpRequestBuilder.java index 6a484e9ae69..79f7502fb27 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/client/http/HttpRequestBuilder.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/client/http/HttpRequestBuilder.java @@ -114,9 +114,10 @@ public class HttpRequestBuilder { for (String pathPart : path) { try { finalPath.append('/'); - URI uri = new URI(null, null, null, -1, pathPart, null, null); + // We append "/" to the path part to handle parts that start with - or other invalid characters + URI uri = new URI(null, null, null, -1, "/" + pathPart, null, null); //manually escape any slash that each part may contain - finalPath.append(uri.getRawPath().replaceAll("/", "%2F")); + finalPath.append(uri.getRawPath().substring(1).replaceAll("/", "%2F")); } catch(URISyntaxException e) { throw new RuntimeException("unable to build uri", e); }