HBASE-12071 Separate out thread pool for Master <-> RegionServer communication (Stephen Yuan Jiang)

This commit is contained in:
Nick Dimiduk 2015-01-08 14:17:00 -08:00
parent 4ea490b63a
commit dde713ee4e
11 changed files with 54 additions and 38 deletions

View File

@ -712,6 +712,13 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
return tableName.equals(HRegionInfo.FIRST_META_REGIONINFO.getTable());
}
/**
* @return true if this region is from a system table
*/
public boolean isSystemTable() {
return tableName.isSystemTable();
}
/**
* @return True if has been split and has daughters.
*/

View File

@ -87,7 +87,8 @@ public class PayloadCarryingRpcController
* @param tn Set priority based off the table we are going against.
*/
public void setPriority(final TableName tn) {
this.priority = tn != null && tn.isSystemTable()? HConstants.HIGH_QOS: HConstants.NORMAL_QOS;
this.priority =
(tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS;
}
/**

View File

@ -878,9 +878,10 @@ public final class HConstants {
"hbase.regionserver.handler.abort.on.error.percent";
public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = 0.5;
public static final String REGION_SERVER_META_HANDLER_COUNT =
//High priority handlers to deal with admin requests and system table operation requests
public static final String REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT =
"hbase.regionserver.metahandler.count";
public static final int DEFAULT_REGION_SERVER_META_HANDLER_COUNT = 10;
public static final int DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT = 10;
public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT =
"hbase.regionserver.replication.handler.count";
@ -941,9 +942,11 @@ public final class HConstants {
*/
public static final int NORMAL_QOS = 0;
public static final int QOS_THRESHOLD = 10;
public static final int HIGH_QOS = 100;
public static final int HIGH_QOS = 200;
public static final int REPLICATION_QOS = 5; // normal_QOS < replication_QOS < high_QOS
public static final int REPLAY_QOS = 6; // REPLICATION_QOS < REPLAY_QOS < high_QOS
public static final int ADMIN_QOS = 100; // QOS_THRESHOLD < ADMIN_QOS < high_QOS
public static final int SYSTEMTABLE_QOS = HIGH_QOS;
/** Directory under /hbase where archived hfiles are stored */
public static final String HFILE_ARCHIVE_DIRECTORY = "archive";

View File

@ -3668,7 +3668,7 @@ public final class RPCProtos {
* <code>optional uint32 priority = 6;</code>
*
* <pre>
* 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
* 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
* See HConstants.
* </pre>
*/
@ -3677,7 +3677,7 @@ public final class RPCProtos {
* <code>optional uint32 priority = 6;</code>
*
* <pre>
* 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
* 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
* See HConstants.
* </pre>
*/
@ -3978,7 +3978,7 @@ public final class RPCProtos {
* <code>optional uint32 priority = 6;</code>
*
* <pre>
* 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
* 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
* See HConstants.
* </pre>
*/
@ -3989,7 +3989,7 @@ public final class RPCProtos {
* <code>optional uint32 priority = 6;</code>
*
* <pre>
* 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
* 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
* See HConstants.
* </pre>
*/
@ -4864,7 +4864,7 @@ public final class RPCProtos {
* <code>optional uint32 priority = 6;</code>
*
* <pre>
* 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
* 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
* See HConstants.
* </pre>
*/
@ -4875,7 +4875,7 @@ public final class RPCProtos {
* <code>optional uint32 priority = 6;</code>
*
* <pre>
* 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
* 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
* See HConstants.
* </pre>
*/
@ -4886,7 +4886,7 @@ public final class RPCProtos {
* <code>optional uint32 priority = 6;</code>
*
* <pre>
* 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
* 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
* See HConstants.
* </pre>
*/
@ -4900,7 +4900,7 @@ public final class RPCProtos {
* <code>optional uint32 priority = 6;</code>
*
* <pre>
* 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
* 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
* See HConstants.
* </pre>
*/

View File

@ -119,7 +119,7 @@ message RequestHeader {
optional bool request_param = 4;
// If present, then an encoded data block follows.
optional CellBlockMeta cell_block_meta = 5;
// 0 is NORMAL priority. 100 is HIGH. If no priority, treat it as NORMAL.
// 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL.
// See HConstants.
optional uint32 priority = 6;
}

View File

@ -150,9 +150,10 @@ public class SimpleRpcScheduler extends RpcScheduler {
}
}
this.priorityExecutor =
priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
1, maxQueueLength, conf, abortable) : null;
// Create 2 queues to help priorityExecutor be more scalable.
this.priorityExecutor = priorityHandlerCount > 0 ?
new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null;
this.replicationExecutor =
replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;

View File

@ -177,11 +177,11 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
HRegion region = rpcServices.getRegion(regionSpecifier);
if (region.getRegionInfo().isMetaTable()) {
if (region.getRegionInfo().isSystemTable()) {
if (LOG.isTraceEnabled()) {
LOG.trace("High priority because region=" + region.getRegionNameAsString());
}
return HConstants.HIGH_QOS;
return HConstants.SYSTEMTABLE_QOS;
}
}
} catch (Exception ex) {
@ -197,12 +197,12 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
return HConstants.NORMAL_QOS;
}
RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
if (scanner != null && scanner.getRegionInfo().isMetaRegion()) {
if (scanner != null && scanner.getRegionInfo().isSystemTable()) {
if (LOG.isTraceEnabled()) {
// Scanner requests are small in size so TextFormat version should not overwhelm log.
LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
}
return HConstants.HIGH_QOS;
return HConstants.SYSTEMTABLE_QOS;
}
}
return HConstants.NORMAL_QOS;

View File

@ -911,7 +911,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@Retention(RetentionPolicy.RUNTIME)
protected @interface QosPriority {
int priority() default 0;
int priority() default HConstants.NORMAL_QOS;
}
public InetSocketAddress getSocketAddress() {
@ -963,7 +963,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
@QosPriority(priority=HConstants.ADMIN_QOS)
public CloseRegionResponse closeRegion(final RpcController controller,
final CloseRegionRequest request) throws ServiceException {
final ServerName sn = (request.hasDestinationServer() ?
@ -1006,7 +1006,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
@QosPriority(priority=HConstants.ADMIN_QOS)
public CompactRegionResponse compactRegion(final RpcController controller,
final CompactRegionRequest request) throws ServiceException {
try {
@ -1064,7 +1064,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
@QosPriority(priority=HConstants.ADMIN_QOS)
public FlushRegionResponse flushRegion(final RpcController controller,
final FlushRegionRequest request) throws ServiceException {
try {
@ -1106,7 +1106,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
@QosPriority(priority=HConstants.ADMIN_QOS)
public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
final GetOnlineRegionRequest request) throws ServiceException {
try {
@ -1125,7 +1125,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
@QosPriority(priority=HConstants.ADMIN_QOS)
public GetRegionInfoResponse getRegionInfo(final RpcController controller,
final GetRegionInfoRequest request) throws ServiceException {
try {
@ -1153,6 +1153,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public GetServerInfoResponse getServerInfo(final RpcController controller,
final GetServerInfoRequest request) throws ServiceException {
try {
@ -1166,6 +1167,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public GetStoreFileResponse getStoreFile(final RpcController controller,
final GetStoreFileRequest request) throws ServiceException {
try {
@ -1201,7 +1203,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws ServiceException
*/
@Override
@QosPriority(priority = HConstants.HIGH_QOS)
@QosPriority(priority = HConstants.ADMIN_QOS)
public MergeRegionsResponse mergeRegions(final RpcController controller,
final MergeRegionsRequest request) throws ServiceException {
try {
@ -1261,7 +1263,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
@QosPriority(priority=HConstants.ADMIN_QOS)
public OpenRegionResponse openRegion(final RpcController controller,
final OpenRegionRequest request) throws ServiceException {
requestCount.increment();
@ -1552,7 +1554,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
@QosPriority(priority=HConstants.ADMIN_QOS)
public SplitRegionResponse splitRegion(final RpcController controller,
final SplitRegionRequest request) throws ServiceException {
try {
@ -1591,6 +1593,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public StopServerResponse stopServer(final RpcController controller,
final StopServerRequest request) throws ServiceException {
requestCount.increment();

View File

@ -42,11 +42,12 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new SimpleRpcScheduler(
conf,
handlerCount,
conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
priority,

View File

@ -92,10 +92,10 @@ public class TestPriorityRpc {
HRegionInfo mockRegionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(mockRpc.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.isMetaTable()).thenReturn(true);
Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(true);
// Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
assertEquals(HConstants.HIGH_QOS, priority.getPriority(header, getRequest));
assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest));
}
@Test
@ -127,7 +127,7 @@ public class TestPriorityRpc {
HRegionInfo mockRegionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(mockRpc.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false);
Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false);
// Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
int qos = priority.getPriority(header, scanRequest);
@ -143,15 +143,15 @@ public class TestPriorityRpc {
Mockito.when(mockRegionScanner.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRpc.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(true);
Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(true);
// Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
assertEquals(HConstants.HIGH_QOS, priority.getPriority(header, scanRequest));
assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest));
//the same as above but with non-meta region
Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false);
Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false);
assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest));
}
}

View File

@ -50,7 +50,7 @@ public class TestQosFunction {
// Set method name in pb style with the method name capitalized.
checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction);
// Set method name in pb style with the method name capitalized.
checkMethod("OpenRegion", HConstants.HIGH_QOS, qosFunction);
checkMethod("OpenRegion", HConstants.ADMIN_QOS, qosFunction);
// Check multi works.
checkMethod("Multi", HConstants.NORMAL_QOS, qosFunction, MultiRequest.getDefaultInstance());
}