HBASE-15994 Allow selection of RpcSchedulers
Adds logging by the RpcExecutors of their run configs Adds a FifoRpcSchedulerFactory so you can try Fifo scheduler.
This commit is contained in:
parent
407aa4d496
commit
031b745001
|
@ -22,6 +22,8 @@ import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
|
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class BalancedQueueRpcExecutor extends RpcExecutor {
|
public class BalancedQueueRpcExecutor extends RpcExecutor {
|
||||||
|
private static final Log LOG = LogFactory.getLog(BalancedQueueRpcExecutor.class);
|
||||||
|
|
||||||
protected final List<BlockingQueue<CallRunner>> queues;
|
protected final List<BlockingQueue<CallRunner>> queues;
|
||||||
private final QueueBalancer balancer;
|
private final QueueBalancer balancer;
|
||||||
|
@ -62,6 +65,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
|
||||||
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
|
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
|
||||||
this.balancer = getBalancer(numQueues);
|
this.balancer = getBalancer(numQueues);
|
||||||
initializeQueues(numQueues, queueClass, initargs);
|
initializeQueues(numQueues, queueClass, initargs);
|
||||||
|
LOG.debug(name + " queues=" + numQueues + " handlerCount=" + handlerCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void initializeQueues(final int numQueues,
|
protected void initializeQueues(final int numQueues,
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||||
|
|
||||||
|
@ -32,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
* This can be used for HMaster, where no prioritization is needed.
|
* This can be used for HMaster, where no prioritization is needed.
|
||||||
*/
|
*/
|
||||||
public class FifoRpcScheduler extends RpcScheduler {
|
public class FifoRpcScheduler extends RpcScheduler {
|
||||||
|
private static final Log LOG = LogFactory.getLog(FifoRpcScheduler.class);
|
||||||
private final int handlerCount;
|
private final int handlerCount;
|
||||||
private final int maxQueueLength;
|
private final int maxQueueLength;
|
||||||
private final AtomicInteger queueSize = new AtomicInteger(0);
|
private final AtomicInteger queueSize = new AtomicInteger(0);
|
||||||
|
@ -41,6 +44,8 @@ public class FifoRpcScheduler extends RpcScheduler {
|
||||||
this.handlerCount = handlerCount;
|
this.handlerCount = handlerCount;
|
||||||
this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
|
this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
|
||||||
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
|
||||||
|
LOG.info("Using " + this.getClass().getSimpleName() + " as user call queue; handlerCount=" +
|
||||||
|
handlerCount + "; maxQueueLength=" + maxQueueLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -34,8 +34,11 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A scheduler that maintains isolated handler pools for general,
|
* The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
|
||||||
* high-priority, and replication requests.
|
* high-priority ('priority'), and replication ('replication') requests. Default behavior is to
|
||||||
|
* balance the requests across handlers. Add configs to enable balancing by read vs writes, etc.
|
||||||
|
* See below article for explanation of options.
|
||||||
|
* @see <a href="http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview on Request Queuing</a>
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
|
@ -49,7 +52,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
||||||
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
|
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
|
||||||
"hbase.ipc.server.callqueue.handler.factor";
|
"hbase.ipc.server.callqueue.handler.factor";
|
||||||
|
|
||||||
/** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
|
/** If set to 'deadline', the default, uses a priority queue and deprioritizes long-running scans
|
||||||
|
*/
|
||||||
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
||||||
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
||||||
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
||||||
|
@ -190,54 +194,58 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
||||||
|
|
||||||
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
|
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
|
||||||
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
|
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
|
||||||
|
LOG.info("Using " + callQueueType + " as user call queue; numCallQueues=" + numCallQueues +
|
||||||
LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
|
"; callQReadShare=" + callqReadShare + ", callQScanShare=" + callqScanShare);
|
||||||
|
|
||||||
if (numCallQueues > 1 && callqReadShare > 0) {
|
if (numCallQueues > 1 && callqReadShare > 0) {
|
||||||
// multiple read/write queues
|
// multiple read/write queues
|
||||||
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
|
if (isDeadlineQueueType(callQueueType)) {
|
||||||
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
||||||
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
|
callExecutor = new RWQueueRpcExecutor("RWQ.default", handlerCount, numCallQueues,
|
||||||
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
|
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
|
||||||
BoundedPriorityBlockingQueue.class, callPriority);
|
BoundedPriorityBlockingQueue.class, callPriority);
|
||||||
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
||||||
Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
|
Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
|
||||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
|
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
|
||||||
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount,
|
callExecutor = new RWQueueRpcExecutor("RWQ.default", handlerCount,
|
||||||
numCallQueues, callqReadShare, callqScanShare,
|
numCallQueues, callqReadShare, callqScanShare,
|
||||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
|
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
|
||||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
|
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
|
||||||
} else {
|
} else {
|
||||||
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
|
callExecutor = new RWQueueRpcExecutor("RWQ.default", handlerCount, numCallQueues,
|
||||||
callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
|
callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// multiple queues
|
// multiple queues
|
||||||
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
|
if (isDeadlineQueueType(callQueueType)) {
|
||||||
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
||||||
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
|
callExecutor =
|
||||||
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
|
new BalancedQueueRpcExecutor("BalancedQ.default", handlerCount, numCallQueues,
|
||||||
|
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
|
||||||
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
||||||
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
|
callExecutor =
|
||||||
conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
|
new BalancedQueueRpcExecutor("BalancedQ.default", handlerCount, numCallQueues,
|
||||||
codelTargetDelay, codelInterval, codelLifoThreshold,
|
conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
|
||||||
numGeneralCallsDropped, numLifoModeSwitches);
|
codelTargetDelay, codelInterval, codelLifoThreshold,
|
||||||
|
numGeneralCallsDropped, numLifoModeSwitches);
|
||||||
} else {
|
} else {
|
||||||
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
|
callExecutor = new BalancedQueueRpcExecutor("BalancedQ.default", handlerCount,
|
||||||
numCallQueues, maxQueueLength, conf, abortable);
|
numCallQueues, maxQueueLength, conf, abortable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create 2 queues to help priorityExecutor be more scalable.
|
// Create 2 queues to help priorityExecutor be more scalable.
|
||||||
this.priorityExecutor = priorityHandlerCount > 0 ?
|
this.priorityExecutor = priorityHandlerCount > 0 ?
|
||||||
new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) :
|
new BalancedQueueRpcExecutor("BalancedQ.priority", priorityHandlerCount, 2,
|
||||||
null;
|
maxPriorityQueueLength):
|
||||||
|
null;
|
||||||
this.replicationExecutor =
|
this.replicationExecutor =
|
||||||
replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
|
replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("BalancedQ.replication",
|
||||||
replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
|
replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isDeadlineQueueType(final String callQueueType) {
|
||||||
|
return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
public SimpleRpcScheduler(
|
public SimpleRpcScheduler(
|
||||||
Configuration conf,
|
Configuration conf,
|
||||||
int handlerCount,
|
int handlerCount,
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||||
|
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Factory to use when you want to use the {@link FifoRpcScheduler}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class FifoRpcSchedulerFactory implements RpcSchedulerFactory {
|
||||||
|
@Override
|
||||||
|
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
|
||||||
|
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
||||||
|
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
|
||||||
|
return new FifoRpcScheduler(conf, handlerCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
@Override
|
||||||
|
public RpcScheduler create(Configuration conf, PriorityFunction priority) {
|
||||||
|
return create(conf, priority, null);
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
||||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public interface RpcSchedulerFactory {
|
public interface RpcSchedulerFactory {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
|
* Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
|
||||||
*/
|
*/
|
||||||
|
@ -39,5 +38,4 @@ public interface RpcSchedulerFactory {
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
RpcScheduler create(Configuration conf, PriorityFunction priority);
|
RpcScheduler create(Configuration conf, PriorityFunction priority);
|
||||||
|
}
|
||||||
}
|
|
|
@ -27,11 +27,11 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
||||||
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
|
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
|
||||||
|
|
||||||
/** Constructs a {@link SimpleRpcScheduler}. */
|
/** Constructs a {@link SimpleRpcScheduler}.
|
||||||
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
|
public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public RpcScheduler create(Configuration conf, PriorityFunction priority) {
|
public RpcScheduler create(Configuration conf, PriorityFunction priority) {
|
||||||
|
@ -42,7 +42,6 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
|
||||||
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
|
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
|
||||||
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
|
||||||
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
|
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
|
||||||
|
|
||||||
return new SimpleRpcScheduler(
|
return new SimpleRpcScheduler(
|
||||||
conf,
|
conf,
|
||||||
handlerCount,
|
handlerCount,
|
||||||
|
@ -54,5 +53,4 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
|
||||||
server,
|
server,
|
||||||
HConstants.QOS_THRESHOLD);
|
HConstants.QOS_THRESHOLD);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
||||||
|
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.rules.TestRule;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A silly test that does nothing but make sure an rpcscheduler factory makes what it says
|
||||||
|
* it is going to make.
|
||||||
|
*/
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class TestRpcSchedulerFactory {
|
||||||
|
@Rule public TestName testName = new TestName();
|
||||||
|
@ClassRule public static TestRule timeout =
|
||||||
|
CategoryBasedTimeout.forClass(TestRpcSchedulerFactory.class);
|
||||||
|
private Configuration conf;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
this.conf = HBaseConfiguration.create();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRWQ() {
|
||||||
|
// Set some configs just to see how it changes the scheduler. Can't assert the settings had
|
||||||
|
// an effect. Just eyeball the log.
|
||||||
|
this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5);
|
||||||
|
this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5);
|
||||||
|
this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5);
|
||||||
|
RpcSchedulerFactory factory = new SimpleRpcSchedulerFactory();
|
||||||
|
RpcScheduler rpcScheduler = factory.create(this.conf, null, null);
|
||||||
|
assertTrue(rpcScheduler.getClass().equals(SimpleRpcScheduler.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFifo() {
|
||||||
|
RpcSchedulerFactory factory = new FifoRpcSchedulerFactory();
|
||||||
|
RpcScheduler rpcScheduler = factory.create(this.conf, null, null);
|
||||||
|
assertTrue(rpcScheduler.getClass().equals(FifoRpcScheduler.class));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue