diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 52048a809c3..d17db41b7f3 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -770,7 +770,10 @@ Release 2.7.0 - UNRELEASED HADOOP-11787. OpensslSecureRandom.c pthread_threadid_np usage signature is wrong on 32-bit Mac. (Kiran Kumar M R via cnauroth) - + + HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP + server (brandonli) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java index 8d7d6dc83c7..92ca7ec84f2 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java @@ -60,7 +60,17 @@ abstract public class MountdBase { SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(), rpcProgram, 1); rpcProgram.startDaemons(); - udpServer.run(); + try { + udpServer.run(); + } catch (Throwable e) { + LOG.fatal("Failed to start the UDP server.", e); + if (udpServer.getBoundPort() > 0) { + rpcProgram.unregister(PortmapMapping.TRANSPORT_UDP, + udpServer.getBoundPort()); + } + udpServer.shutdown(); + terminate(1, e); + } udpBoundPort = udpServer.getBoundPort(); } @@ -69,7 +79,17 @@ abstract public class MountdBase { SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), rpcProgram, 1); rpcProgram.startDaemons(); - tcpServer.run(); + try { + tcpServer.run(); + } catch (Throwable e) { + LOG.fatal("Failed to start the TCP server.", e); + if (tcpServer.getBoundPort() > 0) { + rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, + tcpServer.getBoundPort()); + } + tcpServer.shutdown(); + terminate(1, e); + } tcpBoundPort = tcpServer.getBoundPort(); } @@ -83,7 +103,7 @@ abstract public class MountdBase { rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort); rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort); } catch (Throwable e) { - LOG.fatal("Failed to start the server. Cause:", e); + LOG.fatal("Failed to register the MOUNT service.", e); terminate(1, e); } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java index 40744bc16c4..80faca56f6c 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java @@ -29,7 +29,6 @@ import static org.apache.hadoop.util.ExitUtil.terminate; /** * Nfs server. Supports NFS v3 using {@link RpcProgram}. - * Currently Mountd program is also started inside this class. * Only TCP server is supported and UDP is not supported. */ public abstract class Nfs3Base { @@ -55,7 +54,7 @@ public abstract class Nfs3Base { try { rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort); } catch (Throwable e) { - LOG.fatal("Failed to start the server. Cause:", e); + LOG.fatal("Failed to register the NFSv3 service.", e); terminate(1, e); } } @@ -65,7 +64,17 @@ public abstract class Nfs3Base { SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(), rpcProgram, 0); rpcProgram.startDaemons(); - tcpServer.run(); + try { + tcpServer.run(); + } catch (Throwable e) { + LOG.fatal("Failed to start the TCP server.", e); + if (tcpServer.getBoundPort() > 0) { + rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, + tcpServer.getBoundPort()); + } + tcpServer.shutdown(); + terminate(1, e); + } nfsBoundPort = tcpServer.getBoundPort(); } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java index 949fdca5dc3..99d1d6f0830 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java @@ -39,7 +39,9 @@ public class SimpleTcpServer { protected final int port; protected int boundPort = -1; // Will be set after server starts protected final SimpleChannelUpstreamHandler rpcProgram; - + private ServerBootstrap server; + private Channel ch; + /** The maximum number of I/O worker threads */ protected final int workerCount; @@ -53,7 +55,7 @@ public class SimpleTcpServer { this.rpcProgram = program; this.workerCount = workercount; } - + public void run() { // Configure the Server. ChannelFactory factory; @@ -66,9 +68,9 @@ public class SimpleTcpServer { Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), workerCount); } - - ServerBootstrap bootstrap = new ServerBootstrap(factory); - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + + server = new ServerBootstrap(factory); + server.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { @@ -77,14 +79,14 @@ public class SimpleTcpServer { RpcUtil.STAGE_RPC_TCP_RESPONSE); } }); - bootstrap.setOption("child.tcpNoDelay", true); - bootstrap.setOption("child.keepAlive", true); - + server.setOption("child.tcpNoDelay", true); + server.setOption("child.keepAlive", true); + // Listen to TCP port - Channel ch = bootstrap.bind(new InetSocketAddress(port)); + ch = server.bind(new InetSocketAddress(port)); InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress(); boundPort = socketAddr.getPort(); - + LOG.info("Started listening to TCP requests at port " + boundPort + " for " + rpcProgram + " with workerCount " + workerCount); } @@ -93,4 +95,13 @@ public class SimpleTcpServer { public int getBoundPort() { return this.boundPort; } + + public void shutdown() { + if (ch != null) { + ch.close().awaitUninterruptibly(); + } + if (server != null) { + server.releaseExternalResources(); + } + } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java index 8e77fce36b3..c9cb2b3dd1d 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java @@ -41,8 +41,11 @@ public class SimpleUdpServer { protected final SimpleChannelUpstreamHandler rpcProgram; protected final int workerCount; protected int boundPort = -1; // Will be set after server starts + private ConnectionlessBootstrap server; + private Channel ch; - public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) { + public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, + int workerCount) { this.port = port; this.rpcProgram = program; this.workerCount = workerCount; @@ -53,20 +56,19 @@ public class SimpleUdpServer { DatagramChannelFactory f = new NioDatagramChannelFactory( Executors.newCachedThreadPool(), workerCount); - ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); - b.setPipeline(Channels.pipeline( - RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram, - RpcUtil.STAGE_RPC_UDP_RESPONSE)); + server = new ConnectionlessBootstrap(f); + server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER, + rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE)); + + server.setOption("broadcast", "false"); + server.setOption("sendBufferSize", SEND_BUFFER_SIZE); + server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE); - b.setOption("broadcast", "false"); - b.setOption("sendBufferSize", SEND_BUFFER_SIZE); - b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE); - // Listen to the UDP port - Channel ch = b.bind(new InetSocketAddress(port)); + ch = server.bind(new InetSocketAddress(port)); InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress(); boundPort = socketAddr.getPort(); - + LOG.info("Started listening to UDP requests at port " + boundPort + " for " + rpcProgram + " with workerCount " + workerCount); } @@ -75,4 +77,13 @@ public class SimpleUdpServer { public int getBoundPort() { return this.boundPort; } + + public void shutdown() { + if (ch != null) { + ch.close().awaitUninterruptibly(); + } + if (server != null) { + server.releaseExternalResources(); + } + } }