HADOOP-15253. Should update maxQueueSize when refresh call queue. Contributed by Tao Jie.

This commit is contained in:
Konstantin V Shvachko 2018-03-30 18:55:35 -07:00
parent 659074728e
commit acfd764fcc
2 changed files with 18 additions and 2 deletions

View File

@ -657,6 +657,9 @@ public abstract class Server {
public synchronized void refreshCallQueue(Configuration conf) { public synchronized void refreshCallQueue(Configuration conf) {
// Create the next queue // Create the next queue
String prefix = getQueueClassPrefix(); String prefix = getQueueClassPrefix();
this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
callQueue.swapQueue(getSchedulerClass(prefix, conf), callQueue.swapQueue(getSchedulerClass(prefix, conf),
getQueueClass(prefix, conf), maxQueueSize, prefix, conf); getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
} }

View File

@ -30,8 +30,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer; import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
@ -141,8 +143,16 @@ public class TestRefreshCallQueue {
// throw an error when we double-initialize JvmMetrics // throw an error when we double-initialize JvmMetrics
DefaultMetricsSystem.setMiniClusterMode(false); DefaultMetricsSystem.setMiniClusterMode(false);
int serviceHandlerCount = config.getInt(
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc(); NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc();
// check callqueue size
assertEquals(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT
* serviceHandlerCount, rpcServer.getClientRpcServer().getMaxQueueSize());
// Replace queue and update queue size
config.setInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
150);
try { try {
rpcServer.getClientRpcServer().refreshCallQueue(config); rpcServer.getClientRpcServer().refreshCallQueue(config);
} catch (Exception e) { } catch (Exception e) {
@ -158,6 +168,9 @@ public class TestRefreshCallQueue {
} finally { } finally {
DefaultMetricsSystem.setMiniClusterMode(oldValue); DefaultMetricsSystem.setMiniClusterMode(oldValue);
} }
// check callQueueSize has changed
assertEquals(150 * serviceHandlerCount, rpcServer.getClientRpcServer()
.getMaxQueueSize());
} }
} }