diff --git a/CHANGES.txt b/CHANGES.txt index 4a0f307fe6c..37a13e9919e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -76,6 +76,9 @@ Trunk (unreleased changes) HADOOP-6657. Add a capitalization method to StringUtils for MAPREDUCE-1545. (Luke Lu via Steve Loughran) + HADOOP-6692. Add FileContext#listStatus that returns an iterator. + (hairong) + IMPROVEMENTS HADOOP-6283. Improve the exception messages thrown by @@ -243,8 +246,8 @@ Trunk (unreleased changes) HADOOP-6689. Add directory renaming test to existing FileContext tests. (Eli Collins via suresh) - HADOOP-6692. Add FileContext#listStatus that returns an iterator. - (hairong) + HADOOP-6713. The RPC server Listener thread is a scalability bottleneck. + (Dmytro Molkov via hairong) BUG FIXES diff --git a/src/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/src/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 8722e405156..c989da80998 100644 --- a/src/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/src/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -123,6 +123,9 @@ public class CommonConfigurationKeys { "ipc.server.max.response.size"; public static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024*1024; + public static final String IPC_SERVER_RPC_READ_THREADS_KEY = + "ipc.server.read.threadpool.size"; + public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1; /** * How many calls per handler are allowed in the queue. 
*/ diff --git a/src/java/org/apache/hadoop/ipc/Server.java b/src/java/org/apache/hadoop/ipc/Server.java index 76068c9cabf..cf6299f10bb 100644 --- a/src/java/org/apache/hadoop/ipc/Server.java +++ b/src/java/org/apache/hadoop/ipc/Server.java @@ -51,6 +51,8 @@ import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; @@ -163,6 +165,7 @@ public abstract class Server { private String bindAddress; private int port; // port we listen on private int handlerCount; // number of handler threads + private int readThreads; // number of read threads private Class paramClass; // class of call parameters private int maxIdleTime; // the maximum idle time after // which a client may be disconnected @@ -265,6 +268,8 @@ public abstract class Server { private ServerSocketChannel acceptChannel = null; //the accept channel private Selector selector = null; //the selector that we use for the server + private Reader[] readers = null; + private int currentReader = 0; private InetSocketAddress address; //the address we bind at private Random rand = new Random(); private long lastCleanupRunTime = 0; //the last time when a cleanup connec- @@ -272,6 +277,7 @@ public abstract class Server { private long cleanupInterval = 10000; //the minimum interval between //two cleanup runs private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); + private ExecutorService readPool; public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); @@ -284,12 +290,84 @@ public abstract class Server { port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); + readers = new Reader[readThreads]; + readPool = 
Executors.newFixedThreadPool(readThreads); + for (int i = 0; i < readThreads; i++) { + Selector readSelector = Selector.open(); + Reader reader = new Reader(readSelector); + readers[i] = reader; + readPool.execute(reader); + } // Register accepts on the server socket with the selector. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); } + + private class Reader implements Runnable { + private volatile boolean adding = false; /* true between startAdd() and finishAdd() */ + private Selector readSelector = null; + + Reader(Selector readSelector) { + this.readSelector = readSelector; + } + public void run() { + LOG.info("Starting SocketReader"); + synchronized (this) { + while (running) { + SelectionKey key = null; + try { + readSelector.select(); /* blocks until a key is ready or the selector is woken (see startAdd()) */ + while (adding) { + this.wait(1000); /* releases this monitor so registerChannel()/finishAdd() can run */ + } + + Iterator iter = readSelector.selectedKeys().iterator(); + while (iter.hasNext()) { + key = iter.next(); + iter.remove(); + if (key.isValid()) { + if (key.isReadable()) { + doRead(key); + } + } + key = null; + } + } catch (InterruptedException e) { + if (running) { // unexpected -- log it + LOG.info(getName() + " caught: " + + StringUtils.stringifyException(e)); + } + } catch (IOException ex) { + LOG.error("Error in Reader", ex); + } + } + } + } + + /** + * Puts this reader into the state in which it waits for a new channel + * to be registered with readSelector: sets adding and wakes the selector. + * If run() is blocked in select() it returns; otherwise its next select() + * returns immediately. Either way run() then waits in while (adding), + * releasing this reader's monitor, until finishAdd() clears the flag. + */ + public void startAdd() { + adding = true; + readSelector.wakeup(); + } + + public synchronized SelectionKey registerChannel(SocketChannel channel) + throws IOException { /* synchronized: cannot enter while run() holds this monitor, i.e. until run() is in wait() */ + return channel.register(readSelector, SelectionKey.OP_READ); + } + + public synchronized void finishAdd() { + adding = false; + this.notify(); /* wakes run() if it is waiting in while (adding) */ + } + } /** cleanup connections from connectionList. 
Choose a random range * to scan and also have a limit on the number of the connections * that will be cleanedup per run. The criteria for cleanup is the time @@ -354,8 +432,6 @@ public abstract class Server { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); - else if (key.isReadable()) - doRead(key); } } catch (IOException e) { } @@ -369,11 +445,6 @@ public abstract class Server { closeCurrentConnection(key, e); cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ie) {} - } catch (InterruptedException e) { - if (running) { // unexpected -- log it - LOG.info(getName() + " caught: " + - StringUtils.stringifyException(e)); - } } catch (Exception e) { closeCurrentConnection(key, e); } @@ -416,25 +487,29 @@ public abstract class Server { void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); - // accept up to 10 connections - for (int i=0; i<10; i++) { - SocketChannel channel = server.accept(); - if (channel==null) return; + SocketChannel channel; + while ((channel = server.accept()) != null) { /* drain every pending connection; the old 10-per-call cap is gone */ channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); - SelectionKey readKey = channel.register(getSelector(), - SelectionKey.OP_READ); - c = new Connection(readKey, channel, System.currentTimeMillis()); - readKey.attach(c); - synchronized (connectionList) { - connectionList.add(numConnections, c); - numConnections++; + + Reader reader = getReader(); /* next reader in round-robin order */ + try { + reader.startAdd(); /* wake the reader out of select() so registration can proceed */ + SelectionKey readKey = reader.registerChannel(channel); /* NOTE(review): if this throws, the accepted channel is not closed here -- confirm a caller closes it */ + c = new Connection(readKey, channel, System.currentTimeMillis()); + readKey.attach(c); + synchronized (connectionList) { + connectionList.add(numConnections, c); + numConnections++; + } + if (LOG.isDebugEnabled()) + LOG.debug("Server connection from " + c.toString() + + "; # active connections: " + numConnections + + "; # queued calls: " + callQueue.size()); + } finally { + 
reader.finishAdd(); /* always clear adding, even if registration threw, so the reader resumes */ } - if (LOG.isDebugEnabled()) - LOG.debug("Server connection from " + c.toString() + - "; # active connections: " + numConnections + - "; # queued calls: " + callQueue.size()); } } @@ -480,9 +555,16 @@ public abstract class Server { LOG.info(getName() + ":Exception in closing listener socket. " + e); } } + readPool.shutdown(); /* NOTE(review): shutdown() does not interrupt running tasks; a Reader blocked in select() may never exit -- confirm */ } synchronized Selector getSelector() { return selector; } + // Returns the reader that the next accepted connection is handed to. + // Simple round robin: advances currentReader (wrapping at readers.length) before use. + Reader getReader() { + currentReader = (currentReader + 1) % readers.length; + return readers[currentReader]; + } } // Sends responses of RPC back to clients. @@ -1344,6 +1426,9 @@ public abstract class Server { this.maxRespSize = conf.getInt( CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT); + this.readThreads = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); this.callQueue = new LinkedBlockingQueue(maxQueueSize); this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); diff --git a/src/test/core/org/apache/hadoop/ipc/TestRPC.java b/src/test/core/org/apache/hadoop/ipc/TestRPC.java index b718fcc8274..ad03dfa1763 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestRPC.java +++ b/src/test/core/org/apache/hadoop/ipc/TestRPC.java @@ -424,6 +424,15 @@ public class TestRPC extends TestCase { // Reset authorization to expect failure conf.set(ACL_CONFIG, "invalid invalid"); doRPCs(conf, true); + + conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); + // Expect to succeed + conf.set(ACL_CONFIG, "*"); + doRPCs(conf, false); + + // Reset authorization to expect failure + conf.set(ACL_CONFIG, "invalid invalid"); + doRPCs(conf, true); } /** @@ -432,8 +441,12 @@ public class TestRPC extends 
TestCase { */ public void testNoPings() throws Exception { Configuration conf = new Configuration(); + conf.setBoolean("ipc.client.ping", false); new TestRPC("testnoPings").testCalls(conf); + + conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); + new TestRPC("testnoPings").testCalls(conf); } /** @@ -466,6 +479,31 @@ public class TestRPC extends TestCase { } } assertTrue(succeeded); + + conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); + + final Server multiServer = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, 5, true, conf, null); + multiServer.enableSecurity(); + multiServer.start(); + succeeded = false; + final InetSocketAddress mulitServerAddr = + NetUtils.getConnectAddress(multiServer); + proxy = null; + try { + proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + TestProtocol.versionID, mulitServerAddr, conf); + } catch (RemoteException e) { + LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage()); + assertTrue(e.unwrapRemoteException() instanceof AccessControlException); + succeeded = true; + } finally { + multiServer.stop(); + if (proxy != null) { + RPC.stopProxy(proxy); + } + } + assertTrue(succeeded); } public static void main(String[] args) throws Exception {