HBASE-16089 Add on FastPath for CoDel

This commit is contained in:
Elliott Clark 2016-06-22 16:34:40 -07:00
parent 1d06850f40
commit fc4b8aa89d
5 changed files with 117 additions and 56 deletions

View File

@ -73,7 +73,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private AtomicBoolean resetDelay = new AtomicBoolean(true); private AtomicBoolean resetDelay = new AtomicBoolean(true);
// if we're in this mode, "long" calls are getting dropped // if we're in this mode, "long" calls are getting dropped
private volatile boolean isOverloaded; private AtomicBoolean isOverloaded = new AtomicBoolean(false);
public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval, public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) { double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
@ -126,6 +126,34 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
} }
} }
@Override
public CallRunner poll() {
CallRunner cr;
boolean switched = false;
while(true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
// Only count once per switch.
if (!switched) {
switched = true;
numLifoModeSwitches.incrementAndGet();
}
cr = queue.pollLast();
} else {
switched = false;
cr = queue.pollFirst();
}
if (cr == null) {
return cr;
}
if (needToDrop(cr)) {
numGeneralCallsDropped.incrementAndGet();
cr.drop();
} else {
return cr;
}
}
}
/** /**
* @param callRunner to validate * @param callRunner to validate
* @return true if this call needs to be skipped based on call timestamp * @return true if this call needs to be skipped based on call timestamp
@ -136,28 +164,28 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
long callDelay = now - callRunner.getCall().timestamp; long callDelay = now - callRunner.getCall().timestamp;
long localMinDelay = this.minDelay; long localMinDelay = this.minDelay;
if (now > intervalTime && !resetDelay.getAndSet(true)) {
// Try and determine if we should reset
// the delay time and determine overload
if (now > intervalTime &&
!resetDelay.get() &&
!resetDelay.getAndSet(true)) {
intervalTime = now + codelInterval; intervalTime = now + codelInterval;
if (localMinDelay > codelTargetDelay) { isOverloaded.set(localMinDelay > codelTargetDelay);
isOverloaded = true;
} else {
isOverloaded = false;
}
} }
if (resetDelay.getAndSet(false)) { // If it looks like we should reset the delay
// time do it only once on one thread
if (resetDelay.get() && resetDelay.getAndSet(false)) {
minDelay = callDelay; minDelay = callDelay;
// we just reset the delay dunno about how this will work
return false; return false;
} else if (callDelay < localMinDelay) { } else if (callDelay < localMinDelay) {
minDelay = callDelay; minDelay = callDelay;
} }
if (isOverloaded && callDelay > 2 * codelTargetDelay) { return isOverloaded.get() && callDelay > 2 * codelTargetDelay;
return true;
} else {
return false;
}
} }
// Generic BlockingQueue methods we support // Generic BlockingQueue methods we support
@ -185,11 +213,6 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
+ " but take() and offer() methods"); + " but take() and offer() methods");
} }
@Override
public CallRunner poll() {
throw new UnsupportedOperationException("This class doesn't support anything,"
+ " but take() and offer() methods");
}
@Override @Override
public CallRunner peek() { public CallRunner peek() {

View File

@ -64,8 +64,10 @@ public class CallRunner {
this.call = call; this.call = call;
this.rpcServer = rpcServer; this.rpcServer = rpcServer;
// Add size of the call to queue size. // Add size of the call to queue size.
if (call != null && rpcServer != null) {
this.rpcServer.addCallSize(call.getSize()); this.rpcServer.addCallSize(call.getSize());
} }
}
public Call getCall() { public Call getCall() {
return call; return call;

View File

@ -22,20 +22,21 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
/** /**
* FIFO balanced queue executor with a fastpath. Because this is FIFO, it has no respect for * Balanced queue executor with a fastpath. Because this is FIFO, it has no respect for
* ordering so a fast path skipping the queuing of Calls if an Handler is available, is possible. * ordering so a fast path skipping the queuing of Calls if an Handler is available, is possible.
* Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling
* rather than let them go cold and lose context. Idea taken from Apace Kudu (incubating). See * rather than let them go cold and lose context. Idea taken from Apace Kudu (incubating). See
* https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
// Depends on default behavior of BalancedQueueRpcExecutor being FIFO! // Depends on default behavior of BalancedQueueRpcExecutor being FIFO!
/* /*
@ -43,13 +44,22 @@ public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcEx
*/ */
private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>(); private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
public FifoWithFastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
final int numQueues, final int maxQueueLength, final Configuration conf, final int numQueues, final int maxQueueLength, final Configuration conf,
final Abortable abortable) { final Abortable abortable) {
super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
maxQueueLength); maxQueueLength);
} }
public FastPathBalancedQueueRpcExecutor(String name, int handlerCount,
int numCallQueues,
Configuration conf,
Abortable abortable,
Class<? extends BlockingQueue> queueClass,
Object... args) {
super(name, handlerCount, numCallQueues, conf, abortable, queueClass, args);
}
@Override @Override
protected Handler getHandler(String name, double handlerFailureThreshhold, protected Handler getHandler(String name, double handlerFailureThreshhold,
BlockingQueue<CallRunner> q) { BlockingQueue<CallRunner> q) {

View File

@ -75,7 +75,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
"hbase.ipc.server.callqueue.codel.lifo.threshold"; "hbase.ipc.server.callqueue.codel.lifo.threshold";
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5; public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100; public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8; public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
@ -215,7 +215,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs, AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs); AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
} else { } else {
// FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor // FifoWFPBQ = FastPathBalancedQueueRpcExecutor
callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues, callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues,
callqReadShare, callqScanShare, maxQueueLength, conf, abortable); callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
} }
@ -228,22 +228,22 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else if (isCodelQueueType(callQueueType)) { } else if (isCodelQueueType(callQueueType)) {
callExecutor = callExecutor =
new BalancedQueueRpcExecutor("CodelBQ.default", handlerCount, numCallQueues, new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues,
conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength, conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
codelTargetDelay, codelInterval, codelLifoThreshold, codelTargetDelay, codelInterval, codelLifoThreshold,
numGeneralCallsDropped, numLifoModeSwitches); numGeneralCallsDropped, numLifoModeSwitches);
} else { } else {
// FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor // FifoWFPBQ = FastPathBalancedQueueRpcExecutor
callExecutor = new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.default", callExecutor = new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
handlerCount, numCallQueues, maxQueueLength, conf, abortable); handlerCount, numCallQueues, maxQueueLength, conf, abortable);
} }
} }
// Create 2 queues to help priorityExecutor be more scalable. // Create 2 queues to help priorityExecutor be more scalable.
this.priorityExecutor = priorityHandlerCount > 0? this.priorityExecutor = priorityHandlerCount > 0?
new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount, new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
2, maxPriorityQueueLength, conf, abortable): null; 2, maxPriorityQueueLength, conf, abortable): null;
this.replicationExecutor = replicationHandlerCount > 0? this.replicationExecutor = replicationHandlerCount > 0?
new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication", new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
} }

