Teach list tasks api to wait for tasks to finish

_wait_for_completion defaults to false. If set to true then the API will
wait for all the tasks that it finds to stop running before returning. You
can use the timeout parameter to prevent it from waiting forever. If you
don't set a timeout parameter it'll default to 30 seconds.

Also adds a log message to rest tests if any tasks overrun the test. This
is just a log (instead of failing the test) because lots of tasks are run
by the cluster on its own and they shouldn't cause the test to fail. Things
like fetching disk usage from the other nodes, for example.

Switches the request to getter/setter style methods as we're going that
way in the Elasticsearch code base. Reindex is all getter/setter style.

Closes #16906
This commit is contained in:
Nik Everett 2016-03-02 11:18:44 -05:00
parent 13805c2a23
commit 6d0efae713
19 changed files with 305 additions and 99 deletions

View File

@ -53,12 +53,18 @@ public class CancelTasksRequest extends BaseTasksRequest<CancelTasksRequest> {
return super.match(task) && task instanceof CancellableTask;
}
public CancelTasksRequest reason(String reason) {
/**
* Set the reason for canceling the task.
*/
public CancelTasksRequest setReason(String reason) {
this.reason = reason;
return this;
}
public String reason() {
/**
* The reason for canceling the task.
*/
public String getReason() {
return reason;
}
}

View File

@ -84,21 +84,21 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
}
protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask> operation) {
if (request.taskId().isSet() == false) {
if (request.getTaskId().isSet() == false) {
// we are only checking one task, we can optimize it
CancellableTask task = taskManager.getCancellableTask(request.taskId().getId());
CancellableTask task = taskManager.getCancellableTask(request.getTaskId().getId());
if (task != null) {
if (request.match(task)) {
operation.accept(task);
} else {
throw new IllegalArgumentException("task [" + request.taskId() + "] doesn't support this operation");
throw new IllegalArgumentException("task [" + request.getTaskId() + "] doesn't support this operation");
}
} else {
if (taskManager.getTask(request.taskId().getId()) != null) {
if (taskManager.getTask(request.getTaskId().getId()) != null) {
// The task exists, but doesn't support cancellation
throw new IllegalArgumentException("task [" + request.taskId() + "] doesn't support cancellation");
throw new IllegalArgumentException("task [" + request.getTaskId() + "] doesn't support cancellation");
} else {
throw new ResourceNotFoundException("task [{}] doesn't support cancellation", request.taskId());
throw new ResourceNotFoundException("task [{}] doesn't support cancellation", request.getTaskId());
}
}
} else {
@ -113,14 +113,14 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
@Override
protected synchronized TaskInfo taskOperation(CancelTasksRequest request, CancellableTask cancellableTask) {
final BanLock banLock = new BanLock(nodes -> removeBanOnNodes(cancellableTask, nodes));
Set<String> childNodes = taskManager.cancel(cancellableTask, request.reason(), banLock::onTaskFinished);
Set<String> childNodes = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
if (childNodes != null) {
if (childNodes.isEmpty()) {
logger.trace("cancelling task {} with no children", cancellableTask.getId());
return cancellableTask.taskInfo(clusterService.localNode(), false);
} else {
logger.trace("cancelling task {} with children on nodes [{}]", cancellableTask.getId(), childNodes);
setBanOnNodes(request.reason(), cancellableTask, childNodes, banLock);
setBanOnNodes(request.getReason(), cancellableTask, childNodes, banLock);
return cancellableTask.taskInfo(clusterService.localNode(), false);
}
} else {

View File

@ -31,31 +31,49 @@ import java.io.IOException;
public class ListTasksRequest extends BaseTasksRequest<ListTasksRequest> {
private boolean detailed = false;
private boolean waitForCompletion = false;
/**
* Should the detailed task information be returned.
*/
public boolean detailed() {
public boolean getDetailed() {
return this.detailed;
}
/**
* Should the detailed task information be returned.
*/
public ListTasksRequest detailed(boolean detailed) {
public ListTasksRequest setDetailed(boolean detailed) {
this.detailed = detailed;
return this;
}
/**
* Should this request wait for all found tasks to complete?
*/
public boolean getWaitForCompletion() {
return waitForCompletion;
}
/**
* Should this request wait for all found tasks to complete?
*/
public ListTasksRequest setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
detailed = in.readBoolean();
waitForCompletion = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(detailed);
out.writeBoolean(waitForCompletion);
}
}

View File

@ -35,7 +35,15 @@ public class ListTasksRequestBuilder extends TasksRequestBuilder<ListTasksReques
* Should detailed task information be returned.
*/
public ListTasksRequestBuilder setDetailed(boolean detailed) {
request.detailed(detailed);
request.setDetailed(detailed);
return this;
}
/**
* Should this request wait for all found tasks to complete?
*/
public final ListTasksRequestBuilder setWaitForCompletion(boolean waitForCompletion) {
request.setWaitForCompletion(waitForCompletion);
return this;
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
@ -29,18 +31,24 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
*
*/
public class TransportListTasksAction extends TransportTasksAction<Task, ListTasksRequest, ListTasksResponse, TaskInfo> {
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);
@Inject
public TransportListTasksAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
@ -59,7 +67,34 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
@Override
protected TaskInfo taskOperation(ListTasksRequest request, Task task) {
return task.taskInfo(clusterService.localNode(), request.detailed());
return task.taskInfo(clusterService.localNode(), request.getDetailed());
}
@Override
protected void processTasks(ListTasksRequest request, Consumer<Task> operation) {
if (false == request.getWaitForCompletion()) {
super.processTasks(request, operation);
return;
}
// If we should wait for completion then we have to intercept every found task and wait for it to leave the manager.
TimeValue timeout = request.getTimeout();
if (timeout == null) {
timeout = DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT;
}
long timeoutTime = System.nanoTime() + timeout.nanos();
super.processTasks(request, operation.andThen((Task t) -> {
while (System.nanoTime() - timeoutTime < 0) {
if (taskManager.getTask(t.getId()) == null) {
return;
}
try {
Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
} catch (InterruptedException e) {
throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, t);
}
}
throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", t);
}));
}
@Override

View File

@ -71,7 +71,7 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
* Sets the list of action masks for the actions that should be returned
*/
@SuppressWarnings("unchecked")
public final Request actions(String... actions) {
public final Request setActions(String... actions) {
this.actions = actions;
return (Request) this;
}
@ -79,16 +79,16 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
/**
* Return the list of action masks for the actions that should be returned
*/
public String[] actions() {
public String[] getActions() {
return actions;
}
public final String[] nodesIds() {
public final String[] getNodesIds() {
return nodesIds;
}
@SuppressWarnings("unchecked")
public final Request nodesIds(String... nodesIds) {
public final Request setNodesIds(String... nodesIds) {
this.nodesIds = nodesIds;
return (Request) this;
}
@ -98,12 +98,12 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
*
* By default tasks with any ids are returned.
*/
public TaskId taskId() {
public TaskId getTaskId() {
return taskId;
}
@SuppressWarnings("unchecked")
public final Request taskId(TaskId taskId) {
public final Request setTaskId(TaskId taskId) {
this.taskId = taskId;
return (Request) this;
}
@ -112,29 +112,29 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
/**
* Returns the parent task id that tasks should be filtered by
*/
public TaskId parentTaskId() {
public TaskId getParentTaskId() {
return parentTaskId;
}
@SuppressWarnings("unchecked")
public Request parentTaskId(TaskId parentTaskId) {
public Request setParentTaskId(TaskId parentTaskId) {
this.parentTaskId = parentTaskId;
return (Request) this;
}
public TimeValue timeout() {
public TimeValue getTimeout() {
return this.timeout;
}
@SuppressWarnings("unchecked")
public final Request timeout(TimeValue timeout) {
public final Request setTimeout(TimeValue timeout) {
this.timeout = timeout;
return (Request) this;
}
@SuppressWarnings("unchecked")
public final Request timeout(String timeout) {
public final Request setTimeout(String timeout) {
this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout");
return (Request) this;
}
@ -162,11 +162,11 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
}
public boolean match(Task task) {
if (actions() != null && actions().length > 0 && Regex.simpleMatch(actions(), task.getAction()) == false) {
if (getActions() != null && getActions().length > 0 && Regex.simpleMatch(getActions(), task.getAction()) == false) {
return false;
}
if (taskId().isSet() == false) {
if(taskId().getId() != task.getId()) {
if (getTaskId().isSet() == false) {
if(getTaskId().getId() != task.getId()) {
return false;
}
}

View File

@ -35,19 +35,19 @@ public class TasksRequestBuilder <Request extends BaseTasksRequest<Request>, Res
@SuppressWarnings("unchecked")
public final RequestBuilder setNodesIds(String... nodesIds) {
request.nodesIds(nodesIds);
request.setNodesIds(nodesIds);
return (RequestBuilder) this;
}
@SuppressWarnings("unchecked")
public final RequestBuilder setActions(String... actions) {
request.actions(actions);
request.setActions(actions);
return (RequestBuilder) this;
}
@SuppressWarnings("unchecked")
public final RequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
request.setTimeout(timeout);
return (RequestBuilder) this;
}
}

View File

@ -124,25 +124,25 @@ public abstract class TransportTasksAction<
}
protected String[] resolveNodes(TasksRequest request, ClusterState clusterState) {
if (request.taskId().isSet()) {
return clusterState.nodes().resolveNodesIds(request.nodesIds());
if (request.getTaskId().isSet()) {
return clusterState.nodes().resolveNodesIds(request.getNodesIds());
} else {
return new String[]{request.taskId().getNodeId()};
return new String[]{request.getTaskId().getNodeId()};
}
}
protected void processTasks(TasksRequest request, Consumer<OperationTask> operation) {
if (request.taskId().isSet() == false) {
if (request.getTaskId().isSet() == false) {
// we are only checking one task, we can optimize it
Task task = taskManager.getTask(request.taskId().getId());
Task task = taskManager.getTask(request.getTaskId().getId());
if (task != null) {
if (request.match(task)) {
operation.accept((OperationTask) task);
} else {
throw new ResourceNotFoundException("task [{}] doesn't support this operation", request.taskId());
throw new ResourceNotFoundException("task [{}] doesn't support this operation", request.getTaskId());
}
} else {
throw new ResourceNotFoundException("task [{}] is missing", request.taskId());
throw new ResourceNotFoundException("task [{}] is missing", request.getTaskId());
}
} else {
for (Task task : taskManager.getTasks().values()) {
@ -224,8 +224,8 @@ public abstract class TransportTasksAction<
}
} else {
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
if (request.timeout() != null) {
builder.withTimeout(request.timeout());
if (request.getTimeout() != null) {
builder.withTimeout(request.getTimeout());
}
builder.withCompress(transportCompress());
for (int i = 0; i < nodesIds.length; i++) {

View File

@ -52,10 +52,10 @@ public class RestCancelTasksAction extends BaseRestHandler {
TaskId parentTaskId = new TaskId(request.param("parent_task_id"));
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.taskId(taskId);
cancelTasksRequest.nodesIds(nodesIds);
cancelTasksRequest.actions(actions);
cancelTasksRequest.parentTaskId(parentTaskId);
cancelTasksRequest.setTaskId(taskId);
cancelTasksRequest.setNodesIds(nodesIds);
cancelTasksRequest.setActions(actions);
cancelTasksRequest.setParentTaskId(parentTaskId);
client.admin().cluster().cancelTasks(cancelTasksRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -50,13 +50,15 @@ public class RestListTasksAction extends BaseRestHandler {
TaskId taskId = new TaskId(request.param("taskId"));
String[] actions = Strings.splitStringByCommaToArray(request.param("actions"));
TaskId parentTaskId = new TaskId(request.param("parent_task_id"));
boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", false);
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.taskId(taskId);
listTasksRequest.nodesIds(nodesIds);
listTasksRequest.detailed(detailed);
listTasksRequest.actions(actions);
listTasksRequest.parentTaskId(parentTaskId);
listTasksRequest.setTaskId(taskId);
listTasksRequest.setNodesIds(nodesIds);
listTasksRequest.setDetailed(detailed);
listTasksRequest.setActions(actions);
listTasksRequest.setParentTaskId(parentTaskId);
listTasksRequest.setWaitForCompletion(waitForCompletion);
client.admin().cluster().listTasks(listTasksRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -237,8 +237,8 @@ public class CancellableTasksTests extends TaskManagerTestCase {
// Cancel main task
CancelTasksRequest request = new CancelTasksRequest();
request.reason("Testing Cancellation");
request.taskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()));
request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()));
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
@ -270,7 +270,7 @@ public class CancellableTasksTests extends TaskManagerTestCase {
// Make sure that tasks are no longer running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().taskId(
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(
new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()))).get();
assertEquals(0, listTasksResponse.getTasks().size());
@ -313,7 +313,7 @@ public class CancellableTasksTests extends TaskManagerTestCase {
// Make sure that tasks are running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().parentTaskId(new TaskId(mainNode, mainTask.getId()))).get();
.transportListTasksAction.execute(new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId()))).get();
assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size()));
// Simulate the coordinating node leaving the cluster
@ -331,8 +331,8 @@ public class CancellableTasksTests extends TaskManagerTestCase {
logger.info("--> Simulate issuing cancel request on the node that is about to leave the cluster");
// Simulate issuing cancel request on the node that is about to leave the cluster
CancelTasksRequest request = new CancelTasksRequest();
request.reason("Testing Cancellation");
request.taskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()));
request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()));
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[0].transportCancelTasksAction.execute(request).get();
logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster");
@ -356,7 +356,7 @@ public class CancellableTasksTests extends TaskManagerTestCase {
// Make sure that tasks are no longer running
try {
ListTasksResponse listTasksResponse1 = testNodes[randomIntBetween(1, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().taskId(new TaskId(mainNode, mainTask.getId()))).get();
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId()))).get();
assertEquals(0, listTasksResponse1.getTasks().size());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
@ -54,8 +56,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -327,6 +331,77 @@ public class TasksIT extends ESIntegTestCase {
assertEquals(0, client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size());
}
public void testTasksListWaitForCompletion() throws Exception {
// Start blocking test task
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.execute();
ListenableActionFuture<ListTasksResponse> waitResponseFuture;
try {
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
// Spin up a request to wait for that task to finish
waitResponseFuture = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).execute();
} finally {
// Unblock the request so the wait for completion request can finish
TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get();
}
// Now that the task is unblocked the list response will come back
ListTasksResponse waitResponse = waitResponseFuture.get();
// If any tasks come back then they are the tasks we asked for - it'd be super weird if this wasn't true
for (TaskInfo task: waitResponse.getTasks()) {
assertEquals(task.getAction(), TestTaskPlugin.TestTaskAction.NAME + "[n]");
}
// See the next test to cover the timeout case
future.get();
}
public void testTasksListWaitForTimeout() throws Exception {
// Start blocking test task
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.execute();
try {
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().numDataAndMasterNodes(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
// Spin up a request that should wait for those tasks to finish
// It will timeout because we haven't unblocked the tasks
ListTasksResponse waitResponse = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(100))
.get();
assertFalse(waitResponse.getNodeFailures().isEmpty());
for (FailedNodeException failure : waitResponse.getNodeFailures()) {
Throwable timeoutException = failure.getCause();
// The exception sometimes comes back wrapped depending on the client
if (timeoutException.getCause() != null) {
timeoutException = timeoutException.getCause();
}
assertThat(failure.getCause().getCause(), instanceOf(ElasticsearchTimeoutException.class));
}
} finally {
// Now we can unblock those requests
TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get();
}
future.get();
}
public void testTasksListWaitForNoTask() throws Exception {
// Spin up a request to wait for no matching tasks
ListenableActionFuture<ListTasksResponse> waitResponseFuture = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(10))
.execute();
// It should finish quickly and without complaint
assertThat(waitResponseFuture.get().getTasks(), emptyCollectionOf(TaskInfo.class));
}
@Override
public void tearDown() throws Exception {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {

View File

@ -345,7 +345,10 @@ public class TestTaskPlugin extends Plugin {
public static class UnblockTestTasksRequest extends BaseTasksRequest<UnblockTestTasksRequest> {
@Override
public boolean match(Task task) {
return task instanceof TestTask && super.match(task);
}
}
public static class UnblockTestTasksResponse extends BaseTasksResponse {

View File

@ -355,7 +355,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
int testNodeNum = randomIntBetween(0, testNodes.length - 1);
TestNode testNode = testNodes[testNodeNum];
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction*"); // pick all test actions
listTasksRequest.setActions("testAction*"); // pick all test actions
logger.info("Listing currently running tasks using node [{}]", testNodeNum);
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
logger.info("Checking currently running tasks");
@ -371,7 +371,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Check task counts using transport with filtering
testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction[n]"); // only pick node actions
listTasksRequest.setActions("testAction[n]"); // only pick node actions
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
@ -380,7 +380,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
}
// Check task counts using transport with detailed description
listTasksRequest.detailed(true); // same request only with detailed description
listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
@ -389,7 +389,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
}
// Make sure that the main task on coordinating node is the task that was returned to us by execute()
listTasksRequest.actions("testAction"); // only pick the main task
listTasksRequest.setActions("testAction"); // only pick the main task
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(1, response.getTasks().size());
assertEquals(mainTask.getId(), response.getTasks().get(0).getId());
@ -417,7 +417,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction");
listTasksRequest.setActions("testAction");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(1, response.getTasks().size());
String parentNode = response.getTasks().get(0).getNode().getId();
@ -425,7 +425,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Find tasks with common parent
listTasksRequest = new ListTasksRequest();
listTasksRequest.parentTaskId(new TaskId(parentNode, parentTaskId));
listTasksRequest.setParentTaskId(new TaskId(parentNode, parentTaskId));
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getTasks().size());
for (TaskInfo task : response.getTasks()) {
@ -451,7 +451,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction*");
listTasksRequest.setActions("testAction*");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(0, response.getTasks().size());
@ -472,7 +472,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Check task counts using transport with filtering
TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions("testAction[n]"); // only pick node actions
listTasksRequest.setActions("testAction[n]"); // only pick node actions
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
@ -482,7 +482,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Check task counts using transport with detailed description
long minimalDurationNanos = System.nanoTime() - maximumStartTimeNanos;
listTasksRequest.detailed(true); // same request only with detailed description
listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<DiscoveryNode, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
@ -518,9 +518,9 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Try to cancel main task using action name
CancelTasksRequest request = new CancelTasksRequest();
request.nodesIds(testNodes[0].discoveryNode.getId());
request.reason("Testing Cancellation");
request.actions(actionName);
request.setNodesIds(testNodes[0].discoveryNode.getId());
request.setReason("Testing Cancellation");
request.setActions(actionName);
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
@ -532,8 +532,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Try to cancel main task using id
request = new CancelTasksRequest();
request.reason("Testing Cancellation");
request.taskId(new TaskId(testNodes[0].discoveryNode.getId(), task.getId()));
request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].discoveryNode.getId(), task.getId()));
response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request).get();
// Shouldn't match any tasks since testAction doesn't support cancellation
@ -544,7 +544,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Make sure that task is still running
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.actions(actionName);
listTasksRequest.setActions(actionName);
ListTasksResponse listResponse = testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction.execute
(listTasksRequest).get();
assertEquals(1, listResponse.getPerNodeTasks().size());
@ -617,7 +617,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Run task action on node tasks that are currently running
// should be successful on all nodes except one
TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.actions("testAction[n]"); // pick all test actions
testTasksRequest.setActions("testAction[n]"); // pick all test actions
TestTasksResponse response = tasksActions[0].execute(testTasksRequest).get();
// Get successful responses from all nodes except one
assertEquals(testNodes.length - 1, response.tasks.size());

View File

@ -58,9 +58,6 @@
---
"wait_for_completion=false":
- skip:
version: "0.0.0 - "
reason: breaks other tests by leaving a running reindex behind
- do:
index:
index: source
@ -79,6 +76,7 @@
dest:
index: dest
- match: {task: '/.+:\d+/'}
- set: {task: task}
- is_false: updated
- is_false: version_conflicts
- is_false: batches
@ -87,6 +85,11 @@
- is_false: took
- is_false: created
- do:
tasks.list:
wait_for_completion: true
task_id: $task
---
"Response format for version conflict":
- do:

View File

@ -37,6 +37,7 @@
wait_for_completion: false
index: test
- match: {task: '/.+:\d+/'}
- set: {task: task}
- is_false: updated
- is_false: version_conflicts
- is_false: batches
@ -45,6 +46,11 @@
- is_false: took
- is_false: created
- do:
tasks.list:
wait_for_completion: true
task_id: $task
---
"Response for version conflict":
- do:

View File

@ -31,6 +31,10 @@
"parent_task": {
"type" : "number",
"description" : "Return tasks with specified parent task id. Set to -1 to return all."
},
"wait_for_completion": {
"type": "boolean",
"description": "Wait for the matching tasks to complete (default: false)"
}
}
},

View File

@ -19,14 +19,34 @@
package org.elasticsearch.test.rest;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.client.RestException;
import org.elasticsearch.test.rest.client.RestResponse;
import org.elasticsearch.test.rest.parser.RestTestParseException;
import org.elasticsearch.test.rest.parser.RestTestSuiteParser;
import org.elasticsearch.test.rest.section.DoSection;
@ -42,24 +62,11 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.sort;
/**
* Runs the clients test suite against an elasticsearch cluster.
@ -261,7 +268,6 @@ public abstract class ESRestTestCase extends ESTestCase {
@After
public void wipeCluster() throws Exception {
// wipe indices
Map<String, String> deleteIndicesArgs = new HashMap<>();
deleteIndicesArgs.put("index", "*");
@ -285,6 +291,30 @@ public abstract class ESRestTestCase extends ESTestCase {
adminExecutionContext.callApi("snapshot.delete_repository", deleteSnapshotsArgs, Collections.emptyList(), Collections.emptyMap());
}
/**
* Logs a message if there are still running tasks. The reasoning is that any tasks still running are state the is trying to bleed into
* other tests.
*/
@After
public void logIfThereAreRunningTasks() throws InterruptedException, IOException, RestException {
RestResponse tasks = adminExecutionContext.callApi("tasks.list", emptyMap(), emptyList(), emptyMap());
Set<String> runningTasks = runningTasks(tasks);
// Ignore the task list API - it doens't count against us
runningTasks.remove(ListTasksAction.NAME);
runningTasks.remove(ListTasksAction.NAME + "[n]");
if (runningTasks.isEmpty()) {
return;
}
List<String> stillRunning = new ArrayList<>(runningTasks);
sort(stillRunning);
logger.info("There are still tasks running after this test that might break subsequent tests {}.", stillRunning);
/*
* This isn't a higher level log or outright failure because some of these tasks are run by the cluster in the background. If we
* could determine that some tasks are run by the user we'd fail the tests if those tasks were running and ignore any background
* tasks.
*/
}
@AfterClass
public static void close() {
if (restTestExecutionContext != null) {
@ -365,4 +395,19 @@ public abstract class ESRestTestCase extends ESTestCase {
executableSection.execute(restTestExecutionContext);
}
}
@SuppressWarnings("unchecked")
public Set<String> runningTasks(RestResponse response) throws IOException {
Set<String> runningTasks = new HashSet<>();
Map<String, Object> nodes = (Map<String, Object>) response.evaluate("nodes");
for (Map.Entry<String, Object> node : nodes.entrySet()) {
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
Map<String, Object> nodeTasks = (Map<String, Object>) nodeInfo.get("tasks");
for (Map.Entry<String, Object> taskAndName : nodeTasks.entrySet()) {
Map<String, Object> task = (Map<String, Object>) taskAndName.getValue();
runningTasks.add(task.get("action").toString());
}
}
return runningTasks;
}
}

View File

@ -114,9 +114,10 @@ public class HttpRequestBuilder {
for (String pathPart : path) {
try {
finalPath.append('/');
URI uri = new URI(null, null, null, -1, pathPart, null, null);
// We append "/" to the path part to handle parts that start with - or other invalid characters
URI uri = new URI(null, null, null, -1, "/" + pathPart, null, null);
//manually escape any slash that each part may contain
finalPath.append(uri.getRawPath().replaceAll("/", "%2F"));
finalPath.append(uri.getRawPath().substring(1).replaceAll("/", "%2F"));
} catch(URISyntaxException e) {
throw new RuntimeException("unable to build uri", e);
}