HADOOP-6713. The RPC server Listener thread is a scalability bottleneck. Contributed by Dmytro Molkov.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@938590 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9ad633f011
commit
b212ed99c5
|
@ -76,6 +76,9 @@ Trunk (unreleased changes)
|
|||
HADOOP-6657. Add a capitalization method to StringUtils for MAPREDUCE-1545.
|
||||
(Luke Lu via Steve Loughran)
|
||||
|
||||
HADOOP-6692. Add FileContext#listStatus that returns an iterator.
|
||||
(hairong)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-6283. Improve the exception messages thrown by
|
||||
|
@ -243,8 +246,8 @@ Trunk (unreleased changes)
|
|||
HADOOP-6689. Add directory renaming test to existing FileContext tests.
|
||||
(Eli Collins via suresh)
|
||||
|
||||
HADOOP-6692. Add FileContext#listStatus that returns an iterator.
|
||||
(hairong)
|
||||
HADOOP-6713. The RPC server Listener thread is a scalability bottleneck.
|
||||
(Dmytro Molkov via hairong)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
|
|
|
@ -123,6 +123,9 @@ public class CommonConfigurationKeys {
|
|||
"ipc.server.max.response.size";
|
||||
public static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT =
|
||||
1024*1024;
|
||||
public static final String IPC_SERVER_RPC_READ_THREADS_KEY =
|
||||
"ipc.server.read.threadpool.size";
|
||||
public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
|
||||
/**
|
||||
* How many calls per handler are allowed in the queue.
|
||||
*/
|
||||
|
|
|
@ -51,6 +51,8 @@ import java.util.Random;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import javax.security.sasl.Sasl;
|
||||
import javax.security.sasl.SaslException;
|
||||
|
@ -163,6 +165,7 @@ public abstract class Server {
|
|||
private String bindAddress;
|
||||
private int port; // port we listen on
|
||||
private int handlerCount; // number of handler threads
|
||||
private int readThreads; // number of read threads
|
||||
private Class<? extends Writable> paramClass; // class of call parameters
|
||||
private int maxIdleTime; // the maximum idle time after
|
||||
// which a client may be disconnected
|
||||
|
@ -265,6 +268,8 @@ public abstract class Server {
|
|||
|
||||
private ServerSocketChannel acceptChannel = null; //the accept channel
|
||||
private Selector selector = null; //the selector that we use for the server
|
||||
private Reader[] readers = null;
|
||||
private int currentReader = 0;
|
||||
private InetSocketAddress address; //the address we bind at
|
||||
private Random rand = new Random();
|
||||
private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
|
||||
|
@ -272,6 +277,7 @@ public abstract class Server {
|
|||
private long cleanupInterval = 10000; //the minimum interval between
|
||||
//two cleanup runs
|
||||
private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
|
||||
private ExecutorService readPool;
|
||||
|
||||
public Listener() throws IOException {
|
||||
address = new InetSocketAddress(bindAddress, port);
|
||||
|
@ -284,12 +290,84 @@ public abstract class Server {
|
|||
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
|
||||
// create a selector;
|
||||
selector= Selector.open();
|
||||
readers = new Reader[readThreads];
|
||||
readPool = Executors.newFixedThreadPool(readThreads);
|
||||
for (int i = 0; i < readThreads; i++) {
|
||||
Selector readSelector = Selector.open();
|
||||
Reader reader = new Reader(readSelector);
|
||||
readers[i] = reader;
|
||||
readPool.execute(reader);
|
||||
}
|
||||
|
||||
// Register accepts on the server socket with the selector.
|
||||
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||
this.setName("IPC Server listener on " + port);
|
||||
this.setDaemon(true);
|
||||
}
|
||||
|
||||
private class Reader implements Runnable {
|
||||
private volatile boolean adding = false;
|
||||
private Selector readSelector = null;
|
||||
|
||||
Reader(Selector readSelector) {
|
||||
this.readSelector = readSelector;
|
||||
}
|
||||
public void run() {
|
||||
LOG.info("Starting SocketReader");
|
||||
synchronized (this) {
|
||||
while (running) {
|
||||
SelectionKey key = null;
|
||||
try {
|
||||
readSelector.select();
|
||||
while (adding) {
|
||||
this.wait(1000);
|
||||
}
|
||||
|
||||
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
|
||||
while (iter.hasNext()) {
|
||||
key = iter.next();
|
||||
iter.remove();
|
||||
if (key.isValid()) {
|
||||
if (key.isReadable()) {
|
||||
doRead(key);
|
||||
}
|
||||
}
|
||||
key = null;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
if (running) { // unexpected -- log it
|
||||
LOG.info(getName() + " caught: " +
|
||||
StringUtils.stringifyException(e));
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Error in Reader", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This gets reader into the state that waits for the new channel
|
||||
* to be registered with readSelector. If it was waiting in select()
|
||||
* the thread will be woken up, otherwise whenever select() is called
|
||||
* it will return even if there is nothing to read and wait
|
||||
* in while(adding) for finishAdd call
|
||||
*/
|
||||
public void startAdd() {
|
||||
adding = true;
|
||||
readSelector.wakeup();
|
||||
}
|
||||
|
||||
public synchronized SelectionKey registerChannel(SocketChannel channel)
|
||||
throws IOException {
|
||||
return channel.register(readSelector, SelectionKey.OP_READ);
|
||||
}
|
||||
|
||||
public synchronized void finishAdd() {
|
||||
adding = false;
|
||||
this.notify();
|
||||
}
|
||||
}
|
||||
/** cleanup connections from connectionList. Choose a random range
|
||||
* to scan and also have a limit on the number of the connections
|
||||
* that will be cleanedup per run. The criteria for cleanup is the time
|
||||
|
@ -354,8 +432,6 @@ public abstract class Server {
|
|||
if (key.isValid()) {
|
||||
if (key.isAcceptable())
|
||||
doAccept(key);
|
||||
else if (key.isReadable())
|
||||
doRead(key);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
}
|
||||
|
@ -369,11 +445,6 @@ public abstract class Server {
|
|||
closeCurrentConnection(key, e);
|
||||
cleanupConnections(true);
|
||||
try { Thread.sleep(60000); } catch (Exception ie) {}
|
||||
} catch (InterruptedException e) {
|
||||
if (running) { // unexpected -- log it
|
||||
LOG.info(getName() + " caught: " +
|
||||
StringUtils.stringifyException(e));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
closeCurrentConnection(key, e);
|
||||
}
|
||||
|
@ -416,25 +487,29 @@ public abstract class Server {
|
|||
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
|
||||
Connection c = null;
|
||||
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
||||
// accept up to 10 connections
|
||||
for (int i=0; i<10; i++) {
|
||||
SocketChannel channel = server.accept();
|
||||
if (channel==null) return;
|
||||
SocketChannel channel;
|
||||
while ((channel = server.accept()) != null) {
|
||||
|
||||
channel.configureBlocking(false);
|
||||
channel.socket().setTcpNoDelay(tcpNoDelay);
|
||||
SelectionKey readKey = channel.register(getSelector(),
|
||||
SelectionKey.OP_READ);
|
||||
c = new Connection(readKey, channel, System.currentTimeMillis());
|
||||
readKey.attach(c);
|
||||
synchronized (connectionList) {
|
||||
connectionList.add(numConnections, c);
|
||||
numConnections++;
|
||||
|
||||
Reader reader = getReader();
|
||||
try {
|
||||
reader.startAdd();
|
||||
SelectionKey readKey = reader.registerChannel(channel);
|
||||
c = new Connection(readKey, channel, System.currentTimeMillis());
|
||||
readKey.attach(c);
|
||||
synchronized (connectionList) {
|
||||
connectionList.add(numConnections, c);
|
||||
numConnections++;
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Server connection from " + c.toString() +
|
||||
"; # active connections: " + numConnections +
|
||||
"; # queued calls: " + callQueue.size());
|
||||
} finally {
|
||||
reader.finishAdd();
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Server connection from " + c.toString() +
|
||||
"; # active connections: " + numConnections +
|
||||
"; # queued calls: " + callQueue.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -480,9 +555,16 @@ public abstract class Server {
|
|||
LOG.info(getName() + ":Exception in closing listener socket. " + e);
|
||||
}
|
||||
}
|
||||
readPool.shutdown();
|
||||
}
|
||||
|
||||
synchronized Selector getSelector() { return selector; }
|
||||
// The method that will return the next reader to work with
|
||||
// Simplistic implementation of round robin for now
|
||||
Reader getReader() {
|
||||
currentReader = (currentReader + 1) % readers.length;
|
||||
return readers[currentReader];
|
||||
}
|
||||
}
|
||||
|
||||
// Sends responses of RPC back to clients.
|
||||
|
@ -1344,6 +1426,9 @@ public abstract class Server {
|
|||
this.maxRespSize = conf.getInt(
|
||||
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
|
||||
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
|
||||
this.readThreads = conf.getInt(
|
||||
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
|
||||
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
|
||||
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
|
||||
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
|
||||
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
|
||||
|
|
|
@ -424,6 +424,15 @@ public class TestRPC extends TestCase {
|
|||
// Reset authorization to expect failure
|
||||
conf.set(ACL_CONFIG, "invalid invalid");
|
||||
doRPCs(conf, true);
|
||||
|
||||
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
|
||||
// Expect to succeed
|
||||
conf.set(ACL_CONFIG, "*");
|
||||
doRPCs(conf, false);
|
||||
|
||||
// Reset authorization to expect failure
|
||||
conf.set(ACL_CONFIG, "invalid invalid");
|
||||
doRPCs(conf, true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -432,8 +441,12 @@ public class TestRPC extends TestCase {
|
|||
*/
|
||||
public void testNoPings() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
conf.setBoolean("ipc.client.ping", false);
|
||||
new TestRPC("testnoPings").testCalls(conf);
|
||||
|
||||
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
|
||||
new TestRPC("testnoPings").testCalls(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -466,6 +479,31 @@ public class TestRPC extends TestCase {
|
|||
}
|
||||
}
|
||||
assertTrue(succeeded);
|
||||
|
||||
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
|
||||
|
||||
final Server multiServer = RPC.getServer(TestProtocol.class,
|
||||
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
|
||||
multiServer.enableSecurity();
|
||||
multiServer.start();
|
||||
succeeded = false;
|
||||
final InetSocketAddress mulitServerAddr =
|
||||
NetUtils.getConnectAddress(multiServer);
|
||||
proxy = null;
|
||||
try {
|
||||
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
|
||||
TestProtocol.versionID, mulitServerAddr, conf);
|
||||
} catch (RemoteException e) {
|
||||
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
|
||||
assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
|
||||
succeeded = true;
|
||||
} finally {
|
||||
multiServer.stop();
|
||||
if (proxy != null) {
|
||||
RPC.stopProxy(proxy);
|
||||
}
|
||||
}
|
||||
assertTrue(succeeded);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
|
Loading…
Reference in New Issue