From de98955c16d71f2486f54f88c9ca6c88b91a3812 Mon Sep 17 00:00:00 2001 From: mbertozzi Date: Wed, 7 May 2014 21:05:04 +0000 Subject: [PATCH] HBASE-10993 Deprioritize long-running scanners git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1593137 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/ipc/PriorityFunction.java | 11 + .../apache/hadoop/hbase/ipc/RpcServer.java | 4 + .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 55 ++- .../AnnotationReadingPriorityFunction.java | 43 +++ .../hbase/regionserver/RSRpcServices.java | 26 +- .../util/BoundedPriorityBlockingQueue.java | 330 ++++++++++++++++++ .../hbase/ipc/TestSimpleRpcScheduler.java | 119 +++++++ .../hbase/regionserver/TestQosFunction.java | 6 + .../TestBoundedPriorityBlockingQueue.java | 178 ++++++++++ 9 files changed, 769 insertions(+), 3 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java index 0df041eab44..4a9e2d2268e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java @@ -25,9 +25,20 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; */ public interface PriorityFunction { /** + * Returns the 'priority type' of the specified request. + * The returned value is mainly used to select the dispatch queue. * @param header * @param param * @return Priority of this request. */ int getPriority(RequestHeader header, Message param); + + /** + * Returns the deadline of the specified request. + * The returned value is used to sort the dispatch queue. + * @param header + * @param param + * @return Deadline of this request. 0 now, otherwise msec of 'delay' + */ + long getDeadline(RequestHeader header, Message param); } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 8bdb765d21b..552952bfd03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -313,6 +313,10 @@ public class RpcServer implements RpcServerInterface { " connection: " + connection.toString(); } + protected RequestHeader getHeader() { + return this.header; + } + /* * Short string representation without param info because param itself could be huge depends on * the payload of a command diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 6a526b544ad..beb37f8672f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -17,14 +17,18 @@ */ package org.apache.hadoop.hbase.ipc; +import java.util.Comparator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -36,6 +40,45 @@ import com.google.common.collect.Lists; @InterfaceAudience.Private @InterfaceStability.Evolving public class SimpleRpcScheduler implements RpcScheduler { + public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class); + + /** If set to true, uses a priority queue and deprioritize long-running scans */ + public static final String CALL_QUEUE_TYPE_CONF_KEY = "ipc.server.callqueue.type"; + public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline"; + public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo"; + + /** max delay in msec used to bound the deprioritized requests */ + public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "ipc.server.queue.max.call.delay"; + + /** + * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. + * It uses the calculated "deadline" e.g. to deprioritize long-running job + * + * If multiple requests have the same deadline BoundedPriorityBlockingQueue will order them in + * FIFO (first-in-first-out) manner. + */ + private static class CallPriorityComparator implements Comparator { + private final static int DEFAULT_MAX_CALL_DELAY = 5000; + + private final PriorityFunction priority; + private final int maxDelay; + + public CallPriorityComparator(final Configuration conf, final PriorityFunction priority) { + this.priority = priority; + this.maxDelay = conf.getInt(QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY); + } + + @Override + public int compare(CallRunner a, CallRunner b) { + RpcServer.Call callA = a.getCall(); + RpcServer.Call callB = b.getCall(); + long deadlineA = priority.getDeadline(callA.getHeader(), callA.param); + long deadlineB = priority.getDeadline(callB.getHeader(), callB.param); + deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay); + deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay); + return (int)(deadlineA - deadlineB); + } + } private int port; private final int handlerCount; @@ -73,7 +116,15 @@ public class SimpleRpcScheduler implements RpcScheduler { this.replicationHandlerCount = replicationHandlerCount; this.priority = priority; this.highPriorityLevel = highPriorityLevel; - this.callQueue = new LinkedBlockingQueue(maxQueueLength); + + String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); + LOG.debug("Using " + callQueueType + " as user call queue"); + if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + this.callQueue = new BoundedPriorityBlockingQueue(maxQueueLength, + new CallPriorityComparator(conf, this.priority)); + } else { + this.callQueue = new LinkedBlockingQueue(maxQueueLength); + } this.priorityCallQueue = priorityHandlerCount > 0 ? new LinkedBlockingQueue(maxQueueLength) : null; @@ -128,7 +179,7 @@ public class SimpleRpcScheduler implements RpcScheduler { @Override public void dispatch(CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); - int level = priority.getPriority(call.header, call.param); + int level = priority.getPriority(call.getHeader(), call.param); if (priorityCallQueue != null && level > highPriorityLevel) { priorityCallQueue.put(callTask); } else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index b7775892530..e4a32b96be0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; @@ -68,6 +69,10 @@ import com.google.protobuf.TextFormat; class AnnotationReadingPriorityFunction implements PriorityFunction { public static final Log LOG = LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName()); + + /** Used to control the scan delay, currently sqrt(numNextCall * weight) */ + public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "ipc.server.scan.vtime.weight"; + private final Map annotatedQos; //We need to mock the regionserver instance for some unit tests (set via //setRegionServer method. @@ -91,6 +96,8 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { private final Map, Method>> methodMap = new HashMap, Method>>(); + private final float scanVirtualTimeWeight; + AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { Map qosMap = new HashMap(); for (Method m : RSRpcServices.class.getMethods()) { @@ -120,6 +127,9 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { throw new RuntimeException(e); } } + + Configuration conf = rpcServices.getConfiguration(); + scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f); } private String capitalize(final String s) { @@ -128,6 +138,14 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { return strBuilder.toString(); } + /** + * Returns a 'priority' based on the request type. + * + * Currently the returned priority is used for queue selection. + * See the SimpleRpcScheduler as example. It maintains a queue per 'priory type' + * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests), + * NORMAL_QOS (user requests). + */ @Override public int getPriority(RequestHeader header, Message param) { String methodName = header.getMethodName(); @@ -188,6 +206,31 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { return HConstants.NORMAL_QOS; } + /** + * Based on the request content, returns the deadline of the request. + * + * @param header + * @param param + * @return Deadline of this request. 0 now, otherwise msec of 'delay' + */ + @Override + public long getDeadline(RequestHeader header, Message param) { + String methodName = header.getMethodName(); + if (methodName.equalsIgnoreCase("scan")) { + ScanRequest request = (ScanRequest)param; + if (!request.hasScannerId()) { + return 0; + } + + // get the 'virtual time' of the scanner, and applies sqrt() to get a + // nice curve for the delay. More a scanner is used the less priority it gets. + // The weight is used to have more control on the delay. + long vtime = rpcServices.getScannerVirtualTime(request.getScannerId()); + return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight)); + } + return 0; + } + @VisibleForTesting void setRegionServer(final HRegionServer hrs) { this.rpcServices = hrs.getRSRpcServices(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e7f7a10c06c..2e7b0220017 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; @@ -680,6 +681,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } public RSRpcServices(HRegionServer rs) throws IOException { + regionServer = rs; + RpcSchedulerFactory rpcSchedulerFactory; try { Class rpcSchedulerFactoryClass = rs.conf.getClass( @@ -722,7 +725,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Set our address. isa = rpcServer.getListenerAddress(); rpcServer.setErrorHandler(this); - regionServer = rs; rs.setName(name); } @@ -735,6 +737,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return null; } + /** + * Get the vtime associated with the scanner. + * Currently the vtime is the number of "next" calls. + */ + long getScannerVirtualTime(long scannerId) { + String scannerIdString = Long.toString(scannerId); + RegionScannerHolder scannerHolder = scanners.get(scannerIdString); + if (scannerHolder != null) { + return scannerHolder.nextCallSeq; + } + return 0L; + } + long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException { long scannerId = this.scannerIdGen.incrementAndGet(); String scannerName = String.valueOf(scannerId); @@ -766,6 +781,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return priority; } + Configuration getConfiguration() { + return regionServer.getConfiguration(); + } + void start() { rpcServer.start(); } @@ -821,6 +840,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return priority.getPriority(header, param); } + @Override + public long getDeadline(RequestHeader header, Message param) { + return priority.getDeadline(header, param); + } + /* * Check if an OOME and, if so, abort immediately to avoid creating more objects. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java new file mode 100644 index 00000000000..b525136ffca --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.AbstractQueue; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A generic bounded blocking Priority-Queue. + * + * The elements of the priority queue are ordered according to the Comparator + * provided at queue construction time. + * + * If multiple elements have the same priority this queue orders them in + * FIFO (first-in-first-out) manner. + * The head of this queue is the least element with respect to the specified + * ordering. If multiple elements are tied for least value, the head is the + * first one inserted. + * The queue retrieval operations poll, remove, peek, and element access the + * element at the head of the queue. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class BoundedPriorityBlockingQueue extends AbstractQueue implements BlockingQueue { + private static class PriorityQueue { + private final Comparator comparator; + private final E[] objects; + + private int head = 0; + private int tail = 0; + + @SuppressWarnings("unchecked") + public PriorityQueue(int capacity, Comparator comparator) { + this.objects = (E[])new Object[capacity]; + this.comparator = comparator; + } + + public void add(E elem) { + if (tail == objects.length) { + // shift down |-----AAAAAAA| + tail -= head; + System.arraycopy(objects, head, objects, 0, tail); + head = 0; + } + + if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) { + // Append + objects[tail++] = elem; + } else if (head > 0 && comparator.compare(objects[head], elem) > 0) { + // Prepend + objects[--head] = elem; + } else { + // Insert in the middle + int index = upperBound(head, tail - 1, elem); + System.arraycopy(objects, index, objects, index + 1, tail - index); + objects[index] = elem; + tail++; + } + } + + public E peek() { + return (head != tail) ? objects[head] : null; + } + + public E poll() { + E elem = objects[head]; + head = (head + 1) % objects.length; + if (head == 0) tail = 0; + return elem; + } + + public int size() { + return tail - head; + } + + public Comparator comparator() { + return this.comparator; + } + + public boolean contains(Object o) { + for (int i = head; i < tail; ++i) { + if (objects[i] == o) { + return true; + } + } + return false; + } + + public int remainingCapacity() { + return this.objects.length - (tail - head); + } + + private int upperBound(int start, int end, E key) { + while (start < end) { + int mid = (start + end) >>> 1; + E mitem = objects[mid]; + int cmp = comparator.compare(mitem, key); + if (cmp > 0) { + end = mid; + } else { + start = mid + 1; + } + } + return start; + } + } + + + // Lock used for all operations + private final ReentrantLock lock = new ReentrantLock(); + + // Condition for blocking when empty + private final Condition notEmpty = lock.newCondition(); + + // Wait queue for waiting puts + private final Condition notFull = lock.newCondition(); + + private final PriorityQueue queue; + + /** + * Creates a PriorityQueue with the specified capacity that orders its + * elements according to the specified comparator. + * @param capacity the capacity of this queue + * @param comparator the comparator that will be used to order this priority queue + */ + public BoundedPriorityBlockingQueue(int capacity, + Comparator comparator) { + this.queue = new PriorityQueue(capacity, comparator); + } + + public boolean offer(E e) { + if (e == null) throw new NullPointerException(); + + lock.lock(); + try { + if (queue.remainingCapacity() > 0) { + this.queue.add(e); + notEmpty.signal(); + return true; + } + } finally { + lock.unlock(); + } + return false; + } + + public void put(E e) throws InterruptedException { + if (e == null) throw new NullPointerException(); + + lock.lock(); + try { + while (queue.remainingCapacity() == 0) { + notFull.await(); + } + this.queue.add(e); + notEmpty.signal(); + } finally { + lock.unlock(); + } + } + + public boolean offer(E e, long timeout, TimeUnit unit) + throws InterruptedException { + if (e == null) throw new NullPointerException(); + long nanos = unit.toNanos(timeout); + + lock.lockInterruptibly(); + try { + while (queue.remainingCapacity() == 0) { + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + this.queue.add(e); + notEmpty.signal(); + } finally { + lock.unlock(); + } + return true; + } + + public E take() throws InterruptedException { + E result = null; + lock.lockInterruptibly(); + try { + while (queue.size() == 0) { + notEmpty.await(); + } + result = queue.poll(); + notFull.signal(); + } finally { + lock.unlock(); + } + return result; + } + + public E poll() { + E result = null; + lock.lock(); + try { + if (queue.size() > 0) { + result = queue.poll(); + notFull.signal(); + } + } finally { + lock.unlock(); + } + return result; + } + + public E poll(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + E result = null; + try { + while (queue.size() == 0 && nanos > 0) { + notEmpty.awaitNanos(nanos); + } + if (queue.size() > 0) { + result = queue.poll(); + } + notFull.signal(); + } finally { + lock.unlock(); + } + return result; + } + + public E peek() { + lock.lock(); + try { + return queue.peek(); + } finally { + lock.unlock(); + } + } + + public int size() { + lock.lock(); + try { + return queue.size(); + } finally { + lock.unlock(); + } + } + + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + public Comparator comparator() { + return queue.comparator(); + } + + public int remainingCapacity() { + lock.lock(); + try { + return queue.remainingCapacity(); + } finally { + lock.unlock(); + } + } + + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + public boolean contains(Object o) { + lock.lock(); + try { + return queue.contains(o); + } finally { + lock.unlock(); + } + } + + public int drainTo(Collection c) { + return drainTo(c, Integer.MAX_VALUE); + } + + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + if (maxElements <= 0) + return 0; + lock.lock(); + try { + int n = Math.min(queue.size(), maxElements); + for (int i = 0; i < n; ++i) { + c.add(queue.poll()); + } + return n; + } finally { + lock.unlock(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index c1ed778f1f8..d4a79b642ef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -21,6 +21,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import com.google.protobuf.Message; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -28,6 +30,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.util.Threads; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -36,12 +40,15 @@ import org.mockito.stubbing.Answer; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -50,6 +57,7 @@ import static org.mockito.Mockito.when; @Category(SmallTests.class) public class TestSimpleRpcScheduler { + public static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class); private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { @Override @@ -134,4 +142,115 @@ public class TestSimpleRpcScheduler { when(task.getCall()).thenReturn(call); return task; } + + @Test + public void testRpcScheduler() throws Exception { + testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); + testRpcScheduler(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE); + } + + private void testRpcScheduler(final String queueType) throws Exception { + Configuration schedConf = HBaseConfiguration.create(); + schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(RequestHeader.class), any(Message.class))) + .thenReturn(HConstants.NORMAL_QOS); + + RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, + HConstants.QOS_THRESHOLD); + try { + scheduler.start(); + + CallRunner smallCallTask = mock(CallRunner.class); + RpcServer.Call smallCall = mock(RpcServer.Call.class); + RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); + when(smallCallTask.getCall()).thenReturn(smallCall); + when(smallCall.getHeader()).thenReturn(smallHead); + + CallRunner largeCallTask = mock(CallRunner.class); + RpcServer.Call largeCall = mock(RpcServer.Call.class); + RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); + when(largeCallTask.getCall()).thenReturn(largeCall); + when(largeCall.getHeader()).thenReturn(largeHead); + + CallRunner hugeCallTask = mock(CallRunner.class); + RpcServer.Call hugeCall = mock(RpcServer.Call.class); + RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); + when(hugeCallTask.getCall()).thenReturn(hugeCall); + when(hugeCall.getHeader()).thenReturn(hugeHead); + + when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L); + when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L); + when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L); + + final ArrayList work = new ArrayList(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + synchronized (work) { + work.add(10); + } + Threads.sleepWithoutInterrupt(100); + return null; + } + }).when(smallCallTask).run(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + synchronized (work) { + work.add(50); + } + Threads.sleepWithoutInterrupt(100); + return null; + } + }).when(largeCallTask).run(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + synchronized (work) { + work.add(100); + } + Threads.sleepWithoutInterrupt(100); + return null; + } + }).when(hugeCallTask).run(); + + scheduler.dispatch(smallCallTask); + scheduler.dispatch(smallCallTask); + scheduler.dispatch(smallCallTask); + scheduler.dispatch(hugeCallTask); + scheduler.dispatch(smallCallTask); + scheduler.dispatch(largeCallTask); + scheduler.dispatch(smallCallTask); + scheduler.dispatch(smallCallTask); + + while (work.size() < 8) { + Threads.sleepWithoutInterrupt(100); + } + + int seqSum = 0; + int totalTime = 0; + for (int i = 0; i < work.size(); ++i) { + LOG.debug("Request i=" + i + " value=" + work.get(i)); + seqSum += work.get(i); + totalTime += seqSum; + } + LOG.debug("Total Time: " + totalTime); + + // -> [small small small huge small large small small] + // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) + // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) + if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { + assertEquals(530, totalTime); + } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ { + assertEquals(930, totalTime); + } + } finally { + scheduler.stop(); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java index 3bc93143801..b9f73d6e33d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java @@ -17,7 +17,10 @@ package org.apache.hadoop.hbase.regionserver; * limitations under the License. */ import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; @@ -36,7 +39,10 @@ import com.google.protobuf.Message; public class TestQosFunction { @Test public void testPriority() { + Configuration conf = HBaseConfiguration.create(); RSRpcServices rpcServices = Mockito.mock(RSRpcServices.class); + when(rpcServices.getConfiguration()).thenReturn(conf); + AnnotationReadingPriorityFunction qosFunction = new AnnotationReadingPriorityFunction(rpcServices); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java new file mode 100644 index 00000000000..f09c79cfba4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.util.Comparator; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.SmallTests; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestBoundedPriorityBlockingQueue { + private final static int CAPACITY = 16; + + class TestObject { + private final int priority; + private final int seqId; + + public TestObject(final int priority, final int seqId) { + this.priority = priority; + this.seqId = seqId; + } + + public int getSeqId() { + return this.seqId; + } + + public int getPriority() { + return this.priority; + } + } + + class TestObjectComparator implements Comparator { + public TestObjectComparator() {} + + @Override + public int compare(TestObject a, TestObject b) { + return a.getPriority() - b.getPriority(); + } + } + + private BoundedPriorityBlockingQueue queue; + + @Before + public void setUp() throws Exception { + this.queue = new BoundedPriorityBlockingQueue(CAPACITY, new TestObjectComparator()); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void tesAppend() throws Exception { + // Push + for (int i = 1; i <= CAPACITY; ++i) { + assertTrue(queue.offer(new TestObject(i, i))); + assertEquals(i, queue.size()); + assertEquals(CAPACITY - i, queue.remainingCapacity()); + } + assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS)); + + // Pop + for (int i = 1; i <= CAPACITY; ++i) { + TestObject obj = queue.poll(); + assertEquals(i, obj.getSeqId()); + assertEquals(CAPACITY - i, queue.size()); + assertEquals(i, queue.remainingCapacity()); + } + assertEquals(null, queue.poll()); + } + + @Test + public void tesAppendSamePriority() throws Exception { + // Push + for (int i = 1; i <= CAPACITY; ++i) { + assertTrue(queue.offer(new TestObject(0, i))); + assertEquals(i, queue.size()); + assertEquals(CAPACITY - i, queue.remainingCapacity()); + } + assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS)); + + // Pop + for (int i = 1; i <= CAPACITY; ++i) { + TestObject obj = queue.poll(); + assertEquals(i, obj.getSeqId()); + assertEquals(CAPACITY - i, queue.size()); + assertEquals(i, queue.remainingCapacity()); + } + assertEquals(null, queue.poll()); + } + + @Test + public void testPrepend() throws Exception { + // Push + for (int i = 1; i <= CAPACITY; ++i) { + assertTrue(queue.offer(new TestObject(CAPACITY - i, i))); + assertEquals(i, queue.size()); + assertEquals(CAPACITY - i, queue.remainingCapacity()); + } + + // Pop + for (int i = 1; i <= CAPACITY; ++i) { + TestObject obj = queue.poll(); + assertEquals(CAPACITY - (i - 1), obj.getSeqId()); + assertEquals(CAPACITY - i, queue.size()); + assertEquals(i, queue.remainingCapacity()); + } + assertEquals(null, queue.poll()); + } + + @Test + public void testInsert() throws Exception { + // Push + for (int i = 1; i <= CAPACITY; i += 2) { + assertTrue(queue.offer(new TestObject(i, i))); + assertEquals((1 + i) / 2, queue.size()); + } + for (int i = 2; i <= CAPACITY; i += 2) { + assertTrue(queue.offer(new TestObject(i, i))); + assertEquals(CAPACITY / 2 + (i / 2), queue.size()); + } + assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS)); + + // Pop + for (int i = 1; i <= CAPACITY; ++i) { + TestObject obj = queue.poll(); + assertEquals(i, obj.getSeqId()); + assertEquals(CAPACITY - i, queue.size()); + assertEquals(i, queue.remainingCapacity()); + } + assertEquals(null, queue.poll()); + } + + @Test + public void testFifoSamePriority() throws Exception { + assertTrue(CAPACITY >= 6); + for (int i = 0; i < 6; ++i) { + assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i))); + } + + for (int i = 0; i < 6; i += 2) { + TestObject obj = queue.poll(); + assertEquals(10, obj.getPriority()); + assertEquals(i, obj.getSeqId()); + } + + for (int i = 1; i < 6; i += 2) { + TestObject obj = queue.poll(); + assertEquals(20, obj.getPriority()); + assertEquals(i, obj.getSeqId()); + } + assertEquals(null, queue.poll()); + } +}