HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP server. Contributed by Brandon Li

(cherry picked from commit 60ce825a71)
This commit is contained in:
Brandon Li 2015-04-01 17:04:44 -07:00
parent cfcf795492
commit 8e2f1a93e4
5 changed files with 82 additions and 28 deletions

View File

@ -771,6 +771,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11787. OpensslSecureRandom.c pthread_threadid_np usage signature is HADOOP-11787. OpensslSecureRandom.c pthread_threadid_np usage signature is
wrong on 32-bit Mac. (Kiran Kumar M R via cnauroth) wrong on 32-bit Mac. (Kiran Kumar M R via cnauroth)
HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP
server (brandonli)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -60,7 +60,17 @@ abstract public class MountdBase {
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(), SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
rpcProgram, 1); rpcProgram, 1);
rpcProgram.startDaemons(); rpcProgram.startDaemons();
udpServer.run(); try {
udpServer.run();
} catch (Throwable e) {
LOG.fatal("Failed to start the UDP server.", e);
if (udpServer.getBoundPort() > 0) {
rpcProgram.unregister(PortmapMapping.TRANSPORT_UDP,
udpServer.getBoundPort());
}
udpServer.shutdown();
terminate(1, e);
}
udpBoundPort = udpServer.getBoundPort(); udpBoundPort = udpServer.getBoundPort();
} }
@ -69,7 +79,17 @@ abstract public class MountdBase {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 1); rpcProgram, 1);
rpcProgram.startDaemons(); rpcProgram.startDaemons();
tcpServer.run(); try {
tcpServer.run();
} catch (Throwable e) {
LOG.fatal("Failed to start the TCP server.", e);
if (tcpServer.getBoundPort() > 0) {
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
tcpServer.getBoundPort());
}
tcpServer.shutdown();
terminate(1, e);
}
tcpBoundPort = tcpServer.getBoundPort(); tcpBoundPort = tcpServer.getBoundPort();
} }
@ -83,7 +103,7 @@ abstract public class MountdBase {
rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort); rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort); rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
} catch (Throwable e) { } catch (Throwable e) {
LOG.fatal("Failed to start the server. Cause:", e); LOG.fatal("Failed to register the MOUNT service.", e);
terminate(1, e); terminate(1, e);
} }
} }

View File

@ -29,7 +29,6 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
/** /**
* Nfs server. Supports NFS v3 using {@link RpcProgram}. * Nfs server. Supports NFS v3 using {@link RpcProgram}.
* Currently Mountd program is also started inside this class.
* Only TCP server is supported and UDP is not supported. * Only TCP server is supported and UDP is not supported.
*/ */
public abstract class Nfs3Base { public abstract class Nfs3Base {
@ -55,7 +54,7 @@ public abstract class Nfs3Base {
try { try {
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort); rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
} catch (Throwable e) { } catch (Throwable e) {
LOG.fatal("Failed to start the server. Cause:", e); LOG.fatal("Failed to register the NFSv3 service.", e);
terminate(1, e); terminate(1, e);
} }
} }
@ -65,7 +64,17 @@ public abstract class Nfs3Base {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 0); rpcProgram, 0);
rpcProgram.startDaemons(); rpcProgram.startDaemons();
tcpServer.run(); try {
tcpServer.run();
} catch (Throwable e) {
LOG.fatal("Failed to start the TCP server.", e);
if (tcpServer.getBoundPort() > 0) {
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
tcpServer.getBoundPort());
}
tcpServer.shutdown();
terminate(1, e);
}
nfsBoundPort = tcpServer.getBoundPort(); nfsBoundPort = tcpServer.getBoundPort();
} }

View File

@ -39,6 +39,8 @@ public class SimpleTcpServer {
protected final int port; protected final int port;
protected int boundPort = -1; // Will be set after server starts protected int boundPort = -1; // Will be set after server starts
protected final SimpleChannelUpstreamHandler rpcProgram; protected final SimpleChannelUpstreamHandler rpcProgram;
private ServerBootstrap server;
private Channel ch;
/** The maximum number of I/O worker threads */ /** The maximum number of I/O worker threads */
protected final int workerCount; protected final int workerCount;
@ -67,8 +69,8 @@ public class SimpleTcpServer {
workerCount); workerCount);
} }
ServerBootstrap bootstrap = new ServerBootstrap(factory); server = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { server.setPipelineFactory(new ChannelPipelineFactory() {
@Override @Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
@ -77,11 +79,11 @@ public class SimpleTcpServer {
RpcUtil.STAGE_RPC_TCP_RESPONSE); RpcUtil.STAGE_RPC_TCP_RESPONSE);
} }
}); });
bootstrap.setOption("child.tcpNoDelay", true); server.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true); server.setOption("child.keepAlive", true);
// Listen to TCP port // Listen to TCP port
Channel ch = bootstrap.bind(new InetSocketAddress(port)); ch = server.bind(new InetSocketAddress(port));
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress(); InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
boundPort = socketAddr.getPort(); boundPort = socketAddr.getPort();
@ -93,4 +95,13 @@ public class SimpleTcpServer {
public int getBoundPort() { public int getBoundPort() {
return this.boundPort; return this.boundPort;
} }
public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
}
if (server != null) {
server.releaseExternalResources();
}
}
} }

View File

@ -41,8 +41,11 @@ public class SimpleUdpServer {
protected final SimpleChannelUpstreamHandler rpcProgram; protected final SimpleChannelUpstreamHandler rpcProgram;
protected final int workerCount; protected final int workerCount;
protected int boundPort = -1; // Will be set after server starts protected int boundPort = -1; // Will be set after server starts
private ConnectionlessBootstrap server;
private Channel ch;
public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) { public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program,
int workerCount) {
this.port = port; this.port = port;
this.rpcProgram = program; this.rpcProgram = program;
this.workerCount = workerCount; this.workerCount = workerCount;
@ -53,17 +56,16 @@ public class SimpleUdpServer {
DatagramChannelFactory f = new NioDatagramChannelFactory( DatagramChannelFactory f = new NioDatagramChannelFactory(
Executors.newCachedThreadPool(), workerCount); Executors.newCachedThreadPool(), workerCount);
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); server = new ConnectionlessBootstrap(f);
b.setPipeline(Channels.pipeline( server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram, rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE));
RpcUtil.STAGE_RPC_UDP_RESPONSE));
b.setOption("broadcast", "false"); server.setOption("broadcast", "false");
b.setOption("sendBufferSize", SEND_BUFFER_SIZE); server.setOption("sendBufferSize", SEND_BUFFER_SIZE);
b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE); server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
// Listen to the UDP port // Listen to the UDP port
Channel ch = b.bind(new InetSocketAddress(port)); ch = server.bind(new InetSocketAddress(port));
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress(); InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
boundPort = socketAddr.getPort(); boundPort = socketAddr.getPort();
@ -75,4 +77,13 @@ public class SimpleUdpServer {
public int getBoundPort() { public int getBoundPort() {
return this.boundPort; return this.boundPort;
} }
public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
}
if (server != null) {
server.releaseExternalResources();
}
}
} }