From 1145581489d0797f813dc396fdbfa47b6a3bcf28 Mon Sep 17 00:00:00 2001 From: Richard Marscher Date: Fri, 18 Feb 2022 13:00:10 -0500 Subject: [PATCH] HBASE-26576 Allow pluggable queue to be used with the fast path executor or normal balanced executor (#3944) Signed-off-by: Andrew Purtell --- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 7 +++ .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 4 +- .../hbase/ipc/TestSimpleRpcScheduler.java | 43 +++++++++++++++++-- 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index a215d9e7657..81e9f5310a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -82,6 +82,8 @@ public abstract class RpcExecutor { public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME = "hbase.ipc.server.callqueue.pluggable.queue.class.name"; + public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED = + "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled"; private LongAdder numGeneralCallsDropped = new LongAdder(); private LongAdder numLifoModeSwitches = new LongAdder(); @@ -380,6 +382,11 @@ public abstract class RpcExecutor { return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE); } + public static boolean isPluggableQueueWithFastPath(String callQueueType, Configuration conf) { + return isPluggableQueueType(callQueueType) && + conf.getBoolean(PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, false); + } + private Optional>> getPluggableQueueClass() { String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 1b8887a3ed8..be0d531345a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -88,7 +88,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), maxQueueLength, priority, conf, server); } else { - if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) { + if (RpcExecutor.isFifoQueueType(callQueueType) || + RpcExecutor.isCodelQueueType(callQueueType) || + RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf)) { callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount, maxQueueLength, priority, conf, server); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 286094c5bcc..3f05e861e3b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; import org.junit.Before; import org.junit.ClassRule; @@ -274,6 +275,41 @@ public class TestSimpleRpcScheduler { } } + @Test + public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception { + String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; + Configuration schedConf = HBaseConfiguration.create(); + schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); + schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); + schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, + HConstants.QOS_THRESHOLD); + + Field f = scheduler.getClass().getDeclaredField("callExecutor"); + f.setAccessible(true); + assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor); + } + + @Test + public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception { + String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE; + Configuration schedConf = HBaseConfiguration.create(); + schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); + schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, + HConstants.QOS_THRESHOLD); + + Field f = scheduler.getClass().getDeclaredField("callExecutor"); + f.setAccessible(true); + assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor); + } + @Test public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception { @@ -316,9 +352,7 @@ public class TestSimpleRpcScheduler { testRpcScheduler(queueType, null); } - private void testRpcScheduler(final String queueType, final String pluggableQueueClass) - throws Exception { - + private void testRpcScheduler(final String queueType, final String pluggableQueueClass) throws Exception { Configuration schedConf = HBaseConfiguration.create(); schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); @@ -388,7 +422,8 @@ public class TestSimpleRpcScheduler { // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { assertEquals(530, totalTime); - } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) { + } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) || + queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)) { assertEquals(930, totalTime); } } finally {