From 4824b0dea721790f7ff14af317620d26d931a18e Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Mon, 14 Jul 2014 14:39:59 -0700 Subject: [PATCH] HBASE-11513 Combine SingleMultiple Queue RpcExecutor into a single class (Jesse Yates) --- ...tor.java => BalancedQueueRpcExecutor.java} | 83 ++++++++++++++----- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 27 ++---- .../hbase/ipc/SingleQueueRpcExecutor.java | 66 --------------- 3 files changed, 73 insertions(+), 103 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/{MultipleQueueRpcExecutor.java => BalancedQueueRpcExecutor.java} (51%) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java similarity index 51% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 71ddfa6b238..7cf2101d4b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.ipc; import java.util.ArrayList; @@ -29,54 +28,100 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; -/** - * RPC Executor that dispatch the requests on multiple queues. - * Each handler has its own queue and there is no stealing. - */ -@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) -@InterfaceStability.Evolving -public class MultipleQueueRpcExecutor extends RpcExecutor { - protected final List> queues; - protected final Random balancer = new Random(); +import com.google.common.base.Preconditions; - public MultipleQueueRpcExecutor(final String name, final int handlerCount, - final int numQueues, final int maxQueueLength) { +/** + * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains + * efficient with a single queue via an inlinable queue balancing mechanism. + */ +@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) +@InterfaceStability.Evolving +public class BalancedQueueRpcExecutor extends RpcExecutor { + + protected final List> queues; + private QueueBalancer balancer; + + public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final int maxQueueLength) { this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength); } - public MultipleQueueRpcExecutor(final String name, final int handlerCount, - final int numQueues, + public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final Class queueClass, Object... initargs) { super(name, Math.max(handlerCount, numQueues)); queues = new ArrayList>(numQueues); + this.balancer = getBalancer(numQueues); initializeQueues(numQueues, queueClass, initargs); } protected void initializeQueues(final int numQueues, final Class queueClass, Object... initargs) { for (int i = 0; i < numQueues; ++i) { - queues.add((BlockingQueue) - ReflectionUtils.newInstance(queueClass, initargs)); + queues.add((BlockingQueue) ReflectionUtils.newInstance(queueClass, initargs)); } } @Override public void dispatch(final CallRunner callTask) throws InterruptedException { - int queueIndex = balancer.nextInt(queues.size()); + int queueIndex = balancer.getNextQueue(); queues.get(queueIndex).put(callTask); } @Override public int getQueueLength() { int length = 0; - for (final BlockingQueue queue: queues) { + for (final BlockingQueue queue : queues) { length += queue.size(); } return length; } @Override - protected List> getQueues() { + public List> getQueues() { return queues; } + + private static abstract class QueueBalancer { + /** + * @return the index of the next queue to which a request should be inserted + */ + public abstract int getNextQueue(); + } + + public static QueueBalancer getBalancer(int queueSize) { + Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1"); + if (queueSize == 1) { + return ONE_QUEUE; + } else { + return new RandomQueueBalancer(queueSize); + } + } + + /** + * All requests go to the first queue, at index 0 + */ + private static QueueBalancer ONE_QUEUE = new QueueBalancer() { + + @Override + public int getNextQueue() { + return 0; + } + }; + + /** + * Queue balancer that just randomly selects a queue in the range [0, num queues). + */ + private static class RandomQueueBalancer extends QueueBalancer { + private int queueSize; + private Random random; + + public RandomQueueBalancer(int queueSize) { + this.queueSize = queueSize; + this.random = new Random(); + } + + public int getNextQueue() { + return random.nextInt(queueSize); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 4b46595143d..953bc36e9ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -127,33 +127,24 @@ public class SimpleRpcScheduler extends RpcScheduler { callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, callqReadShare, maxQueueLength); } - } else if (numCallQueues > 1) { + } else { // multiple queues if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, numCallQueues, + callExecutor = new BalancedQueueRpcExecutor("default", handlerCount, numCallQueues, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); } else { - callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, + callExecutor = new BalancedQueueRpcExecutor("default", handlerCount, numCallQueues, maxQueueLength); } - } else { - // Single queue - if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { - CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); - callExecutor = new SingleQueueRpcExecutor("default", handlerCount, - BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); - } else { - callExecutor = new SingleQueueRpcExecutor("default", handlerCount, maxQueueLength); - } } - this.priorityExecutor = priorityHandlerCount > 0 - ? new SingleQueueRpcExecutor("Priority", priorityHandlerCount, maxQueueLength) - : null; - this.replicationExecutor = replicationHandlerCount > 0 - ? new SingleQueueRpcExecutor("Replication", replicationHandlerCount, maxQueueLength) - : null; + this.priorityExecutor = + priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, + 1, maxQueueLength) : null; + this.replicationExecutor = + replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication", + replicationHandlerCount, 1, maxQueueLength) : null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java deleted file mode 100644 index b94b14ba8a1..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * RPC Executor that uses a single queue for all the requests. - */ -@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) -@InterfaceStability.Evolving -public class SingleQueueRpcExecutor extends RpcExecutor { - private final BlockingQueue queue; - - public SingleQueueRpcExecutor(final String name, final int handlerCount, - final int maxQueueLength) { - this(name, handlerCount, LinkedBlockingQueue.class, maxQueueLength); - } - - public SingleQueueRpcExecutor(final String name, final int handlerCount, - final Class queueClass, Object... initargs) { - super(name, handlerCount); - queue = (BlockingQueue)ReflectionUtils.newInstance(queueClass, initargs); - } - - @Override - public void dispatch(final CallRunner callTask) throws InterruptedException { - queue.put(callTask); - } - - @Override - public int getQueueLength() { - return queue.size(); - } - - @Override - protected List> getQueues() { - List> list = new ArrayList>(1); - list.add(queue); - return list; - } -}