From d1d0b3e1fd593d590aaf2e3db8f730a296b20aa1 Mon Sep 17 00:00:00 2001 From: Jitendra Pandey Date: Mon, 16 Jan 2017 14:33:56 -0800 Subject: [PATCH] HDFS-11307. The rpc to portmap service for NFS has hardcoded timeout. Contributed by Mukul Kumar Singh. --- .../org/apache/hadoop/oncrpc/RpcProgram.java | 30 +++++++++++++++---- .../apache/hadoop/oncrpc/SimpleUdpClient.java | 11 +++++-- .../hadoop/hdfs/nfs/conf/NfsConfigKeys.java | 6 ++++ .../hdfs/nfs/mount/RpcProgramMountd.java | 4 ++- .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java | 4 ++- .../apache/hadoop/hdfs/nfs/TestMountd.java | 13 +++++++- 6 files changed, 58 insertions(+), 10 deletions(-) diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java index cebfcfaee08..c541cd660b4 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java @@ -22,6 +22,7 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.SocketAddress; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; @@ -55,7 +56,18 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { * system portmap daemon when registering this RPC server program. */ private final DatagramSocket registrationSocket; - + /* + * Timeout value in millisecond for the rpc connection to portmap + */ + private final int portmapUdpTimeoutMillis; + + protected RpcProgram(String program, String host, int port, int progNumber, + int lowProgVersion, int highProgVersion, + DatagramSocket registrationSocket, boolean allowInsecurePorts) { + this(program, host, port, progNumber, lowProgVersion, highProgVersion, + registrationSocket, allowInsecurePorts, 500); + } + /** * Constructor * @@ -69,10 +81,12 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { * with portmap daemon * @param allowInsecurePorts true to allow client connections from * unprivileged ports, false otherwise + * @param portmapUdpTimeoutMillis timeout in milliseconds for RPC connection */ protected RpcProgram(String program, String host, int port, int progNumber, int lowProgVersion, int highProgVersion, - DatagramSocket registrationSocket, boolean allowInsecurePorts) { + DatagramSocket registrationSocket, boolean allowInsecurePorts, + int portmapUdpTimeoutMillis) { this.program = program; this.host = host; this.port = port; @@ -81,6 +95,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { this.highProgVersion = highProgVersion; this.registrationSocket = registrationSocket; this.allowInsecurePorts = allowInsecurePorts; + this.portmapUdpTimeoutMillis = portmapUdpTimeoutMillis; LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client " + "connections from unprivileged ports"); } @@ -124,14 +139,14 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { } /** - * Register the program with Portmap or Rpcbind + * Register the program with Portmap or Rpcbind. * @param mapEntry port map entries * @param set specifies registration or not */ protected void register(PortmapMapping mapEntry, boolean set) { XDR mappingRequest = PortmapRequest.create(mapEntry, set); SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT, - mappingRequest, registrationSocket); + mappingRequest, true, registrationSocket, portmapUdpTimeoutMillis); try { registrationClient.run(); } catch (IOException e) { @@ -238,4 +253,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { public int getPort() { return port; } -} + + @VisibleForTesting + public int getPortmapUdpTimeoutMillis() { + return portmapUdpTimeoutMillis; + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java index 40ecdf5b8d4..2b6dcaf0b6b 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java @@ -33,19 +33,26 @@ public class SimpleUdpClient { protected final XDR request; protected final boolean oneShot; protected final DatagramSocket clientSocket; + private int udpTimeoutMillis; public SimpleUdpClient(String host, int port, XDR request, DatagramSocket clientSocket) { - this(host, port, request, true, clientSocket); + this(host, port, request, true, clientSocket, 500); } public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot, DatagramSocket clientSocket) { + this(host, port, request, oneShot, clientSocket, 500); + } + + public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot, + DatagramSocket clientSocket, int udpTimeoutMillis) { this.host = host; this.port = port; this.request = request; this.oneShot = oneShot; this.clientSocket = clientSocket; + this.udpTimeoutMillis = udpTimeoutMillis; } public void run() throws IOException { @@ -60,7 +67,7 @@ public class SimpleUdpClient { DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, IPAddress, port); socket.send(sendPacket); - socket.setSoTimeout(500); + socket.setSoTimeout(udpTimeoutMillis); DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); socket.receive(receivePacket); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java index a71095ad637..2fe357cefc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java @@ -86,4 +86,10 @@ public class NfsConfigKeys { */ public static final String NFS_SUPERUSER_KEY = "nfs.superuser"; public static final String NFS_SUPERUSER_DEFAULT = ""; + /* + * Timeout value in millisecond for rpc connection to portmap + */ + public static final String NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY = + "nfs.udp.client.portmap.timeout.millis"; + public static final int NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT = 500; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java index 869fb737fdd..e31bc711ad7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java @@ -81,7 +81,9 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { super("mountd", "localhost", config.getInt( NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1, - VERSION_3, registrationSocket, allowInsecurePorts); + VERSION_3, registrationSocket, allowInsecurePorts, config.getInt( + NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY, + NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT)); exports = new ArrayList(); exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 77555f800d8..d6bb71d0d9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -173,7 +173,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM, Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket, - allowInsecurePorts); + allowInsecurePorts, config.getInt( + NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY, + NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT)); this.config = config; config.set(FsPermission.UMASK_LABEL, "000"); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java index e1eb71a59df..32ed20fdcd1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java @@ -24,12 +24,14 @@ import java.net.InetAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd; import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3; import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3; import org.apache.hadoop.oncrpc.XDR; import org.junit.Test; +import static org.junit.Assert.assertTrue; public class TestMountd { @@ -47,6 +49,14 @@ public class TestMountd { config.setInt("nfs3.mountd.port", 0); config.setInt("nfs3.server.port", 0); + int newTimeoutMillis = 1000; // 1s + // Set the new portmap rpc timeout values and check + config.setInt(NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY, + newTimeoutMillis); + assertTrue(config.getInt( + NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY, + 0) == newTimeoutMillis); + // Start nfs Nfs3 nfs3 = new Nfs3(config); nfs3.startServiceInternal(false); @@ -54,9 +64,10 @@ public class TestMountd { RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd() .getRpcProgram(); mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost")); - + assertTrue(mountd.getPortmapUdpTimeoutMillis() == newTimeoutMillis); RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs3.getRpcProgram(); nfsd.nullProcedure(); + assertTrue(nfsd.getPortmapUdpTimeoutMillis() == newTimeoutMillis); cluster.shutdown(); }