HDFS-11307. The rpc to portmap service for NFS has hardcoded timeout. Contributed by Mukul Kumar Singh.
This commit is contained in:
parent
79e939d0b8
commit
d1d0b3e1fd
|
@ -22,6 +22,7 @@ import java.net.DatagramSocket;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
|
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
|
||||||
|
@ -55,7 +56,18 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
||||||
* system portmap daemon when registering this RPC server program.
|
* system portmap daemon when registering this RPC server program.
|
||||||
*/
|
*/
|
||||||
private final DatagramSocket registrationSocket;
|
private final DatagramSocket registrationSocket;
|
||||||
|
/*
|
||||||
|
* Timeout value in millisecond for the rpc connection to portmap
|
||||||
|
*/
|
||||||
|
private final int portmapUdpTimeoutMillis;
|
||||||
|
|
||||||
|
protected RpcProgram(String program, String host, int port, int progNumber,
|
||||||
|
int lowProgVersion, int highProgVersion,
|
||||||
|
DatagramSocket registrationSocket, boolean allowInsecurePorts) {
|
||||||
|
this(program, host, port, progNumber, lowProgVersion, highProgVersion,
|
||||||
|
registrationSocket, allowInsecurePorts, 500);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
*
|
*
|
||||||
|
@ -69,10 +81,12 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
||||||
* with portmap daemon
|
* with portmap daemon
|
||||||
* @param allowInsecurePorts true to allow client connections from
|
* @param allowInsecurePorts true to allow client connections from
|
||||||
* unprivileged ports, false otherwise
|
* unprivileged ports, false otherwise
|
||||||
|
* @param portmapUdpTimeoutMillis timeout in milliseconds for RPC connection
|
||||||
*/
|
*/
|
||||||
protected RpcProgram(String program, String host, int port, int progNumber,
|
protected RpcProgram(String program, String host, int port, int progNumber,
|
||||||
int lowProgVersion, int highProgVersion,
|
int lowProgVersion, int highProgVersion,
|
||||||
DatagramSocket registrationSocket, boolean allowInsecurePorts) {
|
DatagramSocket registrationSocket, boolean allowInsecurePorts,
|
||||||
|
int portmapUdpTimeoutMillis) {
|
||||||
this.program = program;
|
this.program = program;
|
||||||
this.host = host;
|
this.host = host;
|
||||||
this.port = port;
|
this.port = port;
|
||||||
|
@ -81,6 +95,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
||||||
this.highProgVersion = highProgVersion;
|
this.highProgVersion = highProgVersion;
|
||||||
this.registrationSocket = registrationSocket;
|
this.registrationSocket = registrationSocket;
|
||||||
this.allowInsecurePorts = allowInsecurePorts;
|
this.allowInsecurePorts = allowInsecurePorts;
|
||||||
|
this.portmapUdpTimeoutMillis = portmapUdpTimeoutMillis;
|
||||||
LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client "
|
LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client "
|
||||||
+ "connections from unprivileged ports");
|
+ "connections from unprivileged ports");
|
||||||
}
|
}
|
||||||
|
@ -124,14 +139,14 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register the program with Portmap or Rpcbind
|
* Register the program with Portmap or Rpcbind.
|
||||||
* @param mapEntry port map entries
|
* @param mapEntry port map entries
|
||||||
* @param set specifies registration or not
|
* @param set specifies registration or not
|
||||||
*/
|
*/
|
||||||
protected void register(PortmapMapping mapEntry, boolean set) {
|
protected void register(PortmapMapping mapEntry, boolean set) {
|
||||||
XDR mappingRequest = PortmapRequest.create(mapEntry, set);
|
XDR mappingRequest = PortmapRequest.create(mapEntry, set);
|
||||||
SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
|
SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
|
||||||
mappingRequest, registrationSocket);
|
mappingRequest, true, registrationSocket, portmapUdpTimeoutMillis);
|
||||||
try {
|
try {
|
||||||
registrationClient.run();
|
registrationClient.run();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -238,4 +253,9 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
|
||||||
public int getPort() {
|
public int getPort() {
|
||||||
return port;
|
return port;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getPortmapUdpTimeoutMillis() {
|
||||||
|
return portmapUdpTimeoutMillis;
|
||||||
|
}
|
||||||
|
}
|
|
@ -33,19 +33,26 @@ public class SimpleUdpClient {
|
||||||
protected final XDR request;
|
protected final XDR request;
|
||||||
protected final boolean oneShot;
|
protected final boolean oneShot;
|
||||||
protected final DatagramSocket clientSocket;
|
protected final DatagramSocket clientSocket;
|
||||||
|
private int udpTimeoutMillis;
|
||||||
|
|
||||||
public SimpleUdpClient(String host, int port, XDR request,
|
public SimpleUdpClient(String host, int port, XDR request,
|
||||||
DatagramSocket clientSocket) {
|
DatagramSocket clientSocket) {
|
||||||
this(host, port, request, true, clientSocket);
|
this(host, port, request, true, clientSocket, 500);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot,
|
public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot,
|
||||||
DatagramSocket clientSocket) {
|
DatagramSocket clientSocket) {
|
||||||
|
this(host, port, request, oneShot, clientSocket, 500);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot,
|
||||||
|
DatagramSocket clientSocket, int udpTimeoutMillis) {
|
||||||
this.host = host;
|
this.host = host;
|
||||||
this.port = port;
|
this.port = port;
|
||||||
this.request = request;
|
this.request = request;
|
||||||
this.oneShot = oneShot;
|
this.oneShot = oneShot;
|
||||||
this.clientSocket = clientSocket;
|
this.clientSocket = clientSocket;
|
||||||
|
this.udpTimeoutMillis = udpTimeoutMillis;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void run() throws IOException {
|
public void run() throws IOException {
|
||||||
|
@ -60,7 +67,7 @@ public class SimpleUdpClient {
|
||||||
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
|
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
|
||||||
IPAddress, port);
|
IPAddress, port);
|
||||||
socket.send(sendPacket);
|
socket.send(sendPacket);
|
||||||
socket.setSoTimeout(500);
|
socket.setSoTimeout(udpTimeoutMillis);
|
||||||
DatagramPacket receivePacket = new DatagramPacket(receiveData,
|
DatagramPacket receivePacket = new DatagramPacket(receiveData,
|
||||||
receiveData.length);
|
receiveData.length);
|
||||||
socket.receive(receivePacket);
|
socket.receive(receivePacket);
|
||||||
|
|
|
@ -86,4 +86,10 @@ public class NfsConfigKeys {
|
||||||
*/
|
*/
|
||||||
public static final String NFS_SUPERUSER_KEY = "nfs.superuser";
|
public static final String NFS_SUPERUSER_KEY = "nfs.superuser";
|
||||||
public static final String NFS_SUPERUSER_DEFAULT = "";
|
public static final String NFS_SUPERUSER_DEFAULT = "";
|
||||||
|
/*
|
||||||
|
* Timeout value in millisecond for rpc connection to portmap
|
||||||
|
*/
|
||||||
|
public static final String NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY =
|
||||||
|
"nfs.udp.client.portmap.timeout.millis";
|
||||||
|
public static final int NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT = 500;
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,9 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
|
||||||
super("mountd", "localhost", config.getInt(
|
super("mountd", "localhost", config.getInt(
|
||||||
NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY,
|
NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY,
|
||||||
NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1,
|
NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1,
|
||||||
VERSION_3, registrationSocket, allowInsecurePorts);
|
VERSION_3, registrationSocket, allowInsecurePorts, config.getInt(
|
||||||
|
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY,
|
||||||
|
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT));
|
||||||
exports = new ArrayList<String>();
|
exports = new ArrayList<String>();
|
||||||
exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
|
exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
|
||||||
NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT));
|
NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT));
|
||||||
|
|
|
@ -173,7 +173,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
|
||||||
NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
|
NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
|
||||||
NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
|
NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
|
||||||
Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
|
Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
|
||||||
allowInsecurePorts);
|
allowInsecurePorts, config.getInt(
|
||||||
|
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY,
|
||||||
|
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT));
|
||||||
|
|
||||||
this.config = config;
|
this.config = config;
|
||||||
config.set(FsPermission.UMASK_LABEL, "000");
|
config.set(FsPermission.UMASK_LABEL, "000");
|
||||||
|
|
|
@ -24,12 +24,14 @@ import java.net.InetAddress;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
|
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
|
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
|
||||||
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
|
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
|
||||||
import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3;
|
import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3;
|
||||||
import org.apache.hadoop.oncrpc.XDR;
|
import org.apache.hadoop.oncrpc.XDR;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestMountd {
|
public class TestMountd {
|
||||||
|
|
||||||
|
@ -47,6 +49,14 @@ public class TestMountd {
|
||||||
config.setInt("nfs3.mountd.port", 0);
|
config.setInt("nfs3.mountd.port", 0);
|
||||||
config.setInt("nfs3.server.port", 0);
|
config.setInt("nfs3.server.port", 0);
|
||||||
|
|
||||||
|
int newTimeoutMillis = 1000; // 1s
|
||||||
|
// Set the new portmap rpc timeout values and check
|
||||||
|
config.setInt(NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY,
|
||||||
|
newTimeoutMillis);
|
||||||
|
assertTrue(config.getInt(
|
||||||
|
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY,
|
||||||
|
0) == newTimeoutMillis);
|
||||||
|
|
||||||
// Start nfs
|
// Start nfs
|
||||||
Nfs3 nfs3 = new Nfs3(config);
|
Nfs3 nfs3 = new Nfs3(config);
|
||||||
nfs3.startServiceInternal(false);
|
nfs3.startServiceInternal(false);
|
||||||
|
@ -54,9 +64,10 @@ public class TestMountd {
|
||||||
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
|
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
|
||||||
.getRpcProgram();
|
.getRpcProgram();
|
||||||
mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
|
mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
|
||||||
|
assertTrue(mountd.getPortmapUdpTimeoutMillis() == newTimeoutMillis);
|
||||||
RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
||||||
nfsd.nullProcedure();
|
nfsd.nullProcedure();
|
||||||
|
assertTrue(nfsd.getPortmapUdpTimeoutMillis() == newTimeoutMillis);
|
||||||
|
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue