HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API. Contributed by Haohui Mai

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1527726 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2013-09-30 19:21:17 +00:00
parent d2e73b2775
commit 25cdbdb71a
18 changed files with 429 additions and 353 deletions

View File

@ -22,13 +22,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mount.MountdBase; import org.apache.hadoop.mount.MountdBase;
import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.SimpleTcpServer; import org.apache.hadoop.oncrpc.SimpleTcpServer;
import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
import org.apache.hadoop.portmap.PortmapMapping; import org.apache.hadoop.portmap.PortmapMapping;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
/** /**
* Nfs server. Supports NFS v3 using {@link RpcProgram}. * Nfs server. Supports NFS v3 using {@link RpcProgram}.
@ -72,19 +67,7 @@ public abstract class Nfs3Base {
private void startTCPServer() { private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort, SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort,
rpcProgram, 0) { rpcProgram, 0);
@Override
public ChannelPipelineFactory getPipelineFactory() {
return new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpServerHandler(rpcProgram));
}
};
}
};
tcpServer.run(); tcpServer.run();
} }
} }

View File

@ -44,7 +44,7 @@ import com.google.common.annotations.VisibleForTesting;
public class RpcCallCache { public class RpcCallCache {
public static class CacheEntry { public static class CacheEntry {
private XDR response; // null if no response has been sent private RpcResponse response; // null if no response has been sent
public CacheEntry() { public CacheEntry() {
response = null; response = null;
@ -58,11 +58,11 @@ public class RpcCallCache {
return response != null; return response != null;
} }
public XDR getResponse() { public RpcResponse getResponse() {
return response; return response;
} }
public void setResponse(XDR response) { public void setResponse(RpcResponse response) {
this.response = response; this.response = response;
} }
} }
@ -128,13 +128,13 @@ public class RpcCallCache {
} }
/** Mark a request as completed and add corresponding response to the cache */ /** Mark a request as completed and add corresponding response to the cache */
public void callCompleted(InetAddress clientId, int xid, XDR response) { public void callCompleted(InetAddress clientId, int xid, RpcResponse response) {
ClientRequest req = new ClientRequest(clientId, xid); ClientRequest req = new ClientRequest(clientId, xid);
CacheEntry e; CacheEntry e;
synchronized(map) { synchronized(map) {
e = map.get(req); e = map.get(req);
} }
e.setResponse(response); e.response = response;
} }
/** /**

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc;
import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
/**
* RpcInfo records all contextual information of an RPC message. It contains
* the RPC header, the parameters, and the information of the remote peer.
*/
public final class RpcInfo {
private final RpcMessage header;
private final ChannelBuffer data;
private final Channel channel;
private final SocketAddress remoteAddress;
public RpcInfo(RpcMessage header, ChannelBuffer data,
ChannelHandlerContext channelContext, Channel channel,
SocketAddress remoteAddress) {
this.header = header;
this.data = data;
this.channel = channel;
this.remoteAddress = remoteAddress;
}
public RpcMessage header() {
return header;
}
public ChannelBuffer data() {
return data;
}
public Channel channel() {
return channel;
}
public SocketAddress remoteAddress() {
return remoteAddress;
}
}

View File

