improve unicast discovery and try to reduce the number of connections made

This commit is contained in:
Shay Banon 2011-08-02 16:30:56 +03:00
parent 9e621afeb8
commit e44fb27db1
1 changed files with 24 additions and 19 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -49,6 +50,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -168,17 +170,21 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
@Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException {
final int id = pingIdGenerator.incrementAndGet();
receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>());
sendPings(id, timeout, false);
final Set<DiscoveryNode> nodesToDisconnect1 = sendPings(id, timeout, false);
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
sendPings(id, timeout, true);
final Set<DiscoveryNode> nodesToDisconnect = Sets.newHashSet(nodesToDisconnect1);
nodesToDisconnect.addAll(sendPings(id, timeout, true));
for (DiscoveryNode node : nodesToDisconnect) {
transportService.disconnectFromNode(node);
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(id);
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
}
});
}
private void sendPings(final int id, final TimeValue timeout, boolean wait) {
Set<DiscoveryNode> sendPings(final int id, final TimeValue timeout, boolean wait) {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = id;
pingRequest.timeout = timeout;
@ -190,33 +196,36 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
nodesToPing.addAll(provider.buildDynamicNodes());
}
Set<DiscoveryNode> nodesToDisconnect = Sets.newHashSet();
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
for (final DiscoveryNode node : nodesToPing) {
// make sure we are connected
boolean disconnectX;
boolean nodeFoundByAddressX;
DiscoveryNode nodeToSendX = discoNodes.findByAddress(node.address());
if (nodeToSendX != null) {
disconnectX = false;
nodeFoundByAddressX = false;
} else {
nodeToSendX = node;
disconnectX = true;
nodeFoundByAddressX = true;
}
final DiscoveryNode nodeToSend = nodeToSendX;
final boolean disconnect = disconnectX;
final boolean nodeFoundByAddress = nodeFoundByAddressX;
if (!transportService.nodeConnected(nodeToSend)) {
nodesToDisconnect.add(nodeToSend);
// fork the connection to another thread
threadPool.cached().execute(new Runnable() {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override public void run() {
try {
// connect to the node, see if we manage to do it, if not, bail
if (disconnect) {
if (!nodeFoundByAddress) {
transportService.connectToNodeLight(nodeToSend);
} else {
transportService.connectToNode(nodeToSend);
}
// we are connected, send the ping request
sendPingRequestToNode(id, timeout, pingRequest, latch, node, disconnect, nodeToSend);
sendPingRequestToNode(id, timeout, pingRequest, latch, node, nodeToSend);
} catch (ConnectTransportException e) {
// can't connect to the node
logger.trace("[{}] failed to connect to {}", e, id, nodeToSend);
@ -225,7 +234,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
});
} else {
sendPingRequestToNode(id, timeout, pingRequest, latch, node, disconnectX, nodeToSend);
sendPingRequestToNode(id, timeout, pingRequest, latch, node, nodeToSend);
}
}
if (wait) {
@ -235,10 +244,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
// ignore
}
}
return nodesToDisconnect;
}
private void sendPingRequestToNode(final int id, TimeValue timeout, UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final boolean disconnect, final DiscoveryNode nodeToSend) {
logger.trace("[{}] connecting to {}, disconnect[{}]", id, nodeToSend, disconnect);
private void sendPingRequestToNode(final int id, TimeValue timeout, UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
logger.trace("[{}] connecting to {}", id, nodeToSend);
transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
@Override public UnicastPingResponse newInstance() {
@ -254,9 +265,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
try {
DiscoveryNodes discoveryNodes = nodesProvider.nodes();
for (PingResponse pingResponse : response.pingResponses) {
if (disconnect) {
transportService.disconnectFromNode(nodeToSend);
}
if (pingResponse.target().id().equals(discoveryNodes.localNodeId())) {
// that's us, ignore
continue;
@ -284,9 +292,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
// ok, not connected...
logger.trace("failed to connect to {}", exp, nodeToSend);
} else {
if (disconnect) {
transportService.disconnectFromNode(nodeToSend);
}
logger.warn("failed to send ping to [{}]", exp, node);
}
}