Zen Discovery: Improve Multicast Binding and Sending, closes #195.
This commit is contained in:
parent
9433f4d651
commit
74fa3ed32f
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.discovery.zen.ping.multicast;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
import org.elasticsearch.ElasticSearchIllegalStateException;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -34,6 +35,7 @@ import org.elasticsearch.util.TimeValue;
|
|||
import org.elasticsearch.util.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.util.io.stream.*;
|
||||
import org.elasticsearch.util.network.NetworkService;
|
||||
import org.elasticsearch.util.network.NetworkUtils;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -75,6 +77,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
private final NetworkService networkService;
|
||||
|
||||
|
||||
private final NetworkInterface[] networkInterfaces;
|
||||
|
||||
private final InetAddress groupAddress;
|
||||
|
||||
|
||||
private volatile DiscoveryNodesProvider nodesProvider;
|
||||
|
||||
private volatile Receiver receiver;
|
||||
|
@ -112,6 +119,17 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
this.bufferSize = componentSettings.getAsInt("buffer_size", 2048);
|
||||
this.ttl = componentSettings.getAsInt("ttl", 3);
|
||||
|
||||
try {
|
||||
this.groupAddress = InetAddress.getByName(group);
|
||||
} catch (Exception e) {
|
||||
throw new ElasticSearchIllegalArgumentException("Failed to resolve group address [" + group + "]", e);
|
||||
}
|
||||
try {
|
||||
this.networkInterfaces = NetworkUtils.getAllAvailableInterfaces().toArray(new NetworkInterface[0]);
|
||||
} catch (SocketException e) {
|
||||
throw new ElasticSearchIllegalArgumentException("Failed to get all network interfaces on the machine", e);
|
||||
}
|
||||
|
||||
logger.debug("Using group [{}], with port [{}], ttl [{}], and address [{}]", group, port, ttl, address);
|
||||
|
||||
this.transportService.registerHandler(MulticastPingResponseRequestHandler.ACTION, new MulticastPingResponseRequestHandler());
|
||||
|
@ -133,17 +151,47 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
}
|
||||
|
||||
try {
|
||||
MulticastSocket multicastSocket = new MulticastSocket(null);
|
||||
MulticastSocket multicastSocket;
|
||||
if (NetworkUtils.canBindToMcastAddress()) {
|
||||
try {
|
||||
multicastSocket = new MulticastSocket(new InetSocketAddress(groupAddress, port));
|
||||
} catch (Exception e) {
|
||||
logger.debug("Failed to create multicast socket by binding to group address, binding to port", e);
|
||||
multicastSocket = new MulticastSocket(port);
|
||||
}
|
||||
} else {
|
||||
multicastSocket = new MulticastSocket(port);
|
||||
}
|
||||
|
||||
multicastSocket.setReuseAddress(true);
|
||||
// bind to receive interface
|
||||
multicastSocket.bind(new InetSocketAddress(port));
|
||||
|
||||
multicastSocket.setTimeToLive(ttl);
|
||||
// set the send interface
|
||||
|
||||
if (address == null) {
|
||||
//bind on all interfaces
|
||||
boolean joinedAtLeaseOnInterface = false;
|
||||
Exception lastException = null;
|
||||
for (NetworkInterface inf : NetworkUtils.getAllAvailableInterfaces()) {
|
||||
try {
|
||||
multicastSocket.joinGroup(new InetSocketAddress(groupAddress, port), inf);
|
||||
logger.trace("Joined network interface {} for group {} and port {}", inf, groupAddress, port);
|
||||
joinedAtLeaseOnInterface = true;
|
||||
} catch (Exception e) {
|
||||
lastException = e;
|
||||
logger.debug("Failed to join on network interface {}", e, inf);
|
||||
}
|
||||
}
|
||||
if (!joinedAtLeaseOnInterface) {
|
||||
throw new DiscoveryException("Failed to join any network interface", lastException);
|
||||
}
|
||||
} else {
|
||||
InetAddress multicastInterface = networkService.resolvePublishHostAddress(address);
|
||||
multicastSocket.setInterface(multicastInterface);
|
||||
multicastSocket.joinGroup(groupAddress);
|
||||
logger.trace("Joined address {} for group {} and port {}", multicastInterface, groupAddress, port);
|
||||
}
|
||||
multicastSocket.setReceiveBufferSize(bufferSize);
|
||||
multicastSocket.setSendBufferSize(bufferSize);
|
||||
multicastSocket.joinGroup(InetAddress.getByName(group));
|
||||
multicastSocket.setSoTimeout(60000);
|
||||
|
||||
this.multicastSocket = multicastSocket;
|
||||
|
@ -213,21 +261,43 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
clusterName.writeTo(out);
|
||||
nodesProvider.nodes().localNode().writeTo(out);
|
||||
datagramPacketSend.setData(((BytesStreamOutput) out.wrappedOut()).copiedByteArray());
|
||||
datagramPacketSend.setAddress(groupAddress);
|
||||
datagramPacketSend.setPort(port);
|
||||
} catch (IOException e) {
|
||||
receivedResponses.remove(id);
|
||||
throw new ZenPingException("Failed to serialize ping request", e);
|
||||
}
|
||||
if (networkInterfaces != null) {
|
||||
Exception lastException = null;
|
||||
boolean sentToAtLeastOne = false;
|
||||
for (NetworkInterface inf : networkInterfaces) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] Sending ping request on interface {}", id, inf);
|
||||
}
|
||||
try {
|
||||
multicastSocket.setNetworkInterface(inf);
|
||||
multicastSocket.send(datagramPacketSend);
|
||||
sentToAtLeastOne = true;
|
||||
} catch (Exception e) {
|
||||
lastException = e;
|
||||
}
|
||||
}
|
||||
if (!sentToAtLeastOne) {
|
||||
throw new ZenPingException("Failed to send on any of the network interfaces", lastException);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] Sending ping request", id);
|
||||
}
|
||||
multicastSocket.send(datagramPacketSend);
|
||||
} catch (IOException e) {
|
||||
receivedResponses.remove(id);
|
||||
throw new ZenPingException("Failed to send ping request over multicast", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MulticastPingResponseRequestHandler extends BaseTransportRequestHandler<MulticastPingResponse> {
|
||||
|
||||
|
|
Loading…
Reference in New Issue