HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#4036)
Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
parent
b840ed68f5
commit
540fbe51b0
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
|
|||
import java.util.Deque;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -41,7 +40,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
|
|||
/*
|
||||
* Stack of Handlers waiting for work.
|
||||
*/
|
||||
private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
|
||||
private final Deque<FastPathRpcHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
|
||||
|
||||
public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
|
||||
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
|
||||
|
@ -56,10 +55,12 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Handler getHandler(String name, double handlerFailureThreshhold,
|
||||
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
|
||||
return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
|
||||
fastPathHandlerStack);
|
||||
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
|
||||
final int handlerCount, final BlockingQueue<CallRunner> q,
|
||||
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
|
||||
final Abortable abortable) {
|
||||
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
|
||||
activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -69,62 +70,14 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
|
|||
if (currentQueueLimit == 0){
|
||||
return false;
|
||||
}
|
||||
FastPathHandler handler = popReadyHandler();
|
||||
FastPathRpcHandler handler = popReadyHandler();
|
||||
return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Pop a Handler instance if one available ready-to-go or else return null.
|
||||
*/
|
||||
private FastPathHandler popReadyHandler() {
|
||||
private FastPathRpcHandler popReadyHandler() {
|
||||
return this.fastPathHandlerStack.poll();
|
||||
}
|
||||
|
||||
class FastPathHandler extends Handler {
|
||||
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
|
||||
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
|
||||
final Deque<FastPathHandler> fastPathHandlerStack;
|
||||
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
|
||||
// UNFAIR synchronization.
|
||||
private Semaphore semaphore = new Semaphore(0);
|
||||
// The task we get when fast-pathing.
|
||||
private CallRunner loadedCallRunner;
|
||||
|
||||
FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
|
||||
final AtomicInteger activeHandlerCount,
|
||||
final Deque<FastPathHandler> fastPathHandlerStack) {
|
||||
super(name, handlerFailureThreshhold, q, activeHandlerCount);
|
||||
this.fastPathHandlerStack = fastPathHandlerStack;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CallRunner getCallRunner() throws InterruptedException {
|
||||
// Get a callrunner if one in the Q.
|
||||
CallRunner cr = this.q.poll();
|
||||
if (cr == null) {
|
||||
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
|
||||
// the fastpath handoff done via fastPathHandlerStack.
|
||||
if (this.fastPathHandlerStack != null) {
|
||||
this.fastPathHandlerStack.push(this);
|
||||
this.semaphore.acquire();
|
||||
cr = this.loadedCallRunner;
|
||||
this.loadedCallRunner = null;
|
||||
} else {
|
||||
// No fastpath available. Block until a task comes available.
|
||||
cr = super.getCallRunner();
|
||||
}
|
||||
}
|
||||
return cr;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param cr Task gotten via fastpath.
|
||||
* @return True if we successfully loaded our task
|
||||
*/
|
||||
boolean loadCallRunner(final CallRunner cr) {
|
||||
this.loadedCallRunner = cr;
|
||||
this.semaphore.release();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.util.Deque;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
|
||||
* {@link FastPathBalancedQueueRpcExecutor}.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||
@InterfaceStability.Evolving
|
||||
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
|
||||
|
||||
private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
|
||||
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
|
||||
private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();
|
||||
|
||||
public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
|
||||
PriorityFunction priority, Configuration conf, Abortable abortable) {
|
||||
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
|
||||
final int handlerCount, final BlockingQueue<CallRunner> q,
|
||||
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
|
||||
final Abortable abortable) {
|
||||
Deque<FastPathRpcHandler> handlerStack = name.contains("read") ? readHandlerStack :
|
||||
name.contains("write") ? writeHandlerStack : scanHandlerStack;
|
||||
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
|
||||
activeHandlerCount, failedHandlerCount, abortable, handlerStack);
|
||||
}
|
||||
|
||||
@Override public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
RpcCall call = callTask.getRpcCall();
|
||||
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
|
||||
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
|
||||
FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll() :
|
||||
shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll();
|
||||
return handler != null ? handler.loadCallRunner(callTask) :
|
||||
dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.util.Deque;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class FastPathRpcHandler extends RpcHandler {
|
||||
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
|
||||
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
|
||||
final Deque<FastPathRpcHandler> fastPathHandlerStack;
|
||||
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
|
||||
// UNFAIR synchronization.
|
||||
private Semaphore semaphore = new Semaphore(0);
|
||||
// The task we get when fast-pathing.
|
||||
private CallRunner loadedCallRunner;
|
||||
|
||||
FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount,
|
||||
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount,
|
||||
AtomicInteger failedHandlerCount, final Abortable abortable,
|
||||
final Deque<FastPathRpcHandler> fastPathHandlerStack) {
|
||||
super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount,
|
||||
abortable);
|
||||
this.fastPathHandlerStack = fastPathHandlerStack;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CallRunner getCallRunner() throws InterruptedException {
|
||||
// Get a callrunner if one in the Q.
|
||||
CallRunner cr = this.q.poll();
|
||||
if (cr == null) {
|
||||
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
|
||||
// the fastpath handoff done via fastPathHandlerStack.
|
||||
if (this.fastPathHandlerStack != null) {
|
||||
this.fastPathHandlerStack.push(this);
|
||||
this.semaphore.acquire();
|
||||
cr = this.loadedCallRunner;
|
||||
this.loadedCallRunner = null;
|
||||
} else {
|
||||
// No fastpath available. Block until a task comes available.
|
||||
cr = super.getCallRunner();
|
||||
}
|
||||
}
|
||||
return cr;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param cr Task gotten via fastpath.
|
||||
* @return True if we successfully loaded our task
|
||||
*/
|
||||
boolean loadCallRunner(final CallRunner cr) {
|
||||
this.loadedCallRunner = cr;
|
||||
this.semaphore.release();
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -130,16 +130,22 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
@Override
|
||||
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
RpcCall call = callTask.getRpcCall();
|
||||
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
|
||||
shouldDispatchToScanQueue(callTask), callTask);
|
||||
}
|
||||
|
||||
protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
|
||||
final CallRunner callTask) {
|
||||
int queueIndex;
|
||||
if (isWriteRequest(call.getHeader(), call.getParam())) {
|
||||
if (toWriteQueue) {
|
||||
queueIndex = writeBalancer.getNextQueue();
|
||||
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
|
||||
} else if (toScanQueue) {
|
||||
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
|
||||
} else {
|
||||
queueIndex = numWriteQueues + readBalancer.getNextQueue();
|
||||
}
|
||||
|
||||
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
|
||||
Queue<CallRunner> queue = queues.get(queueIndex);
|
||||
if (queue.size() >= currentQueueLimit) {
|
||||
return false;
|
||||
}
|
||||
|
@ -232,6 +238,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
return param instanceof ScanRequest;
|
||||
}
|
||||
|
||||
protected boolean shouldDispatchToScanQueue(final CallRunner task) {
|
||||
RpcCall call = task.getRpcCall();
|
||||
return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam());
|
||||
}
|
||||
|
||||
protected float getReadShare(final Configuration conf) {
|
||||
return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
|
||||
}
|
||||
|
|
|
@ -39,10 +39,8 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
|
||||
|
@ -98,12 +96,11 @@ public abstract class RpcExecutor {
|
|||
protected volatile int currentQueueLimit;
|
||||
|
||||
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
||||
private final List<Handler> handlers;
|
||||
private final List<RpcHandler> handlers;
|
||||
private final int handlerCount;
|
||||
private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
|
||||
|
||||
private String name;
|
||||
private boolean running;
|
||||
|
||||
private Configuration conf = null;
|
||||
private Abortable abortable = null;
|
||||
|
@ -239,13 +236,12 @@ public abstract class RpcExecutor {
|
|||
}
|
||||
|
||||
public void start(final int port) {
|
||||
running = true;
|
||||
startHandlers(port);
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
running = false;
|
||||
for (Thread handler : handlers) {
|
||||
for (RpcHandler handler : handlers) {
|
||||
handler.stopRunning();
|
||||
handler.interrupt();
|
||||
}
|
||||
}
|
||||
|
@ -266,9 +262,12 @@ public abstract class RpcExecutor {
|
|||
/**
|
||||
* Override if providing alternate Handler implementation.
|
||||
*/
|
||||
protected Handler getHandler(final String name, final double handlerFailureThreshhold,
|
||||
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
|
||||
return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
|
||||
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
|
||||
final int handlerCount, final BlockingQueue<CallRunner> q,
|
||||
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
|
||||
final Abortable abortable) {
|
||||
return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount,
|
||||
failedHandlerCount, abortable);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -285,8 +284,8 @@ public abstract class RpcExecutor {
|
|||
final int index = qindex + (i % qsize);
|
||||
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
|
||||
+ ",port=" + port;
|
||||
Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
|
||||
activeHandlerCount);
|
||||
RpcHandler handler = getHandler(name, handlerFailureThreshhold, handlerCount,
|
||||
callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable);
|
||||
handler.start();
|
||||
handlers.add(handler);
|
||||
}
|
||||
|
@ -294,90 +293,6 @@ public abstract class RpcExecutor {
|
|||
handlers.size(), threadPrefix, qsize, port);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler thread run the {@link CallRunner#run()} in.
|
||||
*/
|
||||
protected class Handler extends Thread {
|
||||
/**
|
||||
* Q to find CallRunners to run in.
|
||||
*/
|
||||
final BlockingQueue<CallRunner> q;
|
||||
|
||||
final double handlerFailureThreshhold;
|
||||
|
||||
// metrics (shared with other handlers)
|
||||
final AtomicInteger activeHandlerCount;
|
||||
|
||||
Handler(final String name, final double handlerFailureThreshhold,
|
||||
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
|
||||
super(name);
|
||||
setDaemon(true);
|
||||
this.q = q;
|
||||
this.handlerFailureThreshhold = handlerFailureThreshhold;
|
||||
this.activeHandlerCount = activeHandlerCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A {@link CallRunner}
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
protected CallRunner getCallRunner() throws InterruptedException {
|
||||
return this.q.take();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
while (running) {
|
||||
try {
|
||||
run(getCallRunner());
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn(e.toString(), e);
|
||||
throw e;
|
||||
} finally {
|
||||
if (interrupted) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void run(CallRunner cr) {
|
||||
MonitoredRPCHandler status = RpcServer.getStatus();
|
||||
cr.setStatus(status);
|
||||
try {
|
||||
this.activeHandlerCount.incrementAndGet();
|
||||
cr.run();
|
||||
} catch (Throwable e) {
|
||||
if (e instanceof Error) {
|
||||
int failedCount = failedHandlerCount.incrementAndGet();
|
||||
if (this.handlerFailureThreshhold >= 0
|
||||
&& failedCount > handlerCount * this.handlerFailureThreshhold) {
|
||||
String message = "Number of failed RpcServer handler runs exceeded threshhold "
|
||||
+ this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
|
||||
if (abortable != null) {
|
||||
abortable.abort(message, e);
|
||||
} else {
|
||||
LOG.error("Error but can't abort because abortable is null: "
|
||||
+ StringUtils.stringifyException(e));
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Handler errors " + StringUtils.stringifyException(e));
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Handler exception " + StringUtils.stringifyException(e));
|
||||
}
|
||||
} finally {
|
||||
this.activeHandlerCount.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static abstract class QueueBalancer {
|
||||
/**
|
||||
* @return the index of the next queue to which a request should be inserted
|
||||
|
|
|
@ -0,0 +1,131 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Thread to handle rpc call.
|
||||
* Should only be used in {@link RpcExecutor} and its sub-classes.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RpcHandler extends Thread {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class);
|
||||
|
||||
/**
|
||||
* Q to find CallRunners to run in.
|
||||
*/
|
||||
final BlockingQueue<CallRunner> q;
|
||||
|
||||
final int handlerCount;
|
||||
final double handlerFailureThreshhold;
|
||||
|
||||
// metrics (shared with other handlers)
|
||||
final AtomicInteger activeHandlerCount;
|
||||
final AtomicInteger failedHandlerCount;
|
||||
|
||||
// The up-level RpcServer.
|
||||
final Abortable abortable;
|
||||
|
||||
private boolean running;
|
||||
|
||||
RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount,
|
||||
final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount,
|
||||
final AtomicInteger failedHandlerCount, final Abortable abortable) {
|
||||
super(name);
|
||||
setDaemon(true);
|
||||
this.q = q;
|
||||
this.handlerFailureThreshhold = handlerFailureThreshhold;
|
||||
this.activeHandlerCount = activeHandlerCount;
|
||||
this.failedHandlerCount = failedHandlerCount;
|
||||
this.handlerCount = handlerCount;
|
||||
this.abortable = abortable;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A {@link CallRunner}
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
protected CallRunner getCallRunner() throws InterruptedException {
|
||||
return this.q.take();
|
||||
}
|
||||
|
||||
public void stopRunning() {
|
||||
running = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
boolean interrupted = false;
|
||||
running = true;
|
||||
try {
|
||||
while (running) {
|
||||
try {
|
||||
run(getCallRunner());
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn(e.toString(), e);
|
||||
throw e;
|
||||
} finally {
|
||||
if (interrupted) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void run(CallRunner cr) {
|
||||
MonitoredRPCHandler status = RpcServer.getStatus();
|
||||
cr.setStatus(status);
|
||||
try {
|
||||
this.activeHandlerCount.incrementAndGet();
|
||||
cr.run();
|
||||
} catch (Throwable e) {
|
||||
if (e instanceof Error) {
|
||||
int failedCount = failedHandlerCount.incrementAndGet();
|
||||
if (this.handlerFailureThreshhold >= 0
|
||||
&& failedCount > handlerCount * this.handlerFailureThreshhold) {
|
||||
String message = "Number of failed RpcServer handler runs exceeded threshhold "
|
||||
+ this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
|
||||
if (abortable != null) {
|
||||
abortable.abort(message, e);
|
||||
} else {
|
||||
LOG.error("Error but can't abort because abortable is null: "
|
||||
+ StringUtils.stringifyException(e));
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Handler errors " + StringUtils.stringifyException(e));
|
||||
}
|
||||
} else {
|
||||
LOG.warn("Handler exception " + StringUtils.stringifyException(e));
|
||||
}
|
||||
} finally {
|
||||
this.activeHandlerCount.decrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -85,7 +85,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
|
||||
if (callqReadShare > 0) {
|
||||
// at least 1 read handler and 1 write handler
|
||||
callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
|
||||
callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
|
||||
maxQueueLength, priority, conf, server);
|
||||
} else {
|
||||
if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
|
||||
|
|
|
@ -662,7 +662,7 @@ public class TestSimpleRpcScheduler {
|
|||
assertFalse(executor.dispatch(task));
|
||||
//make sure we never internally get a handler, which would skip the queue validation
|
||||
Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
|
||||
Mockito.any(), Mockito.any());
|
||||
Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue