HBASE-14322 Add a master priority function to let master use it's threads
This commit is contained in:
parent
cf4c0fb71c
commit
0d06d8ddd0
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.AnnotationReadingPriorityFunction;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
||||
/**
|
||||
* Priority function specifically for the master.
|
||||
*
|
||||
* This doesn't make the super users always priority since that would make everything
|
||||
* to the master into high priority.
|
||||
*
|
||||
* Specifically when reporting that a region is in transition master will try and edit the meta
|
||||
* table. That edit will block the thread until successful. However if at the same time meta is
|
||||
* also moving then we need to ensure that the regular region that's moving isn't blocking
|
||||
* processing of the request to online meta. To accomplish this this priority function makes sure
|
||||
* that all requests to transition meta are handled in different threads from other report region
|
||||
* in transition calls.
|
||||
*/
|
||||
public class MasterAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction {
|
||||
public MasterAnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
|
||||
this(rpcServices, rpcServices.getClass());
|
||||
}
|
||||
|
||||
|
||||
public MasterAnnotationReadingPriorityFunction(RSRpcServices rpcServices,
|
||||
Class<? extends RSRpcServices> clz) {
|
||||
super(rpcServices, clz);
|
||||
}
|
||||
|
||||
public int getPriority(RPCProtos.RequestHeader header, Message param, User user) {
|
||||
// Yes this is copy pasted from the base class but it keeps from having to look in the
|
||||
// annotatedQos table twice something that could get costly since this is called for
|
||||
// every single RPC request.
|
||||
int priorityByAnnotation = getAnnotatedPriority(header);
|
||||
if (priorityByAnnotation >= 0) {
|
||||
return priorityByAnnotation;
|
||||
}
|
||||
|
||||
// If meta is moving then all the other of reports of state transitions will be
|
||||
// un able to edit meta. Those blocked reports should not keep the report that opens meta from
|
||||
// running. Hence all reports of meta transitioning should always be in a different thread.
|
||||
// This keeps from deadlocking the cluster.
|
||||
if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) {
|
||||
// Regions are moving. Lets see which ones.
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest
|
||||
tRequest = (RegionServerStatusProtos.ReportRegionStateTransitionRequest) param;
|
||||
for (RegionServerStatusProtos.RegionStateTransition rst : tRequest.getTransitionList()) {
|
||||
if (rst.getRegionInfoList() != null) {
|
||||
for (HBaseProtos.RegionInfo info : rst.getRegionInfoList()) {
|
||||
TableName tn = ProtobufUtil.toTableName(info.getTableName());
|
||||
if (tn.isSystemTable()) {
|
||||
return HConstants.SYSTEMTABLE_QOS;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return HConstants.NORMAL_QOS;
|
||||
}
|
||||
|
||||
// Handle the rest of the different reasons to change priority.
|
||||
return getBasePriority(header, param);
|
||||
}
|
||||
}
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.TableState;
|
|||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
|
||||
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
||||
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
||||
import org.apache.hadoop.hbase.ipc.QosPriority;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
|
@ -226,6 +227,11 @@ public class MasterRpcServices extends RSRpcServices
|
|||
master = m;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PriorityFunction createPriority() {
|
||||
return new MasterAnnotationReadingPriorityFunction(this);
|
||||
}
|
||||
|
||||
enum BalanceSwitchMode {
|
||||
SYNC,
|
||||
ASYNC
|
||||
|
@ -310,7 +316,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
if (sl != null && master.metricsMaster != null) {
|
||||
// Up our metrics.
|
||||
master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
|
||||
- (oldLoad != null ? oldLoad.getTotalNumberOfRequests() : 0));
|
||||
- (oldLoad != null ? oldLoad.getTotalNumberOfRequests() : 0));
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
|
@ -360,10 +366,10 @@ public class MasterRpcServices extends RSRpcServices
|
|||
AddColumnRequest req) throws ServiceException {
|
||||
try {
|
||||
master.addColumn(
|
||||
ProtobufUtil.toTableName(req.getTableName()),
|
||||
HColumnDescriptor.convert(req.getColumnFamilies()),
|
||||
req.getNonceGroup(),
|
||||
req.getNonce());
|
||||
ProtobufUtil.toTableName(req.getTableName()),
|
||||
HColumnDescriptor.convert(req.getColumnFamilies()),
|
||||
req.getNonceGroup(),
|
||||
req.getNonce());
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
}
|
||||
|
@ -497,7 +503,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
DeleteTableRequest request) throws ServiceException {
|
||||
try {
|
||||
long procId = master.deleteTable(ProtobufUtil.toTableName(
|
||||
request.getTableName()), request.getNonceGroup(), request.getNonce());
|
||||
request.getTableName()), request.getNonceGroup(), request.getNonce());
|
||||
return DeleteTableResponse.newBuilder().setProcId(procId).build();
|
||||
} catch (IOException ioe) {
|
||||
throw new ServiceException(ioe);
|
||||
|
@ -786,7 +792,7 @@ public class MasterRpcServices extends RSRpcServices
|
|||
try {
|
||||
return GetNamespaceDescriptorResponse.newBuilder()
|
||||
.setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(
|
||||
master.getNamespaceDescriptor(request.getNamespaceName())))
|
||||
master.getNamespaceDescriptor(request.getNamespaceName())))
|
||||
.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
|
|
|
@ -25,14 +25,9 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
||||
import org.apache.hadoop.hbase.ipc.QosPriority;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
|
||||
|
@ -74,14 +69,14 @@ import org.apache.hadoop.hbase.security.User;
|
|||
//RegionSpecifier object. Methods can be invoked on the returned object
|
||||
//to figure out whether it is a meta region or not.
|
||||
@InterfaceAudience.Private
|
||||
class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||
public class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
|
||||
|
||||
/** Used to control the scan delay, currently sqrt(numNextCall * weight) */
|
||||
public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
|
||||
|
||||
private final Map<String, Integer> annotatedQos;
|
||||
protected final Map<String, Integer> annotatedQos;
|
||||
//We need to mock the regionserver instance for some unit tests (set via
|
||||
//setRegionServer method.
|
||||
private RSRpcServices rpcServices;
|
||||
|
@ -113,7 +108,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
|||
* @param rpcServices
|
||||
* The RPC server implementation
|
||||
*/
|
||||
AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
|
||||
public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
|
||||
this(rpcServices, rpcServices.getClass());
|
||||
}
|
||||
|
||||
|
@ -126,7 +121,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
|||
* @param clz
|
||||
* The concrete RPC server implementation's class
|
||||
*/
|
||||
AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
|
||||
public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
|
||||
Class<? extends RSRpcServices> clz) {
|
||||
Map<String,Integer> qosMap = new HashMap<String,Integer>();
|
||||
for (Method m : clz.getMethods()) {
|
||||
|
@ -177,9 +172,9 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
|||
*/
|
||||
@Override
|
||||
public int getPriority(RequestHeader header, Message param, User user) {
|
||||
String methodName = header.getMethodName();
|
||||
Integer priorityByAnnotation = annotatedQos.get(methodName);
|
||||
if (priorityByAnnotation != null) {
|
||||
int priorityByAnnotation = getAnnotatedPriority(header);
|
||||
|
||||
if (priorityByAnnotation >= 0) {
|
||||
return priorityByAnnotation;
|
||||
}
|
||||
|
||||
|
@ -195,6 +190,30 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
|||
return HConstants.NORMAL_QOS;
|
||||
}
|
||||
|
||||
return getBasePriority(header, param);
|
||||
}
|
||||
|
||||
/**
|
||||
* See if the method has an annotation.
|
||||
* @param header
|
||||
* @return Return the priority from the annotation. If there isn't
|
||||
* an annotation, this returns something below zero.
|
||||
*/
|
||||
protected int getAnnotatedPriority(RequestHeader header) {
|
||||
String methodName = header.getMethodName();
|
||||
Integer priorityByAnnotation = annotatedQos.get(methodName);
|
||||
if (priorityByAnnotation != null) {
|
||||
return priorityByAnnotation;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the priority for a given request from the header and the param
|
||||
* This doesn't consider which user is sending the request at all.
|
||||
* This doesn't consider annotations
|
||||
*/
|
||||
protected int getBasePriority(RequestHeader header, Message param) {
|
||||
if (param == null) {
|
||||
return HConstants.NORMAL_QOS;
|
||||
}
|
||||
|
@ -203,6 +222,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
|||
// only this one has been converted so far. No priority == NORMAL_QOS.
|
||||
return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
|
||||
}
|
||||
|
||||
String cls = param.getClass().getName();
|
||||
Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
|
||||
RegionSpecifier regionSpecifier = null;
|
||||
|
@ -247,21 +267,6 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
|||
}
|
||||
}
|
||||
|
||||
// If meta is moving then all the rest of report the report state transitions will be
|
||||
// blocked. We shouldn't be in the same queue.
|
||||
if (param instanceof ReportRegionStateTransitionRequest) { // Regions are moving
|
||||
ReportRegionStateTransitionRequest tRequest = (ReportRegionStateTransitionRequest) param;
|
||||
for (RegionStateTransition transition : tRequest.getTransitionList()) {
|
||||
if (transition.getRegionInfoList() != null) {
|
||||
for (HBaseProtos.RegionInfo info : transition.getRegionInfoList()) {
|
||||
TableName tn = ProtobufUtil.toTableName(info.getTableName());
|
||||
if (tn.isSystemTable()) {
|
||||
return HConstants.SYSTEMTABLE_QOS;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return HConstants.NORMAL_QOS;
|
||||
}
|
||||
|
||||
|
|
|
@ -776,7 +776,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
|
||||
OperationStatus codes[] = region.batchMutate(mArray, HConstants.NO_NONCE,
|
||||
OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
|
||||
HConstants.NO_NONCE);
|
||||
for (i = 0; i < codes.length; i++) {
|
||||
int index = mutations.get(i).getIndex();
|
||||
|
@ -943,7 +943,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (initialIsa.getAddress() == null) {
|
||||
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
|
||||
}
|
||||
priority = new AnnotationReadingPriorityFunction(this);
|
||||
priority = createPriority();
|
||||
String name = rs.getProcessName() + "/" + initialIsa.toString();
|
||||
// Set how many times to retry talking to another server over HConnection.
|
||||
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
|
||||
|
@ -978,6 +978,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
rs.setName(name);
|
||||
}
|
||||
|
||||
protected PriorityFunction createPriority() {
|
||||
return new AnnotationReadingPriorityFunction(this);
|
||||
}
|
||||
|
||||
public static String getHostname(Configuration conf, boolean isMaster)
|
||||
throws UnknownHostException {
|
||||
String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
|
||||
|
@ -1051,7 +1055,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
return priority;
|
||||
}
|
||||
|
||||
Configuration getConfiguration() {
|
||||
@VisibleForTesting
|
||||
public Configuration getConfiguration() {
|
||||
return regionServer.getConfiguration();
|
||||
}
|
||||
|
||||
|
@ -1705,7 +1710,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
long nonceGroup = entry.getKey().hasNonceGroup()
|
||||
? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
|
||||
long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
|
||||
regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
|
||||
regionServer.nonceManager.reportOperationFromWal(
|
||||
nonceGroup,
|
||||
nonce,
|
||||
entry.getKey().getWriteTime());
|
||||
}
|
||||
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
|
||||
new Pair<WALKey, WALEdit>();
|
||||
|
@ -2313,7 +2321,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
regionServer.leases.cancelLease(scannerName);
|
||||
} catch (LeaseException le) {
|
||||
// No problem, ignore
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
|
@ -2653,6 +2664,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
regionServer.leases.cancelLease(scannerName);
|
||||
} catch (LeaseException le) {
|
||||
// No problem, ignore
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
|
||||
}
|
||||
}
|
||||
if (region != null && region.getCoprocessorHost() != null) {
|
||||
region.getCoprocessorHost().postScannerClose(scanner);
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.AnnotationReadingPriorityFunction;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class QosTestHelper {
|
||||
protected void checkMethod(Configuration conf, final String methodName, final int expected,
|
||||
final AnnotationReadingPriorityFunction qosf) {
|
||||
checkMethod(conf, methodName, expected, qosf, null);
|
||||
}
|
||||
|
||||
protected void checkMethod(Configuration conf, final String methodName, final int expected,
|
||||
final AnnotationReadingPriorityFunction qosf, final Message param) {
|
||||
RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder();
|
||||
builder.setMethodName(methodName);
|
||||
assertEquals(methodName, expected, qosf.getPriority(builder.build(), param,
|
||||
User.createUserForTesting(conf, "someuser", new String[]{"somegroup"})));
|
||||
}
|
||||
}
|
|
@ -1,97 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* Tests to verify correct priority on Master RPC methods.
|
||||
*/
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
public class TestMasterPriorityRpc {
|
||||
private HMaster master = null;
|
||||
private PriorityFunction priority = null;
|
||||
private User user = null;
|
||||
|
||||
private final Set<String> ADMIN_METHODS = Sets.newHashSet("GetLastFlushedSequenceId");
|
||||
|
||||
private final Set<String> NORMAL_METHODS = Sets.newHashSet("CreateTable", "DeleteTable",
|
||||
"ModifyColumn", "OfflineRegion", "Shutdown",
|
||||
"RegionServerReport", "RegionServerStartup", "ReportRSFatalError",
|
||||
"ReportRegionStateTransition");
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK
|
||||
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
|
||||
master = HMaster.constructMaster(HMaster.class, conf, cp);
|
||||
priority = master.getMasterRpcServices().getPriority();
|
||||
user = User.createUserForTesting(conf, "someuser", new String[]{"somegroup"});
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that the provided method has the given priority.
|
||||
*
|
||||
* @param methodName
|
||||
* The name of the RPC method.
|
||||
* @param expectedPriority
|
||||
* The expected priority.
|
||||
*/
|
||||
private void assertPriority(String methodName, int expectedPriority) {
|
||||
assertEquals(methodName + " had unexpected priority", expectedPriority, priority.getPriority(
|
||||
RequestHeader.newBuilder().setMethodName(methodName).build(), null, user));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullMessage() {
|
||||
assertPriority("doesnotexist", HConstants.NORMAL_QOS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAdminPriorityMethods() {
|
||||
for (String methodName : ADMIN_METHODS) {
|
||||
assertPriority(methodName, HConstants.ADMIN_QOS);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSomeNormalMethods() {
|
||||
for (String methodName : NORMAL_METHODS) {
|
||||
assertPriority(methodName, HConstants.NORMAL_QOS);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.AnnotationReadingPriorityFunction;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestMasterQosFunction extends QosTestHelper {
|
||||
private Configuration conf;
|
||||
private RSRpcServices rpcServices;
|
||||
private AnnotationReadingPriorityFunction qosFunction;
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
conf = HBaseConfiguration.create();
|
||||
rpcServices = Mockito.mock(MasterRpcServices.class);
|
||||
when(rpcServices.getConfiguration()).thenReturn(conf);
|
||||
qosFunction = new MasterAnnotationReadingPriorityFunction(rpcServices, MasterRpcServices.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionInTransition() throws IOException {
|
||||
// Check ReportRegionInTransition
|
||||
HBaseProtos.RegionInfo meta_ri = HRegionInfo.convert(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
HBaseProtos.RegionInfo normal_ri = HRegionInfo.convert(
|
||||
new HRegionInfo(TableName.valueOf("test:table"),
|
||||
Bytes.toBytes("a"), Bytes.toBytes("b"), false));
|
||||
|
||||
|
||||
RegionServerStatusProtos.RegionStateTransition metaTransition = RegionServerStatusProtos
|
||||
.RegionStateTransition.newBuilder()
|
||||
.addRegionInfo(meta_ri)
|
||||
.setTransitionCode(RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED)
|
||||
.build();
|
||||
|
||||
RegionServerStatusProtos.RegionStateTransition normalTransition = RegionServerStatusProtos
|
||||
.RegionStateTransition.newBuilder()
|
||||
.addRegionInfo(normal_ri)
|
||||
.setTransitionCode(RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED)
|
||||
.build();
|
||||
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest metaTransitionRequest =
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder()
|
||||
.setServer(ProtobufUtil.toServerName(ServerName.valueOf("locahost:60020", 100)))
|
||||
.addTransition(normalTransition)
|
||||
.addTransition(metaTransition).build();
|
||||
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest normalTransitionRequest =
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder()
|
||||
.setServer(ProtobufUtil.toServerName(ServerName.valueOf("locahost:60020", 100)))
|
||||
.addTransition(normalTransition).build();
|
||||
|
||||
final String reportFuncName = "ReportRegionStateTransition";
|
||||
checkMethod(conf, reportFuncName, HConstants.SYSTEMTABLE_QOS, qosFunction,
|
||||
metaTransitionRequest);
|
||||
checkMethod(conf, reportFuncName, HConstants.NORMAL_QOS, qosFunction, normalTransitionRequest);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAnnotations() {
|
||||
checkMethod(conf, "GetLastFlushedSequenceId", HConstants.ADMIN_QOS, qosFunction);
|
||||
checkMethod(conf, "CompactRegion", HConstants.ADMIN_QOS, qosFunction);
|
||||
checkMethod(conf, "GetLastFlushedSequenceId", HConstants.ADMIN_QOS, qosFunction);
|
||||
checkMethod(conf, "GetRegionInfo", HConstants.ADMIN_QOS, qosFunction);
|
||||
}
|
||||
}
|
|
@ -18,48 +18,39 @@
|
|||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.security.Superusers;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Basic test that qos function is sort of working; i.e. a change in method naming style
|
||||
* over in pb doesn't break it.
|
||||
*/
|
||||
@Category({RegionServerTests.class, SmallTests.class})
|
||||
public class TestQosFunction {
|
||||
public class TestQosFunction extends QosTestHelper {
|
||||
private Configuration conf;
|
||||
private RSRpcServices rpcServices;
|
||||
private AnnotationReadingPriorityFunction qosFunction;
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
conf = HBaseConfiguration.create();
|
||||
rpcServices = Mockito.mock(RSRpcServices.class);
|
||||
when(rpcServices.getConfiguration()).thenReturn(conf);
|
||||
qosFunction = new AnnotationReadingPriorityFunction(rpcServices, RSRpcServices.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPriority() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
RSRpcServices rpcServices = Mockito.mock(RSRpcServices.class);
|
||||
when(rpcServices.getConfiguration()).thenReturn(conf);
|
||||
|
||||
AnnotationReadingPriorityFunction qosFunction =
|
||||
new AnnotationReadingPriorityFunction(rpcServices, RSRpcServices.class);
|
||||
|
||||
// Set method name in pb style with the method name capitalized.
|
||||
checkMethod(conf, "ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction);
|
||||
// Set method name in pb style with the method name capitalized.
|
||||
|
@ -70,62 +61,11 @@ public class TestQosFunction {
|
|||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRegionInTransition() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
Superusers.initialize(conf);
|
||||
RSRpcServices rpcServices = Mockito.mock(RSRpcServices.class);
|
||||
when(rpcServices.getConfiguration()).thenReturn(conf);
|
||||
|
||||
AnnotationReadingPriorityFunction qosFunction =
|
||||
new AnnotationReadingPriorityFunction(rpcServices, RSRpcServices.class);
|
||||
|
||||
// Check ReportRegionInTransition
|
||||
HBaseProtos.RegionInfo meta_ri = HRegionInfo.convert(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
HBaseProtos.RegionInfo normal_ri = HRegionInfo.convert(
|
||||
new HRegionInfo(TableName.valueOf("test:table"),
|
||||
Bytes.toBytes("a"), Bytes.toBytes("b"), false));
|
||||
|
||||
|
||||
RegionServerStatusProtos.RegionStateTransition metaTransition = RegionServerStatusProtos
|
||||
.RegionStateTransition.newBuilder()
|
||||
.addRegionInfo(meta_ri)
|
||||
.setTransitionCode(RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED)
|
||||
.build();
|
||||
|
||||
RegionServerStatusProtos.RegionStateTransition normalTransition = RegionServerStatusProtos
|
||||
.RegionStateTransition.newBuilder()
|
||||
.addRegionInfo(normal_ri)
|
||||
.setTransitionCode(RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED)
|
||||
.build();
|
||||
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest metaTransitionRequest =
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder()
|
||||
.setServer(ProtobufUtil.toServerName(ServerName.valueOf("locahost:60020", 100)))
|
||||
.addTransition(normalTransition)
|
||||
.addTransition(metaTransition).build();
|
||||
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest normalTransitionRequest =
|
||||
RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder()
|
||||
.setServer(ProtobufUtil.toServerName(ServerName.valueOf("locahost:60020", 100)))
|
||||
.addTransition(normalTransition).build();
|
||||
|
||||
final String reportFuncName = "ReportRegionStateTransition";
|
||||
checkMethod(conf, reportFuncName, HConstants.SYSTEMTABLE_QOS, qosFunction,
|
||||
metaTransitionRequest);
|
||||
checkMethod(conf, reportFuncName, HConstants.NORMAL_QOS, qosFunction, normalTransitionRequest);
|
||||
}
|
||||
|
||||
private void checkMethod(Configuration conf, final String methodName, final int expected,
|
||||
final AnnotationReadingPriorityFunction qosf) {
|
||||
checkMethod(conf, methodName, expected, qosf, null);
|
||||
}
|
||||
|
||||
private void checkMethod(Configuration conf, final String methodName, final int expected,
|
||||
final AnnotationReadingPriorityFunction qosf, final Message param) {
|
||||
RequestHeader.Builder builder = RequestHeader.newBuilder();
|
||||
builder.setMethodName(methodName);
|
||||
assertEquals(methodName, expected, qosf.getPriority(builder.build(), param,
|
||||
User.createUserForTesting(conf, "someuser", new String[]{"somegroup"})));
|
||||
public void testAnnotations() {
|
||||
checkMethod(conf, "CloseRegion", HConstants.ADMIN_QOS, qosFunction);
|
||||
checkMethod(conf, "CompactRegion", HConstants.ADMIN_QOS, qosFunction);
|
||||
checkMethod(conf, "FlushRegion", HConstants.ADMIN_QOS, qosFunction);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue