diff --git a/server/src/main/java/org/elasticsearch/action/support/TransportAction.java b/server/src/main/java/org/elasticsearch/action/support/TransportAction.java index 22edbfca2dc..00b7f4e6186 100644 --- a/server/src/main/java/org/elasticsearch/action/support/TransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/TransportAction.java @@ -55,12 +55,6 @@ public abstract class TransportAction execute(Request request) { - PlainActionFuture future = newFuture(); - execute(request, future); - return future; - } - /** * Use this method when the transport action call should result in creation of a new task associated with the call. * diff --git a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java index 04253dfabb1..4e498d393e2 100644 --- a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java @@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.nodes.BaseNodeRequest; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; @@ -65,7 +66,9 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction list(String[] nodesIds, @Nullable TimeValue timeout) { - return execute(new Request(nodesIds).timeout(timeout)); + PlainActionFuture future = PlainActionFuture.newFuture(); + execute(new Request(nodesIds).timeout(timeout), future); + return future; } @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index 6b2e2040bca..524d522153f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksReque import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.nodes.BaseNodeRequest; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; @@ -254,8 +255,8 @@ public class CancellableTasksTests extends TaskManagerTestCase { request.setReason("Testing Cancellation"); request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())); // And send the cancellation request to a random node - CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request) - .get(); + CancelTasksResponse response = ActionTestUtils.executeBlocking( + testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request); // Awaiting for the main task to finish responseLatch.await(); @@ -287,9 +288,9 @@ public class CancellableTasksTests extends TaskManagerTestCase { } // Make sure that tasks are no longer running - ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest().setTaskId( - new TaskId(testNodes[0].getNodeId(), mainTask.getId()))).get(); + ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking( + testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction, + new ListTasksRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()))); assertEquals(0, listTasksResponse.getTasks().size()); // Make sure that there are no leftover bans, the ban removal is async, so we might return from the cancellation @@ -326,8 +327,8 @@ public class CancellableTasksTests extends TaskManagerTestCase { request.setReason("Testing Cancellation"); request.setParentTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())); // And send the cancellation request to a random node - CancelTasksResponse response = testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction.execute(request) - .get(); + CancelTasksResponse response = ActionTestUtils.executeBlocking( + testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction, request); // Awaiting for the main task to finish responseLatch.await(); @@ -336,16 +337,11 @@ public class CancellableTasksTests extends TaskManagerTestCase { assertThat(response.getTasks().size(), equalTo(testNodes.length)); assertBusy(() -> { - try { // Make sure that main task is no longer running - ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest().setTaskId( - new TaskId(testNodes[0].getNodeId(), mainTask.getId()))).get(); - assertEquals(0, listTasksResponse.getTasks().size()); - - } catch (ExecutionException | InterruptedException ex) { - throw new RuntimeException(ex); - } + ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking( + testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction, + new ListTasksRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()))); + assertEquals(0, listTasksResponse.getTasks().size()); }); } @@ -378,8 +374,9 @@ public class CancellableTasksTests extends TaskManagerTestCase { String mainNode = testNodes[0].getNodeId(); // Make sure that tasks are running - ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId()))).get(); + ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking( + testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction, + new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId()))); assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size())); // Simulate the coordinating node leaving the cluster @@ -400,7 +397,7 @@ public class CancellableTasksTests extends TaskManagerTestCase { request.setReason("Testing Cancellation"); request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())); // And send the cancellation request to a random node - CancelTasksResponse response = testNodes[0].transportCancelTasksAction.execute(request).get(); + CancelTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportCancelTasksAction, request); logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster"); // This node still thinks that's part of the cluster, so cancelling should look successful if (response.getTasks().size() == 0) { @@ -420,15 +417,10 @@ public class CancellableTasksTests extends TaskManagerTestCase { assertBusy(() -> { // Make sure that tasks are no longer running - try { - ListTasksResponse listTasksResponse1 = testNodes[randomIntBetween(1, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId()))).get(); - assertEquals(0, listTasksResponse1.getTasks().size()); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } catch (ExecutionException ex2) { - fail("shouldn't be here"); - } + ListTasksResponse listTasksResponse1 = ActionTestUtils.executeBlocking( + testNodes[randomIntBetween(1, testNodes.length - 1)].transportListTasksAction, + new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId()))); + assertEquals(0, listTasksResponse1.getTasks().size()); }); // Wait for clean up diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index cb6f2b57b2b..27230eb518e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action.admin.cluster.node.tasks; +import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; @@ -29,6 +30,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.nodes.BaseNodeRequest; import org.elasticsearch.action.support.nodes.BaseNodesRequest; @@ -363,7 +365,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.setActions("testAction*"); // pick all test actions logger.info("Listing currently running tasks using node [{}]", testNodeNum); - ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); logger.info("Checking currently running tasks"); assertEquals(testNodes.length, response.getPerNodeTasks().size()); @@ -382,7 +384,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; listTasksRequest = new ListTasksRequest(); listTasksRequest.setActions("testAction[n]"); // only pick node actions - response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { assertEquals(1, entry.getValue().size()); @@ -396,7 +398,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Check task counts using transport with detailed description listTasksRequest.setDetailed(true); // same request only with detailed description - response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { assertEquals(1, entry.getValue().size()); @@ -405,7 +407,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Make sure that the main task on coordinating node is the task that was returned to us by execute() listTasksRequest.setActions("testAction"); // only pick the main task - response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); assertEquals(1, response.getTasks().size()); assertEquals(mainTask.getId(), response.getTasks().get(0).getId()); @@ -433,7 +435,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Get the parent task ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.setActions("testAction"); - ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); assertEquals(1, response.getTasks().size()); String parentNode = response.getTasks().get(0).getTaskId().getNodeId(); long parentTaskId = response.getTasks().get(0).getId(); @@ -441,7 +443,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Find tasks with common parent listTasksRequest = new ListTasksRequest(); listTasksRequest.setParentTaskId(new TaskId(parentNode, parentTaskId)); - response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); assertEquals(testNodes.length, response.getTasks().size()); for (TaskInfo task : response.getTasks()) { assertEquals("testAction[n]", task.getAction()); @@ -467,7 +469,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Get the parent task ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.setActions("testAction*"); - ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); assertEquals(0, response.getTasks().size()); // Release all tasks and wait for response @@ -488,7 +490,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)]; ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.setActions("testAction[n]"); // only pick node actions - ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { assertEquals(1, entry.getValue().size()); @@ -498,7 +500,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Check task counts using transport with detailed description long minimalDurationNanos = System.nanoTime() - maximumStartTimeNanos; listTasksRequest.setDetailed(true); // same request only with detailed description - response = testNode.transportListTasksAction.execute(listTasksRequest).get(); + response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest); assertEquals(testNodes.length, response.getPerNodeTasks().size()); for (Map.Entry> entry : response.getPerNodeTasks().entrySet()) { assertEquals(1, entry.getValue().size()); @@ -536,8 +538,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase { request.setNodes(testNodes[0].getNodeId()); request.setReason("Testing Cancellation"); request.setActions(actionName); - CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request) - .get(); + CancelTasksResponse response = ActionTestUtils.executeBlocking( + testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request); // Shouldn't match any tasks since testAction doesn't support cancellation assertEquals(0, response.getTasks().size()); @@ -549,7 +551,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase { request = new CancelTasksRequest(); request.setReason("Testing Cancellation"); request.setTaskId(new TaskId(testNodes[0].getNodeId(), task.getId())); - response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request).get(); + response = ActionTestUtils.executeBlocking( + testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request); // Shouldn't match any tasks since testAction doesn't support cancellation assertEquals(0, response.getTasks().size()); @@ -560,8 +563,8 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Make sure that task is still running ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.setActions(actionName); - ListTasksResponse listResponse = testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction.execute - (listTasksRequest).get(); + ListTasksResponse listResponse = ActionTestUtils.executeBlocking( + testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction, listTasksRequest); assertEquals(1, listResponse.getPerNodeTasks().size()); // Verify that tasks are marked as non-cancellable for (TaskInfo taskInfo : listResponse.getTasks()) { @@ -595,7 +598,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { assertEquals(0, testNode.transportService.getTaskManager().getTasks().size()); } NodesRequest request = new NodesRequest("Test Request"); - NodesResponse responses = actions[0].execute(request).get(); + NodesResponse responses = ActionTestUtils.executeBlocking(actions[0], request); assertEquals(nodesCount, responses.failureCount()); // Make sure that actions are still registered in the task manager on all nodes @@ -660,7 +663,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // should be successful on all nodes except one TestTasksRequest testTasksRequest = new TestTasksRequest(); testTasksRequest.setActions("testAction[n]"); // pick all test actions - TestTasksResponse response = tasksActions[0].execute(testTasksRequest).get(); + TestTasksResponse response = ActionTestUtils.executeBlocking(tasksActions[0], testTasksRequest); assertThat(response.getTaskFailures(), hasSize(1)); // one task failed assertThat(response.getTaskFailures().get(0).getReason(), containsString("Task level failure")); // Get successful responses from all nodes except one @@ -730,7 +733,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // should be successful on all nodes except nodes that we filtered out TestTasksRequest testTasksRequest = new TestTasksRequest(); testTasksRequest.setActions("testAction[n]"); // pick all test actions - TestTasksResponse response = tasksActions[randomIntBetween(0, nodesCount - 1)].execute(testTasksRequest).get(); + TestTasksResponse response = ActionTestUtils.executeBlocking(tasksActions[randomIntBetween(0, nodesCount - 1)], testTasksRequest); // Get successful responses from all nodes except nodes that we filtered out assertEquals(testNodes.length - filterNodes.size(), response.tasks.size()); @@ -757,7 +760,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { // Get the parent task ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.setActions(ListTasksAction.NAME + "*"); - ListTasksResponse response = testNodes[0].transportListTasksAction.execute(listTasksRequest).get(); + ListTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportListTasksAction, listTasksRequest); assertEquals(testNodes.length + 1, response.getTasks().size()); Map byNodes = serialize(response, true); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index 3c2a87e8fef..2b83dd9aa63 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -141,7 +142,7 @@ public class TransportMultiSearchActionTests extends ESTestCase { multiSearchRequest.add(new SearchRequest()); } - MultiSearchResponse response = action.execute(multiSearchRequest).actionGet(); + MultiSearchResponse response = ActionTestUtils.executeBlocking(action, multiSearchRequest); assertThat(response.getResponses().length, equalTo(numSearchRequests)); assertThat(requests.size(), equalTo(numSearchRequests)); assertThat(errorHolder.get(), nullValue()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index d2a51070c92..24d9633bc51 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.action.support.replication; +import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; @@ -120,7 +122,8 @@ public class BroadcastReplicationTests extends ESTestCase { setState(clusterService, state(index, randomBoolean(), randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED)); logger.debug("--> using initial state:\n{}", clusterService.state()); - Future response = (broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index))); + PlainActionFuture response = PlainActionFuture.newFuture(); + broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response); for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { if (randomBoolean()) { shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1())); @@ -139,7 +142,8 @@ public class BroadcastReplicationTests extends ESTestCase { setState(clusterService, state(index, randomBoolean(), ShardRoutingState.STARTED)); logger.debug("--> using initial state:\n{}", clusterService.state()); - Future response = (broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index))); + PlainActionFuture response = PlainActionFuture.newFuture(); + broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response); for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { ReplicationResponse replicationResponse = new ReplicationResponse(); replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1)); @@ -154,7 +158,8 @@ public class BroadcastReplicationTests extends ESTestCase { int numShards = 1 + randomInt(3); setState(clusterService, stateWithAssignedPrimariesAndOneReplica(index, numShards)); logger.debug("--> using initial state:\n{}", clusterService.state()); - Future response = (broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index))); + PlainActionFuture response = PlainActionFuture.newFuture(); + broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index), response); int succeeded = 0; int failed = 0; for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { @@ -231,17 +236,19 @@ public class BroadcastReplicationTests extends ESTestCase { } } - public FlushResponse assertImmediateResponse(String index, TransportFlushAction flushAction) throws InterruptedException, ExecutionException { + public FlushResponse assertImmediateResponse(String index, TransportFlushAction flushAction) { Date beginDate = new Date(); - FlushResponse flushResponse = flushAction.execute(new FlushRequest(index)).get(); + FlushResponse flushResponse = ActionTestUtils.executeBlocking(flushAction, new FlushRequest(index)); Date endDate = new Date(); long maxTime = 500; assertThat("this should not take longer than " + maxTime + " ms. The request hangs somewhere", endDate.getTime() - beginDate.getTime(), lessThanOrEqualTo(maxTime)); return flushResponse; } - public BroadcastResponse executeAndAssertImmediateResponse(TransportBroadcastReplicationAction broadcastAction, DummyBroadcastRequest request) throws InterruptedException, ExecutionException { - return (BroadcastResponse) broadcastAction.execute(request).actionGet("5s"); + public BroadcastResponse executeAndAssertImmediateResponse(TransportBroadcastReplicationAction broadcastAction, DummyBroadcastRequest request) { + PlainActionFuture response = PlainActionFuture.newFuture(); + broadcastAction.execute(request, response); + return response.actionGet("5s"); } private void assertBroadcastResponse(int total, int successful, int failed, BroadcastResponse response, Class exceptionClass) { diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 154d702e7fb..d098c4918a7 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -552,9 +553,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response; - response = internalCluster().getInstance(TransportNodesListGatewayStartedShards.class) - .execute(new TransportNodesListGatewayStartedShards.Request(shardId, new DiscoveryNode[]{node})) - .get(); + response = ActionTestUtils.executeBlocking(internalCluster().getInstance(TransportNodesListGatewayStartedShards.class), + new TransportNodesListGatewayStartedShards.Request(shardId, new DiscoveryNode[]{node})); assertThat(response.getNodes(), hasSize(1)); assertThat(response.getNodes().get(0).allocationId(), notNullValue()); diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java new file mode 100644 index 00000000000..cdc76292be0 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; + +import static org.elasticsearch.action.support.PlainActionFuture.newFuture; + +public class ActionTestUtils { + + private ActionTestUtils() { /* no construction */ } + + public static + Response executeBlocking(TransportAction action, Request request) { + PlainActionFuture future = newFuture(); + action.execute(request, future); + return future.actionGet(); + } +} diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java index f918c7aaf56..4d398a401ab 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java @@ -7,10 +7,13 @@ package org.elasticsearch.xpack.monitoring.action; import java.util.concurrent.ExecutorService; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; @@ -49,7 +52,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.ExecutionException; import static org.elasticsearch.Version.CURRENT; import static org.hamcrest.Matchers.containsString; @@ -119,7 +121,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { monitoringService); final MonitoringBulkRequest request = randomRequest(); - final ExecutionException e = expectThrows(ExecutionException.class, () -> action.execute(request).get()); + final ClusterBlockException e = expectThrows(ClusterBlockException.class, () -> ActionTestUtils.executeBlocking(action, request)); assertThat(e, hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]"))); } @@ -138,7 +140,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { final MonitoringBulkRequest request = new MonitoringBulkRequest(); request.add(doc); - final MonitoringBulkResponse response = action.execute(request).get(); + final MonitoringBulkResponse response = ActionTestUtils.executeBlocking(action, request); assertThat(response.status(), is(RestStatus.OK)); assertThat(response.isIgnored(), is(true)); @@ -155,13 +157,14 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { monitoringService); final MonitoringBulkRequest request = new MonitoringBulkRequest(); - final ExecutionException e = expectThrows(ExecutionException.class, () -> action.execute(request).get()); + final ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, + () -> ActionTestUtils.executeBlocking(action, request)); assertThat(e, hasToString(containsString("no monitoring documents added"))); } @SuppressWarnings("unchecked") - public void testExecuteRequest() throws Exception { + public void testExecuteRequest() { when(monitoringService.isMonitoringActive()).thenReturn(true); final DiscoveryNode discoveryNode = new DiscoveryNode("_id", new TransportAddress(TransportAddress.META_ADDRESS, 9300), CURRENT); @@ -217,7 +220,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { final TransportMonitoringBulkAction action = new TransportMonitoringBulkAction(Settings.EMPTY, threadPool, clusterService, transportService, filters, resolver, exporters, monitoringService); - action.execute(request).get(); + ActionTestUtils.executeBlocking(action, request); verify(threadPool).executor(ThreadPool.Names.GENERIC); verify(exporters).export(any(Collection.class), any(ActionListener.class));