Unicast Discovery: Concurrently connect to nodes to improve cases where some listed nodes are not up, closes #855.

This commit is contained in:
kimchy 2011-04-14 02:27:59 +03:00
parent 3b72d63035
commit 25eba4b60d
1 changed files with 75 additions and 61 deletions

View File

@ -163,8 +163,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
});
}
private void sendPings(final int id, TimeValue timeout, boolean wait) {
UnicastPingRequest pingRequest = new UnicastPingRequest();
private void sendPings(final int id, final TimeValue timeout, boolean wait) {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = id;
pingRequest.timeout = timeout;
DiscoveryNodes discoNodes = nodesProvider.nodes();
@ -187,69 +187,27 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
disconnectX = true;
}
final DiscoveryNode nodeToSend = nodeToSendX;
try {
transportService.connectToNode(nodeToSend);
} catch (ConnectTransportException e) {
logger.trace("[{}] failed to connect to {}", e, id, nodeToSend);
latch.countDown();
// can't connect to the node
continue;
}
logger.trace("[{}] connecting to {}, disconnect[{}]", id, nodeToSend, disconnectX);
final boolean disconnect = disconnectX;
transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
@Override public UnicastPingResponse newInstance() {
return new UnicastPingResponse();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(UnicastPingResponse response) {
logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses));
try {
DiscoveryNodes discoveryNodes = nodesProvider.nodes();
for (PingResponse pingResponse : response.pingResponses) {
if (disconnect) {
transportService.disconnectFromNode(nodeToSend);
}
if (pingResponse.target().id().equals(discoveryNodes.localNodeId())) {
// that's us, ignore
continue;
}
if (!pingResponse.clusterName().equals(clusterName)) {
// not part of the cluster
logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", pingResponse.target(), pingResponse.clusterName().value());
return;
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.get(response.id);
if (responses == null) {
logger.warn("received ping response with no matching id [{}]", response.id);
} else {
responses.put(pingResponse.target(), pingResponse);
}
if (!transportService.nodeConnected(nodeToSend)) {
// fork the connection to another thread
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
// connect to the node, see if we manage to do it, if not, bail
transportService.connectToNode(nodeToSend);
// we are connected, send the ping request
sendPingRequestToNode(id, timeout, pingRequest, latch, node, disconnect, nodeToSend);
} catch (ConnectTransportException e) {
// can't connect to the node
logger.trace("[{}] failed to connect to {}", e, id, nodeToSend);
latch.countDown();
}
} finally {
latch.countDown();
}
}
@Override public void handleException(TransportException exp) {
latch.countDown();
if (exp instanceof ConnectTransportException) {
// ok, not connected...
logger.trace("failed to connect to {}", exp, nodeToSend);
} else {
if (disconnect) {
transportService.disconnectFromNode(nodeToSend);
}
logger.warn("failed to send ping to [{}]", exp, node);
}
}
});
});
} else {
sendPingRequestToNode(id, timeout, pingRequest, latch, node, disconnectX, nodeToSend);
}
}
if (wait) {
try {
@ -260,6 +218,62 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
}
private void sendPingRequestToNode(final int id, TimeValue timeout, UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final boolean disconnect, final DiscoveryNode nodeToSend) {
logger.trace("[{}] connecting to {}, disconnect[{}]", id, nodeToSend, disconnect);
transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
@Override public UnicastPingResponse newInstance() {
return new UnicastPingResponse();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void handleResponse(UnicastPingResponse response) {
logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses));
try {
DiscoveryNodes discoveryNodes = nodesProvider.nodes();
for (PingResponse pingResponse : response.pingResponses) {
if (disconnect) {
transportService.disconnectFromNode(nodeToSend);
}
if (pingResponse.target().id().equals(discoveryNodes.localNodeId())) {
// that's us, ignore
continue;
}
if (!pingResponse.clusterName().equals(clusterName)) {
// not part of the cluster
logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", pingResponse.target(), pingResponse.clusterName().value());
return;
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.get(response.id);
if (responses == null) {
logger.warn("received ping response with no matching id [{}]", response.id);
} else {
responses.put(pingResponse.target(), pingResponse);
}
}
} finally {
latch.countDown();
}
}
@Override public void handleException(TransportException exp) {
latch.countDown();
if (exp instanceof ConnectTransportException) {
// ok, not connected...
logger.trace("failed to connect to {}", exp, nodeToSend);
} else {
if (disconnect) {
transportService.disconnectFromNode(nodeToSend);
}
logger.warn("failed to send ping to [{}]", exp, node);
}
}
});
}
private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {
temporalResponses.add(request.pingResponse);
threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, new Runnable() {