HBASE-12359 MulticastPublisher should specify IPv4/v6 protocol family when creating multicast channel (Qiang Tian)

This commit is contained in:
stack 2014-11-14 21:55:15 -08:00
parent 9583d14747
commit 0255fc2bf1
1 changed files with 43 additions and 3 deletions

View File

@ -22,16 +22,21 @@ package org.apache.hadoop.hbase.master;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import io.netty.util.internal.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterStatus;
@ -43,11 +48,14 @@ import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import java.io.Closeable;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
@ -60,6 +68,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Class to publish the cluster status to the client. This allows them to know immediately
* the dead region servers, hence to cut the connection they have with them, eventually stop
@ -261,12 +270,17 @@ public class ClusterStatusPublisher extends Chore {
}
final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
InternetProtocolFamily family = InternetProtocolFamily.IPv4;
if (ina instanceof Inet6Address) {
family = InternetProtocolFamily.IPv6;
}
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ClusterStatusEncoder(isa));
.channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ClusterStatusEncoder(isa));
try {
channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
@ -278,6 +292,32 @@ public class ClusterStatusPublisher extends Chore {
}
}
private static final class HBaseDatagramChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
private InternetProtocolFamily family;
HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
this.clazz = clazz;
this.family = family;
}
@Override
public T newChannel() {
try {
return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
new Class[] { InternetProtocolFamily.class }, new Object[] { family });
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
final private InetSocketAddress isa;