diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 0f31a80c28f..7e2702afd8a 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -110,7 +110,7 @@ public abstract class TransportNodesAction listener) { + AsyncAction(Task task, NodesRequest request, ActionListener listener) { this.task = task; this.request = request; this.listener = listener; @@ -135,7 +135,7 @@ public abstract class TransportNodesAction(this.nodesIds.length); } - private void start() { + void start() { if (nodesIds.length == 0) { // nothing to notify threadPool.generic().execute(new Runnable() { @@ -158,11 +158,6 @@ public abstract class TransportNodesAction listeners = new CopyOnWriteArrayList<>(); diff --git a/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java new file mode 100644 index 00000000000..0a6f94366f9 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -0,0 +1,226 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.nodes; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeActionTests; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.cluster.TestClusterService; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Supplier; + +public class TransportNodesActionTests extends ESTestCase { + + private static ThreadPool THREAD_POOL; + private static ClusterName CLUSTER_NAME = new ClusterName("test-cluster"); + + private TestClusterService clusterService; + private CapturingTransport transport; + private TestTransportNodesAction action; + + public void testRequestIsSentToEachNode() throws Exception { + TestNodesRequest request = new TestNodesRequest(); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + int numNodes = clusterService.state().getNodes().size(); + // check a request was sent to the right number of nodes + assertEquals(numNodes, capturedRequests.size()); + } + + public void testNodesSelectors() { + int numSelectors = randomIntBetween(1, 5); + Set nodeSelectors = new HashSet<>(); + for (int i = 0; i < numSelectors; i++) { + nodeSelectors.add(randomFrom(NodeSelector.values()).selector); + } + int numNodeIds = randomIntBetween(0, 3); + String[] nodeIds = clusterService.state().nodes().nodes().keys().toArray(String.class); + for (int i = 0; i < numNodeIds; i++) { + String nodeId = randomFrom(nodeIds); + nodeSelectors.add(nodeId); + } + String[] finalNodesIds = nodeSelectors.toArray(new String[nodeSelectors.size()]); + TestNodesRequest request = new TestNodesRequest(finalNodesIds); + action.new AsyncAction(null, request, new PlainActionFuture<>()).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + assertEquals(clusterService.state().nodes().resolveNodesIds(finalNodesIds).length, capturedRequests.size()); + } + + private enum NodeSelector { + LOCAL("_local"), ELECTED_MASTER("_master"), MASTER_ELIGIBLE("master:true"), DATA("data:true"), CUSTOM_ATTRIBUTE("attr:value"); + + private final String selector; + + NodeSelector(String selector) { + this.selector = selector; + } + } + + @BeforeClass + public static void startThreadPool() { + THREAD_POOL = new ThreadPool(TransportBroadcastByNodeActionTests.class.getSimpleName()); + } + + @AfterClass + public static void destroyThreadPool() { + ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS); + // since static must set to null to be eligible for collection + THREAD_POOL = null; + } + + @Before + public void setUp() throws Exception { + super.setUp(); + transport = new CapturingTransport(); + clusterService = new TestClusterService(THREAD_POOL); + final TransportService transportService = new TransportService(transport, THREAD_POOL); + transportService.start(); + transportService.acceptIncomingRequests(); + int numNodes = randomIntBetween(3, 10); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + List discoveryNodes = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + Map attributes = new HashMap<>(); + if (randomBoolean()) { + attributes.put("master", Boolean.toString(randomBoolean())); + attributes.put("data", Boolean.toString(randomBoolean())); + attributes.put("ingest", Boolean.toString(randomBoolean())); + } else { + attributes.put("client", "true"); + } + if (frequently()) { + attributes.put("custom", randomBoolean() ? "match" : randomAsciiOfLengthBetween(3, 5)); + } + final DiscoveryNode node = newNode(i, attributes); + discoBuilder = discoBuilder.put(node); + discoveryNodes.add(node); + } + discoBuilder.localNodeId(randomFrom(discoveryNodes).id()); + discoBuilder.masterNodeId(randomFrom(discoveryNodes).id()); + ClusterState.Builder stateBuilder = ClusterState.builder(CLUSTER_NAME); + stateBuilder.nodes(discoBuilder); + ClusterState clusterState = stateBuilder.build(); + clusterService.setState(clusterState); + action = new TestTransportNodesAction( + Settings.EMPTY, + THREAD_POOL, + clusterService, + transportService, + new ActionFilters(Collections.emptySet()), + TestNodesRequest::new, + TestNodeRequest::new, + ThreadPool.Names.SAME + ); + } + + private static DiscoveryNode newNode(int nodeId, Map attributes) { + String node = "node_" + nodeId; + return new DiscoveryNode(node, node, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); + } + + private static class TestTransportNodesAction extends TransportNodesAction { + + TestTransportNodesAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService + transportService, ActionFilters actionFilters, Supplier request, + Supplier nodeRequest, String nodeExecutor) { + super(settings, "indices:admin/test", CLUSTER_NAME, threadPool, clusterService, transportService, actionFilters, + null, request, nodeRequest, nodeExecutor); + } + + @Override + protected TestNodesResponse newResponse(TestNodesRequest request, AtomicReferenceArray nodesResponses) { + final List nodeResponses = new ArrayList<>(); + for (int i = 0; i < nodesResponses.length(); i++) { + Object resp = nodesResponses.get(i); + if (resp instanceof TestNodeResponse) { + nodeResponses.add((TestNodeResponse) resp); + } + } + return new TestNodesResponse(nodeResponses); + } + + @Override + protected TestNodeRequest newNodeRequest(String nodeId, TestNodesRequest request) { + return new TestNodeRequest(); + } + + @Override + protected TestNodeResponse newNodeResponse() { + return new TestNodeResponse(); + } + + @Override + protected TestNodeResponse nodeOperation(TestNodeRequest request) { + return new TestNodeResponse(); + } + + @Override + protected boolean accumulateExceptions() { + return false; + } + } + + private static class TestNodesRequest extends BaseNodesRequest { + TestNodesRequest(String... nodesIds) { + super(nodesIds); + } + } + + private static class TestNodesResponse extends BaseNodesResponse { + + private final List nodeResponses; + + TestNodesResponse(List nodeResponses) { + this.nodeResponses = nodeResponses; + } + } + + private static class TestNodeRequest extends BaseNodeRequest { + } + + private static class TestNodeResponse extends BaseNodeResponse { + } +} diff --git a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java index b643ba0d0ad..e3777e84f9a 100644 --- a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java +++ b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java @@ -38,13 +38,8 @@ import static org.hamcrest.Matchers.notNullValue; /** * */ -@ClusterScope(scope= Scope.TEST, numDataNodes =0) +@ClusterScope(scope= Scope.TEST, numDataNodes = 0) public class SimpleNodesInfoIT extends ESIntegTestCase { - static final class Fields { - static final String SITE_PLUGIN = "dummy"; - static final String SITE_PLUGIN_DESCRIPTION = "This is a description for a dummy test site plugin."; - static final String SITE_PLUGIN_VERSION = "0.0.7-BOND-SITE"; - } public void testNodesInfos() throws Exception { List nodesIds = internalCluster().startNodesAsync(2).get();