HBASE-15136 Explore different queuing behaviors while busy

This commit is contained in:
Mikhail Antonov 2016-02-24 20:40:44 -08:00
parent 13773254c8
commit 04a3b27330
12 changed files with 534 additions and 2 deletions

View File

@ -85,6 +85,7 @@ public class ReflectionUtils {
match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
(long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
(double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
(char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
(short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
(boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||

View File

@ -64,6 +64,12 @@ public interface MetricsHBaseServerSource extends BaseSource {
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
String NUM_ACTIVE_HANDLER_DESC = "Number of active rpc handlers.";
String NUM_GENERAL_CALLS_DROPPED_NAME = "numGeneralCallsDropped";
String NUM_GENERAL_CALLS_DROPPED_DESC = "Total number of calls in general queue which " +
"were dropped by CoDel RPC executor";
String NUM_LIFO_MODE_SWITCHES_NAME = "numLifoModeSwitches";
String NUM_LIFO_MODE_SWITCHES_DESC = "Total number of calls in general queue which " +
"were served from the tail of the queue";
String EXCEPTIONS_NAME="exceptions";
String EXCEPTIONS_DESC="Exceptions caused by requests";

View File

@ -26,4 +26,6 @@ public interface MetricsHBaseServerWrapper {
int getPriorityQueueLength();
int getNumOpenConnections();
int getActiveRpcHandlerCount();
long getNumGeneralCallsDropped();
long getNumLifoModeSwitches();
}

View File

@ -219,7 +219,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.addGauge(Interns.info(NUM_OPEN_CONNECTIONS_NAME,
NUM_OPEN_CONNECTIONS_DESC), wrapper.getNumOpenConnections())
.addGauge(Interns.info(NUM_ACTIVE_HANDLER_NAME,
NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount());
NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount())
.addCounter(Interns.info(NUM_GENERAL_CALLS_DROPPED_NAME,
NUM_GENERAL_CALLS_DROPPED_DESC), wrapper.getNumGeneralCallsDropped())
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME,
NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches());
}
metricsRegistry.snapshot(mrb, all);

View File

@ -0,0 +1,329 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
*
* Implementing {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}.
*
* Currently uses milliseconds internally, need to look into whether we should use
* nanoseconds for timeInterval and minDelay.
*
* @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a>
*
* @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp">
* CoDel version for generic job queues in Wangle library</a>
*/
@InterfaceAudience.Private
public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
// backing queue
private LinkedBlockingDeque<CallRunner> queue;
// so we can calculate actual threshold to switch to LIFO under load
private int maxCapacity;
// metrics (shared across all queues)
private AtomicLong numGeneralCallsDropped;
private AtomicLong numLifoModeSwitches;
/**
* Lock held by take ops, all other locks are inside queue impl.
*
* NOTE: We want to have this lock so that in case when there're lot of already expired
* calls in the call queue a handler thread calling take() can just grab lock once and
* then fast-forward through the expired calls to the first non-expired without having
* to contend for locks on every element in underlying queue.
*/
private final ReentrantLock lock = new ReentrantLock();
// Both are in milliseconds
private volatile int codelTargetDelay;
private volatile int codelInterval;
// if queue if full more than that percent, we switch to LIFO mode.
// Values are in the range of 0.7, 0.8 etc (0-1.0).
private volatile double lifoThreshold;
// minimal delay observed during the interval
private volatile long minDelay;
// the moment when current interval ends
private volatile long intervalTime = System.currentTimeMillis();
// switch to ensure only one threads does interval cutoffs
private AtomicBoolean resetDelay = new AtomicBoolean(true);
// if we're in this mode, "long" calls are getting dropped
private volatile boolean isOverloaded;
public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
this.maxCapacity = capacity;
this.queue = new LinkedBlockingDeque<>(capacity);
this.codelTargetDelay = targetDelay;
this.codelInterval = interval;
this.lifoThreshold = lifoThreshold;
this.numGeneralCallsDropped = numGeneralCallsDropped;
this.numLifoModeSwitches = numLifoModeSwitches;
}
/**
* Update tunables.
*
* @param newCodelTargetDelay new CoDel target delay
* @param newCodelInterval new CoDel interval
* @param newLifoThreshold new Adaptive Lifo threshold
*/
public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
double newLifoThreshold) {
this.codelTargetDelay = newCodelTargetDelay;
this.codelInterval = newCodelInterval;
this.lifoThreshold = newLifoThreshold;
}
/**
* Behaves as {@link LinkedBlockingQueue#take()}, except it will silently
* skip all calls which it thinks should be dropped.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
@Override
public CallRunner take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
CallRunner cr;
while(true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
numLifoModeSwitches.incrementAndGet();
cr = queue.takeLast();
} else {
cr = queue.takeFirst();
}
if (needToDrop(cr)) {
numGeneralCallsDropped.incrementAndGet();
} else {
return cr;
}
}
} finally {
lock.unlock();
}
}
/**
* @param callRunner to validate
* @return true if this call needs to be skipped based on call timestamp
* and internal queue state (deemed overloaded).
*/
private boolean needToDrop(CallRunner callRunner) {
long now = System.currentTimeMillis();
long callDelay = now - callRunner.getCall().timestamp;
long localMinDelay = this.minDelay;
if (now > intervalTime && !resetDelay.getAndSet(true)) {
intervalTime = now + codelInterval;
if (localMinDelay > codelTargetDelay) {
isOverloaded = true;
} else {
isOverloaded = false;
}
}
if (resetDelay.getAndSet(false)) {
minDelay = callDelay;
return false;
} else if (callDelay < localMinDelay) {
minDelay = callDelay;
}
if (isOverloaded && callDelay > 2 * codelTargetDelay) {
return true;
} else {
return false;
}
}
// Generic BlockingQueue methods we support
@Override
public boolean offer(CallRunner callRunner) {
return queue.offer(callRunner);
}
@Override
public int size() {
return queue.size();
}
@Override
public String toString() {
return queue.toString();
}
// This class does NOT provide generic purpose BlockingQueue implementation,
// so to prevent misuse all other methods throw UnsupportedOperationException.
@Override
public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public CallRunner poll() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public CallRunner peek() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public boolean contains(Object o) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public void clear() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public int drainTo(Collection<? super CallRunner> c) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public int drainTo(Collection<? super CallRunner> c, int maxElements) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public Iterator<CallRunner> iterator() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public boolean add(CallRunner callRunner) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public CallRunner remove() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public CallRunner element() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public boolean addAll(Collection<? extends CallRunner> c) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public boolean isEmpty() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public int remainingCapacity() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public void put(CallRunner callRunner) throws InterruptedException {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override
public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
throws InterruptedException {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
}

View File

@ -104,4 +104,14 @@ public class FifoRpcScheduler extends RpcScheduler {
public int getActiveRpcHandlerCount() {
return executor.getActiveCount();
}
@Override
public long getNumGeneralCallsDropped() {
return 0;
}
@Override
public long getNumLifoModeSwitches() {
return 0;
}
}

View File

@ -78,4 +78,20 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
}
return server.getScheduler().getActiveRpcHandlerCount();
}
@Override
public long getNumGeneralCallsDropped() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getNumGeneralCallsDropped();
}
@Override
public long getNumLifoModeSwitches() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getNumLifoModeSwitches();
}
}

