HBASE-12529 Use ThreadLocalRandom for RandomQueueBalancer
This commit is contained in:
parent
6e376b900e
commit
865ffebfc6
|
@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
|
||||
* efficient with a single queue via an inlinable queue balancing mechanism.
|
||||
|
@ -39,7 +37,7 @@ import com.google.common.base.Preconditions;
|
|||
public class BalancedQueueRpcExecutor extends RpcExecutor {
|
||||
|
||||
protected final List<BlockingQueue<CallRunner>> queues;
|
||||
private QueueBalancer balancer;
|
||||
private final QueueBalancer balancer;
|
||||
|
||||
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
|
||||
final int maxQueueLength) {
|
||||
|
@ -80,48 +78,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
|
|||
public List<BlockingQueue<CallRunner>> getQueues() {
|
||||
return queues;
|
||||
}
|
||||
|
||||
private static abstract class QueueBalancer {
|
||||
/**
|
||||
* @return the index of the next queue to which a request should be inserted
|
||||
*/
|
||||
public abstract int getNextQueue();
|
||||
}
|
||||
|
||||
public static QueueBalancer getBalancer(int queueSize) {
|
||||
Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
|
||||
if (queueSize == 1) {
|
||||
return ONE_QUEUE;
|
||||
} else {
|
||||
return new RandomQueueBalancer(queueSize);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* All requests go to the first queue, at index 0
|
||||
*/
|
||||
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
|
||||
|
||||
@Override
|
||||
public int getNextQueue() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Queue balancer that just randomly selects a queue in the range [0, num queues).
|
||||
*/
|
||||
private static class RandomQueueBalancer extends QueueBalancer {
|
||||
private int queueSize;
|
||||
private Random random;
|
||||
|
||||
public RandomQueueBalancer(int queueSize) {
|
||||
this.queueSize = queueSize;
|
||||
this.random = new Random();
|
||||
}
|
||||
|
||||
public int getNextQueue() {
|
||||
return random.nextInt(queueSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
|
@ -50,7 +49,9 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
|
||||
|
||||
private final List<BlockingQueue<CallRunner>> queues;
|
||||
private final Random balancer = new Random();
|
||||
private final QueueBalancer writeBalancer;
|
||||
private final QueueBalancer readBalancer;
|
||||
private final QueueBalancer scanBalancer;
|
||||
private final int writeHandlersCount;
|
||||
private final int readHandlersCount;
|
||||
private final int scanHandlersCount;
|
||||
|
@ -115,6 +116,9 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
this.numWriteQueues = numWriteQueues;
|
||||
this.numReadQueues = numReadQueues;
|
||||
this.numScanQueues = numScanQueues;
|
||||
this.writeBalancer = getBalancer(numWriteQueues);
|
||||
this.readBalancer = getBalancer(numReadQueues);
|
||||
this.scanBalancer = getBalancer(numScanQueues);
|
||||
|
||||
queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
|
||||
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
|
||||
|
@ -146,11 +150,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
|||
RpcServer.Call call = callTask.getCall();
|
||||
int queueIndex;
|
||||
if (isWriteRequest(call.getHeader(), call.param)) {
|
||||
queueIndex = balancer.nextInt(numWriteQueues);
|
||||
queueIndex = writeBalancer.getNextQueue();
|
||||
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
|
||||
queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues);
|
||||
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
|
||||
} else {
|
||||
queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
|
||||
queueIndex = numWriteQueues + readBalancer.getNextQueue();
|
||||
}
|
||||
queues.get(queueIndex).put(callTask);
|
||||
}
|
||||
|
|
|
@ -22,12 +22,14 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -124,4 +126,46 @@ public abstract class RpcExecutor {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static abstract class QueueBalancer {
|
||||
/**
|
||||
* @return the index of the next queue to which a request should be inserted
|
||||
*/
|
||||
public abstract int getNextQueue();
|
||||
}
|
||||
|
||||
public static QueueBalancer getBalancer(int queueSize) {
|
||||
Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
|
||||
if (queueSize == 1) {
|
||||
return ONE_QUEUE;
|
||||
} else {
|
||||
return new RandomQueueBalancer(queueSize);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* All requests go to the first queue, at index 0
|
||||
*/
|
||||
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
|
||||
|
||||
@Override
|
||||
public int getNextQueue() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Queue balancer that just randomly selects a queue in the range [0, num queues).
|
||||
*/
|
||||
private static class RandomQueueBalancer extends QueueBalancer {
|
||||
private final int queueSize;
|
||||
|
||||
public RandomQueueBalancer(int queueSize) {
|
||||
this.queueSize = queueSize;
|
||||
}
|
||||
|
||||
public int getNextQueue() {
|
||||
return ThreadLocalRandom.current().nextInt(queueSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue