HBASE-26576 Allow pluggable queue to be used with the fast path executor or normal balanced executor (#3944)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Richard Marscher 2022-02-18 13:00:10 -05:00 committed by Andrew Purtell
parent 9999f875c9
commit f358125fe5
3 changed files with 49 additions and 5 deletions

View File

@ -84,6 +84,8 @@ public abstract class RpcExecutor {
public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME =
"hbase.ipc.server.callqueue.pluggable.queue.class.name";
public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
"hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";
private LongAdder numGeneralCallsDropped = new LongAdder();
private LongAdder numLifoModeSwitches = new LongAdder();
@ -465,6 +467,11 @@ public abstract class RpcExecutor {
return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
}
public static boolean isPluggableQueueWithFastPath(String callQueueType, Configuration conf) {
return isPluggableQueueType(callQueueType) &&
conf.getBoolean(PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, false);
}
private Optional<Class<? extends BlockingQueue<CallRunner>>> getPluggableQueueClass() {
String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME);

View File

@ -88,7 +88,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
} else {
if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
if (RpcExecutor.isFifoQueueType(callQueueType) ||
RpcExecutor.isCodelQueueType(callQueueType) ||
RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf)) {
callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
maxQueueLength, priority, conf, server);
} else {

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.ClassRule;
@ -274,6 +275,41 @@ public class TestSimpleRpcScheduler {
}
}
@Test
public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception {
String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
Configuration schedConf = HBaseConfiguration.create();
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true);
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
HConstants.QOS_THRESHOLD);
Field f = scheduler.getClass().getDeclaredField("callExecutor");
f.setAccessible(true);
assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor);
}
@Test
public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception {
String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
Configuration schedConf = HBaseConfiguration.create();
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
HConstants.QOS_THRESHOLD);
Field f = scheduler.getClass().getDeclaredField("callExecutor");
f.setAccessible(true);
assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor);
}
@Test
public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
@ -316,9 +352,7 @@ public class TestSimpleRpcScheduler {
testRpcScheduler(queueType, null);
}
private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
throws Exception {
private void testRpcScheduler(final String queueType, final String pluggableQueueClass) throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
@ -388,7 +422,8 @@ public class TestSimpleRpcScheduler {
// -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
assertEquals(530, totalTime);
} else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
} else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) ||
queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)) {
assertEquals(930, totalTime);
}
} finally {