Multicast Discovery: if it fails, still start in a single cluster mode, closes #1608.

This commit is contained in:
Shay Banon 2012-01-13 10:40:29 +02:00
parent d2d65f2f65
commit 5eedfb1d62
1 changed files with 12 additions and 18 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.discovery.zen.ping.multicast;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -35,7 +36,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
@ -239,14 +239,14 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
} }
final int id = pingIdGenerator.incrementAndGet(); final int id = pingIdGenerator.incrementAndGet();
receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>()); receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>());
sendPingRequest(id, true); sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...) // try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages // this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() { threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.CACHED, new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
sendPingRequest(id, false); sendPingRequest(id);
} catch (Exception e) { } catch (Exception e) {
logger.warn("[{}] failed to send second ping request", e, id); logger.warn("[{}] failed to send second ping request", e, id);
} }
@ -261,7 +261,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}); });
} }
private void sendPingRequest(int id, boolean remove) { private void sendPingRequest(int id) {
if (multicastSocket == null) { if (multicastSocket == null) {
return; return;
} }
@ -275,27 +275,21 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
clusterName.writeTo(out); clusterName.writeTo(out);
nodesProvider.nodes().localNode().writeTo(out); nodesProvider.nodes().localNode().writeTo(out);
datagramPacketSend.setData(cachedEntry.bytes().copiedByteArray()); datagramPacketSend.setData(cachedEntry.bytes().copiedByteArray());
} catch (IOException e) {
if (remove) {
receivedResponses.remove(id);
}
throw new ZenPingException("Failed to serialize ping request", e);
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
try {
multicastSocket.send(datagramPacketSend); multicastSocket.send(datagramPacketSend);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("[{}] sending ping request", id); logger.trace("[{}] sending ping request", id);
} }
} catch (IOException e) { } catch (Exception e) {
if (remove) {
receivedResponses.remove(id);
}
if (lifecycle.stoppedOrClosed()) { if (lifecycle.stoppedOrClosed()) {
return; return;
} }
throw new ZenPingException("Failed to send ping request over multicast on " + multicastSocket, e); if (logger.isDebugEnabled()) {
logger.debug("failed to send multicast ping request", e);
} else {
logger.warn("failed to send multicast ping request: {}", ExceptionsHelper.detailedMessage(e));
}
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
} }
} }
} }