HBASE-13351. Annotate internal MasterRpcServices methods with admin priority (Josh Elser)

This commit is contained in:
Devaraj Das 2015-04-28 23:38:28 -07:00
parent e2f7508afd
commit b27e9e70be
15 changed files with 184 additions and 21 deletions

View File

@ -912,7 +912,7 @@ public final class HConstants {
//High priority handlers to deal with admin requests and system table operation requests
public static final String REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT =
"hbase.regionserver.metahandler.count";
public static final int DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT = 10;
public static final int DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT = 20;
public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT =
"hbase.regionserver.replication.handler.count";
@ -987,12 +987,13 @@ public final class HConstants {
* by different set of handlers. For example, HIGH_QOS tagged methods are
* handled by high priority handlers.
*/
// normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS
public static final int NORMAL_QOS = 0;
public static final int QOS_THRESHOLD = 10;
public static final int HIGH_QOS = 200;
public static final int REPLICATION_QOS = 5; // normal_QOS < replication_QOS < high_QOS
public static final int REPLAY_QOS = 6; // REPLICATION_QOS < REPLAY_QOS < high_QOS
public static final int ADMIN_QOS = 100; // QOS_THRESHOLD < ADMIN_QOS < high_QOS
public static final int REPLICATION_QOS = 5;
public static final int REPLAY_QOS = 6;
public static final int ADMIN_QOS = 100;
public static final int SYSTEMTABLE_QOS = HIGH_QOS;
/** Directory under /hbase where archived hfiles are stored */

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import org.apache.hadoop.hbase.HConstants;
/**
* Annotation which decorates RPC methods to denote the relative priority among other RPCs in the
* same server. Provides a basic notion of quality of service (QOS).
*/
@Retention(RetentionPolicy.RUNTIME)
public @interface QosPriority {
int priority() default HConstants.NORMAL_QOS;
}

View File

@ -390,9 +390,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
getChoreService().scheduleChore(clusterStatusPublisherChore);
}
}
activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
int infoPort = putUpJettyServer();
startActiveMasterManager(infoPort);
// Some unit tests don't need a cluster, so no zookeeper at all
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
int infoPort = putUpJettyServer();
startActiveMasterManager(infoPort);
} else {
activeMasterManager = null;
}
}
// return the actual infoPort, -1 means disable info server.

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
@ -269,6 +270,7 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(RpcController controller,
GetLastFlushedSequenceIdRequest request) throws ServiceException {
try {
@ -282,6 +284,7 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public RegionServerReportResponse regionServerReport(
RpcController controller, RegionServerReportRequest request) throws ServiceException {
try {
@ -302,6 +305,7 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public RegionServerStartupResponse regionServerStartup(
RpcController controller, RegionServerStartupRequest request) throws ServiceException {
// Register with server manager
@ -327,6 +331,7 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public ReportRSFatalErrorResponse reportRSFatalError(
RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException {
String errorText = request.getErrorMessage();
@ -1283,6 +1288,7 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c,
ReportRegionStateTransitionRequest req) throws ServiceException {
try {

View File

@ -23,10 +23,11 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
@ -39,7 +40,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.regionserver.RSRpcServices.QosPriority;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message;
@ -100,9 +100,30 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
private final float scanVirtualTimeWeight;
/**
* Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
* {@code rpcServices#getClass()}
*
* @param rpcServices
* The RPC server implementation
*/
AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
Map<String, Integer> qosMap = new HashMap<String, Integer>();
for (Method m : RSRpcServices.class.getMethods()) {
this(rpcServices, rpcServices.getClass());
}
/**
* Constructs the priority function given the RPC server implementation and the annotations on the
* methods in the provided {@code clz}.
*
* @param rpcServices
* The RPC server implementation
* @param clz
* The concrete RPC server implementation's class
*/
AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
Class<? extends RSRpcServices> clz) {
Map<String,Integer> qosMap = new HashMap<String,Integer>();
for (Method m : clz.getMethods()) {
QosPriority p = m.getAnnotation(QosPriority.class);
if (p != null) {
// Since we protobuf'd, and then subsequently, when we went with pb style, method names

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@ -75,6 +73,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
@ -172,6 +171,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
@ -934,7 +934,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ProtobufUtil.getRegionEncodedName(regionSpecifier));
}
PriorityFunction getPriority() {
@VisibleForTesting
public PriorityFunction getPriority() {
return priority;
}
@ -987,11 +988,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return bssi;
}
@Retention(RetentionPolicy.RUNTIME)
protected @interface QosPriority {
int priority() default HConstants.NORMAL_QOS;
}
public InetSocketAddress getSocketAddress() {
return isa;
}

View File

@ -85,6 +85,7 @@ public class TestAdmin2 {
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.metahandler.count", 30);
TEST_UTIL.getConfiguration().setBoolean(
"hbase.master.enabletable.roundrobin", true);
TEST_UTIL.startMiniCluster(3);

View File

@ -69,6 +69,7 @@ public class TestCloneSnapshotFromClient {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
TEST_UTIL.getConfiguration().setBoolean(
"hbase.master.enabletable.roundrobin", true);
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 40);
TEST_UTIL.startMiniCluster(3);
}

View File

@ -168,6 +168,7 @@ public class TestDistributedLogSplitting {
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
conf.setInt("hbase.regionserver.wal.max.splitters", 3);
conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 40);
TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.setDFSCluster(dfsCluster);

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Sets;
/**
* Tests to verify correct priority on Master RPC methods.
*/
@Category({MasterTests.class, MediumTests.class})
public class TestMasterPriorityRpc {
private HMaster master = null;
private PriorityFunction priority = null;
private final Set<String> ADMIN_METHODS = Sets.newHashSet("GetLastFlushedSequenceId",
"RegionServerReport", "RegionServerStartup", "ReportRSFatalError",
"ReportRegionStateTransition");
private final Set<String> NORMAL_METHODS = Sets.newHashSet("CreateTable", "DeleteTable",
"ModifyColumn", "OfflineRegion", "Shutdown");
@Before
public void setup() {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
master = HMaster.constructMaster(HMaster.class, conf, cp);
priority = master.getMasterRpcServices().getPriority();
}
/**
* Asserts that the provided method has the given priority.
*
* @param methodName
* The name of the RPC method.
* @param expectedPriority
* The expected priority.
*/
private void assertPriority(String methodName, int expectedPriority) {
assertEquals(methodName + " had unexpected priority", expectedPriority,
priority.getPriority(RequestHeader.newBuilder().setMethodName(methodName).build(), null));
}
@Test
public void testNullMessage() {
assertPriority("doesnotexist", HConstants.NORMAL_QOS);
}
@Test
public void testAdminPriorityMethods() {
for (String methodName : ADMIN_METHODS) {
assertPriority(methodName, HConstants.ADMIN_QOS);
}
}
@Test
public void testSomeNormalMethods() {
for (String methodName : NORMAL_METHODS) {
assertPriority(methodName, HConstants.NORMAL_QOS);
}
}
}

View File

@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
@ -66,6 +67,7 @@ public class TestRollingRestart {
// Start the cluster
log("Starting cluster");
Configuration conf = HBaseConfiguration.create();
conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 40);
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();

View File

@ -45,7 +45,7 @@ public class TestQosFunction {
when(rpcServices.getConfiguration()).thenReturn(conf);
AnnotationReadingPriorityFunction qosFunction =
new AnnotationReadingPriorityFunction(rpcServices);
new AnnotationReadingPriorityFunction(rpcServices, RSRpcServices.class);
// Set method name in pb style with the method name capitalized.
checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction);

View File

@ -111,6 +111,7 @@ public class TestRegionReplicaFailover {
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 60);
HTU.startMiniCluster(NB_SERVERS);
htd = HTU.createTableDescriptor(

View File

@ -83,6 +83,7 @@ public class TestExportSnapshot {
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
conf.setBoolean("hbase.master.enabletable.roundrobin", true);
conf.setInt("mapreduce.map.maxattempts", 10);
conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 40);
}
@BeforeClass

View File

@ -159,7 +159,7 @@ public class TestHBaseFsck {
MasterSyncObserver.class.getName());
conf.setInt("hbase.regionserver.handler.count", 2);
conf.setInt("hbase.regionserver.metahandler.count", 2);
conf.setInt("hbase.regionserver.metahandler.count", 30);
conf.setInt("hbase.htable.threads.max", POOL_SIZE);
conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);