HDFS-6281. Provide option to use the NFS Gateway without having to use the Hadoop portmapper. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1589915 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2014-04-25 00:19:34 +00:00
parent 0e2a0fb434
commit 0bf1e43ac3
12 changed files with 223 additions and 37 deletions

View File: org/apache/hadoop/oncrpc/RpcProgram.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.oncrpc;
import java.io.IOException;
+import java.net.DatagramSocket;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,12 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
private final int lowProgVersion;
private final int highProgVersion;
+  /**
+   * If not null, this socket will be used to connect to the system portmap
+   * daemon when registering this RPC server program.
+   */
+  private final DatagramSocket registrationSocket;
/**
* Constructor
*
@@ -56,13 +63,15 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
* @param highProgVersion highest version of the specification supported
*/
protected RpcProgram(String program, String host, int port, int progNumber,
-      int lowProgVersion, int highProgVersion) {
+      int lowProgVersion, int highProgVersion,
+      DatagramSocket registrationSocket) {
this.program = program;
this.host = host;
this.port = port;
this.progNumber = progNumber;
this.lowProgVersion = lowProgVersion;
this.highProgVersion = highProgVersion;
+    this.registrationSocket = registrationSocket;
}
/**
@@ -105,14 +114,14 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
protected void register(PortmapMapping mapEntry, boolean set) {
XDR mappingRequest = PortmapRequest.create(mapEntry, set);
SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
-        mappingRequest);
+        mappingRequest, registrationSocket);
try {
registrationClient.run();
} catch (IOException e) {
String request = set ? "Registration" : "Unregistration";
LOG.error(request + " failure with " + host + ":" + port
+ ", portmap entry: " + mapEntry);
throw new RuntimeException(request + " failure");
+ ", portmap entry: " + mapEntry, e);
throw new RuntimeException(request + " failure", e);
}
}

View File: org/apache/hadoop/oncrpc/SimpleUdpClient.java

@@ -27,43 +27,56 @@ import java.util.Arrays;
* A simple UDP based RPC client which just sends one request to a server.
*/
public class SimpleUdpClient {
protected final String host;
protected final int port;
protected final XDR request;
protected final boolean oneShot;
+  protected final DatagramSocket clientSocket;
-  public SimpleUdpClient(String host, int port, XDR request) {
-    this(host, port, request, true);
+  public SimpleUdpClient(String host, int port, XDR request,
+      DatagramSocket clientSocket) {
+    this(host, port, request, true, clientSocket);
  }
-  public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot) {
+  public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot,
+      DatagramSocket clientSocket) {
this.host = host;
this.port = port;
this.request = request;
this.oneShot = oneShot;
+    this.clientSocket = clientSocket;
}
public void run() throws IOException {
-    DatagramSocket clientSocket = new DatagramSocket();
InetAddress IPAddress = InetAddress.getByName(host);
byte[] sendData = request.getBytes();
byte[] receiveData = new byte[65535];
+    // Use the provided socket if there is one, else just make a new one.
+    DatagramSocket socket = this.clientSocket == null ?
+        new DatagramSocket() : this.clientSocket;
-    DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
-        IPAddress, port);
-    clientSocket.send(sendPacket);
-    DatagramPacket receivePacket = new DatagramPacket(receiveData,
-        receiveData.length);
-    clientSocket.receive(receivePacket);
+    try {
+      DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
+          IPAddress, port);
+      socket.send(sendPacket);
+      DatagramPacket receivePacket = new DatagramPacket(receiveData,
+          receiveData.length);
+      socket.receive(receivePacket);
-    // Check reply status
-    XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
-        receivePacket.getLength()));
-    RpcReply reply = RpcReply.read(xdr);
-    if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
-      throw new IOException("Request failed: " + reply.getState());
-    }
+      // Check reply status
+      XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
+          receivePacket.getLength()));
+      RpcReply reply = RpcReply.read(xdr);
+      if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
+        throw new IOException("Request failed: " + reply.getState());
+      }
+    } finally {
+      // If the client socket was passed in to this UDP client, it's on the
+      // caller of this UDP client to close that socket.
+      if (this.clientSocket == null) {
+        socket.close();
+      }
+    }
-    clientSocket.close();
}
}
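As a usage note, here is a minimal, hypothetical sketch of the new constructor's contract: a caller-supplied socket is used for the exchange but left open, so the caller closes it. It assumes PortmapMapping's (program, version, transport, port) constructor and the two-argument PortmapRequest.create(...) seen in RpcProgram.register() above; the program/version/port values are illustrative.

```java
import java.net.DatagramSocket;
import java.net.InetSocketAddress;

import org.apache.hadoop.oncrpc.SimpleUdpClient;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;

public class RegistrationSketch {
  public static void main(String[] args) throws Exception {
    // Bind the registration socket to a privileged port (requires root).
    DatagramSocket registrationSocket =
        new DatagramSocket(new InetSocketAddress("localhost", 40));
    try {
      // Build a portmap SET request; program/version/port are made up here.
      XDR request = PortmapRequest.create(
          new PortmapMapping(100005, 3, PortmapMapping.TRANSPORT_UDP, 4242),
          true);
      // 111 is the standard rpcbind/portmap port (RPCB_PORT in RpcProgram).
      new SimpleUdpClient("localhost", 111, request, registrationSocket).run();
    } finally {
      // The socket was supplied by the caller, so SimpleUdpClient leaves it
      // open; closing it is the caller's job.
      registrationSocket.close();
    }
  }
}
```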

View File: org/apache/hadoop/oncrpc/TestFrameDecoder.java

@@ -51,7 +51,8 @@ public class TestFrameDecoder {
protected TestRpcProgram(String program, String host, int port,
int progNumber, int lowProgVersion, int highProgVersion) {
-    super(program, host, port, progNumber, lowProgVersion, highProgVersion);
+    super(program, host, port, progNumber, lowProgVersion, highProgVersion,
+        null);
}
@Override

View File: org/apache/hadoop/hdfs/nfs/mount/Mountd.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.nfs.mount;
import java.io.IOException;
+import java.net.DatagramSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mount.MountdBase;
@@ -31,13 +32,14 @@ import org.apache.hadoop.mount.MountdBase;
*/
public class Mountd extends MountdBase {
-  public Mountd(Configuration config) throws IOException {
-    super(new RpcProgramMountd(config));
+  public Mountd(Configuration config, DatagramSocket registrationSocket)
+      throws IOException {
+    super(new RpcProgramMountd(config, registrationSocket));
}
public static void main(String[] args) throws IOException {
Configuration config = new Configuration();
-    Mountd mountd = new Mountd(config);
+    Mountd mountd = new Mountd(config, null);
mountd.start(true);
}
}

View File: org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java

@@ -20,6 +20,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NFS_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY;
import java.io.IOException;
+import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -78,10 +79,11 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
private final NfsExports hostsMatcher;
-  public RpcProgramMountd(Configuration config) throws IOException {
+  public RpcProgramMountd(Configuration config,
+      DatagramSocket registrationSocket) throws IOException {
// Note that RPC cache is not enabled
super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
-        PROGRAM, VERSION_1, VERSION_3);
+        PROGRAM, VERSION_1, VERSION_3, registrationSocket);
exports = new ArrayList<String>();
exports.add(config.get(Nfs3Constant.EXPORT_POINT,
Nfs3Constant.EXPORT_POINT_DEFAULT));

View File: org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
+import java.net.DatagramSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
@@ -40,8 +41,12 @@ public class Nfs3 extends Nfs3Base {
}
public Nfs3(Configuration conf) throws IOException {
-    super(new RpcProgramNfs3(conf), conf);
-    mountd = new Mountd(conf);
+    this(conf, null);
  }
+  public Nfs3(Configuration conf, DatagramSocket registrationSocket) throws IOException {
+    super(new RpcProgramNfs3(conf, registrationSocket), conf);
+    mountd = new Mountd(conf, registrationSocket);
+  }
public Mountd getMountd() {
@@ -54,9 +59,14 @@ public class Nfs3 extends Nfs3Base {
start(register);
}
-  public static void main(String[] args) throws IOException {
+  static void startService(String[] args,
+      DatagramSocket registrationSocket) throws IOException {
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
-    final Nfs3 nfsServer = new Nfs3(new Configuration());
+    final Nfs3 nfsServer = new Nfs3(new Configuration(), registrationSocket);
nfsServer.startServiceInternal(true);
}
+  public static void main(String[] args) throws IOException {
+    startService(args, null);
+  }
}

View File: org/apache/hadoop/hdfs/nfs/nfs3/PrivilegedNfsGatewayStarter.java (new file)

@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import org.apache.commons.daemon.Daemon;
import org.apache.commons.daemon.DaemonContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
/**
* This class is used to allow the initial registration of the NFS gateway with
* the system portmap daemon to come from a privileged (< 1024) port. This is
* necessary on certain operating systems to work around this bug in rpcbind:
*
* Red Hat: https://bugzilla.redhat.com/show_bug.cgi?id=731542
* SLES: https://bugzilla.novell.com/show_bug.cgi?id=823364
* Debian: https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=594880
*/
public class PrivilegedNfsGatewayStarter implements Daemon {

  private String[] args = null;
  private DatagramSocket registrationSocket = null;

  @Override
  public void init(DaemonContext context) throws Exception {
    System.err.println("Initializing privileged NFS client socket...");
    Configuration conf = new HdfsConfiguration();
    int clientPort = conf.getInt(DFSConfigKeys.DFS_NFS_REGISTRATION_PORT_KEY,
        DFSConfigKeys.DFS_NFS_REGISTRATION_PORT_DEFAULT);
    if (clientPort < 1 || clientPort > 1023) {
      throw new RuntimeException("Must start privileged NFS server with '" +
          DFSConfigKeys.DFS_NFS_REGISTRATION_PORT_KEY + "' configured to a " +
          "privileged port.");
    }
    registrationSocket = new DatagramSocket(
        new InetSocketAddress("localhost", clientPort));
    registrationSocket.setReuseAddress(true);
    args = context.getArguments();
  }

  @Override
  public void start() throws Exception {
    Nfs3.startService(args, registrationSocket);
  }

  @Override
  public void stop() throws Exception {
    // Nothing to do.
  }

  @Override
  public void destroy() {
    if (registrationSocket != null && !registrationSocket.isClosed()) {
      registrationSocket.close();
    }
  }
}
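To make the jsvc wiring concrete, here is a hypothetical harness (not part of this commit) that drives the same commons-daemon lifecycle jsvc uses; in production jsvc supplies the DaemonContext, and binding the privileged registration port in init() still requires running as root.

```java
import org.apache.commons.daemon.DaemonContext;
import org.apache.commons.daemon.DaemonController;

public class StarterHarness {
  public static void main(String[] args) throws Exception {
    PrivilegedNfsGatewayStarter starter = new PrivilegedNfsGatewayStarter();
    // Fake the context jsvc would normally provide.
    starter.init(new DaemonContext() {
      @Override
      public DaemonController getController() {
        return null; // not used by the starter
      }
      @Override
      public String[] getArguments() {
        return new String[0];
      }
    });
    starter.start(); // registers with the system portmap, then serves NFS
  }
}
```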

View File: org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -165,10 +166,11 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
private final RpcCallCache rpcCallCache;
-  public RpcProgramNfs3(Configuration config) throws IOException {
+  public RpcProgramNfs3(Configuration config, DatagramSocket registrationSocket)
+      throws IOException {
super("NFS3", "localhost", config.getInt(Nfs3Constant.NFS3_SERVER_PORT,
Nfs3Constant.NFS3_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
-        Nfs3Constant.VERSION, Nfs3Constant.VERSION);
+        Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket);
config.set(FsPermission.UMASK_LABEL, "000");
iug = new IdUserGroup();

View File: hadoop-hdfs/CHANGES.txt

@@ -10,6 +10,9 @@ Release 2.5.0 - UNRELEASED
NEW FEATURES
+    HDFS-6281. Provide option to use the NFS Gateway without having to use the
+    Hadoop portmapper. (atm)
IMPROVEMENTS
HDFS-6007. Update documentation about short-circuit local reads (iwasakims

View File: bin/hdfs

@@ -101,6 +101,27 @@ if [ "$COMMAND" == "datanode" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_
fi
fi
+# Determine if we're starting a privileged NFS daemon, and if so, redefine appropriate variables
+if [ "$COMMAND" == "nfs3" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_PRIVILEGED_NFS_USER" ]; then
+  if [ -n "$JSVC_HOME" ]; then
+    if [ -n "$HADOOP_PRIVILEGED_NFS_PID_DIR" ]; then
+      HADOOP_PID_DIR=$HADOOP_PRIVILEGED_NFS_PID_DIR
+    fi
+    if [ -n "$HADOOP_PRIVILEGED_NFS_LOG_DIR" ]; then
+      HADOOP_LOG_DIR=$HADOOP_PRIVILEGED_NFS_LOG_DIR
+      HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
+    fi
+    HADOOP_IDENT_STRING=$HADOOP_PRIVILEGED_NFS_USER
+    HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.id.str=$HADOOP_IDENT_STRING"
+    starting_privileged_nfs="true"
+  else
+    echo "It looks like you're trying to start a privileged NFS server, but"\
+      "\$JSVC_HOME isn't set. Falling back to starting unprivileged NFS server."
+  fi
+fi
if [ "$COMMAND" = "namenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
@@ -178,7 +199,7 @@ if [ "$starting_secure_dn" = "true" ]; then
JSVC=$JSVC_HOME/jsvc
if [ ! -f $JSVC ]; then
echo "JSVC_HOME is not set correctly so jsvc cannot be found. Jsvc is required to run secure datanodes. "
echo "JSVC_HOME is not set correctly so jsvc cannot be found. jsvc is required to run secure datanodes. "
echo "Please download and install jsvc from http://archive.apache.org/dist/commons/daemon/binaries/ "\
"and set JSVC_HOME to the directory containing the jsvc binary."
exit
@@ -201,6 +222,38 @@ if [ "$starting_secure_dn" = "true" ]; then
-cp "$CLASSPATH" \
$JAVA_HEAP_MAX $HADOOP_OPTS \
org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter "$@"
elif [ "$starting_privileged_nfs" = "true" ] ; then
if [ "$HADOOP_PID_DIR" = "" ]; then
HADOOP_PRIVILEGED_NFS_PID="/tmp/hadoop_privileged_nfs3.pid"
else
HADOOP_PRIVILEGED_NFS_PID="$HADOOP_PID_DIR/hadoop_privileged_nfs3.pid"
fi
JSVC=$JSVC_HOME/jsvc
if [ ! -f $JSVC ]; then
echo "JSVC_HOME is not set correctly so jsvc cannot be found. jsvc is required to run privileged NFS gateways. "
echo "Please download and install jsvc from http://archive.apache.org/dist/commons/daemon/binaries/ "\
"and set JSVC_HOME to the directory containing the jsvc binary."
exit
fi
if [[ ! $JSVC_OUTFILE ]]; then
JSVC_OUTFILE="$HADOOP_LOG_DIR/nfs3_jsvc.out"
fi
if [[ ! $JSVC_ERRFILE ]]; then
JSVC_ERRFILE="$HADOOP_LOG_DIR/nfs3_jsvc.err"
fi
exec "$JSVC" \
-Dproc_$COMMAND -outfile "$JSVC_OUTFILE" \
-errfile "$JSVC_ERRFILE" \
-pidfile "$HADOOP_PRIVILEGED_NFS_PID" \
-nodetach \
-user "$HADOOP_PRIVILEGED_NFS_USER" \
-cp "$CLASSPATH" \
$JAVA_HEAP_MAX $HADOOP_OPTS \
org.apache.hadoop.hdfs.nfs.nfs3.PrivilegedNfsGatewayStarter "$@"
else
# run it
exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"

View File: org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -630,7 +630,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
"dfs.client.hedged.read.threadpool.size";
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
public static final String DFS_NFS_KEYTAB_FILE_KEY = "dfs.nfs.keytab.file";
public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "dfs.nfs.kerberos.principal";
+  public static final String DFS_NFS_REGISTRATION_PORT_KEY = "dfs.nfs.registration.port";
+  public static final int DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned.
}
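For operators, the new key would be set in hdfs-site.xml; a hedged sketch, using the documented default of 40 (the check in PrivilegedNfsGatewayStarter above only requires some port in the range 1-1023):

```xml
<!-- hdfs-site.xml sketch: the privileged NFS gateway binds its portmap
     registration socket to this port; values outside 1-1023 are rejected. -->
<property>
  <name>dfs.nfs.registration.port</name>
  <value>40</value>
</property>
```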

View File: HdfsNfsGateway.apt.vm

@@ -243,6 +243,19 @@ HDFS NFS Gateway
hadoop-daemon.sh stop portmap
-------------------------
+  Optionally, you can forgo running the Hadoop-provided portmap daemon and
+  instead use the system portmap daemon on all operating systems if you start
+  the NFS Gateway as root. This allows the HDFS NFS Gateway to work around the
+  aforementioned bug and still register with the system portmap daemon. To do
+  so, start the NFS gateway daemon as you normally would, but make sure to do
+  so as the "root" user, and also set the "HADOOP_PRIVILEGED_NFS_USER"
+  environment variable to an unprivileged user. In this mode the NFS Gateway
+  starts as root to perform its initial registration with the system portmap,
+  and then drops privileges to the user specified by HADOOP_PRIVILEGED_NFS_USER
+  for the rest of the lifetime of the NFS Gateway process. Note that if you
+  choose this route, you should skip steps 1 and 2 above.
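  As a sketch of that procedure, assuming the gateway is launched with the
  same hadoop-daemon.sh script used in the steps above, and with "nfsserver"
  standing in for the unprivileged user (adjust JSVC_HOME to wherever the
  jsvc binary actually lives):

-------------------------
# as root
export HADOOP_PRIVILEGED_NFS_USER=nfsserver
export JSVC_HOME=/usr/bin
hadoop-daemon.sh start nfs3
-------------------------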
* {Verify validity of NFS related services}