HDFS-11307. The rpc to portmap service for NFS has hardcoded timeout. Contributed by Mukul Kumar Singh.

This commit is contained in:
Jitendra Pandey 2017-01-16 14:33:56 -08:00
parent 954dae26cd
commit ffb4c22f6e
6 changed files with 58 additions and 10 deletions

View File

@ -22,6 +22,7 @@ import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
@ -55,7 +56,18 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
* system portmap daemon when registering this RPC server program.
*/
private final DatagramSocket registrationSocket;
/*
* Timeout value in millisecond for the rpc connection to portmap
*/
private final int portmapUdpTimeoutMillis;
protected RpcProgram(String program, String host, int port, int progNumber,
int lowProgVersion, int highProgVersion,
DatagramSocket registrationSocket, boolean allowInsecurePorts) {
this(program, host, port, progNumber, lowProgVersion, highProgVersion,
registrationSocket, allowInsecurePorts, 500);
}
/**
* Constructor
*
@ -69,10 +81,12 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
* with portmap daemon
* @param allowInsecurePorts true to allow client connections from
* unprivileged ports, false otherwise
* @param portmapUdpTimeoutMillis timeout in milliseconds for RPC connection
*/
protected RpcProgram(String program, String host, int port, int progNumber,
int lowProgVersion, int highProgVersion,
DatagramSocket registrationSocket, boolean allowInsecurePorts) {
DatagramSocket registrationSocket, boolean allowInsecurePorts,
int portmapUdpTimeoutMillis) {
this.program = program;
this.host = host;
this.port = port;
@ -81,6 +95,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
this.highProgVersion = highProgVersion;
this.registrationSocket = registrationSocket;
this.allowInsecurePorts = allowInsecurePorts;
this.portmapUdpTimeoutMillis = portmapUdpTimeoutMillis;
LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client "
+ "connections from unprivileged ports");
}
@ -124,14 +139,14 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
}
/**
* Register the program with Portmap or Rpcbind
* Register the program with Portmap or Rpcbind.
* @param mapEntry port map entries
* @param set specifies registration or not
*/
protected void register(PortmapMapping mapEntry, boolean set) {
XDR mappingRequest = PortmapRequest.create(mapEntry, set);
SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
mappingRequest, registrationSocket);
mappingRequest, true, registrationSocket, portmapUdpTimeoutMillis);
try {
registrationClient.run();
} catch (IOException e) {
@ -238,4 +253,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
public int getPort() {
return port;
}
}
@VisibleForTesting
public int getPortmapUdpTimeoutMillis() {
return portmapUdpTimeoutMillis;
}
}

View File

@ -33,19 +33,26 @@ public class SimpleUdpClient {
protected final XDR request;
protected final boolean oneShot;
protected final DatagramSocket clientSocket;
private int udpTimeoutMillis;
public SimpleUdpClient(String host, int port, XDR request,
DatagramSocket clientSocket) {
this(host, port, request, true, clientSocket);
this(host, port, request, true, clientSocket, 500);
}
public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot,
DatagramSocket clientSocket) {
this(host, port, request, oneShot, clientSocket, 500);
}
public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot,
DatagramSocket clientSocket, int udpTimeoutMillis) {
this.host = host;
this.port = port;
this.request = request;
this.oneShot = oneShot;
this.clientSocket = clientSocket;
this.udpTimeoutMillis = udpTimeoutMillis;
}
public void run() throws IOException {
@ -60,7 +67,7 @@ public class SimpleUdpClient {
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
IPAddress, port);
socket.send(sendPacket);
socket.setSoTimeout(500);
socket.setSoTimeout(udpTimeoutMillis);
DatagramPacket receivePacket = new DatagramPacket(receiveData,
receiveData.length);
socket.receive(receivePacket);

View File

@ -86,4 +86,10 @@ public class NfsConfigKeys {
*/
public static final String NFS_SUPERUSER_KEY = "nfs.superuser";
public static final String NFS_SUPERUSER_DEFAULT = "";
/*
* Timeout value in millisecond for rpc connection to portmap
*/
public static final String NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY =
"nfs.udp.client.portmap.timeout.millis";
public static final int NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT = 500;
}

View File

@ -81,7 +81,9 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
super("mountd", "localhost", config.getInt(
NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY,
NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1,
VERSION_3, registrationSocket, allowInsecurePorts);
VERSION_3, registrationSocket, allowInsecurePorts, config.getInt(
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY,
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT));
exports = new ArrayList<String>();
exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT));

View File

@ -173,7 +173,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
allowInsecurePorts);
allowInsecurePorts, config.getInt(
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY,
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT));
this.config = config;
config.set(FsPermission.UMASK_LABEL, "000");

View File

@ -24,12 +24,14 @@ import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3;
import org.apache.hadoop.oncrpc.XDR;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class TestMountd {
@ -47,6 +49,14 @@ public class TestMountd {
config.setInt("nfs3.mountd.port", 0);
config.setInt("nfs3.server.port", 0);
int newTimeoutMillis = 1000; // 1s
// Set the new portmap rpc timeout values and check
config.setInt(NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY,
newTimeoutMillis);
assertTrue(config.getInt(
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY,
0) == newTimeoutMillis);
// Start nfs
Nfs3 nfs3 = new Nfs3(config);
nfs3.startServiceInternal(false);
@ -54,9 +64,10 @@ public class TestMountd {
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
.getRpcProgram();
mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
assertTrue(mountd.getPortmapUdpTimeoutMillis() == newTimeoutMillis);
RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
nfsd.nullProcedure();
assertTrue(nfsd.getPortmapUdpTimeoutMillis() == newTimeoutMillis);
cluster.shutdown();
}