From ee49081bc7810bdf513d6d67c5db46c7b6223f39 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 24 Mar 2016 18:40:32 -0400 Subject: [PATCH] Take filterNodeIds into consideration while sending tasks actions requests to nodes This commit fixes a bug that was causing the result of TransportTasksAction#filterNodeIds to be ignored and as a result the tasks actions were executed on all nodes. --- .../support/tasks/TransportTasksAction.java | 4 +- .../node/tasks/TaskManagerTestCase.java | 4 + .../node/tasks/TransportTasksActionTests.java | 74 +++++++++++++++++++ 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index 97678e6c060..ad7702466cd 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -207,8 +207,8 @@ public abstract class TransportTasksAction< this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds); ImmutableOpenMap nodes = clusterState.nodes().nodes(); this.nodes = new DiscoveryNode[nodesIds.length]; - for (int i = 0; i < nodesIds.length; i++) { - this.nodes[i] = nodes.get(nodesIds[i]); + for (int i = 0; i < this.nodesIds.length; i++) { + this.nodes[i] = nodes.get(this.nodesIds[i]); } this.responses = new AtomicReferenceArray<>(this.nodesIds.length); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 48d9f8fed40..3d996becbae 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -219,6 +219,10 @@ public abstract class TaskManagerTestCase extends ESTestCase { clusterService.close(); transportService.close(); } + + public String getNodeId() { + return discoveryNode.getId(); + } } public static void connectNodes(TestNode... nodes) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index 4b478b52bd0..972d9735efb 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -54,6 +55,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -629,4 +631,76 @@ public class TransportTasksActionTests extends TaskManagerTestCase { NodesResponse responses = future.get(); assertEquals(0, responses.failureCount()); } + + + /** + * This test starts nodes actions that blocks on all nodes. While node actions are blocked in the middle of execution + * it executes a tasks action that targets these blocked node actions. The test verifies that task actions are only + * getting executed on nodes that are not listed in the node filter. + */ + public void testTaskNodeFiltering() throws ExecutionException, InterruptedException, IOException { + setupTestNodes(Settings.EMPTY); + connectNodes(testNodes); + CountDownLatch checkLatch = new CountDownLatch(1); + // Start some test nodes action so we could have something to run tasks actions on + ActionFuture future = startBlockingTestNodesAction(checkLatch); + + String[] allNodes = new String[testNodes.length]; + for (int i = 0; i < testNodes.length; i++) { + allNodes[i] = testNodes[i].getNodeId(); + } + + int filterNodesSize = randomInt(allNodes.length); + Set filterNodes = new HashSet<>(randomSubsetOf(filterNodesSize, allNodes)); + logger.info("Filtering out nodes {} size: {}", filterNodes, filterNodesSize); + + TestTasksAction[] tasksActions = new TestTasksAction[nodesCount]; + for (int i = 0; i < testNodes.length; i++) { + final int node = i; + // Simulate a task action that works on all nodes except nodes listed in filterNodes. + // We are testing that it works. + tasksActions[i] = new TestTasksAction(Settings.EMPTY, "testTasksAction", clusterName, threadPool, + testNodes[i].clusterService, testNodes[i].transportService) { + + @Override + protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) { + String[] superNodes = super.filterNodeIds(nodes, nodesIds); + List filteredNodes = new ArrayList<>(); + for (String node : superNodes) { + if (filterNodes.contains(node) == false) { + filteredNodes.add(node); + } + } + return filteredNodes.toArray(new String[filteredNodes.size()]); + } + + @Override + protected TestTaskResponse taskOperation(TestTasksRequest request, Task task) { + return new TestTaskResponse(testNodes[node].getNodeId()); + } + }; + } + + // Run task action on node tasks that are currently running + // should be successful on all nodes except nodes that we filtered out + TestTasksRequest testTasksRequest = new TestTasksRequest(); + testTasksRequest.setActions("testAction[n]"); // pick all test actions + TestTasksResponse response = tasksActions[randomIntBetween(0, nodesCount - 1)].execute(testTasksRequest).get(); + + // Get successful responses from all nodes except nodes that we filtered out + assertEquals(testNodes.length - filterNodes.size(), response.tasks.size()); + assertEquals(0, response.getTaskFailures().size()); // no task failed + assertEquals(0, response.getNodeFailures().size()); // no nodes failed + + // Make sure that filtered nodes didn't send any responses + for (TestTaskResponse taskResponse : response.tasks) { + String nodeId = taskResponse.getStatus(); + assertFalse("Found response from filtered node " + nodeId, filterNodes.contains(nodeId)); + } + + // Release all node tasks and wait for response + checkLatch.countDown(); + NodesResponse responses = future.get(); + assertEquals(0, responses.failureCount()); + } }