HBASE-16023 Fastpath for the FIFO rpcscheduler. Adds an executor that does balanced queueing and a fast path that hands requests directly to waiting handlers, if any are present. Idea taken from Apache Kudu (incubating). See https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h

M hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
 Refactor that introduces a Handler type. Put all 'handler' stuff inside this
 new type. Also make it so a subclass can provide its own Handler type.

M hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
 Name the handler threads for their type so you can tell whether configs are
 having an effect.

Signed-off-by: stack <stack@apache.org>
Author: stack, 2016-06-14 11:18:34 -07:00
commit ee86e91e7e (parent fa50d456a8)
5 changed files with 244 additions and 85 deletions
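Before the per-file diffs, here is a minimal, self-contained sketch of the handoff pattern the summary describes: a dispatcher first tries to hand a task straight to a worker parked on a stack of idle workers, and only falls back to the shared queue when nobody is waiting. All names here (FastPathSketch, Worker, dispatch) are hypothetical illustrations, not HBase classes; the committed implementation is FifoWithFastPathBalancedQueueRpcExecutor in the second file below.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class FastPathSketch {

  static class Worker extends Thread {
    final BlockingQueue<Runnable> queue;
    final ConcurrentLinkedDeque<Worker> idleStack;
    final Semaphore handoff = new Semaphore(0); // released once a task has been loaded directly
    volatile Runnable loaded;                   // the task handed to us on the fast path

    Worker(BlockingQueue<Runnable> queue, ConcurrentLinkedDeque<Worker> idleStack) {
      this.queue = queue;
      this.idleStack = idleStack;
      setDaemon(true);
    }

    Runnable getTask() throws InterruptedException {
      Runnable task = queue.poll();   // prefer work that is already queued
      if (task != null) {
        return task;
      }
      // Simplification: once parked we never re-check the queue, so a task enqueued in the tiny
      // window between the poll() above and the push() below waits for the next dispatch.
      idleStack.push(this);           // advertise ourselves as idle; LIFO keeps recently active workers "hot"
      handoff.acquire();              // park until a dispatcher loads us
      return loaded;
    }

    @Override
    public void run() {
      try {
        while (true) {
          getTask().run();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
  final ConcurrentLinkedDeque<Worker> idleStack = new ConcurrentLinkedDeque<>();

  void dispatch(Runnable task) throws InterruptedException {
    Worker w = idleStack.poll();      // fast path: most recently idle worker first
    if (w != null) {
      w.loaded = task;
      w.handoff.release();
    } else {
      queue.put(task);                // slow path: queue it for whoever frees up next
    }
  }

  public static void main(String[] args) throws Exception {
    FastPathSketch s = new FastPathSketch();
    for (int i = 0; i < 2; i++) {
      new Worker(s.queue, s.idleStack).start();
    }
    for (int i = 0; i < 4; i++) {
      final int n = i;
      s.dispatch(() -> System.out.println("task " + n + " ran on " + Thread.currentThread().getName()));
    }
    Thread.sleep(200); // give the daemon workers a moment to print before the JVM exits
  }
}

Using a LIFO stack rather than a FIFO queue of idle workers is what keeps recently active handlers warm, as the class javadoc below puts it.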

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java

@@ -33,7 +33,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 /**
  * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
- * efficient with a single queue via an inlinable queue balancing mechanism.
+ * efficient with a single queue via an inlinable queue balancing mechanism. Defaults to FIFO but
+ * you can pass an alternate queue class to use.
  */
 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
 @InterfaceStability.Evolving
@@ -103,4 +104,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
   public List<BlockingQueue<CallRunner>> getQueues() {
     return queues;
   }
 }

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java (new file)

@@ -0,0 +1,116 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
 * FIFO balanced queue executor with a fastpath. Because this is FIFO, it has no respect for
 * ordering, so a fast path that skips queuing a Call when a Handler is available is possible:
 * just pass the Call directly to the waiting Handler thread. Try to keep the hot Handlers
 * bubbling rather than letting them go cold and lose context. Idea taken from Apache Kudu
 * (incubating). See https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
 */
@InterfaceAudience.Private
public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
  // Depends on default behavior of BalancedQueueRpcExecutor being FIFO!

  /*
   * Stack of Handlers waiting for work.
   */
  private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();

  public FifoWithFastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
      final int numQueues, final int maxQueueLength, final Configuration conf,
      final Abortable abortable) {
    super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
      maxQueueLength);
  }

  @Override
  protected Handler getHandler(String name, double handlerFailureThreshhold,
      BlockingQueue<CallRunner> q) {
    return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
  }

  @Override
  public boolean dispatch(CallRunner callTask) throws InterruptedException {
    FastPathHandler handler = popReadyHandler();
    return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
  }

  /**
   * @return A Handler instance that is ready to go, if one is available, else null.
   */
  private FastPathHandler popReadyHandler() {
    return this.fastPathHandlerStack.poll();
  }

  class FastPathHandler extends Handler {
    // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
    // when our queue of CallRunners is empty so we are available for direct handoff when a Call
    // comes in.
    final Deque<FastPathHandler> fastPathHandlerStack;
    // Semaphore to coordinate loading of the fastpathed loadedCallRunner and our running it.
    private Semaphore semaphore = new Semaphore(1);
    // The task we get when fast-pathing.
    private CallRunner loadedCallRunner;

    FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
        final Deque<FastPathHandler> fastPathHandlerStack) {
      super(name, handlerFailureThreshhold, q);
      this.fastPathHandlerStack = fastPathHandlerStack;
      this.semaphore.drainPermits();
    }

    protected CallRunner getCallRunner() throws InterruptedException {
      // Get a callrunner if one is in the Q.
      CallRunner cr = this.q.poll();
      if (cr == null) {
        // Else, if a fastPathHandlerStack is present and no callrunner is in the Q, register
        // ourselves for the fastpath handoff done via fastPathHandlerStack.
        if (this.fastPathHandlerStack != null) {
          this.fastPathHandlerStack.push(this);
          this.semaphore.acquire();
          cr = this.loadedCallRunner;
        } else {
          // No fastpath available. Block until a task comes available.
          cr = super.getCallRunner();
        }
      }
      return cr;
    }

    /**
     * @param cr Task gotten via fastpath.
     * @return True if we successfully loaded our task.
     */
    boolean loadCallRunner(final CallRunner cr) {
      this.loadedCallRunner = cr;
      this.semaphore.release();
      return true;
    }
  }
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -31,15 +32,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.util.StringUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+/**
+ * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
+ * scheduling behavior.
+ */
 @InterfaceAudience.Private
+@InterfaceStability.Evolving
 public abstract class RpcExecutor {
   private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
@@ -48,7 +51,7 @@ public abstract class RpcExecutor {
   protected volatile int currentQueueLimit;
   private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
-  private final List<Thread> handlers;
+  private final List<Handler> handlers;
   private final int handlerCount;
   private final String name;
   private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
@@ -59,7 +62,7 @@ public abstract class RpcExecutor {
   private Abortable abortable = null;
   public RpcExecutor(final String name, final int handlerCount) {
-    this.handlers = new ArrayList<Thread>(handlerCount);
+    this.handlers = new ArrayList<Handler>(handlerCount);
     this.handlerCount = handlerCount;
     this.name = Strings.nullToEmpty(name);
   }
@@ -101,75 +104,111 @@ public abstract class RpcExecutor {
     startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
   }
+  /**
+   * Override if providing alternate Handler implementation.
+   */
+  protected Handler getHandler(final String name, final double handlerFailureThreshhold,
+      final BlockingQueue<CallRunner> q) {
+    return new Handler(name, handlerFailureThreshhold, q);
+  }
+  /**
+   * Start up our handlers.
+   */
   protected void startHandlers(final String nameSuffix, final int numHandlers,
       final List<BlockingQueue<CallRunner>> callQueues,
       final int qindex, final int qsize, final int port) {
     final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
-    for (int i = 0; i < numHandlers; i++) {
-      final int index = qindex + (i % qsize);
-      Thread t = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          consumerLoop(callQueues.get(index));
-        }
-      });
-      t.setDaemon(true);
-      t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
-        ",queue=" + index + ",port=" + port);
-      t.start();
-      LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
-      handlers.add(t);
-    }
-  }
-  protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
-    boolean interrupted = false;
-    double handlerFailureThreshhold =
-        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
-          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
-    try {
-      while (running) {
-        try {
-          MonitoredRPCHandler status = RpcServer.getStatus();
-          CallRunner task = myQueue.take();
-          task.setStatus(status);
-          try {
-            activeHandlerCount.incrementAndGet();
-            task.run();
-          } catch (Throwable e) {
-            if (e instanceof Error) {
-              int failedCount = failedHandlerCount.incrementAndGet();
-              if (handlerFailureThreshhold >= 0
-                  && failedCount > handlerCount * handlerFailureThreshhold) {
-                String message =
-                    "Number of failed RpcServer handler exceeded threshhold "
-                        + handlerFailureThreshhold + " with failed reason: "
-                        + StringUtils.stringifyException(e);
-                if (abortable != null) {
-                  abortable.abort(message, e);
-                } else {
-                  LOG.error("Received " + StringUtils.stringifyException(e)
-                    + " but not aborting due to abortable being null");
-                  throw e;
-                }
-              } else {
-                LOG.warn("RpcServer handler threads encountered errors "
-                    + StringUtils.stringifyException(e));
-              }
-            } else {
-              LOG.warn("RpcServer handler threads encountered exceptions "
-                  + StringUtils.stringifyException(e));
-            }
-          } finally {
-            activeHandlerCount.decrementAndGet();
-          }
-        } catch (InterruptedException e) {
-          interrupted = true;
-        }
-      }
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
+    double handlerFailureThreshhold =
+        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
+    for (int i = 0; i < numHandlers; i++) {
+      final int index = qindex + (i % qsize);
+      String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" +
+          index + ",port=" + port;
+      Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
+      handler.start();
+      LOG.debug("Started " + name);
+      handlers.add(handler);
+    }
+  }
+  /**
+   * Handler thread run the {@link CallRunner#run()} in.
+   */
+  protected class Handler extends Thread {
+    /**
+     * Q to find CallRunners to run in.
+     */
+    final BlockingQueue<CallRunner> q;
+    final double handlerFailureThreshhold;
+    Handler(final String name, final double handlerFailureThreshhold,
+        final BlockingQueue<CallRunner> q) {
+      super(name);
+      setDaemon(true);
+      this.q = q;
+      this.handlerFailureThreshhold = handlerFailureThreshhold;
+    }
+    /**
+     * @return A {@link CallRunner}
+     * @throws InterruptedException
+     */
+    protected CallRunner getCallRunner() throws InterruptedException {
+      return this.q.take();
+    }
+    @Override
+    public void run() {
+      boolean interrupted = false;
+      try {
+        while (running) {
+          try {
+            run(getCallRunner());
+          } catch (InterruptedException e) {
+            interrupted = true;
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn(e);
+        throw e;
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    private void run(CallRunner cr) {
+      MonitoredRPCHandler status = RpcServer.getStatus();
+      cr.setStatus(status);
+      try {
+        activeHandlerCount.incrementAndGet();
+        cr.run();
+      } catch (Throwable e) {
+        if (e instanceof Error) {
+          int failedCount = failedHandlerCount.incrementAndGet();
+          if (this.handlerFailureThreshhold >= 0 &&
+              failedCount > handlerCount * this.handlerFailureThreshhold) {
+            String message = "Number of failed RpcServer handler runs exceeded threshhold " +
+              this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
+            if (abortable != null) {
+              abortable.abort(message, e);
+            } else {
+              LOG.error("Error but can't abort because abortable is null: " +
+                StringUtils.stringifyException(e));
+              throw e;
+            }
+          } else {
+            LOG.warn("Handler errors " + StringUtils.stringifyException(e));
+          }
+        } else {
+          LOG.warn("Handler exception " + StringUtils.stringifyException(e));
+        }
+      } finally {
+        activeHandlerCount.decrementAndGet();
+      }
+    }
+  }
@@ -194,7 +233,6 @@ public abstract class RpcExecutor {
    * All requests go to the first queue, at index 0
    */
   private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
     @Override
     public int getNextQueue() {
       return 0;
@@ -227,4 +265,4 @@ public abstract class RpcExecutor {
     }
     currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
   }
 }

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java

@@ -99,7 +99,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
       CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
-    if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+    if (isCodelQueueType(callQueueType)) {
       // update CoDel Scheduler tunables
       int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
         CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
@@ -204,18 +204,19 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       // multiple read/write queues
       if (isDeadlineQueueType(callQueueType)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
-        callExecutor = new RWQueueRpcExecutor("RW.deadline.Q", handlerCount, numCallQueues,
+        callExecutor = new RWQueueRpcExecutor("DeadlineRWQ.default", handlerCount, numCallQueues,
           callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
           BoundedPriorityBlockingQueue.class, callPriority);
-      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+      } else if (isCodelQueueType(callQueueType)) {
         Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
           codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
-        callExecutor = new RWQueueRpcExecutor("RW.codel.Q", handlerCount,
+        callExecutor = new RWQueueRpcExecutor("CodelRWQ.default", handlerCount,
           numCallQueues, callqReadShare, callqScanShare,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
       } else {
-        callExecutor = new RWQueueRpcExecutor("RW.fifo.Q", handlerCount, numCallQueues,
+        // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
+        callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues,
           callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
       }
     } else {
@@ -223,33 +224,37 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       if (isDeadlineQueueType(callQueueType)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor =
-          new BalancedQueueRpcExecutor("B.deadline.Q", handlerCount, numCallQueues,
+          new BalancedQueueRpcExecutor("DeadlineBQ.default", handlerCount, numCallQueues,
             conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
-      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+      } else if (isCodelQueueType(callQueueType)) {
         callExecutor =
-          new BalancedQueueRpcExecutor("B.codel.Q", handlerCount, numCallQueues,
+          new BalancedQueueRpcExecutor("CodelBQ.default", handlerCount, numCallQueues,
            conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
            codelTargetDelay, codelInterval, codelLifoThreshold,
            numGeneralCallsDropped, numLifoModeSwitches);
       } else {
-        callExecutor = new BalancedQueueRpcExecutor("B.fifo.Q", handlerCount,
-          numCallQueues, maxQueueLength, conf, abortable);
+        // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
+        callExecutor = new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
+          handlerCount, numCallQueues, maxQueueLength, conf, abortable);
       }
     }
     // Create 2 queues to help priorityExecutor be more scalable.
-    this.priorityExecutor = priorityHandlerCount > 0 ?
-      new BalancedQueueRpcExecutor("B.priority.fifo.Q", priorityHandlerCount, 2,
-          maxPriorityQueueLength):
-      null;
-    this.replicationExecutor =
-      replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("B.replication.fifo.Q",
-        replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
+    this.priorityExecutor = priorityHandlerCount > 0?
+      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
+        2, maxPriorityQueueLength, conf, abortable): null;
+    this.replicationExecutor = replicationHandlerCount > 0?
+      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
+        replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
   }
   private static boolean isDeadlineQueueType(final String callQueueType) {
     return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
   }
+  private static boolean isCodelQueueType(final String callQueueType) {
+    return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
+  }
   public SimpleRpcScheduler(
     Configuration conf,
     int handlerCount,
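One operational note on the scheduler changes above: which executor the general pool gets is driven by the configured call queue type (read via CALL_QUEUE_TYPE_CONF_KEY and tested with the isDeadlineQueueType/isCodelQueueType helpers); the plain-FIFO branch is the one that now constructs FifoWithFastPathBalancedQueueRpcExecutor, while the priority and replication pools use it unconditionally when their handler counts are positive. A hedged sketch, assuming the key and value are the usual "hbase.ipc.server.callqueue.type" / "fifo" strings (verify against your HBase version):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CallQueueTypeExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Assumed key/value strings; the scheduler reads them as CALL_QUEUE_TYPE_CONF_KEY and the
    // *_CONF_VALUE constants in the diff above. Anything that is neither "deadline" nor "codel"
    // lands in the branch that builds the fastpath executor.
    conf.set("hbase.ipc.server.callqueue.type", "fifo");
    System.out.println("callqueue.type = " + conf.get("hbase.ipc.server.callqueue.type"));
  }
}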

hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java

@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -242,7 +241,7 @@ public class TestSimpleRpcScheduler {
       // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
       if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         assertEquals(530, totalTime);
-      } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ {
+      } else if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
         assertEquals(930, totalTime);
       }
     } finally {