diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index b3ed0a9ba05..0c9a7751e0c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -24,6 +24,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -177,16 +178,21 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen @Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException { final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet()); receivedResponses.put(sendPingsHandler.id(), new ConcurrentHashMap()); - sendPings(timeout, false, sendPingsHandler); - threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() { + sendPings(timeout, null, sendPingsHandler); + threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() { @Override public void run() { - sendPings(timeout, true, sendPingsHandler); - for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { - transportService.disconnectFromNode(node); - } - ConcurrentMap responses = receivedResponses.remove(sendPingsHandler.id()); - sendPingsHandler.close(); - listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); + sendPings(timeout, null, sendPingsHandler); + threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() { + @Override public void run() { + sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler); + ConcurrentMap responses = receivedResponses.remove(sendPingsHandler.id()); + listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); + for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { + transportService.disconnectFromNode(node); + } + sendPingsHandler.close(); + } + }); } }); } @@ -227,7 +233,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen } } - void sendPings(final TimeValue timeout, boolean wait, final SendPingsHandler sendPingsHandler) { + void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) { final UnicastPingRequest pingRequest = new UnicastPingRequest(); pingRequest.id = sendPingsHandler.id(); pingRequest.timeout = timeout; @@ -281,9 +287,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend); } } - if (wait) { + if (waitTime != null) { try { - latch.await(timeout.millis() * 5, TimeUnit.MILLISECONDS); + latch.await(waitTime.millis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // ignore }