HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#4028)

Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
Yutong Xiao 2022-01-17 10:44:03 +08:00 committed by GitHub
parent e8a8523bb0
commit 598b453a41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 326 additions and 158 deletions

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@ -41,7 +40,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
/*
* Stack of Handlers waiting for work.
*/
private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
@ -57,10 +56,12 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
}
@Override
protected Handler getHandler(String name, double handlerFailureThreshhold,
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
fastPathHandlerStack);
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
final int handlerCount, final BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
final Abortable abortable) {
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack);
}
@Override
@ -70,60 +71,14 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
if (currentQueueLimit == 0){
return false;
}
FastPathHandler handler = popReadyHandler();
FastPathRpcHandler handler = popReadyHandler();
return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
}
/**
* @return Pop a Handler instance if one available ready-to-go or else return null.
*/
private FastPathHandler popReadyHandler() {
private FastPathRpcHandler popReadyHandler() {
return this.fastPathHandlerStack.poll();
}
class FastPathHandler extends Handler {
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
final Deque<FastPathHandler> fastPathHandlerStack;
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
private Semaphore semaphore = new Semaphore(0);
// The task we get when fast-pathing.
private CallRunner loadedCallRunner;
FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount,
final Deque<FastPathHandler> fastPathHandlerStack) {
super(name, handlerFailureThreshhold, q, activeHandlerCount);
this.fastPathHandlerStack = fastPathHandlerStack;
}
protected CallRunner getCallRunner() throws InterruptedException {
// Get a callrunner if one in the Q.
CallRunner cr = this.q.poll();
if (cr == null) {
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
// the fastpath handoff done via fastPathHandlerStack.
if (this.fastPathHandlerStack != null) {
this.fastPathHandlerStack.push(this);
this.semaphore.acquire();
cr = this.loadedCallRunner;
this.loadedCallRunner = null;
} else {
// No fastpath available. Block until a task comes available.
cr = super.getCallRunner();
}
}
return cr;
}
/**
* @param task Task gotten via fastpath.
* @return True if we successfully loaded our task
*/
boolean loadCallRunner(final CallRunner cr) {
this.loadedCallRunner = cr;
this.semaphore.release();
return true;
}
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
* {@link FastPathBalancedQueueRpcExecutor}.
*/
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();
public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
PriorityFunction priority, Configuration conf, Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
}
@Override
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
final int handlerCount, final BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
final Abortable abortable) {
Deque<FastPathRpcHandler> handlerStack = name.contains("read") ? readHandlerStack :
name.contains("write") ? writeHandlerStack : scanHandlerStack;
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
activeHandlerCount, failedHandlerCount, abortable, handlerStack);
}
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.param);
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll() :
shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll();
return handler != null ? handler.loadCallRunner(callTask) :
dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask);
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public class FastPathRpcHandler extends RpcHandler {
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
final Deque<FastPathRpcHandler> fastPathHandlerStack;
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
// UNFAIR synchronization.
private Semaphore semaphore = new Semaphore(0);
// The task we get when fast-pathing.
private CallRunner loadedCallRunner;
FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount,
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount,
AtomicInteger failedHandlerCount, final Abortable abortable,
final Deque<FastPathRpcHandler> fastPathHandlerStack) {
super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount,
abortable);
this.fastPathHandlerStack = fastPathHandlerStack;
}
@Override
protected CallRunner getCallRunner() throws InterruptedException {
// Get a callrunner if one in the Q.
CallRunner cr = this.q.poll();
if (cr == null) {
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
// the fastpath handoff done via fastPathHandlerStack.
if (this.fastPathHandlerStack != null) {
this.fastPathHandlerStack.push(this);
this.semaphore.acquire();
cr = this.loadedCallRunner;
this.loadedCallRunner = null;
} else {
// No fastpath available. Block until a task comes available.
cr = super.getCallRunner();
}
}
return cr;
}
/**
* @param cr Task gotten via fastpath.
* @return True if we successfully loaded our task
*/
boolean loadCallRunner(final CallRunner cr) {
this.loadedCallRunner = cr;
this.semaphore.release();
return true;
}
}

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.ipc;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@ -251,16 +251,22 @@ public class RWQueueRpcExecutor extends RpcExecutor {
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
return dispatchTo(isWriteRequest(call.getHeader(), call.param),
shouldDispatchToScanQueue(callTask), callTask);
}
protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
final CallRunner callTask) {
int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) {
if (toWriteQueue) {
queueIndex = writeBalancer.getNextQueue();
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
} else if (toScanQueue) {
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();
}
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
Queue<CallRunner> queue = queues.get(queueIndex);
if (queue.size() >= currentQueueLimit) {
return false;
}
@ -316,7 +322,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
return activeScanHandlerCount.get();
}
private boolean isWriteRequest(final RequestHeader header, final Message param) {
protected boolean isWriteRequest(final RequestHeader header, final Message param) {
// TODO: Is there a better way to do this?
if (param instanceof MultiRequest) {
MultiRequest multi = (MultiRequest)param;
@ -353,6 +359,18 @@ public class RWQueueRpcExecutor extends RpcExecutor {
return param instanceof ScanRequest;
}
protected boolean shouldDispatchToScanQueue(final CallRunner task) {
RpcServer.Call call = task.getCall();
return numScanQueues > 0 && isScanRequest(call.getHeader(), call.param);
}
protected float getReadShare(final Configuration conf) {
return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
}
protected float getScanShare(final Configuration conf) {
return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
}
/*
* Calculate the number of writers based on the "total count" and the read share.
* You'll get at least one writer.

View File

@ -34,10 +34,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@ -89,12 +87,11 @@ public abstract class RpcExecutor {
protected volatile int currentQueueLimit;
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
private final List<Handler> handlers;
private final List<RpcHandler> handlers;
private final int handlerCount;
private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
private String name;
private boolean running;
private Configuration conf = null;
private Abortable abortable = null;
@ -102,7 +99,7 @@ public abstract class RpcExecutor {
@Deprecated
public RpcExecutor(final String name, final int handlerCount, final int numCallQueues) {
this.name = Strings.nullToEmpty(name);
this.handlers = new ArrayList<Handler>(handlerCount);
this.handlers = new ArrayList<RpcHandler>(handlerCount);
this.handlerCount = handlerCount;
this.numCallQueues = numCallQueues;
this.queues = new ArrayList<>(this.numCallQueues);
@ -197,13 +194,12 @@ public abstract class RpcExecutor {
}
public void start(final int port) {
running = true;
startHandlers(port);
}
public void stop() {
running = false;
for (Thread handler : handlers) {
for (RpcHandler handler : handlers) {
handler.stopRunning();
handler.interrupt();
}
}
@ -224,9 +220,12 @@ public abstract class RpcExecutor {
/**
* Override if providing alternate Handler implementation.
*/
protected Handler getHandler(final String name, final double handlerFailureThreshhold,
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
final int handlerCount, final BlockingQueue<CallRunner> q,
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
final Abortable abortable) {
return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount,
failedHandlerCount, abortable);
}
/**
@ -243,98 +242,14 @@ public abstract class RpcExecutor {
final int index = qindex + (i % qsize);
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
+ ",port=" + port;
Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
activeHandlerCount);
RpcHandler handler = getHandler(name, handlerFailureThreshhold, handlerCount,
callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable);
handler.start();
LOG.debug("Started " + name);
handlers.add(handler);
}
}
/**
* Handler thread run the {@link CallRunner#run()} in.
*/
protected class Handler extends Thread {
/**
* Q to find CallRunners to run in.
*/
final BlockingQueue<CallRunner> q;
final double handlerFailureThreshhold;
// metrics (shared with other handlers)
final AtomicInteger activeHandlerCount;
Handler(final String name, final double handlerFailureThreshhold,
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
super(name);
setDaemon(true);
this.q = q;
this.handlerFailureThreshhold = handlerFailureThreshhold;
this.activeHandlerCount = activeHandlerCount;
}
/**
* @return A {@link CallRunner}
* @throws InterruptedException
*/
protected CallRunner getCallRunner() throws InterruptedException {
return this.q.take();
}
@Override
public void run() {
boolean interrupted = false;
try {
while (running) {
try {
run(getCallRunner());
} catch (InterruptedException e) {
interrupted = true;
}
}
} catch (Exception e) {
LOG.warn(e);
throw e;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
private void run(CallRunner cr) {
MonitoredRPCHandler status = RpcServer.getStatus();
cr.setStatus(status);
try {
this.activeHandlerCount.incrementAndGet();
cr.run();
} catch (Throwable e) {
if (e instanceof Error) {
int failedCount = failedHandlerCount.incrementAndGet();
if (this.handlerFailureThreshhold >= 0
&& failedCount > handlerCount * this.handlerFailureThreshhold) {
String message = "Number of failed RpcServer handler runs exceeded threshhold "
+ this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
if (abortable != null) {
abortable.abort(message, e);
} else {
LOG.error("Error but can't abort because abortable is null: "
+ StringUtils.stringifyException(e));
throw e;
}
} else {
LOG.warn("Handler errors " + StringUtils.stringifyException(e));
}
} else {
LOG.warn("Handler exception " + StringUtils.stringifyException(e));
}
} finally {
this.activeHandlerCount.decrementAndGet();
}
}
}
public static abstract class QueueBalancer {
/**
* @return the index of the next queue to which a request should be inserted

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Thread to handle rpc call.
* Should only be used in {@link RpcExecutor} and its sub-classes.
*/
@InterfaceAudience.Private
public class RpcHandler extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class);
/**
* Q to find CallRunners to run in.
*/
final BlockingQueue<CallRunner> q;
final int handlerCount;
final double handlerFailureThreshhold;
// metrics (shared with other handlers)
final AtomicInteger activeHandlerCount;
final AtomicInteger failedHandlerCount;
// The up-level RpcServer.
final Abortable abortable;
private boolean running;
RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount,
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount,
final AtomicInteger failedHandlerCount, final Abortable abortable) {
super(name);
setDaemon(true);
this.q = q;
this.handlerFailureThreshhold = handlerFailureThreshhold;
this.activeHandlerCount = activeHandlerCount;
this.failedHandlerCount = failedHandlerCount;
this.handlerCount = handlerCount;
this.abortable = abortable;
}
/**
* @return A {@link CallRunner}
* @throws InterruptedException Throws by {@link BlockingQueue#take()}
*/
protected CallRunner getCallRunner() throws InterruptedException {
return this.q.take();
}
public void stopRunning() {
running = false;
}
@Override
public void run() {
boolean interrupted = false;
running = true;
try {
while (running) {
try {
run(getCallRunner());
} catch (InterruptedException e) {
interrupted = true;
}
}
} catch (Exception e) {
LOG.warn(e.toString(), e);
throw e;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
private void run(CallRunner cr) {
MonitoredRPCHandler status = RpcServer.getStatus();
cr.setStatus(status);
try {
this.activeHandlerCount.incrementAndGet();
cr.run();
} catch (Throwable e) {
if (e instanceof Error) {
int failedCount = failedHandlerCount.incrementAndGet();
if (this.handlerFailureThreshhold >= 0
&& failedCount > handlerCount * this.handlerFailureThreshhold) {
String message = "Number of failed RpcServer handler runs exceeded threshhold "
+ this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
if (abortable != null) {
abortable.abort(message, e);
} else {
LOG.error("Error but can't abort because abortable is null: "
+ StringUtils.stringifyException(e));
throw e;
}
} else {
LOG.warn("Handler errors " + StringUtils.stringifyException(e));
}
} else {
LOG.warn("Handler exception " + StringUtils.stringifyException(e));
}
} finally {
this.activeHandlerCount.decrementAndGet();
}
}
}

View File

@ -82,7 +82,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
if (callqReadShare > 0) {
// at least 1 read handler and 1 write handler
callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
} else {
if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {

View File

@ -498,8 +498,10 @@ public class TestSimpleRpcScheduler {
CallRunner task = mock(CallRunner.class);
assertFalse(executor.dispatch(task));
//make sure we never internally get a handler, which would skip the queue validation
Mockito.verify(executor, Mockito.never()).getHandler(Mockito.anyString(), Mockito.anyDouble(),
(BlockingQueue<CallRunner>) Mockito.any(), (AtomicInteger) Mockito.any());
Mockito.anyInt(), (BlockingQueue<CallRunner>) Mockito.any(), (AtomicInteger) Mockito.any(),
(AtomicInteger) Mockito.any(), (Abortable) Mockito.any());
}
// Get mocked call that has the CallRunner sleep for a while so that the fast