From 04947a5fef49ebc7a6e9f68abeec0afb80fafbdc Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 1 Mar 2016 18:55:19 +0100 Subject: [PATCH] Remove DiscoveryNode#shouldConnectTo method This method was originally introduced to prevent client nodes from connecting to other client nodes directly in the cluster. That said, it worked only if node.client was set to true and not when node.master and node.data were both set to false. It looks safe to remove, which allows to solve all kinds of problems around monitoring that happen wherever there are 2 or more clients nodes in the cluster, and a REST call hits one of them (node counts are off, clients nodes are missing). --- .../support/nodes/TransportNodesAction.java | 11 +- .../support/tasks/TransportTasksAction.java | 6 - .../cluster/node/DiscoveryNode.java | 10 - .../service/InternalClusterService.java | 10 - .../nodes/TransportNodesActionTests.java | 226 ++++++++++++++++++ .../nodesinfo/SimpleNodesInfoIT.java | 7 +- 6 files changed, 230 insertions(+), 40 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 0f31a80c28f..7e2702afd8a 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -110,7 +110,7 @@ public abstract class TransportNodesAction listener) { + AsyncAction(Task task, NodesRequest request, ActionListener listener) { this.task = task; this.request = request; this.listener = listener; @@ -135,7 +135,7 @@ public abstract class TransportNodesAction(this.nodesIds.length); } - private void start() { + void start() { if (nodesIds.length == 0) { // nothing to notify threadPool.generic().execute(new Runnable() { @@ -158,11 +158,6 @@ public abstract class TransportNodesAction listeners = new CopyOnWriteArrayList<>(); diff --git a/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java new file mode 100644 index 00000000000..0a6f94366f9 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -0,0 +1,226 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.nodes; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeActionTests; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.cluster.TestClusterService; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Supplier; + +public class TransportNodesActionTests extends ESTestCase { + + private static ThreadPool THREAD_POOL; + private static ClusterName CLUSTER_NAME = new ClusterName("test-cluster"); + + private TestClusterService clusterService; + private CapturingTransport transport; + private TestTransportNodesAction action; + + public void testRequestIsSentToEachNode() throws Exception { + TestNodesRequest request = new TestNodesRequest(); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + int numNodes = clusterService.state().getNodes().size(); + // check a request was sent to the right number of nodes + assertEquals(numNodes, capturedRequests.size()); + } + + public void testNodesSelectors() { + int numSelectors = randomIntBetween(1, 5); + Set nodeSelectors = new HashSet<>(); + for (int i = 0; i < numSelectors; i++) { + nodeSelectors.add(randomFrom(NodeSelector.values()).selector); + } + int numNodeIds = randomIntBetween(0, 3); + String[] nodeIds = clusterService.state().nodes().nodes().keys().toArray(String.class); + for (int i = 0; i < numNodeIds; i++) { + String nodeId = randomFrom(nodeIds); + nodeSelectors.add(nodeId); + } + String[] finalNodesIds = nodeSelectors.toArray(new String[nodeSelectors.size()]); + TestNodesRequest request = new TestNodesRequest(finalNodesIds); + action.new AsyncAction(null, request, new PlainActionFuture<>()).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + assertEquals(clusterService.state().nodes().resolveNodesIds(finalNodesIds).length, capturedRequests.size()); + } + + private enum NodeSelector { + LOCAL("_local"), ELECTED_MASTER("_master"), MASTER_ELIGIBLE("master:true"), DATA("data:true"), CUSTOM_ATTRIBUTE("attr:value"); + + private final String selector; + + NodeSelector(String selector) { + this.selector = selector; + } + } + + @BeforeClass + public static void startThreadPool() { + THREAD_POOL = new ThreadPool(TransportBroadcastByNodeActionTests.class.getSimpleName()); + } + + @AfterClass + public static void destroyThreadPool() { + ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS); + // since static must set to null to be eligible for collection + THREAD_POOL = null; + } + + @Before + public void setUp() throws Exception { + super.setUp(); + transport = new CapturingTransport(); + clusterService = new TestClusterService(THREAD_POOL); + final TransportService transportService = new TransportService(transport, THREAD_POOL); + transportService.start(); + transportService.acceptIncomingRequests(); + int numNodes = randomIntBetween(3, 10); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + List discoveryNodes = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + Map attributes = new HashMap<>(); + if (randomBoolean()) { + attributes.put("master", Boolean.toString(randomBoolean())); + attributes.put("data", Boolean.toString(randomBoolean())); + attributes.put("ingest", Boolean.toString(randomBoolean())); + } else { + attributes.put("client", "true"); + } + if (frequently()) { + attributes.put("custom", randomBoolean() ? "match" : randomAsciiOfLengthBetween(3, 5)); + } + final DiscoveryNode node = newNode(i, attributes); + discoBuilder = discoBuilder.put(node); + discoveryNodes.add(node); + } + discoBuilder.localNodeId(randomFrom(discoveryNodes).id()); + discoBuilder.masterNodeId(randomFrom(discoveryNodes).id()); + ClusterState.Builder stateBuilder = ClusterState.builder(CLUSTER_NAME); + stateBuilder.nodes(discoBuilder); + ClusterState clusterState = stateBuilder.build(); + clusterService.setState(clusterState); + action = new TestTransportNodesAction( + Settings.EMPTY, + THREAD_POOL, + clusterService, + transportService, + new ActionFilters(Collections.emptySet()), + TestNodesRequest::new, + TestNodeRequest::new, + ThreadPool.Names.SAME + ); + } + + private static DiscoveryNode newNode(int nodeId, Map attributes) { + String node = "node_" + nodeId; + return new DiscoveryNode(node, node, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); + } + + private static class TestTransportNodesAction extends TransportNodesAction { + + TestTransportNodesAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService + transportService, ActionFilters actionFilters, Supplier request, + Supplier nodeRequest, String nodeExecutor) { + super(settings, "indices:admin/test", CLUSTER_NAME, threadPool, clusterService, transportService, actionFilters, + null, request, nodeRequest, nodeExecutor); + } + + @Override + protected TestNodesResponse newResponse(TestNodesRequest request, AtomicReferenceArray nodesResponses) { + final List nodeResponses = new ArrayList<>(); + for (int i = 0; i < nodesResponses.length(); i++) { + Object resp = nodesResponses.get(i); + if (resp instanceof TestNodeResponse) { + nodeResponses.add((TestNodeResponse) resp); + } + } + return new TestNodesResponse(nodeResponses); + } + + @Override + protected TestNodeRequest newNodeRequest(String nodeId, TestNodesRequest request) { + return new TestNodeRequest(); + } + + @Override + protected TestNodeResponse newNodeResponse() { + return new TestNodeResponse(); + } + + @Override + protected TestNodeResponse nodeOperation(TestNodeRequest request) { + return new TestNodeResponse(); + } + + @Override + protected boolean accumulateExceptions() { + return false; + } + } + + private static class TestNodesRequest extends BaseNodesRequest { + TestNodesRequest(String... nodesIds) { + super(nodesIds); + } + } + + private static class TestNodesResponse extends BaseNodesResponse { + + private final List nodeResponses; + + TestNodesResponse(List nodeResponses) { + this.nodeResponses = nodeResponses; + } + } + + private static class TestNodeRequest extends BaseNodeRequest { + } + + private static class TestNodeResponse extends BaseNodeResponse { + } +} diff --git a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java index b643ba0d0ad..e3777e84f9a 100644 --- a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java +++ b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java @@ -38,13 +38,8 @@ import static org.hamcrest.Matchers.notNullValue; /** * */ -@ClusterScope(scope= Scope.TEST, numDataNodes =0) +@ClusterScope(scope= Scope.TEST, numDataNodes = 0) public class SimpleNodesInfoIT extends ESIntegTestCase { - static final class Fields { - static final String SITE_PLUGIN = "dummy"; - static final String SITE_PLUGIN_DESCRIPTION = "This is a description for a dummy test site plugin."; - static final String SITE_PLUGIN_VERSION = "0.0.7-BOND-SITE"; - } public void testNodesInfos() throws Exception { List nodesIds = internalCluster().startNodesAsync(2).get();