HBASE-10993 Deprioritize long-running scanners

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1593137 13f79535-47bb-0310-9956-ffa450edef68
mbertozzi 2014-05-07 21:05:04 +00:00
parent 4a7cc45f8a
commit de98955c16
9 changed files with 769 additions and 3 deletions

org/apache/hadoop/hbase/ipc/PriorityFunction.java

@@ -25,9 +25,20 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
*/
public interface PriorityFunction {
/**
* Returns the 'priority type' of the specified request.
* The returned value is mainly used to select the dispatch queue.
* @param header
* @param param
* @return Priority of this request.
*/
int getPriority(RequestHeader header, Message param);
/**
* Returns the deadline of the specified request.
* The returned value is used to sort the dispatch queue.
* @param header
* @param param
* @return Deadline of this request. 0 now, otherwise msec of 'delay'
*/
long getDeadline(RequestHeader header, Message param);
}
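
For context (not part of the diff), a minimal sketch of how the extended interface could be implemented; the class name and the fixed 10 msec scan delay are illustrative only:

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;

import com.google.protobuf.Message;

// Illustrative only: every call is NORMAL_QOS, and "Scan" calls are pushed
// back by a fixed 10 msec while everything else keeps deadline 0 (run now).
public class FixedDelayPriorityFunction implements PriorityFunction {
  @Override
  public int getPriority(RequestHeader header, Message param) {
    return HConstants.NORMAL_QOS;
  }

  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return "Scan".equalsIgnoreCase(header.getMethodName()) ? 10 : 0;
  }
}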

org/apache/hadoop/hbase/ipc/RpcServer.java

@@ -313,6 +313,10 @@ public class RpcServer implements RpcServerInterface {
" connection: " + connection.toString();
}
protected RequestHeader getHeader() {
return this.header;
}
/*
* Short string representation without param info because param itself could be huge depends on
* the payload of a command

org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java

@@ -17,14 +17,18 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -36,6 +40,45 @@ import com.google.common.collect.Lists;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SimpleRpcScheduler implements RpcScheduler {
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
/** If set to true, uses a priority queue and deprioritize long-running scans */
public static final String CALL_QUEUE_TYPE_CONF_KEY = "ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
/** max delay in msec used to bound the deprioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "ipc.server.queue.max.call.delay";
/**
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
* It uses the calculated "deadline" e.g. to deprioritize long-running job
*
* If multiple requests have the same deadline BoundedPriorityBlockingQueue will order them in
* FIFO (first-in-first-out) manner.
*/
private static class CallPriorityComparator implements Comparator<CallRunner> {
private final static int DEFAULT_MAX_CALL_DELAY = 5000;
private final PriorityFunction priority;
private final int maxDelay;
public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) {
this.priority = priority;
this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
}
@Override
public int compare(CallRunner a, CallRunner b) {
RpcServer.Call callA = a.getCall();
RpcServer.Call callB = b.getCall();
long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
return (int)(deadlineA - deadlineB);
}
}
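
A worked example of the comparator arithmetic with made-up numbers (not part of the diff):

// Two calls enqueued at timestamp = 1000 msec, maxDelay = 5000 msec
// (the QUEUE_MAX_CALL_DELAY_CONF_KEY default):
//   callA: getDeadline() = 0    -> 1000 + Math.min(0, 5000)    = 1000
//   callB: getDeadline() = 8000 -> 1000 + Math.min(8000, 5000) = 6000
// compare(a, b) = (int)(1000 - 6000) < 0, so callA is polled first and the
// push-back of a long-running scan can never exceed the configured cap.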
private int port;
private final int handlerCount;
@@ -73,7 +116,15 @@ public class SimpleRpcScheduler implements RpcScheduler {
this.replicationHandlerCount = replicationHandlerCount;
this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
LOG.debug("Using " + callQueueType + " as user call queue");
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
this.callQueue = new BoundedPriorityBlockingQueue<CallRunner>(maxQueueLength,
new CallPriorityComparator(conf, this.priority));
} else {
this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
}
this.priorityCallQueue = priorityHandlerCount > 0
? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
: null;
@@ -128,7 +179,7 @@ public class SimpleRpcScheduler implements RpcScheduler {
@Override
public void dispatch(CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
int level = priority.getPriority(call.getHeader(), call.param);
if (priorityCallQueue != null && level > highPriorityLevel) {
priorityCallQueue.put(callTask);
} else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) {
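
For context (not part of the diff): the deadline queue is the default for the user call queue, and the behaviour can be pinned or reverted through the new configuration keys. A minimal construction sketch, with illustrative handler counts and an assumed example class name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;

public class DeadlineSchedulerExample {
  // Handler counts and the 3000 msec cap below are illustrative values only.
  static RpcScheduler createScheduler(PriorityFunction priority) {
    Configuration conf = HBaseConfiguration.create();
    // "deadline" (the default) reorders user calls by getDeadline();
    // "fifo" restores the plain LinkedBlockingQueue behaviour.
    conf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
        SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
    // Cap how far a deprioritized call can be pushed back (msec, default 5000).
    conf.setInt(SimpleRpcScheduler.QUEUE_MAX_CALL_DELAY_CONF_KEY, 3000);
    return new SimpleRpcScheduler(conf, 30, 1, 1, priority, HConstants.QOS_THRESHOLD);
  }
}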

org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java

@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
@@ -68,6 +69,10 @@ import com.google.protobuf.TextFormat;
class AnnotationReadingPriorityFunction implements PriorityFunction {
public static final Log LOG =
LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
/** Used to control the scan delay, currently sqrt(numNextCall * weight) */
public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "ipc.server.scan.vtime.weight";
private final Map<String, Integer> annotatedQos;
//We need to mock the regionserver instance for some unit tests (set via
//setRegionServer method.
@@ -91,6 +96,8 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
private final Map<String, Map<Class<? extends Message>, Method>> methodMap =
new HashMap<String, Map<Class<? extends Message>, Method>>();
private final float scanVirtualTimeWeight;
AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
Map<String, Integer> qosMap = new HashMap<String, Integer>();
for (Method m : RSRpcServices.class.getMethods()) {
@@ -120,6 +127,9 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
throw new RuntimeException(e);
}
}
Configuration conf = rpcServices.getConfiguration();
scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
}
private String capitalize(final String s) {
@@ -128,6 +138,14 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
return strBuilder.toString();
}
/**
* Returns a 'priority' based on the request type.
*
* Currently the returned priority is used for queue selection.
* See the SimpleRpcScheduler as example. It maintains a queue per 'priory type'
* HIGH_QOS (meta requests), REPLICATION_QOS (replication requests),
* NORMAL_QOS (user requests).
*/
@Override
public int getPriority(RequestHeader header, Message param) {
String methodName = header.getMethodName();
@@ -188,6 +206,31 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
return HConstants.NORMAL_QOS;
}
/**
* Based on the request content, returns the deadline of the request.
*
* @param header
* @param param
* @return Deadline of this request. 0 now, otherwise msec of 'delay'
*/
@Override
public long getDeadline(RequestHeader header, Message param) {
String methodName = header.getMethodName();
if (methodName.equalsIgnoreCase("scan")) {
ScanRequest request = (ScanRequest)param;
if (!request.hasScannerId()) {
return 0;
}
// get the 'virtual time' of the scanner, and applies sqrt() to get a
// nice curve for the delay. More a scanner is used the less priority it gets.
// The weight is used to have more control on the delay.
long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
}
return 0;
}
@VisibleForTesting
void setRegionServer(final HRegionServer hrs) {
this.rpcServices = hrs.getRSRpcServices();
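
To illustrate the resulting delay curve (values assume the default ipc.server.scan.vtime.weight of 1.0; not part of the diff):

// vtime = number of next() calls already served for the scanner
//
//   vtime     deadline = Math.round(Math.sqrt(vtime * 1.0f))
//   0         0 msec   (new scanner: no deprioritization)
//   100       10 msec
//   10000     100 msec
//   1000000   1000 msec
//
// SimpleRpcScheduler then caps the applied delay at
// ipc.server.queue.max.call.delay (5000 msec by default).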

org/apache/hadoop/hbase/regionserver/RSRpcServices.java

@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
@@ -680,6 +681,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
public RSRpcServices(HRegionServer rs) throws IOException {
regionServer = rs;
RpcSchedulerFactory rpcSchedulerFactory;
try {
Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
@@ -722,7 +725,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Set our address.
isa = rpcServer.getListenerAddress();
rpcServer.setErrorHandler(this);
regionServer = rs;
rs.setName(name);
}
@@ -735,6 +737,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return null;
}
/**
* Get the vtime associated with the scanner.
* Currently the vtime is the number of "next" calls.
*/
long getScannerVirtualTime(long scannerId) {
String scannerIdString = Long.toString(scannerId);
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
if (scannerHolder != null) {
return scannerHolder.nextCallSeq;
}
return 0L;
}
long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException {
long scannerId = this.scannerIdGen.incrementAndGet();
String scannerName = String.valueOf(scannerId);
@@ -766,6 +781,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return priority;
}
Configuration getConfiguration() {
return regionServer.getConfiguration();
}
void start() {
rpcServer.start();
}
@@ -821,6 +840,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return priority.getPriority(header, param);
}
@Override
public long getDeadline(RequestHeader header, Message param) {
return priority.getDeadline(header, param);
}
/*
* Check if an OOME and, if so, abort immediately to avoid creating more objects.
*

org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java

@@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.AbstractQueue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A generic bounded blocking Priority-Queue.
*
* The elements of the priority queue are ordered according to the Comparator
* provided at queue construction time.
*
* If multiple elements have the same priority this queue orders them in
* FIFO (first-in-first-out) manner.
* The head of this queue is the least element with respect to the specified
* ordering. If multiple elements are tied for least value, the head is the
* first one inserted.
* The queue retrieval operations poll, remove, peek, and element access the
* element at the head of the queue.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
private static class PriorityQueue<E> {
private final Comparator<? super E> comparator;
private final E[] objects;
private int head = 0;
private int tail = 0;
@SuppressWarnings("unchecked")
public PriorityQueue(int capacity, Comparator<? super E> comparator) {
this.objects = (E[])new Object[capacity];
this.comparator = comparator;
}
public void add(E elem) {
if (tail == objects.length) {
// shift down |-----AAAAAAA|
tail -= head;
System.arraycopy(objects, head, objects, 0, tail);
head = 0;
}
if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
// Append
objects[tail++] = elem;
} else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
// Prepend
objects[--head] = elem;
} else {
// Insert in the middle
int index = upperBound(head, tail - 1, elem);
System.arraycopy(objects, index, objects, index + 1, tail - index);
objects[index] = elem;
tail++;
}
}
public E peek() {
return (head != tail) ? objects[head] : null;
}
public E poll() {
E elem = objects[head];
head = (head + 1) % objects.length;
if (head == 0) tail = 0;
return elem;
}
public int size() {
return tail - head;
}
public Comparator<? super E> comparator() {
return this.comparator;
}
public boolean contains(Object o) {
for (int i = head; i < tail; ++i) {
if (objects[i] == o) {
return true;
}
}
return false;
}
public int remainingCapacity() {
return this.objects.length - (tail - head);
}
private int upperBound(int start, int end, E key) {
while (start < end) {
int mid = (start + end) >>> 1;
E mitem = objects[mid];
int cmp = comparator.compare(mitem, key);
if (cmp > 0) {
end = mid;
} else {
start = mid + 1;
}
}
return start;
}
}
// Lock used for all operations
private final ReentrantLock lock = new ReentrantLock();
// Condition for blocking when empty
private final Condition notEmpty = lock.newCondition();
// Wait queue for waiting puts
private final Condition notFull = lock.newCondition();
private final PriorityQueue<E> queue;
/**
* Creates a PriorityQueue with the specified capacity that orders its
* elements according to the specified comparator.
* @param capacity the capacity of this queue
* @param comparator the comparator that will be used to order this priority queue
*/
public BoundedPriorityBlockingQueue(int capacity,
Comparator<? super E> comparator) {
this.queue = new PriorityQueue<E>(capacity, comparator);
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
lock.lock();
try {
if (queue.remainingCapacity() > 0) {
this.queue.add(e);
notEmpty.signal();
return true;
}
} finally {
lock.unlock();
}
return false;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
lock.lock();
try {
while (queue.remainingCapacity() == 0) {
notFull.await();
}
this.queue.add(e);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
while (queue.remainingCapacity() == 0) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
this.queue.add(e);
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public E take() throws InterruptedException {
E result = null;
lock.lockInterruptibly();
try {
while (queue.size() == 0) {
notEmpty.await();
}
result = queue.poll();
notFull.signal();
} finally {
lock.unlock();
}
return result;
}
public E poll() {
E result = null;
lock.lock();
try {
if (queue.size() > 0) {
result = queue.poll();
notFull.signal();
}
} finally {
lock.unlock();
}
return result;
}
public E poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
E result = null;
try {
while (queue.size() == 0 && nanos > 0) {
notEmpty.awaitNanos(nanos);
}
if (queue.size() > 0) {
result = queue.poll();
}
notFull.signal();
} finally {
lock.unlock();
}
return result;
}
public E peek() {
lock.lock();
try {
return queue.peek();
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}
public Comparator<? super E> comparator() {
return queue.comparator();
}
public int remainingCapacity() {
lock.lock();
try {
return queue.remainingCapacity();
} finally {
lock.unlock();
}
}
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
public boolean contains(Object o) {
lock.lock();
try {
return queue.contains(o);
} finally {
lock.unlock();
}
}
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
lock.lock();
try {
int n = Math.min(queue.size(), maxElements);
for (int i = 0; i < n; ++i) {
c.add(queue.poll());
}
return n;
} finally {
lock.unlock();
}
}
}
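
A standalone usage sketch of the new queue outside the RPC layer (class name and values are illustrative; not part of the diff):

import java.util.Comparator;

import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;

public class BoundedPriorityBlockingQueueExample {
  public static void main(String[] args) throws InterruptedException {
    // Bounded to 4 elements; the smallest value is the head of the queue.
    BoundedPriorityBlockingQueue<Integer> queue =
        new BoundedPriorityBlockingQueue<Integer>(4, new Comparator<Integer>() {
          @Override
          public int compare(Integer a, Integer b) {
            return a.compareTo(b);
          }
        });

    queue.put(50);
    queue.put(10);
    queue.put(100);
    queue.put(20);

    System.out.println(queue.offer(5));   // false: queue is full, offer() never blocks
    System.out.println(queue.take());     // 10
    System.out.println(queue.take());     // 20
    System.out.println(queue.take());     // 50
    System.out.println(queue.take());     // 100
  }
}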

org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java

@@ -21,6 +21,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.protobuf.Message;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -28,6 +30,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -36,12 +40,15 @@ import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
@@ -50,6 +57,7 @@ import static org.mockito.Mockito.when;
@Category(SmallTests.class)
public class TestSimpleRpcScheduler {
public static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
@Override
@@ -134,4 +142,115 @@ public class TestSimpleRpcScheduler {
when(task.getCall()).thenReturn(call);
return task;
}
@Test
public void testRpcScheduler() throws Exception {
testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
}
private void testRpcScheduler(final String queueType) throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType);
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
.thenReturn(HConstants.NORMAL_QOS);
RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
HConstants.QOS_THRESHOLD);
try {
scheduler.start();
CallRunner smallCallTask = mock(CallRunner.class);
RpcServer.Call smallCall = mock(RpcServer.Call.class);
RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
when(smallCallTask.getCall()).thenReturn(smallCall);
when(smallCall.getHeader()).thenReturn(smallHead);
CallRunner largeCallTask = mock(CallRunner.class);
RpcServer.Call largeCall = mock(RpcServer.Call.class);
RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
when(largeCallTask.getCall()).thenReturn(largeCall);
when(largeCall.getHeader()).thenReturn(largeHead);
CallRunner hugeCallTask = mock(CallRunner.class);
RpcServer.Call hugeCall = mock(RpcServer.Call.class);
RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
when(hugeCallTask.getCall()).thenReturn(hugeCall);
when(hugeCall.getHeader()).thenReturn(hugeHead);
when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L);
when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L);
when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
final ArrayList<Integer> work = new ArrayList<Integer>();
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
synchronized (work) {
work.add(10);
}
Threads.sleepWithoutInterrupt(100);
return null;
}
}).when(smallCallTask).run();
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
synchronized (work) {
work.add(50);
}
Threads.sleepWithoutInterrupt(100);
return null;
}
}).when(largeCallTask).run();
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
synchronized (work) {
work.add(100);
}
Threads.sleepWithoutInterrupt(100);
return null;
}
}).when(hugeCallTask).run();
scheduler.dispatch(smallCallTask);
scheduler.dispatch(smallCallTask);
scheduler.dispatch(smallCallTask);
scheduler.dispatch(hugeCallTask);
scheduler.dispatch(smallCallTask);
scheduler.dispatch(largeCallTask);
scheduler.dispatch(smallCallTask);
scheduler.dispatch(smallCallTask);
while (work.size() < 8) {
Threads.sleepWithoutInterrupt(100);
}
int seqSum = 0;
int totalTime = 0;
for (int i = 0; i < work.size(); ++i) {
LOG.debug("Request i=" + i + " value=" + work.get(i));
seqSum += work.get(i);
totalTime += seqSum;
}
LOG.debug("Total Time: " + totalTime);
// -> [small small small huge small large small small]
// -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue)
// -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
assertEquals(530, totalTime);
} else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ {
assertEquals(930, totalTime);
}
} finally {
scheduler.stop();
}
}
}
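
The expected totals can be checked by hand: each completed call appends its weight to work, seqSum is the running sum, and totalTime adds up the running sums:

// Queue type       work values                  running sums                     totalTime
// FIFO             [10 10 10 100 10 50 10 10]   10 20 30 130 140 190 200 210  -> 930
// Deadline         [10 10 10 10 10 10 50 100]   10 20 30 40 50 60 110 210     -> 530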

org/apache/hadoop/hbase/regionserver/TestQosFunction.java

@@ -17,7 +17,10 @@ package org.apache.hadoop.hbase.regionserver;
* limitations under the License.
*/
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
@@ -36,7 +39,10 @@ import com.google.protobuf.Message;
public class TestQosFunction {
@Test
public void testPriority() {
Configuration conf = HBaseConfiguration.create();
RSRpcServices rpcServices = Mockito.mock(RSRpcServices.class);
when(rpcServices.getConfiguration()).thenReturn(conf);
AnnotationReadingPriorityFunction qosFunction =
new AnnotationReadingPriorityFunction(rpcServices);

org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java

@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.util.Comparator;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestBoundedPriorityBlockingQueue {
private final static int CAPACITY = 16;
class TestObject {
private final int priority;
private final int seqId;
public TestObject(final int priority, final int seqId) {
this.priority = priority;
this.seqId = seqId;
}
public int getSeqId() {
return this.seqId;
}
public int getPriority() {
return this.priority;
}
}
class TestObjectComparator implements Comparator<TestObject> {
public TestObjectComparator() {}
@Override
public int compare(TestObject a, TestObject b) {
return a.getPriority() - b.getPriority();
}
}
private BoundedPriorityBlockingQueue<TestObject> queue;
@Before
public void setUp() throws Exception {
this.queue = new BoundedPriorityBlockingQueue<TestObject>(CAPACITY, new TestObjectComparator());
}
@After
public void tearDown() throws Exception {
}
@Test
public void tesAppend() throws Exception {
// Push
for (int i = 1; i <= CAPACITY; ++i) {
assertTrue(queue.offer(new TestObject(i, i)));
assertEquals(i, queue.size());
assertEquals(CAPACITY - i, queue.remainingCapacity());
}
assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
// Pop
for (int i = 1; i <= CAPACITY; ++i) {
TestObject obj = queue.poll();
assertEquals(i, obj.getSeqId());
assertEquals(CAPACITY - i, queue.size());
assertEquals(i, queue.remainingCapacity());
}
assertEquals(null, queue.poll());
}
@Test
public void tesAppendSamePriority() throws Exception {
// Push
for (int i = 1; i <= CAPACITY; ++i) {
assertTrue(queue.offer(new TestObject(0, i)));
assertEquals(i, queue.size());
assertEquals(CAPACITY - i, queue.remainingCapacity());
}
assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
// Pop
for (int i = 1; i <= CAPACITY; ++i) {
TestObject obj = queue.poll();
assertEquals(i, obj.getSeqId());
assertEquals(CAPACITY - i, queue.size());
assertEquals(i, queue.remainingCapacity());
}
assertEquals(null, queue.poll());
}
@Test
public void testPrepend() throws Exception {
// Push
for (int i = 1; i <= CAPACITY; ++i) {
assertTrue(queue.offer(new TestObject(CAPACITY - i, i)));
assertEquals(i, queue.size());
assertEquals(CAPACITY - i, queue.remainingCapacity());
}
// Pop
for (int i = 1; i <= CAPACITY; ++i) {
TestObject obj = queue.poll();
assertEquals(CAPACITY - (i - 1), obj.getSeqId());
assertEquals(CAPACITY - i, queue.size());
assertEquals(i, queue.remainingCapacity());
}
assertEquals(null, queue.poll());
}
@Test
public void testInsert() throws Exception {
// Push
for (int i = 1; i <= CAPACITY; i += 2) {
assertTrue(queue.offer(new TestObject(i, i)));
assertEquals((1 + i) / 2, queue.size());
}
for (int i = 2; i <= CAPACITY; i += 2) {
assertTrue(queue.offer(new TestObject(i, i)));
assertEquals(CAPACITY / 2 + (i / 2), queue.size());
}
assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
// Pop
for (int i = 1; i <= CAPACITY; ++i) {
TestObject obj = queue.poll();
assertEquals(i, obj.getSeqId());
assertEquals(CAPACITY - i, queue.size());
assertEquals(i, queue.remainingCapacity());
}
assertEquals(null, queue.poll());
}
@Test
public void testFifoSamePriority() throws Exception {
assertTrue(CAPACITY >= 6);
for (int i = 0; i < 6; ++i) {
assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
}
for (int i = 0; i < 6; i += 2) {
TestObject obj = queue.poll();
assertEquals(10, obj.getPriority());
assertEquals(i, obj.getSeqId());
}
for (int i = 1; i < 6; i += 2) {
TestObject obj = queue.poll();
assertEquals(20, obj.getPriority());
assertEquals(i, obj.getSeqId());
}
assertEquals(null, queue.poll());
}
}