From 598b453a4100120827e91b9c6a82f3da1ba4e2bf Mon Sep 17 00:00:00 2001 From: Yutong Xiao Date: Mon, 17 Jan 2022 10:44:03 +0800 Subject: [PATCH] HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#4028) Signed-off-by: Reid Chan --- .../ipc/FastPathBalancedQueueRpcExecutor.java | 63 ++------- .../hbase/ipc/FastPathRWQueueRpcExecutor.java | 71 ++++++++++ .../hadoop/hbase/ipc/FastPathRpcHandler.java | 76 ++++++++++ .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 28 +++- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 109 ++------------- .../apache/hadoop/hbase/ipc/RpcHandler.java | 131 ++++++++++++++++++ .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 2 +- .../hbase/ipc/TestSimpleRpcScheduler.java | 4 +- 8 files changed, 326 insertions(+), 158 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java index 724d828f416..11674b6c6c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc; import java.util.Deque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -41,7 +40,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { /* * Stack of Handlers waiting for work. */ - private final Deque fastPathHandlerStack = new ConcurrentLinkedDeque<>(); + private final Deque fastPathHandlerStack = new ConcurrentLinkedDeque<>(); public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, @@ -57,10 +56,12 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { } @Override - protected Handler getHandler(String name, double handlerFailureThreshhold, - BlockingQueue q, AtomicInteger activeHandlerCount) { - return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount, - fastPathHandlerStack); + protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold, + final int handlerCount, final BlockingQueue q, + final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, + final Abortable abortable) { + return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q, + activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack); } @Override @@ -70,60 +71,14 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { if (currentQueueLimit == 0){ return false; } - FastPathHandler handler = popReadyHandler(); + FastPathRpcHandler handler = popReadyHandler(); return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask); } /** * @return Pop a Handler instance if one available ready-to-go or else return null. */ - private FastPathHandler popReadyHandler() { + private FastPathRpcHandler popReadyHandler() { return this.fastPathHandlerStack.poll(); } - - class FastPathHandler extends Handler { - // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque - // if an empty queue of CallRunners so we are available for direct handoff when one comes in. - final Deque fastPathHandlerStack; - // Semaphore to coordinate loading of fastpathed loadedTask and our running it. - private Semaphore semaphore = new Semaphore(0); - // The task we get when fast-pathing. - private CallRunner loadedCallRunner; - - FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue q, - final AtomicInteger activeHandlerCount, - final Deque fastPathHandlerStack) { - super(name, handlerFailureThreshhold, q, activeHandlerCount); - this.fastPathHandlerStack = fastPathHandlerStack; - } - - protected CallRunner getCallRunner() throws InterruptedException { - // Get a callrunner if one in the Q. - CallRunner cr = this.q.poll(); - if (cr == null) { - // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for - // the fastpath handoff done via fastPathHandlerStack. - if (this.fastPathHandlerStack != null) { - this.fastPathHandlerStack.push(this); - this.semaphore.acquire(); - cr = this.loadedCallRunner; - this.loadedCallRunner = null; - } else { - // No fastpath available. Block until a task comes available. - cr = super.getCallRunner(); - } - } - return cr; - } - - /** - * @param task Task gotten via fastpath. - * @return True if we successfully loaded our task - */ - boolean loadCallRunner(final CallRunner cr) { - this.loadedCallRunner = cr; - this.semaphore.release(); - return true; - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java new file mode 100644 index 00000000000..54751de4bdb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Deque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in + * {@link FastPathBalancedQueueRpcExecutor}. + */ +@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor { + private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class); + + private final Deque readHandlerStack = new ConcurrentLinkedDeque<>(); + private final Deque writeHandlerStack = new ConcurrentLinkedDeque<>(); + private final Deque scanHandlerStack = new ConcurrentLinkedDeque<>(); + + public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, + PriorityFunction priority, Configuration conf, Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); + } + + @Override + protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold, + final int handlerCount, final BlockingQueue q, + final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, + final Abortable abortable) { + Deque handlerStack = name.contains("read") ? readHandlerStack : + name.contains("write") ? writeHandlerStack : scanHandlerStack; + return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q, + activeHandlerCount, failedHandlerCount, abortable, handlerStack); + } + + @Override + public boolean dispatch(final CallRunner callTask) throws InterruptedException { + RpcServer.Call call = callTask.getCall(); + boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.param); + boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask); + FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll() : + shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll(); + return handler != null ? handler.loadCallRunner(callTask) : + dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java new file mode 100644 index 00000000000..13edd083409 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Deque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class FastPathRpcHandler extends RpcHandler { + // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque + // if an empty queue of CallRunners so we are available for direct handoff when one comes in. + final Deque fastPathHandlerStack; + // Semaphore to coordinate loading of fastpathed loadedTask and our running it. + // UNFAIR synchronization. + private Semaphore semaphore = new Semaphore(0); + // The task we get when fast-pathing. + private CallRunner loadedCallRunner; + + FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount, + BlockingQueue q, AtomicInteger activeHandlerCount, + AtomicInteger failedHandlerCount, final Abortable abortable, + final Deque fastPathHandlerStack) { + super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount, + abortable); + this.fastPathHandlerStack = fastPathHandlerStack; + } + + @Override + protected CallRunner getCallRunner() throws InterruptedException { + // Get a callrunner if one in the Q. + CallRunner cr = this.q.poll(); + if (cr == null) { + // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for + // the fastpath handoff done via fastPathHandlerStack. + if (this.fastPathHandlerStack != null) { + this.fastPathHandlerStack.push(this); + this.semaphore.acquire(); + cr = this.loadedCallRunner; + this.loadedCallRunner = null; + } else { + // No fastpath available. Block until a task comes available. + cr = super.getCallRunner(); + } + } + return cr; + } + + /** + * @param cr Task gotten via fastpath. + * @return True if we successfully loaded our task + */ + boolean loadCallRunner(final CallRunner cr) { + this.loadedCallRunner = cr; + this.semaphore.release(); + return true; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 9feef2549f3..234c7a1ee00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.ipc; -import java.util.List; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -251,16 +251,22 @@ public class RWQueueRpcExecutor extends RpcExecutor { @Override public boolean dispatch(final CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); + return dispatchTo(isWriteRequest(call.getHeader(), call.param), + shouldDispatchToScanQueue(callTask), callTask); + } + + protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue, + final CallRunner callTask) { int queueIndex; - if (isWriteRequest(call.getHeader(), call.param)) { + if (toWriteQueue) { queueIndex = writeBalancer.getNextQueue(); - } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) { + } else if (toScanQueue) { queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(); } else { queueIndex = numWriteQueues + readBalancer.getNextQueue(); } - BlockingQueue queue = queues.get(queueIndex); + Queue queue = queues.get(queueIndex); if (queue.size() >= currentQueueLimit) { return false; } @@ -316,7 +322,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { return activeScanHandlerCount.get(); } - private boolean isWriteRequest(final RequestHeader header, final Message param) { + protected boolean isWriteRequest(final RequestHeader header, final Message param) { // TODO: Is there a better way to do this? if (param instanceof MultiRequest) { MultiRequest multi = (MultiRequest)param; @@ -353,6 +359,18 @@ public class RWQueueRpcExecutor extends RpcExecutor { return param instanceof ScanRequest; } + protected boolean shouldDispatchToScanQueue(final CallRunner task) { + RpcServer.Call call = task.getCall(); + return numScanQueues > 0 && isScanRequest(call.getHeader(), call.param); + } + + protected float getReadShare(final Configuration conf) { + return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); + } + + protected float getScanShare(final Configuration conf) { + return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); + } /* * Calculate the number of writers based on the "total count" and the read share. * You'll get at least one writer. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index d46786b7332..d6c4829ead2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -34,10 +34,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -89,12 +87,11 @@ public abstract class RpcExecutor { protected volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); - private final List handlers; + private final List handlers; private final int handlerCount; private final AtomicInteger failedHandlerCount = new AtomicInteger(0); private String name; - private boolean running; private Configuration conf = null; private Abortable abortable = null; @@ -102,7 +99,7 @@ public abstract class RpcExecutor { @Deprecated public RpcExecutor(final String name, final int handlerCount, final int numCallQueues) { this.name = Strings.nullToEmpty(name); - this.handlers = new ArrayList(handlerCount); + this.handlers = new ArrayList(handlerCount); this.handlerCount = handlerCount; this.numCallQueues = numCallQueues; this.queues = new ArrayList<>(this.numCallQueues); @@ -197,13 +194,12 @@ public abstract class RpcExecutor { } public void start(final int port) { - running = true; startHandlers(port); } public void stop() { - running = false; - for (Thread handler : handlers) { + for (RpcHandler handler : handlers) { + handler.stopRunning(); handler.interrupt(); } } @@ -224,9 +220,12 @@ public abstract class RpcExecutor { /** * Override if providing alternate Handler implementation. */ - protected Handler getHandler(final String name, final double handlerFailureThreshhold, - final BlockingQueue q, final AtomicInteger activeHandlerCount) { - return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount); + protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold, + final int handlerCount, final BlockingQueue q, + final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount, + final Abortable abortable) { + return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, + failedHandlerCount, abortable); } /** @@ -243,98 +242,14 @@ public abstract class RpcExecutor { final int index = qindex + (i % qsize); String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index + ",port=" + port; - Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index), - activeHandlerCount); + RpcHandler handler = getHandler(name, handlerFailureThreshhold, handlerCount, + callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable); handler.start(); LOG.debug("Started " + name); handlers.add(handler); } } - /** - * Handler thread run the {@link CallRunner#run()} in. - */ - protected class Handler extends Thread { - /** - * Q to find CallRunners to run in. - */ - final BlockingQueue q; - - final double handlerFailureThreshhold; - - // metrics (shared with other handlers) - final AtomicInteger activeHandlerCount; - - Handler(final String name, final double handlerFailureThreshhold, - final BlockingQueue q, final AtomicInteger activeHandlerCount) { - super(name); - setDaemon(true); - this.q = q; - this.handlerFailureThreshhold = handlerFailureThreshhold; - this.activeHandlerCount = activeHandlerCount; - } - - /** - * @return A {@link CallRunner} - * @throws InterruptedException - */ - protected CallRunner getCallRunner() throws InterruptedException { - return this.q.take(); - } - - @Override - public void run() { - boolean interrupted = false; - try { - while (running) { - try { - run(getCallRunner()); - } catch (InterruptedException e) { - interrupted = true; - } - } - } catch (Exception e) { - LOG.warn(e); - throw e; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - private void run(CallRunner cr) { - MonitoredRPCHandler status = RpcServer.getStatus(); - cr.setStatus(status); - try { - this.activeHandlerCount.incrementAndGet(); - cr.run(); - } catch (Throwable e) { - if (e instanceof Error) { - int failedCount = failedHandlerCount.incrementAndGet(); - if (this.handlerFailureThreshhold >= 0 - && failedCount > handlerCount * this.handlerFailureThreshhold) { - String message = "Number of failed RpcServer handler runs exceeded threshhold " - + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); - if (abortable != null) { - abortable.abort(message, e); - } else { - LOG.error("Error but can't abort because abortable is null: " - + StringUtils.stringifyException(e)); - throw e; - } - } else { - LOG.warn("Handler errors " + StringUtils.stringifyException(e)); - } - } else { - LOG.warn("Handler exception " + StringUtils.stringifyException(e)); - } - } finally { - this.activeHandlerCount.decrementAndGet(); - } - } - } - public static abstract class QueueBalancer { /** * @return the index of the next queue to which a request should be inserted diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java new file mode 100644 index 00000000000..dbbaaa8aa7a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread to handle rpc call. + * Should only be used in {@link RpcExecutor} and its sub-classes. + */ +@InterfaceAudience.Private +public class RpcHandler extends Thread { + private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class); + + /** + * Q to find CallRunners to run in. + */ + final BlockingQueue q; + + final int handlerCount; + final double handlerFailureThreshhold; + + // metrics (shared with other handlers) + final AtomicInteger activeHandlerCount; + final AtomicInteger failedHandlerCount; + + // The up-level RpcServer. + final Abortable abortable; + + private boolean running; + + RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount, + final BlockingQueue q, final AtomicInteger activeHandlerCount, + final AtomicInteger failedHandlerCount, final Abortable abortable) { + super(name); + setDaemon(true); + this.q = q; + this.handlerFailureThreshhold = handlerFailureThreshhold; + this.activeHandlerCount = activeHandlerCount; + this.failedHandlerCount = failedHandlerCount; + this.handlerCount = handlerCount; + this.abortable = abortable; + } + + /** + * @return A {@link CallRunner} + * @throws InterruptedException Throws by {@link BlockingQueue#take()} + */ + protected CallRunner getCallRunner() throws InterruptedException { + return this.q.take(); + } + + public void stopRunning() { + running = false; + } + + @Override + public void run() { + boolean interrupted = false; + running = true; + try { + while (running) { + try { + run(getCallRunner()); + } catch (InterruptedException e) { + interrupted = true; + } + } + } catch (Exception e) { + LOG.warn(e.toString(), e); + throw e; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + private void run(CallRunner cr) { + MonitoredRPCHandler status = RpcServer.getStatus(); + cr.setStatus(status); + try { + this.activeHandlerCount.incrementAndGet(); + cr.run(); + } catch (Throwable e) { + if (e instanceof Error) { + int failedCount = failedHandlerCount.incrementAndGet(); + if (this.handlerFailureThreshhold >= 0 + && failedCount > handlerCount * this.handlerFailureThreshhold) { + String message = "Number of failed RpcServer handler runs exceeded threshhold " + + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e); + if (abortable != null) { + abortable.abort(message, e); + } else { + LOG.error("Error but can't abort because abortable is null: " + + StringUtils.stringifyException(e)); + throw e; + } + } else { + LOG.warn("Handler errors " + StringUtils.stringifyException(e)); + } + } else { + LOG.warn("Handler exception " + StringUtils.stringifyException(e)); + } + } finally { + this.activeHandlerCount.decrementAndGet(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index dbe4667f5a8..049d33b297a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -82,7 +82,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs if (callqReadShare > 0) { // at least 1 read handler and 1 write handler - callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount), + callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), maxQueueLength, priority, conf, server); } else { if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index d18e16784ea..602a72257ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -498,8 +498,10 @@ public class TestSimpleRpcScheduler { CallRunner task = mock(CallRunner.class); assertFalse(executor.dispatch(task)); //make sure we never internally get a handler, which would skip the queue validation + Mockito.verify(executor, Mockito.never()).getHandler(Mockito.anyString(), Mockito.anyDouble(), - (BlockingQueue) Mockito.any(), (AtomicInteger) Mockito.any()); + Mockito.anyInt(), (BlockingQueue) Mockito.any(), (AtomicInteger) Mockito.any(), + (AtomicInteger) Mockito.any(), (Abortable) Mockito.any()); } // Get mocked call that has the CallRunner sleep for a while so that the fast