HBASE-20499 Replication/Priority executors can use specific max queue length as default value instead of general maxQueueLength
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
a136303833
commit
6d080762ef
|
@ -486,8 +486,12 @@ public abstract class RpcExecutor {
|
||||||
*/
|
*/
|
||||||
public void resizeQueues(Configuration conf) {
|
public void resizeQueues(Configuration conf) {
|
||||||
String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
|
String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH;
|
||||||
if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) {
|
if (name != null) {
|
||||||
configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
|
if (name.toLowerCase(Locale.ROOT).contains("priority")) {
|
||||||
|
configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
|
||||||
|
} else if (name.toLowerCase(Locale.ROOT).contains("replication")) {
|
||||||
|
configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
|
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ public abstract class RpcScheduler {
|
||||||
"hbase.ipc.server.max.callqueue.length";
|
"hbase.ipc.server.max.callqueue.length";
|
||||||
public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH =
|
public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH =
|
||||||
"hbase.ipc.server.priority.max.callqueue.length";
|
"hbase.ipc.server.priority.max.callqueue.length";
|
||||||
|
public static final String IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH =
|
||||||
|
"hbase.ipc.server.replication.max.callqueue.length";
|
||||||
|
|
||||||
/** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
|
/** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
|
||||||
public static abstract class Context {
|
public static abstract class Context {
|
||||||
|
|
|
@ -65,8 +65,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
||||||
|
|
||||||
int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
|
int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
|
||||||
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
||||||
int maxPriorityQueueLength =
|
int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
|
||||||
conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength);
|
priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
||||||
|
int maxReplicationQueueLength =
|
||||||
|
conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
|
||||||
|
replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
||||||
|
|
||||||
this.priority = priority;
|
this.priority = priority;
|
||||||
this.highPriorityLevel = highPriorityLevel;
|
this.highPriorityLevel = highPriorityLevel;
|
||||||
|
@ -94,9 +97,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
||||||
this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
|
this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
|
||||||
"priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
|
"priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
|
||||||
maxPriorityQueueLength, priority, conf, abortable) : null;
|
maxPriorityQueueLength, priority, conf, abortable) : null;
|
||||||
this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
|
this.replicationExecutor =
|
||||||
"replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
|
replicationHandlerCount > 0
|
||||||
maxQueueLength, priority, conf, abortable) : null;
|
? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,
|
||||||
|
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority,
|
||||||
|
conf, abortable)
|
||||||
|
: null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue