HBASE-15470 Add a setting for Priority queue length

Summary:
Move the config keys to one place
Make Two different config keys. One for default, one for priority

Test Plan: unit tests

Differential Revision: https://reviews.facebook.net/D55575
This commit is contained in:
Elliott Clark 2016-03-16 10:13:46 -07:00
parent ecec35ae4e
commit ac1a7a4a78
4 changed files with 19 additions and 5 deletions

View File

@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger;
* This can be used for HMaster, where no prioritization is needed.
*/
public class FifoRpcScheduler extends RpcScheduler {
private final int handlerCount;
private final int maxQueueLength;
private final AtomicInteger queueSize = new AtomicInteger(0);
@ -40,7 +39,7 @@ public class FifoRpcScheduler extends RpcScheduler {
public FifoRpcScheduler(Configuration conf, int handlerCount) {
this.handlerCount = handlerCount;
this.maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
}

View File

@ -43,6 +43,7 @@ public abstract class RpcExecutor {
private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
protected volatile int currentQueueLimit;
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
@ -219,6 +220,10 @@ public abstract class RpcExecutor {
* @param conf updated configuration
*/
public void resizeQueues(Configuration conf) {
currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit);
String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
if (name != null && name.toLowerCase().contains("priority")) {
configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
}
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
}
}

View File

@ -31,6 +31,11 @@ import java.net.InetSocketAddress;
@InterfaceStability.Evolving
public abstract class RpcScheduler {
public static final String IPC_SERVER_MAX_CALLQUEUE_LENGTH =
"hbase.ipc.server.max.callqueue.length";
public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH =
"hbase.ipc.server.priority.max.callqueue.length";
/** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
static abstract class Context {
public abstract InetSocketAddress getListenerAddress();

View File

@ -166,8 +166,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
PriorityFunction priority,
Abortable server,
int highPriorityLevel) {
int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length",
int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxPriorityQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength);
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
this.abortable = server;
@ -226,7 +230,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
// Create 2 queues to help priorityExecutor be more scalable.
this.priorityExecutor = priorityHandlerCount > 0 ?
new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null;
new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) :
null;
this.replicationExecutor =
replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",