Discovery: add a finalize round to multicast pinging

When sending a multicast ping, there is no way to determine how long it will take before all nodes will respond. Currently we send two pings (one at start, one after half timeout) and wait until the ping timeout has passed for all responses to come back. However, if all nodes are fast to respond, there is a gap relatively large between the moment that pings were gathered and the election that is based on them. This commits adds a last ping round (at timeout) where we know the number of nodes we expect to receive answers from. Once all nodes responded, we complete the pinging.

Closes #7924
This commit is contained in:
Boaz Leskes 2014-09-29 22:38:22 +02:00
parent ab5d1b9633
commit be2229c183
2 changed files with 107 additions and 15 deletions

View File

@ -20,7 +20,6 @@
package org.elasticsearch.discovery.zen.ping;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleComponent;
@ -190,5 +189,9 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
return pings.values().toArray(new PingResponse[pings.size()]);
}
/** the number of nodes for which there are known pings */
public synchronized int size() {
return pings.size();
}
}
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.network.MulticastChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -187,23 +188,58 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new Runnable() {
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void run() {
try {
sendPingRequest(id);
} catch (Exception e) {
logger.warn("[{}] failed to send second ping request", e, id);
}
}
});
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
PingCollection responses = receivedResponses.remove(id);
listener.onPing(responses.toArray());
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
}
/**
* takes all pings collected for a given id and pass them to the given listener.
* this method is safe to call multiple times as is guaranteed to only finalize once.
*/
private void finalizePingCycle(int id, final PingListener listener) {
PingCollection responses = receivedResponses.remove(id);
if (responses != null) {
listener.onPing(responses.toArray());
}
}
private void sendPingRequest(int id) {
@ -233,6 +269,59 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
}
class FinalizingPingCollection extends PingCollection {
final private PingCollection internalCollection;
final private int expectedResponses;
final private AtomicInteger responseCount;
final private PingListener listener;
final private int id;
public FinalizingPingCollection(int id, PingCollection internalCollection, int expectedResponses, PingListener listener) {
this.id = id;
this.internalCollection = internalCollection;
this.expectedResponses = expectedResponses;
this.responseCount = new AtomicInteger();
this.listener = listener;
}
@Override
public synchronized boolean addPing(PingResponse ping) {
if (internalCollection.addPing(ping)) {
if (responseCount.incrementAndGet() >= expectedResponses) {
logger.trace("[{}] all nodes responded", id);
finish();
}
return true;
}
return false;
}
@Override
public synchronized void addPings(PingResponse[] pings) {
internalCollection.addPings(pings);
}
@Override
public synchronized PingResponse[] toArray() {
return internalCollection.toArray();
}
void finish() {
// spawn another thread as we may be running on a network thread
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("failed to call ping listener", t);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
}
class MulticastPingResponseRequestHandler extends BaseTransportRequestHandler<MulticastPingResponse> {
@Override