diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 1ff6a0f0ad2..65324958d7d 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -2038,6 +2038,8 @@ Release 0.23.10 - UNRELEASED
 
   OPTIMIZATIONS
 
+    HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
+
   BUG FIXES
 
 Release 0.23.9 - 2013-07-08
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index dcab369e385..06c1a734556 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -65,6 +65,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
   public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
 
+  /** Number of pending connections that may be queued per socket reader */
+  public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
+      "ipc.server.read.connection-queue.size";
+  /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
+  public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
+      100;
+
   public static final String IPC_MAXIMUM_DATA_LENGTH =
       "ipc.maximum.data.length";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 693cbd3c869..2760bc67e4c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -342,6 +342,7 @@ public abstract class Server {
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int readThreads;                        // number of read threads
+  private int readerPendingConnectionQueue;       // number of connections to queue per read thread
   private Class<? extends Writable> rpcRequestClass;   // class used for deserializing the rpc request
   private int maxIdleTime;                        // the maximum idle time after
                                                   // which a client may be disconnected
@@ -550,12 +551,14 @@ public abstract class Server {
     }
 
     private class Reader extends Thread {
-      private volatile boolean adding = false;
+      final private BlockingQueue<Connection> pendingConnections;
       private final Selector readSelector;
 
       Reader(String name) throws IOException {
         super(name);
+        this.pendingConnections =
+            new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
         this.readSelector = Selector.open();
       }
@@ -577,10 +580,14 @@ public abstract class Server {
         while (running) {
           SelectionKey key = null;
           try {
+            // consume as many connections as currently queued to avoid
+            // unbridled acceptance of connections that starves the select
+            int size = pendingConnections.size();
+            for (int i=size; i>0; i--) {
+              Connection conn = pendingConnections.take();
+              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
+            }
             readSelector.select();
-            while (adding) {
-              this.wait(1000);
-            }
 
             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
             while (iter.hasNext()) {
@@ -604,26 +611,14 @@ public abstract class Server {
       }
 
       /**
-       * This gets reader into the state that waits for the new channel
-       * to be registered with readSelector. If it was waiting in select()
-       * the thread will be woken up, otherwise whenever select() is called
-       * it will return even if there is nothing to read and wait
-       * in while(adding) for finishAdd call
+       * Updating the readSelector while it's being used is not thread-safe,
+       * so the connection must be queued. The reader will drain the queue
+       * and update its readSelector before performing the next select
        */
-      public void startAdd() {
-        adding = true;
+      public void addConnection(Connection conn) throws InterruptedException {
+        pendingConnections.put(conn);
         readSelector.wakeup();
       }
-
-      public synchronized SelectionKey registerChannel(SocketChannel channel)
-          throws IOException {
-        return channel.register(readSelector, SelectionKey.OP_READ);
-      }
-
-      public synchronized void finishAdd() {
-        adding = false;
-        this.notify();
-      }
 
       void shutdown() {
         assert !running;
@@ -763,20 +758,23 @@ public abstract class Server {
 
       Reader reader = getReader();
       try {
-        reader.startAdd();
-        SelectionKey readKey = reader.registerChannel(channel);
-        c = new Connection(readKey, channel, Time.now());
-        readKey.attach(c);
+        c = new Connection(channel, Time.now());
         synchronized (connectionList) {
           connectionList.add(numConnections, c);
           numConnections++;
         }
+        reader.addConnection(c);
         if (LOG.isDebugEnabled())
           LOG.debug("Server connection from " + c.toString() +
              "; # active connections: " + numConnections +
              "; # queued calls: " + callQueue.size());
-      } finally {
-        reader.finishAdd();
+      } catch (InterruptedException ie) {
+        if (running) {
+          LOG.info(
+              getName() + ": disconnecting client " + c.getHostAddress() +
+              " due to unexpected interrupt");
+        }
+        closeConnection(c);
       }
     }
   }
@@ -1187,8 +1185,7 @@ public abstract class Server {
     private boolean sentNegotiate = false;
     private boolean useWrap = false;
 
-    public Connection(SelectionKey key, SocketChannel channel,
-                      long lastContact) {
+    public Connection(SocketChannel channel, long lastContact) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -2186,6 +2183,9 @@ public abstract class Server {
           CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
           CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
     }
+    this.readerPendingConnectionQueue = conf.getInt(
+        CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
+        CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
     this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
     this.maxIdleTime = 2 * conf.getInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 33fb799c0c8..a84232e6c61 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -44,12 +44,16 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
@@ -613,6 +617,148 @@ public class TestIPC {
     server.stop();
   }
 
+  private static class TestServerQueue extends Server {
+    final CountDownLatch firstCallLatch = new CountDownLatch(1);
+    final CountDownLatch callBlockLatch = new CountDownLatch(1);
+
+    TestServerQueue(int expectedCalls, int readers, int callQ, int handlers,
+        Configuration conf) throws IOException {
+      super(ADDRESS, 0, LongWritable.class, handlers, readers, callQ, conf, null, null);
+    }
+
+    @Override
+    public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
+        long receiveTime) throws IOException {
+      firstCallLatch.countDown();
+      try {
+        callBlockLatch.await();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      return param;
+    }
+  }
+
+  /**
+   * Check that reader queueing works
+   * @throws BrokenBarrierException
+   * @throws InterruptedException
+   */
+  @Test(timeout=60000)
+  public void testIpcWithReaderQueuing() throws Exception {
+    // 1 reader, 1 connectionQ slot, 1 callq
+    for (int i=0; i < 10; i++) {
+      checkBlocking(1, 1, 1);
+    }
+    // 4 readers, 5 connectionQ slots, 2 callq
+    for (int i=0; i < 10; i++) {
+      checkBlocking(4, 5, 2);
+    }
+  }
+
+  // goal is to jam a handler with a connection, fill the callq with
+  // connections, in turn jamming the readers - then flood the server and
+  // ensure that the listener blocks when the reader connection queues fill
+  private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
+    int handlers = 1; // makes it easier
+
+    final Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ);
+
+    // send in enough clients to block up the handlers, callq, and readers
+    int initialClients = readers + callQ + handlers;
+    // max connections we should ever end up accepting at once
+    int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener
+    // stress it with 2X the max
+    int clients = maxAccept*2;
+
+    final AtomicInteger failures = new AtomicInteger(0);
+    final CountDownLatch callFinishedLatch = new CountDownLatch(clients);
+
+    // start server
+    final TestServerQueue server =
+        new TestServerQueue(clients, readers, callQ, handlers, conf);
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    Client.setConnectTimeout(conf, 10000);
+
+    // instantiate the threads, will start in batches
+    Thread[] threads = new Thread[clients];
+    for (int i=0; i
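
For reference, the following standalone sketch (not part of the patch) illustrates the hand-off pattern the new Reader code uses: the accepting thread never touches the Selector, it queues each channel into a bounded BlockingQueue (blocking when the queue is full, which is what throttles accepts), and the reader thread drains the queue and registers the channels itself just before its next select(). The class and method names below (ReaderSketch, addChannel, QUEUE_SIZE) are hypothetical and exist only for this illustration; only the queue-then-register pattern mirrors Server.Reader.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ReaderSketch extends Thread {
  // mirrors ipc.server.read.connection-queue.size (default 100 in the patch)
  private static final int QUEUE_SIZE = 100;

  private final BlockingQueue<SocketChannel> pendingChannels =
      new LinkedBlockingQueue<SocketChannel>(QUEUE_SIZE);
  private final Selector readSelector;
  private volatile boolean running = true;

  public ReaderSketch() throws IOException {
    this.readSelector = Selector.open();
  }

  /** Called from the accepting thread; blocks once QUEUE_SIZE channels are pending. */
  public void addChannel(SocketChannel channel)
      throws IOException, InterruptedException {
    channel.configureBlocking(false);   // required before registering with a selector
    pendingChannels.put(channel);
    readSelector.wakeup();              // let the reader drain the queue promptly
  }

  public void shutdown() {
    running = false;
    readSelector.wakeup();
  }

  @Override
  public void run() {
    ByteBuffer buf = ByteBuffer.allocate(1024);
    while (running) {
      try {
        // Register only what is queued right now, then go back to selecting,
        // so the selector is only ever mutated from this thread.
        for (int i = pendingChannels.size(); i > 0; i--) {
          SocketChannel channel = pendingChannels.take();
          channel.register(readSelector, SelectionKey.OP_READ);
        }
        readSelector.select();
        Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          iter.remove();
          buf.clear();
          int n = ((SocketChannel) key.channel()).read(buf);
          System.out.println("read " + n + " bytes");
        }
      } catch (IOException | InterruptedException e) {
        running = false;
      }
    }
  }

  // Minimal loopback demo of the hand-off.
  public static void main(String[] args) throws Exception {
    ReaderSketch reader = new ReaderSketch();
    reader.start();

    ServerSocketChannel listener = ServerSocketChannel.open();
    listener.bind(new InetSocketAddress("127.0.0.1", 0));
    SocketChannel client = SocketChannel.open(listener.getLocalAddress());
    reader.addChannel(listener.accept());

    client.write(ByteBuffer.wrap("ping".getBytes("UTF-8")));
    Thread.sleep(200);   // give the reader a chance to service the read
    reader.shutdown();
    client.close();
    listener.close();
  }
}

Compared with the old startAdd()/finishAdd() handshake, this keeps all selector mutations on the reader thread and turns back-pressure into a plain bounded-queue put, which is the behavior the new ipc.server.read.connection-queue.size setting controls.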