diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java index 12e9a608187..7f20436705b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java @@ -20,16 +20,6 @@ package org.apache.hadoop.hbase.client; -import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap; -import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufInputStream; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption; -import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler; -import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel; import java.io.Closeable; import java.io.IOException; @@ -48,13 +38,23 @@ import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap; +import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufInputStream; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; /** * A class that receives the cluster status, and provide it as a set of service to the client. @@ -104,7 +104,7 @@ class ClusterStatusListener implements Closeable { * Called to connect. * * @param conf Configuration to use. - * @throws IOException + * @throws IOException if failing to connect */ void connect(Configuration conf) throws IOException; } @@ -197,6 +197,7 @@ class ClusterStatusListener implements Closeable { HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS); int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT); + String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME); InetAddress ina; try { @@ -219,7 +220,13 @@ class ClusterStatusListener implements Closeable { throw ExceptionUtil.asInterrupt(e); } - NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress()); + NetworkInterface ni; + if (niName != null) { + ni = NetworkInterface.getByName(niName); + } else { + ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress()); + } + channel.joinGroup(ina, ni, null, channel.newPromise()); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 594a895c428..30e85454a72 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -28,8 +28,8 @@ import java.util.UUID; import java.util.regex.Pattern; import org.apache.commons.lang3.ArrayUtils; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; /** * HConstants holds a bunch of HBase-related constants @@ -586,7 +586,7 @@ public final class HConstants { * Special! Used in fake Cells only. Should never be the timestamp on an actual Cell returned to * a client. * @deprecated Should not be public since hbase-1.3.0. For internal use only. Move internal to - * Scanners flagged as special timestamp value never to be returned as timestamp on a Cell. + * Scanners flagged as special timestamp value never to be returned as timestamp on a Cell. */ @Deprecated public static final long OLDEST_TIMESTAMP = Long.MIN_VALUE; @@ -1187,6 +1187,18 @@ public final class HConstants { public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.address.port"; public static final int DEFAULT_STATUS_MULTICAST_PORT = 16100; + /** + * The network interface name to use for the multicast messages. + */ + public static final String STATUS_MULTICAST_NI_NAME = "hbase.status.multicast.ni.name"; + + /** + * The address to use for binding the local socket for sending multicast. Defaults to 0.0.0.0. + */ + public static final String STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS = + "hbase.status.multicast.publisher.bind.address.ip"; + public static final String DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS = "0.0.0.0"; + public static final long NO_NONCE = 0; /** Default cipher for encryption */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java index 63cc96e42c7..cbf4b1cb2ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java @@ -21,22 +21,6 @@ package org.apache.hadoop.hbase.master; -import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap; -import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ChannelFactory; -import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled; -import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelException; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption; -import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.InternetProtocolFamily; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel; -import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.MessageToMessageEncoder; -import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil; - import java.io.Closeable; import java.io.IOException; import java.net.Inet6Address; @@ -58,9 +42,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -68,6 +49,25 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap; +import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ChannelFactory; +import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled; +import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelException; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.InternetProtocolFamily; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel; +import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.MessageToMessageEncoder; +import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; /** @@ -252,6 +252,9 @@ public class ClusterStatusPublisher extends ScheduledChore { HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS); int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT); + String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS, + HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS); + String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME); final InetAddress ina; try { @@ -264,24 +267,34 @@ public class ClusterStatusPublisher extends ScheduledChore { final InetSocketAddress isa = new InetSocketAddress(mcAddress, port); InternetProtocolFamily family; - InetAddress localAddress; - if (ina instanceof Inet6Address) { - localAddress = Addressing.getIp6Address(); - family = InternetProtocolFamily.IPv6; - }else{ - localAddress = Addressing.getIp4Address(); - family = InternetProtocolFamily.IPv4; + NetworkInterface ni; + if (niName != null) { + if (ina instanceof Inet6Address) { + family = InternetProtocolFamily.IPv6; + } else { + family = InternetProtocolFamily.IPv4; + } + ni = NetworkInterface.getByName(niName); + } else { + InetAddress localAddress; + if (ina instanceof Inet6Address) { + localAddress = Addressing.getIp6Address(); + family = InternetProtocolFamily.IPv6; + } else { + localAddress = Addressing.getIp4Address(); + family = InternetProtocolFamily.IPv4; + } + ni = NetworkInterface.getByInetAddress(localAddress); } - NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress); Bootstrap b = new Bootstrap(); b.group(group) - .channelFactory(new HBaseDatagramChannelFactory(NioDatagramChannel.class, family)) - .option(ChannelOption.SO_REUSEADDR, true) - .handler(new ClusterStatusEncoder(isa)); + .channelFactory(new HBaseDatagramChannelFactory(NioDatagramChannel.class, family)) + .option(ChannelOption.SO_REUSEADDR, true) + .handler(new ClusterStatusEncoder(isa)); try { - channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel(); + channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel(); channel.joinGroup(ina, ni, null, channel.newPromise()).sync(); channel.connect(isa).sync(); } catch (InterruptedException e) { @@ -290,33 +303,34 @@ public class ClusterStatusPublisher extends ScheduledChore { } } - private static final class HBaseDatagramChannelFactory implements ChannelFactory { + private static final class HBaseDatagramChannelFactory + implements ChannelFactory { private final Class clazz; private InternetProtocolFamily family; HBaseDatagramChannelFactory(Class clazz, InternetProtocolFamily family) { - this.clazz = clazz; - this.family = family; + this.clazz = clazz; + this.family = family; } @Override public T newChannel() { - try { - return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(), - new Class[] { InternetProtocolFamily.class }, new Object[] { family }); + try { + return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(), + new Class[] { InternetProtocolFamily.class }, new Object[] { family }); - } catch (Throwable t) { - throw new ChannelException("Unable to create Channel from class " + clazz, t); - } + } catch (Throwable t) { + throw new ChannelException("Unable to create Channel from class " + clazz, t); + } } @Override public String toString() { - return StringUtil.simpleClassName(clazz) + ".class"; + return StringUtil.simpleClassName(clazz) + ".class"; } - } + } - private static class ClusterStatusEncoder extends MessageToMessageEncoder { + private static final class ClusterStatusEncoder extends MessageToMessageEncoder { final private InetSocketAddress isa; private ClusterStatusEncoder(InetSocketAddress isa) {