HBASE-11513 Combine SingleMultiple Queue RpcExecutor into a single class (Jesse Yates)

This commit is contained in:
Andrew Purtell 2014-07-14 14:39:59 -07:00
parent 044d62ac25
commit 22f205b09b
3 changed files with 73 additions and 103 deletions

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList; import java.util.ArrayList;
@ -29,40 +28,42 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import com.google.common.base.Preconditions;
/** /**
* RPC Executor that dispatch the requests on multiple queues. * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
* Each handler has its own queue and there is no stealing. * efficient with a single queue via an inlinable queue balancing mechanism.
*/ */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX }) @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class MultipleQueueRpcExecutor extends RpcExecutor { public class BalancedQueueRpcExecutor extends RpcExecutor {
protected final List<BlockingQueue<CallRunner>> queues;
protected final Random balancer = new Random();
public MultipleQueueRpcExecutor(final String name, final int handlerCount, protected final List<BlockingQueue<CallRunner>> queues;
final int numQueues, final int maxQueueLength) { private QueueBalancer balancer;
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength) {
this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength); this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
} }
public MultipleQueueRpcExecutor(final String name, final int handlerCount, public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) { final Class<? extends BlockingQueue> queueClass, Object... initargs) {
super(name, Math.max(handlerCount, numQueues)); super(name, Math.max(handlerCount, numQueues));
queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues); queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
this.balancer = getBalancer(numQueues);
initializeQueues(numQueues, queueClass, initargs); initializeQueues(numQueues, queueClass, initargs);
} }
protected void initializeQueues(final int numQueues, protected void initializeQueues(final int numQueues,
final Class<? extends BlockingQueue> queueClass, Object... initargs) { final Class<? extends BlockingQueue> queueClass, Object... initargs) {
for (int i = 0; i < numQueues; ++i) { for (int i = 0; i < numQueues; ++i) {
queues.add((BlockingQueue<CallRunner>) queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
ReflectionUtils.newInstance(queueClass, initargs));
} }
} }
@Override @Override
public void dispatch(final CallRunner callTask) throws InterruptedException { public void dispatch(final CallRunner callTask) throws InterruptedException {
int queueIndex = balancer.nextInt(queues.size()); int queueIndex = balancer.getNextQueue();
queues.get(queueIndex).put(callTask); queues.get(queueIndex).put(callTask);
} }
@ -76,7 +77,51 @@ public class MultipleQueueRpcExecutor extends RpcExecutor {
} }
@Override @Override
protected List<BlockingQueue<CallRunner>> getQueues() { public List<BlockingQueue<CallRunner>> getQueues() {
return queues; return queues;
} }
private static abstract class QueueBalancer {
/**
* @return the index of the next queue to which a request should be inserted
*/
public abstract int getNextQueue();
}
public static QueueBalancer getBalancer(int queueSize) {
Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
if (queueSize == 1) {
return ONE_QUEUE;
} else {
return new RandomQueueBalancer(queueSize);
}
}
/**
* All requests go to the first queue, at index 0
*/
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
@Override
public int getNextQueue() {
return 0;
}
};
/**
* Queue balancer that just randomly selects a queue in the range [0, num queues).
*/
private static class RandomQueueBalancer extends QueueBalancer {
private int queueSize;
private Random random;
public RandomQueueBalancer(int queueSize) {
this.queueSize = queueSize;
this.random = new Random();
}
public int getNextQueue() {
return random.nextInt(queueSize);
}
}
} }

View File

@ -127,33 +127,24 @@ public class SimpleRpcScheduler extends RpcScheduler {
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
callqReadShare, maxQueueLength); callqReadShare, maxQueueLength);
} }
} else if (numCallQueues > 1) { } else {
// multiple queues // multiple queues
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, numCallQueues, callExecutor = new BalancedQueueRpcExecutor("default", handlerCount, numCallQueues,
BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else { } else {
callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, callExecutor = new BalancedQueueRpcExecutor("default", handlerCount,
numCallQueues, maxQueueLength); numCallQueues, maxQueueLength);
} }
} else {
// Single queue
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new SingleQueueRpcExecutor("default", handlerCount,
BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
} else {
callExecutor = new SingleQueueRpcExecutor("default", handlerCount, maxQueueLength);
}
} }
this.priorityExecutor = priorityHandlerCount > 0 this.priorityExecutor =
? new SingleQueueRpcExecutor("Priority", priorityHandlerCount, maxQueueLength) priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
: null; 1, maxQueueLength) : null;
this.replicationExecutor = replicationHandlerCount > 0 this.replicationExecutor =
? new SingleQueueRpcExecutor("Replication", replicationHandlerCount, maxQueueLength) replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
: null; replicationHandlerCount, 1, maxQueueLength) : null;
} }
@Override @Override

View File

@ -1,66 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* RPC Executor that uses a single queue for all the requests.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class SingleQueueRpcExecutor extends RpcExecutor {
private final BlockingQueue<CallRunner> queue;
public SingleQueueRpcExecutor(final String name, final int handlerCount,
final int maxQueueLength) {
this(name, handlerCount, LinkedBlockingQueue.class, maxQueueLength);
}
public SingleQueueRpcExecutor(final String name, final int handlerCount,
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
super(name, handlerCount);
queue = (BlockingQueue<CallRunner>)ReflectionUtils.newInstance(queueClass, initargs);
}
@Override
public void dispatch(final CallRunner callTask) throws InterruptedException {
queue.put(callTask);
}
@Override
public int getQueueLength() {
return queue.size();
}
@Override
protected List<BlockingQueue<CallRunner>> getQueues() {
List<BlockingQueue<CallRunner>> list = new ArrayList<BlockingQueue<CallRunner>>(1);
list.add(queue);
return list;
}
}