From 3a43e5930baa4f1ad97a45fff3c7a1800fc11649 Mon Sep 17 00:00:00 2001 From: Hairong Kuang Date: Thu, 25 Nov 2010 04:36:17 +0000 Subject: [PATCH] HADOOP-6764. Add number of reader threads and queue length as configuration parameters in RPC.getServer. Contributed by Dmytro Molkov. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1038918 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 5 ++- .../org/apache/hadoop/ipc/AvroRpcEngine.java | 7 +-- src/java/org/apache/hadoop/ipc/RPC.java | 23 ++++++++-- src/java/org/apache/hadoop/ipc/RpcEngine.java | 3 +- src/java/org/apache/hadoop/ipc/Server.java | 45 ++++++++++++++----- .../apache/hadoop/ipc/WritableRpcEngine.java | 12 ++--- .../core/org/apache/hadoop/ipc/TestRPC.java | 26 +++++++++++ 7 files changed, 97 insertions(+), 24 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 3ac3baf1d92..f2ecd31fb58 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,7 +15,10 @@ Trunk (unreleased changes) improve other messaging. (nigel) HADOOP-7001. Configuration changes can occur via the Reconfigurable - interface. (Patrick Kline via dhruba) + interface. (Patrick Kling via dhruba) + + HADOOP-6764. Add number of reader threads and queue length as + configuration parameters in RPC.getServer. (Dmytro Molkov via hairong) OPTIMIZATIONS diff --git a/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java b/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java index 3944464b95a..68700bcc26d 100644 --- a/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java +++ b/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java @@ -211,14 +211,15 @@ public class AvroRpcEngine implements RpcEngine { /** Construct a server for a protocol implementation instance listening on a * port and address. */ public RPC.Server getServer(Class iface, Object impl, String bindAddress, - int port, int numHandlers, boolean verbose, + int port, int numHandlers, int numReaders, + int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager secretManager ) throws IOException { return ENGINE.getServer(TunnelProtocol.class, new TunnelResponder(iface, impl), - bindAddress, port, numHandlers, verbose, conf, - secretManager); + bindAddress, port, numHandlers, numReaders, + queueSizePerHandler, verbose, conf, secretManager); } } diff --git a/src/java/org/apache/hadoop/ipc/RPC.java b/src/java/org/apache/hadoop/ipc/RPC.java index 4ffdbe08c38..6c51f6659ee 100644 --- a/src/java/org/apache/hadoop/ipc/RPC.java +++ b/src/java/org/apache/hadoop/ipc/RPC.java @@ -380,18 +380,33 @@ public class RPC { throws IOException { return getProtocolEngine(protocol, conf) - .getServer(protocol, instance, bindAddress, port, numHandlers, verbose, - conf, secretManager); + .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1, + verbose, conf, secretManager); + } + + /** Construct a server for a protocol implementation instance. */ + public static Server getServer(Class protocol, + Object instance, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, Configuration conf, + SecretManager secretManager) + throws IOException { + + return getProtocolEngine(protocol, conf) + .getServer(protocol, instance, bindAddress, port, numHandlers, + numReaders, queueSizePerHandler, verbose, conf, secretManager); } /** An RPC Server. */ public abstract static class Server extends org.apache.hadoop.ipc.Server { protected Server(String bindAddress, int port, - Class paramClass, int handlerCount, + Class paramClass, int handlerCount, + int numReaders, int queueSizePerHandler, Configuration conf, String serverName, SecretManager secretManager) throws IOException { - super(bindAddress, port, paramClass, handlerCount, conf, serverName, secretManager); + super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, + conf, serverName, secretManager); } } diff --git a/src/java/org/apache/hadoop/ipc/RpcEngine.java b/src/java/org/apache/hadoop/ipc/RpcEngine.java index d738969abb3..9af9abe31ff 100644 --- a/src/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/src/java/org/apache/hadoop/ipc/RpcEngine.java @@ -50,7 +50,8 @@ public interface RpcEngine { /** Construct a server for a protocol implementation instance. */ RPC.Server getServer(Class protocol, Object instance, String bindAddress, - int port, int numHandlers, boolean verbose, + int port, int numHandlers, int numReaders, + int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager secretManager ) throws IOException; diff --git a/src/java/org/apache/hadoop/ipc/Server.java b/src/java/org/apache/hadoop/ipc/Server.java index 01d76d886ae..afdcdb2b0dc 100644 --- a/src/java/org/apache/hadoop/ipc/Server.java +++ b/src/java/org/apache/hadoop/ipc/Server.java @@ -1451,16 +1451,18 @@ public abstract class Server { Configuration conf) throws IOException { - this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port), null); + this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null); } + /** Constructs a server listening on the named port and address. Parameters passed must * be of the named class. The handlerCount determines * the number of handler threads that will be used to process calls. - * + * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters + * from configuration. Otherwise the configuration will be picked up. */ @SuppressWarnings("unchecked") protected Server(String bindAddress, int port, - Class paramClass, int handlerCount, + Class paramClass, int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf, String serverName, SecretManager secretManager) throws IOException { this.bindAddress = bindAddress; @@ -1469,15 +1471,23 @@ public abstract class Server { this.paramClass = paramClass; this.handlerCount = handlerCount; this.socketSendBufferSize = 0; - this.maxQueueSize = handlerCount * conf.getInt( - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + if (queueSizePerHandler != -1) { + this.maxQueueSize = queueSizePerHandler; + } else { + this.maxQueueSize = handlerCount * conf.getInt( + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + } this.maxRespSize = conf.getInt( CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT); - this.readThreads = conf.getInt( - CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, - CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + if (numReaders != -1) { + this.readThreads = numReaders; + } else { + this.readThreads = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + } this.callQueue = new LinkedBlockingQueue(maxQueueSize); this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); @@ -1691,7 +1701,22 @@ public abstract class Server { return callQueue.size(); } - + /** + * The maximum size of the rpc call queue of this server. + * @return The maximum size of the rpc call queue. + */ + public int getMaxQueueSize() { + return maxQueueSize; + } + + /** + * The number of reader threads for this server. + * @return The number of reader threads. + */ + public int getNumReaders() { + return readThreads; + } + /** * When the read or write buffer size is larger than this limit, i/o will be * done in chunks of this size. Most RPC requests and responses would be diff --git a/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 08edee97cf2..87b9c532caa 100644 --- a/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -285,11 +285,12 @@ public class WritableRpcEngine implements RpcEngine { * port and address. */ public Server getServer(Class protocol, Object instance, String bindAddress, int port, - int numHandlers, boolean verbose, Configuration conf, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, Configuration conf, SecretManager secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, - verbose, secretManager); + numReaders, queueSizePerHandler, verbose, secretManager); } /** An RPC Server. */ @@ -305,7 +306,7 @@ public class WritableRpcEngine implements RpcEngine { */ public Server(Object instance, Configuration conf, String bindAddress, int port) throws IOException { - this(instance, conf, bindAddress, port, 1, false, null); + this(instance, conf, bindAddress, port, 1, -1, -1, false, null); } private static String classNameBase(String className) { @@ -325,10 +326,11 @@ public class WritableRpcEngine implements RpcEngine { * @param verbose whether each call should be logged */ public Server(Object instance, Configuration conf, String bindAddress, int port, - int numHandlers, boolean verbose, + int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, SecretManager secretManager) throws IOException { - super(bindAddress, port, Invocation.class, numHandlers, conf, + super(bindAddress, port, Invocation.class, numHandlers, numReaders, + queueSizePerHandler, conf, classNameBase(instance.getClass().getName()), secretManager); this.instance = instance; this.verbose = verbose; diff --git a/src/test/core/org/apache/hadoop/ipc/TestRPC.java b/src/test/core/org/apache/hadoop/ipc/TestRPC.java index 9ca6a6e9361..0267b3dcd79 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestRPC.java +++ b/src/test/core/org/apache/hadoop/ipc/TestRPC.java @@ -190,6 +190,28 @@ public class TestRPC extends TestCase { } } } + + public void testConfRpc() throws Exception { + Server server = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, 1, false, conf, null); + // Just one handler + int confQ = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + assertEquals(confQ, server.getMaxQueueSize()); + + int confReaders = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + assertEquals(confReaders, server.getNumReaders()); + server.stop(); + + server = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, 1, 3, 200, false, conf, null); + assertEquals(3, server.getNumReaders()); + assertEquals(200, server.getMaxQueueSize()); + server.stop(); + } public void testSlowRpc() throws Exception { System.out.println("Testing Slow RPC"); @@ -234,6 +256,10 @@ public class TestRPC extends TestCase { System.out.println("Down slow rpc testing"); } } + + public void testRPCConf(Configuration conf) throws Exception { + + } public void testCalls(Configuration conf) throws Exception { Server server = RPC.getServer(TestProtocol.class,