From b6646596c6aac2b6f91976eaad4993802d671f8e Mon Sep 17 00:00:00 2001
From: nkeywal
Date: Tue, 20 May 2014 10:57:11 +0000
Subject: [PATCH] HBASE-10573 Use Netty 4

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1596192 13f79535-47bb-0310-9956-ffa450edef68
---
 hbase-client/pom.xml                          |   2 +-
 .../hbase/client/ClientIdGenerator.java       |  18 +--
 .../hbase/client/ClusterStatusListener.java   | 118 ++++++++++--------
 .../apache/hadoop/hbase/util/Addressing.java  |  27 ++++
 hbase-prefix-tree/pom.xml                     |   4 +
 hbase-server/pom.xml                          |  18 +++
 .../hbase/mapreduce/TableMapReduceUtil.java   |   2 +-
 .../hbase/master/ClusterStatusPublisher.java  |  89 +++++++------
 .../apache/hadoop/hbase/client/TestHCM.java   |   2 +-
 .../TestFuzzyRowAndColumnRangeFilter.java     |  19 ++-
 pom.xml                                       |  32 ++++-
 11 files changed, 211 insertions(+), 120 deletions(-)

diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index 2ef0a6423d9..dccafd2abf9 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -116,7 +116,7 @@
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>netty-all</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientIdGenerator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientIdGenerator.java
index 2b73abcf727..be04153c3d3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientIdGenerator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientIdGenerator.java
@@ -29,6 +29,7 @@ import java.util.Enumeration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;

 /**
@@ -84,22 +85,7 @@
    */
   public static byte[] getIpAddressBytes() {
     try {
-      // Before we connect somewhere, we cannot be sure about what we'd be bound to; however,
-      // we only connect when the message where client ID is, is long constructed. Thus,
-      // just use whichever IP address we can find.
-      Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
-      while (interfaces.hasMoreElements()) {
-        NetworkInterface current = interfaces.nextElement();
-        if (!current.isUp() || current.isLoopback() || current.isVirtual()) continue;
-        Enumeration<InetAddress> addresses = current.getInetAddresses();
-        while (addresses.hasMoreElements()) {
-          InetAddress addr = addresses.nextElement();
-          if (addr.isLoopbackAddress()) continue;
-          if (addr instanceof Inet4Address || addr instanceof Inet6Address) {
-            return addr.getAddress();
-          }
-        }
-      }
+      return Addressing.getIpAddress().getAddress();
     } catch (IOException ex) {
       LOG.warn("Failed to get IP address bytes", ex);
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
index e3e0a68bb7a..3f8a6a58965 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
@@ -20,18 +20,16 @@ package org.apache.hadoop.hbase.client;

-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,17 +38,19 @@ import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Threads;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.DatagramChannel;
-import org.jboss.netty.channel.socket.DatagramChannelFactory;
-import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;

 /**
@@ -178,22 +178,14 @@ class ClusterStatusListener implements Closeable {
    */
   class MulticastListener implements Listener {
     private DatagramChannel channel;
-    private final ExecutorService service = Executors.newSingleThreadExecutor(
-        Threads.newDaemonThreadFactory("hbase-client-clusterStatus-multiCastListener"));
-
+    private final EventLoopGroup group = new NioEventLoopGroup(
+        1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));

     public MulticastListener() {
     }

     @Override
     public void connect(Configuration conf) throws IOException {
-      // Can't be NiO with Netty today => not implemented in Netty.
-      DatagramChannelFactory f = new OioDatagramChannelFactory(service);
-
-      ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
-      b.setPipeline(Channels.pipeline(
-          new ProtobufDecoder(ClusterStatusProtos.ClusterStatus.getDefaultInstance()),
-          new ClusterStatusHandler()));

       String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
           HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
@@ -202,17 +194,29 @@ class ClusterStatusListener implements Closeable {
       int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
           HConstants.DEFAULT_STATUS_MULTICAST_PORT);

-      channel = (DatagramChannel) b.bind(new InetSocketAddress(bindAddress, port));
-
-      channel.getConfig().setReuseAddress(true);
-
       InetAddress ina;
       try {
         ina = InetAddress.getByName(mcAddress);
       } catch (UnknownHostException e) {
+        close();
         throw new IOException("Can't connect to " + mcAddress, e);
       }
-      channel.joinGroup(ina);
+
+      try {
+        Bootstrap b = new Bootstrap();
+        b.group(group)
+            .channel(NioDatagramChannel.class)
+            .option(ChannelOption.SO_REUSEADDR, true)
+            .handler(new ClusterStatusHandler());
+
+        channel = (DatagramChannel) b.bind(bindAddress, port).sync().channel();
+      } catch (InterruptedException e) {
+        close();
+        throw ExceptionUtil.asInterrupt(e);
+      }
+
+      NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
+      channel.joinGroup(ina, ni, null, channel.newPromise());
     }

     @Override
@@ -221,30 +225,40 @@ class ClusterStatusListener implements Closeable {
         channel.close();
         channel = null;
       }
-      service.shutdown();
+      group.shutdownGracefully();
     }

+
     /**
      * Class, conforming to the Netty framework, that manages the message received.
      */
-    private class ClusterStatusHandler extends SimpleChannelUpstreamHandler {
+    private class ClusterStatusHandler extends SimpleChannelInboundHandler<DatagramPacket> {
       @Override
-      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-        ClusterStatusProtos.ClusterStatus csp = (ClusterStatusProtos.ClusterStatus) e.getMessage();
-        ClusterStatus ncs = ClusterStatus.convert(csp);
-        receive(ncs);
-      }
-
-      /**
-       * Invoked when an exception was raised by an I/O thread or a
-       * {@link org.jboss.netty.channel.ChannelHandler}.
-       */
-      @Override
       public void exceptionCaught(
-          ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-        LOG.error("Unexpected exception, continuing.", e.getCause());
+          ChannelHandlerContext ctx, Throwable cause)
+          throws Exception {
+        LOG.error("Unexpected exception, continuing.", cause);
+      }
+
+      @Override
+      public boolean acceptInboundMessage(Object msg)
+          throws Exception {
+        return super.acceptInboundMessage(msg);
+      }
+
+
+      @Override
+      protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) throws Exception {
+        ByteBufInputStream bis = new ByteBufInputStream(dp.content());
+        try {
+          ClusterStatusProtos.ClusterStatus csp = ClusterStatusProtos.ClusterStatus.parseFrom(bis);
+          ClusterStatus ncs = ClusterStatus.convert(csp);
+          receive(ncs);
+        } finally {
+          bis.close();
+        }
       }
     }
   }
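
Note for reviewers: the hunk above is the core of the migration, so here is a minimal
standalone sketch (not part of the patch) of the Netty 4 idiom the new MulticastListener
relies on: bind an NIO datagram channel first, then join the multicast group on an
explicit NetworkInterface, and consume datagrams in a SimpleChannelInboundHandler.
The class name, group address and port below are illustrative only, and the handler
just counts bytes instead of parsing a ClusterStatus protobuf.

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.channel.socket.nio.NioDatagramChannel;

    import java.net.InetAddress;
    import java.net.NetworkInterface;

    public class MulticastReceiverSketch {
      public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        try {
          Bootstrap b = new Bootstrap()
              .group(group)
              .channel(NioDatagramChannel.class)         // NIO datagrams work in Netty 4
              .option(ChannelOption.SO_REUSEADDR, true)
              .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) {
                  // The real listener parses a protobuf from dp.content().
                  System.out.println("received " + dp.content().readableBytes() + " bytes");
                }
              });
          // Bind the UDP port first, then join the group on a concrete interface.
          NioDatagramChannel ch = (NioDatagramChannel) b.bind(16100).sync().channel();
          InetAddress mcast = InetAddress.getByName("226.1.1.3");  // illustrative group
          NetworkInterface ni = NetworkInterface.getByInetAddress(InetAddress.getLocalHost());
          ch.joinGroup(mcast, ni, null, ch.newPromise()).sync();
          Thread.sleep(10000);                           // receive for a while, then exit
        } finally {
          group.shutdownGracefully();
        }
      }
    }

This is also why the old OioDatagramChannelFactory workaround and its dedicated
ExecutorService disappear: Netty 4 implements NIO datagram channels, so one shared
NioEventLoopGroup replaces the blocking OIO thread.
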
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Addressing.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Addressing.java
index a9661e1a6e5..182cc996a3f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Addressing.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Addressing.java
@@ -18,7 +18,13 @@
  */
 package org.apache.hadoop.hbase.util;

+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;

 import org.apache.hadoop.classification.InterfaceAudience;
@@ -75,4 +81,25 @@ public class Addressing {
     }
     return Integer.parseInt(hostAndPort.substring(colonIndex + 1));
   }
+
+  public static InetAddress getIpAddress() throws SocketException {
+    // Before we connect somewhere, we cannot be sure about what we'd be bound to; however,
+    // we only connect once the message that carries the client ID has long been constructed.
+    // Thus, just use whichever IP address we can find.
+    Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+    while (interfaces.hasMoreElements()) {
+      NetworkInterface current = interfaces.nextElement();
+      if (!current.isUp() || current.isLoopback() || current.isVirtual()) continue;
+      Enumeration<InetAddress> addresses = current.getInetAddresses();
+      while (addresses.hasMoreElements()) {
+        InetAddress addr = addresses.nextElement();
+        if (addr.isLoopbackAddress()) continue;
+        if (addr instanceof Inet4Address || addr instanceof Inet6Address) {
+          return addr;
+        }
+      }
+    }
+
+    throw new SocketException("Can't get our ip address, interfaces are: " + interfaces);
+  }
 }
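
Note: the new Addressing.getIpAddress() helper centralizes the interface-scanning loop
that used to live inline in ClientIdGenerator, and both multicast endpoints use it to
pick the NetworkInterface passed to joinGroup(). A short sketch of the intended usage,
assuming the patch is applied (the wrapper class here is hypothetical):

    import java.net.NetworkInterface;
    import java.net.SocketException;

    import org.apache.hadoop.hbase.util.Addressing;

    public final class NicSelectionSketch {
      private NicSelectionSketch() {}

      // Resolve the interface that multicast join/leave calls should target:
      // the first address on a NIC that is up, non-loopback and non-virtual.
      public static NetworkInterface localInterface() throws SocketException {
        return NetworkInterface.getByInetAddress(Addressing.getIpAddress());
      }
    }
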
diff --git a/hbase-prefix-tree/pom.xml b/hbase-prefix-tree/pom.xml
index e550573adcc..16cfdebf20d 100644
--- a/hbase-prefix-tree/pom.xml
+++ b/hbase-prefix-tree/pom.xml
@@ -101,6 +101,10 @@
     <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
   </dependencies>
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 60343abe854..0b31db1f64d 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -415,6 +415,10 @@
       <groupId>org.jamon</groupId>
       <artifactId>jamon-runtime</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
@@ -607,6 +611,13 @@
           <artifactId>hadoop-minicluster</artifactId>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+          <version>3.6.2.Final</version>
+          <scope>test</scope>
+        </dependency>
@@ -660,6 +671,13 @@
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-minicluster</artifactId>
         </dependency>
+        <dependency>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+          <version>3.6.2.Final</version>
+          <scope>test</scope>
+        </dependency>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 4a3c31f67ce..3e79049350d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -685,7 +685,7 @@ public class TableMapReduceUtil {
       org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-server
       // pull necessary dependencies
       org.apache.zookeeper.ZooKeeper.class,
-      org.jboss.netty.channel.ChannelFactory.class,
+      io.netty.channel.Channel.class,
       com.google.protobuf.Message.class,
       com.google.common.collect.Lists.class,
       org.htrace.Trace.class);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
index 32a7552e0cb..6fe190f61b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
@@ -21,6 +21,16 @@

+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Chore;
@@ -28,24 +38,18 @@ import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.DatagramChannel;
-import org.jboss.netty.channel.socket.DatagramChannelFactory;
-import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;

 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,8 +58,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;

 /**
  * Class to publish the cluster status to the client. This allows them to know immediately
@@ -233,50 +235,65 @@ public class ClusterStatusPublisher extends Chore {
   public static class MulticastPublisher implements Publisher {
     private DatagramChannel channel;
-    private final ExecutorService service = Executors.newSingleThreadExecutor(
-        Threads.newDaemonThreadFactory("hbase-master-clusterStatus-worker"));
+    private final EventLoopGroup group = new NioEventLoopGroup(
+        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

     public MulticastPublisher() {
     }

     @Override
     public void connect(Configuration conf) throws IOException {
+      NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
+
       String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
           HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
       int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
           HConstants.DEFAULT_STATUS_MULTICAST_PORT);

-      // Can't be NiO with Netty today => not implemented in Netty.
-      DatagramChannelFactory f = new OioDatagramChannelFactory(service);
-
-      ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
-      b.setPipeline(Channels.pipeline(new ProtobufEncoder(),
-          new ChannelUpstreamHandler() {
-            @Override
-            public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
-                throws Exception {
-              // We're just writing here. Discard any incoming data. See HBASE-8466.
-            }
-          }));
-
-      channel = (DatagramChannel) b.bind(new InetSocketAddress(0));
-      channel.getConfig().setReuseAddress(true);
-
-      InetAddress ina;
+      final InetAddress ina;
       try {
         ina = InetAddress.getByName(mcAddress);
       } catch (UnknownHostException e) {
+        close();
         throw new IOException("Can't connect to " + mcAddress, e);
       }
-      channel.joinGroup(ina);
-      channel.connect(new InetSocketAddress(mcAddress, port));
+
+      final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
+
+      Bootstrap b = new Bootstrap();
+      b.group(group)
+          .channel(NioDatagramChannel.class)
+          .option(ChannelOption.SO_REUSEADDR, true)
+          .handler(new ClusterStatusEncoder(isa));
+
+      try {
+        channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
+        channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
+        channel.connect(isa).sync();
+      } catch (InterruptedException e) {
+        close();
+        throw ExceptionUtil.asInterrupt(e);
+      }
+    }
+
+    private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
+      final private InetSocketAddress isa;
+
+      private ClusterStatusEncoder(InetSocketAddress isa) {
+        this.isa = isa;
+      }
+
+      @Override
+      protected void encode(ChannelHandlerContext channelHandlerContext,
+                            ClusterStatus clusterStatus, List<Object> objects) {
+        ClusterStatusProtos.ClusterStatus csp = clusterStatus.convert();
+        objects.add(new DatagramPacket(Unpooled.wrappedBuffer(csp.toByteArray()), isa));
+      }
     }

     @Override
     public void publish(ClusterStatus cs) {
-      ClusterStatusProtos.ClusterStatus csp = cs.convert();
-      channel.write(csp);
+      channel.writeAndFlush(cs).syncUninterruptibly();
     }

     @Override
@@ -284,7 +301,7 @@ public class ClusterStatusPublisher extends Chore {
       if (channel != null) {
         channel.close();
       }
-      service.shutdown();
+      group.shutdownGracefully();
     }
   }
 }
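
Note: in the publisher above, serialization moved out of publish() and into the pipeline,
so callers hand the ClusterStatus POJO straight to writeAndFlush(). For reference, the
same MessageToMessageEncoder pattern shown standalone (not from the patch) with a String
payload instead of ClusterStatus; class and field names are illustrative:

    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.handler.codec.MessageToMessageEncoder;

    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;
    import java.util.List;

    class StringDatagramEncoder extends MessageToMessageEncoder<String> {
      private final InetSocketAddress target;

      StringDatagramEncoder(InetSocketAddress target) {
        this.target = target;
      }

      @Override
      protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) {
        // Wrap the serialized payload without copying and stamp it with the
        // destination; the channel then sends it as a plain UDP datagram.
        out.add(new DatagramPacket(
            Unpooled.wrappedBuffer(msg.getBytes(Charset.forName("UTF-8"))), target));
      }
    }

With such an encoder installed via Bootstrap.handler(...), channel.writeAndFlush(msg)
is all a caller needs, which is exactly what MulticastPublisher.publish() is reduced
to in the hunk above.
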
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 27687b3e11e..e4e9405f798 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -212,7 +212,7 @@ public class TestHCM {
     con1.close();
   }

-  @Ignore ("Fails in IDEs: HBASE-9042") @Test(expected = RegionServerStoppedException.class)
+  @Test(expected = RegionServerStoppedException.class)
   public void testClusterStatus() throws Exception {
     TableName tn = TableName.valueOf("testClusterStatus");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowAndColumnRangeFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowAndColumnRangeFilter.java
index cccab376afa..1f853ac7184 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowAndColumnRangeFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowAndColumnRangeFilter.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.filter;
 import static org.junit.Assert.assertEquals;

 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -35,8 +36,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -100,11 +99,11 @@ public class TestFuzzyRowAndColumnRangeFilter {
       for (int i2 = 0; i2 < 5; i2++) {
         byte[] rk = new byte[10];

-        ChannelBuffer buf = ChannelBuffers.wrappedBuffer(rk);
+        ByteBuffer buf = ByteBuffer.wrap(rk);
         buf.clear();
-        buf.writeShort((short) 2);
-        buf.writeInt(i1);
-        buf.writeInt(i2);
+        buf.putShort((short) 2);
+        buf.putInt(i1);
+        buf.putInt(i2);

         for (int c = 0; c < 5; c++) {
           byte[] cq = new byte[4];
@@ -132,12 +131,12 @@ public class TestFuzzyRowAndColumnRangeFilter {
   private void runTest(HTable hTable, int cqStart, int expectedSize) throws IOException {
     // [0, 2, ?, ?, ?, ?, 0, 0, 0, 1]
     byte[] fuzzyKey = new byte[10];
-    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(fuzzyKey);
+    ByteBuffer buf = ByteBuffer.wrap(fuzzyKey);
     buf.clear();
-    buf.writeShort((short) 2);
+    buf.putShort((short) 2);
     for (int i = 0; i < 4; i++)
-      buf.writeByte((short)63);
-    buf.writeInt((short)1);
+      buf.put((byte)63);
+    buf.putInt((short)1);

     byte[] mask = new byte[] {0 , 0, 1, 1, 1, 1, 0, 0, 0, 0};
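
Note: the test's switch from ChannelBuffers.wrappedBuffer() to ByteBuffer.wrap() is
behavior-preserving for these fixed-layout keys, because both APIs write big-endian
into the backing array. A self-contained sketch (hypothetical class name) mirroring
the key layout used above:

    import java.nio.ByteBuffer;

    public class RowKeySketch {
      // Mirrors the test's key layout: a 2-byte prefix followed by two ints.
      public static byte[] rowKey(int i1, int i2) {
        byte[] rk = new byte[10];
        ByteBuffer buf = ByteBuffer.wrap(rk);  // writes straight into rk
        buf.putShort((short) 2);               // bytes 0-1
        buf.putInt(i1);                        // bytes 2-5
        buf.putInt(i2);                        // bytes 6-9
        return rk;
      }

      public static void main(String[] args) {
        // Big-endian like Netty's writeShort/writeInt:
        // prints [0, 2, 0, 0, 0, 1, 0, 0, 0, 2]
        System.out.println(java.util.Arrays.toString(rowKey(1, 2)));
      }
    }
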
diff --git a/pom.xml b/pom.xml
index 52a67ec9731..160a92a5543 100644
--- a/pom.xml
+++ b/pom.xml
@@ -931,7 +931,7 @@
     2.6.3
     2.3.1
     1.3.1
-    <netty.version>3.6.6.Final</netty.version>
+    <netty.version>4.0.19.Final</netty.version>
     2.4
     1.6
@@ -1179,14 +1179,14 @@
           <artifactId>jms</artifactId>
         </exclusion>
         <exclusion>
-          <groupId>org.jboss.netty</groupId>
+          <groupId>io.netty</groupId>
           <artifactId>netty</artifactId>
         </exclusion>
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>netty-all</artifactId>
       <version>${netty.version}</version>
     </dependency>
@@ -1529,11 +1529,23 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <version>${hadoop-two.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>${hadoop-two.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -1541,6 +1553,12 @@
       <version>${hadoop-two.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -1604,6 +1622,10 @@
         <exclusion>
           <groupId>stax</groupId>
           <artifactId>stax-api</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
@@ -1635,6 +1657,10 @@
         <exclusion>
           <groupId>stax</groupId>
           <artifactId>stax-api</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
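
Note on the root pom: the end state is deliberate. HBase modules compile against
io.netty:netty-all 4.0.19.Final, Hadoop's transitive io.netty:netty 3.x is excluded,
and Netty 3.6.2.Final is re-added test-scoped only where the minicluster needs it.
The two releases can coexist on one classpath because Netty 4 moved from the
org.jboss.netty packages to io.netty. Running mvn dependency:tree -Dincludes=io.netty
shows which Netty artifacts remain in each scope after a change like this. As a quick
runtime cross-check, a hedged sketch (hypothetical helper, not from the patch) that
reports which jar actually supplies the Netty 4 classes, using the same
io.netty.channel.Channel class that TableMapReduceUtil now uses to locate the jar
to ship with MapReduce jobs:

    import java.security.CodeSource;

    public class NettyVersionCheck {
      public static void main(String[] args) throws ClassNotFoundException {
        Class<?> c = Class.forName("io.netty.channel.Channel");
        CodeSource src = c.getProtectionDomain().getCodeSource();
        // For a jar on the classpath this prints the jar's location.
        System.out.println(c.getName() + " loaded from "
            + (src == null ? "the bootstrap classpath" : src.getLocation()));
      }
    }
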