HDFS-6406. Add capability for NFS gateway to reject connections from unprivileged ports. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1595351 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2014-05-16 21:23:03 +00:00
parent 11be7334c4
commit da3992b4e3
10 changed files with 190 additions and 66 deletions

View File

@ -19,11 +19,14 @@ package org.apache.hadoop.oncrpc;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
import org.jboss.netty.buffer.ChannelBuffer;
@ -37,7 +40,7 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
* and implement {@link #handleInternal} to handle the requests received.
*/
public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
private static final Log LOG = LogFactory.getLog(RpcProgram.class);
static final Log LOG = LogFactory.getLog(RpcProgram.class);
public static final int RPCB_PORT = 111;
private final String program;
private final String host;
@ -45,6 +48,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
private final int progNumber;
private final int lowProgVersion;
private final int highProgVersion;
private final boolean allowInsecurePorts;
/**
* If not null, this will be used as the socket to use to connect to the
@ -61,10 +65,14 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
* @param progNumber program number as defined in RFC 1050
* @param lowProgVersion lowest version of the specification supported
* @param highProgVersion highest version of the specification supported
* @param DatagramSocket registrationSocket if not null, use this socket to
* register with portmap daemon
* @param allowInsecurePorts true to allow client connections from
* unprivileged ports, false otherwise
*/
protected RpcProgram(String program, String host, int port, int progNumber,
int lowProgVersion, int highProgVersion,
DatagramSocket registrationSocket) {
DatagramSocket registrationSocket, boolean allowInsecurePorts) {
this.program = program;
this.host = host;
this.port = port;
@ -72,6 +80,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
this.lowProgVersion = lowProgVersion;
this.highProgVersion = highProgVersion;
this.registrationSocket = registrationSocket;
this.allowInsecurePorts = allowInsecurePorts;
LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client "
+ "connections from unprivileged ports");
}
/**
@ -133,43 +144,82 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
throws Exception {
RpcInfo info = (RpcInfo) e.getMessage();
RpcCall call = (RpcCall) info.header();
SocketAddress remoteAddress = info.remoteAddress();
if (!allowInsecurePorts) {
if (LOG.isDebugEnabled()) {
LOG.debug("Will not allow connections from unprivileged ports. " +
"Checking for valid client port...");
}
if (remoteAddress instanceof InetSocketAddress) {
InetSocketAddress inetRemoteAddress = (InetSocketAddress) remoteAddress;
if (inetRemoteAddress.getPort() > 1023) {
LOG.warn("Connection attempted from '" + inetRemoteAddress + "' "
+ "which is an unprivileged port. Rejecting connection.");
sendRejectedReply(call, remoteAddress, ctx);
return;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Accepting connection from '" + remoteAddress + "'");
}
}
} else {
LOG.warn("Could not determine remote port of socket address '" +
remoteAddress + "'. Rejecting connection.");
sendRejectedReply(call, remoteAddress, ctx);
return;
}
}
if (LOG.isTraceEnabled()) {
LOG.trace(program + " procedure #" + call.getProcedure());
}
if (this.progNumber != call.getProgram()) {
LOG.warn("Invalid RPC call program " + call.getProgram());
RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
XDR out = new XDR();
reply.write(out);
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
sendAcceptedReply(call, remoteAddress, AcceptState.PROG_UNAVAIL, ctx);
return;
}
int ver = call.getVersion();
if (ver < lowProgVersion || ver > highProgVersion) {
LOG.warn("Invalid RPC call version " + ver);
RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
XDR out = new XDR();
reply.write(out);
out.writeInt(lowProgVersion);
out.writeInt(highProgVersion);
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
sendAcceptedReply(call, remoteAddress, AcceptState.PROG_MISMATCH, ctx);
return;
}
handleInternal(ctx, info);
}
private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress,
AcceptState acceptState, ChannelHandlerContext ctx) {
RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
acceptState, Verifier.VERIFIER_NONE);
XDR out = new XDR();
reply.write(out);
if (acceptState == AcceptState.PROG_MISMATCH) {
out.writeInt(lowProgVersion);
out.writeInt(highProgVersion);
}
ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(b, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);
}
private static void sendRejectedReply(RpcCall call,
SocketAddress remoteAddress, ChannelHandlerContext ctx) {
XDR out = new XDR();
RpcDeniedReply reply = new RpcDeniedReply(call.getXid(),
RpcReply.ReplyState.MSG_DENIED,
RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
reply.write(out);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, remoteAddress);
RpcUtil.sendRpcResponse(ctx, rsp);
}
protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);

View File

@ -28,6 +28,8 @@ import java.util.Random;
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.log4j.Level;
import org.apache.commons.logging.impl.Log4JLogger;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@ -38,10 +40,16 @@ import org.junit.Test;
import org.mockito.Mockito;
public class TestFrameDecoder {
static {
((Log4JLogger) RpcProgram.LOG).getLogger().setLevel(Level.ALL);
}
private static int resultSize;
static void testRequest(XDR request, int serverPort) {
// Reset resultSize so as to avoid interference from other tests in this class.
resultSize = 0;
SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", serverPort, request,
true);
tcpClient.run();
@ -50,9 +58,10 @@ public class TestFrameDecoder {
static class TestRpcProgram extends RpcProgram {
protected TestRpcProgram(String program, String host, int port,
int progNumber, int lowProgVersion, int highProgVersion) {
int progNumber, int lowProgVersion, int highProgVersion,
boolean allowInsecurePorts) {
super(program, host, port, progNumber, lowProgVersion, highProgVersion,
null);
null, allowInsecurePorts);
}
@Override
@ -149,26 +158,7 @@ public class TestFrameDecoder {
@Test
public void testFrames() {
Random rand = new Random();
int serverPort = 30000 + rand.nextInt(10000);
int retries = 10; // A few retries in case initial choice is in use.
while (true) {
try {
RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
"localhost", serverPort, 100000, 1, 2);
SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1);
tcpServer.run();
break; // Successfully bound a port, break out.
} catch (ChannelException ce) {
if (retries-- > 0) {
serverPort += rand.nextInt(20); // Port in use? Try another.
} else {
throw ce; // Out of retries.
}
}
}
int serverPort = startRpcServer(true);
XDR xdrOut = createGetportMount();
int headerSize = xdrOut.size();
@ -183,6 +173,47 @@ public class TestFrameDecoder {
// Verify the server got the request with right size
assertEquals(requestSize, resultSize);
}
@Test
public void testUnprivilegedPort() {
// Don't allow connections from unprivileged ports. Given that this test is
// presumably not being run by root, this will be the case.
int serverPort = startRpcServer(false);
XDR xdrOut = createGetportMount();
int bufsize = 2 * 1024 * 1024;
byte[] buffer = new byte[bufsize];
xdrOut.writeFixedOpaque(buffer);
// Send the request to the server
testRequest(xdrOut, serverPort);
// Verify the server rejected the request.
assertEquals(0, resultSize);
}
private static int startRpcServer(boolean allowInsecurePorts) {
Random rand = new Random();
int serverPort = 30000 + rand.nextInt(10000);
int retries = 10; // A few retries in case initial choice is in use.
while (true) {
try {
RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
"localhost", serverPort, 100000, 1, 2, allowInsecurePorts);
SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1);
tcpServer.run();
break; // Successfully bound a port, break out.
} catch (ChannelException ce) {
if (retries-- > 0) {
serverPort += rand.nextInt(20); // Port in use? Try another.
} else {
throw ce; // Out of retries.
}
}
}
return serverPort;
}
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
// Make this a method

View File

@ -0,0 +1,18 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# log4j configuration used during build and unit tests
log4j.rootLogger=info,stdout
log4j.threshhold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n

View File

@ -32,14 +32,14 @@ import org.apache.hadoop.mount.MountdBase;
*/
public class Mountd extends MountdBase {
public Mountd(Configuration config, DatagramSocket registrationSocket)
throws IOException {
super(new RpcProgramMountd(config, registrationSocket));
public Mountd(Configuration config, DatagramSocket registrationSocket,
boolean allowInsecurePorts) throws IOException {
super(new RpcProgramMountd(config, registrationSocket, allowInsecurePorts));
}
public static void main(String[] args) throws IOException {
Configuration config = new Configuration();
Mountd mountd = new Mountd(config, null);
Mountd mountd = new Mountd(config, null, true);
mountd.start(true);
}
}

View File

@ -79,11 +79,11 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
private final NfsExports hostsMatcher;
public RpcProgramMountd(Configuration config,
DatagramSocket registrationSocket) throws IOException {
public RpcProgramMountd(Configuration config, DatagramSocket registrationSocket,
boolean allowInsecurePorts) throws IOException {
// Note that RPC cache is not enabled
super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
PROGRAM, VERSION_1, VERSION_3, registrationSocket);
PROGRAM, VERSION_1, VERSION_3, registrationSocket, allowInsecurePorts);
exports = new ArrayList<String>();
exports.add(config.get(Nfs3Constant.EXPORT_POINT,
Nfs3Constant.EXPORT_POINT_DEFAULT));

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.DatagramSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
import org.apache.hadoop.nfs.nfs3.Nfs3Base;
import org.apache.hadoop.util.StringUtils;
@ -41,12 +42,13 @@ public class Nfs3 extends Nfs3Base {
}
public Nfs3(Configuration conf) throws IOException {
this(conf, null);
this(conf, null, true);
}
public Nfs3(Configuration conf, DatagramSocket registrationSocket) throws IOException {
super(new RpcProgramNfs3(conf, registrationSocket), conf);
mountd = new Mountd(conf, registrationSocket);
public Nfs3(Configuration conf, DatagramSocket registrationSocket,
boolean allowInsecurePorts) throws IOException {
super(new RpcProgramNfs3(conf, registrationSocket, allowInsecurePorts), conf);
mountd = new Mountd(conf, registrationSocket, allowInsecurePorts);
}
public Mountd getMountd() {
@ -61,8 +63,13 @@ public class Nfs3 extends Nfs3Base {
static void startService(String[] args,
DatagramSocket registrationSocket) throws IOException {
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
final Nfs3 nfsServer = new Nfs3(new Configuration(), registrationSocket);
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
Configuration conf = new Configuration();
boolean allowInsecurePorts = conf.getBoolean(
DFSConfigKeys.DFS_NFS_ALLOW_INSECURE_PORTS_KEY,
DFSConfigKeys.DFS_NFS_ALLOW_INSECURE_PORTS_DEFAULT);
final Nfs3 nfsServer = new Nfs3(new Configuration(), registrationSocket,
allowInsecurePorts);
nfsServer.startServiceInternal(true);
}

View File

@ -166,11 +166,12 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
private final RpcCallCache rpcCallCache;
public RpcProgramNfs3(Configuration config, DatagramSocket registrationSocket)
throws IOException {
public RpcProgramNfs3(Configuration config, DatagramSocket registrationSocket,
boolean allowInsecurePorts) throws IOException {
super("NFS3", "localhost", config.getInt(Nfs3Constant.NFS3_SERVER_PORT,
Nfs3Constant.NFS3_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket);
Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
allowInsecurePorts);
config.set(FsPermission.UMASK_LABEL, "000");
iug = new IdUserGroup();

View File

@ -273,6 +273,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6334. Client failover proxy provider for IP failover based NN HA.
(kihwal)
HDFS-6406. Add capability for NFS gateway to reject connections from
unprivileged ports. (atm)
IMPROVEMENTS
HDFS-6007. Update documentation about short-circuit local reads (iwasakims

View File

@ -631,9 +631,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE =
"dfs.client.hedged.read.threadpool.size";
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
public static final String DFS_NFS_KEYTAB_FILE_KEY = "dfs.nfs.keytab.file";
public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "dfs.nfs.kerberos.principal";
public static final String DFS_NFS_REGISTRATION_PORT_KEY = "dfs.nfs.registration.port";
public static final int DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned.
public static final int DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE = 0;
public static final String DFS_NFS_KEYTAB_FILE_KEY = "dfs.nfs.keytab.file";
public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "dfs.nfs.kerberos.principal";
public static final String DFS_NFS_REGISTRATION_PORT_KEY = "dfs.nfs.registration.port";
public static final int DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned.
public static final String DFS_NFS_ALLOW_INSECURE_PORTS_KEY = "dfs.nfs.allow.insecure.ports";
public static final boolean DFS_NFS_ALLOW_INSECURE_PORTS_DEFAULT = true;
}

View File

@ -1317,6 +1317,17 @@
</description>
</property>
<property>
<name>dfs.nfs.allow.insecure.ports</name>
<value>true</value>
<description>
When set to false, client connections originating from unprivileged ports
(those above 1023) will be rejected. This is to ensure that clients
connecting to this NFS Gateway must have had root privilege on the machine
where they're connecting from.
</description>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
@ -1895,4 +1906,4 @@
</description>
</property>
</configuration>
</configuration>