diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java index 21e99b6a951..77bc7f6ef2b 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/Portmap.java @@ -17,42 +17,111 @@ */ package org.apache.hadoop.portmap; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.oncrpc.RpcProgram; -import org.apache.hadoop.oncrpc.SimpleTcpServer; -import org.apache.hadoop.oncrpc.SimpleUdpServer; +import org.apache.hadoop.oncrpc.RpcUtil; import org.apache.hadoop.util.StringUtils; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.handler.timeout.IdleStateHandler; +import org.jboss.netty.util.HashedWheelTimer; + +import com.google.common.annotations.VisibleForTesting; /** * Portmap service for binding RPC protocols. See RFC 1833 for details. */ -public class Portmap { - public static final Log LOG = LogFactory.getLog(Portmap.class); +final class Portmap { + private static final Log LOG = LogFactory.getLog(Portmap.class); + private static final int DEFAULT_IDLE_TIME_MILLISECONDS = 5000; - private static void startUDPServer(RpcProgramPortmap rpcProgram) { - rpcProgram.register(PortmapMapping.TRANSPORT_UDP, RpcProgram.RPCB_PORT); - SimpleUdpServer udpServer = new SimpleUdpServer(RpcProgram.RPCB_PORT, - rpcProgram, 1); - udpServer.run(); - } - - private static void startTCPServer(final RpcProgramPortmap rpcProgram) { - rpcProgram.register(PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT); - SimpleTcpServer tcpServer = new SimpleTcpServer(RpcProgram.RPCB_PORT, - rpcProgram, 1); - tcpServer.run(); - } + private ConnectionlessBootstrap udpServer; + private ServerBootstrap tcpServer; + private ChannelGroup allChannels = new DefaultChannelGroup(); + private Channel udpChannel; + private Channel tcpChannel; + private final RpcProgramPortmap handler = new RpcProgramPortmap(allChannels); public static void main(String[] args) { StringUtils.startupShutdownMessage(Portmap.class, args, LOG); - RpcProgramPortmap program = new RpcProgramPortmap(); + + final int port = RpcProgram.RPCB_PORT; + Portmap pm = new Portmap(); try { - startUDPServer(program); - startTCPServer(program); + pm.start(DEFAULT_IDLE_TIME_MILLISECONDS, + new InetSocketAddress(port), new InetSocketAddress(port)); } catch (Throwable e) { - LOG.fatal("Start server failure"); + LOG.fatal("Failed to start the server. Cause:" + e.getMessage()); + pm.shutdown(); System.exit(-1); } } + + void shutdown() { + allChannels.close().awaitUninterruptibly(); + tcpServer.releaseExternalResources(); + udpServer.releaseExternalResources(); + } + + @VisibleForTesting + SocketAddress getTcpServerLocalAddress() { + return tcpChannel.getLocalAddress(); + } + + @VisibleForTesting + SocketAddress getUdpServerLoAddress() { + return udpChannel.getLocalAddress(); + } + + @VisibleForTesting + RpcProgramPortmap getHandler() { + return handler; + } + + void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress, + final SocketAddress udpAddress) { + + tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); + tcpServer.setPipelineFactory(new ChannelPipelineFactory() { + private final HashedWheelTimer timer = new HashedWheelTimer(); + private final IdleStateHandler idleStateHandler = new IdleStateHandler( + timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS); + + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(), + RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler, + RpcUtil.STAGE_RPC_TCP_RESPONSE); + } + }); + + udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory( + Executors.newCachedThreadPool())); + + udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER, + handler, RpcUtil.STAGE_RPC_UDP_RESPONSE)); + + tcpChannel = tcpServer.bind(tcpAddress); + udpChannel = udpServer.bind(udpAddress); + allChannels.add(tcpChannel); + allChannels.add(udpChannel); + + LOG.info("Portmap server started at tcp://" + tcpChannel.getLocalAddress() + + ", udp://" + udpChannel.getLocalAddress()); + } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java index cbbe1358bb7..d128072fc04 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.portmap; -import java.util.Arrays; -import java.util.Collection; - import org.apache.hadoop.oncrpc.RpcAcceptedReply; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; @@ -45,18 +42,13 @@ public class PortmapResponse { return xdr; } - public static XDR pmapList(XDR xdr, int xid, Collection list) { + public static XDR pmapList(XDR xdr, int xid, PortmapMapping[] list) { RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr); for (PortmapMapping mapping : list) { - System.out.println(mapping); xdr.writeBoolean(true); // Value follows mapping.serialize(xdr); } xdr.writeBoolean(false); // No value follows return xdr; } - - public static XDR pmapList(XDR xdr, int xid, PortmapMapping[] list) { - return pmapList(xdr, xid, Arrays.asList(list)); - } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java index bd9f48cb524..d68657cc42f 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java @@ -18,8 +18,6 @@ package org.apache.hadoop.portmap; import java.util.HashMap; -import java.util.Map.Entry; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,31 +32,34 @@ import org.apache.hadoop.oncrpc.security.VerifierNone; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.handler.timeout.IdleState; +import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler; +import org.jboss.netty.handler.timeout.IdleStateEvent; -/** - * An rpcbind request handler. - */ -public class RpcProgramPortmap extends RpcProgram implements PortmapInterface { - public static final int PROGRAM = 100000; - public static final int VERSION = 2; - +final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler implements PortmapInterface { + static final int PROGRAM = 100000; + static final int VERSION = 2; private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class); /** Map synchronized usis monitor lock of this instance */ private final HashMap map; - public RpcProgramPortmap() { - super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION); - map = new HashMap(256); - } + /** ChannelGroup that remembers all active channels for gracefully shutdown. */ + private final ChannelGroup allChannels; - /** Dump all the register RPC services */ - private synchronized void dumpRpcServices() { - Set> entrySet = map.entrySet(); - for (Entry entry : entrySet) { - LOG.info("Service: " + entry.getKey() + " portmapping: " - + entry.getValue()); - } + RpcProgramPortmap(ChannelGroup allChannels) { + this.allChannels = allChannels; + map = new HashMap(256); + PortmapMapping m = new PortmapMapping(PROGRAM, VERSION, + PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT); + PortmapMapping m1 = new PortmapMapping(PROGRAM, VERSION, + PortmapMapping.TRANSPORT_UDP, RpcProgram.RPCB_PORT); + map.put(PortmapMapping.key(m), m); + map.put(PortmapMapping.key(m1), m1); } @Override @@ -77,7 +78,6 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface { PortmapMapping value = null; synchronized(this) { map.put(key, mapping); - dumpRpcServices(); value = map.get(key); } return PortmapResponse.intReply(out, xid, value.getPort()); @@ -126,21 +126,15 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface { } @Override - public void register(PortmapMapping mapping) { - String key = PortmapMapping.key(mapping); - synchronized(this) { - map.put(key, mapping); - } - } + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { - @Override - public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { + RpcInfo info = (RpcInfo) e.getMessage(); RpcCall rpcCall = (RpcCall) info.header(); final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure()); int xid = rpcCall.getXid(); - byte[] data = new byte[info.data().readableBytes()]; - info.data().readBytes(data); - XDR in = new XDR(data); + XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(), + XDR.State.READING); XDR out = new XDR(); if (portmapProc == Procedure.PMAPPROC_NULL) { @@ -162,13 +156,29 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface { reply.write(out); } - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); RpcUtil.sendRpcResponse(ctx, rsp); } @Override - protected boolean isIdempotent(RpcCall call) { - return false; + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + throws Exception { + allChannels.add(e.getChannel()); + } + + @Override + public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) + throws Exception { + if (e.getState() == IdleState.ALL_IDLE) { + e.getChannel().close(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + LOG.warn("Encountered ", e.getCause()); + e.getChannel().close(); } } diff --git a/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java new file mode 100644 index 00000000000..2ed16bb13e6 --- /dev/null +++ b/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.portmap; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.HashMap; + +import junit.framework.Assert; + +import org.apache.hadoop.oncrpc.RpcCall; +import org.apache.hadoop.oncrpc.XDR; +import org.apache.hadoop.oncrpc.security.CredentialsNone; +import org.apache.hadoop.oncrpc.security.VerifierNone; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +public class TestPortmap { + private static Portmap pm = new Portmap(); + private static final int SHORT_TIMEOUT_MILLISECONDS = 10; + private static final int RETRY_TIMES = 5; + private int xid; + + @BeforeClass + public static void setup() { + pm.start(SHORT_TIMEOUT_MILLISECONDS, new InetSocketAddress("localhost", 0), + new InetSocketAddress("localhost", 0)); + } + + @AfterClass + public static void tearDown() { + pm.shutdown(); + } + + @Test(timeout = 1000) + public void testIdle() throws InterruptedException, IOException { + Socket s = new Socket(); + try { + s.connect(pm.getTcpServerLocalAddress()); + + int i = 0; + while (!s.isConnected() && i < RETRY_TIMES) { + ++i; + Thread.sleep(SHORT_TIMEOUT_MILLISECONDS); + } + + Assert.assertTrue("Failed to connect to the server", s.isConnected() + && i < RETRY_TIMES); + + int b = s.getInputStream().read(); + Assert.assertTrue("The server failed to disconnect", b == -1); + } finally { + s.close(); + } + } + + @Test(timeout = 1000) + public void testRegistration() throws IOException, InterruptedException { + XDR req = new XDR(); + RpcCall.getInstance(++xid, RpcProgramPortmap.PROGRAM, + RpcProgramPortmap.VERSION, + PortmapInterface.Procedure.PMAPPROC_SET.getValue(), + new CredentialsNone(), new VerifierNone()).write(req); + + PortmapMapping sent = new PortmapMapping(90000, 1, + PortmapMapping.TRANSPORT_TCP, 1234); + sent.serialize(req); + + byte[] reqBuf = req.getBytes(); + DatagramSocket s = new DatagramSocket(); + DatagramPacket p = new DatagramPacket(reqBuf, reqBuf.length, + pm.getUdpServerLoAddress()); + try { + s.send(p); + } finally { + s.close(); + } + + // Give the server a chance to process the request + Thread.sleep(100); + boolean found = false; + @SuppressWarnings("unchecked") + HashMap map = (HashMap) Whitebox + .getInternalState(pm.getHandler(), "map"); + + for (PortmapMapping m : map.values()) { + if (m.getPort() == sent.getPort() + && PortmapMapping.key(m).equals(PortmapMapping.key(sent))) { + found = true; + break; + } + } + Assert.assertTrue("Registration failed", found); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java deleted file mode 100644 index 769aa480202..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.nfs; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd; -import org.apache.hadoop.nfs.nfs3.Nfs3Constant; -import org.apache.hadoop.oncrpc.RegistrationClient; -import org.apache.hadoop.oncrpc.RpcCall; -import org.apache.hadoop.oncrpc.XDR; -import org.apache.hadoop.oncrpc.security.CredentialsNone; -import org.apache.hadoop.oncrpc.security.VerifierNone; -import org.apache.hadoop.portmap.PortmapMapping; -import org.apache.hadoop.portmap.PortmapRequest; - -public class TestPortmapRegister { - - public static final Log LOG = LogFactory.getLog(TestPortmapRegister.class); - - static void testRequest(XDR request, XDR request2) { - RegistrationClient registrationClient = new RegistrationClient( - "localhost", Nfs3Constant.SUN_RPCBIND, request); - registrationClient.run(); - } - - public static void main(String[] args) throws InterruptedException { - PortmapMapping mapEntry = new PortmapMapping(RpcProgramMountd.PROGRAM, - RpcProgramMountd.VERSION_1, PortmapMapping.TRANSPORT_UDP, - RpcProgramMountd.PORT); - XDR mappingRequest = PortmapRequest.create(mapEntry); - RegistrationClient registrationClient = new RegistrationClient( - "localhost", Nfs3Constant.SUN_RPCBIND, mappingRequest); - registrationClient.run(); - - Thread t1 = new Runtest1(); - //Thread t2 = testa.new Runtest2(); - t1.start(); - //t2.start(); - t1.join(); - //t2.join(); - //testDump(); - } - - static class Runtest1 extends Thread { - @Override - public void run() { - //testGetportMount(); - PortmapMapping mapEntry = new PortmapMapping(RpcProgramMountd.PROGRAM, - RpcProgramMountd.VERSION_1, PortmapMapping.TRANSPORT_UDP, - RpcProgramMountd.PORT); - XDR req = PortmapRequest.create(mapEntry); - testRequest(req, req); - } - } - - static class Runtest2 extends Thread { - @Override - public void run() { - testDump(); - } - } - - static void createPortmapXDRheader(XDR xdr_out, int procedure) { - // TODO: Move this to RpcRequest - RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(), - new VerifierNone()).write(xdr_out); - - /* - xdr_out.putInt(1); //unix auth - xdr_out.putVariableOpaque(new byte[20]); - xdr_out.putInt(0); - xdr_out.putInt(0); -*/ - } - - static void testGetportMount() { - XDR xdr_out = new XDR(); - - createPortmapXDRheader(xdr_out, 3); - - xdr_out.writeInt(100005); - xdr_out.writeInt(1); - xdr_out.writeInt(6); - xdr_out.writeInt(0); - - XDR request2 = new XDR(); - - createPortmapXDRheader(xdr_out, 3); - request2.writeInt(100005); - request2.writeInt(1); - request2.writeInt(6); - request2.writeInt(0); - - testRequest(xdr_out, request2); - } - - static void testGetport() { - XDR xdr_out = new XDR(); - - createPortmapXDRheader(xdr_out, 3); - - xdr_out.writeInt(100003); - xdr_out.writeInt(3); - xdr_out.writeInt(6); - xdr_out.writeInt(0); - - XDR request2 = new XDR(); - - createPortmapXDRheader(xdr_out, 3); - request2.writeInt(100003); - request2.writeInt(3); - request2.writeInt(6); - request2.writeInt(0); - - testRequest(xdr_out, request2); - } - - static void testDump() { - XDR xdr_out = new XDR(); - createPortmapXDRheader(xdr_out, 4); - testRequest(xdr_out, xdr_out); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 31b18dedd32..c3dca7bc2f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -311,6 +311,8 @@ Release 2.2.1 - UNRELEASED HDFS-5014. Process register commands with out holding BPOfferService lock. (Vinaykumar B via umamahesh) + HDFS-5288. Close idle connections in portmap (Haohui Mai via brandonli) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES