HBASE-11724 Add to RWQueueRpcExecutor the ability to split get and scan handlers

This commit is contained in:
Matteo Bertozzi 2014-08-14 20:52:11 +01:00
parent 865fae8a80
commit e1e70a7e2f
4 changed files with 194 additions and 45 deletions

View File

@ -187,13 +187,50 @@ possible configurations would overwhelm and obscure the important.
A value of 0 means a single queue shared between all the handlers.
A value of 1 means that each handler has its own queue.</description>
</property>
<property>
<name>hbase.ipc.server.callqueue.read.share</name>
<property>
<name>hbase.ipc.server.callqueue.read.ratio</name>
<value>0</value>
<description>Split the call queues into read and write queues.
A value of 0 indicate to not split the call queues.
A value of 0.5 means there will be the same number of read and write queues
The specified interval (which should be between 0.0 and 1.0)
will be multiplied by the number of call queues.
A value of 0 indicate to not split the call queues, meaning that both read and write
requests will be pushed to the same set of queues.
A value lower than 0.5 means that there will be less read queues than write queues.
A value of 0.5 means there will be the same number of read and write queues.
A value greater than 0.5 means that there will be more read queues than write queues.
A value of 1.0 means that all the queues except one are used to dispatch read requests.
Example: Given the total number of call queues being 10
a read.ratio of 0 means that: the 10 queues will contain both read/write requests.
a read.ratio of 0.3 means that: 3 queues will contain only read requests
and 7 queues will contain only write requests.
a read.ratio of 0.5 means that: 5 queues will contain only read requests
and 5 queues will contain only write requests.
a read.ratio of 0.8 means that: 8 queues will contain only read requests
and 2 queues will contain only write requests.
a read.ratio of 1 means that: 9 queues will contain only read requests
and 1 queues will contain only write requests.
</description>
</property>
<property>
<name>hbase.ipc.server.callqueue.scan.ratio</name>
<value>0</value>
<description>Given the number of read call queues, calculated from the total number
of call queues multiplied by the callqueue.read.ratio, the scan.ratio property
will split the read call queues into small-read and long-read queues.
A value lower than 0.5 means that there will be less long-read queues than short-read queues.
A value of 0.5 means that there will be the same number of short-read and long-read queues.
A value greater than 0.5 means that there will be more long-read queues than short-read queues
A value of 0 or 1 indicate to use the same set of queues for gets and scans.
Example: Given the total number of read call queues being 8
a scan.ratio of 0 or 1 means that: 8 queues will contain both long and short read requests.
a scan.ratio of 0.3 means that: 2 queues will contain only long-read requests
and 6 queues will contain only short-read requests.
a scan.ratio of 0.5 means that: 4 queues will contain only long-read requests
and 4 queues will contain only short-read requests.
a scan.ratio of 0.8 means that: 6 queues will contain only long-read requests
and 2 queues will contain only short-read requests.
</description>
</property>
<property>

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -40,6 +41,7 @@ import com.google.protobuf.Message;
/**
* RPC Executor that uses different queues for reads and writes.
* With the options to use different queues/executors for gets and scans.
* Each handler has its own queue and there is no stealing.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@ -51,20 +53,34 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final Random balancer = new Random();
private final int writeHandlersCount;
private final int readHandlersCount;
private final int scanHandlersCount;
private final int numWriteQueues;
private final int numReadQueues;
private final int numScanQueues;
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength) {
this(name, handlerCount, numQueues, readShare, maxQueueLength,
this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength) {
this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
readQueueClass, readQueueInitArgs);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final float scanShare, final int maxQueueLength,
final Class<? extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
LinkedBlockingQueue.class, new Object[] {maxQueueLength},
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
}
@ -73,23 +89,45 @@ public class RWQueueRpcExecutor extends RpcExecutor {
final int numWriteQueues, final int numReadQueues,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0,
writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
}
public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
int numWriteQueues, int numReadQueues, float scanShare,
final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
if ((numReadQueues - numScanQueues) > 0) {
numReadQueues -= numScanQueues;
readHandlers -= scanHandlers;
} else {
numScanQueues = 0;
scanHandlers = 0;
}
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
this.numWriteQueues = numWriteQueues;
this.numReadQueues = numReadQueues;
this.numScanQueues = numScanQueues;
queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
" readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
" readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
" scanHandlers=" + scanHandlersCount));
for (int i = 0; i < numWriteQueues; ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
}
for (int i = 0; i < numReadQueues; ++i) {
for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
queues.add((BlockingQueue<CallRunner>)
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
}
@ -99,6 +137,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
protected void startHandlers(final int port) {
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
startHandlers(".scan", scanHandlersCount, queues,
numWriteQueues + numReadQueues, numScanQueues, port);
}
@Override
@ -107,6 +147,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) {
queueIndex = balancer.nextInt(numWriteQueues);
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues);
} else {
queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
}
@ -126,6 +168,19 @@ public class RWQueueRpcExecutor extends RpcExecutor {
}
}
}
if (methodName.equalsIgnoreCase("mutate")) {
return true;
}
return false;
}
private boolean isScanRequest(final RequestHeader header, final Message param) {
String methodName = header.getMethodName();
if (methodName.equalsIgnoreCase("scan")) {
// The first scan request will be executed as a "short read"
ScanRequest request = (ScanRequest)param;
return request.hasScannerId();
}
return false;
}

View File

@ -38,7 +38,10 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
public class SimpleRpcScheduler extends RpcScheduler {
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.share";
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
"hbase.ipc.server.callqueue.read.ratio";
public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
"hbase.ipc.server.callqueue.scan.ratio";
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.callqueue.handler.factor";
@ -112,6 +115,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
@ -123,10 +127,11 @@ public class SimpleRpcScheduler extends RpcScheduler {
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority);
callqReadShare, callqScanShare, maxQueueLength,
BoundedPriorityBlockingQueue.class, callPriority);
} else {
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
callqReadShare, maxQueueLength);
callqReadShare, callqScanShare, maxQueueLength);
}
} else {
// multiple queues

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.Test;
@ -46,6 +47,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
@ -185,39 +187,9 @@ public class TestSimpleRpcScheduler {
when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
final ArrayList<Integer> work = new ArrayList<Integer>();
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
synchronized (work) {
work.add(10);
}
Threads.sleepWithoutInterrupt(100);
return null;
}
}).when(smallCallTask).run();
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
synchronized (work) {
work.add(50);
}
Threads.sleepWithoutInterrupt(100);
return null;
}
}).when(largeCallTask).run();
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
synchronized (work) {
work.add(100);
}
Threads.sleepWithoutInterrupt(100);
return null;
}
}).when(hugeCallTask).run();
doAnswerTaskExecution(smallCallTask, work, 10, 250);
doAnswerTaskExecution(largeCallTask, work, 50, 250);
doAnswerTaskExecution(hugeCallTask, work, 100, 250);
scheduler.dispatch(smallCallTask);
scheduler.dispatch(smallCallTask);
@ -253,4 +225,84 @@ public class TestSimpleRpcScheduler {
scheduler.stop();
}
}
@Test
public void testScanQueues() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(RequestHeader.class), any(Message.class)))
.thenReturn(HConstants.NORMAL_QOS);
RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
HConstants.QOS_THRESHOLD);
try {
scheduler.start();
CallRunner putCallTask = mock(CallRunner.class);
RpcServer.Call putCall = mock(RpcServer.Call.class);
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
when(putCallTask.getCall()).thenReturn(putCall);
when(putCall.getHeader()).thenReturn(putHead);
CallRunner getCallTask = mock(CallRunner.class);
RpcServer.Call getCall = mock(RpcServer.Call.class);
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
when(getCallTask.getCall()).thenReturn(getCall);
when(getCall.getHeader()).thenReturn(getHead);
CallRunner scanCallTask = mock(CallRunner.class);
RpcServer.Call scanCall = mock(RpcServer.Call.class);
scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
when(scanCallTask.getCall()).thenReturn(scanCall);
when(scanCall.getHeader()).thenReturn(scanHead);
ArrayList<Integer> work = new ArrayList<Integer>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
doAnswerTaskExecution(getCallTask, work, 2, 1000);
doAnswerTaskExecution(scanCallTask, work, 3, 1000);
// There are 3 queues: [puts], [gets], [scans]
// so the calls will be interleaved
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
while (work.size() < 6) {
Threads.sleepWithoutInterrupt(100);
}
for (int i = 0; i < work.size() - 2; i += 3) {
assertNotEquals(work.get(i + 0), work.get(i + 1));
assertNotEquals(work.get(i + 0), work.get(i + 2));
assertNotEquals(work.get(i + 1), work.get(i + 2));
}
} finally {
scheduler.stop();
}
}
private void doAnswerTaskExecution(final CallRunner callTask,
final ArrayList<Integer> results, final int value, final int sleepInterval) {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
synchronized (results) {
results.add(value);
}
Threads.sleepWithoutInterrupt(sleepInterval);
return null;
}
}).when(callTask).run();
}
}