HBASE-26782 Minor code cleanup in and around RpcExecutor
Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
46ca0dbba6
commit
9359449a3a
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -52,7 +52,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
public boolean dispatch(final CallRunner callTask) {
|
||||||
int queueIndex = balancer.getNextQueue(callTask);
|
int queueIndex = balancer.getNextQueue(callTask);
|
||||||
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
|
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
|
||||||
// that means we can overflow by at most <num reader> size (5), that's ok
|
// that means we can overflow by at most <num reader> size (5), that's ok
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -21,7 +21,6 @@ import java.util.Deque;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -64,7 +63,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(CallRunner callTask) throws InterruptedException {
|
public boolean dispatch(CallRunner callTask) {
|
||||||
//FastPathHandlers don't check queue limits, so if we're completely shut down
|
//FastPathHandlers don't check queue limits, so if we're completely shut down
|
||||||
//we have to prevent ourselves from using the handler in the first place
|
//we have to prevent ourselves from using the handler in the first place
|
||||||
if (currentQueueLimit == 0){
|
if (currentQueueLimit == 0){
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
|
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
|
||||||
|
@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
|
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
|
|
||||||
|
|
||||||
private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
|
private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
|
||||||
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
|
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
|
||||||
|
@ -58,7 +55,8 @@ public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
|
||||||
activeHandlerCount, failedHandlerCount, abortable, handlerStack);
|
activeHandlerCount, failedHandlerCount, abortable, handlerStack);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
@Override
|
||||||
|
public boolean dispatch(final CallRunner callTask) {
|
||||||
RpcCall call = callTask.getRpcCall();
|
RpcCall call = callTask.getRpcCall();
|
||||||
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
|
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
|
||||||
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
|
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
|
||||||
|
|
|
@ -131,7 +131,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
public boolean dispatch(final CallRunner callTask) {
|
||||||
RpcCall call = callTask.getRpcCall();
|
RpcCall call = callTask.getRpcCall();
|
||||||
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
|
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
|
||||||
shouldDispatchToScanQueue(callTask), callTask);
|
shouldDispatchToScanQueue(callTask), callTask);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -19,8 +19,8 @@
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -29,18 +29,21 @@ import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
|
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
|
* Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
|
||||||
|
@ -51,14 +54,16 @@ public abstract class RpcExecutor {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);
|
private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);
|
||||||
|
|
||||||
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
|
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
|
||||||
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
|
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
|
||||||
|
"hbase.ipc.server.callqueue.handler.factor";
|
||||||
|
|
||||||
/** max delay in msec used to bound the deprioritized requests */
|
/** max delay in msec used to bound the de-prioritized requests */
|
||||||
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
|
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY =
|
||||||
|
"hbase.ipc.server.queue.max.call.delay";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
|
* The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
|
||||||
* queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
|
* queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced
|
||||||
* throughput.
|
* throughput.
|
||||||
*/
|
*/
|
||||||
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
|
||||||
|
@ -68,14 +73,18 @@ public abstract class RpcExecutor {
|
||||||
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
|
||||||
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
|
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
|
||||||
|
|
||||||
public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class";
|
public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS =
|
||||||
|
"hbase.ipc.server.callqueue.balancer.class";
|
||||||
public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;
|
public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;
|
||||||
|
|
||||||
|
|
||||||
// These 3 are only used by Codel executor
|
// These 3 are only used by Codel executor
|
||||||
public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
|
public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
|
||||||
public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
|
"hbase.ipc.server.callqueue.codel.target.delay";
|
||||||
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
|
public static final String CALL_QUEUE_CODEL_INTERVAL =
|
||||||
|
"hbase.ipc.server.callqueue.codel.interval";
|
||||||
|
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
|
||||||
|
"hbase.ipc.server.callqueue.codel.lifo.threshold";
|
||||||
|
|
||||||
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
|
public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
|
||||||
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
|
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
|
||||||
|
@ -86,16 +95,14 @@ public abstract class RpcExecutor {
|
||||||
public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
|
public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
|
||||||
"hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";
|
"hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";
|
||||||
|
|
||||||
private LongAdder numGeneralCallsDropped = new LongAdder();
|
private final LongAdder numGeneralCallsDropped = new LongAdder();
|
||||||
private LongAdder numLifoModeSwitches = new LongAdder();
|
private final LongAdder numLifoModeSwitches = new LongAdder();
|
||||||
|
|
||||||
protected final int numCallQueues;
|
protected final int numCallQueues;
|
||||||
protected final List<BlockingQueue<CallRunner>> queues;
|
protected final List<BlockingQueue<CallRunner>> queues;
|
||||||
private final Class<? extends BlockingQueue> queueClass;
|
private final Class<? extends BlockingQueue> queueClass;
|
||||||
private final Object[] queueInitArgs;
|
private final Object[] queueInitArgs;
|
||||||
|
|
||||||
private final PriorityFunction priority;
|
|
||||||
|
|
||||||
protected volatile int currentQueueLimit;
|
protected volatile int currentQueueLimit;
|
||||||
|
|
||||||
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
||||||
|
@ -105,8 +112,8 @@ public abstract class RpcExecutor {
|
||||||
|
|
||||||
private String name;
|
private String name;
|
||||||
|
|
||||||
private Configuration conf = null;
|
private final Configuration conf;
|
||||||
private Abortable abortable = null;
|
private final Abortable abortable;
|
||||||
|
|
||||||
public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
|
public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
|
||||||
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
|
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
|
||||||
|
@ -142,12 +149,10 @@ public abstract class RpcExecutor {
|
||||||
this.handlerCount = Math.max(handlerCount, this.numCallQueues);
|
this.handlerCount = Math.max(handlerCount, this.numCallQueues);
|
||||||
this.handlers = new ArrayList<>(this.handlerCount);
|
this.handlers = new ArrayList<>(this.handlerCount);
|
||||||
|
|
||||||
this.priority = priority;
|
|
||||||
|
|
||||||
if (isDeadlineQueueType(callQueueType)) {
|
if (isDeadlineQueueType(callQueueType)) {
|
||||||
this.name += ".Deadline";
|
this.name += ".Deadline";
|
||||||
this.queueInitArgs = new Object[] { maxQueueLength,
|
this.queueInitArgs = new Object[] { maxQueueLength,
|
||||||
new CallPriorityComparator(conf, this.priority) };
|
new CallPriorityComparator(conf, priority) };
|
||||||
this.queueClass = BoundedPriorityBlockingQueue.class;
|
this.queueClass = BoundedPriorityBlockingQueue.class;
|
||||||
} else if (isCodelQueueType(callQueueType)) {
|
} else if (isCodelQueueType(callQueueType)) {
|
||||||
this.name += ".Codel";
|
this.name += ".Codel";
|
||||||
|
@ -160,13 +165,14 @@ public abstract class RpcExecutor {
|
||||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
|
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
|
||||||
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
|
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
|
||||||
} else if (isPluggableQueueType(callQueueType)) {
|
} else if (isPluggableQueueType(callQueueType)) {
|
||||||
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass = getPluggableQueueClass();
|
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass =
|
||||||
|
getPluggableQueueClass();
|
||||||
|
|
||||||
if (!pluggableQueueClass.isPresent()) {
|
if (!pluggableQueueClass.isPresent()) {
|
||||||
throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call"
|
throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call"
|
||||||
+ " queue type required");
|
+ " queue type required");
|
||||||
} else {
|
} else {
|
||||||
this.queueInitArgs = new Object[] { maxQueueLength, this.priority, conf };
|
this.queueInitArgs = new Object[] { maxQueueLength, priority, conf };
|
||||||
this.queueClass = pluggableQueueClass.get();
|
this.queueClass = pluggableQueueClass.get();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -184,50 +190,41 @@ public abstract class RpcExecutor {
|
||||||
return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
|
return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown".
|
||||||
|
*/
|
||||||
|
private static String getMethodName(final CallRunner callRunner) {
|
||||||
|
return Optional.ofNullable(callRunner)
|
||||||
|
.map(CallRunner::getRpcCall)
|
||||||
|
.map(RpcCall::getMethod)
|
||||||
|
.map(Descriptors.MethodDescriptor::getName)
|
||||||
|
.orElse("Unknown");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L.
|
||||||
|
*/
|
||||||
|
private static long getRpcCallSize(final CallRunner callRunner) {
|
||||||
|
return Optional.ofNullable(callRunner)
|
||||||
|
.map(CallRunner::getRpcCall)
|
||||||
|
.map(RpcCall::getSize)
|
||||||
|
.orElse(0L);
|
||||||
|
}
|
||||||
|
|
||||||
public Map<String, Long> getCallQueueCountsSummary() {
|
public Map<String, Long> getCallQueueCountsSummary() {
|
||||||
HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>();
|
return queues.stream()
|
||||||
|
.flatMap(Collection::stream)
|
||||||
for(BlockingQueue<CallRunner> queue: queues) {
|
.map(RpcExecutor::getMethodName)
|
||||||
for (CallRunner cr:queue) {
|
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
|
||||||
RpcCall rpcCall = cr.getRpcCall();
|
|
||||||
|
|
||||||
String method;
|
|
||||||
|
|
||||||
if (null==rpcCall.getMethod() ||
|
|
||||||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
|
|
||||||
method = "Unknown";
|
|
||||||
}
|
|
||||||
|
|
||||||
callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return callQueueMethodTotalCount;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Long> getCallQueueSizeSummary() {
|
public Map<String, Long> getCallQueueSizeSummary() {
|
||||||
HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>();
|
return queues.stream()
|
||||||
|
.flatMap(Collection::stream)
|
||||||
for(BlockingQueue<CallRunner> queue: queues) {
|
.map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner)))
|
||||||
for (CallRunner cr:queue) {
|
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
|
||||||
RpcCall rpcCall = cr.getRpcCall();
|
|
||||||
String method;
|
|
||||||
|
|
||||||
if (null==rpcCall.getMethod() ||
|
|
||||||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
|
|
||||||
method = "Unknown";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long size = rpcCall.getSize();
|
|
||||||
|
|
||||||
callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return callQueueMethodTotalSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected void initializeQueues(final int numQueues) {
|
protected void initializeQueues(final int numQueues) {
|
||||||
if (queueInitArgs.length > 0) {
|
if (queueInitArgs.length > 0) {
|
||||||
currentQueueLimit = (int) queueInitArgs[0];
|
currentQueueLimit = (int) queueInitArgs[0];
|
||||||
|
@ -250,7 +247,7 @@ public abstract class RpcExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Add the request to the executor queue */
|
/** Add the request to the executor queue */
|
||||||
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
|
public abstract boolean dispatch(final CallRunner callTask);
|
||||||
|
|
||||||
/** Returns the list of request queues */
|
/** Returns the list of request queues */
|
||||||
protected List<BlockingQueue<CallRunner>> getQueues() {
|
protected List<BlockingQueue<CallRunner>> getQueues() {
|
||||||
|
@ -296,26 +293,26 @@ public abstract class RpcExecutor {
|
||||||
handlers.size(), threadPrefix, qsize, port);
|
handlers.size(), threadPrefix, qsize, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static QueueBalancer getBalancer(String executorName, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
|
/**
|
||||||
|
* All requests go to the first queue, at index 0
|
||||||
|
*/
|
||||||
|
private static final QueueBalancer ONE_QUEUE = val -> 0;
|
||||||
|
|
||||||
|
public static QueueBalancer getBalancer(
|
||||||
|
final String executorName,
|
||||||
|
final Configuration conf,
|
||||||
|
final List<BlockingQueue<CallRunner>> queues
|
||||||
|
) {
|
||||||
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
|
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
|
||||||
if (queues.size() == 1) {
|
if (queues.size() == 1) {
|
||||||
return ONE_QUEUE;
|
return ONE_QUEUE;
|
||||||
} else {
|
} else {
|
||||||
Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
|
Class<?> balancerClass = conf.getClass(
|
||||||
|
CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
|
||||||
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
|
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* All requests go to the first queue, at index 0
|
|
||||||
*/
|
|
||||||
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
|
|
||||||
@Override
|
|
||||||
public int getNextQueue(CallRunner callRunner) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
|
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
|
||||||
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
|
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
|
||||||
|
@ -449,7 +446,8 @@ public abstract class RpcExecutor {
|
||||||
if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) {
|
if (name != null && name.toLowerCase(Locale.ROOT).contains("priority")) {
|
||||||
configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
|
configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH;
|
||||||
}
|
}
|
||||||
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
|
final int queueLimit = currentQueueLimit;
|
||||||
|
currentQueueLimit = conf.getInt(configKey, queueLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onConfigurationChange(Configuration conf) {
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
|
Loading…
Reference in New Issue