HBASE-26782 Minor code cleanup in and around RpcExecutor

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Nick Dimiduk 2022-03-01 18:08:37 +01:00 committed by Nick Dimiduk
parent 591f7819f3
commit f3f2aa9631
5 changed files with 80 additions and 90 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -52,7 +52,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
}
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) {
int queueIndex = balancer.getNextQueue(callTask);
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
// that means we can overflow by at most <num reader> size (5), that's ok

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.ipc;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.yetus.audience.InterfaceAudience;
@ -65,7 +63,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
}
@Override
public boolean dispatch(CallRunner callTask) throws InterruptedException {
public boolean dispatch(CallRunner callTask) {
//FastPathHandlers don't check queue limits, so if we're completely shut down
//we have to prevent ourselves from using the handler in the first place
if (currentQueueLimit == 0){

View File

@ -1,5 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -27,8 +26,6 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
@ -37,7 +34,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
@ -60,7 +56,7 @@ public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
}
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) {
RpcCall call = callTask.getRpcCall();
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);

View File

@ -132,7 +132,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
}
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) {
RpcCall call = callTask.getRpcCall();
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
shouldDispatchToScanQueue(callTask), callTask);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -19,8 +19,8 @@
package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -29,20 +29,21 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
/**
* Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
@ -53,14 +54,16 @@ public abstract class RpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.callqueue.handler.factor";
/** max delay in msec used to bound the deprioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
/** max delay in msec used to bound the de-prioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY =
"hbase.ipc.server.queue.max.call.delay";
/**
* The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
* queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
* queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced
* throughput.
*/
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
@ -70,14 +73,18 @@ public abstract class RpcExecutor {
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class";
public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS =
"hbase.ipc.server.callqueue.balancer.class";
public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;
// These 3 are only used by Codel executor
public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
"hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL =
"hbase.ipc.server.callqueue.codel.interval";
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
"hbase.ipc.server.callqueue.codel.lifo.threshold";
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
@ -88,16 +95,14 @@ public abstract class RpcExecutor {
public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
"hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";
private LongAdder numGeneralCallsDropped = new LongAdder();
private LongAdder numLifoModeSwitches = new LongAdder();
private final LongAdder numGeneralCallsDropped = new LongAdder();
private final LongAdder numLifoModeSwitches = new LongAdder();
protected final int numCallQueues;
protected final List<BlockingQueue<CallRunner>> queues;
private final Class<? extends BlockingQueue> queueClass;
private final Object[] queueInitArgs;
private final PriorityFunction priority;
protected volatile int currentQueueLimit;
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
@ -107,8 +112,8 @@ public abstract class RpcExecutor {
private String name;
private Configuration conf = null;
private Abortable abortable = null;
private final Configuration conf;
private final Abortable abortable;
public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
@ -144,12 +149,10 @@ public abstract class RpcExecutor {
this.handlerCount = Math.max(handlerCount, this.numCallQueues);
this.handlers = new ArrayList<>(this.handlerCount);
this.priority = priority;
if (isDeadlineQueueType(callQueueType)) {
this.name += ".Deadline";
this.queueInitArgs = new Object[] { maxQueueLength,
new CallPriorityComparator(conf, this.priority) };
new CallPriorityComparator(conf, priority) };
this.queueClass = BoundedPriorityBlockingQueue.class;
} else if (isCodelQueueType(callQueueType)) {
this.name += ".Codel";
@ -159,16 +162,17 @@ public abstract class RpcExecutor {
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
} else if (isPluggableQueueType(callQueueType)) {
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass = getPluggableQueueClass();
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass =
getPluggableQueueClass();
if (!pluggableQueueClass.isPresent()) {
throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call"
+ " queue type required");
} else {
this.queueInitArgs = new Object[] { maxQueueLength, this.priority, conf };
this.queueInitArgs = new Object[] { maxQueueLength, priority, conf };
this.queueClass = pluggableQueueClass.get();
}
} else {
@ -186,50 +190,41 @@ public abstract class RpcExecutor {
return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
}
/**
* Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown".
*/
private static String getMethodName(final CallRunner callRunner) {
return Optional.ofNullable(callRunner)
.map(CallRunner::getRpcCall)
.map(RpcCall::getMethod)
.map(Descriptors.MethodDescriptor::getName)
.orElse("Unknown");
}
/**
* Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L.
*/
private static long getRpcCallSize(final CallRunner callRunner) {
return Optional.ofNullable(callRunner)
.map(CallRunner::getRpcCall)
.map(RpcCall::getSize)
.orElse(0L);
}
public Map<String, Long> getCallQueueCountsSummary() {
HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>();
for(BlockingQueue<CallRunner> queue: queues) {
for (CallRunner cr:queue) {
RpcCall rpcCall = cr.getRpcCall();
String method;
if (null==rpcCall.getMethod() ||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
method = "Unknown";
}
callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L));
}
}
return callQueueMethodTotalCount;
return queues.stream()
.flatMap(Collection::stream)
.map(RpcExecutor::getMethodName)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
}
public Map<String, Long> getCallQueueSizeSummary() {
HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>();
for(BlockingQueue<CallRunner> queue: queues) {
for (CallRunner cr:queue) {
RpcCall rpcCall = cr.getRpcCall();
String method;
if (null==rpcCall.getMethod() ||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
method = "Unknown";
}
long size = rpcCall.getSize();
callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L));
}
}
return callQueueMethodTotalSize;
return queues.stream()
.flatMap(Collection::stream)
.map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner)))
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
}
protected void initializeQueues(final int numQueues) {
if (queueInitArgs.length > 0) {
currentQueueLimit = (int) queueInitArgs[0];
@ -252,7 +247,7 @@ public abstract class RpcExecutor {
}
/** Add the request to the executor queue */
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
public abstract boolean dispatch(final CallRunner callTask);
/** Returns the list of request queues */
protected List<BlockingQueue<CallRunner>> getQueues() {
@ -298,26 +293,26 @@ public abstract class RpcExecutor {
handlers.size(), threadPrefix, qsize, port);
}
public static QueueBalancer getBalancer(String executorName, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
/**
* All requests go to the first queue, at index 0
*/
private static final QueueBalancer ONE_QUEUE = val -> 0;
public static QueueBalancer getBalancer(
final String executorName,
final Configuration conf,
final List<BlockingQueue<CallRunner>> queues
) {
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
if (queues.size() == 1) {
return ONE_QUEUE;
} else {
Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
Class<?> balancerClass = conf.getClass(
CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
}
}
/**
* All requests go to the first queue, at index 0
*/
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
@Override
public int getNextQueue(CallRunner callRunner) {
return 0;
}
};
/**
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
@ -455,7 +450,8 @@ public abstract class RpcExecutor {
configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH;
}
}
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
final int queueLimit = currentQueueLimit;
currentQueueLimit = conf.getInt(configKey, queueLimit);
}
public void onConfigurationChange(Configuration conf) {