diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 7b718962633..aeb78a59356 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -187,13 +187,50 @@ possible configurations would overwhelm and obscure the important. A value of 0 means a single queue shared between all the handlers. A value of 1 means that each handler has its own queue. - - hbase.ipc.server.callqueue.read.share + + hbase.ipc.server.callqueue.read.ratio 0 Split the call queues into read and write queues. - A value of 0 indicate to not split the call queues. - A value of 0.5 means there will be the same number of read and write queues + The specified interval (which should be between 0.0 and 1.0) + will be multiplied by the number of call queues. + A value of 0 indicate to not split the call queues, meaning that both read and write + requests will be pushed to the same set of queues. + A value lower than 0.5 means that there will be less read queues than write queues. + A value of 0.5 means there will be the same number of read and write queues. + A value greater than 0.5 means that there will be more read queues than write queues. A value of 1.0 means that all the queues except one are used to dispatch read requests. + + Example: Given the total number of call queues being 10 + a read.ratio of 0 means that: the 10 queues will contain both read/write requests. + a read.ratio of 0.3 means that: 3 queues will contain only read requests + and 7 queues will contain only write requests. + a read.ratio of 0.5 means that: 5 queues will contain only read requests + and 5 queues will contain only write requests. + a read.ratio of 0.8 means that: 8 queues will contain only read requests + and 2 queues will contain only write requests. + a read.ratio of 1 means that: 9 queues will contain only read requests + and 1 queues will contain only write requests. + + + + hbase.ipc.server.callqueue.scan.ratio + 0 + Given the number of read call queues, calculated from the total number + of call queues multiplied by the callqueue.read.ratio, the scan.ratio property + will split the read call queues into small-read and long-read queues. + A value lower than 0.5 means that there will be less long-read queues than short-read queues. + A value of 0.5 means that there will be the same number of short-read and long-read queues. + A value greater than 0.5 means that there will be more long-read queues than short-read queues + A value of 0 or 1 indicate to use the same set of queues for gets and scans. + + Example: Given the total number of read call queues being 8 + a scan.ratio of 0 or 1 means that: 8 queues will contain both long and short read requests. + a scan.ratio of 0.3 means that: 2 queues will contain only long-read requests + and 6 queues will contain only short-read requests. + a scan.ratio of 0.5 means that: 4 queues will contain only long-read requests + and 4 queues will contain only short-read requests. + a scan.ratio of 0.8 means that: 6 queues will contain only long-read requests + and 2 queues will contain only short-read requests. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 247b7da1880..602c53e5b51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -40,6 +41,7 @@ import com.google.protobuf.Message; /** * RPC Executor that uses different queues for reads and writes. + * With the options to use different queues/executors for gets and scans. * Each handler has its own queue and there is no stealing. */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @@ -51,20 +53,34 @@ public class RWQueueRpcExecutor extends RpcExecutor { private final Random balancer = new Random(); private final int writeHandlersCount; private final int readHandlersCount; + private final int scanHandlersCount; private final int numWriteQueues; private final int numReadQueues; + private final int numScanQueues; public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final float readShare, final int maxQueueLength) { - this(name, handlerCount, numQueues, readShare, maxQueueLength, + this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class); + } + + public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final float scanShare, final int maxQueueLength) { + this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, LinkedBlockingQueue.class); } public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final float readShare, final int maxQueueLength, final Class readQueueClass, Object... readQueueInitArgs) { + this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, + readQueueClass, readQueueInitArgs); + } + + public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final float scanShare, final int maxQueueLength, + final Class readQueueClass, Object... readQueueInitArgs) { this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare), - calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), + calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare, LinkedBlockingQueue.class, new Object[] {maxQueueLength}, readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs)); } @@ -73,23 +89,45 @@ public class RWQueueRpcExecutor extends RpcExecutor { final int numWriteQueues, final int numReadQueues, final Class writeQueueClass, Object[] writeQueueInitArgs, final Class readQueueClass, Object[] readQueueInitArgs) { - super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues)); + this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0, + writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs); + } + + public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers, + int numWriteQueues, int numReadQueues, float scanShare, + final Class writeQueueClass, Object[] writeQueueInitArgs, + final Class readQueueClass, Object[] readQueueInitArgs) { + super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues)); + + int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare)); + int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare)); + if ((numReadQueues - numScanQueues) > 0) { + numReadQueues -= numScanQueues; + readHandlers -= scanHandlers; + } else { + numScanQueues = 0; + scanHandlers = 0; + } this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues); this.readHandlersCount = Math.max(readHandlers, numReadQueues); + this.scanHandlersCount = Math.max(scanHandlers, numScanQueues); this.numWriteQueues = numWriteQueues; this.numReadQueues = numReadQueues; + this.numScanQueues = numScanQueues; queues = new ArrayList>(writeHandlersCount + readHandlersCount); LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount + - " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount); + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + + ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues + + " scanHandlers=" + scanHandlersCount)); for (int i = 0; i < numWriteQueues; ++i) { queues.add((BlockingQueue) ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs)); } - for (int i = 0; i < numReadQueues; ++i) { + for (int i = 0; i < (numReadQueues + numScanQueues); ++i) { queues.add((BlockingQueue) ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs)); } @@ -99,6 +137,8 @@ public class RWQueueRpcExecutor extends RpcExecutor { protected void startHandlers(final int port) { startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port); startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port); + startHandlers(".scan", scanHandlersCount, queues, + numWriteQueues + numReadQueues, numScanQueues, port); } @Override @@ -107,6 +147,8 @@ public class RWQueueRpcExecutor extends RpcExecutor { int queueIndex; if (isWriteRequest(call.getHeader(), call.param)) { queueIndex = balancer.nextInt(numWriteQueues); + } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) { + queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues); } else { queueIndex = numWriteQueues + balancer.nextInt(numReadQueues); } @@ -126,6 +168,19 @@ public class RWQueueRpcExecutor extends RpcExecutor { } } } + if (methodName.equalsIgnoreCase("mutate")) { + return true; + } + return false; + } + + private boolean isScanRequest(final RequestHeader header, final Message param) { + String methodName = header.getMethodName(); + if (methodName.equalsIgnoreCase("scan")) { + // The first scan request will be executed as a "short read" + ScanRequest request = (ScanRequest)param; + return request.hasScannerId(); + } return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 0458c002422..2debe2ec8d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -38,7 +38,10 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; public class SimpleRpcScheduler extends RpcScheduler { public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class); - public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.share"; + public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = + "hbase.ipc.server.callqueue.read.ratio"; + public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY = + "hbase.ipc.server.callqueue.scan.ratio"; public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; @@ -112,6 +115,7 @@ public class SimpleRpcScheduler extends RpcScheduler { String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); + float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor)); @@ -123,10 +127,11 @@ public class SimpleRpcScheduler extends RpcScheduler { if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, - callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority); + callqReadShare, callqScanShare, maxQueueLength, + BoundedPriorityBlockingQueue.class, callPriority); } else { callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, - callqReadShare, maxQueueLength); + callqReadShare, callqScanShare, maxQueueLength); } } else { // multiple queues diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index d4a79b642ef..9d51915eb24 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.util.Threads; import org.junit.Before; import org.junit.Test; @@ -46,6 +47,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; @@ -185,39 +187,9 @@ public class TestSimpleRpcScheduler { when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L); final ArrayList work = new ArrayList(); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - synchronized (work) { - work.add(10); - } - Threads.sleepWithoutInterrupt(100); - return null; - } - }).when(smallCallTask).run(); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - synchronized (work) { - work.add(50); - } - Threads.sleepWithoutInterrupt(100); - return null; - } - }).when(largeCallTask).run(); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - synchronized (work) { - work.add(100); - } - Threads.sleepWithoutInterrupt(100); - return null; - } - }).when(hugeCallTask).run(); + doAnswerTaskExecution(smallCallTask, work, 10, 250); + doAnswerTaskExecution(largeCallTask, work, 50, 250); + doAnswerTaskExecution(hugeCallTask, work, 100, 250); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); @@ -253,4 +225,84 @@ public class TestSimpleRpcScheduler { scheduler.stop(); } } + + @Test + public void testScanQueues() throws Exception { + Configuration schedConf = HBaseConfiguration.create(); + schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); + schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); + schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(RequestHeader.class), any(Message.class))) + .thenReturn(HConstants.NORMAL_QOS); + + RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, + HConstants.QOS_THRESHOLD); + try { + scheduler.start(); + + CallRunner putCallTask = mock(CallRunner.class); + RpcServer.Call putCall = mock(RpcServer.Call.class); + RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); + when(putCallTask.getCall()).thenReturn(putCall); + when(putCall.getHeader()).thenReturn(putHead); + + CallRunner getCallTask = mock(CallRunner.class); + RpcServer.Call getCall = mock(RpcServer.Call.class); + RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); + when(getCallTask.getCall()).thenReturn(getCall); + when(getCall.getHeader()).thenReturn(getHead); + + CallRunner scanCallTask = mock(CallRunner.class); + RpcServer.Call scanCall = mock(RpcServer.Call.class); + scanCall.param = ScanRequest.newBuilder().setScannerId(1).build(); + RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); + when(scanCallTask.getCall()).thenReturn(scanCall); + when(scanCall.getHeader()).thenReturn(scanHead); + + ArrayList work = new ArrayList(); + doAnswerTaskExecution(putCallTask, work, 1, 1000); + doAnswerTaskExecution(getCallTask, work, 2, 1000); + doAnswerTaskExecution(scanCallTask, work, 3, 1000); + + // There are 3 queues: [puts], [gets], [scans] + // so the calls will be interleaved + scheduler.dispatch(putCallTask); + scheduler.dispatch(putCallTask); + scheduler.dispatch(putCallTask); + scheduler.dispatch(getCallTask); + scheduler.dispatch(getCallTask); + scheduler.dispatch(getCallTask); + scheduler.dispatch(scanCallTask); + scheduler.dispatch(scanCallTask); + scheduler.dispatch(scanCallTask); + + while (work.size() < 6) { + Threads.sleepWithoutInterrupt(100); + } + + for (int i = 0; i < work.size() - 2; i += 3) { + assertNotEquals(work.get(i + 0), work.get(i + 1)); + assertNotEquals(work.get(i + 0), work.get(i + 2)); + assertNotEquals(work.get(i + 1), work.get(i + 2)); + } + } finally { + scheduler.stop(); + } + } + + private void doAnswerTaskExecution(final CallRunner callTask, + final ArrayList results, final int value, final int sleepInterval) { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + synchronized (results) { + results.add(value); + } + Threads.sleepWithoutInterrupt(sleepInterval); + return null; + } + }).when(callTask).run(); + } }