@ -18,22 +18,24 @@
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry; import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapMapping; import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest; import org.apache.hadoop.portmap.PortmapRequest;
import org.jboss.netty.channel.Channel; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/** /**
* Class for writing RPC server programs based on RFC 1050. Extend this class * Class for writing RPC server programs based on RFC 1050. Extend this class
* and implement {@link #handleInternal} to handle the requests received. * and implement {@link #handleInternal} to handle the requests received.
*/ */
public abstract class RpcProgram { public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
private static final Log LOG = LogFactory.getLog(RpcProgram.class); private static final Log LOG = LogFactory.getLog(RpcProgram.class);
public static final int RPCB_PORT = 111; public static final int RPCB_PORT = 111;
private final String program; private final String program;
@ -42,7 +44,6 @@ public abstract class RpcProgram {
private final int progNumber; private final int progNumber;
private final int lowProgVersion; private final int lowProgVersion;
private final int highProgVersion; private final int highProgVersion;
private final RpcCallCache rpcCallCache;
/** /**
* Constructor * Constructor
@ -53,19 +54,15 @@ public abstract class RpcProgram {
* @param progNumber program number as defined in RFC 1050 * @param progNumber program number as defined in RFC 1050
* @param lowProgVersion lowest version of the specification supported * @param lowProgVersion lowest version of the specification supported
* @param highProgVersion highest version of the specification supported * @param highProgVersion highest version of the specification supported
* @param cacheSize size of cache to handle duplciate requests. Size <= 0
* indicates no cache.
*/ */
protected RpcProgram(String program, String host, int port, int progNumber, protected RpcProgram(String program, String host, int port, int progNumber,
int lowProgVersion, int highProgVersion, int cacheSize) { int lowProgVersion, int highProgVersion) {
this.program = program; this.program = program;
this.host = host; this.host = host;
this.port = port; this.port = port;
this.progNumber = progNumber; this.progNumber = progNumber;
this.lowProgVersion = lowProgVersion; this.lowProgVersion = lowProgVersion;
this.highProgVersion = highProgVersion; this.highProgVersion = highProgVersion;
this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
: null;
} }
/** /**
@ -103,92 +100,50 @@ public abstract class RpcProgram {
} }
} }
/** @Override
* Handle an RPC request. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
* @param rpcCall RPC call that is received throws Exception {
* @param in xdr with cursor at reading the remaining bytes of a method call RpcInfo info = (RpcInfo) e.getMessage();
* @param out xdr output corresponding to Rpc reply RpcCall call = (RpcCall) info.header();
* @param client making the Rpc request if (LOG.isTraceEnabled()) {
* @param channel connection over which Rpc request is received LOG.trace(program + " procedure #" + call.getProcedure());
* @return response xdr response
*/
protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
InetAddress client, Channel channel);
public XDR handle(XDR xdr, InetAddress client, Channel channel) {
XDR out = new XDR();
RpcCall rpcCall = RpcCall.read(xdr);
if (LOG.isDebugEnabled()) {
LOG.debug(program + " procedure #" + rpcCall.getProcedure());
} }
if (!checkProgram(rpcCall.getProgram())) { if (this.progNumber != call.getProgram()) {
return programMismatch(out, rpcCall); LOG.warn("Invalid RPC call program " + call.getProgram());
RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
XDR out = new XDR();
reply.write(out);
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
return;
} }
if (!checkProgramVersion(rpcCall.getVersion())) { int ver = call.getVersion();
return programVersionMismatch(out, rpcCall); if (ver < lowProgVersion || ver > highProgVersion) {
LOG.warn("Invalid RPC call version " + ver);
RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
XDR out = new XDR();
reply.write(out);
out.writeInt(lowProgVersion);
out.writeInt(highProgVersion);
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
return;
} }
// Check for duplicate requests in the cache for non-idempotent requests handleInternal(ctx, info);
boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
if (idempotent) {
CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
if (entry != null) { // in ache
if (entry.isCompleted()) {
LOG.info("Sending the cached reply to retransmitted request "
+ rpcCall.getXid());
return entry.getResponse();
} else { // else request is in progress
LOG.info("Retransmitted request, transaction still in progress "
+ rpcCall.getXid());
// TODO: ignore the request?
}
}
}
XDR response = handleInternal(rpcCall, xdr, out, client, channel);
if (response.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("No sync response, expect an async response for request XID="
+ rpcCall.getXid());
}
}
// Add the request to the cache
if (idempotent) {
rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
}
return response;
}
private XDR programMismatch(XDR out, RpcCall call) {
LOG.warn("Invalid RPC call program " + call.getProgram());
RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
AcceptState.PROG_UNAVAIL, new VerifierNone());
reply.write(out);
return out;
}
private XDR programVersionMismatch(XDR out, RpcCall call) {
LOG.warn("Invalid RPC call version " + call.getVersion());
RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
AcceptState.PROG_MISMATCH, new VerifierNone());
reply.write(out);
out.writeInt(lowProgVersion);
out.writeInt(highProgVersion);
return out;
}
private boolean checkProgram(int progNumber) {
return this.progNumber == progNumber;
}
/** Return true if a the program version in rpcCall is supported */
private boolean checkProgramVersion(int programVersion) {
return programVersion >= lowProgVersion
&& programVersion <= highProgVersion;
} }
protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);
@Override @Override
public String toString() { public String toString() {

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc;
import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
/**
* RpcResponse encapsulates a response to a RPC request. It contains the data
* that is going to cross the wire, as well as the information of the remote
* peer.
*/
public class RpcResponse {
private final ChannelBuffer data;
private final SocketAddress remoteAddress;
public RpcResponse(ChannelBuffer data, SocketAddress remoteAddress) {
this.data = data;
this.remoteAddress = remoteAddress;
}
public ChannelBuffer data() {
return data;
}
public SocketAddress remoteAddress() {
return remoteAddress;
}
}

View File

@ -17,17 +17,23 @@
*/ */
package org.apache.hadoop.oncrpc; package org.apache.hadoop.oncrpc;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.frame.FrameDecoder; import org.jboss.netty.handler.codec.frame.FrameDecoder;
public class RpcUtil { public final class RpcUtil {
/** /**
* The XID in RPC call. It is used for starting with new seed after each reboot. * The XID in RPC call. It is used for starting with new seed after each
* reboot.
*/ */
private static int xid = (int) (System.currentTimeMillis() / 1000) << 12; private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
@ -35,10 +41,27 @@ public class RpcUtil {
return xid = ++xid + caller.hashCode(); return xid = ++xid + caller.hashCode();
} }
public static void sendRpcResponse(ChannelHandlerContext ctx,
RpcResponse response) {
Channels.fireMessageReceived(ctx, response);
}
public static FrameDecoder constructRpcFrameDecoder() { public static FrameDecoder constructRpcFrameDecoder() {
return new RpcFrameDecoder(); return new RpcFrameDecoder();
} }
public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
/**
* An RPC client can separate a RPC message into several frames (i.e.,
* fragments) when transferring it across the wire. RpcFrameDecoder
* reconstructs a full RPC message from these fragments.
*
* RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
* each RPC client.
*/
static class RpcFrameDecoder extends FrameDecoder { static class RpcFrameDecoder extends FrameDecoder {
public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class); public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
private ChannelBuffer currentFrame; private ChannelBuffer currentFrame;
@ -78,4 +101,68 @@ public class RpcUtil {
} }
} }
} }
/**
* RpcMessageParserStage parses the network bytes and encapsulates the RPC
* request into a RpcInfo instance.
*/
static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
private static final Log LOG = LogFactory
.getLog(RpcMessageParserStage.class);
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
XDR in = new XDR(b, XDR.State.READING);
RpcInfo info = null;
try {
RpcCall callHeader = RpcCall.read(in);
ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
.slice());
info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
e.getRemoteAddress());
} catch (Exception exc) {
LOG.info("Malfromed RPC request from " + e.getRemoteAddress());
}
if (info != null) {
Channels.fireMessageReceived(ctx, info);
}
}
}
/**
* RpcTcpResponseStage sends an RpcResponse across the wire with the
* appropriate fragment header.
*/
private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
RpcResponse r = (RpcResponse) e.getMessage();
byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
e.getChannel().write(d);
}
}
/**
* RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
* require a fragment header.
*/
private static final class RpcUdpResponseStage extends
SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
RpcResponse r = (RpcResponse) e.getMessage();
e.getChannel().write(r.data(), r.remoteAddress());
}
}
} }

