diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index e4205ebd814..350522187c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) @InterfaceStability.Evolving public class BalancedQueueRpcExecutor extends RpcExecutor { + private static final Log LOG = LogFactory.getLog(BalancedQueueRpcExecutor.class); protected final List> queues; private final QueueBalancer balancer; @@ -62,6 +65,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { queues = new ArrayList>(numQueues); this.balancer = getBalancer(numQueues); initializeQueues(numQueues, queueClass, initargs); + LOG.debug(name + " queues=" + numQueues + " handlerCount=" + handlerCount); } protected void initializeQueues(final int numQueues, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index ee36f3f0c34..70d903a6261 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.ipc; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DaemonThreadFactory; @@ -32,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; * This can be used for HMaster, where no prioritization is needed. */ public class FifoRpcScheduler extends RpcScheduler { + private static final Log LOG = LogFactory.getLog(FifoRpcScheduler.class); private final int handlerCount; private final int maxQueueLength; private final AtomicInteger queueSize = new AtomicInteger(0); @@ -41,6 +44,8 @@ public class FifoRpcScheduler extends RpcScheduler { this.handlerCount = handlerCount; this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + LOG.info("Using " + this.getClass().getSimpleName() + " as user call queue; handlerCount=" + + handlerCount + "; maxQueueLength=" + maxQueueLength); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 431aeebf766..d9d61c1971e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -34,8 +34,11 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; /** - * A scheduler that maintains isolated handler pools for general, - * high-priority, and replication requests. + * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'), + * high-priority ('priority'), and replication ('replication') requests. Default behavior is to + * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc. + * See below article for explanation of options. + * @see Overview on Request Queuing */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving @@ -49,7 +52,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; - /** If set to 'deadline', uses a priority queue and deprioritize long-running scans */ + /** If set to 'deadline', the default, uses a priority queue and deprioritizes long-running scans + */ public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel"; public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; @@ -190,54 +194,58 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor)); - - LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues); - + LOG.info("Using " + callQueueType + " as user call queue; numCallQueues=" + numCallQueues + + "; callQReadShare=" + callqReadShare + ", callQScanShare=" + callqScanShare); if (numCallQueues > 1 && callqReadShare > 0) { // multiple read/write queues - if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + if (isDeadlineQueueType(callQueueType)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, + callExecutor = new RWQueueRpcExecutor("RWQ.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, maxQueueLength, conf, abortable, BoundedPriorityBlockingQueue.class, callPriority); } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) { Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches}; - callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, + callExecutor = new RWQueueRpcExecutor("RWQ.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs, AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs); } else { - callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, + callExecutor = new RWQueueRpcExecutor("RWQ.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, maxQueueLength, conf, abortable); } } else { // multiple queues - if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + if (isDeadlineQueueType(callQueueType)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, - conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); + callExecutor = + new BalancedQueueRpcExecutor("BalancedQ.default", handlerCount, numCallQueues, + conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) { - callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, - conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength, - codelTargetDelay, codelInterval, codelLifoThreshold, - numGeneralCallsDropped, numLifoModeSwitches); + callExecutor = + new BalancedQueueRpcExecutor("BalancedQ.default", handlerCount, numCallQueues, + conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength, + codelTargetDelay, codelInterval, codelLifoThreshold, + numGeneralCallsDropped, numLifoModeSwitches); } else { - callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, + callExecutor = new BalancedQueueRpcExecutor("BalancedQ.default", handlerCount, numCallQueues, maxQueueLength, conf, abortable); } } - // Create 2 queues to help priorityExecutor be more scalable. this.priorityExecutor = priorityHandlerCount > 0 ? - new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) : - null; - + new BalancedQueueRpcExecutor("BalancedQ.priority", priorityHandlerCount, 2, + maxPriorityQueueLength): + null; this.replicationExecutor = - replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication", + replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("BalancedQ.replication", replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; } + private static boolean isDeadlineQueueType(final String callQueueType) { + return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); + } + public SimpleRpcScheduler( Configuration conf, int handlerCount, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FifoRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FifoRpcSchedulerFactory.java new file mode 100644 index 00000000000..f4b51ba3808 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FifoRpcSchedulerFactory.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcScheduler; + +/** + * Factory to use when you want to use the {@link FifoRpcScheduler} + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FifoRpcSchedulerFactory implements RpcSchedulerFactory { + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + return new FifoRpcScheduler(conf, handlerCount); + } + + @Deprecated + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority) { + return create(conf, priority, null); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java index f5547816370..7bc59daf884 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.ipc.RpcScheduler; @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving public interface RpcSchedulerFactory { - /** * Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}. */ @@ -39,5 +38,4 @@ public interface RpcSchedulerFactory { @Deprecated RpcScheduler create(Configuration conf, PriorityFunction priority); - -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java index 743c5bb89ad..92462c845bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java @@ -27,11 +27,11 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; -/** Constructs a {@link SimpleRpcScheduler}. */ +/** Constructs a {@link SimpleRpcScheduler}. + */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory { - @Override @Deprecated public RpcScheduler create(Configuration conf, PriorityFunction priority) { @@ -42,7 +42,6 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory { public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); - return new SimpleRpcScheduler( conf, handlerCount, @@ -54,5 +53,4 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory { server, HConstants.QOS_THRESHOLD); } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRpcSchedulerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRpcSchedulerFactory.java new file mode 100644 index 00000000000..9366c549597 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRpcSchedulerFactory.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; + +/** + * A silly test that does nothing but make sure an rpcscheduler factory makes what it says + * it is going to make. + */ +@Category(SmallTests.class) +public class TestRpcSchedulerFactory { + @Rule public TestName testName = new TestName(); + @ClassRule public static TestRule timeout = + CategoryBasedTimeout.forClass(TestRpcSchedulerFactory.class); + private Configuration conf; + + @Before + public void setUp() throws Exception { + this.conf = HBaseConfiguration.create(); + } + + @Test + public void testRWQ() { + // Set some configs just to see how it changes the scheduler. Can't assert the settings had + // an effect. Just eyeball the log. + this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5); + this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5); + this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5); + RpcSchedulerFactory factory = new SimpleRpcSchedulerFactory(); + RpcScheduler rpcScheduler = factory.create(this.conf, null, null); + assertTrue(rpcScheduler.getClass().equals(SimpleRpcScheduler.class)); + } + + @Test + public void testFifo() { + RpcSchedulerFactory factory = new FifoRpcSchedulerFactory(); + RpcScheduler rpcScheduler = factory.create(this.conf, null, null); + assertTrue(rpcScheduler.getClass().equals(FifoRpcScheduler.class)); + } +} \ No newline at end of file