diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index 650c544f15f..15b393041c1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -85,6 +85,7 @@ public class ReflectionUtils {
match = (!ctorParamTypes[i].isPrimitive()) ? ctorParamTypes[i].isAssignableFrom(paramType) :
((int.class.equals(ctorParamTypes[i]) && Integer.class.equals(paramType)) ||
(long.class.equals(ctorParamTypes[i]) && Long.class.equals(paramType)) ||
+ (double.class.equals(ctorParamTypes[i]) && Double.class.equals(paramType)) ||
(char.class.equals(ctorParamTypes[i]) && Character.class.equals(paramType)) ||
(short.class.equals(ctorParamTypes[i]) && Short.class.equals(paramType)) ||
(boolean.class.equals(ctorParamTypes[i]) && Boolean.class.equals(paramType)) ||
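
For context (not part of the patch): ReflectionUtils matches boxed argument types against primitive constructor parameters, and the new double/Double case is what lets a boxed Double argument satisfy the primitive double lifoThreshold parameter of the CoDel queue added below. A minimal sketch, assuming the varargs newInstance(Class, Object...) helper in this class; the capacity and tunable values are illustrative only.

import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.ipc.AdaptiveLifoCoDelCallQueue;
import org.apache.hadoop.hbase.util.ReflectionUtils;

// Sketch: reflective construction passes boxed arguments, so the 0.8 below arrives as a
// Double and must be matched against the constructor's primitive double parameter.
public class ReflectiveQueueConstructionSketch {
  public static void main(String[] args) {
    AdaptiveLifoCoDelCallQueue queue = ReflectionUtils.newInstance(
        AdaptiveLifoCoDelCallQueue.class,
        300,               // capacity (Integer -> int)
        5,                 // CoDel target delay, ms (Integer -> int)
        100,               // CoDel interval, ms (Integer -> int)
        0.8,               // LIFO threshold (Double -> double, the case added above)
        new AtomicLong(),  // numGeneralCallsDropped
        new AtomicLong()); // numLifoModeSwitches
  }
}
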
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 061a67210b7..bb897896a01 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -64,6 +64,12 @@ public interface MetricsHBaseServerSource extends BaseSource {
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
String NUM_ACTIVE_HANDLER_DESC = "Number of active rpc handlers.";
+ String NUM_GENERAL_CALLS_DROPPED_NAME = "numGeneralCallsDropped";
+ String NUM_GENERAL_CALLS_DROPPED_DESC = "Total number of calls in the general queue which " +
+ "were dropped by the CoDel RPC executor";
+ String NUM_LIFO_MODE_SWITCHES_NAME = "numLifoModeSwitches";
+ String NUM_LIFO_MODE_SWITCHES_DESC = "Total number of calls in the general queue which " +
+ "were served from the tail of the queue";
String EXCEPTIONS_NAME="exceptions";
String EXCEPTIONS_DESC="Exceptions caused by requests";
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
index 1885264a517..8f30205b180 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
@@ -26,4 +26,6 @@ public interface MetricsHBaseServerWrapper {
int getPriorityQueueLength();
int getNumOpenConnections();
int getActiveRpcHandlerCount();
+ long getNumGeneralCallsDropped();
+ long getNumLifoModeSwitches();
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 48f57e976e4..c4665647175 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -219,7 +219,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.addGauge(Interns.info(NUM_OPEN_CONNECTIONS_NAME,
NUM_OPEN_CONNECTIONS_DESC), wrapper.getNumOpenConnections())
.addGauge(Interns.info(NUM_ACTIVE_HANDLER_NAME,
- NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount());
+ NUM_ACTIVE_HANDLER_DESC), wrapper.getActiveRpcHandlerCount())
+ .addCounter(Interns.info(NUM_GENERAL_CALLS_DROPPED_NAME,
+ NUM_GENERAL_CALLS_DROPPED_DESC), wrapper.getNumGeneralCallsDropped())
+ .addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME,
+ NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches());
}
metricsRegistry.snapshot(mrb, all);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
new file mode 100644
index 00000000000..37e86bec4d8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Adaptive LIFO blocking queue utilizing the CoDel algorithm to prevent queue overloading.
+ *
+ * Implements the {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}.
+ *
+ * Currently uses milliseconds internally; we need to look into whether timeInterval and
+ * minDelay should use nanoseconds instead.
+ *
+ * @see the "Fail at Scale" paper
+ * @see the CoDel implementation for generic job queues in the Wangle library
+ */
+@InterfaceAudience.Private
+public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
+
+ // backing queue
+ private LinkedBlockingDeque<CallRunner> queue;
+
+ // so we can calculate the actual threshold at which to switch to LIFO under load
+ private int maxCapacity;
+
+ // metrics (shared across all queues)
+ private AtomicLong numGeneralCallsDropped;
+ private AtomicLong numLifoModeSwitches;
+
+ /**
+ * Lock held by take ops; all other locking is inside the queue impl.
+ *
+ * NOTE: We want this lock so that when there are a lot of already-expired calls in the
+ * call queue, a handler thread calling take() can grab the lock once and then fast-forward
+ * through the expired calls to the first non-expired one without having to contend for
+ * locks on every element in the underlying queue.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ // Both are in milliseconds
+ private volatile int codelTargetDelay;
+ private volatile int codelInterval;
+
+ // if the queue is fuller than this fraction of its capacity, we switch to LIFO mode.
+ // Values are fractions in the range (0, 1.0], e.g. 0.7 or 0.8.
+ private volatile double lifoThreshold;
+
+ // minimal delay observed during the interval
+ private volatile long minDelay;
+
+ // the moment when current interval ends
+ private volatile long intervalTime = System.currentTimeMillis();
+
+ // switch to ensure only one thread does interval cutoffs
+ private AtomicBoolean resetDelay = new AtomicBoolean(true);
+
+ // if we're in this mode, "long" calls are getting dropped
+ private volatile boolean isOverloaded;
+
+ public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
+ double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
+ this.maxCapacity = capacity;
+ this.queue = new LinkedBlockingDeque<>(capacity);
+ this.codelTargetDelay = targetDelay;
+ this.codelInterval = interval;
+ this.lifoThreshold = lifoThreshold;
+ this.numGeneralCallsDropped = numGeneralCallsDropped;
+ this.numLifoModeSwitches = numLifoModeSwitches;
+ }
+
+ /**
+ * Update tunables.
+ *
+ * @param newCodelTargetDelay new CoDel target delay
+ * @param newCodelInterval new CoDel interval
+ * @param newLifoThreshold new Adaptive Lifo threshold
+ */
+ public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
+ double newLifoThreshold) {
+ this.codelTargetDelay = newCodelTargetDelay;
+ this.codelInterval = newCodelInterval;
+ this.lifoThreshold = newLifoThreshold;
+ }
+
+ /**
+ * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently
+ * skip all calls which it thinks should be dropped.
+ *
+ * @return the head of this queue
+ * @throws InterruptedException if interrupted while waiting
+ */
+ @Override
+ public CallRunner take() throws InterruptedException {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ CallRunner cr;
+ while(true) {
+ if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
+ numLifoModeSwitches.incrementAndGet();
+ cr = queue.takeLast();
+ } else {
+ cr = queue.takeFirst();
+ }
+ if (needToDrop(cr)) {
+ numGeneralCallsDropped.incrementAndGet();
+ } else {
+ return cr;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @param callRunner to validate
+ * @return true if this call needs to be skipped based on call timestamp
+ * and internal queue state (deemed overloaded).
+ */
+ private boolean needToDrop(CallRunner callRunner) {
+ long now = System.currentTimeMillis();
+ long callDelay = now - callRunner.getCall().timestamp;
+
+ long localMinDelay = this.minDelay;
+ if (now > intervalTime && !resetDelay.getAndSet(true)) {
+ intervalTime = now + codelInterval;
+
+ if (localMinDelay > codelTargetDelay) {
+ isOverloaded = true;
+ } else {
+ isOverloaded = false;
+ }
+ }
+
+ if (resetDelay.getAndSet(false)) {
+ minDelay = callDelay;
+ return false;
+ } else if (callDelay < localMinDelay) {
+ minDelay = callDelay;
+ }
+
+ if (isOverloaded && callDelay > 2 * codelTargetDelay) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // Generic BlockingQueue methods we support
+ @Override
+ public boolean offer(CallRunner callRunner) {
+ return queue.offer(callRunner);
+ }
+
+ @Override
+ public int size() {
+ return queue.size();
+ }
+
+ @Override
+ public String toString() {
+ return queue.toString();
+ }
+
+ // This class does NOT provide a general-purpose BlockingQueue implementation,
+ // so to prevent misuse all other methods throw UnsupportedOperationException.
+
+ @Override
+ public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public CallRunner poll() {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public CallRunner peek() {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public Object[] toArray() {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public int drainTo(Collection<? super CallRunner> c) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public int drainTo(Collection<? super CallRunner> c, int maxElements) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public Iterator<CallRunner> iterator() {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public boolean add(CallRunner callRunner) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public CallRunner remove() {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public CallRunner element() {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends CallRunner> c) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public boolean isEmpty() {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public int remainingCapacity() {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public void put(CallRunner callRunner) throws InterruptedException {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+
+ @Override
+ public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ throw new UnsupportedOperationException("This class doesn't support anything,"
+ + " but take() and offer() methods");
+ }
+}
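
A standalone sketch of the two decisions take() combines, with HBase types stripped out; the constants and helper names here are illustrative, not part of the patch. Above the LIFO threshold the newest call is served first; once the minimum delay observed over a CoDel interval exceeds the target, the queue is considered overloaded and calls that have waited more than twice the target delay are shed.

import java.util.Deque;

// Illustrative sketch of the Adaptive-LIFO and CoDel checks made by take()/needToDrop().
// In the real class these values are volatile fields updated via updateTunables().
public class CoDelDecisionSketch {
  static final int CAPACITY = 300;
  static final int TARGET_DELAY_MS = 5;      // codelTargetDelay
  static final double LIFO_THRESHOLD = 0.8;  // lifoThreshold

  /** Adaptive LIFO: when the queue is this full, serve the newest call first. */
  static boolean takeFromTail(Deque<?> queue) {
    return (double) queue.size() / CAPACITY > LIFO_THRESHOLD;
  }

  /** CoDel: while overloaded, shed calls that have waited more than 2x the target delay. */
  static boolean shouldDrop(long enqueueTimeMs, boolean overloaded) {
    long callDelay = System.currentTimeMillis() - enqueueTimeMs;
    return overloaded && callDelay > 2 * TARGET_DELAY_MS;
  }
}
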
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 5e104eb0acd..b069a5af4a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -104,4 +104,14 @@ public class FifoRpcScheduler extends RpcScheduler {
public int getActiveRpcHandlerCount() {
return executor.getActiveCount();
}
+
+ @Override
+ public long getNumGeneralCallsDropped() {
+ return 0;
+ }
+
+ @Override
+ public long getNumLifoModeSwitches() {
+ return 0;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 63c4b325c4c..9979c75aebf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -78,4 +78,20 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
}
return server.getScheduler().getActiveRpcHandlerCount();
}
+
+ @Override
+ public long getNumGeneralCallsDropped() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getNumGeneralCallsDropped();
+ }
+
+ @Override
+ public long getNumLifoModeSwitches() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getNumLifoModeSwitches();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index a9648b06876..e0203ab0012 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -100,6 +100,16 @@ public class RWQueueRpcExecutor extends RpcExecutor {
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
}
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare,
+ final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
+ final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
+ this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
+ calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
+ writeQueueClass, writeQueueInitArgs,
+ readQueueClass, readQueueInitArgs);
+ }
+
public RWQueueRpcExecutor(final String name, final int writeHandlers, final int readHandlers,
final int numWriteQueues, final int numReadQueues,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index fffe7f3a0ea..50886cbe418 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -71,4 +71,17 @@ public abstract class RpcScheduler {
/** Retrieves the number of active handler. */
public abstract int getActiveRpcHandlerCount();
+
+ /**
+ * If CoDel-based RPC executors are used, retrieves the number of Calls that were dropped
+ * from the general queue because the RPC executor is under high load; returns 0 otherwise.
+ */
+ public abstract long getNumGeneralCallsDropped();
+
+ /**
+ * If CoDel-based RPC executors are used, retrieves the number of Calls that were
+ * picked from the tail of the queue (indicating adaptive LIFO mode, where during
+ * periods of overload the most recent requests are served first); returns 0 otherwise.
+ */
+ public abstract long getNumLifoModeSwitches();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 00032542e19..12ee5408ed1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.ipc;
import java.util.Comparator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +51,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
/** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
+ public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
@@ -56,6 +59,21 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
= "hbase.ipc.server.queue.max.call.delay";
+ // These 3 are only used by Codel executor
+ public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
+ "hbase.ipc.server.callqueue.codel.target.delay";
+ public static final String CALL_QUEUE_CODEL_INTERVAL =
+ "hbase.ipc.server.callqueue.codel.interval";
+ public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
+ "hbase.ipc.server.callqueue.codel.lifo.threshold";
+
+ public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5;
+ public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
+ public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
+
+ private AtomicLong numGeneralCallsDropped = new AtomicLong();
+ private AtomicLong numLifoModeSwitches = new AtomicLong();
+
/**
* Resize call queues;
* @param conf new configuration
@@ -69,6 +87,26 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
if (replicationExecutor != null) {
replicationExecutor.resizeQueues(conf);
}
+
+ String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
+ CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
+
+ if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+ // update CoDel Scheduler tunables
+ int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
+ CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
+ int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
+ CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
+ double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
+ CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
+
+ for (BlockingQueue<CallRunner> queue : callExecutor.getQueues()) {
+ if (queue instanceof AdaptiveLifoCoDelCallQueue) {
+ ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay,
+ codelInterval, codelLifoThreshold);
+ }
+ }
+ }
}
/**
@@ -134,10 +172,18 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
this.highPriorityLevel = highPriorityLevel;
this.abortable = server;
- String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
+ String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
+ CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
+ int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
+ CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
+ int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL,
+ CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
+ double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
+ CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
+
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
@@ -150,6 +196,13 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
BoundedPriorityBlockingQueue.class, callPriority);
+ } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+ Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
+ codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
+ callExecutor = new RWQueueRpcExecutor("B.default", handlerCount,
+ numCallQueues, callqReadShare, callqScanShare,
+ AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
+ AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
} else {
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
@@ -160,6 +213,11 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+ } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+ callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
+ conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
+ codelTargetDelay, codelInterval, codelLifoThreshold,
+ numGeneralCallsDropped, numLifoModeSwitches);
} else {
callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
numCallQueues, maxQueueLength, conf, abortable);
@@ -239,5 +297,15 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
(priorityExecutor == null ? 0 : priorityExecutor.getActiveHandlerCount()) +
(replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
}
+
+ @Override
+ public long getNumGeneralCallsDropped() {
+ return numGeneralCallsDropped.get();
+ }
+
+ @Override
+ public long getNumLifoModeSwitches() {
+ return numLifoModeSwitches.get();
+ }
}
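
A hedged configuration sketch showing how the new queue type is enabled; the property keys are the constants added above and the values shown are simply the defaults (5 ms target delay, 100 ms interval, 0.8 LIFO threshold). The same keys are re-read by the configuration-change path above, which calls updateTunables() on every AdaptiveLifoCoDelCallQueue, so the tunables can be adjusted on a live server.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Sketch: enabling the CoDel call queue. Key strings mirror the constants added to
// SimpleRpcScheduler in this patch; the numeric values are the defaults defined there.
public class CoDelConfigSketch {
  public static Configuration codelConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.ipc.server.callqueue.type", "codel");
    conf.setInt("hbase.ipc.server.callqueue.codel.target.delay", 5);   // ms
    conf.setInt("hbase.ipc.server.callqueue.codel.interval", 100);     // ms
    conf.setDouble("hbase.ipc.server.callqueue.codel.lifo.threshold", 0.8);
    return conf;
  }
}
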
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
index 6241f8eadde..b001d74f7c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
@@ -48,4 +48,14 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
public int getActiveRpcHandlerCount() {
return 106;
}
+
+ @Override
+ public long getNumGeneralCallsDropped() {
+ return 3;
+ }
+
+ @Override
+ public long getNumLifoModeSwitches() {
+ return 5;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 8e321a61dcf..dd3433c67cc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -356,4 +356,67 @@ public class TestSimpleRpcScheduler {
scheduler.stop();
}
}
+
+ @Test
+ public void testCoDelScheduling() throws Exception {
+ Configuration schedConf = HBaseConfiguration.create();
+
+ schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
+ SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
+
+ PriorityFunction priority = mock(PriorityFunction.class);
+ when(priority.getPriority(any(RPCProtos.RequestHeader.class), any(Message.class),
+ any(User.class))).thenReturn(HConstants.NORMAL_QOS);
+ SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
+ HConstants.QOS_THRESHOLD);
+ try {
+ scheduler.start();
+
+ // calls faster than min delay
+ for (int i = 0; i < 100; i++) {
+ CallRunner cr = getMockedCallRunner();
+ Thread.sleep(5);
+ scheduler.dispatch(cr);
+ }
+ Thread.sleep(100); // make sure fast calls are handled
+ assertEquals("None of these calls should have been discarded", 0,
+ scheduler.getNumGeneralCallsDropped());
+
+ // calls slower than min delay, but not individually slow enough to be dropped
+ for (int i = 0; i < 20; i++) {
+ CallRunner cr = getMockedCallRunner();
+ Thread.sleep(6);
+ scheduler.dispatch(cr);
+ }
+
+ Thread.sleep(100); // make sure somewhat slow calls are handled
+ assertEquals("None of these calls should have been discarded", 0,
+ scheduler.getNumGeneralCallsDropped());
+
+ // now slow calls and the ones to be dropped
+ for (int i = 0; i < 20; i++) {
+ CallRunner cr = getMockedCallRunner();
+ Thread.sleep(12);
+ scheduler.dispatch(cr);
+ }
+
+ Thread.sleep(100); // make sure somewhat slow calls are handled
+ assertTrue("There should have been at least 12 calls dropped",
+ scheduler.getNumGeneralCallsDropped() > 12);
+ } finally {
+ scheduler.stop();
+ }
+ }
+
+ private CallRunner getMockedCallRunner() throws IOException {
+ CallRunner putCallTask = mock(CallRunner.class);
+ RpcServer.Call putCall = mock(RpcServer.Call.class);
+ putCall.param = RequestConverter.buildMutateRequest(
+ Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
+ RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
+ when(putCallTask.getCall()).thenReturn(putCall);
+ when(putCall.getHeader()).thenReturn(putHead);
+ putCall.timestamp = System.currentTimeMillis();
+ return putCallTask;
+ }
}