diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 0b54d90680c..173c68fb0be 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -163,8 +163,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen }); } - private void sendPings(final int id, TimeValue timeout, boolean wait) { - UnicastPingRequest pingRequest = new UnicastPingRequest(); + private void sendPings(final int id, final TimeValue timeout, boolean wait) { + final UnicastPingRequest pingRequest = new UnicastPingRequest(); pingRequest.id = id; pingRequest.timeout = timeout; DiscoveryNodes discoNodes = nodesProvider.nodes(); @@ -187,69 +187,27 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen disconnectX = true; } final DiscoveryNode nodeToSend = nodeToSendX; - try { - transportService.connectToNode(nodeToSend); - } catch (ConnectTransportException e) { - logger.trace("[{}] failed to connect to {}", e, id, nodeToSend); - latch.countDown(); - // can't connect to the node - continue; - } - logger.trace("[{}] connecting to {}, disconnect[{}]", id, nodeToSend, disconnectX); final boolean disconnect = disconnectX; - transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler() { - - @Override public UnicastPingResponse newInstance() { - return new UnicastPingResponse(); - } - - @Override public String executor() { - return ThreadPool.Names.SAME; - } - - @Override public void handleResponse(UnicastPingResponse response) { - logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses)); - try { - DiscoveryNodes discoveryNodes = nodesProvider.nodes(); - for (PingResponse pingResponse : response.pingResponses) { - if (disconnect) { - transportService.disconnectFromNode(nodeToSend); - } - if (pingResponse.target().id().equals(discoveryNodes.localNodeId())) { - // that's us, ignore - continue; - } - if (!pingResponse.clusterName().equals(clusterName)) { - // not part of the cluster - logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", pingResponse.target(), pingResponse.clusterName().value()); - return; - } - ConcurrentMap responses = receivedResponses.get(response.id); - if (responses == null) { - logger.warn("received ping response with no matching id [{}]", response.id); - } else { - responses.put(pingResponse.target(), pingResponse); - } + if (!transportService.nodeConnected(nodeToSend)) { + // fork the connection to another thread + threadPool.cached().execute(new Runnable() { + @Override public void run() { + try { + // connect to the node, see if we manage to do it, if not, bail + transportService.connectToNode(nodeToSend); + // we are connected, send the ping request + sendPingRequestToNode(id, timeout, pingRequest, latch, node, disconnect, nodeToSend); + } catch (ConnectTransportException e) { + // can't connect to the node + logger.trace("[{}] failed to connect to {}", e, id, nodeToSend); + latch.countDown(); } - } finally { - latch.countDown(); } - } - - @Override public void handleException(TransportException exp) { - latch.countDown(); - if (exp instanceof ConnectTransportException) { - // ok, not connected... - logger.trace("failed to connect to {}", exp, nodeToSend); - } else { - if (disconnect) { - transportService.disconnectFromNode(nodeToSend); - } - logger.warn("failed to send ping to [{}]", exp, node); - } - } - }); + }); + } else { + sendPingRequestToNode(id, timeout, pingRequest, latch, node, disconnectX, nodeToSend); + } } if (wait) { try { @@ -260,6 +218,62 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen } } + private void sendPingRequestToNode(final int id, TimeValue timeout, UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final boolean disconnect, final DiscoveryNode nodeToSend) { + logger.trace("[{}] connecting to {}, disconnect[{}]", id, nodeToSend, disconnect); + transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler() { + + @Override public UnicastPingResponse newInstance() { + return new UnicastPingResponse(); + } + + @Override public String executor() { + return ThreadPool.Names.SAME; + } + + @Override public void handleResponse(UnicastPingResponse response) { + logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses)); + try { + DiscoveryNodes discoveryNodes = nodesProvider.nodes(); + for (PingResponse pingResponse : response.pingResponses) { + if (disconnect) { + transportService.disconnectFromNode(nodeToSend); + } + if (pingResponse.target().id().equals(discoveryNodes.localNodeId())) { + // that's us, ignore + continue; + } + if (!pingResponse.clusterName().equals(clusterName)) { + // not part of the cluster + logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", pingResponse.target(), pingResponse.clusterName().value()); + return; + } + ConcurrentMap responses = receivedResponses.get(response.id); + if (responses == null) { + logger.warn("received ping response with no matching id [{}]", response.id); + } else { + responses.put(pingResponse.target(), pingResponse); + } + } + } finally { + latch.countDown(); + } + } + + @Override public void handleException(TransportException exp) { + latch.countDown(); + if (exp instanceof ConnectTransportException) { + // ok, not connected... + logger.trace("failed to connect to {}", exp, nodeToSend); + } else { + if (disconnect) { + transportService.disconnectFromNode(nodeToSend); + } + logger.warn("failed to send ping to [{}]", exp, node); + } + } + }); + } + private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) { temporalResponses.add(request.pingResponse); threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, new Runnable() {