Core: Remove plain execute method on TransportAction (#30998)

TransportAction has many variants of execute. One of those variants
executes by returning a future, which is then often blocked on by
calling get(). This commit removes this variant of execute, instead
using a helper method for tests that want to block, or having tests
pass in a PlainActionFuture directly as a listener.

Co-authored-by: Simon Willnauer <simonw@apache.org>
This commit is contained in:
Ryan Ernst 2018-06-13 09:58:13 +02:00 committed by Simon Willnauer
parent 1f6e874002
commit a65b18f19d
9 changed files with 110 additions and 70 deletions

View File

@ -55,12 +55,6 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
this.taskManager = taskManager; this.taskManager = taskManager;
} }
public final ActionFuture<Response> execute(Request request) {
PlainActionFuture<Response> future = newFuture();
execute(request, future);
return future;
}
/** /**
* Use this method when the transport action call should result in creation of a new task associated with the call. * Use this method when the transport action call should result in creation of a new task associated with the call.
* *

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeRequest; import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesRequest;
@ -65,7 +66,9 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
} }
public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) { public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(nodesIds).timeout(timeout)); PlainActionFuture<NodesGatewayMetaState> future = PlainActionFuture.newFuture();
execute(new Request(nodesIds).timeout(timeout), future);
return future;
} }
@Override @Override

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksReque
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.nodes.BaseNodeRequest; import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
@ -254,8 +255,8 @@ public class CancellableTasksTests extends TaskManagerTestCase {
request.setReason("Testing Cancellation"); request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())); request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
// And send the cancellation request to a random node // And send the cancellation request to a random node
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request) CancelTasksResponse response = ActionTestUtils.executeBlocking(
.get(); testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request);
// Awaiting for the main task to finish // Awaiting for the main task to finish
responseLatch.await(); responseLatch.await();
@ -287,9 +288,9 @@ public class CancellableTasksTests extends TaskManagerTestCase {
} }
// Make sure that tasks are no longer running // Make sure that tasks are no longer running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking(
.transportListTasksAction.execute(new ListTasksRequest().setTaskId( testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction,
new TaskId(testNodes[0].getNodeId(), mainTask.getId()))).get(); new ListTasksRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())));
assertEquals(0, listTasksResponse.getTasks().size()); assertEquals(0, listTasksResponse.getTasks().size());
// Make sure that there are no leftover bans, the ban removal is async, so we might return from the cancellation // Make sure that there are no leftover bans, the ban removal is async, so we might return from the cancellation
@ -326,8 +327,8 @@ public class CancellableTasksTests extends TaskManagerTestCase {
request.setReason("Testing Cancellation"); request.setReason("Testing Cancellation");
request.setParentTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())); request.setParentTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
// And send the cancellation request to a random node // And send the cancellation request to a random node
CancelTasksResponse response = testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction.execute(request) CancelTasksResponse response = ActionTestUtils.executeBlocking(
.get(); testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction, request);
// Awaiting for the main task to finish // Awaiting for the main task to finish
responseLatch.await(); responseLatch.await();
@ -336,16 +337,11 @@ public class CancellableTasksTests extends TaskManagerTestCase {
assertThat(response.getTasks().size(), equalTo(testNodes.length)); assertThat(response.getTasks().size(), equalTo(testNodes.length));
assertBusy(() -> { assertBusy(() -> {
try {
// Make sure that main task is no longer running // Make sure that main task is no longer running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking(
.transportListTasksAction.execute(new ListTasksRequest().setTaskId( testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction,
new TaskId(testNodes[0].getNodeId(), mainTask.getId()))).get(); new ListTasksRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())));
assertEquals(0, listTasksResponse.getTasks().size()); assertEquals(0, listTasksResponse.getTasks().size());
} catch (ExecutionException | InterruptedException ex) {
throw new RuntimeException(ex);
}
}); });
} }
@ -378,8 +374,9 @@ public class CancellableTasksTests extends TaskManagerTestCase {
String mainNode = testNodes[0].getNodeId(); String mainNode = testNodes[0].getNodeId();
// Make sure that tasks are running // Make sure that tasks are running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking(
.transportListTasksAction.execute(new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId()))).get(); testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction,
new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId())));
assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size())); assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size()));
// Simulate the coordinating node leaving the cluster // Simulate the coordinating node leaving the cluster
@ -400,7 +397,7 @@ public class CancellableTasksTests extends TaskManagerTestCase {
request.setReason("Testing Cancellation"); request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())); request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
// And send the cancellation request to a random node // And send the cancellation request to a random node
CancelTasksResponse response = testNodes[0].transportCancelTasksAction.execute(request).get(); CancelTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportCancelTasksAction, request);
logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster"); logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster");
// This node still thinks that's part of the cluster, so cancelling should look successful // This node still thinks that's part of the cluster, so cancelling should look successful
if (response.getTasks().size() == 0) { if (response.getTasks().size() == 0) {
@ -420,15 +417,10 @@ public class CancellableTasksTests extends TaskManagerTestCase {
assertBusy(() -> { assertBusy(() -> {
// Make sure that tasks are no longer running // Make sure that tasks are no longer running
try { ListTasksResponse listTasksResponse1 = ActionTestUtils.executeBlocking(
ListTasksResponse listTasksResponse1 = testNodes[randomIntBetween(1, testNodes.length - 1)] testNodes[randomIntBetween(1, testNodes.length - 1)].transportListTasksAction,
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId()))).get(); new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId())));
assertEquals(0, listTasksResponse1.getTasks().size()); assertEquals(0, listTasksResponse1.getTasks().size());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException ex2) {
fail("shouldn't be here");
}
}); });
// Wait for clean up // Wait for clean up

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.action.admin.cluster.node.tasks; package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
@ -29,6 +30,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeRequest; import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesRequest;
@ -363,7 +365,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
ListTasksRequest listTasksRequest = new ListTasksRequest(); ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction*"); // pick all test actions listTasksRequest.setActions("testAction*"); // pick all test actions
logger.info("Listing currently running tasks using node [{}]", testNodeNum); logger.info("Listing currently running tasks using node [{}]", testNodeNum);
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
logger.info("Checking currently running tasks"); logger.info("Checking currently running tasks");
assertEquals(testNodes.length, response.getPerNodeTasks().size()); assertEquals(testNodes.length, response.getPerNodeTasks().size());
@ -382,7 +384,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
listTasksRequest = new ListTasksRequest(); listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction[n]"); // only pick node actions listTasksRequest.setActions("testAction[n]"); // only pick node actions
response = testNode.transportListTasksAction.execute(listTasksRequest).get(); response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size()); assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) { for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size()); assertEquals(1, entry.getValue().size());
@ -396,7 +398,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Check task counts using transport with detailed description // Check task counts using transport with detailed description
listTasksRequest.setDetailed(true); // same request only with detailed description listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get(); response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size()); assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) { for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size()); assertEquals(1, entry.getValue().size());
@ -405,7 +407,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Make sure that the main task on coordinating node is the task that was returned to us by execute() // Make sure that the main task on coordinating node is the task that was returned to us by execute()
listTasksRequest.setActions("testAction"); // only pick the main task listTasksRequest.setActions("testAction"); // only pick the main task
response = testNode.transportListTasksAction.execute(listTasksRequest).get(); response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(1, response.getTasks().size()); assertEquals(1, response.getTasks().size());
assertEquals(mainTask.getId(), response.getTasks().get(0).getId()); assertEquals(mainTask.getId(), response.getTasks().get(0).getId());
@ -433,7 +435,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Get the parent task // Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest(); ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction"); listTasksRequest.setActions("testAction");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(1, response.getTasks().size()); assertEquals(1, response.getTasks().size());
String parentNode = response.getTasks().get(0).getTaskId().getNodeId(); String parentNode = response.getTasks().get(0).getTaskId().getNodeId();
long parentTaskId = response.getTasks().get(0).getId(); long parentTaskId = response.getTasks().get(0).getId();
@ -441,7 +443,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Find tasks with common parent // Find tasks with common parent
listTasksRequest = new ListTasksRequest(); listTasksRequest = new ListTasksRequest();
listTasksRequest.setParentTaskId(new TaskId(parentNode, parentTaskId)); listTasksRequest.setParentTaskId(new TaskId(parentNode, parentTaskId));
response = testNode.transportListTasksAction.execute(listTasksRequest).get(); response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getTasks().size()); assertEquals(testNodes.length, response.getTasks().size());
for (TaskInfo task : response.getTasks()) { for (TaskInfo task : response.getTasks()) {
assertEquals("testAction[n]", task.getAction()); assertEquals("testAction[n]", task.getAction());
@ -467,7 +469,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Get the parent task // Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest(); ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction*"); listTasksRequest.setActions("testAction*");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(0, response.getTasks().size()); assertEquals(0, response.getTasks().size());
// Release all tasks and wait for response // Release all tasks and wait for response
@ -488,7 +490,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
ListTasksRequest listTasksRequest = new ListTasksRequest(); ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction[n]"); // only pick node actions listTasksRequest.setActions("testAction[n]"); // only pick node actions
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size()); assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) { for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size()); assertEquals(1, entry.getValue().size());
@ -498,7 +500,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Check task counts using transport with detailed description // Check task counts using transport with detailed description
long minimalDurationNanos = System.nanoTime() - maximumStartTimeNanos; long minimalDurationNanos = System.nanoTime() - maximumStartTimeNanos;
listTasksRequest.setDetailed(true); // same request only with detailed description listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get(); response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size()); assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) { for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size()); assertEquals(1, entry.getValue().size());
@ -536,8 +538,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
request.setNodes(testNodes[0].getNodeId()); request.setNodes(testNodes[0].getNodeId());
request.setReason("Testing Cancellation"); request.setReason("Testing Cancellation");
request.setActions(actionName); request.setActions(actionName);
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request) CancelTasksResponse response = ActionTestUtils.executeBlocking(
.get(); testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request);
// Shouldn't match any tasks since testAction doesn't support cancellation // Shouldn't match any tasks since testAction doesn't support cancellation
assertEquals(0, response.getTasks().size()); assertEquals(0, response.getTasks().size());
@ -549,7 +551,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
request = new CancelTasksRequest(); request = new CancelTasksRequest();
request.setReason("Testing Cancellation"); request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].getNodeId(), task.getId())); request.setTaskId(new TaskId(testNodes[0].getNodeId(), task.getId()));
response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request).get(); response = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request);
// Shouldn't match any tasks since testAction doesn't support cancellation // Shouldn't match any tasks since testAction doesn't support cancellation
assertEquals(0, response.getTasks().size()); assertEquals(0, response.getTasks().size());
@ -560,8 +563,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Make sure that task is still running // Make sure that task is still running
ListTasksRequest listTasksRequest = new ListTasksRequest(); ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(actionName); listTasksRequest.setActions(actionName);
ListTasksResponse listResponse = testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction.execute ListTasksResponse listResponse = ActionTestUtils.executeBlocking(
(listTasksRequest).get(); testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction, listTasksRequest);
assertEquals(1, listResponse.getPerNodeTasks().size()); assertEquals(1, listResponse.getPerNodeTasks().size());
// Verify that tasks are marked as non-cancellable // Verify that tasks are marked as non-cancellable
for (TaskInfo taskInfo : listResponse.getTasks()) { for (TaskInfo taskInfo : listResponse.getTasks()) {
@ -595,7 +598,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
assertEquals(0, testNode.transportService.getTaskManager().getTasks().size()); assertEquals(0, testNode.transportService.getTaskManager().getTasks().size());
} }
NodesRequest request = new NodesRequest("Test Request"); NodesRequest request = new NodesRequest("Test Request");
NodesResponse responses = actions[0].execute(request).get(); NodesResponse responses = ActionTestUtils.executeBlocking(actions[0], request);
assertEquals(nodesCount, responses.failureCount()); assertEquals(nodesCount, responses.failureCount());
// Make sure that actions are still registered in the task manager on all nodes // Make sure that actions are still registered in the task manager on all nodes
@ -660,7 +663,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// should be successful on all nodes except one // should be successful on all nodes except one
TestTasksRequest testTasksRequest = new TestTasksRequest(); TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setActions("testAction[n]"); // pick all test actions testTasksRequest.setActions("testAction[n]"); // pick all test actions
TestTasksResponse response = tasksActions[0].execute(testTasksRequest).get(); TestTasksResponse response = ActionTestUtils.executeBlocking(tasksActions[0], testTasksRequest);
assertThat(response.getTaskFailures(), hasSize(1)); // one task failed assertThat(response.getTaskFailures(), hasSize(1)); // one task failed
assertThat(response.getTaskFailures().get(0).getReason(), containsString("Task level failure")); assertThat(response.getTaskFailures().get(0).getReason(), containsString("Task level failure"));
// Get successful responses from all nodes except one // Get successful responses from all nodes except one
@ -730,7 +733,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// should be successful on all nodes except nodes that we filtered out // should be successful on all nodes except nodes that we filtered out
TestTasksRequest testTasksRequest = new TestTasksRequest(); TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setActions("testAction[n]"); // pick all test actions testTasksRequest.setActions("testAction[n]"); // pick all test actions
TestTasksResponse response = tasksActions[randomIntBetween(0, nodesCount - 1)].execute(testTasksRequest).get(); TestTasksResponse response = ActionTestUtils.executeBlocking(tasksActions[randomIntBetween(0, nodesCount - 1)], testTasksRequest);
// Get successful responses from all nodes except nodes that we filtered out // Get successful responses from all nodes except nodes that we filtered out
assertEquals(testNodes.length - filterNodes.size(), response.tasks.size()); assertEquals(testNodes.length - filterNodes.size(), response.tasks.size());
@ -757,7 +760,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
// Get the parent task // Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest(); ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(ListTasksAction.NAME + "*"); listTasksRequest.setActions(ListTasksAction.NAME + "*");
ListTasksResponse response = testNodes[0].transportListTasksAction.execute(listTasksRequest).get(); ListTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length + 1, response.getTasks().size()); assertEquals(testNodes.length + 1, response.getTasks().size());
Map<String, Object> byNodes = serialize(response, true); Map<String, Object> byNodes = serialize(response, true);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -141,7 +142,7 @@ public class TransportMultiSearchActionTests extends ESTestCase {
multiSearchRequest.add(new SearchRequest()); multiSearchRequest.add(new SearchRequest());
} }
MultiSearchResponse response = action.execute(multiSearchRequest).actionGet(); MultiSearchResponse response = ActionTestUtils.executeBlocking(action, multiSearchRequest);
assertThat(response.getResponses().length, equalTo(numSearchRequests)); assertThat(response.getResponses().length, equalTo(numSearchRequests));
assertThat(requests.size(), equalTo(numSearchRequests)); assertThat(requests.size(), equalTo(numSearchRequests));
assertThat(errorHolder.get(), nullValue()); assertThat(errorHolder.get(), nullValue());

View File

@ -18,6 +18,8 @@
*/ */
package org.elasticsearch.action.support.replication; package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.NoShardAvailableActionException;
@ -120,7 +122,8 @@ public class BroadcastReplicationTests extends ESTestCase {
setState(clusterService, state(index, randomBoolean(), setState(clusterService, state(index, randomBoolean(),
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED)); randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state()); logger.debug("--> using initial state:\n{}", clusterService.state());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index))); PlainActionFuture<BroadcastResponse> response = PlainActionFuture.newFuture();
broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response);
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) { for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
if (randomBoolean()) { if (randomBoolean()) {
shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1())); shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1()));
@ -139,7 +142,8 @@ public class BroadcastReplicationTests extends ESTestCase {
setState(clusterService, state(index, randomBoolean(), setState(clusterService, state(index, randomBoolean(),
ShardRoutingState.STARTED)); ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state()); logger.debug("--> using initial state:\n{}", clusterService.state());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index))); PlainActionFuture<BroadcastResponse> response = PlainActionFuture.newFuture();
broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response);
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) { for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
ReplicationResponse replicationResponse = new ReplicationResponse(); ReplicationResponse replicationResponse = new ReplicationResponse();
replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1)); replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1));
@ -154,7 +158,8 @@ public class BroadcastReplicationTests extends ESTestCase {
int numShards = 1 + randomInt(3); int numShards = 1 + randomInt(3);
setState(clusterService, stateWithAssignedPrimariesAndOneReplica(index, numShards)); setState(clusterService, stateWithAssignedPrimariesAndOneReplica(index, numShards));
logger.debug("--> using initial state:\n{}", clusterService.state()); logger.debug("--> using initial state:\n{}", clusterService.state());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index))); PlainActionFuture<BroadcastResponse> response = PlainActionFuture.newFuture();
broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response);
int succeeded = 0; int succeeded = 0;
int failed = 0; int failed = 0;
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) { for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
@ -231,17 +236,19 @@ public class BroadcastReplicationTests extends ESTestCase {
} }
} }
public FlushResponse assertImmediateResponse(String index, TransportFlushAction flushAction) throws InterruptedException, ExecutionException { public FlushResponse assertImmediateResponse(String index, TransportFlushAction flushAction) {
Date beginDate = new Date(); Date beginDate = new Date();
FlushResponse flushResponse = flushAction.execute(new FlushRequest(index)).get(); FlushResponse flushResponse = ActionTestUtils.executeBlocking(flushAction, new FlushRequest(index));
Date endDate = new Date(); Date endDate = new Date();
long maxTime = 500; long maxTime = 500;
assertThat("this should not take longer than " + maxTime + " ms. The request hangs somewhere", endDate.getTime() - beginDate.getTime(), lessThanOrEqualTo(maxTime)); assertThat("this should not take longer than " + maxTime + " ms. The request hangs somewhere", endDate.getTime() - beginDate.getTime(), lessThanOrEqualTo(maxTime));
return flushResponse; return flushResponse;
} }
public BroadcastResponse executeAndAssertImmediateResponse(TransportBroadcastReplicationAction broadcastAction, DummyBroadcastRequest request) throws InterruptedException, ExecutionException { public BroadcastResponse executeAndAssertImmediateResponse(TransportBroadcastReplicationAction broadcastAction, DummyBroadcastRequest request) {
return (BroadcastResponse) broadcastAction.execute(request).actionGet("5s"); PlainActionFuture<BroadcastResponse> response = PlainActionFuture.newFuture();
broadcastAction.execute(request, response);
return response.actionGet("5s");
} }
private void assertBroadcastResponse(int total, int successful, int failed, BroadcastResponse response, Class exceptionClass) { private void assertBroadcastResponse(int total, int successful, int failed, BroadcastResponse response, Class exceptionClass) {

View File

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -552,9 +553,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode();
TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response; TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response;
response = internalCluster().getInstance(TransportNodesListGatewayStartedShards.class) response = ActionTestUtils.executeBlocking(internalCluster().getInstance(TransportNodesListGatewayStartedShards.class),
.execute(new TransportNodesListGatewayStartedShards.Request(shardId, new DiscoveryNode[]{node})) new TransportNodesListGatewayStartedShards.Request(shardId, new DiscoveryNode[]{node}));
.get();
assertThat(response.getNodes(), hasSize(1)); assertThat(response.getNodes(), hasSize(1));
assertThat(response.getNodes().get(0).allocationId(), notNullValue()); assertThat(response.getNodes().get(0).allocationId(), notNullValue());

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
public class ActionTestUtils {
private ActionTestUtils() { /* no construction */ }
public static <Request extends ActionRequest, Response extends ActionResponse>
Response executeBlocking(TransportAction<Request, Response> action, Request request) {
PlainActionFuture<Response> future = newFuture();
action.execute(request, future);
return future.actionGet();
}
}

View File

@ -7,10 +7,13 @@ package org.elasticsearch.xpack.monitoring.action;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
@ -49,7 +52,6 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.Version.CURRENT; import static org.elasticsearch.Version.CURRENT;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -119,7 +121,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
monitoringService); monitoringService);
final MonitoringBulkRequest request = randomRequest(); final MonitoringBulkRequest request = randomRequest();
final ExecutionException e = expectThrows(ExecutionException.class, () -> action.execute(request).get()); final ClusterBlockException e = expectThrows(ClusterBlockException.class, () -> ActionTestUtils.executeBlocking(action, request));
assertThat(e, hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]"))); assertThat(e, hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]")));
} }
@ -138,7 +140,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
final MonitoringBulkRequest request = new MonitoringBulkRequest(); final MonitoringBulkRequest request = new MonitoringBulkRequest();
request.add(doc); request.add(doc);
final MonitoringBulkResponse response = action.execute(request).get(); final MonitoringBulkResponse response = ActionTestUtils.executeBlocking(action, request);
assertThat(response.status(), is(RestStatus.OK)); assertThat(response.status(), is(RestStatus.OK));
assertThat(response.isIgnored(), is(true)); assertThat(response.isIgnored(), is(true));
@ -155,13 +157,14 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
monitoringService); monitoringService);
final MonitoringBulkRequest request = new MonitoringBulkRequest(); final MonitoringBulkRequest request = new MonitoringBulkRequest();
final ExecutionException e = expectThrows(ExecutionException.class, () -> action.execute(request).get()); final ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class,
() -> ActionTestUtils.executeBlocking(action, request));
assertThat(e, hasToString(containsString("no monitoring documents added"))); assertThat(e, hasToString(containsString("no monitoring documents added")));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testExecuteRequest() throws Exception { public void testExecuteRequest() {
when(monitoringService.isMonitoringActive()).thenReturn(true); when(monitoringService.isMonitoringActive()).thenReturn(true);
final DiscoveryNode discoveryNode = new DiscoveryNode("_id", new TransportAddress(TransportAddress.META_ADDRESS, 9300), CURRENT); final DiscoveryNode discoveryNode = new DiscoveryNode("_id", new TransportAddress(TransportAddress.META_ADDRESS, 9300), CURRENT);
@ -217,7 +220,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
final TransportMonitoringBulkAction action = new TransportMonitoringBulkAction(Settings.EMPTY, threadPool, clusterService, final TransportMonitoringBulkAction action = new TransportMonitoringBulkAction(Settings.EMPTY, threadPool, clusterService,
transportService, filters, resolver, exporters, transportService, filters, resolver, exporters,
monitoringService); monitoringService);
action.execute(request).get(); ActionTestUtils.executeBlocking(action, request);
verify(threadPool).executor(ThreadPool.Names.GENERIC); verify(threadPool).executor(ThreadPool.Names.GENERIC);
verify(exporters).export(any(Collection.class), any(ActionListener.class)); verify(exporters).export(any(Collection.class), any(ActionListener.class));