HBASE-17088 Refactor RWQueueRpcExecutor/BalancedQueueRpcExecutor/RpcExecutor

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2016-11-19 09:49:09 +08:00 committed by zhangduo
parent 26e3164481
commit 3c45ff08d9
8 changed files with 373 additions and 277 deletions

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -41,34 +39,52 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
public class BalancedQueueRpcExecutor extends RpcExecutor {
private static final Log LOG = LogFactory.getLog(BalancedQueueRpcExecutor.class);
protected final List<BlockingQueue<CallRunner>> queues;
private final QueueBalancer balancer;
public BalancedQueueRpcExecutor(final String name, final int handlerCount,
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
final Abortable abortable) {
this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT),
maxQueueLength, priority, conf, abortable);
}
public BalancedQueueRpcExecutor(final String name, final int handlerCount,
final String callQueueType, final int maxQueueLength, final PriorityFunction priority,
final Configuration conf, final Abortable abortable) {
super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
this.balancer = getBalancer(this.numCallQueues);
initializeQueues(this.numCallQueues);
}
@Deprecated
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength) {
this(name, handlerCount, numQueues, maxQueueLength, null, null);
}
@Deprecated
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength, final Configuration conf, final Abortable abortable) {
this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
}
@Deprecated
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
this(name, handlerCount, numQueues, null, null, queueClass, initargs);
}
@Deprecated
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final Configuration conf, final Abortable abortable,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
super(name, Math.max(handlerCount, numQueues), conf, abortable);
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
super(name, Math.max(handlerCount, numQueues), numQueues, conf, abortable);
this.balancer = getBalancer(numQueues);
initializeQueues(numQueues, queueClass, initargs);
LOG.debug(name + " queues=" + numQueues + " handlerCount=" + handlerCount);
}
@Deprecated
protected void initializeQueues(final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
if (initargs.length > 0) {
@ -90,18 +106,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
}
return queue.offer(callTask);
}
@Override
public int getQueueLength() {
int length = 0;
for (final BlockingQueue<CallRunner> queue : queues) {
length += queue.size();
}
return length;
}
@Override
public List<BlockingQueue<CallRunner>> getQueues() {
return queues;
}
}

View File

