From 411e3cdb6d47e622bf3aaec1c19380ba42d22b21 Mon Sep 17 00:00:00 2001 From: stack Date: Tue, 14 Jun 2016 11:18:34 -0700 Subject: [PATCH] HBASE-16023 Fastpath for the FIFO rpcscheduler Adds an executor that does balanced queue and fast path handing off requests directly to waiting handlers if any present. Idea taken from Apache Kudu (incubating). See https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h M hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java Refactor which makes a Handler type. Put all 'handler' stuff inside this new type. Also make it so subclass can provide its own Handler type. M hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java Name the handler threads for their type so we can tell if configs are having an effect. Signed-off-by: stack --- .../hbase/ipc/BalancedQueueRpcExecutor.java | 5 +- ...oWithFastPathBalancedQueueRpcExecutor.java | 116 ++++++++++++ .../apache/hadoop/hbase/ipc/RpcExecutor.java | 166 +++++++++++------- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 32 ++-- .../hbase/ipc/TestSimpleRpcScheduler.java | 10 +- 5 files changed, 240 insertions(+), 89 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 350522187c8..241d36ea397 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -33,7 +33,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; /** * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains - * efficient with a single queue via an inlinable queue balancing mechanism. 
+ * efficient with a single queue via an inlinable queue balancing mechanism. Defaults to FIFO but + * you can pass an alternate queue class to use. */ @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) @InterfaceStability.Evolving @@ -103,4 +104,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { public List> getQueues() { return queues; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java new file mode 100644 index 00000000000..1951dd0ea80 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Deque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * FIFO balanced queue executor with a fastpath. Because this is FIFO, it has no respect for + * ordering so a fast path skipping the queuing of Calls if a Handler is available, is possible. + * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling + * rather than let them go cold and lose context. Idea taken from Apache Kudu (incubating). See + * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h + */ +@InterfaceAudience.Private +public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { + // Depends on default behavior of BalancedQueueRpcExecutor being FIFO! + + /* + * Stack of Handlers waiting for work. + */ + private final Deque fastPathHandlerStack = new ConcurrentLinkedDeque<>(); + + public FifoWithFastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, + final int numQueues, final int maxQueueLength, final Configuration conf, + final Abortable abortable) { + super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, + maxQueueLength); + } + + @Override + protected Handler getHandler(String name, double handlerFailureThreshhold, + BlockingQueue q) { + return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack); + } + + @Override + public boolean dispatch(CallRunner callTask) throws InterruptedException { + FastPathHandler handler = popReadyHandler(); + return handler != null? 
+ handler.loadCallRunner(callTask): super.dispatch(callTask); + } + + /** + * @return Pop a Handler instance if one available ready-to-go or else return null. + */ + private FastPathHandler popReadyHandler() { + return this.fastPathHandlerStack.poll(); + } + + class FastPathHandler extends Handler { + // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque + // if an empty queue of CallRunners so we are available for direct handoff when one comes in. + final Deque fastPathHandlerStack; + // Semaphore to coordinate loading of fastpathed loadedCallRunner and our running it. + private Semaphore semaphore = new Semaphore(1); + // The task we get when fast-pathing. + private CallRunner loadedCallRunner; + + FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue q, + final Deque fastPathHandlerStack) { + super(name, handlerFailureThreshhold, q); + this.fastPathHandlerStack = fastPathHandlerStack; + this.semaphore.drainPermits(); + } + + protected CallRunner getCallRunner() throws InterruptedException { + // Get a callrunner if one in the Q. + CallRunner cr = this.q.poll(); + if (cr == null) { + // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for + // the fastpath handoff done via fastPathHandlerStack. + if (this.fastPathHandlerStack != null) { + this.fastPathHandlerStack.push(this); + this.semaphore.acquire(); + cr = this.loadedCallRunner; + } else { + // No fastpath available. Block until a task comes available. + cr = super.getCallRunner(); + } + } + return cr; + } + + /** + * @param cr Task gotten via fastpath. 
+ * @return True if we successfully loaded our task + */ + boolean loadCallRunner(final CallRunner cr) { + this.loadedCallRunner = cr; + this.semaphore.release(); + return true; + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 880df368ce5..5b6c6c8fe86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; + import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -31,15 +32,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +/** + * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular + * scheduling behavior. 
+ */ @InterfaceAudience.Private -@InterfaceStability.Evolving public abstract class RpcExecutor { private static final Log LOG = LogFactory.getLog(RpcExecutor.class); @@ -48,7 +51,7 @@ public abstract class RpcExecutor { protected volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); - private final List handlers; + private final List handlers; private final int handlerCount; private final String name; private final AtomicInteger failedHandlerCount = new AtomicInteger(0); @@ -59,7 +62,7 @@ public abstract class RpcExecutor { private Abortable abortable = null; public RpcExecutor(final String name, final int handlerCount) { - this.handlers = new ArrayList(handlerCount); + this.handlers = new ArrayList(handlerCount); this.handlerCount = handlerCount; this.name = Strings.nullToEmpty(name); } @@ -101,75 +104,111 @@ public abstract class RpcExecutor { startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port); } + /** + * Override if providing alternate Handler implementation. + */ + protected Handler getHandler(final String name, final double handlerFailureThreshhold, + final BlockingQueue q) { + return new Handler(name, handlerFailureThreshhold, q); + } + + /** + * Start up our handlers. 
+ */ protected void startHandlers(final String nameSuffix, final int numHandlers, final List> callQueues, final int qindex, final int qsize, final int port) { final String threadPrefix = name + Strings.nullToEmpty(nameSuffix); - for (int i = 0; i < numHandlers; i++) { - final int index = qindex + (i % qsize); - Thread t = new Thread(new Runnable() { - @Override - public void run() { - consumerLoop(callQueues.get(index)); - } - }); - t.setDaemon(true); - t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() + - ",queue=" + index + ",port=" + port); - t.start(); - LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index); - handlers.add(t); - } - } - - protected void consumerLoop(final BlockingQueue myQueue) { - boolean interrupted = false; double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT, HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT); - try { - while (running) { - try { - MonitoredRPCHandler status = RpcServer.getStatus(); - CallRunner task = myQueue.take(); - task.setStatus(status); + for (int i = 0; i < numHandlers; i++) { + final int index = qindex + (i % qsize); + String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + + index + ",port=" + port; + Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index)); + handler.start(); + LOG.debug("Started " + name); + handlers.add(handler); + } + } + + /** + * Handler thread run the {@link CallRunner#run()} in. + */ + protected class Handler extends Thread { + /** + * Q to find CallRunners to run in. 
+ */ + final BlockingQueue q; + + final double handlerFailureThreshhold; + + Handler(final String name, final double handlerFailureThreshhold, + final BlockingQueue q) { + super(name); + setDaemon(true); + this.q = q; + this.handlerFailureThreshhold = handlerFailureThreshhold; + } + + /** + * @return A {@link CallRunner} + * @throws InterruptedException + */ + protected CallRunner getCallRunner() throws InterruptedException { + return this.q.take(); + } + + @Override + public void run() { + boolean interrupted = false; + try { + while (running) { try { - activeHandlerCount.incrementAndGet(); - task.run(); - } catch (Throwable e) { - if (e instanceof Error) { - int failedCount = failedHandlerCount.incrementAndGet(); - if (handlerFailureThreshhold >= 0 - && failedCount > handlerCount * handlerFailureThreshhold) { - String message = - "Number of failed RpcServer handler exceeded threshhold " - + handlerFailureThreshhold + " with failed reason: " - + StringUtils.stringifyException(e); - if (abortable != null) { - abortable.abort(message, e); - } else { - LOG.error("Received " + StringUtils.stringifyException(e) - + " but not aborting due to abortable being null"); - throw e; - } - } else { - LOG.warn("RpcServer handler threads encountered errors " - + StringUtils.stringifyException(e)); - } - } else { - LOG.warn("RpcServer handler threads encountered exceptions " - + StringUtils.stringifyException(e)); - } - } finally { - activeHandlerCount.decrementAndGet(); + run(getCallRunner()); + } catch (InterruptedException e) { + interrupted = true; } - } catch (InterruptedException e) { - interrupted = true; + } + } catch (Exception e) { + LOG.warn(e); + throw e; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); } } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); + } + + private void run(CallRunner cr) { + MonitoredRPCHandler status = RpcServer.getStatus(); + cr.setStatus(status); + try { + activeHandlerCount.incrementAndGet(); + 
cr.run(); + } catch (Throwable e) { + if (e instanceof Error) { + int failedCount = failedHandlerCount.incrementAndGet(); + if (this.handlerFailureThreshhold >= 0 && + failedCount > handlerCount * this.handlerFailureThreshhold) { + String message = "Number of failed RpcServer handler runs exceeded threshhold " + + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); + if (abortable != null) { + abortable.abort(message, e); + } else { + LOG.error("Error but can't abort because abortable is null: " + + StringUtils.stringifyException(e)); + throw e; + } + } else { + LOG.warn("Handler errors " + StringUtils.stringifyException(e)); + } + } else { + LOG.warn("Handler exception " + StringUtils.stringifyException(e)); + } + } finally { + activeHandlerCount.decrementAndGet(); } } } @@ -194,7 +233,6 @@ public abstract class RpcExecutor { * All requests go to the first queue, at index 0 */ private static QueueBalancer ONE_QUEUE = new QueueBalancer() { - @Override public int getNextQueue() { return 0; @@ -227,4 +265,4 @@ public abstract class RpcExecutor { } currentQueueLimit = conf.getInt(configKey, currentQueueLimit); } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 85bf78d2799..13204cc0c16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -99,7 +99,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); - if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) { + if (isCodelQueueType(callQueueType)) { // update CoDel Scheduler tunables int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, 
CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY); @@ -207,7 +207,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs callExecutor = new RWQueueRpcExecutor("RW.deadline.Q", handlerCount, numCallQueues, callqReadShare, callqScanShare, maxQueueLength, conf, abortable, BoundedPriorityBlockingQueue.class, callPriority); - } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) { + } else if (isCodelQueueType(callQueueType)) { Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches}; callExecutor = new RWQueueRpcExecutor("RW.codel.Q", handlerCount, @@ -223,33 +223,37 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs if (isDeadlineQueueType(callQueueType)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); callExecutor = - new BalancedQueueRpcExecutor("B.deadline.Q", handlerCount, numCallQueues, + new BalancedQueueRpcExecutor("BQDeadline.default", handlerCount, numCallQueues, conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); - } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) { + } else if (isCodelQueueType(callQueueType)) { callExecutor = - new BalancedQueueRpcExecutor("B.codel.Q", handlerCount, numCallQueues, + new BalancedQueueRpcExecutor("BQCodel.default", handlerCount, numCallQueues, conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches); } else { - callExecutor = new BalancedQueueRpcExecutor("B.fifo.Q", handlerCount, - numCallQueues, maxQueueLength, conf, abortable); + // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor + callExecutor = new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.default", + handlerCount, numCallQueues, maxQueueLength, conf, abortable); } } // Create 2 queues to help priorityExecutor 
be more scalable. - this.priorityExecutor = priorityHandlerCount > 0 ? - new BalancedQueueRpcExecutor("B.priority.fifo.Q", priorityHandlerCount, 2, - maxPriorityQueueLength): - null; - this.replicationExecutor = - replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("B.replication.fifo.Q", - replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; + this.priorityExecutor = priorityHandlerCount > 0? + new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount, + 2, maxPriorityQueueLength, conf, abortable): null; + this.replicationExecutor = replicationHandlerCount > 0? + new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication", + replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; } private static boolean isDeadlineQueueType(final String callQueueType) { return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); } + private static boolean isCodelQueueType(final String callQueueType) { + return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE); + } + public SimpleRpcScheduler( Configuration conf, int handlerCount, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 2f71980baca..db4485ad22d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -40,19 +40,12 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -60,7 +53,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; @@ -243,7 +235,7 @@ public class TestSimpleRpcScheduler { // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { assertEquals(530, totalTime); - } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ { + } else if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) { assertEquals(930, totalTime); } } finally {