improve unicast to have another try at pinging other nodes within the ping timeout span

This commit is contained in:
Shay Banon 2011-08-10 15:06:34 +03:00
parent 1c555679a1
commit 25c3e898fd
1 changed files with 18 additions and 12 deletions

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
@ -177,16 +178,21 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
@Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException {
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet());
receivedResponses.put(sendPingsHandler.id(), new ConcurrentHashMap<DiscoveryNode, PingResponse>());
sendPings(timeout, false, sendPingsHandler);
threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
sendPings(timeout, true, sendPingsHandler);
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
transportService.disconnectFromNode(node);
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(sendPingsHandler.id());
sendPingsHandler.close();
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() {
@Override public void run() {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(sendPingsHandler.id());
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
transportService.disconnectFromNode(node);
}
sendPingsHandler.close();
}
});
}
});
}
@ -227,7 +233,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
}
}
void sendPings(final TimeValue timeout, boolean wait, final SendPingsHandler sendPingsHandler) {
void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = sendPingsHandler.id();
pingRequest.timeout = timeout;
@ -281,9 +287,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
}
}
if (wait) {
if (waitTime != null) {
try {
latch.await(timeout.millis() * 5, TimeUnit.MILLISECONDS);
latch.await(waitTime.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}