Take filterNodeIds into consideration while sending tasks actions requests to nodes

This commit fixes a bug that was causing the result of TransportTasksAction#filterNodeIds to be ignored and as a result the tasks actions were executed on all nodes.
This commit is contained in:
Igor Motov 2016-03-24 18:40:32 -04:00
parent 444641ac55
commit ee49081bc7
3 changed files with 80 additions and 2 deletions

View File

@ -207,8 +207,8 @@ public abstract class TransportTasksAction<
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
ImmutableOpenMap<String, DiscoveryNode> nodes = clusterState.nodes().nodes();
this.nodes = new DiscoveryNode[nodesIds.length];
for (int i = 0; i < nodesIds.length; i++) {
this.nodes[i] = nodes.get(nodesIds[i]);
for (int i = 0; i < this.nodesIds.length; i++) {
this.nodes[i] = nodes.get(this.nodesIds[i]);
}
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
}

View File

@ -219,6 +219,10 @@ public abstract class TaskManagerTestCase extends ESTestCase {
clusterService.close();
transportService.close();
}
public String getNodeId() {
return discoveryNode.getId();
}
}
public static void connectNodes(TestNode... nodes) {

View File

@ -38,6 +38,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -54,6 +55,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -629,4 +631,76 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}
/**
* This test starts nodes actions that blocks on all nodes. While node actions are blocked in the middle of execution
* it executes a tasks action that targets these blocked node actions. The test verifies that task actions are only
* getting executed on nodes that are not listed in the node filter.
*/
public void testTaskNodeFiltering() throws ExecutionException, InterruptedException, IOException {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
// Start some test nodes action so we could have something to run tasks actions on
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch);
String[] allNodes = new String[testNodes.length];
for (int i = 0; i < testNodes.length; i++) {
allNodes[i] = testNodes[i].getNodeId();
}
int filterNodesSize = randomInt(allNodes.length);
Set<String> filterNodes = new HashSet<>(randomSubsetOf(filterNodesSize, allNodes));
logger.info("Filtering out nodes {} size: {}", filterNodes, filterNodesSize);
TestTasksAction[] tasksActions = new TestTasksAction[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
final int node = i;
// Simulate a task action that works on all nodes except nodes listed in filterNodes.
// We are testing that it works.
tasksActions[i] = new TestTasksAction(Settings.EMPTY, "testTasksAction", clusterName, threadPool,
testNodes[i].clusterService, testNodes[i].transportService) {
@Override
protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
String[] superNodes = super.filterNodeIds(nodes, nodesIds);
List<String> filteredNodes = new ArrayList<>();
for (String node : superNodes) {
if (filterNodes.contains(node) == false) {
filteredNodes.add(node);
}
}
return filteredNodes.toArray(new String[filteredNodes.size()]);
}
@Override
protected TestTaskResponse taskOperation(TestTasksRequest request, Task task) {
return new TestTaskResponse(testNodes[node].getNodeId());
}
};
}
// Run task action on node tasks that are currently running
// should be successful on all nodes except nodes that we filtered out
TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setActions("testAction[n]"); // pick all test actions
TestTasksResponse response = tasksActions[randomIntBetween(0, nodesCount - 1)].execute(testTasksRequest).get();
// Get successful responses from all nodes except nodes that we filtered out
assertEquals(testNodes.length - filterNodes.size(), response.tasks.size());
assertEquals(0, response.getTaskFailures().size()); // no task failed
assertEquals(0, response.getNodeFailures().size()); // no nodes failed
// Make sure that filtered nodes didn't send any responses
for (TestTaskResponse taskResponse : response.tasks) {
String nodeId = taskResponse.getStatus();
assertFalse("Found response from filtered node " + nodeId, filterNodes.contains(nodeId));
}
// Release all node tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}
}