diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 350522187c8..241d36ea397 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -33,7 +33,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
- * efficient with a single queue via an inlinable queue balancing mechanism.
+ * efficient with a single queue via an inlinable queue balancing mechanism. Defaults to FIFO but
+ * you can pass an alternate queue class to use.
  */
 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
 @InterfaceStability.Evolving
@@ -103,4 +104,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
   public List<BlockingQueue<CallRunner>> getQueues() {
     return queues;
   }
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
new file mode 100644
index 00000000000..1951dd0ea80
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * FIFO balanced queue executor with a fastpath. Because this is FIFO, it has no respect for
+ * ordering so a fast path skipping the queuing of Calls, if a Handler is available, is possible.
+ * Just pass the Call directly to a waiting Handler thread. Try to keep the hot Handlers bubbling
+ * rather than let them go cold and lose context. Idea taken from Apache Kudu (incubating). See
+ * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
+ */
+@InterfaceAudience.Private
+public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
+  // Depends on default behavior of BalancedQueueRpcExecutor being FIFO!
+
+  /*
+   * Stack of Handlers waiting for work.
+   */
+  private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
+
+  public FifoWithFastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
+      final int numQueues, final int maxQueueLength, final Configuration conf,
+      final Abortable abortable) {
+    super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
+      maxQueueLength);
+  }
+
+  @Override
+  protected Handler getHandler(String name, double handlerFailureThreshhold,
+      BlockingQueue<CallRunner> q) {
+    return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
+  }
+
+  @Override
+  public boolean dispatch(CallRunner callTask) throws InterruptedException {
+    FastPathHandler handler = popReadyHandler();
+    return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
+  }
+
+  /**
+   * @return A Handler instance if one is available ready-to-go, else null.
+   */
+  private FastPathHandler popReadyHandler() {
+    return this.fastPathHandlerStack.poll();
+  }
+
+  class FastPathHandler extends Handler {
+    // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
+    // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
+    final Deque<FastPathHandler> fastPathHandlerStack;
+    // Semaphore to coordinate loading of the fastpathed loadedCallRunner and our running it.
+    private Semaphore semaphore = new Semaphore(1);
+    // The task we get when fast-pathing.
+    private CallRunner loadedCallRunner;
+
+    FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
+        final Deque<FastPathHandler> fastPathHandlerStack) {
+      super(name, handlerFailureThreshhold, q);
+      this.fastPathHandlerStack = fastPathHandlerStack;
+      this.semaphore.drainPermits();
+    }
+
+    protected CallRunner getCallRunner() throws InterruptedException {
+      // Get a callrunner if one in the Q.
+      CallRunner cr = this.q.poll();
+      if (cr == null) {
+        // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
+        // the fastpath handoff done via fastPathHandlerStack.
+        if (this.fastPathHandlerStack != null) {
+          this.fastPathHandlerStack.push(this);
+          this.semaphore.acquire();
+          cr = this.loadedCallRunner;
+        } else {
+          // No fastpath available. Block until a task comes available.
+          cr = super.getCallRunner();
+        }
+      }
+      return cr;
+    }
+
+    /**
+     * @param cr Task gotten via fastpath.
+     * @return True if we successfully loaded our task
+     */
+    boolean loadCallRunner(final CallRunner cr) {
+      this.loadedCallRunner = cr;
+      this.semaphore.release();
+      return true;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 880df368ce5..5b6c6c8fe86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -31,15 +32,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
+/**
+ * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
+ * scheduling behavior.
+ */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public abstract class RpcExecutor {
   private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
 
@@ -48,7 +51,7 @@ public abstract class RpcExecutor {
   protected volatile int currentQueueLimit;
 
   private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
-  private final List<Thread> handlers;
+  private final List<Handler> handlers;
   private final int handlerCount;
   private final String name;
   private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
@@ -59,7 +62,7 @@ public abstract class RpcExecutor {
   private Abortable abortable = null;
 
   public RpcExecutor(final String name, final int handlerCount) {
-    this.handlers = new ArrayList<Thread>(handlerCount);
+    this.handlers = new ArrayList<Handler>(handlerCount);
     this.handlerCount = handlerCount;
     this.name = Strings.nullToEmpty(name);
   }
@@ -101,75 +104,111 @@ public abstract class RpcExecutor {
     startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
   }
 
+  /**
+   * Override if providing an alternate Handler implementation.
+   */
+  protected Handler getHandler(final String name, final double handlerFailureThreshhold,
+      final BlockingQueue<CallRunner> q) {
+    return new Handler(name, handlerFailureThreshhold, q);
+  }
+
+  /**
+   * Start up our handlers.
+   */
   protected void startHandlers(final String nameSuffix, final int numHandlers,
       final List<BlockingQueue<CallRunner>> callQueues,
       final int qindex, final int qsize, final int port) {
     final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
-    for (int i = 0; i < numHandlers; i++) {
-      final int index = qindex + (i % qsize);
-      Thread t = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          consumerLoop(callQueues.get(index));
-        }
-      });
-      t.setDaemon(true);
-      t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
-        ",queue=" + index + ",port=" + port);
-      t.start();
-      LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
-      handlers.add(t);
-    }
-  }
-
-  protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
-    boolean interrupted = false;
     double handlerFailureThreshhold = conf == null ?
         1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
           HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
-    try {
-      while (running) {
-        try {
-          MonitoredRPCHandler status = RpcServer.getStatus();
-          CallRunner task = myQueue.take();
-          task.setStatus(status);
+    for (int i = 0; i < numHandlers; i++) {
+      final int index = qindex + (i % qsize);
+      String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" +
+          index + ",port=" + port;
+      Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
+      handler.start();
+      LOG.debug("Started " + name);
+      handlers.add(handler);
+    }
+  }
+
+  /**
+   * Handler thread we run the {@link CallRunner#run()} in.
+   */
+  protected class Handler extends Thread {
+    /**
+     * Q to find CallRunners to run in.
+     */
+    final BlockingQueue<CallRunner> q;
+
+    final double handlerFailureThreshhold;
+
+    Handler(final String name, final double handlerFailureThreshhold,
+        final BlockingQueue<CallRunner> q) {
+      super(name);
+      setDaemon(true);
+      this.q = q;
+      this.handlerFailureThreshhold = handlerFailureThreshhold;
+    }
+
+    /**
+     * @return A {@link CallRunner}
+     * @throws InterruptedException
+     */
+    protected CallRunner getCallRunner() throws InterruptedException {
+      return this.q.take();
+    }
+
+    @Override
+    public void run() {
+      boolean interrupted = false;
+      try {
+        while (running) {
           try {
-            activeHandlerCount.incrementAndGet();
-            task.run();
-          } catch (Throwable e) {
-            if (e instanceof Error) {
-              int failedCount = failedHandlerCount.incrementAndGet();
-              if (handlerFailureThreshhold >= 0
-                  && failedCount > handlerCount * handlerFailureThreshhold) {
-                String message =
-                    "Number of failed RpcServer handler exceeded threshhold "
-                        + handlerFailureThreshhold + " with failed reason: "
-                        + StringUtils.stringifyException(e);
-                if (abortable != null) {
-                  abortable.abort(message, e);
-                } else {
-                  LOG.error("Received " + StringUtils.stringifyException(e)
-                    + " but not aborting due to abortable being null");
-                  throw e;
-                }
-              } else {
-                LOG.warn("RpcServer handler threads encountered errors "
-                  + StringUtils.stringifyException(e));
-              }
-            } else {
-              LOG.warn("RpcServer handler threads encountered exceptions "
-                + StringUtils.stringifyException(e));
-            }
-          } finally {
-            activeHandlerCount.decrementAndGet();
+            run(getCallRunner());
+          } catch (InterruptedException e) {
+            interrupted = true;
           }
-        } catch (InterruptedException e) {
-          interrupted = true;
+        }
+      } catch (Exception e) {
+        LOG.warn(e);
+        throw e;
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
        }
      }
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
+    }
+
+    private void run(CallRunner cr) {
+      MonitoredRPCHandler status = RpcServer.getStatus();
+      cr.setStatus(status);
+      try {
+        activeHandlerCount.incrementAndGet();
+        cr.run();
+      } catch (Throwable e) {
+        if (e instanceof Error) {
+          int failedCount = failedHandlerCount.incrementAndGet();
+          if (this.handlerFailureThreshhold >= 0 &&
+              failedCount > handlerCount * this.handlerFailureThreshhold) {
+            String message = "Number of failed RpcServer handler runs exceeded threshhold " +
+              this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
+            if (abortable != null) {
+              abortable.abort(message, e);
+            } else {
+              LOG.error("Error but can't abort because abortable is null: " +
+                StringUtils.stringifyException(e));
+              throw e;
+            }
+          } else {
+            LOG.warn("Handler errors " + StringUtils.stringifyException(e));
+          }
+        } else {
+          LOG.warn("Handler exception " +
+            StringUtils.stringifyException(e));
+        }
+      } finally {
+        activeHandlerCount.decrementAndGet();
       }
     }
   }
@@ -194,7 +233,6 @@ public abstract class RpcExecutor {
    * All requests go to the first queue, at index 0
    */
   private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
-
     @Override
     public int getNextQueue() {
       return 0;
@@ -227,4 +265,4 @@ public abstract class RpcExecutor {
     }
     currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
   }
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 85bf78d2799..13204cc0c16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -99,7 +99,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
 
     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
       CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
-    if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+    if (isCodelQueueType(callQueueType)) {
       // update CoDel Scheduler tunables
       int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
         CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
@@ -207,7 +207,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
         callExecutor = new RWQueueRpcExecutor("RW.deadline.Q", handlerCount, numCallQueues,
           callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
           BoundedPriorityBlockingQueue.class, callPriority);
-      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+      } else if (isCodelQueueType(callQueueType)) {
         Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
           codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
         callExecutor = new RWQueueRpcExecutor("RW.codel.Q", handlerCount,
@@ -223,33 +223,37 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       if (isDeadlineQueueType(callQueueType)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor =
-          new BalancedQueueRpcExecutor("B.deadline.Q", handlerCount, numCallQueues,
+          new BalancedQueueRpcExecutor("BQDeadline.default", handlerCount, numCallQueues,
             conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
-      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+      } else if (isCodelQueueType(callQueueType)) {
         callExecutor =
-          new BalancedQueueRpcExecutor("B.codel.Q", handlerCount, numCallQueues,
+          new BalancedQueueRpcExecutor("BQCodel.default", handlerCount, numCallQueues,
             conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
             codelTargetDelay, codelInterval, codelLifoThreshold,
             numGeneralCallsDropped, numLifoModeSwitches);
       } else {
-        callExecutor = new BalancedQueueRpcExecutor("B.fifo.Q", handlerCount,
-            numCallQueues, maxQueueLength, conf, abortable);
+        // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
+        callExecutor = new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
+          handlerCount, numCallQueues, maxQueueLength, conf, abortable);
       }
     }
     // Create 2 queues to help priorityExecutor be more scalable.
-    this.priorityExecutor = priorityHandlerCount > 0 ?
-      new BalancedQueueRpcExecutor("B.priority.fifo.Q", priorityHandlerCount, 2,
-          maxPriorityQueueLength):
-      null;
-    this.replicationExecutor =
-      replicationHandlerCount > 0 ?
-        new BalancedQueueRpcExecutor("B.replication.fifo.Q",
-          replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
+    this.priorityExecutor = priorityHandlerCount > 0?
+      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
+        2, maxPriorityQueueLength, conf, abortable): null;
+    this.replicationExecutor = replicationHandlerCount > 0?
+      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
+        replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
   }
 
   private static boolean isDeadlineQueueType(final String callQueueType) {
     return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
   }
 
+  private static boolean isCodelQueueType(final String callQueueType) {
+    return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
+  }
+
   public SimpleRpcScheduler(
       Configuration conf,
       int handlerCount,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 2f71980baca..db4485ad22d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -40,19 +40,12 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -60,7 +53,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -243,7 +235,7 @@ public class TestSimpleRpcScheduler {
       // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
       if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         assertEquals(530, totalTime);
-      } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ {
+      } else if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
         assertEquals(930, totalTime);
       }
     } finally {
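
Editor's note (not part of the patch): the core idea above is a LIFO fast path. An idle Handler parks itself on fastPathHandlerStack; dispatch() pops the most recently parked Handler and hands it the CallRunner directly via loadCallRunner(), falling back to the balanced queues (super.dispatch()) only when no Handler is parked. Reusing the most recently idle thread keeps hot handlers hot rather than letting them go cold and lose context. Below is a minimal, self-contained Java sketch of that handoff pattern for illustration only; the class and member names (FastPathDemo, Worker, dispatch, load) are invented for the example, and it uses a Semaphore constructed with zero permits where the patch constructs one permit and then drains it, which amounts to the same thing.

import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class FastPathDemo {
  // Work items fall back to this queue when no worker is parked.
  private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
  // LIFO stack of idle workers; the most recently parked worker is reused first.
  private final Deque<Worker> idleWorkers = new ConcurrentLinkedDeque<>();

  /** Hand work to a parked worker if one is available, else enqueue it. */
  public void dispatch(Runnable task) throws InterruptedException {
    Worker w = idleWorkers.poll();
    if (w != null) {
      w.load(task);      // direct handoff, no queuing
    } else {
      queue.put(task);   // no idle worker; fall back to the queue
    }
  }

  class Worker extends Thread {
    private final Semaphore handoff = new Semaphore(0); // no permits until load()
    private volatile Runnable loaded;

    @Override
    public void run() {
      try {
        while (true) {
          Runnable task = queue.poll();
          if (task == null) {
            // Nothing queued: park on the stack and wait for a direct handoff.
            idleWorkers.push(this);
            handoff.acquire();
            task = loaded;
          }
          task.run();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    /** Called by the dispatching thread to hand a task straight to this worker. */
    void load(Runnable task) {
      loaded = task;
      handoff.release();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    FastPathDemo demo = new FastPathDemo();
    Worker w = demo.new Worker();
    w.setDaemon(true);
    w.start();
    Thread.sleep(100); // let the worker park itself
    demo.dispatch(() -> System.out.println("ran via fast path"));
    Thread.sleep(100);
  }
}

With the patch applied, SimpleRpcScheduler always uses FifoWithFastPathBalancedQueueRpcExecutor for the priority and replication executors, and uses it for the general call executor whenever the configured call queue type (the CALL_QUEUE_TYPE_CONF_KEY setting) is neither the deadline nor the codel type.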