View File

@ -40,6 +40,7 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -77,8 +78,11 @@ import com.google.protobuf.Message;
@Category({RPCTests.class, SmallTests.class}) @Category({RPCTests.class, SmallTests.class})
public class TestSimpleRpcScheduler { public class TestSimpleRpcScheduler {
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()). @Rule
public final TestRule timeout =
CategoryBasedTimeout.builder().withTimeout(this.getClass()).
withLookingForStuckThread(true).build(); withLookingForStuckThread(true).build();
private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class); private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
@ -404,7 +408,8 @@ public class TestSimpleRpcScheduler {
@Override @Override
public long currentTime() { public long currentTime() {
for (String threadNamePrefix : threadNamePrefixs) { for (String threadNamePrefix : threadNamePrefixs) {
if (Thread.currentThread().getName().startsWith(threadNamePrefix)) { String threadName = Thread.currentThread().getName();
if (threadName.startsWith(threadNamePrefix)) {
return timeQ.poll().longValue() + offset; return timeQ.poll().longValue() + offset;
} }
} }
@ -415,9 +420,9 @@ public class TestSimpleRpcScheduler {
@Test @Test
public void testCoDelScheduling() throws Exception { public void testCoDelScheduling() throws Exception {
CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
envEdge.threadNamePrefixs.add("RW.default"); envEdge.threadNamePrefixs.add("RpcServer.CodelBQ.default.handler");
envEdge.threadNamePrefixs.add("B.default");
Configuration schedConf = HBaseConfiguration.create(); Configuration schedConf = HBaseConfiguration.create();
schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
@ -435,8 +440,7 @@ public class TestSimpleRpcScheduler {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
envEdge.timeQ.put(time); envEdge.timeQ.put(time);
CallRunner cr = getMockedCallRunner(time); CallRunner cr = getMockedCallRunner(time, 2);
Thread.sleep(5);
scheduler.dispatch(cr); scheduler.dispatch(cr);
} }
// make sure fast calls are handled // make sure fast calls are handled
@ -445,13 +449,12 @@ public class TestSimpleRpcScheduler {
assertEquals("None of these calls should have been discarded", 0, assertEquals("None of these calls should have been discarded", 0,
scheduler.getNumGeneralCallsDropped()); scheduler.getNumGeneralCallsDropped());
envEdge.offset = 6; envEdge.offset = 151;
// calls slower than min delay, but not individually slow enough to be dropped // calls slower than min delay, but not individually slow enough to be dropped
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
envEdge.timeQ.put(time); envEdge.timeQ.put(time);
CallRunner cr = getMockedCallRunner(time); CallRunner cr = getMockedCallRunner(time, 2);
Thread.sleep(6);
scheduler.dispatch(cr); scheduler.dispatch(cr);
} }
@ -461,35 +464,58 @@ public class TestSimpleRpcScheduler {
assertEquals("None of these calls should have been discarded", 0, assertEquals("None of these calls should have been discarded", 0,
scheduler.getNumGeneralCallsDropped()); scheduler.getNumGeneralCallsDropped());
envEdge.offset = 12; envEdge.offset = 2000;
// now slow calls and the ones to be dropped // now slow calls and the ones to be dropped
for (int i = 0; i < 20; i++) { for (int i = 0; i < 60; i++) {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
envEdge.timeQ.put(time); envEdge.timeQ.put(time);
CallRunner cr = getMockedCallRunner(time); CallRunner cr = getMockedCallRunner(time, 100);
Thread.sleep(12);
scheduler.dispatch(cr); scheduler.dispatch(cr);
} }
// make sure somewhat slow calls are handled // make sure somewhat slow calls are handled
waitUntilQueueEmpty(scheduler); waitUntilQueueEmpty(scheduler);
Thread.sleep(100); Thread.sleep(100);
assertTrue("There should have been at least 12 calls dropped", assertTrue(
"There should have been at least 12 calls dropped however there were "
+ scheduler.getNumGeneralCallsDropped(),
scheduler.getNumGeneralCallsDropped() > 12); scheduler.getNumGeneralCallsDropped() > 12);
} finally { } finally {
scheduler.stop(); scheduler.stop();
} }
} }
private CallRunner getMockedCallRunner(long timestamp) throws IOException { // Get mocked call that has the CallRunner sleep for a while so that the fast
CallRunner putCallTask = mock(CallRunner.class); // path isn't hit.
RpcServer.Call putCall = mock(RpcServer.Call.class); private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
final RpcServer.Call putCall = mock(RpcServer.Call.class);
putCall.timestamp = timestamp;
putCall.param = RequestConverter.buildMutateRequest( putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
when(putCallTask.getCall()).thenReturn(putCall); RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder()
.setMethodName("mutate")
.build();
when(putCall.getSize()).thenReturn(9L);
when(putCall.getHeader()).thenReturn(putHead); when(putCall.getHeader()).thenReturn(putHead);
putCall.timestamp = timestamp;
return putCallTask; CallRunner cr = new CallRunner(null, putCall) {
public void run() {
try {
LOG.warn("Sleeping for " + sleepTime);
Thread.sleep(sleepTime);
LOG.warn("Done Sleeping for " + sleepTime);
} catch (InterruptedException e) {
}
}
public Call getCall() {
return putCall;
}
public void drop() {}
};
return cr;
} }
} }