HBASE-19394 Support multi-homing env for the publication of RS status with multicast (hbase.status.published) (Toshihiro Suzuki)

This commit is contained in:
tedyu 2017-12-12 07:38:15 -08:00
parent 2e813f106f
commit 11467ef111
3 changed files with 97 additions and 62 deletions

View File

@ -20,16 +20,6 @@
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufInputStream;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel;
import java.io.Closeable;
import java.io.IOException;
@ -48,13 +38,23 @@ import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufInputStream;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
/**
* A class that receives the cluster status, and provide it as a set of service to the client.
@ -104,7 +104,7 @@ class ClusterStatusListener implements Closeable {
* Called to connect.
*
* @param conf Configuration to use.
* @throws IOException
* @throws IOException if failing to connect
*/
void connect(Configuration conf) throws IOException;
}
@ -197,6 +197,7 @@ class ClusterStatusListener implements Closeable {
HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
HConstants.DEFAULT_STATUS_MULTICAST_PORT);
String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
InetAddress ina;
try {
@ -219,7 +220,13 @@ class ClusterStatusListener implements Closeable {
throw ExceptionUtil.asInterrupt(e);
}
NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
NetworkInterface ni;
if (niName != null) {
ni = NetworkInterface.getByName(niName);
} else {
ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
}
channel.joinGroup(ina, ni, null, channel.newPromise());
}

View File

@ -28,8 +28,8 @@ import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
/**
* HConstants holds a bunch of HBase-related constants
@ -550,7 +550,8 @@ public final class HConstants {
/**
* Timestamp to use when we want to refer to the latest cell.
*
* On client side, this is the timestamp set by default when no timestamp is specified, to refer to the latest.
* On client side, this is the timestamp set by default when no timestamp is specified,
* to refer to the latest.
* On server side, this acts as a notation.
* (1) For a cell of Put, which has this notation,
* its timestamp will be replaced with server's current time.
@ -559,7 +560,8 @@ public final class HConstants {
* a. When the count of cell it gets is less than the count of cell to delete,
* the timestamp of Delete cell will be replaced with server's current time.
* b. When the count of cell it gets is equal to the count of cell to delete,
* the timestamp of Delete cell will be replaced with the latest timestamp of cell it gets.
* the timestamp of Delete cell will be replaced with the latest timestamp of cell it
* gets.
* (c. It is invalid and an exception will be thrown,
* if the count of cell it gets is greater than the count of cell to delete,
* as the max version of Get is set to the count of cell to delete.)
@ -576,7 +578,7 @@ public final class HConstants {
* Special! Used in fake Cells only. Should never be the timestamp on an actual Cell returned to
* a client.
* @deprecated Should not be public since hbase-1.3.0. For internal use only. Move internal to
* Scanners flagged as special timestamp value never to be returned as timestamp on a Cell.
* Scanners flagged as special timestamp value never to be returned as timestamp on a Cell.
*/
@Deprecated
public static final long OLDEST_TIMESTAMP = Long.MIN_VALUE;
@ -1157,6 +1159,18 @@ public final class HConstants {
public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.address.port";
public static final int DEFAULT_STATUS_MULTICAST_PORT = 16100;
/**
* The network interface name to use for the multicast messages.
*/
public static final String STATUS_MULTICAST_NI_NAME = "hbase.status.multicast.ni.name";
/**
* The address to use for binding the local socket for sending multicast. Defaults to 0.0.0.0.
*/
public static final String STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS =
"hbase.status.multicast.publisher.bind.address.ip";
public static final String DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS = "0.0.0.0";
public static final long NO_NONCE = 0;
/** Default cipher for encryption */

View File

@ -21,22 +21,6 @@
package org.apache.hadoop.hbase.master;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ChannelFactory;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelException;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil;
import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
@ -58,9 +42,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
@ -68,6 +49,25 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ChannelFactory;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelException;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
/**
@ -252,6 +252,9 @@ public class ClusterStatusPublisher extends ScheduledChore {
HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
HConstants.DEFAULT_STATUS_MULTICAST_PORT);
String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
final InetAddress ina;
try {
@ -264,24 +267,34 @@ public class ClusterStatusPublisher extends ScheduledChore {
final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
InternetProtocolFamily family;
InetAddress localAddress;
if (ina instanceof Inet6Address) {
localAddress = Addressing.getIp6Address();
family = InternetProtocolFamily.IPv6;
}else{
localAddress = Addressing.getIp4Address();
family = InternetProtocolFamily.IPv4;
NetworkInterface ni;
if (niName != null) {
if (ina instanceof Inet6Address) {
family = InternetProtocolFamily.IPv6;
} else {
family = InternetProtocolFamily.IPv4;
}
ni = NetworkInterface.getByName(niName);
} else {
InetAddress localAddress;
if (ina instanceof Inet6Address) {
localAddress = Addressing.getIp6Address();
family = InternetProtocolFamily.IPv6;
} else {
localAddress = Addressing.getIp4Address();
family = InternetProtocolFamily.IPv4;
}
ni = NetworkInterface.getByInetAddress(localAddress);
}
NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress);
Bootstrap b = new Bootstrap();
b.group(group)
.channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ClusterStatusEncoder(isa));
.channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ClusterStatusEncoder(isa));
try {
channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
channel.connect(isa).sync();
} catch (InterruptedException e) {
@ -290,33 +303,34 @@ public class ClusterStatusPublisher extends ScheduledChore {
}
}
private static final class HBaseDatagramChannelFactory<T extends Channel> implements ChannelFactory<T> {
private static final class HBaseDatagramChannelFactory<T extends Channel>
implements ChannelFactory<T> {
private final Class<? extends T> clazz;
private InternetProtocolFamily family;
HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
this.clazz = clazz;
this.family = family;
this.clazz = clazz;
this.family = family;
}
@Override
public T newChannel() {
try {
return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
new Class[] { InternetProtocolFamily.class }, new Object[] { family });
try {
return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
new Class[] { InternetProtocolFamily.class }, new Object[] { family });
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
return StringUtil.simpleClassName(clazz) + ".class";
}
}
}
private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
private static final class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
final private InetSocketAddress isa;
private ClusterStatusEncoder(InetSocketAddress isa) {