From 7e267c6069ac62bb2cdc1e00fbc40486c3277194 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 1 Mar 2022 18:08:37 +0100 Subject: [PATCH] HBASE-26782 Minor code cleanup in and around RpcExecutor Signed-off-by: Bharath Vissapragada Signed-off-by: Duo Zhang --- .../hbase/ipc/BalancedQueueRpcExecutor.java | 4 +- .../ipc/FastPathBalancedQueueRpcExecutor.java | 5 +- .../hbase/ipc/FastPathRWQueueRpcExecutor.java | 7 +- .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 2 +- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 148 +++++++++--------- 5 files changed, 80 insertions(+), 86 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index ad1747ba3b1..8e5467478ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -52,7 +52,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { } @Override - public boolean dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) { int queueIndex = balancer.getNextQueue(callTask); BlockingQueue queue = queues.get(queueIndex); // that means we can overflow by at most size (5), that's ok diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java index e64ba4eac7e..9e6a0bb103a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,7 +21,6 @@ import java.util.Deque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.yetus.audience.InterfaceAudience; @@ -64,7 +63,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { } @Override - public boolean dispatch(CallRunner callTask) throws InterruptedException { + public boolean dispatch(CallRunner callTask) { //FastPathHandlers don't check queue limits, so if we're completely shut down //we have to prevent ourselves from using the handler in the first place if (currentQueueLimit == 0){ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java index cadce823986..90dd78ac6d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in @@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor { - private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class); private final Deque readHandlerStack = new ConcurrentLinkedDeque<>(); private final Deque writeHandlerStack = new ConcurrentLinkedDeque<>(); @@ -59,7 +56,7 @@ public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor { } @Override - public boolean dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) { RpcCall call = callTask.getRpcCall(); boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam()); boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index ed1d0674832..563230932cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -131,7 +131,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { } @Override - public boolean dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) { RpcCall call = callTask.getRpcCall(); return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()), shouldDispatchToScanQueue(callTask), callTask); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 3c4bb1545c9..45bbe18634b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.ipc; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -29,18 +29,21 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Strings; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; /** * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular @@ -51,14 +54,16 @@ public abstract class RpcExecutor { private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class); protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; - public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; + public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = + "hbase.ipc.server.callqueue.handler.factor"; - /** max delay in msec used to bound the deprioritized requests */ - public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay"; + /** max delay in msec used to bound the de-prioritized requests */ + public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = + "hbase.ipc.server.queue.max.call.delay"; /** * The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority - * queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced + * queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced * throughput. */ public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel"; @@ -68,14 +73,18 @@ public abstract class RpcExecutor { public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type"; public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE; - public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class"; + public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = + "hbase.ipc.server.callqueue.balancer.class"; public static final Class CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class; // These 3 are only used by Codel executor - public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay"; - public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval"; - public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold"; + public static final String CALL_QUEUE_CODEL_TARGET_DELAY = + "hbase.ipc.server.callqueue.codel.target.delay"; + public static final String CALL_QUEUE_CODEL_INTERVAL = + "hbase.ipc.server.callqueue.codel.interval"; + public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = + "hbase.ipc.server.callqueue.codel.lifo.threshold"; public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100; public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100; @@ -86,16 +95,14 @@ public abstract class RpcExecutor { public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED = "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled"; - private LongAdder numGeneralCallsDropped = new LongAdder(); - private LongAdder numLifoModeSwitches = new LongAdder(); + private final LongAdder numGeneralCallsDropped = new LongAdder(); + private final LongAdder numLifoModeSwitches = new LongAdder(); protected final int numCallQueues; protected final List> queues; private final Class queueClass; private final Object[] queueInitArgs; - private final PriorityFunction priority; - protected volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); @@ -105,8 +112,8 @@ public abstract class RpcExecutor { private String name; - private Configuration conf = null; - private Abortable abortable = null; + private final Configuration conf; + private final Abortable abortable; public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { @@ -142,12 +149,10 @@ public abstract class RpcExecutor { this.handlerCount = Math.max(handlerCount, this.numCallQueues); this.handlers = new ArrayList<>(this.handlerCount); - this.priority = priority; - if (isDeadlineQueueType(callQueueType)) { this.name += ".Deadline"; this.queueInitArgs = new Object[] { maxQueueLength, - new CallPriorityComparator(conf, this.priority) }; + new CallPriorityComparator(conf, priority) }; this.queueClass = BoundedPriorityBlockingQueue.class; } else if (isCodelQueueType(callQueueType)) { this.name += ".Codel"; @@ -157,16 +162,17 @@ public abstract class RpcExecutor { double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval, - codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; + codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; this.queueClass = AdaptiveLifoCoDelCallQueue.class; } else if (isPluggableQueueType(callQueueType)) { - Optional>> pluggableQueueClass = getPluggableQueueClass(); + Optional>> pluggableQueueClass = + getPluggableQueueClass(); if (!pluggableQueueClass.isPresent()) { throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call" + " queue type required"); } else { - this.queueInitArgs = new Object[] { maxQueueLength, this.priority, conf }; + this.queueInitArgs = new Object[] { maxQueueLength, priority, conf }; this.queueClass = pluggableQueueClass.get(); } } else { @@ -184,50 +190,41 @@ public abstract class RpcExecutor { return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor)); } + /** + * Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown". + */ + private static String getMethodName(final CallRunner callRunner) { + return Optional.ofNullable(callRunner) + .map(CallRunner::getRpcCall) + .map(RpcCall::getMethod) + .map(Descriptors.MethodDescriptor::getName) + .orElse("Unknown"); + } + + /** + * Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L. + */ + private static long getRpcCallSize(final CallRunner callRunner) { + return Optional.ofNullable(callRunner) + .map(CallRunner::getRpcCall) + .map(RpcCall::getSize) + .orElse(0L); + } + public Map getCallQueueCountsSummary() { - HashMap callQueueMethodTotalCount = new HashMap<>(); - - for(BlockingQueue queue: queues) { - for (CallRunner cr:queue) { - RpcCall rpcCall = cr.getRpcCall(); - - String method; - - if (null==rpcCall.getMethod() || - StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { - method = "Unknown"; - } - - callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L)); - } - } - - return callQueueMethodTotalCount; + return queues.stream() + .flatMap(Collection::stream) + .map(RpcExecutor::getMethodName) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); } public Map getCallQueueSizeSummary() { - HashMap callQueueMethodTotalSize = new HashMap<>(); - - for(BlockingQueue queue: queues) { - for (CallRunner cr:queue) { - RpcCall rpcCall = cr.getRpcCall(); - String method; - - if (null==rpcCall.getMethod() || - StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { - method = "Unknown"; - } - - long size = rpcCall.getSize(); - - callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L)); - } - } - - return callQueueMethodTotalSize; + return queues.stream() + .flatMap(Collection::stream) + .map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner))) + .collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond))); } - protected void initializeQueues(final int numQueues) { if (queueInitArgs.length > 0) { currentQueueLimit = (int) queueInitArgs[0]; @@ -250,7 +247,7 @@ public abstract class RpcExecutor { } /** Add the request to the executor queue */ - public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException; + public abstract boolean dispatch(final CallRunner callTask); /** Returns the list of request queues */ protected List> getQueues() { @@ -296,26 +293,26 @@ public abstract class RpcExecutor { handlers.size(), threadPrefix, qsize, port); } - public static QueueBalancer getBalancer(String executorName, Configuration conf, List> queues) { + /** + * All requests go to the first queue, at index 0 + */ + private static final QueueBalancer ONE_QUEUE = val -> 0; + + public static QueueBalancer getBalancer( + final String executorName, + final Configuration conf, + final List> queues + ) { Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1"); if (queues.size() == 1) { return ONE_QUEUE; } else { - Class balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT); + Class balancerClass = conf.getClass( + CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT); return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues); } } - /** - * All requests go to the first queue, at index 0 - */ - private static QueueBalancer ONE_QUEUE = new QueueBalancer() { - @Override - public int getNextQueue(CallRunner callRunner) { - return 0; - } - }; - /** * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have @@ -449,7 +446,8 @@ public abstract class RpcExecutor { if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) { configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; } - currentQueueLimit = conf.getInt(configKey, currentQueueLimit); + final int queueLimit = currentQueueLimit; + currentQueueLimit = conf.getInt(configKey, queueLimit); } public void onConfigurationChange(Configuration conf) {