diff --git a/hadoop-common-project/hadoop-nfs/pom.xml b/hadoop-common-project/hadoop-nfs/pom.xml index c0b4e14177c..baabb0fdc6e 100644 --- a/hadoop-common-project/hadoop-nfs/pom.xml +++ b/hadoop-common-project/hadoop-nfs/pom.xml @@ -90,7 +90,7 @@ io.netty - netty + netty-all compile diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java index 0ff3084bf3e..58d3e51f2bd 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java @@ -41,6 +41,8 @@ abstract public class MountdBase { private final RpcProgram rpcProgram; private int udpBoundPort; // Will set after server starts private int tcpBoundPort; // Will set after server starts + private SimpleUdpServer udpServer = null; + private SimpleTcpServer tcpServer = null; public RpcProgram getRpcProgram() { return rpcProgram; @@ -57,7 +59,7 @@ abstract public class MountdBase { /* Start UDP server */ private void startUDPServer() { - SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(), + udpServer = new SimpleUdpServer(rpcProgram.getPort(), rpcProgram, 1); rpcProgram.startDaemons(); try { @@ -76,7 +78,7 @@ abstract public class MountdBase { /* Start TCP server */ private void startTCPServer() { - SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), + tcpServer = new SimpleTcpServer(rpcProgram.getPort(), rpcProgram, 1); rpcProgram.startDaemons(); try { @@ -118,6 +120,14 @@ abstract public class MountdBase { rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, tcpBoundPort); tcpBoundPort = 0; } + if (udpServer != null) { + udpServer.shutdown(); + udpServer = null; + } + if (tcpServer != null) { + tcpServer.shutdown(); + tcpServer = null; + } } /** diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java index ff83a5f19be..e6ea29b42bf 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java @@ -35,6 +35,7 @@ public abstract class Nfs3Base { public static final Logger LOG = LoggerFactory.getLogger(Nfs3Base.class); private final RpcProgram rpcProgram; private int nfsBoundPort; // Will set after server starts + private SimpleTcpServer tcpServer = null; public RpcProgram getRpcProgram() { return rpcProgram; @@ -61,7 +62,7 @@ public abstract class Nfs3Base { } private void startTCPServer() { - SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), + tcpServer = new SimpleTcpServer(rpcProgram.getPort(), rpcProgram, 0); rpcProgram.startDaemons(); try { @@ -84,6 +85,10 @@ public abstract class Nfs3Base { nfsBoundPort = 0; } rpcProgram.stopDaemons(); + if (tcpServer != null) { + tcpServer.shutdown(); + tcpServer = null; + } } /** * Priority of the nfsd shutdown hook. diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java index c8528ba4d55..c96f1d53bb4 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RegistrationClient.java @@ -19,10 +19,9 @@ package org.apache.hadoop.oncrpc; import java.util.Arrays; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.MessageEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,10 +57,10 @@ public class RegistrationClient extends SimpleTcpClient { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // Read reply + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf buf = (ByteBuf) msg; // Read reply if (!validMessageLength(buf.readableBytes())) { - e.getChannel().close(); + ctx.channel().close(); return; } @@ -83,7 +82,7 @@ public class RegistrationClient extends SimpleTcpClient { RpcDeniedReply deniedReply = (RpcDeniedReply) reply; handle(deniedReply); } - e.getChannel().close(); // shutdown now that request is complete + ctx.channel().close(); // shutdown now that request is complete } private void handle(RpcDeniedReply deniedReply) { diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java index b434d79285c..aba8e9ea262 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java @@ -19,9 +19,9 @@ package org.apache.hadoop.oncrpc; import java.net.SocketAddress; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; /** * RpcInfo records all contextual information of an RPC message. It contains @@ -29,11 +29,11 @@ import org.jboss.netty.channel.ChannelHandlerContext; */ public final class RpcInfo { private final RpcMessage header; - private final ChannelBuffer data; + private final ByteBuf data; private final Channel channel; private final SocketAddress remoteAddress; - public RpcInfo(RpcMessage header, ChannelBuffer data, + public RpcInfo(RpcMessage header, ByteBuf data, ChannelHandlerContext channelContext, Channel channel, SocketAddress remoteAddress) { this.header = header; @@ -46,7 +46,7 @@ public final class RpcInfo { return header; } - public ChannelBuffer data() { + public ByteBuf data() { return data; } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java index bafb49716b6..8b8d558255f 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java @@ -22,16 +22,15 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.SocketAddress; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.portmap.PortmapMapping; import org.apache.hadoop.portmap.PortmapRequest; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory; * Class for writing RPC server programs based on RFC 1050. Extend this class * and implement {@link #handleInternal} to handle the requests received. */ -public abstract class RpcProgram extends SimpleChannelUpstreamHandler { +public abstract class RpcProgram extends ChannelInboundHandlerAdapter { static final Logger LOG = LoggerFactory.getLogger(RpcProgram.class); public static final int RPCB_PORT = 111; private final String program; @@ -161,9 +160,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { public void stopDaemons() {} @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - RpcInfo info = (RpcInfo) e.getMessage(); + RpcInfo info = (RpcInfo) msg; RpcCall call = (RpcCall) info.header(); SocketAddress remoteAddress = info.remoteAddress(); @@ -221,7 +220,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { out.writeInt(lowProgVersion); out.writeInt(highProgVersion); } - ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap() .buffer()); RpcResponse rsp = new RpcResponse(b, remoteAddress); RpcUtil.sendRpcResponse(ctx, rsp); @@ -234,7 +233,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { RpcReply.ReplyState.MSG_DENIED, RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone()); reply.write(out); - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap() .buffer()); RpcResponse rsp = new RpcResponse(buf, remoteAddress); RpcUtil.sendRpcResponse(ctx, rsp); diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java index 2e45e6100b1..0d6431f68bd 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java @@ -19,27 +19,30 @@ package org.apache.hadoop.oncrpc; import java.net.SocketAddress; -import org.jboss.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; +import io.netty.channel.DefaultAddressedEnvelope; /** * RpcResponse encapsulates a response to a RPC request. It contains the data * that is going to cross the wire, as well as the information of the remote * peer. */ -public class RpcResponse { - private final ChannelBuffer data; - private final SocketAddress remoteAddress; - - public RpcResponse(ChannelBuffer data, SocketAddress remoteAddress) { - this.data = data; - this.remoteAddress = remoteAddress; +public class RpcResponse extends + DefaultAddressedEnvelope { + public RpcResponse(ByteBuf message, SocketAddress recipient) { + super(message, recipient, null); } - public ChannelBuffer data() { - return data; + public RpcResponse(ByteBuf message, SocketAddress recipient, + SocketAddress sender) { + super(message, recipient, sender); + } + + public ByteBuf data() { + return this.content(); } public SocketAddress remoteAddress() { - return remoteAddress; + return this.recipient(); } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java index cebebd27d0c..e8bc27d687f 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java @@ -17,16 +17,18 @@ */ package org.apache.hadoop.oncrpc; +import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.List; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.handler.codec.frame.FrameDecoder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,16 +45,16 @@ public final class RpcUtil { public static void sendRpcResponse(ChannelHandlerContext ctx, RpcResponse response) { - Channels.fireMessageReceived(ctx, response); + ctx.fireChannelRead(response); } - public static FrameDecoder constructRpcFrameDecoder() { + public static ByteToMessageDecoder constructRpcFrameDecoder() { return new RpcFrameDecoder(); } - public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage(); - public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage(); - public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage(); + public static final ChannelInboundHandlerAdapter STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage(); + public static final ChannelInboundHandlerAdapter STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage(); + public static final ChannelInboundHandlerAdapter STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage(); /** * An RPC client can separate a RPC message into several frames (i.e., @@ -62,44 +64,39 @@ public final class RpcUtil { * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for * each RPC client. */ - static class RpcFrameDecoder extends FrameDecoder { + static class RpcFrameDecoder extends ByteToMessageDecoder { public static final Logger LOG = LoggerFactory.getLogger(RpcFrameDecoder.class); - private ChannelBuffer currentFrame; + private volatile boolean isLast; @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer buf) { + protected void decode(ChannelHandlerContext ctx, ByteBuf buf, + List out) { - if (buf.readableBytes() < 4) - return null; + if (buf.readableBytes() < 4) { + return; + } buf.markReaderIndex(); byte[] fragmentHeader = new byte[4]; buf.readBytes(fragmentHeader); int length = XDR.fragmentSize(fragmentHeader); - boolean isLast = XDR.isLastFragment(fragmentHeader); + isLast = XDR.isLastFragment(fragmentHeader); if (buf.readableBytes() < length) { buf.resetReaderIndex(); - return null; + return; } - ChannelBuffer newFragment = buf.readSlice(length); - if (currentFrame == null) { - currentFrame = newFragment; - } else { - currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment); - } + ByteBuf newFragment = buf.readSlice(length); + newFragment.retain(); + out.add(newFragment); + } - if (isLast) { - ChannelBuffer completeFrame = currentFrame; - currentFrame = null; - return completeFrame; - } else { - return null; - } + @VisibleForTesting + public boolean isLast() { + return isLast; } } @@ -107,30 +104,44 @@ public final class RpcUtil { * RpcMessageParserStage parses the network bytes and encapsulates the RPC * request into a RpcInfo instance. */ - static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler { + @ChannelHandler.Sharable + static final class RpcMessageParserStage extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory .getLogger(RpcMessageParserStage.class); @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ChannelBuffer buf = (ChannelBuffer) e.getMessage(); - ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer(); + ByteBuf buf; + SocketAddress remoteAddress; + if (msg instanceof DatagramPacket) { + DatagramPacket packet = (DatagramPacket)msg; + buf = packet.content(); + remoteAddress = packet.sender(); + } else { + buf = (ByteBuf) msg; + remoteAddress = ctx.channel().remoteAddress(); + } + + ByteBuffer b = buf.nioBuffer().asReadOnlyBuffer(); XDR in = new XDR(b, XDR.State.READING); RpcInfo info = null; try { RpcCall callHeader = RpcCall.read(in); - ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer() + ByteBuf dataBuffer = Unpooled.wrappedBuffer(in.buffer() .slice()); - info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(), - e.getRemoteAddress()); + + info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(), + remoteAddress); } catch (Exception exc) { - LOG.info("Malformed RPC request from " + e.getRemoteAddress()); + LOG.info("Malformed RPC request from " + remoteAddress); + } finally { + buf.release(); } if (info != null) { - Channels.fireMessageReceived(ctx, info); + ctx.fireChannelRead(info); } } } @@ -139,16 +150,17 @@ public final class RpcUtil { * RpcTcpResponseStage sends an RpcResponse across the wire with the * appropriate fragment header. */ - private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler { + @ChannelHandler.Sharable + private static class RpcTcpResponseStage extends ChannelInboundHandlerAdapter { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - RpcResponse r = (RpcResponse) e.getMessage(); + RpcResponse r = (RpcResponse) msg; byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true); - ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader); - ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data()); - e.getChannel().write(d); + ByteBuf header = Unpooled.wrappedBuffer(fragmentHeader); + ByteBuf d = Unpooled.wrappedBuffer(header, r.data()); + ctx.channel().writeAndFlush(d); } } @@ -156,14 +168,17 @@ public final class RpcUtil { * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not * require a fragment header. */ + @ChannelHandler.Sharable private static final class RpcUdpResponseStage extends - SimpleChannelUpstreamHandler { + ChannelInboundHandlerAdapter { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - RpcResponse r = (RpcResponse) e.getMessage(); - e.getChannel().write(r.data(), r.remoteAddress()); + RpcResponse r = (RpcResponse) msg; + // TODO: check out https://github.com/netty/netty/issues/1282 for + // correct usage + ctx.channel().writeAndFlush(r.data()); } } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java index 32e1b4b8392..7cfef6439b0 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java @@ -18,15 +18,16 @@ package org.apache.hadoop.oncrpc; import java.net.InetSocketAddress; -import java.util.concurrent.Executors; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; /** * A simple TCP based RPC client which just sends a request to a server. @@ -35,8 +36,9 @@ public class SimpleTcpClient { protected final String host; protected final int port; protected final XDR request; - protected ChannelPipelineFactory pipelineFactory; protected final boolean oneShot; + private NioEventLoopGroup workerGroup; + private ChannelFuture future; public SimpleTcpClient(String host, int port, XDR request) { this(host,port, request, true); @@ -48,40 +50,54 @@ public class SimpleTcpClient { this.request = request; this.oneShot = oneShot; } - - protected ChannelPipelineFactory setPipelineFactory() { - this.pipelineFactory = new ChannelPipelineFactory() { + + protected ChannelInitializer setChannelHandler() { + return new ChannelInitializer() { @Override - public ChannelPipeline getPipeline() { - return Channels.pipeline( + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast( RpcUtil.constructRpcFrameDecoder(), - new SimpleTcpClientHandler(request)); + new SimpleTcpClientHandler(request) + ); } }; - return this.pipelineFactory; } + @VisibleForTesting public void run() { // Configure the client. - ChannelFactory factory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 1, 1); - ClientBootstrap bootstrap = new ClientBootstrap(factory); + workerGroup = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap() + .group(workerGroup) + .channel(NioSocketChannel.class); - // Set up the pipeline factory. - bootstrap.setPipelineFactory(setPipelineFactory()); + try { + future = bootstrap.handler(setChannelHandler()) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .connect(new InetSocketAddress(host, port)).sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + if (oneShot) { + stop(); + } + } + } - bootstrap.setOption("tcpNoDelay", true); - bootstrap.setOption("keepAlive", true); - - // Start the connection attempt. - ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); - - if (oneShot) { - // Wait until the connection is closed or the connection attempt fails. - future.getChannel().getCloseFuture().awaitUninterruptibly(); + public void stop() { + try { + if (future != null) { + // Wait until the connection is closed or the connection attempt fails. + future.channel().closeFuture().sync(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { // Shut down thread pools to exit. - bootstrap.releaseExternalResources(); + workerGroup.shutdownGracefully(); } } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java index 23b6682361c..1acefc857f8 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClientHandler.java @@ -17,19 +17,19 @@ */ package org.apache.hadoop.oncrpc; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A simple TCP based RPC client handler used by {@link SimpleTcpServer}. */ -public class SimpleTcpClientHandler extends SimpleChannelHandler { +public class SimpleTcpClientHandler extends ChannelInboundHandlerAdapter { public static final Logger LOG = LoggerFactory.getLogger(SimpleTcpClient.class); protected final XDR request; @@ -39,13 +39,13 @@ public class SimpleTcpClientHandler extends SimpleChannelHandler { } @Override - public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { // Send the request if (LOG.isDebugEnabled()) { LOG.debug("sending PRC request"); } - ChannelBuffer outBuf = XDR.writeMessageTcp(request, true); - e.getChannel().write(outBuf); + ByteBuf outBuf = XDR.writeMessageTcp(request, true); + ctx.channel().writeAndFlush(outBuf); } /** @@ -53,13 +53,13 @@ public class SimpleTcpClientHandler extends SimpleChannelHandler { * more interaction with the server. */ @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - e.getChannel().close(); + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.channel().close(); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - LOG.warn("Unexpected exception from downstream: ", e.getCause()); - e.getChannel().close(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.warn("Unexpected exception from downstream: ", cause.getCause()); + ctx.channel().close(); } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java index 177fa3d80b1..29155c80b18 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java @@ -20,14 +20,17 @@ package org.apache.hadoop.oncrpc; import java.net.InetSocketAddress; import java.util.concurrent.Executors; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +42,11 @@ public class SimpleTcpServer { LoggerFactory.getLogger(SimpleTcpServer.class); protected final int port; protected int boundPort = -1; // Will be set after server starts - protected final SimpleChannelUpstreamHandler rpcProgram; + protected final ChannelInboundHandlerAdapter rpcProgram; private ServerBootstrap server; private Channel ch; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; /** The maximum number of I/O worker threads */ protected final int workerCount; @@ -57,37 +62,32 @@ public class SimpleTcpServer { this.workerCount = workercount; } - public void run() { + public void run() throws InterruptedException { // Configure the Server. - ChannelFactory factory; - if (workerCount == 0) { - // Use default workers: 2 * the number of available processors - factory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); - } else { - factory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), - workerCount); - } + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(workerCount, Executors.newCachedThreadPool()); - server = new ServerBootstrap(factory); - server.setPipelineFactory(new ChannelPipelineFactory() { + server = new ServerBootstrap(); + server.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { @Override - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(), + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(RpcUtil.constructRpcFrameDecoder(), RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram, RpcUtil.STAGE_RPC_TCP_RESPONSE); - } - }); - server.setOption("child.tcpNoDelay", true); - server.setOption("child.keepAlive", true); - server.setOption("child.reuseAddress", true); - server.setOption("reuseAddress", true); + }}) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_REUSEADDR, true); // Listen to TCP port - ch = server.bind(new InetSocketAddress(port)); - InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress(); + ChannelFuture f = server.bind(new InetSocketAddress(port)).sync(); + ch = f.channel(); + InetSocketAddress socketAddr = (InetSocketAddress) ch.localAddress(); boundPort = socketAddr.getPort(); LOG.info("Started listening to TCP requests at port " + boundPort + " for " @@ -102,9 +102,17 @@ public class SimpleTcpServer { public void shutdown() { if (ch != null) { ch.close().awaitUninterruptibly(); + ch = null; } - if (server != null) { - server.releaseExternalResources(); + + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + workerGroup = null; + } + + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + bossGroup = null; } } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java index e65003ca64b..516503c323a 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java @@ -20,12 +20,16 @@ package org.apache.hadoop.oncrpc; import java.net.InetSocketAddress; import java.util.concurrent.Executors; -import org.jboss.netty.bootstrap.ConnectionlessBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.socket.DatagramChannelFactory; -import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioDatagramChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,36 +43,45 @@ public class SimpleUdpServer { private final int RECEIVE_BUFFER_SIZE = 65536; protected final int port; - protected final SimpleChannelUpstreamHandler rpcProgram; + protected final ChannelInboundHandlerAdapter rpcProgram; protected final int workerCount; protected int boundPort = -1; // Will be set after server starts - private ConnectionlessBootstrap server; + private Bootstrap server; private Channel ch; + private EventLoopGroup workerGroup; - public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, + public SimpleUdpServer(int port, ChannelInboundHandlerAdapter program, int workerCount) { this.port = port; this.rpcProgram = program; this.workerCount = workerCount; } - public void run() { - // Configure the client. - DatagramChannelFactory f = new NioDatagramChannelFactory( - Executors.newCachedThreadPool(), workerCount); + public void run() throws InterruptedException { + workerGroup = new NioEventLoopGroup(workerCount, Executors.newCachedThreadPool()); - server = new ConnectionlessBootstrap(f); - server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER, - rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE)); - - server.setOption("broadcast", "false"); - server.setOption("sendBufferSize", SEND_BUFFER_SIZE); - server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE); - server.setOption("reuseAddress", true); + server = new Bootstrap(); + server.group(workerGroup) + .channel(NioDatagramChannel.class) + .option(ChannelOption.SO_BROADCAST, true) + .option(ChannelOption.SO_SNDBUF, SEND_BUFFER_SIZE) + .option(ChannelOption.SO_RCVBUF, RECEIVE_BUFFER_SIZE) + .option(ChannelOption.SO_REUSEADDR, true) + .handler(new ChannelInitializer() { + @Override protected void initChannel(NioDatagramChannel ch) + throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast( + RpcUtil.STAGE_RPC_MESSAGE_PARSER, + rpcProgram, + RpcUtil.STAGE_RPC_UDP_RESPONSE); + } + }); // Listen to the UDP port - ch = server.bind(new InetSocketAddress(port)); - InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress(); + ChannelFuture f = server.bind(new InetSocketAddress(port)).sync(); + ch = f.channel(); + InetSocketAddress socketAddr = (InetSocketAddress) ch.localAddress(); boundPort = socketAddr.getPort(); LOG.info("Started listening to UDP requests at port " + boundPort + " for " @@ -83,9 +96,11 @@ public class SimpleUdpServer { public void shutdown() { if (ch != null) { ch.close().awaitUninterruptibly(); + ch = null; } - if (server != null) { - server.releaseExternalResources(); + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + workerGroup = null; } } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java index 419eff831f0..6000fd57a1b 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java @@ -20,8 +20,8 @@ package org.apache.hadoop.oncrpc; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; @@ -242,7 +242,7 @@ public final class XDR { * @param last specifies last request or not * @return TCP buffer */ - public static ChannelBuffer writeMessageTcp(XDR request, boolean last) { + public static ByteBuf writeMessageTcp(XDR request, boolean last) { Preconditions.checkState(request.state == XDR.State.WRITING); ByteBuffer b = request.buf.duplicate(); b.flip(); @@ -250,7 +250,7 @@ public final class XDR { ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader); // TODO: Investigate whether making a copy of the buffer is necessary. - return ChannelBuffers.copiedBuffer(headerBuf, b); + return Unpooled.wrappedBuffer(headerBuf, b); } /** @@ -258,10 +258,10 @@ public final class XDR { * @param response XDR response * @return UDP buffer */ - public static ChannelBuffer writeMessageUdp(XDR response) { + public static ByteBuf writeMessageUdp(XDR response) { Preconditions.checkState(response.state == XDR.State.READING); // TODO: Investigate whether making a copy of the buffer is necessary. - return ChannelBuffers.copiedBuffer(response.buf); + return Unpooled.copiedBuffer(response.buf); } public static int fragmentSize(byte[] mark) { diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java index 80f43828ea8..1a8a305436c 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java @@ -22,21 +22,27 @@ import java.net.SocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.util.StringUtils; -import org.jboss.netty.bootstrap.ConnectionlessBootstrap; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.jboss.netty.handler.timeout.IdleStateHandler; -import org.jboss.netty.util.HashedWheelTimer; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -49,11 +55,17 @@ final class Portmap { private static final Logger LOG = LoggerFactory.getLogger(Portmap.class); private static final int DEFAULT_IDLE_TIME_MILLISECONDS = 5000; - private ConnectionlessBootstrap udpServer; + private Bootstrap udpServer; private ServerBootstrap tcpServer; - private ChannelGroup allChannels = new DefaultChannelGroup(); + private ChannelGroup allChannels = new DefaultChannelGroup( + GlobalEventExecutor.INSTANCE); private Channel udpChannel; private Channel tcpChannel; + + EventLoopGroup bossGroup; + EventLoopGroup workerGroup; + EventLoopGroup udpGroup; + private final RpcProgramPortmap handler = new RpcProgramPortmap(allChannels); public static void main(String[] args) { @@ -73,18 +85,19 @@ final class Portmap { void shutdown() { allChannels.close().awaitUninterruptibly(); - tcpServer.releaseExternalResources(); - udpServer.releaseExternalResources(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + udpGroup.shutdownGracefully(); } @VisibleForTesting SocketAddress getTcpServerLocalAddress() { - return tcpChannel.getLocalAddress(); + return tcpChannel.localAddress(); } @VisibleForTesting SocketAddress getUdpServerLoAddress() { - return udpChannel.getLocalAddress(); + return udpChannel.localAddress(); } @VisibleForTesting @@ -93,38 +106,55 @@ final class Portmap { } void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress, - final SocketAddress udpAddress) { + final SocketAddress udpAddress) throws InterruptedException { - tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); - tcpServer.setPipelineFactory(new ChannelPipelineFactory() { - private final HashedWheelTimer timer = new HashedWheelTimer(); - private final IdleStateHandler idleStateHandler = new IdleStateHandler( - timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS); + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool()); - @Override - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(), - RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler, - RpcUtil.STAGE_RPC_TCP_RESPONSE); - } - }); - tcpServer.setOption("reuseAddress", true); - tcpServer.setOption("child.reuseAddress", true); + tcpServer = new ServerBootstrap(); + tcpServer.group(bossGroup, workerGroup) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + private final IdleStateHandler idleStateHandler = new IdleStateHandler( + 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS); - udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory( - Executors.newCachedThreadPool())); + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); - udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER, - handler, RpcUtil.STAGE_RPC_UDP_RESPONSE)); - udpServer.setOption("reuseAddress", true); + p.addLast(RpcUtil.constructRpcFrameDecoder(), + RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler, + RpcUtil.STAGE_RPC_TCP_RESPONSE); + }}); + + udpGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool()); + + udpServer = new Bootstrap(); + udpServer.group(udpGroup) + .channel(NioDatagramChannel.class) + .handler(new ChannelInitializer() { + @Override protected void initChannel(NioDatagramChannel ch) + throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast( + new LoggingHandler(LogLevel.DEBUG), + RpcUtil.STAGE_RPC_MESSAGE_PARSER, handler, RpcUtil.STAGE_RPC_UDP_RESPONSE); + } + }) + .option(ChannelOption.SO_REUSEADDR, true); + + ChannelFuture tcpChannelFuture = null; + tcpChannelFuture = tcpServer.bind(tcpAddress); + ChannelFuture udpChannelFuture = udpServer.bind(udpAddress); + tcpChannel = tcpChannelFuture.sync().channel(); + udpChannel = udpChannelFuture.sync().channel(); - tcpChannel = tcpServer.bind(tcpAddress); - udpChannel = udpServer.bind(udpAddress); allChannels.add(tcpChannel); allChannels.add(udpChannel); - LOG.info("Portmap server started at tcp://" + tcpChannel.getLocalAddress() - + ", udp://" + udpChannel.getLocalAddress()); + LOG.info("Portmap server started at tcp://" + tcpChannel.localAddress() + + ", udp://" + udpChannel.localAddress()); } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java index 0bc380f614c..7b33a644fbe 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java @@ -19,6 +19,14 @@ package org.apache.hadoop.portmap; import java.util.concurrent.ConcurrentHashMap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.group.ChannelGroup; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.RpcInfo; @@ -27,20 +35,12 @@ import org.apache.hadoop.oncrpc.RpcResponse; import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.handler.timeout.IdleState; -import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler; -import org.jboss.netty.handler.timeout.IdleStateEvent; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler { +@ChannelHandler.Sharable +final class RpcProgramPortmap extends IdleStateHandler { static final int PROGRAM = 100000; static final int VERSION = 2; @@ -60,6 +60,8 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler { private final ChannelGroup allChannels; RpcProgramPortmap(ChannelGroup allChannels) { + super(1, 1, 1); + // FIXME: set default idle timeout 1 second. this.allChannels = allChannels; PortmapMapping m = new PortmapMapping(PROGRAM, VERSION, PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT); @@ -151,14 +153,14 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - RpcInfo info = (RpcInfo) e.getMessage(); + RpcInfo info = (RpcInfo) msg; RpcCall rpcCall = (RpcCall) info.header(); final int portmapProc = rpcCall.getProcedure(); int xid = rpcCall.getXid(); - XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(), + XDR in = new XDR(info.data().nioBuffer().asReadOnlyBuffer(), XDR.State.READING); XDR out = new XDR(); @@ -181,29 +183,29 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler { reply.write(out); } - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap() .buffer()); RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); RpcUtil.sendRpcResponse(ctx, rsp); } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + public void channelActive(ChannelHandlerContext ctx) throws Exception { - allChannels.add(e.getChannel()); + allChannels.add(ctx.channel()); } @Override public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception { - if (e.getState() == IdleState.ALL_IDLE) { - e.getChannel().close(); + if (e.state() == IdleState.ALL_IDLE) { + ctx.channel().close(); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - LOG.warn("Encountered ", e.getCause()); - e.getChannel().close(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) { + LOG.warn("Encountered ", t); + ctx.channel().close(); } } diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java index 0e416b3738d..6d103fdd781 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java @@ -22,19 +22,19 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Random; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder; import org.apache.hadoop.oncrpc.security.CredentialsNone; import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.test.GenericTestUtils; -import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.channel.ChannelHandlerContext; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.event.Level; @@ -55,6 +55,7 @@ public class TestFrameDecoder { tcpClient.run(); } + @ChannelHandler.Sharable static class TestRpcProgram extends RpcProgram { protected TestRpcProgram(String program, String host, int port, @@ -83,7 +84,7 @@ public class TestFrameDecoder { new VerifierNone()); XDR out = new XDR(); reply.write(out); - ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap().buffer()); RpcResponse rsp = new RpcResponse(b, info.remoteAddress()); RpcUtil.sendRpcResponse(ctx, rsp); } @@ -99,13 +100,14 @@ public class TestFrameDecoder { RpcFrameDecoder decoder = new RpcFrameDecoder(); // Test "Length field is not received yet" - ByteBuffer buffer = ByteBuffer.allocate(1); - ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer); - ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode( - Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class), - buf); - assertTrue(channelBuffer == null); + ByteBuf buf = Unpooled.directBuffer(1); + List outputBufs = new ArrayList<>(); + decoder.decode( + Mockito.mock(ChannelHandlerContext.class), buf, + outputBufs); + assertTrue(outputBufs.isEmpty()); + decoder = new RpcFrameDecoder(); // Test all bytes are not received yet byte[] fragment = new byte[4 + 9]; fragment[0] = (byte) (1 << 7); // final fragment @@ -114,15 +116,16 @@ public class TestFrameDecoder { fragment[3] = (byte) 10; // fragment size = 10 bytes assertTrue(XDR.isLastFragment(fragment)); assertTrue(XDR.fragmentSize(fragment)==10); + buf.release(); - buffer = ByteBuffer.allocate(4 + 9); - buffer.put(fragment); - buffer.flip(); - buf = new ByteBufferBackedChannelBuffer(buffer); - channelBuffer = (ChannelBuffer) decoder.decode( - Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class), - buf); - assertTrue(channelBuffer == null); + buf = Unpooled.directBuffer(4 + 9); + buf.writeBytes(fragment); + outputBufs = new ArrayList<>(); + decoder.decode( + Mockito.mock(ChannelHandlerContext.class), buf, + outputBufs); + assertTrue(decoder.isLast()); + buf.release(); } @Test @@ -137,16 +140,15 @@ public class TestFrameDecoder { fragment1[3] = (byte) 10; // fragment size = 10 bytes assertFalse(XDR.isLastFragment(fragment1)); assertTrue(XDR.fragmentSize(fragment1)==10); + + List outputBufs = new ArrayList<>(); // decoder should wait for the final fragment - ByteBuffer buffer = ByteBuffer.allocate(4 + 10); - buffer.put(fragment1); - buffer.flip(); - ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer); - ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode( - Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class), - buf); - assertTrue(channelBuffer == null); + ByteBuf buf = Unpooled.directBuffer(4 + 10, 4 + 10); + buf.writeBytes(fragment1); + decoder.decode( + Mockito.mock(ChannelHandlerContext.class), buf, + outputBufs); byte[] fragment2 = new byte[4 + 10]; fragment2[0] = (byte) (1 << 7); // final fragment @@ -155,21 +157,22 @@ public class TestFrameDecoder { fragment2[3] = (byte) 10; // fragment size = 10 bytes assertTrue(XDR.isLastFragment(fragment2)); assertTrue(XDR.fragmentSize(fragment2)==10); + buf.release(); - buffer = ByteBuffer.allocate(4 + 10); - buffer.put(fragment2); - buffer.flip(); - buf = new ByteBufferBackedChannelBuffer(buffer); - channelBuffer = (ChannelBuffer) decoder.decode( - Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class), - buf); - assertTrue(channelBuffer != null); - // Complete frame should have to total size 10+10=20 - assertEquals(20, channelBuffer.readableBytes()); + buf = Unpooled.directBuffer(4 + 10, 4 + 10); + buf.writeBytes(fragment2); + decoder.decode( + Mockito.mock(ChannelHandlerContext.class), buf, + outputBufs); + // Expect two completed frames each 10 bytes + decoder.isLast(); + assertEquals(2, outputBufs.size()); + outputBufs.forEach(b -> assertEquals(((ByteBuf)b).readableBytes(), 10)); + buf.release(); } @Test - public void testFrames() { + public void testFrames() throws InterruptedException { int serverPort = startRpcServer(true); XDR xdrOut = createGetportMount(); @@ -187,7 +190,7 @@ public class TestFrameDecoder { } @Test - public void testUnprivilegedPort() { + public void testUnprivilegedPort() throws InterruptedException { // Don't allow connections from unprivileged ports. Given that this test is // presumably not being run by root, this will be the case. int serverPort = startRpcServer(false); @@ -218,23 +221,28 @@ public class TestFrameDecoder { assertEquals(requestSize, resultSize); } - private static int startRpcServer(boolean allowInsecurePorts) { + private static int startRpcServer(boolean allowInsecurePorts) + throws InterruptedException { Random rand = new Random(); int serverPort = 30000 + rand.nextInt(10000); int retries = 10; // A few retries in case initial choice is in use. while (true) { + SimpleTcpServer tcpServer = null; try { RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram", "localhost", serverPort, 100000, 1, 2, allowInsecurePorts); - SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1); + tcpServer = new SimpleTcpServer(serverPort, program, 1); tcpServer.run(); break; // Successfully bound a port, break out. - } catch (ChannelException ce) { + } catch (InterruptedException | ChannelException e) { + if (tcpServer != null) { + tcpServer.shutdown(); + } if (retries-- > 0) { serverPort += rand.nextInt(20); // Port in use? Try another. } else { - throw ce; // Out of retries. + throw e; // Out of retries. } } } diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java index 6941c4a04e9..8ebf9d03c6c 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java @@ -43,7 +43,7 @@ public class TestPortmap { private int xid; @BeforeClass - public static void setup() { + public static void setup() throws InterruptedException { pm.start(SHORT_TIMEOUT_MILLISECONDS, new InetSocketAddress("localhost", 0), new InetSocketAddress("localhost", 0)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml index e24080f9873..03ac256ace8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml @@ -47,7 +47,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> io.netty - netty + netty-all compile diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java index 3b0327ad4a1..2ba1bb060ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java @@ -26,6 +26,10 @@ import java.util.Collections; import java.util.List; import java.util.HashMap; +import io.netty.channel.ChannelHandler; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -51,15 +55,13 @@ import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelHandlerContext; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; /** * RPC program corresponding to mountd daemon. See {@link Mountd}. */ +@ChannelHandler.Sharable public class RpcProgramMountd extends RpcProgram implements MountInterface { private static final Logger LOG = LoggerFactory.getLogger(RpcProgramMountd.class); @@ -262,8 +264,8 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( out); } - ChannelBuffer buf = - ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + ByteBuf buf = + Unpooled.wrappedBuffer(out.asReadOnlyWrap().buffer()); RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); RpcUtil.sendRpcResponse(ctx, rsp); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java index c6da1981f37..c58dc5976b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java @@ -22,6 +22,8 @@ import java.net.InetSocketAddress; import java.net.URI; import java.nio.file.FileSystemException; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsConstants; @@ -39,8 +41,6 @@ import org.apache.hadoop.nfs.nfs3.response.WccAttr; import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.security.IdMappingServiceProvider; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; /** * Utility/helper methods related to NFS @@ -147,16 +147,16 @@ public class Nfs3Utils { if (RpcProgramNfs3.LOG.isDebugEnabled()) { RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid); } - ChannelBuffer outBuf = XDR.writeMessageTcp(out, true); - channel.write(outBuf); + ByteBuf outBuf = XDR.writeMessageTcp(out, true); + channel.writeAndFlush(outBuf); } public static void writeChannelCommit(Channel channel, XDR out, int xid) { if (RpcProgramNfs3.LOG.isDebugEnabled()) { RpcProgramNfs3.LOG.debug("Commit done:" + xid); } - ChannelBuffer outBuf = XDR.writeMessageTcp(out, true); - channel.write(outBuf); + ByteBuf outBuf = XDR.writeMessageTcp(out, true); + channel.writeAndFlush(outBuf); } private static boolean isSet(int access, int bits) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 528ead7a003..8358c056cac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import io.netty.channel.Channel; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -55,7 +56,6 @@ import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.security.IdMappingServiceProvider; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; -import org.jboss.netty.channel.Channel; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index d436eac598b..f6cb4350e40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -28,6 +28,11 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.EnumSet; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException; @@ -129,10 +134,6 @@ import org.apache.hadoop.security.ShellBasedIdMapping; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.util.JvmPauseMonitor; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -141,6 +142,7 @@ import org.slf4j.LoggerFactory; /** * RPC program corresponding to nfs daemon. See {@link Nfs3}. */ +@ChannelHandler.Sharable public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { public static final int DEFAULT_UMASK = 0022; public static final FsPermission umask = new FsPermission( @@ -2180,7 +2182,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone()); rdr.write(reply); - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap() + ByteBuf buf = Unpooled.wrappedBuffer(reply.asReadOnlyWrap() .buffer()); RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); RpcUtil.sendRpcResponse(ctx, rsp); @@ -2291,7 +2293,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } // TODO: currently we just return VerifierNone out = response.serialize(out, xid, new VerifierNone()); - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + ByteBuf buf = Unpooled.wrappedBuffer(out.asReadOnlyWrap() .buffer()); RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java index 76859247bf2..d5c9d4f5592 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java @@ -22,12 +22,12 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; -import org.jboss.netty.channel.Channel; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java index 28893710408..a1b6e12eebf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.IOException; import java.util.EnumSet; +import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -43,7 +44,6 @@ import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.security.IdMappingServiceProvider; -import org.jboss.netty.channel.Channel; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java index 4e53c72bec8..31528a2db87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java @@ -21,6 +21,12 @@ package org.apache.hadoop.hdfs.nfs; import java.nio.ByteBuffer; import java.util.Arrays; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; @@ -42,13 +48,6 @@ import org.apache.hadoop.oncrpc.SimpleTcpClientHandler; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.CredentialsNone; import org.apache.hadoop.oncrpc.security.VerifierNone; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.MessageEvent; public class TestOutOfOrderWrite { public final static Logger LOG = @@ -100,9 +99,9 @@ public class TestOutOfOrderWrite { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { + public void channelRead(ChannelHandlerContext ctx, Object msg) { // Get handle from create response - ChannelBuffer buf = (ChannelBuffer) e.getMessage(); + ByteBuf buf = (ByteBuf) msg; XDR rsp = new XDR(buf.array()); if (rsp.getBytes().length == 0) { LOG.info("rsp length is zero, why?"); @@ -125,7 +124,7 @@ public class TestOutOfOrderWrite { rsp.readBoolean(); // value follow handle = new FileHandle(); handle.deserialize(rsp); - channel = e.getChannel(); + channel = ctx.channel(); } } @@ -136,16 +135,17 @@ public class TestOutOfOrderWrite { } @Override - protected ChannelPipelineFactory setPipelineFactory() { - this.pipelineFactory = new ChannelPipelineFactory() { + protected ChannelInitializer setChannelHandler() { + return new ChannelInitializer() { @Override - public ChannelPipeline getPipeline() { - return Channels.pipeline( + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast( RpcUtil.constructRpcFrameDecoder(), - new WriteHandler(request)); + new WriteHandler(request) + ); } }; - return this.pipelineFactory; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java index 30ecc0b824b..07954c00d64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java @@ -28,6 +28,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.EnumSet; +import io.netty.channel.Channel; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -92,7 +93,6 @@ import org.apache.hadoop.oncrpc.security.SecurityHandler; import org.apache.hadoop.security.IdMappingConstant; import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; import org.apache.hadoop.security.authorize.ProxyUsers; -import org.jboss.netty.channel.Channel; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index f7a92fac535..0f03c6da93b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.ConcurrentNavigableMap; +import io.netty.channel.Channel; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -52,7 +53,6 @@ import org.apache.hadoop.oncrpc.security.SecurityHandler; import org.apache.hadoop.security.ShellBasedIdMapping; import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; import org.apache.hadoop.security.authorize.ProxyUsers; -import org.jboss.netty.channel.Channel; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito;