HBASE-17088 Refactor RWQueueRpcExecutor/BalancedQueueRpcExecutor/RpcExecutor
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
be519ca1a5
commit
fa7ed58e1b
|
@ -17,10 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -29,7 +27,6 @@ import org.apache.hadoop.hbase.Abortable;
|
|||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
|
||||
|
@ -39,45 +36,22 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
|
|||
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
|
||||
@InterfaceStability.Evolving
|
||||
public class BalancedQueueRpcExecutor extends RpcExecutor {
|
||||
private static final Log LOG = LogFactory.getLog(BalancedQueueRpcExecutor.class);
|
||||
|
||||
protected final List<BlockingQueue<CallRunner>> queues;
|
||||
private final QueueBalancer balancer;
|
||||
|
||||
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final int maxQueueLength) {
|
||||
this(name, handlerCount, numQueues, maxQueueLength, null, null);
|
||||
public BalancedQueueRpcExecutor(final String name, final int handlerCount,
|
||||
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
|
||||
final Abortable abortable) {
|
||||
this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT),
|
||||
maxQueueLength, priority, conf, abortable);
|
||||
}
|
||||
|
||||
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final int maxQueueLength, final Configuration conf, final Abortable abortable) {
|
||||
this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength);
|
||||
}
|
||||
|
||||
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
|
||||
this(name, handlerCount, numQueues, null, null, queueClass, initargs);
|
||||
}
|
||||
|
||||
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final Configuration conf, final Abortable abortable,
|
||||
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
|
||||
super(name, Math.max(handlerCount, numQueues), conf, abortable);
|
||||
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
|
||||
this.balancer = getBalancer(numQueues);
|
||||
initializeQueues(numQueues, queueClass, initargs);
|
||||
LOG.debug(name + " queues=" + numQueues + " handlerCount=" + handlerCount);
|
||||
}
|
||||
|
||||
protected void initializeQueues(final int numQueues,
|
||||
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
|
||||
if (initargs.length > 0) {
|
||||
currentQueueLimit = (int) initargs[0];
|
||||
initargs[0] = Math.max((int) initargs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
|
||||
}
|
||||
for (int i = 0; i < numQueues; ++i) {
|
||||
queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
|
||||
}
|
||||
public BalancedQueueRpcExecutor(final String name, final int handlerCount,
|
||||
final String callQueueType, final int maxQueueLength, final PriorityFunction priority,
|
||||
final Configuration conf, final Abortable abortable) {
|
||||
super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
|
||||
this.balancer = getBalancer(this.numCallQueues);
|
||||
initializeQueues(this.numCallQueues);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,18 +64,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
|
|||
}
|
||||
return queue.offer(callTask);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueueLength() {
|
||||
int length = 0;
|
||||
for (final BlockingQueue<CallRunner> queue : queues) {
|
||||
length += queue.size();
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BlockingQueue<CallRunner>> getQueues() {
|
||||
return queues;
|
||||
}
|
||||
}
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
|
|||
import java.util.Deque;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -44,19 +43,15 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
|
|||
private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
|
||||
|
||||
public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
|
||||
final int numQueues, final int maxQueueLength, final Configuration conf,
|
||||
final Abortable abortable) {
|
||||
super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
|
||||
maxQueueLength);
|
||||
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
|
||||
final Abortable abortable) {
|
||||
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
|
||||
}
|
||||
|
||||
public FastPathBalancedQueueRpcExecutor(String name, int handlerCount,
|
||||
int numCallQueues,
|
||||
Configuration conf,
|
||||
Abortable abortable,
|
||||
Class<? extends BlockingQueue> queueClass,
|
||||
Object... args) {
|
||||
super(name, handlerCount, numCallQueues, conf, abortable, queueClass, args);
|
||||
public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
|
||||
final String callQueueType, final int maxQueueLength, final PriorityFunction priority,
|
||||
final Configuration conf, final Abortable abortable) {
|
||||
super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
|
||||
|
||||
/**
|
||||
|
@ -53,7 +52,11 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
|
|||
public class RWQueueRpcExecutor extends RpcExecutor {
|
||||
private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
|
||||
|
||||
private final List<BlockingQueue<CallRunner>> queues;
|
||||
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
|
||||
"hbase.ipc.server.callqueue.read.ratio";
|
||||
public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
|
||||
"hbase.ipc.server.callqueue.scan.ratio";
|
||||
|
||||
private final QueueBalancer writeBalancer;
|
||||
private final QueueBalancer readBalancer;
|
||||
private final QueueBalancer scanBalancer;
|
||||
|
@ -64,111 +67,52 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
private final int numReadQueues;
|
||||
private final int numScanQueues;
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final float readShare, final int maxQueueLength,
|
||||
final Configuration conf, final Abortable abortable) {
|
||||
this(name, handlerCount, numQueues, readShare, maxQueueLength, 0,
|
||||
conf, abortable, LinkedBlockingQueue.class);
|
||||
}
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
|
||||
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
|
||||
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final float readShare, final float scanShare, final int maxQueueLength) {
|
||||
this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, null, null);
|
||||
}
|
||||
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
|
||||
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final float readShare, final float scanShare, final int maxQueueLength,
|
||||
final Configuration conf, final Abortable abortable) {
|
||||
this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
|
||||
conf, abortable, LinkedBlockingQueue.class);
|
||||
}
|
||||
numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare);
|
||||
writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare));
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final float readShare, final int maxQueueLength,
|
||||
final Configuration conf, final Abortable abortable,
|
||||
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
|
||||
this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, conf, abortable,
|
||||
readQueueClass, readQueueInitArgs);
|
||||
}
|
||||
int readQueues = calcNumReaders(this.numCallQueues, callqReadShare);
|
||||
int readHandlers = Math.max(readQueues, calcNumReaders(handlerCount, callqReadShare));
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final float readShare, final float scanShare, final int maxQueueLength,
|
||||
final Configuration conf, final Abortable abortable,
|
||||
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
|
||||
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
|
||||
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
|
||||
LinkedBlockingQueue.class, new Object[] {maxQueueLength},
|
||||
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
|
||||
}
|
||||
int scanQueues = Math.max(0, (int)Math.floor(readQueues * callqScanShare));
|
||||
int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * callqScanShare));
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final float readShare, final float scanShare,
|
||||
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
|
||||
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
|
||||
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
|
||||
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
|
||||
writeQueueClass, writeQueueInitArgs,
|
||||
readQueueClass, readQueueInitArgs);
|
||||
}
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
|
||||
final int numWriteQueues, final int numReadQueues,
|
||||
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
|
||||
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
|
||||
this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0,
|
||||
writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
|
||||
}
|
||||
|
||||
public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
|
||||
int numWriteQueues, int numReadQueues, float scanShare,
|
||||
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
|
||||
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
|
||||
super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
|
||||
|
||||
int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
|
||||
int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
|
||||
if ((numReadQueues - numScanQueues) > 0) {
|
||||
numReadQueues -= numScanQueues;
|
||||
if ((readQueues - scanQueues) > 0) {
|
||||
readQueues -= scanQueues;
|
||||
readHandlers -= scanHandlers;
|
||||
} else {
|
||||
numScanQueues = 0;
|
||||
scanQueues = 0;
|
||||
scanHandlers = 0;
|
||||
}
|
||||
|
||||
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
|
||||
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
|
||||
this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
|
||||
this.numWriteQueues = numWriteQueues;
|
||||
this.numReadQueues = numReadQueues;
|
||||
this.numScanQueues = numScanQueues;
|
||||
numReadQueues = readQueues;
|
||||
readHandlersCount = readHandlers;
|
||||
numScanQueues = scanQueues;
|
||||
scanHandlersCount = scanHandlers;
|
||||
|
||||
this.writeBalancer = getBalancer(numWriteQueues);
|
||||
this.readBalancer = getBalancer(numReadQueues);
|
||||
this.scanBalancer = numScanQueues > 0 ? getBalancer(numScanQueues) : null;
|
||||
|
||||
queues = new ArrayList<BlockingQueue<CallRunner>>(numWriteQueues + numReadQueues + numScanQueues);
|
||||
LOG.info(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
|
||||
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
|
||||
+ numScanQueues + " scanHandlers=" + scanHandlersCount);
|
||||
initializeQueues(numWriteQueues);
|
||||
initializeQueues(numReadQueues);
|
||||
initializeQueues(numScanQueues);
|
||||
|
||||
if (writeQueueInitArgs.length > 0) {
|
||||
currentQueueLimit = (int) writeQueueInitArgs[0];
|
||||
writeQueueInitArgs[0] = Math.max((int) writeQueueInitArgs[0],
|
||||
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
|
||||
}
|
||||
for (int i = 0; i < numWriteQueues; ++i) {
|
||||
queues.add((BlockingQueue<CallRunner>)
|
||||
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
|
||||
}
|
||||
LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
|
||||
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
|
||||
+ numScanQueues + " scanHandlers=" + scanHandlersCount);
|
||||
}
|
||||
|
||||
if (readQueueInitArgs.length > 0) {
|
||||
currentQueueLimit = (int) readQueueInitArgs[0];
|
||||
readQueueInitArgs[0] = Math.max((int) readQueueInitArgs[0],
|
||||
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
|
||||
}
|
||||
for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
|
||||
queues.add((BlockingQueue<CallRunner>)
|
||||
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
|
||||
}
|
||||
@Override
|
||||
protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
|
||||
// at least 1 read queue and 1 write queue
|
||||
return Math.max(2, (int) Math.round(handlerCount * callQueuesHandlersFactor));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -242,20 +186,6 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueueLength() {
|
||||
int length = 0;
|
||||
for (final BlockingQueue<CallRunner> queue: queues) {
|
||||
length += queue.size();
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<BlockingQueue<CallRunner>> getQueues() {
|
||||
return queues;
|
||||
}
|
||||
|
||||
/*
|
||||
* Calculate the number of writers based on the "total count" and the read share.
|
||||
* You'll get at least one writer.
|
||||
|
|
|
@ -18,13 +18,16 @@
|
|||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -33,6 +36,8 @@ import org.apache.hadoop.hbase.Abortable;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -47,31 +52,115 @@ public abstract class RpcExecutor {
|
|||
private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
|
||||
|
||||
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
|
||||
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
|
||||
|
||||
/** max delay in msec used to bound the deprioritized requests */
|
||||
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
|
||||
|
||||
/**
|
||||
* The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
|
||||
* queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
|
||||
* throughput.
|
||||
*/
|
||||
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
||||
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
||||
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
|
||||
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
||||
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
|
||||
|
||||
// These 3 are only used by Codel executor
|
||||
public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
|
||||
public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
|
||||
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
|
||||
|
||||
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
|
||||
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
|
||||
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
|
||||
|
||||
private AtomicLong numGeneralCallsDropped = new AtomicLong();
|
||||
private AtomicLong numLifoModeSwitches = new AtomicLong();
|
||||
|
||||
protected final int numCallQueues;
|
||||
protected final List<BlockingQueue<CallRunner>> queues;
|
||||
private final Class<? extends BlockingQueue> queueClass;
|
||||
private final Object[] queueInitArgs;
|
||||
|
||||
private final PriorityFunction priority;
|
||||
|
||||
protected volatile int currentQueueLimit;
|
||||
|
||||
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
||||
private final List<Handler> handlers;
|
||||
private final int handlerCount;
|
||||
private final String name;
|
||||
private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
|
||||
|
||||
private String name;
|
||||
private boolean running;
|
||||
|
||||
private Configuration conf = null;
|
||||
private Abortable abortable = null;
|
||||
|
||||
public RpcExecutor(final String name, final int handlerCount) {
|
||||
this.handlers = new ArrayList<Handler>(handlerCount);
|
||||
this.handlerCount = handlerCount;
|
||||
this.name = Strings.nullToEmpty(name);
|
||||
public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
|
||||
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
|
||||
this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY,
|
||||
CALL_QUEUE_TYPE_CONF_DEFAULT), maxQueueLength, priority, conf, abortable);
|
||||
}
|
||||
|
||||
public RpcExecutor(final String name, final int handlerCount, final Configuration conf,
|
||||
public RpcExecutor(final String name, final int handlerCount, final String callQueueType,
|
||||
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
|
||||
final Abortable abortable) {
|
||||
this(name, handlerCount);
|
||||
this.name = Strings.nullToEmpty(name);
|
||||
this.conf = conf;
|
||||
this.abortable = abortable;
|
||||
|
||||
float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
|
||||
this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor);
|
||||
this.queues = new ArrayList<>(this.numCallQueues);
|
||||
|
||||
this.handlerCount = Math.max(handlerCount, this.numCallQueues);
|
||||
this.handlers = new ArrayList<>(this.handlerCount);
|
||||
|
||||
this.priority = priority;
|
||||
|
||||
if (isDeadlineQueueType(callQueueType)) {
|
||||
this.name += ".Deadline";
|
||||
this.queueInitArgs = new Object[] { maxQueueLength,
|
||||
new CallPriorityComparator(conf, this.priority) };
|
||||
this.queueClass = BoundedPriorityBlockingQueue.class;
|
||||
} else if (isCodelQueueType(callQueueType)) {
|
||||
this.name += ".Codel";
|
||||
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
|
||||
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
|
||||
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
|
||||
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
|
||||
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
|
||||
queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
|
||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
|
||||
queueClass = AdaptiveLifoCoDelCallQueue.class;
|
||||
} else {
|
||||
this.name += ".Fifo";
|
||||
queueInitArgs = new Object[] { maxQueueLength };
|
||||
queueClass = LinkedBlockingQueue.class;
|
||||
}
|
||||
|
||||
LOG.info("RpcExecutor " + " name " + " using " + callQueueType
|
||||
+ " as call queue; numCallQueues=" + numCallQueues + "; maxQueueLength=" + maxQueueLength
|
||||
+ "; handlerCount=" + handlerCount);
|
||||
}
|
||||
|
||||
protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
|
||||
return Math.max(1, (int) Math.round(handlerCount * callQueuesHandlersFactor));
|
||||
}
|
||||
|
||||
protected void initializeQueues(final int numQueues) {
|
||||
if (queueInitArgs.length > 0) {
|
||||
currentQueueLimit = (int) queueInitArgs[0];
|
||||
queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
|
||||
}
|
||||
for (int i = 0; i < numQueues; ++i) {
|
||||
queues
|
||||
.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, queueInitArgs));
|
||||
}
|
||||
}
|
||||
|
||||
public void start(final int port) {
|
||||
|
@ -91,13 +180,21 @@ public abstract class RpcExecutor {
|
|||
}
|
||||
|
||||
/** Returns the length of the pending queue */
|
||||
public abstract int getQueueLength();
|
||||
public int getQueueLength() {
|
||||
int length = 0;
|
||||
for (final BlockingQueue<CallRunner> queue: queues) {
|
||||
length += queue.size();
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
/** Add the request to the executor queue */
|
||||
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
|
||||
|
||||
/** Returns the list of request queues */
|
||||
protected abstract List<BlockingQueue<CallRunner>> getQueues();
|
||||
protected List<BlockingQueue<CallRunner>> getQueues() {
|
||||
return queues;
|
||||
}
|
||||
|
||||
protected void startHandlers(final int port) {
|
||||
List<BlockingQueue<CallRunner>> callQueues = getQueues();
|
||||
|
@ -116,16 +213,16 @@ public abstract class RpcExecutor {
|
|||
* Start up our handlers.
|
||||
*/
|
||||
protected void startHandlers(final String nameSuffix, final int numHandlers,
|
||||
final List<BlockingQueue<CallRunner>> callQueues,
|
||||
final int qindex, final int qsize, final int port) {
|
||||
final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
|
||||
final int port) {
|
||||
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
|
||||
double handlerFailureThreshhold =
|
||||
conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
|
||||
HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
|
||||
double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(
|
||||
HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
|
||||
HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
|
||||
for (int i = 0; i < numHandlers; i++) {
|
||||
final int index = qindex + (i % qsize);
|
||||
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" +
|
||||
index + ",port=" + port;
|
||||
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
|
||||
+ ",port=" + port;
|
||||
Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
|
||||
handler.start();
|
||||
LOG.debug("Started " + name);
|
||||
|
@ -190,15 +287,15 @@ public abstract class RpcExecutor {
|
|||
} catch (Throwable e) {
|
||||
if (e instanceof Error) {
|
||||
int failedCount = failedHandlerCount.incrementAndGet();
|
||||
if (this.handlerFailureThreshhold >= 0 &&
|
||||
failedCount > handlerCount * this.handlerFailureThreshhold) {
|
||||
String message = "Number of failed RpcServer handler runs exceeded threshhold " +
|
||||
this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
|
||||
if (this.handlerFailureThreshhold >= 0
|
||||
&& failedCount > handlerCount * this.handlerFailureThreshhold) {
|
||||
String message = "Number of failed RpcServer handler runs exceeded threshhold "
|
||||
+ this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
|
||||
if (abortable != null) {
|
||||
abortable.abort(message, e);
|
||||
} else {
|
||||
LOG.error("Error but can't abort because abortable is null: " +
|
||||
StringUtils.stringifyException(e));
|
||||
LOG.error("Error but can't abort because abortable is null: "
|
||||
+ StringUtils.stringifyException(e));
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
|
@ -254,6 +351,59 @@ public abstract class RpcExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
|
||||
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
|
||||
* the same deadline BoundedPriorityBlockingQueue will order them in FIFO (first-in-first-out)
|
||||
* manner.
|
||||
*/
|
||||
private static class CallPriorityComparator implements Comparator<CallRunner> {
|
||||
private final static int DEFAULT_MAX_CALL_DELAY = 5000;
|
||||
|
||||
private final PriorityFunction priority;
|
||||
private final int maxDelay;
|
||||
|
||||
public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
|
||||
this.priority = priority;
|
||||
this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(CallRunner a, CallRunner b) {
|
||||
RpcServer.Call callA = a.getCall();
|
||||
RpcServer.Call callB = b.getCall();
|
||||
long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
|
||||
long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
|
||||
deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
|
||||
deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
|
||||
return Long.compare(deadlineA, deadlineB);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isDeadlineQueueType(final String callQueueType) {
|
||||
return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
|
||||
}
|
||||
|
||||
public static boolean isCodelQueueType(final String callQueueType) {
|
||||
return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
|
||||
}
|
||||
|
||||
public static boolean isFifoQueueType(final String callQueueType) {
|
||||
return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
|
||||
}
|
||||
|
||||
public long getNumGeneralCallsDropped() {
|
||||
return numGeneralCallsDropped.get();
|
||||
}
|
||||
|
||||
public long getNumLifoModeSwitches() {
|
||||
return numLifoModeSwitches.get();
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update current soft limit for executor's call queues
|
||||
* @param conf updated configuration
|
||||
|
@ -265,4 +415,20 @@ public abstract class RpcExecutor {
|
|||
}
|
||||
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
|
||||
}
|
||||
}
|
||||
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
// update CoDel Scheduler tunables
|
||||
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
|
||||
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
|
||||
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
|
||||
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
|
||||
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
|
||||
|
||||
for (BlockingQueue<CallRunner> queue : queues) {
|
||||
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
|
||||
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
|
||||
codelLifoThreshold);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,107 +45,6 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
|||
public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
|
||||
private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
|
||||
|
||||
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
|
||||
"hbase.ipc.server.callqueue.read.ratio";
|
||||
public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
|
||||
"hbase.ipc.server.callqueue.scan.ratio";
|
||||
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
|
||||
"hbase.ipc.server.callqueue.handler.factor";
|
||||
static final String CODEL_FASTPATH_BALANCED_Q = "CodelFPBQ.default";
|
||||
|
||||
/**
|
||||
* The default, 'fifo', has the least friction but is dumb.
|
||||
* If set to 'deadline', uses a priority queue and deprioritizes long-running scans. Sorting by
|
||||
* priority comes at a cost, reduced throughput.
|
||||
*/
|
||||
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
||||
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
||||
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
|
||||
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
||||
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
|
||||
|
||||
/** max delay in msec used to bound the deprioritized requests */
|
||||
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
|
||||
= "hbase.ipc.server.queue.max.call.delay";
|
||||
|
||||
// These 3 are only used by Codel executor
|
||||
public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
|
||||
"hbase.ipc.server.callqueue.codel.target.delay";
|
||||
public static final String CALL_QUEUE_CODEL_INTERVAL =
|
||||
"hbase.ipc.server.callqueue.codel.interval";
|
||||
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
|
||||
"hbase.ipc.server.callqueue.codel.lifo.threshold";
|
||||
|
||||
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
|
||||
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
|
||||
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
|
||||
|
||||
private AtomicLong numGeneralCallsDropped = new AtomicLong();
|
||||
private AtomicLong numLifoModeSwitches = new AtomicLong();
|
||||
|
||||
/**
|
||||
* Resize call queues;
|
||||
* @param conf new configuration
|
||||
*/
|
||||
@Override
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
callExecutor.resizeQueues(conf);
|
||||
if (priorityExecutor != null) {
|
||||
priorityExecutor.resizeQueues(conf);
|
||||
}
|
||||
if (replicationExecutor != null) {
|
||||
replicationExecutor.resizeQueues(conf);
|
||||
}
|
||||
|
||||
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT);
|
||||
if (isCodelQueueType(callQueueType)) {
|
||||
// update CoDel Scheduler tunables
|
||||
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
|
||||
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
|
||||
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
|
||||
CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
|
||||
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
|
||||
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
|
||||
|
||||
for (BlockingQueue<CallRunner> queue : callExecutor.getQueues()) {
|
||||
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
|
||||
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay,
|
||||
codelInterval, codelLifoThreshold);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
|
||||
* It uses the calculated "deadline" e.g. to deprioritize long-running job
|
||||
*
|
||||
* If multiple requests have the same deadline BoundedPriorityBlockingQueue will order them in
|
||||
* FIFO (first-in-first-out) manner.
|
||||
*/
|
||||
private static class CallPriorityComparator implements Comparator<CallRunner> {
|
||||
private final static int DEFAULT_MAX_CALL_DELAY = 5000;
|
||||
|
||||
private final PriorityFunction priority;
|
||||
private final int maxDelay;
|
||||
|
||||
public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
|
||||
this.priority = priority;
|
||||
this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(CallRunner a, CallRunner b) {
|
||||
RpcServer.Call callA = a.getCall();
|
||||
RpcServer.Call callB = b.getCall();
|
||||
long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
|
||||
long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
|
||||
deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
|
||||
deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
|
||||
return Long.compare(deadlineA, deadlineB);
|
||||
}
|
||||
}
|
||||
|
||||
private int port;
|
||||
private final PriorityFunction priority;
|
||||
private final RpcExecutor callExecutor;
|
||||
|
@ -183,75 +82,33 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
this.highPriorityLevel = highPriorityLevel;
|
||||
this.abortable = server;
|
||||
|
||||
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT);
|
||||
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
|
||||
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
|
||||
String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
|
||||
RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
|
||||
float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
|
||||
|
||||
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
|
||||
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
|
||||
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
|
||||
CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
|
||||
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
|
||||
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
|
||||
|
||||
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
|
||||
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
|
||||
LOG.info("Using " + callQueueType + " as user call queue; numCallQueues=" + numCallQueues +
|
||||
"; callQReadShare=" + callqReadShare + ", callQScanShare=" + callqScanShare);
|
||||
if (numCallQueues > 1 && callqReadShare > 0) {
|
||||
// multiple read/write queues
|
||||
if (isDeadlineQueueType(callQueueType)) {
|
||||
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
||||
callExecutor = new RWQueueRpcExecutor("DeadlineRWQ.default", handlerCount, numCallQueues,
|
||||
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
|
||||
BoundedPriorityBlockingQueue.class, callPriority);
|
||||
} else if (isCodelQueueType(callQueueType)) {
|
||||
Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
|
||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
|
||||
callExecutor = new RWQueueRpcExecutor("CodelRWQ.default", handlerCount,
|
||||
numCallQueues, callqReadShare, callqScanShare,
|
||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
|
||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
|
||||
} else {
|
||||
// FifoWFPBQ = FastPathBalancedQueueRpcExecutor
|
||||
callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues,
|
||||
callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
|
||||
}
|
||||
if (callqReadShare > 0) {
|
||||
// at least 1 read handler and 1 write handler
|
||||
callExecutor = new RWQueueRpcExecutor("deafult.RWQ", Math.max(2, handlerCount),
|
||||
maxQueueLength, priority, conf, server);
|
||||
} else {
|
||||
// multiple queues
|
||||
if (isDeadlineQueueType(callQueueType)) {
|
||||
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
||||
callExecutor =
|
||||
new BalancedQueueRpcExecutor("DeadlineBQ.default", handlerCount, numCallQueues,
|
||||
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
|
||||
} else if (isCodelQueueType(callQueueType)) {
|
||||
callExecutor =
|
||||
new BalancedQueueRpcExecutor(CODEL_FASTPATH_BALANCED_Q, handlerCount, numCallQueues,
|
||||
conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
|
||||
codelTargetDelay, codelInterval, codelLifoThreshold,
|
||||
numGeneralCallsDropped, numLifoModeSwitches);
|
||||
if (RpcExecutor.isFifoQueueType(callQueueType)) {
|
||||
callExecutor = new FastPathBalancedQueueRpcExecutor("deafult.FPBQ", handlerCount,
|
||||
maxPriorityQueueLength, priority, conf, server);
|
||||
} else {
|
||||
// FifoWFPBQ = FastPathBalancedQueueRpcExecutor
|
||||
callExecutor = new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
|
||||
handlerCount, numCallQueues, maxQueueLength, conf, abortable);
|
||||
callExecutor = new BalancedQueueRpcExecutor("deafult.BQ", handlerCount, maxQueueLength,
|
||||
priority, conf, server);
|
||||
}
|
||||
}
|
||||
|
||||
// Create 2 queues to help priorityExecutor be more scalable.
|
||||
this.priorityExecutor = priorityHandlerCount > 0?
|
||||
new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
|
||||
2, maxPriorityQueueLength, conf, abortable): null;
|
||||
this.replicationExecutor = replicationHandlerCount > 0?
|
||||
new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
|
||||
replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
|
||||
this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
|
||||
"priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
|
||||
maxPriorityQueueLength, priority, conf, abortable) : null;
|
||||
this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
|
||||
"replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
|
||||
maxQueueLength, priority, conf, abortable) : null;
|
||||
}
|
||||
|
||||
private static boolean isDeadlineQueueType(final String callQueueType) {
|
||||
return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
|
||||
}
|
||||
|
||||
private static boolean isCodelQueueType(final String callQueueType) {
|
||||
return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
|
||||
}
|
||||
|
||||
public SimpleRpcScheduler(
|
||||
Configuration conf,
|
||||
|
@ -264,6 +121,27 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
null, highPriorityLevel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resize call queues;
|
||||
* @param conf new configuration
|
||||
*/
|
||||
@Override
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
callExecutor.resizeQueues(conf);
|
||||
if (priorityExecutor != null) {
|
||||
priorityExecutor.resizeQueues(conf);
|
||||
}
|
||||
if (replicationExecutor != null) {
|
||||
replicationExecutor.resizeQueues(conf);
|
||||
}
|
||||
|
||||
String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
|
||||
RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
|
||||
if (RpcExecutor.isCodelQueueType(callQueueType)) {
|
||||
callExecutor.onConfigurationChange(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Context context) {
|
||||
this.port = context.getListenerAddress().getPort();
|
||||
|
@ -320,12 +198,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
|
||||
@Override
|
||||
public long getNumGeneralCallsDropped() {
|
||||
return numGeneralCallsDropped.get();
|
||||
return callExecutor.getNumGeneralCallsDropped();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumLifoModeSwitches() {
|
||||
return numLifoModeSwitches.get();
|
||||
return callExecutor.getNumLifoModeSwitches();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
|
||||
import org.apache.hadoop.hbase.ipc.RpcExecutor;
|
||||
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -73,7 +74,7 @@ public class TestFastFail {
|
|||
public static void setUpBeforeClass() throws Exception {
|
||||
// Just to prevent fastpath FIFO from picking calls up bypassing the queue.
|
||||
TEST_UTIL.getConfiguration().set(
|
||||
SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, "deadline");
|
||||
RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, "deadline");
|
||||
TEST_UTIL.startMiniCluster(SLAVES);
|
||||
}
|
||||
|
||||
|
|
|
@ -173,13 +173,13 @@ public class TestSimpleRpcScheduler {/*
|
|||
|
||||
@Test
|
||||
public void testRpcScheduler() throws Exception {
|
||||
testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
|
||||
testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
|
||||
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
|
||||
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
|
||||
}
|
||||
|
||||
private void testRpcScheduler(final String queueType) throws Exception {
|
||||
Configuration schedConf = HBaseConfiguration.create();
|
||||
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType);
|
||||
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
|
||||
|
||||
PriorityFunction priority = mock(PriorityFunction.class);
|
||||
when(priority.getPriority(any(RequestHeader.class),
|
||||
|
@ -243,9 +243,9 @@ public class TestSimpleRpcScheduler {/*
|
|||
// -> [small small small huge small large small small]
|
||||
// -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
|
||||
// -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
|
||||
if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
|
||||
if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
|
||||
assertEquals(530, totalTime);
|
||||
} else if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
|
||||
} else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
|
||||
assertEquals(930, totalTime);
|
||||
}
|
||||
} finally {
|
||||
|
@ -256,9 +256,9 @@ public class TestSimpleRpcScheduler {/*
|
|||
@Test
|
||||
public void testScanQueueWithZeroScanRatio() throws Exception {
|
||||
Configuration schedConf = HBaseConfiguration.create();
|
||||
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
|
||||
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
|
||||
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
|
||||
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
|
||||
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
|
||||
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);
|
||||
|
||||
PriorityFunction priority = mock(PriorityFunction.class);
|
||||
when(priority.getPriority(any(RequestHeader.class), any(Message.class),
|
||||
|
@ -272,9 +272,9 @@ public class TestSimpleRpcScheduler {/*
|
|||
@Test
|
||||
public void testScanQueues() throws Exception {
|
||||
Configuration schedConf = HBaseConfiguration.create();
|
||||
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
|
||||
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
|
||||
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
|
||||
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
|
||||
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
|
||||
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
|
||||
|
||||
PriorityFunction priority = mock(PriorityFunction.class);
|
||||
when(priority.getPriority(any(RPCProtos.RequestHeader.class), any(Message.class),
|
||||
|
@ -431,8 +431,8 @@ public class TestSimpleRpcScheduler {/*
|
|||
envEdge.threadNamePrefixs.add("RpcServer.CodelRWQ.default.handler");
|
||||
Configuration schedConf = HBaseConfiguration.create();
|
||||
schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
|
||||
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
|
||||
SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
|
||||
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
|
||||
RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
|
||||
PriorityFunction priority = mock(PriorityFunction.class);
|
||||
when(priority.getPriority(any(RPCProtos.RequestHeader.class), any(Message.class),
|
||||
any(User.class))).thenReturn(HConstants.NORMAL_QOS);
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor;
|
||||
import org.apache.hadoop.hbase.ipc.RpcExecutor;
|
||||
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
|
@ -54,9 +56,9 @@ public class TestRpcSchedulerFactory {
|
|||
public void testRWQ() {
|
||||
// Set some configs just to see how it changes the scheduler. Can't assert the settings had
|
||||
// an effect. Just eyeball the log.
|
||||
this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5);
|
||||
this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5);
|
||||
this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5);
|
||||
this.conf.setDouble(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5);
|
||||
this.conf.setDouble(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5);
|
||||
this.conf.setDouble(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5);
|
||||
RpcSchedulerFactory factory = new SimpleRpcSchedulerFactory();
|
||||
RpcScheduler rpcScheduler = factory.create(this.conf, null, null);
|
||||
assertTrue(rpcScheduler.getClass().equals(SimpleRpcScheduler.class));
|
||||
|
|
Loading…
Reference in New Issue