Make SimpleNodeSampler populate the list of connected nodes using the information returned from the cluster

This is to allow people to introspect things like data settings and attributes. Also makes it consistent with the sniff sampler.

Closes #4162
This commit is contained in:
Boaz Leskes 2013-11-02 15:16:37 +01:00
parent ba61bbb31b
commit c324251fc2
2 changed files with 86 additions and 25 deletions

View File

@ -252,7 +252,7 @@ public class TransportClientNodesService extends AbstractComponent {
public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
int i = ++this.i;
if (i == nodes.size()) {
if (i >= nodes.size()) {
listener.onFailure(new NoNodeAvailableException());
} else {
try {
@ -296,6 +296,28 @@ public class TransportClientNodesService extends AbstractComponent {
}
protected abstract void doSample();
/**
* validates a set of potentially newly discovered nodes and returns an immutable
* list of the nodes that has passed.
*/
protected ImmutableList<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {
for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
try {
logger.trace("connecting to node [{}]", node);
transportService.connectToNode(node);
} catch (Throwable e) {
it.remove();
logger.debug("failed to connect to discovered node [" + node + "]", e);
}
}
}
return new ImmutableList.Builder<DiscoveryNode>().addAll(nodes).build();
}
}
class ScheduledNodeSampler implements Runnable {
@ -317,17 +339,19 @@ public class TransportClientNodesService extends AbstractComponent {
@Override
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>();
for (DiscoveryNode node : listedNodes) {
if (!transportService.nodeConnected(node)) {
for (DiscoveryNode listedNode : listedNodes) {
if (!transportService.nodeConnected(listedNode)) {
try {
transportService.connectToNode(node);
// its a listed node, light connect to it...
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode);
} catch (Throwable e) {
logger.debug("failed to connect to node [{}], removed from nodes list", e, node);
logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode);
continue;
}
}
try {
NodesInfoResponse nodeInfo = transportService.submitRequest(node, NodesInfoAction.NAME,
NodesInfoResponse nodeInfo = transportService.submitRequest(listedNode, NodesInfoAction.NAME,
Requests.nodesInfoRequest("_local"),
TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
new FutureTransportResponseHandler<NodesInfoResponse>() {
@ -337,16 +361,26 @@ public class TransportClientNodesService extends AbstractComponent {
}
}).txGet();
if (!ignoreClusterName && !clusterName.equals(nodeInfo.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", node, clusterName);
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
} else if (nodeInfo.getNodes().length != 0) {
// use discovered information but do keep the original transport address, so people can control which address
// is exactly used.
DiscoveryNode nodeWithInfo = nodeInfo.getNodes()[0].getNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), listedNode.address(), nodeWithInfo.attributes(), nodeWithInfo.version()));
} else {
newNodes.add(node);
// although we asked for one node, our target may not have completed initialization yet and doesn't have
// cluster nodes
logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode);
newNodes.add(listedNode);
}
} catch (Exception e) {
logger.info("failed to get node info for {}, disconnecting...", e, node);
transportService.disconnectFromNode(node);
logger.info("failed to get node info for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode);
}
}
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
nodes = validateNewNodes(newNodes);
}
}
@ -442,20 +476,8 @@ public class TransportClientNodesService extends AbstractComponent {
newNodes.add(node);
}
}
// now, make sure we are connected to all the updated nodes
for (Iterator<DiscoveryNode> it = newNodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
try {
logger.trace("connecting to node [{}]", node);
transportService.connectToNode(node);
} catch (Throwable e) {
it.remove();
logger.debug("failed to connect to discovered node [" + node + "]", e);
}
}
}
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
nodes = validateNewNodes(newNodes);
}
}

View File

@ -0,0 +1,39 @@
package org.elasticsearch.client.transport;
/*
* Licensed to ElasticSearch under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.hamcrest.Matchers;
import org.junit.Test;
@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0, transportClientRatio = 1.0)
public class TransportClientTests extends ElasticsearchIntegrationTest {
@Test
public void testPickingUpChangesInDiscoveryNode() {
String nodeName = cluster().startNode(ImmutableSettings.builder().put("node.data", false));
TransportClient client = (TransportClient) cluster().client(nodeName);
assertThat(client.connectedNodes().get(0).dataNode(), Matchers.equalTo(false));
}
}