HDFS-5469. Add configuration property for the sub-directroy export path. Contributed by Brandon Li
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540443 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fe08488a22
commit
ec9ec0084e
|
@ -21,9 +21,9 @@ package org.apache.hadoop.mount;
|
||||||
* Represents a mount entry.
|
* Represents a mount entry.
|
||||||
*/
|
*/
|
||||||
public class MountEntry {
|
public class MountEntry {
|
||||||
/** Host correspoinding to the mount entry */
|
/** Host corresponding to the mount entry */
|
||||||
private final String host;
|
private final String host;
|
||||||
/** Path correspoinding to the mount entry */
|
/** Path corresponding to the mount entry */
|
||||||
private final String path;
|
private final String path;
|
||||||
|
|
||||||
public MountEntry(String host, String path) {
|
public MountEntry(String host, String path) {
|
||||||
|
@ -31,11 +31,11 @@ public class MountEntry {
|
||||||
this.path = path;
|
this.path = path;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String host() {
|
public String getHost() {
|
||||||
return this.host;
|
return this.host;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String path() {
|
public String getPath() {
|
||||||
return this.path;
|
return this.path;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ public class MountEntry {
|
||||||
}
|
}
|
||||||
|
|
||||||
MountEntry m = (MountEntry) o;
|
MountEntry m = (MountEntry) o;
|
||||||
return host().equals(m.host()) && path().equals(m.path());
|
return getHost().equals(m.getHost()) && getPath().equals(m.getPath());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -54,8 +54,8 @@ public class MountResponse {
|
||||||
RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
|
RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
|
||||||
for (MountEntry mountEntry : mounts) {
|
for (MountEntry mountEntry : mounts) {
|
||||||
xdr.writeBoolean(true); // Value follows yes
|
xdr.writeBoolean(true); // Value follows yes
|
||||||
xdr.writeString(mountEntry.host());
|
xdr.writeString(mountEntry.getHost());
|
||||||
xdr.writeString(mountEntry.path());
|
xdr.writeString(mountEntry.getPath());
|
||||||
}
|
}
|
||||||
xdr.writeBoolean(false); // Value follows no
|
xdr.writeBoolean(false); // Value follows no
|
||||||
return xdr;
|
return xdr;
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
package org.apache.hadoop.mount;
|
package org.apache.hadoop.mount;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.oncrpc.RpcProgram;
|
import org.apache.hadoop.oncrpc.RpcProgram;
|
||||||
import org.apache.hadoop.oncrpc.SimpleTcpServer;
|
import org.apache.hadoop.oncrpc.SimpleTcpServer;
|
||||||
|
@ -34,6 +33,8 @@ import org.apache.hadoop.portmap.PortmapMapping;
|
||||||
*/
|
*/
|
||||||
abstract public class MountdBase {
|
abstract public class MountdBase {
|
||||||
private final RpcProgram rpcProgram;
|
private final RpcProgram rpcProgram;
|
||||||
|
private int udpBoundPort; // Will set after server starts
|
||||||
|
private int tcpBoundPort; // Will set after server starts
|
||||||
|
|
||||||
public RpcProgram getRpcProgram() {
|
public RpcProgram getRpcProgram() {
|
||||||
return rpcProgram;
|
return rpcProgram;
|
||||||
|
@ -41,10 +42,10 @@ abstract public class MountdBase {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param exports
|
* @param program
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public MountdBase(List<String> exports, RpcProgram program) throws IOException {
|
public MountdBase(RpcProgram program) throws IOException {
|
||||||
rpcProgram = program;
|
rpcProgram = program;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,6 +55,7 @@ abstract public class MountdBase {
|
||||||
rpcProgram, 1);
|
rpcProgram, 1);
|
||||||
rpcProgram.startDaemons();
|
rpcProgram.startDaemons();
|
||||||
udpServer.run();
|
udpServer.run();
|
||||||
|
udpBoundPort = udpServer.getBoundPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Start TCP server */
|
/* Start TCP server */
|
||||||
|
@ -62,14 +64,15 @@ abstract public class MountdBase {
|
||||||
rpcProgram, 1);
|
rpcProgram, 1);
|
||||||
rpcProgram.startDaemons();
|
rpcProgram.startDaemons();
|
||||||
tcpServer.run();
|
tcpServer.run();
|
||||||
|
tcpBoundPort = tcpServer.getBoundPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(boolean register) {
|
public void start(boolean register) {
|
||||||
startUDPServer();
|
startUDPServer();
|
||||||
startTCPServer();
|
startTCPServer();
|
||||||
if (register) {
|
if (register) {
|
||||||
rpcProgram.register(PortmapMapping.TRANSPORT_UDP);
|
rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
|
||||||
rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
|
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ public abstract class Nfs3Base {
|
||||||
public static final Log LOG = LogFactory.getLog(Nfs3Base.class);
|
public static final Log LOG = LogFactory.getLog(Nfs3Base.class);
|
||||||
private final RpcProgram rpcProgram;
|
private final RpcProgram rpcProgram;
|
||||||
private final int nfsPort;
|
private final int nfsPort;
|
||||||
|
private int nfsBoundPort; // Will set after server starts
|
||||||
|
|
||||||
public RpcProgram getRpcProgram() {
|
public RpcProgram getRpcProgram() {
|
||||||
return rpcProgram;
|
return rpcProgram;
|
||||||
|
@ -40,20 +41,16 @@ public abstract class Nfs3Base {
|
||||||
|
|
||||||
protected Nfs3Base(RpcProgram rpcProgram, Configuration conf) {
|
protected Nfs3Base(RpcProgram rpcProgram, Configuration conf) {
|
||||||
this.rpcProgram = rpcProgram;
|
this.rpcProgram = rpcProgram;
|
||||||
this.nfsPort = conf.getInt("nfs3.server.port", Nfs3Constant.PORT);
|
this.nfsPort = conf.getInt(Nfs3Constant.NFS3_SERVER_PORT,
|
||||||
|
Nfs3Constant.NFS3_SERVER_PORT_DEFAULT);
|
||||||
LOG.info("NFS server port set to: " + nfsPort);
|
LOG.info("NFS server port set to: " + nfsPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Nfs3Base(RpcProgram rpcProgram) {
|
|
||||||
this.rpcProgram = rpcProgram;
|
|
||||||
this.nfsPort = Nfs3Constant.PORT;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start(boolean register) {
|
public void start(boolean register) {
|
||||||
startTCPServer(); // Start TCP server
|
startTCPServer(); // Start TCP server
|
||||||
|
|
||||||
if (register) {
|
if (register) {
|
||||||
rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
|
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,5 +59,6 @@ public abstract class Nfs3Base {
|
||||||
rpcProgram, 0);
|
rpcProgram, 0);
|
||||||
rpcProgram.startDaemons();
|
rpcProgram.startDaemons();
|
||||||
tcpServer.run();
|
tcpServer.run();
|
||||||
|
nfsBoundPort = tcpServer.getBoundPort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,8 @@ public class Nfs3Constant {
|
||||||
public final static int SUN_RPCBIND = 111;
|
public final static int SUN_RPCBIND = 111;
|
||||||
|
|
||||||
// The IP port number for NFS.
|
// The IP port number for NFS.
|
||||||
public final static int PORT = 2049;
|
public final static String NFS3_SERVER_PORT = "nfs3.server.port";
|
||||||
|
public final static int NFS3_SERVER_PORT_DEFAULT = 2049;
|
||||||
|
|
||||||
// The RPC program number for NFS.
|
// The RPC program number for NFS.
|
||||||
public final static int PROGRAM = 100003;
|
public final static int PROGRAM = 100003;
|
||||||
|
@ -213,4 +214,7 @@ public class Nfs3Constant {
|
||||||
|
|
||||||
public final static String UNKNOWN_USER = "nobody";
|
public final static String UNKNOWN_USER = "nobody";
|
||||||
public final static String UNKNOWN_GROUP = "nobody";
|
public final static String UNKNOWN_GROUP = "nobody";
|
||||||
|
|
||||||
|
public final static String EXPORT_POINT = "dfs.nfs3.export.point";
|
||||||
|
public final static String EXPORT_POINT_DEFAULT = "/";
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
||||||
public static final int RPCB_PORT = 111;
|
public static final int RPCB_PORT = 111;
|
||||||
private final String program;
|
private final String program;
|
||||||
private final String host;
|
private final String host;
|
||||||
private final int port;
|
private int port; // Ephemeral port is chosen later
|
||||||
private final int progNumber;
|
private final int progNumber;
|
||||||
private final int lowProgVersion;
|
private final int lowProgVersion;
|
||||||
private final int highProgVersion;
|
private final int highProgVersion;
|
||||||
|
@ -68,21 +68,19 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
||||||
/**
|
/**
|
||||||
* Register this program with the local portmapper.
|
* Register this program with the local portmapper.
|
||||||
*/
|
*/
|
||||||
public void register(int transport) {
|
public void register(int transport, int boundPort) {
|
||||||
|
if (boundPort != port) {
|
||||||
|
LOG.info("The bound port is " + boundPort
|
||||||
|
+ ", different with configured port " + port);
|
||||||
|
port = boundPort;
|
||||||
|
}
|
||||||
// Register all the program versions with portmapper for a given transport
|
// Register all the program versions with portmapper for a given transport
|
||||||
for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
|
for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
|
||||||
register(vers, transport);
|
PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport,
|
||||||
}
|
port);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register this program with the local portmapper.
|
|
||||||
*/
|
|
||||||
private void register(int progVersion, int transport) {
|
|
||||||
PortmapMapping mapEntry = new PortmapMapping(progNumber, progVersion,
|
|
||||||
transport, port);
|
|
||||||
register(mapEntry);
|
register(mapEntry);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register the program with Portmap or Rpcbind
|
* Register the program with Portmap or Rpcbind
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.jboss.netty.bootstrap.ServerBootstrap;
|
import org.jboss.netty.bootstrap.ServerBootstrap;
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.ChannelFactory;
|
import org.jboss.netty.channel.ChannelFactory;
|
||||||
import org.jboss.netty.channel.ChannelPipeline;
|
import org.jboss.netty.channel.ChannelPipeline;
|
||||||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||||
|
@ -36,6 +37,7 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
||||||
public class SimpleTcpServer {
|
public class SimpleTcpServer {
|
||||||
public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
|
public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
|
||||||
protected final int port;
|
protected final int port;
|
||||||
|
protected int boundPort = -1; // Will be set after server starts
|
||||||
protected final SimpleChannelUpstreamHandler rpcProgram;
|
protected final SimpleChannelUpstreamHandler rpcProgram;
|
||||||
|
|
||||||
/** The maximum number of I/O worker threads */
|
/** The maximum number of I/O worker threads */
|
||||||
|
@ -79,9 +81,16 @@ public class SimpleTcpServer {
|
||||||
bootstrap.setOption("child.keepAlive", true);
|
bootstrap.setOption("child.keepAlive", true);
|
||||||
|
|
||||||
// Listen to TCP port
|
// Listen to TCP port
|
||||||
bootstrap.bind(new InetSocketAddress(port));
|
Channel ch = bootstrap.bind(new InetSocketAddress(port));
|
||||||
|
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
|
||||||
|
boundPort = socketAddr.getPort();
|
||||||
|
|
||||||
LOG.info("Started listening to TCP requests at port " + port + " for "
|
LOG.info("Started listening to TCP requests at port " + boundPort + " for "
|
||||||
+ rpcProgram + " with workerCount " + workerCount);
|
+ rpcProgram + " with workerCount " + workerCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// boundPort will be set only after server starts
|
||||||
|
public int getBoundPort() {
|
||||||
|
return this.boundPort;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
|
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.Channels;
|
import org.jboss.netty.channel.Channels;
|
||||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||||
import org.jboss.netty.channel.socket.DatagramChannelFactory;
|
import org.jboss.netty.channel.socket.DatagramChannelFactory;
|
||||||
|
@ -39,6 +40,7 @@ public class SimpleUdpServer {
|
||||||
protected final int port;
|
protected final int port;
|
||||||
protected final SimpleChannelUpstreamHandler rpcProgram;
|
protected final SimpleChannelUpstreamHandler rpcProgram;
|
||||||
protected final int workerCount;
|
protected final int workerCount;
|
||||||
|
protected int boundPort = -1; // Will be set after server starts
|
||||||
|
|
||||||
public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
|
public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
|
||||||
this.port = port;
|
this.port = port;
|
||||||
|
@ -61,9 +63,16 @@ public class SimpleUdpServer {
|
||||||
b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
|
b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
|
||||||
|
|
||||||
// Listen to the UDP port
|
// Listen to the UDP port
|
||||||
b.bind(new InetSocketAddress(port));
|
Channel ch = b.bind(new InetSocketAddress(port));
|
||||||
|
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
|
||||||
|
boundPort = socketAddr.getPort();
|
||||||
|
|
||||||
LOG.info("Started listening to UDP requests at port " + port + " for "
|
LOG.info("Started listening to UDP requests at port " + boundPort + " for "
|
||||||
+ rpcProgram + " with workerCount " + workerCount);
|
+ rpcProgram + " with workerCount " + workerCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// boundPort will be set only after server starts
|
||||||
|
public int getBoundPort() {
|
||||||
|
return this.boundPort;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,14 +31,14 @@ public class Portmap {
|
||||||
public static final Log LOG = LogFactory.getLog(Portmap.class);
|
public static final Log LOG = LogFactory.getLog(Portmap.class);
|
||||||
|
|
||||||
private static void startUDPServer(RpcProgramPortmap rpcProgram) {
|
private static void startUDPServer(RpcProgramPortmap rpcProgram) {
|
||||||
rpcProgram.register(PortmapMapping.TRANSPORT_UDP);
|
rpcProgram.register(PortmapMapping.TRANSPORT_UDP, RpcProgram.RPCB_PORT);
|
||||||
SimpleUdpServer udpServer = new SimpleUdpServer(RpcProgram.RPCB_PORT,
|
SimpleUdpServer udpServer = new SimpleUdpServer(RpcProgram.RPCB_PORT,
|
||||||
rpcProgram, 1);
|
rpcProgram, 1);
|
||||||
udpServer.run();
|
udpServer.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void startTCPServer(final RpcProgramPortmap rpcProgram) {
|
private static void startTCPServer(final RpcProgramPortmap rpcProgram) {
|
||||||
rpcProgram.register(PortmapMapping.TRANSPORT_TCP);
|
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT);
|
||||||
SimpleTcpServer tcpServer = new SimpleTcpServer(RpcProgram.RPCB_PORT,
|
SimpleTcpServer tcpServer = new SimpleTcpServer(RpcProgram.RPCB_PORT,
|
||||||
rpcProgram, 1);
|
rpcProgram, 1);
|
||||||
tcpServer.run();
|
tcpServer.run();
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
package org.apache.hadoop.hdfs.nfs.mount;
|
package org.apache.hadoop.hdfs.nfs.mount;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mount.MountdBase;
|
import org.apache.hadoop.mount.MountdBase;
|
||||||
|
@ -32,23 +30,14 @@ import org.apache.hadoop.mount.MountdBase;
|
||||||
* handle for requested directory and returns it to the client.
|
* handle for requested directory and returns it to the client.
|
||||||
*/
|
*/
|
||||||
public class Mountd extends MountdBase {
|
public class Mountd extends MountdBase {
|
||||||
/**
|
|
||||||
* Constructor
|
|
||||||
* @param exports
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public Mountd(List<String> exports) throws IOException {
|
|
||||||
super(exports, new RpcProgramMountd(exports));
|
|
||||||
}
|
|
||||||
|
|
||||||
public Mountd(List<String> exports, Configuration config) throws IOException {
|
public Mountd(Configuration config) throws IOException {
|
||||||
super(exports, new RpcProgramMountd(exports, config));
|
super(new RpcProgramMountd(config));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
List<String> exports = new ArrayList<String>();
|
Configuration config = new Configuration();
|
||||||
exports.add("/");
|
Mountd mountd = new Mountd(config);
|
||||||
Mountd mountd = new Mountd(exports);
|
|
||||||
mountd.start(true);
|
mountd.start(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.mount.MountResponse;
|
||||||
import org.apache.hadoop.nfs.AccessPrivilege;
|
import org.apache.hadoop.nfs.AccessPrivilege;
|
||||||
import org.apache.hadoop.nfs.NfsExports;
|
import org.apache.hadoop.nfs.NfsExports;
|
||||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||||
|
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||||
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
|
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
|
||||||
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
|
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
|
||||||
import org.apache.hadoop.oncrpc.RpcCall;
|
import org.apache.hadoop.oncrpc.RpcCall;
|
||||||
|
@ -49,6 +50,8 @@ import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
import org.jboss.netty.buffer.ChannelBuffers;
|
||||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RPC program corresponding to mountd daemon. See {@link Mountd}.
|
* RPC program corresponding to mountd daemon. See {@link Mountd}.
|
||||||
*/
|
*/
|
||||||
|
@ -71,23 +74,15 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
|
||||||
|
|
||||||
private final NfsExports hostsMatcher;
|
private final NfsExports hostsMatcher;
|
||||||
|
|
||||||
public RpcProgramMountd() throws IOException {
|
public RpcProgramMountd(Configuration config) throws IOException {
|
||||||
this(new ArrayList<String>(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
public RpcProgramMountd(List<String> exports) throws IOException {
|
|
||||||
this(exports, new Configuration());
|
|
||||||
}
|
|
||||||
|
|
||||||
public RpcProgramMountd(List<String> exports, Configuration config)
|
|
||||||
throws IOException {
|
|
||||||
// Note that RPC cache is not enabled
|
// Note that RPC cache is not enabled
|
||||||
super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
|
super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
|
||||||
PROGRAM, VERSION_1, VERSION_3);
|
PROGRAM, VERSION_1, VERSION_3);
|
||||||
|
exports = new ArrayList<String>();
|
||||||
|
exports.add(config.get(Nfs3Constant.EXPORT_POINT,
|
||||||
|
Nfs3Constant.EXPORT_POINT_DEFAULT));
|
||||||
this.hostsMatcher = NfsExports.getInstance(config);
|
this.hostsMatcher = NfsExports.getInstance(config);
|
||||||
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
|
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
|
||||||
this.exports = Collections.unmodifiableList(exports);
|
|
||||||
this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
|
this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,7 +195,7 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
|
||||||
} else if (mntproc == MNTPROC.UMNTALL) {
|
} else if (mntproc == MNTPROC.UMNTALL) {
|
||||||
umntall(out, xid, client);
|
umntall(out, xid, client);
|
||||||
} else if (mntproc == MNTPROC.EXPORT) {
|
} else if (mntproc == MNTPROC.EXPORT) {
|
||||||
// Currently only support one NFS export "/"
|
// Currently only support one NFS export
|
||||||
List<NfsExports> hostsMatchers = new ArrayList<NfsExports>();
|
List<NfsExports> hostsMatchers = new ArrayList<NfsExports>();
|
||||||
hostsMatchers.add(hostsMatcher);
|
hostsMatchers.add(hostsMatcher);
|
||||||
out = MountResponse.writeExportList(out, xid, exports, hostsMatchers);
|
out = MountResponse.writeExportList(out, xid, exports, hostsMatchers);
|
||||||
|
@ -220,4 +215,9 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
|
||||||
// Not required, because cache is turned off
|
// Not required, because cache is turned off
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public List<String> getExports() {
|
||||||
|
return this.exports;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ public class AsyncDataService {
|
||||||
|
|
||||||
public AsyncDataService() {
|
public AsyncDataService() {
|
||||||
threadFactory = new ThreadFactory() {
|
threadFactory = new ThreadFactory() {
|
||||||
|
@Override
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
return new Thread(threadGroup, r);
|
return new Thread(threadGroup, r);
|
||||||
}
|
}
|
||||||
|
@ -129,6 +130,7 @@ public class AsyncDataService {
|
||||||
+ openFileCtx.getNextOffset();
|
+ openFileCtx.getNextOffset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
openFileCtx.executeWriteBack();
|
openFileCtx.executeWriteBack();
|
||||||
|
|
|
@ -118,6 +118,7 @@ class DFSClientCache {
|
||||||
|
|
||||||
// Guava requires CacheLoader never returns null.
|
// Guava requires CacheLoader never returns null.
|
||||||
return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
|
return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
|
||||||
|
@Override
|
||||||
public DFSClient run() throws IOException {
|
public DFSClient run() throws IOException {
|
||||||
return new DFSClient(NameNode.getAddress(config), config);
|
return new DFSClient(NameNode.getAddress(config), config);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,9 @@
|
||||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
|
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
|
||||||
import org.apache.hadoop.mount.MountdBase;
|
|
||||||
import org.apache.hadoop.nfs.nfs3.Nfs3Base;
|
import org.apache.hadoop.nfs.nfs3.Nfs3Base;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
@ -42,28 +39,24 @@ public class Nfs3 extends Nfs3Base {
|
||||||
Configuration.addDefaultResource("hdfs-site.xml");
|
Configuration.addDefaultResource("hdfs-site.xml");
|
||||||
}
|
}
|
||||||
|
|
||||||
public Nfs3(List<String> exports) throws IOException {
|
public Nfs3(Configuration conf) throws IOException {
|
||||||
super(new RpcProgramNfs3());
|
super(new RpcProgramNfs3(conf), conf);
|
||||||
mountd = new Mountd(exports);
|
mountd = new Mountd(conf);
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public Nfs3(List<String> exports, Configuration config) throws IOException {
|
|
||||||
super(new RpcProgramNfs3(config), config);
|
|
||||||
mountd = new Mountd(exports, config);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Mountd getMountd() {
|
public Mountd getMountd() {
|
||||||
return mountd;
|
return mountd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void startServiceInternal(boolean register) throws IOException {
|
||||||
|
mountd.start(register); // Start mountd
|
||||||
|
start(register);
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
|
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
|
||||||
List<String> exports = new ArrayList<String>();
|
final Nfs3 nfsServer = new Nfs3(new Configuration());
|
||||||
exports.add("/");
|
nfsServer.startServiceInternal(true);
|
||||||
|
|
||||||
final Nfs3 nfsServer = new Nfs3(exports);
|
|
||||||
nfsServer.mountd.start(true); // Start mountd
|
|
||||||
nfsServer.start(true);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -163,12 +163,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
|
|
||||||
private final RpcCallCache rpcCallCache;
|
private final RpcCallCache rpcCallCache;
|
||||||
|
|
||||||
public RpcProgramNfs3() throws IOException {
|
|
||||||
this(new Configuration());
|
|
||||||
}
|
|
||||||
|
|
||||||
public RpcProgramNfs3(Configuration config) throws IOException {
|
public RpcProgramNfs3(Configuration config) throws IOException {
|
||||||
super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
|
super("NFS3", "localhost", config.getInt(Nfs3Constant.NFS3_SERVER_PORT,
|
||||||
|
Nfs3Constant.NFS3_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
|
||||||
Nfs3Constant.VERSION, Nfs3Constant.VERSION);
|
Nfs3Constant.VERSION, Nfs3Constant.VERSION);
|
||||||
|
|
||||||
config.set(FsPermission.UMASK_LABEL, "000");
|
config.set(FsPermission.UMASK_LABEL, "000");
|
||||||
|
|
|
@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.nfs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -45,11 +43,13 @@ public class TestMountd {
|
||||||
.build();
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// Use emphral port in case tests are running in parallel
|
||||||
|
config.setInt("nfs3.mountd.port", 0);
|
||||||
|
config.setInt("nfs3.server.port", 0);
|
||||||
|
|
||||||
// Start nfs
|
// Start nfs
|
||||||
List<String> exports = new ArrayList<String>();
|
Nfs3 nfs3 = new Nfs3(config);
|
||||||
exports.add("/");
|
nfs3.startServiceInternal(false);
|
||||||
Nfs3 nfs3 = new Nfs3(exports, config);
|
|
||||||
nfs3.start(false);
|
|
||||||
|
|
||||||
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
|
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
|
||||||
.getRpcProgram();
|
.getRpcProgram();
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
|
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
|
||||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||||
|
@ -154,7 +155,9 @@ public class TestOutOfOrderWrite {
|
||||||
Arrays.fill(data3, (byte) 9);
|
Arrays.fill(data3, (byte) 9);
|
||||||
|
|
||||||
// NFS3 Create request
|
// NFS3 Create request
|
||||||
WriteClient client = new WriteClient("localhost", Nfs3Constant.PORT,
|
Configuration conf = new Configuration();
|
||||||
|
WriteClient client = new WriteClient("localhost", conf.getInt(
|
||||||
|
Nfs3Constant.NFS3_SERVER_PORT, Nfs3Constant.NFS3_SERVER_PORT_DEFAULT),
|
||||||
create(), false);
|
create(), false);
|
||||||
client.run();
|
client.run();
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -67,11 +66,13 @@ public class TestReaddir {
|
||||||
hdfs = cluster.getFileSystem();
|
hdfs = cluster.getFileSystem();
|
||||||
nn = cluster.getNameNode();
|
nn = cluster.getNameNode();
|
||||||
|
|
||||||
|
// Use emphral port in case tests are running in parallel
|
||||||
|
config.setInt("nfs3.mountd.port", 0);
|
||||||
|
config.setInt("nfs3.server.port", 0);
|
||||||
|
|
||||||
// Start nfs
|
// Start nfs
|
||||||
List<String> exports = new ArrayList<String>();
|
Nfs3 nfs3 = new Nfs3(config);
|
||||||
exports.add("/");
|
nfs3.startServiceInternal(false);
|
||||||
Nfs3 nfs3 = new Nfs3(exports, config);
|
|
||||||
nfs3.start(false);
|
|
||||||
|
|
||||||
nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
|
||||||
|
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
|
||||||
|
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestExportsTable {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExportPoint() throws IOException {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
String exportPoint = "/myexport1";
|
||||||
|
config.setStrings(Nfs3Constant.EXPORT_POINT, exportPoint);
|
||||||
|
// Use emphral port in case tests are running in parallel
|
||||||
|
config.setInt("nfs3.mountd.port", 0);
|
||||||
|
config.setInt("nfs3.server.port", 0);
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// Start nfs
|
||||||
|
final Nfs3 nfsServer = new Nfs3(config);
|
||||||
|
nfsServer.startServiceInternal(false);
|
||||||
|
|
||||||
|
Mountd mountd = nfsServer.getMountd();
|
||||||
|
RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram();
|
||||||
|
assertTrue(rpcMount.getExports().size() == 1);
|
||||||
|
|
||||||
|
String exportInMountd = rpcMount.getExports().get(0);
|
||||||
|
assertTrue(exportInMountd.equals(exportPoint));
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,10 +23,7 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.ConcurrentNavigableMap;
|
import java.util.concurrent.ConcurrentNavigableMap;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
@ -215,11 +212,13 @@ public class TestWrites {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
client = new DFSClient(NameNode.getAddress(config), config);
|
client = new DFSClient(NameNode.getAddress(config), config);
|
||||||
|
|
||||||
|
// Use emphral port in case tests are running in parallel
|
||||||
|
config.setInt("nfs3.mountd.port", 0);
|
||||||
|
config.setInt("nfs3.server.port", 0);
|
||||||
|
|
||||||
// Start nfs
|
// Start nfs
|
||||||
List<String> exports = new ArrayList<String>();
|
Nfs3 nfs3 = new Nfs3(config);
|
||||||
exports.add("/");
|
nfs3.startServiceInternal(false);
|
||||||
Nfs3 nfs3 = new Nfs3(exports, config);
|
|
||||||
nfs3.start(false);
|
|
||||||
nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
||||||
|
|
||||||
HdfsFileStatus status = client.getFileInfo("/");
|
HdfsFileStatus status = client.getFileInfo("/");
|
||||||
|
|
|
@ -613,6 +613,9 @@ Release 2.2.1 - UNRELEASED
|
||||||
|
|
||||||
HDFS-5364. Add OpenFileCtx cache. (brandonli)
|
HDFS-5364. Add OpenFileCtx cache. (brandonli)
|
||||||
|
|
||||||
|
HDFS-5469. Add configuration property for the sub-directroy export path
|
||||||
|
(brandonli)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
Loading…
Reference in New Issue