View File

@ -100,6 +100,16 @@ public class RWQueueRpcExecutor extends RpcExecutor {
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
writeQueueClass, writeQueueInitArgs,
readQueueClass, readQueueInitArgs);
}
public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
final int numWriteQueues, final int numReadQueues,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,

View File

@ -71,4 +71,17 @@ public abstract class RpcScheduler {
/** Retrieves the number of active handler. */
public abstract int getActiveRpcHandlerCount();
/**
* If CoDel-based RPC executors are used, retrieves the number of Calls that were dropped
* from general queue because RPC executor is under high load; returns 0 otherwise.
*/
public abstract long getNumGeneralCallsDropped();
/**
* If CoDel-based RPC executors are used, retrieves the number of Calls that were
* picked from the tail of the queue (indicating adaptive LIFO mode, when
* in the period of overloade we serve last requests first); returns 0 otherwise.
*/
public abstract long getNumLifoModeSwitches();
}

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.ipc;
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -49,6 +51,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
/** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
@ -56,6 +59,21 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
= "hbase.ipc.server.queue.max.call.delay";
// These 3 are only used by Codel executor
public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
"hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL =
"hbase.ipc.server.callqueue.codel.interval";
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
"hbase.ipc.server.callqueue.codel.lifo.threshold";
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5;
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
private AtomicLong numGeneralCallsDropped = new AtomicLong();
private AtomicLong numLifoModeSwitches = new AtomicLong();
/**
* Resize call queues;
* @param conf new configuration
@ -69,6 +87,26 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
if (replicationExecutor != null) {
replicationExecutor.resizeQueues(conf);
}
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
// update CoDel Scheduler tunables
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
for (BlockingQueue<CallRunner> queue : callExecutor.getQueues()) {
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay,
codelInterval, codelLifoThreshold);
}
}
}
}
/**
@ -134,10 +172,18 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
this.highPriorityLevel = highPriorityLevel;
this.abortable = server;
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
@ -150,6 +196,13 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
BoundedPriorityBlockingQueue.class, callPriority);
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
callExecutor = new RWQueueRpcExecutor("B.default", handlerCount,
numCallQueues, callqReadShare, callqScanShare,
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
} else {
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
@ -160,6 +213,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
codelTargetDelay, codelInterval, codelLifoThreshold,
numGeneralCallsDropped, numLifoModeSwitches);
} else {
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
numCallQueues, maxQueueLength, conf, abortable);
@ -239,5 +297,15 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
(priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
(replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
}
@Override
public long getNumGeneralCallsDropped() {
return numGeneralCallsDropped.get();
}
@Override
public long getNumLifoModeSwitches() {
return numLifoModeSwitches.get();
}
}

View File

@ -48,4 +48,14 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
public int getActiveRpcHandlerCount() {
return 106;
}
@Override
public long getNumGeneralCallsDropped() {
return 3;
}
@Override
public long getNumLifoModeSwitches() {
return 5;
}
}

View File

@ -356,4 +356,67 @@ public class TestSimpleRpcScheduler {
scheduler.stop();
}
}
@Test
public void testCoDelScheduling() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(RPCProtos.RequestHeader.class), any(Message.class),
any(User.class))).thenReturn(HConstants.NORMAL_QOS);
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
HConstants.QOS_THRESHOLD);
try {
scheduler.start();
// calls faster than min delay
for (int i = 0; i < 100; i++) {
CallRunner cr = getMockedCallRunner();
Thread.sleep(5);
scheduler.dispatch(cr);
}
Thread.sleep(100); // make sure fast calls are handled
assertEquals("None of these calls should have been discarded", 0,
scheduler.getNumGeneralCallsDropped());
// calls slower than min delay, but not individually slow enough to be dropped
for (int i = 0; i < 20; i++) {
CallRunner cr = getMockedCallRunner();
Thread.sleep(6);
scheduler.dispatch(cr);
}
Thread.sleep(100); // make sure somewhat slow calls are handled
assertEquals("None of these calls should have been discarded", 0,
scheduler.getNumGeneralCallsDropped());
// now slow calls and the ones to be dropped
for (int i = 0; i < 20; i++) {
CallRunner cr = getMockedCallRunner();
Thread.sleep(12);
scheduler.dispatch(cr);
}
Thread.sleep(100); // make sure somewhat slow calls are handled
assertTrue("There should have been at least 12 calls dropped",
scheduler.getNumGeneralCallsDropped() > 12);
} finally {
scheduler.stop();
}
}
private CallRunner getMockedCallRunner() throws IOException {
CallRunner putCallTask = mock(CallRunner.class);
RpcServer.Call putCall = mock(RpcServer.Call.class);
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
when(putCallTask.getCall()).thenReturn(putCall);
when(putCall.getHeader()).thenReturn(putHead);
putCall.timestamp = System.currentTimeMillis();
return putCallTask;
}
}