svn merge -c 1541736 FIXES: HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1541743 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0c220f30b3
commit
d993d01cdf
|
@ -2038,6 +2038,8 @@ Release 0.23.10 - UNRELEASED
|
|||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
Release 0.23.9 - 2013-07-08
|
||||
|
|
|
@ -65,6 +65,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
|||
/** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
|
||||
public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
|
||||
|
||||
/** Number of pending connections that may be queued per socket reader */
|
||||
public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
|
||||
"ipc.server.read.connection-queue.size";
|
||||
/** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
|
||||
public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
|
||||
100;
|
||||
|
||||
public static final String IPC_MAXIMUM_DATA_LENGTH =
|
||||
"ipc.maximum.data.length";
|
||||
|
||||
|
|
|
@ -342,6 +342,7 @@ public abstract class Server {
|
|||
private int port; // port we listen on
|
||||
private int handlerCount; // number of handler threads
|
||||
private int readThreads; // number of read threads
|
||||
private int readerPendingConnectionQueue; // number of connections to queue per read thread
|
||||
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
|
||||
private int maxIdleTime; // the maximum idle time after
|
||||
// which a client may be disconnected
|
||||
|
@ -550,12 +551,14 @@ public abstract class Server {
|
|||
}
|
||||
|
||||
private class Reader extends Thread {
|
||||
private volatile boolean adding = false;
|
||||
final private BlockingQueue<Connection> pendingConnections;
|
||||
private final Selector readSelector;
|
||||
|
||||
Reader(String name) throws IOException {
|
||||
super(name);
|
||||
|
||||
this.pendingConnections =
|
||||
new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
|
||||
this.readSelector = Selector.open();
|
||||
}
|
||||
|
||||
|
@ -577,10 +580,14 @@ public abstract class Server {
|
|||
while (running) {
|
||||
SelectionKey key = null;
|
||||
try {
|
||||
// consume as many connections as currently queued to avoid
|
||||
// unbridled acceptance of connections that starves the select
|
||||
int size = pendingConnections.size();
|
||||
for (int i=size; i>0; i--) {
|
||||
Connection conn = pendingConnections.take();
|
||||
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
|
||||
}
|
||||
readSelector.select();
|
||||
while (adding) {
|
||||
this.wait(1000);
|
||||
}
|
||||
|
||||
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
|
||||
while (iter.hasNext()) {
|
||||
|
@ -604,26 +611,14 @@ public abstract class Server {
|
|||
}
|
||||
|
||||
/**
|
||||
* This gets reader into the state that waits for the new channel
|
||||
* to be registered with readSelector. If it was waiting in select()
|
||||
* the thread will be woken up, otherwise whenever select() is called
|
||||
* it will return even if there is nothing to read and wait
|
||||
* in while(adding) for finishAdd call
|
||||
* Updating the readSelector while it's being used is not thread-safe,
|
||||
* so the connection must be queued. The reader will drain the queue
|
||||
* and update its readSelector before performing the next select
|
||||
*/
|
||||
public void startAdd() {
|
||||
adding = true;
|
||||
public void addConnection(Connection conn) throws InterruptedException {
|
||||
pendingConnections.put(conn);
|
||||
readSelector.wakeup();
|
||||
}
|
||||
|
||||
public synchronized SelectionKey registerChannel(SocketChannel channel)
|
||||
throws IOException {
|
||||
return channel.register(readSelector, SelectionKey.OP_READ);
|
||||
}
|
||||
|
||||
public synchronized void finishAdd() {
|
||||
adding = false;
|
||||
this.notify();
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
assert !running;
|
||||
|
@ -763,20 +758,23 @@ public abstract class Server {
|
|||
|
||||
Reader reader = getReader();
|
||||
try {
|
||||
reader.startAdd();
|
||||
SelectionKey readKey = reader.registerChannel(channel);
|
||||
c = new Connection(readKey, channel, Time.now());
|
||||
readKey.attach(c);
|
||||
c = new Connection(channel, Time.now());
|
||||
synchronized (connectionList) {
|
||||
connectionList.add(numConnections, c);
|
||||
numConnections++;
|
||||
}
|
||||
reader.addConnection(c);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Server connection from " + c.toString() +
|
||||
"; # active connections: " + numConnections +
|
||||
"; # queued calls: " + callQueue.size());
|
||||
} finally {
|
||||
reader.finishAdd();
|
||||
} catch (InterruptedException ie) {
|
||||
if (running) {
|
||||
LOG.info(
|
||||
getName() + ": disconnecting client " + c.getHostAddress() +
|
||||
" due to unexpected interrupt");
|
||||
}
|
||||
closeConnection(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1187,8 +1185,7 @@ public abstract class Server {
|
|||
private boolean sentNegotiate = false;
|
||||
private boolean useWrap = false;
|
||||
|
||||
public Connection(SelectionKey key, SocketChannel channel,
|
||||
long lastContact) {
|
||||
public Connection(SocketChannel channel, long lastContact) {
|
||||
this.channel = channel;
|
||||
this.lastContact = lastContact;
|
||||
this.data = null;
|
||||
|
@ -2186,6 +2183,9 @@ public abstract class Server {
|
|||
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
|
||||
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
|
||||
}
|
||||
this.readerPendingConnectionQueue = conf.getInt(
|
||||
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
|
||||
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
|
||||
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
|
||||
this.maxIdleTime = 2 * conf.getInt(
|
||||
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
|
||||
|
|
|
@ -44,12 +44,16 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
|
@ -613,6 +617,148 @@ public class TestIPC {
|
|||
server.stop();
|
||||
}
|
||||
|
||||
private static class TestServerQueue extends Server {
|
||||
final CountDownLatch firstCallLatch = new CountDownLatch(1);
|
||||
final CountDownLatch callBlockLatch = new CountDownLatch(1);
|
||||
|
||||
TestServerQueue(int expectedCalls, int readers, int callQ, int handlers,
|
||||
Configuration conf) throws IOException {
|
||||
super(ADDRESS, 0, LongWritable.class, handlers, readers, callQ, conf, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
|
||||
long receiveTime) throws IOException {
|
||||
firstCallLatch.countDown();
|
||||
try {
|
||||
callBlockLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
return param;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that reader queueing works
|
||||
* @throws BrokenBarrierException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testIpcWithReaderQueuing() throws Exception {
|
||||
// 1 reader, 1 connectionQ slot, 1 callq
|
||||
for (int i=0; i < 10; i++) {
|
||||
checkBlocking(1, 1, 1);
|
||||
}
|
||||
// 4 readers, 5 connectionQ slots, 2 callq
|
||||
for (int i=0; i < 10; i++) {
|
||||
checkBlocking(4, 5, 2);
|
||||
}
|
||||
}
|
||||
|
||||
// goal is to jam a handler with a connection, fill the callq with
|
||||
// connections, in turn jamming the readers - then flood the server and
|
||||
// ensure that the listener blocks when the reader connection queues fill
|
||||
private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
|
||||
int handlers = 1; // makes it easier
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ);
|
||||
|
||||
// send in enough clients to block up the handlers, callq, and readers
|
||||
int initialClients = readers + callQ + handlers;
|
||||
// max connections we should ever end up accepting at once
|
||||
int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener
|
||||
// stress it with 2X the max
|
||||
int clients = maxAccept*2;
|
||||
|
||||
final AtomicInteger failures = new AtomicInteger(0);
|
||||
final CountDownLatch callFinishedLatch = new CountDownLatch(clients);
|
||||
|
||||
// start server
|
||||
final TestServerQueue server =
|
||||
new TestServerQueue(clients, readers, callQ, handlers, conf);
|
||||
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
server.start();
|
||||
|
||||
Client.setConnectTimeout(conf, 10000);
|
||||
|
||||
// instantiate the threads, will start in batches
|
||||
Thread[] threads = new Thread[clients];
|
||||
for (int i=0; i<clients; i++) {
|
||||
threads[i] = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Client client = new Client(LongWritable.class, conf);
|
||||
try {
|
||||
client.call(new LongWritable(Thread.currentThread().getId()),
|
||||
addr, null, null, 60000, conf);
|
||||
} catch (Throwable e) {
|
||||
LOG.error(e);
|
||||
failures.incrementAndGet();
|
||||
return;
|
||||
} finally {
|
||||
callFinishedLatch.countDown();
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// start enough clients to block up the handler, callq, and each reader;
|
||||
// let the calls sequentially slot in to avoid some readers blocking
|
||||
// and others not blocking in the race to fill the callq
|
||||
for (int i=0; i < initialClients; i++) {
|
||||
threads[i].start();
|
||||
if (i==0) {
|
||||
// let first reader block in a call
|
||||
server.firstCallLatch.await();
|
||||
} else if (i <= callQ) {
|
||||
// let subsequent readers jam the callq, will happen immediately
|
||||
while (server.getCallQueueLen() != i) {
|
||||
Thread.sleep(1);
|
||||
}
|
||||
} // additional threads block the readers trying to add to the callq
|
||||
}
|
||||
|
||||
// wait till everything is slotted, should happen immediately
|
||||
Thread.sleep(10);
|
||||
if (server.getNumOpenConnections() < initialClients) {
|
||||
LOG.info("(initial clients) need:"+initialClients+" connections have:"+server.getNumOpenConnections());
|
||||
Thread.sleep(100);
|
||||
}
|
||||
LOG.info("ipc layer should be blocked");
|
||||
assertEquals(callQ, server.getCallQueueLen());
|
||||
assertEquals(initialClients, server.getNumOpenConnections());
|
||||
|
||||
// now flood the server with the rest of the connections, the reader's
|
||||
// connection queues should fill and then the listener should block
|
||||
for (int i=initialClients; i<clients; i++) {
|
||||
threads[i].start();
|
||||
}
|
||||
Thread.sleep(10);
|
||||
if (server.getNumOpenConnections() < maxAccept) {
|
||||
LOG.info("(max clients) need:"+maxAccept+" connections have:"+server.getNumOpenConnections());
|
||||
Thread.sleep(100);
|
||||
}
|
||||
// check a few times to make sure we didn't go over
|
||||
for (int i=0; i<4; i++) {
|
||||
assertEquals(maxAccept, server.getNumOpenConnections());
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
// sanity check that no calls have finished
|
||||
assertEquals(clients, callFinishedLatch.getCount());
|
||||
LOG.info("releasing the calls");
|
||||
server.callBlockLatch.countDown();
|
||||
callFinishedLatch.await();
|
||||
for (Thread t : threads) {
|
||||
t.join();
|
||||
}
|
||||
assertEquals(0, failures.get());
|
||||
server.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a call from a client and verify if header info is changed in server side
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue