Start Elastic Node without network link, closes #842.
This commit is contained in:
parent
be7d3b609f
commit
20ff04f715
|
@ -36,7 +36,6 @@ import org.elasticsearch.common.io.stream.VoidStreamable;
|
||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.DiscoveryException;
|
|
||||||
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPingException;
|
import org.elasticsearch.discovery.zen.ping.ZenPingException;
|
||||||
|
@ -142,9 +141,14 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
this.datagramPacketReceive = new DatagramPacket(new byte[bufferSize], bufferSize);
|
this.datagramPacketReceive = new DatagramPacket(new byte[bufferSize], bufferSize);
|
||||||
this.datagramPacketSend = new DatagramPacket(new byte[bufferSize], bufferSize, InetAddress.getByName(group), port);
|
this.datagramPacketSend = new DatagramPacket(new byte[bufferSize], bufferSize, InetAddress.getByName(group), port);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new DiscoveryException("Failed to set datagram packets", e);
|
logger.warn("disabled, failed to setup multicast (datagram) discovery : {}", e.getMessage());
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("disabled, failed to setup multicast (datagram) discovery", e);
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
InetAddress multicastInterface = null;
|
||||||
try {
|
try {
|
||||||
MulticastSocket multicastSocket;
|
MulticastSocket multicastSocket;
|
||||||
// if (NetworkUtils.canBindToMcastAddress()) {
|
// if (NetworkUtils.canBindToMcastAddress()) {
|
||||||
|
@ -161,7 +165,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
multicastSocket.setTimeToLive(ttl);
|
multicastSocket.setTimeToLive(ttl);
|
||||||
|
|
||||||
// set the send interface
|
// set the send interface
|
||||||
InetAddress multicastInterface = networkService.resolvePublishHostAddress(address);
|
multicastInterface = networkService.resolvePublishHostAddress(address);
|
||||||
multicastSocket.setInterface(multicastInterface);
|
multicastSocket.setInterface(multicastInterface);
|
||||||
multicastSocket.joinGroup(InetAddress.getByName(group));
|
multicastSocket.joinGroup(InetAddress.getByName(group));
|
||||||
|
|
||||||
|
@ -170,19 +174,35 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
multicastSocket.setSoTimeout(60000);
|
multicastSocket.setSoTimeout(60000);
|
||||||
|
|
||||||
this.multicastSocket = multicastSocket;
|
this.multicastSocket = multicastSocket;
|
||||||
} catch (Exception e) {
|
|
||||||
throw new DiscoveryException("Failed to setup multicast socket", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.receiver = new Receiver();
|
this.receiver = new Receiver();
|
||||||
this.receiverThread = daemonThreadFactory(settings, "discovery#multicast#received").newThread(receiver);
|
this.receiverThread = daemonThreadFactory(settings, "discovery#multicast#receiver").newThread(receiver);
|
||||||
this.receiverThread.start();
|
this.receiverThread.start();
|
||||||
|
} catch (Exception e) {
|
||||||
|
datagramPacketReceive = null;
|
||||||
|
datagramPacketSend = null;
|
||||||
|
if (multicastSocket != null) {
|
||||||
|
multicastSocket.close();
|
||||||
|
multicastSocket = null;
|
||||||
|
}
|
||||||
|
logger.warn("disabled, failed to setup multicast discovery on {}: {}", multicastInterface, e.getMessage());
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("disabled, failed to setup multicast discovery on {}", e, multicastInterface);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override protected void doStop() throws ElasticSearchException {
|
@Override protected void doStop() throws ElasticSearchException {
|
||||||
receiver.stop();
|
if (receiver != null) {
|
||||||
receiverThread.interrupt();
|
receiver.stop();
|
||||||
multicastSocket.close();
|
}
|
||||||
|
if (receiverThread != null) {
|
||||||
|
receiverThread.interrupt();
|
||||||
|
}
|
||||||
|
if (multicastSocket != null) {
|
||||||
|
multicastSocket.close();
|
||||||
|
multicastSocket = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override protected void doClose() throws ElasticSearchException {
|
@Override protected void doClose() throws ElasticSearchException {
|
||||||
|
@ -229,6 +249,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendPingRequest(int id, boolean remove) {
|
private void sendPingRequest(int id, boolean remove) {
|
||||||
|
if (multicastSocket == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
synchronized (sendMutex) {
|
synchronized (sendMutex) {
|
||||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue