diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java index 66afbb0d76e..a519ddd8416 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java @@ -22,13 +22,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mount.MountdBase; import org.apache.hadoop.oncrpc.RpcProgram; -import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.SimpleTcpServer; -import org.apache.hadoop.oncrpc.SimpleTcpServerHandler; import org.apache.hadoop.portmap.PortmapMapping; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; /** * Nfs server. Supports NFS v3 using {@link RpcProgram}. @@ -72,19 +67,7 @@ public abstract class Nfs3Base { private void startTCPServer() { SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort, - rpcProgram, 0) { - @Override - public ChannelPipelineFactory getPipelineFactory() { - return new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() { - return Channels.pipeline( - RpcUtil.constructRpcFrameDecoder(), - new SimpleTcpServerHandler(rpcProgram)); - } - }; - } - }; + rpcProgram, 0); tcpServer.run(); } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java index 0862d4fb4ea..0c857be9bb0 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java @@ -44,7 +44,7 @@ import com.google.common.annotations.VisibleForTesting; public class RpcCallCache { public static class CacheEntry { - private XDR response; // null if no response has been sent + private RpcResponse response; // null if no response has been sent public CacheEntry() { response = null; @@ -58,11 +58,11 @@ public class RpcCallCache { return response != null; } - public XDR getResponse() { + public RpcResponse getResponse() { return response; } - public void setResponse(XDR response) { + public void setResponse(RpcResponse response) { this.response = response; } } @@ -128,13 +128,13 @@ public class RpcCallCache { } /** Mark a request as completed and add corresponding response to the cache */ - public void callCompleted(InetAddress clientId, int xid, XDR response) { + public void callCompleted(InetAddress clientId, int xid, RpcResponse response) { ClientRequest req = new ClientRequest(clientId, xid); CacheEntry e; synchronized(map) { e = map.get(req); } - e.setResponse(response); + e.response = response; } /** diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java new file mode 100644 index 00000000000..b434d79285c --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcInfo.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.net.SocketAddress; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; + +/** + * RpcInfo records all contextual information of an RPC message. It contains + * the RPC header, the parameters, and the information of the remote peer. + */ +public final class RpcInfo { + private final RpcMessage header; + private final ChannelBuffer data; + private final Channel channel; + private final SocketAddress remoteAddress; + + public RpcInfo(RpcMessage header, ChannelBuffer data, + ChannelHandlerContext channelContext, Channel channel, + SocketAddress remoteAddress) { + this.header = header; + this.data = data; + this.channel = channel; + this.remoteAddress = remoteAddress; + } + + public RpcMessage header() { + return header; + } + + public ChannelBuffer data() { + return data; + } + + public Channel channel() { + return channel; + } + + public SocketAddress remoteAddress() { + return remoteAddress; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java index d457b3aaa91..36348980056 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java @@ -18,22 +18,24 @@ package org.apache.hadoop.oncrpc; import java.io.IOException; -import java.net.InetAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; -import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry; -import org.apache.hadoop.oncrpc.security.VerifierNone; +import org.apache.hadoop.oncrpc.security.Verifier; import org.apache.hadoop.portmap.PortmapMapping; import org.apache.hadoop.portmap.PortmapRequest; -import org.jboss.netty.channel.Channel; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; /** * Class for writing RPC server programs based on RFC 1050. Extend this class * and implement {@link #handleInternal} to handle the requests received. */ -public abstract class RpcProgram { +public abstract class RpcProgram extends SimpleChannelUpstreamHandler { private static final Log LOG = LogFactory.getLog(RpcProgram.class); public static final int RPCB_PORT = 111; private final String program; @@ -42,7 +44,6 @@ public abstract class RpcProgram { private final int progNumber; private final int lowProgVersion; private final int highProgVersion; - private final RpcCallCache rpcCallCache; /** * Constructor @@ -53,19 +54,15 @@ public abstract class RpcProgram { * @param progNumber program number as defined in RFC 1050 * @param lowProgVersion lowest version of the specification supported * @param highProgVersion highest version of the specification supported - * @param cacheSize size of cache to handle duplciate requests. Size <= 0 - * indicates no cache. */ protected RpcProgram(String program, String host, int port, int progNumber, - int lowProgVersion, int highProgVersion, int cacheSize) { + int lowProgVersion, int highProgVersion) { this.program = program; this.host = host; this.port = port; this.progNumber = progNumber; this.lowProgVersion = lowProgVersion; this.highProgVersion = highProgVersion; - this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize) - : null; } /** @@ -103,92 +100,50 @@ public abstract class RpcProgram { } } - /** - * Handle an RPC request. - * @param rpcCall RPC call that is received - * @param in xdr with cursor at reading the remaining bytes of a method call - * @param out xdr output corresponding to Rpc reply - * @param client making the Rpc request - * @param channel connection over which Rpc request is received - * @return response xdr response - */ - protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out, - InetAddress client, Channel channel); - - public XDR handle(XDR xdr, InetAddress client, Channel channel) { - XDR out = new XDR(); - RpcCall rpcCall = RpcCall.read(xdr); - if (LOG.isDebugEnabled()) { - LOG.debug(program + " procedure #" + rpcCall.getProcedure()); + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + RpcInfo info = (RpcInfo) e.getMessage(); + RpcCall call = (RpcCall) info.header(); + if (LOG.isTraceEnabled()) { + LOG.trace(program + " procedure #" + call.getProcedure()); } - if (!checkProgram(rpcCall.getProgram())) { - return programMismatch(out, rpcCall); + if (this.progNumber != call.getProgram()) { + LOG.warn("Invalid RPC call program " + call.getProgram()); + RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), + AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE); + + XDR out = new XDR(); + reply.write(out); + ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(b, info.remoteAddress()); + RpcUtil.sendRpcResponse(ctx, rsp); + return; } - if (!checkProgramVersion(rpcCall.getVersion())) { - return programVersionMismatch(out, rpcCall); + int ver = call.getVersion(); + if (ver < lowProgVersion || ver > highProgVersion) { + LOG.warn("Invalid RPC call version " + ver); + RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), + AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE); + + XDR out = new XDR(); + reply.write(out); + out.writeInt(lowProgVersion); + out.writeInt(highProgVersion); + ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(b, info.remoteAddress()); + RpcUtil.sendRpcResponse(ctx, rsp); + return; } - // Check for duplicate requests in the cache for non-idempotent requests - boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall); - if (idempotent) { - CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid()); - if (entry != null) { // in ache - if (entry.isCompleted()) { - LOG.info("Sending the cached reply to retransmitted request " - + rpcCall.getXid()); - return entry.getResponse(); - } else { // else request is in progress - LOG.info("Retransmitted request, transaction still in progress " - + rpcCall.getXid()); - // TODO: ignore the request? - } - } - } - - XDR response = handleInternal(rpcCall, xdr, out, client, channel); - if (response.size() == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("No sync response, expect an async response for request XID=" - + rpcCall.getXid()); - } - } - - // Add the request to the cache - if (idempotent) { - rpcCallCache.callCompleted(client, rpcCall.getXid(), response); - } - return response; - } - - private XDR programMismatch(XDR out, RpcCall call) { - LOG.warn("Invalid RPC call program " + call.getProgram()); - RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), - AcceptState.PROG_UNAVAIL, new VerifierNone()); - reply.write(out); - return out; - } - - private XDR programVersionMismatch(XDR out, RpcCall call) { - LOG.warn("Invalid RPC call version " + call.getVersion()); - RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), - AcceptState.PROG_MISMATCH, new VerifierNone()); - reply.write(out); - out.writeInt(lowProgVersion); - out.writeInt(highProgVersion); - return out; - } - - private boolean checkProgram(int progNumber) { - return this.progNumber == progNumber; - } - - /** Return true if a the program version in rpcCall is supported */ - private boolean checkProgramVersion(int programVersion) { - return programVersion >= lowProgVersion - && programVersion <= highProgVersion; + handleInternal(ctx, info); } + + protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info); @Override public String toString() { diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java new file mode 100644 index 00000000000..2e45e6100b1 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcResponse.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.oncrpc; + +import java.net.SocketAddress; + +import org.jboss.netty.buffer.ChannelBuffer; + +/** + * RpcResponse encapsulates a response to a RPC request. It contains the data + * that is going to cross the wire, as well as the information of the remote + * peer. + */ +public class RpcResponse { + private final ChannelBuffer data; + private final SocketAddress remoteAddress; + + public RpcResponse(ChannelBuffer data, SocketAddress remoteAddress) { + this.data = data; + this.remoteAddress = remoteAddress; + } + + public ChannelBuffer data() { + return data; + } + + public SocketAddress remoteAddress() { + return remoteAddress; + } +} diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java index 04ebbbc39bc..e9878b7959c 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java @@ -17,17 +17,23 @@ */ package org.apache.hadoop.oncrpc; +import java.nio.ByteBuffer; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.handler.codec.frame.FrameDecoder; -public class RpcUtil { +public final class RpcUtil { /** - * The XID in RPC call. It is used for starting with new seed after each reboot. + * The XID in RPC call. It is used for starting with new seed after each + * reboot. */ private static int xid = (int) (System.currentTimeMillis() / 1000) << 12; @@ -35,10 +41,27 @@ public class RpcUtil { return xid = ++xid + caller.hashCode(); } + public static void sendRpcResponse(ChannelHandlerContext ctx, + RpcResponse response) { + Channels.fireMessageReceived(ctx, response); + } + public static FrameDecoder constructRpcFrameDecoder() { return new RpcFrameDecoder(); } + public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage(); + public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage(); + public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage(); + + /** + * An RPC client can separate a RPC message into several frames (i.e., + * fragments) when transferring it across the wire. RpcFrameDecoder + * reconstructs a full RPC message from these fragments. + * + * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for + * each RPC client. + */ static class RpcFrameDecoder extends FrameDecoder { public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class); private ChannelBuffer currentFrame; @@ -78,4 +101,68 @@ public class RpcUtil { } } } + + /** + * RpcMessageParserStage parses the network bytes and encapsulates the RPC + * request into a RpcInfo instance. + */ + static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler { + private static final Log LOG = LogFactory + .getLog(RpcMessageParserStage.class); + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + ChannelBuffer buf = (ChannelBuffer) e.getMessage(); + ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer(); + XDR in = new XDR(b, XDR.State.READING); + + RpcInfo info = null; + try { + RpcCall callHeader = RpcCall.read(in); + ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer() + .slice()); + info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(), + e.getRemoteAddress()); + } catch (Exception exc) { + LOG.info("Malfromed RPC request from " + e.getRemoteAddress()); + } + + if (info != null) { + Channels.fireMessageReceived(ctx, info); + } + } + } + + /** + * RpcTcpResponseStage sends an RpcResponse across the wire with the + * appropriate fragment header. + */ + private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler { + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + RpcResponse r = (RpcResponse) e.getMessage(); + byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true); + ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader); + ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data()); + e.getChannel().write(d); + } + } + + /** + * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not + * require a fragment header. + */ + private static final class RpcUdpResponseStage extends + SimpleChannelUpstreamHandler { + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + RpcResponse r = (RpcResponse) e.getMessage(); + e.getChannel().write(r.data(), r.remoteAddress()); + } + } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java index 6f668a21065..57ef77a95fe 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java @@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; /** @@ -35,8 +36,7 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; public class SimpleTcpServer { public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class); protected final int port; - protected final ChannelPipelineFactory pipelineFactory; - protected final RpcProgram rpcProgram; + protected final SimpleChannelUpstreamHandler rpcProgram; /** The maximum number of I/O worker threads */ protected final int workerCount; @@ -50,18 +50,6 @@ public class SimpleTcpServer { this.port = port; this.rpcProgram = program; this.workerCount = workercount; - this.pipelineFactory = getPipelineFactory(); - } - - public ChannelPipelineFactory getPipelineFactory() { - return new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() { - return Channels.pipeline( - RpcUtil.constructRpcFrameDecoder(), - new SimpleTcpServerHandler(rpcProgram)); - } - }; } public void run() { @@ -78,7 +66,15 @@ public class SimpleTcpServer { } ServerBootstrap bootstrap = new ServerBootstrap(factory); - bootstrap.setPipelineFactory(pipelineFactory); + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(), + RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram, + RpcUtil.STAGE_RPC_TCP_RESPONSE); + } + }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java deleted file mode 100644 index 04e2930f60b..00000000000 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.oncrpc; - -import java.net.InetAddress; -import java.net.InetSocketAddress; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; - -/** - * Handler used by {@link SimpleTcpServer}. - */ -public class SimpleTcpServerHandler extends SimpleChannelHandler { - public static final Log LOG = LogFactory.getLog(SimpleTcpServerHandler.class); - - protected final RpcProgram rpcProgram; - - public SimpleTcpServerHandler(RpcProgram rpcProgram) { - this.rpcProgram = rpcProgram; - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - ChannelBuffer buf = (ChannelBuffer) e.getMessage(); - XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING); - - InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel() - .getRemoteAddress()).getAddress(); - Channel outChannel = e.getChannel(); - XDR response = rpcProgram.handle(request, remoteInetAddr, outChannel); - if (response.size() > 0) { - outChannel.write(XDR.writeMessageTcp(response, true)); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - LOG.warn("Encountered ", e.getCause()); - e.getChannel().close(); - } -} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java index 70bffba66d6..438eebc5378 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java @@ -23,9 +23,8 @@ import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.netty.bootstrap.ConnectionlessBootstrap; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; @@ -38,20 +37,13 @@ public class SimpleUdpServer { private final int RECEIVE_BUFFER_SIZE = 65536; protected final int port; - protected final ChannelPipelineFactory pipelineFactory; - protected final RpcProgram rpcProgram; + protected final SimpleChannelUpstreamHandler rpcProgram; protected final int workerCount; - public SimpleUdpServer(int port, RpcProgram program, int workerCount) { + public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) { this.port = port; this.rpcProgram = program; this.workerCount = workerCount; - this.pipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() { - return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram)); - } - }; } public void run() { @@ -60,8 +52,9 @@ public class SimpleUdpServer { Executors.newCachedThreadPool(), workerCount); ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); - ChannelPipeline p = b.getPipeline(); - p.addLast("handler", new SimpleUdpServerHandler(rpcProgram)); + b.setPipeline(Channels.pipeline( + RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram, + RpcUtil.STAGE_RPC_UDP_RESPONSE)); b.setOption("broadcast", "false"); b.setOption("sendBufferSize", SEND_BUFFER_SIZE); diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java deleted file mode 100644 index 79a255b2616..00000000000 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.oncrpc; - -import java.net.InetAddress; -import java.net.InetSocketAddress; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; - -/** - * Handler used by {@link SimpleUdpServer}. - */ -public class SimpleUdpServerHandler extends SimpleChannelHandler { - public static final Log LOG = LogFactory.getLog(SimpleUdpServerHandler.class); - private final RpcProgram rpcProgram; - - public SimpleUdpServerHandler(RpcProgram rpcProgram) { - this.rpcProgram = rpcProgram; - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - ChannelBuffer buf = (ChannelBuffer) e.getMessage(); - - XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING); - - InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress()) - .getAddress(); - XDR response = rpcProgram.handle(request, remoteInetAddr, null); - - e.getChannel().write(XDR.writeMessageUdp(response.asReadOnlyWrap()), - e.getRemoteAddress()); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - LOG.warn("Encountered ", e.getCause()); - e.getChannel().close(); - } -} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java index df2b91f05f4..2fdabe2fda7 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java @@ -93,6 +93,10 @@ public final class XDR { return n; } + public ByteBuffer buffer() { + return buf.duplicate(); + } + public int size() { // TODO: This overloading intends to be compatible with the semantics of // the previous version of the class. This function should be separated into @@ -219,7 +223,7 @@ public final class XDR { return xdr.buf.remaining() >= len; } - private static byte[] recordMark(int size, boolean last) { + static byte[] recordMark(int size, boolean last) { byte[] b = new byte[SIZEOF_INT]; ByteBuffer buf = ByteBuffer.wrap(b); buf.putInt(!last ? size : size | 0x80000000); @@ -259,9 +263,8 @@ public final class XDR { @VisibleForTesting public byte[] getBytes() { - ByteBuffer d = buf.duplicate(); - byte[] b = new byte[d.position()]; - d.flip(); + ByteBuffer d = asReadOnlyWrap().buffer(); + byte[] b = new byte[d.remaining()]; d.get(b); return b; diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java index 5184e94f29a..e60db97fd50 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java @@ -18,16 +18,17 @@ package org.apache.hadoop.oncrpc.security; import org.apache.hadoop.oncrpc.XDR; -import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor; /** * Base class for verifier. Currently our authentication only supports 3 types - * of auth flavors: {@link AuthFlavor#AUTH_NONE}, {@link AuthFlavor#AUTH_SYS}, - * and {@link AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle + * of auth flavors: {@link RpcAuthInfo.AuthFlavor#AUTH_NONE}, {@link RpcAuthInfo.AuthFlavor#AUTH_SYS}, + * and {@link RpcAuthInfo.AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle * AUTH_NONE and RPCSEC_GSS */ public abstract class Verifier extends RpcAuthInfo { + public static final Verifier VERIFIER_NONE = new VerifierNone(); + protected Verifier(AuthFlavor flavor) { super(flavor); } @@ -61,6 +62,4 @@ public abstract class Verifier extends RpcAuthInfo { } verifier.write(xdr); } - - } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java index 46e602c8626..bd9f48cb524 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.portmap; -import java.net.InetAddress; import java.util.HashMap; import java.util.Map.Entry; import java.util.Set; @@ -26,10 +25,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcCall; +import org.apache.hadoop.oncrpc.RpcInfo; import org.apache.hadoop.oncrpc.RpcProgram; +import org.apache.hadoop.oncrpc.RpcResponse; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; -import org.jboss.netty.channel.Channel; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelHandlerContext; /** * An rpcbind request handler. @@ -44,7 +48,7 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface { private final HashMap map; public RpcProgramPortmap() { - super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0); + super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION); map = new HashMap(256); } @@ -130,10 +134,15 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface { } @Override - public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out, - InetAddress client, Channel channel) { + public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { + RpcCall rpcCall = (RpcCall) info.header(); final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure()); int xid = rpcCall.getXid(); + byte[] data = new byte[info.data().readableBytes()]; + info.data().readBytes(data); + XDR in = new XDR(data); + XDR out = new XDR(); + if (portmapProc == Procedure.PMAPPROC_NULL) { out = nullOp(xid, in, out); } else if (portmapProc == Procedure.PMAPPROC_SET) { @@ -148,11 +157,14 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface { out = getport(xid, in, out); } else { LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc); - RpcAcceptedReply.getInstance(xid, - RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( - out); + RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid, + RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()); + reply.write(out); } - return out; + + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); + RpcUtil.sendRpcResponse(ctx, rsp); } @Override diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java index 0c306861b50..cdeaa3f2bed 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.net.InetAddress; import java.nio.ByteBuffer; import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder; @@ -30,6 +29,7 @@ import org.apache.hadoop.oncrpc.security.CredentialsNone; import org.apache.hadoop.oncrpc.security.VerifierNone; import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.junit.Test; @@ -38,7 +38,7 @@ import org.mockito.Mockito; public class TestFrameDecoder { private static int port = 12345; // some random server port - private static XDR result = null; + private static int resultSize; static void testRequest(XDR request) { SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request, @@ -49,18 +49,20 @@ public class TestFrameDecoder { static class TestRpcProgram extends RpcProgram { protected TestRpcProgram(String program, String host, int port, - int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) { - super(program, host, port, progNumber, lowProgVersion, highProgVersion, - cacheSize); + int progNumber, int lowProgVersion, int highProgVersion) { + super(program, host, port, progNumber, lowProgVersion, highProgVersion); } @Override - public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out, - InetAddress client, Channel channel) { - // Get the final complete request and return a void response. - result = in; - RpcAcceptedReply.getAcceptInstance(1234, new VerifierNone()).write(out); - return out; + protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { + resultSize = info.data().readableBytes(); + RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234, + new VerifierNone()); + XDR out = new XDR(); + reply.write(out); + ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + RpcResponse rsp = new RpcResponse(b, info.remoteAddress()); + RpcUtil.sendRpcResponse(ctx, rsp); } @Override @@ -147,21 +149,22 @@ public class TestFrameDecoder { public void testFrames() { RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram", - "localhost", port, 100000, 1, 2, 100); + "localhost", port, 100000, 1, 2); SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1); tcpServer.run(); XDR xdrOut = createGetportMount(); + int headerSize = xdrOut.size(); int bufsize = 2 * 1024 * 1024; byte[] buffer = new byte[bufsize]; xdrOut.writeFixedOpaque(buffer); - int requestSize = xdrOut.size(); + int requestSize = xdrOut.size() - headerSize; // Send the request to the server testRequest(xdrOut); // Verify the server got the request with right size - assertTrue(requestSize == result.size()); + assertEquals(requestSize, resultSize); } static void createPortmapXDRheader(XDR xdr_out, int procedure) { @@ -173,10 +176,6 @@ public class TestFrameDecoder { static XDR createGetportMount() { XDR xdr_out = new XDR(); createPortmapXDRheader(xdr_out, 3); - xdr_out.writeInt(0); // AUTH_NULL - xdr_out.writeInt(0); // cred len - xdr_out.writeInt(0); // verifier AUTH_NULL - xdr_out.writeInt(0); // verf len return xdr_out; } /* diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java index f605fc20540..40015e2fbf1 100644 --- a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java @@ -32,6 +32,8 @@ import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry; import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest; import org.junit.Test; +import static org.mockito.Mockito.*; + /** * Unit tests for {@link RpcCallCache} */ @@ -67,7 +69,7 @@ public class TestRpcCallCache { validateInprogressCacheEntry(e); // Set call as completed - XDR response = new XDR(); + RpcResponse response = mock(RpcResponse.class); cache.callCompleted(clientIp, xid, response); e = cache.checkOrAddToCache(clientIp, xid); validateCompletedCacheEntry(e, response); @@ -79,7 +81,7 @@ public class TestRpcCallCache { assertNull(c.getResponse()); } - private void validateCompletedCacheEntry(CacheEntry c, XDR response) { + private void validateCompletedCacheEntry(CacheEntry c, RpcResponse response) { assertFalse(c.isInProgress()); assertTrue(c.isCompleted()); assertEquals(response, c.getResponse()); @@ -93,7 +95,7 @@ public class TestRpcCallCache { assertFalse(c.isCompleted()); assertNull(c.getResponse()); - XDR response = new XDR(); + RpcResponse response = mock(RpcResponse.class); c.setResponse(response); validateCompletedCacheEntry(c, response); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java index 0c1ada61321..f8ac1dc1e4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.mount; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -38,10 +39,15 @@ import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.Nfs3Status; import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcCall; +import org.apache.hadoop.oncrpc.RpcInfo; import org.apache.hadoop.oncrpc.RpcProgram; +import org.apache.hadoop.oncrpc.RpcResponse; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; -import org.jboss.netty.channel.Channel; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelHandlerContext; /** * RPC program corresponding to mountd daemon. See {@link Mountd}. @@ -77,7 +83,7 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { throws IOException { // Note that RPC cache is not enabled super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT), - PROGRAM, VERSION_1, VERSION_3, 0); + PROGRAM, VERSION_1, VERSION_3); this.hostsMatcher = NfsExports.getInstance(config); this.mounts = Collections.synchronizedList(new ArrayList()); @@ -173,10 +179,16 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { } @Override - public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out, - InetAddress client, Channel channel) { + public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { + RpcCall rpcCall = (RpcCall) info.header(); final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure()); int xid = rpcCall.getXid(); + byte[] data = new byte[info.data().readableBytes()]; + info.data().readBytes(data); + XDR xdr = new XDR(data); + XDR out = new XDR(); + InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress(); + if (mntproc == MNTPROC.NULL) { out = nullOp(out, xid, client); } else if (mntproc == MNTPROC.MNT) { @@ -198,7 +210,9 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( out); } - return out; + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); + RpcUtil.sendRpcResponse(ctx, rsp); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 16153c6faf5..a1f5c10406a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.EnumSet; @@ -103,9 +104,13 @@ import org.apache.hadoop.nfs.nfs3.response.WccAttr; import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcCall; +import org.apache.hadoop.oncrpc.RpcCallCache; import org.apache.hadoop.oncrpc.RpcDeniedReply; +import org.apache.hadoop.oncrpc.RpcInfo; import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcReply; +import org.apache.hadoop.oncrpc.RpcResponse; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.Credentials; import org.apache.hadoop.oncrpc.security.CredentialsSys; @@ -115,7 +120,10 @@ import org.apache.hadoop.oncrpc.security.SysSecurityHandler; import org.apache.hadoop.oncrpc.security.Verifier; import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.security.AccessControlException; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; /** * RPC program corresponding to nfs daemon. See {@link Nfs3}. @@ -150,14 +158,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { private Statistics statistics; private String writeDumpDir; // The dir save dump files + private final RpcCallCache rpcCallCache; + public RpcProgramNfs3() throws IOException { this(new Configuration()); } - public RpcProgramNfs3(Configuration config) - throws IOException { + public RpcProgramNfs3(Configuration config) throws IOException { super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM, - Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100); + Nfs3Constant.VERSION, Nfs3Constant.VERSION); config.set(FsPermission.UMASK_LABEL, "000"); iug = new IdUserGroup(); @@ -183,6 +192,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } else { clearDirectory(writeDumpDir); } + + rpcCallCache = new RpcCallCache("NFS3", 256); } private void clearDirectory(String writeDumpDir) throws IOException { @@ -213,8 +224,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public GETATTR3Response getattr(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -294,8 +305,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public SETATTR3Response setattr(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -370,8 +381,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public LOOKUP3Response lookup(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -432,8 +443,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public ACCESS3Response access(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -574,7 +585,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { long offset = request.getOffset(); int count = request.getCount(); - FileHandle handle = request.getHandle(); if (LOG.isDebugEnabled()) { LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset @@ -720,8 +730,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public CREATE3Response create(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public CREATE3Response create(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -973,8 +983,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } String fileIdPath = dirFileIdPath + "/" + fileName; - HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, - fileIdPath); + HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath); if (fstat == null) { WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr); @@ -1056,8 +1065,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } String fileIdPath = dirFileIdPath + "/" + fileName; - HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, - fileIdPath); + HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath); if (fstat == null) { return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc); } @@ -1098,8 +1106,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public RENAME3Response rename(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -1245,13 +1253,14 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } } - public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) { + public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP); } @Override - public READDIR3Response readdir(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1540,8 +1549,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public FSSTAT3Response fsstat(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1598,8 +1607,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public FSINFO3Response fsinfo(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1650,8 +1659,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public PATHCONF3Response pathconf(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { @@ -1697,8 +1706,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public COMMIT3Response commit(XDR xdr, - SecurityHandler securityHandler, InetAddress client) { + public COMMIT3Response commit(XDR xdr, SecurityHandler securityHandler, + InetAddress client) { COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK); DFSClient dfsClient = clientCache.get(securityHandler.getUser()); if (dfsClient == null) { @@ -1776,25 +1785,53 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } @Override - public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out, - InetAddress client, Channel channel) { + public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { + RpcCall rpcCall = (RpcCall) info.header(); final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure()); int xid = rpcCall.getXid(); + byte[] data = new byte[info.data().readableBytes()]; + info.data().readBytes(data); + XDR xdr = new XDR(data); + XDR out = new XDR(); + InetAddress client = ((InetSocketAddress) info.remoteAddress()) + .getAddress(); + Channel channel = info.channel(); Credentials credentials = rpcCall.getCredential(); // Ignore auth only for NFSPROC3_NULL, especially for Linux clients. if (nfsproc3 != NFSPROC3.NULL) { - if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS - && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) { - LOG.info("Wrong RPC AUTH flavor, " - + rpcCall.getCredential().getFlavor() + if (credentials.getFlavor() != AuthFlavor.AUTH_SYS + && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) { + LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor() + " is not AUTH_SYS or RPCSEC_GSS."); XDR reply = new XDR(); RpcDeniedReply rdr = new RpcDeniedReply(xid, RpcReply.ReplyState.MSG_ACCEPTED, RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone()); rdr.write(reply); - return reply; + + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); + RpcUtil.sendRpcResponse(ctx, rsp); + return; + } + } + + if (!isIdempotent(rpcCall)) { + RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client, + xid); + if (entry != null) { // in cache + if (entry.isCompleted()) { + LOG.info("Sending the cached reply to retransmitted request " + xid); + RpcUtil.sendRpcResponse(ctx, entry.getResponse()); + return; + } else { // else request is in progress + LOG.info("Retransmitted request, transaction still in progress " + + xid); + // Ignore the request and do nothing + return; + } } } @@ -1862,12 +1899,24 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( out); } - if (response != null) { - // TODO: currently we just return VerifierNone - out = response.writeHeaderAndResponse(out, xid, new VerifierNone()); + if (response == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No sync response, expect an async response for request XID=" + + rpcCall.getXid()); + } + return; + } + // TODO: currently we just return VerifierNone + out = response.writeHeaderAndResponse(out, xid, new VerifierNone()); + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); + + if (!isIdempotent(rpcCall)) { + rpcCallCache.callCompleted(client, xid, rsp); } - return out; + RpcUtil.sendRpcResponse(ctx, rsp); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f0bf3a22e45..e1d0296ab99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -345,6 +345,9 @@ Release 2.1.2 - UNRELEASED NEW FEATURES + HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API. + (Haohui Mai via brandonli) + IMPROVEMENTS HDFS-5246. Make Hadoop nfs server port and mount daemon port