@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.ipc;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
@ -45,19 +43,16 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
final int numQueues, final int maxQueueLength, final Configuration conf,
final Abortable abortable) {
super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
maxQueueLength);
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
final Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
}
public FastPathBalancedQueueRpcExecutor(String name, int handlerCount,
int numCallQueues,
Configuration conf,
Abortable abortable,
Class<? extends BlockingQueue> queueClass,
Object... args) {
super(name, handlerCount, numCallQueues, conf, abortable, queueClass, args);
public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
final String callQueueType, final int maxQueueLength, final PriorityFunction priority,
final Configuration conf, final Abortable abortable) {
super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
}
@Override

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -52,7 +51,11 @@ import com.google.protobuf.Message;
public class RWQueueRpcExecutor extends RpcExecutor {
private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
private final List<BlockingQueue<CallRunner>> queues;
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
"hbase.ipc.server.callqueue.read.ratio";
public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
"hbase.ipc.server.callqueue.scan.ratio";
private final QueueBalancer writeBalancer;
private final QueueBalancer readBalancer;
private final QueueBalancer scanBalancer;
@ -63,6 +66,49 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final int numReadQueues;
private final int numScanQueues;
public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare);
writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare));
int readQueues = calcNumReaders(this.numCallQueues, callqReadShare);
int readHandlers = Math.max(readQueues, calcNumReaders(handlerCount, callqReadShare));
int scanQueues = Math.max(0, (int)Math.floor(readQueues * callqScanShare));
int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * callqScanShare));
if ((readQueues - scanQueues) > 0) {
readQueues -= scanQueues;
readHandlers -= scanHandlers;
} else {
scanQueues = 0;
scanHandlers = 0;
}
numReadQueues = readQueues;
readHandlersCount = readHandlers;
numScanQueues = scanQueues;
scanHandlersCount = scanHandlers;
this.writeBalancer = getBalancer(numWriteQueues);
this.readBalancer = getBalancer(numReadQueues);
this.scanBalancer = numScanQueues > 0 ? getBalancer(numScanQueues) : null;
initializeQueues(numWriteQueues);
initializeQueues(numReadQueues);
initializeQueues(numScanQueues);
LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
+ numScanQueues + " scanHandlers=" + scanHandlersCount);
}
@Deprecated
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable) {
@ -70,11 +116,13 @@ public class RWQueueRpcExecutor extends RpcExecutor {
conf, abortable, LinkedBlockingQueue.class);
}
@Deprecated
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength) {
this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
}
@Deprecated
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable) {
@ -82,6 +130,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
conf, abortable, LinkedBlockingQueue.class);
}
@Deprecated
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable,
@ -90,6 +139,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
readQueueClass, readQueueInitArgs);
}
@Deprecated
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength,
final Configuration conf, final Abortable abortable,
@ -100,6 +150,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
}
@Deprecated
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
@ -110,6 +161,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
readQueueClass, readQueueInitArgs);
}
@Deprecated
public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
final int numWriteQueues, final int numReadQueues,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
@ -118,14 +170,19 @@ public class RWQueueRpcExecutor extends RpcExecutor {
writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
}
@Deprecated
public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
int numWriteQueues, int numReadQueues, float scanShare,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues),
numWriteQueues + numReadQueues);
int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
this.numWriteQueues = numWriteQueues;
int numScanQueues = Math.max(0, (int) Math.floor(numReadQueues * scanShare));
int scanHandlers = Math.max(0, (int) Math.floor(readHandlers * scanShare));
if ((numReadQueues - numScanQueues) > 0) {
numReadQueues -= numScanQueues;
readHandlers -= scanHandlers;
@ -134,17 +191,15 @@ public class RWQueueRpcExecutor extends RpcExecutor {
scanHandlers = 0;
}
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
this.numWriteQueues = numWriteQueues;
this.numReadQueues = numReadQueues;
this.numScanQueues = numScanQueues;
this.writeBalancer = getBalancer(numWriteQueues);
this.readBalancer = getBalancer(numReadQueues);
this.scanBalancer = numScanQueues > 0 ? getBalancer(numScanQueues) : null;
queues = new ArrayList<BlockingQueue<CallRunner>>(numWriteQueues + numReadQueues + numScanQueues);
LOG.info(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
+ numScanQueues + " scanHandlers=" + scanHandlersCount);
@ -155,8 +210,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
}
for (int i = 0; i < numWriteQueues; ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(writeQueueClass,
writeQueueInitArgs));
}
if (readQueueInitArgs.length > 0) {
@ -165,11 +220,17 @@ public class RWQueueRpcExecutor extends RpcExecutor {
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
}
for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(readQueueClass,
readQueueInitArgs));
}
}
@Override
protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
// at least 1 read queue and 1 write queue
return Math.max(2, (int) Math.round(handlerCount * callQueuesHandlersFactor));
}
@Override
protected void startHandlers(final int port) {
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
@ -241,20 +302,6 @@ public class RWQueueRpcExecutor extends RpcExecutor {
return false;
}
@Override
public int getQueueLength() {
int length = 0;
for (final BlockingQueue<CallRunner> queue: queues) {
length += queue.size();
}
return length;
}
@Override
protected List<BlockingQueue<CallRunner>> getQueues() {
return queues;
}
/*
* Calculate the number of writers based on the "total count" and the read share.
* You'll get at least one writer.

View File

@ -18,13 +18,15 @@
package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -33,6 +35,8 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
@ -47,33 +51,137 @@ public abstract class RpcExecutor {
private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
/** max delay in msec used to bound the deprioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
/**
* The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
* queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
* throughput.
*/
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
// These 3 are only used by Codel executor
public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
private AtomicLong numGeneralCallsDropped = new AtomicLong();
private AtomicLong numLifoModeSwitches = new AtomicLong();
protected final int numCallQueues;
protected final List<BlockingQueue<CallRunner>> queues;
private final Class<? extends BlockingQueue> queueClass;
private final Object[] queueInitArgs;
private final PriorityFunction priority;
protected volatile int currentQueueLimit;
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
private final List<Handler> handlers;
private final int handlerCount;
private final String name;
private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
private String name;
private boolean running;
private Configuration conf = null;
private Abortable abortable = null;
public RpcExecutor(final String name, final int handlerCount) {
@Deprecated
public RpcExecutor(final String name, final int handlerCount, final int numCallQueues) {
this.name = Strings.nullToEmpty(name);
this.handlers = new ArrayList<Handler>(handlerCount);
this.handlerCount = handlerCount;
this.name = Strings.nullToEmpty(name);
this.numCallQueues = numCallQueues;
this.queues = new ArrayList<>(this.numCallQueues);
this.queueClass = null;
this.queueInitArgs = new Object[0];
this.priority = null;
}
public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
final Abortable abortable) {
this(name, handlerCount);
@Deprecated
public RpcExecutor(final String name, final int handlerCount, final int numCallQueues,
final Configuration conf, final Abortable abortable) {
this(name, handlerCount, numCallQueues);
this.conf = conf;
this.abortable = abortable;
}
public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY,
CALL_QUEUE_TYPE_CONF_DEFAULT), maxQueueLength, priority, conf, abortable);
}
public RpcExecutor(final String name, final int handlerCount, final String callQueueType,
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
final Abortable abortable) {
this.name = Strings.nullToEmpty(name);
this.conf = conf;
this.abortable = abortable;
float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor);
this.queues = new ArrayList<>(this.numCallQueues);
this.handlerCount = Math.max(handlerCount, this.numCallQueues);
this.handlers = new ArrayList<>(this.handlerCount);
this.priority = priority;
if (isDeadlineQueueType(callQueueType)) {
this.name += ".Deadline";
this.queueInitArgs = new Object[] { maxQueueLength,
new CallPriorityComparator(conf, this.priority) };
this.queueClass = BoundedPriorityBlockingQueue.class;
} else if (isCodelQueueType(callQueueType)) {
this.name += ".Codel";
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
queueClass = AdaptiveLifoCoDelCallQueue.class;
} else {
this.name += ".Fifo";
queueInitArgs = new Object[] { maxQueueLength };
queueClass = LinkedBlockingQueue.class;
}
LOG.info("RpcExecutor " + " name " + " using " + callQueueType
+ " as call queue; numCallQueues=" + numCallQueues + "; maxQueueLength=" + maxQueueLength
+ "; handlerCount=" + handlerCount);
}
protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
return Math.max(1, (int) Math.round(handlerCount * callQueuesHandlersFactor));
}
protected void initializeQueues(final int numQueues) {
if (queueInitArgs.length > 0) {
currentQueueLimit = (int) queueInitArgs[0];
queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
}
for (int i = 0; i < numQueues; ++i) {
queues
.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, queueInitArgs));
}
}
public void start(final int port) {
running = true;
startHandlers(port);
@ -91,13 +199,21 @@ public abstract class RpcExecutor {
}
/** Returns the length of the pending queue */
public abstract int getQueueLength();
public int getQueueLength() {
int length = 0;
for (final BlockingQueue<CallRunner> queue: queues) {
length += queue.size();
}
return length;
}
/** Add the request to the executor queue */
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
/** Returns the list of request queues */
protected abstract List<BlockingQueue<CallRunner>> getQueues();
public List<BlockingQueue<CallRunner>> getQueues() {
return queues;
}
protected void startHandlers(final int port) {
List<BlockingQueue<CallRunner>> callQueues = getQueues();
@ -116,16 +232,16 @@ public abstract class RpcExecutor {
* Start up our handlers.
*/
protected void startHandlers(final String nameSuffix, final int numHandlers,
final List<BlockingQueue<CallRunner>> callQueues,
final int qindex, final int qsize, final int port) {
final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
final int port) {
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
double handlerFailureThreshhold =
conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(
HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
for (int i = 0; i < numHandlers; i++) {
final int index = qindex + (i % qsize);
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" +
index + ",port=" + port;
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
+ ",port=" + port;
Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
handler.start();
LOG.debug("Started " + name);
@ -190,15 +306,15 @@ public abstract class RpcExecutor {
} catch (Throwable e) {
if (e instanceof Error) {
int failedCount = failedHandlerCount.incrementAndGet();
if (this.handlerFailureThreshhold >= 0 &&
failedCount > handlerCount * this.handlerFailureThreshhold) {
String message = "Number of failed RpcServer handler runs exceeded threshhold " +
this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
if (this.handlerFailureThreshhold >= 0
&& failedCount > handlerCount * this.handlerFailureThreshhold) {
String message = "Number of failed RpcServer handler runs exceeded threshhold "
+ this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
if (abortable != null) {
abortable.abort(message, e);
} else {
LOG.error("Error but can't abort because abortable is null: " +
StringUtils.stringifyException(e));
LOG.error("Error but can't abort because abortable is null: "
+ StringUtils.stringifyException(e));
throw e;
}
} else {
@ -254,6 +370,59 @@ public abstract class RpcExecutor {
}
}
/**
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
* the same deadline BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out)
* manner.
*/
private static class CallPriorityComparator implements Comparator<CallRunner> {
private final static int DEFAULT_MAX_CALL_DELAY = 5000;
private final PriorityFunction priority;
private final int maxDelay;
public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
this.priority = priority;
this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
}
@Override
public int compare(CallRunner a, CallRunner b) {
RpcServer.Call callA = a.getCall();
RpcServer.Call callB = b.getCall();
long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
return Long.compare(deadlineA, deadlineB);
}
}
public static boolean isDeadlineQueueType(final String callQueueType) {
return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
}
public static boolean isCodelQueueType(final String callQueueType) {
return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
}
public static boolean isFifoQueueType(final String callQueueType) {
return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
}
public long getNumGeneralCallsDropped() {
return numGeneralCallsDropped.get();
}
public long getNumLifoModeSwitches() {
return numLifoModeSwitches.get();
}
public String getName() {
return this.name;
}
/**
* Update current soft limit for executor's call queues
* @param conf updated configuration
@ -265,4 +434,20 @@ public abstract class RpcExecutor {
}
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
}
}
public void onConfigurationChange(Configuration conf) {
// update CoDel Scheduler tunables
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
for (BlockingQueue<CallRunner> queue : queues) {
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
codelLifoThreshold);
}
}
}
}

View File

@ -17,11 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -31,7 +26,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
/**
* The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
@ -45,106 +39,6 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
"hbase.ipc.server.callqueue.read.ratio";
public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
"hbase.ipc.server.callqueue.scan.ratio";
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.callqueue.handler.factor";
/**
* The default, 'fifo', has the least friction but is dumb.
* If set to 'deadline', uses a priority queue and deprioritizes long-running scans. Sorting by
* priority comes at a cost, reduced throughput.
*/
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
/** max delay in msec used to bound the deprioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
= "hbase.ipc.server.queue.max.call.delay";
// These 3 are only used by Codel executor
public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
"hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL =
"hbase.ipc.server.callqueue.codel.interval";
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
"hbase.ipc.server.callqueue.codel.lifo.threshold";
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
private AtomicLong numGeneralCallsDropped = new AtomicLong();
private AtomicLong numLifoModeSwitches = new AtomicLong();
/**
* Resize call queues;
* @param conf new configuration
*/
@Override
public void onConfigurationChange(Configuration conf) {
callExecutor.resizeQueues(conf);
if (priorityExecutor != null) {
priorityExecutor.resizeQueues(conf);
}
if (replicationExecutor != null) {
replicationExecutor.resizeQueues(conf);
}
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT);
if (isCodelQueueType(callQueueType)) {
// update CoDel Scheduler tunables
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
for (BlockingQueue<CallRunner> queue : callExecutor.getQueues()) {
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay,
codelInterval, codelLifoThreshold);
}
}
}
}
/**
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
* It uses the calculated "deadline" e.g. to deprioritize long-running job
*
* If multiple requests have the same deadline BoundedPriorityBlockingQueue will order them in
* FIFO (first-in-first-out) manner.
*/
private static class CallPriorityComparator implements Comparator<CallRunner> {
private final static int DEFAULT_MAX_CALL_DELAY = 5000;
private final PriorityFunction priority;
private final int maxDelay;
public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
this.priority = priority;
this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
}
@Override
public int compare(CallRunner a, CallRunner b) {
RpcServer.Call callA = a.getCall();
RpcServer.Call callB = b.getCall();
long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
return Long.compare(deadlineA, deadlineB);
}
}
private int port;
private final PriorityFunction priority;
private final RpcExecutor callExecutor;
@ -182,84 +76,37 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
this.highPriorityLevel = highPriorityLevel;
this.abortable = server;
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
LOG.info("Using " + callQueueType + " as user call queue; numCallQueues=" + numCallQueues +
"; callQReadShare=" + callqReadShare + ", callQScanShare=" + callqScanShare);
if (numCallQueues > 1 && callqReadShare > 0) {
// multiple read/write queues
if (isDeadlineQueueType(callQueueType)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new RWQueueRpcExecutor("RW.deadline.Q", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
BoundedPriorityBlockingQueue.class, callPriority);
} else if (isCodelQueueType(callQueueType)) {
Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
callExecutor = new RWQueueRpcExecutor("RW.codel.Q", handlerCount,
numCallQueues, callqReadShare, callqScanShare,
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
} else {
callExecutor = new RWQueueRpcExecutor("RW.fifo.Q", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
}
if (callqReadShare > 0) {
// at least 1 read handler and 1 write handler
callExecutor = new RWQueueRpcExecutor("deafult.RWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
} else {
// multiple queues
if (isDeadlineQueueType(callQueueType)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor =
new BalancedQueueRpcExecutor("BQDeadline.default", handlerCount, numCallQueues,
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else if (isCodelQueueType(callQueueType)) {
callExecutor =
new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues,
conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
codelTargetDelay, codelInterval, codelLifoThreshold,
numGeneralCallsDropped, numLifoModeSwitches);
} else {
// FifoWFPBQ = FastPathBalancedQueueRpcExecutor
callExecutor = new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
handlerCount, numCallQueues, maxQueueLength, conf, abortable);
if (RpcExecutor.isFifoQueueType(callQueueType)) {
callExecutor = new FastPathBalancedQueueRpcExecutor("deafult.FPBQ", handlerCount,
maxPriorityQueueLength, priority, conf, server);
} else {
callExecutor = new BalancedQueueRpcExecutor("deafult.BQ", handlerCount, maxQueueLength,
priority, conf, server);
}
}
// Create 2 queues to help priorityExecutor be more scalable.
this.priorityExecutor = priorityHandlerCount > 0?
new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
2, maxPriorityQueueLength, conf, abortable): null;
this.replicationExecutor = replicationHandlerCount > 0?
new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
"priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
maxPriorityQueueLength, priority, conf, abortable) : null;
this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
"replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
maxQueueLength, priority, conf, abortable) : null;
}
private static boolean isDeadlineQueueType(final String callQueueType) {
return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
}
private static boolean isCodelQueueType(final String callQueueType) {
return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
}
public SimpleRpcScheduler(
Configuration conf,
int handlerCount,
int priorityHandlerCount,
int replicationHandlerCount,
PriorityFunction priority,
int highPriorityLevel) {
this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority,
null, highPriorityLevel);
public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority, null,
highPriorityLevel);
}
@Override
@ -294,6 +141,23 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
}
}
@Override
public void onConfigurationChange(Configuration conf) {
callExecutor.resizeQueues(conf);
if (priorityExecutor != null) {
priorityExecutor.resizeQueues(conf);
}
if (replicationExecutor != null) {
replicationExecutor.resizeQueues(conf);
}
String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
if (RpcExecutor.isCodelQueueType(callQueueType)) {
callExecutor.onConfigurationChange(conf);
}
}
@Override
public int getGeneralQueueLength() {
return callExecutor.getQueueLength();
@ -318,12 +182,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
@Override
public long getNumGeneralCallsDropped() {
return numGeneralCallsDropped.get();
return callExecutor.getNumGeneralCallsDropped();
}
@Override
public long getNumLifoModeSwitches() {
return numLifoModeSwitches.get();
return callExecutor.getNumLifoModeSwitches();
}
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -75,7 +76,7 @@ public class TestFastFail {
public static void setUpBeforeClass() throws Exception {
// Just to prevent fastpath FIFO from picking calls up bypassing the queue.
TEST_UTIL.getConfiguration().set(
SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, "deadline");
RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, "deadline");
TEST_UTIL.startMiniCluster(SLAVES);
}

View File

@ -164,13 +164,13 @@ public class TestSimpleRpcScheduler {
@Test
public void testRpcScheduler() throws Exception {
testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
}
private void testRpcScheduler(final String queueType) throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType);
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(RequestHeader.class),
@ -234,9 +234,9 @@ public class TestSimpleRpcScheduler {
// -> [small small small huge small large small small]
// -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
// -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
assertEquals(530, totalTime);
} else if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
} else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
assertEquals(930, totalTime);
}
} finally {
@ -247,9 +247,9 @@ public class TestSimpleRpcScheduler {
@Test
public void testScanQueueWithZeroScanRatio() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(RequestHeader.class), any(Message.class),
@ -263,9 +263,9 @@ public class TestSimpleRpcScheduler {
@Test
public void testScanQueues() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(RequestHeader.class), any(Message.class),
@ -415,8 +415,8 @@ public class TestSimpleRpcScheduler {
Configuration schedConf = HBaseConfiguration.create();
schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(RPCProtos.RequestHeader.class), any(Message.class),

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -54,9 +56,9 @@ public class TestRpcSchedulerFactory {
public void testRWQ() {
// Set some configs just to see how it changes the scheduler. Can't assert the settings had
// an effect. Just eyeball the log.
this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5);
this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5);
this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5);
this.conf.setDouble(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5);
this.conf.setDouble(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5);
this.conf.setDouble(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5);
RpcSchedulerFactory factory = new SimpleRpcSchedulerFactory();
RpcScheduler rpcScheduler = factory.create(this.conf, null, null);
assertTrue(rpcScheduler.getClass().equals(SimpleRpcScheduler.class));