HBASE-15136 Explore different queuing behaviors while busy
This commit is contained in:
parent
6e9d355b12
commit
43f99def67
|
@ -85,6 +85,7 @@ public class ReflectionUtils {
|
|||
match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
|
||||
((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
|
||||
(long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
|
||||
(double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
|
||||
(char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
|
||||
(short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
|
||||
(boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
|
||||
|
|
|
@ -64,6 +64,12 @@ public interface MetricsHBaseServerSource extends BaseSource {
|
|||
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
|
||||
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
|
||||
String NUM_ACTIVE_HANDLER_DESC = "Number of active rpc handlers.";
|
||||
String NUM_GENERAL_CALLS_DROPPED_NAME = "numGeneralCallsDropped";
|
||||
String NUM_GENERAL_CALLS_DROPPED_DESC = "Total number of calls in general queue which " +
|
||||
"were dropped by CoDel RPC executor";
|
||||
String NUM_LIFO_MODE_SWITCHES_NAME = "numLifoModeSwitches";
|
||||
String NUM_LIFO_MODE_SWITCHES_DESC = "Total number of calls in general queue which " +
|
||||
"were served from the tail of the queue";
|
||||
|
||||
String EXCEPTIONS_NAME="exceptions";
|
||||
String EXCEPTIONS_DESC="Exceptions caused by requests";
|
||||
|
|
|
@ -26,4 +26,6 @@ public interface MetricsHBaseServerWrapper {
|
|||
int getPriorityQueueLength();
|
||||
int getNumOpenConnections();
|
||||
int getActiveRpcHandlerCount();
|
||||
long getNumGeneralCallsDropped();
|
||||
long getNumLifoModeSwitches();
|
||||
}
|
||||
|
|
|
@ -219,7 +219,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
|
|||
.addGauge(Interns.info(NUM_OPEN_CONNECTIONS_NAME,
|
||||
NUM_OPEN_CONNECTIONS_DESC), wrapper.getNumOpenConnections())
|
||||
.addGauge(Interns.info(NUM_ACTIVE_HANDLER_NAME,
|
||||
NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount());
|
||||
NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount())
|
||||
.addCounter(Interns.info(NUM_GENERAL_CALLS_DROPPED_NAME,
|
||||
NUM_GENERAL_CALLS_DROPPED_DESC), wrapper.getNumGeneralCallsDropped())
|
||||
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME,
|
||||
NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches());
|
||||
}
|
||||
|
||||
metricsRegistry.snapshot(mrb, all);
|
||||
|
|
|
@ -0,0 +1,329 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
|
||||
*
|
||||
* Implementing {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}.
|
||||
*
|
||||
* Currently uses milliseconds internally, need to look into whether we should use
|
||||
* nanoseconds for timeInterval and minDelay.
|
||||
*
|
||||
* @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a>
|
||||
*
|
||||
* @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp">
|
||||
* CoDel version for generic job queues in Wangle library</a>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
|
||||
|
||||
// backing queue
|
||||
private LinkedBlockingDeque<CallRunner> queue;
|
||||
|
||||
// so we can calculate actual threshold to switch to LIFO under load
|
||||
private int maxCapacity;
|
||||
|
||||
// metrics (shared across all queues)
|
||||
private AtomicLong numGeneralCallsDropped;
|
||||
private AtomicLong numLifoModeSwitches;
|
||||
|
||||
/**
|
||||
* Lock held by take ops, all other locks are inside queue impl.
|
||||
*
|
||||
* NOTE: We want to have this lock so that in case when there're lot of already expired
|
||||
* calls in the call queue a handler thread calling take() can just grab lock once and
|
||||
* then fast-forward through the expired calls to the first non-expired without having
|
||||
* to contend for locks on every element in underlying queue.
|
||||
*/
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
// Both are in milliseconds
|
||||
private volatile int codelTargetDelay;
|
||||
private volatile int codelInterval;
|
||||
|
||||
// if queue if full more than that percent, we switch to LIFO mode.
|
||||
// Values are in the range of 0.7, 0.8 etc (0-1.0).
|
||||
private volatile double lifoThreshold;
|
||||
|
||||
// minimal delay observed during the interval
|
||||
private volatile long minDelay;
|
||||
|
||||
// the moment when current interval ends
|
||||
private volatile long intervalTime = System.currentTimeMillis();
|
||||
|
||||
// switch to ensure only one threads does interval cutoffs
|
||||
private AtomicBoolean resetDelay = new AtomicBoolean(true);
|
||||
|
||||
// if we're in this mode, "long" calls are getting dropped
|
||||
private volatile boolean isOverloaded;
|
||||
|
||||
public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
|
||||
double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
|
||||
this.maxCapacity = capacity;
|
||||
this.queue = new LinkedBlockingDeque<>(capacity);
|
||||
this.codelTargetDelay = targetDelay;
|
||||
this.codelInterval = interval;
|
||||
this.lifoThreshold = lifoThreshold;
|
||||
this.numGeneralCallsDropped = numGeneralCallsDropped;
|
||||
this.numLifoModeSwitches = numLifoModeSwitches;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update tunables.
|
||||
*
|
||||
* @param newCodelTargetDelay new CoDel target delay
|
||||
* @param newCodelInterval new CoDel interval
|
||||
* @param newLifoThreshold new Adaptive Lifo threshold
|
||||
*/
|
||||
public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
|
||||
double newLifoThreshold) {
|
||||
this.codelTargetDelay = newCodelTargetDelay;
|
||||
this.codelInterval = newCodelInterval;
|
||||
this.lifoThreshold = newLifoThreshold;
|
||||
}
|
||||
|
||||
/**
|
||||
* Behaves as {@link LinkedBlockingQueue#take()}, except it will silently
|
||||
* skip all calls which it thinks should be dropped.
|
||||
*
|
||||
* @return the head of this queue
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
*/
|
||||
@Override
|
||||
public CallRunner take() throws InterruptedException {
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
CallRunner cr;
|
||||
while(true) {
|
||||
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
|
||||
numLifoModeSwitches.incrementAndGet();
|
||||
cr = queue.takeLast();
|
||||
} else {
|
||||
cr = queue.takeFirst();
|
||||
}
|
||||
if (needToDrop(cr)) {
|
||||
numGeneralCallsDropped.incrementAndGet();
|
||||
} else {
|
||||
return cr;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param callRunner to validate
|
||||
* @return true if this call needs to be skipped based on call timestamp
|
||||
* and internal queue state (deemed overloaded).
|
||||
*/
|
||||
private boolean needToDrop(CallRunner callRunner) {
|
||||
long now = System.currentTimeMillis();
|
||||
long callDelay = now - callRunner.getCall().timestamp;
|
||||
|
||||
long localMinDelay = this.minDelay;
|
||||
if (now > intervalTime && !resetDelay.getAndSet(true)) {
|
||||
intervalTime = now + codelInterval;
|
||||
|
||||
if (localMinDelay > codelTargetDelay) {
|
||||
isOverloaded = true;
|
||||
} else {
|
||||
isOverloaded = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (resetDelay.getAndSet(false)) {
|
||||
minDelay = callDelay;
|
||||
return false;
|
||||
} else if (callDelay < localMinDelay) {
|
||||
minDelay = callDelay;
|
||||
}
|
||||
|
||||
if (isOverloaded && callDelay > 2 * codelTargetDelay) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Generic BlockingQueue methods we support
|
||||
@Override
|
||||
public boolean offer(CallRunner callRunner) {
|
||||
return queue.offer(callRunner);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return queue.toString();
|
||||
}
|
||||
|
||||
// This class does NOT provide generic purpose BlockingQueue implementation,
|
||||
// so to prevent misuse all other methods throw UnsupportedOperationException.
|
||||
|
||||
@Override
|
||||
public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallRunner poll() {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallRunner peek() {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object o) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(Object o) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T[] toArray(T[] a) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int drainTo(Collection<? super CallRunner> c) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int drainTo(Collection<? super CallRunner> c, int maxElements) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<CallRunner> iterator() {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(CallRunner callRunner) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallRunner remove() {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallRunner element() {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(Collection<? extends CallRunner> c) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAll(Collection<?> c) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeAll(Collection<?> c) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retainAll(Collection<?> c) {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int remainingCapacity() {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(CallRunner callRunner) throws InterruptedException {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
throw new UnsupportedOperationException("This class doesn't support anything,"
|
||||
+ " but take() and offer() methods");
|
||||
}
|
||||
}
|
|
@ -104,4 +104,14 @@ public class FifoRpcScheduler extends RpcScheduler {
|
|||
public int getActiveRpcHandlerCount() {
|
||||
return executor.getActiveCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumGeneralCallsDropped() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumLifoModeSwitches() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,4 +78,20 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
|
|||
}
|
||||
return server.getScheduler().getActiveRpcHandlerCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumGeneralCallsDropped() {
|
||||
if (!isServerStarted() || this.server.getScheduler() == null) {
|
||||
return 0;
|
||||
}
|
||||
return server.getScheduler().getNumGeneralCallsDropped();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumLifoModeSwitches() {
|
||||
if (!isServerStarted() || this.server.getScheduler() == null) {
|
||||
return 0;
|
||||
}
|
||||
return server.getScheduler().getNumLifoModeSwitches();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -100,6 +100,16 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
|
||||
}
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final float readShare, final float scanShare,
|
||||
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
|
||||
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
|
||||
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
|
||||
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
|
||||
writeQueueClass, writeQueueInitArgs,
|
||||
readQueueClass, readQueueInitArgs);
|
||||
}
|
||||
|
||||
public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
|
||||
final int numWriteQueues, final int numReadQueues,
|
||||
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
|
||||
|
|
|
@ -71,4 +71,17 @@ public abstract class RpcScheduler {
|
|||
|
||||
/** Retrieves the number of active handler. */
|
||||
public abstract int getActiveRpcHandlerCount();
|
||||
|
||||
/**
|
||||
* If CoDel-based RPC executors are used, retrieves the number of Calls that were dropped
|
||||
* from general queue because RPC executor is under high load; returns 0 otherwise.
|
||||
*/
|
||||
public abstract long getNumGeneralCallsDropped();
|
||||
|
||||
/**
|
||||
* If CoDel-based RPC executors are used, retrieves the number of Calls that were
|
||||
* picked from the tail of the queue (indicating adaptive LIFO mode, when
|
||||
* in the period of overloade we serve last requests first); returns 0 otherwise.
|
||||
*/
|
||||
public abstract long getNumLifoModeSwitches();
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.ipc;
|
|||
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -49,6 +51,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
|
||||
/** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
|
||||
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
||||
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
||||
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
|
||||
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
|
||||
|
||||
|
@ -56,6 +59,21 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
|
||||
= "hbase.ipc.server.queue.max.call.delay";
|
||||
|
||||
// These 3 are only used by Codel executor
|
||||
public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
|
||||
"hbase.ipc.server.callqueue.codel.target.delay";
|
||||
public static final String CALL_QUEUE_CODEL_INTERVAL =
|
||||
"hbase.ipc.server.callqueue.codel.interval";
|
||||
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
|
||||
"hbase.ipc.server.callqueue.codel.lifo.threshold";
|
||||
|
||||
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5;
|
||||
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
|
||||
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
|
||||
|
||||
private AtomicLong numGeneralCallsDropped = new AtomicLong();
|
||||
private AtomicLong numLifoModeSwitches = new AtomicLong();
|
||||
|
||||
/**
|
||||
* Resize call queues;
|
||||
* @param conf new configuration
|
||||
|
@ -69,6 +87,26 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
if (replicationExecutor != null) {
|
||||
replicationExecutor.resizeQueues(conf);
|
||||
}
|
||||
|
||||
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
|
||||
CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
|
||||
|
||||
if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
||||
// update CoDel Scheduler tunables
|
||||
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
|
||||
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
|
||||
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
|
||||
CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
|
||||
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
|
||||
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
|
||||
|
||||
for (BlockingQueue<CallRunner> queue : callExecutor.getQueues()) {
|
||||
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
|
||||
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay,
|
||||
codelInterval, codelLifoThreshold);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -134,10 +172,18 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
this.highPriorityLevel = highPriorityLevel;
|
||||
this.abortable = server;
|
||||
|
||||
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
|
||||
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
|
||||
CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
|
||||
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
|
||||
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
|
||||
|
||||
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
|
||||
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
|
||||
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
|
||||
CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
|
||||
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
|
||||
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
|
||||
|
||||
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
|
||||
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
|
||||
|
||||
|
@ -150,6 +196,13 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
|
||||
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
|
||||
BoundedPriorityBlockingQueue.class, callPriority);
|
||||
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
||||
Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
|
||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
|
||||
callExecutor = new RWQueueRpcExecutor("B.default", handlerCount,
|
||||
numCallQueues, callqReadShare, callqScanShare,
|
||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
|
||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
|
||||
} else {
|
||||
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
|
||||
callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
|
||||
|
@ -160,6 +213,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
|
||||
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
|
||||
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
|
||||
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
||||
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
|
||||
conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
|
||||
codelTargetDelay, codelInterval, codelLifoThreshold,
|
||||
numGeneralCallsDropped, numLifoModeSwitches);
|
||||
} else {
|
||||
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
|
||||
numCallQueues, maxQueueLength, conf, abortable);
|
||||
|
@ -239,5 +297,15 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
(priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
|
||||
(replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumGeneralCallsDropped() {
|
||||
return numGeneralCallsDropped.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumLifoModeSwitches() {
|
||||
return numLifoModeSwitches.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -48,4 +48,14 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
|
|||
public int getActiveRpcHandlerCount() {
|
||||
return 106;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumGeneralCallsDropped() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumLifoModeSwitches() {
|
||||
return 5;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -362,4 +362,67 @@ public class TestSimpleRpcScheduler {
|
|||
scheduler.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoDelScheduling() throws Exception {
|
||||
Configuration schedConf = HBaseConfiguration.create();
|
||||
|
||||
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
|
||||
SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
|
||||
|
||||
PriorityFunction priority = mock(PriorityFunction.class);
|
||||
when(priority.getPriority(any(RPCProtos.RequestHeader.class), any(Message.class),
|
||||
any(User.class))).thenReturn(HConstants.NORMAL_QOS);
|
||||
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
|
||||
HConstants.QOS_THRESHOLD);
|
||||
try {
|
||||
scheduler.start();
|
||||
|
||||
// calls faster than min delay
|
||||
for (int i = 0; i < 100; i++) {
|
||||
CallRunner cr = getMockedCallRunner();
|
||||
Thread.sleep(5);
|
||||
scheduler.dispatch(cr);
|
||||
}
|
||||
Thread.sleep(100); // make sure fast calls are handled
|
||||
assertEquals("None of these calls should have been discarded", 0,
|
||||
scheduler.getNumGeneralCallsDropped());
|
||||
|
||||
// calls slower than min delay, but not individually slow enough to be dropped
|
||||
for (int i = 0; i < 20; i++) {
|
||||
CallRunner cr = getMockedCallRunner();
|
||||
Thread.sleep(6);
|
||||
scheduler.dispatch(cr);
|
||||
}
|
||||
|
||||
Thread.sleep(100); // make sure somewhat slow calls are handled
|
||||
assertEquals("None of these calls should have been discarded", 0,
|
||||
scheduler.getNumGeneralCallsDropped());
|
||||
|
||||
// now slow calls and the ones to be dropped
|
||||
for (int i = 0; i < 20; i++) {
|
||||
CallRunner cr = getMockedCallRunner();
|
||||
Thread.sleep(12);
|
||||
scheduler.dispatch(cr);
|
||||
}
|
||||
|
||||
Thread.sleep(100); // make sure somewhat slow calls are handled
|
||||
assertTrue("There should have been at least 12 calls dropped",
|
||||
scheduler.getNumGeneralCallsDropped() > 12);
|
||||
} finally {
|
||||
scheduler.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private CallRunner getMockedCallRunner() throws IOException {
|
||||
CallRunner putCallTask = mock(CallRunner.class);
|
||||
RpcServer.Call putCall = mock(RpcServer.Call.class);
|
||||
putCall.param = RequestConverter.buildMutateRequest(
|
||||
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
|
||||
RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
|
||||
when(putCallTask.getCall()).thenReturn(putCall);
|
||||
when(putCall.getHeader()).thenReturn(putHead);
|
||||
putCall.timestamp = System.currentTimeMillis();
|
||||
return putCallTask;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue