Java API: Improve TransportClient in sniff mode to be more lightweight on connections and API, closes #1898.

This commit is contained in:
Shay Banon 2012-05-02 11:44:16 +03:00
parent 771225ccc9
commit f01acb20e1
2 changed files with 37 additions and 30 deletions

View File

@ -36,7 +36,7 @@ public class ClusterStateResponse implements ActionResponse {
private ClusterState clusterState; private ClusterState clusterState;
ClusterStateResponse() { public ClusterStateResponse() {
} }
ClusterStateResponse(ClusterName clusterName, ClusterState clusterState) { ClusterStateResponse(ClusterName clusterName, ClusterState clusterState) {

View File

@ -20,13 +20,15 @@
package org.elasticsearch.client.transport; package org.elasticsearch.client.transport;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps; import com.google.common.collect.Sets;
import jsr166y.LinkedTransferQueue;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -40,8 +42,7 @@ import org.elasticsearch.transport.*;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -307,39 +308,48 @@ public class TransportClientNodesService extends AbstractComponent {
// the nodes we are going to ping include the core listed nodes that were added // the nodes we are going to ping include the core listed nodes that were added
// and the last round of discovered nodes // and the last round of discovered nodes
Map<TransportAddress, DiscoveryNode> nodesToPing = Maps.newHashMap(); Set<DiscoveryNode> nodesToPing = Sets.newHashSet();
for (DiscoveryNode node : listedNodes) { for (DiscoveryNode node : listedNodes) {
nodesToPing.put(node.address(), node); nodesToPing.add(node);
} }
for (DiscoveryNode node : nodes) { for (DiscoveryNode node : nodes) {
nodesToPing.put(node.address(), node); nodesToPing.add(node);
} }
final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
final CopyOnWriteArrayList<NodesInfoResponse> nodesInfoResponses = new CopyOnWriteArrayList<NodesInfoResponse>(); final LinkedTransferQueue<ClusterStateResponse> clusterStateResponses = new LinkedTransferQueue<ClusterStateResponse>();
for (final DiscoveryNode listedNode : nodesToPing.values()) { for (final DiscoveryNode listedNode : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
if (!transportService.nodeConnected(listedNode)) { if (!transportService.nodeConnected(listedNode)) {
try { try {
logger.trace("connecting to node [{}]", listedNode);
transportService.connectToNode(listedNode); // if its one of hte actual nodes we will talk to, not to listed nodes, fully connect
if (nodes.contains(listedNode)) {
logger.trace("connecting to cluster node [{}]", listedNode);
transportService.connectToNode(listedNode);
} else {
// its a listed node, light connect to it...
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode);
}
} catch (Exception e) { } catch (Exception e) {
logger.debug("failed to connect to node [{}], ignoring...", e, listedNode); logger.debug("failed to connect to node [{}], ignoring...", e, listedNode);
latch.countDown(); latch.countDown();
return; return;
} }
} }
transportService.sendRequest(listedNode, NodesInfoAction.NAME, transportService.sendRequest(listedNode, ClusterStateAction.NAME,
Requests.nodesInfoRequest("_all"), Requests.clusterStateRequest()
.filterAll().filterNodes(false).local(true),
TransportRequestOptions.options().withHighType().withTimeout(pingTimeout), TransportRequestOptions.options().withHighType().withTimeout(pingTimeout),
new BaseTransportResponseHandler<NodesInfoResponse>() { new BaseTransportResponseHandler<ClusterStateResponse>() {
@Override @Override
public NodesInfoResponse newInstance() { public ClusterStateResponse newInstance() {
return new NodesInfoResponse(); return new ClusterStateResponse();
} }
@Override @Override
@ -348,20 +358,20 @@ public class TransportClientNodesService extends AbstractComponent {
} }
@Override @Override
public void handleResponse(NodesInfoResponse response) { public void handleResponse(ClusterStateResponse response) {
nodesInfoResponses.add(response); clusterStateResponses.add(response);
latch.countDown(); latch.countDown();
} }
@Override @Override
public void handleException(TransportException e) { public void handleException(TransportException e) {
logger.info("failed to get node info for {}, disconnecting...", e, listedNode); logger.info("failed to get local cluster state for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode); transportService.disconnectFromNode(listedNode);
latch.countDown(); latch.countDown();
} }
}); });
} catch (Exception e) { } catch (Exception e) {
logger.info("failed to get node info for {}, disconnecting...", e, listedNode); logger.info("failed to get local cluster state info for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode); transportService.disconnectFromNode(listedNode);
latch.countDown(); latch.countDown();
} }
@ -376,15 +386,12 @@ public class TransportClientNodesService extends AbstractComponent {
} }
HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>(); HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>();
for (NodesInfoResponse nodesInfoResponse : nodesInfoResponses) { for (ClusterStateResponse clusterStateResponse : clusterStateResponses) {
for (NodeInfo nodeInfo : nodesInfoResponse) { if (!clusterName.equals(clusterStateResponse.clusterName())) {
if (!clusterName.equals(nodesInfoResponse.clusterName())) { logger.warn("node {} not part of the cluster {}, ignoring...", clusterStateResponse.state().nodes().localNode(), clusterName);
logger.warn("node {} not part of the cluster {}, ignoring...", nodeInfo.node(), clusterName); }
} else { for (DiscoveryNode node : clusterStateResponse.state().nodes().dataNodes().values()) {
if (nodeInfo.node().dataNode()) { // only add data nodes to connect to newNodes.add(node);
newNodes.add(nodeInfo.node());
}
}
} }
} }
// now, make sure we are connected to all the updated nodes // now, make sure we are connected to all the updated nodes