HBASE-10573 Use Netty 4

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1596192 13f79535-47bb-0310-9956-ffa450edef68
nkeywal 2014-05-20 10:57:11 +00:00
parent 8a420c52ac
commit b6646596c6
11 changed files with 211 additions and 120 deletions


@@ -116,7 +116,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
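
In Netty's packaging, io.netty:netty is the 3.x-line jar while netty-all is the single aggregate jar for the 4.x line, so every module that referenced the old artifact now points at netty-all.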


@@ -29,6 +29,7 @@ import java.util.Enumeration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -84,22 +85,7 @@ class ClientIdGenerator {
*/
public static byte[] getIpAddressBytes() {
try {
// Before we connect somewhere, we cannot be sure which address we would be bound to; however,
// we only connect once the message carrying the client ID has long been constructed, so
// just use whichever IP address we can find.
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface current = interfaces.nextElement();
if (!current.isUp() || current.isLoopback() || current.isVirtual()) continue;
Enumeration<InetAddress> addresses = current.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
if (addr.isLoopbackAddress()) continue;
if (addr instanceof Inet4Address || addr instanceof Inet6Address) {
return addr.getAddress();
}
}
}
return Addressing.getIpAddress().getAddress();
} catch (IOException ex) {
LOG.warn("Failed to get IP address bytes", ex);
}
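
The interface-scanning loop removed here does not go away: it moves into Addressing.getIpAddress() (see the Addressing hunk below) so that the client ID generator and the new multicast code share one implementation.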


@@ -20,18 +20,16 @@
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,17 +38,19 @@ import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
/**
@@ -178,22 +178,14 @@ class ClusterStatusListener implements Closeable {
*/
class MulticastListener implements Listener {
private DatagramChannel channel;
private final ExecutorService service = Executors.newSingleThreadExecutor(
Threads.newDaemonThreadFactory("hbase-client-clusterStatus-multiCastListener"));
private final EventLoopGroup group = new NioEventLoopGroup(
1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
public MulticastListener() {
}
@Override
public void connect(Configuration conf) throws IOException {
// Can't be NiO with Netty today => not implemented in Netty.
DatagramChannelFactory f = new OioDatagramChannelFactory(service);
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
b.setPipeline(Channels.pipeline(
new ProtobufDecoder(ClusterStatusProtos.ClusterStatus.getDefaultInstance()),
new ClusterStatusHandler()));
String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
@@ -202,17 +194,29 @@ class ClusterStatusListener implements Closeable {
int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
HConstants.DEFAULT_STATUS_MULTICAST_PORT);
channel = (DatagramChannel) b.bind(new InetSocketAddress(bindAddress, port));
channel.getConfig().setReuseAddress(true);
InetAddress ina;
try {
ina = InetAddress.getByName(mcAddress);
} catch (UnknownHostException e) {
close();
throw new IOException("Can't connect to " + mcAddress, e);
}
channel.joinGroup(ina);
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ClusterStatusHandler());
channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
} catch (InterruptedException e) {
close();
throw ExceptionUtil.asInterrupt(e);
}
NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
channel.joinGroup(ina, ni, null, channel.newPromise());
}
@Override
@@ -221,30 +225,40 @@ class ClusterStatusListener implements Closeable {
channel.close();
channel = null;
}
service.shutdown();
group.shutdownGracefully();
}
/**
* Class, conforming to the Netty framework, that manages the message received.
*/
private class ClusterStatusHandler extends SimpleChannelUpstreamHandler {
private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ClusterStatusProtos.ClusterStatus csp = (ClusterStatusProtos.ClusterStatus) e.getMessage();
ClusterStatus ncs = ClusterStatus.convert(csp);
receive(ncs);
}
/**
* Invoked when an exception was raised by an I/O thread or a
* {@link org.jboss.netty.channel.ChannelHandler}.
*/
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
LOG.error("Unexpected exception, continuing.", e.getCause());
ChannelHandlerContext ctx, Throwable cause)
throws Exception {
LOG.error("Unexpected exception, continuing.", cause);
}
@Override
public boolean acceptInboundMessage(Object msg)
throws Exception {
return super.acceptInboundMessage(msg);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
ByteBufInputStream bis = new ByteBufInputStream(dp.content());
try {
ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
ClusterStatus ncs = ClusterStatus.convert(csp);
receive(ncs);
} finally {
bis.close();
}
}
}
}
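
Pulled out of the hunks above, the listener side now follows the stock Netty 4 NIO datagram pattern: a Bootstrap with an NioDatagramChannel, an explicit NetworkInterface for the multicast join, and a SimpleChannelInboundHandler that parses the protobuf payload itself, since there is no longer a ProtobufDecoder in the pipeline. A minimal standalone sketch; the group address and port are illustrative stand-ins for the HConstants defaults:

import java.net.InetAddress;
import java.net.NetworkInterface;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;

public class MulticastListenerSketch {
  public static void main(String[] args) throws Exception {
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    Bootstrap b = new Bootstrap()
        .group(group)
        .channel(NioDatagramChannel.class)          // NIO datagram channels are available in Netty 4
        .option(ChannelOption.SO_REUSEADDR, true)   // replaces getConfig().setReuseAddress(true)
        .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
          @Override
          protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
            // Parse the protobuf payload straight out of the datagram's ByteBuf.
            ByteBufInputStream bis = new ByteBufInputStream(dp.content());
            try {
              ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
              System.out.println("cluster status payload: " + csp.getSerializedSize() + " bytes");
            } finally {
              bis.close();
            }
          }
        });
    DatagramChannel channel = (DatagramChannel) b.bind("0.0.0.0", 16100).sync().channel();
    InetAddress mcAddress = InetAddress.getByName("226.1.1.3");   // illustrative multicast group
    NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
    channel.joinGroup(mcAddress, ni, null, channel.newPromise()).sync();
  }
}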


@@ -18,7 +18,13 @@
*/
package org.apache.hadoop.hbase.util;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -75,4 +81,25 @@ public class Addressing {
}
return Integer.parseInt(hostAndPort.substring(colonIndex + 1));
}
public static InetAddress getIpAddress() throws SocketException {
// Before we connect somewhere, we cannot be sure which address we would be bound to; however,
// we only connect once the message carrying the client ID has long been constructed, so
// just use whichever IP address we can find.
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface current = interfaces.nextElement();
if (!current.isUp() || current.isLoopback() || current.isVirtual()) continue;
Enumeration<InetAddress> addresses = current.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
if (addr.isLoopbackAddress()) continue;
if (addr instanceof Inet4Address || addr instanceof Inet6Address) {
return addr;
}
}
}
throw new SocketException("Can't get our ip address, interfaces are: " + interfaces);
}
}
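
Both the multicast listener and publisher use this new helper to pick the interface for the multicast join, since the Netty 4 joinGroup variant used here takes the NetworkInterface explicitly. A small usage sketch; the wrapper class and method names are hypothetical:

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import org.apache.hadoop.hbase.util.Addressing;

final class InterfaceLookup {
  // Resolve the first "up", non-loopback, non-virtual address to a concrete NetworkInterface.
  static NetworkInterface multicastInterface() throws SocketException {
    InetAddress local = Addressing.getIpAddress();
    return NetworkInterface.getByInetAddress(local);
  }
}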


@@ -101,6 +101,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
<profiles>


@@ -415,6 +415,10 @@
<groupId>org.jamon</groupId>
<artifactId>jamon-runtime</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- REST dependencies -->
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -607,6 +611,13 @@
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
</dependency>
<!-- Hadoop needs Netty 3.x at test scope for the minicluster -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.6.2.Final</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
@@ -660,6 +671,13 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
</dependency>
<!-- Hadoop needs Netty 3.x at test scope for the minicluster -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.6.2.Final</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>


@@ -685,7 +685,7 @@ public class TableMapReduceUtil {
org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-server
// pull necessary dependencies
org.apache.zookeeper.ZooKeeper.class,
org.jboss.netty.channel.ChannelFactory.class,
io.netty.channel.Channel.class,
com.google.protobuf.Message.class,
com.google.common.collect.Lists.class,
org.htrace.Trace.class);
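
TableMapReduceUtil ships dependency jars by looking up one representative class per jar, so the Netty marker switches from the 3.x org.jboss.netty.channel.ChannelFactory to io.netty.channel.Channel, which now resolves to the netty-all jar.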


@@ -21,6 +21,16 @@
package org.apache.hadoop.hbase.master;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
@@ -28,24 +38,18 @@ import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
@@ -54,8 +58,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Class to publish the cluster status to the client. This allows them to know immediately
@@ -233,50 +235,65 @@ public class ClusterStatusPublisher extends Chore {
public static class MulticastPublisher implements Publisher {
private DatagramChannel channel;
private final ExecutorService service = Executors.newSingleThreadExecutor(
Threads.newDaemonThreadFactory("hbase-master-clusterStatus-worker"));
private final EventLoopGroup group = new NioEventLoopGroup(
1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));
public MulticastPublisher() {
}
@Override
public void connect(Configuration conf) throws IOException {
NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
HConstants.DEFAULT_STATUS_MULTICAST_PORT);
// Can't be NiO with Netty today => not implemented in Netty.
DatagramChannelFactory f = new OioDatagramChannelFactory(service);
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
b.setPipeline(Channels.pipeline(new ProtobufEncoder(),
new ChannelUpstreamHandler() {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
// We're just writing here. Discard any incoming data. See HBASE-8466.
}
}));
channel = (DatagramChannel) b.bind(new InetSocketAddress(0));
channel.getConfig().setReuseAddress(true);
InetAddress ina;
final InetAddress ina;
try {
ina = InetAddress.getByName(mcAddress);
} catch (UnknownHostException e) {
close();
throw new IOException("Can't connect to " + mcAddress, e);
}
channel.joinGroup(ina);
channel.connect(new InetSocketAddress(mcAddress, port));
final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ClusterStatusEncoder(isa));
try {
channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
channel.connect(isa).sync();
} catch (InterruptedException e) {
close();
throw ExceptionUtil.asInterrupt(e);
}
}
private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
final private InetSocketAddress isa;
private ClusterStatusEncoder(InetSocketAddress isa) {
this.isa = isa;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
ClusterStatus clusterStatus, List<Object> objects) {
ClusterStatusProtos.ClusterStatus csp = clusterStatus.convert();
objects.add(new DatagramPacket(Unpooled.wrappedBuffer(csp.toByteArray()), isa));
}
}
@Override
public void publish(ClusterStatus cs) {
ClusterStatusProtos.ClusterStatus csp = cs.convert();
channel.write(csp);
channel.writeAndFlush(cs).syncUninterruptibly();
}
@Override
@@ -284,7 +301,7 @@ public class ClusterStatusPublisher extends Chore {
if (channel != null) {
channel.close();
}
service.shutdown();
group.shutdownGracefully();
}
}
}
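
Two Netty 4 details on the publish path: write() only queues a message and flush() actually sends it, hence the writeAndFlush() call, and the publisher now writes the ClusterStatus object itself because the ClusterStatusEncoder installed as the channel handler converts it into a DatagramPacket addressed to the multicast group on the way out.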


@@ -212,7 +212,7 @@ public class TestHCM {
con1.close();
}
@Ignore ("Fails in IDEs: HBASE-9042") @Test(expected = RegionServerStoppedException.class)
@Test(expected = RegionServerStoppedException.class)
public void testClusterStatus() throws Exception {
TableName tn =
TableName.valueOf("testClusterStatus");


@@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.filter;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -35,8 +36,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -100,11 +99,11 @@ public class TestFuzzyRowAndColumnRangeFilter {
for (int i2 = 0; i2 < 5; i2++) {
byte[] rk = new byte[10];
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(rk);
ByteBuffer buf = ByteBuffer.wrap(rk);
buf.clear();
buf.writeShort((short) 2);
buf.writeInt(i1);
buf.writeInt(i2);
buf.putShort((short) 2);
buf.putInt(i1);
buf.putInt(i2);
for (int c = 0; c < 5; c++) {
byte[] cq = new byte[4];
@@ -132,12 +131,12 @@ public class TestFuzzyRowAndColumnRangeFilter {
private void runTest(HTable hTable, int cqStart, int expectedSize) throws IOException {
// [0, 2, ?, ?, ?, ?, 0, 0, 0, 1]
byte[] fuzzyKey = new byte[10];
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(fuzzyKey);
ByteBuffer buf = ByteBuffer.wrap(fuzzyKey);
buf.clear();
buf.writeShort((short) 2);
buf.putShort((short) 2);
for (int i = 0; i < 4; i++)
buf.writeByte((short)63);
buf.writeInt((short)1);
buf.put((byte)63);
buf.putInt((short)1);
byte[] mask = new byte[] {0 , 0, 1, 1, 1, 1, 0, 0, 0, 0};
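
This removes the test's last Netty 3 usage: ByteBuffer.wrap writes through to the backing array just as ChannelBuffers.wrappedBuffer did once clear() resets the indices, so putShort/putInt/put are drop-in replacements for the old write* calls.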

pom.xml

@@ -931,7 +931,7 @@
<clover.version>2.6.3</clover.version>
<jamon-runtime.version>2.3.1</jamon-runtime.version>
<jettison.version>1.3.1</jettison.version>
<netty.version>3.6.6.Final</netty.version>
<netty.version>4.0.19.Final</netty.version>
<!-- Plugin Dependencies -->
<maven.assembly.version>2.4</maven.assembly.version>
<maven.antrun.version>1.6</maven.antrun.version>
@@ -1179,14 +1179,14 @@
<artifactId>jms</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
@@ -1529,11 +1529,23 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop-two.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop-two.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -1541,6 +1553,12 @@
<version>${hadoop-two.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -1604,6 +1622,10 @@
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -1635,6 +1657,10 @@
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
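
At the root pom, netty.version moves from 3.6.6.Final to 4.0.19.Final and the managed artifact becomes netty-all, while io.netty:netty (the 3.x jar) is excluded from the Hadoop artifacts and re-added only at test scope in the modules where the Hadoop minicluster still needs it.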