HBASE-20499 Replication/Priority executors can use specific max queue length as default value instead of general maxQueueLength

Signed-off-by: tedyu <yuzhihong@gmail.com>
(cherry picked from commit 6d080762ef)
This commit is contained in:
Nihal Jain 2018-04-27 14:13:57 +05:30 committed by Duo Zhang
parent 642e933cda
commit eb73492eac
2 changed files with 11 additions and 3 deletions

View File

@ -438,8 +438,12 @@ public abstract class RpcExecutor {
*/ */
public void resizeQueues(Configuration conf) { public void resizeQueues(Configuration conf) {
String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) { if (name != null) {
configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; if (name.toLowerCase(Locale.ROOT).contains("priority")) {
configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
} else if (name.toLowerCase(Locale.ROOT).contains("replication")) {
configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH;
}
} }
final int queueLimit = currentQueueLimit; final int queueLimit = currentQueueLimit;
currentQueueLimit = conf.getInt(configKey, queueLimit); currentQueueLimit = conf.getInt(configKey, queueLimit);

View File

@ -71,6 +71,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxReplicationQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
@ -116,7 +119,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
} }
this.replicationExecutor = replicationHandlerCount > 0 this.replicationExecutor = replicationHandlerCount > 0
? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount, ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxQueueLength, priority, conf, abortable) RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf,
abortable)
: null; : null;
this.metaTransitionExecutor = metaTransitionHandler > 0 this.metaTransitionExecutor = metaTransitionHandler > 0