diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index a2237bf530d..1d5292c5f14 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -656,6 +656,9 @@ public abstract class Server { public synchronized void refreshCallQueue(Configuration conf) { // Create the next queue String prefix = getQueueClassPrefix(); + this.maxQueueSize = handlerCount * conf.getInt( + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); callQueue.swapQueue(getSchedulerClass(prefix, conf), getQueueClass(prefix, conf), maxQueueSize, prefix, conf); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java index d5eb9cfc484..faac1c0acaa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java @@ -30,8 +30,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer; import org.apache.hadoop.hdfs.tools.DFSAdmin; @@ -141,8 +143,16 @@ public class TestRefreshCallQueue { // throw an error when we double-initialize JvmMetrics DefaultMetricsSystem.setMiniClusterMode(false); - + int serviceHandlerCount = config.getInt( + DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, + DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc(); + // check callqueue size + assertEquals(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT + * serviceHandlerCount, rpcServer.getClientRpcServer().getMaxQueueSize()); + // Replace queue and update queue size + config.setInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, + 150); try { rpcServer.getClientRpcServer().refreshCallQueue(config); } catch (Exception e) { @@ -158,6 +168,9 @@ public class TestRefreshCallQueue { } finally { DefaultMetricsSystem.setMiniClusterMode(oldValue); } - } + // check callQueueSize has changed + assertEquals(150 * serviceHandlerCount, rpcServer.getClientRpcServer() + .getMaxQueueSize()); + } } \ No newline at end of file