diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 40ba9fea48a..2418cf73072 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; -import com.google.common.base.Preconditions; - /** * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains * efficient with a single queue via an inlinable queue balancing mechanism. @@ -39,7 +37,7 @@ import com.google.common.base.Preconditions; public class BalancedQueueRpcExecutor extends RpcExecutor { protected final List> queues; - private QueueBalancer balancer; + private final QueueBalancer balancer; public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final int maxQueueLength) { @@ -80,48 +78,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { public List> getQueues() { return queues; } - - private static abstract class QueueBalancer { - /** - * @return the index of the next queue to which a request should be inserted - */ - public abstract int getNextQueue(); - } - - public static QueueBalancer getBalancer(int queueSize) { - Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1"); - if (queueSize == 1) { - return ONE_QUEUE; - } else { - return new RandomQueueBalancer(queueSize); - } - } - - /** - * All requests go to the first queue, at index 0 - */ - private static QueueBalancer ONE_QUEUE = new QueueBalancer() { - - @Override - public int getNextQueue() { - return 0; - } - }; - - /** - * Queue balancer that just randomly selects a queue in the range [0, num queues). - */ - private static class RandomQueueBalancer extends QueueBalancer { - private int queueSize; - private Random random; - - public RandomQueueBalancer(int queueSize) { - this.queueSize = queueSize; - this.random = new Random(); - } - - public int getNextQueue() { - return random.nextInt(queueSize); - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 4f0448d92b0..deac2f8f11a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc; import java.util.ArrayList; import java.util.List; -import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -50,7 +49,9 @@ public class RWQueueRpcExecutor extends RpcExecutor { private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class); private final List> queues; - private final Random balancer = new Random(); + private final QueueBalancer writeBalancer; + private final QueueBalancer readBalancer; + private final QueueBalancer scanBalancer; private final int writeHandlersCount; private final int readHandlersCount; private final int scanHandlersCount; @@ -115,6 +116,9 @@ public class RWQueueRpcExecutor extends RpcExecutor { this.numWriteQueues = numWriteQueues; this.numReadQueues = numReadQueues; this.numScanQueues = numScanQueues; + this.writeBalancer = getBalancer(numWriteQueues); + this.readBalancer = getBalancer(numReadQueues); + this.scanBalancer = getBalancer(numScanQueues); queues = new ArrayList>(writeHandlersCount + readHandlersCount); LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount + @@ -146,11 +150,11 @@ public class RWQueueRpcExecutor extends RpcExecutor { RpcServer.Call call = callTask.getCall(); int queueIndex; if (isWriteRequest(call.getHeader(), call.param)) { - queueIndex = balancer.nextInt(numWriteQueues); + queueIndex = writeBalancer.getNextQueue(); } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) { - queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues); + queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(); } else { - queueIndex = numWriteQueues + balancer.nextInt(numReadQueues); + queueIndex = numWriteQueues + readBalancer.getNextQueue(); } queues.get(queueIndex).put(callTask); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 5fe259076d9..1b0934cb520 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -22,12 +22,14 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; @InterfaceAudience.Private @@ -124,4 +126,46 @@ public abstract class RpcExecutor { } } } + + public static abstract class QueueBalancer { + /** + * @return the index of the next queue to which a request should be inserted + */ + public abstract int getNextQueue(); + } + + public static QueueBalancer getBalancer(int queueSize) { + Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1"); + if (queueSize == 1) { + return ONE_QUEUE; + } else { + return new RandomQueueBalancer(queueSize); + } + } + + /** + * All requests go to the first queue, at index 0 + */ + private static QueueBalancer ONE_QUEUE = new QueueBalancer() { + + @Override + public int getNextQueue() { + return 0; + } + }; + + /** + * Queue balancer that just randomly selects a queue in the range [0, num queues). + */ + private static class RandomQueueBalancer extends QueueBalancer { + private final int queueSize; + + public RandomQueueBalancer(int queueSize) { + this.queueSize = queueSize; + } + + public int getNextQueue() { + return ThreadLocalRandom.current().nextInt(queueSize); + } + } }