From 7f614ce77eee8b30533c81c7cbeba7b8975eeb6f Mon Sep 17 00:00:00 2001 From: Richard Marscher Date: Mon, 9 Aug 2021 15:54:18 -0400 Subject: [PATCH] HBASE-6908: Add pluggable rpc queue implementation (#3522) Can pass in a FQCN to load as the call queue implementation. Standardized arguments to the constructor are the max queue length, the PriorityFunction, and the Configuration. `PluggableBlockingQueue` abstract class provided to help guide the correct constructor signature Hard fails if the class fails to load as a `BlockingQueue` Signed-off-by: stack --- .../hbase/ipc/PluggableBlockingQueue.java | 55 +++++++ .../hbase/ipc/PluggableRpcQueueNotFound.java | 34 ++++ .../apache/hadoop/hbase/ipc/RpcExecutor.java | 55 ++++++- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 3 +- .../hbase/ipc/TestPluggableQueueImpl.java | 155 ++++++++++++++++++ .../hbase/ipc/TestSimpleRpcScheduler.java | 75 +++++++++ 6 files changed, 372 insertions(+), 5 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableBlockingQueue.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableRpcQueueNotFound.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPluggableQueueImpl.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableBlockingQueue.java new file mode 100644 index 00000000000..0b88b6ccaa7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableBlockingQueue.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.BlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Abstract class template for defining a pluggable blocking queue implementation to be used + * by the 'pluggable' call queue type in the RpcExecutor. + * + * The intention is that the constructor shape helps re-inforce the expected parameters needed + * to match up to how the RpcExecutor will instantiate instances of the queue. + * + * If the implementation class implements the + * {@link org.apache.hadoop.hbase.conf.ConfigurationObserver} interface, it will also be wired + * into configuration changes. + * + * Instantiation requires a constructor with {@code + * final int maxQueueLength, + * final PriorityFunction priority, + * final Configuration conf)} + * as the arguments. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class PluggableBlockingQueue implements BlockingQueue { + protected final int maxQueueLength; + protected final PriorityFunction priority; + protected final Configuration conf; + + public PluggableBlockingQueue(final int maxQueueLength, + final PriorityFunction priority, final Configuration conf) { + this.maxQueueLength = maxQueueLength; + this.priority = priority; + this.conf = conf; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableRpcQueueNotFound.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableRpcQueueNotFound.java new file mode 100644 index 00000000000..dade53c7a12 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableRpcQueueNotFound.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Internal runtime error type to indicate the RpcExecutor failed to execute a `pluggable` + * call queue type. Either the FQCN for the class was missing in Configuration, not found on the + * classpath, or is not a subtype of {@code BlockingQueue} + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class PluggableRpcQueueNotFound extends RuntimeException { + public PluggableRpcQueueNotFound(String message) { + super(message); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 3de5fa1cbf9..db512340fdb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; @@ -33,6 +34,7 @@ import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -67,6 +69,7 @@ public abstract class RpcExecutor { public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel"; public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; + public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable"; public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE; @@ -79,6 +82,9 @@ public abstract class RpcExecutor { public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100; public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8; + public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME = + "hbase.ipc.server.callqueue.pluggable.queue.class.name"; + private LongAdder numGeneralCallsDropped = new LongAdder(); private LongAdder numLifoModeSwitches = new LongAdder(); @@ -150,13 +156,23 @@ public abstract class RpcExecutor { int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL); double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); - queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval, + this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; - queueClass = AdaptiveLifoCoDelCallQueue.class; + this.queueClass = AdaptiveLifoCoDelCallQueue.class; + } else if (isPluggableQueueType(callQueueType)) { + Optional>> pluggableQueueClass = getPluggableQueueClass(); + + if (!pluggableQueueClass.isPresent()) { + throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call" + + " queue type required"); + } else { + this.queueInitArgs = new Object[] { maxQueueLength, this.priority, conf }; + this.queueClass = pluggableQueueClass.get(); + } } else { this.name += ".Fifo"; - queueInitArgs = new Object[] { maxQueueLength }; - queueClass = LinkedBlockingQueue.class; + this.queueInitArgs = new Object[] { maxQueueLength }; + this.queueClass = LinkedBlockingQueue.class; } LOG.info("Instantiated {} with queueClass={}; " + @@ -445,6 +461,35 @@ public abstract class RpcExecutor { return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE); } + public static boolean isPluggableQueueType(String callQueueType) { + return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE); + } + + private Optional>> getPluggableQueueClass() { + String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME); + + if (queueClassName == null) { + LOG.error("Pluggable queue class config at " + PLUGGABLE_CALL_QUEUE_CLASS_NAME + + " was not found"); + return Optional.empty(); + } + + try { + Class clazz = Class.forName(queueClassName); + + if (BlockingQueue.class.isAssignableFrom(clazz)) { + return Optional.of((Class>) clazz); + } else { + LOG.error("Pluggable Queue class " + queueClassName + + " does not extend BlockingQueue"); + return Optional.empty(); + } + } catch (ClassNotFoundException exception) { + LOG.error("Could not find " + queueClassName + " on the classpath to load."); + return Optional.empty(); + } + } + public long getNumGeneralCallsDropped() { return numGeneralCallsDropped.longValue(); } @@ -522,6 +567,8 @@ public abstract class RpcExecutor { if (queue instanceof AdaptiveLifoCoDelCallQueue) { ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval, codelLifoThreshold); + } else if (queue instanceof ConfigurationObserver) { + ((ConfigurationObserver)queue).onConfigurationChange(conf); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index b4bc347c828..3b7c0bb1a24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -154,7 +154,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT); - if (RpcExecutor.isCodelQueueType(callQueueType)) { + if (RpcExecutor.isCodelQueueType(callQueueType) || + RpcExecutor.isPluggableQueueType(callQueueType)) { callExecutor.onConfigurationChange(conf); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPluggableQueueImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPluggableQueueImpl.java new file mode 100644 index 00000000000..eeb057cbeaf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPluggableQueueImpl.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; + +/** + * Implementation of the PluggableBlockingQueue abstract class. + * + * Used to verify that the pluggable call queue type for the RpcExecutor can load correctly + * via the FQCN reflection semantics. + */ +public class TestPluggableQueueImpl extends PluggableBlockingQueue implements + ConfigurationObserver { + + private final BoundedPriorityBlockingQueue inner; + private static boolean configurationRecentlyChanged = false; + + public TestPluggableQueueImpl(int maxQueueLength, PriorityFunction priority, Configuration conf) { + super(maxQueueLength, priority, conf); + Comparator comparator = Comparator.comparingInt(r -> r.getRpcCall().getPriority()); + inner = new BoundedPriorityBlockingQueue<>(maxQueueLength, comparator); + configurationRecentlyChanged = false; + } + + @Override public boolean add(CallRunner callRunner) { + return inner.add(callRunner); + } + + @Override public boolean offer(CallRunner callRunner) { + return inner.offer(callRunner); + } + + @Override public CallRunner remove() { + return inner.remove(); + } + + @Override public CallRunner poll() { + return inner.poll(); + } + + @Override public CallRunner element() { + return inner.element(); + } + + @Override public CallRunner peek() { + return inner.peek(); + } + + @Override public void put(CallRunner callRunner) throws InterruptedException { + inner.put(callRunner); + } + + @Override public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit) + throws InterruptedException { + return inner.offer(callRunner, timeout, unit); + } + + @Override public CallRunner take() throws InterruptedException { + return inner.take(); + } + + @Override public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException { + return inner.poll(timeout, unit); + } + + @Override public int remainingCapacity() { + return inner.remainingCapacity(); + } + + @Override public boolean remove(Object o) { + return inner.remove(o); + } + + @Override public boolean containsAll(Collection c) { + return inner.containsAll(c); + } + + @Override public boolean addAll(Collection c) { + return inner.addAll(c); + } + + @Override public boolean removeAll(Collection c) { + return inner.removeAll(c); + } + + @Override public boolean retainAll(Collection c) { + return inner.retainAll(c); + } + + @Override public void clear() { + inner.clear(); + } + + @Override public int size() { + return inner.size(); + } + + @Override public boolean isEmpty() { + return inner.isEmpty(); + } + + @Override public boolean contains(Object o) { + return inner.contains(o); + } + + @Override public Iterator iterator() { + return inner.iterator(); + } + + @Override public Object[] toArray() { + return inner.toArray(); + } + + @Override public T[] toArray(T[] a) { + return inner.toArray(a); + } + + @Override public int drainTo(Collection c) { + return inner.drainTo(c); + } + + @Override public int drainTo(Collection c, int maxElements) { + return inner.drainTo(c, maxElements); + } + + public static boolean hasObservedARecentConfigurationChange() { + return configurationRecentlyChanged; + } + + @Override public void onConfigurationChange(Configuration conf) { + configurationRecentlyChanged = true; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index f791421df7b..7d32f35fa44 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -247,10 +248,84 @@ public class TestSimpleRpcScheduler { testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); } + @Test + public void testPluggableRpcQueue() throws Exception { + testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, + "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); + + try { + testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, + "MissingClass"); + fail("Expected a PluggableRpcQueueNotFound for unloaded class"); + } catch (PluggableRpcQueueNotFound e) { + // expected + } catch (Exception e) { + fail("Expected a PluggableRpcQueueNotFound for unloaded class, but instead got " + e); + } + + try { + testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE, + "org.apache.hadoop.hbase.ipc.SimpleRpcServer"); + fail("Expected a PluggableRpcQueueNotFound for incompatible class"); + } catch (PluggableRpcQueueNotFound e) { + // expected + } catch (Exception e) { + fail("Expected a PluggableRpcQueueNotFound for incompatible class, but instead got " + e); + } + } + + @Test + public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception { + + Configuration schedConf = HBaseConfiguration.create(); + + schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2); + schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); + schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, + RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE); + schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, + "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl"); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, + HConstants.QOS_THRESHOLD); + try { + scheduler.start(); + + CallRunner putCallTask = mock(CallRunner.class); + ServerCall putCall = mock(ServerCall.class); + putCall.param = RequestConverter.buildMutateRequest( + Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); + RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); + when(putCallTask.getRpcCall()).thenReturn(putCall); + when(putCall.getHeader()).thenReturn(putHead); + + assertTrue(scheduler.dispatch(putCallTask)); + + schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4); + scheduler.onConfigurationChange(schedConf); + assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange()); + waitUntilQueueEmpty(scheduler); + } finally { + scheduler.stop(); + } + } + private void testRpcScheduler(final String queueType) throws Exception { + testRpcScheduler(queueType, null); + } + + private void testRpcScheduler(final String queueType, final String pluggableQueueClass) + throws Exception { + Configuration schedConf = HBaseConfiguration.create(); schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType); + if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) { + schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, pluggableQueueClass); + } + PriorityFunction priority = mock(PriorityFunction.class); when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);