View File

@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/** /**
@ -35,8 +36,7 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
public class SimpleTcpServer { public class SimpleTcpServer {
public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class); public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
protected final int port; protected final int port;
protected final ChannelPipelineFactory pipelineFactory; protected final SimpleChannelUpstreamHandler rpcProgram;
protected final RpcProgram rpcProgram;
/** The maximum number of I/O worker threads */ /** The maximum number of I/O worker threads */
protected final int workerCount; protected final int workerCount;
@ -50,18 +50,6 @@ public class SimpleTcpServer {
this.port = port; this.port = port;
this.rpcProgram = program; this.rpcProgram = program;
this.workerCount = workercount; this.workerCount = workercount;
this.pipelineFactory = getPipelineFactory();
}
public ChannelPipelineFactory getPipelineFactory() {
return new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
new SimpleTcpServerHandler(rpcProgram));
}
};
} }
public void run() { public void run() {
@ -78,7 +66,15 @@ public class SimpleTcpServer {
} }
ServerBootstrap bootstrap = new ServerBootstrap(factory); ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(pipelineFactory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
RpcUtil.STAGE_RPC_TCP_RESPONSE);
}
});
bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true); bootstrap.setOption("child.keepAlive", true);

View File

@ -1,63 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* Handler used by {@link SimpleTcpServer}.
*/
public class SimpleTcpServerHandler extends SimpleChannelHandler {
public static final Log LOG = LogFactory.getLog(SimpleTcpServerHandler.class);
protected final RpcProgram rpcProgram;
public SimpleTcpServerHandler(RpcProgram rpcProgram) {
this.rpcProgram = rpcProgram;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
.getRemoteAddress()).getAddress();
Channel outChannel = e.getChannel();
XDR response = rpcProgram.handle(request, remoteInetAddr, outChannel);
if (response.size() > 0) {
outChannel.write(XDR.writeMessageTcp(response, true));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
LOG.warn("Encountered ", e.getCause());
e.getChannel().close();
}
}

