HBASE-12529 Use ThreadLocalRandom for RandomQueueBalancer

This commit is contained in:
Matteo Bertozzi 2014-11-19 16:50:29 +00:00
parent 3f95fe22e0
commit 05ced20a34
3 changed files with 54 additions and 52 deletions

View File

@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import com.google.common.base.Preconditions;
/** /**
* An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
* efficient with a single queue via an inlinable queue balancing mechanism. * efficient with a single queue via an inlinable queue balancing mechanism.
@ -39,7 +37,7 @@ import com.google.common.base.Preconditions;
public class BalancedQueueRpcExecutor extends RpcExecutor { public class BalancedQueueRpcExecutor extends RpcExecutor {
protected final List<BlockingQueue<CallRunner>> queues; protected final List<BlockingQueue<CallRunner>> queues;
private QueueBalancer balancer; private final QueueBalancer balancer;
public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final int maxQueueLength) { final int maxQueueLength) {
@ -80,48 +78,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
public List<BlockingQueue<CallRunner>> getQueues() { public List<BlockingQueue<CallRunner>> getQueues() {
return queues; return queues;
} }
private static abstract class QueueBalancer {
/**
* @return the index of the next queue to which a request should be inserted
*/
public abstract int getNextQueue();
}
public static QueueBalancer getBalancer(int queueSize) {
Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
if (queueSize == 1) {
return ONE_QUEUE;
} else {
return new RandomQueueBalancer(queueSize);
}
}
/**
* All requests go to the first queue, at index 0
*/
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
@Override
public int getNextQueue() {
return 0;
}
};
/**
* Queue balancer that just randomly selects a queue in the range [0, num queues).
*/
private static class RandomQueueBalancer extends QueueBalancer {
private int queueSize;
private Random random;
public RandomQueueBalancer(int queueSize) {
this.queueSize = queueSize;
this.random = new Random();
}
public int getNextQueue() {
return random.nextInt(queueSize);
}
}
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -50,7 +49,9 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class); private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
private final List<BlockingQueue<CallRunner>> queues; private final List<BlockingQueue<CallRunner>> queues;
private final Random balancer = new Random(); private final QueueBalancer writeBalancer;
private final QueueBalancer readBalancer;
private final QueueBalancer scanBalancer;
private final int writeHandlersCount; private final int writeHandlersCount;
private final int readHandlersCount; private final int readHandlersCount;
private final int scanHandlersCount; private final int scanHandlersCount;
@ -115,6 +116,9 @@ public class RWQueueRpcExecutor extends RpcExecutor {
this.numWriteQueues = numWriteQueues; this.numWriteQueues = numWriteQueues;
this.numReadQueues = numReadQueues; this.numReadQueues = numReadQueues;
this.numScanQueues = numScanQueues; this.numScanQueues = numScanQueues;
this.writeBalancer = getBalancer(numWriteQueues);
this.readBalancer = getBalancer(numReadQueues);
this.scanBalancer = getBalancer(numScanQueues);
queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount); queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount + LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
@ -146,11 +150,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
RpcServer.Call call = callTask.getCall(); RpcServer.Call call = callTask.getCall();
int queueIndex; int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) { if (isWriteRequest(call.getHeader(), call.param)) {
queueIndex = balancer.nextInt(numWriteQueues); queueIndex = writeBalancer.getNextQueue();
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) { } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues); queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
} else { } else {
queueIndex = numWriteQueues + balancer.nextInt(numReadQueues); queueIndex = numWriteQueues + readBalancer.getNextQueue();
} }
queues.get(queueIndex).put(callTask); queues.get(queueIndex).put(callTask);
} }

View File

@ -22,12 +22,14 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings; import com.google.common.base.Strings;
@InterfaceAudience.Private @InterfaceAudience.Private
@ -124,4 +126,46 @@ public abstract class RpcExecutor {
} }
} }
} }
public static abstract class QueueBalancer {
/**
* @return the index of the next queue to which a request should be inserted
*/
public abstract int getNextQueue();
}
public static QueueBalancer getBalancer(int queueSize) {
Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
if (queueSize == 1) {
return ONE_QUEUE;
} else {
return new RandomQueueBalancer(queueSize);
}
}
/**
* All requests go to the first queue, at index 0
*/
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
@Override
public int getNextQueue() {
return 0;
}
};
/**
* Queue balancer that just randomly selects a queue in the range [0, num queues).
*/
private static class RandomQueueBalancer extends QueueBalancer {
private final int queueSize;
public RandomQueueBalancer(int queueSize) {
this.queueSize = queueSize;
}
public int getNextQueue() {
return ThreadLocalRandom.current().nextInt(queueSize);
}
}
} }