diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index ef163640e51..e00ca6a991c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -438,8 +438,12 @@ public abstract class RpcExecutor { */ public void resizeQueues(Configuration conf) { String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; - if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) { - configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; + if (name != null) { + if (name.toLowerCase(Locale.ROOT).contains("priority")) { + configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; + } else if (name.toLowerCase(Locale.ROOT).contains("replication")) { + configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH; + } } final int queueLimit = currentQueueLimit; currentQueueLimit = conf.getInt(configKey, queueLimit); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 82ddcaaf5ea..ace61015ca6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -71,6 +71,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + int maxReplicationQueueLength = + conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, + replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); @@ -116,7 +119,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs } this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount, - RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxQueueLength, priority, conf, abortable) + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf, + abortable) : null; this.metaTransitionExecutor = metaTransitionHandler > 0