From c324251fc2b4c12adbed05396fe17a3682e01933 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 2 Nov 2013 15:16:37 +0100 Subject: [PATCH] Make SimpleNodeSampler populate the list of connected nodes using the information returned from the cluster This is to allow people to introspect things like data settings and attributes. Also makes it consistent with the sniff sampler. Closes #4162 --- .../TransportClientNodesService.java | 72 ++++++++++++------- .../transport/TransportClientTests.java | 39 ++++++++++ 2 files changed, 86 insertions(+), 25 deletions(-) create mode 100644 src/test/java/org/elasticsearch/client/transport/TransportClientTests.java diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 17e361edef6..a3ed6da3947 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -252,7 +252,7 @@ public class TransportClientNodesService extends AbstractComponent { public void onFailure(Throwable e) { if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) { int i = ++this.i; - if (i == nodes.size()) { + if (i >= nodes.size()) { listener.onFailure(new NoNodeAvailableException()); } else { try { @@ -296,6 +296,28 @@ public class TransportClientNodesService extends AbstractComponent { } protected abstract void doSample(); + + /** + * validates a set of potentially newly discovered nodes and returns an immutable + * list of the nodes that has passed. + */ + protected ImmutableList validateNewNodes(Set nodes) { + for (Iterator it = nodes.iterator(); it.hasNext(); ) { + DiscoveryNode node = it.next(); + if (!transportService.nodeConnected(node)) { + try { + logger.trace("connecting to node [{}]", node); + transportService.connectToNode(node); + } catch (Throwable e) { + it.remove(); + logger.debug("failed to connect to discovered node [" + node + "]", e); + } + } + } + + return new ImmutableList.Builder().addAll(nodes).build(); + } + } class ScheduledNodeSampler implements Runnable { @@ -317,17 +339,19 @@ public class TransportClientNodesService extends AbstractComponent { @Override protected void doSample() { HashSet newNodes = new HashSet(); - for (DiscoveryNode node : listedNodes) { - if (!transportService.nodeConnected(node)) { + for (DiscoveryNode listedNode : listedNodes) { + if (!transportService.nodeConnected(listedNode)) { try { - transportService.connectToNode(node); + // its a listed node, light connect to it... + logger.trace("connecting to listed node (light) [{}]", listedNode); + transportService.connectToNodeLight(listedNode); } catch (Throwable e) { - logger.debug("failed to connect to node [{}], removed from nodes list", e, node); + logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode); continue; } } try { - NodesInfoResponse nodeInfo = transportService.submitRequest(node, NodesInfoAction.NAME, + NodesInfoResponse nodeInfo = transportService.submitRequest(listedNode, NodesInfoAction.NAME, Requests.nodesInfoRequest("_local"), TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), new FutureTransportResponseHandler() { @@ -337,16 +361,26 @@ public class TransportClientNodesService extends AbstractComponent { } }).txGet(); if (!ignoreClusterName && !clusterName.equals(nodeInfo.getClusterName())) { - logger.warn("node {} not part of the cluster {}, ignoring...", node, clusterName); + logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName); + } else if (nodeInfo.getNodes().length != 0) { + // use discovered information but do keep the original transport address, so people can control which address + // is exactly used. + + DiscoveryNode nodeWithInfo = nodeInfo.getNodes()[0].getNode(); + newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), listedNode.address(), nodeWithInfo.attributes(), nodeWithInfo.version())); } else { - newNodes.add(node); + // although we asked for one node, our target may not have completed initialization yet and doesn't have + // cluster nodes + logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode); + newNodes.add(listedNode); } } catch (Exception e) { - logger.info("failed to get node info for {}, disconnecting...", e, node); - transportService.disconnectFromNode(node); + logger.info("failed to get node info for {}, disconnecting...", e, listedNode); + transportService.disconnectFromNode(listedNode); } } - nodes = new ImmutableList.Builder().addAll(newNodes).build(); + + nodes = validateNewNodes(newNodes); } } @@ -442,20 +476,8 @@ public class TransportClientNodesService extends AbstractComponent { newNodes.add(node); } } - // now, make sure we are connected to all the updated nodes - for (Iterator it = newNodes.iterator(); it.hasNext(); ) { - DiscoveryNode node = it.next(); - if (!transportService.nodeConnected(node)) { - try { - logger.trace("connecting to node [{}]", node); - transportService.connectToNode(node); - } catch (Throwable e) { - it.remove(); - logger.debug("failed to connect to discovered node [" + node + "]", e); - } - } - } - nodes = new ImmutableList.Builder().addAll(newNodes).build(); + + nodes = validateNewNodes(newNodes); } } diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java new file mode 100644 index 00000000000..d9095576bb7 --- /dev/null +++ b/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java @@ -0,0 +1,39 @@ +package org.elasticsearch.client.transport; +/* + * Licensed to ElasticSearch under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.hamcrest.Matchers; +import org.junit.Test; + +@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0, transportClientRatio = 1.0) +public class TransportClientTests extends ElasticsearchIntegrationTest { + + @Test + public void testPickingUpChangesInDiscoveryNode() { + String nodeName = cluster().startNode(ImmutableSettings.builder().put("node.data", false)); + + TransportClient client = (TransportClient) cluster().client(nodeName); + assertThat(client.connectedNodes().get(0).dataNode(), Matchers.equalTo(false)); + + } +}