HBASE-26782 Minor code cleanup in and around RpcExecutor
Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
300f9b9576
commit
7e267c6069
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -52,7 +52,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
public boolean dispatch(final CallRunner callTask) {
|
||||
int queueIndex = balancer.getNextQueue(callTask);
|
||||
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
|
||||
// that means we can overflow by at most <num reader> size (5), that's ok
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -21,7 +21,6 @@ import java.util.Deque;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -64,7 +63,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean dispatch(CallRunner callTask) throws InterruptedException {
|
||||
public boolean dispatch(CallRunner callTask) {
|
||||
//FastPathHandlers don't check queue limits, so if we're completely shut down
|
||||
//we have to prevent ourselves from using the handler in the first place
|
||||
if (currentQueueLimit == 0){
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.Abortable;
|
|||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
|
||||
|
@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory;
|
|||
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||
@InterfaceStability.Evolving
|
||||
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
|
||||
|
||||
private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
|
||||
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
|
||||
|
@ -59,7 +56,7 @@ public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
public boolean dispatch(final CallRunner callTask) {
|
||||
RpcCall call = callTask.getRpcCall();
|
||||
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
|
||||
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
|
||||
|
|
|
@ -131,7 +131,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
||||
public boolean dispatch(final CallRunner callTask) {
|
||||
RpcCall call = callTask.getRpcCall();
|
||||
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
|
||||
shouldDispatchToScanQueue(callTask), callTask);
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -19,8 +19,8 @@
|
|||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
@ -29,18 +29,21 @@ import java.util.concurrent.BlockingQueue;
|
|||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
|
||||
|
||||
/**
|
||||
* Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
|
||||
|
@ -51,14 +54,16 @@ public abstract class RpcExecutor {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);
|
||||
|
||||
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
|
||||
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
|
||||
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
|
||||
"hbase.ipc.server.callqueue.handler.factor";
|
||||
|
||||
/** max delay in msec used to bound the deprioritized requests */
|
||||
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
|
||||
/** max delay in msec used to bound the de-prioritized requests */
|
||||
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY =
|
||||
"hbase.ipc.server.queue.max.call.delay";
|
||||
|
||||
/**
|
||||
* The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
|
||||
* queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
|
||||
* queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced
|
||||
* throughput.
|
||||
*/
|
||||
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
||||
|
@ -68,14 +73,18 @@ public abstract class RpcExecutor {
|
|||
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
||||
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
|
||||
|
||||
public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class";
|
||||
public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS =
|
||||
"hbase.ipc.server.callqueue.balancer.class";
|
||||
public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;
|
||||
|
||||
|
||||
// These 3 are only used by Codel executor
|
||||
public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
|
||||
public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
|
||||
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
|
||||
public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
|
||||
"hbase.ipc.server.callqueue.codel.target.delay";
|
||||
public static final String CALL_QUEUE_CODEL_INTERVAL =
|
||||
"hbase.ipc.server.callqueue.codel.interval";
|
||||
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
|
||||
"hbase.ipc.server.callqueue.codel.lifo.threshold";
|
||||
|
||||
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
|
||||
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
|
||||
|
@ -86,16 +95,14 @@ public abstract class RpcExecutor {
|
|||
public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
|
||||
"hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";
|
||||
|
||||
private LongAdder numGeneralCallsDropped = new LongAdder();
|
||||
private LongAdder numLifoModeSwitches = new LongAdder();
|
||||
private final LongAdder numGeneralCallsDropped = new LongAdder();
|
||||
private final LongAdder numLifoModeSwitches = new LongAdder();
|
||||
|
||||
protected final int numCallQueues;
|
||||
protected final List<BlockingQueue<CallRunner>> queues;
|
||||
private final Class<? extends BlockingQueue> queueClass;
|
||||
private final Object[] queueInitArgs;
|
||||
|
||||
private final PriorityFunction priority;
|
||||
|
||||
protected volatile int currentQueueLimit;
|
||||
|
||||
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
||||
|
@ -105,8 +112,8 @@ public abstract class RpcExecutor {
|
|||
|
||||
private String name;
|
||||
|
||||
private Configuration conf = null;
|
||||
private Abortable abortable = null;
|
||||
private final Configuration conf;
|
||||
private final Abortable abortable;
|
||||
|
||||
public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
|
||||
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
|
||||
|
@ -142,12 +149,10 @@ public abstract class RpcExecutor {
|
|||
this.handlerCount = Math.max(handlerCount, this.numCallQueues);
|
||||
this.handlers = new ArrayList<>(this.handlerCount);
|
||||
|
||||
this.priority = priority;
|
||||
|
||||
if (isDeadlineQueueType(callQueueType)) {
|
||||
this.name += ".Deadline";
|
||||
this.queueInitArgs = new Object[] { maxQueueLength,
|
||||
new CallPriorityComparator(conf, this.priority) };
|
||||
new CallPriorityComparator(conf, priority) };
|
||||
this.queueClass = BoundedPriorityBlockingQueue.class;
|
||||
} else if (isCodelQueueType(callQueueType)) {
|
||||
this.name += ".Codel";
|
||||
|
@ -157,16 +162,17 @@ public abstract class RpcExecutor {
|
|||
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
|
||||
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
|
||||
this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
|
||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
|
||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
|
||||
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
|
||||
} else if (isPluggableQueueType(callQueueType)) {
|
||||
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass = getPluggableQueueClass();
|
||||
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass =
|
||||
getPluggableQueueClass();
|
||||
|
||||
if (!pluggableQueueClass.isPresent()) {
|
||||
throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call"
|
||||
+ " queue type required");
|
||||
} else {
|
||||
this.queueInitArgs = new Object[] { maxQueueLength, this.priority, conf };
|
||||
this.queueInitArgs = new Object[] { maxQueueLength, priority, conf };
|
||||
this.queueClass = pluggableQueueClass.get();
|
||||
}
|
||||
} else {
|
||||
|
@ -184,50 +190,41 @@ public abstract class RpcExecutor {
|
|||
return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown".
|
||||
*/
|
||||
private static String getMethodName(final CallRunner callRunner) {
|
||||
return Optional.ofNullable(callRunner)
|
||||
.map(CallRunner::getRpcCall)
|
||||
.map(RpcCall::getMethod)
|
||||
.map(Descriptors.MethodDescriptor::getName)
|
||||
.orElse("Unknown");
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L.
|
||||
*/
|
||||
private static long getRpcCallSize(final CallRunner callRunner) {
|
||||
return Optional.ofNullable(callRunner)
|
||||
.map(CallRunner::getRpcCall)
|
||||
.map(RpcCall::getSize)
|
||||
.orElse(0L);
|
||||
}
|
||||
|
||||
public Map<String, Long> getCallQueueCountsSummary() {
|
||||
HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>();
|
||||
|
||||
for(BlockingQueue<CallRunner> queue: queues) {
|
||||
for (CallRunner cr:queue) {
|
||||
RpcCall rpcCall = cr.getRpcCall();
|
||||
|
||||
String method;
|
||||
|
||||
if (null==rpcCall.getMethod() ||
|
||||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
|
||||
method = "Unknown";
|
||||
}
|
||||
|
||||
callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L));
|
||||
}
|
||||
}
|
||||
|
||||
return callQueueMethodTotalCount;
|
||||
return queues.stream()
|
||||
.flatMap(Collection::stream)
|
||||
.map(RpcExecutor::getMethodName)
|
||||
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
|
||||
}
|
||||
|
||||
public Map<String, Long> getCallQueueSizeSummary() {
|
||||
HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>();
|
||||
|
||||
for(BlockingQueue<CallRunner> queue: queues) {
|
||||
for (CallRunner cr:queue) {
|
||||
RpcCall rpcCall = cr.getRpcCall();
|
||||
String method;
|
||||
|
||||
if (null==rpcCall.getMethod() ||
|
||||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
|
||||
method = "Unknown";
|
||||
}
|
||||
|
||||
long size = rpcCall.getSize();
|
||||
|
||||
callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L));
|
||||
}
|
||||
}
|
||||
|
||||
return callQueueMethodTotalSize;
|
||||
return queues.stream()
|
||||
.flatMap(Collection::stream)
|
||||
.map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner)))
|
||||
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
|
||||
}
|
||||
|
||||
|
||||
protected void initializeQueues(final int numQueues) {
|
||||
if (queueInitArgs.length > 0) {
|
||||
currentQueueLimit = (int) queueInitArgs[0];
|
||||
|
@ -250,7 +247,7 @@ public abstract class RpcExecutor {
|
|||
}
|
||||
|
||||
/** Add the request to the executor queue */
|
||||
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
|
||||
public abstract boolean dispatch(final CallRunner callTask);
|
||||
|
||||
/** Returns the list of request queues */
|
||||
protected List<BlockingQueue<CallRunner>> getQueues() {
|
||||
|
@ -296,26 +293,26 @@ public abstract class RpcExecutor {
|
|||
handlers.size(), threadPrefix, qsize, port);
|
||||
}
|
||||
|
||||
public static QueueBalancer getBalancer(String executorName, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
|
||||
/**
|
||||
* All requests go to the first queue, at index 0
|
||||
*/
|
||||
private static final QueueBalancer ONE_QUEUE = val -> 0;
|
||||
|
||||
public static QueueBalancer getBalancer(
|
||||
final String executorName,
|
||||
final Configuration conf,
|
||||
final List<BlockingQueue<CallRunner>> queues
|
||||
) {
|
||||
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
|
||||
if (queues.size() == 1) {
|
||||
return ONE_QUEUE;
|
||||
} else {
|
||||
Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
|
||||
Class<?> balancerClass = conf.getClass(
|
||||
CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
|
||||
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* All requests go to the first queue, at index 0
|
||||
*/
|
||||
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
|
||||
@Override
|
||||
public int getNextQueue(CallRunner callRunner) {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
|
||||
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
|
||||
|
@ -449,7 +446,8 @@ public abstract class RpcExecutor {
|
|||
if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) {
|
||||
configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
|
||||
}
|
||||
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
|
||||
final int queueLimit = currentQueueLimit;
|
||||
currentQueueLimit = conf.getInt(configKey, queueLimit);
|
||||
}
|
||||
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
|
|
Loading…
Reference in New Issue