Transport Client: When `sniff` is enabled, use the sniffed nodes to be the list fo nodes to ping as well as the provided nodes, closes #1217.

This commit is contained in:
Shay Banon 2011-09-27 01:29:03 +03:00
parent 0c82fc5901
commit e0fdccd9c0
1 changed files with 16 additions and 4 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -43,6 +44,7 @@ import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
@ -288,11 +290,21 @@ public class TransportClientNodesService extends AbstractComponent {
if (closed) {
return;
}
ImmutableList<DiscoveryNode> listedNodes = TransportClientNodesService.this.listedNodes;
final CountDownLatch latch = new CountDownLatch(listedNodes.size());
// the nodes we are going to ping include the core listed nodes that were added
// and the last round of discovered nodes
Map<TransportAddress, DiscoveryNode> nodesToPing = Maps.newHashMap();
for (DiscoveryNode node : listedNodes) {
nodesToPing.put(node.address(), node);
}
for (DiscoveryNode node : nodes) {
nodesToPing.put(node.address(), node);
}
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
final CopyOnWriteArrayList<NodesInfoResponse> nodesInfoResponses = new CopyOnWriteArrayList<NodesInfoResponse>();
for (final DiscoveryNode listedNode : listedNodes) {
threadPool.cached().execute(new Runnable() {
for (final DiscoveryNode listedNode : nodesToPing.values()) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override public void run() {
try {
transportService.connectToNode(listedNode); // make sure we are connected to it