diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java index 7bb0a1551ae..5591786a0d6 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java @@ -36,7 +36,7 @@ public class ClusterStateResponse implements ActionResponse { private ClusterState clusterState; - ClusterStateResponse() { + public ClusterStateResponse() { } ClusterStateResponse(ClusterName clusterName, ClusterState clusterState) { diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 3f519bc28b5..c1fb7e22af6 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -20,13 +20,15 @@ package org.elasticsearch.client.transport; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import jsr166y.LinkedTransferQueue; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -40,8 +42,7 @@ import org.elasticsearch.transport.*; import java.util.HashSet; import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -307,39 +308,48 @@ public class TransportClientNodesService extends AbstractComponent { // the nodes we are going to ping include the core listed nodes that were added // and the last round of discovered nodes - Map nodesToPing = Maps.newHashMap(); + Set nodesToPing = Sets.newHashSet(); for (DiscoveryNode node : listedNodes) { - nodesToPing.put(node.address(), node); + nodesToPing.add(node); } for (DiscoveryNode node : nodes) { - nodesToPing.put(node.address(), node); + nodesToPing.add(node); } final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); - final CopyOnWriteArrayList nodesInfoResponses = new CopyOnWriteArrayList(); - for (final DiscoveryNode listedNode : nodesToPing.values()) { + final LinkedTransferQueue clusterStateResponses = new LinkedTransferQueue(); + for (final DiscoveryNode listedNode : nodesToPing) { threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { @Override public void run() { try { if (!transportService.nodeConnected(listedNode)) { try { - logger.trace("connecting to node [{}]", listedNode); - transportService.connectToNode(listedNode); + + // if its one of hte actual nodes we will talk to, not to listed nodes, fully connect + if (nodes.contains(listedNode)) { + logger.trace("connecting to cluster node [{}]", listedNode); + transportService.connectToNode(listedNode); + } else { + // its a listed node, light connect to it... + logger.trace("connecting to listed node (light) [{}]", listedNode); + transportService.connectToNodeLight(listedNode); + } } catch (Exception e) { logger.debug("failed to connect to node [{}], ignoring...", e, listedNode); latch.countDown(); return; } } - transportService.sendRequest(listedNode, NodesInfoAction.NAME, - Requests.nodesInfoRequest("_all"), + transportService.sendRequest(listedNode, ClusterStateAction.NAME, + Requests.clusterStateRequest() + .filterAll().filterNodes(false).local(true), TransportRequestOptions.options().withHighType().withTimeout(pingTimeout), - new BaseTransportResponseHandler() { + new BaseTransportResponseHandler() { @Override - public NodesInfoResponse newInstance() { - return new NodesInfoResponse(); + public ClusterStateResponse newInstance() { + return new ClusterStateResponse(); } @Override @@ -348,20 +358,20 @@ public class TransportClientNodesService extends AbstractComponent { } @Override - public void handleResponse(NodesInfoResponse response) { - nodesInfoResponses.add(response); + public void handleResponse(ClusterStateResponse response) { + clusterStateResponses.add(response); latch.countDown(); } @Override public void handleException(TransportException e) { - logger.info("failed to get node info for {}, disconnecting...", e, listedNode); + logger.info("failed to get local cluster state for {}, disconnecting...", e, listedNode); transportService.disconnectFromNode(listedNode); latch.countDown(); } }); } catch (Exception e) { - logger.info("failed to get node info for {}, disconnecting...", e, listedNode); + logger.info("failed to get local cluster state info for {}, disconnecting...", e, listedNode); transportService.disconnectFromNode(listedNode); latch.countDown(); } @@ -376,15 +386,12 @@ public class TransportClientNodesService extends AbstractComponent { } HashSet newNodes = new HashSet(); - for (NodesInfoResponse nodesInfoResponse : nodesInfoResponses) { - for (NodeInfo nodeInfo : nodesInfoResponse) { - if (!clusterName.equals(nodesInfoResponse.clusterName())) { - logger.warn("node {} not part of the cluster {}, ignoring...", nodeInfo.node(), clusterName); - } else { - if (nodeInfo.node().dataNode()) { // only add data nodes to connect to - newNodes.add(nodeInfo.node()); - } - } + for (ClusterStateResponse clusterStateResponse : clusterStateResponses) { + if (!clusterName.equals(clusterStateResponse.clusterName())) { + logger.warn("node {} not part of the cluster {}, ignoring...", clusterStateResponse.state().nodes().localNode(), clusterName); + } + for (DiscoveryNode node : clusterStateResponse.state().nodes().dataNodes().values()) { + newNodes.add(node); } } // now, make sure we are connected to all the updated nodes