Merge pull request #16898 from javanna/enhancement/remove_should_connect_to

Remove DiscoveryNode#shouldConnectTo method
This commit is contained in:
Luca Cavanna 2016-03-02 23:04:31 +01:00
commit 65c9691327
7 changed files with 406 additions and 40 deletions

View File

@ -110,7 +110,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
}
private class AsyncAction {
class AsyncAction {
private final NodesRequest request;
private final String[] nodesIds;
@ -120,7 +120,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
private final AtomicInteger counter = new AtomicInteger();
private final Task task;
private AsyncAction(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
AsyncAction(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
this.task = task;
this.request = request;
this.listener = listener;
@ -135,7 +135,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
}
private void start() {
void start() {
if (nodesIds.length == 0) {
// nothing to notify
threadPool.generic().execute(new Runnable() {
@ -158,11 +158,6 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
try {
if (node == null) {
onFailure(idx, nodeId, new NoSuchNodeException(nodeId));
} else if (!clusterService.localNode().shouldConnectTo(node) && !clusterService.localNode().equals(node)) {
// the check "!clusterService.localNode().equals(node)" is to maintain backward comp. where before
// we allowed to connect from "local" client node to itself, certain tests rely on it, if we remove it, we need to fix
// those (and they randomize the client node usage, so tricky to find when)
onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
} else {
ChildTaskRequest nodeRequest = newNodeRequest(nodeId, request);
if (task != null) {

View File

@ -235,12 +235,6 @@ public abstract class TransportTasksAction<
try {
if (node == null) {
onFailure(idx, nodeId, new NoSuchNodeException(nodeId));
} else if (!clusterService.localNode().shouldConnectTo(node) && !clusterService.localNode().equals(node)) {
// the check "!clusterService.localNode().equals(node)" is to maintain backward comp. where before
// we allowed to connect from "local" client node to itself, certain tests rely on it, if we remove it, we
// need to fix
// those (and they randomize the client node usage, so tricky to find when)
onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
} else {
NodeTaskRequest nodeRequest = new NodeTaskRequest(request);
nodeRequest.setParentTask(clusterService.localNode().id(), task.getId());

View File

@ -209,16 +209,6 @@ public class DiscoveryNode implements Streamable, ToXContent {
this.version = version;
}
/**
* Should this node form a connection to the provided node.
*/
public boolean shouldConnectTo(DiscoveryNode otherNode) {
if (clientNode() && otherNode.clientNode()) {
return false;
}
return true;
}
/**
* The address that the node can be communicated with.
*/

View File

@ -570,9 +570,6 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
// TODO, do this in parallel (and wait)
for (DiscoveryNode node : nodesDelta.addedNodes()) {
if (!nodeRequiresConnection(node)) {
continue;
}
try {
transportService.connectToNode(node);
} catch (Throwable e) {
@ -824,9 +821,6 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (lifecycle.stoppedOrClosed()) {
return;
}
if (!nodeRequiresConnection(node)) {
continue;
}
if (clusterState.nodes().nodeExists(node.id())) { // we double check existence of node since connectToNode might take time...
if (!transportService.nodeConnected(node)) {
try {
@ -873,10 +867,6 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return Strings.randomBase64UUID(random);
}
private boolean nodeRequiresConnection(DiscoveryNode node) {
return localNode().shouldConnectTo(node);
}
private static class LocalNodeMasterListeners implements ClusterStateListener {
private final List<LocalNodeMasterListener> listeners = new CopyOnWriteArrayList<>();

View File

@ -0,0 +1,226 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.nodes;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeActionTests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
public class TransportNodesActionTests extends ESTestCase {
private static ThreadPool THREAD_POOL;
private static ClusterName CLUSTER_NAME = new ClusterName("test-cluster");
private TestClusterService clusterService;
private CapturingTransport transport;
private TestTransportNodesAction action;
public void testRequestIsSentToEachNode() throws Exception {
TestNodesRequest request = new TestNodesRequest();
PlainActionFuture<TestNodesResponse> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
int numNodes = clusterService.state().getNodes().size();
// check a request was sent to the right number of nodes
assertEquals(numNodes, capturedRequests.size());
}
public void testNodesSelectors() {
int numSelectors = randomIntBetween(1, 5);
Set<String> nodeSelectors = new HashSet<>();
for (int i = 0; i < numSelectors; i++) {
nodeSelectors.add(randomFrom(NodeSelector.values()).selector);
}
int numNodeIds = randomIntBetween(0, 3);
String[] nodeIds = clusterService.state().nodes().nodes().keys().toArray(String.class);
for (int i = 0; i < numNodeIds; i++) {
String nodeId = randomFrom(nodeIds);
nodeSelectors.add(nodeId);
}
String[] finalNodesIds = nodeSelectors.toArray(new String[nodeSelectors.size()]);
TestNodesRequest request = new TestNodesRequest(finalNodesIds);
action.new AsyncAction(null, request, new PlainActionFuture<>()).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
assertEquals(clusterService.state().nodes().resolveNodesIds(finalNodesIds).length, capturedRequests.size());
}
private enum NodeSelector {
LOCAL("_local"), ELECTED_MASTER("_master"), MASTER_ELIGIBLE("master:true"), DATA("data:true"), CUSTOM_ATTRIBUTE("attr:value");
private final String selector;
NodeSelector(String selector) {
this.selector = selector;
}
}
@BeforeClass
public static void startThreadPool() {
THREAD_POOL = new ThreadPool(TransportBroadcastByNodeActionTests.class.getSimpleName());
}
@AfterClass
public static void destroyThreadPool() {
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
// since static must set to null to be eligible for collection
THREAD_POOL = null;
}
@Before
public void setUp() throws Exception {
super.setUp();
transport = new CapturingTransport();
clusterService = new TestClusterService(THREAD_POOL);
final TransportService transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
transportService.acceptIncomingRequests();
int numNodes = randomIntBetween(3, 10);
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
List<DiscoveryNode> discoveryNodes = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
Map<String, String> attributes = new HashMap<>();
if (randomBoolean()) {
attributes.put("master", Boolean.toString(randomBoolean()));
attributes.put("data", Boolean.toString(randomBoolean()));
attributes.put("ingest", Boolean.toString(randomBoolean()));
} else {
attributes.put("client", "true");
}
if (frequently()) {
attributes.put("custom", randomBoolean() ? "match" : randomAsciiOfLengthBetween(3, 5));
}
final DiscoveryNode node = newNode(i, attributes);
discoBuilder = discoBuilder.put(node);
discoveryNodes.add(node);
}
discoBuilder.localNodeId(randomFrom(discoveryNodes).id());
discoBuilder.masterNodeId(randomFrom(discoveryNodes).id());
ClusterState.Builder stateBuilder = ClusterState.builder(CLUSTER_NAME);
stateBuilder.nodes(discoBuilder);
ClusterState clusterState = stateBuilder.build();
clusterService.setState(clusterState);
action = new TestTransportNodesAction(
Settings.EMPTY,
THREAD_POOL,
clusterService,
transportService,
new ActionFilters(Collections.emptySet()),
TestNodesRequest::new,
TestNodeRequest::new,
ThreadPool.Names.SAME
);
}
private static DiscoveryNode newNode(int nodeId, Map<String, String> attributes) {
String node = "node_" + nodeId;
return new DiscoveryNode(node, node, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
}
private static class TestTransportNodesAction extends TransportNodesAction<TestNodesRequest, TestNodesResponse, TestNodeRequest,
TestNodeResponse> {
TestTransportNodesAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService
transportService, ActionFilters actionFilters, Supplier<TestNodesRequest> request,
Supplier<TestNodeRequest> nodeRequest, String nodeExecutor) {
super(settings, "indices:admin/test", CLUSTER_NAME, threadPool, clusterService, transportService, actionFilters,
null, request, nodeRequest, nodeExecutor);
}
@Override
protected TestNodesResponse newResponse(TestNodesRequest request, AtomicReferenceArray nodesResponses) {
final List<TestNodeResponse> nodeResponses = new ArrayList<>();
for (int i = 0; i < nodesResponses.length(); i++) {
Object resp = nodesResponses.get(i);
if (resp instanceof TestNodeResponse) {
nodeResponses.add((TestNodeResponse) resp);
}
}
return new TestNodesResponse(nodeResponses);
}
@Override
protected TestNodeRequest newNodeRequest(String nodeId, TestNodesRequest request) {
return new TestNodeRequest();
}
@Override
protected TestNodeResponse newNodeResponse() {
return new TestNodeResponse();
}
@Override
protected TestNodeResponse nodeOperation(TestNodeRequest request) {
return new TestNodeResponse();
}
@Override
protected boolean accumulateExceptions() {
return false;
}
}
private static class TestNodesRequest extends BaseNodesRequest<TestNodesRequest> {
TestNodesRequest(String... nodesIds) {
super(nodesIds);
}
}
private static class TestNodesResponse extends BaseNodesResponse<TestNodeResponse> {
private final List<TestNodeResponse> nodeResponses;
TestNodesResponse(List<TestNodeResponse> nodeResponses) {
this.nodeResponses = nodeResponses;
}
}
private static class TestNodeRequest extends BaseNodeRequest {
}
private static class TestNodeResponse extends BaseNodeResponse {
}
}

View File

@ -0,0 +1,176 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
public class DiscoveryNodesTests extends ESTestCase {
public void testResolveNodeByIdOrName() {
DiscoveryNodes discoveryNodes = buildDiscoveryNodes();
DiscoveryNode[] nodes = discoveryNodes.nodes().values().toArray(DiscoveryNode.class);
DiscoveryNode node = randomFrom(nodes);
DiscoveryNode resolvedNode = discoveryNodes.resolveNode(randomBoolean() ? node.id() : node.name());
assertThat(resolvedNode.id(), equalTo(node.id()));
}
public void testResolveNodeByAttribute() {
DiscoveryNodes discoveryNodes = buildDiscoveryNodes();
NodeSelector nodeSelector = randomFrom(NodeSelector.values());
Set<String> matchingNodeIds = nodeSelector.matchingNodeIds(discoveryNodes);
try {
DiscoveryNode resolvedNode = discoveryNodes.resolveNode(nodeSelector.selector);
assertThat(matchingNodeIds.size(), equalTo(1));
assertThat(resolvedNode.id(), equalTo(matchingNodeIds.iterator().next()));
} catch(IllegalArgumentException e) {
if (matchingNodeIds.size() == 0) {
assertThat(e.getMessage(), equalTo("failed to resolve [" + nodeSelector.selector + "], no matching nodes"));
} else if (matchingNodeIds.size() > 1) {
assertThat(e.getMessage(), containsString("where expected to be resolved to a single node"));
} else {
fail("resolveNode shouldn't have failed for [" + nodeSelector.selector + "]");
}
}
}
public void testResolveNodesIds() {
DiscoveryNodes discoveryNodes = buildDiscoveryNodes();
int numSelectors = randomIntBetween(1, 5);
Set<String> nodeSelectors = new HashSet<>();
Set<String> expectedNodeIdsSet = new HashSet<>();
for (int i = 0; i < numSelectors; i++) {
NodeSelector nodeSelector = randomFrom(NodeSelector.values());
if (nodeSelectors.add(nodeSelector.selector)) {
expectedNodeIdsSet.addAll(nodeSelector.matchingNodeIds(discoveryNodes));
}
}
int numNodeIds = randomIntBetween(0, 3);
String[] nodeIds = discoveryNodes.nodes().keys().toArray(String.class);
for (int i = 0; i < numNodeIds; i++) {
String nodeId = randomFrom(nodeIds);
nodeSelectors.add(nodeId);
expectedNodeIdsSet.add(nodeId);
}
int numNodeNames = randomIntBetween(0, 3);
DiscoveryNode[] nodes = discoveryNodes.nodes().values().toArray(DiscoveryNode.class);
for (int i = 0; i < numNodeNames; i++) {
DiscoveryNode discoveryNode = randomFrom(nodes);
nodeSelectors.add(discoveryNode.name());
expectedNodeIdsSet.add(discoveryNode.id());
}
String[] resolvedNodesIds = discoveryNodes.resolveNodesIds(nodeSelectors.toArray(new String[nodeSelectors.size()]));
Arrays.sort(resolvedNodesIds);
String[] expectedNodesIds = expectedNodeIdsSet.toArray(new String[expectedNodeIdsSet.size()]);
Arrays.sort(expectedNodesIds);
assertThat(resolvedNodesIds, equalTo(expectedNodesIds));
}
private static DiscoveryNodes buildDiscoveryNodes() {
int numNodes = randomIntBetween(1, 10);
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
List<DiscoveryNode> nodesList = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
Map<String, String> attributes = new HashMap<>();
if (randomBoolean()) {
attributes.put("master", Boolean.toString(randomBoolean()));
attributes.put("data", Boolean.toString(randomBoolean()));
attributes.put("ingest", Boolean.toString(randomBoolean()));
} else {
attributes.put("client", "true");
}
if (frequently()) {
attributes.put("custom", randomBoolean() ? "match" : randomAsciiOfLengthBetween(3, 5));
}
final DiscoveryNode node = newNode(i, attributes);
discoBuilder = discoBuilder.put(node);
nodesList.add(node);
}
discoBuilder.localNodeId(randomFrom(nodesList).id());
discoBuilder.masterNodeId(randomFrom(nodesList).id());
return discoBuilder.build();
}
private static DiscoveryNode newNode(int nodeId, Map<String, String> attributes) {
return new DiscoveryNode("name_" + nodeId, "node_" + nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
}
private enum NodeSelector {
LOCAL("_local") {
@Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) {
return Collections.singleton(nodes.localNodeId());
}
}, ELECTED_MASTER("_master") {
@Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) {
return Collections.singleton(nodes.masterNodeId());
}
}, MASTER_ELIGIBLE("master:true") {
@Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) {
Set<String> ids = new HashSet<>();
nodes.getMasterNodes().keysIt().forEachRemaining(ids::add);
return ids;
}
}, DATA("data:true") {
@Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) {
Set<String> ids = new HashSet<>();
nodes.getDataNodes().keysIt().forEachRemaining(ids::add);
return ids;
}
}, CUSTOM_ATTRIBUTE("attr:value") {
@Override
Set<String> matchingNodeIds(DiscoveryNodes nodes) {
Set<String> ids = new HashSet<>();
nodes.getNodes().valuesIt().forEachRemaining(node -> {
if ("value".equals(node.getAttributes().get("attr"))) {
ids.add(node.id());
}
});
return ids;
}
};
private final String selector;
NodeSelector(String selector) {
this.selector = selector;
}
abstract Set<String> matchingNodeIds(DiscoveryNodes nodes);
}
}

View File

@ -38,13 +38,8 @@ import static org.hamcrest.Matchers.notNullValue;
/**
*
*/
@ClusterScope(scope= Scope.TEST, numDataNodes =0)
@ClusterScope(scope= Scope.TEST, numDataNodes = 0)
public class SimpleNodesInfoIT extends ESIntegTestCase {
static final class Fields {
static final String SITE_PLUGIN = "dummy";
static final String SITE_PLUGIN_DESCRIPTION = "This is a description for a dummy test site plugin.";
static final String SITE_PLUGIN_VERSION = "0.0.7-BOND-SITE";
}
public void testNodesInfos() throws Exception {
List<String> nodesIds = internalCluster().startNodesAsync(2).get();