View File

@ -23,9 +23,8 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
@ -38,20 +37,13 @@ public class SimpleUdpServer {
private final int RECEIVE_BUFFER_SIZE = 65536; private final int RECEIVE_BUFFER_SIZE = 65536;
protected final int port; protected final int port;
protected final ChannelPipelineFactory pipelineFactory; protected final SimpleChannelUpstreamHandler rpcProgram;
protected final RpcProgram rpcProgram;
protected final int workerCount; protected final int workerCount;
public SimpleUdpServer(int port, RpcProgram program, int workerCount) { public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
this.port = port; this.port = port;
this.rpcProgram = program; this.rpcProgram = program;
this.workerCount = workerCount; this.workerCount = workerCount;
this.pipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
}
};
} }
public void run() { public void run() {
@ -60,8 +52,9 @@ public class SimpleUdpServer {
Executors.newCachedThreadPool(), workerCount); Executors.newCachedThreadPool(), workerCount);
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
ChannelPipeline p = b.getPipeline(); b.setPipeline(Channels.pipeline(
p.addLast("handler", new SimpleUdpServerHandler(rpcProgram)); RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
RpcUtil.STAGE_RPC_UDP_RESPONSE));
b.setOption("broadcast", "false"); b.setOption("broadcast", "false");
b.setOption("sendBufferSize", SEND_BUFFER_SIZE); b.setOption("sendBufferSize", SEND_BUFFER_SIZE);

View File

@ -1,61 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.oncrpc;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* Handler used by {@link SimpleUdpServer}.
*/
public class SimpleUdpServerHandler extends SimpleChannelHandler {
public static final Log LOG = LogFactory.getLog(SimpleUdpServerHandler.class);
private final RpcProgram rpcProgram;
public SimpleUdpServerHandler(RpcProgram rpcProgram) {
this.rpcProgram = rpcProgram;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
.getAddress();
XDR response = rpcProgram.handle(request, remoteInetAddr, null);
e.getChannel().write(XDR.writeMessageUdp(response.asReadOnlyWrap()),
e.getRemoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
LOG.warn("Encountered ", e.getCause());
e.getChannel().close();
}
}

View File

@ -93,6 +93,10 @@ public final class XDR {
return n; return n;
} }
public ByteBuffer buffer() {
return buf.duplicate();
}
public int size() { public int size() {
// TODO: This overloading intends to be compatible with the semantics of // TODO: This overloading intends to be compatible with the semantics of
// the previous version of the class. This function should be separated into // the previous version of the class. This function should be separated into
@ -219,7 +223,7 @@ public final class XDR {
return xdr.buf.remaining() >= len; return xdr.buf.remaining() >= len;
} }
private static byte[] recordMark(int size, boolean last) { static byte[] recordMark(int size, boolean last) {
byte[] b = new byte[SIZEOF_INT]; byte[] b = new byte[SIZEOF_INT];
ByteBuffer buf = ByteBuffer.wrap(b); ByteBuffer buf = ByteBuffer.wrap(b);
buf.putInt(!last ? size : size | 0x80000000); buf.putInt(!last ? size : size | 0x80000000);
@ -259,9 +263,8 @@ public final class XDR {
@VisibleForTesting @VisibleForTesting
public byte[] getBytes() { public byte[] getBytes() {
ByteBuffer d = buf.duplicate(); ByteBuffer d = asReadOnlyWrap().buffer();
byte[] b = new byte[d.position()]; byte[] b = new byte[d.remaining()];
d.flip();
d.get(b); d.get(b);
return b; return b;

View File

@ -18,16 +18,17 @@
package org.apache.hadoop.oncrpc.security; package org.apache.hadoop.oncrpc.security;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
/** /**
* Base class for verifier. Currently our authentication only supports 3 types * Base class for verifier. Currently our authentication only supports 3 types
* of auth flavors: {@link AuthFlavor#AUTH_NONE}, {@link AuthFlavor#AUTH_SYS}, * of auth flavors: {@link RpcAuthInfo.AuthFlavor#AUTH_NONE}, {@link RpcAuthInfo.AuthFlavor#AUTH_SYS},
* and {@link AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle * and {@link RpcAuthInfo.AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
* AUTH_NONE and RPCSEC_GSS * AUTH_NONE and RPCSEC_GSS
*/ */
public abstract class Verifier extends RpcAuthInfo { public abstract class Verifier extends RpcAuthInfo {
public static final Verifier VERIFIER_NONE = new VerifierNone();
protected Verifier(AuthFlavor flavor) { protected Verifier(AuthFlavor flavor) {
super(flavor); super(flavor);
} }
@ -61,6 +62,4 @@ public abstract class Verifier extends RpcAuthInfo {
} }
verifier.write(xdr); verifier.write(xdr);
} }
} }

View File

@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.portmap; package org.apache.hadoop.portmap;
import java.net.InetAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
@ -26,10 +25,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcResponse;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.channel.Channel; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
/** /**
* An rpcbind request handler. * An rpcbind request handler.
@ -44,7 +48,7 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
private final HashMap<String, PortmapMapping> map; private final HashMap<String, PortmapMapping> map;
public RpcProgramPortmap() { public RpcProgramPortmap() {
super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0); super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
map = new HashMap<String, PortmapMapping>(256); map = new HashMap<String, PortmapMapping>(256);
} }
@ -130,10 +134,15 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
} }
@Override @Override
public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out, public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
InetAddress client, Channel channel) { RpcCall rpcCall = (RpcCall) info.header();
final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure()); final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid(); int xid = rpcCall.getXid();
byte[] data = new byte[info.data().readableBytes()];
info.data().readBytes(data);
XDR in = new XDR(data);
XDR out = new XDR();
if (portmapProc == Procedure.PMAPPROC_NULL) { if (portmapProc == Procedure.PMAPPROC_NULL) {
out = nullOp(xid, in, out); out = nullOp(xid, in, out);
} else if (portmapProc == Procedure.PMAPPROC_SET) { } else if (portmapProc == Procedure.PMAPPROC_SET) {
@ -148,11 +157,14 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
out = getport(xid, in, out); out = getport(xid, in, out);
} else { } else {
LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc); LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
RpcAcceptedReply.getInstance(xid, RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid,
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone());
out); reply.write(out);
} }
return out;
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
} }
@Override @Override

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.net.InetAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder; import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
@ -30,6 +29,7 @@ import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer; import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.junit.Test; import org.junit.Test;
@ -38,7 +38,7 @@ import org.mockito.Mockito;
public class TestFrameDecoder { public class TestFrameDecoder {
private static int port = 12345; // some random server port private static int port = 12345; // some random server port
private static XDR result = null; private static int resultSize;
static void testRequest(XDR request) { static void testRequest(XDR request) {
SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request, SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
@ -49,18 +49,20 @@ public class TestFrameDecoder {
static class TestRpcProgram extends RpcProgram { static class TestRpcProgram extends RpcProgram {
protected TestRpcProgram(String program, String host, int port, protected TestRpcProgram(String program, String host, int port,
int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) { int progNumber, int lowProgVersion, int highProgVersion) {
super(program, host, port, progNumber, lowProgVersion, highProgVersion, super(program, host, port, progNumber, lowProgVersion, highProgVersion);
cacheSize);
} }
@Override @Override
public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out, protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
InetAddress client, Channel channel) { resultSize = info.data().readableBytes();
// Get the final complete request and return a void response. RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
result = in; new VerifierNone());
RpcAcceptedReply.getAcceptInstance(1234, new VerifierNone()).write(out); XDR out = new XDR();
return out; reply.write(out);
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
} }
@Override @Override
@ -147,21 +149,22 @@ public class TestFrameDecoder {
public void testFrames() { public void testFrames() {
RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram", RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
"localhost", port, 100000, 1, 2, 100); "localhost", port, 100000, 1, 2);
SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1); SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
tcpServer.run(); tcpServer.run();
XDR xdrOut = createGetportMount(); XDR xdrOut = createGetportMount();
int headerSize = xdrOut.size();
int bufsize = 2 * 1024 * 1024; int bufsize = 2 * 1024 * 1024;
byte[] buffer = new byte[bufsize]; byte[] buffer = new byte[bufsize];
xdrOut.writeFixedOpaque(buffer); xdrOut.writeFixedOpaque(buffer);
int requestSize = xdrOut.size(); int requestSize = xdrOut.size() - headerSize;
// Send the request to the server // Send the request to the server
testRequest(xdrOut); testRequest(xdrOut);
// Verify the server got the request with right size // Verify the server got the request with right size
assertTrue(requestSize == result.size()); assertEquals(requestSize, resultSize);
} }
static void createPortmapXDRheader(XDR xdr_out, int procedure) { static void createPortmapXDRheader(XDR xdr_out, int procedure) {
@ -173,10 +176,6 @@ public class TestFrameDecoder {
static XDR createGetportMount() { static XDR createGetportMount() {
XDR xdr_out = new XDR(); XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3); createPortmapXDRheader(xdr_out, 3);
xdr_out.writeInt(0); // AUTH_NULL
xdr_out.writeInt(0); // cred len
xdr_out.writeInt(0); // verifier AUTH_NULL
xdr_out.writeInt(0); // verf len
return xdr_out; return xdr_out;
} }
/* /*

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest; import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
import org.junit.Test; import org.junit.Test;
import static org.mockito.Mockito.*;
/** /**
* Unit tests for {@link RpcCallCache} * Unit tests for {@link RpcCallCache}
*/ */
@ -67,7 +69,7 @@ public class TestRpcCallCache {
validateInprogressCacheEntry(e); validateInprogressCacheEntry(e);
// Set call as completed // Set call as completed
XDR response = new XDR(); RpcResponse response = mock(RpcResponse.class);
cache.callCompleted(clientIp, xid, response); cache.callCompleted(clientIp, xid, response);
e = cache.checkOrAddToCache(clientIp, xid); e = cache.checkOrAddToCache(clientIp, xid);
validateCompletedCacheEntry(e, response); validateCompletedCacheEntry(e, response);
@ -79,7 +81,7 @@ public class TestRpcCallCache {
assertNull(c.getResponse()); assertNull(c.getResponse());
} }
private void validateCompletedCacheEntry(CacheEntry c, XDR response) { private void validateCompletedCacheEntry(CacheEntry c, RpcResponse response) {
assertFalse(c.isInProgress()); assertFalse(c.isInProgress());
assertTrue(c.isCompleted()); assertTrue(c.isCompleted());
assertEquals(response, c.getResponse()); assertEquals(response, c.getResponse());
@ -93,7 +95,7 @@ public class TestRpcCallCache {
assertFalse(c.isCompleted()); assertFalse(c.isCompleted());
assertNull(c.getResponse()); assertNull(c.getResponse());
XDR response = new XDR(); RpcResponse response = mock(RpcResponse.class);
c.setResponse(response); c.setResponse(response);
validateCompletedCacheEntry(c, response); validateCompletedCacheEntry(c, response);
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.mount;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -38,10 +39,15 @@ import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Status; import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcResponse;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.channel.Channel; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
/** /**
* RPC program corresponding to mountd daemon. See {@link Mountd}. * RPC program corresponding to mountd daemon. See {@link Mountd}.
@ -77,7 +83,7 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
throws IOException { throws IOException {
// Note that RPC cache is not enabled // Note that RPC cache is not enabled
super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT), super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
PROGRAM, VERSION_1, VERSION_3, 0); PROGRAM, VERSION_1, VERSION_3);
this.hostsMatcher = NfsExports.getInstance(config); this.hostsMatcher = NfsExports.getInstance(config);
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>()); this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
@ -173,10 +179,16 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
} }
@Override @Override
public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out, public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
InetAddress client, Channel channel) { RpcCall rpcCall = (RpcCall) info.header();
final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure()); final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid(); int xid = rpcCall.getXid();
byte[] data = new byte[info.data().readableBytes()];
info.data().readBytes(data);
XDR xdr = new XDR(data);
XDR out = new XDR();
InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress();
if (mntproc == MNTPROC.NULL) { if (mntproc == MNTPROC.NULL) {
out = nullOp(out, xid, client); out = nullOp(out, xid, client);
} else if (mntproc == MNTPROC.MNT) { } else if (mntproc == MNTPROC.MNT) {
@ -198,7 +210,9 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
out); out);
} }
return out; ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
} }
@Override @Override

View File

@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.EnumSet; import java.util.EnumSet;
@ -103,9 +104,13 @@ import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall; import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.RpcCallCache;
import org.apache.hadoop.oncrpc.RpcDeniedReply; import org.apache.hadoop.oncrpc.RpcDeniedReply;
import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.RpcReply; import org.apache.hadoop.oncrpc.RpcReply;
import org.apache.hadoop.oncrpc.RpcResponse;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.Credentials; import org.apache.hadoop.oncrpc.security.Credentials;
import org.apache.hadoop.oncrpc.security.CredentialsSys; import org.apache.hadoop.oncrpc.security.CredentialsSys;
@ -115,7 +120,10 @@ import org.apache.hadoop.oncrpc.security.SysSecurityHandler;
import org.apache.hadoop.oncrpc.security.Verifier; import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
/** /**
* RPC program corresponding to nfs daemon. See {@link Nfs3}. * RPC program corresponding to nfs daemon. See {@link Nfs3}.
@ -150,14 +158,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
private Statistics statistics; private Statistics statistics;
private String writeDumpDir; // The dir save dump files private String writeDumpDir; // The dir save dump files
private final RpcCallCache rpcCallCache;
public RpcProgramNfs3() throws IOException { public RpcProgramNfs3() throws IOException {
this(new Configuration()); this(new Configuration());
} }
public RpcProgramNfs3(Configuration config) public RpcProgramNfs3(Configuration config) throws IOException {
throws IOException {
super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM, super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100); Nfs3Constant.VERSION, Nfs3Constant.VERSION);
config.set(FsPermission.UMASK_LABEL, "000"); config.set(FsPermission.UMASK_LABEL, "000");
iug = new IdUserGroup(); iug = new IdUserGroup();
@ -183,6 +192,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} else { } else {
clearDirectory(writeDumpDir); clearDirectory(writeDumpDir);
} }
rpcCallCache = new RpcCallCache("NFS3", 256);
} }
private void clearDirectory(String writeDumpDir) throws IOException { private void clearDirectory(String writeDumpDir) throws IOException {
@ -213,8 +224,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public GETATTR3Response getattr(XDR xdr, public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK); GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -294,8 +305,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public SETATTR3Response setattr(XDR xdr, public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK); SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser()); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) { if (dfsClient == null) {
@ -370,8 +381,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public LOOKUP3Response lookup(XDR xdr, public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK); LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -432,8 +443,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public ACCESS3Response access(XDR xdr, public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK); ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -574,7 +585,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
long offset = request.getOffset(); long offset = request.getOffset();
int count = request.getCount(); int count = request.getCount();
FileHandle handle = request.getHandle(); FileHandle handle = request.getHandle();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset
@ -720,8 +730,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public CREATE3Response create(XDR xdr, public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK); CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser()); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) { if (dfsClient == null) {
@ -973,8 +983,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
String fileIdPath = dirFileIdPath + "/" + fileName; String fileIdPath = dirFileIdPath + "/" + fileName;
HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
fileIdPath);
if (fstat == null) { if (fstat == null) {
WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
preOpDirAttr); preOpDirAttr);
@ -1056,8 +1065,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
String fileIdPath = dirFileIdPath + "/" + fileName; String fileIdPath = dirFileIdPath + "/" + fileName;
HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
fileIdPath);
if (fstat == null) { if (fstat == null) {
return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc); return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
} }
@ -1098,8 +1106,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public RENAME3Response rename(XDR xdr, public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK); RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser()); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) { if (dfsClient == null) {
@ -1245,13 +1253,14 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
} }
public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) { public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
InetAddress client) {
return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP); return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
} }
@Override @Override
public READDIR3Response readdir(XDR xdr, public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK); READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -1540,8 +1549,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public FSSTAT3Response fsstat(XDR xdr, public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK); FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -1598,8 +1607,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public FSINFO3Response fsinfo(XDR xdr, public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK); FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -1650,8 +1659,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public PATHCONF3Response pathconf(XDR xdr, public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK); PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@ -1697,8 +1706,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public COMMIT3Response commit(XDR xdr, public COMMIT3Response commit(XDR xdr, SecurityHandler securityHandler,
SecurityHandler securityHandler, InetAddress client) { InetAddress client) {
COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK); COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
DFSClient dfsClient = clientCache.get(securityHandler.getUser()); DFSClient dfsClient = clientCache.get(securityHandler.getUser());
if (dfsClient == null) { if (dfsClient == null) {
@ -1776,25 +1785,53 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
} }
@Override @Override
public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out, public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
InetAddress client, Channel channel) { RpcCall rpcCall = (RpcCall) info.header();
final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure()); final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid(); int xid = rpcCall.getXid();
byte[] data = new byte[info.data().readableBytes()];
info.data().readBytes(data);
XDR xdr = new XDR(data);
XDR out = new XDR();
InetAddress client = ((InetSocketAddress) info.remoteAddress())
.getAddress();
Channel channel = info.channel();
Credentials credentials = rpcCall.getCredential(); Credentials credentials = rpcCall.getCredential();
// Ignore auth only for NFSPROC3_NULL, especially for Linux clients. // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
if (nfsproc3 != NFSPROC3.NULL) { if (nfsproc3 != NFSPROC3.NULL) {
if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
&& rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) { && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) {
LOG.info("Wrong RPC AUTH flavor, " LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor()
+ rpcCall.getCredential().getFlavor()
+ " is not AUTH_SYS or RPCSEC_GSS."); + " is not AUTH_SYS or RPCSEC_GSS.");
XDR reply = new XDR(); XDR reply = new XDR();
RpcDeniedReply rdr = new RpcDeniedReply(xid, RpcDeniedReply rdr = new RpcDeniedReply(xid,
RpcReply.ReplyState.MSG_ACCEPTED, RpcReply.ReplyState.MSG_ACCEPTED,
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone()); RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
rdr.write(reply); rdr.write(reply);
return reply;
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
return;
}
}
if (!isIdempotent(rpcCall)) {
RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client,
xid);
if (entry != null) { // in cache
if (entry.isCompleted()) {
LOG.info("Sending the cached reply to retransmitted request " + xid);
RpcUtil.sendRpcResponse(ctx, entry.getResponse());
return;
} else { // else request is in progress
LOG.info("Retransmitted request, transaction still in progress "
+ xid);
// Ignore the request and do nothing
return;
}
} }
} }
@ -1862,12 +1899,24 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
out); out);
} }
if (response != null) { if (response == null) {
// TODO: currently we just return VerifierNone if (LOG.isDebugEnabled()) {
out = response.writeHeaderAndResponse(out, xid, new VerifierNone()); LOG.debug("No sync response, expect an async response for request XID="
+ rpcCall.getXid());
}
return;
}
// TODO: currently we just return VerifierNone
out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
if (!isIdempotent(rpcCall)) {
rpcCallCache.callCompleted(client, xid, rsp);
} }
return out; RpcUtil.sendRpcResponse(ctx, rsp);
} }
@Override @Override

View File

@ -345,6 +345,9 @@ Release 2.1.2 - UNRELEASED
NEW FEATURES NEW FEATURES
HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API.
(Haohui Mai via brandonli)
IMPROVEMENTS IMPROVEMENTS
HDFS-5246. Make Hadoop nfs server port and mount daemon port HDFS-5246. Make Hadoop nfs server port and mount daemon port