From 74fa3ed32f6e9cbf178bdb0255241487d627fd86 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 27 May 2010 02:11:06 +0300 Subject: [PATCH] Zen Discovery: Improve Multicast Binding and Sending, closes #195. --- .../zen/ping/multicast/MulticastZenPing.java | 98 ++++++++++++++++--- 1 file changed, 84 insertions(+), 14 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java index 6909c7090e4..8a1e1ce94af 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.zen.ping.multicast; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -34,6 +35,7 @@ import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.io.stream.*; import org.elasticsearch.util.network.NetworkService; +import org.elasticsearch.util.network.NetworkUtils; import org.elasticsearch.util.settings.Settings; import java.io.IOException; @@ -75,6 +77,11 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem private final NetworkService networkService; + private final NetworkInterface[] networkInterfaces; + + private final InetAddress groupAddress; + + private volatile DiscoveryNodesProvider nodesProvider; private volatile Receiver receiver; @@ -112,6 +119,17 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem this.bufferSize = componentSettings.getAsInt("buffer_size", 2048); this.ttl = componentSettings.getAsInt("ttl", 3); + try { + this.groupAddress = InetAddress.getByName(group); + } catch (Exception e) { + throw new ElasticSearchIllegalArgumentException("Failed to resolve group address [" + group + "]", e); + } + try { + this.networkInterfaces = NetworkUtils.getAllAvailableInterfaces().toArray(new NetworkInterface[0]); + } catch (SocketException e) { + throw new ElasticSearchIllegalArgumentException("Failed to get all network interfaces on the machine", e); + } + logger.debug("Using group [{}], with port [{}], ttl [{}], and address [{}]", group, port, ttl, address); this.transportService.registerHandler(MulticastPingResponseRequestHandler.ACTION, new MulticastPingResponseRequestHandler()); @@ -133,17 +151,47 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem } try { - MulticastSocket multicastSocket = new MulticastSocket(null); + MulticastSocket multicastSocket; + if (NetworkUtils.canBindToMcastAddress()) { + try { + multicastSocket = new MulticastSocket(new InetSocketAddress(groupAddress, port)); + } catch (Exception e) { + logger.debug("Failed to create multicast socket by binding to group address, binding to port", e); + multicastSocket = new MulticastSocket(port); + } + } else { + multicastSocket = new MulticastSocket(port); + } + multicastSocket.setReuseAddress(true); - // bind to receive interface - multicastSocket.bind(new InetSocketAddress(port)); + multicastSocket.setTimeToLive(ttl); - // set the send interface - InetAddress multicastInterface = networkService.resolvePublishHostAddress(address); - multicastSocket.setInterface(multicastInterface); + + if (address == null) { + //bind on all interfaces + boolean joinedAtLeaseOnInterface = false; + Exception lastException = null; + for (NetworkInterface inf : NetworkUtils.getAllAvailableInterfaces()) { + try { + multicastSocket.joinGroup(new InetSocketAddress(groupAddress, port), inf); + logger.trace("Joined network interface {} for group {} and port {}", inf, groupAddress, port); + joinedAtLeaseOnInterface = true; + } catch (Exception e) { + lastException = e; + logger.debug("Failed to join on network interface {}", e, inf); + } + } + if (!joinedAtLeaseOnInterface) { + throw new DiscoveryException("Failed to join any network interface", lastException); + } + } else { + InetAddress multicastInterface = networkService.resolvePublishHostAddress(address); + multicastSocket.setInterface(multicastInterface); + multicastSocket.joinGroup(groupAddress); + logger.trace("Joined address {} for group {} and port {}", multicastInterface, groupAddress, port); + } multicastSocket.setReceiveBufferSize(bufferSize); multicastSocket.setSendBufferSize(bufferSize); - multicastSocket.joinGroup(InetAddress.getByName(group)); multicastSocket.setSoTimeout(60000); this.multicastSocket = multicastSocket; @@ -213,18 +261,40 @@ public class MulticastZenPing extends AbstractLifecycleComponent implem clusterName.writeTo(out); nodesProvider.nodes().localNode().writeTo(out); datagramPacketSend.setData(((BytesStreamOutput) out.wrappedOut()).copiedByteArray()); + datagramPacketSend.setAddress(groupAddress); + datagramPacketSend.setPort(port); } catch (IOException e) { receivedResponses.remove(id); throw new ZenPingException("Failed to serialize ping request", e); } - try { - multicastSocket.send(datagramPacketSend); - if (logger.isTraceEnabled()) { - logger.trace("[{}] Sending ping request", id); + if (networkInterfaces != null) { + Exception lastException = null; + boolean sentToAtLeastOne = false; + for (NetworkInterface inf : networkInterfaces) { + if (logger.isTraceEnabled()) { + logger.trace("[{}] Sending ping request on interface {}", id, inf); + } + try { + multicastSocket.setNetworkInterface(inf); + multicastSocket.send(datagramPacketSend); + sentToAtLeastOne = true; + } catch (Exception e) { + lastException = e; + } + } + if (!sentToAtLeastOne) { + throw new ZenPingException("Failed to send on any of the network interfaces", lastException); + } + } else { + try { + if (logger.isTraceEnabled()) { + logger.trace("[{}] Sending ping request", id); + } + multicastSocket.send(datagramPacketSend); + } catch (IOException e) { + receivedResponses.remove(id); + throw new ZenPingException("Failed to send ping request over multicast", e); } - } catch (IOException e) { - receivedResponses.remove(id); - throw new ZenPingException("Failed to send ping request over multicast", e); } } }